diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index f292a573..048fcbe0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -1,8 +1,14 @@ package no.iktdev.mediaprocessing.shared.common +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv @@ -10,12 +16,17 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.context.ApplicationContext import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service import javax.annotation.PostConstruct abstract class CoordinatorBase> { + private val log = KotlinLogging.logger {} abstract val listeners: L + @Autowired + private lateinit var context: ApplicationContext @Autowired lateinit var producer: CoordinatorProducer @@ -25,12 +36,30 @@ abstract class CoordinatorBase> { abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List) - abstract fun onCoordinatorReady() - abstract fun onMessageReceived(event: DeserializedConsumerRecord>) - @PostConstruct - fun onInitializationCompleted() { - onCoordinatorReady() + open fun onCoordinatorReady() { listener.onMessageReceived = { event -> onMessageReceived(event)} listener.listen(KafkaEnv.kafkaTopic) } + abstract fun onMessageReceived(event: DeserializedConsumerRecord>) + + fun isAllServicesRegistered(): Boolean { + val services = context.getBeansWithAnnotation(Service::class.java).values.map { it.javaClass }.filter { TaskCreatorImpl.isInstanceOfTaskCreatorImpl(it) } + val loadedServices = listeners.listeners.map { it.taskHandler.javaClass as Class } + val notPresent = services.filter { it !in loadedServices } + return notPresent.isEmpty() + } + + @PostConstruct + fun onInitializationCompleted() { + Coroutines.io().launch { + while (!isAllServicesRegistered()) { + log.info { "Waiting for mandatory services to start" } + delay(1000) + } + withContext(Dispatchers.Default) { + log.info { "Coordinator is Ready!" } + onCoordinatorReady() + } + } + } } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt index 736c935a..102aaf82 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -11,6 +11,13 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa open var coordinator: C ) : ITaskCreatorListener { + companion object { + fun isInstanceOfTaskCreatorImpl(clazz: Class): Boolean { + val superClass = TaskCreatorImpl::class.java + return superClass.isAssignableFrom(clazz) + } + } + // Event that the implementer sets abstract val producesEvent: KafkaEvents