diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index d11bda7..ecc3a62 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -15,7 +15,7 @@ import kotlin.coroutines.cancellation.CancellationException * @param T The type of result produced by processing the task. * @param reporter An instance of [TaskReporter] for reporting task status and events. */ -abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation { +abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): TaskListenerImplementation { init { TaskListenerRegistry.registerListener(this) @@ -69,9 +69,12 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): reporter?.markConsumed(task.taskId) } - override fun onComplete(task: Task, result: T?) { - reporter?.markConsumed(task.taskId) - reporter?.log(task.taskId, "Task completed successfully.") + override fun onComplete(task: Task, result: Event?) { + reporter!!.markConsumed(task.taskId) + reporter!!.log(task.taskId, "Task completed successfully.") + result?.let { + reporter!!.publishEvent(result) + } } override fun onCancelled() { @@ -88,11 +91,11 @@ enum class TaskType { } -interface TaskListenerImplementation { +interface TaskListenerImplementation { fun supports(task: Task): Boolean fun accept(task: Task, reporter: TaskReporter): Boolean - suspend fun onTask(task: Task): T - fun onComplete(task: Task, result: T?) + suspend fun onTask(task: Task): Event? + fun onComplete(task: Task, result: Event?) fun onError(task: Task, exception: Exception) fun onCancelled() } diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt index 5844a4d..11f0213 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt @@ -58,7 +58,7 @@ class AbstractTaskPollerTest : TestBase() { open class EchoListener : TaskListener(TaskType.MIXED) { - var result: String? = null + var result: Event? = null override fun getWorkerId() = this.javaClass.simpleName @@ -66,17 +66,17 @@ class AbstractTaskPollerTest : TestBase() { return task is EchoTask } - override suspend fun onTask(task: Task): String { + override suspend fun onTask(task: Task): Event { if (task is EchoTask) { - return task.data + " Potetmos" + return EchoEvent(task.data + " Potetmos").producedFrom(task) } throw IllegalArgumentException("Unsupported task type: ${task::class.java}") } - override fun onComplete(task: Task, result: String?) { + override fun onComplete(task: Task, result: Event?) { super.onComplete(task, result) this.result = result; - reporter?.publishEvent(EchoEvent(result!!).producedFrom(task)) + reporter?.publishEvent(result!!) } } @@ -99,7 +99,7 @@ class AbstractTaskPollerTest : TestBase() { val producedEvent = eventDeferred.await() assertThat(producedEvent).isNotNull assertThat(producedEvent!!.metadata.derivedFromId).isEqualTo(task.taskId) - assertThat(listener.result).isEqualTo("Hello Potetmos") + assertThat((listener.result as EchoEvent).data).isEqualTo("Hello Potetmos") } @OptIn(ExperimentalCoroutinesApi::class) @@ -120,7 +120,7 @@ class AbstractTaskPollerTest : TestBase() { advanceUntilIdle() assertEquals(initialBackoff, poller.backoff) - assertEquals("Hello Potetmos", listener.result) + assertEquals("Hello Potetmos", (listener.result as EchoEvent).data) } @Test