heartbeat function
This commit is contained in:
parent
b7552dbc67
commit
b7c9e2827a
@ -3,11 +3,16 @@ package no.iktdev.eventi.tasks
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.currentCoroutineContext
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.isActive
|
||||
import kotlinx.coroutines.launch
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.Task
|
||||
import java.util.UUID
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
import kotlin.time.Duration
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
|
||||
/**
|
||||
* Abstract base class for handling tasks with asynchronous processing and reporting.
|
||||
@ -39,6 +44,16 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
||||
}
|
||||
}
|
||||
|
||||
private var heartbeatRunner: Job? = null
|
||||
suspend fun withHeartbeatRunner(interval: Duration = 5.minutes, block: () -> Unit): Job {
|
||||
return CoroutineScope(currentCoroutineContext()).launch {
|
||||
while (isActive) {
|
||||
block()
|
||||
delay(interval)
|
||||
}
|
||||
}.also { heartbeatRunner = it }
|
||||
}
|
||||
|
||||
override fun accept(task: Task, reporter: TaskReporter): Boolean {
|
||||
if (isBusy || !supports(task)) return false
|
||||
this.reporter = reporter
|
||||
@ -55,6 +70,8 @@ abstract class TaskListener(val taskType: TaskType = TaskType.CPU_INTENSIVE): Ta
|
||||
} catch (e: Exception) {
|
||||
onError(task, e)
|
||||
} finally {
|
||||
heartbeatRunner?.cancel()
|
||||
heartbeatRunner = null
|
||||
currentJob = null
|
||||
currentTask = null
|
||||
this@TaskListener.reporter = null
|
||||
|
||||
@ -2,9 +2,14 @@ package no.iktdev.eventi.tasks
|
||||
|
||||
import io.mockk.mockk
|
||||
import kotlinx.coroutines.CompletableDeferred
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.test.StandardTestDispatcher
|
||||
import kotlinx.coroutines.test.advanceTimeBy
|
||||
import kotlinx.coroutines.test.advanceUntilIdle
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import kotlinx.coroutines.test.setMain
|
||||
import no.iktdev.eventi.InMemoryTaskStore
|
||||
import no.iktdev.eventi.TestBase
|
||||
import no.iktdev.eventi.events.EventListener
|
||||
@ -20,6 +25,10 @@ import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.time.Duration
|
||||
import java.util.UUID
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.minutes
|
||||
import kotlin.time.Duration.Companion.nanoseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
|
||||
class AbstractTaskPollerTest : TestBase() {
|
||||
|
||||
@ -60,6 +69,8 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
open class EchoListener : TaskListener(TaskType.MIXED) {
|
||||
var result: Event? = null
|
||||
|
||||
fun getJob() = currentJob
|
||||
|
||||
override fun getWorkerId() = this.javaClass.simpleName
|
||||
|
||||
override fun supports(task: Task): Boolean {
|
||||
@ -67,6 +78,9 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
}
|
||||
|
||||
override suspend fun onTask(task: Task): Event {
|
||||
withHeartbeatRunner(1.seconds) {
|
||||
println("Heartbeat")
|
||||
}
|
||||
if (task is EchoTask) {
|
||||
return EchoEvent(task.data + " Potetmos").producedFrom(task)
|
||||
}
|
||||
@ -79,6 +93,15 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
reporter?.publishEvent(result!!)
|
||||
}
|
||||
|
||||
override fun onError(task: Task, exception: Exception) {
|
||||
exception.printStackTrace()
|
||||
super.onError(task, exception)
|
||||
}
|
||||
|
||||
override fun onCancelled() {
|
||||
super.onCancelled()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
@ -118,6 +141,9 @@ class AbstractTaskPollerTest : TestBase() {
|
||||
taskStore.persist(task)
|
||||
|
||||
poller.pollOnce()
|
||||
|
||||
listener.getJob()?.join()
|
||||
advanceTimeBy(1.minutes)
|
||||
advanceUntilIdle()
|
||||
|
||||
assertEquals(initialBackoff, poller.backoff)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user