Compare commits

..

No commits in common. "master" and "v1.0-rc38" have entirely different histories.

9 changed files with 76 additions and 119 deletions

View File

@ -8,30 +8,23 @@ import no.iktdev.eventi.stores.EventStore
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import kotlin.collections.iterator
abstract class EventPollerImplementation( abstract class EventPollerImplementation(
private val eventStore: EventStore, private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue, private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher private val dispatcher: EventDispatcher
) { ) {
private val log = KotlinLogging.logger {} // Erstatter ikke lastSeenTime, men supplerer den
protected val refWatermark = mutableMapOf<UUID, Instant>()
/** // lastSeenTime brukes kun som scan hint
* Per-reference watermark:
* - first = last seen persistedAt
* - second = last seen persistedId
*/
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
/**
* Global scan hint (timestamp only).
* Used to avoid scanning entire table every time.
*/
var lastSeenTime: Instant = Instant.EPOCH var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() { open suspend fun start() {
log.info { "EventPoller starting with initial backoff=$backoff" } log.info { "EventPoller starting with initial backoff=$backoff" }
@ -39,7 +32,7 @@ abstract class EventPollerImplementation(
try { try {
pollOnce() pollOnce()
} catch (e: Exception) { } catch (e: Exception) {
log.error(e) { "Error in poller loop" } e.printStackTrace()
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
} }
@ -50,11 +43,11 @@ abstract class EventPollerImplementation(
val pollStartedAt = MyTime.utcNow() val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" } log.debug { "🔍 Polling for new events" }
// Determine global scan start // Global scan hint: kombiner refWatermark og lastSeenTime
val minRefTs = refWatermark.values.minOfOrNull { it.first } val watermarkMin = refWatermark.values.minOrNull()
val scanFrom = when (minRefTs) { val scanFrom = when (watermarkMin) {
null -> lastSeenTime null -> lastSeenTime
else -> maxOf(lastSeenTime, minRefTs) else -> maxOf(lastSeenTime, watermarkMin)
} }
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
@ -66,73 +59,76 @@ abstract class EventPollerImplementation(
return return
} }
// Reset backoff // Vi har sett nye events globalt reset backoff
backoff = Duration.ofSeconds(2) backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" } log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false var anyProcessed = false
// Track highest persistedAt seen globally this round // Track høyeste persistedAt vi har sett i denne runden
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt } val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) { for ((ref, eventsForRef) in grouped) {
val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L) val refSeen = refWatermark[ref] ?: Instant.EPOCH
// Filter new events using (timestamp, id) ordering
val newForRef = eventsForRef.filter { ev ->
ev.persistedAt > refSeenAt ||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
}
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
if (newForRef.isEmpty()) { if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" } log.debug { "🧊 No new events for $ref since $refSeen" }
continue continue
} }
// If ref is busy, skip dispatch // Hvis ref er busy → ikke oppdater watermark, ikke dispatch
if (dispatchQueue.isProcessing(ref)) { if (dispatchQueue.isProcessing(ref)) {
log.debug { "$ref is busy — deferring ${newForRef.size} events" } log.debug { "$ref is busy — deferring ${newForRef.size} events" }
continue continue
} }
// Fetch full sequence for dispatch // Hent full sekvens for ref (Eventi-invariant)
val fullLog = eventStore.getPersistedEventsFor(ref) val fullLog = eventStore.getPersistedEventsFor(ref)
val events = fullLog.mapNotNull { it.toEvent() } val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "🚀 Dispatching ${events.size} events for $ref" } log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher) dispatchQueue.dispatch(ref, events, dispatcher)
// Update watermark for this reference // Oppdater watermark for denne refen
val maxEvent = newForRef.maxWith( val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
compareBy({ it.persistedAt }, { it.id }) val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
)
val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt) refWatermark[ref] = newWatermark
val newWatermarkId = maxEvent.id
refWatermark[ref] = newWatermarkAt to newWatermarkId
anyProcessed = true anyProcessed = true
log.debug { "⏩ Updated watermark for $ref($newWatermarkAt, id=$newWatermarkId)" } log.debug { "⏩ Updated watermark for $ref$newWatermark" }
} }
// Update global scan hint // Oppdater global scan hint uansett vi har sett nye events
// Dette hindrer livelock når alle events er <= watermark for sine refs
val newLastSeen = maxOf( val newLastSeen = maxOf(
lastSeenTime, lastSeenTime,
maxPersistedThisRound.plusNanos(1) maxPersistedThisRound.plusNanos(1)
) )
if (anyProcessed) { if (anyProcessed) {
val minRef = refWatermark.values.minOfOrNull { it.first } // Behold intensjonen din: globalt hint basert på laveste watermark,
lastSeenTime = when (minRef) { // men aldri gå bakover i tid ift lastSeenTime
val minRefWatermark = refWatermark.values.minOrNull()
lastSeenTime = when (minRefWatermark) {
null -> newLastSeen null -> newLastSeen
else -> maxOf(newLastSeen, minRef) else -> maxOf(newLastSeen, minRefWatermark)
} }
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" } log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
} else { } else {
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
// → trygt å flytte lastSeenTime forbi dem
lastSeenTime = newLastSeen lastSeenTime = newLastSeen
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" } log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
} }
} }
} }

