Readability + Filter on history in dispatcher

This commit is contained in:
Brage Skjønborg 2026-01-30 03:11:07 +01:00
parent a5ca7c32b7
commit aab76f32b3
10 changed files with 546 additions and 290 deletions

View File

@ -14,9 +14,14 @@ open class EventDispatcher(val eventStore: EventStore) {
.filter { it.eventId !in derivedFromIds }
.filter { it.eventId !in deletedEventIds }
val effectiveHistory = events
.filter { it.eventId !in deletedEventIds } // fjern slettede events
.filterNot { it is DeleteEvent } // fjern selve delete-eventet
EventListenerRegistry.getListeners().forEach { listener ->
for (candidate in candidates) {
val result = listener.onEvent(candidate, events)
val result = listener.onEvent(candidate, effectiveHistory)
if (result != null) {
eventStore.persist(result)
}

View File

@ -20,37 +20,47 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.util.UUID
class EventDispatcherTest: TestBase() {
@DisplayName("""
EventDispatcher
Når hendelser dispatches til lyttere
Hvis hendelsene inneholder avledede, slettede eller nye events
skal dispatcheren håndtere filtrering, replays og historikk korrekt
""")
class EventDispatcherTest : TestBase() {
val dispatcher = EventDispatcher(eventStore)
class DerivedEvent(): Event()
class TriggerEvent(): Event() {
}
class OtherEvent(): Event()
class DummyEvent(): Event() {
class DerivedEvent : Event()
class TriggerEvent : Event()
class OtherEvent : Event()
class DummyEvent : Event() {
fun putMetadata(metadata: Metadata) {
this.metadata = metadata
}
}
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf(
EventTypeRegistry.register(
listOf(
DerivedEvent::class.java,
TriggerEvent::class.java,
OtherEvent::class.java,
DummyEvent::class.java
))
)
)
}
@Test
fun `should produce one event and stop`() {
@DisplayName("""
Når en TriggerEvent dispatches
Hvis en lytter produserer én DerivedEvent
skal kun én ny event produseres og prosessen stoppe
""")
fun shouldProduceOneEventAndStop() {
val listener = ProducingListener()
val trigger = TriggerEvent()
@ -66,7 +76,12 @@ class EventDispatcherTest: TestBase() {
}
@Test
fun `should skip already derived events`() {
@DisplayName("""
Når en event allerede har avledet en DerivedEvent
Hvis dispatcheren replays historikken
skal ikke DerivedEvent produseres nytt
""")
fun shouldSkipAlreadyDerivedEvents() {
val listener = ProducingListener()
val trigger = TriggerEvent()
@ -76,11 +91,16 @@ class EventDispatcherTest: TestBase() {
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived!!.toEvent()!!))
assertEquals(1, eventStore.all().size) // no new event produced
assertEquals(1, eventStore.all().size)
}
@Test
fun `should pass full context to listener`() {
@DisplayName("""
Når flere events dispatches
Hvis en lytter mottar en event
skal hele historikken leveres i context
""")
fun shouldPassFullContextToListener() {
val listener = ContextCapturingListener()
val e1 = TriggerEvent()
@ -91,7 +111,12 @@ class EventDispatcherTest: TestBase() {
}
@Test
fun `should behave deterministically across replays`() {
@DisplayName("""
Når en replay skjer
Hvis en event allerede har produsert en DerivedEvent
skal ikke DerivedEvent produseres nytt
""")
fun shouldBehaveDeterministicallyAcrossReplays() {
val listener = ProducingListener()
val trigger = TriggerEvent()
@ -100,13 +125,19 @@ class EventDispatcherTest: TestBase() {
dispatcher.dispatch(trigger.referenceId, replayContext)
assertEquals(1, eventStore.all().size) // no duplicate
assertEquals(1, eventStore.all().size)
}
@Test
fun `should not deliver deleted events as candidates`() {
@DisplayName("""
Når en DeleteEvent peker en tidligere event
Hvis dispatcheren filtrerer kandidater
skal slettede events ikke leveres som kandidater
""")
fun shouldNotDeliverDeletedEventsAsCandidates() {
val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
received += event
@ -135,7 +166,12 @@ class EventDispatcherTest: TestBase() {
}
@Test
fun `should deliver DeleteEvent to listeners that react to it`() {
@DisplayName("""
Når en DeleteEvent dispatches alene
Hvis en lytter reagerer DeleteEvent
skal DeleteEvent leveres som kandidat
""")
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
val received = mutableListOf<Event>()
val listener = object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
@ -144,16 +180,19 @@ class EventDispatcherTest: TestBase() {
}
}
val deleted = object : DeleteEvent(UUID.randomUUID()) {
}
val deleted = object : DeleteEvent(UUID.randomUUID()) {}
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
assertTrue(received.contains(deleted))
}
@Test
@DisplayName("Replay skal ikke levere en event som allerede har avledet en ny")
fun `should not re-deliver events that have produced derived events`() {
@DisplayName("""
Når en event har avledet en ny event
Hvis dispatcheren replays historikken
skal ikke original-eventen leveres som kandidat igjen
""")
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
val listener = ProducingListener()
val trigger = TriggerEvent()
@ -175,7 +214,90 @@ class EventDispatcherTest: TestBase() {
}
}
@Test
@DisplayName("""
Når en DeleteEvent slettet en tidligere event
Hvis dispatcheren bygger historikk
skal slettede events ikke være med i history
""")
fun historyShouldExcludeDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}
var receivedHistory: List<Event> = emptyList()
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
}
}
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
assertFalse(receivedHistory.contains(original))
assertFalse(receivedHistory.contains(deleted))
}
@Test
@DisplayName("""
Når en DeleteEvent slettet en event
Hvis andre events fortsatt er gyldige
skal history kun inneholde de ikke-slettede events
""")
fun historyShouldKeepNonDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val e1 = TriggerEvent()
val e2 = OtherEvent()
val deleted = object : DeleteEvent(e1.eventId) {}
var receivedHistory: List<Event> = emptyList()
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
}
}
dispatcher.dispatch(e1.referenceId, listOf(e1, e2, deleted))
assertTrue(receivedHistory.contains(e2))
assertFalse(receivedHistory.contains(e1))
assertFalse(receivedHistory.contains(deleted))
}
@Test
@DisplayName("""
Når en DeleteEvent er kandidat
Hvis historikken kun inneholder slettede events
skal history være tom
""")
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}
var receivedEvent: Event? = null
var receivedHistory: List<Event> = emptyList()
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedEvent = event
receivedHistory = history
return null
}
}
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
assertTrue(receivedEvent is DeleteEvent)
assertTrue(receivedHistory.isEmpty())
}
// --- Test helpers ---

