From 13681fb696418627da2b863129fdedd3b357a609 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sun, 30 Jun 2024 23:55:19 +0200 Subject: [PATCH] Fix --- .../tasks/event/CompleteMediaTask.kt | 23 ++++++- .../tasks/event/CreateExtractWorkTask.kt | 3 +- .../coordinator/utils/TasksUtil.kt | 67 +++++++++++++++++++ .../common/persistance/PersistentMessage.kt | 12 ++-- .../shared/kafka/dto/Status.kt | 4 ++ 5 files changed, 100 insertions(+), 9 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index d5f21c36..68715a51 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -5,8 +5,10 @@ import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.EventCoordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping +import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.* @@ -16,6 +18,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.springframework.beans.factory.annotation.Autowired +import org.springframework.scheduling.support.TaskUtils import org.springframework.stereotype.Service @Service @@ -92,10 +95,26 @@ class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) : return null } + val taskEvents = startedData.operations.map { + when(it) { + StartOperationEvents.ENCODE -> TaskType.Encode + StartOperationEvents.EXTRACT -> TaskType.Extract + StartOperationEvents.CONVERT -> TaskType.Convert + } + } - val mapper = ProcessMapping(events) - if (mapper.canCollect()) { + val isWaiting = taskEvents.map { + isAwaitingTask(it, events) + }.any { it } + + + //val mapper = ProcessMapping(events) + + + + //if (mapper.canCollect()) { + if (isWaiting) { return ProcessCompleted(Status.COMPLETED, event.eventId) } return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 8778c7e0..adb9f755 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.az import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @@ -39,7 +40,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato sevent ?: event } else event - forwardEvent.data.az()?.let { + forwardEvent.data.az()?.takeIf { it.isSuccess() }?.let { it.entries.forEach { argsGroup -> val ffmpegTask = FfmpegTaskData( inputFile = it.inputFile, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt new file mode 100644 index 00000000..4805c9d1 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt @@ -0,0 +1,67 @@ +package no.iktdev.mediaprocessing.coordinator.utils + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper +import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent +import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess +import no.iktdev.mediaprocessing.shared.common.task.TaskType +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents + +fun isAwaitingTask(task: TaskType, events: List): Boolean { + return when (task) { + TaskType.Encode -> { + val argumentEvent = KafkaEvents.EventMediaParameterEncodeCreated + val taskCreatedEvent = KafkaEvents.EventWorkEncodeCreated + val taskCompletedEvent = KafkaEvents.EventWorkEncodePerformed + + val argument = events.findLast { it.event == argumentEvent } ?: return true + if (!argument.isSuccess()) return false + + val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { + it.event in listOf( + argumentEvent, + taskCreatedEvent, + taskCompletedEvent + ) + } + + trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size + + + } + TaskType.Extract -> { + val argumentEvent = KafkaEvents.EventMediaParameterExtractCreated + val taskCreatedEvent = KafkaEvents.EventWorkExtractCreated + val taskCompletedEvent = KafkaEvents.EventWorkExtractPerformed + + val argument = events.findLast { it.event == argumentEvent } ?: return true + if (!argument.isSuccess()) return false + val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { + it.event in listOf( + argumentEvent, + taskCreatedEvent, + taskCompletedEvent + ) + } + trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size + } + TaskType.Convert -> { + val argumentEvent = KafkaEvents.EventMediaParameterConvertCreated + val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated + val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed + + val argument = events.findLast { it.event == argumentEvent } ?: return true + if (!argument.isSuccess()) return false + + val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { + it.event in listOf( + argumentEvent, + taskCreatedEvent, + taskCompletedEvent + ) + } + trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size + + } + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 12e51f3b..e1547bd8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -58,21 +58,21 @@ class PersistentMessageHelper(val messages: List) { for (event in usableEvents) { derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId) } - val eventsToDelete = mutableSetOf() + val eventsToFind = mutableSetOf() // Utfør DFS for å finne alle avledede hendelser som skal slettes - dfs(triggered.eventId, derivedEventsMap, eventsToDelete) + dfs(triggered.eventId, derivedEventsMap, eventsToFind) - return messages.filter { it.eventId in eventsToDelete } + return messages.filter { it.eventId in eventsToFind } } /** * @param eventId Initial eventId */ - private fun dfs(eventId: String, derivedEventsMap: Map>, eventsToDelete: MutableSet) { - eventsToDelete.add(eventId) + private fun dfs(eventId: String, derivedEventsMap: Map>, eventsToFind: MutableSet) { + eventsToFind.add(eventId) derivedEventsMap[eventId]?.forEach { derivedEventId -> - dfs(derivedEventId, derivedEventsMap, eventsToDelete) + dfs(derivedEventId, derivedEventsMap, eventsToFind) } } } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt index 0e95023c..370125fb 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt @@ -4,4 +4,8 @@ enum class Status { SKIPPED, COMPLETED, ERROR +} + +fun Status.isCompleted(): Boolean { + return this == Status.COMPLETED } \ No newline at end of file