New removal from queue

This commit is contained in:
bskjon 2024-04-25 23:44:09 +02:00
parent 67ba916360
commit 13651b6e90
4 changed files with 58 additions and 47 deletions

View File

@ -36,7 +36,7 @@ dependencies {
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha27")

View File

@ -3,6 +3,8 @@ package no.iktdev.mediaprocessing.coordinator
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig
@ -29,6 +31,8 @@ class CoordinatorApplication {
private var context: ApplicationContext? = null
private lateinit var storeDatabase: MySqlDataSource
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
@Suppress("unused")
fun getContext(): ApplicationContext? {
@ -47,7 +51,12 @@ fun getEventsDatabase(): MySqlDataSource {
lateinit var eventManager: PersistentEventManager
fun main(args: Array<String>) {
Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.defaultCoroutine
import no.iktdev.mediaprocessing.shared.common.isFileAvailable
import java.io.File
import java.util.UUID
@ -24,7 +25,7 @@ class FileWatcherQueue {
fileChannel.trySend(PendingFile(file = file))
// Coroutine to process the file and remove it from the queue when accessible
Coroutines.default().launch {
defaultCoroutine.launch {
while (true) {
delay(500)
val currentFile = fileChannel.receive()
@ -42,16 +43,13 @@ class FileWatcherQueue {
}
fun removeFromQueue(file: File, onFileRemoved: (PendingFile) -> Unit) {
if (file.isFile) {
val removedFile = fileChannel.findAndRemove { it.file == file }
removedFile.let {
it.forEach { file -> onFileRemoved(file) }
}
} else {
val removeFiles = fileChannel.findAndRemove { it.file.parentFile == file }
removeFiles.let {
it.forEach { file -> onFileRemoved(file) }
}
val currentItems = fileChannel.list()
val toRemove = currentItems.filter {
if (it.file.isDirectory) it.file.name == file.name else it.file.name == file.name && it.file.parent == file.parent
}
toRemove.let {
it.forEach { file -> onFileRemoved(file) }
}
}
@ -73,5 +71,15 @@ class FileWatcherQueue {
return forRemoved
}
fun <T> Channel<T>.list(): List<T> {
val items = mutableListOf<T>()
while (true) {
val item = tryReceive().getOrNull() ?: break
items.add(item)
trySend(item).isSuccess
}
return items
}
}

View File

@ -7,8 +7,8 @@ import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.log
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import no.iktdev.mediaprocessing.shared.contract.ProcessType
@ -41,41 +41,34 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val queue = FileWatcherQueue()
final val io = Coroutines.io()
suspend fun startWatcher() {
log.info { "Starting Watcher" }
watcherChannel.consumeEach {
if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
log.info { "Reached end of watcherChannel" }
}
final fun runCoroutine() {
io.launch { startWatcher() }
.invokeOnCompletion { runCoroutine() }
}
init {
runCoroutine()
ioCoroutine.launch {
log.info { "Starting Watcher" }
watcherChannel.consumeEach {
if (it.file == SharedConfig.incomingContent) {
logger.info { "IO Watcher ${it.kind} on ${it.file.absolutePath}" }
} else {
logger.info { "IO Event: ${it.kind}: ${it.file.name}" }
}
try {
when (it.kind) {
Deleted -> removeFile(it.file)
Initialized -> { /* Do nothing */ }
else -> {
val added = addFile(it.file)
if (!added) {
logger.info { "Ignoring event kind: ${it.kind.name} for file ${it.file.name} as it is not a supported video file" }
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
log.info { "Reached end of watcherChannel" }
}
}
private fun addFile(file: File): Boolean {
@ -95,6 +88,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
}
private fun removeFile(file: File) {
log.info { "Removing file from Queue ${file.name}" }
queue.removeFromQueue(file, this@InputDirectoryWatcher::onFileRemoved)
}