Test runner
This commit is contained in:
parent
a9779d2371
commit
289ee88be0
@ -1,19 +1,23 @@
|
|||||||
package no.iktdev.eventi.events
|
package no.iktdev.eventi.events
|
||||||
|
|
||||||
import kotlinx.coroutines.CompletableDeferred
|
import kotlinx.coroutines.CompletableDeferred
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.CoroutineDispatcher
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||||
import kotlinx.coroutines.awaitAll
|
import kotlinx.coroutines.SupervisorJob
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
|
import kotlinx.coroutines.test.StandardTestDispatcher
|
||||||
import kotlinx.coroutines.test.runTest
|
import kotlinx.coroutines.test.runTest
|
||||||
import kotlinx.coroutines.withContext
|
import kotlinx.coroutines.withContext
|
||||||
import kotlinx.coroutines.withTimeout
|
import kotlinx.coroutines.withTimeout
|
||||||
|
import kotlinx.coroutines.awaitAll
|
||||||
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
|
import no.iktdev.eventi.EventDispatcherTest.DerivedEvent
|
||||||
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
|
import no.iktdev.eventi.EventDispatcherTest.OtherEvent
|
||||||
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
|
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
|
||||||
import no.iktdev.eventi.MyTime
|
import no.iktdev.eventi.MyTime
|
||||||
import no.iktdev.eventi.TestBase
|
import no.iktdev.eventi.TestBase
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
|
import no.iktdev.eventi.testUtil.TestSequenceDispatchQueue
|
||||||
import no.iktdev.eventi.testUtil.wipe
|
import no.iktdev.eventi.testUtil.wipe
|
||||||
import org.junit.jupiter.api.Assertions.assertEquals
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
import org.junit.jupiter.api.Assertions.assertFalse
|
import org.junit.jupiter.api.Assertions.assertFalse
|
||||||
@ -33,9 +37,7 @@ Så skal polleren dispatch'e riktig, oppdatere lastSeenTime og unngå duplikater
|
|||||||
""")
|
""")
|
||||||
class EventPollerImplementationTest : TestBase() {
|
class EventPollerImplementationTest : TestBase() {
|
||||||
|
|
||||||
val dispatcher = EventDispatcher(eventStore)
|
private val dispatcher = EventDispatcher(eventStore)
|
||||||
val queue = SequenceDispatchQueue(maxConcurrency = 8)
|
|
||||||
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
|
||||||
|
|
||||||
@BeforeEach
|
@BeforeEach
|
||||||
fun setup() {
|
fun setup() {
|
||||||
@ -59,6 +61,10 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
|
Så skal alle referenceId-er dispatch'es og lastSeenTime oppdateres
|
||||||
""")
|
""")
|
||||||
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
|
fun pollOnceDispatchesAllNewReferenceIdsAndUpdatesLastSeenTime() = runTest {
|
||||||
|
val testDispatcher = StandardTestDispatcher(testScheduler)
|
||||||
|
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
||||||
|
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
||||||
|
|
||||||
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
|
val dispatched = ConcurrentHashMap.newKeySet<UUID>()
|
||||||
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
|
val completionMap = mutableMapOf<UUID, CompletableDeferred<Unit>>()
|
||||||
|
|
||||||
@ -93,6 +99,9 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal backoff øke, og resettes når nye events ankommer
|
Så skal backoff øke, og resettes når nye events ankommer
|
||||||
""")
|
""")
|
||||||
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
|
fun pollOnceIncreasesBackoffWhenNoEventsAndResetsWhenEventsArrive() = runTest {
|
||||||
|
val testDispatcher = StandardTestDispatcher(testScheduler)
|
||||||
|
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
||||||
|
|
||||||
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
||||||
fun currentBackoff(): Duration = backoff
|
fun currentBackoff(): Duration = backoff
|
||||||
}
|
}
|
||||||
@ -121,6 +130,10 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal polleren gruppere og dispatch'e alle tre i én batch
|
Så skal polleren gruppere og dispatch'e alle tre i én batch
|
||||||
""")
|
""")
|
||||||
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
|
fun pollOnceGroupsAndDispatchesExactlyThreeEventsForOneReferenceId() = runTest {
|
||||||
|
val testDispatcher = StandardTestDispatcher(testScheduler)
|
||||||
|
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
||||||
|
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
||||||
|
|
||||||
val refId = UUID.randomUUID()
|
val refId = UUID.randomUUID()
|
||||||
val received = mutableListOf<Event>()
|
val received = mutableListOf<Event>()
|
||||||
val done = CompletableDeferred<Unit>()
|
val done = CompletableDeferred<Unit>()
|
||||||
@ -157,8 +170,8 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal polleren ignorere dem
|
Så skal polleren ignorere dem
|
||||||
""")
|
""")
|
||||||
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
|
fun pollOnceIgnoresEventsBeforeLastSeenTime() = runTest {
|
||||||
val refId = UUID.randomUUID()
|
val testDispatcher = StandardTestDispatcher(testScheduler)
|
||||||
val ignored = TriggerEvent().usingReferenceId(refId)
|
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
||||||
|
|
||||||
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
|
||||||
init {
|
init {
|
||||||
@ -166,6 +179,9 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val refId = UUID.randomUUID()
|
||||||
|
val ignored = TriggerEvent().usingReferenceId(refId)
|
||||||
|
|
||||||
eventStore.persist(ignored)
|
eventStore.persist(ignored)
|
||||||
testPoller.pollOnce()
|
testPoller.pollOnce()
|
||||||
|
|
||||||
@ -180,7 +196,12 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
Så skal begge events prosesseres, men uten å produsere duplikate derived events
|
Så skal begge events prosesseres, men uten å produsere duplikate derived events
|
||||||
""")
|
""")
|
||||||
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
|
fun pollerHandlesManuallyInjectedDuplicateEvent() = runTest {
|
||||||
|
val testDispatcher = StandardTestDispatcher(testScheduler)
|
||||||
|
val queue = TestSequenceDispatchQueue(maxConcurrency = 8, dispatcher = testDispatcher)
|
||||||
|
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
||||||
|
|
||||||
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
|
EventTypeRegistry.register(listOf(MarcoEvent::class.java, EchoEvent::class.java))
|
||||||
|
|
||||||
val channel = Channel<Event>(Channel.UNLIMITED)
|
val channel = Channel<Event>(Channel.UNLIMITED)
|
||||||
val handled = mutableListOf<Event>()
|
val handled = mutableListOf<Event>()
|
||||||
|
|
||||||
@ -193,16 +214,14 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
|
|
||||||
|
|
||||||
val original = EchoEvent("Hello")
|
val original = EchoEvent("Hello")
|
||||||
eventStore.persist(original)
|
eventStore.persist(original)
|
||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|
||||||
withContext(Dispatchers.Default.limitedParallelism(1)) {
|
withContext(testDispatcher) {
|
||||||
withTimeout(Duration.ofMinutes(1).toMillis()) {
|
withTimeout(60_000) {
|
||||||
repeat(1) { channel.receive() }
|
channel.receive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,9 +230,9 @@ class EventPollerImplementationTest : TestBase() {
|
|||||||
|
|
||||||
poller.pollOnce()
|
poller.pollOnce()
|
||||||
|
|
||||||
withContext(Dispatchers.Default.limitedParallelism(1)) {
|
withContext(testDispatcher) {
|
||||||
withTimeout(Duration.ofMinutes(1).toMillis()) {
|
withTimeout(60_000) {
|
||||||
repeat(1) { channel.receive() }
|
channel.receive()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -471,7 +471,10 @@ class TaskListenerTest {
|
|||||||
Så skal ingen state lekke mellom tasks
|
Så skal ingen state lekke mellom tasks
|
||||||
""")
|
""")
|
||||||
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
|
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
|
||||||
|
val started1 = CompletableDeferred<Unit>()
|
||||||
val finish1 = CompletableDeferred<Unit>()
|
val finish1 = CompletableDeferred<Unit>()
|
||||||
|
|
||||||
|
val started2 = CompletableDeferred<Unit>()
|
||||||
val finish2 = CompletableDeferred<Unit>()
|
val finish2 = CompletableDeferred<Unit>()
|
||||||
|
|
||||||
val listener = object : TaskListener() {
|
val listener = object : TaskListener() {
|
||||||
@ -481,20 +484,31 @@ class TaskListenerTest {
|
|||||||
override fun getWorkerId() = "worker"
|
override fun getWorkerId() = "worker"
|
||||||
override fun supports(task: Task) = true
|
override fun supports(task: Task) = true
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event {
|
||||||
callCount++
|
callCount++
|
||||||
if (callCount == 1) finish1.await()
|
|
||||||
if (callCount == 2) finish2.await()
|
if (callCount == 1) {
|
||||||
|
started1.complete(Unit) // signal: coroutine har startet
|
||||||
|
finish1.await() // vent til testen sier "fortsett"
|
||||||
|
}
|
||||||
|
|
||||||
|
if (callCount == 2) {
|
||||||
|
started2.complete(Unit)
|
||||||
|
finish2.await()
|
||||||
|
}
|
||||||
|
|
||||||
return object : Event() {}
|
return object : Event() {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val reporter = FakeReporter()
|
val reporter = FakeReporter()
|
||||||
|
|
||||||
// Task 1
|
// --- Task 1 ---
|
||||||
val task1 = FakeTask()
|
val task1 = FakeTask()
|
||||||
listener.accept(task1, reporter)
|
listener.accept(task1, reporter)
|
||||||
finish1.complete(Unit)
|
|
||||||
|
started1.await() // garanterer at coroutine kjører
|
||||||
|
finish1.complete(Unit) // la coroutine fullføre
|
||||||
listener.currentJob!!.join()
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
// Verifiser cleanup
|
// Verifiser cleanup
|
||||||
@ -502,9 +516,11 @@ class TaskListenerTest {
|
|||||||
assertNull(listener.currentTask)
|
assertNull(listener.currentTask)
|
||||||
assertNull(listener.heartbeatRunner)
|
assertNull(listener.heartbeatRunner)
|
||||||
|
|
||||||
// Task 2
|
// --- Task 2 ---
|
||||||
val task2 = FakeTask()
|
val task2 = FakeTask()
|
||||||
listener.accept(task2, reporter)
|
listener.accept(task2, reporter)
|
||||||
|
|
||||||
|
started2.await()
|
||||||
finish2.complete(Unit)
|
finish2.complete(Unit)
|
||||||
listener.currentJob!!.join()
|
listener.currentJob!!.join()
|
||||||
|
|
||||||
@ -516,4 +532,5 @@ class TaskListenerTest {
|
|||||||
// onTask ble kalt to ganger
|
// onTask ble kalt to ganger
|
||||||
assertEquals(2, listener.callCount)
|
assertEquals(2, listener.callCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,14 @@
|
|||||||
|
package no.iktdev.eventi.testUtil
|
||||||
|
|
||||||
|
import kotlinx.coroutines.CoroutineDispatcher
|
||||||
|
import kotlinx.coroutines.CoroutineScope
|
||||||
|
import kotlinx.coroutines.SupervisorJob
|
||||||
|
import no.iktdev.eventi.events.SequenceDispatchQueue
|
||||||
|
|
||||||
|
class TestSequenceDispatchQueue(
|
||||||
|
maxConcurrency: Int,
|
||||||
|
dispatcher: CoroutineDispatcher
|
||||||
|
) : SequenceDispatchQueue(
|
||||||
|
maxConcurrency,
|
||||||
|
CoroutineScope(dispatcher + SupervisorJob())
|
||||||
|
)
|
||||||
Loading…
Reference in New Issue
Block a user