This commit is contained in:
bskjon 2025-04-06 22:43:05 +02:00
parent cd821b7e08
commit 33736ceca7
4 changed files with 19 additions and 35 deletions

View File

@ -14,7 +14,9 @@ import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.database.cal.EventsManager
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.ApplicationContext
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component
import java.io.File
import java.util.*
@ -28,7 +30,9 @@ class Coordinator(
) : EventCoordinator<Event, EventsManager>() {
init {
@EventListener(ApplicationReadyEvent::class)
fun onApplicationReady() {
onReady()
}
fun getProducerName(): String {

View File

@ -107,6 +107,9 @@ class CompletedTaskListener : CoordinatorEventListener() {
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (doNotProduceComplete) {
return false
}
val result = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return result && incomingEvent.eventType == Events.PersistContent
}

View File

@ -76,6 +76,9 @@ class PersistContentTaskListener : CoordinatorEventListener() {
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (doNotProduceComplete) {
return false
}
val result = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return result
}
@ -84,6 +87,10 @@ class PersistContentTaskListener : CoordinatorEventListener() {
val event = incomingEvent.consume() ?: return
active = true
if (doNotProduceComplete) {
return
}
val mediaInfo: ComposedMediaInfo = composeMediaInfo(events) ?: run {
log.error { "Unable to compose media info for ${event.referenceId()}" }
return

View File

@ -22,17 +22,16 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
//private val listeners: MutableList<EventListener<T>> = mutableListOf()
private val log = KotlinLogging.logger {}
private var coroutine = CoroutineScope(Dispatchers.IO + Job())
private var coroutine = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var ready: Boolean = false
open var ready: Boolean = false
fun isReady(): Boolean {
return ready
}
init {
open fun onReady() {
ready = true
pullForEvents()
}
@ -52,35 +51,6 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
return PollStats(active = activePolls, total = referencePool.values.size)
}
private fun onEventGroupsReceived(eventGroup: List<List<T>>) {
val egRefIds = eventGroup.map { it.first().referenceId() }
val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key }
orphanedReferences.forEach { id -> referencePool.remove(id) }
activePolls = referencePool.values.filter { it.isActive }.size
if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) {
log.info { "Last active references removed from pull pool, " }
}
if (eventGroup.isEmpty()) {
return
}
eventGroup.forEach {
val referenceId = it.first().referenceId()
val isAvailable = if (referenceId in referencePool.keys) {
referencePool[referenceId]?.isActive != true
} else true
if (isAvailable) {
referencePool[referenceId] = coroutine.async {
onEventsReceived(it)
}
}
}
}
private var wasActiveNotify: Boolean = true
private fun onEventCollectionReceived(referenceId: String, events: List<T>) {
@ -130,7 +100,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
var cachedReferenceList: MutableList<String> = mutableListOf()
private fun pullForEvents() {
coroutine.launch {
while (taskMode == ActiveMode.Active) {
while (taskMode == ActiveMode.Active && coroutine.isActive) {
if (referencePoolIsReadyForEvents()) {
log.debug { "New pull on database" }
val referenceIdsAvailable = eventManager.getAvailableReferenceIds()