Updated using dispatcher scoped on executor
This commit is contained in:
parent
7e9588a9e5
commit
28082d0953
@ -23,6 +23,7 @@ open class Daemon(open val executable: String, val daemonInterface: IDaemon) {
|
|||||||
if (resultCode == 0) {
|
if (resultCode == 0) {
|
||||||
daemonInterface.onEnded()
|
daemonInterface.onEnded()
|
||||||
} else daemonInterface.onError(resultCode)
|
} else daemonInterface.onError(resultCode)
|
||||||
|
logger.info { "Daemon ended: $resultCode" }
|
||||||
return resultCode
|
return resultCode
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1,7 +1,7 @@
|
|||||||
package no.iktdev.streamit.content.encode.runner
|
package no.iktdev.streamit.content.encode.runner
|
||||||
|
|
||||||
|
import kotlinx.coroutines.*
|
||||||
import no.iktdev.streamit.content.encode.EncodeEnv
|
import no.iktdev.streamit.content.encode.EncodeEnv
|
||||||
import kotlinx.coroutines.runBlocking
|
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||||
@ -32,11 +32,12 @@ class RunnerCoordinator {
|
|||||||
TimeUnit.MILLISECONDS,
|
TimeUnit.MILLISECONDS,
|
||||||
LinkedBlockingQueue()
|
LinkedBlockingQueue()
|
||||||
)
|
)
|
||||||
|
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
|
||||||
|
val scope = CoroutineScope(dispatcher)
|
||||||
|
|
||||||
fun addEncodeMessageToQueue(message: Message) {
|
fun addEncodeMessageToQueue(message: Message) {
|
||||||
executor.execute {
|
scope.launch {
|
||||||
try {
|
try {
|
||||||
runBlocking {
|
|
||||||
if (message.data != null && message.data is EncodeWork) {
|
if (message.data != null && message.data is EncodeWork) {
|
||||||
val data: EncodeWork = message.data as EncodeWork
|
val data: EncodeWork = message.data as EncodeWork
|
||||||
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
|
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
|
||||||
@ -45,7 +46,6 @@ class RunnerCoordinator {
|
|||||||
} else {
|
} else {
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
e.printStackTrace()
|
e.printStackTrace()
|
||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user