Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 73f5a3d4da | |||
| 85e1e805b7 | |||
| 5082571ce8 |
@ -8,23 +8,30 @@ import no.iktdev.eventi.stores.EventStore
|
|||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
import kotlin.collections.iterator
|
|
||||||
|
|
||||||
abstract class EventPollerImplementation(
|
abstract class EventPollerImplementation(
|
||||||
private val eventStore: EventStore,
|
private val eventStore: EventStore,
|
||||||
private val dispatchQueue: SequenceDispatchQueue,
|
private val dispatchQueue: SequenceDispatchQueue,
|
||||||
private val dispatcher: EventDispatcher
|
private val dispatcher: EventDispatcher
|
||||||
) {
|
) {
|
||||||
// Erstatter ikke lastSeenTime, men supplerer den
|
private val log = KotlinLogging.logger {}
|
||||||
protected val refWatermark = mutableMapOf<UUID, Instant>()
|
|
||||||
|
|
||||||
// lastSeenTime brukes kun som scan hint
|
/**
|
||||||
|
* Per-reference watermark:
|
||||||
|
* - first = last seen persistedAt
|
||||||
|
* - second = last seen persistedId
|
||||||
|
*/
|
||||||
|
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global scan hint (timestamp only).
|
||||||
|
* Used to avoid scanning entire table every time.
|
||||||
|
*/
|
||||||
var lastSeenTime: Instant = Instant.EPOCH
|
var lastSeenTime: Instant = Instant.EPOCH
|
||||||
|
|
||||||
open var backoff = Duration.ofSeconds(2)
|
open var backoff = Duration.ofSeconds(2)
|
||||||
protected set
|
protected set
|
||||||
private val maxBackoff = Duration.ofMinutes(1)
|
private val maxBackoff = Duration.ofMinutes(1)
|
||||||
private val log = KotlinLogging.logger {}
|
|
||||||
|
|
||||||
open suspend fun start() {
|
open suspend fun start() {
|
||||||
log.info { "EventPoller starting with initial backoff=$backoff" }
|
log.info { "EventPoller starting with initial backoff=$backoff" }
|
||||||
@ -32,7 +39,7 @@ abstract class EventPollerImplementation(
|
|||||||
try {
|
try {
|
||||||
pollOnce()
|
pollOnce()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
e.printStackTrace()
|
log.error(e) { "Error in poller loop" }
|
||||||
delay(backoff.toMillis())
|
delay(backoff.toMillis())
|
||||||
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
||||||
}
|
}
|
||||||
@ -43,11 +50,11 @@ abstract class EventPollerImplementation(
|
|||||||
val pollStartedAt = MyTime.utcNow()
|
val pollStartedAt = MyTime.utcNow()
|
||||||
log.debug { "🔍 Polling for new events" }
|
log.debug { "🔍 Polling for new events" }
|
||||||
|
|
||||||
// Global scan hint: kombiner refWatermark og lastSeenTime
|
// Determine global scan start
|
||||||
val watermarkMin = refWatermark.values.minOrNull()
|
val minRefTs = refWatermark.values.minOfOrNull { it.first }
|
||||||
val scanFrom = when (watermarkMin) {
|
val scanFrom = when (minRefTs) {
|
||||||
null -> lastSeenTime
|
null -> lastSeenTime
|
||||||
else -> maxOf(lastSeenTime, watermarkMin)
|
else -> maxOf(lastSeenTime, minRefTs)
|
||||||
}
|
}
|
||||||
|
|
||||||
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
||||||
@ -59,76 +66,73 @@ abstract class EventPollerImplementation(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Vi har sett nye events globalt – reset backoff
|
// Reset backoff
|
||||||
backoff = Duration.ofSeconds(2)
|
backoff = Duration.ofSeconds(2)
|
||||||
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
|
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
|
||||||
|
|
||||||
val grouped = newPersisted.groupBy { it.referenceId }
|
val grouped = newPersisted.groupBy { it.referenceId }
|
||||||
var anyProcessed = false
|
var anyProcessed = false
|
||||||
|
|
||||||
// Track høyeste persistedAt vi har sett i denne runden
|
// Track highest persistedAt seen globally this round
|
||||||
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
|
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
|
||||||
|
|
||||||
for ((ref, eventsForRef) in grouped) {
|
for ((ref, eventsForRef) in grouped) {
|
||||||
val refSeen = refWatermark[ref] ?: Instant.EPOCH
|
val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
|
||||||
|
|
||||||
|
// Filter new events using (timestamp, id) ordering
|
||||||
|
val newForRef = eventsForRef.filter { ev ->
|
||||||
|
ev.persistedAt > refSeenAt ||
|
||||||
|
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
|
||||||
|
}
|
||||||
|
|
||||||
// Finn kun nye events for denne ref’en
|
|
||||||
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
|
|
||||||
if (newForRef.isEmpty()) {
|
if (newForRef.isEmpty()) {
|
||||||
log.debug { "🧊 No new events for $ref since $refSeen" }
|
log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch
|
// If ref is busy, skip dispatch
|
||||||
if (dispatchQueue.isProcessing(ref)) {
|
if (dispatchQueue.isProcessing(ref)) {
|
||||||
log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" }
|
log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" }
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Hent full sekvens for ref (Eventi-invariant)
|
// Fetch full sequence for dispatch
|
||||||
val fullLog = eventStore.getPersistedEventsFor(ref)
|
val fullLog = eventStore.getPersistedEventsFor(ref)
|
||||||
val events = fullLog.mapNotNull { it.toEvent() }
|
val events = fullLog.mapNotNull { it.toEvent() }
|
||||||
|
|
||||||
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
|
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
|
||||||
dispatchQueue.dispatch(ref, events, dispatcher)
|
dispatchQueue.dispatch(ref, events, dispatcher)
|
||||||
|
|
||||||
// Oppdater watermark for denne ref’en
|
// Update watermark for this reference
|
||||||
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
|
val maxEvent = newForRef.maxWith(
|
||||||
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
|
compareBy({ it.persistedAt }, { it.id })
|
||||||
|
)
|
||||||
|
|
||||||
refWatermark[ref] = newWatermark
|
val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
|
||||||
|
val newWatermarkId = maxEvent.id
|
||||||
|
|
||||||
|
refWatermark[ref] = newWatermarkAt to newWatermarkId
|
||||||
anyProcessed = true
|
anyProcessed = true
|
||||||
|
|
||||||
log.debug { "⏩ Updated watermark for $ref → $newWatermark" }
|
log.debug { "⏩ Updated watermark for $ref → ($newWatermarkAt, id=$newWatermarkId)" }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Oppdater global scan hint uansett – vi har sett nye events
|
// Update global scan hint
|
||||||
// Dette hindrer livelock når alle events er <= watermark for sine refs
|
|
||||||
val newLastSeen = maxOf(
|
val newLastSeen = maxOf(
|
||||||
lastSeenTime,
|
lastSeenTime,
|
||||||
maxPersistedThisRound.plusNanos(1)
|
maxPersistedThisRound.plusNanos(1)
|
||||||
)
|
)
|
||||||
|
|
||||||
if (anyProcessed) {
|
if (anyProcessed) {
|
||||||
// Behold intensjonen din: globalt hint basert på laveste watermark,
|
val minRef = refWatermark.values.minOfOrNull { it.first }
|
||||||
// men aldri gå bakover i tid ift lastSeenTime
|
lastSeenTime = when (minRef) {
|
||||||
val minRefWatermark = refWatermark.values.minOrNull()
|
|
||||||
lastSeenTime = when (minRefWatermark) {
|
|
||||||
null -> newLastSeen
|
null -> newLastSeen
|
||||||
else -> maxOf(newLastSeen, minRefWatermark)
|
else -> maxOf(newLastSeen, minRef)
|
||||||
}
|
}
|
||||||
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
|
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
|
||||||
} else {
|
} else {
|
||||||
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
|
|
||||||
// → trygt å flytte lastSeenTime forbi dem
|
|
||||||
lastSeenTime = newLastSeen
|
lastSeenTime = newLastSeen
|
||||||
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
|
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,7 @@ import java.util.UUID
|
|||||||
|
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
abstract class Event {
|
abstract class Event {
|
||||||
var referenceId: UUID = UUID.randomUUID()
|
lateinit var referenceId: UUID
|
||||||
protected set
|
protected set
|
||||||
var eventId: UUID = UUID.randomUUID()
|
var eventId: UUID = UUID.randomUUID()
|
||||||
private set
|
private set
|
||||||
|
|||||||
@ -68,9 +68,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
currentJob = getDispatcherForTask(task).launch {
|
currentJob = getDispatcherForTask(task).launch {
|
||||||
try {
|
try {
|
||||||
val result = onTask(task)
|
val result = onTask(task)
|
||||||
reporter.markCompleted(task.taskId)
|
|
||||||
onComplete(task, result)
|
onComplete(task, result)
|
||||||
|
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
// Dette er en ekte kansellering
|
// Dette er en ekte kansellering
|
||||||
onCancelled(task)
|
onCancelled(task)
|
||||||
|
|||||||
@ -64,7 +64,7 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun shouldProduceOneEventAndStop() {
|
fun shouldProduceOneEventAndStop() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent()
|
val trigger = TriggerEvent().newReferenceId()
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
|
|
||||||
val produced = eventStore.all().firstOrNull()
|
val produced = eventStore.all().firstOrNull()
|
||||||
@ -87,7 +87,7 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun shouldSkipAlreadyDerivedEvents() {
|
fun shouldSkipAlreadyDerivedEvents() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent()
|
val trigger = TriggerEvent().newReferenceId()
|
||||||
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
|
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
|
||||||
|
|
||||||
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
||||||
@ -108,8 +108,8 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun shouldPassFullContextToListener() {
|
fun shouldPassFullContextToListener() {
|
||||||
val listener = ContextCapturingListener()
|
val listener = ContextCapturingListener()
|
||||||
|
|
||||||
val e1 = TriggerEvent()
|
val e1 = TriggerEvent().newReferenceId()
|
||||||
val e2 = OtherEvent()
|
val e2 = OtherEvent().newReferenceId()
|
||||||
dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
|
dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
|
||||||
|
|
||||||
assertEquals(2, listener.context.size)
|
assertEquals(2, listener.context.size)
|
||||||
@ -124,9 +124,11 @@ class EventDispatcherTest : TestBase() {
|
|||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
fun shouldBehaveDeterministicallyAcrossReplays() {
|
fun shouldBehaveDeterministicallyAcrossReplays() {
|
||||||
|
val referenceId = UUID.randomUUID()
|
||||||
|
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent()
|
val trigger = TriggerEvent().usingReferenceId(referenceId)
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
|
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
|
||||||
|
|
||||||
@ -144,6 +146,8 @@ class EventDispatcherTest : TestBase() {
|
|||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
fun shouldNotDeliverDeletedEventsAsCandidates() {
|
fun shouldNotDeliverDeletedEventsAsCandidates() {
|
||||||
|
val referenceId = UUID.randomUUID()
|
||||||
|
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
|
|
||||||
@ -154,11 +158,10 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Original hendelse
|
// Original hendelse
|
||||||
val original = TriggerEvent()
|
val original = TriggerEvent().usingReferenceId(referenceId)
|
||||||
|
|
||||||
// Slettehendelse som peker på original
|
// Slettehendelse som peker på original
|
||||||
val deleted = object : DeleteEvent(original.eventId) {
|
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
|
||||||
}
|
|
||||||
|
|
||||||
// Dispatch med begge hendelser
|
// Dispatch med begge hendelser
|
||||||
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
|
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
|
||||||
@ -184,6 +187,8 @@ class EventDispatcherTest : TestBase() {
|
|||||||
)
|
)
|
||||||
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
|
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
|
val referenceId = UUID.randomUUID()
|
||||||
|
|
||||||
object : EventListener() {
|
object : EventListener() {
|
||||||
override fun onEvent(event: Event, history: List<Event>): Event? {
|
override fun onEvent(event: Event, history: List<Event>): Event? {
|
||||||
if (event is DeleteEvent) received += event
|
if (event is DeleteEvent) received += event
|
||||||
@ -191,7 +196,7 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val deleted = object : DeleteEvent(UUID.randomUUID()) {}
|
val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) }
|
||||||
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
|
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
|
||||||
|
|
||||||
assertTrue(received.contains(deleted))
|
assertTrue(received.contains(deleted))
|
||||||
@ -208,7 +213,7 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
|
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent()
|
val trigger = TriggerEvent().newReferenceId()
|
||||||
// Første dispatch: trigger produserer en DerivedEvent
|
// Første dispatch: trigger produserer en DerivedEvent
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
|
|
||||||
@ -238,8 +243,8 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun historyShouldExcludeDeletedEvents() {
|
fun historyShouldExcludeDeletedEvents() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
|
||||||
val original = TriggerEvent()
|
val original = TriggerEvent().newReferenceId()
|
||||||
val deleted = object : DeleteEvent(original.eventId) {}
|
val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) }
|
||||||
|
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
|
|
||||||
@ -266,9 +271,9 @@ class EventDispatcherTest : TestBase() {
|
|||||||
)
|
)
|
||||||
fun historyShouldKeepNonDeletedEvents() {
|
fun historyShouldKeepNonDeletedEvents() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
val referenceId = UUID.randomUUID()
|
||||||
val e1 = TriggerEvent()
|
val e1 = TriggerEvent().usingReferenceId(referenceId)
|
||||||
val e2 = OtherEvent()
|
val e2 = OtherEvent().usingReferenceId(referenceId)
|
||||||
val deleted = object : DeleteEvent(e1.eventId) {}
|
val deleted = object : DeleteEvent(e1.eventId) {}
|
||||||
|
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
@ -298,8 +303,8 @@ class EventDispatcherTest : TestBase() {
|
|||||||
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
|
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
|
||||||
val original = TriggerEvent()
|
val original = TriggerEvent().newReferenceId()
|
||||||
val deleted = object : DeleteEvent(original.eventId) {}
|
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
|
||||||
|
|
||||||
var receivedEvent: Event? = null
|
var receivedEvent: Event? = null
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
|
|||||||
@ -39,7 +39,7 @@ class ZDSTest {
|
|||||||
fun scenario1() {
|
fun scenario1() {
|
||||||
EventTypeRegistry.register(EchoEvent::class.java)
|
EventTypeRegistry.register(EchoEvent::class.java)
|
||||||
|
|
||||||
val echo = EchoEvent("hello")
|
val echo = EchoEvent("hello").newReferenceId()
|
||||||
val persisted = echo.toPersisted(id = 1L)
|
val persisted = echo.toPersisted(id = 1L)
|
||||||
|
|
||||||
val restored = persisted!!.toEvent()
|
val restored = persisted!!.toEvent()
|
||||||
|
|||||||
@ -214,7 +214,7 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val original = EchoEvent("Hello")
|
val original = EchoEvent("Hello").newReferenceId()
|
||||||
eventStore.persist(original)
|
eventStore.persist(original)
|
||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|||||||
@ -60,6 +60,42 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
store.persistAt(e, time)
|
store.persistAt(e, time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("""
|
||||||
|
Når to events har identisk persistedAt
|
||||||
|
Hvis polleren kjører
|
||||||
|
Så skal begge events prosesseres og ingen mistes
|
||||||
|
""")
|
||||||
|
fun `poller handles same-timestamp events without losing any`() = runTest {
|
||||||
|
val ref = UUID.randomUUID()
|
||||||
|
val ts = Instant.parse("2025-01-01T12:00:00Z")
|
||||||
|
|
||||||
|
// Two events with same timestamp but different IDs
|
||||||
|
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
|
||||||
|
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
|
||||||
|
|
||||||
|
store.persistAt(e1, ts) // id=1
|
||||||
|
store.persistAt(e2, ts) // id=2
|
||||||
|
|
||||||
|
poller.startFor(iterations = 1)
|
||||||
|
|
||||||
|
// Verify dispatch happened
|
||||||
|
assertThat(dispatcher.dispatched).hasSize(1)
|
||||||
|
|
||||||
|
val (_, events) = dispatcher.dispatched.single()
|
||||||
|
|
||||||
|
// Both events must be present
|
||||||
|
assertThat(events.map { it.eventId })
|
||||||
|
.hasSize(2)
|
||||||
|
.doesNotHaveDuplicates()
|
||||||
|
|
||||||
|
// Watermark must reflect highest ID
|
||||||
|
val wm = poller.watermarkFor(ref)
|
||||||
|
assertThat(wm!!.first).isEqualTo(ts)
|
||||||
|
assertThat(wm.second).isEqualTo(2)
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når polleren kjører flere iterasjoner uten events
|
Når polleren kjører flere iterasjoner uten events
|
||||||
@ -271,11 +307,15 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
|
|
||||||
poller.startFor(iterations = 1)
|
poller.startFor(iterations = 1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// A skal IKKE ha flyttet watermark
|
// A skal IKKE ha flyttet watermark
|
||||||
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
|
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
|
||||||
|
|
||||||
// B skal ha flyttet watermark
|
// B skal ha flyttet watermark (på timestamp-nivå)
|
||||||
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
|
val wmB2 = poller.watermarkFor(refB)
|
||||||
|
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
|
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
|
||||||
@ -433,7 +473,7 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
|
|
||||||
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
|
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
|
||||||
// men i denne testen må vi simulere det)
|
// men i denne testen må vi simulere det)
|
||||||
poller.setWatermarkFor(ref, t(100))
|
poller.setWatermarkFor(ref, t(100), id = 999)
|
||||||
|
|
||||||
// Sett lastSeenTime bak eventen
|
// Sett lastSeenTime bak eventen
|
||||||
poller.lastSeenTime = t(0)
|
poller.lastSeenTime = t(0)
|
||||||
|
|||||||
@ -17,8 +17,6 @@ class TestablePoller(
|
|||||||
val scope: TestScope
|
val scope: TestScope
|
||||||
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
|
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
suspend fun startFor(iterations: Int) {
|
suspend fun startFor(iterations: Int) {
|
||||||
repeat(iterations) {
|
repeat(iterations) {
|
||||||
try {
|
try {
|
||||||
@ -32,19 +30,17 @@ class TestablePoller(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun watermarkFor(ref: UUID): Instant? {
|
override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
|
||||||
return refWatermark[ref]?.let {
|
return refWatermark[ref]
|
||||||
return it
|
}
|
||||||
|
|
||||||
|
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
|
||||||
|
refWatermark[ref] = time to id
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun setWatermarkFor(ref: UUID, time: Instant) {
|
|
||||||
refWatermark[ref] = time
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
interface WatermarkDebugView {
|
interface WatermarkDebugView {
|
||||||
fun watermarkFor(ref: UUID): Instant?
|
fun watermarkFor(ref: UUID): Pair<Instant, Long>?
|
||||||
fun setWatermarkFor(ref: UUID, time: Instant)
|
fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -114,7 +114,7 @@ class TaskPollerImplementationTest : TestBase() {
|
|||||||
val listener = EchoListener()
|
val listener = EchoListener()
|
||||||
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
|
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
|
||||||
|
|
||||||
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
|
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() })
|
||||||
taskStore.persist(task)
|
taskStore.persist(task)
|
||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user