diff --git a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt index 81ffca8..40b771a 100644 --- a/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt +++ b/src/main/kotlin/no/iktdev/eventi/tasks/TaskListener.kt @@ -3,11 +3,16 @@ package no.iktdev.eventi.tasks import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job +import kotlinx.coroutines.currentCoroutineContext +import kotlinx.coroutines.delay +import kotlinx.coroutines.isActive import kotlinx.coroutines.launch import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import java.util.UUID import kotlin.coroutines.cancellation.CancellationException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes /** * Abstract base class for handling tasks with asynchronous processing and reporting. @@ -39,6 +44,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta } } + private var heartbeatRunner: Job? = null + suspend fun withHeartbeatRunner(interval: Duration = 5.minutes, block: () -> Unit): Job { + return CoroutineScope(currentCoroutineContext()).launch { + while (isActive) { + block() + delay(interval) + } + }.also { heartbeatRunner = it } + } + override fun accept(task: Task, reporter: TaskReporter): Boolean { if (isBusy || !supports(task)) return false this.reporter = reporter @@ -55,6 +70,8 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta } catch (e: Exception) { onError(task, e) } finally { + heartbeatRunner?.cancel() + heartbeatRunner = null currentJob = null currentTask = null this@TaskListener.reporter = null diff --git a/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt index e19ca5f..ddfaf73 100644 --- a/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt +++ b/src/test/kotlin/no/iktdev/eventi/tasks/AbstractTaskPollerTest.kt @@ -2,9 +2,14 @@ package no.iktdev.eventi.tasks import io.mockk.mockk import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.setMain import no.iktdev.eventi.InMemoryTaskStore import no.iktdev.eventi.TestBase import no.iktdev.eventi.events.EventListener @@ -20,6 +25,10 @@ import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import java.time.Duration import java.util.UUID +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.minutes +import kotlin.time.Duration.Companion.nanoseconds +import kotlin.time.Duration.Companion.seconds class AbstractTaskPollerTest : TestBase() { @@ -60,6 +69,8 @@ class AbstractTaskPollerTest : TestBase() { open class EchoListener : TaskListener(TaskType.MIXED) { var result: Event? = null + fun getJob() = currentJob + override fun getWorkerId() = this.javaClass.simpleName override fun supports(task: Task): Boolean { @@ -67,6 +78,9 @@ class AbstractTaskPollerTest : TestBase() { } override suspend fun onTask(task: Task): Event { + withHeartbeatRunner(1.seconds) { + println("Heartbeat") + } if (task is EchoTask) { return EchoEvent(task.data + " Potetmos").producedFrom(task) } @@ -79,6 +93,15 @@ class AbstractTaskPollerTest : TestBase() { reporter?.publishEvent(result!!) } + override fun onError(task: Task, exception: Exception) { + exception.printStackTrace() + super.onError(task, exception) + } + + override fun onCancelled() { + super.onCancelled() + } + } @OptIn(ExperimentalCoroutinesApi::class) @@ -118,6 +141,9 @@ class AbstractTaskPollerTest : TestBase() { taskStore.persist(task) poller.pollOnce() + + listener.getJob()?.join() + advanceTimeBy(1.minutes) advanceUntilIdle() assertEquals(initialBackoff, poller.backoff)