Adjustments
This commit is contained in:
parent
f416698ad8
commit
50b171df65
@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
|||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
|
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||||
|
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.JsonDataMessage
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed
|
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed
|
||||||
import org.springframework.scheduling.annotation.EnableScheduling
|
import org.springframework.scheduling.annotation.EnableScheduling
|
||||||
import org.springframework.scheduling.annotation.Scheduled
|
import org.springframework.scheduling.annotation.Scheduled
|
||||||
@ -46,6 +47,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun onCoordinatorReady() {
|
override fun onCoordinatorReady() {
|
||||||
|
generateMissingEvents()
|
||||||
readAllAvailableInQueue()
|
readAllAvailableInQueue()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,6 +93,25 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun generateMissingEvents() {
|
||||||
|
val existing = eventManager.getAllProcessEvents().map { it.eventId }
|
||||||
|
val messages = eventManager.getEventsUncompleted()
|
||||||
|
|
||||||
|
val validMessages = messages.filter { list -> list.any { it.event in processKafkaEvents } }
|
||||||
|
.filter { list ->
|
||||||
|
list.any { it.event == KafkaEvents.EventMediaProcessStarted } ||
|
||||||
|
(list.any { it.event == KafkaEvents.EventMediaWorkProceedPermitted } && list.any { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_STARTED })
|
||||||
|
}.flatten().filter { it.event in processKafkaEvents }
|
||||||
|
|
||||||
|
validMessages.filter { it.eventId !in existing }.forEach {
|
||||||
|
eventManager.setProcessEvent(it.event, Message(
|
||||||
|
referenceId = it.referenceId,
|
||||||
|
eventId = it.eventId,
|
||||||
|
data = it.data
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fun readAllMessagesFor(referenceId: String, eventId: String) {
|
fun readAllMessagesFor(referenceId: String, eventId: String) {
|
||||||
val messages = eventManager.getProcessEventsClaimable()
|
val messages = eventManager.getProcessEventsClaimable()
|
||||||
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
|
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user