Fix for stuck process
This commit is contained in:
parent
7a5c872ab0
commit
1543ef6aba
@ -30,9 +30,12 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
|
|||||||
)
|
)
|
||||||
|
|
||||||
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
|
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
|
||||||
|
val startEvent = events.findFirstEventOf<MediaProcessStartEvent>()
|
||||||
|
val hasExtract = startEvent?.data?.operations?.contains(OperationEvents.EXTRACT) ?: false
|
||||||
|
|
||||||
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
|
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
|
||||||
val eventType = events.map { it.eventType }
|
val eventType = events.map { it.eventType }
|
||||||
return state && eventType.containsAll(listensForEvents)
|
return hasExtract && state && eventType.containsAll(listensForEvents)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
|
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
|
||||||
|
|||||||
@ -0,0 +1,54 @@
|
|||||||
|
package no.iktdev.mediaprocessing.shared.common
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
/**
 * Identifies one (reference, listener, event type) combination tracked by
 * [EventDeadlockDetector].
 *
 * @property referenceId reference id the reported event belongs to
 * @property listener name of the listener that reported the event
 * @property eventType string form of the reported event type
 */
data class LogKey(
    val referenceId: String,
    val listener: String,
    val eventType: String
)

/**
 * Counts how often the same [LogKey] is reported and flags the key once it has
 * been reported more than [THRESHOLD] times between two counter resets.
 *
 * Counters are wiped every [RESET_INTERVAL_SECONDS] seconds by a background
 * task. A key that has been flagged stays suppressed until [resolve] is called
 * for it; the periodic reset does not lift suppression.
 */
object EventDeadlockDetector {

    /** Number of reports tolerated before a key is considered deadlocked. */
    private const val THRESHOLD = 5

    /** Interval, in seconds, at which all report counters are reset. */
    private const val RESET_INTERVAL_SECONDS = 10L

    private val stuckEvents = ConcurrentHashMap<LogKey, Int>()
    private val suppressedEvents = ConcurrentHashMap<LogKey, Boolean>()

    // The reset task runs on a daemon thread: it is pure housekeeping and must
    // not keep the JVM alive once the application's own threads have finished.
    private val resetScheduler = Executors.newScheduledThreadPool(1) { task ->
        Thread(task, "event-deadlock-detector-reset").apply { isDaemon = true }
    }

    init {
        resetScheduler.scheduleAtFixedRate(
            { stuckEvents.clear() },
            RESET_INTERVAL_SECONDS,
            RESET_INTERVAL_SECONDS,
            TimeUnit.SECONDS
        )
    }

    /**
     * Registers one more report for the given combination.
     *
     * @return `true` while the combination is below the threshold and not
     * suppressed; `false` once it is suppressed or when this call pushes the
     * count above [THRESHOLD] (in which case the deadlock is reported once and
     * the key becomes suppressed).
     */
    fun detect(referenceId: String, listener: String, eventType: String): Boolean {
        val key = LogKey(referenceId, listener, eventType)

        if (suppressedEvents[key] == true) {
            return false
        }

        val count = stuckEvents.merge(key, 1) { current, increment -> current + increment } ?: 1
        if (count <= THRESHOLD) {
            return true
        }

        // putIfAbsent is atomic, so only the first caller that crosses the
        // threshold reports the deadlock even when several threads race here.
        if (suppressedEvents.putIfAbsent(key, true) == null) {
            onDeadlockDetected(key)
        }
        return false
    }

    /**
     * Forgets everything recorded for the given combination: its report counter
     * and its suppression flag.
     *
     * Note: because the counter is removed, calling this after every accepted
     * [detect] restarts the count at one each time, so the threshold can only be
     * reached by reports that are not followed by a call to this function.
     */
    fun resolve(referenceId: String, listener: String, eventType: String) {
        val key = LogKey(referenceId, listener, eventType)
        stuckEvents.remove(key)
        suppressedEvents.remove(key)
    }

