diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index 17f34a9c..0e72a313 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -14,6 +14,7 @@ import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.az +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @@ -29,13 +30,22 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { super.onProcessEventsAccepted(event, events) - log.info { "${event.referenceId} triggered by ${event.event}" } + + if (events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterEncodeCreated) }?.isSuccess() != true) { return null } + if (!isPermittedToCreateTasks(events)) { + log.warn { "Cannot continue until permitted event is present" } + } + + val batchEvents = createMessagesByArgs(event) + + + val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterEncodeCreated) { val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterEncodeCreated } if (sevent != null) { @@ -46,22 +56,24 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator sevent ?: event } else event - - forwardEvent.data.az()?.let { - val entries = it.entries.firstOrNull() ?: return@let - val ffmpegTask = FfmpegTaskData( - inputFile = it.inputFile, - outFile = entries.outputFile, - arguments = entries.arguments - ) - val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Encode, Gson().toJson(ffmpegTask)) - if (!status) { - log.error { "Failed to create Encode task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" } - } + batchEvents.forEach { e -> + val createdTask = if (e is FfmpegWorkRequestCreated) { + FfmpegTaskData( + inputFile = e.inputFile, + outFile = e.outFile, + arguments = e.arguments + ).let { task -> + val status = taskManager.createTask(referenceId = event.referenceId, task= TaskType.Encode, data = Gson().toJson(task)) + if (!status) { + log.error { "Failed to create Encode task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" } + } + status + } + } else false + if (createdTask) + onResult(e) } - - - return super.onProcessEvents(forwardEvent, events) + return null } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 955b94cf..f8e7ddec 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -13,6 +13,7 @@ import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.az +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import org.springframework.beans.factory.annotation.Autowired @@ -31,10 +32,17 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } + if (events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }?.isSuccess() != true) { return null } + if (!isPermittedToCreateTasks(events)) { + log.warn { "Cannot continue until permitted event is present" } + } + + val batchEvents = createMessagesByArgs(event) + val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterExtractCreated) { val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterExtractCreated } if (sevent != null) { @@ -45,20 +53,23 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato sevent ?: event } else event - forwardEvent.data.az()?.takeIf { it.isSuccess() }?.let { - it.entries.forEach { argsGroup -> - val ffmpegTask = FfmpegTaskData( - inputFile = it.inputFile, - outFile = argsGroup.outputFile, - arguments = argsGroup.arguments - ) - val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Extract, Gson().toJson(ffmpegTask)) - if (!status) { - log.error { "Failed to create Extract task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" } + batchEvents.forEach { e -> + val createdTask = if (e is FfmpegWorkRequestCreated) { + FfmpegTaskData( + inputFile = e.inputFile, + outFile = e.outFile, + arguments = e.arguments + ).let { task -> + val status = taskManager.createTask(referenceId = event.referenceId, task= TaskType.Encode, data = Gson().toJson(task)) + if (!status) { + log.error { "Failed to create Extract task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" } + } + status } - } + } else false + if (createdTask) + onResult(e) } - - return super.onProcessEvents(forwardEvent, events) + return null } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt index 7e7c2b0c..580c96b5 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt @@ -14,26 +14,34 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar abstract class CreateProcesserWorkTask(override var coordinator: EventCoordinator) : TaskCreator(coordinator) { private val log = KotlinLogging.logger {} - override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + + open fun isPermittedToCreateTasks(events: List): Boolean { + val event = events.firstOrNull() ?: return false val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted? if (started == null) { log.info { "${event.referenceId} couldn't find start event" } - return null + return false } else if (started.type == ProcessType.MANUAL) { val proceed = events.find { it.event == KafkaEvents.EventMediaWorkProceedPermitted } if (proceed == null) { log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" } - return null + return false } else { log.warn { "${event.referenceId} registered proceed permitted" } } } + return true + } - val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return null + + + fun createMessagesByArgs(event: PersistentMessage): List { + val events: MutableList = mutableListOf() + val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return events if (earg == null || earg.entries.isEmpty()) { log.info { "${event.referenceId} ffargument is empty" } - return null + return events } val requestEvents = earg.entries.map { @@ -47,8 +55,9 @@ abstract class CreateProcesserWorkTask(override var coordinator: EventCoordinato } requestEvents.forEach { log.info { "${event.referenceId} creating work request based on ${it.derivedFromEventId}" } - super.onResult(it) + events.add(it) } - return null + return events } + } \ No newline at end of file