diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt index f8c38cd..06c6539 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventDispatcher.kt @@ -5,9 +5,9 @@ import no.iktdev.eventi.models.Event import no.iktdev.eventi.stores.EventStore import java.util.UUID -class EventDispatcher(val eventStore: EventStore) { +open class EventDispatcher(val eventStore: EventStore) { - fun dispatch(referenceId: UUID, events: List) { + open fun dispatch(referenceId: UUID, events: List) { val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet() val deletedEventIds = events.filterIsInstance().map { it.deletedEventId } val candidates = events diff --git a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt index 3f2376f..5b4c4ee 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/SequenceDispatchQueue.kt @@ -11,7 +11,7 @@ import no.iktdev.eventi.models.Event import java.util.UUID import java.util.concurrent.ConcurrentHashMap -class SequenceDispatchQueue( +open class SequenceDispatchQueue( private val maxConcurrency: Int = 8, private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob()) ) { @@ -25,9 +25,9 @@ class SequenceDispatchQueue( private val log = KotlinLogging.logger {} - fun isProcessing(referenceId: UUID): Boolean = referenceId in active + open fun isProcessing(referenceId: UUID): Boolean = referenceId in active - fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job? { + open fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job? { if (!active.add(referenceId)) { log.debug {"⚠️ Already processing $referenceId, skipping dispatch"} return null diff --git a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt index 9d366c2..229ae01 100644 --- a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt @@ -71,7 +71,7 @@ class EventDispatcherTest: TestBase() { val listener = ProducingListener() val trigger = TriggerEvent() - val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now()) + val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.UtcNow()) eventStore.persist(derived!!.toEvent()!!) // simulate prior production diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt index bc87180..88194d4 100644 --- a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt +++ b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt @@ -18,10 +18,16 @@ class InMemoryEventStore : EventStore { persisted.filter { it.referenceId == referenceId } override fun persist(event: Event) { - val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now()) + val persistedEvent = event.toPersisted(nextId++, MyTime.UtcNow()) persisted += persistedEvent!! } + fun persistAt(event: Event, persistedAt: LocalDateTime) { + val persistedEvent = event.toPersisted(nextId++, persistedAt) + persisted += persistedEvent!! + } + + fun all(): List = persisted fun clear() { persisted.clear(); nextId = 1L } } diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt index ec3fafc..82aa4e9 100644 --- a/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt +++ b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt @@ -30,13 +30,13 @@ open class InMemoryTaskStore : TaskStore { override fun claim(taskId: UUID, workerId: String): Boolean { val task = findByTaskId(taskId) ?: return false if (task.claimed && !isExpired(task)) return false - update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = LocalDateTime.now())) + update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = MyTime.UtcNow())) return true } override fun heartbeat(taskId: UUID) { val task = findByTaskId(taskId) ?: return - update(task.copy(lastCheckIn = LocalDateTime.now())) + update(task.copy(lastCheckIn = MyTime.UtcNow())) } override fun markConsumed(taskId: UUID, status: TaskStatus) { @@ -45,7 +45,7 @@ open class InMemoryTaskStore : TaskStore { } override fun releaseExpiredTasks(timeout: Duration) { - val now = LocalDateTime.now() + val now = MyTime.UtcNow() tasks.filter { it.claimed && !it.consumed && it.lastCheckIn?.isBefore(now.minus(timeout)) == true }.forEach { @@ -60,7 +60,7 @@ open class InMemoryTaskStore : TaskStore { } private fun isExpired(task: PersistedTask): Boolean { - val now = LocalDateTime.now() + val now = MyTime.UtcNow() return task.lastCheckIn?.isBefore(now.minusMinutes(15)) == true } diff --git a/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt b/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt index 1a93b86..8bad4e3 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt @@ -12,6 +12,7 @@ import no.iktdev.eventi.EventDispatcherTest import no.iktdev.eventi.EventDispatcherTest.DerivedEvent import no.iktdev.eventi.EventDispatcherTest.OtherEvent import no.iktdev.eventi.EventDispatcherTest.TriggerEvent +import no.iktdev.eventi.MyTime import no.iktdev.eventi.TestBase import no.iktdev.eventi.models.Event import no.iktdev.eventi.testUtil.wipe @@ -137,7 +138,7 @@ class EventPollerImplementationTest : TestBase() { val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { init { - lastSeenTime = LocalDateTime.now().plusSeconds(1) + lastSeenTime = MyTime.UtcNow().plusSeconds(1) } } diff --git a/src/test/kotlin/no/iktdev/eventi/events/RunSimulationTest.kt b/src/test/kotlin/no/iktdev/eventi/events/RunSimulationTest.kt new file mode 100644 index 0000000..6e84bc1 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/RunSimulationTest.kt @@ -0,0 +1,244 @@ +package no.iktdev.eventi.events + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.* +import kotlinx.coroutines.test.* +import no.iktdev.eventi.InMemoryEventStore +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.LocalDateTime +import java.util.UUID +import kotlinx.coroutines.* +import no.iktdev.eventi.ZDS.toPersisted +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.Metadata +import java.util.concurrent.ConcurrentHashMap + + +class FakeDispatchQueue( + private val scope: CoroutineScope +) : SequenceDispatchQueue(8, scope) { + + private val active = ConcurrentHashMap.newKeySet() + + override fun isProcessing(referenceId: UUID): Boolean = referenceId in active + + override fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job { + active.add(referenceId) + return scope.launch { + try { + dispatcher.dispatch(referenceId, events) + } finally { + active.remove(referenceId) + } + } + } +} + + +class FakeDispatcher : EventDispatcher(InMemoryEventStore()) { + + val dispatched = mutableListOf>>() + + override fun dispatch(referenceId: UUID, events: List) { + dispatched += referenceId to events + } +} + +class TestEvent : Event() { + fun withReference(id: UUID): TestEvent { + this.referenceId = id + return this + } + fun setMetadata(metadata: Metadata): TestEvent { + this.metadata = metadata + return this + } +} + + +class FakeClock(var now: LocalDateTime) { + fun advanceSeconds(sec: Long) { + now = now.plusSeconds(sec) + } +} + + +class RunSimulationTestTest { + + private lateinit var store: InMemoryEventStore + private lateinit var dispatcher: FakeDispatcher + private lateinit var testDispatcher: TestDispatcher + private lateinit var scope: CoroutineScope + private lateinit var queue: FakeDispatchQueue + private lateinit var poller: EventPollerImplementation + + @BeforeEach + fun setup() { + store = InMemoryEventStore() + dispatcher = FakeDispatcher() + testDispatcher = StandardTestDispatcher() + scope = CoroutineScope(testDispatcher) + queue = FakeDispatchQueue(scope) + EventTypeRegistry.register(TestEvent::class.java) + poller = object : EventPollerImplementation(store, queue, dispatcher) { + override suspend fun start() = error("Do not call start() in tests") + } + } + + private fun persistEvent(ref: UUID, time: LocalDateTime) { + val e = TestEvent().withReference(ref) + store.persist(e.setMetadata(Metadata())) + } + + @Test + fun `poller updates lastSeenTime when dispatch happens`() = runTest(testDispatcher) { + val ref = UUID.randomUUID() + val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0) + + persistEvent(ref, t) + + poller.pollOnce() + advanceUntilIdle() + + assertThat(poller.lastSeenTime).isAfter(LocalDateTime.of(1970,1,1,0,0)) + assertThat(dispatcher.dispatched).hasSize(1) + } + + + class AlwaysBusyDispatchQueue : SequenceDispatchQueue(8, CoroutineScope(Dispatchers.Default)) { + override fun isProcessing(referenceId: UUID): Boolean = true + override fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher) = null + } + + @Test + fun `poller does NOT update lastSeenTime when queue is busy`() = runTest { + val ref = UUID.randomUUID() + val t = LocalDateTime.of(2026,1,22,12,0,0) + + store.persistAt(TestEvent().withReference(ref), t) + + val busyQueue = AlwaysBusyDispatchQueue() + + val poller = object : EventPollerImplementation(store, busyQueue, dispatcher) {} + + poller.pollOnce() + advanceUntilIdle() + + assertThat(poller.lastSeenTime) + .isEqualTo(LocalDateTime.of(1970,1,1,0,0)) + } + + + + @Test + fun `poller does not double-dispatch`() = runTest(testDispatcher) { + val ref = UUID.randomUUID() + val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0) + + persistEvent(ref, t) + + poller.pollOnce() + advanceUntilIdle() + + poller.pollOnce() + advanceUntilIdle() + + assertThat(dispatcher.dispatched).hasSize(1) + } + + @Test + fun `poller handles multiple referenceIds`() = runTest(testDispatcher) { + val refA = UUID.randomUUID() + val refB = UUID.randomUUID() + val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0) + + persistEvent(refA, t) + persistEvent(refB, t.plusSeconds(1)) + + poller.pollOnce() + advanceUntilIdle() + + assertThat(dispatcher.dispatched).hasSize(2) + } + + @Test + fun `poller handles identical timestamps`() = runTest(testDispatcher) { + val refA = UUID.randomUUID() + val refB = UUID.randomUUID() + val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0) + + persistEvent(refA, t) + persistEvent(refB, t) + + poller.pollOnce() + advanceUntilIdle() + + assertThat(dispatcher.dispatched).hasSize(2) + } + + @Test + fun `poller backs off when no new events`() = runTest(testDispatcher) { + val before = poller.backoff + + poller.pollOnce() + advanceUntilIdle() + + assertThat(poller.backoff).isGreaterThan(before) + } + + class ControlledDispatchQueue( + private val scope: CoroutineScope + ) : SequenceDispatchQueue(8, scope) { + + val busyRefs = mutableSetOf() + + override fun isProcessing(referenceId: UUID): Boolean = + referenceId in busyRefs + + override fun dispatch(referenceId: UUID, events: List, dispatcher: EventDispatcher): Job { + return scope.launch { + dispatcher.dispatch(referenceId, events) + } + } + } + + + + @Test + fun `poller processes events arriving while queue is busy`() = runTest(testDispatcher) { + val ref = UUID.randomUUID() + val t1 = LocalDateTime.of(2026, 1, 22, 12, 0, 0) + val t2 = t1.plusSeconds(5) + + persistEvent(ref, t1) + + val controlledQueue = ControlledDispatchQueue(scope) + controlledQueue.busyRefs += ref + + val poller = object : EventPollerImplementation(store, controlledQueue, dispatcher) {} + + // Poll #1: busy → no dispatch + poller.pollOnce() + advanceUntilIdle() + + assertThat(dispatcher.dispatched).isEmpty() + + // Now free + controlledQueue.busyRefs.clear() + + // Add new event + persistEvent(ref, t2) + + // Poll #2: should dispatch both events + poller.pollOnce() + advanceUntilIdle() + + assertThat(dispatcher.dispatched).hasSize(1) + assertThat(dispatcher.dispatched.single().second).hasSize(2) + } + + +} diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt new file mode 100644 index 0000000..6eea18d --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/PollerStartLoopTest.kt @@ -0,0 +1,110 @@ +package no.iktdev.eventi.events.poller + +import kotlinx.coroutines.test.* +import no.iktdev.eventi.InMemoryEventStore +import no.iktdev.eventi.events.EventTypeRegistry +import no.iktdev.eventi.events.FakeDispatcher +import no.iktdev.eventi.events.RunSimulationTestTest +import no.iktdev.eventi.events.TestEvent +import no.iktdev.eventi.models.Metadata +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.time.LocalDateTime +import java.util.UUID + +class PollerStartLoopTest { + + private lateinit var store: InMemoryEventStore + private lateinit var dispatcher: FakeDispatcher + private lateinit var testDispatcher: TestDispatcher + private lateinit var scope: TestScope + private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue + private lateinit var poller: TestablePoller + + @BeforeEach + fun setup() { + store = InMemoryEventStore() + dispatcher = FakeDispatcher() + testDispatcher = StandardTestDispatcher() + scope = TestScope(testDispatcher) + queue = RunSimulationTestTest.ControlledDispatchQueue(scope) + EventTypeRegistry.register(TestEvent::class.java) + + poller = TestablePoller(store, queue, dispatcher, scope) + } + + private fun persistAt(ref: UUID, time: LocalDateTime) { + val e = TestEvent().withReference(ref).setMetadata(Metadata()) + store.persistAt(e, time) + } + + @Test + fun `poller does not spin when no events exist`() = runTest { + val startBackoff = poller.backoff + + poller.startFor(iterations = 10) + + assertThat(poller.backoff).isGreaterThan(startBackoff) + assertThat(dispatcher.dispatched).isEmpty() + } + + @Test + fun `poller increases backoff exponentially`() = runTest { + val b1 = poller.backoff + + poller.startFor(iterations = 1) + val b2 = poller.backoff + + poller.startFor(iterations = 1) + val b3 = poller.backoff + + assertThat(b2).isGreaterThan(b1) + assertThat(b3).isGreaterThan(b2) + } + + @Test + fun `poller resets backoff when events appear`() = runTest { + poller.startFor(iterations = 5) + val before = poller.backoff + + val ref = UUID.randomUUID() + persistAt(ref, LocalDateTime.now()) + + poller.startFor(iterations = 1) + + assertThat(poller.backoff).isEqualTo(java.time.Duration.ofSeconds(2)) + } + + @Test + fun `poller processes events that arrive while sleeping`() = runTest { + val ref = UUID.randomUUID() + + poller.startFor(iterations = 3) + + persistAt(ref, LocalDateTime.now()) + + poller.startFor(iterations = 1) + + assertThat(dispatcher.dispatched).hasSize(1) + } + + @Test + fun `poller does not lose events under concurrency`() = runTest { + val ref = UUID.randomUUID() + + queue.busyRefs += ref + + persistAt(ref, LocalDateTime.now()) + + poller.startFor(iterations = 1) + + assertThat(dispatcher.dispatched).isEmpty() + + queue.busyRefs.clear() + + poller.startFor(iterations = 1) + + assertThat(dispatcher.dispatched).hasSize(1) + } +} diff --git a/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt new file mode 100644 index 0000000..3cd95f0 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/events/poller/TestablePoller.kt @@ -0,0 +1,29 @@ +package no.iktdev.eventi.events.poller + +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.advanceTimeBy +import no.iktdev.eventi.events.EventDispatcher +import no.iktdev.eventi.events.EventPollerImplementation +import no.iktdev.eventi.events.SequenceDispatchQueue +import no.iktdev.eventi.stores.EventStore + +class TestablePoller( + eventStore: EventStore, + dispatchQueue: SequenceDispatchQueue, + dispatcher: EventDispatcher, + val scope: TestScope +) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher) { + + suspend fun startFor(iterations: Int) { + repeat(iterations) { + try { + pollOnce() + } catch (_: Exception) { + // same as prod + } + + // Simuler delay(backoff) + scope.testScheduler.advanceTimeBy(backoff.toMillis()) + } + } +}