Compare commits

...

8 Commits

14 changed files with 470 additions and 269 deletions

29
.github/workflows/test.yml vendored Normal file
View File

@ -0,0 +1,29 @@
name: Run Unit Tests
on:
push:
branches:
- "**"
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'zulu'
- name: Setup Gradle
uses: gradle/gradle-build-action@v3
- name: Make gradlew executable
run: chmod +x ./gradlew
- name: Run unit tests
run: ./gradlew test --stacktrace

View File

@ -2,6 +2,7 @@ package no.iktdev.eventi.events
import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.stores.EventStore import no.iktdev.eventi.stores.EventStore
import java.util.UUID import java.util.UUID
@ -11,6 +12,7 @@ open class EventDispatcher(val eventStore: EventStore) {
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet() val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId } val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
val candidates = events val candidates = events
.filterNot { it is SignalEvent }
.filter { it.eventId !in derivedFromIds } .filter { it.eventId !in derivedFromIds }
.filter { it.eventId !in deletedEventIds } .filter { it.eventId !in deletedEventIds }

View File

@ -8,23 +8,30 @@ import no.iktdev.eventi.stores.EventStore
import java.time.Duration import java.time.Duration
import java.time.Instant import java.time.Instant
import java.util.UUID import java.util.UUID
import kotlin.collections.iterator
abstract class EventPollerImplementation( abstract class EventPollerImplementation(
private val eventStore: EventStore, private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue, private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher private val dispatcher: EventDispatcher
) { ) {
// Erstatter ikke lastSeenTime, men supplerer den private val log = KotlinLogging.logger {}
protected val refWatermark = mutableMapOf<UUID, Instant>()
// lastSeenTime brukes kun som scan hint /**
* Per-reference watermark:
* - first = last seen persistedAt
* - second = last seen persistedId
*/
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
/**
* Global scan hint (timestamp only).
* Used to avoid scanning entire table every time.
*/
var lastSeenTime: Instant = Instant.EPOCH var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2) open var backoff = Duration.ofSeconds(2)
protected set protected set
private val maxBackoff = Duration.ofMinutes(1) private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() { open suspend fun start() {
log.info { "EventPoller starting with initial backoff=$backoff" } log.info { "EventPoller starting with initial backoff=$backoff" }
@ -32,7 +39,7 @@ abstract class EventPollerImplementation(
try { try {
pollOnce() pollOnce()
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() log.error(e) { "Error in poller loop" }
delay(backoff.toMillis()) delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff) backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
} }
@ -43,11 +50,11 @@ abstract class EventPollerImplementation(
val pollStartedAt = MyTime.utcNow() val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" } log.debug { "🔍 Polling for new events" }
// Global scan hint: kombiner refWatermark og lastSeenTime // Determine global scan start
val watermarkMin = refWatermark.values.minOrNull() val minRefTs = refWatermark.values.minOfOrNull { it.first }
val scanFrom = when (watermarkMin) { val scanFrom = when (minRefTs) {
null -> lastSeenTime null -> lastSeenTime
else -> maxOf(lastSeenTime, watermarkMin) else -> maxOf(lastSeenTime, minRefTs)
} }
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom) val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
@ -59,76 +66,73 @@ abstract class EventPollerImplementation(
return return
} }
// Vi har sett nye events globalt reset backoff // Reset backoff
backoff = Duration.ofSeconds(2) backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" } log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false var anyProcessed = false
// Track høyeste persistedAt vi har sett i denne runden // Track highest persistedAt seen globally this round
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt } val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) { for ((ref, eventsForRef) in grouped) {
val refSeen = refWatermark[ref] ?: Instant.EPOCH val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
// Filter new events using (timestamp, id) ordering
val newForRef = eventsForRef.filter { ev ->
ev.persistedAt > refSeenAt ||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
}
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
if (newForRef.isEmpty()) { if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since $refSeen" } log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
continue continue
} }
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch // If ref is busy, skip dispatch
if (dispatchQueue.isProcessing(ref)) { if (dispatchQueue.isProcessing(ref)) {
log.debug { "$ref is busy — deferring ${newForRef.size} events" } log.debug { "$ref is busy — deferring ${newForRef.size} events" }
continue continue
} }
// Hent full sekvens for ref (Eventi-invariant) // Fetch full sequence for dispatch
val fullLog = eventStore.getPersistedEventsFor(ref) val fullLog = eventStore.getPersistedEventsFor(ref)
val events = fullLog.mapNotNull { it.toEvent() } val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "🚀 Dispatching ${events.size} events for $ref" } log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher) dispatchQueue.dispatch(ref, events, dispatcher)
// Oppdater watermark for denne refen // Update watermark for this reference
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt } val maxEvent = newForRef.maxWith(
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1) compareBy({ it.persistedAt }, { it.id })
)
refWatermark[ref] = newWatermark val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
val newWatermarkId = maxEvent.id
refWatermark[ref] = newWatermarkAt to newWatermarkId
anyProcessed = true anyProcessed = true
log.debug { "⏩ Updated watermark for $ref$newWatermark" } log.debug { "⏩ Updated watermark for $ref($newWatermarkAt, id=$newWatermarkId)" }
} }
// Oppdater global scan hint uansett vi har sett nye events // Update global scan hint
// Dette hindrer livelock når alle events er <= watermark for sine refs
val newLastSeen = maxOf( val newLastSeen = maxOf(
lastSeenTime, lastSeenTime,
maxPersistedThisRound.plusNanos(1) maxPersistedThisRound.plusNanos(1)
) )
if (anyProcessed) { if (anyProcessed) {
// Behold intensjonen din: globalt hint basert på laveste watermark, val minRef = refWatermark.values.minOfOrNull { it.first }
// men aldri gå bakover i tid ift lastSeenTime lastSeenTime = when (minRef) {
val minRefWatermark = refWatermark.values.minOrNull()
lastSeenTime = when (minRefWatermark) {
null -> newLastSeen null -> newLastSeen
else -> maxOf(newLastSeen, minRefWatermark) else -> maxOf(newLastSeen, minRef)
} }
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" } log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
} else { } else {
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
// → trygt å flytte lastSeenTime forbi dem
lastSeenTime = newLastSeen lastSeenTime = newLastSeen
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" } log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
} }
} }
} }

