Fixing CI flakyness

This commit is contained in:
Brage Skjønborg 2026-01-31 19:24:55 +01:00
parent e129258f39
commit ffd44d37eb

View File

@ -2,7 +2,6 @@ package no.iktdev.eventi.tasks
import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
@ -24,32 +23,29 @@ Så skal state, heartbeat og cleanup fungere korrekt
""") """)
class TaskListenerTest { class TaskListenerTest {
// -------------------------
// Fake Task + Reporter
// -------------------------
class FakeTask : Task() class FakeTask : Task()
class FakeReporter : TaskReporter { class FakeReporter : TaskReporter {
var claimed = false var claimed = false
var consumed = false var completed = false
var logs = mutableListOf<String>() var failed = false
var events = mutableListOf<Event>() var cancelled = false
val logs = mutableListOf<String>()
val events = mutableListOf<Event>()
override fun markClaimed(taskId: UUID, workerId: String) { claimed = true } override fun markClaimed(taskId: UUID, workerId: String) { claimed = true }
override fun markCompleted(taskId: UUID) { consumed = true } override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { consumed = true } override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun markCancelled(referenceId: UUID, taskId: UUID) { cancelled = true }
override fun updateProgress(taskId: UUID, progress: Int) {} override fun updateProgress(taskId: UUID, progress: Int) {}
override fun publishEvent(event: Event) { events.add(event) } override fun publishEvent(event: Event) { events.add(event) }
override fun updateLastSeen(taskId: UUID) {} override fun updateLastSeen(taskId: UUID) {}
override fun log(taskId: UUID, message: String) { logs.add(message) } override fun log(taskId: UUID, message: String) { logs.add(message) }
} }
// ------------------------- // ---------------------------------------------------------
// Tests // 1 — Heartbeat starter og stopper riktig
// ------------------------- // ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask starter heartbeat-runner Når onTask starter heartbeat-runner
@ -59,20 +55,14 @@ class TaskListenerTest {
fun heartbeatStartsAndStopsCorrectly() = runTest { fun heartbeatStartsAndStopsCorrectly() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null var heartbeatRan = false
var heartbeatRan: Boolean = false
private set
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
@ -81,25 +71,18 @@ class TaskListenerTest {
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Gi heartbeat en sjanse til å kjøre
yield() yield()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter) listener.currentJob?.join()
assertTrue(accepted)
listener.currentJob!!.join()
assertNotNull(listener.heartbeatStarted)
assertFalse(listener.heartbeatStarted!!.isActive)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
@ -107,6 +90,9 @@ class TaskListenerTest {
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 2 — Heartbeat blokkerer ikke annen jobb
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når heartbeat kjører i bakgrunnen Når heartbeat kjører i bakgrunnen
@ -119,72 +105,51 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId(): String {
return UUID.randomUUID().toString() override fun getWorkerId() = "worker"
}
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event { override suspend fun onTask(task: Task): Event {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler annen coroutine-oppgave (VideoTaskListener/Converter)
launch { launch {
delay(30) delay(30)
otherWorkCompleted.complete(Unit) otherWorkCompleted.complete(Unit)
} }
// ⭐ Ikke fullfør onTask før testen sier det
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på annen jobb
otherWorkCompleted.await() otherWorkCompleted.await()
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob) assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive) assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 3 — Heartbeat + CPU + IO arbeid
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når heartbeat kjører og flere parallelle jobber startes Når heartbeat kjører og flere parallelle jobber startes
@ -198,81 +163,56 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Start heartbeat
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler Converter (CPU)
launch(Dispatchers.Default) { launch(Dispatchers.Default) {
repeat(1000) { /* CPU work */ } repeat(1000) {}
converterDone.complete(Unit) converterDone.complete(Unit)
} }
// Simuler VideoTaskListener (IO)
launch(Dispatchers.IO) { launch(Dispatchers.IO) {
delay(40) delay(40)
videoDone.complete(Unit) videoDone.complete(Unit)
} }
// ⭐ Vent til testen sier "nå kan du fullføre"
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
listener.accept(task, reporter)
// Vent på begge "andre" oppgaver
converterDone.await() converterDone.await()
videoDone.await() videoDone.await()
// ⭐ Verifiser at begge faktisk ble fullført
assertTrue(converterDone.isCompleted)
assertTrue(videoDone.isCompleted)
// ⭐ Nå er onTask fortsatt i live, cleanup har ikke skjedd
assertNotNull(listener.currentJob)
assertTrue(listener.currentJob!!.isActive)
// Heartbeat kjørte
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
assertNotNull(listener.currentJob)
// ⭐ Nå lar vi onTask fullføre
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob?.join()
// Vent på listener-jobben
listener.currentJob!!.join()
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 4 — Arbeid fullføres, heartbeat kjører
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask gjør ferdig arbeid Når onTask gjør ferdig arbeid
@ -284,18 +224,14 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
var heartbeatStarted: Job? = null
var heartbeatRan = false var heartbeatRan = false
var onTaskCalled = false var onTaskCalled = false
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
@ -304,12 +240,9 @@ class TaskListenerTest {
withHeartbeatRunner(10.milliseconds) { withHeartbeatRunner(10.milliseconds) {
heartbeatRan = true heartbeatRan = true
}.also { heartbeatStarted = it } }
// Simuler arbeid
delay(20) delay(20)
// ⭐ signaliser at arbeidet er ferdig
workCompleted.complete(Unit) workCompleted.complete(Unit)
return object : Event() {} return object : Event() {}
@ -317,34 +250,23 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask() listener.accept(FakeTask(), reporter)
val accepted = listener.accept(task, reporter)
assertTrue(accepted)
// ⭐ Verifiser at arbeidet faktisk ble fullført
workCompleted.await() workCompleted.await()
listener.currentJob?.join()
// Vent på jobben
listener.currentJob!!.join()
// onTask ble kalt
assertTrue(listener.onTaskCalled) assertTrue(listener.onTaskCalled)
// Heartbeat ble startet
assertNotNull(listener.heartbeatStarted)
assertTrue(listener.heartbeatRan) assertTrue(listener.heartbeatRan)
// Heartbeat ble kansellert
assertFalse(listener.heartbeatStarted!!.isActive)
// Cleanup
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 5 — accept() returnerer false når busy
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når listener er opptatt med en task Når listener er opptatt med en task
@ -356,44 +278,34 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
// Hold jobben i live
allowFinish.await() allowFinish.await()
return object : Event() {} return object : Event() {}
} }
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task1 = FakeTask()
val task2 = FakeTask()
// Første task aksepteres assertTrue(listener.accept(FakeTask(), reporter))
val accepted1 = listener.accept(task1, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertTrue(accepted1)
// Listener er busy → andre task skal avvises
val accepted2 = listener.accept(task2, reporter)
assertFalse(accepted2)
// Fullfør første task
allowFinish.complete(Unit) allowFinish.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
} }
// ---------------------------------------------------------
// 6 — accept() returnerer false når unsupported
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når supports() returnerer false Når supports() returnerer false
@ -403,29 +315,26 @@ class TaskListenerTest {
fun acceptReturnsFalseWhenUnsupported() = runTest { fun acceptReturnsFalseWhenUnsupported() = runTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = false override fun supports(task: Task) = false
override suspend fun onTask(task: Task): Event? = error("Should not be called") override suspend fun onTask(task: Task): Event? = error("Should not be called")
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask()
val accepted = listener.accept(task, reporter) assertFalse(listener.accept(FakeTask(), reporter))
assertFalse(accepted)
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.reporter) assertNull(listener.reporter)
} }
// ---------------------------------------------------------
// 7 — onError kalles når onTask kaster
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når onTask kaster en exception Når onTask kaster en exception
@ -437,13 +346,10 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
@ -458,22 +364,19 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask().newReferenceId() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent på error-path
errorLogged.await() errorLogged.await()
// ⭐ Vent på at cleanup i finally kjører
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 8 — onCancelled kalles når jobben kanselleres
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når jobben kanselleres mens onTask kjører Når jobben kanselleres mens onTask kjører
@ -486,19 +389,16 @@ class TaskListenerTest {
val listener = object : TaskListener() { val listener = object : TaskListener() {
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
allowStart.complete(Unit) allowStart.complete(Unit)
delay(Long.MAX_VALUE) // hold jobben i live delay(Long.MAX_VALUE)
return null return null
} }
@ -509,28 +409,22 @@ class TaskListenerTest {
} }
val reporter = FakeReporter() val reporter = FakeReporter()
val task = FakeTask().newReferenceId() listener.accept(FakeTask().newReferenceId(), reporter)
listener.accept(task, reporter)
// Vent til onTask har startet
allowStart.await() allowStart.await()
// Kanseller jobben
listener.currentJob!!.cancel() listener.currentJob!!.cancel()
// Vent til onCancelled() ble kalt
cancelledCalled.await() cancelledCalled.await()
// ⭐ Vent til cleanup i finally har kjørt
listener.currentJob?.join() listener.currentJob?.join()
// Cleanup verifisering
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
} }
// ---------------------------------------------------------
// 9 — Sekvensiell kjøring uten statelekkasje
// ---------------------------------------------------------
@Test @Test
@DisplayName(""" @DisplayName("""
Når listener prosesserer to tasks sekvensielt Når listener prosesserer to tasks sekvensielt
@ -549,13 +443,10 @@ class TaskListenerTest {
var callCount = 0 var callCount = 0
override fun getWorkerId() = "worker" override fun getWorkerId() = "worker"
override fun createIncompleteStateTaskEvent( override fun createIncompleteStateTaskEvent(
task: Task, task: Task, status: TaskStatus, exception: Exception?
status: TaskStatus, ) = object : Event() {}
exception: Exception?
): Event {
return object : Event() {}
}
override fun supports(task: Task) = true override fun supports(task: Task) = true
@ -563,8 +454,8 @@ class TaskListenerTest {
callCount++ callCount++
if (callCount == 1) { if (callCount == 1) {
started1.complete(Unit) // signal: coroutine har startet started1.complete(Unit)
finish1.await() // vent til testen sier "fortsett" finish1.await()
} }
if (callCount == 2) { if (callCount == 2) {
@ -578,34 +469,24 @@ class TaskListenerTest {
val reporter = FakeReporter() val reporter = FakeReporter()
// --- Task 1 --- listener.accept(FakeTask(), reporter)
val task1 = FakeTask() started1.await()
listener.accept(task1, reporter) finish1.complete(Unit)
listener.currentJob?.join()
started1.await() // garanterer at coroutine kjører
finish1.complete(Unit) // la coroutine fullføre
listener.currentJob!!.join()
// Verifiser cleanup
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// --- Task 2 --- listener.accept(FakeTask(), reporter)
val task2 = FakeTask()
listener.accept(task2, reporter)
started2.await() started2.await()
finish2.complete(Unit) finish2.complete(Unit)
listener.currentJob!!.join() listener.currentJob?.join()
// Verifiser cleanup igjen
assertNull(listener.currentJob) assertNull(listener.currentJob)
assertNull(listener.currentTask) assertNull(listener.currentTask)
assertNull(listener.heartbeatRunner) assertNull(listener.heartbeatRunner)
// onTask ble kalt to ganger
assertEquals(2, listener.callCount) assertEquals(2, listener.callCount)
} }
} }