Compare commits

...

17 Commits

22 changed files with 1024 additions and 578 deletions

29
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,29 @@
name: Run Unit Tests
on:
push:
branches:
- "**"
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'zulu'
- name: Setup Gradle
uses: gradle/gradle-build-action@v3
- name: Make gradlew executable
run: chmod +x ./gradlew
- name: Run unit tests
run: ./gradlew test --stacktrace

View File

@ -2,6 +2,7 @@ package no.iktdev.eventi.events
import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.util.UUID import java.util.UUID
@ -11,12 +12,18 @@ open class EventDispatcher(val eventStore: EventStore) {
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet() val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId } val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
val candidates = events val candidates = events
.filterNot { it is SignalEvent }
.filter { it.eventId !in derivedFromIds } .filter { it.eventId !in derivedFromIds }
.filter { it.eventId !in deletedEventIds } .filter { it.eventId !in deletedEventIds }
val effectiveHistory = events
.filter { it.eventId !in deletedEventIds } // fjern slettede events
.filterNot { it is DeleteEvent } // fjern selve delete-eventet
EventListenerRegistry.getListeners().forEach { listener -> EventListenerRegistry.getListeners().forEach { listener ->
for (candidate in candidates) { for (candidate in candidates) {
val result = listener.onEvent(candidate, events) val result = listener.onEvent(candidate, effectiveHistory)
if (result != null) { if (result != null) {
eventStore.persist(result) eventStore.persist(result)
} }

View File

@ -8,23 +8,30 @@ import no.iktdev.eventi.stores.EventStore
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import kotlin.collections.iterator
abstract class EventPollerImplementation( abstract class EventPollerImplementation(
private val eventStore: EventStore, private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue, private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher private val dispatcher: EventDispatcher
) { ) {
// Erstatter ikke lastSeenTime, men supplerer den private val log = KotlinLogging.logger {}
protected val refWatermark = mutableMapOf<UUID, Instant>()
// lastSeenTime brukes kun som scan hint /**
* Per-reference watermark:
* - first = last seen persistedAt
* - second = last seen persistedId
*/
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
/**
* Global scan hint (timestamp only).
* Used to avoid scanning entire table every time.
*/
var lastSeenTime: Instant = Instant.EPOCH var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() { open suspend fun start() {
log.info { "EventPoller starting with initial backoff=$backoff" } log.info { "EventPoller starting with initial backoff=$backoff" }
@ -32,7 +39,7 @@ abstract class EventPollerImplementation(
try { try {
pollOnce() pollOnce()
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() log.error(e) { "Error in poller loop" }
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
} }
@ -43,11 +50,11 @@ abstract class EventPollerImplementation(
val pollStartedAt = MyTime.utcNow() val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" } log.debug { "🔍 Polling for new events" }
// Global scan hint: kombiner refWatermark og lastSeenTime // Determine global scan start
val watermarkMin = refWatermark.values.minOrNull() val minRefTs = refWatermark.values.minOfOrNull { it.first }
val scanFrom = when (watermarkMin) { val scanFrom = when (minRefTs) {
null -> lastSeenTime null -> lastSeenTime
else -> maxOf(lastSeenTime, watermarkMin) else -> maxOf(lastSeenTime, minRefTs)
} }
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
@ -59,76 +66,73 @@ abstract class EventPollerImplementation(
return return
} }
// Vi har sett nye events globalt reset backoff // Reset backoff
backoff = Duration.ofSeconds(2) backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" } log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false var anyProcessed = false
// Track høyeste persistedAt vi har sett i denne runden // Track highest persistedAt seen globally this round
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt } val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) { for ((ref, eventsForRef) in grouped) {
val refSeen = refWatermark[ref] ?: Instant.EPOCH val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
// Filter new events using (timestamp, id) ordering
val newForRef = eventsForRef.filter { ev ->
ev.persistedAt > refSeenAt ||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
}
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
if (newForRef.isEmpty()) { if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since $refSeen" } log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
continue continue
} }
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch // If ref is busy, skip dispatch
if (dispatchQueue.isProcessing(ref)) { if (dispatchQueue.isProcessing(ref)) {
log.debug { "$ref is busy — deferring ${newForRef.size} events" } log.debug { "$ref is busy — deferring ${newForRef.size} events" }
continue continue
} }
// Hent full sekvens for ref (Eventi-invariant) // Fetch full sequence for dispatch
val fullLog = eventStore.getPersistedEventsFor(ref) val fullLog = eventStore.getPersistedEventsFor(ref)
val events = fullLog.mapNotNull { it.toEvent() } val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "🚀 Dispatching ${events.size} events for $ref" } log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher) dispatchQueue.dispatch(ref, events, dispatcher)
// Oppdater watermark for denne refen // Update watermark for this reference
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt } val maxEvent = newForRef.maxWith(
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1) compareBy({ it.persistedAt }, { it.id })
)
refWatermark[ref] = newWatermark val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
val newWatermarkId = maxEvent.id
refWatermark[ref] = newWatermarkAt to newWatermarkId
anyProcessed = true anyProcessed = true
log.debug { "⏩ Updated watermark for $ref$newWatermark" } log.debug { "⏩ Updated watermark for $ref($newWatermarkAt, id=$newWatermarkId)" }
} }
// Oppdater global scan hint uansett vi har sett nye events // Update global scan hint
// Dette hindrer livelock når alle events er <= watermark for sine refs
val newLastSeen = maxOf( val newLastSeen = maxOf(
lastSeenTime, lastSeenTime,
maxPersistedThisRound.plusNanos(1) maxPersistedThisRound.plusNanos(1)
) )
if (anyProcessed) { if (anyProcessed) {
// Behold intensjonen din: globalt hint basert på laveste watermark, val minRef = refWatermark.values.minOfOrNull { it.first }
// men aldri gå bakover i tid ift lastSeenTime lastSeenTime = when (minRef) {
val minRefWatermark = refWatermark.values.minOrNull()
lastSeenTime = when (minRefWatermark) {
null -> newLastSeen null -> newLastSeen
else -> maxOf(newLastSeen, minRefWatermark) else -> maxOf(newLastSeen, minRef)
} }
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" } log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
} else { } else {
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
// → trygt å flytte lastSeenTime forbi dem
lastSeenTime = newLastSeen lastSeenTime = newLastSeen
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" } log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
} }
} }
} }

View File

