diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt index 8ea6808..8036cae 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt @@ -8,23 +8,30 @@ import no.iktdev.eventi.stores.EventStore import java.time.Duration import java.time.Instant import java.util.UUID -import kotlin.collections.iterator abstract class EventPollerImplementation( private val eventStore: EventStore, private val dispatchQueue: SequenceDispatchQueue, private val dispatcher: EventDispatcher ) { - // Erstatter ikke lastSeenTime, men supplerer den - protected val refWatermark = mutableMapOf() + private val log = KotlinLogging.logger {} - // lastSeenTime brukes kun som scan hint + /** + * Per-reference watermark: + * - first = last seen persistedAt + * - second = last seen persistedId + */ + protected val refWatermark = mutableMapOf>() + + /** + * Global scan hint (timestamp only). + * Used to avoid scanning entire table every time. + */ var lastSeenTime: Instant = Instant.EPOCH open var backoff = Duration.ofSeconds(2) protected set private val maxBackoff = Duration.ofMinutes(1) - private val log = KotlinLogging.logger {} open suspend fun start() { log.info { "EventPoller starting with initial backoff=$backoff" } @@ -32,7 +39,7 @@ abstract class EventPollerImplementation( try { pollOnce() } catch (e: Exception) { - e.printStackTrace() + log.error(e) { "Error in poller loop" } delay(backoff.toMillis()) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) } @@ -43,11 +50,11 @@ abstract class EventPollerImplementation( val pollStartedAt = MyTime.utcNow() log.debug { "🔍 Polling for new events" } - // Global scan hint: kombiner refWatermark og lastSeenTime - val watermarkMin = refWatermark.values.minOrNull() - val scanFrom = when (watermarkMin) { + // Determine global scan start + val minRefTs = refWatermark.values.minOfOrNull { it.first } + val scanFrom = when (minRefTs) { null -> lastSeenTime - else -> maxOf(lastSeenTime, watermarkMin) + else -> maxOf(lastSeenTime, minRefTs) } val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) @@ -59,76 +66,73 @@ abstract class EventPollerImplementation( return } - // Vi har sett nye events globalt – reset backoff + // Reset backoff backoff = Duration.ofSeconds(2) log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" } val grouped = newPersisted.groupBy { it.referenceId } var anyProcessed = false - // Track høyeste persistedAt vi har sett i denne runden + // Track highest persistedAt seen globally this round val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt } for ((ref, eventsForRef) in grouped) { - val refSeen = refWatermark[ref] ?: Instant.EPOCH + val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L) + + // Filter new events using (timestamp, id) ordering + val newForRef = eventsForRef.filter { ev -> + ev.persistedAt > refSeenAt || + (ev.persistedAt == refSeenAt && ev.id > refSeenId) + } - // Finn kun nye events for denne ref’en - val newForRef = eventsForRef.filter { it.persistedAt > refSeen } if (newForRef.isEmpty()) { - log.debug { "🧊 No new events for $ref since $refSeen" } + log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" } continue } - // Hvis ref er busy → ikke oppdater watermark, ikke dispatch + // If ref is busy, skip dispatch if (dispatchQueue.isProcessing(ref)) { log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" } continue } - // Hent full sekvens for ref (Eventi-invariant) + // Fetch full sequence for dispatch val fullLog = eventStore.getPersistedEventsFor(ref) val events = fullLog.mapNotNull { it.toEvent() } log.debug { "🚀 Dispatching ${events.size} events for $ref" } dispatchQueue.dispatch(ref, events, dispatcher) - // Oppdater watermark for denne ref’en - val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt } - val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1) + // Update watermark for this reference + val maxEvent = newForRef.maxWith( + compareBy({ it.persistedAt }, { it.id }) + ) - refWatermark[ref] = newWatermark + val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt) + val newWatermarkId = maxEvent.id + + refWatermark[ref] = newWatermarkAt to newWatermarkId anyProcessed = true - log.debug { "⏩ Updated watermark for $ref → $newWatermark" } + log.debug { "⏩ Updated watermark for $ref → ($newWatermarkAt, id=$newWatermarkId)" } } - // Oppdater global scan hint uansett – vi har sett nye events - // Dette hindrer livelock når alle events er <= watermark for sine refs + // Update global scan hint val newLastSeen = maxOf( lastSeenTime, maxPersistedThisRound.plusNanos(1) ) if (anyProcessed) { - // Behold intensjonen din: globalt hint basert på laveste watermark, - // men aldri gå bakover i tid ift lastSeenTime - val minRefWatermark = refWatermark.values.minOrNull() - lastSeenTime = when (minRefWatermark) { + val minRef = refWatermark.values.minOfOrNull { it.first } + lastSeenTime = when (minRef) { null -> newLastSeen - else -> maxOf(newLastSeen, minRefWatermark) + else -> maxOf(newLastSeen, minRef) } log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" } } else { - // Ingen refs prosessert, men vi vet at alle events vi så er <= watermark - // → trygt å flytte lastSeenTime forbi dem lastSeenTime = newLastSeen log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" } } } - - - - - - } diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index ce02b29..1bd4d6b 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -68,9 +68,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta currentJob = getDispatcherForTask(task).launch { try { val result = onTask(task) - reporter.markCompleted(task.taskId) onComplete(task, result) - } catch (e: CancellationException) { // Dette er en ekte kansellering onCancelled(task) diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt index 7a8f489..7f4dac0 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt @@ -60,6 +60,42 @@ class PollerStartLoopTest : TestBase() { store.persistAt(e, time) } + + @Test + @DisplayName(""" + Når to events har identisk persistedAt + Hvis polleren kjører + Så skal begge events prosesseres og ingen mistes + """) + fun `poller handles same-timestamp events without losing any`() = runTest { + val ref = UUID.randomUUID() + val ts = Instant.parse("2025-01-01T12:00:00Z") + + // Two events with same timestamp but different IDs + val e1 = TestEvent().withReference(ref).setMetadata(Metadata()) + val e2 = TestEvent().withReference(ref).setMetadata(Metadata()) + + store.persistAt(e1, ts) // id=1 + store.persistAt(e2, ts) // id=2 + + poller.startFor(iterations = 1) + + // Verify dispatch happened + assertThat(dispatcher.dispatched).hasSize(1) + + val (_, events) = dispatcher.dispatched.single() + + // Both events must be present + assertThat(events.map { it.eventId }) + .hasSize(2) + .doesNotHaveDuplicates() + + // Watermark must reflect highest ID + val wm = poller.watermarkFor(ref) + assertThat(wm!!.first).isEqualTo(ts) + assertThat(wm.second).isEqualTo(2) + } + @Test @DisplayName(""" Når polleren kjører flere iterasjoner uten events @@ -271,11 +307,15 @@ class PollerStartLoopTest : TestBase() { poller.startFor(iterations = 1) + + // A skal IKKE ha flyttet watermark assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1) - // B skal ha flyttet watermark - assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1) + // B skal ha flyttet watermark (på timestamp-nivå) + val wmB2 = poller.watermarkFor(refB) + assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first) + } @DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk") @@ -433,7 +473,7 @@ class PollerStartLoopTest : TestBase() { // Sett watermark høyt (polleren setter watermark selv i ekte drift, // men i denne testen må vi simulere det) - poller.setWatermarkFor(ref, t(100)) + poller.setWatermarkFor(ref, t(100), id = 999) // Sett lastSeenTime bak eventen poller.lastSeenTime = t(0) diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt index 4878cc2..06615fc 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt @@ -17,8 +17,6 @@ class TestablePoller( val scope: TestScope ) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView { - - suspend fun startFor(iterations: Int) { repeat(iterations) { try { @@ -32,19 +30,17 @@ class TestablePoller( } } - override fun watermarkFor(ref: UUID): Instant? { - return refWatermark[ref]?.let { - return it - } + override fun watermarkFor(ref: UUID): Pair? { + return refWatermark[ref] } - override fun setWatermarkFor(ref: UUID, time: Instant) { - refWatermark[ref] = time + override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) { + refWatermark[ref] = time to id } - - } + interface WatermarkDebugView { - fun watermarkFor(ref: UUID): Instant? - fun setWatermarkFor(ref: UUID, time: Instant) + fun watermarkFor(ref: UUID): Pair? + fun setWatermarkFor(ref: UUID, time: Instant, id: Long) } +