Small tweaks
This commit is contained in:
parent
5e24246912
commit
e169fc39e5
@ -75,7 +75,7 @@ class Coordinator(
|
||||
|
||||
|
||||
produceNewEvent(
|
||||
no.iktdev.mediaprocessing.shared.common.contract.data.PermitWorkCreationEvent(
|
||||
PermitWorkCreationEvent(
|
||||
metadata = EventMetadata(
|
||||
referenceId = referenceId,
|
||||
derivedFromEventId = eventToAttachTo.eventId(),
|
||||
|
||||
@ -172,7 +172,7 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
val viableEvents = events.filter { it.isSuccessful() }
|
||||
|
||||
|
||||
if (!req1(started, viableEvents)) {
|
||||
if (!req1(started, events)) {
|
||||
//log.info { "${this::class.java.simpleName} Failed Req1" }
|
||||
return false
|
||||
}
|
||||
@ -442,6 +442,9 @@ class CompletedTaskListener: CoordinatorEventListener() {
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
|
||||
|
||||
active = false
|
||||
}
|
||||
|
||||
|
||||
@ -63,12 +63,10 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
|
||||
return
|
||||
}
|
||||
|
||||
if (hasPollerForMetadataEvent && hasMetadataSearched) {
|
||||
if (hasMetadataSearched) {
|
||||
waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId)
|
||||
return
|
||||
}
|
||||
|
||||
if (!hasMetadataSearched && !hasPollerForMetadataEvent) {
|
||||
} else if (!hasPollerForMetadataEvent) {
|
||||
val consumedIncoming = incomingEvent.consume()
|
||||
if (consumedIncoming == null) {
|
||||
log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" }
|
||||
|
||||
@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.watcher
|
||||
import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
|
||||
import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
|
||||
import dev.vishna.watchservice.asWatchChannel
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.channels.consumeEach
|
||||
import kotlinx.coroutines.coroutineScope
|
||||
import kotlinx.coroutines.launch
|
||||
@ -51,6 +52,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
|
||||
}
|
||||
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
suspend fun watchFiles() {
|
||||
log.info { "Starting Watcher" }
|
||||
watcherChannel.consumeEach {
|
||||
|
||||
@ -8,6 +8,7 @@ import no.iktdev.eventi.data.toJson
|
||||
import no.iktdev.eventi.database.DataSource
|
||||
import no.iktdev.eventi.database.isCausedByDuplicateError
|
||||
import no.iktdev.eventi.database.isExposedSqlException
|
||||
import no.iktdev.eventi.database.withDirtyRead
|
||||
import no.iktdev.eventi.implementations.EventsManagerImpl
|
||||
import no.iktdev.mediaprocessing.shared.common.database.tables.allEvents
|
||||
import no.iktdev.mediaprocessing.shared.common.database.tables.events
|
||||
@ -90,10 +91,22 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
return event in exemptedFromSingleEvent
|
||||
}
|
||||
|
||||
override fun getAvailableReferenceIds(): List<String> {
|
||||
return withDirtyRead(dataSource.database) {
|
||||
val completedEvents = events
|
||||
.slice(events.referenceId)
|
||||
.select { events.event eq Events.EventMediaProcessCompleted.event }
|
||||
|
||||
events
|
||||
.slice(events.referenceId)
|
||||
.select { events.referenceId notInSubQuery completedEvents }
|
||||
.withDistinct()
|
||||
.map { it[events.referenceId] } ?: emptyList()
|
||||
} ?: emptyList()
|
||||
}
|
||||
|
||||
override fun readAvailableEvents(): List<List<Event>> {
|
||||
return no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
return withDirtyRead(dataSource.database) {
|
||||
events.selectAll()
|
||||
.groupBy { it[events.referenceId] }
|
||||
.mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }.filter { it.none { e -> e.eventType == Events.EventMediaProcessCompleted } }
|
||||
@ -101,7 +114,7 @@ class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource)
|
||||
}
|
||||
|
||||
override fun readAvailableEventsFor(referenceId: String): List<Event> {
|
||||
val events = no.iktdev.eventi.database.withTransaction(dataSource.database) {
|
||||
val events = withDirtyRead(dataSource.database) {
|
||||
events.select { events.referenceId eq referenceId }
|
||||
.mapNotNull { it.toEvent() }
|
||||
} ?: emptyList()
|
||||
|
||||
@ -53,6 +53,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
||||
|
||||
private fun onEventGroupsReceived(eventGroup: List<List<T>>) {
|
||||
val egRefIds = eventGroup.map { it.first().referenceId() }
|
||||
|
||||
val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key }
|
||||
orphanedReferences.forEach { id -> referencePool.remove(id) }
|
||||
|
||||
@ -80,6 +81,28 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
||||
}
|
||||
}
|
||||
|
||||
private fun onEventCollectionReceived(referenceId: String, events: List<T>) {
|
||||
val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in referenceId }.map { it.key }
|
||||
orphanedReferences.forEach { id -> referencePool.remove(id) }
|
||||
|
||||
activePolls = referencePool.values.filter { it.isActive }.size
|
||||
if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) {
|
||||
log.info { "Last active references removed from pull pool, " }
|
||||
}
|
||||
|
||||
val isAvailable = if (referenceId in referencePool.keys) {
|
||||
referencePool[referenceId]?.isActive != true
|
||||
} else true
|
||||
|
||||
if (isAvailable) {
|
||||
referencePool[referenceId] = coroutine.async {
|
||||
onEventsReceived(events)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
private suspend fun onEventsReceived(events: List<T>): Boolean = coroutineScope {
|
||||
val listeners = getListeners()
|
||||
events.forEach { event ->
|
||||
@ -104,9 +127,13 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
||||
while (taskMode == ActiveMode.Active) {
|
||||
if (referencePoolIsReadyForEvents()) {
|
||||
log.debug { "New pull on database" }
|
||||
val events = eventManager.readAvailableEvents()
|
||||
onEventGroupsReceived(events)
|
||||
if (events.isNotEmpty()) {
|
||||
val referenceIdsAvailable = eventManager.getAvailableReferenceIds()
|
||||
for (referenceId in referenceIdsAvailable) {
|
||||
val events = eventManager.readAvailableEventsFor(referenceId)
|
||||
onEventCollectionReceived(referenceId, events)
|
||||
}
|
||||
|
||||
if (referenceIdsAvailable.isNotEmpty()) {
|
||||
if (pullDelay.get() != fastPullDelay.get()) {
|
||||
log.info { "Available events found, switching to fast pull @ Delay -> ${fastPullDelay.get()}" }
|
||||
}
|
||||
|
||||
@ -7,6 +7,8 @@ import no.iktdev.eventi.database.DataSource
|
||||
* Interacts with the database, needs to be within the Coordinator
|
||||
*/
|
||||
abstract class EventsManagerImpl<T: EventImpl>(val dataSource: DataSource) {
|
||||
|
||||
abstract fun getAvailableReferenceIds(): List<String>
|
||||
abstract fun readAvailableEvents(): List<List<T>>
|
||||
abstract fun readAvailableEventsFor(referenceId: String): List<T>
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user