diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index 618a91c8..9d50a9dc 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -34,6 +34,7 @@ class ConverterCoordinator() : CoordinatorBase>) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index 048fcbe0..93a054b8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -1,9 +1,6 @@ package no.iktdev.mediaprocessing.shared.common -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.withContext +import kotlinx.coroutines.* import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage @@ -37,6 +34,7 @@ abstract class CoordinatorBase> { abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List) open fun onCoordinatorReady() { + log.info { "Attaching listeners to Coordinator" } listener.onMessageReceived = { event -> onMessageReceived(event)} listener.listen(KafkaEnv.kafkaTopic) } @@ -46,20 +44,24 @@ abstract class CoordinatorBase> { val services = context.getBeansWithAnnotation(Service::class.java).values.map { it.javaClass }.filter { TaskCreatorImpl.isInstanceOfTaskCreatorImpl(it) } val loadedServices = listeners.listeners.map { it.taskHandler.javaClass as Class } val notPresent = services.filter { it !in loadedServices } + + notPresent.forEach { + log.warn { "Waiting for ${it.simpleName} to attach.." } + } + return notPresent.isEmpty() } @PostConstruct fun onInitializationCompleted() { - Coroutines.io().launch { + Coroutines.default().launch { while (!isAllServicesRegistered()) { log.info { "Waiting for mandatory services to start" } delay(1000) } - withContext(Dispatchers.Default) { - log.info { "Coordinator is Ready!" } - onCoordinatorReady() - } + }.invokeOnCompletion { + onCoordinatorReady() + log.info { "Coordinator is Ready!" } } } } \ No newline at end of file