Adjustment

This commit is contained in:
bskjon 2024-04-22 02:07:57 +02:00
parent d10bca3d1a
commit e5bb1c1733
14 changed files with 17 additions and 16 deletions

View File

@ -44,7 +44,7 @@ abstract class TaskCreator(coordinator: Coordinator):
/** /**
* Will always return null * Will always return null
*/ */
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { open fun onProcessEventsAccepted(event: PersistentMessage, events: List<PersistentMessage>) {
val referenceId = event.referenceId val referenceId = event.referenceId
val eventIds = events.filter { it.event in requiredEvents + listensForEvents }.map { it.eventId } val eventIds = events.filter { it.event in requiredEvents + listensForEvents }.map { it.eventId }
@ -52,7 +52,9 @@ abstract class TaskCreator(coordinator: Coordinator):
current.toMutableSet().addAll(eventIds) current.toMutableSet().addAll(eventIds)
processedEvents[referenceId] = current processedEvents[referenceId] = current
return null if (event.event == KafkaEvents.EventCollectAndStore) {
processedEvents.remove(referenceId)
}
} }
override fun containsUnprocessedEvents(events: List<PersistentMessage>): Boolean { override fun containsUnprocessedEvents(events: List<PersistentMessage>): Boolean {

View File

@ -33,7 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null
return readFileInfo(selected.data as MediaProcessStarted, event.eventId) return readFileInfo(selected.data as MediaProcessStarted, event.eventId)

View File

@ -45,8 +45,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null

View File

@ -34,7 +34,7 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -27,7 +27,7 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
) )
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }

View File

@ -19,7 +19,7 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : C
get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated) get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -19,7 +19,7 @@ class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) :
get() = listOf(KafkaEvents.EventMediaParameterExtractCreated) get() = listOf(KafkaEvents.EventMediaParameterExtractCreated)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -39,7 +39,7 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -37,7 +37,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -50,7 +50,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
) )
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -38,7 +38,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null

View File

@ -41,7 +41,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null

View File

@ -41,7 +41,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }

View File

@ -45,7 +45,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
} }
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEvents(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }