Watching multiple directories

This commit is contained in:
bskjon 2025-02-14 19:17:36 +01:00
parent afa0900755
commit 5be1468891
3 changed files with 64 additions and 29 deletions

View File

@ -0,0 +1,14 @@
package no.iktdev.mediaprocessing.coordinator
import dev.vishna.watchservice.KWatchChannel
import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.runBlocking
import java.io.File
data class FileWatcher(val file: File, val watcher: KWatchChannel)
suspend fun File.asWatcher(block: suspend (KWatchChannel) -> Unit): FileWatcher {
val channel = this.asWatchChannel()
block(channel)
return FileWatcher(this, channel)
}

View File

@ -5,15 +5,13 @@ import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
import dev.vishna.watchservice.asWatchChannel import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import no.iktdev.mediaprocessing.shared.common.contract.ProcessType import no.iktdev.mediaprocessing.shared.common.contract.ProcessType
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import javax.annotation.PreDestroy import javax.annotation.PreDestroy
@ -42,7 +40,7 @@ interface FileWatcherEvents {
class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents { class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel() val watchDirectories = SharedConfig.incomingContent
val queue = FileWatcherQueue() val queue = FileWatcherQueue()
private var isStopping: Boolean = false private var isStopping: Boolean = false
@ -55,36 +53,56 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
@OptIn(ExperimentalCoroutinesApi::class) @OptIn(ExperimentalCoroutinesApi::class)
suspend fun watchFiles() { suspend fun watchFiles() {
log.info { "Starting Watcher" } log.info { "Starting Watcher" }
watcherChannel.consumeEach { for (folder in watchDirectories) {
if (it.file == SharedConfig.incomingContent) { startWatchOnDirectory(folder)
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" } }
} else { }
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
} val activeWatchers: MutableList<FileWatcher> = mutableListOf()
try { @OptIn(ExperimentalCoroutinesApi::class)
when (it.kind) { private suspend fun startWatchOnDirectory(file: File) {
Deleted -> removeFile(it.file) if (activeWatchers.any {it -> it.file.absolutePath == file.absolutePath}) {
Initialized -> { /* Do nothing */ } log.error { "Attempting to start a watcher on an already watched directory ${file.absolutePath}" }
else -> { }
val added = addFile(it.file) val watcher = file.asWatcher { watcher ->
if (!added) { watcher.consumeEach {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
} }
} }
} catch (e: Exception) {
e.printStackTrace()
} }
} catch (e: Exception) {
e.printStackTrace()
} }
} }.also { watcher ->
watcherChannel.invokeOnClose { watcher.watcher.invokeOnClose {
it?.printStackTrace() it?.printStackTrace()
log.warn { "Watcher stopped!!!" } log.warn { "Watcher stopped for ${watcher.file}" }
if (!isStopping) { if (!isStopping) {
ioCoroutine.launch { log.info { "Determined that the program is not in a termination stage.. Restarting watcher" }
watchFiles() activeWatchers.remove(watcher)
ioCoroutine.launch {
log.info { "Waiting 500ms before restarting watcher.." }
delay(500)
startWatchOnDirectory(watcher.file)
}
} }
} }
} }
log.info { "Now watching ${file.absolutePath} for files" }
activeWatchers.add(watcher)
} }

View File

@ -5,7 +5,10 @@ import no.iktdev.eventi.database.MySqlDataSource
import java.io.File import java.io.File
object SharedConfig { object SharedConfig {
var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") var incomingContent: List<File> = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) {
System.getenv("DIRECTORY_CONTENT_INCOMING").split(",")
.map { File(it) }
} else listOf(File("/src/input"))
var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache") var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache")
val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")