This commit is contained in:
bskjon 2024-04-17 02:51:33 +02:00
parent 85caa81781
commit 80a7d3e706
5 changed files with 15 additions and 10 deletions

View File

@ -34,6 +34,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
} }
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady()
log.info { "Converter Coordinator is ready" } log.info { "Converter Coordinator is ready" }
readAllInQueue() readAllInQueue()
} }

View File

@ -22,6 +22,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
val io = Coroutines.io() val io = Coroutines.io()
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady()
readAllUncompletedMessagesInQueue() readAllUncompletedMessagesInQueue()
} }

View File

@ -46,6 +46,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
} }
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady()
generateMissingEvents() generateMissingEvents()
readAllAvailableInQueue() readAllAvailableInQueue()
} }

View File

@ -28,7 +28,7 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
override val listeners = PersistentEventBasedMessageListener() override val listeners = PersistentEventBasedMessageListener()
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady()
} }
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) { override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {

View File

@ -1,9 +1,6 @@
package no.iktdev.mediaprocessing.shared.common package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
@ -37,6 +34,7 @@ abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List<V>) abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List<V>)
open fun onCoordinatorReady() { open fun onCoordinatorReady() {
log.info { "Attaching listeners to Coordinator" }
listener.onMessageReceived = { event -> onMessageReceived(event)} listener.onMessageReceived = { event -> onMessageReceived(event)}
listener.listen(KafkaEnv.kafkaTopic) listener.listen(KafkaEnv.kafkaTopic)
} }
@ -46,20 +44,24 @@ abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
val services = context.getBeansWithAnnotation(Service::class.java).values.map { it.javaClass }.filter { TaskCreatorImpl.isInstanceOfTaskCreatorImpl(it) } val services = context.getBeansWithAnnotation(Service::class.java).values.map { it.javaClass }.filter { TaskCreatorImpl.isInstanceOfTaskCreatorImpl(it) }
val loadedServices = listeners.listeners.map { it.taskHandler.javaClass as Class<Any> } val loadedServices = listeners.listeners.map { it.taskHandler.javaClass as Class<Any> }
val notPresent = services.filter { it !in loadedServices } val notPresent = services.filter { it !in loadedServices }
notPresent.forEach {
log.warn { "Waiting for ${it.simpleName} to attach.." }
}
return notPresent.isEmpty() return notPresent.isEmpty()
} }
@PostConstruct @PostConstruct
fun onInitializationCompleted() { fun onInitializationCompleted() {
Coroutines.io().launch { Coroutines.default().launch {
while (!isAllServicesRegistered()) { while (!isAllServicesRegistered()) {
log.info { "Waiting for mandatory services to start" } log.info { "Waiting for mandatory services to start" }
delay(1000) delay(1000)
} }
withContext(Dispatchers.Default) { }.invokeOnCompletion {
log.info { "Coordinator is Ready!" }
onCoordinatorReady() onCoordinatorReady()
} log.info { "Coordinator is Ready!" }
} }
} }
} }