diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index cbca7155..fbe995ce 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator @@ -84,7 +85,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task val ch = CompleteHandler(events) val chEvents = ch.getMissingCompletions() if (chEvents.isNotEmpty()) { - log.info { "Waiting for ${chEvents.joinToString { "," }}" } + log.info { "Waiting for ${chEvents.joinToString(",")}" } + log.warn { "Waiting report: ${Gson().toJson(chEvents)}" } return null } @@ -110,11 +112,11 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task fun getMissingCompletions(): List { val missings = mutableListOf() - if (report[EventWorkEncodeCreated] != report[EventWorkEncodePerformed]) + if ((report[EventWorkEncodeCreated]?: 0) > (report[EventWorkEncodePerformed] ?: 0)) missings.add(StartOperationEvents.ENCODE) - if (report[EventWorkExtractCreated] != report[EventWorkExtractPerformed]) + if ((report[EventWorkExtractCreated] ?: 0) > (report[EventWorkExtractPerformed] ?: 0)) missings.add(StartOperationEvents.EXTRACT) - if (report[EventWorkConvertCreated] == report[EventWorkConvertPerformed]) + if ((report[EventWorkConvertCreated] ?: 0) > (report[EventWorkConvertPerformed] ?: 0)) missings.add(StartOperationEvents.CONVERT) return missings } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 4fa8f556..8a0805b9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -29,20 +29,25 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } - if (!event.data.isSuccess()) { - return null - } - val eventData = event.data as FfmpegWorkRequestCreated? ?: return null + // Check what it is and create based on it + + val derivedInfoObject = if (event.event in requiredEvents) { + DerivedInfoObject.fromExtractWorkCreated(event) + } else { + val extractEvent = events.findLast { it.event == KafkaEvents.EventWorkExtractCreated } + extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) } + } ?: return null + val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { event.eventId } else null; - val outFile = File(eventData.outFile) + val outFile = File(derivedInfoObject.outputFile) return ConvertWorkerRequest( status = Status.COMPLETED, requiresEventId = requiredEventId, - inputFile = eventData.outFile, + inputFile = derivedInfoObject.outputFile, allowOverwrite = true, outFileBaseName = outFile.nameWithoutExtension, outDirectory = outFile.parentFile.absolutePath, @@ -50,4 +55,23 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : ) } + + private data class DerivedInfoObject( + val outputFile: String, + val derivedFromEventId: String, + val requiresEventId: String + ) { + companion object { + fun fromExtractWorkCreated(event: PersistentMessage): DerivedInfoObject? { + return if (event.event != KafkaEvents.EventWorkExtractCreated) null else { + val data: FfmpegWorkRequestCreated = event.data as FfmpegWorkRequestCreated + DerivedInfoObject( + outputFile = data.outFile, + derivedFromEventId = event.eventId, + requiresEventId = event.eventId + ) + } + } + } + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt index 239bfcca..7900d2d7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.log @@ -14,7 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) { - + private val log = KotlinLogging.logger {} override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted? if (started == null) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index 0524721b..1c3b0dc3 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -215,6 +215,8 @@ class PersistentEventManager(private val dataSource: DataSource) { log.info { "Error code is: ${exception.errorCode}" } exception.printStackTrace() } + } else { + exception.printStackTrace() } false } else {