diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index 02b9d9fc..938b56ff 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -24,12 +24,19 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean -val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0 +val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: -1 @Service class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { + override fun onReady() { + super.onReady() + if (metadataTimeoutMinutes == 0) { + log.warn { "Metadata timeout is set to 0 minutes.. This will block proceeding until metadata is found.." } + } + } + override fun getProducerName(): String { return this::class.java.simpleName } @@ -83,10 +90,19 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event */ override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { - if (metadataTimeoutMinutes <= 0) { - return + if (metadataTimeoutMinutes <= -1) { + log.info { "Metadata has no timeout, a timeout will be created.." } + val meta = incomingEvent.metadata() + onProduceEvent(MediaMetadataReceivedEvent( + metadata = meta.copy( + status = EventStatus.Failed, + source = getProducerName() + ), + data = null + )) } + val searchPerformedEvent: MediaMetadataReceivedEvent? = events.findEventOf() if (searchPerformedEvent != null) { @@ -98,19 +114,17 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { val baseInfo = events.findFirstEventOf() - if (baseInfo?.isSuccessful() != true) { return } if (incomingEvent.isOfEvent(Events.BaseInfoRead)) { - val digestEvent = incomingEvent.consume() ?: return - if (timeoutJobs.containsKey(digestEvent.referenceId())) + if (timeoutJobs.containsKey(incomingEvent.metadata().referenceId)) return val ttsc = timeoutScope.launch { - createTimeout(digestEvent.referenceId(), digestEvent.eventId(), baseInfo) + createTimeout(incomingEvent.metadata().referenceId, incomingEvent.metadata().eventId, baseInfo) } - timeoutJobs[digestEvent.referenceId()] = ttsc + timeoutJobs[incomingEvent.metadata().referenceId] = ttsc } }