This commit is contained in:
Brage 2024-03-14 01:49:19 +01:00
parent 24db5444f2
commit 2d263d009e

View File

@ -60,8 +60,6 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
if (getProcessStarted(messages)?.type == ProcessType.FLOW) { if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents( forwarder.produceAllMissingProcesserEvents(
producer = producer, producer = producer,
referenceId = referenceId,
eventId = eventId,
messages = messages messages = messages
) )
} else { } else {
@ -91,6 +89,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
messages.forEach { messages.forEach {
delay(1000) delay(1000)
listeners.forwardBatchEventMessagesToListeners(it) listeners.forwardBatchEventMessagesToListeners(it)
if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(it)) {
if (getProcessStarted(it)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents(
producer = producer,
messages = it
)
}
}
} }
} }
} }
@ -145,40 +151,34 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
.isNotEmpty() .isNotEmpty()
} }
fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): Boolean { fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): PersistentMessage? {
val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED } val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
return existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() } return if (existingWorkEncodeCreated.isEmpty() && existingWorkEncodeCreated.none { it.data.isSuccess() }) {
messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED }
} else null
} }
fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): Boolean { fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): PersistentMessage? {
val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED } val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
return existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() } return if (existingWorkCreated.isEmpty() && existingWorkCreated.none { it.data.isSuccess() }) {
messages.lastOrNull { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }
} else null
} }
fun produceAllMissingProcesserEvents( fun produceAllMissingProcesserEvents(
producer: CoordinatorProducer, producer: CoordinatorProducer,
referenceId: String,
eventId: String,
messages: List<PersistentMessage> messages: List<PersistentMessage>
) { ) {
val currentMessage = messages.find { it.eventId == eventId } val missingEncode = isMissingEncodeWorkCreated(messages)
if (!currentMessage?.data.isSuccess()) { val missingExtract = isMissingExtractWorkCreated(messages)
return
if (missingEncode != null && missingEncode.data.isSuccess()) {
produceEncodeWork(producer, missingEncode)
} }
when (currentMessage?.event) { if (missingExtract != null && missingExtract.data.isSuccess()) {
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED -> { produceExtractWork(producer, missingExtract)
if (isMissingEncodeWorkCreated(messages)) {
produceEncodeWork(producer, currentMessage)
}
}
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> {
if (isMissingExtractWorkCreated(messages)) {
produceExtractWork(producer, currentMessage)
}
}
else -> {}
} }
} }