From 8993546cb3c96a8eaa4793cd7d5609a21b92f8ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sun, 12 Oct 2025 13:49:04 +0200 Subject: [PATCH] Adjustments --- src/main/kotlin/no/iktdev/eventi/ZDS.kt | 21 +++++++++++++------ .../eventi/events/AbstractEventPoller.kt | 6 +++--- .../iktdev/eventi/tasks/AbstractTaskPoller.kt | 3 ++- .../no/iktdev/eventi/tasks/TaskListener.kt | 2 +- .../no/iktdev/eventi/EventDispatcherTest.kt | 9 ++++---- .../no/iktdev/eventi/InMemoryEventStore.kt | 2 +- .../no/iktdev/eventi/InMemoryTaskStore.kt | 2 +- src/test/kotlin/no/iktdev/eventi/ZDSTest.kt | 4 ++-- 8 files changed, 30 insertions(+), 19 deletions(-) diff --git a/src/main/kotlin/no/iktdev/eventi/ZDS.kt b/src/main/kotlin/no/iktdev/eventi/ZDS.kt index abd94b4..deb34ef 100644 --- a/src/main/kotlin/no/iktdev/eventi/ZDS.kt +++ b/src/main/kotlin/no/iktdev/eventi/ZDS.kt @@ -21,9 +21,12 @@ import java.time.format.DateTimeFormatter object ZDS { val gson = WGson.gson - fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent { + fun Event.toPersisted(id: Long, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedEvent? { val payloadJson = gson.toJson(this) - val eventName = this::class.simpleName ?: error("Missing class name") + val eventName = this::class.simpleName ?: run { + println("Missing class name for event: $this") + return null + } return PersistedEvent( id = id, referenceId = referenceId, @@ -37,15 +40,21 @@ object ZDS { /** * Convert a PersistedEvent back to its original Event type using the event type registry and Gson for deserialization. */ - fun PersistedEvent.toEvent(): Event { + fun PersistedEvent.toEvent(): Event? { val clazz = EventTypeRegistry.resolve(event) - ?: error("Unknown event type: $event") + ?: run { + println("Missing class name for event: $this") + return null + } return gson.fromJson(data, clazz) } - fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask { + fun Task.toPersisted(id: Long, status: TaskStatus = TaskStatus.Pending, persistedAt: LocalDateTime = LocalDateTime.now()): PersistedTask? { val payloadJson = gson.toJson(this) - val taskName = this::class.simpleName ?: error("Missing class name") + val taskName = this::class.simpleName ?: run { + println("Missing class name for task: $this") + return null + } return PersistedTask( id = id, referenceId = referenceId, diff --git a/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt b/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt index faf4d2b..332569b 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/AbstractEventPoller.kt @@ -14,8 +14,8 @@ abstract class AbstractEventPoller( private val dispatcher: EventDispatcher ) { var lastSeenTime: LocalDateTime = LocalDateTime.MIN - var backoff = Duration.ofSeconds(2) - private set + open var backoff = Duration.ofSeconds(2) + protected set private val maxBackoff = Duration.ofMinutes(1) @@ -42,7 +42,7 @@ abstract class AbstractEventPoller( if (dispatchQueue.isProcessing(referenceId)) continue val fullLog = eventStore.getPersistedEventsFor(referenceId) - val events = fullLog.map { it.toEvent() } + val events = fullLog.mapNotNull { it.toEvent() } dispatchQueue.dispatch(referenceId, events, dispatcher) lastSeenTime = fullLog.maxOf { it.persistedAt } diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt b/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt index fbb8dfe..90147c1 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/AbstractTaskPoller.kt @@ -10,7 +10,8 @@ abstract class AbstractTaskPoller( private val taskStore: TaskStore, private val reporterFactory: (Task) -> TaskReporter ) { - var backoff = Duration.ofSeconds(2) + + open var backoff = Duration.ofSeconds(2) protected set private val maxBackoff = Duration.ofMinutes(1) diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index 18d39f9..7d51534 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -15,7 +15,7 @@ import kotlin.coroutines.cancellation.CancellationException * @param T The type of result produced by processing the task. * @param reporter An instance of [TaskReporter] for reporting task status and events. */ -abstract class TaskListener(val taskType: TaskType): TaskListenerImplementation { +abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation { init { TaskListenerRegistry.registerListener(this) diff --git a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt index a790a13..a9e9018 100644 --- a/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/EventDispatcherTest.kt @@ -52,7 +52,7 @@ class EventDispatcherTest: TestBase() { assertNotNull(produced) val event = produced!!.toEvent() - assertEquals(trigger.eventId, event.metadata.derivedFromId) + assertEquals(trigger.eventId, event!!.metadata.derivedFromId) assertTrue(event is DerivedEvent) } @@ -62,9 +62,10 @@ class EventDispatcherTest: TestBase() { val trigger = TriggerEvent() val derived = DerivedEvent().derivedOf(trigger).toPersisted(1L, LocalDateTime.now()) - eventStore.persist(derived.toEvent()) // simulate prior production - dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived.toEvent())) + eventStore.persist(derived!!.toEvent()!!) // simulate prior production + + dispatcher.dispatch(trigger.referenceId, listOf(trigger, derived!!.toEvent()!!)) assertEquals(1, eventStore.all().size) // no new event produced } @@ -86,7 +87,7 @@ class EventDispatcherTest: TestBase() { val trigger = TriggerEvent() dispatcher.dispatch(trigger.referenceId, listOf(trigger)) - val replayContext = listOf(trigger) + eventStore.all().map { it.toEvent() } + val replayContext = listOf(trigger) + eventStore.all().mapNotNull { it.toEvent() } dispatcher.dispatch(trigger.referenceId, replayContext) diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt index 1dbc5ea..bc87180 100644 --- a/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt +++ b/src/test/kotlin/no/iktdev/eventi/InMemoryEventStore.kt @@ -19,7 +19,7 @@ class InMemoryEventStore : EventStore { override fun persist(event: Event) { val persistedEvent = event.toPersisted(nextId++, LocalDateTime.now()) - persisted += persistedEvent + persisted += persistedEvent!! } fun all(): List = persisted diff --git a/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt index 86eb8c9..6c0602e 100644 --- a/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt +++ b/src/test/kotlin/no/iktdev/eventi/InMemoryTaskStore.kt @@ -15,7 +15,7 @@ open class InMemoryTaskStore : TaskStore { override fun persist(task: Task) { val persistedTask = task.toPersisted(nextId++) - tasks += persistedTask + tasks += persistedTask!! } override fun findByTaskId(taskId: UUID) = tasks.find { it.taskId == taskId } diff --git a/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt index 7b2628e..d3dff2c 100644 --- a/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/ZDSTest.kt @@ -32,7 +32,7 @@ class ZDSTest { val echo = EchoEvent("hello") val persisted = echo.toPersisted(id = 1L) - val restored = persisted.toEvent() + val restored = persisted!!.toEvent() assert(restored is EchoEvent) assert((restored as EchoEvent).data == "hello") @@ -53,7 +53,7 @@ class ZDSTest { val persisted = task.toPersisted(id = 1L) - val restored = persisted.toTask() + val restored = persisted!!.toTask() assert(restored is TestTask) assert((restored as TestTask).data == "Potato") assert(restored.metadata.created == task.metadata.created)