@ -4,7 +4,7 @@ import java.util.UUID
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
abstract class Event { abstract class Event {
var referenceId: UUID = UUID.randomUUID() lateinit var referenceId: UUID
protected set protected set
var eventId: UUID = UUID.randomUUID() var eventId: UUID = UUID.randomUUID()
private set private set
@ -39,9 +39,10 @@ inline fun <reified T> Event.requireAs(): T {
return this as? T ?: throw IllegalArgumentException("Expected ${T::class.java.name}, got ${this::class.java.name}") return this as? T ?: throw IllegalArgumentException("Expected ${T::class.java.name}, got ${this::class.java.name}")
} }
abstract class DeleteEvent: Event() { abstract class DeleteEvent(
open lateinit var deletedEventId: UUID open val deletedEventId: UUID
} ) : Event()
abstract class SignalEvent(): Event()

View File

@ -21,5 +21,6 @@ enum class TaskStatus {
Pending, Pending,
InProgress, InProgress,
Completed, Completed,
Failed Failed,
Cancelled
} }

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.jetbrains.annotations.VisibleForTesting import org.jetbrains.annotations.VisibleForTesting
import java.util.UUID import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.cancellation.CancellationException
@ -67,9 +68,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch { currentJob = getDispatcherForTask(task).launch {
try { try {
val result = onTask(task) val result = onTask(task)
reporter.markCompleted(task.taskId)
onComplete(task, result) onComplete(task, result)
} catch (e: CancellationException) { } catch (e: CancellationException) {
// Dette er en ekte kansellering // Dette er en ekte kansellering
onCancelled(task) onCancelled(task)
@ -88,14 +87,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
this@TaskListener.reporter = null this@TaskListener.reporter = null
} }
} }
return true return true
} }
abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event
override fun onError(task: Task, exception: Exception) { override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}") reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace() exception.printStackTrace()
reporter?.markCompleted(task.taskId) reporter?.markFailed(task.referenceId, task.taskId)
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception))
} }
override fun onComplete(task: Task, result: Event?) { override fun onComplete(task: Task, result: Event?) {
@ -107,10 +108,11 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
} }
override fun onCancelled(task: Task) { override fun onCancelled(task: Task) {
reporter!!.markCancelled(task.taskId) reporter!!.markCancelled(task.referenceId, task.taskId)
currentJob?.cancel() currentJob?.cancel()
heartbeatRunner?.cancel() heartbeatRunner?.cancel()
currentTask = null currentTask = null
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled))
} }
} }
@ -134,8 +136,8 @@ interface TaskReporter {
fun markClaimed(taskId: UUID, workerId: String) fun markClaimed(taskId: UUID, workerId: String)
fun updateLastSeen(taskId: UUID) fun updateLastSeen(taskId: UUID)
fun markCompleted(taskId: UUID) fun markCompleted(taskId: UUID)
fun markFailed(taskId: UUID) fun markFailed(referenceId: UUID, taskId: UUID)
fun markCancelled(taskId: UUID) fun markCancelled(referenceId: UUID, taskId: UUID)
fun updateProgress(taskId: UUID, progress: Int) fun updateProgress(taskId: UUID, progress: Int)
fun log(taskId: UUID, message: String) fun log(taskId: UUID, message: String)
fun publishEvent(event: Event) fun publishEvent(event: Event)

View File

@ -8,7 +8,7 @@ import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
@ -20,40 +20,51 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
@DisplayName(
"""
EventDispatcher
Når hendelser dispatches til lyttere
Hvis hendelsene inneholder avledede, slettede eller nye events
skal dispatcheren håndtere filtrering, replays og historikk korrekt
"""
)
class EventDispatcherTest : TestBase() { class EventDispatcherTest : TestBase() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
class DerivedEvent(): Event() class DerivedEvent : Event()
class TriggerEvent(): Event() { class TriggerEvent : Event()
class OtherEvent : Event()
class DummyEvent : Event() {
} }
class OtherEvent(): Event()
class DummyEvent(): Event() {
fun putMetadata(metadata: Metadata) {
this.metadata = metadata
}
}
@BeforeEach @BeforeEach
fun setup() { fun setup() {
EventTypeRegistry.wipe() EventTypeRegistry.wipe()
EventListenerRegistry.wipe() EventListenerRegistry.wipe()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf( EventTypeRegistry.register(
listOf(
DerivedEvent::class.java, DerivedEvent::class.java,
TriggerEvent::class.java, TriggerEvent::class.java,
OtherEvent::class.java, OtherEvent::class.java,
DummyEvent::class.java DummyEvent::class.java
)) )
)
} }
@Test @Test
fun `should produce one event and stop`() { @DisplayName(
val listener = ProducingListener() """
Når en TriggerEvent dispatches
Hvis en lytter produserer én DerivedEvent
skal kun én ny event produseres og prosessen stoppe
"""
)
fun shouldProduceOneEventAndStop() {
ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val produced = eventStore.all().firstOrNull() val produced = eventStore.all().firstOrNull()
@ -61,52 +72,85 @@ class EventDispatcherTest: TestBase() {
val event = produced!!.toEvent() val event = produced!!.toEvent()
assertThat(event!!.metadata.derivedFromId).hasSize(1) assertThat(event!!.metadata.derivedFromId).hasSize(1)
assertThat(event!!.metadata.derivedFromId).contains(trigger.eventId) assertThat(event.metadata.derivedFromId).contains(trigger.eventId)
assertTrue(event is DerivedEvent) assertTrue(event is DerivedEvent)
} }
@Test @Test
fun `should skip already derived events`() { @DisplayName(
val listener = ProducingListener() """
Når en event allerede har avledet en DerivedEvent
Hvis dispatcheren replays historikken
skal ikke DerivedEvent produseres nytt
"""
)
fun shouldSkipAlreadyDerivedEvents() {
ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow()) val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
eventStore.persist(derived!!.toEvent()!!) // simulate prior production eventStore.persist(derived!!.toEvent()!!) // simulate prior production
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived!!.toEvent()!!)) dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent()!!))
assertEquals(1, eventStore.all().size) // no new event produced assertEquals(1, eventStore.all().size)
} }
@Test @Test
fun `should pass full context to listener`() { @DisplayName(
"""
Når flere events dispatches
Hvis en lytter mottar en event
skal hele historikken leveres i context
"""
)
fun shouldPassFullContextToListener() {
val listener = ContextCapturingListener() val listener = ContextCapturingListener()
val e1 = TriggerEvent() val e1 = TriggerEvent().newReferenceId()
val e2 = OtherEvent() val e2 = OtherEvent().newReferenceId()
dispatcher.dispatch(e1.referenceId, listOf(e1, e2)) dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
assertEquals(2, listener.context.size) assertEquals(2, listener.context.size)
} }
@Test @Test
fun `should behave deterministically across replays`() { @DisplayName(
val listener = ProducingListener() """
Når en replay skjer
Hvis en event allerede har produsert en DerivedEvent
skal ikke DerivedEvent produseres nytt
"""
)
fun shouldBehaveDeterministicallyAcrossReplays() {
val referenceId = UUID.randomUUID()
val trigger = TriggerEvent() ProducingListener()
val trigger = TriggerEvent().usingReferenceId(referenceId)
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() } val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
dispatcher.dispatch(trigger.referenceId, replayContext) dispatcher.dispatch(trigger.referenceId, replayContext)
assertEquals(1, eventStore.all().size) // no duplicate assertEquals(1, eventStore.all().size)
} }
@Test @Test
fun `should not deliver deleted events as candidates`() { @DisplayName(
"""
Når en DeleteEvent peker en tidligere event
Hvis dispatcheren filtrerer kandidater
skal slettede events ikke leveres som kandidater
"""
)
fun shouldNotDeliverDeletedEventsAsCandidates() {
val referenceId = UUID.randomUUID()
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
object : EventListener() { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
received += event received += event
@ -114,12 +158,10 @@ class EventDispatcherTest: TestBase() {
} }
} }
// Original hendelse // Original hendelse
val original = TriggerEvent() val original = TriggerEvent().usingReferenceId(referenceId)
// Slettehendelse som peker på original // Slettehendelse som peker på original
val deleted = object : DeleteEvent() { val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
override var deletedEventId = original.eventId
}
// Dispatch med begge hendelser // Dispatch med begge hendelser
dispatcher.dispatch(original.referenceId, listOf(original, deleted)) dispatcher.dispatch(original.referenceId, listOf(original, deleted))
@ -136,29 +178,42 @@ class EventDispatcherTest: TestBase() {
} }
@Test @Test
fun `should deliver DeleteEvent to listeners that react to it`() { @DisplayName(
"""
Når en DeleteEvent dispatches alene
Hvis en lytter reagerer DeleteEvent
skal DeleteEvent leveres som kandidat
"""
)
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
val listener = object : EventListener() { val referenceId = UUID.randomUUID()
override fun onEvent(event: Event, context: List<Event>): Event? {
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
if (event is DeleteEvent) received += event if (event is DeleteEvent) received += event
return null return null
} }
} }
val deleted = object : DeleteEvent() { val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) }
override var deletedEventId = UUID.randomUUID()
}
dispatcher.dispatch(deleted.referenceId, listOf(deleted)) dispatcher.dispatch(deleted.referenceId, listOf(deleted))
assertTrue(received.contains(deleted)) assertTrue(received.contains(deleted))
} }
@Test @Test
@DisplayName("Replay skal ikke levere en event som allerede har avledet en ny") @DisplayName(
fun `should not re-deliver events that have produced derived events`() { """
val listener = ProducingListener() Når en event har avledet en ny event
Hvis dispatcheren replays historikken
skal ikke original-eventen leveres som kandidat igjen
"""
)
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
// Første dispatch: trigger produserer en DerivedEvent // Første dispatch: trigger produserer en DerivedEvent
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
@ -177,20 +232,157 @@ class EventDispatcherTest: TestBase() {
} }
} }
@Test
@DisplayName(
"""
Når en DeleteEvent slettet en tidligere event
Hvis dispatcheren bygger historikk
skal slettede events ikke være med i history
"""
)
fun historyShouldExcludeDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) }
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
}
}
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
assertFalse(receivedHistory.contains(original))
assertFalse(receivedHistory.contains(deleted))
}
@Test
@DisplayName(
"""
Når en DeleteEvent slettet en event
Hvis andre events fortsatt er gyldige
skal history kun inneholde de ikke-slettede events
"""
)
fun historyShouldKeepNonDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val referenceId = UUID.randomUUID()
val e1 = TriggerEvent().usingReferenceId(referenceId)
val e2 = OtherEvent().usingReferenceId(referenceId)
val deleted = object : DeleteEvent(e1.eventId) {}
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
}
}
dispatcher.dispatch(e1.referenceId, listOf(e1, e2, deleted))
assertTrue(receivedHistory.contains(e2))
assertFalse(receivedHistory.contains(e1))
assertFalse(receivedHistory.contains(deleted))
}
@Test
@DisplayName(
"""
Når en DeleteEvent er kandidat
Hvis historikken kun inneholder slettede events
skal history være tom
"""
)
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
var receivedEvent: Event? = null
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedEvent = event
receivedHistory = history
return null
}
}
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
assertTrue(receivedEvent is DeleteEvent)
assertTrue(receivedHistory.isEmpty())
}
@Test
@DisplayName(
"""
Når en SignalEvent dispatches
Hvis SignalEvent ikke skal være kandidat
skal den ikke leveres til lyttere, men fortsatt være i historikken
"""
)
fun shouldNotDeliverSignalEventAsCandidate() {
// Arrange
class TestSignalEvent : SignalEvent()
EventTypeRegistry.register(listOf(TestSignalEvent::class.java,))
val received = mutableListOf<Event>()
var finalHistory: List<Event>? = null
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
received += event
finalHistory = history
return null
}
}
val refId = UUID.randomUUID()
val trigger = TriggerEvent().usingReferenceId(refId)
val signal = TestSignalEvent().usingReferenceId(refId)
// Act
dispatcher.dispatch(trigger.referenceId, listOf(trigger, signal))
// Assert
// 1) TriggerEvent skal leveres
assertTrue(received.any { it is TriggerEvent }) {
"TriggerEvent skal leveres som kandidat"
}
// 2) SignalEvent skal IKKE leveres
assertFalse(received.any { it is TestSignalEvent }) {
"SignalEvent skal ikke leveres som kandidat"
}
assertNotNull(finalHistory)
assertTrue(finalHistory!!.any { it is TestSignalEvent }) {
"SignalEvent skal være i historikken selv om den ikke er kandidat"
}
}
// --- Test helpers --- // --- Test helpers ---
class ProducingListener : EventListener() { class ProducingListener : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
return if (event is TriggerEvent) DerivedEvent().derivedOf(event) else null return if (event is TriggerEvent) DerivedEvent().derivedOf(event) else null
} }
} }
class ContextCapturingListener : EventListener() { class ContextCapturingListener : EventListener() {
var context: List<Event> = emptyList() var context: List<Event> = emptyList()
override fun onEvent(event: Event, context: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
this.context = context this.context = history
return null return null
} }
} }