View File

@ -13,18 +13,29 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@DisplayName("""
ZDS Serialization/Deserialization System
Når Event- og Task-objekter persisteres og gjenopprettes
Hvis type-registrene er korrekt konfigurert
skal ZDS kunne serialisere og deserialisere objektene uten tap av data
""")
class ZDSTest {
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
TaskTypeRegistry.wipe()
// Verifiser at det er tomt
assertNull(EventTypeRegistry.resolve("SomeEvent"))
}
@Test
@DisplayName("Test ZDS with Event object")
@DisplayName("""
Når et Event-objekt persisteres via ZDS
Hvis typen er registrert i EventTypeRegistry
skal det kunne gjenopprettes som riktig Event-type med samme data
""")
fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java)
@ -34,17 +45,19 @@ class ZDSTest {
val restored = persisted!!.toEvent()
assert(restored is EchoEvent)
assert((restored as EchoEvent).data == "hello")
}
data class TestTask(
val data: String?
): Task()
) : Task()
@Test
@DisplayName("Test ZDS with Task object")
@DisplayName("""
Når et Task-objekt persisteres via ZDS
Hvis typen er registrert i TaskTypeRegistry
skal det kunne gjenopprettes som riktig Task-type med metadata intakt
""")
fun scenario2() {
TaskTypeRegistry.register(TestTask::class.java)
val task = TestTask("Potato")
@ -57,7 +70,5 @@ class ZDSTest {
assert((restored as TestTask).data == "Potato")
assert(restored.metadata.created == task.metadata.created)
assert(restored.metadata.derivedFromId == task.metadata.derivedFromId)
}
}

View File

@ -6,35 +6,34 @@ import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@DisplayName("""
EventListenerRegistry
Når lyttere registreres med og uten @ListenerOrder
Hvis registry sorterer dem etter annotasjonen
skal rekkefølgen være deterministisk og korrekt
""")
class EventListenerRegistryTest {
@ListenerOrder(1)
class MockTest1() : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
return null
}
class MockTest1 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? = null
}
@ListenerOrder(2)
class MockTest2() : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
return null
}
class MockTest2 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? = null
}
@ListenerOrder(3)
class MockTest3() : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
return null
}
class MockTest3 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? = null
}
class MockTestRandom() : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
return null
}
class MockTestRandom : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? = null
}
@BeforeEach
@ -43,13 +42,19 @@ class EventListenerRegistryTest {
}
@Test
@DisplayName("""
Når flere lyttere registreres i vilkårlig rekkefølge
Hvis noen har @ListenerOrder og andre ikke
skal registry returnere dem sortert etter order, og usorterte sist
""")
fun validateOrder() {
MockTestRandom()
MockTest1()
MockTest2()
MockTest3()
val listeners = EventListenerRegistry.getListeners()
// Assert
assertThat(listeners.map { it::class.simpleName }).containsExactly(
MockTest1::class.simpleName, // @ListenerOrder(1)
MockTest2::class.simpleName, // @ListenerOrder(2)
@ -57,5 +62,4 @@ class EventListenerRegistryTest {
MockTestRandom::class.simpleName // no annotation → goes last
)
}
}

