diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index d7f2a7db..6b431d1c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -167,15 +167,6 @@ class PersistentEventManager(private val dataSource: DataSource) { * @param message Kafka message object */ fun setEvent(event: KafkaEvents, message: Message<*>): Boolean { - val existing = getEventsWith(message.referenceId) - val derivedId = message.data?.derivedFromEventId - if (derivedId != null) { - val isNewEventOrphan = existing.none { it.eventId == derivedId } - if (isNewEventOrphan) { - log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" } - return false - } - } withTransaction(dataSource.database) { allEvents.insert { @@ -187,6 +178,18 @@ class PersistentEventManager(private val dataSource: DataSource) { } } + val existing = getEventsWith(message.referenceId) + + val derivedId = message.data?.derivedFromEventId + if (derivedId != null) { + val isNewEventOrphan = existing.none { it.eventId == derivedId } + if (isNewEventOrphan) { + log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" } + return false + } + } + + val exception = executeOrException(dataSource.database) { events.insert { it[referenceId] = message.referenceId