This commit is contained in:
bskjon 2025-04-09 00:15:18 +02:00
parent 4dc54379d8
commit a36db258af
2 changed files with 61 additions and 34 deletions

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import com.google.gson.Gson
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson import no.iktdev.eventi.core.WGson
@ -40,23 +41,26 @@ class ConvertWorkTaskListener: WorkTaskListener() {
return false return false
} }
if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) {
return false
}
val producedEvents = events.filter { it.eventType == produceEvent } val producedEvents = events.filter { it.eventType == produceEvent }
val shouldIHandleAndProduce = producedEvents.none { it.derivedFromEventId() == incomingEvent.eventId() } val shouldIHandleAndProduce = producedEvents.none { it.derivedFromEventId() == incomingEvent.eventId() }
val extractedEvent = events.findFirstEventOf<ExtractWorkPerformedEvent>() val extractedEvent = events.findFirstEventOf<ExtractWorkPerformedEvent>()
if (extractedEvent?.isSuccessful() == true && shouldIHandleAndProduce) { if (extractedEvent?.isSuccessful() == true && shouldIHandleAndProduce) {
log.info { "Permitting handling of event: ${extractedEvent.data?.outputFile}" } log.info { "Permitting handling of event: ${extractedEvent.data?.outputFile}" }
} }
val startedWithOperations = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.operations ?: return false val startOperation = events.findFirstOf(Events.ProcessStarted)?.dataAs<MediaProcessStartEvent>()
if (startedWithOperations.isOnly(OperationEvents.CONVERT) && shouldIHandleAndProduce) { if (startOperation == null) {
log.error { "Could not find 'ProcessStarted' event" }
return false
}
if (startOperation.data?.operations?.isOnly(OperationEvents.CONVERT) == true) {
log.info { "StartOperation should only be Convert, ${WGson.toJson(startOperation)}" }
return true return true
} else {
return shouldIHandleAndProduce
} }
return shouldIHandleAndProduce
} }
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) { override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume() val event = incomingEvent.consume()

View File

@ -47,6 +47,11 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
Events.ProcessCompleted Events.ProcessCompleted
) )
private val internalFilter = listOf(
Events.BaseInfoRead,
Events.MetadataSearchPerformed,
Events.ProcessCompleted
)
val metadataTimeout = metadataTimeoutMinutes * 60 val metadataTimeout = metadataTimeoutMinutes * 60
@ -57,6 +62,12 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
return true return true
} }
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (!super.shouldIProcessAndHandleEvent(incomingEvent, events))
return false
return (events.any { it.eventType == Events.BaseInfoRead })
}
/** /**
* This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event
*/ */
@ -65,41 +76,53 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
return return
} }
val searchPerformedEvent: MediaMetadataReceivedEvent? = events.findEventOf<MediaMetadataReceivedEvent>()
if (searchPerformedEvent != null) {
if (timeoutJobs.containsKey(searchPerformedEvent.referenceId())) {
val job = timeoutJobs.remove(searchPerformedEvent.referenceId())
job?.cancel()
}
}
val baseInfo = events.findFirstEventOf<BaseInfoEvent>() val baseInfo = events.findFirstEventOf<BaseInfoEvent>()
if (baseInfo?.isSuccessful() != true) { if (baseInfo?.isSuccessful() != true) {
return return
} }
val digestEvent = incomingEvent.consume() ?: return if (incomingEvent.isOfEvent(Events.BaseInfoRead)) {
if (digestEvent.eventType == Events.BaseInfoRead) { val digestEvent = incomingEvent.consume() ?: return
if (!timeoutJobs.containsKey(digestEvent.referenceId())) { if (timeoutJobs.containsKey(digestEvent.referenceId()))
timeoutScope.launch { return
val expiryTime = (Instant.now().epochSecond + metadataTimeout) val ttsc = timeoutScope.launch {
val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC) createTimeout(digestEvent.referenceId(), digestEvent.eventId(), baseInfo)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis())
coordinator!!.produceNewEvent(
MediaMetadataReceivedEvent(
metadata = EventMetadata(
referenceId = digestEvent.referenceId(),
derivedFromEventId = digestEvent.eventId(),
status = EventStatus.Skipped,
source = getProducerName()
)
)
)
}.also {
timeoutJobs[digestEvent.referenceId()] = it
}
} else {
log.error { "Timeout for ${digestEvent.referenceId()} has already been set!" }
} }
} else { timeoutJobs[digestEvent.referenceId()] = ttsc
timeoutJobs.remove(digestEvent.referenceId())?.cancel()
} }
} }
suspend fun createTimeout(referenceId: String, eventId: String, baseInfo: BaseInfoEvent) {
val expiryTime = (Instant.now().epochSecond + metadataTimeout)
val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis())
if (!this.isActive()) {
return
}
coordinator!!.produceNewEvent(
MediaMetadataReceivedEvent(
metadata = EventMetadata(
referenceId = referenceId,
derivedFromEventId = eventId,
status = EventStatus.Skipped,
source = getProducerName()
)
)
)
}
} }