From ca463320070ddda2d8765f35d4105ee674ad78fe Mon Sep 17 00:00:00 2001 From: bskjon Date: Fri, 26 Apr 2024 00:36:55 +0200 Subject: [PATCH] Migrated coroutine --- apps/converter/build.gradle.kts | 2 +- .../converter/ConverterApplication.kt | 17 +++++++++++++++++ .../converter/ConverterCoordinator.kt | 6 ++---- .../mediaprocessing/coordinator/Coordinator.kt | 1 - .../coordinator/CoordinatorApplication.kt | 1 - .../tasks/input/watcher/FileWatcherQueue.kt | 2 -- .../input/watcher/InputDirectoryWatcher.kt | 3 --- apps/processer/build.gradle.kts | 2 +- .../mediaprocessing/processer/Coordinator.kt | 7 +++---- .../processer/ProcesserApplication.kt | 18 +++++++++++++++++- .../processer/ffmpeg/FfmpegWorker.kt | 4 ++-- .../processer/services/EncodeService.kt | 1 - .../processer/services/ExtractService.kt | 4 ---- apps/ui/build.gradle.kts | 2 +- .../iktdev/mediaprocessing/ui/UIApplication.kt | 15 +++++++++++---- .../ui/service/FileRegisterService.kt | 7 ++----- shared/common/build.gradle.kts | 2 +- .../shared/common/CoordinatorBase.kt | 5 +++-- .../shared/common/ProcessingService.kt | 2 -- .../shared/common/runner/Runner.kt | 8 ++------ 20 files changed, 63 insertions(+), 46 deletions(-) diff --git a/apps/converter/build.gradle.kts b/apps/converter/build.gradle.kts index e55c23ee..cd1843df 100644 --- a/apps/converter/build.gradle.kts +++ b/apps/converter/build.gradle.kts @@ -41,7 +41,7 @@ dependencies { implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20210307") - implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation("no.iktdev.library:subtitle:1.7.9-SNAPSHOT") diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt index 4504527f..2970c1f5 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -1,5 +1,8 @@ package no.iktdev.mediaprocessing.converter +import no.iktdev.exfl.coroutines.CoroutinesDefault +import no.iktdev.exfl.coroutines.CoroutinesIO +import no.iktdev.exfl.observable.Observables import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader @@ -14,6 +17,8 @@ import org.springframework.context.ApplicationContext @SpringBootApplication class ConvertApplication +val ioCoroutine = CoroutinesIO() +val defaultCoroutine = CoroutinesDefault() private var context: ApplicationContext? = null @Suppress("unused") fun getContext(): ApplicationContext? { @@ -29,6 +34,18 @@ fun getEventsDatabase(): MySqlDataSource { } fun main(args: Array) { + ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() + } + }) + defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() + } + }) + + eventsDatabase = DatabaseEnvConfig.toEventsDatabase() eventsDatabase.createDatabase() eventsDatabase.createTables(processerEvents) diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt index 0033a08e..50ef4129 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.converter import kotlinx.coroutines.delay import kotlinx.coroutines.launch import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener import no.iktdev.mediaprocessing.shared.common.CoordinatorBase import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage @@ -19,7 +18,6 @@ import org.springframework.stereotype.Service @EnableScheduling @Service class ConverterCoordinator() : CoordinatorBase() { - val io = Coroutines.io() private val log = KotlinLogging.logger {} @@ -51,7 +49,7 @@ class ConverterCoordinator() : CoordinatorBase() { private val log = KotlinLogging.logger {} - val io = Coroutines.io() + override val listeners = PersistentEventProcessBasedMessageListener() private val coordinatorEventListeners: MutableList = mutableListOf() @@ -66,7 +65,7 @@ class Coordinator(): CoordinatorBase) { + + ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() + } + }) + defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() + } + }) + eventsDatabase = DatabaseEnvConfig.toEventsDatabase() eventsDatabase.createDatabase() eventsDatabase.createTables(processerEvents) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt index 3124a6af..cb4ed740 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt @@ -4,7 +4,7 @@ import com.github.pgreze.process.Redirect import com.github.pgreze.process.process import kotlinx.coroutines.* import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.using import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.eventManager @@ -20,7 +20,7 @@ class FfmpegWorker( val listener: FfmpegWorkerEvents, val logDir: File ) { - private val scope = Coroutines.io() + private val scope = CoroutineScope(Dispatchers.IO + Job()) private var job: Job? = null fun isWorking(): Boolean { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 0742b6d0..475c65a0 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.processer.* import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 24d55432..125d6724 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.services import kotlinx.coroutines.* import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.processer.* import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker @@ -32,8 +31,6 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire override val producesEvent = KafkaEvents.EventWorkExtractPerformed - val scope = Coroutines.io() - private var runner: FfmpegWorker? = null val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" @@ -193,7 +190,6 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire @PreDestroy fun shutdown() { - scope.cancel() runner?.cancel("Stopping application") } } \ No newline at end of file diff --git a/apps/ui/build.gradle.kts b/apps/ui/build.gradle.kts index 9e97cc27..9cf74347 100644 --- a/apps/ui/build.gradle.kts +++ b/apps/ui/build.gradle.kts @@ -36,7 +36,7 @@ dependencies { implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") - implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared:common"))) implementation(project(mapOf("path" to ":shared:contract"))) diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt index 59433437..fcc05661 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt @@ -2,7 +2,8 @@ package no.iktdev.mediaprocessing.ui import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.exfl.coroutines.CoroutinesDefault +import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.observableMapOf @@ -19,6 +20,8 @@ import java.util.concurrent.CountDownLatch private val logger = KotlinLogging.logger {} +val ioCoroutine = CoroutinesIO() +val defaultCoroutine = CoroutinesDefault() @SpringBootApplication class UIApplication { @@ -51,10 +54,14 @@ fun main(args: Array) { persistentWriter = PersistentDataStore(eventsDatabase) - Coroutines.addListener(object : Observables.ObservableValue.ValueListener { + ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { override fun onUpdated(value: Throwable) { - logger.error { "Received error: ${value.message}" } - value.cause?.printStackTrace() + value.printStackTrace() + } + }) + defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener { + override fun onUpdated(value: Throwable) { + value.printStackTrace() } }) diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/FileRegisterService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/FileRegisterService.kt index 9169c958..e9a5dc7b 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/FileRegisterService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/service/FileRegisterService.kt @@ -4,10 +4,10 @@ import dev.vishna.watchservice.KWatchEvent import dev.vishna.watchservice.asWatchChannel import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.launch -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.ui.explorer.ExplorerCore import no.iktdev.mediaprocessing.ui.fileRegister +import no.iktdev.mediaprocessing.ui.ioCoroutine import org.springframework.stereotype.Service import java.io.File import java.math.BigInteger @@ -38,10 +38,7 @@ class FileRegisterService { } init { - Coroutines.io().launch { - - } - Coroutines.io().launch { + ioCoroutine.launch { watcherChannel.consumeEach { when (it.kind) { KWatchEvent.Kind.Created, KWatchEvent.Kind.Modified, KWatchEvent.Kind.Initialized -> { diff --git a/shared/common/build.gradle.kts b/shared/common/build.gradle.kts index 15575895..fe4b2b75 100644 --- a/shared/common/build.gradle.kts +++ b/shared/common/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation("com.github.pgreze:kotlin-process:1.3.1") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") - implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") implementation("com.google.code.gson:gson:2.8.9") diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt index 440ad732..249cf920 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.shared.common import kotlinx.coroutines.* import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.exfl.coroutines.CoroutinesDefault import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl @@ -19,6 +19,7 @@ import org.springframework.stereotype.Service import javax.annotation.PostConstruct abstract class CoordinatorBase> { + val defaultCoroutine = CoroutinesDefault() private var ready: Boolean = false fun isReady() = ready private val log = KotlinLogging.logger {} @@ -57,7 +58,7 @@ abstract class CoordinatorBase> { @PostConstruct fun onInitializationCompleted() { - Coroutines.default().launch { + defaultCoroutine.launch { while (!isAllServicesRegistered()) { log.info { "Waiting for mandatory services to start" } delay(1000) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt index e948b4cf..640469cc 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt @@ -1,6 +1,5 @@ package no.iktdev.mediaprocessing.shared.common -import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -12,7 +11,6 @@ import javax.annotation.PostConstruct @Service @Import(DefaultMessageListener::class) abstract class ProcessingService() { - val io = Coroutines.io() @Autowired lateinit var producer: CoordinatorProducer diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt index a38a716a..0441e6b0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt @@ -1,16 +1,12 @@ package no.iktdev.mediaprocessing.shared.common.runner -import kotlinx.coroutines.Job -import kotlinx.coroutines.cancel -import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.launch +import kotlinx.coroutines.* import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines open class Runner(open val executable: String, val daemonInterface: IRunner) { private val logger = KotlinLogging.logger {} - val scope = Coroutines.io() + val scope = CoroutineScope(Dispatchers.IO + Job()) var job: Job? = null var executor: com.github.pgreze.process.ProcessResult? = null open suspend fun run(parameters: List): Int {