diff --git a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt index 0b3c85b..e23de25 100644 --- a/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt +++ b/src/main/kotlin/no/iktdev/eventi/events/EventPollerImplementation.kt @@ -48,21 +48,32 @@ abstract class EventPollerImplementation( val grouped = newPersisted.groupBy { it.referenceId } - for ((referenceId, _) in grouped) { - if (dispatchQueue.isProcessing(referenceId)){ + // Samle persistedAt KUN for referanser vi faktisk dispatch’et + val processedTimes = mutableListOf() + + for ((referenceId, eventsForRef) in grouped) { + if (dispatchQueue.isProcessing(referenceId)) { log.debug { "Skipping dispatch for $referenceId as it is already being processed" } continue } val fullLog = eventStore.getPersistedEventsFor(referenceId) - val events = fullLog.mapNotNull { it.toEvent() } + processedTimes += fullLog.map { it.persistedAt } + val events = fullLog.mapNotNull { it.toEvent() } dispatchQueue.dispatch(referenceId, events, dispatcher) } - val maxPersistedAt = newPersisted.maxOf { it.persistedAt } - val newCheckpointTime = minOf(pollStartedAt, maxPersistedAt) - log.debug { "Updating lastSeenTime from $lastSeenTime to $newCheckpointTime" } - lastSeenTime = newCheckpointTime + + if (processedTimes.isNotEmpty()) { + val maxPersistedAt = processedTimes.max() + val newLastSeen = minOf(pollStartedAt, maxPersistedAt).plusNanos(1) + log.debug { "Updating lastSeenTime from $lastSeenTime to $newLastSeen" } + lastSeenTime = newLastSeen + } else { + // Ingen referanser ble dispatch’et → IKKE oppdater lastSeenTime + log.debug { "No dispatches performed; lastSeenTime remains $lastSeenTime" } + } } + }