Tests + failure/canceled event producer

This commit is contained in:
Brage Skjønborg 2026-01-31 19:03:31 +01:00
parent 289ee88be0
commit e129258f39
4 changed files with 133 additions and 8 deletions

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.jetbrains.annotations.VisibleForTesting import org.jetbrains.annotations.VisibleForTesting
import java.util.UUID import java.util.UUID
import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.cancellation.CancellationException
@ -88,14 +89,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
this@TaskListener.reporter = null this@TaskListener.reporter = null
} }
} }
return true return true
} }
abstract fun createIncompleteStateTaskEvent(task: Task, status: TaskStatus, exception: Exception? = null): Event
override fun onError(task: Task, exception: Exception) { override fun onError(task: Task, exception: Exception) {
reporter?.log(task.taskId, "Error processing task: ${exception.message}") reporter?.log(task.taskId, "Error processing task: ${exception.message}")
exception.printStackTrace() exception.printStackTrace()
reporter?.markFailed(task.referenceId, task.taskId) reporter?.markFailed(task.referenceId, task.taskId)
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Failed, exception))
} }
override fun onComplete(task: Task, result: Event?) { override fun onComplete(task: Task, result: Event?) {
@ -111,6 +114,7 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
currentJob?.cancel() currentJob?.cancel()
heartbeatRunner?.cancel() heartbeatRunner?.cancel()
currentTask = null currentTask = null
reporter!!.publishEvent(createIncompleteStateTaskEvent(task, TaskStatus.Cancelled))
} }
} }

View File

@ -5,6 +5,7 @@ import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
@ -19,6 +20,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -32,6 +42,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -45,6 +64,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
@ -57,6 +85,15 @@ class TaskListenerRegistryTest {
override fun getWorkerId(): String { override fun getWorkerId(): String {
TODO("Not yet implemented") TODO("Not yet implemented")
} }
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
TODO("Not yet implemented")
}
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
TODO("Not yet implemented") TODO("Not yet implemented")
} }

View File

@ -9,6 +9,7 @@ import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield import kotlinx.coroutines.yield
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
@ -65,6 +66,14 @@ class TaskListenerTest {
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -112,11 +121,21 @@ class TaskListenerTest {
var heartbeatStarted: Job? = null var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId(): String {
return UUID.randomUUID().toString()
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun getWorkerId() = "worker"
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
// Start heartbeat // Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
@ -183,6 +202,14 @@ class TaskListenerTest {
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -262,9 +289,17 @@ class TaskListenerTest {
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event {
onTaskCalled = true onTaskCalled = true
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
@ -321,6 +356,14 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -360,6 +403,14 @@ class TaskListenerTest {
fun acceptReturnsFalseWhenUnsupported() = runTest { fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = false override fun supports(task: Task) = false
override suspend fun onTask(task: Task): Event? = error("Should not be called") override suspend fun onTask(task: Task): Event? = error("Should not be called")
} }
@ -386,6 +437,14 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -427,6 +486,14 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
@ -466,10 +533,10 @@ class TaskListenerTest {
@Test @Test
@DisplayName(""" @DisplayName("""
Når listener prosesserer to tasks sekvensielt Når listener prosesserer to tasks sekvensielt
Hvis cleanup fungerer riktig Hvis cleanup fungerer riktig
skal ingen state lekke mellom tasks skal ingen state lekke mellom tasks
""") """)
fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest { fun listenerHandlesSequentialTasksWithoutLeakingState() = runTest {
val started1 = CompletableDeferred<Unit>() val started1 = CompletableDeferred<Unit>()
val finish1 = CompletableDeferred<Unit>() val finish1 = CompletableDeferred<Unit>()
@ -482,6 +549,14 @@ Så skal ingen state lekke mellom tasks
var callCount = 0 var callCount = 0
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event { override suspend fun onTask(task: Task): Event {

View File

@ -10,6 +10,7 @@ import no.iktdev.eventi.TestBase
import no.iktdev.eventi.events.EventTypeRegistry import no.iktdev.eventi.events.EventTypeRegistry
import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.stores.TaskStore import no.iktdev.eventi.stores.TaskStore
import no.iktdev.eventi.testUtil.multiply import no.iktdev.eventi.testUtil.multiply
import no.iktdev.eventi.testUtil.wipe import no.iktdev.eventi.testUtil.wipe
@ -72,6 +73,14 @@ class TaskPollerImplementationTest : TestBase() {
fun getJob() = currentJob fun getJob() = currentJob
override fun getWorkerId() = this.javaClass.simpleName override fun getWorkerId() = this.javaClass.simpleName
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = task is EchoTask override fun supports(task: Task) = task is EchoTask
override suspend fun onTask(task: Task): Event { override suspend fun onTask(task: Task): Event {