Set time changes

This commit is contained in:
Brage Skjønborg 2026-01-22 03:22:10 +01:00
parent 674a536818
commit d31448e26c

View File

@ -48,21 +48,32 @@ abstract class EventPollerImplementation(
val grouped = newPersisted.groupBy { it.referenceId } val grouped = newPersisted.groupBy { it.referenceId }
for ((referenceId, _) in grouped) { // Samle persistedAt KUN for referanser vi faktisk dispatchet
if (dispatchQueue.isProcessing(referenceId)){ val processedTimes = mutableListOf<LocalDateTime>()
for ((referenceId, eventsForRef) in grouped) {
if (dispatchQueue.isProcessing(referenceId)) {
log.debug { "Skipping dispatch for $referenceId as it is already being processed" } log.debug { "Skipping dispatch for $referenceId as it is already being processed" }
continue continue
} }
val fullLog = eventStore.getPersistedEventsFor(referenceId) val fullLog = eventStore.getPersistedEventsFor(referenceId)
val events = fullLog.mapNotNull { it.toEvent() } processedTimes += fullLog.map { it.persistedAt }
val events = fullLog.mapNotNull { it.toEvent() }
dispatchQueue.dispatch(referenceId, events, dispatcher) dispatchQueue.dispatch(referenceId, events, dispatcher)
} }
val maxPersistedAt = newPersisted.maxOf { it.persistedAt }
val newCheckpointTime = minOf(pollStartedAt, maxPersistedAt) if (processedTimes.isNotEmpty()) {
log.debug { "Updating lastSeenTime from $lastSeenTime to $newCheckpointTime" } val maxPersistedAt = processedTimes.max()
lastSeenTime = newCheckpointTime val newLastSeen = minOf(pollStartedAt, maxPersistedAt).plusNanos(1)
log.debug { "Updating lastSeenTime from $lastSeenTime to $newLastSeen" }
lastSeenTime = newLastSeen
} else {
// Ingen referanser ble dispatchet → IKKE oppdater lastSeenTime
log.debug { "No dispatches performed; lastSeenTime remains $lastSeenTime" }
}
} }
} }