Updated check

This commit is contained in:
bskjon 2024-07-01 21:13:28 +02:00
parent 44e34cead6
commit 8024583d36
2 changed files with 12 additions and 6 deletions

View File

@ -83,7 +83,7 @@ class CompleteMediaTask(@Autowired override var coordinator: EventCoordinator) :
val isWaitingForPrecondition = isAwaitingPrecondition(taskEvents, events) val isWaitingForPrecondition = isAwaitingPrecondition(taskEvents, events)
if (isWaitingForPrecondition.isNotEmpty()) { if (isWaitingForPrecondition.isNotEmpty()) {
log.info { "Waiting for preconditions: ${isWaitingForPrecondition.keys.joinToString(",") }}" } log.info { "Waiting for preconditions: ${isWaitingForPrecondition.keys.joinToString(",") }" }
return null return null
} }

View File

@ -1,10 +1,12 @@
package no.iktdev.mediaprocessing.coordinator.utils package no.iktdev.mediaprocessing.coordinator.utils
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.* import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.common.task.Task import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
val log = KotlinLogging.logger {}
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage>): Map<TaskType, Boolean> { fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage>): Map<TaskType, Boolean> {
val response = mutableMapOf<TaskType, Boolean>() val response = mutableMapOf<TaskType, Boolean>()
@ -13,20 +15,22 @@ fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage
KafkaEvents.EventMediaParameterEncodeCreated KafkaEvents.EventMediaParameterEncodeCreated
) } == null) { ) } == null) {
response[TaskType.Encode] = true response[TaskType.Encode] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterEncodeCreated}" }
} }
} }
val convertEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventWorkConvertCreated) } == null
if (tasks.any { it == TaskType.Convert } && tasks.none {it == TaskType.Extract}) { if (tasks.any { it == TaskType.Convert } && tasks.none {it == TaskType.Extract}) {
if (events.lastOrNull { it.isOfEvent( if (convertEvent) {
KafkaEvents.EventWorkConvertCreated
) } == null) {
response[TaskType.Convert] = true response[TaskType.Convert] = true
log.info { "Waiting for ${KafkaEvents.EventWorkConvertCreated}" }
} }
} else if (tasks.any { it == TaskType.Convert }) { } else if (tasks.any { it == TaskType.Convert }) {
val extractEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) } val extractEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }
if (extractEvent == null || extractEvent.isSuccess()) { if (extractEvent == null || extractEvent.isSuccess() && !convertEvent) {
response[TaskType.Convert] = true response[TaskType.Convert] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterExtractCreated}" }
} }
} }
@ -35,6 +39,8 @@ fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage
KafkaEvents.EventMediaParameterExtractCreated KafkaEvents.EventMediaParameterExtractCreated
) } == null) { ) } == null) {
response[TaskType.Extract] = true response[TaskType.Extract] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterExtractCreated}" }
} }
} }