diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index afa812a8..a5e17ecb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -98,6 +98,14 @@ class Coordinator() : CoordinatorBase): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null val completed = events.lastOrSuccessOf(EventMediaProcessCompleted) ?: return null if (!started.data.isSuccess() || !completed.data.isSuccess()) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index d5a7313c..e11a3166 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -31,6 +31,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null if (!started.data.isSuccess()) { return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt index ce871a79..b09bdb33 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt @@ -29,6 +29,8 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + val started = events.lastOrSuccessOf(EVENT_REQUEST_PROCESS_STARTED) ?: return null if (!started.data.isSuccess()) { return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 9b009f32..17f0b5fb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -15,6 +16,7 @@ import java.io.File @Service class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} override val producesEvent: KafkaEvents get() = KafkaEvents.EventWorkConvertCreated @@ -25,6 +27,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + if (!event.data.isSuccess()) { return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index c62ef747..d6652c02 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -1,17 +1,26 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { + val log = KotlinLogging.logger {} override val producesEvent: KafkaEvents get() = KafkaEvents.EventWorkEncodeCreated override val requiredEvents: List get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated) + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + return super.onProcessEvents(event, events) + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 186f19d0..d149dabc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -1,16 +1,26 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) { + val log = KotlinLogging.logger {} override val producesEvent: KafkaEvents get() = KafkaEvents.EventWorkExtractCreated override val requiredEvents: List get() = listOf(KafkaEvents.EventMediaParameterExtractCreated) + + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${event.referenceId} triggered by ${event.event}" } + + return super.onProcessEvents(event, events) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 0fd5494f..4a7051e2 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -42,6 +42,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } + val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) { log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index 04d598ef..1eb36c4c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -46,6 +46,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${event.referenceId} triggered by ${event.event}" } + if (!requiredEvents.contains(event.event)) { log.info { "Ignored ${event.event} @ ${event.eventId}" } return null