Waiting for listeners annotated with @Service to be loaded into coordinator
This commit is contained in:
parent
f847a0669c
commit
e889dc3c61
@ -1,8 +1,14 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.common
|
package no.iktdev.mediaprocessing.shared.common
|
||||||
|
|
||||||
|
import kotlinx.coroutines.Dispatchers
|
||||||
|
import kotlinx.coroutines.delay
|
||||||
|
import kotlinx.coroutines.launch
|
||||||
|
import kotlinx.coroutines.withContext
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.exfl.coroutines.Coroutines
|
import no.iktdev.exfl.coroutines.Coroutines
|
||||||
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
|
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
|
||||||
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
|
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
|
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
|
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
|
||||||
@ -10,12 +16,17 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
|||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
|
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||||
|
import org.springframework.context.ApplicationContext
|
||||||
import org.springframework.beans.factory.annotation.Autowired
|
import org.springframework.beans.factory.annotation.Autowired
|
||||||
|
import org.springframework.stereotype.Service
|
||||||
import javax.annotation.PostConstruct
|
import javax.annotation.PostConstruct
|
||||||
|
|
||||||
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
|
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
abstract val listeners: L
|
abstract val listeners: L
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private lateinit var context: ApplicationContext
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
lateinit var producer: CoordinatorProducer
|
lateinit var producer: CoordinatorProducer
|
||||||
@ -25,12 +36,30 @@ abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
|
|||||||
|
|
||||||
abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List<V>)
|
abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List<V>)
|
||||||
|
|
||||||
abstract fun onCoordinatorReady()
|
open fun onCoordinatorReady() {
|
||||||
abstract fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>)
|
|
||||||
@PostConstruct
|
|
||||||
fun onInitializationCompleted() {
|
|
||||||
onCoordinatorReady()
|
|
||||||
listener.onMessageReceived = { event -> onMessageReceived(event)}
|
listener.onMessageReceived = { event -> onMessageReceived(event)}
|
||||||
listener.listen(KafkaEnv.kafkaTopic)
|
listener.listen(KafkaEnv.kafkaTopic)
|
||||||
}
|
}
|
||||||
|
abstract fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>)
|
||||||
|
|
||||||
|
fun isAllServicesRegistered(): Boolean {
|
||||||
|
val services = context.getBeansWithAnnotation(Service::class.java).values.map { it.javaClass }.filter { TaskCreatorImpl.isInstanceOfTaskCreatorImpl(it) }
|
||||||
|
val loadedServices = listeners.listeners.map { it.taskHandler.javaClass as Class<Any> }
|
||||||
|
val notPresent = services.filter { it !in loadedServices }
|
||||||
|
return notPresent.isEmpty()
|
||||||
|
}
|
||||||
|
|
||||||
|
@PostConstruct
|
||||||
|
fun onInitializationCompleted() {
|
||||||
|
Coroutines.io().launch {
|
||||||
|
while (!isAllServicesRegistered()) {
|
||||||
|
log.info { "Waiting for mandatory services to start" }
|
||||||
|
delay(1000)
|
||||||
|
}
|
||||||
|
withContext(Dispatchers.Default) {
|
||||||
|
log.info { "Coordinator is Ready!" }
|
||||||
|
onCoordinatorReady()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@ -11,6 +11,13 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
|
|||||||
open var coordinator: C
|
open var coordinator: C
|
||||||
) : ITaskCreatorListener<V> {
|
) : ITaskCreatorListener<V> {
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
fun <T> isInstanceOfTaskCreatorImpl(clazz: Class<T>): Boolean {
|
||||||
|
val superClass = TaskCreatorImpl::class.java
|
||||||
|
return superClass.isAssignableFrom(clazz)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Event that the implementer sets
|
// Event that the implementer sets
|
||||||
abstract val producesEvent: KafkaEvents
|
abstract val producesEvent: KafkaEvents
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user