New removal from queue

This commit is contained in:
bskjon 2024-04-26 00:08:58 +02:00
parent 13651b6e90
commit a455146441

View File

@ -21,7 +21,6 @@ import java.util.UUID
@EnableScheduling @EnableScheduling
@Service @Service
class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() { class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
val io = Coroutines.io()
override fun onCoordinatorReady() { override fun onCoordinatorReady() {
super.onCoordinatorReady() super.onCoordinatorReady()
@ -33,7 +32,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
if (!success) { if (!success) {
log.error { "Failed to store message event\nReferenceId: ${event.value.referenceId}\n\tEventId: ${event.value.eventId}\n\tEvent: ${event.key.event}\n\nData:\n${event.value.data}" } log.error { "Failed to store message event\nReferenceId: ${event.value.referenceId}\n\tEventId: ${event.value.eventId}\n\tEvent: ${event.key.event}\n\nData:\n${event.value.data}" }
} else { } else {
io.launch { ioCoroutine.launch {
readAllMessagesFor(event.value.referenceId, event.value.eventId, event.key.event) readAllMessagesFor(event.value.referenceId, event.value.eventId, event.key.event)
} }
} }
@ -97,7 +96,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
log.info { "Found uncompleted: ${it.referenceId}" } log.info { "Found uncompleted: ${it.referenceId}" }
} }
} }
io.launch { ioCoroutine.launch {
messages.forEach { messages.forEach {
delay(1000) delay(1000)
try { try {
@ -113,7 +112,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
val messages = eventManager.getEventsWith(referenceId) val messages = eventManager.getEventsWith(referenceId)
if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) { if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) {
log.warn { "EventId ($eventId) for ReferenceId ($referenceId) with event $event has not been made available in the database yet." } log.warn { "EventId ($eventId) for ReferenceId ($referenceId) with event $event has not been made available in the database yet." }
io.launch { ioCoroutine.launch {
val fixedDelay = 1000L val fixedDelay = 1000L
delay(fixedDelay) delay(fixedDelay)
var delayed = 0L var delayed = 0L