Adjustments

This commit is contained in:
bskjon 2024-04-13 14:02:26 +02:00
parent 5b964970a0
commit 3d64a99213
4 changed files with 51 additions and 22 deletions

View File

@ -41,7 +41,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null
val fileOut = events.findLast { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed? ?: return null
val coverUrl = meta?.data?.cover
val coverUrl = meta.data?.cover
return if (coverUrl.isNullOrBlank()) {
log.warn { "No cover available for ${baseInfo.title}" }
null
@ -49,7 +49,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
CoverInfoPerformed(
status = Status.COMPLETED,
url = coverUrl,
outFileBaseName = baseInfo.title,
outFileBaseName = meta.data?.title ?: baseInfo.title,
outDir = fileOut.outDirectory,
derivedFromEventId = event.eventId
)

View File

@ -42,26 +42,36 @@ class FileWatcherQueue {
}
fun removeFromQueue(file: File, onFileRemoved: (PendingFile) -> Unit) {
if (file.isFile) {
val removedFile = fileChannel.findAndRemove { it.file == file }
removedFile?.let {
onFileRemoved(it)
removedFile.let {
it.forEach { file -> onFileRemoved(file) }
}
} else {
val removeFiles = fileChannel.findAndRemove { it.file.parentFile == file }
removeFiles.let {
it.forEach { file -> onFileRemoved(file) }
}
}
}
// Extension function to find and remove an element from the channel
fun <T> Channel<T>.findAndRemove(predicate: (T) -> Boolean): T? {
fun <T> Channel<T>.findAndRemove(predicate: (T) -> Boolean): List<T> {
val forRemoved = mutableListOf<T>()
val items = mutableListOf<T>()
while (true) {
val item = tryReceive().getOrNull() ?: break
if (predicate(item)) {
return item
forRemoved.add(item)
}
items.add(item)
}
for (item in items) {
trySend(item).isSuccess
}
return null
return forRemoved
}
}

View File

@ -8,12 +8,13 @@ import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.log
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
interface FileWatcherEvents {
@ -50,25 +51,43 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved)
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
if (it.file.isFile && it.file.isSupportedVideoFile()) {
queue.addToQueue(it.file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)
} else if (it.file.isDirectory) {
val supportedFiles = it.file.walkTopDown().filter { f -> f.isFile && f.isSupportedVideoFile() }
supportedFiles.forEach { sf ->
queue.addToQueue(sf, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)
}
} else {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
private fun addFile(file: File): Boolean {
return if (file.isFile && file.isSupportedVideoFile()) {
log.info { "Adding ${file.name} to queue" }
queue.addToQueue(file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)
true
} else if (file.isDirectory) {
log.info { "Searching for files in ${file.name}" }
val supportedFiles = file.walkTopDown().filter { f -> f.isFile && f.isSupportedVideoFile() }
supportedFiles.forEach { sf ->
log.info { "Adding ${sf.name} to queue from folder" }
queue.addToQueue(sf, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)
}
true
} else false
}
private fun removeFile(file: File) {
queue.removeFromQueue(file, this@InputDirectoryWatcher::onFileRemoved)
}
override fun onFileAvailable(file: PendingFile) {
logger.info { "File available ${file.file.name}" }

View File

@ -106,7 +106,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
) + processKafkaEvents
@Scheduled(fixedDelay = (5_000))
@Scheduled(fixedDelay = (5*6_0000))
fun checkForWork() {
log.info { "Checking if there is any work to do.." }
readAllAvailableInQueue()