Filter
This commit is contained in:
parent
d88c75a8cc
commit
e159560bca
@ -48,6 +48,10 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
|
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
|
||||||
|
if (!processKafkaEvents.contains(event.key)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
|
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
|
||||||
if (!success) {
|
if (!success) {
|
||||||
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
|
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user