diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt index 0dd61e70..101c1a98 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/watcher/InputDirectoryWatcher.kt @@ -4,14 +4,18 @@ import dev.vishna.watchservice.KWatchEvent.Kind.Deleted import dev.vishna.watchservice.KWatchEvent.Kind.Initialized import dev.vishna.watchservice.asWatchChannel import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import no.iktdev.mediaprocessing.shared.contract.ProcessType import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.ApplicationContext import org.springframework.stereotype.Service import java.io.File +import javax.annotation.PreDestroy interface FileWatcherEvents { @@ -35,39 +39,57 @@ interface FileWatcherEvents { @Service class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents { + private val logger = KotlinLogging.logger {} val watcherChannel = SharedConfig.incomingContent.asWatchChannel() val queue = FileWatcherQueue() + private var isStopping: Boolean = false + @PreDestroy + fun setStop() { + isStopping = true + } + + + suspend fun watchFiles() { + log.info { "Starting Watcher" } + watcherChannel.consumeEach { + if (it.file == SharedConfig.incomingContent) { + logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } + } else { + logger.info { "IO Event: ${it.kind}: ${it.file.name}" } + } + try { + when (it.kind) { + Deleted -> removeFile(it.file) + Initialized -> { /* Do nothing */ } + else -> { + val added = addFile(it.file) + if (!added) { + logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } + } + } + } + } catch (e: Exception) { + e.printStackTrace() + } + } + log.warn { "Watcher stopped!!!" } + if (!isStopping) { + ioCoroutine.launch { + watchFiles() + } + } + } + init { ioCoroutine.launch { - log.info { "Starting Watcher" } - watcherChannel.consumeEach { - if (it.file == SharedConfig.incomingContent) { - logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } - } else { - logger.info { "IO Event: ${it.kind}: ${it.file.name}" } - } - try { - when (it.kind) { - Deleted -> removeFile(it.file) - Initialized -> { /* Do nothing */ } - else -> { - val added = addFile(it.file) - if (!added) { - logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } - } - } - } - } catch (e: Exception) { - e.printStackTrace() - } - } - log.info { "Reached end of watcherChannel" } + watchFiles() } } + private fun addFile(file: File): Boolean { return if (file.isFile && file.isSupportedVideoFile()) { log.info { "Adding ${file.name} to queue" }