diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 8a0fa730..001d515d 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -15,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerfo import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled @@ -57,23 +58,34 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) event: PersistentProcessDataMessage, events: List ): MessageDataWrapper? { - val convertEvent = - events.find { it.event == KafkaEvents.EventWorkConvertCreated && it.data is ConvertWorkerRequest } - if (convertEvent == null) { - // No convert here.. - return null + + val waitsForEventId = if (event.event == KafkaEvents.EventWorkConvertCreated) { + // Do convert check + val convertRequest = event.data as ConvertWorkerRequest? ?: return null + convertRequest.requiresEventId + + } else if (event.event == KafkaEvents.EventWorkExtractPerformed) { + if (event.data is ProcesserExtractWorkPerformed) event.data.derivedFromEventId else return null + } else null + + val convertData = if (event.event == KafkaEvents.EventWorkConvertCreated) { + event.data as ConvertWorkerRequest? ?: return null + } else { + val convertEvent = events.find { it.referenceId == event.referenceId && it.event == KafkaEvents.EventWorkConvertCreated } ?: return null + convertEvent.data as ConvertWorkerRequest? ?: return null } - val convertRequest = convertEvent.data as ConvertWorkerRequest? ?: return null - val requiredEventId = convertRequest.requiresEventId - if (requiredEventId != null) { + + + + + if (waitsForEventId != null) { // Requires the eventId to be defined as consumed val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation( referenceId = event.referenceId, - requiresEventId = requiredEventId + requiresEventId = waitsForEventId ) if (requiredEventToBeCompleted == null) { - log.warn { "$requiredEventId extract event with eventId: $requiredEventId was not found" } - log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" } + /*log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" } val existing = scheduled_deferred_events[event.referenceId] val newList = (existing ?: listOf()) + listOf( DerivedProcessIterationHolder( @@ -81,7 +93,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) event = convertEvent ) ) - scheduled_deferred_events[event.referenceId] = newList + scheduled_deferred_events[event.referenceId] = newList*/ return null } @@ -106,8 +118,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) return null } - val payload = event.data as ConvertWorkerRequest - val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) + val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = convertData) if (!converter.canRead()) { // Make claim regardless but push to schedule return ConvertWorkPerformed( @@ -179,7 +190,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) } - val scheduled_deferred_events: MutableMap> = mutableMapOf() + /*val scheduled_deferred_events: MutableMap> = mutableMapOf() @Scheduled(fixedDelay = (300_000)) fun validatePresenceOfRequiredEvent() { val continueDeferral: MutableMap> = mutableMapOf() @@ -217,6 +228,6 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) scheduled_deferred_events.clear() scheduled_deferred_events.putAll(continueDeferral) - } + }*/ } \ No newline at end of file