Adding failed

This commit is contained in:
Brage Skjønborg 2026-01-29 17:39:57 +01:00
parent a9a06a41f9
commit b62079f65f
3 changed files with 13 additions and 7 deletions

View File

@ -4,7 +4,6 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -68,7 +67,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob = getDispatcherForTask(task).launch { currentJob = getDispatcherForTask(task).launch {
try { try {
val result = onTask(task) val result = onTask(task)
reporter.markConsumed(task.taskId) reporter.markCompleted(task.taskId)
onComplete(task, result) onComplete(task, result)
} catch (e: CancellationException) { } catch (e: CancellationException) {
onCancelled() onCancelled()
@ -89,11 +88,11 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
override fun onError(task: Task, exception: Exception) { override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}") reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace() exception.printStackTrace()
reporter?.markConsumed(task.taskId) reporter?.markCompleted(task.taskId)
} }
override fun onComplete(task: Task, result: Event?) { override fun onComplete(task: Task, result: Event?) {
reporter!!.markConsumed(task.taskId) reporter!!.markCompleted(task.taskId)
reporter!!.log(task.taskId, "Task completed successfully.") reporter!!.log(task.taskId, "Task completed successfully.")
result?.let { result?.let {
reporter!!.publishEvent(result) reporter!!.publishEvent(result)
@ -126,7 +125,8 @@ interface TaskListenerImplementation {
interface TaskReporter { interface TaskReporter {
fun markClaimed(taskId: UUID, workerId: String) fun markClaimed(taskId: UUID, workerId: String)
fun updateLastSeen(taskId: UUID) fun updateLastSeen(taskId: UUID)
fun markConsumed(taskId: UUID) fun markCompleted(taskId: UUID)
fun markFailed(taskId: UUID)
fun updateProgress(taskId: UUID, progress: Int) fun updateProgress(taskId: UUID, progress: Int)
fun log(taskId: UUID, message: String) fun log(taskId: UUID, message: String)
fun publishEvent(event: Event) fun publishEvent(event: Event)

View File

@ -34,7 +34,11 @@ class TaskListenerTest {
claimed = true claimed = true
} }
override fun markConsumed(taskId: UUID) { override fun markCompleted(taskId: UUID) {
consumed = true
}
override fun markFailed(taskId: UUID) {
consumed = true consumed = true
} }

View File

@ -36,7 +36,9 @@ class TaskPollerImplementationTest : TestBase() {
object : TaskReporter { object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {} override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {} override fun updateLastSeen(taskId: UUID) {}
override fun markConsumed(taskId: UUID) {} override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {} override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {} override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) { override fun publishEvent(event: Event) {