diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index 9d50a9dc..0033a08e 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -7,12 +7,16 @@ import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.persistance.events import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service +@EnableScheduling @Service class ConverterCoordinator() : CoordinatorBase() { val io = Coroutines.io() @@ -36,6 +40,7 @@ class ConverterCoordinator() : CoordinatorBase en == it.eventId } } + + myEvents.forEach { + eventManager.setProcessEvent(it.event, Message( + referenceId = it.referenceId, + eventId = it.eventId, + data = it.data + )) + } + } + + fun readAllMessagesFor(referenceId: String, eventId: String) { val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents() createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) } + @Scheduled(fixedDelay = (5*6_0000)) + fun checkForWork() { + log.info { "Checking if there is any work to do.." } + readAllInQueue() + generateMissingEvents() + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 9afa9246..cbca7155 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -76,14 +76,17 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task StartOperationEvents.CONVERT to hasConvertAndIsRequired ) - - if (missingRequired.values.any { !it }) { log.info { "Waiting for ${missingRequired.entries.filter { !it.value }.map { it.key.name }}" } return null } - + val ch = CompleteHandler(events) + val chEvents = ch.getMissingCompletions() + if (chEvents.isNotEmpty()) { + log.info { "Waiting for ${chEvents.joinToString { "," }}" } + return null + } @@ -93,4 +96,35 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task } return null } + + class CompleteHandler(val events: List) { + var report: Map = listOf( + EventReport.fromEvents(EventWorkEncodeCreated, events), + EventReport.fromEvents(EventWorkExtractCreated, events), + EventReport.fromEvents(EventWorkConvertCreated, events), + + EventReport.fromEvents(EventWorkEncodePerformed, events), + EventReport.fromEvents(EventWorkExtractPerformed, events), + EventReport.fromEvents(EventWorkConvertPerformed, events) + ).associate { it.event to it.count } + + fun getMissingCompletions(): List { + val missings = mutableListOf() + if (report[EventWorkEncodeCreated] != report[EventWorkEncodePerformed]) + missings.add(StartOperationEvents.ENCODE) + if (report[EventWorkExtractCreated] != report[EventWorkExtractPerformed]) + missings.add(StartOperationEvents.EXTRACT) + if (report[EventWorkConvertCreated] == report[EventWorkConvertPerformed]) + missings.add(StartOperationEvents.CONVERT) + return missings + } + + data class EventReport(val event: KafkaEvents, val count: Int) { + companion object { + fun fromEvents(event: KafkaEvents, events: List): EventReport { + return EventReport(event = event, count = events.filter { it.event == event }.size) + } + } + } + } } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index 0fcc7149..7c0319da 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -96,28 +96,23 @@ class Coordinator(): CoordinatorBase lists.any { it.event in processKafkaEvents } } - - - val validMessages = usableMessages.filter { lists -> - lists.any { it.event == KafkaEvents.EventMediaProcessStarted && (it.data as MediaProcessStarted).type == ProcessType.FLOW } || - lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } - } - .flatten() + val myEvents = messages.flatten() .filter { it.event in processKafkaEvents } + .filter { existing.none { en -> en == it.eventId } } - validMessages.filter { it.eventId !in existing }.forEach { + myEvents.forEach { eventManager.setProcessEvent(it.event, Message( referenceId = it.referenceId, eventId = it.eventId, data = it.data )) } + } fun readAllMessagesFor(referenceId: String, eventId: String) { @@ -139,6 +134,7 @@ class Coordinator(): CoordinatorBase