This commit is contained in:
Brage Skjønborg 2025-11-09 16:19:59 +01:00
parent a4caf711b9
commit 751436a789
4 changed files with 17 additions and 34 deletions

View File

@ -5,10 +5,9 @@ import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.stores.EventStore
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import kotlin.collections.iterator
abstract class AbstractEventPoller(
abstract class EventPollerImplementation(
private val eventStore: EventStore,
private val dispatchQueue: SequenceDispatchQueue,
private val dispatcher: EventDispatcher
@ -19,7 +18,7 @@ abstract class AbstractEventPoller(
private val maxBackoff = Duration.ofMinutes(1)
suspend fun start() {
open suspend fun start() {
while (true) {
pollOnce()
}

View File

@ -6,7 +6,7 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.stores.TaskStore
import java.time.Duration
abstract class AbstractTaskPoller(
abstract class TaskPollerImplementation(
private val taskStore: TaskStore,
private val reporterFactory: (Task) -> TaskReporter
) {
@ -15,7 +15,7 @@ abstract class AbstractTaskPoller(
protected set
private val maxBackoff = Duration.ofMinutes(1)
suspend fun start() {
open suspend fun start() {
while (true) {
pollOnce()
}

View File

@ -3,14 +3,8 @@ package no.iktdev.eventi.events
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.isActive
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
@ -20,24 +14,22 @@ import no.iktdev.eventi.EventDispatcherTest.OtherEvent
import no.iktdev.eventi.EventDispatcherTest.TriggerEvent
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.stores.EventStore
import no.iktdev.eventi.testUtil.wipe
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertFalse
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import sun.rmi.transport.DGCAckHandler.received
import java.time.Duration
import java.time.LocalDateTime
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
class AbstractEventPollerTest : TestBase() {
class EventPollerImplementationTest : TestBase() {
val dispatcher = EventDispatcher(eventStore)
val queue = SequenceDispatchQueue(maxConcurrency = 8)
val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {}
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {}
@BeforeEach
fun setup() {
@ -83,7 +75,7 @@ class AbstractEventPollerTest : TestBase() {
@Test
fun `pollOnce should increase backoff when no events and reset when events arrive`() = runTest {
val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
fun currentBackoff(): Duration = backoff
}
@ -143,7 +135,7 @@ class AbstractEventPollerTest : TestBase() {
val refId = UUID.randomUUID()
val ignored = TriggerEvent().usingReferenceId(refId)
val testPoller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
val testPoller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
init {
lastSeenTime = LocalDateTime.now().plusSeconds(1)
}
@ -176,7 +168,7 @@ class AbstractEventPollerTest : TestBase() {
}
}
val poller = object : AbstractEventPoller(eventStore, queue, dispatcher) {
val poller = object : EventPollerImplementation(eventStore, queue, dispatcher) {
}
// Original event

View File

@ -1,18 +1,12 @@
package no.iktdev.eventi.tasks
import io.mockk.mockk
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.test.setMain
import no.iktdev.eventi.InMemoryTaskStore
import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
@ -25,12 +19,10 @@ import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.time.Duration
import java.util.UUID
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
import kotlin.time.Duration.Companion.nanoseconds
import kotlin.time.Duration.Companion.seconds
class AbstractTaskPollerTest : TestBase() {
class TaskPollerImplementationTest : TestBase() {
@BeforeEach
fun setup() {
@ -59,7 +51,7 @@ class AbstractTaskPollerTest : TestBase() {
data class EchoEvent(var data: String) : Event() {
}
class TaskPollerTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): AbstractTaskPoller(taskStore, reporterFactory) {
class TaskPollerImplementationTest(taskStore: TaskStore, reporterFactory: (Task) -> TaskReporter): TaskPollerImplementation(taskStore, reporterFactory) {
fun overrideSetBackoff(duration: java.time.Duration) {
backoff = duration
}
@ -113,7 +105,7 @@ class AbstractTaskPollerTest : TestBase() {
val listener = EchoListener()
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val task = EchoTask("Hello").newReferenceId().derivedOf(object : Event() {})
taskStore.persist(task)
@ -133,7 +125,7 @@ class AbstractTaskPollerTest : TestBase() {
EventTypeRegistry.register(EchoEvent::class.java)
val listener = EchoListener()
val poller = TaskPollerTest(taskStore, reporterFactory)
val poller = TaskPollerImplementationTest(taskStore, reporterFactory)
val initialBackoff = poller.backoff
poller.overrideSetBackoff(Duration.ofSeconds(16))
@ -152,7 +144,7 @@ class AbstractTaskPollerTest : TestBase() {
@Test
fun `poller increases backoff when no tasks`() = runTest {
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
val totalBackoff = initialBackoff.multiply(2)
@ -164,7 +156,7 @@ class AbstractTaskPollerTest : TestBase() {
@Test
fun `poller increases backoff when no listener supports task`() = runTest {
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val initialBackoff = poller.backoff
// as long as the task is not added to registry this will be unsupported
@ -182,7 +174,7 @@ class AbstractTaskPollerTest : TestBase() {
override val isBusy = true
}
val poller = object : AbstractTaskPoller(taskStore, reporterFactory) {}
val poller = object : TaskPollerImplementation(taskStore, reporterFactory) {}
val intialBackoff = poller.backoff
val task = EchoTask("Busy").newReferenceId()
@ -204,7 +196,7 @@ class AbstractTaskPollerTest : TestBase() {
val failingStore = object : InMemoryTaskStore() {
override fun claim(taskId: UUID, workerId: String): Boolean = false
}
val pollerWithFailingClaim = object : AbstractTaskPoller(failingStore, reporterFactory) {}
val pollerWithFailingClaim = object : TaskPollerImplementation(failingStore, reporterFactory) {}
val initialBackoff = pollerWithFailingClaim.backoff
failingStore.persist(task)