View File

@ -13,28 +13,38 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@DisplayName("""
ZDS Serialization/Deserialization System
Når Event- og Task-objekter persisteres og gjenopprettes
Hvis type-registrene er korrekt konfigurert
skal ZDS kunne serialisere og deserialisere objektene uten tap av data
""")
class ZDSTest { class ZDSTest {
@BeforeEach @BeforeEach
fun setup() { fun setup() {
EventTypeRegistry.wipe() EventTypeRegistry.wipe()
TaskTypeRegistry.wipe() TaskTypeRegistry.wipe()
// Verifiser at det er tomt // Verifiser at det er tomt
assertNull(EventTypeRegistry.resolve("SomeEvent")) assertNull(EventTypeRegistry.resolve("SomeEvent"))
} }
@Test @Test
@DisplayName("Test ZDS with Event object") @DisplayName("""
Når et Event-objekt persisteres via ZDS
Hvis typen er registrert i EventTypeRegistry
skal det kunne gjenopprettes som riktig Event-type med samme data
""")
fun scenario1() { fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java) EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello") val echo = EchoEvent("hello").newReferenceId()
val persisted = echo.toPersisted(id = 1L) val persisted = echo.toPersisted(id = 1L)
val restored = persisted!!.toEvent() val restored = persisted!!.toEvent()
assert(restored is EchoEvent) assert(restored is EchoEvent)
assert((restored as EchoEvent).data == "hello") assert((restored as EchoEvent).data == "hello")
} }
data class TestTask( data class TestTask(
@ -42,9 +52,12 @@ class ZDSTest {
) : Task() ) : Task()
@Test @Test
@DisplayName("Test ZDS with Task object") @DisplayName("""
Når et Task-objekt persisteres via ZDS
Hvis typen er registrert i TaskTypeRegistry
skal det kunne gjenopprettes som riktig Task-type med metadata intakt
""")
fun scenario2() { fun scenario2() {
TaskTypeRegistry.register(TestTask::class.java) TaskTypeRegistry.register(TestTask::class.java)
val task = TestTask("Potato") val task = TestTask("Potato")
@ -57,7 +70,5 @@ class ZDSTest {
assert((restored as TestTask).data == "Potato") assert((restored as TestTask).data == "Potato")
assert(restored.metadata.created == task.metadata.created) assert(restored.metadata.created == task.metadata.created)
assert(restored.metadata.derivedFromId == task.metadata.derivedFromId) assert(restored.metadata.derivedFromId == task.metadata.derivedFromId)
} }
} }

View File

@ -6,35 +6,34 @@ import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@DisplayName("""
EventListenerRegistry
Når lyttere registreres med og uten @ListenerOrder
Hvis registry sorterer dem etter annotasjonen
skal rekkefølgen være deterministisk og korrekt
""")
class EventListenerRegistryTest { class EventListenerRegistryTest {
@ListenerOrder(1) @ListenerOrder(1)
class MockTest1() : EventListener() { class MockTest1 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? = null
return null
}
} }
@ListenerOrder(2) @ListenerOrder(2)
class MockTest2() : EventListener() { class MockTest2 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? = null
return null
}
} }
@ListenerOrder(3) @ListenerOrder(3)
class MockTest3() : EventListener() { class MockTest3 : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? = null
return null
}
} }
class MockTestRandom() : EventListener() { class MockTestRandom : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? = null
return null
}
} }
@BeforeEach @BeforeEach
@ -43,13 +42,19 @@ class EventListenerRegistryTest {
} }
@Test @Test
@DisplayName("""
Når flere lyttere registreres i vilkårlig rekkefølge
Hvis noen har @ListenerOrder og andre ikke
skal registry returnere dem sortert etter order, og usorterte sist
""")
fun validateOrder() { fun validateOrder() {
MockTestRandom() MockTestRandom()
MockTest1() MockTest1()
MockTest2() MockTest2()
MockTest3() MockTest3()
val listeners = EventListenerRegistry.getListeners() val listeners = EventListenerRegistry.getListeners()
// Assert
assertThat(listeners.map { it::class.simpleName }).containsExactly( assertThat(listeners.map { it::class.simpleName }).containsExactly(
MockTest1::class.simpleName, // @ListenerOrder(1) MockTest1::class.simpleName, // @ListenerOrder(1)
MockTest2::class.simpleName, // @ListenerOrder(2) MockTest2::class.simpleName, // @ListenerOrder(2)
@ -57,5 +62,4 @@ class EventListenerRegistryTest {
MockTestRandom::class.simpleName // no annotation → goes last MockTestRandom::class.simpleName // no annotation → goes last
) )
} }
} }

View File

@ -1,80 +1,107 @@
package no.iktdev.eventi.events package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitAll import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeout
import no.iktdev.eventi.EventDispatcherTest import kotlinx.coroutines.awaitAll
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.MyTime import no.iktdev.eventi.MyTime
import no.iktdev.eventi.TestBase import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.time.Duration import java.time.Duration
import java.util.UUID import java.util.UUID
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@DisplayName("""
EventPollerImplementation
Når polleren leser nye events fra EventStore og samarbeider med SequenceDispatchQueue
Hvis nye events ankommer, køen er travel, eller duplikater dukker opp
skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
""")
class EventPollerImplementationTest : TestBase() { class EventPollerImplementationTest : TestBase() {
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} private val dispatcher = EventDispatcher(eventStore)
@BeforeEach @BeforeEach
fun setup() { fun setup() {
EventTypeRegistry.wipe() EventTypeRegistry.wipe()
EventListenerRegistry.wipe() EventListenerRegistry.wipe()
eventStore.clear() eventStore.clear()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf( EventTypeRegistry.register(
listOf(
DerivedEvent::class.java, DerivedEvent::class.java,
TriggerEvent::class.java, TriggerEvent::class.java,
OtherEvent::class.java OtherEvent::class.java
)) )
)
} }
@Test @Test
fun `pollOnce should dispatch all new referenceIds and update lastSeenTime`() = runTest { @DisplayName("""
Når polleren finner nye referenceId-er med events
Hvis pollOnce kjøres
skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
""")
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val dispatched = ConcurrentHashMap.newKeySet<UUID>() val dispatched = ConcurrentHashMap.newKeySet<UUID>()
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>() val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
EventListenerRegistry.registerListener(object : EventListener() { EventListenerRegistry.registerListener(
override fun onEvent(event: Event, context: List<Event>): Event? { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
dispatched += event.referenceId dispatched += event.referenceId
completionMap[event.referenceId]?.complete(Unit) completionMap[event.referenceId]?.complete(Unit)
return null return null
} }
}) }
)
val referenceIds = (1..10).map { UUID.randomUUID() } val referenceIds = (1..10).map { UUID.randomUUID() }
referenceIds.forEach { refId -> referenceIds.forEach { refId ->
val e = EventDispatcherTest.TriggerEvent().usingReferenceId(refId) val e = TriggerEvent().usingReferenceId(refId)
eventStore.persist(e) // persistedAt settes automatisk her eventStore.persist(e)
completionMap[refId] = CompletableDeferred() completionMap[refId] = CompletableDeferred()
} }
poller.pollOnce() poller.pollOnce()
completionMap.values.awaitAll() completionMap.values.awaitAll()
assertEquals(referenceIds.toSet(), dispatched) assertEquals(referenceIds.toSet(), dispatched)
} }
@Test @Test
fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest { @DisplayName("""
Når polleren ikke finner nye events
Hvis pollOnce kjøres flere ganger
skal backoff øke, og resettes når nye events ankommer
""")
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff fun currentBackoff(): Duration = backoff
} }
@ -97,20 +124,28 @@ class EventPollerImplementationTest : TestBase() {
} }
@Test @Test
fun `pollOnce should group and dispatch exactly 3 events for one referenceId`() = runTest { @DisplayName("""
Når flere events med samme referenceId ligger i EventStore
Hvis pollOnce kjøres
skal polleren gruppere og dispatch'e alle tre i én batch
""")
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val refId = UUID.randomUUID() val refId = UUID.randomUUID()
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
val done = CompletableDeferred<Unit>() val done = CompletableDeferred<Unit>()
// Wipe alt før test
EventTypeRegistry.wipe() EventTypeRegistry.wipe()
EventListenerRegistry.wipe() EventListenerRegistry.wipe()
eventStore.clear() // sørg for at InMemoryEventStore støtter dette eventStore.clear()
EventTypeRegistry.register(listOf(TriggerEvent::class.java)) EventTypeRegistry.register(listOf(TriggerEvent::class.java))
object : EventListener() { object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
received += event received += event
if (received.size == 3) done.complete(Unit) if (received.size == 3) done.complete(Unit)
return null return null
@ -122,18 +157,21 @@ class EventPollerImplementationTest : TestBase() {
} }
poller.pollOnce() poller.pollOnce()
done.await() done.await()
assertEquals(3, received.size) assertEquals(3, received.size)
assertTrue(received.all { it.referenceId == refId }) assertTrue(received.all { it.referenceId == refId })
} }
@Test @Test
fun `pollOnce should ignore events before lastSeenTime`() = runTest { @DisplayName("""
val refId = UUID.randomUUID() Når polleren har en lastSeenTime i fremtiden
val ignored = TriggerEvent().usingReferenceId(refId) Hvis events ankommer med eldre timestamp
skal polleren ignorere dem
""")
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
init { init {
@ -141,8 +179,10 @@ class EventPollerImplementationTest : TestBase() {
} }
} }
eventStore.persist(ignored) val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
eventStore.persist(ignored)
testPoller.pollOnce() testPoller.pollOnce()
assertFalse(queue.isProcessing(refId)) assertFalse(queue.isProcessing(refId))
@ -150,61 +190,53 @@ class EventPollerImplementationTest : TestBase() {
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
@Test @Test
fun `poller handles manually injected duplicate event`() = runTest { @DisplayName("""
Når en duplikat-event injiseres manuelt i EventStore
Hvis polleren kjører igjen
skal begge events prosesseres, men uten å produsere duplikate derived events
""")
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java)) EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
val channel = Channel<Event>(Channel.UNLIMITED) val channel = Channel<Event>(Channel.UNLIMITED)
val handled = mutableListOf<Event>() val handled = mutableListOf<Event>()
// Setup
object : EventListener() { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? { if (event !is EchoEvent) return null
if (event !is EchoEvent)
return null
handled += event handled += event
channel.trySend(event) channel.trySend(event)
return MarcoEvent(true).derivedOf(event) return MarcoEvent(true).derivedOf(event)
} }
} }
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) { val original = EchoEvent("Hello").newReferenceId()
}
// Original event
val original = EchoEvent(data = "Hello")
eventStore.persist(original) eventStore.persist(original)
// Act
poller.pollOnce() poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) { withContext(testDispatcher) {
repeat(1) { channel.receive() } withTimeout(60_000) {
channel.receive()
} }
} }
// Manual replay with new eventId, same referenceId
val duplicateEvent = EchoEvent("Test me").usingReferenceId(original.referenceId) val duplicateEvent = EchoEvent("Test me").usingReferenceId(original.referenceId)
eventStore.persist(duplicateEvent) eventStore.persist(duplicateEvent)
// Act
poller.pollOnce() poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) { withContext(testDispatcher) {
withTimeout(Duration.ofMinutes(1).toMillis()) { withTimeout(60_000) {
repeat(1) { channel.receive() } channel.receive()
} }
} }
// Assert
assertEquals(2, handled.size) assertEquals(2, handled.size)
assertTrue(handled.any { it.eventId == original.eventId }) assertTrue(handled.any { it.eventId == original.eventId })
} }
} }

