From ffd44d37eb064b69c051f20434ff533a579fc341 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sat, 31 Jan 2026 19:24:55 +0100 Subject: [PATCH] Fixing CI flakyness --- .../iktdev/eventi/tasks/TaskListenerTest.kt | 307 ++++++------------ 1 file changed, 94 insertions(+), 213 deletions(-) diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt index 52669c1..d5d1952 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/TaskListenerTest.kt @@ -2,7 +2,6 @@ package no.iktdev.eventi.tasks import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest @@ -24,32 +23,29 @@ Så skal state, heartbeat og cleanup fungere korrekt """) class TaskListenerTest { - // ------------------------- - // Fake Task + Reporter - // ------------------------- - class FakeTask : Task() class FakeReporter : TaskReporter { var claimed = false - var consumed = false - var logs = mutableListOf() - var events = mutableListOf() + var completed = false + var failed = false + var cancelled = false + val logs = mutableListOf() + val events = mutableListOf() override fun markClaimed(taskId: UUID, workerId: String) { claimed = true } - override fun markCompleted(taskId: UUID) { consumed = true } - override fun markFailed(referenceId: UUID, taskId: UUID) { consumed = true } - override fun markCancelled(referenceId: UUID, taskId: UUID) {} + override fun markCompleted(taskId: UUID) { completed = true } + override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true } + override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true } override fun updateProgress(taskId: UUID, progress: Int) {} override fun publishEvent(event: Event) { events.add(event) } override fun updateLastSeen(taskId: UUID) {} override fun log(taskId: UUID, message: String) { logs.add(message) } } - // ------------------------- - // Tests - // ------------------------- - + // --------------------------------------------------------- + // 1 — Heartbeat starter og stopper riktig + // --------------------------------------------------------- @Test @DisplayName(""" Når onTask starter heartbeat-runner @@ -59,20 +55,14 @@ class TaskListenerTest { fun heartbeatStartsAndStopsCorrectly() = runTest { val listener = object : TaskListener() { - var heartbeatStarted: Job? = null - var heartbeatRan: Boolean = false - private set - + var heartbeatRan = false var onTaskCalled = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true @@ -81,25 +71,18 @@ class TaskListenerTest { withHeartbeatRunner(10.milliseconds) { heartbeatRan = true - }.also { heartbeatStarted = it } + } - // Gi heartbeat en sjanse til å kjøre yield() - return object : Event() {} } } val reporter = FakeReporter() - val task = FakeTask() + listener.accept(FakeTask(), reporter) - val accepted = listener.accept(task, reporter) - assertTrue(accepted) + listener.currentJob?.join() - listener.currentJob!!.join() - - assertNotNull(listener.heartbeatStarted) - assertFalse(listener.heartbeatStarted!!.isActive) assertTrue(listener.heartbeatRan) assertNull(listener.heartbeatRunner) assertNull(listener.currentJob) @@ -107,6 +90,9 @@ class TaskListenerTest { assertNull(listener.reporter) } + // --------------------------------------------------------- + // 2 — Heartbeat blokkerer ikke annen jobb + // --------------------------------------------------------- @Test @DisplayName(""" Når heartbeat kjører i bakgrunnen @@ -119,72 +105,51 @@ class TaskListenerTest { val listener = object : TaskListener() { - var heartbeatStarted: Job? = null var heartbeatRan = false - override fun getWorkerId(): String { - return UUID.randomUUID().toString() - } + + override fun getWorkerId() = "worker" override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event { - - // Start heartbeat withHeartbeatRunner(10.milliseconds) { heartbeatRan = true - }.also { heartbeatStarted = it } + } - // Simuler annen coroutine-oppgave (VideoTaskListener/Converter) launch { delay(30) otherWorkCompleted.complete(Unit) } - // ⭐ Ikke fullfør onTask før testen sier det allowFinish.await() return object : Event() {} } } val reporter = FakeReporter() - val task = FakeTask() + listener.accept(FakeTask(), reporter) - listener.accept(task, reporter) - - // Vent på annen jobb otherWorkCompleted.await() - // ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd + assertTrue(listener.heartbeatRan) assertNotNull(listener.currentJob) assertTrue(listener.currentJob!!.isActive) - // Heartbeat kjørte - assertNotNull(listener.heartbeatStarted) - assertTrue(listener.heartbeatRan) - - // ⭐ Nå lar vi onTask fullføre allowFinish.complete(Unit) + listener.currentJob?.join() - // Vent på listener-jobben - listener.currentJob!!.join() - - // Heartbeat ble kansellert - assertFalse(listener.heartbeatStarted!!.isActive) - - // Cleanup assertNull(listener.heartbeatRunner) assertNull(listener.currentJob) assertNull(listener.currentTask) } + // --------------------------------------------------------- + // 3 — Heartbeat + CPU + IO arbeid + // --------------------------------------------------------- @Test @DisplayName(""" Når heartbeat kjører og flere parallelle jobber startes @@ -198,81 +163,56 @@ class TaskListenerTest { val listener = object : TaskListener() { - var heartbeatStarted: Job? = null var heartbeatRan = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { - - // Start heartbeat withHeartbeatRunner(10.milliseconds) { heartbeatRan = true - }.also { heartbeatStarted = it } + } - // Simuler Converter (CPU) launch(Dispatchers.Default) { - repeat(1000) { /* CPU work */ } + repeat(1000) {} converterDone.complete(Unit) } - // Simuler VideoTaskListener (IO) launch(Dispatchers.IO) { delay(40) videoDone.complete(Unit) } - // ⭐ Vent til testen sier "nå kan du fullføre" allowFinish.await() return object : Event() {} } } val reporter = FakeReporter() - val task = FakeTask() + listener.accept(FakeTask(), reporter) - listener.accept(task, reporter) - - // Vent på begge "andre" oppgaver converterDone.await() videoDone.await() - // ⭐ Verifiser at begge faktisk ble fullført - assertTrue(converterDone.isCompleted) - assertTrue(videoDone.isCompleted) - - // ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd - assertNotNull(listener.currentJob) - assertTrue(listener.currentJob!!.isActive) - - // Heartbeat kjørte - assertNotNull(listener.heartbeatStarted) assertTrue(listener.heartbeatRan) + assertNotNull(listener.currentJob) - // ⭐ Nå lar vi onTask fullføre allowFinish.complete(Unit) + listener.currentJob?.join() - // Vent på listener-jobben - listener.currentJob!!.join() - - // Heartbeat ble kansellert - assertFalse(listener.heartbeatStarted!!.isActive) - - // Cleanup assertNull(listener.heartbeatRunner) assertNull(listener.currentJob) assertNull(listener.currentTask) } + // --------------------------------------------------------- + // 4 — Arbeid fullføres, heartbeat kjører + // --------------------------------------------------------- @Test @DisplayName(""" Når onTask gjør ferdig arbeid @@ -284,18 +224,14 @@ class TaskListenerTest { val listener = object : TaskListener() { - var heartbeatStarted: Job? = null var heartbeatRan = false var onTaskCalled = false override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true @@ -304,12 +240,9 @@ class TaskListenerTest { withHeartbeatRunner(10.milliseconds) { heartbeatRan = true - }.also { heartbeatStarted = it } + } - // Simuler arbeid delay(20) - - // ⭐ signaliser at arbeidet er ferdig workCompleted.complete(Unit) return object : Event() {} @@ -317,34 +250,23 @@ class TaskListenerTest { } val reporter = FakeReporter() - val task = FakeTask() + listener.accept(FakeTask(), reporter) - val accepted = listener.accept(task, reporter) - assertTrue(accepted) - - // ⭐ Verifiser at arbeidet faktisk ble fullført workCompleted.await() + listener.currentJob?.join() - // Vent på jobben - listener.currentJob!!.join() - - // onTask ble kalt assertTrue(listener.onTaskCalled) - - // Heartbeat ble startet - assertNotNull(listener.heartbeatStarted) assertTrue(listener.heartbeatRan) - // Heartbeat ble kansellert - assertFalse(listener.heartbeatStarted!!.isActive) - - // Cleanup assertNull(listener.heartbeatRunner) assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.reporter) } + // --------------------------------------------------------- + // 5 — accept() returnerer false når busy + // --------------------------------------------------------- @Test @DisplayName(""" Når listener er opptatt med en task @@ -356,44 +278,34 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { - // Hold jobben i live allowFinish.await() return object : Event() {} } } val reporter = FakeReporter() - val task1 = FakeTask() - val task2 = FakeTask() - // Første task aksepteres - val accepted1 = listener.accept(task1, reporter) - assertTrue(accepted1) + assertTrue(listener.accept(FakeTask(), reporter)) + assertFalse(listener.accept(FakeTask(), reporter)) - // Listener er busy → andre task skal avvises - val accepted2 = listener.accept(task2, reporter) - assertFalse(accepted2) - - // Fullfør første task allowFinish.complete(Unit) - listener.currentJob!!.join() + listener.currentJob?.join() - // Cleanup assertNull(listener.currentJob) assertNull(listener.currentTask) } + // --------------------------------------------------------- + // 6 — accept() returnerer false når unsupported + // --------------------------------------------------------- @Test @DisplayName(""" Når supports() returnerer false @@ -403,29 +315,26 @@ class TaskListenerTest { fun acceptReturnsFalseWhenUnsupported() = runTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = false override suspend fun onTask(task: Task): Event? = error("Should not be called") } val reporter = FakeReporter() - val task = FakeTask() - val accepted = listener.accept(task, reporter) - - assertFalse(accepted) + assertFalse(listener.accept(FakeTask(), reporter)) assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.reporter) } + // --------------------------------------------------------- + // 7 — onError kalles når onTask kaster + // --------------------------------------------------------- @Test @DisplayName(""" Når onTask kaster en exception @@ -437,13 +346,10 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true @@ -458,22 +364,19 @@ class TaskListenerTest { } val reporter = FakeReporter() - val task = FakeTask().newReferenceId() + listener.accept(FakeTask().newReferenceId(), reporter) - listener.accept(task, reporter) - - // Vent på error-path errorLogged.await() - - // ⭐ Vent på at cleanup i finally kjører listener.currentJob?.join() - // Cleanup verifisering assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.heartbeatRunner) } + // --------------------------------------------------------- + // 8 — onCancelled kalles når jobben kanselleres + // --------------------------------------------------------- @Test @DisplayName(""" Når jobben kanselleres mens onTask kjører @@ -486,19 +389,16 @@ class TaskListenerTest { val listener = object : TaskListener() { override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true override suspend fun onTask(task: Task): Event? { allowStart.complete(Unit) - delay(Long.MAX_VALUE) // hold jobben i live + delay(Long.MAX_VALUE) return null } @@ -509,28 +409,22 @@ class TaskListenerTest { } val reporter = FakeReporter() - val task = FakeTask().newReferenceId() + listener.accept(FakeTask().newReferenceId(), reporter) - listener.accept(task, reporter) - - // Vent til onTask har startet allowStart.await() - - // Kanseller jobben listener.currentJob!!.cancel() - // Vent til onCancelled() ble kalt cancelledCalled.await() - - // ⭐ Vent til cleanup i finally har kjørt listener.currentJob?.join() - // Cleanup verifisering assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.heartbeatRunner) } + // --------------------------------------------------------- + // 9 — Sekvensiell kjøring uten state‑lekkasje + // --------------------------------------------------------- @Test @DisplayName(""" Når listener prosesserer to tasks sekvensielt @@ -549,13 +443,10 @@ class TaskListenerTest { var callCount = 0 override fun getWorkerId() = "worker" + override fun createIncompleteStateTaskEvent( - task: Task, - status: TaskStatus, - exception: Exception? - ): Event { - return object : Event() {} - } + task: Task, status: TaskStatus, exception: Exception? + ) = object : Event() {} override fun supports(task: Task) = true @@ -563,8 +454,8 @@ class TaskListenerTest { callCount++ if (callCount == 1) { - started1.complete(Unit) // signal: coroutine har startet - finish1.await() // vent til testen sier "fortsett" + started1.complete(Unit) + finish1.await() } if (callCount == 2) { @@ -578,34 +469,24 @@ class TaskListenerTest { val reporter = FakeReporter() - // --- Task 1 --- - val task1 = FakeTask() - listener.accept(task1, reporter) + listener.accept(FakeTask(), reporter) + started1.await() + finish1.complete(Unit) + listener.currentJob?.join() - started1.await() // garanterer at coroutine kjører - finish1.complete(Unit) // la coroutine fullføre - listener.currentJob!!.join() - - // Verifiser cleanup assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.heartbeatRunner) - // --- Task 2 --- - val task2 = FakeTask() - listener.accept(task2, reporter) - + listener.accept(FakeTask(), reporter) started2.await() finish2.complete(Unit) - listener.currentJob!!.join() + listener.currentJob?.join() - // Verifiser cleanup igjen assertNull(listener.currentJob) assertNull(listener.currentTask) assertNull(listener.heartbeatRunner) - // onTask ble kalt to ganger assertEquals(2, listener.callCount) } - }