Changes to puller logic

This commit is contained in:
Brage Skjønborg 2026-01-22 21:16:21 +01:00
parent 6d60c5f74c
commit 919339d306
3 changed files with 254 additions and 19 deletions

View File

@ -7,6 +7,7 @@ import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.time.Duration import java.time.Duration
import java.time.LocalDateTime import java.time.LocalDateTime
import java.util.UUID
import kotlin.collections.iterator import kotlin.collections.iterator
abstract class EventPollerImplementation( abstract class EventPollerImplementation(
@ -14,7 +15,12 @@ abstract class EventPollerImplementation(
private val dispatchQueue: SequenceDispatchQueue, private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher private val dispatcher: EventDispatcher
) { ) {
// Erstatter ikke lastSeenTime, men supplerer den
protected val refWatermark = mutableMapOf<UUID, LocalDateTime>()
// lastSeenTime brukes kun som scan hint
var lastSeenTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0) var lastSeenTime: LocalDateTime = LocalDateTime.of(1970, 1, 1, 0, 0)
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
@ -35,45 +41,70 @@ abstract class EventPollerImplementation(
suspend fun pollOnce() { suspend fun pollOnce() {
val pollStartedAt = MyTime.UtcNow() val pollStartedAt = MyTime.UtcNow()
log.debug { "Polling for new events" } log.debug { "🔍 Polling for new events" }
val newPersisted = eventStore.getPersistedEventsAfter(lastSeenTime)
// Global scan hint: start fra laveste watermark
val scanFrom = refWatermark.values.minOrNull() ?: lastSeenTime
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
if (newPersisted.isEmpty()) { if (newPersisted.isEmpty()) {
log.debug { "No new events found. Backing off for $backoff" } log.debug { "😴 No new events found. Backing off for $backoff" }
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
return return
} }
backoff = Duration.ofSeconds(2) backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events" }
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false
// Samle persistedAt KUN for referanser vi faktisk dispatchet for ((ref, eventsForRef) in grouped) {
val processedTimes = mutableListOf<LocalDateTime>() val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0)
for ((referenceId, _) in grouped) { // Finn kun nye events for denne refen
if (dispatchQueue.isProcessing(referenceId)) { val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
log.debug { "Skipping dispatch for $referenceId as it is already being processed" } if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since $refSeen" }
continue continue
} }
val fullLog = eventStore.getPersistedEventsFor(referenceId) // Hvis ref er busy → ikke oppdater watermark, ikke dispatch
val events = fullLog.mapNotNull { it.toEvent() } if (dispatchQueue.isProcessing(ref)) {
processedTimes += fullLog.map { it.persistedAt } log.debug { "$ref is busy — deferring ${newForRef.size} events" }
dispatchQueue.dispatch(referenceId, events, dispatcher) continue
} }
if (processedTimes.isNotEmpty()) { // Hent full sekvens for ref (Eventi-invariant)
val maxPersistedAt = processedTimes.max() val fullLog = eventStore.getPersistedEventsFor(ref)
val newLastSeen = minOf(pollStartedAt, maxPersistedAt).plusNanos(1) val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "Updating lastSeenTime from $lastSeenTime to $newLastSeen" }
lastSeenTime = newLastSeen log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher)
// Oppdater watermark for denne refen
val maxPersistedAt = newForRef.maxOf { it.persistedAt }
val newWatermark = minOf(pollStartedAt, maxPersistedAt).plusNanos(1)
refWatermark[ref] = newWatermark
anyProcessed = true
log.debug { "⏩ Updated watermark for $ref$newWatermark" }
}
// Oppdater global scan hint
if (anyProcessed) {
lastSeenTime = refWatermark.values.minOrNull() ?: lastSeenTime
log.debug { "📉 Global scanFrom updated → $lastSeenTime" }
} else { } else {
// Ingen referanser ble dispatchet → IKKE oppdater lastSeenTime log.debug { "🔁 No refs processed — global scanFrom unchanged ($lastSeenTime)" }
log.debug { "No dispatches performed; lastSeenTime remains $lastSeenTime" }
} }
} }
} }

View File

