From 78518514b8a0fc15925546abb3792ba82a21f490 Mon Sep 17 00:00:00 2001 From: bskjon Date: Tue, 2 Jul 2024 23:11:54 +0200 Subject: [PATCH] Updated ffmpeg stuff --- .../coordinator/tasks/event/CreateEncodeWorkTask.kt | 6 +++--- .../coordinator/tasks/event/CreateExtractWorkTask.kt | 8 +++++--- .../mediaprocessing/shared/common/persistance/tasks.kt | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index 2e7ad728..748a94be 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -42,9 +42,6 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator log.warn { "Cannot continue until permitted event is present" } } - val batchEvents = createMessagesByArgs(event) - - val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterEncodeCreated) { val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterEncodeCreated } @@ -56,6 +53,9 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator sevent ?: event } else event + val batchEvents = createMessagesByArgs(forwardEvent) + + batchEvents.forEach { e -> val createdTask = if (e is FfmpegWorkRequestCreated) { FfmpegTaskData( diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 35f875c1..5c9d63e9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -18,6 +18,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgu import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service +import java.util.* @Service class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinator) : CreateProcesserWorkTask(coordinator) { @@ -34,6 +35,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato if (events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }?.isSuccess() != true) { + log.warn { "Last instance of ${KafkaEvents.EventMediaParameterExtractCreated} was unsuccessful or null. Skipping.." } return null } @@ -41,8 +43,6 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato log.warn { "Cannot continue until permitted event is present" } } - val batchEvents = createMessagesByArgs(event) - val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterExtractCreated) { val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterExtractCreated } if (sevent != null) { @@ -53,6 +53,8 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato sevent ?: event } else event + val batchEvents = createMessagesByArgs(forwardEvent) + batchEvents.forEach { e -> val createdTask = if (e is FfmpegWorkRequestCreated) { FfmpegTaskData( @@ -60,7 +62,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato outFile = e.outFile, arguments = e.arguments ).let { task -> - val status = taskManager.createTask(referenceId = event.referenceId, derivedFromEventId = event.eventId, task= TaskType.Encode, data = Gson().toJson(task)) + val status = taskManager.createTask(referenceId = event.referenceId, eventId = UUID.randomUUID().toString(), derivedFromEventId = event.eventId, task= TaskType.Encode, data = Gson().toJson(task)) if (!status) { log.error { "Failed to create Extract task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" } } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/tasks.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/tasks.kt index 147c859b..1b5752c3 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/tasks.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/tasks.kt @@ -21,6 +21,6 @@ object tasks: IntIdTable() { val integrity: Column = varchar("integrity", 100) init { - uniqueIndex(referenceId, task, integrity) + uniqueIndex(referenceId, task, eventId) } } \ No newline at end of file