View File

@ -1,7 +1,7 @@
@file:OptIn(ExperimentalCoroutinesApi::class)
package no.iktdev.eventi.events package no.iktdev.eventi.events
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.test.* import kotlinx.coroutines.test.*
import no.iktdev.eventi.InMemoryEventStore import no.iktdev.eventi.InMemoryEventStore
@ -9,11 +9,9 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
import kotlinx.coroutines.*
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata import no.iktdev.eventi.models.Metadata
import org.junit.jupiter.api.DisplayName
import java.time.Instant import java.time.Instant
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@ -60,13 +58,12 @@ class TestEvent : Event() {
} }
class FakeClock(var now: Instant) { @DisplayName("""
fun advanceSeconds(sec: Long) { EventPollerImplementation simulert og dispatch
now = MyTime.utcNow().plusSeconds(sec) Når polleren leser events fra EventStore og samarbeider med SequenceDispatchQueue
} Hvis køen er ledig, travel, eller events ankommer i ulike tidsrekkefølger
} skal polleren oppdatere lastSeenTime, unngå duplikater og prosessere riktig
""")
class RunSimulationTestTest { class RunSimulationTestTest {
private lateinit var store: InMemoryEventStore private lateinit var store: InMemoryEventStore
@ -89,17 +86,21 @@ class RunSimulationTestTest {
} }
} }
private fun persistEvent(ref: UUID, time: Instant) { private fun persistEvent(ref: UUID) {
val e = TestEvent().withReference(ref) val e = TestEvent().withReference(ref)
store.persist(e.setMetadata(Metadata())) store.persist(e.setMetadata(Metadata()))
} }
@Test @Test
fun `poller updates lastSeenTime when dispatch happens`() = runTest(testDispatcher) { @DisplayName("""
Når polleren finner nye events
Hvis dispatch skjer normalt
skal lastSeenTime oppdateres og dispatcheren én dispatch
""")
fun pollerUpdatesLastSeenTimeWhenDispatchHappens() = runTest(testDispatcher) {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref, t) persistEvent(ref)
poller.pollOnce() poller.pollOnce()
advanceUntilIdle() advanceUntilIdle()
@ -108,21 +109,24 @@ class RunSimulationTestTest {
assertThat(dispatcher.dispatched).hasSize(1) assertThat(dispatcher.dispatched).hasSize(1)
} }
class AlwaysBusyDispatchQueue : SequenceDispatchQueue(8, CoroutineScope(Dispatchers.Default)) { class AlwaysBusyDispatchQueue : SequenceDispatchQueue(8, CoroutineScope(Dispatchers.Default)) {
override fun isProcessing(referenceId: UUID): Boolean = true override fun isProcessing(referenceId: UUID): Boolean = true
override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher) = null override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher) = null
} }
@Test @Test
fun `poller DOES update lastSeenTime even when queue is busy`() = runTest { @DisplayName("""
Når køen er travel og ikke kan dispatch'e
Hvis polleren likevel ser nye events
skal lastSeenTime fortsatt oppdateres (livelock-fix)
""")
fun pollerUpdatesLastSeenTimeEvenWhenQueueBusy() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z") val t = Instant.parse("2026-01-22T12:00:00Z")
store.persistAt(TestEvent().withReference(ref), t) store.persistAt(TestEvent().withReference(ref), t)
val busyQueue = AlwaysBusyDispatchQueue() val busyQueue = AlwaysBusyDispatchQueue()
val poller = object : EventPollerImplementation(store, busyQueue, dispatcher) {} val poller = object : EventPollerImplementation(store, busyQueue, dispatcher) {}
poller.pollOnce() poller.pollOnce()
@ -133,15 +137,16 @@ class RunSimulationTestTest {
.isGreaterThan(t) .isGreaterThan(t)
} }
@Test @Test
fun `poller does not double-dispatch`() = runTest(testDispatcher) { @DisplayName("""
Når polleren kjører flere ganger uten nye events
Hvis første poll allerede dispatch'et eventet
skal polleren ikke dispatch'e samme event to ganger
""")
fun pollerDoesNotDoubleDispatch() = runTest(testDispatcher) {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref, t) persistEvent(ref)
poller.pollOnce() poller.pollOnce()
advanceUntilIdle() advanceUntilIdle()
@ -153,13 +158,17 @@ class RunSimulationTestTest {
} }
@Test @Test
fun `poller handles multiple referenceIds`() = runTest(testDispatcher) { @DisplayName("""
Når flere referenceId-er har nye events
Hvis polleren kjører én runde
skal begge referenceId-er dispatch'es
""")
fun pollerHandlesMultipleReferenceIds() = runTest(testDispatcher) {
val refA = UUID.randomUUID() val refA = UUID.randomUUID()
val refB = UUID.randomUUID() val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA, t) persistEvent(refA)
persistEvent(refB, t.plusSeconds(1)) persistEvent(refB)
poller.pollOnce() poller.pollOnce()
advanceUntilIdle() advanceUntilIdle()
@ -168,13 +177,17 @@ class RunSimulationTestTest {
} }
@Test @Test
fun `poller handles identical timestamps`() = runTest(testDispatcher) { @DisplayName("""
Når to events har identisk timestamp
Hvis polleren leser dem i samme poll
skal begge referenceId-er dispatch'es
""")
fun pollerHandlesIdenticalTimestamps() = runTest(testDispatcher) {
val refA = UUID.randomUUID() val refA = UUID.randomUUID()
val refB = UUID.randomUUID() val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA, t) persistEvent(refA)
persistEvent(refB, t) persistEvent(refB)
poller.pollOnce() poller.pollOnce()
advanceUntilIdle() advanceUntilIdle()
@ -183,7 +196,12 @@ class RunSimulationTestTest {
} }
@Test @Test
fun `poller backs off when no new events`() = runTest(testDispatcher) { @DisplayName("""
Når polleren ikke finner nye events
Hvis pollOnce kjøres
skal backoff økes
""")
fun pollerBacksOffWhenNoNewEvents() = runTest(testDispatcher) {
val before = poller.backoff val before = poller.backoff
poller.pollOnce() poller.pollOnce()
@ -208,15 +226,16 @@ class RunSimulationTestTest {
} }
} }
@Test @Test
fun `poller processes events arriving while queue is busy`() = runTest(testDispatcher) { @DisplayName("""
Når køen er travel for en referenceId
Hvis nye events ankommer mens køen er travel
skal polleren prosessere alle events når køen blir ledig
""")
fun pollerProcessesEventsArrivingWhileQueueBusy() = runTest(testDispatcher) {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
val t1 = Instant.parse("2026-01-22T12:00:00Z")
val t2 = t1.plusSeconds(5)
persistEvent(ref, t1) persistEvent(ref)
val controlledQueue = ControlledDispatchQueue(scope) val controlledQueue = ControlledDispatchQueue(scope)
controlledQueue.busyRefs += ref controlledQueue.busyRefs += ref
@ -233,7 +252,7 @@ class RunSimulationTestTest {
controlledQueue.busyRefs.clear() controlledQueue.busyRefs.clear()
// Add new event // Add new event
persistEvent(ref, t2) persistEvent(ref)
// Poll #2: should dispatch both events // Poll #2: should dispatch both events
poller.pollOnce() poller.pollOnce()
@ -242,6 +261,4 @@ class RunSimulationTestTest {
assertThat(dispatcher.dispatched).hasSize(1) assertThat(dispatcher.dispatched).hasSize(1)
assertThat(dispatcher.dispatched.single().second).hasSize(2) assertThat(dispatcher.dispatched.single().second).hasSize(2)
} }
} }

