This commit is contained in:
Brage Skjønborg 2026-01-22 01:56:19 +01:00
parent 550636c076
commit 518f4726cf
4 changed files with 61 additions and 8 deletions

View File

@ -20,6 +20,7 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
//testImplementation(kotlin("test")) //testImplementation(kotlin("test"))
testImplementation("org.assertj:assertj-core:3.4.1") testImplementation("org.assertj:assertj-core:3.4.1")

View File

@ -1,6 +1,7 @@
package no.iktdev.eventi.events package no.iktdev.eventi.events
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import mu.KotlinLogging
import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.time.Duration import java.time.Duration
@ -16,11 +17,17 @@ abstract class EventPollerImplementation(
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() { open suspend fun start() {
while (true) { while (true) {
pollOnce() try {
pollOnce()
} catch (e: Exception) {
e.printStackTrace()
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
}
} }
} }
@ -38,7 +45,10 @@ abstract class EventPollerImplementation(
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
for ((referenceId, _) in grouped) { for ((referenceId, _) in grouped) {
if (dispatchQueue.isProcessing(referenceId)) continue if (dispatchQueue.isProcessing(referenceId)){
log.debug { "Skipping dispatch for $referenceId as it is already being processed" }
continue
}
val fullLog = eventStore.getPersistedEventsFor(referenceId) val fullLog = eventStore.getPersistedEventsFor(referenceId)
val events = fullLog.mapNotNull { it.toEvent() } val events = fullLog.mapNotNull { it.toEvent() }

View File

@ -6,6 +6,7 @@ import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Semaphore import kotlinx.coroutines.sync.Semaphore
import mu.KotlinLogging
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import java.util.UUID import java.util.UUID
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -21,24 +22,38 @@ class SequenceDispatchQueue(
return scope return scope
} }
private val log = KotlinLogging.logger {}
fun isProcessing(referenceId: UUID): Boolean = referenceId in active fun isProcessing(referenceId: UUID): Boolean = referenceId in active
fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job? { fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job? {
if (!active.add(referenceId)) return null // already processing if (!active.add(referenceId)) {
log.debug {"⚠️ Already processing $referenceId, skipping dispatch"}
return null
}
log.debug {"▶️ Starting dispatch for $referenceId with ${events.size} events"}
return scope.launch { return scope.launch {
try { try {
log.debug {"⏳ Waiting for semaphore for $referenceId"}
semaphore.acquire() semaphore.acquire()
log.debug {"🔓 Acquired semaphore for $referenceId"}
try { try {
dispatcher.dispatch(referenceId, events) dispatcher.dispatch(referenceId, events)
} catch (e: Exception) { } catch (e: Exception) {
println("Dispatch failed for $referenceId: ${e.message}") log.error("Dispatch failed for $referenceId: ${e.message}")
e.printStackTrace() e.printStackTrace()
} finally { } finally {
semaphore.release() semaphore.release()
log.debug {"✅ Released semaphore for $referenceId"}
} }
} finally { } finally {
active.remove(referenceId) active.remove(referenceId)
log.debug {"🏁 Finished dispatch for $referenceId"}
} }
} }
} }

View File

@ -1,6 +1,7 @@
package no.iktdev.eventi.tasks package no.iktdev.eventi.tasks
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import mu.KotlinLogging
import no.iktdev.eventi.ZDS.toTask import no.iktdev.eventi.ZDS.toTask
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.stores.TaskStore import no.iktdev.eventi.stores.TaskStore
@ -10,42 +11,68 @@ abstract class TaskPollerImplementation(
private val taskStore: TaskStore, private val taskStore: TaskStore,
private val reporterFactory: (Task) -> TaskReporter private val reporterFactory: (Task) -> TaskReporter
) { ) {
private val log = KotlinLogging.logger {}
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
open suspend fun start() { open suspend fun start() {
log.info { "TaskPoller starting with initial backoff=$backoff" }
while (true) { while (true) {
pollOnce() try {
pollOnce()
} catch (e: Exception) {
e.printStackTrace()
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
}
} }
} }
suspend fun pollOnce() { suspend fun pollOnce() {
log.debug { "Polling for pending tasks…" }
val newPersistedTasks = taskStore.getPendingTasks() val newPersistedTasks = taskStore.getPendingTasks()
if (newPersistedTasks.isEmpty()) { if (newPersistedTasks.isEmpty()) {
log.debug { "No pending tasks found. Backing off for $backoff" }
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
return return
} }
log.debug { "Found ${newPersistedTasks.size} persisted tasks" }
val tasks = newPersistedTasks.mapNotNull { it.toTask() } val tasks = newPersistedTasks.mapNotNull { it.toTask() }
var acceptedAny = false var acceptedAny = false
for (task in tasks) { for (task in tasks) {
val listener = TaskListenerRegistry.getListeners().firstOrNull { it.supports(task) && !it.isBusy } ?: continue val listener = TaskListenerRegistry.getListeners().firstOrNull { it.supports(task) && !it.isBusy } ?: continue
val claimed = taskStore.claim(task.taskId, listener.getWorkerId()) val claimed = taskStore.claim(task.taskId, listener.getWorkerId())
if (!claimed) continue if (!claimed) {
log.debug { "Task ${task.taskId} is already claimed by another worker" }
continue
}
log.debug { "Task ${task.taskId} claimed by ${listener.getWorkerId()}" }
val reporter = reporterFactory(task) val reporter = reporterFactory(task)
val accepted = listener.accept(task, reporter) val accepted = try {
listener.accept(task, reporter)
} catch (e: Exception) {
log.error("Error while processing task ${task.taskId} by listener ${listener.getWorkerId()}: ${e.message}")
e.printStackTrace()
false
}
acceptedAny = acceptedAny || accepted acceptedAny = acceptedAny || accepted
} }
if (!acceptedAny) { if (!acceptedAny) {
log.debug { "No tasks were accepted. Backing off for $backoff" }
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
} else { } else {
log.debug { "At least one task accepted. Resetting backoff." }
backoff = Duration.ofSeconds(2) backoff = Duration.ofSeconds(2)
} }
} }