    /** Reports a key that crossed the threshold; currently only prints to standard output. */
    private fun onDeadlockDetected(key: LogKey) {
        println("🚨 Deadlock detected! ReferenceId=${key.referenceId}, Listener=${key.listener}, EventType=${key.eventType}")
        // Extension point: e.g. send an alert, restart the process, etc.
    }
}
|
||||||
@ -17,7 +17,7 @@ data class UnknownEvent(
|
|||||||
|
|
||||||
inline fun <reified T: Event> Event.az(): T? {
|
inline fun <reified T: Event> Event.az(): T? {
|
||||||
return if (this !is T) {
|
return if (this !is T) {
|
||||||
//System.err.println("${this::class.java.name} is not a type of ${T::class.java.name}")
|
System.err.println("${this::class.java.name} is not a type of ${T::class.java.name}")
|
||||||
null
|
null
|
||||||
} else this
|
} else this
|
||||||
}
|
}
|
||||||
|
|||||||
54
shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt
Normal file
54
shared/eventi/src/main/kotlin/no/iktdev/eventi/LogHelper.kt
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package no.iktdev.eventi
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
/**
 * Identifies one (reference, listener, event type) combination tracked by
 * [EventDeadlockDetector].
 *
 * @property referenceId reference id the reported event belongs to
 * @property listener name of the listener that reported the event
 * @property eventType string form of the reported event type
 */
data class LogKey(
    val referenceId: String,
    val listener: String,
    val eventType: String
)

/**
 * Counts how often the same [LogKey] is reported and flags the key once it has
 * been reported more than [THRESHOLD] times between two counter resets.
 *
 * Counters are wiped every [RESET_INTERVAL_SECONDS] seconds by a background
 * task. A key that has been flagged stays suppressed until [resolve] is called
 * for it; the periodic reset does not lift suppression.
 */
object EventDeadlockDetector {

    /** Number of reports tolerated before a key is considered deadlocked. */
    private const val THRESHOLD = 5

    /** Interval, in seconds, at which all report counters are reset. */
    private const val RESET_INTERVAL_SECONDS = 10L

    private val stuckEvents = ConcurrentHashMap<LogKey, Int>()
    private val suppressedEvents = ConcurrentHashMap<LogKey, Boolean>()

    // The reset task runs on a daemon thread: it is pure housekeeping and must
    // not keep the JVM alive once the application's own threads have finished.
    private val resetScheduler = Executors.newScheduledThreadPool(1) { task ->
        Thread(task, "event-deadlock-detector-reset").apply { isDaemon = true }
    }

    init {
        resetScheduler.scheduleAtFixedRate(
            { stuckEvents.clear() },
            RESET_INTERVAL_SECONDS,
            RESET_INTERVAL_SECONDS,
            TimeUnit.SECONDS
        )
    }

    /**
     * Registers one more report for the given combination.
     *
     * @return `true` while the combination is below the threshold and not
     * suppressed; `false` once it is suppressed or when this call pushes the
     * count above [THRESHOLD] (in which case the deadlock is reported once and
     * the key becomes suppressed).
     */
    fun detect(referenceId: String, listener: String, eventType: String): Boolean {
        val key = LogKey(referenceId, listener, eventType)

        if (suppressedEvents[key] == true) {
            return false
        }

        val count = stuckEvents.merge(key, 1) { current, increment -> current + increment } ?: 1
        if (count <= THRESHOLD) {
            return true
        }

        // putIfAbsent is atomic, so only the first caller that crosses the
        // threshold reports the deadlock even when several threads race here.
        if (suppressedEvents.putIfAbsent(key, true) == null) {
            onDeadlockDetected(key)
        }
        return false
    }

    /**
     * Forgets everything recorded for the given combination: its report counter
     * and its suppression flag.
     *
     * Note: because the counter is removed, calling this after every accepted
     * [detect] restarts the count at one each time, so the threshold can only be
     * reached by reports that are not followed by a call to this function.
     */
    fun resolve(referenceId: String, listener: String, eventType: String) {
        val key = LogKey(referenceId, listener, eventType)
        stuckEvents.remove(key)
        suppressedEvents.remove(key)
    }

    /** Reports a key that crossed the threshold; currently only prints to standard output. */
    private fun onDeadlockDetected(key: LogKey) {
        println("🚨 Deadlock detected! ReferenceId=${key.referenceId}, Listener=${key.listener}, EventType=${key.eventType}")
        // Extension point: e.g. send an alert, restart the process, etc.
    }
}
|
||||||
@ -2,6 +2,7 @@ package no.iktdev.eventi.implementations
|
|||||||
|
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
|
import no.iktdev.eventi.EventDeadlockDetector
|
||||||
import no.iktdev.eventi.core.ConsumableEvent
|
import no.iktdev.eventi.core.ConsumableEvent
|
||||||
import no.iktdev.eventi.data.EventImpl
|
import no.iktdev.eventi.data.EventImpl
|
||||||
import no.iktdev.eventi.data.referenceId
|
import no.iktdev.eventi.data.referenceId
|
||||||
@ -109,7 +110,13 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
|||||||
val consumableEvent = ConsumableEvent(event)
|
val consumableEvent = ConsumableEvent(event)
|
||||||
listener.onEventsReceived(consumableEvent, events)
|
listener.onEventsReceived(consumableEvent, events)
|
||||||
if (consumableEvent.isConsumed) {
|
if (consumableEvent.isConsumed) {
|
||||||
log.info { "Consumption detected for ${events.first().referenceId()} -> ${listener::class.java.simpleName} on event ${event.eventType}" }
|
// 🚨 Suppress logging hvis det er en deadlock
|
||||||
|
val referenceId = events.first().referenceId()
|
||||||
|
val listenerName = listener::class.java.simpleName
|
||||||
|
if (EventDeadlockDetector.detect(referenceId, listenerName, event.eventType.toString())) {
|
||||||
|
log.info { "Consumption detected for $referenceId -> $listenerName on event ${event.eventType}" }
|
||||||
|
EventDeadlockDetector.resolve(referenceId, listenerName, event.eventType.toString())
|
||||||
|
}
|
||||||
return@coroutineScope true
|
return@coroutineScope true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user