Compare commits

..

No commits in common. "master" and "v1.0-rc36" have entirely different histories.

20 changed files with 328 additions and 518 deletions

View File

@ -1,29 +0,0 @@
name: Run Unit Tests
on:
push:
branches:
- "**"
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up JDK 21
uses: actions/setup-java@v4
with:
java-version: '21'
distribution: 'zulu'
- name: Setup Gradle
uses: gradle/gradle-build-action@v3
- name: Make gradlew executable
run: chmod +x ./gradlew
- name: Run unit tests
run: ./gradlew test --stacktrace

View File

@ -2,7 +2,6 @@ package no.iktdev.eventi.events
import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.stores.EventStore
import java.util.UUID
@ -12,7 +11,6 @@ open class EventDispatcher(val eventStore: EventStore) {
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
val candidates = events
.filterNot { it is SignalEvent }
.filter { it.eventId !in derivedFromIds }
.filter { it.eventId !in deletedEventIds }

View File

@ -8,30 +8,23 @@ import no.iktdev.eventi.stores.EventStore
import java.time.Duration
import java.time.Instant
import java.util.UUID
import kotlin.collections.iterator
abstract class EventPollerImplementation(
private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher
) {
private val log = KotlinLogging.logger {}
// Erstatter ikke lastSeenTime, men supplerer den
protected val refWatermark = mutableMapOf<UUID, Instant>()
/**
* Per-reference watermark:
* - first = last seen persistedAt
* - second = last seen persistedId
*/
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
/**
* Global scan hint (timestamp only).
* Used to avoid scanning entire table every time.
*/
// lastSeenTime brukes kun som scan hint
var lastSeenTime: Instant = Instant.EPOCH
open var backoff = Duration.ofSeconds(2)
protected set
private val maxBackoff = Duration.ofMinutes(1)
private val log = KotlinLogging.logger {}
open suspend fun start() {
log.info { "EventPoller starting with initial backoff=$backoff" }
@ -39,7 +32,7 @@ abstract class EventPollerImplementation(
try {
pollOnce()
} catch (e: Exception) {
log.error(e) { "Error in poller loop" }
e.printStackTrace()
delay(backoff.toMillis())
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
}
@ -50,11 +43,11 @@ abstract class EventPollerImplementation(
val pollStartedAt = MyTime.utcNow()
log.debug { "🔍 Polling for new events" }
// Determine global scan start
val minRefTs = refWatermark.values.minOfOrNull { it.first }
val scanFrom = when (minRefTs) {
// Global scan hint: kombiner refWatermark og lastSeenTime
val watermarkMin = refWatermark.values.minOrNull()
val scanFrom = when (watermarkMin) {
null -> lastSeenTime
else -> maxOf(lastSeenTime, minRefTs)
else -> maxOf(lastSeenTime, watermarkMin)
}
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
@ -66,73 +59,76 @@ abstract class EventPollerImplementation(
return
}
// Reset backoff
// Vi har sett nye events globalt reset backoff
backoff = Duration.ofSeconds(2)
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
val grouped = newPersisted.groupBy { it.referenceId }
var anyProcessed = false
// Track highest persistedAt seen globally this round
// Track høyeste persistedAt vi har sett i denne runden
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
for ((ref, eventsForRef) in grouped) {
val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
// Filter new events using (timestamp, id) ordering
val newForRef = eventsForRef.filter { ev ->
ev.persistedAt > refSeenAt ||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
}
val refSeen = refWatermark[ref] ?: Instant.EPOCH
// Finn kun nye events for denne refen
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
if (newForRef.isEmpty()) {
log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
log.debug { "🧊 No new events for $ref since $refSeen" }
continue
}
// If ref is busy, skip dispatch
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch
if (dispatchQueue.isProcessing(ref)) {
log.debug { "$ref is busy — deferring ${newForRef.size} events" }
continue
}
// Fetch full sequence for dispatch
// Hent full sekvens for ref (Eventi-invariant)
val fullLog = eventStore.getPersistedEventsFor(ref)
val events = fullLog.mapNotNull { it.toEvent() }
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
dispatchQueue.dispatch(ref, events, dispatcher)
// Update watermark for this reference
val maxEvent = newForRef.maxWith(
compareBy({ it.persistedAt }, { it.id })
)
// Oppdater watermark for denne refen
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
val newWatermarkId = maxEvent.id
refWatermark[ref] = newWatermarkAt to newWatermarkId
refWatermark[ref] = newWatermark
anyProcessed = true
log.debug { "⏩ Updated watermark for $ref($newWatermarkAt, id=$newWatermarkId)" }
log.debug { "⏩ Updated watermark for $ref$newWatermark" }
}
// Update global scan hint
// Oppdater global scan hint uansett vi har sett nye events
// Dette hindrer livelock når alle events er <= watermark for sine refs
val newLastSeen = maxOf(
lastSeenTime,
maxPersistedThisRound.plusNanos(1)
)
if (anyProcessed) {
val minRef = refWatermark.values.minOfOrNull { it.first }
lastSeenTime = when (minRef) {
// Behold intensjonen din: globalt hint basert på laveste watermark,
// men aldri gå bakover i tid ift lastSeenTime
val minRefWatermark = refWatermark.values.minOrNull()
lastSeenTime = when (minRefWatermark) {
null -> newLastSeen
else -> maxOf(newLastSeen, minRef)
else -> maxOf(newLastSeen, minRefWatermark)
}
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
} else {
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
// → trygt å flytte lastSeenTime forbi dem
lastSeenTime = newLastSeen
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
}
}
}

View File

@ -4,7 +4,7 @@ import java.util.UUID
@Suppress("UNCHECKED_CAST")
abstract class Event {
lateinit var referenceId: UUID
var referenceId: UUID = UUID.randomUUID()
protected set
var eventId: UUID = UUID.randomUUID()
private set
@ -43,6 +43,6 @@ abstract class DeleteEvent(
open val deletedEventId: UUID
) : Event()
abstract class SignalEvent(): Event()

View File

@ -9,7 +9,6 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.jetbrains.annotations.VisibleForTesting
import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException
@ -68,7 +67,9 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch {
try {
val result = onTask(task)
reporter.markCompleted(task.taskId)
onComplete(task, result)
} catch (e: CancellationException) {
// Dette er en ekte kansellering
onCancelled(task)
@ -87,16 +88,14 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
this@TaskListener.reporter = null
}
}
return true
}
abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event
override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace()
reporter?.markFailed(task.referenceId, task.taskId)
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception))
reporter?.markFailed(task.taskId)
}
override fun onComplete(task: Task, result: Event?) {
@ -108,11 +107,10 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
}
override fun onCancelled(task: Task) {
reporter!!.markCancelled(task.referenceId, task.taskId)
reporter!!.markCancelled(task.taskId)
currentJob?.cancel()
heartbeatRunner?.cancel()
currentTask = null
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled))
}
}
@ -136,8 +134,8 @@ interface TaskReporter {
fun markClaimed(taskId: UUID, workerId: String)
fun updateLastSeen(taskId: UUID)
fun markCompleted(taskId: UUID)
fun markFailed(referenceId: UUID, taskId: UUID)
fun markCancelled(referenceId: UUID, taskId: UUID)
fun markFailed(taskId: UUID)
fun markCancelled(taskId: UUID)
fun updateProgress(taskId: UUID, progress: Int)
fun log(taskId: UUID, message: String)
fun publishEvent(event: Event)

View File

@ -8,7 +8,7 @@ import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.DeleteEvent
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.SignalEvent
import no.iktdev.eventi.models.Metadata
import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.assertEquals
@ -20,14 +20,12 @@ import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.util.UUID
@DisplayName(
"""
@DisplayName("""
EventDispatcher
Når hendelser dispatches til lyttere
Hvis hendelsene inneholder avledede, slettede eller nye events
skal dispatcheren håndtere filtrering, replays og historikk korrekt
"""
)
""")
class EventDispatcherTest : TestBase() {
val dispatcher = EventDispatcher(eventStore)
@ -36,6 +34,9 @@ class EventDispatcherTest : TestBase() {
class TriggerEvent : Event()
class OtherEvent : Event()
class DummyEvent : Event() {
fun putMetadata(metadata: Metadata) {
this.metadata = metadata
}
}
@BeforeEach
@ -54,17 +55,15 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en TriggerEvent dispatches
Hvis en lytter produserer én DerivedEvent
skal kun én ny event produseres og prosessen stoppe
"""
)
""")
fun shouldProduceOneEventAndStop() {
ProducingListener()
val listener = ProducingListener()
val trigger = TriggerEvent().newReferenceId()
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val produced = eventStore.all().firstOrNull()
@ -72,63 +71,55 @@ class EventDispatcherTest : TestBase() {
val event = produced!!.toEvent()
assertThat(event!!.metadata.derivedFromId).hasSize(1)
assertThat(event.metadata.derivedFromId).contains(trigger.eventId)
assertThat(event!!.metadata.derivedFromId).contains(trigger.eventId)
assertTrue(event is DerivedEvent)
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en event allerede har avledet en DerivedEvent
Hvis dispatcheren replays historikken
skal ikke DerivedEvent produseres nytt
"""
)
""")
fun shouldSkipAlreadyDerivedEvents() {
ProducingListener()
val listener = ProducingListener()
val trigger = TriggerEvent().newReferenceId()
val trigger = TriggerEvent()
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent()!!))
dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived!!.toEvent()!!))
assertEquals(1, eventStore.all().size)
}
@Test
@DisplayName(
"""
@DisplayName("""
Når flere events dispatches
Hvis en lytter mottar en event
skal hele historikken leveres i context
"""
)
""")
fun shouldPassFullContextToListener() {
val listener = ContextCapturingListener()
val e1 = TriggerEvent().newReferenceId()
val e2 = OtherEvent().newReferenceId()
val e1 = TriggerEvent()
val e2 = OtherEvent()
dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
assertEquals(2, listener.context.size)
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en replay skjer
Hvis en event allerede har produsert en DerivedEvent
skal ikke DerivedEvent produseres nytt
"""
)
""")
fun shouldBehaveDeterministicallyAcrossReplays() {
val referenceId = UUID.randomUUID()
val listener = ProducingListener()
ProducingListener()
val trigger = TriggerEvent().usingReferenceId(referenceId)
val trigger = TriggerEvent()
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
@ -138,16 +129,12 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en DeleteEvent peker en tidligere event
Hvis dispatcheren filtrerer kandidater
skal slettede events ikke leveres som kandidater
"""
)
""")
fun shouldNotDeliverDeletedEventsAsCandidates() {
val referenceId = UUID.randomUUID()
val dispatcher = EventDispatcher(eventStore)
val received = mutableListOf<Event>()
@ -158,10 +145,11 @@ class EventDispatcherTest : TestBase() {
}
}
// Original hendelse
val original = TriggerEvent().usingReferenceId(referenceId)
val original = TriggerEvent()
// Slettehendelse som peker på original
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
val deleted = object : DeleteEvent(original.eventId) {
}
// Dispatch med begge hendelser
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
@ -178,42 +166,36 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en DeleteEvent dispatches alene
Hvis en lytter reagerer DeleteEvent
skal DeleteEvent leveres som kandidat
"""
)
""")
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
val received = mutableListOf<Event>()
val referenceId = UUID.randomUUID()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
val listener = object : EventListener() {
override fun onEvent(event: Event, context: List<Event>): Event? {
if (event is DeleteEvent) received += event
return null
}
}
val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) }
val deleted = object : DeleteEvent(UUID.randomUUID()) {}
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
assertTrue(received.contains(deleted))
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en event har avledet en ny event
Hvis dispatcheren replays historikken
skal ikke original-eventen leveres som kandidat igjen
"""
)
""")
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
ProducingListener()
val listener = ProducingListener()
val trigger = TriggerEvent().newReferenceId()
val trigger = TriggerEvent()
// Første dispatch: trigger produserer en DerivedEvent
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
@ -233,22 +215,20 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en DeleteEvent slettet en tidligere event
Hvis dispatcheren bygger historikk
skal slettede events ikke være med i history
"""
)
""")
fun historyShouldExcludeDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) }
val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
@ -262,23 +242,21 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en DeleteEvent slettet en event
Hvis andre events fortsatt er gyldige
skal history kun inneholde de ikke-slettede events
"""
)
""")
fun historyShouldKeepNonDeletedEvents() {
val dispatcher = EventDispatcher(eventStore)
val referenceId = UUID.randomUUID()
val e1 = TriggerEvent().usingReferenceId(referenceId)
val e2 = OtherEvent().usingReferenceId(referenceId)
val e1 = TriggerEvent()
val e2 = OtherEvent()
val deleted = object : DeleteEvent(e1.eventId) {}
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedHistory = history
return null
@ -293,23 +271,21 @@ class EventDispatcherTest : TestBase() {
}
@Test
@DisplayName(
"""
@DisplayName("""
Når en DeleteEvent er kandidat
Hvis historikken kun inneholder slettede events
skal history være tom
"""
)
""")
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
val dispatcher = EventDispatcher(eventStore)
val original = TriggerEvent().newReferenceId()
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
val original = TriggerEvent()
val deleted = object : DeleteEvent(original.eventId) {}
var receivedEvent: Event? = null
var receivedHistory: List<Event> = emptyList()
object : EventListener() {
val listener = object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
receivedEvent = event
receivedHistory = history
@ -323,66 +299,18 @@ class EventDispatcherTest : TestBase() {
assertTrue(receivedHistory.isEmpty())
}
@Test
@DisplayName(
"""
Når en SignalEvent dispatches
Hvis SignalEvent ikke skal være kandidat
skal den ikke leveres til lyttere, men fortsatt være i historikken
"""
)
fun shouldNotDeliverSignalEventAsCandidate() {
// Arrange
class TestSignalEvent : SignalEvent()
EventTypeRegistry.register(listOf(TestSignalEvent::class.java,))
val received = mutableListOf<Event>()
var finalHistory: List<Event>? = null
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
received += event
finalHistory = history
return null
}
}
val refId = UUID.randomUUID()
val trigger = TriggerEvent().usingReferenceId(refId)
val signal = TestSignalEvent().usingReferenceId(refId)
// Act
dispatcher.dispatch(trigger.referenceId, listOf(trigger, signal))
// Assert
// 1) TriggerEvent skal leveres
assertTrue(received.any { it is TriggerEvent }) {
"TriggerEvent skal leveres som kandidat"
}
// 2) SignalEvent skal IKKE leveres
assertFalse(received.any { it is TestSignalEvent }) {
"SignalEvent skal ikke leveres som kandidat"
}
assertNotNull(finalHistory)
assertTrue(finalHistory!!.any { it is TestSignalEvent }) {
"SignalEvent skal være i historikken selv om den ikke er kandidat"
}
}
// --- Test helpers ---
class ProducingListener : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? {
return if (event is TriggerEvent) DerivedEvent().derivedOf(event) else null
}
}
class ContextCapturingListener : EventListener() {
var context: List<Event> = emptyList()
override fun onEvent(event: Event, history: List<Event>): Event? {
this.context = history
override fun onEvent(event: Event, context: List<Event>): Event? {
this.context = context
return null
}
}

View File

@ -39,7 +39,7 @@ class ZDSTest {
fun scenario1() {
EventTypeRegistry.register(EchoEvent::class.java)
val echo = EchoEvent("hello").newReferenceId()
val echo = EchoEvent("hello")
val persisted = echo.toPersisted(id = 1L)
val restored = persisted!!.toEvent()

View File

@ -1,23 +1,20 @@
package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.awaitAll
import no.iktdev.eventi.EventDispatcherTest
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
@ -37,7 +34,9 @@ Så skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
""")
class EventPollerImplementationTest : TestBase() {
private val dispatcher = EventDispatcher(eventStore)
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
@BeforeEach
fun setup() {
@ -61,16 +60,12 @@ class EventPollerImplementationTest : TestBase() {
skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
""")
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
EventListenerRegistry.registerListener(
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? {
dispatched += event.referenceId
completionMap[event.referenceId]?.complete(Unit)
return null
@ -99,9 +94,6 @@ class EventPollerImplementationTest : TestBase() {
skal backoff øke, og resettes når nye events ankommer
""")
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff
}
@ -130,10 +122,6 @@ class EventPollerImplementationTest : TestBase() {
skal polleren gruppere og dispatch'e alle tre i én batch
""")
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val refId = UUID.randomUUID()
val received = mutableListOf<Event>()
val done = CompletableDeferred<Unit>()
@ -145,7 +133,7 @@ class EventPollerImplementationTest : TestBase() {
EventTypeRegistry.register(listOf(TriggerEvent::class.java))
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? {
received += event
if (received.size == 3) done.complete(Unit)
return null
@ -170,8 +158,8 @@ class EventPollerImplementationTest : TestBase() {
skal polleren ignorere dem
""")
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
init {
@ -179,9 +167,6 @@ class EventPollerImplementationTest : TestBase() {
}
}
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
eventStore.persist(ignored)
testPoller.pollOnce()
@ -196,17 +181,12 @@ class EventPollerImplementationTest : TestBase() {
skal begge events prosesseres, men uten å produsere duplikate derived events
""")
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
val testDispatcher = StandardTestDispatcher(testScheduler)
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
val channel = Channel<Event>(Channel.UNLIMITED)
val handled = mutableListOf<Event>()
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? {
if (event !is EchoEvent) return null
handled += event
channel.trySend(event)
@ -214,14 +194,16 @@ class EventPollerImplementationTest : TestBase() {
}
}
val original = EchoEvent("Hello").newReferenceId()
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
val original = EchoEvent("Hello")
eventStore.persist(original)
poller.pollOnce()
withContext(testDispatcher) {
withTimeout(60_000) {
channel.receive()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) {
repeat(1) { channel.receive() }
}
}
@ -230,9 +212,9 @@ class EventPollerImplementationTest : TestBase() {
poller.pollOnce()
withContext(testDispatcher) {
withTimeout(60_000) {
channel.receive()
withContext(Dispatchers.Default.limitedParallelism(1)) {
withTimeout(Duration.ofMinutes(1).toMillis()) {
repeat(1) { channel.receive() }
}
}

View File

@ -1,7 +1,7 @@
@file:OptIn(ExperimentalCoroutinesApi::class)
package no.iktdev.eventi.events
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import no.iktdev.eventi.InMemoryEventStore
@ -9,6 +9,9 @@ import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.util.UUID
import kotlinx.coroutines.*
import no.iktdev.eventi.MyTime
import no.iktdev.eventi.ZDS.toPersisted
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Metadata
import org.junit.jupiter.api.DisplayName
@ -86,7 +89,7 @@ class RunSimulationTestTest {
}
}
private fun persistEvent(ref: UUID) {
private fun persistEvent(ref: UUID, time: Instant) {
val e = TestEvent().withReference(ref)
store.persist(e.setMetadata(Metadata()))
}
@ -99,8 +102,9 @@ class RunSimulationTestTest {
""")
fun pollerUpdatesLastSeenTimeWhenDispatchHappens() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref)
persistEvent(ref, t)
poller.pollOnce()
advanceUntilIdle()
@ -145,8 +149,9 @@ class RunSimulationTestTest {
""")
fun pollerDoesNotDoubleDispatch() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(ref)
persistEvent(ref, t)
poller.pollOnce()
advanceUntilIdle()
@ -166,9 +171,10 @@ class RunSimulationTestTest {
fun pollerHandlesMultipleReferenceIds() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA)
persistEvent(refB)
persistEvent(refA, t)
persistEvent(refB, t.plusSeconds(1))
poller.pollOnce()
advanceUntilIdle()
@ -185,9 +191,10 @@ class RunSimulationTestTest {
fun pollerHandlesIdenticalTimestamps() = runTest(testDispatcher) {
val refA = UUID.randomUUID()
val refB = UUID.randomUUID()
val t = Instant.parse("2026-01-22T12:00:00Z")
persistEvent(refA)
persistEvent(refB)
persistEvent(refA, t)
persistEvent(refB, t)
poller.pollOnce()
advanceUntilIdle()
@ -234,8 +241,10 @@ class RunSimulationTestTest {
""")
fun pollerProcessesEventsArrivingWhileQueueBusy() = runTest(testDispatcher) {
val ref = UUID.randomUUID()
val t1 = Instant.parse("2026-01-22T12:00:00Z")
val t2 = t1.plusSeconds(5)
persistEvent(ref)
persistEvent(ref, t1)
val controlledQueue = ControlledDispatchQueue(scope)
controlledQueue.busyRefs += ref
@ -252,7 +261,7 @@ class RunSimulationTestTest {
controlledQueue.busyRefs.clear()
// Add new event
persistEvent(ref)
persistEvent(ref, t2)
// Poll #2: should dispatch both events
poller.pollOnce()

View File

@ -51,7 +51,7 @@ class SequenceDispatchQueueTest : TestBase() {
EventListenerRegistry.registerListener(
object : EventListener() {
override fun onEvent(event: Event, history: List<Event>): Event? {
override fun onEvent(event: Event, context: List<Event>): Event? {
dispatched += event.referenceId
Thread.sleep(50) // simuler tung prosessering
return null

View File

@ -1,6 +1,5 @@
package no.iktdev.eventi.events.poller
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.*
import no.iktdev.eventi.InMemoryEventStore
import no.iktdev.eventi.MyTime
@ -24,7 +23,6 @@ import org.assertj.core.api.Assertions.assertThat
import java.time.Duration
@ExperimentalCoroutinesApi
@DisplayName("""
EventPollerImplementation start-loop
Når polleren kjører i en kontrollert test-loop
@ -60,42 +58,6 @@ class PollerStartLoopTest : TestBase() {
store.persistAt(e, time)
}
@Test
@DisplayName("""
Når to events har identisk persistedAt
Hvis polleren kjører
skal begge events prosesseres og ingen mistes
""")
fun `poller handles same-timestamp events without losing any`() = runTest {
val ref = UUID.randomUUID()
val ts = Instant.parse("2025-01-01T12:00:00Z")
// Two events with same timestamp but different IDs
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
store.persistAt(e1, ts) // id=1
store.persistAt(e2, ts) // id=2
poller.startFor(iterations = 1)
// Verify dispatch happened
assertThat(dispatcher.dispatched).hasSize(1)
val (_, events) = dispatcher.dispatched.single()
// Both events must be present
assertThat(events.map { it.eventId })
.hasSize(2)
.doesNotHaveDuplicates()
// Watermark must reflect highest ID
val wm = poller.watermarkFor(ref)
assertThat(wm!!.first).isEqualTo(ts)
assertThat(wm.second).isEqualTo(2)
}
@Test
@DisplayName("""
Når polleren kjører flere iterasjoner uten events
@ -138,6 +100,7 @@ class PollerStartLoopTest : TestBase() {
""")
fun `poller resets backoff when events appear`() = runTest {
poller.startFor(iterations = 5)
val before = poller.backoff
val ref = UUID.randomUUID()
persistAt(ref, MyTime.utcNow())
@ -307,15 +270,11 @@ class PollerStartLoopTest : TestBase() {
poller.startFor(iterations = 1)
// A skal IKKE ha flyttet watermark
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
// B skal ha flyttet watermark (på timestamp-nivå)
val wmB2 = poller.watermarkFor(refB)
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
// B skal ha flyttet watermark
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
}
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
@ -445,7 +404,7 @@ class PollerStartLoopTest : TestBase() {
// Fake EventStore som alltid returnerer samme event
val fakeStore = object : EventStore {
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
override fun getPersistedEventsAfter(ts: Instant): List<PersistedEvent> {
// Alltid returner én event som ligger før watermark
return listOf(
PersistedEvent(
@ -459,7 +418,7 @@ class PollerStartLoopTest : TestBase() {
)
}
override fun getPersistedEventsFor(referenceId: UUID): List<PersistedEvent> = emptyList()
override fun getPersistedEventsFor(ref: UUID): List<PersistedEvent> = emptyList()
override fun persist(event: Event) = Unit
}
@ -473,7 +432,7 @@ class PollerStartLoopTest : TestBase() {
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
// men i denne testen må vi simulere det)
poller.setWatermarkFor(ref, t(100), id = 999)
poller.setWatermarkFor(ref, t(100))
// Sett lastSeenTime bak eventen
poller.lastSeenTime = t(0)

View File

@ -1,15 +1,14 @@
package no.iktdev.eventi.events.poller
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import no.iktdev.eventi.events.EventDispatcher
import no.iktdev.eventi.events.EventPollerImplementation
import no.iktdev.eventi.events.SequenceDispatchQueue
import no.iktdev.eventi.stores.EventStore
import java.time.Instant
import java.util.*
import java.util.UUID
@ExperimentalCoroutinesApi
class TestablePoller(
eventStore: EventStore,
dispatchQueue: SequenceDispatchQueue,
@ -17,6 +16,8 @@ class TestablePoller(
val scope: TestScope
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
suspend fun startFor(iterations: Int) {
repeat(iterations) {
try {
@ -30,17 +31,19 @@ class TestablePoller(
}
}
override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
return refWatermark[ref]
override fun watermarkFor(ref: UUID): Instant? {
return refWatermark[ref]?.let {
return it
}
}
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
refWatermark[ref] = time to id
override fun setWatermarkFor(ref: UUID, time: Instant) {
refWatermark[ref] = time
}
}
interface WatermarkDebugView {
fun watermarkFor(ref: UUID): Pair<Instant, Long>?
fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
fun watermarkFor(ref: UUID): Instant?
fun setWatermarkFor(ref: UUID, time: Instant)
}

View File

@ -5,7 +5,6 @@ import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.*
@ -20,15 +19,6 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String {
TODO("Not yet implemented")
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean {
TODO("Not yet implemented")
}
@ -42,15 +32,6 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String {
TODO("Not yet implemented")
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean {
TODO("Not yet implemented")
}
@ -64,15 +45,6 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String {
TODO("Not yet implemented")
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean {
TODO("Not yet implemented")
}
@ -85,15 +57,6 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String {
TODO("Not yet implemented")
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean {
TODO("Not yet implemented")
}

View File

@ -2,13 +2,13 @@ package no.iktdev.eventi.tasks
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
@ -23,29 +23,32 @@ Så skal state, heartbeat og cleanup fungere korrekt
""")
class TaskListenerTest {
// -------------------------
// Fake Task + Reporter
// -------------------------
class FakeTask : Task()
class FakeReporter : TaskReporter {
var claimed = false
var completed = false
var failed = false
var cancelled = false
val logs = mutableListOf<String>()
val events = mutableListOf<Event>()
var consumed = false
var logs = mutableListOf<String>()
var events = mutableListOf<Event>()
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true }
override fun markCompleted(taskId: UUID) { consumed = true }
override fun markFailed(taskId: UUID) { consumed = true }
override fun markCancelled(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun publishEvent(event: Event) { events.add(event) }
override fun updateLastSeen(taskId: UUID) {}
override fun log(taskId: UUID, message: String) { logs.add(message) }
}
// ---------------------------------------------------------
// 1 — Heartbeat starter og stopper riktig
// ---------------------------------------------------------
// -------------------------
// Tests
// -------------------------
@Test
@DisplayName("""
Når onTask starter heartbeat-runner
@ -55,15 +58,13 @@ class TaskListenerTest {
fun heartbeatStartsAndStopsCorrectly() = runTest {
val listener = object : TaskListener() {
var heartbeatRan = false
var heartbeatStarted: Job? = null
var heartbeatRan: Boolean = false
private set
var onTaskCalled = false
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? {
@ -71,18 +72,25 @@ class TaskListenerTest {
withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true
}
}.also { heartbeatStarted = it }
// Gi heartbeat en sjanse til å kjøre
yield()
return object : Event() {}
}
}
val reporter = FakeReporter()
listener.accept(FakeTask(), reporter)
val task = FakeTask()
listener.currentJob?.join()
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
listener.currentJob!!.join()
assertNotNull(listener.heartbeatStarted)
assertFalse(listener.heartbeatStarted!!.isActive)
assertTrue(listener.heartbeatRan)
assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob)
@ -90,9 +98,6 @@ class TaskListenerTest {
assertNull(listener.reporter)
}
// ---------------------------------------------------------
// 2 — Heartbeat blokkerer ikke annen jobb
// ---------------------------------------------------------
@Test
@DisplayName("""
Når heartbeat kjører i bakgrunnen
@ -105,51 +110,62 @@ class TaskListenerTest {
val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event {
override suspend fun onTask(task: Task): Event? {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true
}
}.also { heartbeatStarted = it }
// Simuler annen coroutine-oppgave (VideoTaskListener/Converter)
launch {
delay(30)
otherWorkCompleted.complete(Unit)
}
// ⭐ Ikke fullfør onTask før testen sier det
allowFinish.await()
return object : Event() {}
}
}
val reporter = FakeReporter()
listener.accept(FakeTask(), reporter)
val task = FakeTask()
listener.accept(task, reporter)
// Vent på annen jobb
otherWorkCompleted.await()
assertTrue(listener.heartbeatRan)
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive)
allowFinish.complete(Unit)
listener.currentJob?.join()
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit)
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob)
assertNull(listener.currentTask)
}
// ---------------------------------------------------------
// 3 — Heartbeat + CPU + IO arbeid
// ---------------------------------------------------------
@Test
@DisplayName("""
Når heartbeat kjører og flere parallelle jobber startes
@ -163,56 +179,73 @@ class TaskListenerTest {
val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true
}
}.also { heartbeatStarted = it }
// Simuler Converter (CPU)
launch(Dispatchers.Default) {
repeat(1000) {}
repeat(1000) { /* CPU work */ }
converterDone.complete(Unit)
}
// Simuler VideoTaskListener (IO)
launch(Dispatchers.IO) {
delay(40)
videoDone.complete(Unit)
}
// ⭐ Vent til testen sier "nå kan du fullføre"
allowFinish.await()
return object : Event() {}
}
}
val reporter = FakeReporter()
listener.accept(FakeTask(), reporter)
val task = FakeTask()
listener.accept(task, reporter)
// Vent på begge "andre" oppgaver
converterDone.await()
videoDone.await()
assertTrue(listener.heartbeatRan)
// ⭐ Verifiser at begge faktisk ble fullført
assertTrue(converterDone.isCompleted)
assertTrue(videoDone.isCompleted)
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob)
assertNull(listener.currentTask)
}
// ---------------------------------------------------------
// 4 — Arbeid fullføres, heartbeat kjører
// ---------------------------------------------------------
@Test
@DisplayName("""
Når onTask gjør ferdig arbeid
@ -224,25 +257,24 @@ class TaskListenerTest {
val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false
var onTaskCalled = false
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event {
override suspend fun onTask(task: Task): Event? {
onTaskCalled = true
withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true
}
}.also { heartbeatStarted = it }
// Simuler arbeid
delay(20)
// ⭐ signaliser at arbeidet er ferdig
workCompleted.complete(Unit)
return object : Event() {}
@ -250,23 +282,34 @@ class TaskListenerTest {
}
val reporter = FakeReporter()
listener.accept(FakeTask(), reporter)
val task = FakeTask()
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
// ⭐ Verifiser at arbeidet faktisk ble fullført
workCompleted.await()
listener.currentJob?.join()
// Vent på jobben
listener.currentJob!!.join()
// onTask ble kalt
assertTrue(listener.onTaskCalled)
// Heartbeat ble startet
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.reporter)
}
// ---------------------------------------------------------
// 5 — accept() returnerer false når busy
// ---------------------------------------------------------
@Test
@DisplayName("""
Når listener er opptatt med en task
@ -278,34 +321,36 @@ class TaskListenerTest {
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? {
// Hold jobben i live
allowFinish.await()
return object : Event() {}
}
}
val reporter = FakeReporter()
val task1 = FakeTask()
val task2 = FakeTask()
assertTrue(listener.accept(FakeTask(), reporter))
assertFalse(listener.accept(FakeTask(), reporter))
// Første task aksepteres
val accepted1 = listener.accept(task1, reporter)
assertTrue(accepted1)
// Listener er busy → andre task skal avvises
val accepted2 = listener.accept(task2, reporter)
assertFalse(accepted2)
// Fullfør første task
allowFinish.complete(Unit)
listener.currentJob?.join()
listener.currentJob!!.join()
// Cleanup
assertNull(listener.currentJob)
assertNull(listener.currentTask)
}
// ---------------------------------------------------------
// 6 — accept() returnerer false når unsupported
// ---------------------------------------------------------
@Test
@DisplayName("""
Når supports() returnerer false
@ -315,26 +360,21 @@ class TaskListenerTest {
fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = false
override suspend fun onTask(task: Task): Event? = error("Should not be called")
}
val reporter = FakeReporter()
val task = FakeTask()
assertFalse(listener.accept(FakeTask(), reporter))
val accepted = listener.accept(task, reporter)
assertFalse(accepted)
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.reporter)
}
// ---------------------------------------------------------
// 7 — onError kalles når onTask kaster
// ---------------------------------------------------------
@Test
@DisplayName("""
Når onTask kaster en exception
@ -346,11 +386,6 @@ class TaskListenerTest {
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? {
@ -364,19 +399,22 @@ class TaskListenerTest {
}
val reporter = FakeReporter()
listener.accept(FakeTask().newReferenceId(), reporter)
val task = FakeTask()
listener.accept(task, reporter)
// Vent på error-path
errorLogged.await()
// ⭐ Vent på at cleanup i finally kjører
listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner)
}
// ---------------------------------------------------------
// 8 — onCancelled kalles når jobben kanselleres
// ---------------------------------------------------------
@Test
@DisplayName("""
Når jobben kanselleres mens onTask kjører
@ -389,16 +427,11 @@ class TaskListenerTest {
val listener = object : TaskListener() {
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? {
allowStart.complete(Unit)
delay(Long.MAX_VALUE)
delay(Long.MAX_VALUE) // hold jobben i live
return null
}
@ -409,22 +442,28 @@ class TaskListenerTest {
}
val reporter = FakeReporter()
listener.accept(FakeTask().newReferenceId(), reporter)
val task = FakeTask()
listener.accept(task, reporter)
// Vent til onTask har startet
allowStart.await()
// Kanseller jobben
listener.currentJob!!.cancel()
// Vent til onCancelled() ble kalt
cancelledCalled.await()
// ⭐ Vent til cleanup i finally har kjørt
listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner)
}
// ---------------------------------------------------------
// 9 — Sekvensiell kjøring uten statelekkasje
// ---------------------------------------------------------
@Test
@DisplayName("""
Når listener prosesserer to tasks sekvensielt
@ -432,10 +471,7 @@ class TaskListenerTest {
skal ingen state lekke mellom tasks
""")
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
val started1 = CompletableDeferred<Unit>()
val finish1 = CompletableDeferred<Unit>()
val started2 = CompletableDeferred<Unit>()
val finish2 = CompletableDeferred<Unit>()
val listener = object : TaskListener() {
@ -443,50 +479,41 @@ class TaskListenerTest {
var callCount = 0
override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task, status: TaskStatus, exception: Exception?
) = object : Event() {}
override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event {
override suspend fun onTask(task: Task): Event? {
callCount++
if (callCount == 1) {
started1.complete(Unit)
finish1.await()
}
if (callCount == 2) {
started2.complete(Unit)
finish2.await()
}
if (callCount == 1) finish1.await()
if (callCount == 2) finish2.await()
return object : Event() {}
}
}
val reporter = FakeReporter()
listener.accept(FakeTask(), reporter)
started1.await()
// Task 1
val task1 = FakeTask()
listener.accept(task1, reporter)
finish1.complete(Unit)
listener.currentJob?.join()
listener.currentJob!!.join()
// Verifiser cleanup
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner)
listener.accept(FakeTask(), reporter)
started2.await()
// Task 2
val task2 = FakeTask()
listener.accept(task2, reporter)
finish2.complete(Unit)
listener.currentJob?.join()
listener.currentJob!!.join()
// Verifiser cleanup igjen
assertNull(listener.currentJob)
assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner)
// onTask ble kalt to ganger
assertEquals(2, listener.callCount)
}
}

View File

@ -10,7 +10,6 @@ import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore
import no.iktdev.eventi.testUtil.multiply
import no.iktdev.eventi.testUtil.wipe
@ -46,8 +45,8 @@ class TaskPollerImplementationTest : TestBase() {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markFailed(referenceId: UUID,taskId: UUID) {}
override fun markCancelled(referenceId: UUID,taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {
@ -73,14 +72,6 @@ class TaskPollerImplementationTest : TestBase() {
fun getJob() = currentJob
override fun getWorkerId() = this.javaClass.simpleName
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = task is EchoTask
override suspend fun onTask(task: Task): Event {
@ -114,7 +105,7 @@ class TaskPollerImplementationTest : TestBase() {
val listener = EchoListener()
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() })
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
taskStore.persist(task)
poller.pollOnce()

View File

@ -13,7 +13,6 @@ fun EventListenerRegistry.wipe() {
// Tøm mapen
val mutableList = field.get(EventListenerRegistry) as MutableList<*>
@Suppress("UNCHECKED_CAST")
(mutableList as MutableList<Class<out EventListener>>).clear()
// Verifiser at det er tomt

View File

@ -13,7 +13,6 @@ fun EventTypeRegistry.wipe() {
// Tøm mapen
val typesMap = field.get(EventTypeRegistry) as MutableMap<*, *>
@Suppress("UNCHECKED_CAST")
(typesMap as MutableMap<String, Class<out Event>>).clear()
// Verifiser at det er tomt

View File

@ -1,5 +1,7 @@
package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskListenerRegistry
import org.assertj.core.api.Assertions.assertThat
@ -13,7 +15,6 @@ fun TaskListenerRegistry.wipe() {
// Tøm mapen
val mutableList = field.get(TaskListenerRegistry) as MutableList<*>
@Suppress("UNCHECKED_CAST")
(mutableList as MutableList<Class<out TaskListener>>).clear()
// Verifiser at det er tomt

View File

@ -1,11 +1,12 @@
package no.iktdev.eventi.testUtil
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.tasks.TaskTypeRegistry
import org.junit.jupiter.api.Assertions.assertNull
import java.lang.reflect.Field
@Suppress("UNUSED_RECEIVER_PARAMETER")
fun TaskTypeRegistry.wipe() {
val field: Field = TaskTypeRegistry::class.java
.superclass
@ -14,7 +15,6 @@ fun TaskTypeRegistry.wipe() {
// Tøm mapen
val typesMap = field.get(TaskTypeRegistry) as MutableMap<*, *>
@Suppress("UNCHECKED_CAST")
(typesMap as MutableMap<String, Class<out Task>>).clear()
// Verifiser at det er tomt

View File

@ -1,14 +0,0 @@
package no.iktdev.eventi.testUtil
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import no.iktdev.eventi.events.SequenceDispatchQueue
class TestSequenceDispatchQueue(
maxConcurrency: Int,
dispatcher: CoroutineDispatcher
) : SequenceDispatchQueue(
maxConcurrency,
CoroutineScope(dispatcher + SupervisorJob())
)