From 13651b6e90964c1113c36540c093711b9e57b9ea Mon Sep 17 00:00:00 2001 From: bskjon Date: Thu, 25 Apr 2024 23:44:09 +0200 Subject: [PATCH] New removal from queue --- apps/coordinator/build.gradle.kts | 2 +- .../coordinator/CoordinatorApplication.kt | 11 +++- .../tasks/input/watcher/FileWatcherQueue.kt | 30 +++++---- .../input/watcher/InputDirectoryWatcher.kt | 62 +++++++++---------- 4 files changed, 58 insertions(+), 47 deletions(-) diff --git a/apps/coordinator/build.gradle.kts b/apps/coordinator/build.gradle.kts index 5395f66a..c14632f0 100644 --- a/apps/coordinator/build.gradle.kts +++ b/apps/coordinator/build.gradle.kts @@ -36,7 +36,7 @@ dependencies { implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20210307") - implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha27") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index 12b6f25b..249b9bf7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -3,6 +3,8 @@ package no.iktdev.mediaprocessing.coordinator import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.exfl.coroutines.CoroutinesDefault +import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.Observables import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig @@ -29,6 +31,8 @@ class CoordinatorApplication { private var context: ApplicationContext? = null private lateinit var storeDatabase: MySqlDataSource +val ioCoroutine = CoroutinesIO() +val defaultCoroutine = CoroutinesDefault() @Suppress("unused") fun getContext(): ApplicationContext? { @@ -47,7 +51,12 @@ fun getEventsDatabase(): MySqlDataSource { lateinit var eventManager: PersistentEventManager fun main(args: Array) { - Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener { + ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() + } + }) + defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { override fun onUpdated(value: Throwable) { value.printStackTrace() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt index a01514ab..baca05d3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay import kotlinx.coroutines.launch import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.coordinator.defaultCoroutine import no.iktdev.mediaprocessing.shared.common.isFileAvailable import java.io.File import java.util.UUID @@ -24,7 +25,7 @@ class FileWatcherQueue { fileChannel.trySend(PendingFile(file = file)) // Coroutine to process the file and remove it from the queue when accessible - Coroutines.default().launch { + defaultCoroutine.launch { while (true) { delay(500) val currentFile = fileChannel.receive() @@ -42,16 +43,13 @@ class FileWatcherQueue { } fun removeFromQueue(file: File, onFileRemoved: (PendingFile) -> Unit) { - if (file.isFile) { - val removedFile = fileChannel.findAndRemove { it.file == file } - removedFile.let { - it.forEach { file -> onFileRemoved(file) } - } - } else { - val removeFiles = fileChannel.findAndRemove { it.file.parentFile == file } - removeFiles.let { - it.forEach { file -> onFileRemoved(file) } - } + val currentItems = fileChannel.list() + val toRemove = currentItems.filter { + if (it.file.isDirectory) it.file.name == file.name else it.file.name == file.name && it.file.parent == file.parent + } + + toRemove.let { + it.forEach { file -> onFileRemoved(file) } } } @@ -73,5 +71,15 @@ class FileWatcherQueue { return forRemoved } + fun Channel.list(): List { + val items = mutableListOf() + while (true) { + val item = tryReceive().getOrNull() ?: break + items.add(item) + trySend(item).isSuccess + } + return items + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt index 78718e53..c3458088 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt @@ -7,8 +7,8 @@ import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.coordinator.Coordinator -import no.iktdev.mediaprocessing.coordinator.log +import no.iktdev.exfl.coroutines.CoroutinesIO +import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import no.iktdev.mediaprocessing.shared.contract.ProcessType @@ -41,41 +41,34 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche private val logger = KotlinLogging.logger {} val watcherChannel = SharedConfig.incomingContent.asWatchChannel() val queue = FileWatcherQueue() - final val io = Coroutines.io() - suspend fun startWatcher() { - log.info { "Starting Watcher" } - watcherChannel.consumeEach { - if (it.file == SharedConfig.incomingContent) { - logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } - } else { - logger.info { "IO Event: ${it.kind}: ${it.file.name}" } - } - try { - when (it.kind) { - Deleted -> removeFile(it.file) - Initialized -> { /* Do nothing */ } - else -> { - val added = addFile(it.file) - if (!added) { - logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } - } - } - } - } catch (e: Exception) { - e.printStackTrace() - } - } - log.info { "Reached end of watcherChannel" } - } - - final fun runCoroutine() { - io.launch { startWatcher() } - .invokeOnCompletion { runCoroutine() } - } init { - runCoroutine() + ioCoroutine.launch { + log.info { "Starting Watcher" } + watcherChannel.consumeEach { + if (it.file == SharedConfig.incomingContent) { + logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } + } else { + logger.info { "IO Event: ${it.kind}: ${it.file.name}" } + } + try { + when (it.kind) { + Deleted -> removeFile(it.file) + Initialized -> { /* Do nothing */ } + else -> { + val added = addFile(it.file) + if (!added) { + logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } + } + } + } + } catch (e: Exception) { + e.printStackTrace() + } + } + log.info { "Reached end of watcherChannel" } + } } private fun addFile(file: File): Boolean { @@ -95,6 +88,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche } private fun removeFile(file: File) { + log.info { "Removing file from Queue ${file.name}" } queue.removeFromQueue(file, this@InputDirectoryWatcher::onFileRemoved) }