Changed behaviour
This commit is contained in:
parent
f3c5beaacd
commit
88c9d3c306
@ -167,15 +167,6 @@ class PersistentEventManager(private val dataSource: DataSource) {
|
|||||||
* @param message Kafka message object
|
* @param message Kafka message object
|
||||||
*/
|
*/
|
||||||
fun setEvent(event: KafkaEvents, message: Message<*>): Boolean {
|
fun setEvent(event: KafkaEvents, message: Message<*>): Boolean {
|
||||||
val existing = getEventsWith(message.referenceId)
|
|
||||||
val derivedId = message.data?.derivedFromEventId
|
|
||||||
if (derivedId != null) {
|
|
||||||
val isNewEventOrphan = existing.none { it.eventId == derivedId }
|
|
||||||
if (isNewEventOrphan) {
|
|
||||||
log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" }
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
withTransaction(dataSource.database) {
|
withTransaction(dataSource.database) {
|
||||||
allEvents.insert {
|
allEvents.insert {
|
||||||
@ -187,6 +178,18 @@ class PersistentEventManager(private val dataSource: DataSource) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val existing = getEventsWith(message.referenceId)
|
||||||
|
|
||||||
|
val derivedId = message.data?.derivedFromEventId
|
||||||
|
if (derivedId != null) {
|
||||||
|
val isNewEventOrphan = existing.none { it.eventId == derivedId }
|
||||||
|
if (isNewEventOrphan) {
|
||||||
|
log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" }
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
val exception = executeOrException(dataSource.database) {
|
val exception = executeOrException(dataSource.database) {
|
||||||
events.insert {
|
events.insert {
|
||||||
it[referenceId] = message.referenceId
|
it[referenceId] = message.referenceId
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user