From e129258f39d098b44e795840bab7d36455c5b700 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sat, 31 Jan 2026 19:03:31 +0100 Subject: [PATCH] Tests + failure/canceled event producer --- .../no/iktdev/eventi/tasks/TaskListener.kt | 6 +- .../eventi/tasks/TaskListenerRegistryTest.kt | 37 ++++++++ .../iktdev/eventi/tasks/TaskListenerTest.kt | 89 +++++++++++++++++-- .../tasks/TaskPollerImplementationTest.kt | 9 ++ 4 files changed, 133 insertions(+), 8 deletions(-) diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index ee89fde..ce02b29 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task +import no.iktdev.eventi.models.store.TaskStatus import org.jetbrains.annotations.VisibleForTesting import java.util.UUID import kotlin.coroutines.cancellation.CancellationException @@ -88,14 +89,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta this@TaskListener.reporter = null } } - return true } + abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event + override fun onError(task: Task, exception: Exception) { reporter?.log(task.taskId, "Error processing task: ${exception.message}") exception.printStackTrace() reporter?.markFailed(task.referenceId, task.taskId) + reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception)) } override fun onComplete(task: Task, result: Event?) { @@ -111,6 +114,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta currentJob?.cancel() heartbeatRunner?.cancel() currentTask = null + reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled)) } } diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistryTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistryTest.kt index dfadac7..e84c348 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistryTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerRegistryTest.kt @@ -5,6 +5,7 @@ import no.iktdev.eventi.events.EventListener import no.iktdev.eventi.events.EventListenerRegistry import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task +import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.testUtil.wipe import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Assertions.* @@ -19,6 +20,15 @@ class TaskListenerRegistryTest { override fun getWorkerId(): String { TODO("Not yet implemented") } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + TODO("Not yet implemented") + } + override fun supports(task: Task): Boolean { TODO("Not yet implemented") } @@ -32,6 +42,15 @@ class TaskListenerRegistryTest { override fun getWorkerId(): String { TODO("Not yet implemented") } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + TODO("Not yet implemented") + } + override fun supports(task: Task): Boolean { TODO("Not yet implemented") } @@ -45,6 +64,15 @@ class TaskListenerRegistryTest { override fun getWorkerId(): String { TODO("Not yet implemented") } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + TODO("Not yet implemented") + } + override fun supports(task: Task): Boolean { TODO("Not yet implemented") } @@ -57,6 +85,15 @@ class TaskListenerRegistryTest { override fun getWorkerId(): String { TODO("Not yet implemented") } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + TODO("Not yet implemented") + } + override fun supports(task: Task): Boolean { TODO("Not yet implemented") } diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt index 7bd0cfb..52669c1 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt @@ -9,6 +9,7 @@ import kotlinx.coroutines.test.runTest import kotlinx.coroutines.yield import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task +import no.iktdev.eventi.models.store.TaskStatus import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -65,6 +66,14 @@ class TaskListenerTest { var onTaskCalled = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { @@ -112,11 +121,21 @@ class TaskListenerTest { var heartbeatStarted: Job? = null var heartbeatRan = false + override fun getWorkerId(): String { + return UUID.randomUUID().toString() + } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } - override fun getWorkerId() = "worker" override fun supports(task: Task) = true - override suspend fun onTask(task: Task): Event? { + override suspend fun onTask(task: Task): Event { // Start heartbeat withHeartbeatRunner(10.milliseconds) { @@ -183,6 +202,14 @@ class TaskListenerTest { var heartbeatRan = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { @@ -262,9 +289,17 @@ class TaskListenerTest { var onTaskCalled = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true - override suspend fun onTask(task: Task): Event? { + override suspend fun onTask(task: Task): Event { onTaskCalled = true withHeartbeatRunner(10.milliseconds) { @@ -321,6 +356,14 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { @@ -360,6 +403,14 @@ class TaskListenerTest { fun acceptReturnsFalseWhenUnsupported() = runTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = false override suspend fun onTask(task: Task): Event? = error("Should not be called") } @@ -386,6 +437,14 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { @@ -427,6 +486,14 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { @@ -466,10 +533,10 @@ class TaskListenerTest { @Test @DisplayName(""" -Når listener prosesserer to tasks sekvensielt -Hvis cleanup fungerer riktig -Så skal ingen state lekke mellom tasks -""") + Når listener prosesserer to tasks sekvensielt + Hvis cleanup fungerer riktig + Så skal ingen state lekke mellom tasks + """) fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest { val started1 = CompletableDeferred() val finish1 = CompletableDeferred() @@ -482,6 +549,14 @@ Så skal ingen state lekke mellom tasks var callCount = 0 override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event { diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt index c924a08..bfcbbd8 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt @@ -10,6 +10,7 @@ import no.iktdev.eventi.TestBase import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task +import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.stores.TaskStore import no.iktdev.eventi.testUtil.multiply import no.iktdev.eventi.testUtil.wipe @@ -72,6 +73,14 @@ class TaskPollerImplementationTest : TestBase() { fun getJob() = currentJob override fun getWorkerId() = this.javaClass.simpleName + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + return object : Event() {} + } + override fun supports(task: Task) = task is EchoTask override suspend fun onTask(task: Task): Event {