Compare commits
No commits in common. "master" and "v1.0-rc37" have entirely different histories.
29
.github/workflows/test.yml
vendored
29
.github/workflows/test.yml
vendored
@ -1,29 +0,0 @@
|
|||||||
name: Run Unit Tests
|
|
||||||
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- "**"
|
|
||||||
|
|
||||||
jobs:
|
|
||||||
test:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
|
|
||||||
steps:
|
|
||||||
- name: Checkout repository
|
|
||||||
uses: actions/checkout@v4
|
|
||||||
|
|
||||||
- name: Set up JDK 21
|
|
||||||
uses: actions/setup-java@v4
|
|
||||||
with:
|
|
||||||
java-version: '21'
|
|
||||||
distribution: 'zulu'
|
|
||||||
|
|
||||||
- name: Setup Gradle
|
|
||||||
uses: gradle/gradle-build-action@v3
|
|
||||||
|
|
||||||
- name: Make gradlew executable
|
|
||||||
run: chmod +x ./gradlew
|
|
||||||
|
|
||||||
- name: Run unit tests
|
|
||||||
run: ./gradlew test --stacktrace
|
|
||||||
@ -2,7 +2,6 @@ package no.iktdev.eventi.events
|
|||||||
|
|
||||||
import no.iktdev.eventi.models.DeleteEvent
|
import no.iktdev.eventi.models.DeleteEvent
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.SignalEvent
|
|
||||||
import no.iktdev.eventi.stores.EventStore
|
import no.iktdev.eventi.stores.EventStore
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
@ -12,7 +11,6 @@ open class EventDispatcher(val eventStore: EventStore) {
|
|||||||
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
|
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
|
||||||
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
|
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
|
||||||
val candidates = events
|
val candidates = events
|
||||||
.filterNot { it is SignalEvent }
|
|
||||||
.filter { it.eventId !in derivedFromIds }
|
.filter { it.eventId !in derivedFromIds }
|
||||||
.filter { it.eventId !in deletedEventIds }
|
.filter { it.eventId !in deletedEventIds }
|
||||||
|
|
||||||
|
|||||||
@ -8,30 +8,23 @@ import no.iktdev.eventi.stores.EventStore
|
|||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
import java.time.Instant
|
import java.time.Instant
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
import kotlin.collections.iterator
|
||||||
|
|
||||||
abstract class EventPollerImplementation(
|
abstract class EventPollerImplementation(
|
||||||
private val eventStore: EventStore,
|
private val eventStore: EventStore,
|
||||||
private val dispatchQueue: SequenceDispatchQueue,
|
private val dispatchQueue: SequenceDispatchQueue,
|
||||||
private val dispatcher: EventDispatcher
|
private val dispatcher: EventDispatcher
|
||||||
) {
|
) {
|
||||||
private val log = KotlinLogging.logger {}
|
// Erstatter ikke lastSeenTime, men supplerer den
|
||||||
|
protected val refWatermark = mutableMapOf<UUID, Instant>()
|
||||||
|
|
||||||
/**
|
// lastSeenTime brukes kun som scan hint
|
||||||
* Per-reference watermark:
|
|
||||||
* - first = last seen persistedAt
|
|
||||||
* - second = last seen persistedId
|
|
||||||
*/
|
|
||||||
protected val refWatermark = mutableMapOf<UUID, Pair<Instant, Long>>()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Global scan hint (timestamp only).
|
|
||||||
* Used to avoid scanning entire table every time.
|
|
||||||
*/
|
|
||||||
var lastSeenTime: Instant = Instant.EPOCH
|
var lastSeenTime: Instant = Instant.EPOCH
|
||||||
|
|
||||||
open var backoff = Duration.ofSeconds(2)
|
open var backoff = Duration.ofSeconds(2)
|
||||||
protected set
|
protected set
|
||||||
private val maxBackoff = Duration.ofMinutes(1)
|
private val maxBackoff = Duration.ofMinutes(1)
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
open suspend fun start() {
|
open suspend fun start() {
|
||||||
log.info { "EventPoller starting with initial backoff=$backoff" }
|
log.info { "EventPoller starting with initial backoff=$backoff" }
|
||||||
@ -39,7 +32,7 @@ abstract class EventPollerImplementation(
|
|||||||
try {
|
try {
|
||||||
pollOnce()
|
pollOnce()
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
log.error(e) { "Error in poller loop" }
|
e.printStackTrace()
|
||||||
delay(backoff.toMillis())
|
delay(backoff.toMillis())
|
||||||
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
||||||
}
|
}
|
||||||
@ -50,11 +43,11 @@ abstract class EventPollerImplementation(
|
|||||||
val pollStartedAt = MyTime.utcNow()
|
val pollStartedAt = MyTime.utcNow()
|
||||||
log.debug { "🔍 Polling for new events" }
|
log.debug { "🔍 Polling for new events" }
|
||||||
|
|
||||||
// Determine global scan start
|
// Global scan hint: kombiner refWatermark og lastSeenTime
|
||||||
val minRefTs = refWatermark.values.minOfOrNull { it.first }
|
val watermarkMin = refWatermark.values.minOrNull()
|
||||||
val scanFrom = when (minRefTs) {
|
val scanFrom = when (watermarkMin) {
|
||||||
null -> lastSeenTime
|
null -> lastSeenTime
|
||||||
else -> maxOf(lastSeenTime, minRefTs)
|
else -> maxOf(lastSeenTime, watermarkMin)
|
||||||
}
|
}
|
||||||
|
|
||||||
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
val newPersisted = eventStore.getPersistedEventsAfter(scanFrom)
|
||||||
@ -66,73 +59,76 @@ abstract class EventPollerImplementation(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Reset backoff
|
// Vi har sett nye events globalt – reset backoff
|
||||||
backoff = Duration.ofSeconds(2)
|
backoff = Duration.ofSeconds(2)
|
||||||
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
|
log.debug { "📬 Found ${newPersisted.size} new events after $scanFrom" }
|
||||||
|
|
||||||
val grouped = newPersisted.groupBy { it.referenceId }
|
val grouped = newPersisted.groupBy { it.referenceId }
|
||||||
var anyProcessed = false
|
var anyProcessed = false
|
||||||
|
|
||||||
// Track highest persistedAt seen globally this round
|
// Track høyeste persistedAt vi har sett i denne runden
|
||||||
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
|
val maxPersistedThisRound = newPersisted.maxOf { it.persistedAt }
|
||||||
|
|
||||||
for ((ref, eventsForRef) in grouped) {
|
for ((ref, eventsForRef) in grouped) {
|
||||||
val (refSeenAt, refSeenId) = refWatermark[ref] ?: (Instant.EPOCH to 0L)
|
val refSeen = refWatermark[ref] ?: Instant.EPOCH
|
||||||
|
|
||||||
// Filter new events using (timestamp, id) ordering
|
|
||||||
val newForRef = eventsForRef.filter { ev ->
|
|
||||||
ev.persistedAt > refSeenAt ||
|
|
||||||
(ev.persistedAt == refSeenAt && ev.id > refSeenId)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// Finn kun nye events for denne ref’en
|
||||||
|
val newForRef = eventsForRef.filter { it.persistedAt > refSeen }
|
||||||
if (newForRef.isEmpty()) {
|
if (newForRef.isEmpty()) {
|
||||||
log.debug { "🧊 No new events for $ref since ($refSeenAt, id=$refSeenId)" }
|
log.debug { "🧊 No new events for $ref since $refSeen" }
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// If ref is busy, skip dispatch
|
// Hvis ref er busy → ikke oppdater watermark, ikke dispatch
|
||||||
if (dispatchQueue.isProcessing(ref)) {
|
if (dispatchQueue.isProcessing(ref)) {
|
||||||
log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" }
|
log.debug { "⏳ $ref is busy — deferring ${newForRef.size} events" }
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetch full sequence for dispatch
|
// Hent full sekvens for ref (Eventi-invariant)
|
||||||
val fullLog = eventStore.getPersistedEventsFor(ref)
|
val fullLog = eventStore.getPersistedEventsFor(ref)
|
||||||
val events = fullLog.mapNotNull { it.toEvent() }
|
val events = fullLog.mapNotNull { it.toEvent() }
|
||||||
|
|
||||||
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
|
log.debug { "🚀 Dispatching ${events.size} events for $ref" }
|
||||||
dispatchQueue.dispatch(ref, events, dispatcher)
|
dispatchQueue.dispatch(ref, events, dispatcher)
|
||||||
|
|
||||||
// Update watermark for this reference
|
// Oppdater watermark for denne ref’en
|
||||||
val maxEvent = newForRef.maxWith(
|
val maxPersistedAtForRef = newForRef.maxOf { it.persistedAt }
|
||||||
compareBy({ it.persistedAt }, { it.id })
|
val newWatermark = minOf(pollStartedAt, maxPersistedAtForRef).plusNanos(1)
|
||||||
)
|
|
||||||
|
|
||||||
val newWatermarkAt = minOf(pollStartedAt, maxEvent.persistedAt)
|
refWatermark[ref] = newWatermark
|
||||||
val newWatermarkId = maxEvent.id
|
|
||||||
|
|
||||||
refWatermark[ref] = newWatermarkAt to newWatermarkId
|
|
||||||
anyProcessed = true
|
anyProcessed = true
|
||||||
|
|
||||||
log.debug { "⏩ Updated watermark for $ref → ($newWatermarkAt, id=$newWatermarkId)" }
|
log.debug { "⏩ Updated watermark for $ref → $newWatermark" }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update global scan hint
|
// Oppdater global scan hint uansett – vi har sett nye events
|
||||||
|
// Dette hindrer livelock når alle events er <= watermark for sine refs
|
||||||
val newLastSeen = maxOf(
|
val newLastSeen = maxOf(
|
||||||
lastSeenTime,
|
lastSeenTime,
|
||||||
maxPersistedThisRound.plusNanos(1)
|
maxPersistedThisRound.plusNanos(1)
|
||||||
)
|
)
|
||||||
|
|
||||||
if (anyProcessed) {
|
if (anyProcessed) {
|
||||||
val minRef = refWatermark.values.minOfOrNull { it.first }
|
// Behold intensjonen din: globalt hint basert på laveste watermark,
|
||||||
lastSeenTime = when (minRef) {
|
// men aldri gå bakover i tid ift lastSeenTime
|
||||||
|
val minRefWatermark = refWatermark.values.minOrNull()
|
||||||
|
lastSeenTime = when (minRefWatermark) {
|
||||||
null -> newLastSeen
|
null -> newLastSeen
|
||||||
else -> maxOf(newLastSeen, minRef)
|
else -> maxOf(newLastSeen, minRefWatermark)
|
||||||
}
|
}
|
||||||
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
|
log.debug { "📉 Global scanFrom updated → $lastSeenTime (anyProcessed=true)" }
|
||||||
} else {
|
} else {
|
||||||
|
// Ingen refs prosessert, men vi vet at alle events vi så er <= watermark
|
||||||
|
// → trygt å flytte lastSeenTime forbi dem
|
||||||
lastSeenTime = newLastSeen
|
lastSeenTime = newLastSeen
|
||||||
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
|
log.debug { "🔁 No refs processed — advancing global scanFrom to $lastSeenTime" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,7 +4,7 @@ import java.util.UUID
|
|||||||
|
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
abstract class Event {
|
abstract class Event {
|
||||||
lateinit var referenceId: UUID
|
var referenceId: UUID = UUID.randomUUID()
|
||||||
protected set
|
protected set
|
||||||
var eventId: UUID = UUID.randomUUID()
|
var eventId: UUID = UUID.randomUUID()
|
||||||
private set
|
private set
|
||||||
@ -43,6 +43,6 @@ abstract class DeleteEvent(
|
|||||||
open val deletedEventId: UUID
|
open val deletedEventId: UUID
|
||||||
) : Event()
|
) : Event()
|
||||||
|
|
||||||
abstract class SignalEvent(): Event()
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -9,7 +9,6 @@ import kotlinx.coroutines.isActive
|
|||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
|
||||||
import org.jetbrains.annotations.VisibleForTesting
|
import org.jetbrains.annotations.VisibleForTesting
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
import kotlin.coroutines.cancellation.CancellationException
|
import kotlin.coroutines.cancellation.CancellationException
|
||||||
@ -68,7 +67,9 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
currentJob = getDispatcherForTask(task).launch {
|
currentJob = getDispatcherForTask(task).launch {
|
||||||
try {
|
try {
|
||||||
val result = onTask(task)
|
val result = onTask(task)
|
||||||
|
reporter.markCompleted(task.taskId)
|
||||||
onComplete(task, result)
|
onComplete(task, result)
|
||||||
|
|
||||||
} catch (e: CancellationException) {
|
} catch (e: CancellationException) {
|
||||||
// Dette er en ekte kansellering
|
// Dette er en ekte kansellering
|
||||||
onCancelled(task)
|
onCancelled(task)
|
||||||
@ -87,16 +88,14 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
this@TaskListener.reporter = null
|
this@TaskListener.reporter = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event
|
|
||||||
|
|
||||||
override fun onError(task: Task, exception: Exception) {
|
override fun onError(task: Task, exception: Exception) {
|
||||||
reporter?.log(task.taskId, "Error processing task: ${exception.message}")
|
reporter?.log(task.taskId, "Error processing task: ${exception.message}")
|
||||||
exception.printStackTrace()
|
exception.printStackTrace()
|
||||||
reporter?.markFailed(task.referenceId, task.taskId)
|
reporter?.markFailed(task.referenceId, task.taskId)
|
||||||
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onComplete(task: Task, result: Event?) {
|
override fun onComplete(task: Task, result: Event?) {
|
||||||
@ -112,7 +111,6 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
|||||||
currentJob?.cancel()
|
currentJob?.cancel()
|
||||||
heartbeatRunner?.cancel()
|
heartbeatRunner?.cancel()
|
||||||
currentTask = null
|
currentTask = null
|
||||||
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import no.iktdev.eventi.events.EventListenerRegistry
|
|||||||
import no.iktdev.eventi.events.EventTypeRegistry
|
import no.iktdev.eventi.events.EventTypeRegistry
|
||||||
import no.iktdev.eventi.models.DeleteEvent
|
import no.iktdev.eventi.models.DeleteEvent
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.SignalEvent
|
|
||||||
import no.iktdev.eventi.testUtil.wipe
|
import no.iktdev.eventi.testUtil.wipe
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
@ -20,14 +19,12 @@ import org.junit.jupiter.api.DisplayName
|
|||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
import java.util.UUID
|
import java.util.UUID
|
||||||
|
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
EventDispatcher
|
EventDispatcher
|
||||||
Når hendelser dispatches til lyttere
|
Når hendelser dispatches til lyttere
|
||||||
Hvis hendelsene inneholder avledede, slettede eller nye events
|
Hvis hendelsene inneholder avledede, slettede eller nye events
|
||||||
Så skal dispatcheren håndtere filtrering, replays og historikk korrekt
|
Så skal dispatcheren håndtere filtrering, replays og historikk korrekt
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
class EventDispatcherTest : TestBase() {
|
class EventDispatcherTest : TestBase() {
|
||||||
|
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
@ -54,17 +51,15 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en TriggerEvent dispatches
|
Når en TriggerEvent dispatches
|
||||||
Hvis en lytter produserer én DerivedEvent
|
Hvis en lytter produserer én DerivedEvent
|
||||||
Så skal kun én ny event produseres og prosessen stoppe
|
Så skal kun én ny event produseres og prosessen stoppe
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldProduceOneEventAndStop() {
|
fun shouldProduceOneEventAndStop() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent().newReferenceId()
|
val trigger = TriggerEvent()
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
|
|
||||||
val produced = eventStore.all().firstOrNull()
|
val produced = eventStore.all().firstOrNull()
|
||||||
@ -77,17 +72,15 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en event allerede har avledet en DerivedEvent
|
Når en event allerede har avledet en DerivedEvent
|
||||||
Hvis dispatcheren replays historikken
|
Hvis dispatcheren replays historikken
|
||||||
Så skal ikke DerivedEvent produseres på nytt
|
Så skal ikke DerivedEvent produseres på nytt
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldSkipAlreadyDerivedEvents() {
|
fun shouldSkipAlreadyDerivedEvents() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent().newReferenceId()
|
val trigger = TriggerEvent()
|
||||||
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
|
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.utcNow())
|
||||||
|
|
||||||
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
||||||
@ -98,37 +91,31 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når flere events dispatches
|
Når flere events dispatches
|
||||||
Hvis en lytter mottar en event
|
Hvis en lytter mottar en event
|
||||||
Så skal hele historikken leveres i context
|
Så skal hele historikken leveres i context
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldPassFullContextToListener() {
|
fun shouldPassFullContextToListener() {
|
||||||
val listener = ContextCapturingListener()
|
val listener = ContextCapturingListener()
|
||||||
|
|
||||||
val e1 = TriggerEvent().newReferenceId()
|
val e1 = TriggerEvent()
|
||||||
val e2 = OtherEvent().newReferenceId()
|
val e2 = OtherEvent()
|
||||||
dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
|
dispatcher.dispatch(e1.referenceId, listOf(e1, e2))
|
||||||
|
|
||||||
assertEquals(2, listener.context.size)
|
assertEquals(2, listener.context.size)
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en replay skjer
|
Når en replay skjer
|
||||||
Hvis en event allerede har produsert en DerivedEvent
|
Hvis en event allerede har produsert en DerivedEvent
|
||||||
Så skal ikke DerivedEvent produseres på nytt
|
Så skal ikke DerivedEvent produseres på nytt
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldBehaveDeterministicallyAcrossReplays() {
|
fun shouldBehaveDeterministicallyAcrossReplays() {
|
||||||
val referenceId = UUID.randomUUID()
|
|
||||||
|
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent().usingReferenceId(referenceId)
|
val trigger = TriggerEvent()
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
|
val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() }
|
||||||
|
|
||||||
@ -138,16 +125,12 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en DeleteEvent peker på en tidligere event
|
Når en DeleteEvent peker på en tidligere event
|
||||||
Hvis dispatcheren filtrerer kandidater
|
Hvis dispatcheren filtrerer kandidater
|
||||||
Så skal slettede events ikke leveres som kandidater
|
Så skal slettede events ikke leveres som kandidater
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldNotDeliverDeletedEventsAsCandidates() {
|
fun shouldNotDeliverDeletedEventsAsCandidates() {
|
||||||
val referenceId = UUID.randomUUID()
|
|
||||||
|
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
|
|
||||||
@ -158,10 +141,11 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Original hendelse
|
// Original hendelse
|
||||||
val original = TriggerEvent().usingReferenceId(referenceId)
|
val original = TriggerEvent()
|
||||||
|
|
||||||
// Slettehendelse som peker på original
|
// Slettehendelse som peker på original
|
||||||
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
|
val deleted = object : DeleteEvent(original.eventId) {
|
||||||
|
}
|
||||||
|
|
||||||
// Dispatch med begge hendelser
|
// Dispatch med begge hendelser
|
||||||
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
|
dispatcher.dispatch(original.referenceId, listOf(original, deleted))
|
||||||
@ -178,17 +162,13 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en DeleteEvent dispatches alene
|
Når en DeleteEvent dispatches alene
|
||||||
Hvis en lytter reagerer på DeleteEvent
|
Hvis en lytter reagerer på DeleteEvent
|
||||||
Så skal DeleteEvent leveres som kandidat
|
Så skal DeleteEvent leveres som kandidat
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
|
fun shouldDeliverDeleteEventToListenersThatReactToIt() {
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
val referenceId = UUID.randomUUID()
|
|
||||||
|
|
||||||
object : EventListener() {
|
object : EventListener() {
|
||||||
override fun onEvent(event: Event, history: List<Event>): Event? {
|
override fun onEvent(event: Event, history: List<Event>): Event? {
|
||||||
if (event is DeleteEvent) received += event
|
if (event is DeleteEvent) received += event
|
||||||
@ -196,24 +176,22 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val deleted = object : DeleteEvent(UUID.randomUUID()) {}.apply { usingReferenceId(referenceId) }
|
val deleted = object : DeleteEvent(UUID.randomUUID()) {}
|
||||||
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
|
dispatcher.dispatch(deleted.referenceId, listOf(deleted))
|
||||||
|
|
||||||
assertTrue(received.contains(deleted))
|
assertTrue(received.contains(deleted))
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en event har avledet en ny event
|
Når en event har avledet en ny event
|
||||||
Hvis dispatcheren replays historikken
|
Hvis dispatcheren replays historikken
|
||||||
Så skal ikke original-eventen leveres som kandidat igjen
|
Så skal ikke original-eventen leveres som kandidat igjen
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
|
fun shouldNotRedeliverEventsThatHaveProducedDerivedEvents() {
|
||||||
ProducingListener()
|
ProducingListener()
|
||||||
|
|
||||||
val trigger = TriggerEvent().newReferenceId()
|
val trigger = TriggerEvent()
|
||||||
// Første dispatch: trigger produserer en DerivedEvent
|
// Første dispatch: trigger produserer en DerivedEvent
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
dispatcher.dispatch(trigger.referenceId, listOf(trigger))
|
||||||
|
|
||||||
@ -233,18 +211,16 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en DeleteEvent slettet en tidligere event
|
Når en DeleteEvent slettet en tidligere event
|
||||||
Hvis dispatcheren bygger historikk
|
Hvis dispatcheren bygger historikk
|
||||||
Så skal slettede events ikke være med i history
|
Så skal slettede events ikke være med i history
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun historyShouldExcludeDeletedEvents() {
|
fun historyShouldExcludeDeletedEvents() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
|
||||||
val original = TriggerEvent().newReferenceId()
|
val original = TriggerEvent()
|
||||||
val deleted = object : DeleteEvent(original.eventId) {}.apply { usingReferenceId(original.referenceId) }
|
val deleted = object : DeleteEvent(original.eventId) {}
|
||||||
|
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
|
|
||||||
@ -262,18 +238,16 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en DeleteEvent slettet en event
|
Når en DeleteEvent slettet en event
|
||||||
Hvis andre events fortsatt er gyldige
|
Hvis andre events fortsatt er gyldige
|
||||||
Så skal history kun inneholde de ikke-slettede events
|
Så skal history kun inneholde de ikke-slettede events
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun historyShouldKeepNonDeletedEvents() {
|
fun historyShouldKeepNonDeletedEvents() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
val referenceId = UUID.randomUUID()
|
|
||||||
val e1 = TriggerEvent().usingReferenceId(referenceId)
|
val e1 = TriggerEvent()
|
||||||
val e2 = OtherEvent().usingReferenceId(referenceId)
|
val e2 = OtherEvent()
|
||||||
val deleted = object : DeleteEvent(e1.eventId) {}
|
val deleted = object : DeleteEvent(e1.eventId) {}
|
||||||
|
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
@ -293,18 +267,16 @@ class EventDispatcherTest : TestBase() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName(
|
@DisplayName("""
|
||||||
"""
|
|
||||||
Når en DeleteEvent er kandidat
|
Når en DeleteEvent er kandidat
|
||||||
Hvis historikken kun inneholder slettede events
|
Hvis historikken kun inneholder slettede events
|
||||||
Så skal history være tom
|
Så skal history være tom
|
||||||
"""
|
""")
|
||||||
)
|
|
||||||
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
|
fun deleteEventShouldBeDeliveredButHistoryEmpty() {
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
|
||||||
val original = TriggerEvent().newReferenceId()
|
val original = TriggerEvent()
|
||||||
val deleted = object : DeleteEvent(original.eventId) {}.apply { newReferenceId() }
|
val deleted = object : DeleteEvent(original.eventId) {}
|
||||||
|
|
||||||
var receivedEvent: Event? = null
|
var receivedEvent: Event? = null
|
||||||
var receivedHistory: List<Event> = emptyList()
|
var receivedHistory: List<Event> = emptyList()
|
||||||
@ -323,54 +295,6 @@ class EventDispatcherTest : TestBase() {
|
|||||||
assertTrue(receivedHistory.isEmpty())
|
assertTrue(receivedHistory.isEmpty())
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
@DisplayName(
|
|
||||||
"""
|
|
||||||
Når en SignalEvent dispatches
|
|
||||||
Hvis SignalEvent ikke skal være kandidat
|
|
||||||
Så skal den ikke leveres til lyttere, men fortsatt være i historikken
|
|
||||||
"""
|
|
||||||
)
|
|
||||||
fun shouldNotDeliverSignalEventAsCandidate() {
|
|
||||||
// Arrange
|
|
||||||
class TestSignalEvent : SignalEvent()
|
|
||||||
EventTypeRegistry.register(listOf(TestSignalEvent::class.java,))
|
|
||||||
|
|
||||||
val received = mutableListOf<Event>()
|
|
||||||
var finalHistory: List<Event>? = null
|
|
||||||
object : EventListener() {
|
|
||||||
override fun onEvent(event: Event, history: List<Event>): Event? {
|
|
||||||
received += event
|
|
||||||
finalHistory = history
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
val refId = UUID.randomUUID()
|
|
||||||
val trigger = TriggerEvent().usingReferenceId(refId)
|
|
||||||
val signal = TestSignalEvent().usingReferenceId(refId)
|
|
||||||
|
|
||||||
// Act
|
|
||||||
dispatcher.dispatch(trigger.referenceId, listOf(trigger, signal))
|
|
||||||
|
|
||||||
// Assert
|
|
||||||
// 1) TriggerEvent skal leveres
|
|
||||||
assertTrue(received.any { it is TriggerEvent }) {
|
|
||||||
"TriggerEvent skal leveres som kandidat"
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2) SignalEvent skal IKKE leveres
|
|
||||||
assertFalse(received.any { it is TestSignalEvent }) {
|
|
||||||
"SignalEvent skal ikke leveres som kandidat"
|
|
||||||
}
|
|
||||||
|
|
||||||
assertNotNull(finalHistory)
|
|
||||||
assertTrue(finalHistory!!.any { it is TestSignalEvent }) {
|
|
||||||
"SignalEvent skal være i historikken selv om den ikke er kandidat"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// --- Test helpers ---
|
// --- Test helpers ---
|
||||||
|
|
||||||
class ProducingListener : EventListener() {
|
class ProducingListener : EventListener() {
|
||||||
|
|||||||
@ -39,7 +39,7 @@ class ZDSTest {
|
|||||||
fun scenario1() {
|
fun scenario1() {
|
||||||
EventTypeRegistry.register(EchoEvent::class.java)
|
EventTypeRegistry.register(EchoEvent::class.java)
|
||||||
|
|
||||||
val echo = EchoEvent("hello").newReferenceId()
|
val echo = EchoEvent("hello")
|
||||||
val persisted = echo.toPersisted(id = 1L)
|
val persisted = echo.toPersisted(id = 1L)
|
||||||
|
|
||||||
val restored = persisted!!.toEvent()
|
val restored = persisted!!.toEvent()
|
||||||
|
|||||||
@ -1,23 +1,19 @@
|
|||||||
package no.iktdev.eventi.events
|
package no.iktdev.eventi.events
|
||||||
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
import kotlinx.coroutines.CoroutineDispatcher
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
import kotlinx.coroutines.SupervisorJob
|
import kotlinx.coroutines.awaitAll
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.test.StandardTestDispatcher
|
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.test.runTest
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.coroutines.withTimeout
|
import kotlinx.coroutines.withTimeout
|
||||||
import kotlinx.coroutines.awaitAll
|
|
||||||
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
|
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
|
||||||
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
|
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
|
||||||
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
|
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
|
||||||
import no.iktdev.eventi.MyTime
|
import no.iktdev.eventi.MyTime
|
||||||
import no.iktdev.eventi.TestBase
|
import no.iktdev.eventi.TestBase
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue
|
|
||||||
import no.iktdev.eventi.testUtil.wipe
|
import no.iktdev.eventi.testUtil.wipe
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.Assertions.assertFalse
|
import org.junit.jupiter.api.Assertions.assertFalse
|
||||||
@ -37,7 +33,9 @@ Så skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
|
|||||||
""")
|
""")
|
||||||
class EventPollerImplementationTest : TestBase() {
|
class EventPollerImplementationTest : TestBase() {
|
||||||
|
|
||||||
private val dispatcher = EventDispatcher(eventStore)
|
val dispatcher = EventDispatcher(eventStore)
|
||||||
|
val queue = SequenceDispatchQueue(maxConcurrency = 8)
|
||||||
|
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
fun setup() {
|
fun setup() {
|
||||||
@ -61,10 +59,6 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
|
Så skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
|
||||||
""")
|
""")
|
||||||
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
|
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
|
||||||
val testDispatcher = StandardTestDispatcher(testScheduler)
|
|
||||||
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
|
||||||
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
|
||||||
|
|
||||||
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
|
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
|
||||||
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
|
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
|
||||||
|
|
||||||
@ -99,9 +93,6 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal backoff øke, og resettes når nye events ankommer
|
Så skal backoff øke, og resettes når nye events ankommer
|
||||||
""")
|
""")
|
||||||
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
|
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
|
||||||
val testDispatcher = StandardTestDispatcher(testScheduler)
|
|
||||||
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
|
||||||
|
|
||||||
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
||||||
fun currentBackoff(): Duration = backoff
|
fun currentBackoff(): Duration = backoff
|
||||||
}
|
}
|
||||||
@ -130,10 +121,6 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal polleren gruppere og dispatch'e alle tre i én batch
|
Så skal polleren gruppere og dispatch'e alle tre i én batch
|
||||||
""")
|
""")
|
||||||
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
|
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
|
||||||
val testDispatcher = StandardTestDispatcher(testScheduler)
|
|
||||||
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
|
||||||
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
|
||||||
|
|
||||||
val refId = UUID.randomUUID()
|
val refId = UUID.randomUUID()
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
val done = CompletableDeferred<Unit>()
|
val done = CompletableDeferred<Unit>()
|
||||||
@ -170,8 +157,8 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal polleren ignorere dem
|
Så skal polleren ignorere dem
|
||||||
""")
|
""")
|
||||||
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
|
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
|
||||||
val testDispatcher = StandardTestDispatcher(testScheduler)
|
val refId = UUID.randomUUID()
|
||||||
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
val ignored = TriggerEvent().usingReferenceId(refId)
|
||||||
|
|
||||||
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
||||||
init {
|
init {
|
||||||
@ -179,9 +166,6 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val refId = UUID.randomUUID()
|
|
||||||
val ignored = TriggerEvent().usingReferenceId(refId)
|
|
||||||
|
|
||||||
eventStore.persist(ignored)
|
eventStore.persist(ignored)
|
||||||
testPoller.pollOnce()
|
testPoller.pollOnce()
|
||||||
|
|
||||||
@ -196,12 +180,7 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal begge events prosesseres, men uten å produsere duplikate derived events
|
Så skal begge events prosesseres, men uten å produsere duplikate derived events
|
||||||
""")
|
""")
|
||||||
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
|
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
|
||||||
val testDispatcher = StandardTestDispatcher(testScheduler)
|
|
||||||
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
|
||||||
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
|
||||||
|
|
||||||
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
|
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
|
||||||
|
|
||||||
val channel = Channel<Event>(Channel.UNLIMITED)
|
val channel = Channel<Event>(Channel.UNLIMITED)
|
||||||
val handled = mutableListOf<Event>()
|
val handled = mutableListOf<Event>()
|
||||||
|
|
||||||
@ -214,14 +193,16 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val original = EchoEvent("Hello").newReferenceId()
|
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
||||||
|
|
||||||
|
val original = EchoEvent("Hello")
|
||||||
eventStore.persist(original)
|
eventStore.persist(original)
|
||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|
||||||
withContext(testDispatcher) {
|
withContext(Dispatchers.Default.limitedParallelism(1)) {
|
||||||
withTimeout(60_000) {
|
withTimeout(Duration.ofMinutes(1).toMillis()) {
|
||||||
channel.receive()
|
repeat(1) { channel.receive() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -230,9 +211,9 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|
||||||
withContext(testDispatcher) {
|
withContext(Dispatchers.Default.limitedParallelism(1)) {
|
||||||
withTimeout(60_000) {
|
withTimeout(Duration.ofMinutes(1).toMillis()) {
|
||||||
channel.receive()
|
repeat(1) { channel.receive() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -60,42 +60,6 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
store.persistAt(e, time)
|
store.persistAt(e, time)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Test
|
|
||||||
@DisplayName("""
|
|
||||||
Når to events har identisk persistedAt
|
|
||||||
Hvis polleren kjører
|
|
||||||
Så skal begge events prosesseres og ingen mistes
|
|
||||||
""")
|
|
||||||
fun `poller handles same-timestamp events without losing any`() = runTest {
|
|
||||||
val ref = UUID.randomUUID()
|
|
||||||
val ts = Instant.parse("2025-01-01T12:00:00Z")
|
|
||||||
|
|
||||||
// Two events with same timestamp but different IDs
|
|
||||||
val e1 = TestEvent().withReference(ref).setMetadata(Metadata())
|
|
||||||
val e2 = TestEvent().withReference(ref).setMetadata(Metadata())
|
|
||||||
|
|
||||||
store.persistAt(e1, ts) // id=1
|
|
||||||
store.persistAt(e2, ts) // id=2
|
|
||||||
|
|
||||||
poller.startFor(iterations = 1)
|
|
||||||
|
|
||||||
// Verify dispatch happened
|
|
||||||
assertThat(dispatcher.dispatched).hasSize(1)
|
|
||||||
|
|
||||||
val (_, events) = dispatcher.dispatched.single()
|
|
||||||
|
|
||||||
// Both events must be present
|
|
||||||
assertThat(events.map { it.eventId })
|
|
||||||
.hasSize(2)
|
|
||||||
.doesNotHaveDuplicates()
|
|
||||||
|
|
||||||
// Watermark must reflect highest ID
|
|
||||||
val wm = poller.watermarkFor(ref)
|
|
||||||
assertThat(wm!!.first).isEqualTo(ts)
|
|
||||||
assertThat(wm.second).isEqualTo(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når polleren kjører flere iterasjoner uten events
|
Når polleren kjører flere iterasjoner uten events
|
||||||
@ -307,15 +271,11 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
|
|
||||||
poller.startFor(iterations = 1)
|
poller.startFor(iterations = 1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// A skal IKKE ha flyttet watermark
|
// A skal IKKE ha flyttet watermark
|
||||||
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
|
assertThat(poller.watermarkFor(refA)).isEqualTo(wmA1)
|
||||||
|
|
||||||
// B skal ha flyttet watermark (på timestamp-nivå)
|
// B skal ha flyttet watermark
|
||||||
val wmB2 = poller.watermarkFor(refB)
|
assertThat(poller.watermarkFor(refB)).isGreaterThan(wmB1)
|
||||||
assertThat(wmB2!!.first).isGreaterThan(wmB1!!.first)
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
|
@DisplayName("🍌 Bananastesten™ — stress-test av watermark, busy refs og dispatch-semantikk")
|
||||||
@ -473,7 +433,7 @@ class PollerStartLoopTest : TestBase() {
|
|||||||
|
|
||||||
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
|
// Sett watermark høyt (polleren setter watermark selv i ekte drift,
|
||||||
// men i denne testen må vi simulere det)
|
// men i denne testen må vi simulere det)
|
||||||
poller.setWatermarkFor(ref, t(100), id = 999)
|
poller.setWatermarkFor(ref, t(100))
|
||||||
|
|
||||||
// Sett lastSeenTime bak eventen
|
// Sett lastSeenTime bak eventen
|
||||||
poller.lastSeenTime = t(0)
|
poller.lastSeenTime = t(0)
|
||||||
|
|||||||
@ -17,6 +17,8 @@ class TestablePoller(
|
|||||||
val scope: TestScope
|
val scope: TestScope
|
||||||
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
|
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher), WatermarkDebugView {
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
suspend fun startFor(iterations: Int) {
|
suspend fun startFor(iterations: Int) {
|
||||||
repeat(iterations) {
|
repeat(iterations) {
|
||||||
try {
|
try {
|
||||||
@ -30,17 +32,19 @@ class TestablePoller(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun watermarkFor(ref: UUID): Pair<Instant, Long>? {
|
override fun watermarkFor(ref: UUID): Instant? {
|
||||||
return refWatermark[ref]
|
return refWatermark[ref]?.let {
|
||||||
}
|
return it
|
||||||
|
|
||||||
override fun setWatermarkFor(ref: UUID, time: Instant, id: Long) {
|
|
||||||
refWatermark[ref] = time to id
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun setWatermarkFor(ref: UUID, time: Instant) {
|
||||||
|
refWatermark[ref] = time
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
interface WatermarkDebugView {
|
interface WatermarkDebugView {
|
||||||
fun watermarkFor(ref: UUID): Pair<Instant, Long>?
|
fun watermarkFor(ref: UUID): Instant?
|
||||||
fun setWatermarkFor(ref: UUID, time: Instant, id: Long)
|
fun setWatermarkFor(ref: UUID, time: Instant)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import no.iktdev.eventi.events.EventListener
|
|||||||
import no.iktdev.eventi.events.EventListenerRegistry
|
import no.iktdev.eventi.events.EventListenerRegistry
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
|
||||||
import no.iktdev.eventi.testUtil.wipe
|
import no.iktdev.eventi.testUtil.wipe
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.Assertions.*
|
import org.junit.jupiter.api.Assertions.*
|
||||||
@ -20,15 +19,6 @@ class TaskListenerRegistryTest {
|
|||||||
override fun getWorkerId(): String {
|
override fun getWorkerId(): String {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task,
|
|
||||||
status: TaskStatus,
|
|
||||||
exception: Exception?
|
|
||||||
): Event {
|
|
||||||
TODO("Not yet implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun supports(task: Task): Boolean {
|
override fun supports(task: Task): Boolean {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
@ -42,15 +32,6 @@ class TaskListenerRegistryTest {
|
|||||||
override fun getWorkerId(): String {
|
override fun getWorkerId(): String {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task,
|
|
||||||
status: TaskStatus,
|
|
||||||
exception: Exception?
|
|
||||||
): Event {
|
|
||||||
TODO("Not yet implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun supports(task: Task): Boolean {
|
override fun supports(task: Task): Boolean {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
@ -64,15 +45,6 @@ class TaskListenerRegistryTest {
|
|||||||
override fun getWorkerId(): String {
|
override fun getWorkerId(): String {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task,
|
|
||||||
status: TaskStatus,
|
|
||||||
exception: Exception?
|
|
||||||
): Event {
|
|
||||||
TODO("Not yet implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun supports(task: Task): Boolean {
|
override fun supports(task: Task): Boolean {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
@ -85,15 +57,6 @@ class TaskListenerRegistryTest {
|
|||||||
override fun getWorkerId(): String {
|
override fun getWorkerId(): String {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task,
|
|
||||||
status: TaskStatus,
|
|
||||||
exception: Exception?
|
|
||||||
): Event {
|
|
||||||
TODO("Not yet implemented")
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun supports(task: Task): Boolean {
|
override fun supports(task: Task): Boolean {
|
||||||
TODO("Not yet implemented")
|
TODO("Not yet implemented")
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,13 +2,13 @@ package no.iktdev.eventi.tasks
|
|||||||
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.Job
|
||||||
import kotlinx.coroutines.delay
|
import kotlinx.coroutines.delay
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.test.runTest
|
||||||
import kotlinx.coroutines.yield
|
import kotlinx.coroutines.yield
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
|
||||||
import org.junit.jupiter.api.Assertions.*
|
import org.junit.jupiter.api.Assertions.*
|
||||||
import org.junit.jupiter.api.DisplayName
|
import org.junit.jupiter.api.DisplayName
|
||||||
import org.junit.jupiter.api.Test
|
import org.junit.jupiter.api.Test
|
||||||
@ -23,29 +23,32 @@ Så skal state, heartbeat og cleanup fungere korrekt
|
|||||||
""")
|
""")
|
||||||
class TaskListenerTest {
|
class TaskListenerTest {
|
||||||
|
|
||||||
|
// -------------------------
|
||||||
|
// Fake Task + Reporter
|
||||||
|
// -------------------------
|
||||||
|
|
||||||
class FakeTask : Task()
|
class FakeTask : Task()
|
||||||
|
|
||||||
class FakeReporter : TaskReporter {
|
class FakeReporter : TaskReporter {
|
||||||
var claimed = false
|
var claimed = false
|
||||||
var completed = false
|
var consumed = false
|
||||||
var failed = false
|
var logs = mutableListOf<String>()
|
||||||
var cancelled = false
|
var events = mutableListOf<Event>()
|
||||||
val logs = mutableListOf<String>()
|
|
||||||
val events = mutableListOf<Event>()
|
|
||||||
|
|
||||||
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
|
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
|
||||||
override fun markCompleted(taskId: UUID) { completed = true }
|
override fun markCompleted(taskId: UUID) { consumed = true }
|
||||||
override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
|
override fun markFailed(referenceId: UUID, taskId: UUID) { consumed = true }
|
||||||
override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true }
|
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
|
||||||
override fun updateProgress(taskId: UUID, progress: Int) {}
|
override fun updateProgress(taskId: UUID, progress: Int) {}
|
||||||
override fun publishEvent(event: Event) { events.add(event) }
|
override fun publishEvent(event: Event) { events.add(event) }
|
||||||
override fun updateLastSeen(taskId: UUID) {}
|
override fun updateLastSeen(taskId: UUID) {}
|
||||||
override fun log(taskId: UUID, message: String) { logs.add(message) }
|
override fun log(taskId: UUID, message: String) { logs.add(message) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
// -------------------------
|
||||||
// 1 — Heartbeat starter og stopper riktig
|
// Tests
|
||||||
// ---------------------------------------------------------
|
// -------------------------
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når onTask starter heartbeat-runner
|
Når onTask starter heartbeat-runner
|
||||||
@ -55,15 +58,13 @@ class TaskListenerTest {
|
|||||||
fun heartbeatStartsAndStopsCorrectly() = runTest {
|
fun heartbeatStartsAndStopsCorrectly() = runTest {
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
|
|
||||||
var heartbeatRan = false
|
var heartbeatStarted: Job? = null
|
||||||
|
var heartbeatRan: Boolean = false
|
||||||
|
private set
|
||||||
|
|
||||||
var onTaskCalled = false
|
var onTaskCalled = false
|
||||||
|
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
@ -71,18 +72,25 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
withHeartbeatRunner(10.milliseconds) {
|
withHeartbeatRunner(10.milliseconds) {
|
||||||
heartbeatRan = true
|
heartbeatRan = true
|
||||||
}
|
}.also { heartbeatStarted = it }
|
||||||
|
|
||||||
|
// Gi heartbeat en sjanse til å kjøre
|
||||||
yield()
|
yield()
|
||||||
|
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask(), reporter)
|
val task = FakeTask()
|
||||||
|
|
||||||
listener.currentJob?.join()
|
val accepted = listener.accept(task, reporter)
|
||||||
|
assertTrue(accepted)
|
||||||
|
|
||||||
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
assertNotNull(listener.heartbeatStarted)
|
||||||
|
assertFalse(listener.heartbeatStarted!!.isActive)
|
||||||
assertTrue(listener.heartbeatRan)
|
assertTrue(listener.heartbeatRan)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
@ -90,9 +98,6 @@ class TaskListenerTest {
|
|||||||
assertNull(listener.reporter)
|
assertNull(listener.reporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 2 — Heartbeat blokkerer ikke annen jobb
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når heartbeat kjører i bakgrunnen
|
Når heartbeat kjører i bakgrunnen
|
||||||
@ -105,51 +110,62 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
|
|
||||||
|
var heartbeatStarted: Job? = null
|
||||||
var heartbeatRan = false
|
var heartbeatRan = false
|
||||||
|
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
|
|
||||||
|
// Start heartbeat
|
||||||
withHeartbeatRunner(10.milliseconds) {
|
withHeartbeatRunner(10.milliseconds) {
|
||||||
heartbeatRan = true
|
heartbeatRan = true
|
||||||
}
|
}.also { heartbeatStarted = it }
|
||||||
|
|
||||||
|
// Simuler annen coroutine-oppgave (VideoTaskListener/Converter)
|
||||||
launch {
|
launch {
|
||||||
delay(30)
|
delay(30)
|
||||||
otherWorkCompleted.complete(Unit)
|
otherWorkCompleted.complete(Unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ⭐ Ikke fullfør onTask før testen sier det
|
||||||
allowFinish.await()
|
allowFinish.await()
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask(), reporter)
|
val task = FakeTask()
|
||||||
|
|
||||||
|
listener.accept(task, reporter)
|
||||||
|
|
||||||
|
// Vent på annen jobb
|
||||||
otherWorkCompleted.await()
|
otherWorkCompleted.await()
|
||||||
|
|
||||||
assertTrue(listener.heartbeatRan)
|
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
|
||||||
assertNotNull(listener.currentJob)
|
assertNotNull(listener.currentJob)
|
||||||
assertTrue(listener.currentJob!!.isActive)
|
assertTrue(listener.currentJob!!.isActive)
|
||||||
|
|
||||||
allowFinish.complete(Unit)
|
// Heartbeat kjørte
|
||||||
listener.currentJob?.join()
|
assertNotNull(listener.heartbeatStarted)
|
||||||
|
assertTrue(listener.heartbeatRan)
|
||||||
|
|
||||||
|
// ⭐ Nå lar vi onTask fullføre
|
||||||
|
allowFinish.complete(Unit)
|
||||||
|
|
||||||
|
// Vent på listener-jobben
|
||||||
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// Heartbeat ble kansellert
|
||||||
|
assertFalse(listener.heartbeatStarted!!.isActive)
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 3 — Heartbeat + CPU + IO arbeid
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når heartbeat kjører og flere parallelle jobber startes
|
Når heartbeat kjører og flere parallelle jobber startes
|
||||||
@ -163,56 +179,73 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
|
|
||||||
|
var heartbeatStarted: Job? = null
|
||||||
var heartbeatRan = false
|
var heartbeatRan = false
|
||||||
|
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
|
|
||||||
|
// Start heartbeat
|
||||||
withHeartbeatRunner(10.milliseconds) {
|
withHeartbeatRunner(10.milliseconds) {
|
||||||
heartbeatRan = true
|
heartbeatRan = true
|
||||||
}
|
}.also { heartbeatStarted = it }
|
||||||
|
|
||||||
|
// Simuler Converter (CPU)
|
||||||
launch(Dispatchers.Default) {
|
launch(Dispatchers.Default) {
|
||||||
repeat(1000) {}
|
repeat(1000) { /* CPU work */ }
|
||||||
converterDone.complete(Unit)
|
converterDone.complete(Unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Simuler VideoTaskListener (IO)
|
||||||
launch(Dispatchers.IO) {
|
launch(Dispatchers.IO) {
|
||||||
delay(40)
|
delay(40)
|
||||||
videoDone.complete(Unit)
|
videoDone.complete(Unit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ⭐ Vent til testen sier "nå kan du fullføre"
|
||||||
allowFinish.await()
|
allowFinish.await()
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask(), reporter)
|
val task = FakeTask()
|
||||||
|
|
||||||
|
listener.accept(task, reporter)
|
||||||
|
|
||||||
|
// Vent på begge "andre" oppgaver
|
||||||
converterDone.await()
|
converterDone.await()
|
||||||
videoDone.await()
|
videoDone.await()
|
||||||
|
|
||||||
assertTrue(listener.heartbeatRan)
|
// ⭐ Verifiser at begge faktisk ble fullført
|
||||||
|
assertTrue(converterDone.isCompleted)
|
||||||
|
assertTrue(videoDone.isCompleted)
|
||||||
|
|
||||||
|
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
|
||||||
assertNotNull(listener.currentJob)
|
assertNotNull(listener.currentJob)
|
||||||
|
assertTrue(listener.currentJob!!.isActive)
|
||||||
|
|
||||||
|
// Heartbeat kjørte
|
||||||
|
assertNotNull(listener.heartbeatStarted)
|
||||||
|
assertTrue(listener.heartbeatRan)
|
||||||
|
|
||||||
|
// ⭐ Nå lar vi onTask fullføre
|
||||||
allowFinish.complete(Unit)
|
allowFinish.complete(Unit)
|
||||||
listener.currentJob?.join()
|
|
||||||
|
|
||||||
|
// Vent på listener-jobben
|
||||||
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// Heartbeat ble kansellert
|
||||||
|
assertFalse(listener.heartbeatStarted!!.isActive)
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 4 — Arbeid fullføres, heartbeat kjører
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når onTask gjør ferdig arbeid
|
Når onTask gjør ferdig arbeid
|
||||||
@ -224,25 +257,24 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
|
|
||||||
|
var heartbeatStarted: Job? = null
|
||||||
var heartbeatRan = false
|
var heartbeatRan = false
|
||||||
var onTaskCalled = false
|
var onTaskCalled = false
|
||||||
|
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
onTaskCalled = true
|
onTaskCalled = true
|
||||||
|
|
||||||
withHeartbeatRunner(10.milliseconds) {
|
withHeartbeatRunner(10.milliseconds) {
|
||||||
heartbeatRan = true
|
heartbeatRan = true
|
||||||
}
|
}.also { heartbeatStarted = it }
|
||||||
|
|
||||||
|
// Simuler arbeid
|
||||||
delay(20)
|
delay(20)
|
||||||
|
|
||||||
|
// ⭐ signaliser at arbeidet er ferdig
|
||||||
workCompleted.complete(Unit)
|
workCompleted.complete(Unit)
|
||||||
|
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
@ -250,23 +282,34 @@ class TaskListenerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask(), reporter)
|
val task = FakeTask()
|
||||||
|
|
||||||
|
val accepted = listener.accept(task, reporter)
|
||||||
|
assertTrue(accepted)
|
||||||
|
|
||||||
|
// ⭐ Verifiser at arbeidet faktisk ble fullført
|
||||||
workCompleted.await()
|
workCompleted.await()
|
||||||
listener.currentJob?.join()
|
|
||||||
|
|
||||||
|
// Vent på jobben
|
||||||
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// onTask ble kalt
|
||||||
assertTrue(listener.onTaskCalled)
|
assertTrue(listener.onTaskCalled)
|
||||||
|
|
||||||
|
// Heartbeat ble startet
|
||||||
|
assertNotNull(listener.heartbeatStarted)
|
||||||
assertTrue(listener.heartbeatRan)
|
assertTrue(listener.heartbeatRan)
|
||||||
|
|
||||||
|
// Heartbeat ble kansellert
|
||||||
|
assertFalse(listener.heartbeatStarted!!.isActive)
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.reporter)
|
assertNull(listener.reporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 5 — accept() returnerer false når busy
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når listener er opptatt med en task
|
Når listener er opptatt med en task
|
||||||
@ -278,34 +321,36 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
|
// Hold jobben i live
|
||||||
allowFinish.await()
|
allowFinish.await()
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
|
val task1 = FakeTask()
|
||||||
|
val task2 = FakeTask()
|
||||||
|
|
||||||
assertTrue(listener.accept(FakeTask(), reporter))
|
// Første task aksepteres
|
||||||
assertFalse(listener.accept(FakeTask(), reporter))
|
val accepted1 = listener.accept(task1, reporter)
|
||||||
|
assertTrue(accepted1)
|
||||||
|
|
||||||
|
// Listener er busy → andre task skal avvises
|
||||||
|
val accepted2 = listener.accept(task2, reporter)
|
||||||
|
assertFalse(accepted2)
|
||||||
|
|
||||||
|
// Fullfør første task
|
||||||
allowFinish.complete(Unit)
|
allowFinish.complete(Unit)
|
||||||
listener.currentJob?.join()
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 6 — accept() returnerer false når unsupported
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når supports() returnerer false
|
Når supports() returnerer false
|
||||||
@ -315,26 +360,21 @@ class TaskListenerTest {
|
|||||||
fun acceptReturnsFalseWhenUnsupported() = runTest {
|
fun acceptReturnsFalseWhenUnsupported() = runTest {
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = false
|
override fun supports(task: Task) = false
|
||||||
override suspend fun onTask(task: Task): Event? = error("Should not be called")
|
override suspend fun onTask(task: Task): Event? = error("Should not be called")
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
|
val task = FakeTask()
|
||||||
|
|
||||||
assertFalse(listener.accept(FakeTask(), reporter))
|
val accepted = listener.accept(task, reporter)
|
||||||
|
|
||||||
|
assertFalse(accepted)
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.reporter)
|
assertNull(listener.reporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 7 — onError kalles når onTask kaster
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når onTask kaster en exception
|
Når onTask kaster en exception
|
||||||
@ -346,11 +386,6 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
@ -364,19 +399,22 @@ class TaskListenerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask().newReferenceId(), reporter)
|
val task = FakeTask().newReferenceId()
|
||||||
|
|
||||||
|
listener.accept(task, reporter)
|
||||||
|
|
||||||
|
// Vent på error-path
|
||||||
errorLogged.await()
|
errorLogged.await()
|
||||||
|
|
||||||
|
// ⭐ Vent på at cleanup i finally kjører
|
||||||
listener.currentJob?.join()
|
listener.currentJob?.join()
|
||||||
|
|
||||||
|
// Cleanup verifisering
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 8 — onCancelled kalles når jobben kanselleres
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når jobben kanselleres mens onTask kjører
|
Når jobben kanselleres mens onTask kjører
|
||||||
@ -389,16 +427,11 @@ class TaskListenerTest {
|
|||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
allowStart.complete(Unit)
|
allowStart.complete(Unit)
|
||||||
delay(Long.MAX_VALUE)
|
delay(Long.MAX_VALUE) // hold jobben i live
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,22 +442,28 @@ class TaskListenerTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
listener.accept(FakeTask().newReferenceId(), reporter)
|
val task = FakeTask().newReferenceId()
|
||||||
|
|
||||||
|
listener.accept(task, reporter)
|
||||||
|
|
||||||
|
// Vent til onTask har startet
|
||||||
allowStart.await()
|
allowStart.await()
|
||||||
|
|
||||||
|
// Kanseller jobben
|
||||||
listener.currentJob!!.cancel()
|
listener.currentJob!!.cancel()
|
||||||
|
|
||||||
|
// Vent til onCancelled() ble kalt
|
||||||
cancelledCalled.await()
|
cancelledCalled.await()
|
||||||
|
|
||||||
|
// ⭐ Vent til cleanup i finally har kjørt
|
||||||
listener.currentJob?.join()
|
listener.currentJob?.join()
|
||||||
|
|
||||||
|
// Cleanup verifisering
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
// 9 — Sekvensiell kjøring uten state‑lekkasje
|
|
||||||
// ---------------------------------------------------------
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("""
|
@DisplayName("""
|
||||||
Når listener prosesserer to tasks sekvensielt
|
Når listener prosesserer to tasks sekvensielt
|
||||||
@ -432,10 +471,7 @@ class TaskListenerTest {
|
|||||||
Så skal ingen state lekke mellom tasks
|
Så skal ingen state lekke mellom tasks
|
||||||
""")
|
""")
|
||||||
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
|
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
|
||||||
val started1 = CompletableDeferred<Unit>()
|
|
||||||
val finish1 = CompletableDeferred<Unit>()
|
val finish1 = CompletableDeferred<Unit>()
|
||||||
|
|
||||||
val started2 = CompletableDeferred<Unit>()
|
|
||||||
val finish2 = CompletableDeferred<Unit>()
|
val finish2 = CompletableDeferred<Unit>()
|
||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
@ -443,50 +479,41 @@ class TaskListenerTest {
|
|||||||
var callCount = 0
|
var callCount = 0
|
||||||
|
|
||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
|
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task, status: TaskStatus, exception: Exception?
|
|
||||||
) = object : Event() {}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
callCount++
|
callCount++
|
||||||
|
if (callCount == 1) finish1.await()
|
||||||
if (callCount == 1) {
|
if (callCount == 2) finish2.await()
|
||||||
started1.complete(Unit)
|
|
||||||
finish1.await()
|
|
||||||
}
|
|
||||||
|
|
||||||
if (callCount == 2) {
|
|
||||||
started2.complete(Unit)
|
|
||||||
finish2.await()
|
|
||||||
}
|
|
||||||
|
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
|
|
||||||
listener.accept(FakeTask(), reporter)
|
// Task 1
|
||||||
started1.await()
|
val task1 = FakeTask()
|
||||||
|
listener.accept(task1, reporter)
|
||||||
finish1.complete(Unit)
|
finish1.complete(Unit)
|
||||||
listener.currentJob?.join()
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// Verifiser cleanup
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
|
|
||||||
listener.accept(FakeTask(), reporter)
|
// Task 2
|
||||||
started2.await()
|
val task2 = FakeTask()
|
||||||
|
listener.accept(task2, reporter)
|
||||||
finish2.complete(Unit)
|
finish2.complete(Unit)
|
||||||
listener.currentJob?.join()
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
|
// Verifiser cleanup igjen
|
||||||
assertNull(listener.currentJob)
|
assertNull(listener.currentJob)
|
||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
|
|
||||||
|
// onTask ble kalt to ganger
|
||||||
assertEquals(2, listener.callCount)
|
assertEquals(2, listener.callCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -10,7 +10,6 @@ import no.iktdev.eventi.TestBase
|
|||||||
import no.iktdev.eventi.events.EventTypeRegistry
|
import no.iktdev.eventi.events.EventTypeRegistry
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
|
||||||
import no.iktdev.eventi.stores.TaskStore
|
import no.iktdev.eventi.stores.TaskStore
|
||||||
import no.iktdev.eventi.testUtil.multiply
|
import no.iktdev.eventi.testUtil.multiply
|
||||||
import no.iktdev.eventi.testUtil.wipe
|
import no.iktdev.eventi.testUtil.wipe
|
||||||
@ -73,14 +72,6 @@ class TaskPollerImplementationTest : TestBase() {
|
|||||||
|
|
||||||
fun getJob() = currentJob
|
fun getJob() = currentJob
|
||||||
override fun getWorkerId() = this.javaClass.simpleName
|
override fun getWorkerId() = this.javaClass.simpleName
|
||||||
override fun createIncompleteStateTaskEvent(
|
|
||||||
task: Task,
|
|
||||||
status: TaskStatus,
|
|
||||||
exception: Exception?
|
|
||||||
): Event {
|
|
||||||
return object : Event() {}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun supports(task: Task) = task is EchoTask
|
override fun supports(task: Task) = task is EchoTask
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event {
|
override suspend fun onTask(task: Task): Event {
|
||||||
@ -114,7 +105,7 @@ class TaskPollerImplementationTest : TestBase() {
|
|||||||
val listener = EchoListener()
|
val listener = EchoListener()
|
||||||
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
|
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
|
||||||
|
|
||||||
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {}.apply { newReferenceId() })
|
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
|
||||||
taskStore.persist(task)
|
taskStore.persist(task)
|
||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|||||||
@ -1,14 +0,0 @@
|
|||||||
package no.iktdev.eventi.testUtil
|
|
||||||
|
|
||||||
import kotlinx.coroutines.CoroutineDispatcher
|
|
||||||
import kotlinx.coroutines.CoroutineScope
|
|
||||||
import kotlinx.coroutines.SupervisorJob
|
|
||||||
import no.iktdev.eventi.events.SequenceDispatchQueue
|
|
||||||
|
|
||||||
class TestSequenceDispatchQueue(
|
|
||||||
maxConcurrency: Int,
|
|
||||||
dispatcher: CoroutineDispatcher
|
|
||||||
) : SequenceDispatchQueue(
|
|
||||||
maxConcurrency,
|
|
||||||
CoroutineScope(dispatcher + SupervisorJob())
|
|
||||||
)
|
|
||||||
Loading…
Reference in New Issue
Block a user