Migrated coroutine

This commit is contained in:
bskjon 2024-04-26 00:36:55 +02:00
parent a455146441
commit ca46332007
20 changed files with 63 additions and 46 deletions

View File

@ -41,7 +41,7 @@ dependencies {
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation("no.iktdev.library:subtitle:1.7.9-SNAPSHOT")

View File

@ -1,5 +1,8 @@
package no.iktdev.mediaprocessing.converter
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
@ -14,6 +17,8 @@ import org.springframework.context.ApplicationContext
@SpringBootApplication
class ConvertApplication
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
private var context: ApplicationContext? = null
@Suppress("unused")
fun getContext(): ApplicationContext? {
@ -29,6 +34,18 @@ fun getEventsDatabase(): MySqlDataSource {
}
fun main(args: Array<String>) {
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.converter
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.converter.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
@ -19,7 +18,6 @@ import org.springframework.stereotype.Service
@EnableScheduling
@Service
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
val io = Coroutines.io()
private val log = KotlinLogging.logger {}
@ -51,7 +49,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
if (!success) {
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
} else {
io.launch {
ioCoroutine.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
@ -65,7 +63,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
fun readAllInQueue() {
val messages = eventManager.getProcessEventsClaimable()// persistentReader.getAvailableProcessEvents()
io.launch {
ioCoroutine.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.coordinator
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.coordinator
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables

View File

@ -2,8 +2,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.defaultCoroutine
import no.iktdev.mediaprocessing.shared.common.isFileAvailable
import java.io.File

View File

@ -4,10 +4,7 @@ import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile

View File

@ -41,7 +41,7 @@ dependencies {
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.processer
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.coordination.PersistentEventProcessBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
@ -22,7 +21,7 @@ import org.springframework.stereotype.Service
@EnableScheduling
class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEventProcessBasedMessageListener>() {
private val log = KotlinLogging.logger {}
val io = Coroutines.io()
override val listeners = PersistentEventProcessBasedMessageListener()
private val coordinatorEventListeners: MutableList<CoordinatorEvents> = mutableListOf()
@ -66,7 +65,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
if (!success) {
log.error { "Unable to store message event: ${event.key.event} with eventId ${event.value.eventId} with referenceId ${event.value.referenceId} in database ${getEventsDatabase().database}!" }
} else {
io.launch {
ioCoroutine.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
@ -87,7 +86,7 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
private fun readAllAvailableInQueue() {
val messages = eventManager.getProcessEventsClaimable()
io.launch {
ioCoroutine.launch {
messages.forEach {
delay(500)
listeners.forwardBatchEventMessagesToListeners(listOf(it))

View File

@ -1,6 +1,9 @@
package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
@ -16,7 +19,8 @@ import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
private val logger = KotlinLogging.logger {}
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
@SpringBootApplication
class ProcesserApplication {
}
@ -30,6 +34,18 @@ lateinit var eventManager: PersistentEventManager
fun main(args: Array<String>) {
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)

View File

@ -4,7 +4,7 @@ import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.eventManager
@ -20,7 +20,7 @@ class FfmpegWorker(
val listener: FfmpegWorkerEvents,
val logDir: File
) {
private val scope = Coroutines.io()
private val scope = CoroutineScope(Dispatchers.IO + Job())
private var job: Job? = null
fun isWorking(): Boolean {

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.*
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
@ -32,8 +31,6 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
override val producesEvent = KafkaEvents.EventWorkExtractPerformed
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
@ -193,7 +190,6 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
@PreDestroy
fun shutdown() {
scope.cancel()
runner?.cancel("Stopping application")
}
}

View File

@ -36,7 +36,7 @@ dependencies {
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:contract")))

View File

@ -2,7 +2,8 @@ package no.iktdev.mediaprocessing.ui
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.Observables
import no.iktdev.exfl.observable.observableMapOf
@ -19,6 +20,8 @@ import java.util.concurrent.CountDownLatch
private val logger = KotlinLogging.logger {}
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
@SpringBootApplication
class UIApplication {
@ -51,10 +54,14 @@ fun main(args: Array<String>) {
persistentWriter = PersistentDataStore(eventsDatabase)
Coroutines.addListener(object : Observables.ObservableValue.ValueListener<Throwable> {
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
logger.error { "Received error: ${value.message}" }
value.cause?.printStackTrace()
value.printStackTrace()
}
})
defaultCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
}
})

View File

@ -4,10 +4,10 @@ import dev.vishna.watchservice.KWatchEvent
import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.ui.explorer.ExplorerCore
import no.iktdev.mediaprocessing.ui.fileRegister
import no.iktdev.mediaprocessing.ui.ioCoroutine
import org.springframework.stereotype.Service
import java.io.File
import java.math.BigInteger
@ -38,10 +38,7 @@ class FileRegisterService {
}
init {
Coroutines.io().launch {
}
Coroutines.io().launch {
ioCoroutine.launch {
watcherChannel.consumeEach {
when (it.kind) {
KWatchEvent.Kind.Created, KWatchEvent.Kind.Modified, KWatchEvent.Kind.Initialized -> {

View File

@ -24,7 +24,7 @@ dependencies {
implementation("com.github.pgreze:kotlin-process:1.3.1")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.google.code.gson:gson:2.8.9")

View File

@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
@ -19,6 +19,7 @@ import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
val defaultCoroutine = CoroutinesDefault()
private var ready: Boolean = false
fun isReady() = ready
private val log = KotlinLogging.logger {}
@ -57,7 +58,7 @@ abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
@PostConstruct
fun onInitializationCompleted() {
Coroutines.default().launch {
defaultCoroutine.launch {
while (!isAllServicesRegistered()) {
log.info { "Waiting for mandatory services to start" }
delay(1000)

View File

@ -1,6 +1,5 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -12,7 +11,6 @@ import javax.annotation.PostConstruct
@Service
@Import(DefaultMessageListener::class)
abstract class ProcessingService() {
val io = Coroutines.io()
@Autowired
lateinit var producer: CoordinatorProducer

View File

@ -1,16 +1,12 @@
package no.iktdev.mediaprocessing.shared.common.runner
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
open class Runner(open val executable: String, val daemonInterface: IRunner) {
private val logger = KotlinLogging.logger {}
val scope = Coroutines.io()
val scope = CoroutineScope(Dispatchers.IO + Job())
var job: Job? = null
var executor: com.github.pgreze.process.ProcessResult? = null
open suspend fun run(parameters: List<String>): Int {