From 3211cb2608eaf3c24b29456a1cb108b5ec7785fb Mon Sep 17 00:00:00 2001 From: bskjon Date: Thu, 18 Apr 2024 02:47:57 +0200 Subject: [PATCH] Minor update --- .../tasks/event/CompleteMediaTask.kt | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 8697fe47..64435022 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -6,10 +6,12 @@ import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.* import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.springframework.beans.factory.annotation.Autowired @@ -40,11 +42,48 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task val receivedEvents = events.map { it.event } // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. + + + + val ffmpegEvents = listOf(KafkaEvents.EventMediaParameterEncodeCreated, EventMediaParameterExtractCreated) if (ffmpegEvents.any { receivedEvents.contains(it) } && events.none { e -> KafkaEvents.isOfWork(e.event) }) { return null } + val startedData: MediaProcessStarted? = started.data as MediaProcessStarted? + if (startedData == null) { + log.error { "${event.referenceId} contains a started event without proper data object" } + return null + } + + + val hasEncodeAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.ENCODE)) { + events.any { it.event == EventWorkEncodePerformed } + } else true + + val hasExtractAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.EXTRACT)) { + events.any { it.event == EventWorkExtractPerformed} + } else true + + val hasConvertAndIsRequired = if (startedData.operations.contains(ProcessStartOperationEvents.CONVERT)) { + events.any { it.event == EventWorkConvertPerformed } + } else true + + val missingRequired: MutableMap = mutableMapOf( + ProcessStartOperationEvents.ENCODE to hasEncodeAndIsRequired, + ProcessStartOperationEvents.EXTRACT to hasExtractAndIsRequired, + ProcessStartOperationEvents.CONVERT to hasConvertAndIsRequired + ) + + + + if (missingRequired.values.any { !it }) { + log.info { "Waiting for ${missingRequired.entries.filter { !it.value }.map { it.key.name }}" } + return null + } + +