diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt index 3d5cfa8..9756a39 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt @@ -43,8 +43,12 @@ abstract class EventPollerImplementation( val pollStartedAt = MyTime.UtcNow() log.debug { "🔍 Polling for new events" } - // Global scan hint: start fra laveste watermark - val scanFrom = refWatermark.values.minOrNull() ?: lastSeenTime + // Global scan hint: kombiner refWatermark og lastSeenTime + val watermarkMin = refWatermark.values.minOrNull() + val scanFrom = when (watermarkMin) { + null -> lastSeenTime + else -> maxOf(lastSeenTime, watermarkMin) + } val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) @@ -55,12 +59,16 @@ abstract class EventPollerImplementation( return } + // Vi har sett nye events globalt – reset backoff backoff = Duration.ofSeconds(2) - log.debug { "📬 Found ${newPersisted.size} new events" } + log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" } val grouped = newPersisted.groupBy { it.referenceId } var anyProcessed = false + // Track høyeste persistedAt vi har sett i denne runden + val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt } + for ((ref, eventsForRef) in grouped) { val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0) @@ -85,8 +93,8 @@ abstract class EventPollerImplementation( dispatchQueue.dispatch(ref, events, dispatcher) // Oppdater watermark for denne ref’en - val maxPersistedAt = newForRef.maxOf { it.persistedAt } - val newWatermark = minOf(pollStartedAt, maxPersistedAt).plusNanos(1) + val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt } + val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1) refWatermark[ref] = newWatermark anyProcessed = true @@ -94,12 +102,27 @@ abstract class EventPollerImplementation( log.debug { "⏩ Updated watermark for $ref → $newWatermark" } } - // Oppdater global scan hint + // Oppdater global scan hint uansett – vi har sett nye events + // Dette hindrer livelock når alle events er <= watermark for sine refs + val newLastSeen = maxOf( + lastSeenTime, + maxPersistedThisRound.plusNanos(1) + ) + if (anyProcessed) { - lastSeenTime = refWatermark.values.minOrNull() ?: lastSeenTime - log.debug { "📉 Global scanFrom updated → $lastSeenTime" } + // Behold intensjonen din: globalt hint basert på laveste watermark, + // men aldri gå bakover i tid ift lastSeenTime + val minRefWatermark = refWatermark.values.minOrNull() + lastSeenTime = when (minRefWatermark) { + null -> newLastSeen + else -> maxOf(newLastSeen, minRefWatermark) + } + log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" } } else { - log.debug { "🔁 No refs processed — global scanFrom unchanged ($lastSeenTime)" } + // Ingen refs prosessert, men vi vet at alle events vi så er <= watermark + // → trygt å flytte lastSeenTime forbi dem + lastSeenTime = newLastSeen + log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" } } } @@ -107,4 +130,5 @@ abstract class EventPollerImplementation( + } diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt index 8a96c6d..d2457e1 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt @@ -2,11 +2,17 @@ package no.iktdev.eventi.events.poller import kotlinx.coroutines.test.* import no.iktdev.eventi.InMemoryEventStore +import no.iktdev.eventi.TestBase +import no.iktdev.eventi.events.EventDispatcher import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.FakeDispatcher import no.iktdev.eventi.events.RunSimulationTestTest +import no.iktdev.eventi.events.SequenceDispatchQueue import no.iktdev.eventi.events.TestEvent +import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Metadata +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.eventi.stores.EventStore import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName @@ -14,7 +20,7 @@ import org.junit.jupiter.api.Test import java.time.LocalDateTime import java.util.UUID -class PollerStartLoopTest { +class PollerStartLoopTest: TestBase() { private lateinit var store: InMemoryEventStore private lateinit var dispatcher: FakeDispatcher @@ -293,6 +299,71 @@ class PollerStartLoopTest { } } + @Test + fun `poller should not livelock when global scan sees events but watermark rejects them`() = runTest { + val ref = UUID.randomUUID() + + // Fake EventStore som alltid returnerer samme event + val fakeStore = object : EventStore { + override fun getPersistedEventsAfter(ts: LocalDateTime): List { + // Alltid returner én event som ligger før watermark + return listOf( + PersistedEvent( + id = 1, + referenceId = ref, + eventId = UUID.randomUUID(), + event = "test", + data = """{"x":1}""", + persistedAt = t(50) // før watermark + ) + ) + } + + override fun getPersistedEventsFor(ref: UUID): List { + return emptyList() // spiller ingen rolle + } + + override fun persist(event: Event) { + TODO("Not yet implemented") + } + } + + val queue = SequenceDispatchQueue() + class NoopDispatcher : EventDispatcher(fakeStore) { + override fun dispatch(referenceId: UUID, events: List) { + // Do nothing + } + } + + + val dispatcher = NoopDispatcher() + + val poller = TestablePoller(fakeStore, queue, dispatcher, scope) + + // Sett watermark høyt (polleren setter watermark selv i ekte drift, + // men i denne testen må vi simulere det) + poller.setWatermarkFor(ref, t(100)) + + // Sett lastSeenTime bak eventen + poller.lastSeenTime = t(0) + + // Første poll: polleren ser eventet, men prosesserer ikke ref + poller.pollOnce() + + // Fixen skal flytte lastSeenTime forbi eventen + assertThat(poller.lastSeenTime) + .isAfter(t(50)) + + // Andre poll: nå skal polleren IKKE spinne + val before = poller.lastSeenTime + poller.pollOnce() + val after = poller.lastSeenTime + + assertThat(after).isEqualTo(before) + } + + + diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt index bda92f6..8ec840b 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt @@ -37,8 +37,13 @@ class TestablePoller( } } + override fun setWatermarkFor(ref: UUID, time: LocalDateTime) { + refWatermark[ref] = time + } + } interface WatermarkDebugView { fun watermarkFor(ref: UUID): LocalDateTime? + fun setWatermarkFor(ref: UUID, time: LocalDateTime) }