diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index ffd7c491..2638ff93 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson @@ -40,23 +41,26 @@ class ConvertWorkTaskListener: WorkTaskListener() { return false } - if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) { - return false - } val producedEvents = events.filter { it.eventType == produceEvent } val shouldIHandleAndProduce = producedEvents.none { it.derivedFromEventId() == incomingEvent.eventId() } val extractedEvent = events.findFirstEventOf() if (extractedEvent?.isSuccessful() == true && shouldIHandleAndProduce) { log.info { "Permitting handling of event: ${extractedEvent.data?.outputFile}" } - } - val startedWithOperations = events.findFirstEventOf()?.data?.operations ?: return false - if (startedWithOperations.isOnly(OperationEvents.CONVERT) && shouldIHandleAndProduce) { + val startOperation = events.findFirstOf(Events.ProcessStarted)?.dataAs() + if (startOperation == null) { + log.error { "Could not find 'ProcessStarted' event" } + return false + } + + if (startOperation.data?.operations?.isOnly(OperationEvents.CONVERT) == true) { + log.info { "StartOperation should only be Convert, ${WGson.toJson(startOperation)}" } return true + } else { + return shouldIHandleAndProduce } - return shouldIHandleAndProduce } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { val event = incomingEvent.consume() diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index 0f4e296f..fdc907ed 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -47,6 +47,11 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { Events.ProcessCompleted ) + private val internalFilter = listOf( + Events.BaseInfoRead, + Events.MetadataSearchPerformed, + Events.ProcessCompleted + ) val metadataTimeout = metadataTimeoutMinutes * 60 @@ -57,6 +62,12 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { return true } + override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { + if (!super.shouldIProcessAndHandleEvent(incomingEvent, events)) + return false + return (events.any { it.eventType == Events.BaseInfoRead }) + } + /** * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event */ @@ -65,41 +76,53 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { return } + val searchPerformedEvent: MediaMetadataReceivedEvent? = events.findEventOf() + + if (searchPerformedEvent != null) { + if (timeoutJobs.containsKey(searchPerformedEvent.referenceId())) { + val job = timeoutJobs.remove(searchPerformedEvent.referenceId()) + job?.cancel() + } + } + + val baseInfo = events.findFirstEventOf() if (baseInfo?.isSuccessful() != true) { return } - val digestEvent = incomingEvent.consume() ?: return - if (digestEvent.eventType == Events.BaseInfoRead) { - if (!timeoutJobs.containsKey(digestEvent.referenceId())) { - timeoutScope.launch { - val expiryTime = (Instant.now().epochSecond + metadataTimeout) - val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC) - val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) - log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } - delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis()) - coordinator!!.produceNewEvent( - MediaMetadataReceivedEvent( - metadata = EventMetadata( - referenceId = digestEvent.referenceId(), - derivedFromEventId = digestEvent.eventId(), - status = EventStatus.Skipped, - source = getProducerName() - ) - ) - - ) - }.also { - timeoutJobs[digestEvent.referenceId()] = it - } - } else { - log.error { "Timeout for ${digestEvent.referenceId()} has already been set!" } + if (incomingEvent.isOfEvent(Events.BaseInfoRead)) { + val digestEvent = incomingEvent.consume() ?: return + if (timeoutJobs.containsKey(digestEvent.referenceId())) + return + val ttsc = timeoutScope.launch { + createTimeout(digestEvent.referenceId(), digestEvent.eventId(), baseInfo) } - } else { - timeoutJobs.remove(digestEvent.referenceId())?.cancel() + timeoutJobs[digestEvent.referenceId()] = ttsc } } + + suspend fun createTimeout(referenceId: String, eventId: String, baseInfo: BaseInfoEvent) { + val expiryTime = (Instant.now().epochSecond + metadataTimeout) + val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC) + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) + log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } + delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis()) + if (!this.isActive()) { + return + } + coordinator!!.produceNewEvent( + MediaMetadataReceivedEvent( + metadata = EventMetadata( + referenceId = referenceId, + derivedFromEventId = eventId, + status = EventStatus.Skipped, + source = getProducerName() + ) + ) + + ) + } } \ No newline at end of file