From 1543ef6abaa95fe621bf1bb65048db97331db27f Mon Sep 17 00:00:00 2001 From: bskjon Date: Sun, 16 Mar 2025 14:10:24 +0100 Subject: [PATCH] Fix for stuck process --- .../ExtractWorkArgumentsTaskListener.kt | 5 +- .../shared/common/LogHelper.kt | 54 +++++++++++++++++++ .../shared/common/contract/data/Event.kt | 2 +- .../main/kotlin/no/iktdev/eventi/LogHelper.kt | 54 +++++++++++++++++++ .../implementations/EventCoordinator.kt | 9 +++- 5 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/LogHelper.kt create mode 100644 shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 78ed3fe2..4a2186ce 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -30,9 +30,12 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { ) override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { + val startEvent = events.findFirstEventOf() + val hasExtract = startEvent?.data?.operations?.contains(OperationEvents.EXTRACT) ?: false + val state = super.shouldIProcessAndHandleEvent(incomingEvent, events) val eventType = events.map { it.eventType } - return state && eventType.containsAll(listensForEvents) + return hasExtract && state && eventType.containsAll(listensForEvents) } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/LogHelper.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/LogHelper.kt new file mode 100644 index 00000000..a573494a --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/LogHelper.kt @@ -0,0 +1,54 @@ +package no.iktdev.mediaprocessing.shared.common + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +data class LogKey( + val referenceId: String, + val listener: String, + val eventType: String +) + +object EventDeadlockDetector { + + private val stuckEvents = ConcurrentHashMap() + private val suppressedEvents = ConcurrentHashMap() + private val threshold = 5 // Antall ganger før det anses som deadlock + private val resetInterval = 10L // Reset telling hvert 10. sekund + + init { + Executors.newScheduledThreadPool(1).scheduleAtFixedRate({ + stuckEvents.clear() + }, resetInterval, resetInterval, TimeUnit.SECONDS) + } + + fun detect(referenceId: String, listener: String, eventType: String): Boolean { + val key = LogKey(referenceId, listener, eventType) + + if (suppressedEvents[key] == true) { + return false + } + + val count = stuckEvents.merge(key, 1) { old, _ -> old + 1 } ?: 1 + + if (count > threshold) { + suppressedEvents[key] = true + onDeadlockDetected(key) + return false + } + + return true + } + + fun resolve(referenceId: String, listener: String, eventType: String) { + val key = LogKey(referenceId, listener, eventType) + stuckEvents.remove(key) + suppressedEvents.remove(key) + } + + private fun onDeadlockDetected(key: LogKey) { + println("🚨 Deadlock detected! ReferenceId=${key.referenceId}, Listener=${key.listener}, EventType=${key.eventType}") + // Her kan du f.eks. sende et varsel, restarte prosess, etc. + } +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/Event.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/Event.kt index e0824fd4..8005cc80 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/Event.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/Event.kt @@ -17,7 +17,7 @@ data class UnknownEvent( inline fun Event.az(): T? { return if (this !is T) { - //System.err.println("${this::class.java.name} is not a type of ${T::class.java.name}") + System.err.println("${this::class.java.name} is not a type of ${T::class.java.name}") null } else this } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt new file mode 100644 index 00000000..f12a6417 --- /dev/null +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt @@ -0,0 +1,54 @@ +package no.iktdev.eventi + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +data class LogKey( + val referenceId: String, + val listener: String, + val eventType: String +) + +object EventDeadlockDetector { + + private val stuckEvents = ConcurrentHashMap() + private val suppressedEvents = ConcurrentHashMap() + private val threshold = 5 // Antall ganger før det anses som deadlock + private val resetInterval = 10L // Reset telling hvert 10. sekund + + init { + Executors.newScheduledThreadPool(1).scheduleAtFixedRate({ + stuckEvents.clear() + }, resetInterval, resetInterval, TimeUnit.SECONDS) + } + + fun detect(referenceId: String, listener: String, eventType: String): Boolean { + val key = LogKey(referenceId, listener, eventType) + + if (suppressedEvents[key] == true) { + return false + } + + val count = stuckEvents.merge(key, 1) { old, _ -> old + 1 } ?: 1 + + if (count > threshold) { + suppressedEvents[key] = true + onDeadlockDetected(key) + return false + } + + return true + } + + fun resolve(referenceId: String, listener: String, eventType: String) { + val key = LogKey(referenceId, listener, eventType) + stuckEvents.remove(key) + suppressedEvents.remove(key) + } + + private fun onDeadlockDetected(key: LogKey) { + println("🚨 Deadlock detected! ReferenceId=${key.referenceId}, Listener=${key.listener}, EventType=${key.eventType}") + // Her kan du f.eks. sende et varsel, restarte prosess, etc. + } +} diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index 75f94305..f3d0f02c 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -2,6 +2,7 @@ package no.iktdev.eventi.implementations import kotlinx.coroutines.* import mu.KotlinLogging +import no.iktdev.eventi.EventDeadlockDetector import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.data.EventImpl import no.iktdev.eventi.data.referenceId @@ -109,7 +110,13 @@ abstract class EventCoordinator> { val consumableEvent = ConsumableEvent(event) listener.onEventsReceived(consumableEvent, events) if (consumableEvent.isConsumed) { - log.info { "Consumption detected for ${events.first().referenceId()} -> ${listener::class.java.simpleName} on event ${event.eventType}" } + // 🚨 Suppress logging hvis det er en deadlock + val referenceId = events.first().referenceId() + val listenerName = listener::class.java.simpleName + if (EventDeadlockDetector.detect(referenceId, listenerName, event.eventType.toString())) { + log.info { "Consumption detected for $referenceId -> $listenerName on event ${event.eventType}" } + EventDeadlockDetector.resolve(referenceId, listenerName, event.eventType.toString()) + } return@coroutineScope true } }