diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt index 80d7c591..973585d8 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt @@ -1,16 +1,13 @@ package no.iktdev.mediaprocessing.coordinator.utils -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessageHelper -import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent -import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess +import no.iktdev.mediaprocessing.shared.common.persistance.* import no.iktdev.mediaprocessing.shared.common.task.Task import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents fun isAwaitingPrecondition(tasks: List, events: List): Boolean { - if (tasks.contains(TaskType.Encode)) { + if (tasks.any { it == TaskType.Encode }) { if (events.lastOrNull { it.isOfEvent( KafkaEvents.EventMediaParameterEncodeCreated ) } == null) { @@ -18,23 +15,21 @@ fun isAwaitingPrecondition(tasks: List, events: List): Boolean { trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size } TaskType.Convert -> { - val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated - val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed - val argument = events.findLast { it.event == taskCreatedEvent } ?: return true - if (!argument.isSuccess()) return false + val extractEvents = events.findLast { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) } + if (extractEvents == null || extractEvents.isSkipped()) { + false + } else { + val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated + val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed - val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { - it.event in listOf( - taskCreatedEvent, - taskCompletedEvent - ) + val argument = events.findLast { it.event == taskCreatedEvent } ?: return true + if (!argument.isSuccess()) return false + + val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { + it.event in listOf( + taskCreatedEvent, + taskCompletedEvent + ) + } + trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size } - trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size - } } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/FileWatcherQueue.kt similarity index 97% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/FileWatcherQueue.kt index 28e1544d..3384d6b7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/FileWatcherQueue.kt @@ -1,4 +1,4 @@ -package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher +package no.iktdev.mediaprocessing.coordinator.watcher import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt similarity index 98% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt index 77a043c8..a25e31be 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt @@ -1,4 +1,4 @@ -package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher +package no.iktdev.mediaprocessing.coordinator.watcher import dev.vishna.watchservice.KWatchEvent.Kind.Deleted import dev.vishna.watchservice.KWatchEvent.Kind.Initialized diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index a1704e0f..46372605 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -28,7 +28,6 @@ enum class KafkaEvents(val event: String) { EventWorkConvertPerformed ("event:work-convert:performed"), EventWorkDownloadCoverPerformed ("event:work-download-cover:performed"), - EVENT_STORE_VIDEO_PERFORMED ("event:store-video:performed"), EVENT_STORE_SUBTITLE_PERFORMED ("event:store-subtitle:performed"), EVENT_STORE_COVER_PERFORMED ("event:store-cover:performed"),