From 3d64a992139b0a613787b331649604773ca1c343 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 13 Apr 2024 14:02:26 +0200 Subject: [PATCH] Adjustments --- .../event/MetadataAndBaseInfoToCoverTask.kt | 4 +- .../tasks/input/watcher/FileWatcherQueue.kt | 22 ++++++--- .../input/watcher/InputDirectoryWatcher.kt | 45 +++++++++++++------ .../mediaprocessing/processer/Coordinator.kt | 2 +- 4 files changed, 51 insertions(+), 22 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index 4190d380..8bf2f047 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -41,7 +41,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null val fileOut = events.findLast { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed? ?: return null - val coverUrl = meta?.data?.cover + val coverUrl = meta.data?.cover return if (coverUrl.isNullOrBlank()) { log.warn { "No cover available for ${baseInfo.title}" } null @@ -49,7 +49,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi CoverInfoPerformed( status = Status.COMPLETED, url = coverUrl, - outFileBaseName = baseInfo.title, + outFileBaseName = meta.data?.title ?: baseInfo.title, outDir = fileOut.outDirectory, derivedFromEventId = event.eventId ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt index 28e4f032..a01514ab 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/FileWatcherQueue.kt @@ -42,26 +42,36 @@ class FileWatcherQueue { } fun removeFromQueue(file: File, onFileRemoved: (PendingFile) -> Unit) { - val removedFile = fileChannel.findAndRemove { it.file == file } - removedFile?.let { - onFileRemoved(it) + if (file.isFile) { + val removedFile = fileChannel.findAndRemove { it.file == file } + removedFile.let { + it.forEach { file -> onFileRemoved(file) } + } + } else { + val removeFiles = fileChannel.findAndRemove { it.file.parentFile == file } + removeFiles.let { + it.forEach { file -> onFileRemoved(file) } + } } } + // Extension function to find and remove an element from the channel - fun Channel.findAndRemove(predicate: (T) -> Boolean): T? { + fun Channel.findAndRemove(predicate: (T) -> Boolean): List { + val forRemoved = mutableListOf() val items = mutableListOf() while (true) { val item = tryReceive().getOrNull() ?: break if (predicate(item)) { - return item + forRemoved.add(item) } items.add(item) } for (item in items) { trySend(item).isSuccess } - return null + return forRemoved } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt index 5672c595..da9cdf1c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt @@ -8,12 +8,13 @@ import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.log import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile import no.iktdev.mediaprocessing.shared.contract.ProcessType import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service - +import java.io.File interface FileWatcherEvents { @@ -50,26 +51,44 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche } else { logger.info { "IO Event: ${it.kind}: ${it.file.name}" } } - when (it.kind) { - Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved) - Initialized -> { /* Do nothing */ } - else -> { - if (it.file.isFile && it.file.isSupportedVideoFile()) { - queue.addToQueue(it.file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable) - } else if (it.file.isDirectory) { - val supportedFiles = it.file.walkTopDown().filter { f -> f.isFile && f.isSupportedVideoFile() } - supportedFiles.forEach { sf -> - queue.addToQueue(sf, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable) + try { + when (it.kind) { + Deleted -> removeFile(it.file) + Initialized -> { /* Do nothing */ } + else -> { + val added = addFile(it.file) + if (!added) { + logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } } - } else { - logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" } } } + } catch (e: Exception) { + e.printStackTrace() } } } } + private fun addFile(file: File): Boolean { + return if (file.isFile && file.isSupportedVideoFile()) { + log.info { "Adding ${file.name} to queue" } + queue.addToQueue(file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable) + true + } else if (file.isDirectory) { + log.info { "Searching for files in ${file.name}" } + val supportedFiles = file.walkTopDown().filter { f -> f.isFile && f.isSupportedVideoFile() } + supportedFiles.forEach { sf -> + log.info { "Adding ${sf.name} to queue from folder" } + queue.addToQueue(sf, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable) + } + true + } else false + } + + private fun removeFile(file: File) { + queue.removeFromQueue(file, this@InputDirectoryWatcher::onFileRemoved) + } + override fun onFileAvailable(file: PendingFile) { logger.info { "File available ${file.file.name}" } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt index d9a19fbb..a246317f 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -106,7 +106,7 @@ class Coordinator(): CoordinatorBase