From d6a8ea62977c89d132753789aa65afbbb0a31ea8 Mon Sep 17 00:00:00 2001 From: bskjon Date: Mon, 22 Apr 2024 00:17:14 +0200 Subject: [PATCH] Creating a cache of "consumed" eventIds --- .../coordinator/Coordinator.kt | 1 - .../mediaprocessing/coordinator/Task.kt | 26 +++++++++++++++++++ .../tasks/event/BaseInfoFromFile.kt | 1 + .../tasks/event/CollectAndStoreTask.kt | 2 ++ .../tasks/event/CompleteMediaTask.kt | 2 ++ .../tasks/event/CreateConvertWorkTask.kt | 2 ++ .../tasks/event/CreateEncodeWorkTask.kt | 2 ++ .../tasks/event/CreateExtractWorkTask.kt | 2 ++ .../tasks/event/DownloadAndStoreCoverTask.kt | 2 ++ .../event/MetadataAndBaseInfoToCoverTask.kt | 2 ++ .../event/MetadataAndBaseInfoToFileOut.kt | 2 ++ .../tasks/event/ParseVideoFileStreams.kt | 2 ++ .../tasks/event/ReadVideoFileStreams.kt | 2 ++ .../event/ffmpeg/EncodeArgumentCreatorTask.kt | 3 ++- .../ffmpeg/ExtractArgumentCreatorTask.kt | 2 ++ .../shared/common/tasks/TaskCreatorImpl.kt | 20 ++++++++++++++ 16 files changed, 71 insertions(+), 2 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index 4041bf74..da66f044 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -34,7 +34,6 @@ class Coordinator() : CoordinatorBase(coordinator) { + override fun isPrerequisiteEventsOk(events: List): Boolean { val currentEvents = events.map { it.event } return requiredEvents.all { currentEvents.contains(it) } @@ -39,4 +41,28 @@ abstract class TaskCreator(coordinator: Coordinator): return listOf() } + /** + * Will always return null + */ + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + val referenceId = event.referenceId + val eventIds = events.filter { it.event in requiredEvents }.map { it.eventId } + + val current = processedEvents[referenceId] ?: setOf() + current.toMutableSet().addAll(eventIds) + processedEvents[referenceId] = current + + return null + } + + override fun containsUnprocessedEvents(events: List): Boolean { + val referenceId = events.firstOrNull()?.referenceId ?:return false + val preExistingEvents = processedEvents[referenceId]?: setOf() + + val forwardedEvents = events.filter { it.event in requiredEvents }.map { it.eventId } + val newEvents = forwardedEvents.filter { it !in preExistingEvents } + return newEvents.isNotEmpty() + + } + } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index 2401a636..e64ca40b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -33,6 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null return readFileInfo(selected.data as MediaProcessStarted, event.eventId) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index 4d89aacc..c74cede0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -45,6 +45,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index fbe995ce..ffa1ba8c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -34,6 +34,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 8a0805b9..2f516331 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -27,6 +27,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } // Check what it is and create based on it diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index 1ad52efb..d2d949d3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -19,6 +19,8 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : C get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterEncodeCreated) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 31761d6b..6cb3b3d6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -19,6 +19,8 @@ class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : get() = listOf(KafkaEvents.EventMediaParameterExtractCreated) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterExtractCreated) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 5b609c6f..95c93513 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -39,6 +39,8 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index d43ce21a..8cffae84 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -37,6 +37,8 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index 37c8c194..382d6dce 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -50,6 +50,8 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val baseInfo = events.lastOrSuccessOf(KafkaEvents.EventMediaReadBaseInfoPerformed) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index ad3cb44c..5625a96a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -38,6 +38,8 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 22043288..10b1de72 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -41,6 +41,8 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index b8aef299..85ddddd1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -41,6 +41,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted @@ -68,7 +70,6 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator //val outDir = SharedConfig.outgoingContent.using(baseInfo.title) - return getFfmpegVideoArguments( inputFile = inputFile.file, outFullName = videoInfo.fullName, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index 38a222b7..a94d83ec 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -45,6 +45,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + super.onProcessEvents(event, events) + log.info { "${event.referenceId} triggered by ${event.event}" } if (!requiredEvents.contains(event.event)) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt index 102aaf82..b7c07a7a 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -1,6 +1,8 @@ package no.iktdev.mediaprocessing.shared.common.tasks +import mu.KotlinLogging import no.iktdev.mediaprocessing.shared.common.CoordinatorBase +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -10,6 +12,9 @@ import javax.annotation.PostConstruct abstract class TaskCreatorImpl, V, L : EventBasedMessageListener>( open var coordinator: C ) : ITaskCreatorListener { + private val log = KotlinLogging.logger {} + + protected open val processedEvents: MutableMap> = mutableMapOf() companion object { fun isInstanceOfTaskCreatorImpl(clazz: Class): Boolean { @@ -82,6 +87,7 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa private val context: MutableMap = mutableMapOf() private val context_key_reference = "reference" private val context_key_producesEvent = "event" + final override fun onEventReceived(referenceId: String, event: V, events: List) { context[context_key_reference] = referenceId getListener().producesEvent.let { @@ -89,6 +95,12 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa } if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) { + + if (!containsUnprocessedEvents(events)) { + log.warn { "Event register blocked proceeding" } + return + } + val result = onProcessEvents(event, events) if (result != null) { onResult(result) @@ -99,7 +111,15 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa } } + /** + * This function is intended to cache the referenceId and its eventid's + * This is to prevent dupliation + * */ + abstract fun containsUnprocessedEvents(events: List): Boolean + + protected fun onResult(data: MessageDataWrapper) { + producer.sendMessage( referenceId = context[context_key_reference] as String, event = context[context_key_producesEvent] as KafkaEvents,