Creating a cache of "consumed" eventIds
This commit is contained in:
parent
29fc5fa118
commit
d6a8ea6297
@ -34,7 +34,6 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
|
||||
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
|
||||
} else {
|
||||
io.launch {
|
||||
delay(1000) // Give the database a few sec to update
|
||||
readAllMessagesFor(event.value.referenceId, event.value.eventId, event.key.event)
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,12 +4,14 @@ import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMe
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
|
||||
|
||||
abstract class TaskCreator(coordinator: Coordinator):
|
||||
TaskCreatorImpl<Coordinator, PersistentMessage, PersistentEventBasedMessageListener>(coordinator) {
|
||||
|
||||
|
||||
|
||||
override fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {
|
||||
val currentEvents = events.map { it.event }
|
||||
return requiredEvents.all { currentEvents.contains(it) }
|
||||
@ -39,4 +41,28 @@ abstract class TaskCreator(coordinator: Coordinator):
|
||||
return listOf()
|
||||
}
|
||||
|
||||
/**
|
||||
* Will always return null
|
||||
*/
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
val referenceId = event.referenceId
|
||||
val eventIds = events.filter { it.event in requiredEvents }.map { it.eventId }
|
||||
|
||||
val current = processedEvents[referenceId] ?: setOf()
|
||||
current.toMutableSet().addAll(eventIds)
|
||||
processedEvents[referenceId] = current
|
||||
|
||||
return null
|
||||
}
|
||||
|
||||
override fun containsUnprocessedEvents(events: List<PersistentMessage>): Boolean {
|
||||
val referenceId = events.firstOrNull()?.referenceId ?:return false
|
||||
val preExistingEvents = processedEvents[referenceId]?: setOf()
|
||||
|
||||
val forwardedEvents = events.filter { it.event in requiredEvents }.map { it.eventId }
|
||||
val newEvents = forwardedEvents.filter { it !in preExistingEvents }
|
||||
return newEvents.isNotEmpty()
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -33,6 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null
|
||||
return readFileInfo(selected.data as MediaProcessStarted, event.eventId)
|
||||
|
||||
@ -45,6 +45,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
|
||||
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
||||
|
||||
@ -34,6 +34,8 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
|
||||
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
|
||||
|
||||
@ -27,6 +27,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
|
||||
)
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }
|
||||
|
||||
// Check what it is and create based on it
|
||||
|
||||
@ -19,6 +19,8 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : C
|
||||
get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated)
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterEncodeCreated) {
|
||||
|
||||
@ -19,6 +19,8 @@ class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) :
|
||||
get() = listOf(KafkaEvents.EventMediaParameterExtractCreated)
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterExtractCreated) {
|
||||
|
||||
@ -39,6 +39,8 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover }
|
||||
|
||||
@ -37,6 +37,8 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
|
||||
|
||||
@ -50,6 +50,8 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
|
||||
)
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val baseInfo = events.lastOrSuccessOf(KafkaEvents.EventMediaReadBaseInfoPerformed) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? ?: return null
|
||||
|
||||
@ -38,6 +38,8 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null
|
||||
return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId)
|
||||
|
||||
@ -41,6 +41,8 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
|
||||
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null
|
||||
return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) }
|
||||
|
||||
@ -41,6 +41,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
|
||||
@ -68,7 +70,6 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
|
||||
|
||||
|
||||
//val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
|
||||
|
||||
return getFfmpegVideoArguments(
|
||||
inputFile = inputFile.file,
|
||||
outFullName = videoInfo.fullName,
|
||||
|
||||
@ -45,6 +45,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
|
||||
}
|
||||
|
||||
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
|
||||
super.onProcessEvents(event, events)
|
||||
|
||||
log.info { "${event.referenceId} triggered by ${event.event}" }
|
||||
|
||||
if (!requiredEvents.contains(event.event)) {
|
||||
|
||||
@ -1,6 +1,8 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.tasks
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
|
||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
@ -10,6 +12,9 @@ import javax.annotation.PostConstruct
|
||||
abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessageListener<V>>(
|
||||
open var coordinator: C
|
||||
) : ITaskCreatorListener<V> {
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
protected open val processedEvents: MutableMap<String, Set<String>> = mutableMapOf()
|
||||
|
||||
companion object {
|
||||
fun <T> isInstanceOfTaskCreatorImpl(clazz: Class<T>): Boolean {
|
||||
@ -82,6 +87,7 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
|
||||
private val context: MutableMap<String, Any> = mutableMapOf()
|
||||
private val context_key_reference = "reference"
|
||||
private val context_key_producesEvent = "event"
|
||||
|
||||
final override fun onEventReceived(referenceId: String, event: V, events: List<V>) {
|
||||
context[context_key_reference] = referenceId
|
||||
getListener().producesEvent.let {
|
||||
@ -89,6 +95,12 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
|
||||
}
|
||||
|
||||
if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) {
|
||||
|
||||
if (!containsUnprocessedEvents(events)) {
|
||||
log.warn { "Event register blocked proceeding" }
|
||||
return
|
||||
}
|
||||
|
||||
val result = onProcessEvents(event, events)
|
||||
if (result != null) {
|
||||
onResult(result)
|
||||
@ -99,7 +111,15 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This function is intended to cache the referenceId and its eventid's
|
||||
* This is to prevent dupliation
|
||||
* */
|
||||
abstract fun containsUnprocessedEvents(events: List<V>): Boolean
|
||||
|
||||
|
||||
protected fun onResult(data: MessageDataWrapper) {
|
||||
|
||||
producer.sendMessage(
|
||||
referenceId = context[context_key_reference] as String,
|
||||
event = context[context_key_producesEvent] as KafkaEvents,
|
||||
|
||||
Loading…
Reference in New Issue
Block a user