diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 969b6321..5638ff2f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -5,6 +5,7 @@ import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.EventCoordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping +import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingPrecondition import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -80,12 +81,20 @@ class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) : } } + val isWaitingForPrecondition = isAwaitingPrecondition(taskEvents, events) + if (isWaitingForPrecondition) { + log.info { "Waiting for preconditions" } + return null + } + val isWaiting = taskEvents.map { isAwaitingTask(it, events) }.any { it } + + //val mapper = ProcessMapping(events) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 9d1a6d42..53da4adc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -15,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.contract.dto.isOnly import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.az import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest @@ -38,6 +39,10 @@ class CreateConvertWorkTask(@Autowired override var coordinator: EventCoordinato super.onProcessEventsAccepted(event, events) val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az() + if (event.event == KafkaEvents.EventWorkExtractPerformed && !event.isSuccess()) { + return SimpleMessageData(status = Status.SKIPPED, "Extract failed, skipping..", derivedFromEventId = event.eventId) + } + val result = if (event.isOfEvent(KafkaEvents.EventMediaProcessStarted) && event.data.az()?.operations?.isOnly(StartOperationEvents.CONVERT) == true ) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt index 4805c9d1..80d7c591 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt @@ -4,9 +4,49 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess +import no.iktdev.mediaprocessing.shared.common.task.Task import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents + +fun isAwaitingPrecondition(tasks: List, events: List): Boolean { + if (tasks.contains(TaskType.Encode)) { + if (events.lastOrNull { it.isOfEvent( + KafkaEvents.EventMediaParameterEncodeCreated + ) } == null) { + return true + } + } + + if (tasks.contains(TaskType.Convert) && !tasks.contains(TaskType.Extract)) { + if (events.lastOrNull { it.isOfEvent( + KafkaEvents.EventWorkConvertCreated + ) } == null) { + return true + } + } + + if (tasks.contains(TaskType.Extract)) { + if (events.lastOrNull { it.isOfEvent( + KafkaEvents.EventMediaParameterExtractCreated + ) } == null) { + return true + } + } + + if (tasks.contains(TaskType.Convert)) { + if (events.lastOrNull { it.isOfEvent( + KafkaEvents.EventMediaParameterExtractCreated + ) } == null) { + return true + } + } + + + return false +} + + fun isAwaitingTask(task: TaskType, events: List): Boolean { return when (task) { TaskType.Encode -> { @@ -46,16 +86,14 @@ fun isAwaitingTask(task: TaskType, events: List): Boolean { trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size } TaskType.Convert -> { - val argumentEvent = KafkaEvents.EventMediaParameterConvertCreated val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed - val argument = events.findLast { it.event == argumentEvent } ?: return true + val argument = events.findLast { it.event == taskCreatedEvent } ?: return true if (!argument.isSuccess()) return false val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { it.event in listOf( - argumentEvent, taskCreatedEvent, taskCompletedEvent )