From 9c6aa304a807112d171f39b5344353685af27c1e Mon Sep 17 00:00:00 2001 From: bskjon Date: Sun, 27 Apr 2025 01:58:08 +0200 Subject: [PATCH] enabled mode --- .../converter/ConverterApplication.kt | 2 +- .../converter/TaskCoordinator.kt | 6 +- .../coordinator/CoordinatorApplication.kt | 2 +- .../CoordinatorEventCoordinator.kt | 6 + .../processer/ProcesserApplication.kt | 2 +- .../processer/TaskCoordinator.kt | 6 +- .../mediaprocessing/processer/Resources.kt | 20 +++ .../processer/ffmpeg/FfmpegRunnerTest.kt | 15 ++ .../progress/FfmpegProgressDecoderTest.kt | 26 ++++ .../src/test/resources/encodeProgress1.txt | 141 ++++++++++++++++++ .../shared/common/TaskCoordinatorBase.kt | 17 ++- .../common/database/cal/RunnerManager.kt | 17 ++- .../shared/common/database/tables/runners.kt | 1 + .../implementations/EventCoordinator.kt | 20 ++- 14 files changed, 271 insertions(+), 10 deletions(-) create mode 100644 apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/Resources.kt create mode 100644 apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunnerTest.kt create mode 100644 apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/progress/FfmpegProgressDecoderTest.kt create mode 100644 apps/processer/src/test/resources/encodeProgress1.txt diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt index b6ccc64b..93d0f5ec 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -50,7 +50,7 @@ fun main(args: Array) { eventsDatabase.createTables(tasks, runners) taskManager = TasksManager(eventsDatabase) - runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName) + runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ConvertApplication::class.java.simpleName) runnerManager.assignRunner() runApplication(*args) diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskCoordinator.kt index 0862730f..7763f2cc 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskCoordinator.kt @@ -19,7 +19,7 @@ class TaskCoordinator(): TaskCoordinatorBase() { override fun onCoordinatorReady() { super.onCoordinatorReady() - runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName) + runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ConvertApplication::class.java.simpleName) runnerManager.assignRunner() } @@ -77,6 +77,10 @@ class TaskCoordinator(): TaskCoordinatorBase() { } } + override fun getEnabledState(): Boolean { + return runnerManager.amIEnabled() + } + interface TaskEvents { fun onCancelOrStopProcess(eventId: String) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index 04194044..0f80ffe4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -104,7 +104,7 @@ fun main(args: Array) { ) storeDatabase.createTables(*tables) - runnerManager = RunnerManager(dataSource = eventDatabase.database, name = CoordinatorApplication::class.java.simpleName) + runnerManager = RunnerManager(dataSource = eventDatabase.database, applicationName = CoordinatorApplication::class.java.simpleName) runnerManager.assignRunner() runApplication(*args) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index 37b585db..cc5d15b5 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -98,4 +98,10 @@ class Coordinator( } return taskMode } + + override fun updateEnabledState(): Boolean { + isEnabled = runnerManager.amIEnabled() + return isEnabled + } + } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index ba8ebc30..8f53d457 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -59,7 +59,7 @@ fun main(args: Array) { taskManager = TasksManager(eventsDatabase) - runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName) + runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ProcesserApplication::class.java.simpleName) runnerManager.assignRunner() runApplication(*args) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskCoordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskCoordinator.kt index 58642971..d39fbe75 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskCoordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskCoordinator.kt @@ -17,7 +17,7 @@ class TaskCoordinator(): TaskCoordinatorBase() { override fun onCoordinatorReady() { super.onCoordinatorReady() - runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName) + runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ProcesserApplication::class.java.simpleName) runnerManager.assignRunner() } @@ -68,6 +68,10 @@ class TaskCoordinator(): TaskCoordinatorBase() { taskManager.produceEvent(event) } + override fun getEnabledState(): Boolean { + return runnerManager.amIEnabled() + } + override fun clearExpiredClaims() { val expiredClaims = taskManager.getTasksWithExpiredClaim().filter { it.task in listOf(TaskType.Encode, TaskType.Extract) } expiredClaims.forEach { diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/Resources.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/Resources.kt new file mode 100644 index 00000000..9a043430 --- /dev/null +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/Resources.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.shared.common.contract.Events +import no.iktdev.mediaprocessing.shared.common.contract.data.Event +import no.iktdev.mediaprocessing.shared.common.contract.jsonToEvent +import org.json.JSONArray + +enum class Files(val fileName: String) { + Output1("encodeProgress1.txt") +} + + +fun Files.getAsList(): List { + return this.javaClass.classLoader.getResource(this.fileName)?.readText()?.lines() ?: emptyList() +} + + +fun Files.getContent(): String? { + return this.javaClass.classLoader.getResource(this.fileName)?.readText() +} diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunnerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunnerTest.kt new file mode 100644 index 00000000..20d36b42 --- /dev/null +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegRunnerTest.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.processer.ffmpeg + +import no.iktdev.mediaprocessing.processer.Files +import no.iktdev.mediaprocessing.processer.getAsList +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class FfmpegRunnerTest { + + + + + +} \ No newline at end of file diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/progress/FfmpegProgressDecoderTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/progress/FfmpegProgressDecoderTest.kt new file mode 100644 index 00000000..c77ff31b --- /dev/null +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/progress/FfmpegProgressDecoderTest.kt @@ -0,0 +1,26 @@ +package no.iktdev.mediaprocessing.processer.ffmpeg.progress + +import no.iktdev.mediaprocessing.processer.Files +import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegRunner +import no.iktdev.mediaprocessing.processer.getAsList +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class FfmpegProgressDecoderTest { + + @Test + @DisplayName("Verify that progress can be decoded") + fun parseReadout1() { + val lines = Files.Output1.getAsList() + val decoder = FfmpegProgressDecoder() + lines.forEach { + decoder.defineDuration(it) + } + val result = decoder.parseVideoProgress(lines) + assertThat(result?.progress).isNotNull() + val progress = decoder.getProgress(result!!) + assertThat(progress.progress).isGreaterThanOrEqualTo(0) + } +} \ No newline at end of file diff --git a/apps/processer/src/test/resources/encodeProgress1.txt b/apps/processer/src/test/resources/encodeProgress1.txt new file mode 100644 index 00000000..2abf5423 --- /dev/null +++ b/apps/processer/src/test/resources/encodeProgress1.txt @@ -0,0 +1,141 @@ +Guessed Channel Layout for Input Stream #0.1 : 5.1 +Input #0, matroska,webm, from '/src/input/completed/standalone/Potato.mkv': + Metadata: + encoder : libebml v1.3.10 + libmatroska v1.5.2 + creation_time : 2020-02-17T16:42:19.000000Z + Duration: 00:29:09.75, start: 0.000000, bitrate: 9882 kb/s + Stream #0:0: Video: h264 (High), yuv420p(tv, bt709, progressive), 1920x1080 [SAR 1:1 DAR 16:9], 23.98 fps, 23.98 tbr, 1k tbn, 47.95 tbc (default) + Metadata: + BPS-eng : 9239506 + DURATION-eng : 00:29:09.748000000 + NUMBER_OF_FRAMES-eng: 41952 + NUMBER_OF_BYTES-eng: 2020851044 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES + Stream #0:1(eng): Audio: eac3, 48000 Hz, 5.1, fltp (default) + Metadata: + BPS-eng : 640000 + DURATION-eng : 00:29:09.728000000 + NUMBER_OF_FRAMES-eng: 54679 + NUMBER_OF_BYTES-eng: 139978240 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES + Stream #0:2(eng): Subtitle: subrip + Metadata: + BPS-eng : 120 + DURATION-eng : 00:28:08.917000000 + NUMBER_OF_FRAMES-eng: 718 + NUMBER_OF_BYTES-eng: 25445 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES + Stream #0:3(eng): Subtitle: subrip + Metadata: + title : SDH + BPS-eng : 120 + DURATION-eng : 00:28:08.917000000 + NUMBER_OF_FRAMES-eng: 718 + NUMBER_OF_BYTES-eng: 25445 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES +Stream mapping: + Stream #0:0 -> #0:0 (h264 (native) -> hevc (libx265)) + Stream #0:1 -> #0:1 (copy) +x265 [info]: HEVC encoder version 3.4 +x265 [info]: build info [Linux][GCC 9.3.0][64 bit] 8bit+10bit+12bit +x265 [info]: using cpu capabilities: MMX2 SSE2Fast LZCNT SSSE3 SSE4.2 AVX FMA3 BMI2 AVX2 +x265 [info]: Main profile, Level-4 (Main tier) +x265 [info]: Thread pool created using 16 threads +x265 [info]: Slices : 1 +x265 [info]: frame threads / pool features : 4 / wpp(17 rows) +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +set_mempolicy: Operation not permitted +x265 [info]: Coding QT: max CU size, min CU size : 64 / 8 +x265 [info]: Residual QT: max TU size, max depth : 32 / 1 inter / 1 intra +x265 [info]: ME / range / subpel / merge : hex / 57 / 2 / 3 +x265 [info]: Keyframe min / max / scenecut / bias : 23 / 250 / 40 / 5.00 +x265 [info]: Lookahead / bframes / badapt : 20 / 4 / 2 +x265 [info]: b-pyramid / weightp / weightb : 1 / 1 / 0 +x265 [info]: References / ref-limit cu / depth : 3 / off / on +x265 [info]: AQ: mode / str / qg-size / cu-tree : 2 / 1.0 / 32 / 1 +x265 [info]: Rate Control / qCompress : CRF-16.0 / 0.60 +x265 [info]: tools: rd=3 psy-rd=2.00 early-skip rskip mode=1 signhide tmvp +x265 [info]: tools: b-intra strong-intra-smoothing lslices=6 deblock sao +[mp4 @ 0x64a098f4e140] track 1: codec frame size is not set +Output #0, mp4, to '/src/cache/Potato.work.mp4': + Metadata: + encoder : Lavf58.45.100 + Stream #0:0: Video: hevc (libx265) (hev1 / 0x31766568), yuv420p, 1920x1080 [SAR 1:1 DAR 16:9], q=-1--1, 23.98 fps, 24k tbn, 23.98 tbc (default) + Metadata: + BPS-eng : 9239506 + DURATION-eng : 00:29:09.748000000 + NUMBER_OF_FRAMES-eng: 41952 + NUMBER_OF_BYTES-eng: 2020851044 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES + encoder : Lavc58.91.100 libx265 + Side data: + cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: N/A + Stream #0:1(eng): Audio: eac3 (ec-3 / 0x332D6365), 48000 Hz, 5.1, fltp (default) + Metadata: + BPS-eng : 640000 + DURATION-eng : 00:29:09.728000000 + NUMBER_OF_FRAMES-eng: 54679 + NUMBER_OF_BYTES-eng: 139978240 + _STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit + _STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19 + _STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES +frame=34 +fps=0.00 +stream_0_0_q=21.9 +bitrate= 0.2kbits/s +total_size=44 +out_time_us=2198000 +out_time_ms=2198000 +out_time=00:00:02.198000 +dup_frames=0 +drop_frames=0 +speed=4.04x +progress=continue \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/TaskCoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/TaskCoordinatorBase.kt index 94b154bf..ad6ca7e2 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/TaskCoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/TaskCoordinatorBase.kt @@ -13,6 +13,7 @@ import javax.annotation.PostConstruct abstract class TaskCoordinatorBase() { private val log = KotlinLogging.logger {} var taskMode: ActiveMode = ActiveMode.Active + var isEnabled: Boolean = true private var ready: Boolean = false fun isReady() = ready @@ -43,7 +44,7 @@ abstract class TaskCoordinatorBase() { @Scheduled(fixedDelay = (5_000)) fun pullAvailable() { - if (taskMode != ActiveMode.Active) { + if (taskMode != ActiveMode.Active || !isEnabled) { return } pullForAvailableTasks() @@ -51,6 +52,20 @@ abstract class TaskCoordinatorBase() { abstract fun clearExpiredClaims() + abstract fun getEnabledState(): Boolean + + @Scheduled(fixedDelay = 10_000) + fun pullEnabledState() { + val prevState = isEnabled + isEnabled = getEnabledState() + if (prevState != isEnabled) { + log.info { "State changed for coordinator: $prevState -> $isEnabled" } + if (isEnabled) { + log.info { "New tasks will now be processed" } + } + } + } + @Scheduled(fixedDelay = (300_000)) fun resetExpiredClaims() { if (taskMode != ActiveMode.Active) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/RunnerManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/RunnerManager.kt index bb545401..a1f00f7a 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/RunnerManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/RunnerManager.kt @@ -11,24 +11,35 @@ import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.select import java.util.UUID -class RunnerManager(private val dataSource: DataSource, private val startId: String = UUID.randomUUID().toString(), val name: String) { +class RunnerManager(private val dataSource: DataSource, val startId: String = UUID.randomUUID().toString(), val applicationName: String) { private val log = KotlinLogging.logger {} fun assignRunner(): Boolean { return executeOrException(dataSource.database) { runners.insert { it[runners.startId] = this@RunnerManager.startId - it[runners.application] = this@RunnerManager.name + it[runners.application] = this@RunnerManager.applicationName it[runners.version] = getAppVersion() } } == null } + fun amIEnabled(): Boolean { + return withDirtyRead(dataSource.database) { + runners.select { + (runners.application eq applicationName) and + (runners.startId eq startId) + }.singleOrNull()?.get(runners.enabled) + } ?: run { + log.error { "Failed to get a response, reporting false for enabled" } + false + } + } fun iAmSuperseded(): Boolean { return withDirtyRead(dataSource.database) { val runnerVersionCodes = runners.select { - (runners.application eq this@RunnerManager.name) and + (runners.application eq this@RunnerManager.applicationName) and (runners.startId neq this@RunnerManager.startId) }.map { it[runners.version] } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/runners.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/runners.kt index d0c00a72..58423274 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/runners.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/runners.kt @@ -11,4 +11,5 @@ object runners: IntIdTable() { val application: Column = varchar("application", 50) val version: Column = integer("version") val created: Column = datetime("created").defaultExpression(CurrentDateTime) + val enabled: Column = bool("enabled").default(true) } \ No newline at end of file diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index bf93d147..491083e4 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -36,6 +36,7 @@ abstract class EventCoordinator> { open var taskMode: ActiveMode = ActiveMode.Active + open var isEnabled: Boolean = true private val referencePool: MutableMap> = mutableMapOf() private fun referencePoolIsReadyForEvents(): Boolean { return (referencePool.isEmpty() || referencePool.any { !it.value.isActive }) @@ -44,6 +45,7 @@ abstract class EventCoordinator> { private var newEventProduced: Boolean = false abstract fun getActiveTaskMode(): ActiveMode + abstract fun updateEnabledState(): Boolean private var activePolls: Int = 0 data class PollStats(val active: Int, val total: Int) @@ -119,7 +121,8 @@ abstract class EventCoordinator> { var cachedReferenceList: MutableList = mutableListOf() private fun pullForEvents() { coroutine.launch { - while (taskMode == ActiveMode.Active && coroutine.isActive) { + while (taskMode == ActiveMode.Active && coroutine.isActive && isEnabled) { + if (referencePoolIsReadyForEvents()) { log.debug { "New pull on database" } val referenceIdsAvailable = eventManager.getAvailableReferenceIds() @@ -155,6 +158,21 @@ abstract class EventCoordinator> { } taskMode = getActiveTaskMode() } + coroutine.launch { + while (taskMode == ActiveMode.Active && coroutine.isActive) { + val previousState = isEnabled + val iminentStatus = updateEnabledState() + if (previousState != iminentStatus) { + log.info { "Observed change to enabled state, changed $previousState -> $iminentStatus" } + if (iminentStatus) { + log.info { "Ovserved change to enabled state, new state is enabled\n\tRestarting pullForEvents!" } + onReady() + return@launch + } + } + delay(10_000) + } + } } private var cachedListeners: List = emptyList()