Logging + diagnostics
This commit is contained in:
parent
518f4726cf
commit
4d213a2b23
@ -20,6 +20,7 @@ abstract class EventPollerImplementation(
|
|||||||
private val log = KotlinLogging.logger {}
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
open suspend fun start() {
|
open suspend fun start() {
|
||||||
|
log.info { "EventPoller starting with initial backoff=$backoff" }
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
pollOnce()
|
pollOnce()
|
||||||
@ -32,9 +33,12 @@ abstract class EventPollerImplementation(
|
|||||||
}
|
}
|
||||||
|
|
||||||
suspend fun pollOnce() {
|
suspend fun pollOnce() {
|
||||||
|
val pollStartedAt = LocalDateTime.now()
|
||||||
|
log.debug { "Polling for new events" }
|
||||||
val newPersisted = eventStore.getPersistedEventsAfter(lastSeenTime)
|
val newPersisted = eventStore.getPersistedEventsAfter(lastSeenTime)
|
||||||
|
|
||||||
if (newPersisted.isEmpty()) {
|
if (newPersisted.isEmpty()) {
|
||||||
|
log.debug { "No new events found. Backing off for $backoff" }
|
||||||
delay(backoff.toMillis())
|
delay(backoff.toMillis())
|
||||||
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
backoff = backoff.multipliedBy(2).coerceAtMost(maxBackoff)
|
||||||
return
|
return
|
||||||
@ -54,8 +58,10 @@ abstract class EventPollerImplementation(
|
|||||||
val events = fullLog.mapNotNull { it.toEvent() }
|
val events = fullLog.mapNotNull { it.toEvent() }
|
||||||
|
|
||||||
dispatchQueue.dispatch(referenceId, events, dispatcher)
|
dispatchQueue.dispatch(referenceId, events, dispatcher)
|
||||||
lastSeenTime = fullLog.maxOf { it.persistedAt }
|
|
||||||
}
|
}
|
||||||
|
val maxPersistedAt = newPersisted.maxOf { it.persistedAt }
|
||||||
|
log.debug { "Updating lastSeenTime from $lastSeenTime to ${minOf(pollStartedAt, maxPersistedAt).plusNanos(1)}" }
|
||||||
|
lastSeenTime = minOf(pollStartedAt, maxPersistedAt).plusNanos(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user