diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index 58519fb..b24fab8 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -4,7 +4,6 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.currentCoroutineContext import kotlinx.coroutines.delay import kotlinx.coroutines.isActive import kotlinx.coroutines.launch @@ -68,7 +67,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta currentJob = getDispatcherForTask(task).launch { try { val result = onTask(task) - reporter.markConsumed(task.taskId) + reporter.markCompleted(task.taskId) onComplete(task, result) } catch (e: CancellationException) { onCancelled() @@ -89,11 +88,11 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta override fun onError(task: Task, exception: Exception) { reporter?.log(task.taskId, "Error processing task: ${exception.message}") exception.printStackTrace() - reporter?.markConsumed(task.taskId) + reporter?.markCompleted(task.taskId) } override fun onComplete(task: Task, result: Event?) { - reporter!!.markConsumed(task.taskId) + reporter!!.markCompleted(task.taskId) reporter!!.log(task.taskId, "Task completed successfully.") result?.let { reporter!!.publishEvent(result) @@ -126,7 +125,8 @@ interface TaskListenerImplementation { interface TaskReporter { fun markClaimed(taskId: UUID, workerId: String) fun updateLastSeen(taskId: UUID) - fun markConsumed(taskId: UUID) + fun markCompleted(taskId: UUID) + fun markFailed(taskId: UUID) fun updateProgress(taskId: UUID, progress: Int) fun log(taskId: UUID, message: String) fun publishEvent(event: Event) diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt index 5a6657d..665e58e 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt @@ -34,7 +34,11 @@ class TaskListenerTest { claimed = true } - override fun markConsumed(taskId: UUID) { + override fun markCompleted(taskId: UUID) { + consumed = true + } + + override fun markFailed(taskId: UUID) { consumed = true } diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt index d4e7036..98abd74 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskPollerImplementationTest.kt @@ -36,7 +36,9 @@ class TaskPollerImplementationTest : TestBase() { object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) {