This commit is contained in:
bskjon 2024-04-20 03:24:00 +02:00
parent a275514696
commit 645c7b7a8e
3 changed files with 77 additions and 14 deletions

View File

@ -7,12 +7,16 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@EnableScheduling
@Service @Service
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() { class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
val io = Coroutines.io() val io = Coroutines.io()
@ -36,6 +40,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady() super.onCoordinatorReady()
log.info { "Converter Coordinator is ready" } log.info { "Converter Coordinator is ready" }
generateMissingEvents()
readAllInQueue() readAllInQueue()
} }
@ -46,8 +51,11 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
if (!success) { if (!success) {
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" } log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
} else { } else {
io.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId) readAllMessagesFor(event.value.referenceId, event.value.eventId)
} }
}
} else if (event.key == KafkaEvents.EventWorkExtractPerformed) { } else if (event.key == KafkaEvents.EventWorkExtractPerformed) {
readAllInQueue() readAllInQueue()
} else { } else {
@ -65,9 +73,34 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
} }
} }
private fun generateMissingEvents() {
val existing = eventManager.getAllProcessEvents().filter { it.event == KafkaEvents.EventWorkConvertCreated }.map { it.eventId }
val messages = eventManager.getEventsUncompleted()
val myEvents = messages.flatten()
.filter { it.event == KafkaEvents.EventWorkConvertCreated }
.filter { existing.none { en -> en == it.eventId } }
myEvents.forEach {
eventManager.setProcessEvent(it.event, Message(
referenceId = it.referenceId,
eventId = it.eventId,
data = it.data
))
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) { fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents() val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
} }
@Scheduled(fixedDelay = (5*6_0000))
fun checkForWork() {
log.info { "Checking if there is any work to do.." }
readAllInQueue()
generateMissingEvents()
}
} }

View File

@ -76,14 +76,17 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
StartOperationEvents.CONVERT to hasConvertAndIsRequired StartOperationEvents.CONVERT to hasConvertAndIsRequired
) )
if (missingRequired.values.any { !it }) { if (missingRequired.values.any { !it }) {
log.info { "Waiting for ${missingRequired.entries.filter { !it.value }.map { it.key.name }}" } log.info { "Waiting for ${missingRequired.entries.filter { !it.value }.map { it.key.name }}" }
return null return null
} }
val ch = CompleteHandler(events)
val chEvents = ch.getMissingCompletions()
if (chEvents.isNotEmpty()) {
log.info { "Waiting for ${chEvents.joinToString { "," }}" }
return null
}
@ -93,4 +96,35 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
} }
return null return null
} }
class CompleteHandler(val events: List<PersistentMessage>) {
var report: Map<KafkaEvents, Int> = listOf(
EventReport.fromEvents(EventWorkEncodeCreated, events),
EventReport.fromEvents(EventWorkExtractCreated, events),
EventReport.fromEvents(EventWorkConvertCreated, events),
EventReport.fromEvents(EventWorkEncodePerformed, events),
EventReport.fromEvents(EventWorkExtractPerformed, events),
EventReport.fromEvents(EventWorkConvertPerformed, events)
).associate { it.event to it.count }
fun getMissingCompletions(): List<StartOperationEvents> {
val missings = mutableListOf<StartOperationEvents>()
if (report[EventWorkEncodeCreated] != report[EventWorkEncodePerformed])
missings.add(StartOperationEvents.ENCODE)
if (report[EventWorkExtractCreated] != report[EventWorkExtractPerformed])
missings.add(StartOperationEvents.EXTRACT)
if (report[EventWorkConvertCreated] == report[EventWorkConvertPerformed])
missings.add(StartOperationEvents.CONVERT)
return missings
}
data class EventReport(val event: KafkaEvents, val count: Int) {
companion object {
fun fromEvents(event: KafkaEvents, events: List<PersistentMessage>): EventReport {
return EventReport(event = event, count = events.filter { it.event == event }.size)
}
}
}
}
} }

View File

@ -96,28 +96,23 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
} }
private fun generateMissingEvents() { private fun generateMissingEvents() {
val existing = eventManager.getAllProcessEvents().map { it.eventId } val existing = eventManager.getAllProcessEvents().filter { it.event in processKafkaEvents }.map { it.eventId }
val messages = eventManager.getEventsUncompleted() val messages = eventManager.getEventsUncompleted()
val usableMessages = messages.filter { lists -> lists.any { it.event in processKafkaEvents } } val myEvents = messages.flatten()
val validMessages = usableMessages.filter { lists ->
lists.any { it.event == KafkaEvents.EventMediaProcessStarted && (it.data as MediaProcessStarted).type == ProcessType.FLOW } ||
lists.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted }
}
.flatten()
.filter { it.event in processKafkaEvents } .filter { it.event in processKafkaEvents }
.filter { existing.none { en -> en == it.eventId } }
validMessages.filter { it.eventId !in existing }.forEach { myEvents.forEach {
eventManager.setProcessEvent(it.event, Message( eventManager.setProcessEvent(it.event, Message(
referenceId = it.referenceId, referenceId = it.referenceId,
eventId = it.eventId, eventId = it.eventId,
data = it.data data = it.data
)) ))
} }
} }
fun readAllMessagesFor(referenceId: String, eventId: String) { fun readAllMessagesFor(referenceId: String, eventId: String) {
@ -139,6 +134,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
fun checkForWork() { fun checkForWork() {
log.info { "Checking if there is any work to do.." } log.info { "Checking if there is any work to do.." }
readAllAvailableInQueue() readAllAvailableInQueue()
generateMissingEvents()
} }
interface CoordinatorEvents { interface CoordinatorEvents {