View File

@ -10,40 +10,54 @@ import no.iktdev.eventi.models.Event
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
@DisplayName("""
SequenceDispatchQueue
Når mange referenceId-er skal dispatches parallelt
Hvis køen har begrenset samtidighet
skal alle events prosesseres uten tap
""")
class SequenceDispatchQueueTest : TestBase() { class SequenceDispatchQueueTest : TestBase() {
@BeforeEach @BeforeEach
fun setup() { fun setup() {
EventTypeRegistry.wipe() EventTypeRegistry.wipe()
EventListenerRegistry.wipe() EventListenerRegistry.wipe()
// Verifiser at det er tomt
EventTypeRegistry.register(listOf( EventTypeRegistry.register(
listOf(
DerivedEvent::class.java, DerivedEvent::class.java,
TriggerEvent::class.java, TriggerEvent::class.java,
OtherEvent::class.java OtherEvent::class.java
)) )
)
} }
@Test @Test
fun `should dispatch all referenceIds with limited concurrency`() = runTest { @DisplayName("""
Når 100 forskjellige referenceId-er dispatches
Hvis køen har en maks samtidighet 8
skal alle referenceId-er bli prosessert nøyaktig én gang
""")
fun shouldDispatchAllReferenceIdsWithLimitedConcurrency() = runTest {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8) val queue = SequenceDispatchQueue(maxConcurrency = 8)
val dispatched = ConcurrentHashMap.newKeySet<UUID>() val dispatched = ConcurrentHashMap.newKeySet<UUID>()
EventListenerRegistry.registerListener(object : EventListener() { EventListenerRegistry.registerListener(
override fun onEvent(event: Event, context: List<Event>): Event? { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
dispatched += event.referenceId dispatched += event.referenceId
Thread.sleep(50) // simuler tung prosessering Thread.sleep(50) // simuler tung prosessering
return null return null
} }
}) }
)
val referenceIds = (1..100).map { UUID.randomUUID() } val referenceIds = (1..100).map { UUID.randomUUID() }
@ -57,6 +71,4 @@ class SequenceDispatchQueueTest: TestBase() {
assertEquals(100, dispatched.size) assertEquals(100, dispatched.size)
} }
} }

View File

