Fixed tagging (again) + made python a bit more sleepy + More

Brage 2023-07-23 21:20:53 +02:00
parent 10d865ef47
commit d270f56936
12 changed files with 74 additions and 57 deletions

View File

@@ -106,9 +106,9 @@ jobs:
           context: ./Encode
           push: true
           tags: |
-            bskjon/media-processing.encoder:latest
-            bskjon/media-processing.encoder:${{ github.sha }}
-            bskjon/media-processing.encoder:${{ steps.docker-tag.outputs.tag }}
+            bskjon/mediaprocessing-encoder:latest
+            bskjon/mediaprocessing-encoder:${{ github.sha }}
+            bskjon/mediaprocessing-encoder:${{ steps.docker-tag.outputs.tag }}

   build-reader:
     needs: build-commoncode

View File

@@ -7,6 +7,7 @@ class DeserializerRegistry {
     companion object {
         private val _registry = mutableMapOf<KafkaEvents, IMessageDataDeserialization<*>>(
             KafkaEvents.EVENT_READER_RECEIVED_FILE to FileResultDeserializer(),
+            KafkaEvents.EVENT_READER_RECEIVED_STREAMS to MediaStreamsDeserializer(),
             KafkaEvents.EVENT_METADATA_OBTAINED to MetadataResultDeserializer(),
             KafkaEvents.EVENT_READER_DETERMINED_SERIE to EpisodeInfoDeserializer(),
             KafkaEvents.EVENT_READER_DETERMINED_MOVIE to MovieInfoDeserializer(),

View File

@@ -1,7 +1,9 @@
+package no.iktdev.streamit.content.encode
+
 class EncodeEnv {
     companion object {
         val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
         val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false
-        val maxRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1
+        val maxRunners: Int = try {System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1}
     }
 }
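A note on the new maxRunners line: toIntOrNull itself never throws, but System.getenv returns a platform type, so calling it on an unset variable raises a NullPointerException at runtime; that is what the try/catch guards against. A minimal sketch (not part of this commit) that makes the nullability explicit instead:

    // Sketch only: treating the env value as String? removes the need
    // for try/catch, since toIntOrNull never throws on a non-null String.
    fun envInt(name: String, default: Int): Int =
        System.getenv(name)?.toIntOrNull() ?: default

    // usage: val maxRunners: Int = envInt("SIMULTANEOUS_ENCODE_RUNNERS", 1)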

View File

@@ -1,5 +1,5 @@
-import mu.KotlinLogging
-import no.iktdev.exfl.observable.ObservableMap
+package no.iktdev.streamit.content.encode
+
 import org.springframework.boot.autoconfigure.SpringBootApplication
 import org.springframework.boot.runApplication
 import org.springframework.context.ApplicationContext
@@ -16,7 +16,6 @@ fun getContext(): ApplicationContext? {
 fun main(args: Array<String>) {
     context = runApplication<EncoderApplication>(*args)
 }
-private val logger = KotlinLogging.logger {}

 /*val progress = ObservableMap<String, EncodeInformation>().also {
     it.addListener(object: ObservableMap.Listener<String, EncodeInformation> {

View File

@@ -1,6 +1,6 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
+import no.iktdev.streamit.content.encode.EncodeEnv
 import no.iktdev.exfl.observable.ObservableList
 import no.iktdev.exfl.observable.observableListOf
 import no.iktdev.streamit.content.common.deamon.Daemon

View File

@@ -1,14 +1,11 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
-import no.iktdev.exfl.observable.ObservableList
+import no.iktdev.streamit.content.encode.EncodeEnv
 import no.iktdev.exfl.observable.observableListOf
 import no.iktdev.streamit.content.common.deamon.Daemon
 import no.iktdev.streamit.content.common.deamon.IDaemon
-import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
 import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
 import no.iktdev.streamit.content.encode.progress.Progress
-import no.iktdev.streamit.content.encode.progress.ProgressDecoder
 import java.io.File

 class ExtractDaemon(val referenceId: String, val work: ExtractWork, val daemonInterface: IExtractListener): IDaemon {

View File

@@ -1,9 +1,8 @@
 package no.iktdev.streamit.content.encode.runner

-import EncodeEnv
+import no.iktdev.streamit.content.encode.EncodeEnv
 import kotlinx.coroutines.runBlocking
 import no.iktdev.streamit.content.common.CommonConfig
-import no.iktdev.streamit.content.common.deamon.IDaemon
 import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
 import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
 import no.iktdev.streamit.content.encode.progress.Progress

View File

@@ -23,7 +23,7 @@ repositories {
 }

 dependencies {
-    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha63")
+    implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha66")
     implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
     implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha7")

View File

@@ -74,14 +74,16 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM
             return
         }

-        val out = ContentOutName(videoInfo.fullName)
-        produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out)
-
         if (videoInfo is EpisodeInfo) {
             produceMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, initMessage, videoInfo)
         } else if (videoInfo is MovieInfo) {
             produceMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, initMessage, videoInfo)
        }
+
+        val out = ContentOutName(videoInfo.fullName)
+        produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out)
     }

     final override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {

View File

@@ -12,60 +12,30 @@ import no.iktdev.streamit.library.kafka.KafkaEvents
 import no.iktdev.streamit.library.kafka.dto.Message
 import no.iktdev.streamit.library.kafka.dto.Status
 import no.iktdev.streamit.library.kafka.dto.StatusType
+import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
+import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
 import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
 import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
 import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.springframework.stereotype.Service
 import java.io.File

 private val logger = KotlinLogging.logger {}

 @Service
-class EncodedStreams : DefaultKafkaReader("encodedStreams"), ISequentialMessageEvent {
+class EncodedStreams : DefaultKafkaReader("streamSelector"), ICollectedMessagesEvent<ResultCollection> {

-    final val mainListener = object : SequentialMessageListener(
+    val collectionListener = CollectorMessageListener<ResultCollection>(
         topic = CommonConfig.kafkaTopic,
         consumer = defaultConsumer,
-        accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event,
-        subAccepts = listOf(
-            KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event,
-            KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event
-        ),
-        deserializers = loadDeserializers(),
+        initiatorEvent = KafkaEvents.EVENT_READER_RECEIVED_FILE,
+        completionEvent = KafkaEvents.EVENT_READER_DETERMINED_FILENAME,
         listener = this
-    ) {}
+    )

     init {
-        mainListener.listen()
+        collectionListener.listen()
     }
-
-    override fun getRequiredMessages(): List<String> {
-        return mainListener.subAccepts + listOf(mainListener.accept)
-    }
-
-    override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
-        logger.info { "All messages are received" }
-
-        val fileResultEvent = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]
-        val determinedFileNameEvent = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event]
-        val streamEvent = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event]
-
-        val fileResult = if (fileResultEvent != null && fileResultEvent.isSuccessful()) {
-            fileResultEvent.data as FileResult?
-        } else null
-
-        val outFileNameWithoutExtension = if (determinedFileNameEvent != null && determinedFileNameEvent.isSuccessful()) {
-            (determinedFileNameEvent.data as ContentOutName).baseName
-        } else fileResult?.sanitizedName
-
-        val streams = if (streamEvent != null && streamEvent.isSuccessful()) {
-            streamEvent.data as MediaStreams
-        } else null
-
-        createEncodeWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension)
-        createExtractWork(referenceId, fileResult?.title, fileResult?.file, streams, outFileNameWithoutExtension)
-    }

     fun createEncodeWork(referenceId: String, collection: String?, inFile: String?, streams: MediaStreams?, outFileName: String?) {
@@ -131,4 +101,17 @@ class EncodedStreams : DefaultKafkaReader("encodedStreams"), ISequentialMessageE
         )
     }
+
+    override fun onCollectionCompleted(collection: ResultCollection?) {
+        logger.info { "Collection received" }
+        val referenceId = collection?.getRecords()?.firstOrNull()?.value()?.referenceId
+        if (referenceId == null) {
+            logger.warn { "referenceId is null, throwing collection" }
+            return
+        }
+        val outFileNameWithoutExtension: String? = collection.getFileName()?.baseName ?: collection.getFileResult()?.sanitizedName
+
+        createEncodeWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
+        createExtractWork(referenceId, collection.getFileResult()?.title, collection.getFileResult()?.file, collection.getStreams(), outFileNameWithoutExtension)
+    }
 }
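For readers unfamiliar with the listener swap above: the old sequential listener waited for a fixed set of deserialized messages per referenceId, while the new collector gathers raw records between an initiator event and a completion event. Below is a self-contained sketch of that collect-until-completion idea; all names and logic in it are illustrative, not the actual implementation of CollectorMessageListener in streamit-library-kafka, whose internals this diff does not show.

    // Illustrative sketch only; hypothetical types, not the library's API.
    data class Record(val referenceId: String, val event: String)

    class Collector(
        private val initiatorEvent: String,
        private val completionEvent: String,
        private val onCollectionCompleted: (List<Record>) -> Unit,
    ) {
        private val openCollections = mutableMapOf<String, MutableList<Record>>()

        fun accept(record: Record) {
            // An initiator event opens a new collection for that referenceId.
            if (record.event == initiatorEvent) openCollections[record.referenceId] = mutableListOf()
            openCollections[record.referenceId]?.add(record)
            // The completion event closes and delivers the collection. This is
            // presumably why the ContentDeterminate hunk above moves
            // EVENT_READER_DETERMINED_FILENAME to be produced last.
            if (record.event == completionEvent) {
                openCollections.remove(record.referenceId)?.let(onCollectionCompleted)
            }
        }
    }

    fun main() {
        val collector = Collector("received-file", "determined-filename") {
            println("collection completed: $it")
        }
        collector.accept(Record("ref-1", "received-file"))
        collector.accept(Record("ref-1", "received-streams"))
        collector.accept(Record("ref-1", "determined-filename")) // triggers the callback
    }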

View File

@@ -0,0 +1,34 @@
+package no.iktdev.streamit.content.reader.analyzer.encoding
+
+import no.iktdev.streamit.content.common.deserializers.ContentOutNameDeserializer
+import no.iktdev.streamit.content.common.deserializers.FileResultDeserializer
+import no.iktdev.streamit.content.common.deserializers.MediaStreamsDeserializer
+import no.iktdev.streamit.content.common.dto.ContentOutName
+import no.iktdev.streamit.content.common.dto.reader.FileResult
+import no.iktdev.streamit.content.common.streams.MediaStreams
+import no.iktdev.streamit.library.kafka.KafkaEvents
+import no.iktdev.streamit.library.kafka.dto.Message
+import no.iktdev.streamit.library.kafka.listener.collector.EventCollection
+import no.iktdev.streamit.library.kafka.listener.deserializer.deserializeIfSuccessful
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+class ResultCollection: EventCollection() {
+
+    fun getFirstOrNull(events: KafkaEvents): ConsumerRecord<String, Message>? {
+        return getRecords().firstOrNull { it.key() == events.event }
+    }
+
+    fun getFileResult(): FileResult? {
+        val record = getRecords().firstOrNull { it.key() == KafkaEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null
+        return FileResultDeserializer().deserializeIfSuccessful(record.value())
+    }
+
+    fun getFileName(): ContentOutName? {
+        val record = getFirstOrNull(KafkaEvents.EVENT_READER_DETERMINED_FILENAME) ?: return null
+        return ContentOutNameDeserializer().deserializeIfSuccessful(record.value())
+    }
+
+    fun getStreams(): MediaStreams? {
+        val record = getFirstOrNull(KafkaEvents.EVENT_READER_RECEIVED_STREAMS) ?: return null
+        return MediaStreamsDeserializer().deserializeIfSuccessful(record.value())
+    }
+}
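These helpers back the fallback rule that onCollectionCompleted relies on: prefer the determined output name, and fall back to the sanitized input name. A hedged restatement as a hypothetical convenience extension, not part of the commit:

    // Hypothetical helper; mirrors the expression used in
    // EncodedStreams.onCollectionCompleted above.
    fun ResultCollection.outputBaseName(): String? =
        getFileName()?.baseName ?: getFileResult()?.sanitizedName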

View File

@@ -243,7 +243,7 @@ def main():
     # Wait until should_stop is set to True before shutting down the application
     while not should_stop:
-        pass
+        time.sleep(60)

     # Stop the consumer thread
     consumer_thread.stop()
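The change above replaces a busy-wait (pass spins a CPU core at 100%) with a 60-second sleep, at the cost of up to a minute of shutdown latency. A hedged alternative sketch, assuming should_stop could be swapped for a threading.Event: wait() blocks without polling and wakes immediately when set.

    # Sketch only, assuming should_stop becomes a threading.Event
    # rather than the module-level boolean used in this file.
    import threading

    should_stop = threading.Event()

    def wait_for_shutdown():
        # Blocks without burning CPU; returns as soon as should_stop.set()
        # is called, e.g. from a signal handler or the consumer thread.
        should_stop.wait()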