From 07111f7b98a3cf4835289ff1c7df51803f7ff61c Mon Sep 17 00:00:00 2001 From: Brage Date: Mon, 25 Mar 2024 23:25:23 +0100 Subject: [PATCH] Changed work creation --- .../coordinator/Coordinator.kt | 14 ++--- .../tasks/event/CreateConvertWorkTask.kt | 48 ++++++++++++++++ .../tasks/event/CreateEncodeWorkTask.kt | 17 ++++++ .../tasks/event/CreateExtractWorkTask.kt | 16 ++++++ .../event/ffmpeg/CreateProcesserWorkTask.kt | 55 +++++++++++++++++++ .../shared/common/tasks/TaskCreatorImpl.kt | 2 +- .../shared/kafka/core/KafkaEvents.kt | 2 + .../events_result/FfmpegWorkRequestCreated.kt | 1 + 8 files changed, 147 insertions(+), 8 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index 7c6d4be3..97591599 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -53,7 +53,7 @@ class Coordinator() : CoordinatorBase + get() = listOf( + KafkaEvents.EVENT_WORK_EXTRACT_CREATED + // TODO: Add event for request as well + ) + + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + if (!event.data.isSuccess()) { + return null + } + val eventData = event.data as FfmpegWorkRequestCreated? ?: return null + + val requiredEventId = if (event.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED) { + event.eventId + } else null; + + val outFile = File(eventData.outFile) + return ConvertWorkerRequest( + status = Status.COMPLETED, + requiresEventId = requiredEventId, + inputFile = eventData.outFile, + allowOverwrite = true, + outFileBaseName = outFile.nameWithoutExtension, + outDirectory = outFile.parentFile.absolutePath + ) + + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt new file mode 100644 index 00000000..83f4d28a --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -0,0 +1,17 @@ +package no.iktdev.mediaprocessing.coordinator.tasks.event + +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service + +@Service +class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { + override val producesEvent: KafkaEvents + get() = KafkaEvents.EVENT_WORK_ENCODE_CREATED + + override val requiredEvents: List + get() = listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt new file mode 100644 index 00000000..551ffd72 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -0,0 +1,16 @@ +package no.iktdev.mediaprocessing.coordinator.tasks.event + +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service + +@Service +class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { + override val producesEvent: KafkaEvents + get() = KafkaEvents.EVENT_WORK_EXTRACT_CREATED + + override val requiredEvents: List + get() = listOf(KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt new file mode 100644 index 00000000..9f016e47 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/CreateProcesserWorkTask.kt @@ -0,0 +1,55 @@ +package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg + +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.TaskCreator +import no.iktdev.mediaprocessing.coordinator.log +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess +import org.springframework.beans.factory.annotation.Autowired + +abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) { + + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted? + if (started == null) { + return null + } + + if (!event.data.isSuccess()) { + return null + } + + val proceed = events.find { it.event == KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED } + if (proceed == null && started.type == ProcessType.MANUAL) { + log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" } + return null + } + + + val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return null + if (earg == null || earg.entries.isEmpty()) { + return null + } + + val requestEvents = earg.entries.map { + FfmpegWorkRequestCreated( + status = Status.COMPLETED, + derivedFromEventId = event.eventId, + inputFile = earg.inputFile, + arguments = it.arguments, + outFile = it.outputFile + ) + } + requestEvents.forEach { + super.onResult(it) + } + return null + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt index 29550cf0..635e9a62 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -92,7 +92,7 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa } } - private fun onResult(data: MessageDataWrapper) { + protected fun onResult(data: MessageDataWrapper) { producer.sendMessage( referenceId = context[context_key_reference] as String, event = context[context_key_producesEvent] as KafkaEvents, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 09cc5fd2..9987f11b 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -16,6 +16,8 @@ enum class KafkaEvents(val event: String) { EVENT_MEDIA_CONVERT_PARAMETER_CREATED("event:media-convert-parameter:created"), EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED("event:media-download-cover-parameter:created"), + EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted"), + EVENT_WORK_ENCODE_CREATED("event:work-encode:created"), EVENT_WORK_EXTRACT_CREATED("event:work-extract:created"), EVENT_WORK_CONVERT_CREATED("event:work-convert:created"), diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt index 7778d425..dfe44cf7 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt @@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status ) data class FfmpegWorkRequestCreated( override val status: Status, + val derivedFromEventId: String, val inputFile: String, val arguments: List, val outFile: String