@ -1,5 +1,6 @@
package no.iktdev.eventi.events.poller package no.iktdev.eventi.events.poller
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.* import kotlinx.coroutines.test.*
import no.iktdev.eventi.InMemoryEventStore import no.iktdev.eventi.InMemoryEventStore
import no.iktdev.eventi.MyTime import no.iktdev.eventi.MyTime
@ -20,8 +21,16 @@ import org.junit.jupiter.api.Test
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import java.time.Duration
@ExperimentalCoroutinesApi
@DisplayName("""
EventPollerImplementation start-loop
Når polleren kjører i en kontrollert test-loop
Hvis events ankommer, refs er busy eller watermark flytter seg
skal polleren håndtere backoff, dispatch og livelock korrekt
""")
class PollerStartLoopTest : TestBase() { class PollerStartLoopTest : TestBase() {
private lateinit var store: InMemoryEventStore private lateinit var store: InMemoryEventStore
@ -34,7 +43,6 @@ class PollerStartLoopTest: TestBase() {
private fun t(seconds: Long): Instant = private fun t(seconds: Long): Instant =
Instant.parse("2024-01-01T12:00:00Z").plusSeconds(seconds) Instant.parse("2024-01-01T12:00:00Z").plusSeconds(seconds)
@BeforeEach @BeforeEach
fun setup() { fun setup() {
store = InMemoryEventStore() store = InMemoryEventStore()
@ -52,7 +60,48 @@ class PollerStartLoopTest: TestBase() {
store.persistAt(e, time) store.persistAt(e, time)
} }
@Test @Test
@DisplayName("""
Når to events har identisk persistedAt
Hvis polleren kjører
skal begge events prosesseres og ingen mistes
""")
fun `poller handles same-timestamp events without losing any`() = runTest {
val ref = UUID.randomUUID()
val ts = Instant.parse("2025-01-01T12:00:00Z")
// Two events with same timestamp but different IDs
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e1, ts) // id=1
store.persistAt(e2, ts) // id=2
poller.startFor(iterations = 1)
// Verify dispatch happened
assertThat(dispatcher.dispatched).hasSize(1)
val (_, events) = dispatcher.dispatched.single()
// Both events must be present
assertThat(events.map { it.eventId })
.hasSize(2)
.doesNotHaveDuplicates()
// Watermark must reflect highest ID
val wm = poller.watermarkFor(ref)
assertThat(wm!!.first).isEqualTo(ts)
assertThat(wm.second).isEqualTo(2)
}
@Test
@DisplayName("""
Når polleren kjører flere iterasjoner uten events
Hvis start-loop ikke finner noe å gjøre
skal backoff øke og ingen dispatch skje
""")
fun `poller does not spin when no events exist`() = runTest { fun `poller does not spin when no events exist`() = runTest {
val startBackoff = poller.backoff val startBackoff = poller.backoff
@ -63,6 +112,11 @@ class PollerStartLoopTest: TestBase() {
} }
@Test @Test
@DisplayName("""
Når polleren gjentatte ganger ikke finner nye events
Hvis start-loop kjøres flere ganger
skal backoff øke eksponentielt
""")
fun `poller increases backoff exponentially`() = runTest { fun `poller increases backoff exponentially`() = runTest {
val b1 = poller.backoff val b1 = poller.backoff
@ -77,19 +131,28 @@ class PollerStartLoopTest: TestBase() {
} }
@Test @Test
@DisplayName("""
Når polleren har økt backoff
Hvis nye events ankommer
skal backoff resettes til startverdi
""")
fun `poller resets backoff when events appear`() = runTest { fun `poller resets backoff when events appear`() = runTest {
poller.startFor(iterations = 5) poller.startFor(iterations = 5)
val before = poller.backoff
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
persistAt(ref, MyTime.utcNow()) persistAt(ref, MyTime.utcNow())
poller.startFor(iterations = 1) poller.startFor(iterations = 1)
assertThat(poller.backoff).isEqualTo(java.time.Duration.ofSeconds(2)) assertThat(poller.backoff).isEqualTo(Duration.ofSeconds(2))
} }
@Test @Test
@DisplayName("""
Når polleren sover (backoff)
Hvis nye events ankommer i mellomtiden
skal polleren prosessere dem i neste iterasjon
""")
fun `poller processes events that arrive while sleeping`() = runTest { fun `poller processes events that arrive while sleeping`() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
@ -103,6 +166,11 @@ class PollerStartLoopTest: TestBase() {
} }
@Test @Test
@DisplayName("""
Når en ref er busy
Hvis events ankommer for den ref'en
skal polleren ikke spinne og ikke miste events
""")
fun `poller does not spin and does not lose events for non-busy refs`() = runTest { fun `poller does not spin and does not lose events for non-busy refs`() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
@ -130,8 +198,12 @@ class PollerStartLoopTest: TestBase() {
.isLessThanOrEqualTo(1) .isLessThanOrEqualTo(1)
} }
@Test @Test
@DisplayName("""
Når polleren har prosessert en ref
Hvis ingen nye events ankommer
skal polleren ikke dispatch'e samme ref igjen
""")
fun `poller does not dispatch when no new events for ref`() = runTest { fun `poller does not dispatch when no new events for ref`() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
@ -149,6 +221,11 @@ class PollerStartLoopTest: TestBase() {
} }
@Test @Test
@DisplayName("""
Når en ref er busy
Hvis nye events ankommer for den ref'en
skal polleren prosessere alle events når ref'en blir ledig
""")
fun `event arriving while ref is busy is not lost`() = runTest { fun `event arriving while ref is busy is not lost`() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
@ -178,8 +255,12 @@ class PollerStartLoopTest: TestBase() {
.doesNotHaveDuplicates() .doesNotHaveDuplicates()
} }
@Test @Test
@DisplayName("""
Når én ref er busy
Hvis andre refs har events
skal polleren fortsatt dispatch'e de andre refs
""")
fun `busy ref does not block dispatch of other refs`() = runTest { fun `busy ref does not block dispatch of other refs`() = runTest {
val refA = UUID.randomUUID() val refA = UUID.randomUUID()
val refB = UUID.randomUUID() val refB = UUID.randomUUID()
@ -199,6 +280,11 @@ class PollerStartLoopTest: TestBase() {
} }
@Test @Test
@DisplayName("""
Når flere refs har events
Hvis én ref er busy
skal watermark kun flyttes for refs som faktisk ble prosessert
""")
fun `watermark advances only for refs that were processed`() = runTest { fun `watermark advances only for refs that were processed`() = runTest {
val refA = UUID.randomUUID() val refA = UUID.randomUUID()
val refB = UUID.randomUUID() val refB = UUID.randomUUID()
@ -209,8 +295,8 @@ class PollerStartLoopTest: TestBase() {
// Første poll: begge refs blir dispatchet // Første poll: begge refs blir dispatchet
poller.startFor(iterations = 1) poller.startFor(iterations = 1)
val wmA1 = poller.watermarkFor(refA!!) val wmA1 = poller.watermarkFor(refA)
val wmB1 = poller.watermarkFor(refB!!) val wmB1 = poller.watermarkFor(refB)
// Marker A som busy // Marker A som busy
queue.busyRefs += refA queue.busyRefs += refA
@ -221,16 +307,22 @@ class PollerStartLoopTest: TestBase() {
poller.startFor(iterations = 1) poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark // A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1) assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark // B skal ha flyttet watermark (på timestamp-nivå)
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1) val wmB2 = poller.watermarkFor(refB)
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
} }
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk") @DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@Test @Test
fun `stress test with many refs random busy states and interleaved events`() = runTest { fun `stress test with many refs random busy states and interleaved events`() = runTest {
// Hele testen beholdes uendret
// (for lang til å gjenta her, men du ba om full fil, så beholdes som-is)
val refs = List(50) { UUID.randomUUID() } val refs = List(50) { UUID.randomUUID() }
val eventCountPerRef = 20 val eventCountPerRef = 20
@ -343,12 +435,17 @@ class PollerStartLoopTest: TestBase() {
@Test @Test
@DisplayName("""
Når EventStore returnerer events som ligger før watermark
Hvis polleren ser dem i global scan
skal polleren ikke livelock'e og lastSeenTime skal flyttes forbi eventen
""")
fun `poller should not livelock when global scan sees events but watermark rejects them`() = runTest { fun `poller should not livelock when global scan sees events but watermark rejects them`() = runTest {
val ref = UUID.randomUUID() val ref = UUID.randomUUID()
// Fake EventStore som alltid returnerer samme event // Fake EventStore som alltid returnerer samme event
val fakeStore = object : EventStore { val fakeStore = object : EventStore {
override fun getPersistedEventsAfter(ts: Instant): List<PersistedEvent> { override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
// Alltid returner én event som ligger før watermark // Alltid returner én event som ligger før watermark
return listOf( return listOf(
PersistedEvent( PersistedEvent(
@ -362,30 +459,21 @@ class PollerStartLoopTest: TestBase() {
) )
} }
override fun getPersistedEventsFor(ref: UUID): List<PersistedEvent> { override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> = emptyList()
return emptyList() // spiller ingen rolle override fun persist(event: Event) = Unit
}
override fun persist(event: Event) {
TODO("Not yet implemented")
}
} }
val queue = SequenceDispatchQueue() val queue = SequenceDispatchQueue()
class NoopDispatcher : EventDispatcher(fakeStore) { class NoopDispatcher : EventDispatcher(fakeStore) {
override fun dispatch(referenceId: UUID, events: List<Event>) { override fun dispatch(referenceId: UUID, events: List<Event>) {}
// Do nothing
} }
}
val dispatcher = NoopDispatcher() val dispatcher = NoopDispatcher()
val poller = TestablePoller(fakeStore, queue, dispatcher, scope) val poller = TestablePoller(fakeStore, queue, dispatcher, scope)
// Sett watermark høyt (polleren setter watermark selv i ekte drift, // Sett watermark høyt (polleren setter watermark selv i ekte drift,
// men i denne testen må vi simulere det) // men i denne testen må vi simulere det)
poller.setWatermarkFor(ref, t(100)) poller.setWatermarkFor(ref, t(100), id = 999)
// Sett lastSeenTime bak eventen // Sett lastSeenTime bak eventen
poller.lastSeenTime = t(0) poller.lastSeenTime = t(0)
@ -404,10 +492,5 @@ class PollerStartLoopTest: TestBase() {
assertThat(after).isEqualTo(before) assertThat(after).isEqualTo(before)
} }
} }

View File

@ -1,14 +1,15 @@
package no.iktdev.eventi.events.poller package no.iktdev.eventi.events.poller
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import no.iktdev.eventi.events.EventDispatcher import no.iktdev.eventi.events.EventDispatcher
import no.iktdev.eventi.events.EventPollerImplementation import no.iktdev.eventi.events.EventPollerImplementation
import no.iktdev.eventi.events.SequenceDispatchQueue import no.iktdev.eventi.events.SequenceDispatchQueue
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.*
@ExperimentalCoroutinesApi
class TestablePoller( class TestablePoller(
eventStore: EventStore, eventStore: EventStore,
dispatchQueue: SequenceDispatchQueue, dispatchQueue: SequenceDispatchQueue,
@ -16,8 +17,6 @@ class TestablePoller(
val scope: TestScope val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView { ) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) { suspend fun startFor(iterations: Int) {
repeat(iterations) { repeat(iterations) {
try { try {
@ -31,19 +30,17 @@ class TestablePoller(
} }
} }
override fun watermarkFor(ref: UUID): Instant? { override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
return refWatermark[ref]?.let { return refWatermark[ref]
return it }
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
refWatermark[ref] = time to id
} }
} }
override fun setWatermarkFor(ref: UUID, time: Instant) {
refWatermark[ref] = time
}
}
interface WatermarkDebugView { interface WatermarkDebugView {
fun watermarkFor(ref: UUID): Instant? fun watermarkFor(ref: UUID): Pair<Instant, Long>?
fun setWatermarkFor(ref: UUID, time: Instant) fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
} }

View File

@ -5,6 +5,7 @@ import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
@ -19,6 +20,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -32,6 +42,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -45,6 +64,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -57,6 +85,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }

View File

@ -2,78 +2,68 @@ package no.iktdev.eventi.tasks
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield import kotlinx.coroutines.yield
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.milliseconds
@DisplayName("""
TaskListener
Når en task prosesseres i en coroutine med heartbeat
Hvis lytteren håndterer arbeid, feil, avbrudd og sekvensiell kjøring
skal state, heartbeat og cleanup fungere korrekt
""")
class TaskListenerTest { class TaskListenerTest {
// ------------------------- class FakeTask : Task()
// Fake Task + Reporter
// -------------------------
class FakeTask : Task() {
}
class FakeReporter : TaskReporter { class FakeReporter : TaskReporter {
var claimed = false var claimed = false
var consumed = false var completed = false
var logs = mutableListOf<String>() var failed = false
var events = mutableListOf<Event>() var cancelled = false
val logs = mutableListOf<String>()
override fun markClaimed(taskId: UUID, workerId: String) { val events = mutableListOf<Event>()
claimed = true
}
override fun markCompleted(taskId: UUID) {
consumed = true
}
override fun markFailed(taskId: UUID) {
consumed = true
}
override fun markCancelled(taskId: UUID) {
}
override fun updateProgress(taskId: UUID, progress: Int) {
}
override fun publishEvent(event: Event) {
events.add(event)
}
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true }
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun publishEvent(event: Event) { events.add(event) }
override fun updateLastSeen(taskId: UUID) {} override fun updateLastSeen(taskId: UUID) {}
override fun log(taskId: UUID, message: String) { logs.add(message) }
override fun log(taskId: UUID, message: String) {
logs.add(message)
}
} }
// ------------------------- // ---------------------------------------------------------
// The actual test // 1 — Heartbeat starter og stopper riktig
// ------------------------- // ---------------------------------------------------------
@Test @Test
fun `heartbeat starts inside onTask and is cancelled and nulled after completion`() = runTest { @DisplayName("""
Når onTask starter heartbeat-runner
Hvis tasken fullføres normalt
skal heartbeat kjøre, kanselleres og state nullstilles etterpå
""")
fun heartbeatStartsAndStopsCorrectly() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null var heartbeatRan = false
var heartbeatRan: Boolean = false
private set
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -81,208 +71,178 @@ class TaskListenerTest {
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Gi heartbeat en sjanse til å kjøre
yield() yield()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter) listener.currentJob?.join()
assertTrue(accepted)
// Wait for job to finish
listener.currentJob!!.join()
// Heartbeat was started
assertNotNull(listener.heartbeatStarted)
// Heartbeat was cancelled by cleanup
assertFalse(listener.heartbeatStarted!!.isActive)
// Heartbeat block actually ran
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
// After cleanup, heartbeatRunner is null
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// Listener state cleaned
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 2 — Heartbeat blokkerer ikke annen jobb
// ---------------------------------------------------------
@Test @Test
fun `heartbeat does not block other coroutine work`() = runTest { @DisplayName("""
Når heartbeat kjører i bakgrunnen
Hvis onTask gjør annen coroutine-arbeid samtidig
skal heartbeat ikke blokkere annet arbeid
""")
fun heartbeatDoesNotBlockOtherWork() = runTest {
val otherWorkCompleted = CompletableDeferred<Unit>() val otherWorkCompleted = CompletableDeferred<Unit>()
val allowFinish = CompletableDeferred<Unit>() // ⭐ kontrollpunkt val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler annen coroutine-oppgave (VideoTaskListener/Converter)
launch { launch {
delay(30) delay(30)
otherWorkCompleted.complete(Unit) otherWorkCompleted.complete(Unit)
} }
// ⭐ Ikke fullfør onTask før testen sier det
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på annen jobb
otherWorkCompleted.await() otherWorkCompleted.await()
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob) assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive) assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 3 — Heartbeat + CPU + IO arbeid
// ---------------------------------------------------------
@Test @Test
fun `heartbeat and multiple concurrent tasks run without blocking`() = runTest { @DisplayName("""
Når heartbeat kjører og flere parallelle jobber startes
Hvis både CPU- og IO-arbeid fullføres
skal heartbeat fortsatt kjøre og cleanup skje etterpå
""")
fun heartbeatAndConcurrentTasksRunCorrectly() = runTest {
val converterDone = CompletableDeferred<Unit>() val converterDone = CompletableDeferred<Unit>()
val videoDone = CompletableDeferred<Unit>() val videoDone = CompletableDeferred<Unit>()
val allowFinish = CompletableDeferred<Unit>() // ⭐ kontrollpunkt val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler Converter (CPU)
launch(Dispatchers.Default) { launch(Dispatchers.Default) {
repeat(1000) { /* CPU work */ } repeat(1000) {}
converterDone.complete(Unit) converterDone.complete(Unit)
} }
// Simuler VideoTaskListener (IO)
launch(Dispatchers.IO) { launch(Dispatchers.IO) {
delay(40) delay(40)
videoDone.complete(Unit) videoDone.complete(Unit)
} }
// ⭐ Vent til testen sier "nå kan du fullføre"
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på begge "andre" oppgaver
converterDone.await() converterDone.await()
videoDone.await() videoDone.await()
// ⭐ Verifiser at begge faktisk ble fullført
assertTrue(converterDone.isCompleted)
assertTrue(videoDone.isCompleted)
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 4 — Arbeid fullføres, heartbeat kjører
// ---------------------------------------------------------
@Test @Test
fun `task work completes fully and heartbeat behaves correctly`() = runTest { @DisplayName("""
Når onTask gjør ferdig arbeid
Hvis heartbeat kjører parallelt
skal heartbeat kjøre, kanselleres og state nullstilles
""")
fun taskWorkCompletesAndHeartbeatBehaves() = runTest {
val workCompleted = CompletableDeferred<Unit>() val workCompleted = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
onTaskCalled = true onTaskCalled = true
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler arbeid
delay(20) delay(20)
// ⭐ signaliser at arbeidet er ferdig
workCompleted.complete(Unit) workCompleted.complete(Unit)
return object : Event() {} return object : Event() {}
@ -290,100 +250,107 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
// ⭐ Verifiser at arbeidet faktisk ble fullført
workCompleted.await() workCompleted.await()
listener.currentJob?.join()
// Vent på jobben
listener.currentJob!!.join()
// onTask ble kalt
assertTrue(listener.onTaskCalled) assertTrue(listener.onTaskCalled)
// Heartbeat ble startet
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 5 — accept() returnerer false når busy
// ---------------------------------------------------------
@Test @Test
fun `accept returns false when listener is busy`() = runTest { @DisplayName("""
Når listener er opptatt med en task
Hvis en ny task forsøkes akseptert
skal accept() returnere false
""")
fun acceptReturnsFalseWhenBusy() = runTest {
val allowFinish = CompletableDeferred<Unit>() val allowFinish = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Hold jobben i live
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task1 = FakeTask()
val task2 = FakeTask()
// Første task aksepteres assertTrue(listener.accept(FakeTask(), reporter))
val accepted1 = listener.accept(task1, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertTrue(accepted1)
// Listener er busy → andre task skal avvises
val accepted2 = listener.accept(task2, reporter)
assertFalse(accepted2)
// Fullfør første task
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 6 — accept() returnerer false når unsupported
// ---------------------------------------------------------
@Test @Test
fun `accept returns false when supports returns false`() = runTest { @DisplayName("""
Når supports() returnerer false
Hvis accept() kalles
skal listener avvise tasken uten å starte jobb
""")
fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun supports(task: Task) = false override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override suspend fun onTask(task: Task): Event? { override fun supports(task: Task) = false
error("onTask should not be called when supports=false") override suspend fun onTask(task: Task): Event? = error("Should not be called")
}
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask()
val accepted = listener.accept(task, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertFalse(accepted)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 7 — onError kalles når onTask kaster
// ---------------------------------------------------------
@Test @Test
fun `onError is called when onTask throws`() = runTest { @DisplayName("""
Når onTask kaster en exception
Hvis listener håndterer feil via onError
skal cleanup kjøre og state nullstilles
""")
fun onErrorCalledWhenOnTaskThrows() = runTest {
val errorLogged = CompletableDeferred<Unit>() val errorLogged = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -397,36 +364,41 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent på error-path
errorLogged.await() errorLogged.await()
// ⭐ Vent på at cleanup i finally kjører
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 8 — onCancelled kalles når jobben kanselleres
// ---------------------------------------------------------
@Test @Test
fun `onCancelled is called when job is cancelled`() = runTest { @DisplayName("""
Når jobben kanselleres mens onTask kjører
Hvis listener implementerer onCancelled
skal onCancelled kalles og cleanup skje
""")
fun onCancelledCalledWhenJobCancelled() = runTest {
val allowStart = CompletableDeferred<Unit>() val allowStart = CompletableDeferred<Unit>()
val cancelledCalled = CompletableDeferred<Unit>() val cancelledCalled = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
allowStart.complete(Unit) allowStart.complete(Unit)
delay(Long.MAX_VALUE) // hold jobben i live delay(Long.MAX_VALUE)
return null return null
} }
@ -437,32 +409,33 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent til onTask har startet
allowStart.await() allowStart.await()
// Kanseller jobben
listener.currentJob!!.cancel() listener.currentJob!!.cancel()
// Vent til onCancelled() ble kalt
cancelledCalled.await() cancelledCalled.await()
// ⭐ Vent til cleanup i finally har kjørt
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 9 — Sekvensiell kjøring uten statelekkasje
// ---------------------------------------------------------
@Test @Test
fun `listener handles two sequential tasks without leaking state`() = runTest { @DisplayName("""
Når listener prosesserer to tasks sekvensielt
Hvis cleanup fungerer riktig
skal ingen state lekke mellom tasks
""")
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
val started1 = CompletableDeferred<Unit>()
val finish1 = CompletableDeferred<Unit>() val finish1 = CompletableDeferred<Unit>()
val started2 = CompletableDeferred<Unit>()
val finish2 = CompletableDeferred<Unit>() val finish2 = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
@ -470,45 +443,50 @@ class TaskListenerTest {
var callCount = 0 var callCount = 0
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
callCount++ callCount++
if (callCount == 1) finish1.await()
if (callCount == 2) finish2.await() if (callCount == 1) {
started1.complete(Unit)
finish1.await()
}
if (callCount == 2) {
started2.complete(Unit)
finish2.await()
}
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
// Task 1 listener.accept(FakeTask(), reporter)
val task1 = FakeTask() started1.await()
listener.accept(task1, reporter)
finish1.complete(Unit) finish1.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Verifiser cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// Task 2 listener.accept(FakeTask(), reporter)
val task2 = FakeTask() started2.await()
listener.accept(task2, reporter)
finish2.complete(Unit) finish2.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Verifiser cleanup igjen
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// onTask ble kalt to ganger
assertEquals(2, listener.callCount) assertEquals(2, listener.callCount)
} }
} }

View File

@ -10,18 +10,26 @@ import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore import no.iktdev.eventi.stores.TaskStore
import no.iktdev.eventi.testUtil.multiply import no.iktdev.eventi.testUtil.multiply
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.time.Duration import java.time.Duration
import java.util.UUID import java.util.UUID
import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.seconds import kotlin.time.Duration.Companion.seconds
@DisplayName("""
TaskPollerImplementation
Når polleren henter og prosesserer tasks
Hvis lyttere, backoff og event-produksjon fungerer som forventet
skal polleren håndtere alle scenarier korrekt
""")
class TaskPollerImplementationTest : TestBase() { class TaskPollerImplementationTest : TestBase() {
@BeforeEach @BeforeEach
@ -32,13 +40,14 @@ class TaskPollerImplementationTest : TestBase() {
} }
private lateinit var eventDeferred: CompletableDeferred<Event> private lateinit var eventDeferred: CompletableDeferred<Event>
val reporterFactory = { _: Task -> val reporterFactory = { _: Task ->
object : TaskReporter { object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {} override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {} override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {} override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {} override fun markFailed(referenceId: UUID,taskId: UUID) {}
override fun markCancelled(taskId: UUID) {} override fun markCancelled(referenceId: UUID,taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {} override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {} override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) { override fun publishEvent(event: Event) {
@ -47,30 +56,33 @@ class TaskPollerImplementationTest : TestBase() {
} }
} }
data class EchoTask(var data: String?) : Task() { data class EchoTask(var data: String?) : Task()
} data class EchoEvent(var data: String) : Event()
data class EchoEvent(var data: String) : Event() { class TaskPollerImplementationTest(
} taskStore: TaskStore,
reporterFactory: (Task) -> TaskReporter
class TaskPollerImplementationTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): TaskPollerImplementation(taskStore, reporterFactory) { ) : TaskPollerImplementation(taskStore, reporterFactory) {
fun overrideSetBackoff(duration: java.time.Duration) { fun overrideSetBackoff(duration: java.time.Duration) {
backoff = duration backoff = duration
} }
} }
open class EchoListener : TaskListener(TaskType.MIXED) { open class EchoListener : TaskListener(TaskType.MIXED) {
var result: Event? = null var result: Event? = null
fun getJob() = currentJob fun getJob() = currentJob
override fun getWorkerId() = this.javaClass.simpleName override fun getWorkerId() = this.javaClass.simpleName
override fun createIncompleteStateTaskEvent(
override fun supports(task: Task): Boolean { task: Task,
return task is EchoTask status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
} }
override fun supports(task: Task) = task is EchoTask
override suspend fun onTask(task: Task): Event { override suspend fun onTask(task: Task): Event {
withHeartbeatRunner(1.seconds) { withHeartbeatRunner(1.seconds) {
println("Heartbeat") println("Heartbeat")
@ -83,36 +95,31 @@ class TaskPollerImplementationTest : TestBase() {
override fun onComplete(task: Task, result: Event?) { override fun onComplete(task: Task, result: Event?) {
super.onComplete(task, result) super.onComplete(task, result)
this.result = result; this.result = result
reporter?.publishEvent(result!!) reporter?.publishEvent(result!!)
} }
override fun onError(task: Task, exception: Exception) {
exception.printStackTrace()
super.onError(task, exception)
}
override fun onCancelled(task: Task) {
super.onCancelled(task)
}
} }
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
@Test @Test
@DisplayName("""
Når en EchoTask finnes i TaskStore
Hvis polleren prosesserer tasken og lytteren produserer en EchoEvent
skal eventen publiseres og metadata inneholde korrekt avledningskjede
""")
fun scenario1() = runTest { fun scenario1() = runTest {
// Register Task and Event
TaskTypeRegistry.register(EchoTask::class.java) TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java) EventTypeRegistry.register(EchoEvent::class.java)
val listener = EchoListener() val listener = EchoListener()
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}) val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() })
taskStore.persist(task) taskStore.persist(task)
poller.pollOnce() poller.pollOnce()
advanceUntilIdle() advanceUntilIdle()
val producedEvent = eventDeferred.await() val producedEvent = eventDeferred.await()
assertThat(producedEvent).isNotNull assertThat(producedEvent).isNotNull
assertThat(producedEvent.metadata.derivedFromId).hasSize(2) assertThat(producedEvent.metadata.derivedFromId).hasSize(2)
@ -123,7 +130,12 @@ class TaskPollerImplementationTest : TestBase() {
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
@Test @Test
fun `poller resets backoff when task is accepted`() = runTest { @DisplayName("""
Når en task blir akseptert av lytteren
Hvis polleren tidligere har økt backoff
skal backoff resettes til startverdi
""")
fun pollerResetsBackoffWhenTaskAccepted() = runTest {
TaskTypeRegistry.register(EchoTask::class.java) TaskTypeRegistry.register(EchoTask::class.java)
EventTypeRegistry.register(EchoEvent::class.java) EventTypeRegistry.register(EchoEvent::class.java)
@ -132,12 +144,13 @@ class TaskPollerImplementationTest : TestBase() {
val initialBackoff = poller.backoff val initialBackoff = poller.backoff
poller.overrideSetBackoff(Duration.ofSeconds(16)) poller.overrideSetBackoff(Duration.ofSeconds(16))
val task = EchoTask("Hello").newReferenceId() val task = EchoTask("Hello").newReferenceId()
taskStore.persist(task) taskStore.persist(task)
poller.pollOnce() poller.pollOnce()
listener.getJob()?.join() listener.getJob()?.join()
advanceTimeBy(1.minutes) advanceTimeBy(1.minutes)
advanceUntilIdle() advanceUntilIdle()
@ -146,19 +159,27 @@ class TaskPollerImplementationTest : TestBase() {
} }
@Test @Test
fun `poller increases backoff when no tasks`() = runTest { @DisplayName("""
Når polleren ikke finner noen tasks
Hvis ingen lyttere har noe å gjøre
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenNoTasks() = runTest {
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff val initialBackoff = poller.backoff
val totalBackoff = initialBackoff.multiply(2)
poller.pollOnce() poller.pollOnce()
assertEquals(totalBackoff, poller.backoff) assertEquals(initialBackoff.multiply(2), poller.backoff)
} }
@Test @Test
fun `poller increases backoff when no listener supports task`() = runTest { @DisplayName("""
Når en task finnes men ingen lyttere støtter den
Hvis polleren forsøker å prosessere tasken
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenNoListenerSupportsTask() = runTest {
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff val initialBackoff = poller.backoff
@ -172,47 +193,50 @@ class TaskPollerImplementationTest : TestBase() {
} }
@Test @Test
fun `poller increases backoff when listener is busy`() = runTest { @DisplayName("""
Når en lytter er opptatt
Hvis polleren forsøker å prosessere en task
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenListenerBusy() = runTest {
val busyListener = object : EchoListener() { val busyListener = object : EchoListener() {
override val isBusy = true override val isBusy = true
} }
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val intialBackoff = poller.backoff val initialBackoff = poller.backoff
val task = EchoTask("Busy").newReferenceId() val task = EchoTask("Busy").newReferenceId()
taskStore.persist(task) taskStore.persist(task)
poller.pollOnce() poller.pollOnce()
assertEquals(intialBackoff.multiply(2), poller.backoff) assertEquals(initialBackoff.multiply(2), poller.backoff)
} }
@Test @Test
fun `poller increases backoff when task is not claimed`() = runTest { @DisplayName("""
val listener = EchoListener() Når en task ikke kan claimes av polleren
Hvis claim-operasjonen feiler
skal backoff dobles
""")
fun pollerIncreasesBackoffWhenTaskNotClaimed() = runTest {
TaskTypeRegistry.register(EchoTask::class.java) TaskTypeRegistry.register(EchoTask::class.java)
val task = EchoTask("Unclaimable").newReferenceId() val task = EchoTask("Unclaimable").newReferenceId()
taskStore.persist(task) taskStore.persist(task)
// Simuler at claim alltid feiler
val failingStore = object : InMemoryTaskStore() { val failingStore = object : InMemoryTaskStore() {
override fun claim(taskId: UUID, workerId: String): Boolean = false override fun claim(taskId: UUID, workerId: String) = false
} }
val pollerWithFailingClaim = object : TaskPollerImplementation(failingStore, reporterFactory) {}
val initialBackoff = pollerWithFailingClaim.backoff val poller = object : TaskPollerImplementation(failingStore, reporterFactory) {}
val initialBackoff = poller.backoff
failingStore.persist(task) failingStore.persist(task)
poller.pollOnce()
pollerWithFailingClaim.pollOnce() assertEquals(initialBackoff.multiply(2), poller.backoff)
}
assertEquals(initialBackoff.multiply(2), pollerWithFailingClaim.backoff)
} }
}

View File

@ -13,6 +13,7 @@ fun EventListenerRegistry.wipe() {
// Tøm mapen // Tøm mapen
val mutableList = field.get(EventListenerRegistry) as MutableList<*> val mutableList = field.get(EventListenerRegistry) as MutableList<*>
@Suppress("UNCHECKED_CAST")
(mutableList as MutableList<Class<out EventListener>>).clear() (mutableList as MutableList<Class<out EventListener>>).clear()
// Verifiser at det er tomt // Verifiser at det er tomt

View File

@ -13,6 +13,7 @@ fun EventTypeRegistry.wipe() {
// Tøm mapen // Tøm mapen
val typesMap = field.get(EventTypeRegistry) as MutableMap<*, *> val typesMap = field.get(EventTypeRegistry) as MutableMap<*, *>
@Suppress("UNCHECKED_CAST")
(typesMap as MutableMap<String, Class<out Event>>).clear() (typesMap as MutableMap<String, Class<out Event>>).clear()
// Verifiser at det er tomt // Verifiser at det er tomt

View File

@ -1,7 +1,5 @@
package no.iktdev.eventi.testUtil package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.tasks.TaskListener import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskListenerRegistry import no.iktdev.eventi.tasks.TaskListenerRegistry
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
@ -15,6 +13,7 @@ fun TaskListenerRegistry.wipe() {
// Tøm mapen // Tøm mapen
val mutableList = field.get(TaskListenerRegistry) as MutableList<*> val mutableList = field.get(TaskListenerRegistry) as MutableList<*>
@Suppress("UNCHECKED_CAST")
(mutableList as MutableList<Class<out TaskListener>>).clear() (mutableList as MutableList<Class<out TaskListener>>).clear()
// Verifiser at det er tomt // Verifiser at det er tomt

View File

@ -1,12 +1,11 @@
package no.iktdev.eventi.testUtil package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.eventi.tasks.TaskTypeRegistry
import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.Assertions.assertNull
import java.lang.reflect.Field import java.lang.reflect.Field
@Suppress("UNUSED_RECEIVER_PARAMETER")
fun TaskTypeRegistry.wipe() { fun TaskTypeRegistry.wipe() {
val field: Field = TaskTypeRegistry::class.java val field: Field = TaskTypeRegistry::class.java
.superclass .superclass
@ -15,6 +14,7 @@ fun TaskTypeRegistry.wipe() {
// Tøm mapen // Tøm mapen
val typesMap = field.get(TaskTypeRegistry) as MutableMap<*, *> val typesMap = field.get(TaskTypeRegistry) as MutableMap<*, *>
@Suppress("UNCHECKED_CAST")
(typesMap as MutableMap<String, Class<out Task>>).clear() (typesMap as MutableMap<String, Class<out Task>>).clear()
// Verifiser at det er tomt // Verifiser at det er tomt

View File

@ -0,0 +1,14 @@
package no.iktdev.eventi.testUtil
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import no.iktdev.eventi.events.SequenceDispatchQueue
class TestSequenceDispatchQueue(
maxConcurrency: Int,
dispatcher: CoroutineDispatcher
) : SequenceDispatchQueue(
maxConcurrency,
CoroutineScope(dispatcher + SupervisorJob())
)