From 518f4726cf1466e3f02378ead6532ea94d67f7bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Thu, 22 Jan 2026 01:56:19 +0100 Subject: [PATCH] Logging --- build.gradle.kts | 1 + .../events/EventPollerImplementation.kt | 16 +++++++-- .../eventi/events/SequenceDispatchQueue.kt | 19 +++++++++-- .../eventi/tasks/TaskPollerImplementation.kt | 33 +++++++++++++++++-- 4 files changed, 61 insertions(+), 8 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 00d1a77..a31e80e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -20,6 +20,7 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.10.2") + implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") //testImplementation(kotlin("test")) testImplementation("org.assertj:assertj-core:3.4.1") diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt index 179663b..955a2b1 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt @@ -1,6 +1,7 @@ package no.iktdev.eventi.events import kotlinx.coroutines.delay +import mu.KotlinLogging import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.stores.EventStore import java.time.Duration @@ -16,11 +17,17 @@ abstract class EventPollerImplementation( open var backoff = Duration.ofSeconds(2) protected set private val maxBackoff = Duration.ofMinutes(1) - + private val log = KotlinLogging.logger {} open suspend fun start() { while (true) { - pollOnce() + try { + pollOnce() + } catch (e: Exception) { + e.printStackTrace() + delay(backoff.toMillis()) + backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) + } } } @@ -38,7 +45,10 @@ abstract class EventPollerImplementation( val grouped = newPersisted.groupBy { it.referenceId } for ((referenceId, _) in grouped) { - if (dispatchQueue.isProcessing(referenceId)) continue + if (dispatchQueue.isProcessing(referenceId)){ + log.debug { "Skipping dispatch for $referenceId as it is already being processed" } + continue + } val fullLog = eventStore.getPersistedEventsFor(referenceId) val events = fullLog.mapNotNull { it.toEvent() } diff --git a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt index 4d2d44f..3f2376f 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Semaphore +import mu.KotlinLogging import no.iktdev.eventi.models.Event import java.util.UUID import java.util.concurrent.ConcurrentHashMap @@ -21,24 +22,38 @@ class SequenceDispatchQueue( return scope } + private val log = KotlinLogging.logger {} + + fun isProcessing(referenceId: UUID): Boolean = referenceId in active fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job? { - if (!active.add(referenceId)) return null // already processing + if (!active.add(referenceId)) { + log.debug {"⚠️ Already processing $referenceId, skipping dispatch"} + return null + } + log.debug {"▶️ Starting dispatch for $referenceId with ${events.size} events"} + return scope.launch { try { + log.debug {"⏳ Waiting for semaphore for $referenceId"} semaphore.acquire() + log.debug {"🔓 Acquired semaphore for $referenceId"} + try { dispatcher.dispatch(referenceId, events) } catch (e: Exception) { - println("Dispatch failed for $referenceId: ${e.message}") + log.error("Dispatch failed for $referenceId: ${e.message}") e.printStackTrace() } finally { + semaphore.release() + log.debug {"✅ Released semaphore for $referenceId"} } } finally { active.remove(referenceId) + log.debug {"🏁 Finished dispatch for $referenceId"} } } } diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementation.kt index b56c33a..dcaaaa6 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementation.kt @@ -1,6 +1,7 @@ package no.iktdev.eventi.tasks import kotlinx.coroutines.delay +import mu.KotlinLogging import no.iktdev.eventi.ZDS.toTask import no.iktdev.eventi.models.Task import no.iktdev.eventi.stores.TaskStore @@ -10,42 +11,68 @@ abstract class TaskPollerImplementation( private val taskStore: TaskStore, private val reporterFactory: (Task) -> TaskReporter ) { + private val log = KotlinLogging.logger {} + open var backoff = Duration.ofSeconds(2) protected set private val maxBackoff = Duration.ofMinutes(1) open suspend fun start() { + log.info { "TaskPoller starting with initial backoff=$backoff" } while (true) { - pollOnce() + try { + pollOnce() + } catch (e: Exception) { + e.printStackTrace() + delay(backoff.toMillis()) + backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) + } } } suspend fun pollOnce() { + log.debug { "Polling for pending tasks…" } val newPersistedTasks = taskStore.getPendingTasks() if (newPersistedTasks.isEmpty()) { + log.debug { "No pending tasks found. Backing off for $backoff" } delay(backoff.toMillis()) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) return } + log.debug { "Found ${newPersistedTasks.size} persisted tasks" } + val tasks = newPersistedTasks.mapNotNull { it.toTask() } var acceptedAny = false for (task in tasks) { val listener = TaskListenerRegistry.getListeners().firstOrNull { it.supports(task) && !it.isBusy } ?: continue val claimed = taskStore.claim(task.taskId, listener.getWorkerId()) - if (!claimed) continue + if (!claimed) { + log.debug { "Task ${task.taskId} is already claimed by another worker" } + continue + } + + log.debug { "Task ${task.taskId} claimed by ${listener.getWorkerId()}" } val reporter = reporterFactory(task) - val accepted = listener.accept(task, reporter) + val accepted = try { + listener.accept(task, reporter) + } catch (e: Exception) { + log.error("Error while processing task ${task.taskId} by listener ${listener.getWorkerId()}: ${e.message}") + e.printStackTrace() + false + } acceptedAny = acceptedAny || accepted } if (!acceptedAny) { + log.debug { "No tasks were accepted. Backing off for $backoff" } delay(backoff.toMillis()) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) } else { + log.debug { "At least one task accepted. Resetting backoff." } backoff = Duration.ofSeconds(2) } }