diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt index d271888d..37b585db 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEventCoordinator.kt @@ -14,7 +14,9 @@ import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents import no.iktdev.mediaprocessing.shared.common.database.cal.EventsManager import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.ApplicationContext +import org.springframework.context.event.EventListener import org.springframework.stereotype.Component import java.io.File import java.util.* @@ -28,7 +30,9 @@ class Coordinator( ) : EventCoordinator() { - init { + @EventListener(ApplicationReadyEvent::class) + fun onApplicationReady() { + onReady() } fun getProducerName(): String { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index cbd782fe..7861b441 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -107,6 +107,9 @@ class CompletedTaskListener : CoordinatorEventListener() { } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { + if (doNotProduceComplete) { + return false + } val result = super.shouldIProcessAndHandleEvent(incomingEvent, events) return result && incomingEvent.eventType == Events.PersistContent } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt index 3a8afee3..b033952b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt @@ -76,6 +76,9 @@ class PersistContentTaskListener : CoordinatorEventListener() { } override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { + if (doNotProduceComplete) { + return false + } val result = super.shouldIProcessAndHandleEvent(incomingEvent, events) return result } @@ -84,6 +87,10 @@ class PersistContentTaskListener : CoordinatorEventListener() { val event = incomingEvent.consume() ?: return active = true + if (doNotProduceComplete) { + return + } + val mediaInfo: ComposedMediaInfo = composeMediaInfo(events) ?: run { log.error { "Unable to compose media info for ${event.referenceId()}" } return diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index f3d0f02c..08a2fd40 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -22,17 +22,16 @@ abstract class EventCoordinator> { //private val listeners: MutableList> = mutableListOf() private val log = KotlinLogging.logger {} - private var coroutine = CoroutineScope(Dispatchers.IO + Job()) + private var coroutine = CoroutineScope(Dispatchers.IO + SupervisorJob()) - private var ready: Boolean = false + open var ready: Boolean = false fun isReady(): Boolean { return ready } - init { + open fun onReady() { ready = true pullForEvents() - } @@ -52,35 +51,6 @@ abstract class EventCoordinator> { return PollStats(active = activePolls, total = referencePool.values.size) } - private fun onEventGroupsReceived(eventGroup: List>) { - val egRefIds = eventGroup.map { it.first().referenceId() } - - val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key } - orphanedReferences.forEach { id -> referencePool.remove(id) } - - activePolls = referencePool.values.filter { it.isActive }.size - if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) { - log.info { "Last active references removed from pull pool, " } - } - - if (eventGroup.isEmpty()) { - return - } - - eventGroup.forEach { - val referenceId = it.first().referenceId() - - val isAvailable = if (referenceId in referencePool.keys) { - referencePool[referenceId]?.isActive != true - } else true - - if (isAvailable) { - referencePool[referenceId] = coroutine.async { - onEventsReceived(it) - } - } - } - } private var wasActiveNotify: Boolean = true private fun onEventCollectionReceived(referenceId: String, events: List) { @@ -130,7 +100,7 @@ abstract class EventCoordinator> { var cachedReferenceList: MutableList = mutableListOf() private fun pullForEvents() { coroutine.launch { - while (taskMode == ActiveMode.Active) { + while (taskMode == ActiveMode.Active && coroutine.isActive) { if (referencePoolIsReadyForEvents()) { log.debug { "New pull on database" } val referenceIdsAvailable = eventManager.getAvailableReferenceIds()