From e5bb1c173308e597ac225ebbda34afe3bf218adc Mon Sep 17 00:00:00 2001 From: bskjon Date: Mon, 22 Apr 2024 02:07:57 +0200 Subject: [PATCH] Adjustment --- .../kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt | 6 ++++-- .../coordinator/tasks/event/BaseInfoFromFile.kt | 2 +- .../coordinator/tasks/event/CollectAndStoreTask.kt | 3 +-- .../coordinator/tasks/event/CompleteMediaTask.kt | 2 +- .../coordinator/tasks/event/CreateConvertWorkTask.kt | 2 +- .../coordinator/tasks/event/CreateEncodeWorkTask.kt | 2 +- .../coordinator/tasks/event/CreateExtractWorkTask.kt | 2 +- .../coordinator/tasks/event/DownloadAndStoreCoverTask.kt | 2 +- .../tasks/event/MetadataAndBaseInfoToCoverTask.kt | 2 +- .../coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt | 2 +- .../coordinator/tasks/event/ParseVideoFileStreams.kt | 2 +- .../coordinator/tasks/event/ReadVideoFileStreams.kt | 2 +- .../tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt | 2 +- .../tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt | 2 +- 14 files changed, 17 insertions(+), 16 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt index 97a4dea8..25c0a6c2 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt @@ -44,7 +44,7 @@ abstract class TaskCreator(coordinator: Coordinator): /** * Will always return null */ - override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + open fun onProcessEventsAccepted(event: PersistentMessage, events: List) { val referenceId = event.referenceId val eventIds = events.filter { it.event in requiredEvents + listensForEvents }.map { it.eventId } @@ -52,7 +52,9 @@ abstract class TaskCreator(coordinator: Coordinator): current.toMutableSet().addAll(eventIds) processedEvents[referenceId] = current - return null + if (event.event == KafkaEvents.EventCollectAndStore) { + processedEvents.remove(referenceId) + } } override fun containsUnprocessedEvents(events: List): Boolean { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index e64ca40b..c67c6883 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -33,7 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null return readFileInfo(selected.data as MediaProcessStarted, event.eventId) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index c74cede0..421ffd70 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -45,8 +45,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) - + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index ffa1ba8c..b1d79d8a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -34,7 +34,7 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 2f516331..9ae05fde 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -27,7 +27,7 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt index d2d949d3..5ea121a0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateEncodeWorkTask.kt @@ -19,7 +19,7 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : C get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt index 6cb3b3d6..1de34804 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateExtractWorkTask.kt @@ -19,7 +19,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : get() = listOf(KafkaEvents.EventMediaParameterExtractCreated) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index 95c93513..b467312f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -39,7 +39,7 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index 8cffae84..3c6d9e8c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -37,7 +37,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index 382d6dce..50e40a6d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -50,7 +50,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index 5625a96a..9731f50f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -38,7 +38,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 10b1de72..99238761 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -41,7 +41,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 85ddddd1..7290a0b1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -41,7 +41,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index a94d83ec..75053d34 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -45,7 +45,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - super.onProcessEvents(event, events) + super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} triggered by ${event.event}" }