Added more tests
This commit is contained in:
parent
23f6afb483
commit
6d60c5f74c
@ -5,9 +5,9 @@ import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.stores.EventStore
|
||||
import java.util.UUID
|
||||
|
||||
class EventDispatcher(val eventStore: EventStore) {
|
||||
open class EventDispatcher(val eventStore: EventStore) {
|
||||
|
||||
fun dispatch(referenceId: UUID, events: List<Event>) {
|
||||
open fun dispatch(referenceId: UUID, events: List<Event>) {
|
||||
val derivedFromIds = events.mapNotNull { it.metadata.derivedFromId }.flatten().toSet()
|
||||
val deletedEventIds = events.filterIsInstance<DeleteEvent>().map { it.deletedEventId }
|
||||
val candidates = events
|
||||
|
||||
@ -11,7 +11,7 @@ import no.iktdev.eventi.models.Event
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
class SequenceDispatchQueue(
|
||||
open class SequenceDispatchQueue(
|
||||
private val maxConcurrency: Int = 8,
|
||||
private val scope: CoroutineScope = CoroutineScope(Dispatchers.Default + SupervisorJob())
|
||||
) {
|
||||
@ -25,9 +25,9 @@ class SequenceDispatchQueue(
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
|
||||
fun isProcessing(referenceId: UUID): Boolean = referenceId in active
|
||||
open fun isProcessing(referenceId: UUID): Boolean = referenceId in active
|
||||
|
||||
fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job? {
|
||||
open fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job? {
|
||||
if (!active.add(referenceId)) {
|
||||
log.debug {"⚠️ Already processing $referenceId, skipping dispatch"}
|
||||
return null
|
||||
|
||||
@ -71,7 +71,7 @@ class EventDispatcherTest: TestBase() {
|
||||
val listener = ProducingListener()
|
||||
|
||||
val trigger = TriggerEvent()
|
||||
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now())
|
||||
val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, MyTime.UtcNow())
|
||||
|
||||
eventStore.persist(derived!!.toEvent()!!) // simulate prior production
|
||||
|
||||
|
||||
@ -18,10 +18,16 @@ class InMemoryEventStore : EventStore {
|
||||
persisted.filter { it.referenceId == referenceId }
|
||||
|
||||
override fun persist(event: Event) {
|
||||
val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now())
|
||||
val persistedEvent = event.toPersisted(nextId++, MyTime.UtcNow())
|
||||
persisted += persistedEvent!!
|
||||
}
|
||||
|
||||
fun persistAt(event: Event, persistedAt: LocalDateTime) {
|
||||
val persistedEvent = event.toPersisted(nextId++, persistedAt)
|
||||
persisted += persistedEvent!!
|
||||
}
|
||||
|
||||
|
||||
fun all(): List<PersistedEvent> = persisted
|
||||
fun clear() { persisted.clear(); nextId = 1L }
|
||||
}
|
||||
|
||||
@ -30,13 +30,13 @@ open class InMemoryTaskStore : TaskStore {
|
||||
override fun claim(taskId: UUID, workerId: String): Boolean {
|
||||
val task = findByTaskId(taskId) ?: return false
|
||||
if (task.claimed && !isExpired(task)) return false
|
||||
update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = LocalDateTime.now()))
|
||||
update(task.copy(claimed = true, claimedBy = workerId, lastCheckIn = MyTime.UtcNow()))
|
||||
return true
|
||||
}
|
||||
|
||||
override fun heartbeat(taskId: UUID) {
|
||||
val task = findByTaskId(taskId) ?: return
|
||||
update(task.copy(lastCheckIn = LocalDateTime.now()))
|
||||
update(task.copy(lastCheckIn = MyTime.UtcNow()))
|
||||
}
|
||||
|
||||
override fun markConsumed(taskId: UUID, status: TaskStatus) {
|
||||
@ -45,7 +45,7 @@ open class InMemoryTaskStore : TaskStore {
|
||||
}
|
||||
|
||||
override fun releaseExpiredTasks(timeout: Duration) {
|
||||
val now = LocalDateTime.now()
|
||||
val now = MyTime.UtcNow()
|
||||
tasks.filter {
|
||||
it.claimed && !it.consumed && it.lastCheckIn?.isBefore(now.minus(timeout)) == true
|
||||
}.forEach {
|
||||
@ -60,7 +60,7 @@ open class InMemoryTaskStore : TaskStore {
|
||||
}
|
||||
|
||||
private fun isExpired(task: PersistedTask): Boolean {
|
||||
val now = LocalDateTime.now()
|
||||
val now = MyTime.UtcNow()
|
||||
return task.lastCheckIn?.isBefore(now.minusMinutes(15)) == true
|
||||
}
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@ import no.iktdev.eventi.EventDispatcherTest
|
||||
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
|
||||
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
|
||||
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
|
||||
import no.iktdev.eventi.MyTime
|
||||
import no.iktdev.eventi.TestBase
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.testUtil.wipe
|
||||
@ -137,7 +138,7 @@ class EventPollerImplementationTest : TestBase() {
|
||||
|
||||
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
||||
init {
|
||||
lastSeenTime = LocalDateTime.now().plusSeconds(1)
|
||||
lastSeenTime = MyTime.UtcNow().plusSeconds(1)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
244
src/test/kotlin/no/iktdev/eventi/events/RunSimulationTest.kt
Normal file
244
src/test/kotlin/no/iktdev/eventi/events/RunSimulationTest.kt
Normal file
@ -0,0 +1,244 @@
|
||||
package no.iktdev.eventi.events
|
||||
|
||||
import io.mockk.every
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.test.*
|
||||
import no.iktdev.eventi.InMemoryEventStore
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.time.LocalDateTime
|
||||
import java.util.UUID
|
||||
import kotlinx.coroutines.*
|
||||
import no.iktdev.eventi.ZDS.toPersisted
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.Metadata
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
|
||||
|
||||
class FakeDispatchQueue(
|
||||
private val scope: CoroutineScope
|
||||
) : SequenceDispatchQueue(8, scope) {
|
||||
|
||||
private val active = ConcurrentHashMap.newKeySet<UUID>()
|
||||
|
||||
override fun isProcessing(referenceId: UUID): Boolean = referenceId in active
|
||||
|
||||
override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job {
|
||||
active.add(referenceId)
|
||||
return scope.launch {
|
||||
try {
|
||||
dispatcher.dispatch(referenceId, events)
|
||||
} finally {
|
||||
active.remove(referenceId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class FakeDispatcher : EventDispatcher(InMemoryEventStore()) {
|
||||
|
||||
val dispatched = mutableListOf<Pair<UUID, List<Event>>>()
|
||||
|
||||
override fun dispatch(referenceId: UUID, events: List<Event>) {
|
||||
dispatched += referenceId to events
|
||||
}
|
||||
}
|
||||
|
||||
class TestEvent : Event() {
|
||||
fun withReference(id: UUID): TestEvent {
|
||||
this.referenceId = id
|
||||
return this
|
||||
}
|
||||
fun setMetadata(metadata: Metadata): TestEvent {
|
||||
this.metadata = metadata
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class FakeClock(var now: LocalDateTime) {
|
||||
fun advanceSeconds(sec: Long) {
|
||||
now = now.plusSeconds(sec)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class RunSimulationTestTest {
|
||||
|
||||
private lateinit var store: InMemoryEventStore
|
||||
private lateinit var dispatcher: FakeDispatcher
|
||||
private lateinit var testDispatcher: TestDispatcher
|
||||
private lateinit var scope: CoroutineScope
|
||||
private lateinit var queue: FakeDispatchQueue
|
||||
private lateinit var poller: EventPollerImplementation
|
||||
|
||||
@BeforeEach
|
||||
fun setup() {
|
||||
store = InMemoryEventStore()
|
||||
dispatcher = FakeDispatcher()
|
||||
testDispatcher = StandardTestDispatcher()
|
||||
scope = CoroutineScope(testDispatcher)
|
||||
queue = FakeDispatchQueue(scope)
|
||||
EventTypeRegistry.register(TestEvent::class.java)
|
||||
poller = object : EventPollerImplementation(store, queue, dispatcher) {
|
||||
override suspend fun start() = error("Do not call start() in tests")
|
||||
}
|
||||
}
|
||||
|
||||
private fun persistEvent(ref: UUID, time: LocalDateTime) {
|
||||
val e = TestEvent().withReference(ref)
|
||||
store.persist(e.setMetadata(Metadata()))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller updates lastSeenTime when dispatch happens`() = runTest(testDispatcher) {
|
||||
val ref = UUID.randomUUID()
|
||||
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
|
||||
|
||||
persistEvent(ref, t)
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(poller.lastSeenTime).isAfter(LocalDateTime.of(1970,1,1,0,0))
|
||||
assertThat(dispatcher.dispatched).hasSize(1)
|
||||
}
|
||||
|
||||
|
||||
class AlwaysBusyDispatchQueue : SequenceDispatchQueue(8, CoroutineScope(Dispatchers.Default)) {
|
||||
override fun isProcessing(referenceId: UUID): Boolean = true
|
||||
override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher) = null
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller does NOT update lastSeenTime when queue is busy`() = runTest {
|
||||
val ref = UUID.randomUUID()
|
||||
val t = LocalDateTime.of(2026,1,22,12,0,0)
|
||||
|
||||
store.persistAt(TestEvent().withReference(ref), t)
|
||||
|
||||
val busyQueue = AlwaysBusyDispatchQueue()
|
||||
|
||||
val poller = object : EventPollerImplementation(store, busyQueue, dispatcher) {}
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(poller.lastSeenTime)
|
||||
.isEqualTo(LocalDateTime.of(1970,1,1,0,0))
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
fun `poller does not double-dispatch`() = runTest(testDispatcher) {
|
||||
val ref = UUID.randomUUID()
|
||||
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
|
||||
|
||||
persistEvent(ref, t)
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller handles multiple referenceIds`() = runTest(testDispatcher) {
|
||||
val refA = UUID.randomUUID()
|
||||
val refB = UUID.randomUUID()
|
||||
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
|
||||
|
||||
persistEvent(refA, t)
|
||||
persistEvent(refB, t.plusSeconds(1))
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller handles identical timestamps`() = runTest(testDispatcher) {
|
||||
val refA = UUID.randomUUID()
|
||||
val refB = UUID.randomUUID()
|
||||
val t = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
|
||||
|
||||
persistEvent(refA, t)
|
||||
persistEvent(refB, t)
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller backs off when no new events`() = runTest(testDispatcher) {
|
||||
val before = poller.backoff
|
||||
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(poller.backoff).isGreaterThan(before)
|
||||
}
|
||||
|
||||
class ControlledDispatchQueue(
|
||||
private val scope: CoroutineScope
|
||||
) : SequenceDispatchQueue(8, scope) {
|
||||
|
||||
val busyRefs = mutableSetOf<UUID>()
|
||||
|
||||
override fun isProcessing(referenceId: UUID): Boolean =
|
||||
referenceId in busyRefs
|
||||
|
||||
override fun dispatch(referenceId: UUID, events: List<Event>, dispatcher: EventDispatcher): Job {
|
||||
return scope.launch {
|
||||
dispatcher.dispatch(referenceId, events)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
fun `poller processes events arriving while queue is busy`() = runTest(testDispatcher) {
|
||||
val ref = UUID.randomUUID()
|
||||
val t1 = LocalDateTime.of(2026, 1, 22, 12, 0, 0)
|
||||
val t2 = t1.plusSeconds(5)
|
||||
|
||||
persistEvent(ref, t1)
|
||||
|
||||
val controlledQueue = ControlledDispatchQueue(scope)
|
||||
controlledQueue.busyRefs += ref
|
||||
|
||||
val poller = object : EventPollerImplementation(store, controlledQueue, dispatcher) {}
|
||||
|
||||
// Poll #1: busy → no dispatch
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(dispatcher.dispatched).isEmpty()
|
||||
|
||||
// Now free
|
||||
controlledQueue.busyRefs.clear()
|
||||
|
||||
// Add new event
|
||||
persistEvent(ref, t2)
|
||||
|
||||
// Poll #2: should dispatch both events
|
||||
poller.pollOnce()
|
||||
advanceUntilIdle()
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(1)
|
||||
assertThat(dispatcher.dispatched.single().second).hasSize(2)
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,110 @@
|
||||
package no.iktdev.eventi.events.poller
|
||||
|
||||
import kotlinx.coroutines.test.*
|
||||
import no.iktdev.eventi.InMemoryEventStore
|
||||
import no.iktdev.eventi.events.EventTypeRegistry
|
||||
import no.iktdev.eventi.events.FakeDispatcher
|
||||
import no.iktdev.eventi.events.RunSimulationTestTest
|
||||
import no.iktdev.eventi.events.TestEvent
|
||||
import no.iktdev.eventi.models.Metadata
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.time.LocalDateTime
|
||||
import java.util.UUID
|
||||
|
||||
class PollerStartLoopTest {
|
||||
|
||||
private lateinit var store: InMemoryEventStore
|
||||
private lateinit var dispatcher: FakeDispatcher
|
||||
private lateinit var testDispatcher: TestDispatcher
|
||||
private lateinit var scope: TestScope
|
||||
private lateinit var queue: RunSimulationTestTest.ControlledDispatchQueue
|
||||
private lateinit var poller: TestablePoller
|
||||
|
||||
@BeforeEach
|
||||
fun setup() {
|
||||
store = InMemoryEventStore()
|
||||
dispatcher = FakeDispatcher()
|
||||
testDispatcher = StandardTestDispatcher()
|
||||
scope = TestScope(testDispatcher)
|
||||
queue = RunSimulationTestTest.ControlledDispatchQueue(scope)
|
||||
EventTypeRegistry.register(TestEvent::class.java)
|
||||
|
||||
poller = TestablePoller(store, queue, dispatcher, scope)
|
||||
}
|
||||
|
||||
private fun persistAt(ref: UUID, time: LocalDateTime) {
|
||||
val e = TestEvent().withReference(ref).setMetadata(Metadata())
|
||||
store.persistAt(e, time)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller does not spin when no events exist`() = runTest {
|
||||
val startBackoff = poller.backoff
|
||||
|
||||
poller.startFor(iterations = 10)
|
||||
|
||||
assertThat(poller.backoff).isGreaterThan(startBackoff)
|
||||
assertThat(dispatcher.dispatched).isEmpty()
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller increases backoff exponentially`() = runTest {
|
||||
val b1 = poller.backoff
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
val b2 = poller.backoff
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
val b3 = poller.backoff
|
||||
|
||||
assertThat(b2).isGreaterThan(b1)
|
||||
assertThat(b3).isGreaterThan(b2)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller resets backoff when events appear`() = runTest {
|
||||
poller.startFor(iterations = 5)
|
||||
val before = poller.backoff
|
||||
|
||||
val ref = UUID.randomUUID()
|
||||
persistAt(ref, LocalDateTime.now())
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
|
||||
assertThat(poller.backoff).isEqualTo(java.time.Duration.ofSeconds(2))
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller processes events that arrive while sleeping`() = runTest {
|
||||
val ref = UUID.randomUUID()
|
||||
|
||||
poller.startFor(iterations = 3)
|
||||
|
||||
persistAt(ref, LocalDateTime.now())
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(1)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun `poller does not lose events under concurrency`() = runTest {
|
||||
val ref = UUID.randomUUID()
|
||||
|
||||
queue.busyRefs += ref
|
||||
|
||||
persistAt(ref, LocalDateTime.now())
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
|
||||
assertThat(dispatcher.dispatched).isEmpty()
|
||||
|
||||
queue.busyRefs.clear()
|
||||
|
||||
poller.startFor(iterations = 1)
|
||||
|
||||
assertThat(dispatcher.dispatched).hasSize(1)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,29 @@
|
||||
package no.iktdev.eventi.events.poller
|
||||
|
||||
import kotlinx.coroutines.test.TestScope
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import no.iktdev.eventi.events.EventDispatcher
|
||||
import no.iktdev.eventi.events.EventPollerImplementation
|
||||
import no.iktdev.eventi.events.SequenceDispatchQueue
|
||||
import no.iktdev.eventi.stores.EventStore
|
||||
|
||||
class TestablePoller(
|
||||
eventStore: EventStore,
|
||||
dispatchQueue: SequenceDispatchQueue,
|
||||
dispatcher: EventDispatcher,
|
||||
val scope: TestScope
|
||||
) : EventPollerImplementation(eventStore, dispatchQueue, dispatcher) {
|
||||
|
||||
suspend fun startFor(iterations: Int) {
|
||||
repeat(iterations) {
|
||||
try {
|
||||
pollOnce()
|
||||
} catch (_: Exception) {
|
||||
// same as prod
|
||||
}
|
||||
|
||||
// Simuler delay(backoff)
|
||||
scope.testScheduler.advanceTimeBy(backoff.toMillis())
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user