This commit is contained in:
bskjon 2024-04-23 19:09:45 +02:00
parent ac2baa5787
commit 066781edb9
5 changed files with 48 additions and 24 deletions

View File

@ -41,32 +41,41 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel() val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val queue = FileWatcherQueue() val queue = FileWatcherQueue()
val io = Coroutines.io() final val io = Coroutines.io()
init { suspend fun startWatcher() {
io.launch { log.info { "Starting Watcher" }
watcherChannel.consumeEach { watcherChannel.consumeEach {
if (it.file == SharedConfig.incomingContent) { if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else { } else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" } logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
} }
try { try {
when (it.kind) { when (it.kind) {
Deleted -> removeFile(it.file) Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ } Initialized -> { /* Do nothing */ }
else -> { else -> {
val added = addFile(it.file) val added = addFile(it.file)
if (!added) { if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
} }
} }
} catch (e: Exception) {
e.printStackTrace()
} }
} catch (e: Exception) {
e.printStackTrace()
} }
} }
log.info { "Reached end of watcherChannel" }
}
final fun runCoroutine() {
io.launch { startWatcher() }
.invokeOnCompletion { runCoroutine() }
}
init {
runCoroutine()
} }
private fun addFile(file: File): Boolean { private fun addFile(file: File): Boolean {

View File

@ -85,7 +85,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
} }
} }
fun readAllAvailableInQueue() { private fun readAllAvailableInQueue() {
val messages = eventManager.getProcessEventsClaimable() val messages = eventManager.getProcessEventsClaimable()
io.launch { io.launch {
messages.forEach { messages.forEach {
@ -117,7 +117,16 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
} }
fun readAllMessagesFor(referenceId: String, eventId: String) { /**
* If we get double events at the same time, this would be the case
*/
fun readNextAvailableMessageWithEvent(kafkaEvents: KafkaEvents) {
val messages = eventManager.getProcessEventsClaimable().firstOrNull { it.event == kafkaEvents }?.let {
readAllMessagesFor(referenceId = it.referenceId, eventId = it.eventId)
}
}
private fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = eventManager.getProcessEventsClaimable() val messages = eventManager.getProcessEventsClaimable()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
} }

View File

@ -16,6 +16,10 @@ class ClaimsService() {
@Autowired @Autowired
lateinit var coordinator: Coordinator lateinit var coordinator: Coordinator
/**
* If this serivce calls for readback all on the coordinator, it will cause a full on crash, as it
*/
@Scheduled(fixedDelay = (300_000)) @Scheduled(fixedDelay = (300_000))
fun validateClaims() { fun validateClaims() {
val expiredClaims = eventManager.getProcessEventsWithExpiredClaim() val expiredClaims = eventManager.getProcessEventsWithExpiredClaim()
@ -31,9 +35,9 @@ class ClaimsService() {
} }
it it
} }
released.forEach { /*released.forEach {
log.info { "Sending released ${it.referenceId} ${it.event} into queue" } log.info { "Sending released ${it.referenceId} ${it.event} into queue" }
coordinator.readAllMessagesFor(it.referenceId, it.eventId) coordinator.readAllMessagesFor(it.referenceId, it.eventId)
} }*/
} }
} }

View File

@ -209,6 +209,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
fun clearWorker() { fun clearWorker() {
this.runner = null this.runner = null
coordinator.readNextAvailableMessageWithEvent(KafkaEvents.EventWorkEncodeCreated)
} }
@PreDestroy @PreDestroy

View File

@ -188,6 +188,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
fun clearWorker() { fun clearWorker() {
this.runner = null this.runner = null
coordinator.readNextAvailableMessageWithEvent(KafkaEvents.EventWorkExtractCreated)
} }
@PreDestroy @PreDestroy