From 919339d306b32d3dc05ea84143f8628a06da2761 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Thu, 22 Jan 2026 21:16:21 +0100 Subject: [PATCH] Changes to puller logic --- .../events/EventPollerImplementation.kt | 67 +++++-- .../events/poller/PollerStartLoopTest.kt | 189 ++++++++++++++++++ .../eventi/events/poller/TestablePoller.kt | 17 +- 3 files changed, 254 insertions(+), 19 deletions(-) diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt index 4668c3a..3d5cfa8 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt @@ -7,6 +7,7 @@ import no.iktdev.eventi.ZDS.toEvent import no.iktdev.eventi.stores.EventStore import java.time.Duration import java.time.LocalDateTime +import java.util.UUID import kotlin.collections.iterator abstract class EventPollerImplementation( @@ -14,7 +15,12 @@ abstract class EventPollerImplementation( private val dispatchQueue: SequenceDispatchQueue, private val dispatcher: EventDispatcher ) { + // Erstatter ikke lastSeenTime, men supplerer den + protected val refWatermark = mutableMapOf() + + // lastSeenTime brukes kun som scan hint var lastSeenTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0) + open var backoff = Duration.ofSeconds(2) protected set private val maxBackoff = Duration.ofMinutes(1) @@ -35,45 +41,70 @@ abstract class EventPollerImplementation( suspend fun pollOnce() { val pollStartedAt = MyTime.UtcNow() - log.debug { "Polling for new events" } - val newPersisted = eventStore.getPersistedEventsAfter(lastSeenTime) + log.debug { "🔍 Polling for new events" } + + // Global scan hint: start fra laveste watermark + val scanFrom = refWatermark.values.minOrNull() ?: lastSeenTime + + val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) if (newPersisted.isEmpty()) { - log.debug { "No new events found. Backing off for $backoff" } + log.debug { "😴 No new events found. Backing off for $backoff" } delay(backoff.toMillis()) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) return } backoff = Duration.ofSeconds(2) + log.debug { "📬 Found ${newPersisted.size} new events" } val grouped = newPersisted.groupBy { it.referenceId } + var anyProcessed = false - // Samle persistedAt KUN for referanser vi faktisk dispatch’et - val processedTimes = mutableListOf() + for ((ref, eventsForRef) in grouped) { + val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0) - for ((referenceId, _) in grouped) { - if (dispatchQueue.isProcessing(referenceId)) { - log.debug { "Skipping dispatch for $referenceId as it is already being processed" } + // Finn kun nye events for denne ref’en + val newForRef = eventsForRef.filter { it.persistedAt > refSeen } + if (newForRef.isEmpty()) { + log.debug { "🧊 No new events for $ref since $refSeen" } continue } - val fullLog = eventStore.getPersistedEventsFor(referenceId) + // Hvis ref er busy → ikke oppdater watermark, ikke dispatch + if (dispatchQueue.isProcessing(ref)) { + log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" } + continue + } + + // Hent full sekvens for ref (Eventi-invariant) + val fullLog = eventStore.getPersistedEventsFor(ref) val events = fullLog.mapNotNull { it.toEvent() } - processedTimes += fullLog.map { it.persistedAt } - dispatchQueue.dispatch(referenceId, events, dispatcher) + + log.debug { "🚀 Dispatching ${events.size} events for $ref" } + dispatchQueue.dispatch(ref, events, dispatcher) + + // Oppdater watermark for denne ref’en + val maxPersistedAt = newForRef.maxOf { it.persistedAt } + val newWatermark = minOf(pollStartedAt, maxPersistedAt).plusNanos(1) + + refWatermark[ref] = newWatermark + anyProcessed = true + + log.debug { "⏩ Updated watermark for $ref → $newWatermark" } } - if (processedTimes.isNotEmpty()) { - val maxPersistedAt = processedTimes.max() - val newLastSeen = minOf(pollStartedAt, maxPersistedAt).plusNanos(1) - log.debug { "Updating lastSeenTime from $lastSeenTime to $newLastSeen" } - lastSeenTime = newLastSeen + // Oppdater global scan hint + if (anyProcessed) { + lastSeenTime = refWatermark.values.minOrNull() ?: lastSeenTime + log.debug { "📉 Global scanFrom updated → $lastSeenTime" } } else { - // Ingen referanser ble dispatch’et → IKKE oppdater lastSeenTime - log.debug { "No dispatches performed; lastSeenTime remains $lastSeenTime" } + log.debug { "🔁 No refs processed — global scanFrom unchanged ($lastSeenTime)" } } } + + + } diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt index 6eea18d..8a96c6d 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt @@ -9,6 +9,7 @@ import no.iktdev.eventi.events.TestEvent import no.iktdev.eventi.models.Metadata import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test import java.time.LocalDateTime import java.util.UUID @@ -22,6 +23,10 @@ class PollerStartLoopTest { private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue private lateinit var poller: TestablePoller + private fun t(seconds: Long): LocalDateTime = + LocalDateTime.of(2024, 1, 1, 12, 0).plusSeconds(seconds) + + @BeforeEach fun setup() { store = InMemoryEventStore() @@ -107,4 +112,188 @@ class PollerStartLoopTest { assertThat(dispatcher.dispatched).hasSize(1) } + + @Test + fun `poller does not dispatch when no new events for ref`() = runTest { + val ref = UUID.randomUUID() + + // E1 + persistAt(ref, t(0)) + + poller.startFor(iterations = 1) + assertThat(dispatcher.dispatched).hasSize(1) + + // Ingen nye events + poller.startFor(iterations = 3) + + // Fremdeles bare én dispatch + assertThat(dispatcher.dispatched).hasSize(1) + } + + @Test + fun `event arriving while ref is busy is not lost`() = runTest { + val ref = UUID.randomUUID() + + persistAt(ref, t(0)) + persistAt(ref, t(5)) + + // Første poll: dispatcher E1+E2 + poller.startFor(iterations = 1) + assertThat(dispatcher.dispatched).hasSize(1) + + // Marker ref som busy + queue.busyRefs += ref + + // E3 kommer mens ref er busy + persistAt(ref, t(10)) + + // Polleren skal IKKE dispatch’e nå + poller.startFor(iterations = 2) + assertThat(dispatcher.dispatched).hasSize(1) + + // Frigjør ref + queue.busyRefs.clear() + + // Nå skal E3 bli dispatch’et + poller.startFor(iterations = 1) + + assertThat(dispatcher.dispatched).hasSize(2) + val events = dispatcher.dispatched.last().second + assertThat(events).hasSize(3) + } + + @Test + fun `busy ref does not block dispatch of other refs`() = runTest { + val refA = UUID.randomUUID() + val refB = UUID.randomUUID() + + persistAt(refA, t(0)) + persistAt(refB, t(0)) + + // Marker A som busy + queue.busyRefs += refA + + poller.startFor(iterations = 1) + + // refA skal ikke dispatch’es + // refB skal dispatch’es + assertThat(dispatcher.dispatched).hasSize(1) + assertThat(dispatcher.dispatched.first().first).isEqualTo(refB) + } + + @Test + fun `watermark advances only for refs that were processed`() = runTest { + val refA = UUID.randomUUID() + val refB = UUID.randomUUID() + + persistAt(refA, t(0)) + persistAt(refB, t(0)) + + // Første poll: begge refs blir dispatch’et + poller.startFor(iterations = 1) + + val wmA1 = poller.watermarkFor(refA!!) + val wmB1 = poller.watermarkFor(refB!!) + + // Marker A som busy + queue.busyRefs += refA + + // Nye events for begge refs + persistAt(refA, t(10)) + persistAt(refB, t(10)) + + poller.startFor(iterations = 1) + + // A skal IKKE ha flyttet watermark + assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1) + + // B skal ha flyttet watermark + assertThat(poller.watermarkFor(refB)).isAfter(wmB1) + } + + @DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk") + @Test + fun `stress test with many refs random busy states and interleaved events`() = runTest { + val refs = List(50) { UUID.randomUUID() } + val eventCountPerRef = 20 + + // 1. Initial events + refs.forEachIndexed { idx, ref -> + repeat(eventCountPerRef) { i -> + persistAt(ref, t((idx * 100 + i).toLong())) + } + } + + // 2. Random busy refs + val busyRefs = refs.shuffled().take(10).toSet() + queue.busyRefs += busyRefs + + // 3. First poll: only non-busy refs dispatch + poller.startFor(iterations = 1) + + val dispatchedFirstRound = dispatcher.dispatched.groupBy { it.first } + val dispatchedRefsFirstRound = dispatchedFirstRound.keys + val expectedFirstRound = refs - busyRefs + + assertThat(dispatchedRefsFirstRound) + .containsExactlyInAnyOrder(*expectedFirstRound.toTypedArray()) + + // 4. Add new events for all refs + refs.forEachIndexed { idx, ref -> + persistAt(ref, t((10_000 + idx).toLong())) + } + + // 5. Second poll: only non-busy refs dispatch again + poller.startFor(iterations = 1) + + val dispatchedSecondRound = dispatcher.dispatched.groupBy { it.first } + val secondRoundCounts = dispatchedSecondRound.mapValues { (_, v) -> v.size } + + // Non-busy refs should now have 2 dispatches total + expectedFirstRound.forEach { ref -> + assertThat(secondRoundCounts[ref]).isEqualTo(2) + } + + // Busy refs should still have 0 dispatches + busyRefs.forEach { ref -> + assertThat(secondRoundCounts).doesNotContainKey(ref) + } + + // 6. Free busy refs + queue.busyRefs.clear() + + // 7. Third poll: busy refs dispatch their backlog + poller.startFor(iterations = 1) + + val dispatchedThirdRound = dispatcher.dispatched.groupBy { it.first } + val thirdRoundCounts = dispatchedThirdRound.mapValues { (_, v) -> v.size } + + refs.forEach { ref -> + if (ref in busyRefs) { + // Busy refs: 1 dispatch total (only in third poll) + assertThat(thirdRoundCounts[ref]).isEqualTo(1) + } else { + // Non-busy refs: 2 dispatches total (first + second) + assertThat(thirdRoundCounts[ref]).isEqualTo(2) + } + } + + // 8. No ref should have more than 2 dispatches (no spinning) + refs.forEach { ref -> + assertThat(thirdRoundCounts[ref]).isLessThanOrEqualTo(2) + } + + // 9. Verify all refs processed all unique events + refs.forEach { ref -> + val uniqueEvents = dispatchedThirdRound[ref]!! + .flatMap { it.second } + .distinctBy { it.eventId } + + assertThat(uniqueEvents).hasSize(eventCountPerRef + 1) + } + } + + + + } diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt index 3cd95f0..bda92f6 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt @@ -6,13 +6,17 @@ import no.iktdev.eventi.events.EventDispatcher import no.iktdev.eventi.events.EventPollerImplementation import no.iktdev.eventi.events.SequenceDispatchQueue import no.iktdev.eventi.stores.EventStore +import java.time.LocalDateTime +import java.util.UUID class TestablePoller( eventStore: EventStore, dispatchQueue: SequenceDispatchQueue, dispatcher: EventDispatcher, val scope: TestScope -) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher) { +) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView { + + suspend fun startFor(iterations: Int) { repeat(iterations) { @@ -26,4 +30,15 @@ class TestablePoller( scope.testScheduler.advanceTimeBy(backoff.toMillis()) } } + + override fun watermarkFor(ref: UUID): LocalDateTime? { + return refWatermark[ref]?.let { + return it + } + } + + +} +interface WatermarkDebugView { + fun watermarkFor(ref: UUID): LocalDateTime? }