diff --git a/apps/coordinator/README.md b/apps/coordinator/README.md
index 2fc50209..00c44f09 100644
--- a/apps/coordinator/README.md
+++ b/apps/coordinator/README.md
@@ -1,3 +1,6 @@
+# Coordinator
+Only one Coordinator instance is supported, while multiple Processer instances can run at any time.
+
 # FLOW:
 ### Inputs:
 - File watcher
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
index 4af3cd2c..29a7d898 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
@@ -1,9 +1,12 @@
 package no.iktdev.mediaprocessing.coordinator
 
 import com.google.gson.Gson
+import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import mu.KotlinLogging
 import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.mediaprocessing.coordinator.coordination.EventBasedMessageListener
+import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@@ -32,12 +35,9 @@ class Coordinator() {
 
     private val log = KotlinLogging.logger {}
 
+    val listeners = EventBasedMessageListener()
-    private val listeners: MutableList<TaskCreatorListener> = mutableListOf()
-    fun addListener(listener: TaskCreatorListener) {
-        listeners.add(listener)
-    }
-
+    private val forwarder = Forwarder()
 
     public fun startProcess(file: File, type: ProcessType) {
         val processStartEvent = ProcessStarted(
@@ -48,66 +48,40 @@ class Coordinator() {
         producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_PROCESS_STARTED, processStartEvent)
     }
 
-    fun produceEncodeWork(message: PersistentMessage) {
-        if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
-            throw RuntimeException("Incorrect event passed ${message.event}")
-        }
-        if (message.data !is FfmpegWorkerArgumentsCreated) {
-            throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
-        }
-        val data = message.data as FfmpegWorkerArgumentsCreated
-        data.entries.forEach {
-            FfmpegWorkRequestCreated(
-                inputFile = data.inputFile,
-                arguments = it.arguments,
-                outFile = it.outputFile
-            ).let { createdRequest ->
-                producer.sendMessage(message.referenceId,
-                    KafkaEvents.EVENT_WORK_ENCODE_CREATED,
-                    createdRequest)
-            }
-        }
-    }
-
-    fun produceExtractWork(message: PersistentMessage) {
-        if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
-            throw RuntimeException("Incorrect event passed ${message.event}")
-        }
-        if (message.data !is FfmpegWorkerArgumentsCreated) {
-            throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
-        }
-        val data = message.data as FfmpegWorkerArgumentsCreated
-        data.entries.forEach {
-            val eventId = UUID.randomUUID().toString()
-            FfmpegWorkRequestCreated(
-                inputFile = data.inputFile,
-                arguments = it.arguments,
-                outFile = it.outputFile
-            ).let { createdRequest ->
-                producer.sendMessage(message.eventId,
-                    KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
-                    eventId,
-                    createdRequest)
-            }
-            val outFile = File(it.outputFile)
-            ConvertWorkerRequest(
-                requiresEventId = eventId,
-                inputFile = it.outputFile,
-                true,
-                outFileBaseName = outFile.nameWithoutExtension,
-                outDirectory = outFile.parentFile.absolutePath
-            ).let { createdRequest ->
-                producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
-                    createdRequest)
-            }
-        }
-    }
-
     val io = Coroutines.io()
 
+    fun readAllUncompletedMessagesInQueue() {
+        val messages = PersistentDataReader().getUncompletedMessages()
+        io.launch {
+            messages.forEach {
+                delay(1000)
+                listeners.forwardBatchEventMessagesToListeners(it)
+            }
+        }
+    }
+
     fun readAllMessagesFor(referenceId: String, eventId: String) {
         val messages = PersistentDataReader().getMessagesFor(referenceId)
+        if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) {
+            log.warn { "EventId ($eventId) for ReferenceId ($referenceId) has not been made available in the database yet." }
+            io.launch {
+                val fixedDelay = 1000L
+                delay(fixedDelay)
+                var delayed = 0L
+                var msc = PersistentDataReader().getMessagesFor(referenceId)
+                // Poll until the event becomes visible, giving up after one minute.
+                while (msc.find { it.eventId == eventId } == null && delayed < 1000 * 60) {
+                    delay(fixedDelay)
+                    delayed += fixedDelay
+                    msc = PersistentDataReader().getMessagesFor(referenceId)
+                }
+                operationToRunOnMessages(referenceId, eventId, msc)
+            }
+        } else {
+            operationToRunOnMessages(referenceId, eventId, messages)
+        }
+    }
+
+    fun operationToRunOnMessages(referenceId: String, eventId: String, messages: List<PersistentMessage>) {
         createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
         io.launch {
@@ -115,6 +89,10 @@ class Coordinator() {
         }
     }
 
+    fun getProcessStarted(messages: List<PersistentMessage>): ProcessStarted? {
+        return messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as? ProcessStarted
+    }
+
     suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List<PersistentMessage>) {
         if (messages.any { it.data is ProcessCompleted }) {
             // TODO: Build and insert into database
@@ -127,18 +105,11 @@ class Coordinator() {
             log.error { "Could not find $eventId in provided messages" }
             return
         }
-        listeners.forEach { it.onEventReceived(referenceId, triggered, messages) }
+        listeners.forwardEventMessageToListeners(triggered, messages)
 
-        if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) {
-            val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted
-
-            if (processStarted.type == ProcessType.FLOW) {
-                log.info { "Process for $referenceId was started from flow and will be processed" }
-                if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
-                    produceEncodeWork(triggered)
-                } else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
-                    produceExtractWork(triggered)
-                }
+        if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
+            if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
+                forwarder.produceAllMissingProcesserEvents(producer = producer, referenceId = referenceId, eventId = eventId, messages = messages)
             } else {
                 log.info { "Process for $referenceId was started manually and will require user input for continuation" }
             }
@@ -149,13 +120,115 @@ class Coordinator() {
     fun onReady() {
         io.launch {
             listener.onMessageReceived = { event ->
-                val success = PersistentDataStore().storeMessage(event.key.event, event.value)
+                val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value)
                 if (!success) {
-                    log.error { "Unable to store message: ${event.key.event} in database!" }
+                    log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
                 } else
-                    readAllMessagesFor(event.value.referenceId, event.value.eventId)
+                    io.launch {
+                        delay(500) // Give the database a moment to catch up
+                        readAllMessagesFor(event.value.referenceId, event.value.eventId)
+                    }
             }
-            listener.listen(KafkaEnv.kafkaTopic)
-        }
+        }
+        listener.listen(KafkaEnv.kafkaTopic)
+
+        readAllUncompletedMessagesInQueue()
     }
 
 
+    class Forwarder() {
+        val forwardOnEventReceived = listOf(
+            KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
+        )
+
+        fun hasAnyRequiredEventToCreateProcesserEvents(messages: List<PersistentMessage>): Boolean {
+            return messages.any { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }
+        }
+
+        fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): Boolean {
+            // "Missing" means no successful encode-work event has been recorded yet.
+            val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
+            return existingWorkEncodeCreated.none { it.data.isSuccess() }
+        }
+        fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): Boolean {
+            val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
+            return existingWorkCreated.none { it.data.isSuccess() }
+        }
+
+        fun produceAllMissingProcesserEvents(producer: CoordinatorProducer, referenceId: String, eventId: String, messages: List<PersistentMessage>) {
+            val currentMessage = messages.find { it.eventId == eventId }
+            if (!currentMessage?.data.isSuccess()) {
+                return
+            }
+            when (currentMessage?.event) {
+                KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED -> {
+                    if (isMissingEncodeWorkCreated(messages)) {
+                        produceEncodeWork(producer, currentMessage)
+                    }
+                }
+                KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> {
+                    if (isMissingExtractWorkCreated(messages)) {
+                        produceExtractWork(producer, currentMessage)
+                    }
+                }
+                else -> {}
+            }
+        }
+
+
+        fun produceEncodeWork(producer: CoordinatorProducer, message: PersistentMessage) {
+            if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
+                throw RuntimeException("Incorrect event passed ${message.event}")
+            }
+            if (message.data !is FfmpegWorkerArgumentsCreated) {
+                throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
+            }
+            val data = message.data as FfmpegWorkerArgumentsCreated
+            data.entries.forEach {
+                FfmpegWorkRequestCreated(
+                    inputFile = data.inputFile,
+                    arguments = it.arguments,
+                    outFile = it.outputFile
+                ).let { createdRequest ->
+                    producer.sendMessage(message.referenceId,
+                        KafkaEvents.EVENT_WORK_ENCODE_CREATED,
+                        eventId = message.eventId,
+                        createdRequest)
+                }
+            }
+        }
+
+        fun produceExtractWork(producer: CoordinatorProducer, message: PersistentMessage) {
+            if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
+                throw RuntimeException("Incorrect event passed ${message.event}")
+            }
+            if (message.data !is FfmpegWorkerArgumentsCreated) {
+                throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
+            }
+            val data = message.data as FfmpegWorkerArgumentsCreated
+            data.entries.forEach {
+                FfmpegWorkRequestCreated(
+                    inputFile = data.inputFile,
+                    arguments = it.arguments,
+                    outFile = it.outputFile
+                ).let { createdRequest ->
+                    producer.sendMessage(message.referenceId,
+                        KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
+                        eventId = message.eventId,
+                        createdRequest
+                    )
+                }
+                val outFile = File(it.outputFile)
+                ConvertWorkerRequest(
+                    requiresEventId = message.eventId,
+                    inputFile = it.outputFile,
+                    true,
+                    outFileBaseName = outFile.nameWithoutExtension,
+                    outDirectory = outFile.parentFile.absolutePath
+                ).let { createdRequest ->
+                    producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
+                        createdRequest)
+                }
+            }
+        }
+    }
 }
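A note on the retry in `readAllMessagesFor`: the loop is a poll-until-visible with a one-minute budget, suspending between attempts instead of busy-waiting. A self-contained sketch of that pattern (`awaitVisible` and `StoredMessage` are illustrative names, not the project's API):

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a persisted event row.
data class StoredMessage(val referenceId: String, val eventId: String)

// Re-query until the predicate matches a row or the timeout budget is spent.
suspend fun <T> awaitVisible(
    pollInterval: Long = 1_000L,
    timeout: Long = 60_000L,
    query: () -> List<T>,
    predicate: (T) -> Boolean
): List<T> {
    var waited = 0L
    var rows = query()
    while (rows.none(predicate) && waited < timeout) {
        delay(pollInterval) // suspend instead of spinning
        waited += pollInterval
        rows = query()
    }
    return rows
}

fun main() = runBlocking {
    val database = listOf(StoredMessage("ref-1", "event-1"))
    val rows = awaitVisible(
        pollInterval = 10L,
        timeout = 100L,
        query = { database },
        predicate = { it.eventId == "event-1" }
    )
    println("Visible: $rows")
}
```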
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt
deleted file mode 100644
index eaff68cf..00000000
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt
+++ /dev/null
@@ -1,8 +0,0 @@
-package no.iktdev.mediaprocessing.coordinator
-
-import org.springframework.stereotype.Service
-
-@Service
-class MessageOperator {
-
-}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt
index 244a5196..57a8bb74 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt
@@ -1,16 +1,97 @@
 package no.iktdev.mediaprocessing.coordinator
 
+import mu.KotlinLogging
+import no.iktdev.mediaprocessing.coordinator.coordination.Tasks
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
 import org.springframework.beans.factory.annotation.Autowired
+import javax.annotation.PostConstruct
 
 abstract class TaskCreator: TaskCreatorListener {
+    val log = KotlinLogging.logger {}
+
+    abstract val producesEvent: KafkaEvents
+
     @Autowired
     lateinit var producer: CoordinatorProducer
 
-    open fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
-        return true
+
+    @Autowired
+    lateinit var coordinator: Coordinator
+
+    open val requiredEvents: List<KafkaEvents> = listOf()
+    open val listensForEvents: List<KafkaEvents> = listOf()
+
+    open fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {
+        val currentEvents = events.map { it.event }
+        return requiredEvents.all { currentEvents.contains(it) }
     }
 
+    open fun isPrerequisiteDataPresent(events: List<PersistentMessage>): Boolean {
+        val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
+        return failed.isEmpty()
+    }
+
+    open fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean {
+        return event.event == singleOne
+    }
+
+    fun getListener(): Tasks {
+        val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
+        return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
+    }
+
+
+    open fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return listOf {
+            isPrerequisiteEventsOk(events)
+        }
+    }
+
+    open fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
+        return listOf()
+    }
+
+
+    private val context: MutableMap<String, Any> = mutableMapOf()
+    private val context_key_reference = "reference"
+    private val context_key_producesEvent = "event"
+
+    final override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
+        context[context_key_reference] = referenceId
+        getListener().producesEvent.let {
+            context[context_key_producesEvent] = it
+        }
+
+        if (prerequisitesRequired(events).all { it.invoke() } &&
+            prerequisiteRequired(event).all { it.invoke() }) {
+            val result = onProcessEvents(event, events)
+            if (result != null) {
+                onResult(result)
+            }
+        } else {
+            // TODO: Re-enable this
+            // log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
+        }
+    }
+
+    abstract fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper?
+
+
+    private fun onResult(data: MessageDataWrapper) {
+        producer.sendMessage(
+            referenceId = context[context_key_reference] as String,
+            event = context[context_key_producesEvent] as KafkaEvents,
+            data = data
+        )
+    }
+
+    @PostConstruct
+    fun postConstruct() {
+        coordinator.listeners.add(getListener())
+    }
+}
+
+fun interface Prerequisite {
+    fun execute(value: Any): Boolean
 }
 
 interface TaskCreatorListener {
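The `TaskCreator` contract above boils down to: declare the event you produce and the events you require, and `onProcessEvents` only runs once every required event is present and successful. A condensed, runnable sketch of that gating, using simplified stand-in types rather than the real classes:

```kotlin
// Self-contained sketch of the TaskCreator gating contract; Spring wiring omitted.
enum class Event { PROCESS_STARTED, BASE_INFO_PERFORMED }
data class Message(val event: Event, val ok: Boolean)

abstract class MiniTask {
    open val requiredEvents: List<Event> = emptyList()

    private fun prerequisitesOk(events: List<Message>): Boolean {
        val seen = events.map { it.event }
        // Every required event must be present and must have succeeded.
        return requiredEvents.all { it in seen } &&
            events.filter { it.event in requiredEvents }.all { it.ok }
    }

    fun onEventReceived(event: Message, events: List<Message>) {
        if (prerequisitesOk(events)) onProcessEvents(event, events)?.let(::publish)
    }

    abstract fun onProcessEvents(event: Message, events: List<Message>): String?
    private fun publish(result: String) = println("produced: $result")
}

fun main() {
    val task = object : MiniTask() {
        override val requiredEvents = listOf(Event.PROCESS_STARTED)
        override fun onProcessEvents(event: Message, events: List<Message>) = "base-info"
    }
    task.onEventReceived(
        Message(Event.PROCESS_STARTED, ok = true),
        listOf(Message(Event.PROCESS_STARTED, ok = true))
    )
}
```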
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt
new file mode 100644
index 00000000..49c330f0
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/EventBasedMessageListener.kt
@@ -0,0 +1,63 @@
+package no.iktdev.mediaprocessing.coordinator.coordination
+
+import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+
+class EventBasedMessageListener {
+    private val listeners: MutableList<Tasks> = mutableListOf()
+
+    fun add(produces: KafkaEvents, listener: TaskCreatorListener) {
+        listeners.add(Tasks(producesEvent = produces, taskHandler = listener))
+    }
+
+    fun add(task: Tasks) {
+        listeners.add(task)
+    }
+
+    private fun waitingListeners(events: List<PersistentMessage>): List<Tasks> {
+        val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
+        return nonCreators
+    }
+
+    private fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List<Tasks>): List<Tasks> {
+        return waitingListeners.filter { event.event in it.listensForEvents }
+    }
+
+    /**
+     * Listeners are invoked in sequence, so duplicate messages may be produced as a result.
+     */
+    fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List<PersistentMessage>) {
+        val waitingListeners = waitingListeners(events)
+        val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
+        availableListeners.forEach {
+            try {
+                it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
+            } catch (e: Exception) {
+                e.printStackTrace()
+            }
+        }
+    }
+
+    /**
+     * Called with all messages at once; the batch should reflect the Kafka topic and the database.
+     */
+    fun forwardBatchEventMessagesToListeners(events: List<PersistentMessage>) {
+        val waitingListeners = waitingListeners(events)
+        waitingListeners.forEach {
+            try {
+                val last = events.last()
+                it.taskHandler.onEventReceived(last.referenceId, last, events)
+            } catch (e: Exception) {
+                e.printStackTrace()
+            }
+        }
+    }
+
+}
+
+data class Tasks(
+    val producesEvent: KafkaEvents,
+    val listensForEvents: List<KafkaEvents> = listOf(),
+    val taskHandler: TaskCreatorListener
+)
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt
deleted file mode 100644
index 99554637..00000000
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt
+++ /dev/null
@@ -1,5 +0,0 @@
-package no.iktdev.mediaprocessing.coordinator.coordination
-
-class MessageSequence {
-
-}
\ No newline at end of file
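The dispatch rule in `EventBasedMessageListener` is worth spelling out: a task is only forwarded an event while its own produced event is absent from the history, which is what keeps redelivery (and the duplicates noted in the comment above) mostly harmless. A reduced sketch of that filter, with a hypothetical `TaskEntry` type:

```kotlin
// Illustration of the waitingListeners + listenerWantingEvent filtering.
data class TaskEntry(val produces: String, val listensFor: List<String>)

fun eligible(history: List<String>, newEvent: String, tasks: List<TaskEntry>): List<TaskEntry> {
    val waiting = tasks.filter { it.produces !in history } // has not produced its event yet
    return waiting.filter { newEvent in it.listensFor }    // and actually wants this event
}

fun main() {
    val tasks = listOf(
        TaskEntry(produces = "READ_BASE_INFO", listensFor = listOf("PROCESS_STARTED")),
        TaskEntry(produces = "PARSE_STREAMS", listensFor = listOf("READ_STREAMS"))
    )
    // READ_BASE_INFO already exists in the history, so only the parser is woken.
    println(eligible(listOf("PROCESS_STARTED", "READ_BASE_INFO"), "READ_STREAMS", tasks))
}
```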
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/ProcesserSocketMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/ProcesserSocketMessageListener.kt
new file mode 100644
index 00000000..33531a9b
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/ProcesserSocketMessageListener.kt
@@ -0,0 +1,11 @@
+package no.iktdev.mediaprocessing.coordinator.coordination
+
+/**
+ * Handles messages from websockets produced by Processer instances,
+ * in order to keep an overview of each Processer's progress.
+ */
+class ProcesserSocketMessageListener {
+
+
+
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt
deleted file mode 100644
index 25b0ec13..00000000
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt
+++ /dev/null
@@ -1,57 +0,0 @@
-package no.iktdev.mediaprocessing.coordinator.reader
-
-import kotlinx.coroutines.launch
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
-import no.iktdev.mediaprocessing.shared.common.ProcessingService
-import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
-import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Service
-import java.io.File
-
-@Service
-class BaseInfoFromFile(@Autowired var coordinator: Coordinator): ProcessingService() {
-
-
-    override fun onResult(referenceId: String, data: MessageDataWrapper) {
-        producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data)
-    }
-
-    override fun onReady() {
-        coordinator.addListener(object : TaskCreatorListener {
-            override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-                if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
-                    io.launch {
-                        val result = readFileInfo(event.data as ProcessStarted)
-                        onResult(referenceId, result)
-                    }
-                }
-            }
-
-        })
-    }
-
-    fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
-        val result = try {
-            val fileName = File(started.file).nameWithoutExtension
-            val fileNameParser = FileNameParser(fileName)
-            BaseInfoPerformed(
-                Status.COMPLETED,
-                title = fileNameParser.guessDesiredTitle(),
-                sanitizedName = fileNameParser.guessDesiredFileName()
-            )
-        } catch (e: Exception) {
-            e.printStackTrace()
-            MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
-        }
-        return result
-    }
-}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt
deleted file mode 100644
index 62618270..00000000
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt
+++ /dev/null
@@ -1,17 +0,0 @@
-package no.iktdev.mediaprocessing.coordinator.reader
-
-import no.iktdev.exfl.coroutines.Coroutines
-
-
-class MediaStreamsAnalyze {
-    val io = Coroutines.io()
-/*
-    val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
-        if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) {
-            if (event.value.data?.status == Status.COMPLETED) {
-
-            }
-        }
-    }*/
-
-}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt
new file mode 100644
index 00000000..5c680a8c
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt
@@ -0,0 +1,51 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
+import no.iktdev.streamit.library.kafka.dto.Status
+import org.springframework.stereotype.Service
+import java.io.File
+
+@Service
+class BaseInfoFromFile() : TaskCreator() {
+
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EVENT_PROCESS_STARTED)
+
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
+    }
+
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+        return readFileInfo(event.data as ProcessStarted)
+    }
+
+    fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
+        val result = try {
+            val fileName = File(started.file).nameWithoutExtension
+            val fileNameParser = FileNameParser(fileName)
+            BaseInfoPerformed(
+                Status.COMPLETED,
+                title = fileNameParser.guessDesiredTitle(),
+                sanitizedName = fileNameParser.guessDesiredFileName()
+            )
+        } catch (e: Exception) {
+            e.printStackTrace()
+            MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
+        }
+        return result
+    }
+
+
+}
\ No newline at end of file
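For context on what `readFileInfo` delegates to: `FileNameParser` guesses a title and a sanitized name from the file name. A toy approximation of that kind of parsing (the real rules live in `shared.common.parsing` and will differ):

```kotlin
// Illustrative only: strip release-group tags and quality markers from a name.
fun guessTitle(fileName: String): String =
    fileName
        .replace('.', ' ')
        .replace(Regex("\\[[^]]*]"), "") // drop [Group] style tags
        .replace(Regex("\\b(1080p|720p|x264|x265|BluRay|WEB-DL)\\b.*", RegexOption.IGNORE_CASE), "")
        .trim()

fun main() {
    println(guessTitle("Some.Show.S01E01.1080p.WEB-DL.x264[Group]")) // "Some Show S01E01"
}
```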
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt
new file mode 100644
index 00000000..a34a3f91
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt
@@ -0,0 +1,52 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
+import no.iktdev.streamit.library.kafka.dto.Status
+import org.springframework.stereotype.Service
+
+@Service
+class MetadataAndBaseInfoToCoverTask : TaskCreator() {
+
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
+        KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
+        KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
+    )
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
+    }
+
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+
+        val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
+        val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null
+        val fileOut = events.findLast { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed? ?: return null
+
+        val coverUrl = meta.data?.cover
+        return if (coverUrl.isNullOrBlank()) {
+            log.warn { "No cover available for ${baseInfo.title}" }
+            null
+        } else {
+            CoverInfoPerformed(
+                status = Status.COMPLETED,
+                url = coverUrl,
+                outFileBaseName = baseInfo.title,
+                outDir = fileOut.outDirectory
+            )
+        }
+    }
+}
\ No newline at end of file
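`onProcessEvents` here leans on pulling the latest payload of a given type out of the event history; with a safe cast, a missing event becomes `null` rather than an exception. The same pattern as a generic helper (illustrative, not part of the codebase):

```kotlin
// Find the most recent element of a given type, or null if none exists.
inline fun <reified T> List<Any>.latestOfType(): T? = findLast { it is T } as? T

fun main() {
    val history: List<Any> = listOf("started", 42, "metadata")
    println(history.latestOfType<Int>())    // 42
    println(history.latestOfType<Double>()) // null instead of a ClassCastException
}
```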
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
similarity index 56%
rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt
rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
index 4818162a..0e220cdf 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
@@ -1,8 +1,6 @@
 package no.iktdev.mediaprocessing.coordinator.tasks.event
 
-import mu.KotlinLogging
 import no.iktdev.exfl.using
-import no.iktdev.mediaprocessing.coordinator.Coordinator
 import no.iktdev.mediaprocessing.coordinator.TaskCreator
 import no.iktdev.mediaprocessing.shared.common.SharedConfig
 import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
@@ -11,11 +9,12 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
 import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.scheduling.annotation.EnableScheduling
 import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Service
@@ -26,35 +25,40 @@ import java.time.LocalDateTime
  */
 @Service
 @EnableScheduling
-class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreator() {
-    private val log = KotlinLogging.logger {}
-    init {
-        coordinator.addListener(this)
-    }
+class MetadataAndBaseInfoToFileOut(): TaskCreator() {
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
 
     val waitingProcessesForMeta: MutableMap<String, LocalDateTime> = mutableMapOf()
 
+    override val listensForEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
+        KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
+    )
 
-    override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-        if (!listOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED).contains(event.event)) {
-            return
-        }
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
 
         val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
         val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed?
 
         // Only return here, as both baseInfo events are required to continue
         if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) {
-            return
+            return null
         }
 
         if (baseInfo.isSuccess() && meta == null) {
             log.info { "Sending ${baseInfo?.title} to waiting queue" }
-            if (!waitingProcessesForMeta.containsKey(referenceId)) {
-                waitingProcessesForMeta[referenceId] = LocalDateTime.now()
+            if (!waitingProcessesForMeta.containsKey(event.referenceId)) {
+                waitingProcessesForMeta[event.referenceId] = LocalDateTime.now()
             }
-            return
+            return null
         }
 
-        baseInfo ?: return // Return if baseInfo is null
+        if (!isPrerequisiteDataPresent(events)) {
+            return null
+        }
+
+        baseInfo ?: return null // Return if baseInfo is null
 
         val metaContentType: String? = if (meta.isSuccess()) meta?.data?.type else null
         val contentType = when (metaContentType) {
@@ -64,46 +68,21 @@ class MetadataAndBaseInfoToFileOutAndCoord
         }
 
         val fileDeterminate = FileNameDeterminate(baseInfo.title, baseInfo.sanitizedName, contentType)
-        if (waitingProcessesForMeta.containsKey(referenceId)) {
-            waitingProcessesForMeta.remove(referenceId)
+        if (waitingProcessesForMeta.containsKey(event.referenceId)) {
+            waitingProcessesForMeta.remove(event.referenceId)
         }
 
         val outputDirectory = SharedConfig.outgoingContent.using(baseInfo.title)
-        val vi = fileDeterminate.getDeterminedVideoInfo()
-        if (vi != null) {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
-                data = VideoInfoPerformed(Status.COMPLETED, vi)
-            )
+        val vi = fileDeterminate.getDeterminedVideoInfo()?.toJsonObject()
+        return if (vi != null) {
+            VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath)
         } else {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
-                data = MessageDataWrapper(Status.ERROR, "No VideoInfo found...")
-            )
+            MessageDataWrapper(Status.ERROR, "No VideoInfo found...")
         }
-
-
-        val coverUrl = meta?.data?.cover
-        if (coverUrl.isNullOrBlank()) {
-            log.warn { "No cover available for ${baseInfo.title}" }
-        } else {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED,
-                CoverInfoPerformed(
-                    status = Status.COMPLETED,
-                    url = coverUrl,
-                    outFileBaseName = baseInfo.title,
-                    outDir = outputDirectory.absolutePath
-                )
-            )
-        }
-    }
+    }
 
     //@Scheduled(fixedDelay = (60_000))
     @Scheduled(fixedDelay = (1_000))
     fun sendErrorMessageForMetadata() {
@@ -112,7 +91,7 @@ class MetadataAndBaseInfoToFileOutAndCoord
         }
         expired.forEach {
             log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
-            producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOutAndCoverTask::class.simpleName}"))
+            producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}"))
            waitingProcessesForMeta.remove(it.key)
         }
     }
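The `waitingProcessesForMeta` map plus the `@Scheduled` sweep above implement a simple timeout queue: references parked while metadata is pending get failed out once a deadline passes, which unblocks the flow with an error result. A self-contained sketch of that mechanism (the 30-second deadline is an assumption for illustration; the task's actual expiry check sits in the elided hunk):

```kotlin
import java.time.LocalDateTime

class MetadataWaitQueue(private val timeoutSeconds: Long = 30) {
    private val waiting = mutableMapOf<String, LocalDateTime>()

    fun park(referenceId: String) = waiting.putIfAbsent(referenceId, LocalDateTime.now())

    // Called periodically, like the @Scheduled(fixedDelay = 1_000) sweep.
    fun sweep(onTimeout: (String) -> Unit) {
        val now = LocalDateTime.now()
        // filterValues copies, so removing while iterating is safe.
        waiting.filterValues { it.plusSeconds(timeoutSeconds) < now }
            .keys.forEach { ref ->
                onTimeout(ref) // e.g. produce a Status.ERROR MetadataPerformed
                waiting.remove(ref)
            }
    }
}

fun main() {
    val queue = MetadataWaitQueue(timeoutSeconds = 0)
    queue.park("ref-1")
    queue.sweep { println("timed out: $it") }
}
```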
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt
deleted file mode 100644
index 5c6ea92b..00000000
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt
+++ /dev/null
@@ -1,299 +0,0 @@
-package no.iktdev.mediaprocessing.coordinator.tasks.event
-
-import com.google.gson.Gson
-import mu.KotlinLogging
-import no.iktdev.exfl.using
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreator
-import no.iktdev.mediaprocessing.shared.common.Preference
-import no.iktdev.mediaprocessing.shared.common.SharedConfig
-import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
-import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
-import org.springframework.stereotype.Service
-import java.io.File
-
-/**
- * Is to be called or run with the result from FileOut
- */
-@Service
-class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : TaskCreator() {
-    private val log = KotlinLogging.logger {}
-
-    init {
-        coordinator.addListener(this)
-    }
-
-    override fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
-        val required = listOf(
-            KafkaEvents.EVENT_PROCESS_STARTED,
-            KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
-            KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
-            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
-        )
-        val currentEvents = events.map { it.event }
-        val hasAllRequiredEvents = required.all { currentEvents.contains(it) }
-        val hasAllRequiredData = events.filter { e -> e.event in required }.all { it.data.isSuccess() }
-        return hasAllRequiredData && hasAllRequiredEvents
-    }
-
-    override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-        val preference = Preference.getPreference()
-
-        if (!isPrerequisitesOk(events)) {
-            return
-        }
-        val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
-        val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
-        val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
-        val serializedParsedStreams = readStreamsEvent.streams
-
-        val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
-
-        getFfmpegVideoArguments(
-            inputFile = inputFile.file,
-            outDir = outDir,
-            preference = preference.encodePreference,
-            baseInfo = baseInfo,
-            serializedParsedStreams = serializedParsedStreams
-        ).let { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, it) }
-
-        getFfmpegSubtitleArguments(
-            inputFile = inputFile.file,
-            outDir = outDir,
-            baseInfo = baseInfo,
-            serializedParsedStreams = serializedParsedStreams
-        ).let { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED, it) }
-
-
-    }
-
-    private fun getFfmpegVideoArguments(
-        inputFile: String,
-        outDir: File,
-        preference: EncodingPreference,
-        baseInfo: BaseInfoPerformed,
-        serializedParsedStreams: ParsedMediaStreams
-    ): MessageDataWrapper {
-        val outVideoFile = outDir.using("${baseInfo.sanitizedName}.mp4").absolutePath
-
-        val vaas = VideoAndAudioSelector(serializedParsedStreams, preference)
-
-        val vArg = vaas.getVideoStream()?.let { VideoArguments(it, serializedParsedStreams, preference.video).getVideoArguments() }
-        val aArg = vaas.getAudioStream()?.let { AudioArguments(it, serializedParsedStreams, preference.audio).getAudioArguments() }
-
-        val vaArgs = toFfmpegWorkerArguments(vArg, aArg)
-        return if (vaArgs.isEmpty()) {
-            MessageDataWrapper(Status.ERROR, message = "Unable to produce arguments")
-        } else {
-            FfmpegWorkerArgumentsCreated(
-                status = Status.COMPLETED,
-                inputFile = inputFile,
-                entries = listOf(FfmpegWorkerArgument(
-                    outputFile = outVideoFile,
-                    arguments = vaArgs
-                ))
-            )
-        }
-    }
-
-    private fun getFfmpegSubtitleArguments(
-        inputFile: String,
-        outDir: File,
-        baseInfo: BaseInfoPerformed,
-        serializedParsedStreams: ParsedMediaStreams
-    ): MessageDataWrapper {
-        val subRootDir = outDir.using("sub")
-        val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments()
-
-        val entries = sArg.mapNotNull {
-            FfmpegWorkerArgument(
-                arguments = it.codecParameters + it.optionalParameters + listOf("-map", "0:s:${it.index}"),
-                outputFile = subRootDir.using(it.language, "${baseInfo.sanitizedName}.${it.format}").absolutePath
-            )
-        }
-        return FfmpegWorkerArgumentsCreated(
-            status = Status.COMPLETED,
-            inputFile = inputFile,
-            entries = entries
-        )
-    }
-
-    private class VideoAndAudioSelector(val mediaStreams: ParsedMediaStreams, val preference: EncodingPreference) {
-        private var defaultVideoSelected: VideoStream? = mediaStreams.videoStream
-            .filter { (it.duration_ts ?: 0) > 0 }
-            .maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.videoStream.minByOrNull { it.index }
-        private var defaultAudioSelected: AudioStream? = mediaStreams.audioStream
-            .filter { (it.duration_ts ?: 0) > 0 }
-            .maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.audioStream.minByOrNull { it.index }
-
-        fun getVideoStream(): VideoStream? {
-            return defaultVideoSelected
-        }
-
-        fun getAudioStream(): AudioStream? {
-            val languageFiltered = mediaStreams.audioStream.filter { it.tags.language == preference.audio.language }
-            val channeledAndCodec = languageFiltered.find {
-                it.channels >= (preference.audio.channels ?: 2) && it.codec_name == preference.audio.codec.lowercase()
-            }
-            return channeledAndCodec ?: return languageFiltered.minByOrNull { it.index } ?: defaultAudioSelected
-        }
-
-    }
-
-    private class VideoArguments(val videoStream: VideoStream, val allStreams: ParsedMediaStreams, val preference: VideoPreference) {
-        fun isVideoCodecEqual() = getCodec(videoStream.codec_name) == getCodec(preference.codec.lowercase())
-        protected fun getCodec(name: String): String {
-            return when (name) {
-                "hevc", "hevec", "h265", "h.265", "libx265"
-                -> "libx265"
-
-                "h.264", "h264", "libx264"
-                -> "libx264"
-
-                else -> name
-            }
-        }
-
-        fun getVideoArguments(): VideoArgumentsDto {
-            val optionalParams = mutableListOf<String>()
-            if (preference.pixelFormatPassthrough.none { it == videoStream.pix_fmt }) {
-                optionalParams.addAll(listOf("-pix_fmt", preference.pixelFormat))
-            }
-            val codecParams = if (isVideoCodecEqual()) listOf("-vcodec", "copy")
-            else {
-                optionalParams.addAll(listOf("-crf", preference.threshold.toString()))
-                listOf("-c:v", getCodec(preference.codec.lowercase()))
-            }
-
-            return VideoArgumentsDto(
-                index = allStreams.videoStream.indexOf(videoStream),
-                codecParameters = codecParams,
-                optionalParameters = optionalParams
-            )
-        }
-    }
-
-    private class AudioArguments(val audioStream: AudioStream, val allStreams: ParsedMediaStreams, val preference: AudioPreference) {
-        fun isAudioCodecEqual() = audioStream.codec_name.lowercase() == preference.codec.lowercase()
-        private fun shouldUseEAC3(): Boolean {
-            return (preference.defaultToEAC3OnSurroundDetected && audioStream.channels > 2 && audioStream.codec_name.lowercase() != "eac3")
-        }
-
-        fun getAudioArguments(): AudioArgumentsDto {
-            val optionalParams = mutableListOf<String>()
-            val codecParams = if (shouldUseEAC3())
-                listOf("-c:a", "eac3")
-            else if (!isAudioCodecEqual()) {
-                listOf("-c:a", preference.codec)
-            } else
-                listOf("-acodec", "copy")
-            return AudioArgumentsDto(
-                index = allStreams.audioStream.indexOf(audioStream),
-                codecParameters = codecParams,
-                optionalParameters = optionalParams
-            )
-        }
-
-    }
-
-    private class SubtitleArguments(val subtitleStreams: List<SubtitleStream>) {
-        /**
-         * @property DEFAULT is default subtitle as dialog
-         * @property CC is Closed-Captions
-         * @property SHD is Hard of hearing
-         * @property NON_DIALOGUE is for Signs or Song (as in lyrics)
-         */
-        private enum class SubtitleType {
-            DEFAULT,
-            CC,
-            SHD,
-            NON_DIALOGUE
-        }
-
-        private fun SubtitleStream.isCC(): Boolean {
-            val title = this.tags.title?.lowercase() ?: return false
-            val keywords = listOf("cc", "closed caption")
-            return keywords.any { title.contains(it) }
-        }
-        private fun SubtitleStream.isSHD(): Boolean {
-            val title = this.tags.title?.lowercase() ?: return false
-            val keywords = listOf("shd", "hh", "Hard-of-Hearing", "Hard of Hearing")
-            return keywords.any { title.contains(it) }
-        }
-        private fun SubtitleStream.isSignOrSong(): Boolean {
-            val title = this.tags.title?.lowercase() ?: return false
-            val keywords = listOf("song", "songs", "sign", "signs")
-            return keywords.any { title.contains(it) }
-        }
-        private fun getSubtitleType(stream: SubtitleStream): SubtitleType {
-            return if (stream.isSignOrSong())
-                SubtitleType.NON_DIALOGUE
-            else if (stream.isSHD()) {
-                SubtitleType.SHD
-            } else if (stream.isCC()) {
-                SubtitleType.CC
-            } else SubtitleType.DEFAULT
-        }
-
-        fun getSubtitleArguments(): List<SubtitleArgumentsDto> {
-            val acceptable = subtitleStreams.filter { !it.isSignOrSong() }
-            val codecFiltered = acceptable.filter { getFormatToCodec(it.codec_name) != null }
-            val mappedToType = codecFiltered.map { getSubtitleType(it) to it }.filter { it.first in SubtitleType.entries }
-                .groupBy { it.second.tags.language ?: "eng" }
-                .mapValues { entry ->
-                    val languageStreams = entry.value
-                    val sortedStreams = languageStreams.sortedBy { SubtitleType.entries.indexOf(it.first) }
-                    sortedStreams.firstOrNull()?.second
-                }.mapNotNull { it.value }
-
-            return mappedToType.mapNotNull { stream ->
-                getFormatToCodec(stream.codec_name)?.let { format ->
-                    SubtitleArgumentsDto(
-                        index = subtitleStreams.indexOf(stream),
-                        language = stream.tags.language ?: "eng",
-                        format = format
-                    )
-                }
-            }
-
-        }
-
-        fun getFormatToCodec(codecName: String): String? {
-            return when(codecName) {
-                "ass" -> "ass"
-                "subrip" -> "srt"
-                "webvtt", "vtt" -> "vtt"
-                "smi" -> "smi"
-                "hdmv_pgs_subtitle" -> null
-                else -> null
-            }
-        }
-
-    }
-
-
-    private fun toFfmpegWorkerArguments(
-        videoArguments: VideoArgumentsDto?,
-        audioArguments: AudioArgumentsDto?
-    ): List<String> {
-        val arguments = mutableListOf(
-            *videoArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
-            *videoArguments?.optionalParameters?.toTypedArray() ?: arrayOf(),
-            *audioArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
-            *audioArguments?.optionalParameters?.toTypedArray() ?: arrayOf()
-        )
-        videoArguments?.index?.let {
-            arguments.addAll(listOf("-map", "0:v:$it"))
-        }
-        audioArguments?.index?.let {
-            arguments.addAll(listOf("-map", "0:a:$it"))
-        }
-        return arguments
-    }
-}
\ No newline at end of file
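The deleted `toFfmpegWorkerArguments` (re-homed in `FFmpegBase.kt` at the end of this diff) concatenates codec parameters, optional parameters, and `-map` selectors; a worker then wraps them into the full ffmpeg invocation. Roughly how that composes (the `-map 0:v:N` indexes are per-type, so `0:v:0` is the first video stream of input 0):

```kotlin
// Assemble the final command line from worker-supplied argument fragments.
fun buildCommand(inputFile: String, workerArgs: List<String>, outputFile: String): List<String> =
    listOf("ffmpeg", "-i", inputFile) + workerArgs + listOf(outputFile)

fun main() {
    val args = listOf("-vcodec", "copy", "-c:a", "eac3") + listOf("-map", "0:v:0", "-map", "0:a:1")
    println(buildCommand("in.mkv", args, "out.mp4").joinToString(" "))
    // ffmpeg -i in.mkv -vcodec copy -c:a eac3 -map 0:v:0 -map 0:a:1 out.mp4
}
```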
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
similarity index 63%
rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt
rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
index b32ac133..c1ead14c 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
@@ -1,48 +1,41 @@
-package no.iktdev.mediaprocessing.coordinator.reader
+package no.iktdev.mediaprocessing.coordinator.tasks.event
 
 import com.google.gson.Gson
-import kotlinx.coroutines.launch
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
-import no.iktdev.mediaprocessing.shared.common.ProcessingService
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
 import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 
 @Service
-class ParseVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
+class ParseVideoFileStreams() : TaskCreator() {
 
-    override fun onResult(referenceId: String, data: MessageDataWrapper) {
-        producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data)
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
+    )
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
     }
 
-    override fun onReady() {
-        coordinator.addListener(object : TaskCreatorListener {
-            override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-                if (event.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED && event.data.isSuccess()) {
-                    io.launch {
-                        val result = parseStreams(event.data as ReaderPerformed)
-                        onResult(referenceId, result)
-                    }
-                }
-            }
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
 
-        })
+        return parseStreams(event.data as ReaderPerformed)
     }
 
-
     fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
         val gson = Gson()
         return try {
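`parseStreams` deserializes the probe output with Gson and buckets the streams into the video/audio/subtitle lists of `ParsedMediaStreams`. A minimal stand-alone version of that idea, using ffprobe's JSON field names (the real DTOs carry more fields):

```kotlin
import com.google.gson.Gson

// Simplified stand-ins; ffprobe reports codec_type per stream.
data class ProbeStream(val index: Int, val codec_type: String, val codec_name: String)
data class ProbeResult(val streams: List<ProbeStream>)

fun main() {
    val json = """{"streams":[
        {"index":0,"codec_type":"video","codec_name":"h264"},
        {"index":1,"codec_type":"audio","codec_name":"aac"},
        {"index":2,"codec_type":"subtitle","codec_name":"ass"}]}"""
    val parsed = Gson().fromJson(json, ProbeResult::class.java)
    val byType = parsed.streams.groupBy { it.codec_type }
    println(byType.mapValues { (_, v) -> v.map { it.codec_name } })
}
```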
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
similarity index 55%
rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt
rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
index 85690cd8..f017ded5 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
@@ -1,45 +1,48 @@
-package no.iktdev.mediaprocessing.coordinator.reader
+package no.iktdev.mediaprocessing.coordinator.tasks.event
 
 import com.google.gson.Gson
 import com.google.gson.JsonObject
-import kotlinx.coroutines.launch
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
-import no.iktdev.mediaprocessing.shared.common.ProcessingService
+import kotlinx.coroutines.runBlocking
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
 import no.iktdev.mediaprocessing.shared.common.SharedConfig
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
 import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
 import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 import java.io.File
 
 @Service
-class ReadVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
+class ReadVideoFileStreams(): TaskCreator() {
 
-    override fun onResult(referenceId: String, data: MessageDataWrapper) {
-        producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data)
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_PROCESS_STARTED
+    )
+
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
     }
 
-    override fun onReady() {
-        coordinator.addListener(object : TaskCreatorListener {
-            override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-                if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
-                    io.launch {
-                        val result = fileReadStreams(event.data as ProcessStarted)
-                        onResult(referenceId, result)
-                    }
-                }
-            }
+    override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
+        return listOf {
+            isEventOfSingle(event, KafkaEvents.EVENT_PROCESS_STARTED)
+        }
+    }
 
-        })
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+
+        return runBlocking { fileReadStreams(event.data as ProcessStarted) }
    }
 
     suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {
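`fileReadStreams` shells out through the shared `getOutputUsing(...)` runner to probe the file. A rough stand-in with `ProcessBuilder` shows the shape of such a call; the ffprobe flags are standard, everything else here is an assumption rather than the project's runner API:

```kotlin
import java.io.File

// Run ffprobe and capture its JSON stream report as a string.
fun probe(file: File): String {
    val process = ProcessBuilder(
        "ffprobe", "-v", "quiet", "-print_format", "json",
        "-show_streams", file.absolutePath
    ).redirectErrorStream(true).start()
    return process.inputStream.bufferedReader().readText().also { process.waitFor() }
}

fun main() {
    println(probe(File("in.mkv")).take(200))
}
```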
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt
new file mode 100644
index 00000000..b77fd762
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt
@@ -0,0 +1,173 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
+
+import no.iktdev.exfl.using
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.shared.common.Preference
+import no.iktdev.mediaprocessing.shared.common.SharedConfig
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
+import no.iktdev.streamit.library.kafka.dto.Status
+import org.springframework.stereotype.Service
+import java.io.File
+
+@Service
+class EncodeArgumentCreatorTask : TaskCreator() {
+    val preference = Preference.getPreference()
+
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED
+
+    override val requiredEvents: List<KafkaEvents> =
+        listOf(
+            KafkaEvents.EVENT_PROCESS_STARTED,
+            KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
+            KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
+            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
+        )
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
+    }
+
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+
+        val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
+        val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
+        val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
+        val serializedParsedStreams = readStreamsEvent.streams
+
+        val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
+
+        return getFfmpegVideoArguments(
+            inputFile = inputFile.file,
+            outDir = outDir,
+            preference = preference.encodePreference,
+            baseInfo = baseInfo,
+            serializedParsedStreams = serializedParsedStreams
+        )
+    }
+
+    private fun getFfmpegVideoArguments(
+        inputFile: String,
+        outDir: File,
+        preference: EncodingPreference,
+        baseInfo: BaseInfoPerformed,
+        serializedParsedStreams: ParsedMediaStreams
+    ): MessageDataWrapper {
+        val outVideoFile = outDir.using("${baseInfo.sanitizedName}.mp4").absolutePath
+
+        val vaas = VideoAndAudioSelector(serializedParsedStreams, preference)
+
+        val vArg = vaas.getVideoStream()
+            ?.let { VideoArguments(it, serializedParsedStreams, preference.video).getVideoArguments() }
+        val aArg = vaas.getAudioStream()
+            ?.let { AudioArguments(it, serializedParsedStreams, preference.audio).getAudioArguments() }
+
+        val vaArgs = toFfmpegWorkerArguments(vArg, aArg)
+        return if (vaArgs.isEmpty()) {
+            MessageDataWrapper(Status.ERROR, message = "Unable to produce arguments")
+        } else {
+            FfmpegWorkerArgumentsCreated(
+                status = Status.COMPLETED,
+                inputFile = inputFile,
+                entries = listOf(
+                    FfmpegWorkerArgument(
+                        outputFile = outVideoFile,
+                        arguments = vaArgs
+                    )
+                )
+            )
+        }
+    }
+
+    private class VideoAndAudioSelector(val mediaStreams: ParsedMediaStreams, val preference: EncodingPreference) {
+        private var defaultVideoSelected: VideoStream? = mediaStreams.videoStream
+            .filter { (it.duration_ts ?: 0) > 0 }
+            .maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.videoStream.minByOrNull { it.index }
+        private var defaultAudioSelected: AudioStream? = mediaStreams.audioStream
+            .filter { (it.duration_ts ?: 0) > 0 }
+            .maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.audioStream.minByOrNull { it.index }
+
+        fun getVideoStream(): VideoStream? {
+            return defaultVideoSelected
+        }
+
+        fun getAudioStream(): AudioStream? {
+            val languageFiltered = mediaStreams.audioStream.filter { it.tags.language == preference.audio.language }
+            val channeledAndCodec = languageFiltered.find {
+                it.channels >= (preference.audio.channels ?: 2) && it.codec_name == preference.audio.codec.lowercase()
+            }
+            return channeledAndCodec ?: return languageFiltered.minByOrNull { it.index } ?: defaultAudioSelected
+        }
+
+    }
+
+    private class VideoArguments(
+        val videoStream: VideoStream,
+        val allStreams: ParsedMediaStreams,
+        val preference: VideoPreference
+    ) {
+        fun isVideoCodecEqual() = getCodec(videoStream.codec_name) == getCodec(preference.codec.lowercase())
+        protected fun getCodec(name: String): String {
+            return when (name) {
+                "hevc", "hevec", "h265", "h.265", "libx265"
+                -> "libx265"
+
+                "h.264", "h264", "libx264"
+                -> "libx264"
+
+                else -> name
+            }
+        }
+
+        fun getVideoArguments(): VideoArgumentsDto {
+            val optionalParams = mutableListOf<String>()
+            if (preference.pixelFormatPassthrough.none { it == videoStream.pix_fmt }) {
+                optionalParams.addAll(listOf("-pix_fmt", preference.pixelFormat))
+            }
+            val codecParams = if (isVideoCodecEqual()) listOf("-vcodec", "copy")
+            else {
+                optionalParams.addAll(listOf("-crf", preference.threshold.toString()))
+                listOf("-c:v", getCodec(preference.codec.lowercase()))
+            }
+
+            return VideoArgumentsDto(
+                index = allStreams.videoStream.indexOf(videoStream),
+                codecParameters = codecParams,
+                optionalParameters = optionalParams
+            )
+        }
+    }
+
+    private class AudioArguments(
+        val audioStream: AudioStream,
+        val allStreams: ParsedMediaStreams,
+        val preference: AudioPreference
+    ) {
+        fun isAudioCodecEqual() = audioStream.codec_name.lowercase() == preference.codec.lowercase()
+        private fun shouldUseEAC3(): Boolean {
+            return (preference.defaultToEAC3OnSurroundDetected && audioStream.channels > 2 && audioStream.codec_name.lowercase() != "eac3")
+        }
+
+        fun getAudioArguments(): AudioArgumentsDto {
+            val optionalParams = mutableListOf<String>()
+            val codecParams = if (shouldUseEAC3())
+                listOf("-c:a", "eac3")
+            else if (!isAudioCodecEqual()) {
+                listOf("-c:a", preference.codec)
+            } else
+                listOf("-acodec", "copy")
+            return AudioArgumentsDto(
+                index = allStreams.audioStream.indexOf(audioStream),
+                codecParameters = codecParams,
+                optionalParameters = optionalParams
+            )
+        }
+
+    }
+}
\ No newline at end of file
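The encode task's core decision is copy-vs-transcode: normalize codec aliases, stream-copy when source and target already match, otherwise re-encode with a CRF quality target. Reduced to a sketch:

```kotlin
// Map the common H.264/H.265 aliases onto their ffmpeg encoder names.
fun normalizeCodec(name: String): String = when (name.lowercase()) {
    "hevc", "h265", "h.265", "libx265" -> "libx265"
    "h264", "h.264", "libx264" -> "libx264"
    else -> name.lowercase()
}

fun videoArgs(sourceCodec: String, targetCodec: String, crf: Int): List<String> =
    if (normalizeCodec(sourceCodec) == normalizeCodec(targetCodec))
        listOf("-vcodec", "copy") // same codec: avoid a lossy re-encode
    else
        listOf("-c:v", normalizeCodec(targetCodec), "-crf", crf.toString())

fun main() {
    println(videoArgs("h264", "libx264", 23)) // [-vcodec, copy]
    println(videoArgs("h264", "hevc", 28))    // [-c:v, libx265, -crf, 28]
}
```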
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* +import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.stereotype.Service +import java.io.File + +@Service +class ExtractArgumentCreatorTask : TaskCreator() { + + val preference = Preference.getPreference() + + override val producesEvent: KafkaEvents + get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED + + override val requiredEvents: List = listOf( + KafkaEvents.EVENT_PROCESS_STARTED, + KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, + KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, + KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE + ) + + + override fun prerequisitesRequired(events: List): List<() -> Boolean> { + return super.prerequisitesRequired(events) + listOf { + isPrerequisiteDataPresent(events) + } + } + + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } + if (!requiredEvents.contains(event.event)) { + log.info { "${this.javaClass.simpleName} ignores ${event.event}@${event.eventId}" } + return null + } + + val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted + val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed + val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed + val serializedParsedStreams = readStreamsEvent.streams + + val outDir = SharedConfig.outgoingContent.using(baseInfo.title) + + return getFfmpegSubtitleArguments( + inputFile = inputFile.file, + outDir = outDir, + baseInfo = baseInfo, + serializedParsedStreams = serializedParsedStreams + ) + } + + private fun getFfmpegSubtitleArguments( + inputFile: String, + outDir: File, + baseInfo: BaseInfoPerformed, + serializedParsedStreams: ParsedMediaStreams + ): MessageDataWrapper { + val subRootDir = outDir.using("sub") + val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments() + + val entries = sArg.map { + FfmpegWorkerArgument( + arguments = it.codecParameters + it.optionalParameters + listOf("-map", "0:s:${it.index}"), + outputFile = subRootDir.using(it.language, "${baseInfo.sanitizedName}.${it.format}").absolutePath + ) + } + return FfmpegWorkerArgumentsCreated( + status = Status.COMPLETED, + inputFile = inputFile, + entries = entries + ) + } + + private class SubtitleArguments(val subtitleStreams: List) { + /** + * @property DEFAULT is default subtitle as dialog + * @property CC is Closed-Captions + * @property SHD is Hard of hearing + * @property NON_DIALOGUE is for Signs or Song (as in lyrics) + */ + private enum class SubtitleType { + DEFAULT, + CC, + SHD, + NON_DIALOGUE + } + + private fun SubtitleStream.isCC(): Boolean { + val title = this.tags.title?.lowercase() ?: return false + val keywords = listOf("cc", "closed caption") + return keywords.any { title.contains(it) } + } + + private fun SubtitleStream.isSHD(): Boolean { + val title = this.tags.title?.lowercase() ?: return false + val keywords = listOf("shd", "hh", "Hard-of-Hearing", "Hard of Hearing") + return keywords.any { title.contains(it) } + } + + private fun SubtitleStream.isSignOrSong(): Boolean { + val title = this.tags.title?.lowercase() ?: return false + val keywords = listOf("song", "songs", "sign", "signs") + return keywords.any { title.contains(it) } + } + + private fun getSubtitleType(stream: 
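/* Note: the classifier below buckets a subtitle track by its title tag; selection later keeps one
   track per language, preferring DEFAULT over CC over SHD (the enum declaration order), and drops
   NON_DIALOGUE (signs/songs) tracks entirely. For example, an "eng" DEFAULT dialog track plus an
   "eng" CC track yields only the DEFAULT one. */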
SubtitleStream): SubtitleType { + return if (stream.isSignOrSong()) + SubtitleType.NON_DIALOGUE + else if (stream.isSHD()) { + SubtitleType.SHD + } else if (stream.isCC()) { + SubtitleType.CC + } else SubtitleType.DEFAULT + } + + fun getSubtitleArguments(): List { + val acceptable = subtitleStreams.filter { !it.isSignOrSong() } + val codecFiltered = acceptable.filter { getFormatToCodec(it.codec_name) != null } + val mappedToType = + codecFiltered.map { getSubtitleType(it) to it }.filter { it.first in SubtitleType.entries } + .groupBy { it.second.tags.language ?: "eng" } + .mapValues { entry -> + val languageStreams = entry.value + val sortedStreams = languageStreams.sortedBy { SubtitleType.entries.indexOf(it.first) } + sortedStreams.firstOrNull()?.second + }.mapNotNull { it.value } + + return mappedToType.mapNotNull { stream -> + getFormatToCodec(stream.codec_name)?.let { format -> + SubtitleArgumentsDto( + index = subtitleStreams.indexOf(stream), + language = stream.tags.language ?: "eng", + format = format + ) + } + } + + } + + fun getFormatToCodec(codecName: String): String? { + return when (codecName) { + "ass" -> "ass" + "subrip" -> "srt" + "webvtt", "vtt" -> "vtt" + "smi" -> "smi" + "hdmv_pgs_subtitle" -> null + else -> null + } + } + + } + + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/FFmpegBase.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/FFmpegBase.kt new file mode 100644 index 00000000..551a9f7f --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/FFmpegBase.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg + +import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioArgumentsDto +import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoArgumentsDto + +fun toFfmpegWorkerArguments( + videoArguments: VideoArgumentsDto?, + audioArguments: AudioArgumentsDto? 
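/* Note: a worked example of the argument assembly below, assuming the DTOs are constructed with
   these values (indices are per-type, matching ffmpeg's 0:v:n / 0:a:n stream selectors):

   val video = VideoArgumentsDto(index = 0, codecParameters = listOf("-vcodec", "copy"), optionalParameters = emptyList())
   val audio = AudioArgumentsDto(index = 1, codecParameters = listOf("-c:a", "eac3"), optionalParameters = emptyList())
   toFfmpegWorkerArguments(video, audio)
   // -> ["-vcodec", "copy", "-c:a", "eac3", "-map", "0:v:0", "-map", "0:a:1"]
*/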
+): List { + val arguments = mutableListOf( + *videoArguments?.codecParameters?.toTypedArray() ?: arrayOf(), + *videoArguments?.optionalParameters?.toTypedArray() ?: arrayOf(), + *audioArguments?.codecParameters?.toTypedArray() ?: arrayOf(), + *audioArguments?.optionalParameters?.toTypedArray() ?: arrayOf() + ) + videoArguments?.index?.let { + arguments.addAll(listOf("-map", "0:v:$it")) + } + audioArguments?.index?.let { + arguments.addAll(listOf("-map", "0:a:$it")) + } + return arguments +} \ No newline at end of file diff --git a/apps/processer/build.gradle.kts b/apps/processer/build.gradle.kts index b01d2e4d..e576e72f 100644 --- a/apps/processer/build.gradle.kts +++ b/apps/processer/build.gradle.kts @@ -20,14 +20,22 @@ repositories { } } +val exposedVersion = "0.44.0" dependencies { /*Spring boot*/ implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter:2.7.0") // implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + implementation("org.springframework.kafka:spring-kafka:2.8.5") + implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion") + implementation ("mysql:mysql-connector-java:8.0.29") + implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20210307") @@ -37,10 +45,13 @@ dependencies { implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") + implementation("com.github.pgreze:kotlin-process:1.4.1") + //implementation(project(mapOf("path" to ":shared:kafka"))) implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:common"))) + implementation(project(mapOf("path" to ":shared:kafka"))) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt new file mode 100644 index 00000000..195fa01b --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Coordinator.kt @@ -0,0 +1,86 @@ +package no.iktdev.mediaprocessing.processer + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.DatabaseConfig +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.stereotype.Service +import javax.annotation.PostConstruct + +@Service +@EnableScheduling +class Coordinator() { + + @Autowired + private lateinit 
var producer: CoordinatorProducer + + @Autowired + private lateinit var listener: DefaultMessageListener + + private val log = KotlinLogging.logger {} + + val listeners = EventBasedMessageListener() + + val io = Coroutines.io() + + fun readAllAvailableInQueue() { + val messages = PersistentDataReader().getAvailableProcessEvents() + io.launch { + messages.forEach { + delay(1000) + createTasksBasedOnEventsAndPersistance(referenceId = it.referenceId, eventId = it.eventId, messages) + } + } + } + + fun readAllMessagesFor(referenceId: String, eventId: String) { + val messages = PersistentDataReader().getAvailableProcessEvents() + createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages) + } + + fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List) { + val triggered = messages.find { it.eventId == eventId } + if (triggered == null) { + log.error { "Could not find $eventId in provided messages" } + return + } + listeners.forwardEventMessageToListeners(triggered, messages) + } + + val processKafkaEvents = listOf( + KafkaEvents.EVENT_WORK_ENCODE_CREATED, + KafkaEvents.EVENT_WORK_EXTRACT_CREATED, + ) + + @PostConstruct + fun onReady() { + io.launch { + listener.onMessageReceived = { event -> + if (event.key in processKafkaEvents) { + val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) + if (!success) { + log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" } + } else + readAllMessagesFor(event.value.referenceId, event.value.eventId) + } else if (event.key in listOf(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED, KafkaEvents.EVENT_WORK_ENCODE_SKIPPED)) { + readAllAvailableInQueue() + } else { + log.debug { "Skipping ${event.key}" } + } + } + listener.listen(KafkaEnv.kafkaTopic) + } + readAllAvailableInQueue() + } + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt deleted file mode 100644 index 75d84588..00000000 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt +++ /dev/null @@ -1,20 +0,0 @@ -package no.iktdev.mediaprocessing.processer - -import mu.KotlinLogging -import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.shared.common.SharedConfig -import org.springframework.stereotype.Service - -@Service -class EncodeService { - /*private val log = KotlinLogging.logger {} - val io = Coroutines.io() - - val producer = CoordinatorProducer() - private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> - if (event.key == KafkaEvents.EVENT_WORK_ENCODE_CREATED) { - - } - }*/ - -} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt new file mode 100644 index 00000000..b306c9ff --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EventBasedMessageListener.kt @@ -0,0 +1,37 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents + +class EventBasedMessageListener { + private val listeners: 
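/* Note: how the processer side fits together, as far as this diff shows: Coordinator persists
   EVENT_WORK_*_CREATED messages and forwards them, while EventBasedMessageListener (below) only
   notifies tasks that have not yet produced their own event for the batch. A hypothetical
   registration, for illustration:

   val bus = EventBasedMessageListener()
   bus.add(Tasks(producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED, taskHandler = encodeService))
*/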
MutableList = mutableListOf() + + fun add(produces: KafkaEvents, listener: TaskCreatorListener) { + listeners.add(Tasks(produces, listener)) + } + + fun add(task: Tasks) { + listeners.add(task) + } + + private fun waitingListeners(events: List): List { + val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) } + return nonCreators + } + + fun forwardEventMessageToListeners(newEvent: PersistentProcessDataMessage, events: List) { + waitingListeners(events).forEach { + try { + it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events) + } catch (e: Exception) { + e.printStackTrace() + } + } + } + +} + +data class Tasks( + val producesEvent: KafkaEvents, + val taskHandler: TaskCreatorListener +) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt deleted file mode 100644 index 7429d816..00000000 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt +++ /dev/null @@ -1,8 +0,0 @@ -package no.iktdev.mediaprocessing.processer - -import org.springframework.stereotype.Service - -@Service -class ExtractService { - -} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt new file mode 100644 index 00000000..58d81c72 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt @@ -0,0 +1,16 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import + +@Configuration +class SocketLocalInit: SocketImplementation() + +@Configuration +@Import(CoordinatorProducer::class, DefaultMessageListener::class) +class KafkaLocalInit: KafkaImplementation() { +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index 19ae6ea1..0bb63ff9 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -1,7 +1,10 @@ package no.iktdev.mediaprocessing.processer +import kotlinx.coroutines.launch import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication @@ -13,10 +16,23 @@ class ProcesserApplication { } fun main(args: Array) { - //val dataSource = MySqlDataSource.fromDatabaseEnv(); + val dataSource = MySqlDataSource.fromDatabaseEnv() + Coroutines.default().launch { + dataSource.createDatabase() + dataSource.createTables( + 
processerEvents
+        )
+    }
+    val context = runApplication<ProcesserApplication>(*args)
+}
+
+fun getComputername(): String {
+    // firstOrNull with a fallback: neither variable is guaranteed to exist on every OS
+    return listOfNotNull(
+        System.getenv("hostname"),
+        System.getenv("computername")
+    ).firstOrNull() ?: "unknown"
+}
+
 class SocketImplemented: SocketImplementation() {
 }
\ No newline at end of file
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt
new file mode 100644
index 00000000..c9e1f6c4
--- /dev/null
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt
@@ -0,0 +1,16 @@
+package no.iktdev.streamit.content.encode
+
+import no.iktdev.exfl.using
+import java.io.File
+
+class ProcesserEnv {
+    companion object {
+        val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
+        // System.getenv returns a platform type; the safe call avoids an NPE when the variable is unset
+        val allowOverwrite = System.getenv("ALLOW_OVERWRITE")?.toBoolean() ?: false
+        val maxEncodeRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS")?.toIntOrNull() ?: 1
+        val maxExtractRunners: Int = System.getenv("SIMULTANEOUS_EXTRACT_RUNNERS")?.toIntOrNull() ?: 1
+
+        val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else
+            File("data").using("logs")
+    }
+}
\ No newline at end of file
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt
new file mode 100644
index 00000000..5029d643
--- /dev/null
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Task.kt
@@ -0,0 +1,92 @@
+package no.iktdev.mediaprocessing.processer
+
+import mu.KotlinLogging
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.messaging.simp.SimpMessagingTemplate
+import javax.annotation.PostConstruct
+
+abstract class TaskCreator: TaskCreatorListener {
+    private val log = KotlinLogging.logger {}
+
+    @Autowired
+    lateinit var producer: CoordinatorProducer
+
+    @Autowired
+    lateinit var coordinator: Coordinator
+
+    @Autowired
+    lateinit var socketMessage: SimpMessagingTemplate
+
+    open val requiredEvents: List<KafkaEvents> = listOf()
+
+    open fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
+        val currentEvents = events.map { it.event }
+        return requiredEvents.all { currentEvents.contains(it) }
+    }
+    open fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
+        val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
+        return failed.isEmpty()
+    }
+
+    open fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
+        return event.event == singleOne
+    }
+
+    abstract fun getListener(): Tasks
+
+    open fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
+        return listOf {
+            isPrerequisiteEventsOk(events)
+        }
+    }
+
+
+
+    private val context: MutableMap<String, Any> = mutableMapOf()
+    private val context_key_reference = "reference"
+    private val context_key_producesEvent = "event"
+    final override fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>) {
+        context[context_key_reference]
= referenceId + getListener().producesEvent.let { + context[context_key_producesEvent] = it + } + + if (prerequisitesRequired(events).all { it.invoke() }) { + val result = onProcessEvents(event, events) + if (result != null) { + onResult(result) + } + } else { + log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" } + } + } + + abstract fun onProcessEvents(event: PersistentProcessDataMessage, events: List): MessageDataWrapper? + + + private fun onResult(data: MessageDataWrapper) { + producer.sendMessage( + referenceId = context[context_key_reference] as String, + event = context[context_key_producesEvent] as KafkaEvents, + data = data + ) + } + + @PostConstruct + fun postConstruct() { + coordinator.listeners.add(getListener()) + } +} + +fun interface Prerequisite { + fun execute(value: Any): Boolean +} + +interface TaskCreatorListener { + fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List): Unit +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt new file mode 100644 index 00000000..aba6950b --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt @@ -0,0 +1,12 @@ +package no.iktdev.mediaprocessing.processer.ffmpeg + +data class FfmpegDecodedProgress( + val progress: Int = -1, + val time: String, + val duration: String, + val speed: String, + val estimatedCompletionSeconds: Long = -1, + val estimatedCompletion: String = "Unknown", +) + +data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegProgressDecoder.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegProgressDecoder.kt new file mode 100644 index 00000000..350f0359 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegProgressDecoder.kt @@ -0,0 +1,157 @@ +package no.iktdev.mediaprocessing.processer.ffmpeg + +import java.lang.StringBuilder +import java.time.LocalTime +import java.time.format.DateTimeFormatter +import java.util.concurrent.TimeUnit +import kotlin.math.floor + +class FfmpegProgressDecoder { + + data class DecodedProgressData( + val frame: Int?, + val fps: Double?, + val stream_0_0_q: Double?, + val bitrate: String?, + val total_size: Int?, + val out_time_us: Long?, + val out_time_ms: Long?, + val out_time: String?, + val dup_frames: Int?, + val drop_frames: Int?, + val speed: Double?, + val progress: String? + ) + + val expectedKeys = listOf( + "frame=", + "fps=", + "stream_0_0_q=", + "bitrate=", + "total_size=", + "out_time_us=", + "out_time_ms=", + "out_time=", + "dup_frames=", + "drop_frames=", + "speed=", + "progress=" + ) + var duration: Int? = null + set(value) { + if (field == null || field == 0) + field = value + } + var durationTime: String = "NA" + fun parseVideoProgress(lines: List): DecodedProgressData? { + var frame: Int? = null + var progress: String? 
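/* Note: parseVideoProgress() below consumes the key=value blocks ffmpeg writes to stdout when
   launched with "-progress pipe:1"; a representative block looks like this (values illustrative):

   frame=2406
   fps=120.5
   bitrate=1187.6kbits/s
   out_time=00:00:33.13
   speed=4.82x
   progress=continue        // "end" on the final block

   A DecodedProgressData is only returned once a "progress" key has been seen. */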
= null + val metadataMap = mutableMapOf() + + try { + val eqValue = Regex("=") + for (line in lines) { + val keyValuePairs = Regex("=\\s*").replace(line, "=").split(" ").filter { it.isNotBlank() }.filter { eqValue.containsMatchIn(it) } + for (keyValuePair in keyValuePairs) { + val (key, value) = keyValuePair.split("=") + metadataMap[key] = value + } + + if (frame == null) { + frame = metadataMap["frame"]?.toIntOrNull() + } + + progress = metadataMap["progress"] + } + } catch (e: Exception) { + e.printStackTrace() + } + + return if (progress != null) { + // When "progress" is found, build and return the VideoMetadata object + DecodedProgressData( + frame, metadataMap["fps"]?.toDoubleOrNull(), metadataMap["stream_0_0_q"]?.toDoubleOrNull(), + metadataMap["bitrate"], metadataMap["total_size"]?.toIntOrNull(), metadataMap["out_time_us"]?.toLongOrNull(), + metadataMap["out_time_ms"]?.toLongOrNull(), metadataMap["out_time"], metadataMap["dup_frames"]?.toIntOrNull(), + metadataMap["drop_frames"]?.toIntOrNull(), metadataMap["speed"]?.replace("x", "", ignoreCase = true)?.toDoubleOrNull(), progress + ) + } else { + null // If "progress" is not found, return null + } + } + + + fun isDuration(value: String): Boolean { + return value.contains("Duration", ignoreCase = true) + } + fun setDuration(value: String) { + val results = Regex("Duration:\\s*([^,]+),").find(value)?.groupValues?.firstOrNull() + durationTime = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(results.toString())?.value ?: "NA" + duration = timeSpanToSeconds(results) + } + + private fun timeSpanToSeconds(time: String?): Int? + { + time ?: return null + val timeString = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(time) ?: return null + val strippedMS = Regex("[0-9]+:[0-9]+:[0-9]+").find(timeString.value) ?: return null + val outTime = LocalTime.parse(strippedMS.value, DateTimeFormatter.ofPattern("HH:mm:ss")) + return outTime.toSecondOfDay() + } + + + fun getProgress(decoded: DecodedProgressData): FfmpegDecodedProgress { + if (duration == null) + return FfmpegDecodedProgress(duration = durationTime, time = "NA", speed = "NA") + val progressTime = timeSpanToSeconds(decoded.out_time) ?: 0 + val progress = floor((progressTime.toDouble() / duration!!.toDouble()) *100).toInt() + + val ect = getEstimatedTimeRemaining(decoded) + + return FfmpegDecodedProgress( + progress = progress, + estimatedCompletionSeconds = ect, + estimatedCompletion = getETA(ect), + duration = durationTime, + time = decoded.out_time ?: "NA", + speed = decoded.speed?.toString() ?: "NA" + ) + } + + fun getEstimatedTimeRemaining(decoded: DecodedProgressData): Long { + val position = timeSpanToSeconds(decoded.out_time) ?: 0 + return if(duration == null || decoded.speed == null) -1 else + Math.round(Math.round(duration!!.toDouble() - position.toDouble()) / decoded.speed) + } + + fun getECT(time: Long): ECT { + var seconds = time + val day = TimeUnit.SECONDS.toDays(seconds) + seconds -= java.util.concurrent.TimeUnit.DAYS.toSeconds(day) + + val hour = TimeUnit.SECONDS.toHours(seconds) + seconds -= java.util.concurrent.TimeUnit.HOURS.toSeconds(hour) + + val minute = TimeUnit.SECONDS.toMinutes(seconds) + seconds -= java.util.concurrent.TimeUnit.MINUTES.toSeconds(minute) + + return ECT(day.toInt(), hour.toInt(), minute.toInt(), seconds.toInt()) + } + private fun getETA(time: Long): String { + val etc = getECT(time) ?: return "Unknown" + val str = StringBuilder() + if (etc.day > 0) { + str.append("${etc.day}d").append(" ") + } + if (etc.hour > 0) { + str.append("${etc.hour}h").append(" ") + 
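/* Note: a worked example for getECT()/getETA(): 93784 remaining seconds break down as
   1 day (86400) + 2 hours (7200) + 3 minutes (180) + 4 seconds -> ECT(1, 2, 3, 4). getETA() then
   renders "1d 2h": minutes are suppressed once a day is shown, and seconds once an hour is shown. */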
}
+        if (etc.day == 0 && etc.minute > 0) {
+            str.append("${etc.minute}m").append(" ")
+        }
+        if (etc.hour == 0 && etc.second > 0) {
+            str.append("${etc.second}s").append(" ")
+        }
+        return str.toString().trim()
+    }
+}
\ No newline at end of file
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt
new file mode 100644
index 00000000..e994d2c8
--- /dev/null
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt
@@ -0,0 +1,102 @@
+package no.iktdev.mediaprocessing.processer.ffmpeg
+
+import com.github.pgreze.process.Redirect
+import com.github.pgreze.process.process
+import com.google.gson.Gson
+import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.exfl.using
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
+import no.iktdev.streamit.content.encode.ProcesserEnv
+import java.io.BufferedWriter
+import java.io.File
+import java.io.FileWriter
+
+class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents) {
+    val scope = Coroutines.io()
+    val decoder = FfmpegProgressDecoder()
+    private val outputCache = mutableListOf<String>()
+    val logFile = ProcesserEnv.logDirectory.using("$eventId-${File(info.outFile).nameWithoutExtension}.log")
+
+    // A getter is required here: a plain assignment would snapshot the (still empty) cache at construction time
+    val getOutputCache get() = outputCache.toList()
+
+    data class FfmpegWorkerArgumentsBuilder(
+        private val mutableList: MutableList<String> = mutableListOf()
+    ) {
+        private val defaultArguments = listOf(
+            "-nostdin",
+            "-hide_banner"
+        )
+        private val progressArguments = listOf("-progress", "pipe:1")
+        fun using(info: FfmpegWorkRequestCreated) = apply {
+            this.mutableList.add(info.inputFile)
+            this.mutableList.addAll(info.arguments)
+            this.mutableList.add(info.outFile)
+        }
+
+        fun build(): List<String> {
+            return (if (ProcesserEnv.allowOverwrite) listOf("-y") else emptyList()) + defaultArguments + listOf("-i") + mutableList
+        }
+
+        fun buildWithProgress(): List<String> {
+            return build() + progressArguments
+        }
+    }
+
+    suspend fun run() {
+        val args = FfmpegWorkerArgumentsBuilder().using(info).build()
+        execute(args)
+    }
+
+    suspend fun runWithProgress() {
+        val args = FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress()
+        execute(args)
+    }
+
+    private suspend fun execute(args: List<String>) {
+        listener.onStarted(info)
+        val result = process(ProcesserEnv.ffmpeg, *args.toTypedArray(),
+            stdout = Redirect.CAPTURE,
+            stderr = Redirect.CAPTURE,
+            consumer = {
+                onOutputChanged(it)
+            },
+            destroyForcibly = true)
+
+        println(Gson().toJson(result))
+        if (result.resultCode != 0) {
+            listener.onError(info, result.output.joinToString("\n"))
+        } else {
+            listener.onCompleted(info)
+        }
+    }
+
+    fun onOutputChanged(line: String) {
+        outputCache.add(line)
+        writeToLog(line)
+        // Duration is read from ffmpeg's banner output; progress is parsed from a toList() snapshot so parsing never observes the cache mid-mutation.
+        if (decoder.isDuration(line)) {
+            decoder.setDuration(line)
+        }
+        decoder.parseVideoProgress(outputCache.toList())?.let { parsed ->
+            listener.onProgressChanged(info, decoder.getProgress(parsed))
+        }
+    }
+
+    fun writeToLog(line: String) {
+        // Append mode (true) keeps earlier log lines; use {} flushes and closes the writers
+        BufferedWriter(FileWriter(logFile, true)).use { writer ->
+            writer.write(line)
+            writer.newLine()
+        }
+    }
+
+}
+
+interface FfmpegWorkerEvents {
+    fun onStarted(info: FfmpegWorkRequestCreated)
+    fun onCompleted(info: FfmpegWorkRequestCreated)
+    fun onError(info: FfmpegWorkRequestCreated, errorMessage: String)
+    fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress)
+}
\ No newline at end of file
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt
new file mode 100644
index 00000000..4bae0be2
--- /dev/null
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ClaimsService.kt
@@ -0,0 +1,37 @@
+package no.iktdev.mediaprocessing.processer.services
+
+import mu.KotlinLogging
+import no.iktdev.mediaprocessing.processer.Coordinator
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.scheduling.annotation.EnableScheduling
+import org.springframework.scheduling.annotation.Scheduled
+import org.springframework.stereotype.Service
+
+@Service
+@EnableScheduling
+class ClaimsService() {
+    private val log = KotlinLogging.logger {}
+
+    @Autowired
+    lateinit var coordinator: Coordinator
+
+    @Scheduled(fixedDelay = (300_000))
+    fun validateClaims() {
+        val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents()
+        expiredClaims.forEach {
+            log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
+        }
+        val store = PersistentDataStore()
+        expiredClaims.forEach {
+            val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
+            if (result) {
+                log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
+            } else {
+                log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" }
+            }
+        }
+        coordinator.readAllAvailableInQueue()
+    }
+}
\ No newline at end of file
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt
new file mode 100644
index 00000000..92df6da2
--- /dev/null
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt
@@ -0,0 +1,178 @@
+package no.iktdev.mediaprocessing.processer.services
+
+import kotlinx.coroutines.*
+import mu.KotlinLogging
+import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.mediaprocessing.processer.Tasks
+import no.iktdev.mediaprocessing.processer.TaskCreator
+import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
+import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
+import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
+import no.iktdev.mediaprocessing.processer.getComputername
+import
no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated +import no.iktdev.streamit.content.encode.ProcesserEnv +import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.stereotype.Service +import java.io.File +import java.util.* +import javax.annotation.PreDestroy + +@Service +class EncodeService: TaskCreator() { + private val log = KotlinLogging.logger {} + + val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED + + val scope = Coroutines.io() + private var runner: FfmpegWorker? = null + private var runnerJob: Job? = null + val encodeServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + init { + log.info { "Starting encode service with id: $encodeServiceId" } + } + + override val requiredEvents: List + get() = listOf(KafkaEvents.EVENT_WORK_ENCODE_CREATED) + + override fun getListener(): Tasks { + return Tasks(producesEvent, this) + } + + + override fun prerequisitesRequired(events: List): List<() -> Boolean> { + return super.prerequisitesRequired(events) + listOf { + isPrerequisiteDataPresent(events) + } + } + + override fun onProcessEvents(event: PersistentProcessDataMessage, events: List): MessageDataWrapper? { + if (event.data !is FfmpegWorkRequestCreated) { + return MessageDataWrapper(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") + } + + val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + if (isAlreadyClaimed) { + log.warn { "Process is already claimed!" } + return null + } + + if (runnerJob?.isActive != true) { + startEncode(event) + } else { + log.warn { "Worker is already running.." 
}
+            }
+        }
+        // This should never return any other than continue or skipped
+        return null
+    }
+
+    // Claim lifecycle: claim the event, heartbeat every 5 minutes while ffmpeg runs,
+    // mark it consumed on completion; stale claims are released by ClaimsService.
+    fun startEncode(event: PersistentProcessDataMessage) {
+        val ffwrc = event.data as FfmpegWorkRequestCreated
+        File(ffwrc.outFile).parentFile.mkdirs()
+        if (!ProcesserEnv.logDirectory.exists()) {
+            ProcesserEnv.logDirectory.mkdirs()
+        }
+
+        val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId)
+        if (setClaim) {
+            log.info { "Claim successful for ${event.referenceId} encode" }
+            runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents)
+            if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
+                ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
+                return
+            }
+            runnerJob = scope.launch {
+                runner!!.runWithProgress()
+            }
+
+        } else {
+            log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
+        }
+    }
+
+    val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
+        override fun onStarted(info: FfmpegWorkRequestCreated) {
+            val runner = this@EncodeService.runner
+            if (runner == null || runner.referenceId.isBlank()) {
+                log.error { "Can't produce start message when the referenceId is not present" }
+                return
+            }
+            log.info { "Encode started for ${runner.referenceId}" }
+            PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId)
+            sendProgress(info, null, false)
+
+            scope.launch {
+                while (runnerJob?.isActive == true) {
+                    delay(java.time.Duration.ofMinutes(5).toMillis())
+                    PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId)
+                }
+            }
+        }
+
+        override fun onCompleted(info: FfmpegWorkRequestCreated) {
+            val runner = this@EncodeService.runner
+            if (runner == null || runner.referenceId.isBlank()) {
+                log.error { "Can't produce completion message when the referenceId is not present" }
+                return
+            }
+            log.info { "Encode completed for ${runner.referenceId}" }
+            val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId)
+            runBlocking {
+                delay(1000)
+                if (!consumedIsSuccessful) {
+                    PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId)
+                }
+                delay(1000)
+                var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
+
+                // Bounded retry: an unbounded loop here could hang the completion path forever
+                // if the readback never succeeds (ExtractService caps the same wait with limitedWhile)
+                var attempts = 0
+                while (!readbackIsSuccess && attempts < 30) {
+                    attempts += 1
+                    delay(1000)
+                    readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
+                }
+                producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
+                    FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
+                clearWorker()
+            }
+
+        }
+
+        override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) {
+            val runner = this@EncodeService.runner
+            if (runner == null || runner.referenceId.isBlank()) {
+                log.error { "Can't produce error message when the referenceId is not present" }
+                return
+            }
+            log.info { "Encode failed for ${runner.referenceId}" }
+            producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
+                FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId =
runner.eventId)) + sendProgress(info = info, ended = true) + clearWorker() + } + + override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { + sendProgress(info, progress, false) + } + + } + + fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) { + + } + + + fun clearWorker() { + this.runner?.scope?.cancel() + this.runner = null + } + + @PreDestroy + fun shutdown() { + scope.cancel() + runner?.scope?.cancel("Stopping application") + } +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt new file mode 100644 index 00000000..2759a4ec --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -0,0 +1,184 @@ +package no.iktdev.mediaprocessing.processer.services + +import kotlinx.coroutines.* +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.processer.TaskCreator +import no.iktdev.mediaprocessing.processer.Tasks +import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress +import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker +import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents +import no.iktdev.mediaprocessing.processer.getComputername +import no.iktdev.mediaprocessing.shared.common.limitedWhile +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated +import no.iktdev.streamit.content.encode.ProcesserEnv +import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.stereotype.Service +import java.io.File +import java.util.* +import javax.annotation.PreDestroy + +@Service +class ExtractService: TaskCreator() { + private val log = KotlinLogging.logger {} + + val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED + + val scope = Coroutines.io() + + private var runner: FfmpegWorker? = null + private var runnerJob: Job? = null + + val extractServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + init { + log.info { "Starting extract service with id: $extractServiceId" } + } + override fun getListener(): Tasks { + return Tasks(producesEvent, this) + } + + override val requiredEvents: List + get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED) + + override fun prerequisitesRequired(events: List): List<() -> Boolean> { + return super.prerequisitesRequired(events) + listOf { + isPrerequisiteDataPresent(events) + } + } + + override fun onProcessEvents(event: PersistentProcessDataMessage, events: List): MessageDataWrapper? 
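/* Note: ExtractService repeats EncodeService's guard pattern; condensed, the body below behaves
   like this sketch (duplicate Kafka deliveries are harmless because the claim check wins):

   if (event.data !is FfmpegWorkRequestCreated) return MessageDataWrapper(Status.ERROR, ...)
   if (PersistentDataReader().isProcessEventAlreadyClaimed(event.referenceId, event.eventId)) return null
   if (runnerJob?.isActive != true) startExtract(event)  // one ffmpeg job per service instance
   return null                                           // results are reported via the producer
*/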
{ + if (event.data !is FfmpegWorkRequestCreated) { + return MessageDataWrapper(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}") + } + + val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + if (isAlreadyClaimed) { + log.warn { "Process is already claimed!" } + return null + } + + if (runnerJob?.isActive != true) { + startExtract(event) + } else { + log.warn { "Worker is already running.." } + } + // This should never return any other than continue or skipped + return null + } + + fun startExtract(event: PersistentProcessDataMessage) { + val ffwrc = event.data as FfmpegWorkRequestCreated + File(ffwrc.outFile).parentFile.mkdirs() + if (!ProcesserEnv.logDirectory.exists()) { + ProcesserEnv.logDirectory.mkdirs() + } + + + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId) + if (setClaim) { + log.info { "Claim successful for ${event.referenceId} extract" } + runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents) + + if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { + ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") + return + } + runnerJob = scope.launch { + runner!!.run() + } + } else { + log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" } + } + + } + + val ffmpegWorkerEvents = object : FfmpegWorkerEvents { + override fun onStarted(info: FfmpegWorkRequestCreated) { + val runner = this@ExtractService.runner + if (runner == null || runner.referenceId.isBlank()) { + log.error { "Can't produce start message when the referenceId is not present" } + return + } + log.info { "Extract started for ${runner.referenceId}" } + PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, extractServiceId) + sendState(info, false) + } + + override fun onCompleted(info: FfmpegWorkRequestCreated) { + val runner = this@ExtractService.runner + if (runner == null || runner.referenceId.isBlank()) { + log.error { "Can't produce completion message when the referenceId is not present" } + return + } + log.info { "Extract completed for ${runner.referenceId}" } + var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) + runBlocking { + + delay(1000) + limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) { + consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId) + } + + log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" } + delay(1000) + + + + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) + limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) { + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId) + log.info { readbackIsSuccess } + } + log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" } + + + producer.sendMessage(referenceId = runner.referenceId, event = 
producesEvent, + FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = extractServiceId, derivedFromEventId = runner.eventId) + ) + log.info { "Extract is releasing worker" } + clearWorker() + } + } + + override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) { + val runner = this@ExtractService.runner + if (runner == null || runner.referenceId.isBlank()) { + log.error { "Can't produce error message when the referenceId is not present" } + return + } + log.info { "Extract failed for ${runner.referenceId}" } + producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, + FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId) + ) + sendState(info, ended= true) + clearWorker() + } + + override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { + // None as this will not be running with progress + } + + } + + fun sendState(info: FfmpegWorkRequestCreated, ended: Boolean) { + + } + + + fun clearWorker() { + this.runner?.scope?.cancel() + this.runner = null + } + + @PreDestroy + fun shutdown() { + scope.cancel() + runner?.scope?.cancel("Stopping application") + } +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/mediaprocessing/apps/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/mediaprocessing/apps/processer/ProcesserApplication.kt deleted file mode 100644 index bad6de26..00000000 --- a/apps/processer/src/main/kotlin/no/mediaprocessing/apps/processer/ProcesserApplication.kt +++ /dev/null @@ -1,4 +0,0 @@ -package no.mediaprocessing.apps.processer - -class ProcesserApplication { -} \ No newline at end of file diff --git a/apps/processer/src/main/resources/application.properties b/apps/processer/src/main/resources/application.properties new file mode 100644 index 00000000..a1f43aa1 --- /dev/null +++ b/apps/processer/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.output.ansi.enabled=always +logging.level.org.apache.kafka=WARN +logging.level.root=INFO diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DeserializingRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DeserializingRegistry.kt deleted file mode 100644 index fb3e5b0c..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DeserializingRegistry.kt +++ /dev/null @@ -1,39 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common - -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* -import kotlin.reflect.KClass - -class DeserializingRegistry { - companion object { - val deserializables = mutableListOf?>>( - KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class, - KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class, - KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class, - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class, - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class, - KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to null, - KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to null, - KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to null, - KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null, - KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to 
null, - - KafkaEvents.EVENT_WORK_ENCODE_CREATED to null, - KafkaEvents.EVENT_WORK_EXTRACT_CREATED to null, - KafkaEvents.EVENT_WORK_CONVERT_CREATED to null, - - KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to null, - KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to null, - KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null, - KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null, - - KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null, - KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null, - KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null, - - - - ) - } -} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt index d4c11736..23d5182c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt @@ -8,22 +8,30 @@ private val log = KotlinLogging.logger {} class Preference { companion object { + private var prevPreference: PreferenceDto? = null fun getPreference(): PreferenceDto { + val preference = readOrDefaultPreference() + if (preference != prevPreference) { + log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec } + log.info { "[Audio]: Language = " + preference.encodePreference.audio.language } + log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels } + log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate } + log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected } + + log.info { "[Video]: Codec = " + preference.encodePreference.video.codec } + log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat } + log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") } + log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold } + } + return preference.also { prevPreference = it } + } + + private fun readOrDefaultPreference(): PreferenceDto { val preference = readPreferenceFromFile() ?: PreferenceDto() - log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec } - log.info { "[Audio]: Language = " + preference.encodePreference.audio.language } - log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels } - log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate } - log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected } - - log.info { "[Video]: Codec = " + preference.encodePreference.video.codec } - log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat } - log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") } - log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold } - return preference } + private fun readPreferenceFromFile(): PreferenceDto? 
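/* Note: the intent of the caching above: getPreference() still re-reads the preference file on
   every call, but the [Audio]/[Video] summary is only logged when the parsed PreferenceDto differs
   from the previously returned one, relying on data-class equality. Roughly:

   val current = readOrDefaultPreference()
   if (current != prevPreference) logSummary(current)   // logSummary is hypothetical shorthand
   return current.also { prevPreference = it }
*/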
{ val prefFile = SharedConfig.preference if (!prefFile.exists()) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 86842154..7f2fd755 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.common +import kotlinx.coroutines.delay import mu.KotlinLogging import java.io.File import java.io.RandomAccessFile @@ -18,4 +19,14 @@ fun isFileAvailable(file: File): Boolean { stream?.close() } return false +} + + +suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) { + var elapsedDelay = 0L + do { + block.invoke() + elapsedDelay += delayed + delay(delayed) + } while (condition.invoke() && elapsedDelay < maxDuration) } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt index 8a680882..f6966bf6 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -31,6 +31,7 @@ fun insertWithSuccess(block: () -> T): Boolean { transaction { try { block() + commit() } catch (e: Exception) { e.printStackTrace() // log the error here or handle the exception as needed @@ -49,10 +50,13 @@ fun executeOrException(block: () -> T): Exception? { transaction { try { block() + commit() null } catch (e: Exception) { // log the error here or handle the exception as needed + rollback() e + } } } catch (e: Exception) { @@ -66,6 +70,7 @@ fun executeWithStatus(block: () -> T): Boolean { transaction { try { block() + commit() } catch (e: Exception) { e.printStackTrace() // log the error here or handle the exception as needed diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt index 5481f648..07a568ef 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt @@ -29,7 +29,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp else -> sanitizedName } val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped - return MovieInfo(type = "movie", cleanup(nonResolutioned), cleanup(nonResolutioned)) + return MovieInfo(title = cleanup(nonResolutioned), fullName = cleanup(nonResolutioned)) } private fun determineSerieFileName(): EpisodeInfo? 
{ @@ -58,7 +58,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp } } else title val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() - return EpisodeInfo(type= "serie", title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) + return EpisodeInfo(title = title, episode = episodeNumber.toInt(), season = seasonNumber.toInt(), episodeTitle = episodeTitle, fullName = fullName) } private fun determineUndefinedFileName(): VideoInfo? { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index 6ce12f05..f326770b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -2,9 +2,9 @@ package no.iktdev.mediaprocessing.shared.common.persistance import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry -import org.jetbrains.exposed.sql.SortOrder -import org.jetbrains.exposed.sql.select -import org.jetbrains.exposed.sql.selectAll +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.jetbrains.exposed.sql.* +import java.time.LocalDateTime class PersistentDataReader { val dzz = DeserializingRegistry() @@ -25,4 +25,62 @@ class PersistentDataReader { } ?: emptyList() } + fun getUncompletedMessages(): List> { + val result = withTransaction { + events.selectAll() + .andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event } + .groupBy { it[events.referenceId] } + .mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } + } ?: emptyList() + return result + } + + fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean { + val result = withTransaction { + processerEvents.select { + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }.singleOrNull() + } + return result?.claimed ?: true + } + + fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean { + return withTransaction { + processerEvents.select { + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) and + (processerEvents.claimedBy eq claimedBy) + }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } + }?.singleOrNull()?.consumed ?: false + } + + fun getAvailableProcessEvents(): List { + return withTransaction { + processerEvents.select { + (processerEvents.claimed eq false) and + (processerEvents.consumed eq false) + }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } + } ?: emptyList() + } + + fun getExpiredClaimsProcessEvents(): List { + val deadline = LocalDateTime.now() + val entries = withTransaction { + processerEvents.select { + (processerEvents.claimed eq true) and + (processerEvents.consumed neq true) + }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } + } ?: emptyList() + return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline } + } + + fun getProcessEvents(): List { + return withTransaction { + processerEvents.selectAll() + .mapNotNull { 
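/* Note: "expired" in getExpiredClaimsProcessEvents() above means claimed but not consumed, with a
   lastCheckIn heartbeat that is null or older than 15 minutes at query time; the cutoff is applied
   in memory after fetching all open claims. A claim last checked in at 10:00 is therefore eligible
   for release from 10:15 on, and ClaimsService sweeps on a 5-minute fixedDelay. */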
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
index 295bcea9..0c990e0b 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
@@ -2,13 +2,18 @@ package no.iktdev.mediaprocessing.shared.common.persistance
 
 import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
 import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
+import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
 import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import org.jetbrains.exposed.exceptions.ExposedSQLException
+import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
+import org.jetbrains.exposed.sql.and
 import org.jetbrains.exposed.sql.insert
+import org.jetbrains.exposed.sql.javatime.CurrentDateTime
+import org.jetbrains.exposed.sql.update
 import java.sql.SQLIntegrityConstraintViolationException
 
 open class PersistentDataStore {
-    fun storeMessage(event: String, message: Message<*>): Boolean {
+    fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
         val exception = executeOrException {
             events.insert {
                 it[events.referenceId] = message.referenceId
@@ -28,4 +33,78 @@ open class PersistentDataStore {
         }
     }
 
+    fun storeProcessDataMessage(event: String, message: Message<*>): Boolean {
+        val exception = executeOrException {
+            processerEvents.insert {
+                it[processerEvents.referenceId] = message.referenceId
+                it[processerEvents.eventId] = message.eventId
+                it[processerEvents.event] = event
+                it[processerEvents.data] = message.dataAsJson()
+            }
+        }
+        return if (exception == null) true else {
+            if (exception.cause is SQLIntegrityConstraintViolationException) {
+                (exception as ExposedSQLException).errorCode == 1062
+            }
+            else {
+                exception.printStackTrace()
+                false
+            }
+        }
+    }
+
+    fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
+        return withTransaction {
+            processerEvents.update({
+                (processerEvents.referenceId eq referenceId) and
+                        (processerEvents.eventId eq eventId) and
+                        (processerEvents.claimed eq false)
+            }) {
+                it[processerEvents.claimedBy] = claimedBy
+                it[lastCheckIn] = CurrentDateTime
+                it[claimed] = true
+            }
+        } == 1
+    }
+
+    fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean {
+        return withTransaction {
+            processerEvents.update({
+                (processerEvents.referenceId eq referenceId) and
+                        (processerEvents.eventId eq eventId) and
+                        (processerEvents.claimedBy eq claimedBy) and
+                        (processerEvents.claimed eq true)
+            }) {
+                it[processerEvents.consumed] = true
+            }
+        } == 1
+    }
+
+    fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
+        return executeWithStatus {
+            processerEvents.update({
+                (processerEvents.referenceId eq referenceId) and
+                        (processerEvents.eventId eq eventId) and
+                        (processerEvents.claimed eq true) and
+                        (processerEvents.claimedBy eq claimedBy)
+            }) {
+                it[lastCheckIn] = CurrentDateTime
+            }
+        }
+    }
+
+    fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean {
+        val exception = executeOrException {
+            processerEvents.update({
+                (processerEvents.referenceId eq referenceId) and
+                        (processerEvents.eventId eq eventId)
+            }) {
+                it[claimedBy] = null
+                it[lastCheckIn] = null
+                it[claimed] = false
+            }
+        }
+        return exception == null
+    }
+}
\ No newline at end of file
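Together with the reader queries above, these store methods form the claim lifecycle for processer instances. A sketch of a worker loop built on top of it; the worker id and the janitor pass are invented for illustration:

```kotlin
fun workOnce() {
    val reader = PersistentDataReader()
    val store = PersistentDataStore()
    val me = "processer-1" // invented instance id

    reader.getAvailableProcessEvents().forEach { event ->
        // The claim only succeeds for rows that are still unclaimed,
        // so two processers cannot take the same event.
        if (store.setProcessEventClaim(event.referenceId, event.eventId, me)) {
            // ... do the work, calling updateCurrentProcessEventClaim
            // periodically so the claim does not look stale ...
            store.setProcessEventCompleted(event.referenceId, event.eventId, me)
        }
    }

    // A janitor pass can return stale claims (no check-in for 15 minutes,
    // per getExpiredClaimsProcessEvents) to the pool:
    reader.getExpiredClaimsProcessEvents().forEach {
        store.releaseProcessEventClaim(it.referenceId, it.eventId)
    }
}
```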
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt
index e36bd085..4166b36a 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt
@@ -10,10 +10,19 @@ data class PersistentMessage(
     val referenceId: String,
     val eventId: String,
     val event: KafkaEvents,
+    //val metadata: Metadata,
     val data: MessageDataWrapper,
     val created: LocalDateTime
 )
 
+data class Metadata(
+    val createdBy: String
+)
+
+fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
+    return this.event == event
+}
+
 fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
     val kev = try {
         KafkaEvents.toEvent(row[events.event])
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt
new file mode 100644
index 00000000..a44cf85e
--- /dev/null
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentProcessDataMessage.kt
@@ -0,0 +1,49 @@
+package no.iktdev.mediaprocessing.shared.common.persistance
+
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimed
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimedBy
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.consumed
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.created
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.data
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.event
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.eventId
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.lastCheckIn
+import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.referenceId
+import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import org.jetbrains.exposed.sql.ResultRow
+import java.time.LocalDateTime
+
+data class PersistentProcessDataMessage(
+    val referenceId: String,
+    val eventId: String,
+    val event: KafkaEvents,
+    val data: MessageDataWrapper,
+    val created: LocalDateTime,
+    val claimedBy: String? = null,
+    val claimed: Boolean = false,
+    val consumed: Boolean = false,
+    val lastCheckIn: LocalDateTime? = null
+)
+
+fun fromRowToPersistentProcessDataMessage(row: ResultRow, dez: DeserializingRegistry): PersistentProcessDataMessage? {
+    val kev = try {
+        KafkaEvents.toEvent(row[event])
+    } catch (e: IllegalArgumentException) {
+        e.printStackTrace()
+        return null
+    } ?: return null
+    val dzdata = dez.deserializeData(kev, row[data])
+    return PersistentProcessDataMessage(
+        referenceId = row[referenceId],
+        eventId = row[eventId],
+        event = kev,
+        data = dzdata,
+        created = row[created],
+        claimed = row[claimed],
+        claimedBy = row[claimedBy],
+        consumed = row[consumed],
+        lastCheckIn = row[lastCheckIn]
+    )
+}
\ No newline at end of file
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
index 4dcfaaf0..4490bef9 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
@@ -9,7 +9,7 @@ import java.time.LocalDateTime
 object events: IntIdTable() {
     val referenceId: Column<String> = varchar("referenceId", 50)
     val eventId: Column<String> = varchar("eventId", 50)
-    val event: Column<String> = varchar("event1",100)
+    val event: Column<String> = varchar("event",100)
     val data: Column<String> = text("data")
 
     val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt
index 77bf86e8..cab0a06a 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt
@@ -2,9 +2,22 @@ package no.iktdev.mediaprocessing.shared.common.persistance
 
 import org.jetbrains.exposed.dao.id.IntIdTable
 import org.jetbrains.exposed.sql.Column
+import org.jetbrains.exposed.sql.javatime.CurrentDateTime
+import org.jetbrains.exposed.sql.javatime.datetime
+import java.time.LocalDateTime
 
 object processerEvents: IntIdTable() {
-
-    val claimed: Column<Boolean> = bool("claimed")
+    val referenceId: Column<String> = varchar("referenceId", 50)
+    val claimed: Column<Boolean> = bool("claimed").default(false)
+    val claimedBy: Column<String?> = varchar("claimedBy", 100).nullable()
+    val event: Column<String> = varchar("event",100)
+    val eventId: Column<String> = varchar("eventId", 50)
     val data: Column<String> = text("data")
+    val consumed: Column<Boolean> = bool("consumed").default(false)
+    val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
+    val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
+
+    init {
+        uniqueIndex(referenceId, event)
+    }
 }
\ No newline at end of file
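The new `uniqueIndex(referenceId, event)` is what the error-code check in `storeProcessDataMessage` leans on: 1062 is MySQL's duplicate-key error, so re-inserting the same (referenceId, event) pair is reported as success rather than a failure. A sketch of the resulting idempotency, with `message` standing in for any `Message<*>` taken off the bus:

```kotlin
fun storeTwice(message: Message<*>) {
    val store = PersistentDataStore()
    val event = KafkaEvents.EVENT_WORK_ENCODE_CREATED.event

    store.storeProcessDataMessage(event, message) // true: row inserted
    // The replay trips the unique index; the 1062 branch maps the
    // duplicate-key violation back to true, so storage is idempotent.
    store.storeProcessDataMessage(event, message) // true: duplicate tolerated
}
```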
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
index 73464e69..e88c0f8f 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
@@ -2,14 +2,15 @@ package no.iktdev.mediaprocessing.shared.kafka.core
 
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
+import mu.KotlinLogging
 import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import java.lang.reflect.Type
-import kotlin.reflect.KClass
 
 class DeserializingRegistry {
+    private val log = KotlinLogging.logger {}
+
     companion object {
         val deserializables = mutableMapOf<KafkaEvents, Class<out MessageDataWrapper>?>(
             KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class.java,
@@ -17,18 +18,19 @@ class DeserializingRegistry {
             KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java,
             KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java,
             KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class.java,
-            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to null,
-            KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to null,
-            KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to null,
+            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to VideoInfoPerformed::class.java,
+            KafkaEvents.EVENT_MEDIA_READ_OUT_COVER to CoverInfoPerformed::class.java,
+            KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
+            KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
             KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null,
             KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to null,
-            KafkaEvents.EVENT_WORK_ENCODE_CREATED to null,
-            KafkaEvents.EVENT_WORK_EXTRACT_CREATED to null,
+            KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java,
+            KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
             KafkaEvents.EVENT_WORK_CONVERT_CREATED to null,
-            KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to null,
-            KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to null,
+            KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java,
+            KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java,
             KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null,
             KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null,
@@ -41,6 +43,9 @@ class DeserializingRegistry {
     fun deserialize(event: KafkaEvents, json: String): Message<*> {
         val gson = Gson()
         val dezClazz = deserializables[event]
+        if (dezClazz == null) {
+            log.warn { "${event.event} will be deserialized with default!" }
+        }
         dezClazz?.let { eventClass ->
             try {
                 val type = TypeToken.getParameterized(Message::class.java, eventClass).type
@@ -51,7 +56,7 @@ class DeserializingRegistry {
         }
         // Fallback
         val type = object : TypeToken<Message<SimpleMessageData>>() {}.type
-        return gson.fromJson<Message<MessageDataWrapper>>(json, type)
+        return gson.fromJson<Message<SimpleMessageData>>(json, type)
     }
 
     fun deserializeData(event: KafkaEvents, json: String): MessageDataWrapper {
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
index 792c747b..e334da6c 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
@@ -8,6 +8,7 @@ enum class KafkaEvents(val event: String) {
     EVENT_MEDIA_READ_BASE_INFO_PERFORMED("event:media-read-base-info:performed"),
     EVENT_MEDIA_METADATA_SEARCH_PERFORMED("event:media-metadata-search:performed"),
     EVENT_MEDIA_READ_OUT_NAME_AND_TYPE("event:media-read-out-name-and-type:performed"),
+    EVENT_MEDIA_READ_OUT_COVER("event:media-read-out-cover:performed"),
 
     EVENT_MEDIA_ENCODE_PARAMETER_CREATED("event:media-encode-parameter:created"),
     EVENT_MEDIA_EXTRACT_PARAMETER_CREATED("event:media-extract-parameter:created"),
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt
index 8acf5e43..e31f8457 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt
@@ -25,7 +25,7 @@ open class KafkaImplementation {
         config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
         config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
         config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        log.info { config }
+        //log.info { config }
         return DefaultKafkaProducerFactory(config)
     }
     @Bean
@@ -43,7 +43,7 @@ open class KafkaImplementation {
         config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
         config[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = KafkaEnv.sessionTimeOutMilliseconds
         config[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = KafkaEnv.heartbeatIntervalMilliseconds
-        log.info { config }
+        //log.info { config }
         return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
     }
 }
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
index ae993a55..928bd772 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
@@ -1,11 +1,6 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto
 
-import com.google.gson.Gson
-import no.iktdev.mediaprocessing.shared.contract.ProcessType
 import no.iktdev.streamit.library.kafka.dto.Status
-import java.io.Serializable
-import java.lang.reflect.Type
-import java.util.*
 
 
 open class MessageDataWrapper(
@@ -15,7 +10,7 @@
 
 data class SimpleMessageData(
     override val status: Status,
-    override val message: String?
+    override val message: String? = null
 ) : MessageDataWrapper(status, message)
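With the registry now mapping the encode/extract events to concrete classes, only the entries still set to `null` take the fallback path. A sketch of what a caller sees, assuming the generic signatures as reconstructed above (the JSON payload is invented):

```kotlin
fun deserializeBoth() {
    val registry = DeserializingRegistry()
    val json = """{"referenceId":"ref-1","eventId":"ev-1","data":{"status":"COMPLETED"}}"""

    // Mapped event: data is decoded as FfmpegWorkRequestCreated.
    val typed = registry.deserialize(KafkaEvents.EVENT_WORK_ENCODE_CREATED, json)

    // Unmapped event: logs "will be deserialized with default!" and
    // falls back to the generic Message<SimpleMessageData> shape.
    val fallback = registry.deserialize(KafkaEvents.EVENT_WORK_CONVERT_CREATED, json)
    println("$typed / $fallback")
}
```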
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt
new file mode 100644
index 00000000..00eaf027
--- /dev/null
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt
@@ -0,0 +1,17 @@
+package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
+
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.streamit.library.kafka.dto.Status
+
+@KafkaBelongsToEvent(
+    KafkaEvents.EVENT_WORK_ENCODE_PERFORMED,
+    KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
+)
+data class FfmpegWorkPerformed(
+    override val status: Status,
+    override val message: String? = null,
+    val producedBy: String,
+    val derivedFromEventId: String
+): MessageDataWrapper(status, message)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
index 6f9e7eb2..a033e472 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
@@ -1,17 +1,29 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 
+import com.google.gson.Gson
+import com.google.gson.JsonObject
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.streamit.library.kafka.dto.Status
 
 data class VideoInfoPerformed(
     override val status: Status,
-    val info: VideoInfo
+    val info: JsonObject,
+    val outDirectory: String
 )
-    : MessageDataWrapper(status)
+    : MessageDataWrapper(status) {
+    fun toValueObject(): VideoInfo? {
+        val type = info.get("type").asString
+        return when (type) {
+            "movie" -> Gson().fromJson(info.toString(), MovieInfo::class.java)
+            "serie" -> Gson().fromJson(info.toString(), EpisodeInfo::class.java)
+            else -> null
+        }
+    }
+}
 
 data class EpisodeInfo(
-    override val type: String,
+    override val type: String = "serie",
     val title: String,
     val episode: Int,
     val season: Int,
@@ -20,7 +32,7 @@ data class EpisodeInfo(
 ): VideoInfo(type, fullName)
 
 data class MovieInfo(
-    override val type: String,
+    override val type: String = "movie",
     val title: String,
     override val fullName: String
 ) : VideoInfo(type, fullName)
@@ -34,4 +46,8 @@ data class SubtitleInfo(
 open class VideoInfo(
     @Transient open val type: String,
     @Transient open val fullName: String
-)
\ No newline at end of file
+) {
+    fun toJsonObject(): JsonObject {
+        return Gson().toJsonTree(this).asJsonObject
+    }
+}
\ No newline at end of file
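Since `type` now defaults per subclass and `info` travels as a raw `JsonObject`, a produce/consume round trip might look like the sketch below; the `Status` value, names, and path are invented:

```kotlin
fun roundTrip() {
    val episode = EpisodeInfo(
        title = "Some Show",
        episode = 1,
        season = 1,
        episodeTitle = "Pilot",
        fullName = "Some Show - S01E01 - Pilot"
    )
    val performed = VideoInfoPerformed(
        status = Status.COMPLETED,           // invented Status value
        info = episode.toJsonObject(),       // carries type = "serie"
        outDirectory = "/data/out/Some Show" // invented path
    )
    // toValueObject() dispatches on the embedded "type" field:
    // EpisodeInfo for "serie", MovieInfo for "movie", null otherwise.
    val restored: VideoInfo? = performed.toValueObject()
    println(restored?.fullName)
}
```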