Changed behaviour

This commit is contained in:
bskjon 2024-04-21 20:16:09 +02:00
parent fecd87c136
commit 7663f4d7e5

View File

@ -15,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerfo
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
@ -57,23 +58,34 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage>
): MessageDataWrapper? {
val convertEvent =
events.find { it.event == KafkaEvents.EventWorkConvertCreated && it.data is ConvertWorkerRequest }
if (convertEvent == null) {
// No convert here..
return null
val waitsForEventId = if (event.event == KafkaEvents.EventWorkConvertCreated) {
// Do convert check
val convertRequest = event.data as ConvertWorkerRequest? ?: return null
convertRequest.requiresEventId
} else if (event.event == KafkaEvents.EventWorkExtractPerformed) {
if (event.data is ProcesserExtractWorkPerformed) event.data.derivedFromEventId else return null
} else null
val convertData = if (event.event == KafkaEvents.EventWorkConvertCreated) {
event.data as ConvertWorkerRequest? ?: return null
} else {
val convertEvent = events.find { it.referenceId == event.referenceId && it.event == KafkaEvents.EventWorkConvertCreated } ?: return null
convertEvent.data as ConvertWorkerRequest? ?: return null
}
val convertRequest = convertEvent.data as ConvertWorkerRequest? ?: return null
val requiredEventId = convertRequest.requiresEventId
if (requiredEventId != null) {
if (waitsForEventId != null) {
// Requires the eventId to be defined as consumed
val requiredEventToBeCompleted = getRequiredExtractProcessForContinuation(
referenceId = event.referenceId,
requiresEventId = requiredEventId
requiresEventId = waitsForEventId
)
if (requiredEventToBeCompleted == null) {
log.warn { "$requiredEventId extract event with eventId: $requiredEventId was not found" }
log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" }
/*log.info { "Sending ${event.eventId} @ ${event.referenceId} to deferred check" }
val existing = scheduled_deferred_events[event.referenceId]
val newList = (existing ?: listOf()) + listOf(
DerivedProcessIterationHolder(
@ -81,7 +93,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
event = convertEvent
)
)
scheduled_deferred_events[event.referenceId] = newList
scheduled_deferred_events[event.referenceId] = newList*/
return null
}
@ -106,8 +118,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
return null
}
val payload = event.data as ConvertWorkerRequest
val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
val converter = Converter(referenceId = event.referenceId, eventId = event.eventId, data = convertData)
if (!converter.canRead()) {
// Make claim regardless but push to schedule
return ConvertWorkPerformed(
@ -179,7 +190,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
/*val scheduled_deferred_events: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
@Scheduled(fixedDelay = (300_000))
fun validatePresenceOfRequiredEvent() {
val continueDeferral: MutableMap<String, List<DerivedProcessIterationHolder>> = mutableMapOf()
@ -217,6 +228,6 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
scheduled_deferred_events.clear()
scheduled_deferred_events.putAll(continueDeferral)
}
}*/
}