Compare commits

..

1 Commits

Author SHA1 Message Date
73f5a3d4da Offset id + persisted 2026-02-02 19:30:13 +01:00
4 changed files with 93 additions and 55 deletions

View File

@ -8,23 +8,30 @@ import no.iktdev.eventi.stores.EventStore
import java.time.Duration
import java.time.Instant
import java.util.UUID
import kotlin.collections.iterator
abstract class EventPollerImplementation(
private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher
) {
// Erstatter ikke lastSeenTime, men supplerer den
protected val refWatermark = mutableMapOf<UUID, Instant>()
private val log = KotlinLogging.logger {}
// lastSeenTime brukes kun som scan hint
/**
* Per-reference watermark:
* - first = last seen persistedAt
* - second = last seen persistedId
*/
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
/**
* Global scan hint (timestamp only).
* Used to avoid scanning entire table every time.
*/
var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2)
protected set
private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() {
log.info { "EventPoller starting with initial backoff=$backoff" }
@ -32,7 +39,7 @@ abstract class EventPollerImplementation(
try {
pollOnce()
} catch (e: Exception) {
e.printStackTrace()
log.error(e) { "Error in poller loop" }
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
}
@ -43,11 +50,11 @@ abstract class EventPollerImplementation(
val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" }
// Global scan hint: kombiner refWatermark og lastSeenTime
val watermarkMin = refWatermark.values.minOrNull()
val scanFrom = when (watermarkMin) {
// Determine global scan start
val minRefTs = refWatermark.values.minOfOrNull { it.first }
val scanFrom = when (minRefTs) {
null -> lastSeenTime
else -> maxOf(lastSeenTime, watermarkMin)
else -> maxOf(lastSeenTime, minRefTs)
}
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
@ -59,76 +66,73 @@ abstract class EventPollerImplementation(
return
}
// Vi har sett nye events globalt reset backoff
// Reset backoff
backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false
// Track høyeste persistedAt vi har sett i denne runden
// Track highest persistedAt seen globally this round
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) {
val refSeen = refWatermark[ref] ?: Instant.EPOCH
val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
// Filter new events using (timestamp, id) ordering
val newForRef = eventsForRef.filter { ev ->
ev.persistedAt > refSeenAt ||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
}
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since $refSeen" }
log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
continue
}
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch
// If ref is busy, skip dispatch
if (dispatchQueue.isProcessing(ref)) {
log.debug { "$ref is busy — deferring ${newForRef.size} events" }
continue
}
// Hent full sekvens for ref (Eventi-invariant)
// Fetch full sequence for dispatch
val fullLog = eventStore.getPersistedEventsFor(ref)
val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher)
// Oppdater watermark for denne refen
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
// Update watermark for this reference
val maxEvent = newForRef.maxWith(
compareBy({ it.persistedAt }, { it.id })
)
refWatermark[ref] = newWatermark
val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
val newWatermarkId = maxEvent.id
refWatermark[ref] = newWatermarkAt to newWatermarkId
anyProcessed = true
log.debug { "⏩ Updated watermark for $ref$newWatermark" }
log.debug { "⏩ Updated watermark for $ref($newWatermarkAt, id=$newWatermarkId)" }
}
// Oppdater global scan hint uansett vi har sett nye events
// Dette hindrer livelock når alle events er <= watermark for sine refs
// Update global scan hint
val newLastSeen = maxOf(
lastSeenTime,
maxPersistedThisRound.plusNanos(1)
)
if (anyProcessed) {
// Behold intensjonen din: globalt hint basert på laveste watermark,
// men aldri gå bakover i tid ift lastSeenTime
val minRefWatermark = refWatermark.values.minOrNull()
lastSeenTime = when (minRefWatermark) {
val minRef = refWatermark.values.minOfOrNull { it.first }
lastSeenTime = when (minRef) {
null -> newLastSeen
else -> maxOf(newLastSeen, minRefWatermark)
else -> maxOf(newLastSeen, minRef)
}
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
} else {
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
// → trygt å flytte lastSeenTime forbi dem
lastSeenTime = newLastSeen
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
}
}
}

View File

@ -68,9 +68,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch {
try {
val result = onTask(task)
reporter.markCompleted(task.taskId)
onComplete(task, result)
} catch (e: CancellationException) {
// Dette er en ekte kansellering
onCancelled(task)

View File

@ -60,6 +60,42 @@ class PollerStartLoopTest : TestBase() {
store.persistAt(e, time)
}
@Test
@DisplayName("""
Når to events har identisk persistedAt
Hvis polleren kjører
skal begge events prosesseres og ingen mistes
""")
fun `poller handles same-timestamp events without losing any`() = runTest {
val ref = UUID.randomUUID()
val ts = Instant.parse("2025-01-01T12:00:00Z")
// Two events with same timestamp but different IDs
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e1, ts) // id=1
store.persistAt(e2, ts) // id=2
poller.startFor(iterations = 1)
// Verify dispatch happened
assertThat(dispatcher.dispatched).hasSize(1)
val (_, events) = dispatcher.dispatched.single()
// Both events must be present
assertThat(events.map { it.eventId })
.hasSize(2)
.doesNotHaveDuplicates()
// Watermark must reflect highest ID
val wm = poller.watermarkFor(ref)
assertThat(wm!!.first).isEqualTo(ts)
assertThat(wm.second).isEqualTo(2)
}
@Test
@DisplayName("""
Når polleren kjører flere iterasjoner uten events
@ -271,11 +307,15 @@ class PollerStartLoopTest : TestBase() {
poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
// B skal ha flyttet watermark (på timestamp-nivå)
val wmB2 = poller.watermarkFor(refB)
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
}
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@ -433,7 +473,7 @@ class PollerStartLoopTest : TestBase() {
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
// men i denne testen må vi simulere det)
poller.setWatermarkFor(ref, t(100))
poller.setWatermarkFor(ref, t(100), id = 999)
// Sett lastSeenTime bak eventen
poller.lastSeenTime = t(0)

View File

@ -17,8 +17,6 @@ class TestablePoller(
val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) {
repeat(iterations) {
try {
@ -32,19 +30,17 @@ class TestablePoller(
}
}
override fun watermarkFor(ref: UUID): Instant? {
return refWatermark[ref]?.let {
return it
override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
return refWatermark[ref]
}
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
refWatermark[ref] = time to id
}
}
override fun setWatermarkFor(ref: UUID, time: Instant) {
refWatermark[ref] = time
}
}
interface WatermarkDebugView {
fun watermarkFor(ref: UUID): Instant?
fun setWatermarkFor(ref: UUID, time: Instant)
fun watermarkFor(ref: UUID): Pair<Instant, Long>?
fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
}