View File

@ -4,7 +4,7 @@ import java.util.UUID
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
abstract class Event { abstract class Event {
var referenceId: UUID = UUID.randomUUID() lateinit var referenceId: UUID
protected set protected set
var eventId: UUID = UUID.randomUUID() var eventId: UUID = UUID.randomUUID()
private set private set
@ -43,6 +43,6 @@ abstract class DeleteEvent(
open val deletedEventId: UUID open val deletedEventId: UUID
) : Event() ) : Event()
abstract class SignalEvent(): Event()

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.jetbrains.annotations.VisibleForTesting import org.jetbrains.annotations.VisibleForTesting
import java.util.UUID import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.cancellation.CancellationException
@ -67,9 +68,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch { currentJob = getDispatcherForTask(task).launch {
try { try {
val result = onTask(task) val result = onTask(task)
reporter.markCompleted(task.taskId)
onComplete(task, result) onComplete(task, result)
} catch (e: CancellationException) { } catch (e: CancellationException) {
// Dette er en ekte kansellering // Dette er en ekte kansellering
onCancelled(task) onCancelled(task)
@ -88,14 +87,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
this@TaskListener.reporter = null this@TaskListener.reporter = null
} }
} }
return true return true
} }
abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event
override fun onError(task: Task, exception: Exception) { override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}") reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace() exception.printStackTrace()
reporter?.markFailed(task.referenceId, task.taskId) reporter?.markFailed(task.referenceId, task.taskId)
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception))
} }
override fun onComplete(task: Task, result: Event?) { override fun onComplete(task: Task, result: Event?) {
@ -111,6 +112,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob?.cancel() currentJob?.cancel()
heartbeatRunner?.cancel() heartbeatRunner?.cancel()
currentTask = null currentTask = null
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled))
} }
} }

View File

