diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index d48155fd..02bbb7ce 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -60,8 +60,6 @@ class Coordinator() : CoordinatorBase): Boolean { + fun isMissingEncodeWorkCreated(messages: List): PersistentMessage? { val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED } - return existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() } + return if (existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() }) { + messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } + } else null } - fun isMissingExtractWorkCreated(messages: List): Boolean { + fun isMissingExtractWorkCreated(messages: List): PersistentMessage? { val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } - return existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() } + return if (existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() }) { + messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED } + } else null } + fun produceAllMissingProcesserEvents( producer: CoordinatorProducer, - referenceId: String, - eventId: String, messages: List ) { - val currentMessage = messages.find { it.eventId == eventId } - if (!currentMessage?.data.isSuccess()) { - return + val missingEncode = isMissingEncodeWorkCreated(messages) + val missingExtract = isMissingExtractWorkCreated(messages) + + if (missingEncode != null && missingEncode.data.isSuccess()) { + produceEncodeWork(producer, missingEncode) } - when (currentMessage?.event) { - KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED -> { - if (isMissingEncodeWorkCreated(messages)) { - produceEncodeWork(producer, currentMessage) - } - } + if (missingExtract != null && missingExtract.data.isSuccess()) { + produceExtractWork(producer, missingExtract) - KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> { - if (isMissingExtractWorkCreated(messages)) { - produceExtractWork(producer, currentMessage) - } - } - - else -> {} } }