From 9298115d49af6c84e458db481831f50df89f6fdd Mon Sep 17 00:00:00 2001 From: Brage Date: Sat, 29 Jul 2023 16:12:34 +0200 Subject: [PATCH] Encoder queue update --- CommonCode/build.gradle.kts | 1 + .../streamit/content/common/deamon/Daemon.kt | 25 ++++++++++++++----- Encode/build.gradle.kts | 1 + .../content/encode/runner/EncodeDaemon.kt | 19 ++++++++++++++ .../encode/runner/RunnerCoordinator.kt | 3 +++ Reader/build.gradle.kts | 2 ++ 6 files changed, 45 insertions(+), 6 deletions(-) diff --git a/CommonCode/build.gradle.kts b/CommonCode/build.gradle.kts index a408f8dc..3479b317 100644 --- a/CommonCode/build.gradle.kts +++ b/CommonCode/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { implementation("org.json:json:20230227") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") testImplementation("junit:junit:4.13.2") testImplementation("org.junit.jupiter:junit-jupiter") diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt index a0c39d3a..fea48205 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/deamon/Daemon.kt @@ -4,21 +4,29 @@ import com.github.pgreze.process.ProcessResult import com.github.pgreze.process.Redirect import com.github.pgreze.process.process import com.google.gson.Gson +import kotlinx.coroutines.* import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines private val logger = KotlinLogging.logger {} open class Daemon(open val executable: String, val daemonInterface: IDaemon) { + val scope = Coroutines.io() + var job: Job? = null var executor: ProcessResult? = null open suspend fun run(parameters: List): Int { daemonInterface.onStarted() logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" } - executor = process(executable, *parameters.toTypedArray(), - stdout = Redirect.CAPTURE, - stderr = Redirect.CAPTURE, - consumer = { - daemonInterface.onOutputChanged(it) - }) + job = scope.launch { + executor = process(executable, *parameters.toTypedArray(), + stdout = Redirect.CAPTURE, + stderr = Redirect.CAPTURE, + consumer = { + daemonInterface.onOutputChanged(it) + }) + } + job?.join() + val resultCode = executor?.resultCode ?: -1 if (resultCode == 0) { daemonInterface.onEnded() @@ -26,4 +34,9 @@ open class Daemon(open val executable: String, val daemonInterface: IDaemon) { logger.info { "$executable result: $resultCode" } return resultCode } + + suspend fun cancel() { + job?.cancelAndJoin() + scope.cancel("Cancel operation triggered!") + } } \ No newline at end of file diff --git a/Encode/build.gradle.kts b/Encode/build.gradle.kts index aed156d1..7b8edcb7 100644 --- a/Encode/build.gradle.kts +++ b/Encode/build.gradle.kts @@ -28,6 +28,7 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") + implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt index 75ed9345..64306de2 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt @@ -9,11 +9,15 @@ import no.iktdev.streamit.content.common.deamon.IDaemon import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.encode.progress.Progress import no.iktdev.streamit.content.encode.progress.ProgressDecoder +import java.io.BufferedWriter import java.io.File +import java.io.FileWriter private val logger = KotlinLogging.logger {} class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInterface: IEncodeListener): IDaemon { + val logDir = File("/src/logs") + lateinit var outLogFile: File var outputCache = observableListOf() private val decoder = ProgressDecoder() private fun produceProgress(items: List) { @@ -37,6 +41,8 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte produceProgress(outputCache) } }) + logDir.mkdirs() + outLogFile = File(logDir, "${work.workId}-${work.collection}.log") } suspend fun runUsingWorkItem(): Int { @@ -68,6 +74,19 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte override fun onOutputChanged(line: String) { super.onOutputChanged(line) outputCache.add(line) + writeToLog(line) + } + private fun writeToLog(line: String) { + val fileWriter = FileWriter(outLogFile, true) // true indikerer at vi ønsker å appende til filen + val bufferedWriter = BufferedWriter(fileWriter) + + // Skriv logglinjen til filen + bufferedWriter.write(line) + bufferedWriter.newLine() // Legg til en ny linje etter logglinjen + + // Lukk BufferedWriter og FileWriter for å frigjøre ressurser + bufferedWriter.close() + fileWriter.close() } } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index e2b7553e..d8f5b08a 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -37,6 +37,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { final val defaultScope = Coroutines.default() private val jobsInProgress = AtomicInteger(0) + private var inProgressJobs = mutableListOf() val queue = Channel(Channel.UNLIMITED) @@ -55,9 +56,11 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { if (jobsInProgress.get() <= maxConcurrentJobs) { jobsInProgress.incrementAndGet() val job = processWorkItem(workItem) + inProgressJobs.add(job) job.invokeOnCompletion { val currentJobsInProgress = jobsInProgress.decrementAndGet() logger.info { "Available workers: ${maxConcurrentJobs - currentJobsInProgress}" } + inProgressJobs.remove(job) } } logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" } diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 4312d2d2..5231d6f4 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -28,6 +28,8 @@ dependencies { implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") + implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")