Added watermark
This commit is contained in:
parent
c7395b883e
commit
ddf5c699cd
@ -43,8 +43,12 @@ abstract class EventPollerImplementation(
|
|||||||
val pollStartedAt = MyTime.UtcNow()
|
val pollStartedAt = MyTime.UtcNow()
|
||||||
log.debug { "🔍 Polling for new events" }
|
log.debug { "🔍 Polling for new events" }
|
||||||
|
|
||||||
// Global scan hint: start fra laveste watermark
|
// Global scan hint: kombiner refWatermark og lastSeenTime
|
||||||
val scanFrom = refWatermark.values.minOrNull() ?: lastSeenTime
|
val watermarkMin = refWatermark.values.minOrNull()
|
||||||
|
val scanFrom = when (watermarkMin) {
|
||||||
|
null -> lastSeenTime
|
||||||
|
else -> maxOf(lastSeenTime, watermarkMin)
|
||||||
|
}
|
||||||
|
|
||||||
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
||||||
|
|
||||||
@ -55,12 +59,16 @@ abstract class EventPollerImplementation(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Vi har sett nye events globalt – reset backoff
|
||||||
backoff = Duration.ofSeconds(2)
|
backoff = Duration.ofSeconds(2)
|
||||||
log.debug { "📬 Found ${newPersisted.size} new events" }
|
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
|
||||||
|
|
||||||
val grouped = newPersisted.groupBy { it.referenceId }
|
val grouped = newPersisted.groupBy { it.referenceId }
|
||||||
var anyProcessed = false
|
var anyProcessed = false
|
||||||
|
|
||||||
|
// Track høyeste persistedAt vi har sett i denne runden
|
||||||
|
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
|
||||||
|
|
||||||
for ((ref, eventsForRef) in grouped) {
|
for ((ref, eventsForRef) in grouped) {
|
||||||
val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0)
|
val refSeen = refWatermark[ref] ?: LocalDateTime.of(1970, 1, 1, 0, 0)
|
||||||
|
|
||||||
@ -85,8 +93,8 @@ abstract class EventPollerImplementation(
|
|||||||
dispatchQueue.dispatch(ref, events, dispatcher)
|
dispatchQueue.dispatch(ref, events, dispatcher)
|
||||||
|
|
||||||
// Oppdater watermark for denne ref’en
|
// Oppdater watermark for denne ref’en
|
||||||
val maxPersistedAt = newForRef.maxOf { it.persistedAt }
|
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
|
||||||
val newWatermark = minOf(pollStartedAt, maxPersistedAt).plusNanos(1)
|
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
|
||||||
|
|
||||||
refWatermark[ref] = newWatermark
|
refWatermark[ref] = newWatermark
|
||||||
anyProcessed = true
|
anyProcessed = true
|
||||||
@ -94,12 +102,27 @@ abstract class EventPollerImplementation(
|
|||||||
log.debug { "⏩ Updated watermark for $ref → $newWatermark" }
|
log.debug { "⏩ Updated watermark for $ref → $newWatermark" }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Oppdater global scan hint
|
// Oppdater global scan hint uansett – vi har sett nye events
|
||||||
|
// Dette hindrer livelock når alle events er <= watermark for sine refs
|
||||||
|
val newLastSeen = maxOf(
|
||||||
|
lastSeenTime,
|
||||||
|
maxPersistedThisRound.plusNanos(1)
|
||||||
|
)
|
||||||
|
|
||||||
if (anyProcessed) {
|
if (anyProcessed) {
|
||||||
lastSeenTime = refWatermark.values.minOrNull() ?: lastSeenTime
|
// Behold intensjonen din: globalt hint basert på laveste watermark,
|
||||||
log.debug { "📉 Global scanFrom updated → $lastSeenTime" }
|
// men aldri gå bakover i tid ift lastSeenTime
|
||||||
|
val minRefWatermark = refWatermark.values.minOrNull()
|
||||||
|
lastSeenTime = when (minRefWatermark) {
|
||||||
|
null -> newLastSeen
|
||||||
|
else -> maxOf(newLastSeen, minRefWatermark)
|
||||||
|
}
|
||||||
|
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
|
||||||
} else {
|
} else {
|
||||||
log.debug { "🔁 No refs processed — global scanFrom unchanged ($lastSeenTime)" }
|
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
|
||||||
|
// → trygt å flytte lastSeenTime forbi dem
|
||||||
|
lastSeenTime = newLastSeen
|
||||||
|
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,4 +130,5 @@ abstract class EventPollerImplementation(
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,11 +2,17 @@ package no.iktdev.eventi.events.poller
|
|||||||
|
|
||||||
import kotlinx.coroutines.test.*
|
import kotlinx.coroutines.test.*
|
||||||
import no.iktdev.eventi.InMemoryEventStore
|
import no.iktdev.eventi.InMemoryEventStore
|
||||||
|
import no.iktdev.eventi.TestBase
|
||||||
|
import no.iktdev.eventi.events.EventDispatcher
|
||||||
import no.iktdev.eventi.events.EventTypeRegistry
|
import no.iktdev.eventi.events.EventTypeRegistry
|
||||||
import no.iktdev.eventi.events.FakeDispatcher
|
import no.iktdev.eventi.events.FakeDispatcher
|
||||||
import no.iktdev.eventi.events.RunSimulationTestTest
|
import no.iktdev.eventi.events.RunSimulationTestTest
|
||||||
|
import no.iktdev.eventi.events.SequenceDispatchQueue
|
||||||
import no.iktdev.eventi.events.TestEvent
|
import no.iktdev.eventi.events.TestEvent
|
||||||
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Metadata
|
import no.iktdev.eventi.models.Metadata
|
||||||
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
|
import no.iktdev.eventi.stores.EventStore
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.BeforeEach
|
import org.junit.jupiter.api.BeforeEach
|
||||||
import org.junit.jupiter.api.DisplayName
|
import org.junit.jupiter.api.DisplayName
|
||||||
@ -14,7 +20,7 @@ import org.junit.jupiter.api.Test
|
|||||||
import java.time.LocalDateTime
|
import java.time.LocalDateTime
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
class PollerStartLoopTest {
|
class PollerStartLoopTest: TestBase() {
|
||||||
|
|
||||||
private lateinit var store: InMemoryEventStore
|
private lateinit var store: InMemoryEventStore
|
||||||
private lateinit var dispatcher: FakeDispatcher
|
private lateinit var dispatcher: FakeDispatcher
|
||||||
@ -293,6 +299,71 @@ class PollerStartLoopTest {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun `poller should not livelock when global scan sees events but watermark rejects them`() = runTest {
|
||||||
|
val ref = UUID.randomUUID()
|
||||||
|
|
||||||
|
// Fake EventStore som alltid returnerer samme event
|
||||||
|
val fakeStore = object : EventStore {
|
||||||
|
override fun getPersistedEventsAfter(ts: LocalDateTime): List<PersistedEvent> {
|
||||||
|
// Alltid returner én event som ligger før watermark
|
||||||
|
return listOf(
|
||||||
|
PersistedEvent(
|
||||||
|
id = 1,
|
||||||
|
referenceId = ref,
|
||||||
|
eventId = UUID.randomUUID(),
|
||||||
|
event = "test",
|
||||||
|
data = """{"x":1}""",
|
||||||
|
persistedAt = t(50) // før watermark
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun getPersistedEventsFor(ref: UUID): List<PersistedEvent> {
|
||||||
|
return emptyList() // spiller ingen rolle
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun persist(event: Event) {
|
||||||
|
TODO("Not yet implemented")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val queue = SequenceDispatchQueue()
|
||||||
|
class NoopDispatcher : EventDispatcher(fakeStore) {
|
||||||
|
override fun dispatch(referenceId: UUID, events: List<Event>) {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
val dispatcher = NoopDispatcher()
|
||||||
|
|
||||||
|
val poller = TestablePoller(fakeStore, queue, dispatcher, scope)
|
||||||
|
|
||||||
|
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
|
||||||
|
// men i denne testen må vi simulere det)
|
||||||
|
poller.setWatermarkFor(ref, t(100))
|
||||||
|
|
||||||
|
// Sett lastSeenTime bak eventen
|
||||||
|
poller.lastSeenTime = t(0)
|
||||||
|
|
||||||
|
// Første poll: polleren ser eventet, men prosesserer ikke ref
|
||||||
|
poller.pollOnce()
|
||||||
|
|
||||||
|
// Fixen skal flytte lastSeenTime forbi eventen
|
||||||
|
assertThat(poller.lastSeenTime)
|
||||||
|
.isAfter(t(50))
|
||||||
|
|
||||||
|
// Andre poll: nå skal polleren IKKE spinne
|
||||||
|
val before = poller.lastSeenTime
|
||||||
|
poller.pollOnce()
|
||||||
|
val after = poller.lastSeenTime
|
||||||
|
|
||||||
|
assertThat(after).isEqualTo(before)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -37,8 +37,13 @@ class TestablePoller(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun setWatermarkFor(ref: UUID, time: LocalDateTime) {
|
||||||
|
refWatermark[ref] = time
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
interface WatermarkDebugView {
|
interface WatermarkDebugView {
|
||||||
fun watermarkFor(ref: UUID): LocalDateTime?
|
fun watermarkFor(ref: UUID): LocalDateTime?
|
||||||
|
fun setWatermarkFor(ref: UUID, time: LocalDateTime)
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user