From 5be1468891d69b20d4635de4d9779820e36fb012 Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 14 Feb 2025 19:17:36 +0100 Subject: [PATCH] Watching multiple directories --- .../coordinator/CoordinatorUtils.kt | 14 ++++ .../watcher/InputDirectoryWatcher.kt | 74 ++++++++++++------- .../shared/common/SharedConfig.kt | 5 +- 3 files changed, 64 insertions(+), 29 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorUtils.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorUtils.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorUtils.kt new file mode 100644 index 00000000..61ca2e97 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorUtils.kt @@ -0,0 +1,14 @@ +package no.iktdev.mediaprocessing.coordinator + +import dev.vishna.watchservice.KWatchChannel +import dev.vishna.watchservice.asWatchChannel +import kotlinx.coroutines.runBlocking +import java.io.File + +data class FileWatcher(val file: File, val watcher: KWatchChannel) + +suspend fun File.asWatcher(block: suspend (KWatchChannel) -> Unit): FileWatcher { + val channel = this.asWatchChannel() + block(channel) + return FileWatcher(this, channel) +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt index 3b094c55..29501e94 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt @@ -5,15 +5,13 @@ import dev.vishna.watchservice.KWatchEvent.Kind.Initialized import dev.vishna.watchservice.asWatchChannel import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.consumeEach -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch +import kotlinx.coroutines.delay import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import no.iktdev.mediaprocessing.shared.common.contract.ProcessType +import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.ApplicationContext import org.springframework.stereotype.Service import java.io.File import javax.annotation.PreDestroy @@ -42,7 +40,7 @@ interface FileWatcherEvents { class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents { private val logger = KotlinLogging.logger {} - val watcherChannel = SharedConfig.incomingContent.asWatchChannel() + val watchDirectories = SharedConfig.incomingContent val queue = FileWatcherQueue() private var isStopping: Boolean = false @@ -55,36 +53,56 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche @OptIn(ExperimentalCoroutinesApi::class) suspend fun watchFiles() { log.info { "Starting Watcher" } - watcherChannel.consumeEach { - if (it.file == SharedConfig.incomingContent) { - logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } - } else { - logger.info { "IO Event: ${it.kind}: ${it.file.name}" } - } - try { - when (it.kind) { - Deleted -> removeFile(it.file) - Initialized -> { /* Do nothing */ } - else -> { - val added = addFile(it.file) - if (!added) { - logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } + for (folder in watchDirectories) { + startWatchOnDirectory(folder) + } + } + + val activeWatchers: MutableList = mutableListOf() + @OptIn(ExperimentalCoroutinesApi::class) + private suspend fun startWatchOnDirectory(file: File) { + if (activeWatchers.any {it -> it.file.absolutePath == file.absolutePath}) { + log.error { "Attempting to start a watcher on an already watched directory ${file.absolutePath}" } + } + val watcher = file.asWatcher { watcher -> + watcher.consumeEach { + if (it.file == SharedConfig.incomingContent) { + logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } + } else { + logger.info { "IO Event: ${it.kind}: ${it.file.name}" } + } + try { + when (it.kind) { + Deleted -> removeFile(it.file) + Initialized -> { /* Do nothing */ } + else -> { + val added = addFile(it.file) + if (!added) { + logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } + } } } + } catch (e: Exception) { + e.printStackTrace() } - } catch (e: Exception) { - e.printStackTrace() } - } - watcherChannel.invokeOnClose { - it?.printStackTrace() - log.warn { "Watcher stopped!!!" } - if (!isStopping) { - ioCoroutine.launch { - watchFiles() + }.also { watcher -> + watcher.watcher.invokeOnClose { + it?.printStackTrace() + log.warn { "Watcher stopped for ${watcher.file}" } + if (!isStopping) { + log.info { "Determined that the program is not in a termination stage.. Restarting watcher" } + activeWatchers.remove(watcher) + ioCoroutine.launch { + log.info { "Waiting 500ms before restarting watcher.." } + delay(500) + startWatchOnDirectory(watcher.file) + } } } } + log.info { "Now watching ${file.absolutePath} for files" } + activeWatchers.add(watcher) } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index 514525d3..a0e7f862 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -5,7 +5,10 @@ import no.iktdev.eventi.database.MySqlDataSource import java.io.File object SharedConfig { - var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") + var incomingContent: List = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) { + System.getenv("DIRECTORY_CONTENT_INCOMING").split(",") + .map { File(it) } + } else listOf(File("/src/input")) var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache") val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")