Encoder queue update

This commit is contained in:
Brage 2023-07-29 16:12:34 +02:00
parent 9daf8b6636
commit 9298115d49
6 changed files with 45 additions and 6 deletions

View File

@ -27,6 +27,7 @@ dependencies {
implementation("org.json:json:20230227")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
testImplementation("junit:junit:4.13.2")
testImplementation("org.junit.jupiter:junit-jupiter")

View File

@ -4,21 +4,29 @@ import com.github.pgreze.process.ProcessResult
import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import com.google.gson.Gson
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
private val logger = KotlinLogging.logger {}
open class Daemon(open val executable: String, val daemonInterface: IDaemon) {
val scope = Coroutines.io()
var job: Job? = null
var executor: ProcessResult? = null
open suspend fun run(parameters: List<String>): Int {
daemonInterface.onStarted()
logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" }
job = scope.launch {
executor = process(executable, *parameters.toTypedArray(),
stdout = Redirect.CAPTURE,
stderr = Redirect.CAPTURE,
consumer = {
daemonInterface.onOutputChanged(it)
})
}
job?.join()
val resultCode = executor?.resultCode ?: -1
if (resultCode == 0) {
daemonInterface.onEnded()
@ -26,4 +34,9 @@ open class Daemon(open val executable: String, val daemonInterface: IDaemon) {
logger.info { "$executable result: $resultCode" }
return resultCode
}
suspend fun cancel() {
job?.cancelAndJoin()
scope.cancel("Cancel operation triggered!")
}
}

View File

@ -28,6 +28,7 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.pgreze:kotlin-process:1.3.1")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")

View File

@ -9,11 +9,15 @@ import no.iktdev.streamit.content.common.deamon.IDaemon
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
import no.iktdev.streamit.content.encode.progress.Progress
import no.iktdev.streamit.content.encode.progress.ProgressDecoder
import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter
private val logger = KotlinLogging.logger {}
class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInterface: IEncodeListener): IDaemon {
val logDir = File("/src/logs")
lateinit var outLogFile: File
var outputCache = observableListOf<String>()
private val decoder = ProgressDecoder()
private fun produceProgress(items: List<String>) {
@ -37,6 +41,8 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
produceProgress(outputCache)
}
})
logDir.mkdirs()
outLogFile = File(logDir, "${work.workId}-${work.collection}.log")
}
suspend fun runUsingWorkItem(): Int {
@ -68,6 +74,19 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
override fun onOutputChanged(line: String) {
super.onOutputChanged(line)
outputCache.add(line)
writeToLog(line)
}
private fun writeToLog(line: String) {
val fileWriter = FileWriter(outLogFile, true) // true indikerer at vi ønsker å appende til filen
val bufferedWriter = BufferedWriter(fileWriter)
// Skriv logglinjen til filen
bufferedWriter.write(line)
bufferedWriter.newLine() // Legg til en ny linje etter logglinjen
// Lukk BufferedWriter og FileWriter for å frigjøre ressurser
bufferedWriter.close()
fileWriter.close()
}
}

View File

@ -37,6 +37,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
final val defaultScope = Coroutines.default()
private val jobsInProgress = AtomicInteger(0)
private var inProgressJobs = mutableListOf<Job>()
val queue = Channel<ExecutionBlock>(Channel.UNLIMITED)
@ -55,9 +56,11 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
if (jobsInProgress.get() <= maxConcurrentJobs) {
jobsInProgress.incrementAndGet()
val job = processWorkItem(workItem)
inProgressJobs.add(job)
job.invokeOnCompletion {
val currentJobsInProgress = jobsInProgress.decrementAndGet()
logger.info { "Available workers: ${maxConcurrentJobs - currentJobsInProgress}" }
inProgressJobs.remove(job)
}
}
logger.info { "Available workers: ${maxConcurrentJobs - jobsInProgress.get()}" }

View File

@ -28,6 +28,8 @@ dependencies {
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")