View File

@ -4,7 +4,7 @@ import java.util.UUID
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
abstract class Event { abstract class Event {
lateinit var referenceId: UUID var referenceId: UUID = UUID.randomUUID()
protected set protected set
var eventId: UUID = UUID.randomUUID() var eventId: UUID = UUID.randomUUID()
private set private set

View File

@ -68,7 +68,9 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch { currentJob = getDispatcherForTask(task).launch {
try { try {
val result = onTask(task) val result = onTask(task)
reporter.markCompleted(task.taskId)
onComplete(task, result) onComplete(task, result)
} catch (e: CancellationException) { } catch (e: CancellationException) {
// Dette er en ekte kansellering // Dette er en ekte kansellering
onCancelled(task) onCancelled(task)

View File

@ -64,7 +64,7 @@ class EventDispatcherTest : TestBase() {
fun shouldProduceOneEventAndStop() { fun shouldProduceOneEventAndStop() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent().newReferenceId() val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val produced = eventStore.all().firstOrNull() val produced = eventStore.all().firstOrNull()
@ -87,7 +87,7 @@ class EventDispatcherTest : TestBase() {
fun shouldSkipAlreadyDerivedEvents() { fun shouldSkipAlreadyDerivedEvents() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent().newReferenceId() val trigger = TriggerEvent()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow()) val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
eventStore.persist(derived!!.toEvent()!!) // simulate prior production eventStore.persist(derived!!.toEvent()!!) // simulate prior production
@ -108,8 +108,8 @@ class EventDispatcherTest : TestBase() {
fun shouldPassFullContextToListener() { fun shouldPassFullContextToListener() {
val listener = ContextCapturingListener() val listener = ContextCapturingListener()
val e1 = TriggerEvent().newReferenceId() val e1 = TriggerEvent()
val e2 = OtherEvent().newReferenceId() val e2 = OtherEvent()
dispatcher.dispatch(e1.referenceId, listOf(e1, e2)) dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
assertEquals(2, listener.context.size) assertEquals(2, listener.context.size)
@ -124,11 +124,9 @@ class EventDispatcherTest : TestBase() {
""" """
) )
fun shouldBehaveDeterministicallyAcrossReplays() { fun shouldBehaveDeterministicallyAcrossReplays() {
val referenceId = UUID.randomUUID()
ProducingListener() ProducingListener()
val trigger = TriggerEvent().usingReferenceId(referenceId) val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() } val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
@ -146,8 +144,6 @@ class EventDispatcherTest : TestBase() {
""" """
) )
fun shouldNotDeliverDeletedEventsAsCandidates() { fun shouldNotDeliverDeletedEventsAsCandidates() {
val referenceId = UUID.randomUUID()
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
@ -158,10 +154,11 @@ class EventDispatcherTest : TestBase() {
} }
} }
// Original hendelse // Original hendelse
val original = TriggerEvent().usingReferenceId(referenceId) val original = TriggerEvent()
// Slettehendelse som peker på original // Slettehendelse som peker på original
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() } val deleted = object : DeleteEvent(original.eventId) {
}
// Dispatch med begge hendelser // Dispatch med begge hendelser
dispatcher.dispatch(original.referenceId, listOf(original, deleted)) dispatcher.dispatch(original.referenceId, listOf(original, deleted))
@ -187,8 +184,6 @@ class EventDispatcherTest : TestBase() {
) )
fun shouldDeliverDeleteEventToListenersThatReactToIt() { fun shouldDeliverDeleteEventToListenersThatReactToIt() {
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
val referenceId = UUID.randomUUID()
object : EventListener() { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
if (event is DeleteEvent) received += event if (event is DeleteEvent) received += event
@ -196,7 +191,7 @@ class EventDispatcherTest : TestBase() {
} }
} }
val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) } val deleted = object : DeleteEvent(UUID.randomUUID()) {}
dispatcher.dispatch(deleted.referenceId, listOf(deleted)) dispatcher.dispatch(deleted.referenceId, listOf(deleted))
assertTrue(received.contains(deleted)) assertTrue(received.contains(deleted))
@ -213,7 +208,7 @@ class EventDispatcherTest : TestBase() {
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() { fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent().newReferenceId() val trigger = TriggerEvent()
// Første dispatch: trigger produserer en DerivedEvent // Første dispatch: trigger produserer en DerivedEvent
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
@ -243,8 +238,8 @@ class EventDispatcherTest : TestBase() {
fun historyShouldExcludeDeletedEvents() { fun historyShouldExcludeDeletedEvents() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId() val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) } val deleted = object : DeleteEvent(original.eventId) {}
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()
@ -271,9 +266,9 @@ class EventDispatcherTest : TestBase() {
) )
fun historyShouldKeepNonDeletedEvents() { fun historyShouldKeepNonDeletedEvents() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val referenceId = UUID.randomUUID()
val e1 = TriggerEvent().usingReferenceId(referenceId) val e1 = TriggerEvent()
val e2 = OtherEvent().usingReferenceId(referenceId) val e2 = OtherEvent()
val deleted = object : DeleteEvent(e1.eventId) {} val deleted = object : DeleteEvent(e1.eventId) {}
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()
@ -303,8 +298,8 @@ class EventDispatcherTest : TestBase() {
fun deleteEventShouldBeDeliveredButHistoryEmpty() { fun deleteEventShouldBeDeliveredButHistoryEmpty() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId() val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() } val deleted = object : DeleteEvent(original.eventId) {}
var receivedEvent: Event? = null var receivedEvent: Event? = null
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()

View File

@ -39,7 +39,7 @@ class ZDSTest {
fun scenario1() { fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java) EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello").newReferenceId() val echo = EchoEvent("hello")
val persisted = echo.toPersisted(id = 1L) val persisted = echo.toPersisted(id = 1L)
val restored = persisted!!.toEvent() val restored = persisted!!.toEvent()

View File

@ -214,7 +214,7 @@ class EventPollerImplementationTest : TestBase() {
} }
} }
val original = EchoEvent("Hello").newReferenceId() val original = EchoEvent("Hello")
eventStore.persist(original) eventStore.persist(original)
poller.pollOnce() poller.pollOnce()

View File

@ -60,42 +60,6 @@ class PollerStartLoopTest : TestBase() {
store.persistAt(e, time) store.persistAt(e, time)
} }
@Test
@DisplayName("""
Når to events har identisk persistedAt
Hvis polleren kjører
skal begge events prosesseres og ingen mistes
""")
fun `poller handles same-timestamp events without losing any`() = runTest {
val ref = UUID.randomUUID()
val ts = Instant.parse("2025-01-01T12:00:00Z")
// Two events with same timestamp but different IDs
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e1, ts) // id=1
store.persistAt(e2, ts) // id=2
poller.startFor(iterations = 1)
// Verify dispatch happened
assertThat(dispatcher.dispatched).hasSize(1)
val (_, events) = dispatcher.dispatched.single()
// Both events must be present
assertThat(events.map { it.eventId })
.hasSize(2)
.doesNotHaveDuplicates()
// Watermark must reflect highest ID
val wm = poller.watermarkFor(ref)
assertThat(wm!!.first).isEqualTo(ts)
assertThat(wm.second).isEqualTo(2)
}
@Test @Test
@DisplayName(""" @DisplayName("""
Når polleren kjører flere iterasjoner uten events Når polleren kjører flere iterasjoner uten events
@ -307,15 +271,11 @@ class PollerStartLoopTest : TestBase() {
poller.startFor(iterations = 1) poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark // A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1) assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark (på timestamp-nivå) // B skal ha flyttet watermark
val wmB2 = poller.watermarkFor(refB) assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
} }
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk") @DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@ -473,7 +433,7 @@ class PollerStartLoopTest : TestBase() {
// Sett watermark høyt (polleren setter watermark selv i ekte drift, // Sett watermark høyt (polleren setter watermark selv i ekte drift,
// men i denne testen må vi simulere det) // men i denne testen må vi simulere det)
poller.setWatermarkFor(ref, t(100), id = 999) poller.setWatermarkFor(ref, t(100))
// Sett lastSeenTime bak eventen // Sett lastSeenTime bak eventen
poller.lastSeenTime = t(0) poller.lastSeenTime = t(0)

View File

@ -17,6 +17,8 @@ class TestablePoller(
val scope: TestScope val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView { ) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) { suspend fun startFor(iterations: Int) {
repeat(iterations) { repeat(iterations) {
try { try {
@ -30,17 +32,19 @@ class TestablePoller(
} }
} }
override fun watermarkFor(ref: UUID): Pair<Instant, Long>? { override fun watermarkFor(ref: UUID): Instant? {
return refWatermark[ref] return refWatermark[ref]?.let {
return it
}
} }
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) { override fun setWatermarkFor(ref: UUID, time: Instant) {
refWatermark[ref] = time to id refWatermark[ref] = time
} }
} }
interface WatermarkDebugView { interface WatermarkDebugView {
fun watermarkFor(ref: UUID): Pair<Instant, Long>? fun watermarkFor(ref: UUID): Instant?
fun setWatermarkFor(ref: UUID, time: Instant, id: Long) fun setWatermarkFor(ref: UUID, time: Instant)
} }

View File

@ -114,7 +114,7 @@ class TaskPollerImplementationTest : TestBase() {
val listener = EchoListener() val listener = EchoListener()
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() }) val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
taskStore.persist(task) taskStore.persist(task)
poller.pollOnce() poller.pollOnce()