diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index 0206ad94..1bdeeb1c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -23,7 +23,7 @@ import java.time.ZoneOffset import java.time.format.DateTimeFormatter import java.util.* -val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10 +val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0 @Service @@ -55,6 +55,9 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event */ override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { + if (metadataTimeoutMinutes <= 0) { + return + } val hasReadBaseInfo = events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed && it.isSuccessful() } val hasMetadataSearched = events.any { it.eventType == Events.EventMediaMetadataSearchPerformed } val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt deleted file mode 100644 index 8516ef7d..00000000 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/utils/TasksUtil.kt +++ /dev/null @@ -1,116 +0,0 @@ -package no.iktdev.mediaprocessing.coordinator.utils - -import mu.KotlinLogging -import no.iktdev.mediaprocessing.shared.common.task.TaskType -import no.iktdev.mediaprocessing.shared.common.contract.data.Event - -val log = KotlinLogging.logger {} - -/* -fun isAwaitingPrecondition(tasks: List, events: List): Map { - val response = mutableMapOf() - if (tasks.any { it == TaskType.Encode }) { - if (events.lastOrNull { it.isOfEvent( - Events.EventMediaParameterEncodeCreated - ) } == null) { - response[TaskType.Encode] = true - log.info { "Waiting for ${Events.EventMediaParameterEncodeCreated}" } - - } - } - - val convertEvent = events.lastOrNull { it.isOfEvent(Events.EventWorkConvertCreated) } - if (tasks.any { it == TaskType.Convert } && tasks.none { it == TaskType.Extract }) { - if (convertEvent == null) { - response[TaskType.Convert] = true - log.info { "Waiting for ${Events.EventWorkConvertCreated}" } - } - } else if (tasks.any { it == TaskType.Convert }) { - val extractEvent = events.lastOrNull { it.isOfEvent(Events.EventMediaParameterExtractCreated) } - if (extractEvent == null || extractEvent.isSuccess() && convertEvent == null) { - response[TaskType.Convert] = true - log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" } - } - } - - if (tasks.contains(TaskType.Extract)) { - if (events.lastOrNull { it.isOfEvent( - Events.EventMediaParameterExtractCreated - ) } == null) { - response[TaskType.Extract] = true - log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" } - - } - } - - - return response -} - - -fun isAwaitingTask(task: TaskType, events: List): Boolean { - val taskStatus = when (task) { - TaskType.Encode -> { - val argumentEvent = Events.EventMediaParameterEncodeCreated - val taskCreatedEvent = Events.EventWorkEncodeCreated - val taskCompletedEvent = Events.EventWorkEncodePerformed - - val argument = events.findLast { it.event == argumentEvent } ?: return true - if (!argument.isSuccess()) return false - - val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { - it.event in listOf( - argumentEvent, - taskCreatedEvent, - taskCompletedEvent - ) - } - - val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size - waiting - } - TaskType.Extract -> { - val argumentEvent = Events.EventMediaParameterExtractCreated - val taskCreatedEvent = Events.EventWorkExtractCreated - val taskCompletedEvent = Events.EventWorkExtractPerformed - - val argument = events.findLast { it.event == argumentEvent } ?: return true - if (!argument.isSuccess()) return false - val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { - it.event in listOf( - argumentEvent, - taskCreatedEvent, - taskCompletedEvent - ) - } - val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size - waiting - } - TaskType.Convert -> { - - val extractEvents = events.findLast { it.isOfEvent(Events.EventMediaParameterExtractCreated) } - if (extractEvents == null || extractEvents.isSkipped()) { - false - } else { - val taskCreatedEvent = Events.EventWorkConvertCreated - val taskCompletedEvent = Events.EventWorkConvertPerformed - - val argument = events.findLast { it.event == taskCreatedEvent } ?: return true - if (!argument.isSuccess()) return false - - val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter { - it.event in listOf( - taskCreatedEvent, - taskCompletedEvent - ) - } - val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size - waiting - } - } - } - if (taskStatus) { - log.info { "isAwaiting for $task" } - } - return taskStatus -}*/ \ No newline at end of file