From 50b171df65bd054a7ff88097954296d1701903f2 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 13 Apr 2024 19:45:10 +0200 Subject: [PATCH] Adjustments --- .../mediaprocessing/processer/Coordinator.kt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 2625f0d1..9e5f3463 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.JsonDataMessage import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled @@ -46,6 +47,7 @@ class Coordinator(): CoordinatorBase list.any { it.event in processKafkaEvents } } + .filter { list -> + list.any { it.event == KafkaEvents.EventMediaProcessStarted } || + (list.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } && list.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED }) + }.flatten().filter { it.event in processKafkaEvents } + + validMessages.filter { it.eventId !in existing }.forEach { + eventManager.setProcessEvent(it.event, Message( + referenceId = it.referenceId, + eventId = it.eventId, + data = it.data + )) + } + } + fun readAllMessagesFor(referenceId: String, eventId: String) { val messages = eventManager.getProcessEventsClaimable() createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)