@ -8,6 +8,7 @@ import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.DeleteEvent import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
@ -19,12 +20,14 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.UUID import java.util.UUID
@DisplayName(""" @DisplayName(
"""
EventDispatcher EventDispatcher
Når hendelser dispatches til lyttere Når hendelser dispatches til lyttere
Hvis hendelsene inneholder avledede, slettede eller nye events Hvis hendelsene inneholder avledede, slettede eller nye events
skal dispatcheren håndtere filtrering, replays og historikk korrekt skal dispatcheren håndtere filtrering, replays og historikk korrekt
""") """
)
class EventDispatcherTest : TestBase() { class EventDispatcherTest : TestBase() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
@ -51,15 +54,17 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en TriggerEvent dispatches Når en TriggerEvent dispatches
Hvis en lytter produserer én DerivedEvent Hvis en lytter produserer én DerivedEvent
skal kun én ny event produseres og prosessen stoppe skal kun én ny event produseres og prosessen stoppe
""") """
)
fun shouldProduceOneEventAndStop() { fun shouldProduceOneEventAndStop() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val produced = eventStore.all().firstOrNull() val produced = eventStore.all().firstOrNull()
@ -72,15 +77,17 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en event allerede har avledet en DerivedEvent Når en event allerede har avledet en DerivedEvent
Hvis dispatcheren replays historikken Hvis dispatcheren replays historikken
skal ikke DerivedEvent produseres nytt skal ikke DerivedEvent produseres nytt
""") """
)
fun shouldSkipAlreadyDerivedEvents() { fun shouldSkipAlreadyDerivedEvents() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow()) val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
eventStore.persist(derived!!.toEvent()!!) // simulate prior production eventStore.persist(derived!!.toEvent()!!) // simulate prior production
@ -91,31 +98,37 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når flere events dispatches Når flere events dispatches
Hvis en lytter mottar en event Hvis en lytter mottar en event
skal hele historikken leveres i context skal hele historikken leveres i context
""") """
)
fun shouldPassFullContextToListener() { fun shouldPassFullContextToListener() {
val listener = ContextCapturingListener() val listener = ContextCapturingListener()
val e1 = TriggerEvent() val e1 = TriggerEvent().newReferenceId()
val e2 = OtherEvent() val e2 = OtherEvent().newReferenceId()
dispatcher.dispatch(e1.referenceId, listOf(e1, e2)) dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
assertEquals(2, listener.context.size) assertEquals(2, listener.context.size)
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en replay skjer Når en replay skjer
Hvis en event allerede har produsert en DerivedEvent Hvis en event allerede har produsert en DerivedEvent
skal ikke DerivedEvent produseres nytt skal ikke DerivedEvent produseres nytt
""") """
)
fun shouldBehaveDeterministicallyAcrossReplays() { fun shouldBehaveDeterministicallyAcrossReplays() {
val referenceId = UUID.randomUUID()
ProducingListener() ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().usingReferenceId(referenceId)
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() } val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
@ -125,12 +138,16 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en DeleteEvent peker en tidligere event Når en DeleteEvent peker en tidligere event
Hvis dispatcheren filtrerer kandidater Hvis dispatcheren filtrerer kandidater
skal slettede events ikke leveres som kandidater skal slettede events ikke leveres som kandidater
""") """
)
fun shouldNotDeliverDeletedEventsAsCandidates() { fun shouldNotDeliverDeletedEventsAsCandidates() {
val referenceId = UUID.randomUUID()
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
@ -141,11 +158,10 @@ class EventDispatcherTest : TestBase() {
} }
} }
// Original hendelse // Original hendelse
val original = TriggerEvent() val original = TriggerEvent().usingReferenceId(referenceId)
// Slettehendelse som peker på original // Slettehendelse som peker på original
val deleted = object : DeleteEvent(original.eventId) { val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
}
// Dispatch med begge hendelser // Dispatch med begge hendelser
dispatcher.dispatch(original.referenceId, listOf(original, deleted)) dispatcher.dispatch(original.referenceId, listOf(original, deleted))
@ -162,13 +178,17 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en DeleteEvent dispatches alene Når en DeleteEvent dispatches alene
Hvis en lytter reagerer DeleteEvent Hvis en lytter reagerer DeleteEvent
skal DeleteEvent leveres som kandidat skal DeleteEvent leveres som kandidat
""") """
)
fun shouldDeliverDeleteEventToListenersThatReactToIt() { fun shouldDeliverDeleteEventToListenersThatReactToIt() {
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
val referenceId = UUID.randomUUID()
object : EventListener() { object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? { override fun onEvent(event: Event, history: List<Event>): Event? {
if (event is DeleteEvent) received += event if (event is DeleteEvent) received += event
@ -176,22 +196,24 @@ class EventDispatcherTest : TestBase() {
} }
} }
val deleted = object : DeleteEvent(UUID.randomUUID()) {} val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) }
dispatcher.dispatch(deleted.referenceId, listOf(deleted)) dispatcher.dispatch(deleted.referenceId, listOf(deleted))
assertTrue(received.contains(deleted)) assertTrue(received.contains(deleted))
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en event har avledet en ny event Når en event har avledet en ny event
Hvis dispatcheren replays historikken Hvis dispatcheren replays historikken
skal ikke original-eventen leveres som kandidat igjen skal ikke original-eventen leveres som kandidat igjen
""") """
)
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() { fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
ProducingListener() ProducingListener()
val trigger = TriggerEvent() val trigger = TriggerEvent().newReferenceId()
// Første dispatch: trigger produserer en DerivedEvent // Første dispatch: trigger produserer en DerivedEvent
dispatcher.dispatch(trigger.referenceId, listOf(trigger)) dispatcher.dispatch(trigger.referenceId, listOf(trigger))
@ -211,16 +233,18 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en DeleteEvent slettet en tidligere event Når en DeleteEvent slettet en tidligere event
Hvis dispatcheren bygger historikk Hvis dispatcheren bygger historikk
skal slettede events ikke være med i history skal slettede events ikke være med i history
""") """
)
fun historyShouldExcludeDeletedEvents() { fun historyShouldExcludeDeletedEvents() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent() val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {} val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) }
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()
@ -238,16 +262,18 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en DeleteEvent slettet en event Når en DeleteEvent slettet en event
Hvis andre events fortsatt er gyldige Hvis andre events fortsatt er gyldige
skal history kun inneholde de ikke-slettede events skal history kun inneholde de ikke-slettede events
""") """
)
fun historyShouldKeepNonDeletedEvents() { fun historyShouldKeepNonDeletedEvents() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val referenceId = UUID.randomUUID()
val e1 = TriggerEvent() val e1 = TriggerEvent().usingReferenceId(referenceId)
val e2 = OtherEvent() val e2 = OtherEvent().usingReferenceId(referenceId)
val deleted = object : DeleteEvent(e1.eventId) {} val deleted = object : DeleteEvent(e1.eventId) {}
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()
@ -267,16 +293,18 @@ class EventDispatcherTest : TestBase() {
} }
@Test @Test
@DisplayName(""" @DisplayName(
"""
Når en DeleteEvent er kandidat Når en DeleteEvent er kandidat
Hvis historikken kun inneholder slettede events Hvis historikken kun inneholder slettede events
skal history være tom skal history være tom
""") """
)
fun deleteEventShouldBeDeliveredButHistoryEmpty() { fun deleteEventShouldBeDeliveredButHistoryEmpty() {
val dispatcher = EventDispatcher(eventStore) val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent() val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {} val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
var receivedEvent: Event? = null var receivedEvent: Event? = null
var receivedHistory: List<Event> = emptyList() var receivedHistory: List<Event> = emptyList()
@ -295,6 +323,54 @@ class EventDispatcherTest : TestBase() {
assertTrue(receivedHistory.isEmpty()) assertTrue(receivedHistory.isEmpty())
} }
@Test
@DisplayName(
"""
Når en SignalEvent dispatches
Hvis SignalEvent ikke skal være kandidat
skal den ikke leveres til lyttere, men fortsatt være i historikken
"""
)
fun shouldNotDeliverSignalEventAsCandidate() {
// Arrange
class TestSignalEvent : SignalEvent()
EventTypeRegistry.register(listOf(TestSignalEvent::class.java,))
val received = mutableListOf<Event>()
var finalHistory: List<Event>? = null
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
received += event
finalHistory = history
return null
}
}
val refId = UUID.randomUUID()
val trigger = TriggerEvent().usingReferenceId(refId)
val signal = TestSignalEvent().usingReferenceId(refId)
// Act
dispatcher.dispatch(trigger.referenceId, listOf(trigger, signal))
// Assert
// 1) TriggerEvent skal leveres
assertTrue(received.any { it is TriggerEvent }) {
"TriggerEvent skal leveres som kandidat"
}
// 2) SignalEvent skal IKKE leveres
assertFalse(received.any { it is TestSignalEvent }) {
"SignalEvent skal ikke leveres som kandidat"
}
assertNotNull(finalHistory)
assertTrue(finalHistory!!.any { it is TestSignalEvent }) {
"SignalEvent skal være i historikken selv om den ikke er kandidat"
}
}
// --- Test helpers --- // --- Test helpers ---
class ProducingListener : EventListener() { class ProducingListener : EventListener() {

View File

@ -39,7 +39,7 @@ class ZDSTest {
fun scenario1() { fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java) EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello") val echo = EchoEvent("hello").newReferenceId()
val persisted = echo.toPersisted(id = 1L) val persisted = echo.toPersisted(id = 1L)
val restored = persisted!!.toEvent() val restored = persisted!!.toEvent()

View File

@ -1,19 +1,23 @@
package no.iktdev.eventi.events package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitAll import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.awaitAll
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.MyTime import no.iktdev.eventi.MyTime
import no.iktdev.eventi.TestBase import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse import org.junit.jupiter.api.Assertions.assertFalse
@ -33,9 +37,7 @@ Så skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
""") """)
class EventPollerImplementationTest : TestBase() { class EventPollerImplementationTest : TestBase() {
val dispatcher = EventDispatcher(eventStore) private val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
@BeforeEach @BeforeEach
fun setup() { fun setup() {
@ -59,6 +61,10 @@ class EventPollerImplementationTest : TestBase() {
skal alle referenceId-er dispatch'es og lastSeenTime oppdateres skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
""") """)
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest { fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val dispatched = ConcurrentHashMap.newKeySet<UUID>() val dispatched = ConcurrentHashMap.newKeySet<UUID>()
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>() val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
@ -93,6 +99,9 @@ class EventPollerImplementationTest : TestBase() {
skal backoff øke, og resettes når nye events ankommer skal backoff øke, og resettes når nye events ankommer
""") """)
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest { fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff fun currentBackoff(): Duration = backoff
} }
@ -121,6 +130,10 @@ class EventPollerImplementationTest : TestBase() {
skal polleren gruppere og dispatch'e alle tre i én batch skal polleren gruppere og dispatch'e alle tre i én batch
""") """)
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest { fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val refId = UUID.randomUUID() val refId = UUID.randomUUID()
val received = mutableListOf<Event>() val received = mutableListOf<Event>()
val done = CompletableDeferred<Unit>() val done = CompletableDeferred<Unit>()
@ -157,8 +170,8 @@ class EventPollerImplementationTest : TestBase() {
skal polleren ignorere dem skal polleren ignorere dem
""") """)
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest { fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
val refId = UUID.randomUUID() val testDispatcher = StandardTestDispatcher(testScheduler)
val ignored = TriggerEvent().usingReferenceId(refId) val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) { val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
init { init {
@ -166,6 +179,9 @@ class EventPollerImplementationTest : TestBase() {
} }
} }
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
eventStore.persist(ignored) eventStore.persist(ignored)
testPoller.pollOnce() testPoller.pollOnce()
@ -180,7 +196,12 @@ class EventPollerImplementationTest : TestBase() {
skal begge events prosesseres, men uten å produsere duplikate derived events skal begge events prosesseres, men uten å produsere duplikate derived events
""") """)
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest { fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java)) EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
val channel = Channel<Event>(Channel.UNLIMITED) val channel = Channel<Event>(Channel.UNLIMITED)
val handled = mutableListOf<Event>() val handled = mutableListOf<Event>()
@ -193,16 +214,14 @@ class EventPollerImplementationTest : TestBase() {
} }
} }
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {} val original = EchoEvent("Hello").newReferenceId()
val original = EchoEvent("Hello")
eventStore.persist(original) eventStore.persist(original)
poller.pollOnce() poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) { withContext(testDispatcher) {
withTimeout(Duration.ofMinutes(1).toMillis()) { withTimeout(60_000) {
repeat(1) { channel.receive() } channel.receive()
} }
} }
@ -211,9 +230,9 @@ class EventPollerImplementationTest : TestBase() {
poller.pollOnce() poller.pollOnce()
withContext(Dispatchers.Default.limitedParallelism(1)) { withContext(testDispatcher) {
withTimeout(Duration.ofMinutes(1).toMillis()) { withTimeout(60_000) {
repeat(1) { channel.receive() } channel.receive()
} }
} }

View File

@ -60,6 +60,42 @@ class PollerStartLoopTest : TestBase() {
store.persistAt(e, time) store.persistAt(e, time)
} }
@Test
@DisplayName("""
Når to events har identisk persistedAt
Hvis polleren kjører
skal begge events prosesseres og ingen mistes
""")
fun `poller handles same-timestamp events without losing any`() = runTest {
val ref = UUID.randomUUID()
val ts = Instant.parse("2025-01-01T12:00:00Z")
// Two events with same timestamp but different IDs
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e1, ts) // id=1
store.persistAt(e2, ts) // id=2
poller.startFor(iterations = 1)
// Verify dispatch happened
assertThat(dispatcher.dispatched).hasSize(1)
val (_, events) = dispatcher.dispatched.single()
// Both events must be present
assertThat(events.map { it.eventId })
.hasSize(2)
.doesNotHaveDuplicates()
// Watermark must reflect highest ID
val wm = poller.watermarkFor(ref)
assertThat(wm!!.first).isEqualTo(ts)
assertThat(wm.second).isEqualTo(2)
}
@Test @Test
@DisplayName(""" @DisplayName("""
Når polleren kjører flere iterasjoner uten events Når polleren kjører flere iterasjoner uten events
@ -271,11 +307,15 @@ class PollerStartLoopTest : TestBase() {
poller.startFor(iterations = 1) poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark // A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1) assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark // B skal ha flyttet watermark (på timestamp-nivå)
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1) val wmB2 = poller.watermarkFor(refB)
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
} }
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk") @DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@ -433,7 +473,7 @@ class PollerStartLoopTest : TestBase() {
// Sett watermark høyt (polleren setter watermark selv i ekte drift, // Sett watermark høyt (polleren setter watermark selv i ekte drift,
// men i denne testen må vi simulere det) // men i denne testen må vi simulere det)
poller.setWatermarkFor(ref, t(100)) poller.setWatermarkFor(ref, t(100), id = 999)
// Sett lastSeenTime bak eventen // Sett lastSeenTime bak eventen
poller.lastSeenTime = t(0) poller.lastSeenTime = t(0)