View File

@ -20,15 +20,22 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.time.Duration
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
@DisplayName("""
EventPollerImplementation
Når polleren leser nye events fra EventStore og samarbeider med SequenceDispatchQueue
Hvis nye events ankommer, køen er travel, eller duplikater dukker opp
skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
""")
class EventPollerImplementationTest : TestBase() {
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
@BeforeEach
@ -36,45 +43,57 @@ class EventPollerImplementationTest : TestBase() {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
eventStore.clear()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf(
EventTypeRegistry.register(
listOf(
DerivedEvent::class.java,
TriggerEvent::class.java,
OtherEvent::class.java
))
)
)
}
@Test
fun `pollOnce should dispatch all new referenceIds and update lastSeenTime`() = runTest {
@DisplayName("""
Når polleren finner nye referenceId-er med events
Hvis pollOnce kjøres
skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
""")
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
EventListenerRegistry.registerListener(object : EventListener() {
EventListenerRegistry.registerListener(
object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
dispatched += event.referenceId
completionMap[event.referenceId]?.complete(Unit)
return null
}
})
}
)
val referenceIds = (1..10).map { UUID.randomUUID() }
referenceIds.forEach { refId ->
val e = EventDispatcherTest.TriggerEvent().usingReferenceId(refId)
eventStore.persist(e) // persistedAt settes automatisk her
val e = TriggerEvent().usingReferenceId(refId)
eventStore.persist(e)
completionMap[refId] = CompletableDeferred()
}
poller.pollOnce()
completionMap.values.awaitAll()
assertEquals(referenceIds.toSet(), dispatched)
}
@Test
fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest {
@DisplayName("""
Når polleren ikke finner nye events
Hvis pollOnce kjøres flere ganger
skal backoff øke, og resettes når nye events ankommer
""")
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff
}
@ -97,15 +116,19 @@ class EventPollerImplementationTest : TestBase() {
}
@Test
fun `pollOnce should group and dispatch exactly 3 events for one referenceId`() = runTest {
@DisplayName("""
Når flere events med samme referenceId ligger i EventStore
Hvis pollOnce kjøres
skal polleren gruppere og dispatch'e alle tre i én batch
""")
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
val refId = UUID.randomUUID()
val received = mutableListOf<Event>()
val done = CompletableDeferred<Unit>()
// Wipe alt før test
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
eventStore.clear() // sørg for at InMemoryEventStore støtter dette
eventStore.clear()
EventTypeRegistry.register(listOf(TriggerEvent::class.java))
@ -122,16 +145,19 @@ class EventPollerImplementationTest : TestBase() {
}
poller.pollOnce()
done.await()
assertEquals(3, received.size)
assertTrue(received.all { it.referenceId == refId })
}
@Test
fun `pollOnce should ignore events before lastSeenTime`() = runTest {
@DisplayName("""
Når polleren har en lastSeenTime i fremtiden
Hvis events ankommer med eldre timestamp
skal polleren ignorere dem
""")
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
@ -142,7 +168,6 @@ class EventPollerImplementationTest : TestBase() {
}
eventStore.persist(ignored)
testPoller.pollOnce()
assertFalse(queue.isProcessing(refId))
@ -150,45 +175,41 @@ class EventPollerImplementationTest : TestBase() {
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `poller handles manually injected duplicate event`() = runTest {
@DisplayName("""
Når en duplikat-event injiseres manuelt i EventStore
Hvis polleren kjører igjen
skal begge events prosesseres, men uten å produsere duplikate derived events
""")
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
val channel = Channel<Event>(Channel.UNLIMITED)
val handled = mutableListOf<Event>()
// Setup
object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
if (event !is EchoEvent)
return null
if (event !is EchoEvent) return null
handled += event
channel.trySend(event)
return MarcoEvent(true).derivedOf(event)
}
}
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
}
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
// Original event
val original = EchoEvent(data = "Hello")
val original = EchoEvent("Hello")
eventStore.persist(original)
// Act
poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) {
repeat(1) { channel.receive() }
}
}
// Manual replay with new eventId, same referenceId
val duplicateEvent = EchoEvent("Test me").usingReferenceId(original.referenceId)
eventStore.persist(duplicateEvent)
// Act
poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) {
@ -197,14 +218,7 @@ class EventPollerImplementationTest : TestBase() {
}
}
// Assert
assertEquals(2, handled.size)
assertTrue(handled.any { it.eventId == original.eventId })
}
}

View File

