Disabling metadata timeout
This commit is contained in:
parent
b6ee215e2f
commit
48d1eec2a4
@ -23,7 +23,7 @@ import java.time.ZoneOffset
|
|||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10
|
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0
|
||||||
|
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
@ -55,6 +55,9 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
|
|||||||
* This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event
|
* This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event
|
||||||
*/
|
*/
|
||||||
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
|
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
|
||||||
|
if (metadataTimeoutMinutes <= 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
val hasReadBaseInfo = events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed && it.isSuccessful() }
|
val hasReadBaseInfo = events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed && it.isSuccessful() }
|
||||||
val hasMetadataSearched = events.any { it.eventType == Events.EventMediaMetadataSearchPerformed }
|
val hasMetadataSearched = events.any { it.eventType == Events.EventMediaMetadataSearchPerformed }
|
||||||
val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)
|
val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)
|
||||||
|
|||||||
@ -1,116 +0,0 @@
|
|||||||
package no.iktdev.mediaprocessing.coordinator.utils
|
|
||||||
|
|
||||||
import mu.KotlinLogging
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.task.TaskType
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
|
|
||||||
|
|
||||||
val log = KotlinLogging.logger {}
|
|
||||||
|
|
||||||
/*
|
|
||||||
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<Event>): Map<TaskType, Boolean> {
|
|
||||||
val response = mutableMapOf<TaskType, Boolean>()
|
|
||||||
if (tasks.any { it == TaskType.Encode }) {
|
|
||||||
if (events.lastOrNull { it.isOfEvent(
|
|
||||||
Events.EventMediaParameterEncodeCreated
|
|
||||||
) } == null) {
|
|
||||||
response[TaskType.Encode] = true
|
|
||||||
log.info { "Waiting for ${Events.EventMediaParameterEncodeCreated}" }
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val convertEvent = events.lastOrNull { it.isOfEvent(Events.EventWorkConvertCreated) }
|
|
||||||
if (tasks.any { it == TaskType.Convert } && tasks.none { it == TaskType.Extract }) {
|
|
||||||
if (convertEvent == null) {
|
|
||||||
response[TaskType.Convert] = true
|
|
||||||
log.info { "Waiting for ${Events.EventWorkConvertCreated}" }
|
|
||||||
}
|
|
||||||
} else if (tasks.any { it == TaskType.Convert }) {
|
|
||||||
val extractEvent = events.lastOrNull { it.isOfEvent(Events.EventMediaParameterExtractCreated) }
|
|
||||||
if (extractEvent == null || extractEvent.isSuccess() && convertEvent == null) {
|
|
||||||
response[TaskType.Convert] = true
|
|
||||||
log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tasks.contains(TaskType.Extract)) {
|
|
||||||
if (events.lastOrNull { it.isOfEvent(
|
|
||||||
Events.EventMediaParameterExtractCreated
|
|
||||||
) } == null) {
|
|
||||||
response[TaskType.Extract] = true
|
|
||||||
log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" }
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
return response
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
fun isAwaitingTask(task: TaskType, events: List<Event>): Boolean {
|
|
||||||
val taskStatus = when (task) {
|
|
||||||
TaskType.Encode -> {
|
|
||||||
val argumentEvent = Events.EventMediaParameterEncodeCreated
|
|
||||||
val taskCreatedEvent = Events.EventWorkEncodeCreated
|
|
||||||
val taskCompletedEvent = Events.EventWorkEncodePerformed
|
|
||||||
|
|
||||||
val argument = events.findLast { it.event == argumentEvent } ?: return true
|
|
||||||
if (!argument.isSuccess()) return false
|
|
||||||
|
|
||||||
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
|
|
||||||
it.event in listOf(
|
|
||||||
argumentEvent,
|
|
||||||
taskCreatedEvent,
|
|
||||||
taskCompletedEvent
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
|
|
||||||
waiting
|
|
||||||
}
|
|
||||||
TaskType.Extract -> {
|
|
||||||
val argumentEvent = Events.EventMediaParameterExtractCreated
|
|
||||||
val taskCreatedEvent = Events.EventWorkExtractCreated
|
|
||||||
val taskCompletedEvent = Events.EventWorkExtractPerformed
|
|
||||||
|
|
||||||
val argument = events.findLast { it.event == argumentEvent } ?: return true
|
|
||||||
if (!argument.isSuccess()) return false
|
|
||||||
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
|
|
||||||
it.event in listOf(
|
|
||||||
argumentEvent,
|
|
||||||
taskCreatedEvent,
|
|
||||||
taskCompletedEvent
|
|
||||||
)
|
|
||||||
}
|
|
||||||
val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
|
|
||||||
waiting
|
|
||||||
}
|
|
||||||
TaskType.Convert -> {
|
|
||||||
|
|
||||||
val extractEvents = events.findLast { it.isOfEvent(Events.EventMediaParameterExtractCreated) }
|
|
||||||
if (extractEvents == null || extractEvents.isSkipped()) {
|
|
||||||
false
|
|
||||||
} else {
|
|
||||||
val taskCreatedEvent = Events.EventWorkConvertCreated
|
|
||||||
val taskCompletedEvent = Events.EventWorkConvertPerformed
|
|
||||||
|
|
||||||
val argument = events.findLast { it.event == taskCreatedEvent } ?: return true
|
|
||||||
if (!argument.isSuccess()) return false
|
|
||||||
|
|
||||||
val trailingEvents = PersistentMessageHelper(events).getEventsRelatedTo(argument.eventId).filter {
|
|
||||||
it.event in listOf(
|
|
||||||
taskCreatedEvent,
|
|
||||||
taskCompletedEvent
|
|
||||||
)
|
|
||||||
}
|
|
||||||
val waiting = trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size != trailingEvents.filter { it.isOfEvent(taskCreatedEvent) }.size
|
|
||||||
waiting
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (taskStatus) {
|
|
||||||
log.info { "isAwaiting for $task" }
|
|
||||||
}
|
|
||||||
return taskStatus
|
|
||||||
}*/
|
|
||||||
Loading…
Reference in New Issue
Block a user