Changes to timeout

This commit is contained in:
bskjon 2025-04-21 00:15:41 +02:00
parent 6ce7094119
commit 77ca12963c

View File

@ -24,12 +24,19 @@ import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 0 val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: -1
@Service @Service
class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
override fun onReady() {
super.onReady()
if (metadataTimeoutMinutes == 0) {
log.warn { "Metadata timeout is set to 0 minutes.. This will block proceeding until metadata is found.." }
}
}
override fun getProducerName(): String { override fun getProducerName(): String {
return this::class.java.simpleName return this::class.java.simpleName
} }
@ -83,10 +90,19 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
* This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event * This one gets special treatment, since it will only produce a timeout it does not need to use the incoming event
*/ */
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) { override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
if (metadataTimeoutMinutes <= 0) { if (metadataTimeoutMinutes <= -1) {
return log.info { "Metadata has no timeout, a timeout will be created.." }
val meta = incomingEvent.metadata()
onProduceEvent(MediaMetadataReceivedEvent(
metadata = meta.copy(
status = EventStatus.Failed,
source = getProducerName()
),
data = null
))
} }
val searchPerformedEvent: MediaMetadataReceivedEvent? = events.findEventOf<MediaMetadataReceivedEvent>() val searchPerformedEvent: MediaMetadataReceivedEvent? = events.findEventOf<MediaMetadataReceivedEvent>()
if (searchPerformedEvent != null) { if (searchPerformedEvent != null) {
@ -98,19 +114,17 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
val baseInfo = events.findFirstEventOf<BaseInfoEvent>() val baseInfo = events.findFirstEventOf<BaseInfoEvent>()
if (baseInfo?.isSuccessful() != true) { if (baseInfo?.isSuccessful() != true) {
return return
} }
if (incomingEvent.isOfEvent(Events.BaseInfoRead)) { if (incomingEvent.isOfEvent(Events.BaseInfoRead)) {
val digestEvent = incomingEvent.consume() ?: return if (timeoutJobs.containsKey(incomingEvent.metadata().referenceId))
if (timeoutJobs.containsKey(digestEvent.referenceId()))
return return
val ttsc = timeoutScope.launch { val ttsc = timeoutScope.launch {
createTimeout(digestEvent.referenceId(), digestEvent.eventId(), baseInfo) createTimeout(incomingEvent.metadata().referenceId, incomingEvent.metadata().eventId, baseInfo)
} }
timeoutJobs[digestEvent.referenceId()] = ttsc timeoutJobs[incomingEvent.metadata().referenceId] = ttsc
} }
} }