This commit is contained in:
bskjon 2024-04-20 14:39:51 +02:00
parent 6b5c115a2c
commit 174e20e11f
4 changed files with 40 additions and 11 deletions

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
@ -84,7 +85,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
val ch = CompleteHandler(events) val ch = CompleteHandler(events)
val chEvents = ch.getMissingCompletions() val chEvents = ch.getMissingCompletions()
if (chEvents.isNotEmpty()) { if (chEvents.isNotEmpty()) {
log.info { "Waiting for ${chEvents.joinToString { "," }}" } log.info { "Waiting for ${chEvents.joinToString(",")}" }
log.warn { "Waiting report: ${Gson().toJson(chEvents)}" }
return null return null
} }
@ -110,11 +112,11 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
fun getMissingCompletions(): List<StartOperationEvents> { fun getMissingCompletions(): List<StartOperationEvents> {
val missings = mutableListOf<StartOperationEvents>() val missings = mutableListOf<StartOperationEvents>()
if (report[EventWorkEncodeCreated] != report[EventWorkEncodePerformed]) if ((report[EventWorkEncodeCreated]?: 0) > (report[EventWorkEncodePerformed] ?: 0))
missings.add(StartOperationEvents.ENCODE) missings.add(StartOperationEvents.ENCODE)
if (report[EventWorkExtractCreated] != report[EventWorkExtractPerformed]) if ((report[EventWorkExtractCreated] ?: 0) > (report[EventWorkExtractPerformed] ?: 0))
missings.add(StartOperationEvents.EXTRACT) missings.add(StartOperationEvents.EXTRACT)
if (report[EventWorkConvertCreated] == report[EventWorkConvertPerformed]) if ((report[EventWorkConvertCreated] ?: 0) > (report[EventWorkConvertPerformed] ?: 0))
missings.add(StartOperationEvents.CONVERT) missings.add(StartOperationEvents.CONVERT)
return missings return missings
} }

View File

@ -29,20 +29,25 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }
if (!event.data.isSuccess()) { // Check what it is and create based on it
return null
} val derivedInfoObject = if (event.event in requiredEvents) {
val eventData = event.data as FfmpegWorkRequestCreated? ?: return null DerivedInfoObject.fromExtractWorkCreated(event)
} else {
val extractEvent = events.findLast { it.event == KafkaEvents.EventWorkExtractCreated }
extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) }
} ?: return null
val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) {
event.eventId event.eventId
} else null; } else null;
val outFile = File(eventData.outFile) val outFile = File(derivedInfoObject.outputFile)
return ConvertWorkerRequest( return ConvertWorkerRequest(
status = Status.COMPLETED, status = Status.COMPLETED,
requiresEventId = requiredEventId, requiresEventId = requiredEventId,
inputFile = eventData.outFile, inputFile = derivedInfoObject.outputFile,
allowOverwrite = true, allowOverwrite = true,
outFileBaseName = outFile.nameWithoutExtension, outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath, outDirectory = outFile.parentFile.absolutePath,
@ -50,4 +55,23 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
) )
} }
private data class DerivedInfoObject(
val outputFile: String,
val derivedFromEventId: String,
val requiresEventId: String
) {
companion object {
fun fromExtractWorkCreated(event: PersistentMessage): DerivedInfoObject? {
return if (event.event != KafkaEvents.EventWorkExtractCreated) null else {
val data: FfmpegWorkRequestCreated = event.data as FfmpegWorkRequestCreated
DerivedInfoObject(
outputFile = data.outFile,
derivedFromEventId = event.eventId,
requiresEventId = event.eventId
)
}
}
}
}
} }

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.log import no.iktdev.mediaprocessing.coordinator.log
@ -14,7 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) { abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted? val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted?
if (started == null) { if (started == null) {

View File

@ -215,6 +215,8 @@ class PersistentEventManager(private val dataSource: DataSource) {
log.info { "Error code is: ${exception.errorCode}" } log.info { "Error code is: ${exception.errorCode}" }
exception.printStackTrace() exception.printStackTrace()
} }
} else {
exception.printStackTrace()
} }
false false
} else { } else {