Changed timeout + task complete

This commit is contained in:
bskjon 2025-04-01 19:49:27 +02:00
parent 6284db2bea
commit 48b7a0d2ad
4 changed files with 67 additions and 71 deletions

View File

@ -8,10 +8,9 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfo
import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfoEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -30,6 +29,17 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
return this::class.java.simpleName
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (!super.shouldIProcessAndHandleEvent(incomingEvent, events)) {
return false
}
val startedWith = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.operations;
if (startedWith?.isOnly(OperationEvents.CONVERT) == true) {
return false
}
return true
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {

View File

@ -28,7 +28,8 @@ class ConvertWorkTaskListener: WorkTaskListener() {
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.ConvertTaskCreated
override val listensForEvents: List<Events> = listOf(
Events.ExtractTaskCompleted
Events.ExtractTaskCompleted,
Events.ProcessStarted
)
override fun canProduceMultipleEvents(): Boolean {
@ -49,6 +50,10 @@ class ConvertWorkTaskListener: WorkTaskListener() {
if (shouldIHandleAndProduce) {
log.info { "Permitting handling of event: ${incomingEvent.dataAs<ExtractedData>()?.outputFile}" }
}
val startedWithOperations = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.operations ?: return false
if (startedWithOperations.isOnly(OperationEvents.CONVERT)) {
return true
}
return shouldIHandleAndProduce
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {

View File

@ -1,33 +1,33 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.eventi.data.*
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.eventi.database.toEpochSeconds
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.BaseInfoEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaMetadataReceivedEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.az
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.lastOrSuccessOf
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
import java.time.Duration
import java.time.Instant
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0
@Service
@EnableScheduling
class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
override fun getProducerName(): String {
@ -43,13 +43,19 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
override val produceEvent: Events = Events.MetadataSearchPerformed
override val listensForEvents: List<Events> = listOf(
Events.BaseInfoRead,
Events.MetadataSearchPerformed
Events.MetadataSearchPerformed,
Events.ProcessCompleted
)
val metadataTimeout = metadataTimeoutMinutes * 60
val waitingProcessesForMeta: MutableMap<String, MetadataTriggerData> = mutableMapOf()
private val timeoutScope = CoroutineScope(Dispatchers.Default)
val timeoutJobs = ConcurrentHashMap<String, Job>()
override fun shouldIHandleFailedEvents(incomingEvent: Event): Boolean {
return true
}
/**
* This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event
@ -58,68 +64,42 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
if (metadataTimeoutMinutes <= 0) {
return
}
val hasReadBaseInfo = events.any { it.eventType == Events.BaseInfoRead && it.isSuccessful() }
val hasMetadataSearched = events.any { it.eventType == Events.MetadataSearchPerformed }
val hasPollerForMetadataEvent = waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)
if (!hasReadBaseInfo) {
val baseInfo = events.findFirstEventOf<BaseInfoEvent>()
if (baseInfo?.isSuccessful() != true) {
return
}
if (hasMetadataSearched) {
waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId)
return
} else if (!hasPollerForMetadataEvent) {
val consumedIncoming = incomingEvent.consume()
if (consumedIncoming == null) {
log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val digestEvent = incomingEvent.consume() ?: return
if (digestEvent.eventType == Events.BaseInfoRead) {
if (!timeoutJobs.containsKey(digestEvent.referenceId())) {
timeoutScope.launch {
val expiryTime = (Instant.now().epochSecond + metadataTimeout)
val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
log.info { "Sending ${baseInfo.data?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
delay(Duration.ofSeconds(metadataTimeout.toLong()).toMillis())
coordinator!!.produceNewEvent(
MediaMetadataReceivedEvent(
metadata = EventMetadata(
referenceId = digestEvent.referenceId(),
derivedFromEventId = digestEvent.eventId(),
status = EventStatus.Skipped,
source = getProducerName()
)
)
val baseInfo = events.find { it.eventType == Events.BaseInfoRead}?.az<BaseInfoEvent>()?.data
if (baseInfo == null) {
log.error { "BaseInfoEvent is null for referenceId: ${consumedIncoming.metadata.referenceId} on eventId: ${consumedIncoming.metadata.eventId}" }
return
}
val estimatedTimeout = LocalDateTime.now().toEpochSeconds() + metadataTimeout
val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
waitingProcessesForMeta[consumedIncoming.metadata.referenceId] =
MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now())
log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
}
}
@Scheduled(fixedDelay = (5_000))
fun sendErrorMessageForMetadata() {
val expired = waitingProcessesForMeta.filter {
LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout)
}
expired.forEach {
log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
if (coordinator == null) {
log.error { "Coordinator is null, not able to get timeout stored!" }
}
coordinator!!.produceNewEvent(
MediaMetadataReceivedEvent(
metadata = EventMetadata(
referenceId = it.key,
derivedFromEventId = it.value.eventId,
status = EventStatus.Skipped,
source = getProducerName()
)
)
)
waitingProcessesForMeta.remove(it.key)
}.also {
timeoutJobs[digestEvent.referenceId()] = it
}
} else {
log.error { "Timeout for ${digestEvent.referenceId()} has already been set!" }
}
} else {
timeoutJobs.remove(digestEvent.referenceId())?.cancel()
}
active = expired.isNotEmpty()
}
data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime)
}

View File

@ -7,6 +7,7 @@ import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly
import java.io.File
/**
@ -85,7 +86,7 @@ object CompletionValidator {
if (OperationEvents.CONVERT in operations) {
val convertWork = events.filter { it.eventType == Events.ConvertTaskCreated }
val convertPerformed = events.filter { it.eventType == Events.ConvertTaskCompleted }
if (convertPerformed.size < convertWork.size)
if (convertPerformed.size < convertWork.size || (operations.isOnly(OperationEvents.CONVERT) && convertWork.isEmpty()))
return false
}