v3 31 - Restart watcher

This commit is contained in:
bskjon 2024-07-19 23:36:31 +02:00
parent f5a3603a5a
commit 0aee64c7f3

View File

@ -4,14 +4,18 @@ import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
import dev.vishna.watchservice.KWatchEvent.Kind.Initialized import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
import dev.vishna.watchservice.asWatchChannel import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import javax.annotation.PreDestroy
interface FileWatcherEvents { interface FileWatcherEvents {
@ -35,39 +39,57 @@ interface FileWatcherEvents {
@Service @Service
class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents { class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel() val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val queue = FileWatcherQueue() val queue = FileWatcherQueue()
private var isStopping: Boolean = false
@PreDestroy
fun setStop() {
isStopping = true
}
suspend fun watchFiles() {
log.info { "Starting Watcher" }
watcherChannel.consumeEach {
if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
log.warn { "Watcher stopped!!!" }
if (!isStopping) {
ioCoroutine.launch {
watchFiles()
}
}
}
init { init {
ioCoroutine.launch { ioCoroutine.launch {
log.info { "Starting Watcher" } watchFiles()
watcherChannel.consumeEach {
if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
log.info { "Reached end of watcherChannel" }
} }
} }
private fun addFile(file: File): Boolean { private fun addFile(file: File): Boolean {
return if (file.isFile && file.isSupportedVideoFile()) { return if (file.isFile && file.isSupportedVideoFile()) {
log.info { "Adding ${file.name} to queue" } log.info { "Adding ${file.name} to queue" }