diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt index 22a39a64..f1540206 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt @@ -8,10 +8,9 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfo -import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfoEvent -import no.iktdev.mediaprocessing.shared.common.contract.data.Event -import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @@ -30,6 +29,17 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { return this::class.java.simpleName } + override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { + if (!super.shouldIProcessAndHandleEvent(incomingEvent, events)) { + return false + } + val startedWith = events.findFirstEventOf()?.data?.operations; + if (startedWith?.isOnly(OperationEvents.CONVERT) == true) { + return false + } + return true + } + override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { val event = incomingEvent.consume() if (event == null) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index 73270b41..5399f74b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -28,7 +28,8 @@ class ConvertWorkTaskListener: WorkTaskListener() { override var coordinator: Coordinator? = null override val produceEvent: Events = Events.ConvertTaskCreated override val listensForEvents: List = listOf( - Events.ExtractTaskCompleted + Events.ExtractTaskCompleted, + Events.ProcessStarted ) override fun canProduceMultipleEvents(): Boolean { @@ -49,6 +50,10 @@ class ConvertWorkTaskListener: WorkTaskListener() { if (shouldIHandleAndProduce) { log.info { "Permitting handling of event: ${incomingEvent.dataAs()?.outputFile}" } } + val startedWithOperations = events.findFirstEventOf()?.data?.operations ?: return false + if (startedWithOperations.isOnly(OperationEvents.CONVERT)) { + return true + } return shouldIHandleAndProduce } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index 0afe3474..0f4e296f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -1,33 +1,33 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners +import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson -import no.iktdev.eventi.data.EventMetadata -import no.iktdev.eventi.data.EventStatus -import no.iktdev.eventi.data.isSuccessful +import no.iktdev.eventi.data.* import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.eventi.database.toEpochSeconds import no.iktdev.mediaprocessing.shared.common.contract.Events -import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfoEvent -import no.iktdev.mediaprocessing.shared.common.contract.data.Event -import no.iktdev.mediaprocessing.shared.common.contract.data.MediaMetadataReceivedEvent -import no.iktdev.mediaprocessing.shared.common.contract.data.az +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.contract.lastOrSuccessOf import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service +import java.time.Duration +import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset import java.time.format.DateTimeFormatter import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0 @Service -@EnableScheduling class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { override fun getProducerName(): String { @@ -43,13 +43,19 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { override val produceEvent: Events = Events.MetadataSearchPerformed override val listensForEvents: List = listOf( Events.BaseInfoRead, - Events.MetadataSearchPerformed + Events.MetadataSearchPerformed, + Events.ProcessCompleted ) val metadataTimeout = metadataTimeoutMinutes * 60 - val waitingProcessesForMeta: MutableMap = mutableMapOf() + private val timeoutScope = CoroutineScope(Dispatchers.Default) + val timeoutJobs = ConcurrentHashMap() + + override fun shouldIHandleFailedEvents(incomingEvent: Event): Boolean { + return true + } /** * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event @@ -58,68 +64,42 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { if (metadataTimeoutMinutes <= 0) { return } - val hasReadBaseInfo = events.any { it.eventType == Events.BaseInfoRead && it.isSuccessful() } - val hasMetadataSearched = events.any { it.eventType == Events.MetadataSearchPerformed } - val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId) - if (!hasReadBaseInfo) { + val baseInfo = events.findFirstEventOf() + + if (baseInfo?.isSuccessful() != true) { return } - if (hasMetadataSearched) { - waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId) - return - } else if (!hasPollerForMetadataEvent) { - val consumedIncoming = incomingEvent.consume() - if (consumedIncoming == null) { - log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" } - return - } + val digestEvent = incomingEvent.consume() ?: return + if (digestEvent.eventType == Events.BaseInfoRead) { + if (!timeoutJobs.containsKey(digestEvent.referenceId())) { + timeoutScope.launch { + val expiryTime = (Instant.now().epochSecond + metadataTimeout) + val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC) + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) + log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } + delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis()) + coordinator!!.produceNewEvent( + MediaMetadataReceivedEvent( + metadata = EventMetadata( + referenceId = digestEvent.referenceId(), + derivedFromEventId = digestEvent.eventId(), + status = EventStatus.Skipped, + source = getProducerName() + ) + ) - val baseInfo = events.find { it.eventType == Events.BaseInfoRead}?.az()?.data - if (baseInfo == null) { - log.error { "BaseInfoEvent is null for referenceId: ${consumedIncoming.metadata.referenceId} on eventId: ${consumedIncoming.metadata.eventId}" } - return - } - - val estimatedTimeout = LocalDateTime.now().toEpochSeconds() + metadataTimeout - val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC) - - val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) - - waitingProcessesForMeta[consumedIncoming.metadata.referenceId] = - MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now()) - - log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } - } - } - - - @Scheduled(fixedDelay = (5_000)) - fun sendErrorMessageForMetadata() { - val expired = waitingProcessesForMeta.filter { - LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout) - } - expired.forEach { - log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" } - if (coordinator == null) { - log.error { "Coordinator is null, not able to get timeout stored!" } - } - coordinator!!.produceNewEvent( - MediaMetadataReceivedEvent( - metadata = EventMetadata( - referenceId = it.key, - derivedFromEventId = it.value.eventId, - status = EventStatus.Skipped, - source = getProducerName() ) - ) - - ) - waitingProcessesForMeta.remove(it.key) + }.also { + timeoutJobs[digestEvent.referenceId()] = it + } + } else { + log.error { "Timeout for ${digestEvent.referenceId()} has already been set!" } + } + } else { + timeoutJobs.remove(digestEvent.referenceId())?.cancel() } - active = expired.isNotEmpty() } - data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime) } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt index 6af75810..87f63bf6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt @@ -7,6 +7,7 @@ import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.* import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats +import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly import java.io.File /** @@ -85,7 +86,7 @@ object CompletionValidator { if (OperationEvents.CONVERT in operations) { val convertWork = events.filter { it.eventType == Events.ConvertTaskCreated } val convertPerformed = events.filter { it.eventType == Events.ConvertTaskCompleted } - if (convertPerformed.size < convertWork.size) + if (convertPerformed.size < convertWork.size || (operations.isOnly(OperationEvents.CONVERT) && convertWork.isEmpty())) return false }