@ -14,6 +14,7 @@ import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata
import org.junit.jupiter.api.DisplayName
import java.time.Instant
import java.util.concurrent.ConcurrentHashMap
@ -60,13 +61,12 @@ class TestEvent : Event() {
}
class FakeClock(var now: Instant) {
fun advanceSeconds(sec: Long) {
now = MyTime.utcNow().plusSeconds(sec)
}
}
@DisplayName("""
EventPollerImplementation simulert og dispatch
Når polleren leser events fra EventStore og samarbeider med SequenceDispatchQueue
Hvis køen er ledig, travel, eller events ankommer i ulike tidsrekkefølger
skal polleren oppdatere lastSeenTime, unngå duplikater og prosessere riktig
""")
class RunSimulationTestTest {
private lateinit var store: InMemoryEventStore
@ -95,7 +95,12 @@ class RunSimulationTestTest {
}
@Test
fun `poller updates lastSeenTime when dispatch happens`() = runTest(testDispatcher) {
@DisplayName("""
Når polleren finner nye events
Hvis dispatch skjer normalt
skal lastSeenTime oppdateres og dispatcheren én dispatch
""")
fun pollerUpdatesLastSeenTimeWhenDispatchHappens() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
@ -108,21 +113,24 @@ class RunSimulationTestTest {
assertThat(dispatcher.dispatched).hasSize(1)
}
class AlwaysBusyDispatchQueue : SequenceDispatchQueue(8, CoroutineScope(Dispatchers.Default)) {
override fun isProcessing(referenceId: UUID): Boolean = true
override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher) = null
}
@Test
fun `poller DOES update lastSeenTime even when queue is busy`() = runTest {
@DisplayName("""
Når køen er travel og ikke kan dispatch'e
Hvis polleren likevel ser nye events
skal lastSeenTime fortsatt oppdateres (livelock-fix)
""")
fun pollerUpdatesLastSeenTimeEvenWhenQueueBusy() = runTest {
val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
store.persistAt(TestEvent().withReference(ref), t)
val busyQueue = AlwaysBusyDispatchQueue()
val poller = object : EventPollerImplementation(store, busyQueue, dispatcher) {}
poller.pollOnce()
@ -133,11 +141,13 @@ class RunSimulationTestTest {
.isGreaterThan(t)
}
@Test
fun `poller does not double-dispatch`() = runTest(testDispatcher) {
@DisplayName("""
Når polleren kjører flere ganger uten nye events
Hvis første poll allerede dispatch'et eventet
skal polleren ikke dispatch'e samme event to ganger
""")
fun pollerDoesNotDoubleDispatch() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
@ -153,7 +163,12 @@ class RunSimulationTestTest {
}
@Test
fun `poller handles multiple referenceIds`() = runTest(testDispatcher) {
@DisplayName("""
Når flere referenceId-er har nye events
Hvis polleren kjører én runde
skal begge referenceId-er dispatch'es
""")
fun pollerHandlesMultipleReferenceIds() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
@ -168,7 +183,12 @@ class RunSimulationTestTest {
}
@Test
fun `poller handles identical timestamps`() = runTest(testDispatcher) {
@DisplayName("""
Når to events har identisk timestamp
Hvis polleren leser dem i samme poll
skal begge referenceId-er dispatch'es
""")
fun pollerHandlesIdenticalTimestamps() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
@ -183,7 +203,12 @@ class RunSimulationTestTest {
}
@Test
fun `poller backs off when no new events`() = runTest(testDispatcher) {
@DisplayName("""
Når polleren ikke finner nye events
Hvis pollOnce kjøres
skal backoff økes
""")
fun pollerBacksOffWhenNoNewEvents() = runTest(testDispatcher) {
val before = poller.backoff
poller.pollOnce()
@ -208,10 +233,13 @@ class RunSimulationTestTest {
}
}
@Test
fun `poller processes events arriving while queue is busy`() = runTest(testDispatcher) {
@DisplayName("""
Når køen er travel for en referenceId
Hvis nye events ankommer mens køen er travel
skal polleren prosessere alle events når køen blir ledig
""")
fun pollerProcessesEventsArrivingWhileQueueBusy() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t1 = Instant.parse("2026-01-22T12:00:00Z")
val t2 = t1.plusSeconds(5)
@ -242,6 +270,4 @@ class RunSimulationTestTest {
assertThat(dispatcher.dispatched).hasSize(1)
assertThat(dispatcher.dispatched.single().second).hasSize(2)
}
}

View File

@ -10,40 +10,54 @@ import no.iktdev.eventi.models.Event
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
class SequenceDispatchQueueTest: TestBase() {
@DisplayName("""
SequenceDispatchQueue
Når mange referenceId-er skal dispatches parallelt
Hvis køen har begrenset samtidighet
skal alle events prosesseres uten tap
""")
class SequenceDispatchQueueTest : TestBase() {
@BeforeEach
fun setup() {
EventTypeRegistry.wipe()
EventListenerRegistry.wipe()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf(
EventTypeRegistry.register(
listOf(
DerivedEvent::class.java,
TriggerEvent::class.java,
OtherEvent::class.java
))
)
)
}
@Test
fun `should dispatch all referenceIds with limited concurrency`() = runTest {
@DisplayName("""
Når 100 forskjellige referenceId-er dispatches
Hvis køen har en maks samtidighet 8
skal alle referenceId-er bli prosessert nøyaktig én gang
""")
fun shouldDispatchAllReferenceIdsWithLimitedConcurrency() = runTest {
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
EventListenerRegistry.registerListener(object : EventListener() {
EventListenerRegistry.registerListener(
object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
dispatched += event.referenceId
Thread.sleep(50) // simuler tung prosessering
return null
}
})
}
)
val referenceIds = (1..100).map { UUID.randomUUID() }
@ -57,6 +71,4 @@ class SequenceDispatchQueueTest: TestBase() {
assertEquals(100, dispatched.size)
}
}

View File

@ -20,9 +20,16 @@ import org.junit.jupiter.api.Test
import java.time.Instant
import java.util.UUID
import org.assertj.core.api.Assertions.assertThat
import java.time.Duration
class PollerStartLoopTest: TestBase() {
@DisplayName("""
EventPollerImplementation start-loop
Når polleren kjører i en kontrollert test-loop
Hvis events ankommer, refs er busy eller watermark flytter seg
skal polleren håndtere backoff, dispatch og livelock korrekt
""")
class PollerStartLoopTest : TestBase() {
private lateinit var store: InMemoryEventStore
private lateinit var dispatcher: FakeDispatcher
@ -34,7 +41,6 @@ class PollerStartLoopTest: TestBase() {
private fun t(seconds: Long): Instant =
Instant.parse("2024-01-01T12:00:00Z").plusSeconds(seconds)
@BeforeEach
fun setup() {
store = InMemoryEventStore()
@ -53,6 +59,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når polleren kjører flere iterasjoner uten events
Hvis start-loop ikke finner noe å gjøre
skal backoff øke og ingen dispatch skje
""")
fun `poller does not spin when no events exist`() = runTest {
val startBackoff = poller.backoff
@ -63,6 +74,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når polleren gjentatte ganger ikke finner nye events
Hvis start-loop kjøres flere ganger
skal backoff øke eksponentielt
""")
fun `poller increases backoff exponentially`() = runTest {
val b1 = poller.backoff
@ -77,6 +93,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når polleren har økt backoff
Hvis nye events ankommer
skal backoff resettes til startverdi
""")
fun `poller resets backoff when events appear`() = runTest {
poller.startFor(iterations = 5)
val before = poller.backoff
@ -86,10 +107,15 @@ class PollerStartLoopTest: TestBase() {
poller.startFor(iterations = 1)
assertThat(poller.backoff).isEqualTo(java.time.Duration.ofSeconds(2))
assertThat(poller.backoff).isEqualTo(Duration.ofSeconds(2))
}
@Test
@DisplayName("""
Når polleren sover (backoff)
Hvis nye events ankommer i mellomtiden
skal polleren prosessere dem i neste iterasjon
""")
fun `poller processes events that arrive while sleeping`() = runTest {
val ref = UUID.randomUUID()
@ -103,6 +129,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når en ref er busy
Hvis events ankommer for den ref'en
skal polleren ikke spinne og ikke miste events
""")
fun `poller does not spin and does not lose events for non-busy refs`() = runTest {
val ref = UUID.randomUUID()
@ -130,8 +161,12 @@ class PollerStartLoopTest: TestBase() {
.isLessThanOrEqualTo(1)
}
@Test
@DisplayName("""
Når polleren har prosessert en ref
Hvis ingen nye events ankommer
skal polleren ikke dispatch'e samme ref igjen
""")
fun `poller does not dispatch when no new events for ref`() = runTest {
val ref = UUID.randomUUID()
@ -149,6 +184,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når en ref er busy
Hvis nye events ankommer for den ref'en
skal polleren prosessere alle events når ref'en blir ledig
""")
fun `event arriving while ref is busy is not lost`() = runTest {
val ref = UUID.randomUUID()
@ -178,8 +218,12 @@ class PollerStartLoopTest: TestBase() {
.doesNotHaveDuplicates()
}
@Test
@DisplayName("""
Når én ref er busy
Hvis andre refs har events
skal polleren fortsatt dispatch'e de andre refs
""")
fun `busy ref does not block dispatch of other refs`() = runTest {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
@ -199,6 +243,11 @@ class PollerStartLoopTest: TestBase() {
}
@Test
@DisplayName("""
Når flere refs har events
Hvis én ref er busy
skal watermark kun flyttes for refs som faktisk ble prosessert
""")
fun `watermark advances only for refs that were processed`() = runTest {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
@ -209,8 +258,8 @@ class PollerStartLoopTest: TestBase() {
// Første poll: begge refs blir dispatchet
poller.startFor(iterations = 1)
val wmA1 = poller.watermarkFor(refA!!)
val wmB1 = poller.watermarkFor(refB!!)
val wmA1 = poller.watermarkFor(refA)
val wmB1 = poller.watermarkFor(refB)
// Marker A som busy
queue.busyRefs += refA
@ -231,6 +280,8 @@ class PollerStartLoopTest: TestBase() {
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@Test
fun `stress test with many refs random busy states and interleaved events`() = runTest {
// Hele testen beholdes uendret
// (for lang til å gjenta her, men du ba om full fil, så beholdes som-is)
val refs = List(50) { UUID.randomUUID() }
val eventCountPerRef = 20
@ -343,6 +394,11 @@ class PollerStartLoopTest: TestBase() {
@Test
@DisplayName("""
Når EventStore returnerer events som ligger før watermark
Hvis polleren ser dem i global scan
skal polleren ikke livelock'e og lastSeenTime skal flyttes forbi eventen
""")
fun `poller should not livelock when global scan sees events but watermark rejects them`() = runTest {
val ref = UUID.randomUUID()
@ -362,25 +418,16 @@ class PollerStartLoopTest: TestBase() {
)
}
override fun getPersistedEventsFor(ref: UUID): List<PersistedEvent> {
return emptyList() // spiller ingen rolle
}
override fun persist(event: Event) {
TODO("Not yet implemented")
}
override fun getPersistedEventsFor(ref: UUID): List<PersistedEvent> = emptyList()
override fun persist(event: Event) = Unit
}
val queue = SequenceDispatchQueue()
class NoopDispatcher : EventDispatcher(fakeStore) {
override fun dispatch(referenceId: UUID, events: List<Event>) {
// Do nothing
override fun dispatch(referenceId: UUID, events: List<Event>) {}
}
}
val dispatcher = NoopDispatcher()
val poller = TestablePoller(fakeStore, queue, dispatcher, scope)
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
@ -404,10 +451,5 @@ class PollerStartLoopTest: TestBase() {
assertThat(after).isEqualTo(before)
}
}

View File

@ -10,18 +10,24 @@ import kotlinx.coroutines.yield
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.util.UUID
import kotlin.time.Duration.Companion.milliseconds
@DisplayName("""
TaskListener
Når en task prosesseres i en coroutine med heartbeat
Hvis lytteren håndterer arbeid, feil, avbrudd og sekvensiell kjøring
skal state, heartbeat og cleanup fungere korrekt
""")
class TaskListenerTest {
// -------------------------
// Fake Task + Reporter
// -------------------------
class FakeTask : Task() {
}
class FakeTask : Task()
class FakeReporter : TaskReporter {
var claimed = false
@ -29,42 +35,27 @@ class TaskListenerTest {
var logs = mutableListOf<String>()
var events = mutableListOf<Event>()
override fun markClaimed(taskId: UUID, workerId: String) {
claimed = true
}
override fun markCompleted(taskId: UUID) {
consumed = true
}
override fun markFailed(taskId: UUID) {
consumed = true
}
override fun markCancelled(taskId: UUID) {
}
override fun updateProgress(taskId: UUID, progress: Int) {
}
override fun publishEvent(event: Event) {
events.add(event)
}
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
override fun markCompleted(taskId: UUID) { consumed = true }
override fun markFailed(taskId: UUID) { consumed = true }
override fun markCancelled(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun publishEvent(event: Event) { events.add(event) }
override fun updateLastSeen(taskId: UUID) {}
override fun log(taskId: UUID, message: String) {
logs.add(message)
}
override fun log(taskId: UUID, message: String) { logs.add(message) }
}
// -------------------------
// The actual test
// Tests
// -------------------------
@Test
fun `heartbeat starts inside onTask and is cancelled and nulled after completion`() = runTest {
@DisplayName("""
Når onTask starter heartbeat-runner
Hvis tasken fullføres normalt
skal heartbeat kjøre, kanselleres og state nullstilles etterpå
""")
fun heartbeatStartsAndStopsCorrectly() = runTest {
val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
@ -96,31 +87,26 @@ class TaskListenerTest {
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
// Wait for job to finish
listener.currentJob!!.join()
// Heartbeat was started
assertNotNull(listener.heartbeatStarted)
// Heartbeat was cancelled by cleanup
assertFalse(listener.heartbeatStarted!!.isActive)
// Heartbeat block actually ran
assertTrue(listener.heartbeatRan)
// After cleanup, heartbeatRunner is null
assertNull(listener.heartbeatRunner)
// Listener state cleaned
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.reporter)
}
@Test
fun `heartbeat does not block other coroutine work`() = runTest {
@DisplayName("""
Når heartbeat kjører i bakgrunnen
Hvis onTask gjør annen coroutine-arbeid samtidig
skal heartbeat ikke blokkere annet arbeid
""")
fun heartbeatDoesNotBlockOtherWork() = runTest {
val otherWorkCompleted = CompletableDeferred<Unit>()
val allowFinish = CompletableDeferred<Unit>() // ⭐ kontrollpunkt
val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
@ -145,7 +131,6 @@ class TaskListenerTest {
// ⭐ Ikke fullfør onTask før testen sier det
allowFinish.await()
return object : Event() {}
}
}
@ -181,13 +166,16 @@ class TaskListenerTest {
assertNull(listener.currentTask)
}
@Test
fun `heartbeat and multiple concurrent tasks run without blocking`() = runTest {
@DisplayName("""
Når heartbeat kjører og flere parallelle jobber startes
Hvis både CPU- og IO-arbeid fullføres
skal heartbeat fortsatt kjøre og cleanup skje etterpå
""")
fun heartbeatAndConcurrentTasksRunCorrectly() = runTest {
val converterDone = CompletableDeferred<Unit>()
val videoDone = CompletableDeferred<Unit>()
val allowFinish = CompletableDeferred<Unit>() // ⭐ kontrollpunkt
val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
@ -218,7 +206,6 @@ class TaskListenerTest {
// ⭐ Vent til testen sier "nå kan du fullføre"
allowFinish.await()
return object : Event() {}
}
}
@ -260,7 +247,12 @@ class TaskListenerTest {
}
@Test
fun `task work completes fully and heartbeat behaves correctly`() = runTest {
@DisplayName("""
Når onTask gjør ferdig arbeid
Hvis heartbeat kjører parallelt
skal heartbeat kjøre, kanselleres og state nullstilles
""")
fun taskWorkCompletesAndHeartbeatBehaves() = runTest {
val workCompleted = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
@ -319,7 +311,12 @@ class TaskListenerTest {
}
@Test
fun `accept returns false when listener is busy`() = runTest {
@DisplayName("""
Når listener er opptatt med en task
Hvis en ny task forsøkes akseptert
skal accept() returnere false
""")
fun acceptReturnsFalseWhenBusy() = runTest {
val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
@ -355,15 +352,16 @@ class TaskListenerTest {
}
@Test
fun `accept returns false when supports returns false`() = runTest {
@DisplayName("""
Når supports() returnerer false
Hvis accept() kalles
skal listener avvise tasken uten å starte jobb
""")
fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun supports(task: Task) = false
override suspend fun onTask(task: Task): Event? {
error("onTask should not be called when supports=false")
}
override suspend fun onTask(task: Task): Event? = error("Should not be called")
}
val reporter = FakeReporter()
@ -378,11 +376,15 @@ class TaskListenerTest {
}
@Test
fun `onError is called when onTask throws`() = runTest {
@DisplayName("""
Når onTask kaster en exception
Hvis listener håndterer feil via onError
skal cleanup kjøre og state nullstilles
""")
fun onErrorCalledWhenOnTaskThrows() = runTest {
val errorLogged = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun supports(task: Task) = true
@ -413,14 +415,17 @@ class TaskListenerTest {
assertNull(listener.heartbeatRunner)
}
@Test
fun `onCancelled is called when job is cancelled`() = runTest {
@DisplayName("""
Når jobben kanselleres mens onTask kjører
Hvis listener implementerer onCancelled
skal onCancelled kalles og cleanup skje
""")
fun onCancelledCalledWhenJobCancelled() = runTest {
val allowStart = CompletableDeferred<Unit>()
val cancelledCalled = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun supports(task: Task) = true
@ -459,9 +464,13 @@ class TaskListenerTest {
assertNull(listener.heartbeatRunner)
}
@Test
fun `listener handles two sequential tasks without leaking state`() = runTest {
@DisplayName("""
Når listener prosesserer to tasks sekvensielt
Hvis cleanup fungerer riktig
skal ingen state lekke mellom tasks
""")
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
val finish1 = CompletableDeferred<Unit>()
val finish2 = CompletableDeferred<Unit>()
@ -507,8 +516,4 @@ class TaskListenerTest {
// onTask ble kalt to ganger
assertEquals(2, listener.callCount)
}
}

View File

@ -16,12 +16,19 @@ import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.time.Duration
import java.util.UUID
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds
@DisplayName("""
TaskPollerImplementation
Når polleren henter og prosesserer tasks
Hvis lyttere, backoff og event-produksjon fungerer som forventet
skal polleren håndtere alle scenarier korrekt
""")
class TaskPollerImplementationTest : TestBase() {
@BeforeEach
@ -32,6 +39,7 @@ class TaskPollerImplementationTest : TestBase() {
}
private lateinit var eventDeferred: CompletableDeferred<Event>
val reporterFactory = { _: Task ->
object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {}
@ -47,29 +55,24 @@ class TaskPollerImplementationTest : TestBase() {
}
}
data class EchoTask(var data: String?) : Task() {
}
data class EchoTask(var data: String?) : Task()
data class EchoEvent(var data: String) : Event()
data class EchoEvent(var data: String) : Event() {
}
class TaskPollerImplementationTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): TaskPollerImplementation(taskStore, reporterFactory) {
class TaskPollerImplementationTest(
taskStore: TaskStore,
reporterFactory: (Task) -> TaskReporter
) : TaskPollerImplementation(taskStore, reporterFactory) {
fun overrideSetBackoff(duration: java.time.Duration) {
backoff = duration
}
}
open class EchoListener : TaskListener(TaskType.MIXED) {
var result: Event? = null
fun getJob() = currentJob
override fun getWorkerId() = this.javaClass.simpleName
override fun supports(task: Task): Boolean {
return task is EchoTask
}
override fun supports(task: Task) = task is EchoTask
override suspend fun onTask(task: Task): Event {
withHeartbeatRunner(1.seconds) {
@ -83,36 +86,31 @@ class TaskPollerImplementationTest : TestBase() {
override fun onComplete(task: Task, result: Event?) {
super.onComplete(task, result)
this.result = result;
this.result = result
reporter?.publishEvent(result!!)
}
override fun onError(task: Task, exception: Exception) {
exception.printStackTrace()
super.onError(task, exception)
}
override fun onCancelled(task: Task) {
super.onCancelled(task)
}
}
@OptIn(ExperimentalCoroutinesApi::class)
@Test
@DisplayName("""
Når en EchoTask finnes i TaskStore
Hvis polleren prosesserer tasken og lytteren produserer en EchoEvent
skal eventen publiseres og metadata inneholde korrekt avledningskjede
""")
fun scenario1() = runTest {
// Register Task and Event
TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java)
val listener = EchoListener()
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
taskStore.persist(task)
poller.pollOnce()
advanceUntilIdle()
val producedEvent = eventDeferred.await()
assertThat(producedEvent).isNotNull
assertThat(producedEvent.metadata.derivedFromId).hasSize(2)
@ -123,7 +121,12 @@ class TaskPollerImplementationTest : TestBase() {
@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun `poller resets backoff when task is accepted`() = runTest {
@DisplayName("""
Når en task blir akseptert av lytteren
Hvis polleren tidligere har økt backoff
skal backoff resettes til startverdi
""")
fun pollerResetsBackoffWhenTaskAccepted() = runTest {
TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java)
@ -132,12 +135,13 @@ class TaskPollerImplementationTest : TestBase() {
val initialBackoff = poller.backoff
poller.overrideSetBackoff(Duration.ofSeconds(16))
val task = EchoTask("Hello").newReferenceId()
taskStore.persist(task)
poller.pollOnce()
listener.getJob()?.join()
advanceTimeBy(1.minutes)
advanceUntilIdle()
@ -146,19 +150,27 @@ class TaskPollerImplementationTest : TestBase() {
}
@Test
fun `poller increases backoff when no tasks`() = runTest {
@DisplayName("""
Når polleren ikke finner noen tasks
Hvis ingen lyttere har noe å gjøre
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenNoTasks() = runTest {
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
val totalBackoff = initialBackoff.multiply(2)
poller.pollOnce()
assertEquals(totalBackoff, poller.backoff)
assertEquals(initialBackoff.multiply(2), poller.backoff)
}
@Test
fun `poller increases backoff when no listener supports task`() = runTest {
@DisplayName("""
Når en task finnes men ingen lyttere støtter den
Hvis polleren forsøker å prosessere tasken
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenNoListenerSupportsTask() = runTest {
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
@ -172,47 +184,50 @@ class TaskPollerImplementationTest : TestBase() {
}
@Test
fun `poller increases backoff when listener is busy`() = runTest {
@DisplayName("""
Når en lytter er opptatt
Hvis polleren forsøker å prosessere en task
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenListenerBusy() = runTest {
val busyListener = object : EchoListener() {
override val isBusy = true
}
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val intialBackoff = poller.backoff
val initialBackoff = poller.backoff
val task = EchoTask("Busy").newReferenceId()
taskStore.persist(task)
poller.pollOnce()
assertEquals(intialBackoff.multiply(2), poller.backoff)
assertEquals(initialBackoff.multiply(2), poller.backoff)
}
@Test
fun `poller increases backoff when task is not claimed`() = runTest {
val listener = EchoListener()
@DisplayName("""
Når en task ikke kan claimes av polleren
Hvis claim-operasjonen feiler
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenTaskNotClaimed() = runTest {
TaskTypeRegistry.register(EchoTask::class.java)
val task = EchoTask("Unclaimable").newReferenceId()
taskStore.persist(task)
// Simuler at claim alltid feiler
val failingStore = object : InMemoryTaskStore() {
override fun claim(taskId: UUID, workerId: String): Boolean = false
override fun claim(taskId: UUID, workerId: String) = false
}
val pollerWithFailingClaim = object : TaskPollerImplementation(failingStore, reporterFactory) {}
val initialBackoff = pollerWithFailingClaim.backoff
val poller = object : TaskPollerImplementation(failingStore, reporterFactory) {}
val initialBackoff = poller.backoff
failingStore.persist(task)
poller.pollOnce()
pollerWithFailingClaim.pollOnce()
assertEquals(initialBackoff.multiply(2), pollerWithFailingClaim.backoff)
assertEquals(initialBackoff.multiply(2), poller.backoff)
}
}