diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index efe36eaa..206c90da 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -19,6 +19,8 @@ import org.springframework.context.annotation.Bean val log = KotlinLogging.logger {} private lateinit var eventDatabase: EventsDatabase private lateinit var eventsManager: EventsManager +lateinit var runnerManager: RunnerManager + @SpringBootApplication class CoordinatorApplication { @@ -85,6 +87,10 @@ fun main(args: Array) { titles ) storeDatabase.createTables(*tables) + + runnerManager = RunnerManager(dataSource = eventDatabase.database, name = CoordinatorApplication::class.java.simpleName) + runnerManager.assignRunner() + runApplication(*args) log.info { "App Version: ${getAppVersion()}" } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index 476e150c..c8e00720 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator import no.iktdev.eventi.data.EventMetadata import no.iktdev.eventi.data.EventStatus import no.iktdev.eventi.data.eventId +import no.iktdev.eventi.implementations.ActiveMode import no.iktdev.eventi.implementations.EventCoordinator import no.iktdev.mediaprocessing.shared.contract.Events import no.iktdev.mediaprocessing.shared.contract.ProcessType @@ -82,4 +83,12 @@ class Coordinator( data = message )) } + + override fun getActiveTaskMode(): ActiveMode { + if (runnerManager.iAmSuperseded()) { + // This will let the application complete but not consume new + taskMode = ActiveMode.Passive + } + return taskMode + } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt index ebd7fbff..86858a5d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt @@ -36,6 +36,7 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true val message = try { readFileInfo(event.data as StartEventData, event.metadata.eventId)?.let { BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it) @@ -45,6 +46,7 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())) } onProduceEvent(message) + active = false } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index 472f611d..2861a624 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -19,9 +19,13 @@ import no.iktdev.streamit.library.db.query.CatalogQuery import no.iktdev.streamit.library.db.query.GenreQuery import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.db.query.SummaryQuery +import no.iktdev.streamit.library.db.tables.catalog import no.iktdev.streamit.library.db.tables.titles import org.jetbrains.exposed.exceptions.ExposedSQLException +import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insertIgnore +import org.jetbrains.exposed.sql.select +import org.jetbrains.exposed.sql.update import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @@ -323,14 +327,34 @@ class CompletedTaskListener: CoordinatorEventListener() { } fun storeMetadata(catalogId: Int, metadata: MetadataDto) { - metadata.summary.forEach { + if (!metadata.cover.isNullOrBlank()) { withTransaction(getStoreDatabase()) { + val storedCatalogCover = catalog.select { + (catalog.id eq catalogId) + }.map { it[catalog.cover] }.firstOrNull() + if (storedCatalogCover.isNullOrBlank()) { + catalog.update({ + catalog.id eq catalogId + }) { + it[catalog.cover] = metadata.cover + } + } + } + } + + + metadata.summary.forEach { + val result = executeOrException(getStoreDatabase().database) { SummaryQuery( cid = catalogId, language = it.language, description = it.summary ).insert() } + val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062 + if (!ignoreException) { + result?.printStackTrace() + } } } @@ -371,11 +395,9 @@ class CompletedTaskListener: CoordinatorEventListener() { else -> throw RuntimeException("${videoDetails.type} is not supported!") } val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062 - return if (result == null || ignoreException ) { - return withTransaction(getStoreDatabase()) { - precreatedCatalogQuery.getId() - } - } else null + return withTransaction(getStoreDatabase()) { + precreatedCatalogQuery.getId() + } } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { @@ -388,6 +410,7 @@ class CompletedTaskListener: CoordinatorEventListener() { override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { val event = incomingEvent.consume() ?: return + active = true val metadata = getMetadata(events) val genres = getGenres(events) @@ -418,6 +441,7 @@ class CompletedTaskListener: CoordinatorEventListener() { events.map { it.eventId() } ) )) + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index ca89dfc1..e4180ac9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -57,7 +57,9 @@ class ConvertWorkTaskListener: WorkTaskListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true if (!canStart(event, events)) { + active = false return } @@ -104,5 +106,6 @@ class ConvertWorkTaskListener: WorkTaskListener() { ) } } + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt index 215ef80b..b37b502d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt @@ -35,7 +35,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } - + active = true val failedEventDefault = MediaCoverDownloadedEvent( metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) @@ -45,6 +45,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { if (data == null) { log.error { "No valid data for use to obtain cover" } onProduceEvent(failedEventDefault) + active = false return } @@ -84,6 +85,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { } else { if (!result.exists() || !result.canRead()) { onProduceEvent(failedEventDefault) + active = false return } onProduceEvent(MediaCoverDownloadedEvent( @@ -91,6 +93,6 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { data = DownloadedCover(result.absolutePath) )) } - + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index e839a54f..a94265fb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -50,11 +50,12 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } - + active = true val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az()?.data if (baseInfo == null) { log.info { "No base info" } + active = false return } @@ -64,6 +65,7 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { val mediaOutInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az()?.data if (mediaOutInfo == null) { log.info { "No Media out info" } + active = false return } val videoInfo = mediaOutInfo.toValueObject() @@ -89,6 +91,6 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { ) } onProduceEvent(result) - + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt index a4051f4d..e4ef3858 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt @@ -49,23 +49,38 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } - - val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az() ?: return + active = true + val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az() + if (started == null) { + active = false + return + } if (started.data == null || started.data?.operations?.contains(StartOperationEvents.ENCODE) == false) { + active = false return } val streams = events.find { it.eventType == Events.EventMediaParseStreamPerformed }?.az()?.data if (streams == null) { + active = false return } val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az() if (mediaInfo?.data == null) { + active = false + return + } + val mediaInfoData = mediaInfo.data?.toValueObject() + if (mediaInfoData == null) { + active = false return } - val mediaInfoData = mediaInfo.data?.toValueObject() ?: return - val inputFile = started.data?.file ?: return + val inputFile = started.data?.file + if (inputFile == null) { + active = false + return + } val mapper = EncodeWorkArgumentsMapping( inputFile = inputFile, outFileFullName = mediaInfoData.fullName, @@ -85,7 +100,6 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { data = result )) } - - + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt index 455d6f0c..46bad07b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt @@ -43,8 +43,9 @@ class EncodeWorkTaskListener : WorkTaskListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } - + active = true if (!canStart(event, events)) { + active = false return } @@ -56,6 +57,7 @@ class EncodeWorkTaskListener : WorkTaskListener() { } if (encodeArguments == null) { log.error { "No Encode arguments found.. referenceId: ${event.referenceId()}" } + active = false return } EncodeWorkCreatedEvent( @@ -72,6 +74,6 @@ class EncodeWorkTaskListener : WorkTaskListener() { inputFile = event.data!!.inputFile ) } - + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 6d461c16..4e59d55d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -45,22 +45,34 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az() ?: return if (started.data == null || started.data?.operations?.contains(StartOperationEvents.EXTRACT) == false) { + active = false return } val streams = events.find { it.eventType == Events.EventMediaParseStreamPerformed }?.az()?.data if (streams == null) { + active = false return } val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az() if (mediaInfo?.data == null) { + active = false + return + } + val mediaInfoData = mediaInfo.data?.toValueObject() + if (mediaInfoData == null) { + active = false return } - val mediaInfoData = mediaInfo.data?.toValueObject() ?: return - val inputFile = started.data?.file ?: return + val inputFile = started.data?.file + if (inputFile == null) { + active = false + return + } val mapper = ExtractWorkArgumentsMapping( inputFile = inputFile, @@ -80,6 +92,6 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { data = result )) } - + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt index 1a55936f..c1dba2c3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt @@ -47,10 +47,14 @@ class ExtractWorkTaskListener: WorkTaskListener() { val event = incomingEvent.consume() if (event == null) { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } + active = false return } + active = true + if (!canStart(event, events)) { + active = false return } @@ -62,12 +66,11 @@ class ExtractWorkTaskListener: WorkTaskListener() { } if (arguments == null) { log.error { "No Extract arguments found.. referenceId: ${event.referenceId()}" } + active = false return } if (arguments.isEmpty()) { - ExtractWorkCreatedEvent( - metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) - ) + active = false return } @@ -87,5 +90,6 @@ class ExtractWorkTaskListener: WorkTaskListener() { inputFile = event.data!!.inputFile ) } + active = false } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt index 00164e36..135789ea 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt @@ -49,6 +49,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true val metadataResult = event.az() val mediaBaseInfo = events.findLast { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az()?.data @@ -59,6 +60,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) ) ) + active = false return } val pm = ProcessMediaInfoAndMetadata(mediaBaseInfo, metadataResult?.data) @@ -78,6 +80,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { ) } onProduceEvent(result) + active = false } class ProcessMediaInfoAndMetadata(val baseInfo: BaseInfo, val metadata: pyMetadata? = null) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index c54d8bf1..85e4a8bd 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -114,6 +114,7 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { ) waitingProcessesForMeta.remove(it.key) } + active = expired.isNotEmpty() } data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt index 863abdca..a2803c9e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt @@ -50,6 +50,8 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true + val readData = event.dataAs() val result = try { @@ -64,6 +66,7 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { ) } onProduceEvent(result) + active = false } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt index f840fdcc..1ccc9df6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt @@ -52,10 +52,12 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" } return } + active = true val startEvent = event.dataAs() if (startEvent == null || !startEvent.operations.any { it in requiredOperations }) { log.info { "${event.metadata.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" } + active = false return } val result = runBlocking { @@ -73,6 +75,7 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { } } onProduceEvent(result) + active = false } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index 667be2e6..1b075eae 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -43,6 +43,9 @@ abstract class EventCoordinator> { private var newEventProduced: Boolean = false + abstract fun getActiveTaskMode(): ActiveMode + + private fun onEventGroupsReceived(eventGroup: List>) { val egRefIds = eventGroup.map { it.first().referenceId() } @@ -108,7 +111,7 @@ abstract class EventCoordinator> { pullDelay.set(fastPullDelay.get()) } else { if (pullDelay.get() != slowPullDelay.get()) { - log.info { "None events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" } + log.info { "No events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" } } pullDelay.set(slowPullDelay.get()) } @@ -118,6 +121,7 @@ abstract class EventCoordinator> { } newEventProduced = false } + taskMode = getActiveTaskMode() } } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt index 0f95d928..32b5d27a 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt @@ -10,6 +10,11 @@ abstract class EventListenerImpl> { abstract val produceEvent: Any abstract val listensForEvents: List + protected var active: Boolean = false + open fun isActive(): Boolean { + return active + } + open fun onReady() { }