From 289ee88be06874b16e3426ba37de1246ded3e621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sat, 31 Jan 2026 18:51:39 +0100 Subject: [PATCH] Test runner --- .../events/EventPollerImplementationTest.kt | 49 +++++++++++++------ .../iktdev/eventi/tasks/TaskListenerTest.kt | 37 ++++++++++---- .../testUtil/TestSequenceDispatchQueue.kt | 14 ++++++ 3 files changed, 75 insertions(+), 25 deletions(-) create mode 100644 src/test/kotlin/no/iktdev/eventi/testUtil/TestSequenceDispatchQueue.kt diff --git a/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt b/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt index 0dc789e..d81ab29 100644 --- a/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/events/EventPollerImplementationTest.kt @@ -1,19 +1,23 @@ package no.iktdev.eventi.events import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.test.StandardTestDispatcher import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.awaitAll import no.iktdev.eventi.EventDispatcherTest.DerivedEvent import no.iktdev.eventi.EventDispatcherTest.OtherEvent import no.iktdev.eventi.EventDispatcherTest.TriggerEvent import no.iktdev.eventi.MyTime import no.iktdev.eventi.TestBase import no.iktdev.eventi.models.Event +import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue import no.iktdev.eventi.testUtil.wipe import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertFalse @@ -33,9 +37,7 @@ Så skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater """) class EventPollerImplementationTest : TestBase() { - val dispatcher = EventDispatcher(eventStore) - val queue = SequenceDispatchQueue(maxConcurrency = 8) - val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} + private val dispatcher = EventDispatcher(eventStore) @BeforeEach fun setup() { @@ -59,6 +61,10 @@ class EventPollerImplementationTest : TestBase() { Så skal alle referenceId-er dispatch'es og lastSeenTime oppdateres """) fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest { + val testDispatcher = StandardTestDispatcher(testScheduler) + val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher) + val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} + val dispatched = ConcurrentHashMap.newKeySet() val completionMap = mutableMapOf>() @@ -93,6 +99,9 @@ class EventPollerImplementationTest : TestBase() { Så skal backoff øke, og resettes når nye events ankommer """) fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest { + val testDispatcher = StandardTestDispatcher(testScheduler) + val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher) + val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { fun currentBackoff(): Duration = backoff } @@ -121,6 +130,10 @@ class EventPollerImplementationTest : TestBase() { Så skal polleren gruppere og dispatch'e alle tre i én batch """) fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest { + val testDispatcher = StandardTestDispatcher(testScheduler) + val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher) + val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} + val refId = UUID.randomUUID() val received = mutableListOf() val done = CompletableDeferred() @@ -157,8 +170,8 @@ class EventPollerImplementationTest : TestBase() { Så skal polleren ignorere dem """) fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest { - val refId = UUID.randomUUID() - val ignored = TriggerEvent().usingReferenceId(refId) + val testDispatcher = StandardTestDispatcher(testScheduler) + val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher) val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { init { @@ -166,6 +179,9 @@ class EventPollerImplementationTest : TestBase() { } } + val refId = UUID.randomUUID() + val ignored = TriggerEvent().usingReferenceId(refId) + eventStore.persist(ignored) testPoller.pollOnce() @@ -180,7 +196,12 @@ class EventPollerImplementationTest : TestBase() { Så skal begge events prosesseres, men uten å produsere duplikate derived events """) fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest { + val testDispatcher = StandardTestDispatcher(testScheduler) + val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher) + val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} + EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java)) + val channel = Channel(Channel.UNLIMITED) val handled = mutableListOf() @@ -193,16 +214,14 @@ class EventPollerImplementationTest : TestBase() { } } - val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} - val original = EchoEvent("Hello") eventStore.persist(original) poller.pollOnce() - withContext(Dispatchers.Default.limitedParallelism(1)) { - withTimeout(Duration.ofMinutes(1).toMillis()) { - repeat(1) { channel.receive() } + withContext(testDispatcher) { + withTimeout(60_000) { + channel.receive() } } @@ -211,9 +230,9 @@ class EventPollerImplementationTest : TestBase() { poller.pollOnce() - withContext(Dispatchers.Default.limitedParallelism(1)) { - withTimeout(Duration.ofMinutes(1).toMillis()) { - repeat(1) { channel.receive() } + withContext(testDispatcher) { + withTimeout(60_000) { + channel.receive() } } diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt index 56b4006..7bd0cfb 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt @@ -466,12 +466,15 @@ class TaskListenerTest { @Test @DisplayName(""" - Når listener prosesserer to tasks sekvensielt - Hvis cleanup fungerer riktig - Så skal ingen state lekke mellom tasks - """) +Når listener prosesserer to tasks sekvensielt +Hvis cleanup fungerer riktig +Så skal ingen state lekke mellom tasks +""") fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest { + val started1 = CompletableDeferred() val finish1 = CompletableDeferred() + + val started2 = CompletableDeferred() val finish2 = CompletableDeferred() val listener = object : TaskListener() { @@ -481,20 +484,31 @@ class TaskListenerTest { override fun getWorkerId() = "worker" override fun supports(task: Task) = true - override suspend fun onTask(task: Task): Event? { + override suspend fun onTask(task: Task): Event { callCount++ - if (callCount == 1) finish1.await() - if (callCount == 2) finish2.await() + + if (callCount == 1) { + started1.complete(Unit) // signal: coroutine har startet + finish1.await() // vent til testen sier "fortsett" + } + + if (callCount == 2) { + started2.complete(Unit) + finish2.await() + } + return object : Event() {} } } val reporter = FakeReporter() - // Task 1 + // --- Task 1 --- val task1 = FakeTask() listener.accept(task1, reporter) - finish1.complete(Unit) + + started1.await() // garanterer at coroutine kjører + finish1.complete(Unit) // la coroutine fullføre listener.currentJob!!.join() // Verifiser cleanup @@ -502,9 +516,11 @@ class TaskListenerTest { assertNull(listener.currentTask) assertNull(listener.heartbeatRunner) - // Task 2 + // --- Task 2 --- val task2 = FakeTask() listener.accept(task2, reporter) + + started2.await() finish2.complete(Unit) listener.currentJob!!.join() @@ -516,4 +532,5 @@ class TaskListenerTest { // onTask ble kalt to ganger assertEquals(2, listener.callCount) } + } diff --git a/src/test/kotlin/no/iktdev/eventi/testUtil/TestSequenceDispatchQueue.kt b/src/test/kotlin/no/iktdev/eventi/testUtil/TestSequenceDispatchQueue.kt new file mode 100644 index 0000000..6c72b73 --- /dev/null +++ b/src/test/kotlin/no/iktdev/eventi/testUtil/TestSequenceDispatchQueue.kt @@ -0,0 +1,14 @@ +package no.iktdev.eventi.testUtil + +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import no.iktdev.eventi.events.SequenceDispatchQueue + +class TestSequenceDispatchQueue( + maxConcurrency: Int, + dispatcher: CoroutineDispatcher +) : SequenceDispatchQueue( + maxConcurrency, + CoroutineScope(dispatcher + SupervisorJob()) +)