Brage 2023-07-27 20:31:07 +02:00
parent 26c9570bf3
commit d9c8cab1fe
10 changed files with 82 additions and 54 deletions

View File

@@ -20,7 +20,7 @@ dependencies {
implementation("com.github.pgreze:kotlin-process:1.3.1")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
- implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63")
+ implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76")
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("com.google.code.gson:gson:2.8.9")
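
Every module in this commit lands on streamit-library-kafka 0.0.2-alpha76; beforehand the pins had drifted across alpha63, alpha74, and alpha75, as the remaining build files below show. A sketch of centralizing that pin so the next bump is a one-line change (hypothetical Gradle Kotlin DSL; the repo declares each version inline):

    // Hypothetical: declare the version once in the root build.gradle.kts ...
    val streamitKafkaVersion by extra("0.0.2-alpha76")

    // ... then reference it from each module's dependencies block.
    dependencies {
        implementation("no.iktdev.streamit.library:streamit-library-kafka:${rootProject.extra["streamitKafkaVersion"]}")
    }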

View File

@@ -14,10 +14,10 @@ class DeserializerRegistry {
KafkaEvents.EVENT_READER_DETERMINED_FILENAME to ContentOutNameDeserializer(),
KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO to EncodeWorkDeserializer(),
- KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE to EncodeWorkDeserializer(),
+ KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED to EncodeWorkDeserializer(),
KafkaEvents.EVENT_READER_ENCODE_GENERATED_SUBTITLE to ExtractWorkDeserializer(),
- KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE to ExtractWorkDeserializer(),
- KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE to ConvertWorkDeserializer()
+ KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED to ExtractWorkDeserializer(),
+ KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED to ConvertWorkDeserializer()
)
fun getRegistry(): Map<KafkaEvents, IMessageDataDeserialization<*>> = _registry.toMap()
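
Apart from the renamed keys the registry is unchanged: a plain event-to-deserializer map, so call sites keyed to the old enum constants fail at compile time, while anything matching raw event strings must switch to the new names. A minimal lookup sketch against the renamed constants (assumed usage; the actual call sites are not part of this diff):

    // Assumed usage: resolve a deserializer for a renamed event by map lookup.
    val registry: Map<KafkaEvents, IMessageDataDeserialization<*>> = DeserializerRegistry().getRegistry()
    val deserializer = registry[KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED]
        ?: error("No deserializer registered for EVENT_ENCODER_VIDEO_FILE_ENDED")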

View File

@@ -25,7 +25,7 @@ dependencies {
implementation("no.iktdev.library:subtitle:1.7-SNAPSHOT")
- implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha74")
+ implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76")
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

View File

@@ -27,7 +27,7 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi
private final val listener = object : SimpleMessageListener(
topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer,
- accepts = listOf(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event)
+ accepts = listOf(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event)
) {
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
val referenceId = data.value().referenceId
@@ -40,7 +40,7 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi
collection = workResult.collection,
language = workResult.language,
)
- produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.PENDING)), convertWork)
+ produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_STARTED, Message(referenceId = referenceId, Status(statusType = StatusType.PENDING)), convertWork)
Coroutines.io().launch {
ConvertRunner(referenceId, this@SubtitleConsumer).readAndConvert(convertWork)
}
@@ -55,15 +55,15 @@ class SubtitleConsumer: DefaultKafkaReader("convertHandlerSubtitle"), IConvertLi
}
override fun onStarted(referenceId: String) {
- produceMessage(KafkaEvents.EVENT_CONVERTER_STARTED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), null)
+ produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_STARTED, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), null)
}
override fun onError(referenceId: String, info: SubtitleInfo, message: String) {
- produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null)
+ produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED, Message(referenceId = referenceId, Status(statusType = StatusType.ERROR)), null)
}
override fun onEnded(referenceId: String, info: SubtitleInfo, work: ConvertWork) {
- produceMessage(KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work)
+ produceMessage(KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED, Message(referenceId = referenceId, Status(statusType = StatusType.SUCCESS)), work)
}
}
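
For the converter events the rename moves the verb after the subject (STARTED_SUBTITLE_FILE becomes SUBTITLE_FILE_STARTED), so the consumer's lifecycle reads trigger, started, ended. A compact sketch of the order this class emits for one conversion, written as a hypothetical test fixture rather than code from this commit:

    // Hypothetical fixture: emission order for one successful subtitle conversion.
    val expectedLifecycle = listOf(
        KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_STARTED, // PENDING on receive, SUCCESS from onStarted
        KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED    // SUCCESS from onEnded, ERROR from onError
    )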

View File

@@ -23,7 +23,7 @@ repositories {
dependencies {
implementation(project(":CommonCode"))
- implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75")
+ implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76")
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")

View File

@@ -2,9 +2,11 @@ package no.iktdev.streamit.content.encode.runner
import com.google.gson.Gson
import kotlinx.coroutines.*
+ import kotlinx.coroutines.channels.Channel
+ import kotlinx.coroutines.sync.Mutex
import no.iktdev.streamit.content.encode.EncodeEnv
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.streamit.content.common.CommonConfig
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
@@ -14,18 +16,27 @@ import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env
import org.springframework.stereotype.Service
import java.util.concurrent.*
+ private val logger = KotlinLogging.logger {}
+ data class ExecutionBlock(
+ val type: String,
+ val work: suspend () -> Int
+ )
@Service
class RunnerCoordinator {
- private val logger = KotlinLogging.logger {}
val producer = DefaultProducer(CommonConfig.kafkaTopic)
+ final val defaultScope = Coroutines.default()
+ val queue = Channel<ExecutionBlock>()
- val executor: ExecutorService = ThreadPoolExecutor(
+ /*val executor: ExecutorService = ThreadPoolExecutor(
EncodeEnv.maxRunners,
EncodeEnv.maxRunners,
0L,
@@ -33,43 +44,60 @@ class RunnerCoordinator {
LinkedBlockingQueue()
)
val dispatcher: CoroutineDispatcher = executor.asCoroutineDispatcher()
- val scope = CoroutineScope(dispatcher)
+ val scope = CoroutineScope(dispatcher)*/
+ init {
+ defaultScope.launch {
+ repeat(EncodeEnv.maxRunners) {
+ launch {
+ for (item in queue) {
+ item.work()
+ }
+ }
+ }
+ }
+ }
fun addEncodeMessageToQueue(message: Message) {
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
- scope.launch {
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
try {
if (message.data != null && message.data is EncodeWork) {
+ val workBlock = suspend {
val data: EncodeWork = message.data as EncodeWork
val encodeDaemon = EncodeDaemon(message.referenceId, data, encodeListener)
logger.info { "${message.referenceId} Starting encoding ${data.workId}" }
encodeDaemon.runUsingWorkItem()
+ }
+ queue.trySend(ExecutionBlock("encode", workBlock))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS)))
} else {
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of EncodeWork or null")))
}
} catch (e: Exception) {
e.printStackTrace()
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
- }
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
}
}
fun addExtractMessageToQueue(message: Message) {
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.PENDING)))
- scope.launch {
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
try {
if (message.data != null && message.data is ExtractWork) {
+ val workBlock = suspend {
val data: ExtractWork = message.data as ExtractWork
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
logger.info { "${message.referenceId} Starting extraction ${data.workId}" }
extractDaemon.runUsingWorkItem()
+ }
+ queue.trySend(ExecutionBlock("extract", workBlock))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.SUCCESS)))
} else {
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, "Data is not an instance of ExtractWork")))
}
} catch (e: Exception) {
e.printStackTrace()
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
- }
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, message.withNewStatus(Status(StatusType.ERROR, e.message)))
}
}
@@ -80,12 +108,12 @@ class RunnerCoordinator {
val encodeListener = object: IEncodeListener {
override fun onStarted(referenceId: String, work: EncodeWork) {
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
}
override fun onError(referenceId: String, work: EncodeWork, code: Int) {
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, message = code.toString()), work))
}
override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) {
@@ -94,24 +122,24 @@ class RunnerCoordinator {
override fun onEnded(referenceId: String, work: EncodeWork) {
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
}
}
val extractListener = object : IExtractListener {
override fun onStarted(referenceId: String, work: ExtractWork) {
logger.info { "Work started for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Started" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_STARTED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_STARTED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
}
override fun onError(referenceId: String, work: ExtractWork, code: Int) {
logger.error { "Work failed for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Error $code" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(StatusType.ERROR, code.toString()), work))
}
override fun onEnded(referenceId: String, work: ExtractWork) {
logger.info { "Work ended for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Ended" }
- producer.sendMessage(KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
+ producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event, Message(referenceId, Status(statusType = StatusType.SUCCESS), work))
}
}
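
The substantive change in this file is the scheduling model: the ThreadPoolExecutor-backed dispatcher is commented out in favor of a rendezvous Channel of ExecutionBlock items drained by EncodeEnv.maxRunners long-lived workers, and enqueueing now emits the EVENT_*_QUEUED events before the daemon emits *_STARTED. One caveat: trySend on an unbuffered Channel succeeds only if a worker is already suspended in receive, so a block enqueued while all runners are busy is dropped; send would suspend instead. A self-contained sketch of the worker-pool pattern under that assumption (maxRunners and the fake work items are placeholders, not the repo's values):

    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.Channel

    // Same shape as the diff's ExecutionBlock: a tagged suspending job returning an exit code.
    data class ExecutionBlock(val type: String, val work: suspend () -> Int)

    fun main() = runBlocking {
        val maxRunners = 2                    // stand-in for EncodeEnv.maxRunners
        val queue = Channel<ExecutionBlock>() // rendezvous channel, as in the diff

        // N long-lived workers drain the queue; concurrency is bounded by worker count.
        repeat(maxRunners) { id ->
            launch {
                for (item in queue) {
                    val code = item.work()
                    println("worker $id finished ${item.type} with exit code $code")
                }
            }
        }

        // send suspends until a worker is free, avoiding the trySend drop described above.
        repeat(5) { n ->
            queue.send(ExecutionBlock("encode") { delay(100); n })
        }
        queue.close() // closing lets the workers' for-loops terminate
    }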

View File

@@ -24,7 +24,7 @@ repositories {
val exposedVersion = "0.38.2"
dependencies {
- implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha75")
+ implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha76")
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14")

View File

@@ -64,7 +64,7 @@ class ResultCollection: DefaultEventCollection() {
}
fun getEncodeWork(): EncodeWork? {
- return firstOrNull(KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE)?.let {
+ return firstOrNull(KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED)?.let {
EncodeWorkDeserializer().deserializeIfSuccessful(it.value())
}
}

View File

@@ -27,20 +27,20 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle"
topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer,
accepts = listOf(
- KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event,
- KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event
+ KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event,
+ KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED.event
)
) {
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
val referenceId = data.value().referenceId
- if (data.key() == KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE.event) {
+ if (data.key() == KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event) {
val work = data.value().dataAs(ExtractWork::class.java)
if (work == null) {
logger.info { "Event: ${data.key()} value is null" }
} else {
storeExtractWork(referenceId, work)
}
- } else if (data.key() == KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE.event) {
+ } else if (data.key() == KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED.event) {
val work = data.value().dataAs(ConvertWork::class.java)
if (work == null) {
logger.info { "Event: ${data.key()} value is null" }
@@ -63,10 +63,10 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle"
fun produceMessage(referenceId: String, outFile: String, statusType: StatusType, result: Any?) {
if (statusType == StatusType.SUCCESS) {
- produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, referenceId)
+ produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_STORED_SUBTITLE, referenceId)
logger.info { "Stored ${File(outFile).absolutePath} subtitle" }
} else {
- produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, Message(referenceId, Status(statusType), result), "See log")
+ produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_STORED_SUBTITLE, Message(referenceId, Status(statusType), result), "See log")
logger.error { "Failed to store ${File(outFile).absolutePath} subtitle" }
}
}
@@ -104,8 +104,8 @@ class SubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle"
override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {
return DeserializerRegistry.getEventToDeserializer(
- KafkaEvents.EVENT_ENCODER_ENDED_SUBTITLE_FILE,
- KafkaEvents.EVENT_CONVERTER_ENDED_SUBTITLE_FILE
+ KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED,
+ KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED
)
}
}
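
The collector branches on the record key to choose the payload type, and both branches now match the *_FILE_ENDED names. The same dispatch as a when expression, as a readability sketch only (storeConvertWork is assumed by symmetry with the storeExtractWork call shown above; it is not visible in this diff):

    // Sketch: equivalent dispatch over the renamed event keys.
    when (data.key()) {
        KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_ENDED.event ->
            data.value().dataAs(ExtractWork::class.java)
                ?.let { storeExtractWork(referenceId, it) }
                ?: logger.info { "Event: ${data.key()} value is null" }
        KafkaEvents.EVENT_CONVERTER_SUBTITLE_FILE_ENDED.event ->
            data.value().dataAs(ConvertWork::class.java)
                ?.let { storeConvertWork(referenceId, it) } // assumed counterpart, not shown here
                ?: logger.info { "Event: ${data.key()} value is null" }
    }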

View File

@@ -33,7 +33,7 @@ class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), IColle
topic = CommonConfig.kafkaTopic,
consumer = defaultConsumer,
initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE,
- completionEvent = KafkaEvents.EVENT_ENCODER_ENDED_VIDEO_FILE,
+ completionEvent = KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED,
acceptsFilter = listOf(
KafkaEvents.EVENT_METADATA_OBTAINED,
KafkaEvents.EVENT_READER_DETERMINED_SERIE,
@@ -133,7 +133,7 @@ class VideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), IColle
e.printStackTrace()
}
- produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", status)
+ produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_STORED_VIDEO, collection.getReferenceId() ?: "M.I.A", status)
logger.info { "Stored ${encodeWork.outFile} video" }
}