@ -9,6 +9,7 @@ import no.iktdev.eventi.events.TestEvent
import no.iktdev.eventi.models.Metadata import no.iktdev.eventi.models.Metadata
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.time.LocalDateTime import java.time.LocalDateTime
import java.util.UUID import java.util.UUID
@ -22,6 +23,10 @@ class PollerStartLoopTest {
private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue
private lateinit var poller: TestablePoller private lateinit var poller: TestablePoller
private fun t(seconds: Long): LocalDateTime =
LocalDateTime.of(2024, 1, 1, 12, 0).plusSeconds(seconds)
@BeforeEach @BeforeEach
fun setup() { fun setup() {
store = InMemoryEventStore() store = InMemoryEventStore()
@ -107,4 +112,188 @@ class PollerStartLoopTest {
assertThat(dispatcher.dispatched).hasSize(1) assertThat(dispatcher.dispatched).hasSize(1)
} }
@Test
fun `poller does not dispatch when no new events for ref`() = runTest {
val ref = UUID.randomUUID()
// E1
persistAt(ref, t(0))
poller.startFor(iterations = 1)
assertThat(dispatcher.dispatched).hasSize(1)
// Ingen nye events
poller.startFor(iterations = 3)
// Fremdeles bare én dispatch
assertThat(dispatcher.dispatched).hasSize(1)
}
@Test
fun `event arriving while ref is busy is not lost`() = runTest {
val ref = UUID.randomUUID()
persistAt(ref, t(0))
persistAt(ref, t(5))
// Første poll: dispatcher E1+E2
poller.startFor(iterations = 1)
assertThat(dispatcher.dispatched).hasSize(1)
// Marker ref som busy
queue.busyRefs += ref
// E3 kommer mens ref er busy
persistAt(ref, t(10))
// Polleren skal IKKE dispatche nå
poller.startFor(iterations = 2)
assertThat(dispatcher.dispatched).hasSize(1)
// Frigjør ref
queue.busyRefs.clear()
// Nå skal E3 bli dispatchet
poller.startFor(iterations = 1)
assertThat(dispatcher.dispatched).hasSize(2)
val events = dispatcher.dispatched.last().second
assertThat(events).hasSize(3)
}
@Test
fun `busy ref does not block dispatch of other refs`() = runTest {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
persistAt(refA, t(0))
persistAt(refB, t(0))
// Marker A som busy
queue.busyRefs += refA
poller.startFor(iterations = 1)
// refA skal ikke dispatches
// refB skal dispatches
assertThat(dispatcher.dispatched).hasSize(1)
assertThat(dispatcher.dispatched.first().first).isEqualTo(refB)
}
@Test
fun `watermark advances only for refs that were processed`() = runTest {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
persistAt(refA, t(0))
persistAt(refB, t(0))
// Første poll: begge refs blir dispatchet
poller.startFor(iterations = 1)
val wmA1 = poller.watermarkFor(refA!!)
val wmB1 = poller.watermarkFor(refB!!)
// Marker A som busy
queue.busyRefs += refA
// Nye events for begge refs
persistAt(refA, t(10))
persistAt(refB, t(10))
poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark
assertThat(poller.watermarkFor(refB)).isAfter(wmB1)
}
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@Test
fun `stress test with many refs random busy states and interleaved events`() = runTest {
val refs = List(50) { UUID.randomUUID() }
val eventCountPerRef = 20
// 1. Initial events
refs.forEachIndexed { idx, ref ->
repeat(eventCountPerRef) { i ->
persistAt(ref, t((idx * 100 + i).toLong()))
}
}
// 2. Random busy refs
val busyRefs = refs.shuffled().take(10).toSet()
queue.busyRefs += busyRefs
// 3. First poll: only non-busy refs dispatch
poller.startFor(iterations = 1)
val dispatchedFirstRound = dispatcher.dispatched.groupBy { it.first }
val dispatchedRefsFirstRound = dispatchedFirstRound.keys
val expectedFirstRound = refs - busyRefs
assertThat(dispatchedRefsFirstRound)
.containsExactlyInAnyOrder(*expectedFirstRound.toTypedArray())
// 4. Add new events for all refs
refs.forEachIndexed { idx, ref ->
persistAt(ref, t((10_000 + idx).toLong()))
}
// 5. Second poll: only non-busy refs dispatch again
poller.startFor(iterations = 1)
val dispatchedSecondRound = dispatcher.dispatched.groupBy { it.first }
val secondRoundCounts = dispatchedSecondRound.mapValues { (_, v) -> v.size }
// Non-busy refs should now have 2 dispatches total
expectedFirstRound.forEach { ref ->
assertThat(secondRoundCounts[ref]).isEqualTo(2)
}
// Busy refs should still have 0 dispatches
busyRefs.forEach { ref ->
assertThat(secondRoundCounts).doesNotContainKey(ref)
}
// 6. Free busy refs
queue.busyRefs.clear()
// 7. Third poll: busy refs dispatch their backlog
poller.startFor(iterations = 1)
val dispatchedThirdRound = dispatcher.dispatched.groupBy { it.first }
val thirdRoundCounts = dispatchedThirdRound.mapValues { (_, v) -> v.size }
refs.forEach { ref ->
if (ref in busyRefs) {
// Busy refs: 1 dispatch total (only in third poll)
assertThat(thirdRoundCounts[ref]).isEqualTo(1)
} else {
// Non-busy refs: 2 dispatches total (first + second)
assertThat(thirdRoundCounts[ref]).isEqualTo(2)
}
}
// 8. No ref should have more than 2 dispatches (no spinning)
refs.forEach { ref ->
assertThat(thirdRoundCounts[ref]).isLessThanOrEqualTo(2)
}
// 9. Verify all refs processed all unique events
refs.forEach { ref ->
val uniqueEvents = dispatchedThirdRound[ref]!!
.flatMap { it.second }
.distinctBy { it.eventId }
assertThat(uniqueEvents).hasSize(eventCountPerRef + 1)
}
}
} }

View File

@ -6,13 +6,17 @@ import no.iktdev.eventi.events.EventDispatcher
import no.iktdev.eventi.events.EventPollerImplementation import no.iktdev.eventi.events.EventPollerImplementation
import no.iktdev.eventi.events.SequenceDispatchQueue import no.iktdev.eventi.events.SequenceDispatchQueue
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.time.LocalDateTime
import java.util.UUID
class TestablePoller( class TestablePoller(
eventStore: EventStore, eventStore: EventStore,
dispatchQueue: SequenceDispatchQueue, dispatchQueue: SequenceDispatchQueue,
dispatcher: EventDispatcher, dispatcher: EventDispatcher,
val scope: TestScope val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher) { ) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) { suspend fun startFor(iterations: Int) {
repeat(iterations) { repeat(iterations) {
@ -26,4 +30,15 @@ class TestablePoller(
scope.testScheduler.advanceTimeBy(backoff.toMillis()) scope.testScheduler.advanceTimeBy(backoff.toMillis())
} }
} }
override fun watermarkFor(ref: UUID): LocalDateTime? {
return refWatermark[ref]?.let {
return it
}
}
}
interface WatermarkDebugView {
fun watermarkFor(ref: UUID): LocalDateTime?
} }