diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index dcb2b2f3..177ee2a9 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -34,9 +34,11 @@
-
+
-
+
+
+
@@ -136,87 +138,87 @@
- {
- "keyToString": {
- "Gradle.Build MediaProcessing2.executor": "Run",
- "Gradle.EncodeArgumentCreatorTaskTest.verifyThatEacStreamGetsCorrectArguments.executor": "Run",
- "Gradle.EventiApplicationTests.contextLoads.executor": "Run",
- "Gradle.EventiApplicationTests.executor": "Debug",
- "Gradle.EventiImplementationBase.executor": "Run",
- "Gradle.EventiImplementationBase.validateCoordinatorConstruction.executor": "Run",
- "Gradle.EventiImplementationBase.validateCreation.executor": "Run",
- "Gradle.FileNameDeterminateTest.serieWithTitleFroMMetadata.executor": "Run",
- "Gradle.FileNameParserTest.executor": "Run",
- "Gradle.FileNameParserTest.findSearchableTitle.executor": "Run",
- "Gradle.FileNameParserTest.findSearchableTitle2.executor": "Run",
- "Gradle.FileNameParserTest.findTitleWithYear.executor": "Run",
- "Gradle.FileNameParserTest.movieName.executor": "Debug",
- "Gradle.FileNameParserTest.movieName2.executor": "Debug",
- "Gradle.FileNameParserTest.serieName.executor": "Run",
- "Gradle.FileNameParserTest.serieNameWithNumbers (1).executor": "Run",
- "Gradle.FileNameParserTest.serieNameWithNumbers.executor": "Run",
- "Gradle.FileNameParserTest.testName.executor": "Run",
- "Gradle.FileNameParserTest.testParsing.executor": "Run",
- "Gradle.FirstEventListenerImplTestBase.validate2.executor": "Debug",
- "Gradle.FirstEventListenerTest.executor": "Run",
- "Gradle.ForthEventListenerTest.validate1.executor": "Run",
- "Gradle.ForthEventListenerTestBase.executor": "Run",
- "Gradle.ForthEventListenerTestBase.validate1.executor": "Debug",
- "Gradle.MediaProcessing2 [:apps:processer:bootJar].executor": "Run",
- "Gradle.MediaProcessing2 [:clean].executor": "Run",
- "Gradle.MediaProcessing2 [:shared:build].executor": "Run",
- "Gradle.MediaProcessing2 [:shared:eventi:compileKotlin].executor": "Run",
- "Gradle.MediaProcessing2 [apps:build].executor": "Run",
- "Gradle.MediaProcessing2 [build].executor": "Run",
- "Gradle.MediaProcessing2 [clean install].executor": "Run",
- "Gradle.MediaProcessing2 [clean].executor": "Run",
- "Gradle.MediaProcessing2:apps:processer [bootJar].executor": "Run",
- "Gradle.MediaProcessing2:shared [assemble].executor": "Run",
- "Gradle.MediaProcessing2:shared [build].executor": "Run",
- "Gradle.MediaProcessing2:shared:eventi [build].executor": "Run",
- "Gradle.MediaProcessing2:shared:eventi [clean].executor": "Run",
- "Gradle.MetadataAndBaseInfoToFileOutTest.testVideoData.executor": "Debug",
- "Gradle.PersistentEventMangerTest.executor": "Run",
- "Gradle.PersistentEventMangerTest.testConvertBatchFromExtract.executor": "Run",
- "Gradle.PersistentEventMangerTest.testSomeAreSingleSomeAreNot.executor": "Run",
- "Gradle.PersistentEventMangerTest.testSupersededButKeepWork.executor": "Run",
- "Gradle.PersistentEventMangerTest.testSupersededWork.executor": "Run",
- "Gradle.SecondEventListenerTest.executor": "Run",
- "Gradle.SecondEventListenerTest.validate1.executor": "Run",
- "Gradle.SecondEventListenerTestBase.validate1.executor": "Debug",
- "Gradle.Tests in 'MediaProcessing.apps.coordinator'.executor": "Run",
- "Gradle.Tests in 'MediaProcessing.apps.processer'.executor": "Run",
- "Gradle.Tests in 'MediaProcessing.shared.common.test'.executor": "Run",
- "Gradle.Tests in 'MediaProcessing.shared.eventi'.executor": "Run",
- "Gradle.Tests in 'MediaProcessing.shared.eventi.test'.executor": "Run",
- "Gradle.Tests in 'no.iktdev.eventi'.executor": "Run",
- "Gradle.ThirdEventListenerTest.validate1.executor": "Run",
- "Gradle.ThirdEventListenerTestBase.validate1.executor": "Run",
- "Gradle.ThridEventListenerTest.executor": "Run",
- "Gradle.ThridEventListenerTest.validate1.executor": "Debug",
- "Gradle.ThridEventListenerTest.validate2.executor": "Debug",
- "JUnit.All in MediaProcessing.executor": "Run",
- "Kotlin.ConverterApplicationKt.executor": "Run",
- "Kotlin.CoordinatorApplicationKt.executor": "Run",
- "Kotlin.Env - CoordinatorApplicationKt.executor": "Run",
- "Kotlin.EventiApplicationKt.executor": "Run",
- "Kotlin.ProcesserApplicationKt.executor": "Run",
- "Kotlin.UIApplicationKt.executor": "Run",
- "RunOnceActivity.OpenProjectViewOnStart": "true",
- "RunOnceActivity.ShowReadmeOnStart": "true",
- "SHARE_PROJECT_CONFIGURATION_FILES": "true",
- "ShowUsagesActions.previewPropertyKey": "true",
- "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrary": "JUnit5",
- "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrarySuperClass.JUnit5": "",
- "git-widget-placeholder": "v3",
- "jdk.selected.JAVA_MODULE": "azul-17",
- "kotlin-language-version-configured": "true",
- "last_opened_file_path": "D:/Workspace/MediaProcessing2/shared/eventi",
- "project.structure.last.edited": "Modules",
- "project.structure.proportion": "0.15",
- "project.structure.side.proportion": "0.2"
+
+}]]>
@@ -269,7 +271,7 @@
-
+
@@ -370,16 +372,16 @@
-
+
+
-
@@ -415,20 +417,6 @@
1712771068889
-
-
- 1719706352452
-
-
- 1719706352452
-
-
-
- 1719708288857
-
-
- 1719708288857
-
1719738757568
@@ -758,7 +746,21 @@
1721320748899
-
+
+
+ 1721321619065
+
+
+ 1721321619065
+
+
+
+ 1721329940691
+
+
+ 1721329940691
+
+
@@ -778,8 +780,6 @@
-
-
@@ -803,7 +803,9 @@
-
+
+
+
@@ -1004,14 +1006,6 @@
73
-
- file://$PROJECT_DIR$/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
- 67
-
-
-
-
-
file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt
44
@@ -1063,7 +1057,7 @@
file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
- 40
+ 43
@@ -1084,6 +1078,62 @@
+
+ file://$PROJECT_DIR$/apps/coordinator/build.gradle.kts
+ 85
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 46
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 53
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 59
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 64
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 88
+
+
+
+ file://$PROJECT_DIR$/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
+ 47
+
+
+
+
+
+
+ file://$PROJECT_DIR$/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
+ 64
+
+
+
+
+
+
+ file://$PROJECT_DIR$/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
+ 54
+
+
+
+ file://$PROJECT_DIR$/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+ 35
+
+
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt
index ae3792ea..c9722a87 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/EventsManager.kt
@@ -93,7 +93,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
return no.iktdev.eventi.database.withTransaction(dataSource.database) {
events.selectAll()
.groupBy { it[events.referenceId] }
- .mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }
+ .mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }.filter { it.none { e -> e.eventType == Events.EventMediaProcessCompleted } }
} ?: emptyList()
}
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
index 75b3a3ab..6eaa9740 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt
@@ -33,8 +33,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean {
- return super.shouldIProcessAndHandleEvent(incomingEvent, events) && incomingEvent.eventType in listensForEvents
-
+ val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
+ if (!state) {
+ return false
+ }
+ return incomingEvent.eventType in listensForEvents
}
override fun onEventsReceived(incomingEvent: ConsumableEvent, events: List) {
diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
index b3ae1d22..4c8f1e7e 100644
--- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
+++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt
@@ -33,41 +33,59 @@ abstract class EventCoordinator> {
}
- var taskMode: ActiveMode = ActiveMode.Active
+ open var taskMode: ActiveMode = ActiveMode.Active
+ private val referencePool: MutableMap> = mutableMapOf()
+ private fun referencePoolIsReadyForEvents(): Boolean {
+ return (referencePool.isEmpty() || referencePool.any { !it.value.isActive })
+ }
private var newEventProduced: Boolean = false
- private fun onEventsReceived(events: List) = runBlocking {
+
+
+ private fun onEventGroupsReceived(eventGroup: List>) {
+ val activePolls = referencePool.values.filter { it.isActive }.size
+ log.info { "Active polls $activePolls/${referencePool.values.size}" }
+ eventGroup.forEach {
+ val referenceId = it.first().referenceId()
+
+ val isAvailable = if (referenceId in referencePool.keys) {
+ referencePool[referenceId]?.isActive != true
+ } else true
+
+ if (isAvailable) {
+ referencePool[referenceId] = coroutine.async {
+ onEventsReceived(it)
+ }
+ }
+ }
+ }
+
+ private suspend fun onEventsReceived(events: List): Boolean = coroutineScope {
val listeners = getListeners()
- launch {
- events.forEach { event ->
- listeners.forEach { listener ->
- if (listener.shouldIProcessAndHandleEvent(event, events)) {
- val consumableEvent = ConsumableEvent(event)
- listener.onEventsReceived(consumableEvent, events)
- if (consumableEvent.isConsumed) {
- log.info { "Consumption detected for ${listener::class.java.simpleName} on event ${event.eventType}" }
- return@launch
- }
+ events.forEach { event ->
+ listeners.forEach { listener ->
+ if (listener.shouldIProcessAndHandleEvent(event, events)) {
+ val consumableEvent = ConsumableEvent(event)
+ listener.onEventsReceived(consumableEvent, events)
+ if (consumableEvent.isConsumed) {
+ log.info { "Consumption detected for ${events.first().referenceId()} -> ${listener::class.java.simpleName} on event ${event.eventType}" }
+ return@coroutineScope true
}
}
}
- log.debug { "No consumption detected for ${events.first().referenceId()}" }
-
}
+ log.debug { "No consumption detected for ${events.first().referenceId()}" }
+ false
}
private var newEventsProducedOnReferenceId: AtomicReference> = AtomicReference(emptyList())
private fun pullForEvents() {
coroutine.launch {
while (taskMode == ActiveMode.Active) {
- log.debug { "New pull on database" }
- val events = eventManager?.readAvailableEvents()
- if (events == null) {
- log.warn { "EventManager is not loaded!" }
- } else {
- events.forEach { group ->
- onEventsReceived(group)
- }
+ if (referencePoolIsReadyForEvents()) {
+ log.debug { "New pull on database" }
+ val events = eventManager.readAvailableEvents()
+ onEventGroupsReceived(events)
}
waitForConditionOrTimeout(pullDelay.get()) { newEventProduced }.also {
newEventProduced = false