Adjustments

This commit is contained in:
bskjon 2024-04-13 20:41:46 +02:00
parent 50b171df65
commit 66f35401bc

View File

@ -97,10 +97,12 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
val existing = eventManager.getAllProcessEvents().map { it.eventId }
val messages = eventManager.getEventsUncompleted()
val validMessages = messages.filter { list -> list.any { it.event in processKafkaEvents } }
.filter { list ->
list.any { it.event == KafkaEvents.EventMediaProcessStarted } ||
(list.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } && list.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED })
val usableMessages = messages.filter { lists -> lists.any { it.event in processKafkaEvents } }
val validMessages = usableMessages.filter { lists -> lists.any { it.event == KafkaEvents.EventMediaProcessStarted } ||
(lists.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED } && lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } )
}.flatten().filter { it.event in processKafkaEvents }
validMessages.filter { it.eventId !in existing }.forEach {