View File

@ -17,8 +17,6 @@ class TestablePoller(
val scope: TestScope val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView { ) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) { suspend fun startFor(iterations: Int) {
repeat(iterations) { repeat(iterations) {
try { try {
@ -32,19 +30,17 @@ class TestablePoller(
} }
} }
override fun watermarkFor(ref: UUID): Instant? { override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
return refWatermark[ref]?.let { return refWatermark[ref]
return it
}
} }
override fun setWatermarkFor(ref: UUID, time: Instant) { override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
refWatermark[ref] = time refWatermark[ref] = time to id
} }
} }
interface WatermarkDebugView { interface WatermarkDebugView {
fun watermarkFor(ref: UUID): Instant? fun watermarkFor(ref: UUID): Pair<Instant, Long>?
fun setWatermarkFor(ref: UUID, time: Instant) fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
} }

View File

@ -5,6 +5,7 @@ import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
@ -19,6 +20,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -32,6 +42,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -45,6 +64,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -57,6 +85,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }

View File

@ -2,13 +2,13 @@ package no.iktdev.eventi.tasks
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield import kotlinx.coroutines.yield
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -23,32 +23,29 @@ Så skal state, heartbeat og cleanup fungere korrekt
""") """)
class TaskListenerTest { class TaskListenerTest {
// -------------------------
// Fake Task + Reporter
// -------------------------
class FakeTask : Task() class FakeTask : Task()
class FakeReporter : TaskReporter { class FakeReporter : TaskReporter {
var claimed = false var claimed = false
var consumed = false var completed = false
var logs = mutableListOf<String>() var failed = false
var events = mutableListOf<Event>() var cancelled = false
val logs = mutableListOf<String>()
val events = mutableListOf<Event>()
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true } override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
override fun markCompleted(taskId: UUID) { consumed = true } override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { consumed = true } override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true }
override fun updateProgress(taskId: UUID, progress: Int) {} override fun updateProgress(taskId: UUID, progress: Int) {}
override fun publishEvent(event: Event) { events.add(event) } override fun publishEvent(event: Event) { events.add(event) }
override fun updateLastSeen(taskId: UUID) {} override fun updateLastSeen(taskId: UUID) {}
override fun log(taskId: UUID, message: String) { logs.add(message) } override fun log(taskId: UUID, message: String) { logs.add(message) }
} }
// ------------------------- // ---------------------------------------------------------
// Tests // 1 — Heartbeat starter og stopper riktig
// ------------------------- // ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask starter heartbeat-runner Når onTask starter heartbeat-runner
@ -58,13 +55,15 @@ class TaskListenerTest {
fun heartbeatStartsAndStopsCorrectly() = runTest { fun heartbeatStartsAndStopsCorrectly() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null var heartbeatRan = false
var heartbeatRan: Boolean = false
private set
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -72,25 +71,18 @@ class TaskListenerTest {
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Gi heartbeat en sjanse til å kjøre
yield() yield()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter) listener.currentJob?.join()
assertTrue(accepted)
listener.currentJob!!.join()
assertNotNull(listener.heartbeatStarted)
assertFalse(listener.heartbeatStarted!!.isActive)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
@ -98,6 +90,9 @@ class TaskListenerTest {
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 2 — Heartbeat blokkerer ikke annen jobb
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når heartbeat kjører i bakgrunnen Når heartbeat kjører i bakgrunnen
@ -110,62 +105,51 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler annen coroutine-oppgave (VideoTaskListener/Converter)
launch { launch {
delay(30) delay(30)
otherWorkCompleted.complete(Unit) otherWorkCompleted.complete(Unit)
} }
// ⭐ Ikke fullfør onTask før testen sier det
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på annen jobb
otherWorkCompleted.await() otherWorkCompleted.await()
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob) assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive) assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 3 — Heartbeat + CPU + IO arbeid
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når heartbeat kjører og flere parallelle jobber startes Når heartbeat kjører og flere parallelle jobber startes
@ -179,73 +163,56 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler Converter (CPU)
launch(Dispatchers.Default) { launch(Dispatchers.Default) {
repeat(1000) { /* CPU work */ } repeat(1000) {}
converterDone.complete(Unit) converterDone.complete(Unit)
} }
// Simuler VideoTaskListener (IO)
launch(Dispatchers.IO) { launch(Dispatchers.IO) {
delay(40) delay(40)
videoDone.complete(Unit) videoDone.complete(Unit)
} }
// ⭐ Vent til testen sier "nå kan du fullføre"
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på begge "andre" oppgaver
converterDone.await() converterDone.await()
videoDone.await() videoDone.await()
// ⭐ Verifiser at begge faktisk ble fullført
assertTrue(converterDone.isCompleted)
assertTrue(videoDone.isCompleted)
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 4 — Arbeid fullføres, heartbeat kjører
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask gjør ferdig arbeid Når onTask gjør ferdig arbeid
@ -257,24 +224,25 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
onTaskCalled = true onTaskCalled = true
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler arbeid
delay(20) delay(20)
// ⭐ signaliser at arbeidet er ferdig
workCompleted.complete(Unit) workCompleted.complete(Unit)
return object : Event() {} return object : Event() {}
@ -282,34 +250,23 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
// ⭐ Verifiser at arbeidet faktisk ble fullført
workCompleted.await() workCompleted.await()
listener.currentJob?.join()
// Vent på jobben
listener.currentJob!!.join()
// onTask ble kalt
assertTrue(listener.onTaskCalled) assertTrue(listener.onTaskCalled)
// Heartbeat ble startet
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 5 — accept() returnerer false når busy
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når listener er opptatt med en task Når listener er opptatt med en task
@ -321,36 +278,34 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Hold jobben i live
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task1 = FakeTask()
val task2 = FakeTask()
// Første task aksepteres assertTrue(listener.accept(FakeTask(), reporter))
val accepted1 = listener.accept(task1, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertTrue(accepted1)
// Listener er busy → andre task skal avvises
val accepted2 = listener.accept(task2, reporter)
assertFalse(accepted2)
// Fullfør første task
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 6 — accept() returnerer false når unsupported
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når supports() returnerer false Når supports() returnerer false
@ -360,21 +315,26 @@ class TaskListenerTest {
fun acceptReturnsFalseWhenUnsupported() = runTest { fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = false override fun supports(task: Task) = false
override suspend fun onTask(task: Task): Event? = error("Should not be called") override suspend fun onTask(task: Task): Event? = error("Should not be called")
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask()
val accepted = listener.accept(task, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertFalse(accepted)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 7 — onError kalles når onTask kaster
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask kaster en exception Når onTask kaster en exception
@ -386,6 +346,11 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -399,22 +364,19 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask().newReferenceId() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent på error-path
errorLogged.await() errorLogged.await()
// ⭐ Vent på at cleanup i finally kjører
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 8 — onCancelled kalles når jobben kanselleres
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når jobben kanselleres mens onTask kjører Når jobben kanselleres mens onTask kjører
@ -427,11 +389,16 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
allowStart.complete(Unit) allowStart.complete(Unit)
delay(Long.MAX_VALUE) // hold jobben i live delay(Long.MAX_VALUE)
return null return null
} }
@ -442,28 +409,22 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask().newReferenceId() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent til onTask har startet
allowStart.await() allowStart.await()
// Kanseller jobben
listener.currentJob!!.cancel() listener.currentJob!!.cancel()
// Vent til onCancelled() ble kalt
cancelledCalled.await() cancelledCalled.await()
// ⭐ Vent til cleanup i finally har kjørt
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 9 — Sekvensiell kjøring uten statelekkasje
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når listener prosesserer to tasks sekvensielt Når listener prosesserer to tasks sekvensielt
@ -471,7 +432,10 @@ class TaskListenerTest {
skal ingen state lekke mellom tasks skal ingen state lekke mellom tasks
""") """)
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest { fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
val started1 = CompletableDeferred<Unit>()
val finish1 = CompletableDeferred<Unit>() val finish1 = CompletableDeferred<Unit>()
val started2 = CompletableDeferred<Unit>()
val finish2 = CompletableDeferred<Unit>() val finish2 = CompletableDeferred<Unit>()
val listener = object : TaskListener() { val listener = object : TaskListener() {
@ -479,41 +443,50 @@ class TaskListenerTest {
var callCount = 0 var callCount = 0
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
callCount++ callCount++
if (callCount == 1) finish1.await()
if (callCount == 2) finish2.await() if (callCount == 1) {
started1.complete(Unit)
finish1.await()
}
if (callCount == 2) {
started2.complete(Unit)
finish2.await()
}
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
// Task 1 listener.accept(FakeTask(), reporter)
val task1 = FakeTask() started1.await()
listener.accept(task1, reporter)
finish1.complete(Unit) finish1.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Verifiser cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// Task 2 listener.accept(FakeTask(), reporter)
val task2 = FakeTask() started2.await()
listener.accept(task2, reporter)
finish2.complete(Unit) finish2.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Verifiser cleanup igjen
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// onTask ble kalt to ganger
assertEquals(2, listener.callCount) assertEquals(2, listener.callCount)
} }
} }

View File

@ -10,6 +10,7 @@ import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore import no.iktdev.eventi.stores.TaskStore
import no.iktdev.eventi.testUtil.multiply import no.iktdev.eventi.testUtil.multiply
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
@ -72,6 +73,14 @@ class TaskPollerImplementationTest : TestBase() {
fun getJob() = currentJob fun getJob() = currentJob
override fun getWorkerId() = this.javaClass.simpleName override fun getWorkerId() = this.javaClass.simpleName
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = task is EchoTask override fun supports(task: Task) = task is EchoTask
override suspend fun onTask(task: Task): Event { override suspend fun onTask(task: Task): Event {
@ -105,7 +114,7 @@ class TaskPollerImplementationTest : TestBase() {
val listener = EchoListener() val listener = EchoListener()
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {} val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}) val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() })
taskStore.persist(task) taskStore.persist(task)
poller.pollOnce() poller.pollOnce()

View File

@ -0,0 +1,14 @@
package no.iktdev.eventi.testUtil
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import no.iktdev.eventi.events.SequenceDispatchQueue
class TestSequenceDispatchQueue(
maxConcurrency: Int,
dispatcher: CoroutineDispatcher
) : SequenceDispatchQueue(
maxConcurrency,
CoroutineScope(dispatcher + SupervisorJob())
)