From a74e39ed1ed082a6fab248aad6247fb7cdca8b69 Mon Sep 17 00:00:00 2001 From: bskjon Date: Thu, 18 Jul 2024 23:35:53 +0200 Subject: [PATCH] v3 26 --- .idea/workspace.xml | 274 +++++++++++------- .../coordinator/EventsManager.kt | 2 +- .../CoverFromMetadataTaskListener.kt | 7 +- .../implementations/EventCoordinator.kt | 62 ++-- 4 files changed, 208 insertions(+), 137 deletions(-) diff --git a/.idea/workspace.xml b/.idea/workspace.xml index dcb2b2f3..177ee2a9 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -34,9 +34,11 @@ - + - + + + - { - "keyToString": { - "Gradle.Build MediaProcessing2.executor": "Run", - "Gradle.EncodeArgumentCreatorTaskTest.verifyThatEacStreamGetsCorrectArguments.executor": "Run", - "Gradle.EventiApplicationTests.contextLoads.executor": "Run", - "Gradle.EventiApplicationTests.executor": "Debug", - "Gradle.EventiImplementationBase.executor": "Run", - "Gradle.EventiImplementationBase.validateCoordinatorConstruction.executor": "Run", - "Gradle.EventiImplementationBase.validateCreation.executor": "Run", - "Gradle.FileNameDeterminateTest.serieWithTitleFroMMetadata.executor": "Run", - "Gradle.FileNameParserTest.executor": "Run", - "Gradle.FileNameParserTest.findSearchableTitle.executor": "Run", - "Gradle.FileNameParserTest.findSearchableTitle2.executor": "Run", - "Gradle.FileNameParserTest.findTitleWithYear.executor": "Run", - "Gradle.FileNameParserTest.movieName.executor": "Debug", - "Gradle.FileNameParserTest.movieName2.executor": "Debug", - "Gradle.FileNameParserTest.serieName.executor": "Run", - "Gradle.FileNameParserTest.serieNameWithNumbers (1).executor": "Run", - "Gradle.FileNameParserTest.serieNameWithNumbers.executor": "Run", - "Gradle.FileNameParserTest.testName.executor": "Run", - "Gradle.FileNameParserTest.testParsing.executor": "Run", - "Gradle.FirstEventListenerImplTestBase.validate2.executor": "Debug", - "Gradle.FirstEventListenerTest.executor": "Run", - "Gradle.ForthEventListenerTest.validate1.executor": "Run", - "Gradle.ForthEventListenerTestBase.executor": "Run", - "Gradle.ForthEventListenerTestBase.validate1.executor": "Debug", - "Gradle.MediaProcessing2 [:apps:processer:bootJar].executor": "Run", - "Gradle.MediaProcessing2 [:clean].executor": "Run", - "Gradle.MediaProcessing2 [:shared:build].executor": "Run", - "Gradle.MediaProcessing2 [:shared:eventi:compileKotlin].executor": "Run", - "Gradle.MediaProcessing2 [apps:build].executor": "Run", - "Gradle.MediaProcessing2 [build].executor": "Run", - "Gradle.MediaProcessing2 [clean install].executor": "Run", - "Gradle.MediaProcessing2 [clean].executor": "Run", - "Gradle.MediaProcessing2:apps:processer [bootJar].executor": "Run", - "Gradle.MediaProcessing2:shared [assemble].executor": "Run", - "Gradle.MediaProcessing2:shared [build].executor": "Run", - "Gradle.MediaProcessing2:shared:eventi [build].executor": "Run", - "Gradle.MediaProcessing2:shared:eventi [clean].executor": "Run", - "Gradle.MetadataAndBaseInfoToFileOutTest.testVideoData.executor": "Debug", - "Gradle.PersistentEventMangerTest.executor": "Run", - "Gradle.PersistentEventMangerTest.testConvertBatchFromExtract.executor": "Run", - "Gradle.PersistentEventMangerTest.testSomeAreSingleSomeAreNot.executor": "Run", - "Gradle.PersistentEventMangerTest.testSupersededButKeepWork.executor": "Run", - "Gradle.PersistentEventMangerTest.testSupersededWork.executor": "Run", - "Gradle.SecondEventListenerTest.executor": "Run", - "Gradle.SecondEventListenerTest.validate1.executor": "Run", - "Gradle.SecondEventListenerTestBase.validate1.executor": "Debug", - "Gradle.Tests in 'MediaProcessing.apps.coordinator'.executor": "Run", - "Gradle.Tests in 'MediaProcessing.apps.processer'.executor": "Run", - "Gradle.Tests in 'MediaProcessing.shared.common.test'.executor": "Run", - "Gradle.Tests in 'MediaProcessing.shared.eventi'.executor": "Run", - "Gradle.Tests in 'MediaProcessing.shared.eventi.test'.executor": "Run", - "Gradle.Tests in 'no.iktdev.eventi'.executor": "Run", - "Gradle.ThirdEventListenerTest.validate1.executor": "Run", - "Gradle.ThirdEventListenerTestBase.validate1.executor": "Run", - "Gradle.ThridEventListenerTest.executor": "Run", - "Gradle.ThridEventListenerTest.validate1.executor": "Debug", - "Gradle.ThridEventListenerTest.validate2.executor": "Debug", - "JUnit.All in MediaProcessing.executor": "Run", - "Kotlin.ConverterApplicationKt.executor": "Run", - "Kotlin.CoordinatorApplicationKt.executor": "Run", - "Kotlin.Env - CoordinatorApplicationKt.executor": "Run", - "Kotlin.EventiApplicationKt.executor": "Run", - "Kotlin.ProcesserApplicationKt.executor": "Run", - "Kotlin.UIApplicationKt.executor": "Run", - "RunOnceActivity.OpenProjectViewOnStart": "true", - "RunOnceActivity.ShowReadmeOnStart": "true", - "SHARE_PROJECT_CONFIGURATION_FILES": "true", - "ShowUsagesActions.previewPropertyKey": "true", - "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrary": "JUnit5", - "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrarySuperClass.JUnit5": "", - "git-widget-placeholder": "v3", - "jdk.selected.JAVA_MODULE": "azul-17", - "kotlin-language-version-configured": "true", - "last_opened_file_path": "D:/Workspace/MediaProcessing2/shared/eventi", - "project.structure.last.edited": "Modules", - "project.structure.proportion": "0.15", - "project.structure.side.proportion": "0.2" + +}]]> @@ -269,7 +271,7 @@ - + @@ -778,8 +780,6 @@ - - @@ -803,7 +803,9 @@ - @@ -1004,14 +1006,6 @@ 73 diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt index ae3792ea..c9722a87 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt @@ -93,7 +93,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource) return no.iktdev.eventi.database.withTransaction(dataSource.database) { events.selectAll() .groupBy { it[events.referenceId] } - .mapNotNull { it.value.mapNotNull { v -> v.toEvent() } } + .mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }.filter { it.none { e -> e.eventType == Events.EventMediaProcessCompleted } } } ?: emptyList() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index 75b3a3ab..6eaa9740 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -33,8 +33,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { - return super.shouldIProcessAndHandleEvent(incomingEvent, events) && incomingEvent.eventType in listensForEvents - + val state = super.shouldIProcessAndHandleEvent(incomingEvent, events) + if (!state) { + return false + } + return incomingEvent.eventType in listensForEvents } override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) { diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index b3ae1d22..4c8f1e7e 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -33,41 +33,59 @@ abstract class EventCoordinator> { } - var taskMode: ActiveMode = ActiveMode.Active + open var taskMode: ActiveMode = ActiveMode.Active + private val referencePool: MutableMap> = mutableMapOf() + private fun referencePoolIsReadyForEvents(): Boolean { + return (referencePool.isEmpty() || referencePool.any { !it.value.isActive }) + } private var newEventProduced: Boolean = false - private fun onEventsReceived(events: List) = runBlocking { + + + private fun onEventGroupsReceived(eventGroup: List>) { + val activePolls = referencePool.values.filter { it.isActive }.size + log.info { "Active polls $activePolls/${referencePool.values.size}" } + eventGroup.forEach { + val referenceId = it.first().referenceId() + + val isAvailable = if (referenceId in referencePool.keys) { + referencePool[referenceId]?.isActive != true + } else true + + if (isAvailable) { + referencePool[referenceId] = coroutine.async { + onEventsReceived(it) + } + } + } + } + + private suspend fun onEventsReceived(events: List): Boolean = coroutineScope { val listeners = getListeners() - launch { - events.forEach { event -> - listeners.forEach { listener -> - if (listener.shouldIProcessAndHandleEvent(event, events)) { - val consumableEvent = ConsumableEvent(event) - listener.onEventsReceived(consumableEvent, events) - if (consumableEvent.isConsumed) { - log.info { "Consumption detected for ${listener::class.java.simpleName} on event ${event.eventType}" } - return@launch - } + events.forEach { event -> + listeners.forEach { listener -> + if (listener.shouldIProcessAndHandleEvent(event, events)) { + val consumableEvent = ConsumableEvent(event) + listener.onEventsReceived(consumableEvent, events) + if (consumableEvent.isConsumed) { + log.info { "Consumption detected for ${events.first().referenceId()} -> ${listener::class.java.simpleName} on event ${event.eventType}" } + return@coroutineScope true } } } - log.debug { "No consumption detected for ${events.first().referenceId()}" } - } + log.debug { "No consumption detected for ${events.first().referenceId()}" } + false } private var newEventsProducedOnReferenceId: AtomicReference> = AtomicReference(emptyList()) private fun pullForEvents() { coroutine.launch { while (taskMode == ActiveMode.Active) { - log.debug { "New pull on database" } - val events = eventManager?.readAvailableEvents() - if (events == null) { - log.warn { "EventManager is not loaded!" } - } else { - events.forEach { group -> - onEventsReceived(group) - } + if (referencePoolIsReadyForEvents()) { + log.debug { "New pull on database" } + val events = eventManager.readAvailableEvents() + onEventGroupsReceived(events) } waitForConditionOrTimeout(pullDelay.get()) { newEventProduced }.also { newEventProduced = false