From e169fc39e5fb1ba929d88e3960eb4811aeff84f1 Mon Sep 17 00:00:00 2001 From: bskjon Date: Wed, 21 Aug 2024 01:43:54 +0200 Subject: [PATCH] Small tweaks --- .../CoordinatorEventCoordinator.kt | 2 +- .../listeners/CompletedTaskListener.kt | 5 ++- .../MetadataWaitOrDefaultTaskListener.kt | 6 ++-- .../watcher/InputDirectoryWatcher.kt | 2 ++ .../common/database/cal/EventsManager.kt | 17 ++++++++-- .../implementations/EventCoordinator.kt | 33 +++++++++++++++++-- .../implementations/EventsManagerImpl.kt | 2 ++ 7 files changed, 56 insertions(+), 11 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index 8c73b889..b6017365 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -75,7 +75,7 @@ class Coordinator( produceNewEvent( - no.iktdev.mediaprocessing.shared.common.contract.data.PermitWorkCreationEvent( + PermitWorkCreationEvent( metadata = EventMetadata( referenceId = referenceId, derivedFromEventId = eventToAttachTo.eventId(), diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index a4c84f36..47223269 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -172,7 +172,7 @@ class CompletedTaskListener: CoordinatorEventListener() { val viableEvents = events.filter { it.isSuccessful() } - if (!req1(started, viableEvents)) { + if (!req1(started, events)) { //log.info { "${this::class.java.simpleName} Failed Req1" } return false } @@ -442,6 +442,9 @@ class CompletedTaskListener: CoordinatorEventListener() { ) )) } + + + active = false } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index e08f01a8..0206ad94 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -63,12 +63,10 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { return } - if (hasPollerForMetadataEvent && hasMetadataSearched) { + if (hasMetadataSearched) { waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId) return - } - - if (!hasMetadataSearched && !hasPollerForMetadataEvent) { + } else if (!hasPollerForMetadataEvent) { val consumedIncoming = incomingEvent.consume() if (consumedIncoming == null) { log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt index 3b90c117..3b094c55 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.watcher import dev.vishna.watchservice.KWatchEvent.Kind.Deleted import dev.vishna.watchservice.KWatchEvent.Kind.Initialized import dev.vishna.watchservice.asWatchChannel +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -51,6 +52,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche } + @OptIn(ExperimentalCoroutinesApi::class) suspend fun watchFiles() { log.info { "Starting Watcher" } watcherChannel.consumeEach { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/EventsManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/EventsManager.kt index bd0024ae..fd6dd5a8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/EventsManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/EventsManager.kt @@ -8,6 +8,7 @@ import no.iktdev.eventi.data.toJson import no.iktdev.eventi.database.DataSource import no.iktdev.eventi.database.isCausedByDuplicateError import no.iktdev.eventi.database.isExposedSqlException +import no.iktdev.eventi.database.withDirtyRead import no.iktdev.eventi.implementations.EventsManagerImpl import no.iktdev.mediaprocessing.shared.common.database.tables.allEvents import no.iktdev.mediaprocessing.shared.common.database.tables.events @@ -90,10 +91,22 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource) return event in exemptedFromSingleEvent } + override fun getAvailableReferenceIds(): List { + return withDirtyRead(dataSource.database) { + val completedEvents = events + .slice(events.referenceId) + .select { events.event eq Events.EventMediaProcessCompleted.event } + events + .slice(events.referenceId) + .select { events.referenceId notInSubQuery completedEvents } + .withDistinct() + .map { it[events.referenceId] } ?: emptyList() + } ?: emptyList() + } override fun readAvailableEvents(): List> { - return no.iktdev.eventi.database.withTransaction(dataSource.database) { + return withDirtyRead(dataSource.database) { events.selectAll() .groupBy { it[events.referenceId] } .mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }.filter { it.none { e -> e.eventType == Events.EventMediaProcessCompleted } } @@ -101,7 +114,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource) } override fun readAvailableEventsFor(referenceId: String): List { - val events = no.iktdev.eventi.database.withTransaction(dataSource.database) { + val events = withDirtyRead(dataSource.database) { events.select { events.referenceId eq referenceId } .mapNotNull { it.toEvent() } } ?: emptyList() diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index bf75f0de..2f643678 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -53,6 +53,7 @@ abstract class EventCoordinator> { private fun onEventGroupsReceived(eventGroup: List>) { val egRefIds = eventGroup.map { it.first().referenceId() } + val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key } orphanedReferences.forEach { id -> referencePool.remove(id) } @@ -80,6 +81,28 @@ abstract class EventCoordinator> { } } + private fun onEventCollectionReceived(referenceId: String, events: List) { + val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in referenceId }.map { it.key } + orphanedReferences.forEach { id -> referencePool.remove(id) } + + activePolls = referencePool.values.filter { it.isActive }.size + if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) { + log.info { "Last active references removed from pull pool, " } + } + + val isAvailable = if (referenceId in referencePool.keys) { + referencePool[referenceId]?.isActive != true + } else true + + if (isAvailable) { + referencePool[referenceId] = coroutine.async { + onEventsReceived(events) + } + } + + } + + private suspend fun onEventsReceived(events: List): Boolean = coroutineScope { val listeners = getListeners() events.forEach { event -> @@ -104,9 +127,13 @@ abstract class EventCoordinator> { while (taskMode == ActiveMode.Active) { if (referencePoolIsReadyForEvents()) { log.debug { "New pull on database" } - val events = eventManager.readAvailableEvents() - onEventGroupsReceived(events) - if (events.isNotEmpty()) { + val referenceIdsAvailable = eventManager.getAvailableReferenceIds() + for (referenceId in referenceIdsAvailable) { + val events = eventManager.readAvailableEventsFor(referenceId) + onEventCollectionReceived(referenceId, events) + } + + if (referenceIdsAvailable.isNotEmpty()) { if (pullDelay.get() != fastPullDelay.get()) { log.info { "Available events found, switching to fast pull @ Delay -> ${fastPullDelay.get()}" } } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventsManagerImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventsManagerImpl.kt index ab5fcd36..1ac0e4ff 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventsManagerImpl.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventsManagerImpl.kt @@ -7,6 +7,8 @@ import no.iktdev.eventi.database.DataSource * Interacts with the database, needs to be within the Coordinator */ abstract class EventsManagerImpl(val dataSource: DataSource) { + + abstract fun getAvailableReferenceIds(): List abstract fun readAvailableEvents(): List> abstract fun readAvailableEventsFor(referenceId: String): List