WIP - Handover to processer works

This commit is contained in:
Brage 2024-01-03 17:37:56 +01:00
parent 3d119813dd
commit 4e9cdb10a4
52 changed files with 2102 additions and 676 deletions

View File

@ -1,3 +1,6 @@
+# Coordinator
+Only one instance is supported, while multiple processers can be run at any time
+
# FLOW:
### Inputs:
- File watcher
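
For orientation (not part of the diff): a minimal sketch of how a file-watcher input is expected to enter the flow. `onFileDetected` is a hypothetical callback; `startProcess` and `ProcessType.FLOW` come from the Coordinator changes below, and the `ProcessType` import is omitted since its package is not visible in this commit.

```kotlin
import no.iktdev.mediaprocessing.coordinator.Coordinator
import java.io.File

// Hypothetical watcher callback; only startProcess/ProcessType.FLOW exist in this commit.
fun onFileDetected(coordinator: Coordinator, file: File) {
    // Publishes EVENT_PROCESS_STARTED to Kafka; once stored, the message fans out
    // to the registered task listeners.
    coordinator.startProcess(file, ProcessType.FLOW)
}
```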

View File

@ -1,9 +1,12 @@
package no.iktdev.mediaprocessing.coordinator

import com.google.gson.Gson
+import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.mediaprocessing.coordinator.coordination.EventBasedMessageListener
+import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@ -32,12 +35,9 @@ class Coordinator() {
private val log = KotlinLogging.logger {}
-private val listeners: MutableList<TaskCreatorListener> = mutableListOf()
-fun addListener(listener: TaskCreatorListener) {
-    listeners.add(listener)
-}
+val listeners = EventBasedMessageListener()
+private val forwarder = Forwarder()
public fun startProcess(file: File, type: ProcessType) {
val processStartEvent = ProcessStarted(
@ -48,66 +48,40 @@ class Coordinator() {
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_PROCESS_STARTED, processStartEvent)
}
fun produceEncodeWork(message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(message.referenceId,
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
createdRequest)
}
}
}
fun produceExtractWork(message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
val eventId = UUID.randomUUID().toString()
FfmpegWorkRequestCreated(
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(message.eventId,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
eventId,
createdRequest)
}
val outFile = File(it.outputFile)
ConvertWorkerRequest(
requiresEventId = eventId,
inputFile = it.outputFile,
true,
outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath
).let { createdRequest ->
producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
createdRequest)
}
}
}
val io = Coroutines.io()
fun readAllUncompletedMessagesInQueue() {
val messages = PersistentDataReader().getUncompletedMessages()
io.launch {
messages.forEach {
delay(1000)
listeners.forwardBatchEventMessagesToListeners(it)
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getMessagesFor(referenceId)
if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) {
log.warn { "EventId ($eventId) for ReferenceId ($referenceId) has not been made available in the database yet." }
io.launch {
val fixedDelay = 1000L
var delayed = 0L
var msc = PersistentDataReader().getMessagesFor(referenceId)
// Poll until the event shows up in the database, or give up after 60 seconds
while (msc.find { it.eventId == eventId } == null && delayed < 1000 * 60) {
    delay(fixedDelay)
    delayed += fixedDelay
    msc = PersistentDataReader().getMessagesFor(referenceId)
}
operationToRunOnMessages(referenceId, eventId, msc)
}
} else {
operationToRunOnMessages(referenceId, eventId, messages)
}
}
fun operationToRunOnMessages(referenceId: String, eventId: String, messages: List<PersistentMessage>) {
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
io.launch {
@ -115,6 +89,10 @@ class Coordinator() {
}
}
fun getProcessStarted(messages: List<PersistentMessage>): ProcessStarted? {
    // Safe cast, so a missing or foreign payload yields null instead of a cast exception
    return messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as? ProcessStarted
}
suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List<PersistentMessage>) {
if (messages.any { it.data is ProcessCompleted }) {
// TODO: Build and insert into database
@ -127,18 +105,11 @@ class Coordinator() {
log.error { "Could not find $eventId in provided messages" } log.error { "Could not find $eventId in provided messages" }
return return
} }
listeners.forEach { it.onEventReceived(referenceId, triggered, messages) } listeners.forwardEventMessageToListeners(triggered, messages)
if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) { if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents(producer = producer, referenceId = referenceId, eventId = eventId, messages = messages)
if (processStarted.type == ProcessType.FLOW) {
log.info { "Process for $referenceId was started from flow and will be processed" }
if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
produceEncodeWork(triggered)
} else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
produceExtractWork(triggered)
}
} else { } else {
log.info { "Process for $referenceId was started manually and will require user input for continuation" } log.info { "Process for $referenceId was started manually and will require user input for continuation" }
} }
@ -149,13 +120,115 @@ class Coordinator() {
fun onReady() {
io.launch {
listener.onMessageReceived = { event ->
-    val success = PersistentDataStore().storeMessage(event.key.event, event.value)
+    val success = PersistentDataStore().storeEventDataMessage(event.key.event, event.value)
    if (!success) {
-        log.error { "Unable to store message: ${event.key.event} in database!" }
+        log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}" }
    } else
-        readAllMessagesFor(event.value.referenceId, event.value.eventId)
+        io.launch {
+            delay(500) // Give the database a moment to update before reading back
+            readAllMessagesFor(event.value.referenceId, event.value.eventId)
+        }
}
listener.listen(KafkaEnv.kafkaTopic)
}
+readAllUncompletedMessagesInQueue()
}
class Forwarder() {
val forwardOnEventReceived = listOf(
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
)
fun hasAnyRequiredEventToCreateProcesserEvents(messages: List<PersistentMessage>): Boolean {
    return messages.any { forwardOnEventReceived.contains(it.event) && it.data.isSuccess() }
}
fun isMissingEncodeWorkCreated(messages: List<PersistentMessage>): Boolean {
    // "Missing" means no successful encode work event has been produced yet
    val existingWorkEncodeCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
    return existingWorkEncodeCreated.none { it.data.isSuccess() }
}
fun isMissingExtractWorkCreated(messages: List<PersistentMessage>): Boolean {
    // "Missing" means no successful extract work event has been produced yet
    val existingWorkCreated = messages.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
    return existingWorkCreated.none { it.data.isSuccess() }
}
fun produceAllMissingProcesserEvents(producer: CoordinatorProducer, referenceId: String, eventId: String, messages: List<PersistentMessage>) {
val currentMessage = messages.find { it.eventId == eventId }
if (!currentMessage?.data.isSuccess()) {
return
}
when (currentMessage?.event) {
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED -> {
if (isMissingEncodeWorkCreated(messages)) {
produceEncodeWork(producer, currentMessage)
}
}
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED -> {
if (isMissingExtractWorkCreated(messages)) {
produceExtractWork(producer, currentMessage)
}
}
else -> {}
}
}
fun produceEncodeWork(producer: CoordinatorProducer, message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(message.referenceId,
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
eventId = message.eventId,
createdRequest)
}
}
}
fun produceExtractWork(producer: CoordinatorProducer, message: PersistentMessage) {
if (message.event != KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
throw RuntimeException("Incorrect event passed ${message.event}")
}
if (message.data !is FfmpegWorkerArgumentsCreated) {
throw RuntimeException("Invalid data passed:\n${Gson().toJson(message)}")
}
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
).let { createdRequest ->
producer.sendMessage(message.referenceId,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
eventId = message.eventId,
createdRequest
)
}
val outFile = File(it.outputFile)
ConvertWorkerRequest(
requiresEventId = message.eventId,
inputFile = it.outputFile,
true,
outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath
).let { createdRequest ->
producer.sendMessage(message.referenceId, KafkaEvents.EVENT_WORK_CONVERT_CREATED,
createdRequest)
}
}
}
}
}

View File

@ -1,8 +0,0 @@
package no.iktdev.mediaprocessing.coordinator
import org.springframework.stereotype.Service
@Service
class MessageOperator {
}

View File

@ -1,16 +1,97 @@
package no.iktdev.mediaprocessing.coordinator

+import mu.KotlinLogging
+import no.iktdev.mediaprocessing.coordinator.coordination.Tasks
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
+import javax.annotation.PostConstruct

abstract class TaskCreator: TaskCreatorListener {
+    val log = KotlinLogging.logger {}
+    abstract val producesEvent: KafkaEvents

@Autowired
lateinit var producer: CoordinatorProducer

-    open fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
-        return true
+    @Autowired
+    lateinit var coordinator: Coordinator
+
+    open val requiredEvents: List<KafkaEvents> = listOf()
+    open val listensForEvents: List<KafkaEvents> = listOf()
+
+    open fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {
+        val currentEvents = events.map { it.event }
+        return requiredEvents.all { currentEvents.contains(it) }
}
open fun isPrerequisiteDataPresent(events: List<PersistentMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
open fun isEventOfSingle(event: PersistentMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
fun getListener(): Tasks {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}
open fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
open fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
return listOf()
}
private val context: MutableMap<String, Any> = mutableMapOf()
private val context_key_reference = "reference"
private val context_key_producesEvent = "event"
final override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
context[context_key_reference] = referenceId
getListener().producesEvent.let {
context[context_key_producesEvent] = it
}
if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) {
val result = onProcessEvents(event, events)
if (result != null) {
onResult(result)
}
} else {
// TODO: Re-enable this
// log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
}
}
abstract fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper?
private fun onResult(data: MessageDataWrapper) {
producer.sendMessage(
referenceId = context[context_key_reference] as String,
event = context[context_key_producesEvent] as KafkaEvents,
data = data
)
}
@PostConstruct
fun postConstruct() {
coordinator.listeners.add(getListener())
}
}
fun interface Prerequisite {
fun execute(value: Any): Boolean
}

interface TaskCreatorListener {

View File

@ -0,0 +1,63 @@
package no.iktdev.mediaprocessing.coordinator.coordination
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
class EventBasedMessageListener {
private val listeners: MutableList<Tasks> = mutableListOf()
fun add(produces: KafkaEvents, listener: TaskCreatorListener) {
listeners.add(Tasks(producesEvent = produces, taskHandler = listener))
}
fun add(task: Tasks) {
listeners.add(task)
}
private fun waitingListeners(events: List<PersistentMessage>): List<Tasks> {
    // Listeners whose produced event is already present have nothing left to do
    val presentEvents = events.map { e -> e.event }
    return listeners.filter { it.producesEvent !in presentEvents }
}
private fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List<Tasks>): List<Tasks> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
/**
 * This will be called in sequence, so duplicate messages may be produced.
 */
fun forwardEventMessageToListeners(newEvent: PersistentMessage, events: List<PersistentMessage>) {
val waitingListeners = waitingListeners(events)
val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
availableListeners.forEach {
try {
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
/**
 * This will be called with all messages at once, so it should reflect the state of the Kafka topic and the database.
 */
fun forwardBatchEventMessagesToListeners(events: List<PersistentMessage>) {
val waitingListeners = waitingListeners(events)
waitingListeners.forEach {
try {
val last = events.last()
it.taskHandler.onEventReceived(last.referenceId, last, events)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
data class Tasks(
val producesEvent: KafkaEvents,
val listensForEvents: List<KafkaEvents> = listOf(),
val taskHandler: TaskCreatorListener
)
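
Not part of the diff: a usage sketch of the listener registry, mirroring what `TaskCreator.postConstruct()` and `Coordinator.onReady()` do in this commit; `myTask`, `newEvent`, and `allMessages` are placeholders.

```kotlin
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.coordinator.coordination.EventBasedMessageListener
import no.iktdev.mediaprocessing.coordinator.coordination.Tasks
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents

fun wireUp(myTask: TaskCreatorListener, newEvent: PersistentMessage, allMessages: List<PersistentMessage>) {
    val registry = EventBasedMessageListener()
    registry.add(
        Tasks(
            producesEvent = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
            listensForEvents = listOf(KafkaEvents.EVENT_PROCESS_STARTED),
            taskHandler = myTask // any TaskCreatorListener implementation
        )
    )
    // Tasks whose producesEvent already exists among the stored messages are skipped,
    // so a listener only fires while its own event is still missing.
    registry.forwardEventMessageToListeners(newEvent, allMessages)
}
```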

View File

@ -1,5 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.coordination
class MessageSequence {
}

View File

@ -0,0 +1,11 @@
package no.iktdev.mediaprocessing.coordinator.coordination
/**
 * Class to handle messages from websockets, produced by Processer instances.
 * This is needed to keep an overview of each processer's progress.
 */
class ProcesserSocketMessageListener {
}
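
The class body is still empty in this commit; purely as an illustration of the stated intent, a sketch with a hypothetical `ProgressMessage` type and handler, none of which exist in the code yet:

```kotlin
// Everything below is hypothetical; the commit only declares the empty class.
data class ProgressMessage(val referenceId: String, val eventId: String, val progressPercent: Int)

class ProcesserSocketMessageListenerSketch {
    // Latest reported progress, keyed by processer instance id
    private val progressByProcesser = mutableMapOf<String, ProgressMessage>()

    fun onSocketMessage(processerId: String, message: ProgressMessage) {
        progressByProcesser[processerId] = message
    }

    fun overview(): Map<String, ProgressMessage> = progressByProcesser.toMap()
}
```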

View File

@ -1,57 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.reader
import kotlinx.coroutines.launch
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class BaseInfoFromFile(@Autowired var coordinator: Coordinator): ProcessingService() {
override fun onResult(referenceId: String, data: MessageDataWrapper) {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data)
}
override fun onReady() {
coordinator.addListener(object : TaskCreatorListener {
override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
io.launch {
val result = readFileInfo(event.data as ProcessStarted)
onResult(referenceId, result)
}
}
}
})
}
fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
val result = try {
val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName)
BaseInfoPerformed(
Status.COMPLETED,
title = fileNameParser.guessDesiredTitle(),
sanitizedName = fileNameParser.guessDesiredFileName()
)
} catch (e: Exception) {
e.printStackTrace()
MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
}
return result
}
}

View File

@ -1,17 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.reader
import no.iktdev.exfl.coroutines.Coroutines
class MediaStreamsAnalyze {
val io = Coroutines.io()
/*
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) {
if (event.value.data?.status == Status.COMPLETED) {
}
}
}*/
}

View File

@ -0,0 +1,51 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
@Service
class BaseInfoFromFile() : TaskCreator() {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EVENT_PROCESS_STARTED)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
return readFileInfo(event.data as ProcessStarted)
}
fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
val result = try {
val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName)
BaseInfoPerformed(
Status.COMPLETED,
title = fileNameParser.guessDesiredTitle(),
sanitizedName = fileNameParser.guessDesiredFileName()
)
} catch (e: Exception) {
e.printStackTrace()
MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
}
return result
}
}

View File

@ -0,0 +1,52 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
@Service
class MetadataAndBaseInfoToCoverTask : TaskCreator() {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null
val fileOut = events.findLast { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed? ?: return null
val coverUrl = meta?.data?.cover
return if (coverUrl.isNullOrBlank()) {
log.warn { "No cover available for ${baseInfo.title}" }
null
} else {
CoverInfoPerformed(
status = Status.COMPLETED,
url = coverUrl,
outFileBaseName = baseInfo.title,
outDir = fileOut.outDirectory
)
}
}
}

View File

@ -1,8 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event

-import mu.KotlinLogging
import no.iktdev.exfl.using
-import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
@ -11,11 +9,12 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@ -26,35 +25,40 @@ import java.time.LocalDateTime
 */
@Service
@EnableScheduling
-class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreator() {
-    private val log = KotlinLogging.logger {}
-    init {
-        coordinator.addListener(this)
-    }
+class MetadataAndBaseInfoToFileOut(): TaskCreator() {
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE

    val waitingProcessesForMeta: MutableMap<String, LocalDateTime> = mutableMapOf()

+    override val listensForEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
+        KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
+    )

-    override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-        if (!listOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED).contains(event.event)) {
-            return
-        }
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }

        val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
        val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed?

        // Only Return here as both baseInfo events are required to continue
        if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) {
-            return
+            return null
        }

        if (baseInfo.isSuccess() && meta == null) {
            log.info { "Sending ${baseInfo?.title} to waiting queue" }
-            if (!waitingProcessesForMeta.containsKey(referenceId)) {
-                waitingProcessesForMeta[referenceId] = LocalDateTime.now()
+            if (!waitingProcessesForMeta.containsKey(event.referenceId)) {
+                waitingProcessesForMeta[event.referenceId] = LocalDateTime.now()
            }
-            return
+            return null
        }

-        baseInfo ?: return // Return if baseInfo is null
+        if (!isPrerequisiteDataPresent(events)) {
+            return null
+        }
+        baseInfo ?: return null // Return if baseInfo is null

        val metaContentType: String? = if (meta.isSuccess()) meta?.data?.type else null
        val contentType = when (metaContentType) {
@ -64,46 +68,21 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord
        }

        val fileDeterminate = FileNameDeterminate(baseInfo.title, baseInfo.sanitizedName, contentType)

-        if (waitingProcessesForMeta.containsKey(referenceId)) {
-            waitingProcessesForMeta.remove(referenceId)
+        if (waitingProcessesForMeta.containsKey(event.referenceId)) {
+            waitingProcessesForMeta.remove(event.referenceId)
        }

        val outputDirectory = SharedConfig.outgoingContent.using(baseInfo.title)
-        val vi = fileDeterminate.getDeterminedVideoInfo()
-        if (vi != null) {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
-                data = VideoInfoPerformed(Status.COMPLETED, vi)
-            )
+        val vi = fileDeterminate.getDeterminedVideoInfo()?.toJsonObject()
+        return if (vi != null) {
+            VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath)
        } else {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
-                data = MessageDataWrapper(Status.ERROR, "No VideoInfo found...")
-            )
+            MessageDataWrapper(Status.ERROR, "No VideoInfo found...")
        }
-        val coverUrl = meta?.data?.cover
-        if (coverUrl.isNullOrBlank()) {
-            log.warn { "No cover available for ${baseInfo.title}" }
-        } else {
-            producer.sendMessage(
-                referenceId,
-                KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED,
-                CoverInfoPerformed(
-                    status = Status.COMPLETED,
-                    url = coverUrl,
-                    outFileBaseName = baseInfo.title,
-                    outDir = outputDirectory.absolutePath
-                )
-            )
-        }
    }

    //@Scheduled(fixedDelay = (60_000))
    @Scheduled(fixedDelay = (1_000))
    fun sendErrorMessageForMetadata() {
@ -112,7 +91,7 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord
        }
        expired.forEach {
            log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
-            producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOutAndCoverTask::class.simpleName}"))
+            producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}"))
            waitingProcessesForMeta.remove(it.key)
        }
} }

View File

@ -1,299 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
/**
* Is to be called or to run with the result from FileOout
*/
@Service
class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : TaskCreator() {
private val log = KotlinLogging.logger {}
init {
coordinator.addListener(this)
}
override fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
val required = listOf(
KafkaEvents.EVENT_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
)
val currentEvents = events.map { it.event }
val hasAllRequiredEvents = required.all { currentEvents.contains(it) }
val hasAllRequiredData = events.filter { e -> e.event in required }.all { it.data.isSuccess() }
return hasAllRequiredData && hasAllRequiredEvents
}
override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
val preference = Preference.getPreference()
if (!isPrerequisitesOk(events)) {
return
}
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
val serializedParsedStreams = readStreamsEvent.streams
val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
getFfmpegVideoArguments(
inputFile = inputFile.file,
outDir = outDir,
preference = preference.encodePreference,
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
).let { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, it) }
getFfmpegSubtitleArguments(
inputFile = inputFile.file,
outDir = outDir,
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
).let { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED, it) }
}
private fun getFfmpegVideoArguments(
inputFile: String,
outDir: File,
preference: EncodingPreference,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
): MessageDataWrapper {
val outVideoFile = outDir.using("${baseInfo.sanitizedName}.mp4").absolutePath
val vaas = VideoAndAudioSelector(serializedParsedStreams, preference)
val vArg = vaas.getVideoStream()?.let { VideoArguments(it, serializedParsedStreams, preference.video).getVideoArguments() }
val aArg = vaas.getAudioStream()?.let { AudioArguments(it, serializedParsedStreams, preference.audio).getAudioArguments() }
val vaArgs = toFfmpegWorkerArguments(vArg, aArg)
return if (vaArgs.isEmpty()) {
MessageDataWrapper(Status.ERROR, message = "Unable to produce arguments")
} else {
FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
inputFile = inputFile,
entries = listOf(FfmpegWorkerArgument(
outputFile = outVideoFile,
arguments = vaArgs
))
)
}
}
private fun getFfmpegSubtitleArguments(
inputFile: String,
outDir: File,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
): MessageDataWrapper {
val subRootDir = outDir.using("sub")
val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments()
val entries = sArg.mapNotNull {
FfmpegWorkerArgument(
arguments = it.codecParameters + it.optionalParameters + listOf("-map", "0:s:${it.index}"),
outputFile = subRootDir.using(it.language, "${baseInfo.sanitizedName}.${it.format}").absolutePath
)
}
return FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
inputFile = inputFile,
entries = entries
)
}
private class VideoAndAudioSelector(val mediaStreams: ParsedMediaStreams, val preference: EncodingPreference) {
private var defaultVideoSelected: VideoStream? = mediaStreams.videoStream
.filter { (it.duration_ts ?: 0) > 0 }
.maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.videoStream.minByOrNull { it.index }
private var defaultAudioSelected: AudioStream? = mediaStreams.audioStream
.filter { (it.duration_ts ?: 0) > 0 }
.maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.audioStream.minByOrNull { it.index }
fun getVideoStream(): VideoStream? {
return defaultVideoSelected
}
fun getAudioStream(): AudioStream? {
val languageFiltered = mediaStreams.audioStream.filter { it.tags.language == preference.audio.language }
val channeledAndCodec = languageFiltered.find {
it.channels >= (preference.audio.channels ?: 2) && it.codec_name == preference.audio.codec.lowercase()
}
return channeledAndCodec ?: return languageFiltered.minByOrNull { it.index } ?: defaultAudioSelected
}
}
private class VideoArguments(val videoStream: VideoStream, val allStreams: ParsedMediaStreams, val preference: VideoPreference) {
fun isVideoCodecEqual() = getCodec(videoStream.codec_name) == getCodec(preference.codec.lowercase())
protected fun getCodec(name: String): String {
return when (name) {
"hevc", "hevec", "h265", "h.265", "libx265"
-> "libx265"
"h.264", "h264", "libx264"
-> "libx264"
else -> name
}
}
fun getVideoArguments(): VideoArgumentsDto {
val optionalParams = mutableListOf<String>()
if (preference.pixelFormatPassthrough.none { it == videoStream.pix_fmt }) {
optionalParams.addAll(listOf("-pix_fmt", preference.pixelFormat))
}
val codecParams = if (isVideoCodecEqual()) listOf("-vcodec", "copy")
else {
optionalParams.addAll(listOf("-crf", preference.threshold.toString()))
listOf("-c:v", getCodec(preference.codec.lowercase()))
}
return VideoArgumentsDto(
index = allStreams.videoStream.indexOf(videoStream),
codecParameters = codecParams,
optionalParameters = optionalParams
)
}
}
private class AudioArguments(val audioStream: AudioStream, val allStreams: ParsedMediaStreams, val preference: AudioPreference) {
fun isAudioCodecEqual() = audioStream.codec_name.lowercase() == preference.codec.lowercase()
private fun shouldUseEAC3(): Boolean {
return (preference.defaultToEAC3OnSurroundDetected && audioStream.channels > 2 && audioStream.codec_name.lowercase() != "eac3")
}
fun getAudioArguments(): AudioArgumentsDto {
val optionalParams = mutableListOf<String>()
val codecParams = if (shouldUseEAC3())
listOf("-c:a", "eac3")
else if (!isAudioCodecEqual()) {
listOf("-c:a", preference.codec)
} else
listOf("-acodec", "copy")
return AudioArgumentsDto(
index = allStreams.audioStream.indexOf(audioStream),
codecParameters = codecParams,
optionalParameters = optionalParams
)
}
}
private class SubtitleArguments(val subtitleStreams: List<SubtitleStream>) {
/**
* @property DEFAULT is default subtitle as dialog
* @property CC is Closed-Captions
* @property SHD is Hard of hearing
* @property NON_DIALOGUE is for Signs or Song (as in lyrics)
*/
private enum class SubtitleType {
DEFAULT,
CC,
SHD,
NON_DIALOGUE
}
private fun SubtitleStream.isCC(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("cc", "closed caption")
return keywords.any { title.contains(it) }
}
private fun SubtitleStream.isSHD(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("shd", "hh", "Hard-of-Hearing", "Hard of Hearing")
return keywords.any { title.contains(it) }
}
private fun SubtitleStream.isSignOrSong(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("song", "songs", "sign", "signs")
return keywords.any { title.contains(it) }
}
private fun getSubtitleType(stream: SubtitleStream): SubtitleType {
return if (stream.isSignOrSong())
SubtitleType.NON_DIALOGUE
else if (stream.isSHD()) {
SubtitleType.SHD
} else if (stream.isCC()) {
SubtitleType.CC
} else SubtitleType.DEFAULT
}
fun getSubtitleArguments(): List<SubtitleArgumentsDto> {
val acceptable = subtitleStreams.filter { !it.isSignOrSong() }
val codecFiltered = acceptable.filter { getFormatToCodec(it.codec_name) != null }
val mappedToType = codecFiltered.map { getSubtitleType(it) to it }.filter { it.first in SubtitleType.entries }
.groupBy { it.second.tags.language ?: "eng" }
.mapValues { entry ->
val languageStreams = entry.value
val sortedStreams = languageStreams.sortedBy { SubtitleType.entries.indexOf(it.first) }
sortedStreams.firstOrNull()?.second
}.mapNotNull { it.value }
return mappedToType.mapNotNull { stream ->
getFormatToCodec(stream.codec_name)?.let { format ->
SubtitleArgumentsDto(
index = subtitleStreams.indexOf(stream),
language = stream.tags.language ?: "eng",
format = format
)
}
}
}
fun getFormatToCodec(codecName: String): String? {
return when(codecName) {
"ass" -> "ass"
"subrip" -> "srt"
"webvtt", "vtt" -> "vtt"
"smi" -> "smi"
"hdmv_pgs_subtitle" -> null
else -> null
}
}
}
private fun toFfmpegWorkerArguments(
videoArguments: VideoArgumentsDto?,
audioArguments: AudioArgumentsDto?
): List<String> {
val arguments = mutableListOf<String>(
*videoArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
*videoArguments?.optionalParameters?.toTypedArray() ?: arrayOf(),
*audioArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
*audioArguments?.optionalParameters?.toTypedArray() ?: arrayOf()
)
videoArguments?.index?.let {
arguments.addAll(listOf("-map", "0:v:$it"))
}
audioArguments?.index?.let {
arguments.addAll(listOf("-map", "0:a:$it"))
}
return arguments
}
}

View File

@ -1,48 +1,41 @@
-package no.iktdev.mediaprocessing.coordinator.reader
+package no.iktdev.mediaprocessing.coordinator.tasks.event

import com.google.gson.Gson
-import kotlinx.coroutines.launch
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
-import no.iktdev.mediaprocessing.shared.common.ProcessingService
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service

@Service
-class ParseVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
-    override fun onResult(referenceId: String, data: MessageDataWrapper) {
-        producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data)
+class ParseVideoFileStreams() : TaskCreator() {
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
+    )
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
    }
-    override fun onReady() {
-        coordinator.addListener(object : TaskCreatorListener {
-            override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-                if (event.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED && event.data.isSuccess()) {
-                    io.launch {
-                        val result = parseStreams(event.data as ReaderPerformed)
-                        onResult(referenceId, result)
-                    }
-                }
-            }
-        })
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+        return parseStreams(event.data as ReaderPerformed)
    }

    fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
        val gson = Gson()
        return try {

View File

@ -1,45 +1,48 @@
-package no.iktdev.mediaprocessing.coordinator.reader
+package no.iktdev.mediaprocessing.coordinator.tasks.event

import com.google.gson.Gson
import com.google.gson.JsonObject
-import kotlinx.coroutines.launch
-import no.iktdev.mediaprocessing.coordinator.Coordinator
-import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
-import no.iktdev.mediaprocessing.shared.common.ProcessingService
+import kotlinx.coroutines.runBlocking
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
-import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File

@Service
-class ReadVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
-    override fun onResult(referenceId: String, data: MessageDataWrapper) {
-        producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data)
+class ReadVideoFileStreams(): TaskCreator() {
+    override val producesEvent: KafkaEvents
+        get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
+
+    override val requiredEvents: List<KafkaEvents> = listOf(
+        KafkaEvents.EVENT_PROCESS_STARTED
+    )
+
+    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+        return super.prerequisitesRequired(events) + listOf {
+            isPrerequisiteDataPresent(events)
+        }
    }
-    override fun onReady() {
-        coordinator.addListener(object : TaskCreatorListener {
-            override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
-                if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
-                    io.launch {
-                        val result = fileReadStreams(event.data as ProcessStarted)
-                        onResult(referenceId, result)
-                    }
-                }
-            }
-        })
+    override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
+        return listOf {
+            isEventOfSingle(event, KafkaEvents.EVENT_PROCESS_STARTED)
+        }
+    }
+
+    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+        log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
+        return runBlocking { fileReadStreams(event.data as ProcessStarted) }
    }

    suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {

View File

@ -0,0 +1,173 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
@Service
class EncodeArgumentCreatorTask : TaskCreator() {
val preference = Preference.getPreference()
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED
override val requiredEvents: List<KafkaEvents> =
listOf(
KafkaEvents.EVENT_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
val serializedParsedStreams = readStreamsEvent.streams
val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
return getFfmpegVideoArguments(
inputFile = inputFile.file,
outDir = outDir,
preference = preference.encodePreference,
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
)
}
private fun getFfmpegVideoArguments(
inputFile: String,
outDir: File,
preference: EncodingPreference,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
): MessageDataWrapper {
val outVideoFile = outDir.using("${baseInfo.sanitizedName}.mp4").absolutePath
val vaas = VideoAndAudioSelector(serializedParsedStreams, preference)
val vArg = vaas.getVideoStream()
?.let { VideoArguments(it, serializedParsedStreams, preference.video).getVideoArguments() }
val aArg = vaas.getAudioStream()
?.let { AudioArguments(it, serializedParsedStreams, preference.audio).getAudioArguments() }
val vaArgs = toFfmpegWorkerArguments(vArg, aArg)
return if (vaArgs.isEmpty()) {
MessageDataWrapper(Status.ERROR, message = "Unable to produce arguments")
} else {
FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
inputFile = inputFile,
entries = listOf(
FfmpegWorkerArgument(
outputFile = outVideoFile,
arguments = vaArgs
)
)
)
}
}
private class VideoAndAudioSelector(val mediaStreams: ParsedMediaStreams, val preference: EncodingPreference) {
private var defaultVideoSelected: VideoStream? = mediaStreams.videoStream
.filter { (it.duration_ts ?: 0) > 0 }
.maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.videoStream.minByOrNull { it.index }
private var defaultAudioSelected: AudioStream? = mediaStreams.audioStream
.filter { (it.duration_ts ?: 0) > 0 }
.maxByOrNull { it.duration_ts ?: 0 } ?: mediaStreams.audioStream.minByOrNull { it.index }
fun getVideoStream(): VideoStream? {
return defaultVideoSelected
}
fun getAudioStream(): AudioStream? {
val languageFiltered = mediaStreams.audioStream.filter { it.tags.language == preference.audio.language }
val channeledAndCodec = languageFiltered.find {
it.channels >= (preference.audio.channels ?: 2) && it.codec_name == preference.audio.codec.lowercase()
}
return channeledAndCodec ?: return languageFiltered.minByOrNull { it.index } ?: defaultAudioSelected
}
}
private class VideoArguments(
val videoStream: VideoStream,
val allStreams: ParsedMediaStreams,
val preference: VideoPreference
) {
fun isVideoCodecEqual() = getCodec(videoStream.codec_name) == getCodec(preference.codec.lowercase())
protected fun getCodec(name: String): String {
return when (name) {
"hevc", "hevec", "h265", "h.265", "libx265"
-> "libx265"
"h.264", "h264", "libx264"
-> "libx264"
else -> name
}
}
fun getVideoArguments(): VideoArgumentsDto {
val optionalParams = mutableListOf<String>()
if (preference.pixelFormatPassthrough.none { it == videoStream.pix_fmt }) {
optionalParams.addAll(listOf("-pix_fmt", preference.pixelFormat))
}
val codecParams = if (isVideoCodecEqual()) listOf("-vcodec", "copy")
else {
optionalParams.addAll(listOf("-crf", preference.threshold.toString()))
listOf("-c:v", getCodec(preference.codec.lowercase()))
}
return VideoArgumentsDto(
index = allStreams.videoStream.indexOf(videoStream),
codecParameters = codecParams,
optionalParameters = optionalParams
)
}
}
private class AudioArguments(
val audioStream: AudioStream,
val allStreams: ParsedMediaStreams,
val preference: AudioPreference
) {
fun isAudioCodecEqual() = audioStream.codec_name.lowercase() == preference.codec.lowercase()
private fun shouldUseEAC3(): Boolean {
return (preference.defaultToEAC3OnSurroundDetected && audioStream.channels > 2 && audioStream.codec_name.lowercase() != "eac3")
}
fun getAudioArguments(): AudioArgumentsDto {
val optionalParams = mutableListOf<String>()
val codecParams = if (shouldUseEAC3())
listOf("-c:a", "eac3")
else if (!isAudioCodecEqual()) {
listOf("-c:a", preference.codec)
} else
listOf("-acodec", "copy")
return AudioArgumentsDto(
index = allStreams.audioStream.indexOf(audioStream),
codecParameters = codecParams,
optionalParameters = optionalParams
)
}
}
}

View File

@ -0,0 +1,165 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.*
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
@Service
class ExtractArgumentCreatorTask : TaskCreator() {
val preference = Preference.getPreference()
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
if (!requiredEvents.contains(event.event)) {
log.info { "${this.javaClass.simpleName} ignores ${event.event}@${event.eventId}" }
return null
}
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
val serializedParsedStreams = readStreamsEvent.streams
val outDir = SharedConfig.outgoingContent.using(baseInfo.title)
return getFfmpegSubtitleArguments(
inputFile = inputFile.file,
outDir = outDir,
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
)
}
private fun getFfmpegSubtitleArguments(
inputFile: String,
outDir: File,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
): MessageDataWrapper {
val subRootDir = outDir.using("sub")
val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments()
val entries = sArg.map {
FfmpegWorkerArgument(
arguments = it.codecParameters + it.optionalParameters + listOf("-map", "0:s:${it.index}"),
outputFile = subRootDir.using(it.language, "${baseInfo.sanitizedName}.${it.format}").absolutePath
)
}
return FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
inputFile = inputFile,
entries = entries
)
}
private class SubtitleArguments(val subtitleStreams: List<SubtitleStream>) {
/**
* @property DEFAULT is the standard dialogue subtitle
* @property CC is Closed Captions
* @property SHD is for the deaf and hard-of-hearing (commonly written SDH)
* @property NON_DIALOGUE is for signs or songs (as in lyrics)
*/
private enum class SubtitleType {
DEFAULT,
CC,
SHD,
NON_DIALOGUE
}
private fun SubtitleStream.isCC(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("cc", "closed caption")
return keywords.any { title.contains(it) }
}
private fun SubtitleStream.isSHD(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("shd", "hh", "hard-of-hearing", "hard of hearing") // title is lowercased above, so keywords must be lowercase too
return keywords.any { title.contains(it) }
}
private fun SubtitleStream.isSignOrSong(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("song", "songs", "sign", "signs")
return keywords.any { title.contains(it) }
}
private fun getSubtitleType(stream: SubtitleStream): SubtitleType {
return if (stream.isSignOrSong())
SubtitleType.NON_DIALOGUE
else if (stream.isSHD()) {
SubtitleType.SHD
} else if (stream.isCC()) {
SubtitleType.CC
} else SubtitleType.DEFAULT
}
fun getSubtitleArguments(): List<SubtitleArgumentsDto> {
val acceptable = subtitleStreams.filter { !it.isSignOrSong() }
val codecFiltered = acceptable.filter { getFormatToCodec(it.codec_name) != null }
val mappedToType = codecFiltered.map { getSubtitleType(it) to it }
.groupBy { it.second.tags.language ?: "eng" }
.mapValues { entry ->
// Keep one stream per language, preferring DEFAULT over CC over SHD
entry.value.minByOrNull { SubtitleType.entries.indexOf(it.first) }?.second
}.mapNotNull { it.value }
return mappedToType.mapNotNull { stream ->
getFormatToCodec(stream.codec_name)?.let { format ->
SubtitleArgumentsDto(
index = subtitleStreams.indexOf(stream),
language = stream.tags.language ?: "eng",
format = format
)
}
}
}
fun getFormatToCodec(codecName: String): String? {
return when (codecName) {
"ass" -> "ass"
"subrip" -> "srt"
"webvtt", "vtt" -> "vtt"
"smi" -> "smi"
// image-based formats such as hdmv_pgs_subtitle have no text representation to extract
else -> null
}
}
}
}

View File

@ -0,0 +1,23 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioArgumentsDto
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoArgumentsDto
fun toFfmpegWorkerArguments(
videoArguments: VideoArgumentsDto?,
audioArguments: AudioArgumentsDto?
): List<String> {
val arguments = mutableListOf<String>(
*videoArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
*videoArguments?.optionalParameters?.toTypedArray() ?: arrayOf(),
*audioArguments?.codecParameters?.toTypedArray() ?: arrayOf(),
*audioArguments?.optionalParameters?.toTypedArray() ?: arrayOf()
)
videoArguments?.index?.let {
arguments.addAll(listOf("-map", "0:v:$it"))
}
audioArguments?.index?.let {
arguments.addAll(listOf("-map", "0:a:$it"))
}
return arguments
}
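
A minimal usage sketch for the helper above, assuming the same package and the DTO constructors used by the tasks in this commit (all values invented):

import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioArgumentsDto
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoArgumentsDto

fun main() {
    val video = VideoArgumentsDto(
        index = 0,
        codecParameters = listOf("-vcodec", "copy"),
        optionalParameters = emptyList()
    )
    val audio = AudioArgumentsDto(
        index = 1,
        codecParameters = listOf("-c:a", "eac3"),
        optionalParameters = emptyList()
    )
    // Prints: -vcodec copy -c:a eac3 -map 0:v:0 -map 0:a:1
    println(toFfmpegWorkerArguments(video, audio).joinToString(" "))
}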

View File

@ -20,14 +20,22 @@ repositories {
}
}
val exposedVersion = "0.44.0"
dependencies {
/*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation ("mysql:mysql-connector-java:8.0.29")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
@ -37,10 +45,13 @@ dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.4.1")
//implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:kafka")))

View File

@ -0,0 +1,86 @@
package no.iktdev.mediaprocessing.processer
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service
@EnableScheduling
class Coordinator() {
@Autowired
private lateinit var producer: CoordinatorProducer
@Autowired
private lateinit var listener: DefaultMessageListener
private val log = KotlinLogging.logger {}
val listeners = EventBasedMessageListener()
val io = Coroutines.io()
fun readAllAvailableInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistance(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
}
fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List<PersistentProcessDataMessage>) {
val triggered = messages.find { it.eventId == eventId }
if (triggered == null) {
log.error { "Could not find $eventId in provided messages" }
return
}
listeners.forwardEventMessageToListeners(triggered, messages)
}
val processKafkaEvents = listOf(
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
)
@PostConstruct
fun onReady() {
io.launch {
listener.onMessageReceived = { event ->
if (event.key in processKafkaEvents) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" }
} else
readAllMessagesFor(event.value.referenceId, event.value.eventId)
} else if (event.key in listOf(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED, KafkaEvents.EVENT_WORK_ENCODE_SKIPPED)) {
readAllAvailableInQueue()
} else {
log.debug { "Skipping ${event.key}" }
}
}
listener.listen(KafkaEnv.kafkaTopic)
}
readAllAvailableInQueue()
}
}

View File

@ -1,20 +0,0 @@
package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import org.springframework.stereotype.Service
@Service
class EncodeService {
/*private val log = KotlinLogging.logger {}
val io = Coroutines.io()
val producer = CoordinatorProducer()
private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
if (event.key == KafkaEvents.EVENT_WORK_ENCODE_CREATED) {
}
}*/
}

View File

@ -0,0 +1,37 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
class EventBasedMessageListener {
private val listeners: MutableList<Tasks> = mutableListOf()
fun add(produces: KafkaEvents, listener: TaskCreatorListener) {
listeners.add(Tasks(produces, listener))
}
fun add(task: Tasks) {
listeners.add(task)
}
private fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
fun forwardEventMessageToListeners(newEvent: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>) {
waitingListeners(events).forEach {
try {
it.taskHandler.onEventReceived(newEvent.referenceId, newEvent, events)
} catch (e: Exception) {
e.printStackTrace()
}
}
}
}
data class Tasks(
val producesEvent: KafkaEvents,
val taskHandler: TaskCreatorListener
)
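
A minimal registration sketch (hypothetical handler; in this commit the real registration happens in TaskCreator.postConstruct). Note that forwardEventMessageToListeners only forwards to tasks whose producesEvent is not yet among the stored events, so a task is not re-triggered for work it has already produced:

import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents

fun registerExampleTask(listeners: EventBasedMessageListener) {
    listeners.add(Tasks(
        producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED,
        taskHandler = object : TaskCreatorListener {
            override fun onEventReceived(
                referenceId: String,
                event: PersistentProcessDataMessage,
                events: List<PersistentProcessDataMessage>
            ) {
                // react to the triggering event and eventually produce EVENT_WORK_ENCODE_PERFORMED
            }
        }
    ))
}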

View File

@ -1,8 +0,0 @@
package no.iktdev.mediaprocessing.processer
import org.springframework.stereotype.Service
@Service
class ExtractService {
}

View File

@ -0,0 +1,16 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@Configuration
class SocketLocalInit: SocketImplementation()
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}

View File

@ -1,7 +1,10 @@
package no.iktdev.mediaprocessing.processer
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@ -13,10 +16,23 @@ class ProcesserApplication {
}
fun main(args: Array<String>) {
val dataSource = MySqlDataSource.fromDatabaseEnv()
Coroutines.default().launch {
dataSource.createDatabase()
dataSource.createTables(
processerEvents
)
}
val context = runApplication<ProcesserApplication>(*args)
}
fun getComputername(): String {
// Environment lookups are case-sensitive on Linux; fall back instead of throwing
return listOfNotNull(
System.getenv("HOSTNAME"),
System.getenv("COMPUTERNAME")
).firstOrNull() ?: "unknown"
}
class SocketImplemented: SocketImplementation() {
}

View File

@ -0,0 +1,16 @@
package no.iktdev.streamit.content.encode
import no.iktdev.exfl.using
import java.io.File
class ProcesserEnv {
companion object {
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
val allowOverwrite: Boolean = System.getenv("ALLOW_OVERWRITE")?.toBoolean() ?: false
val maxEncodeRunners: Int = System.getenv("SIMULTANEOUS_ENCODE_RUNNERS")?.toIntOrNull() ?: 1
val maxExtractRunners: Int = System.getenv("SIMULTANEOUS_EXTRACT_RUNNERS")?.toIntOrNull() ?: 1
val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else
File("data").using("logs")
}
}

View File

@ -0,0 +1,92 @@
package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.simp.SimpMessagingTemplate
import javax.annotation.PostConstruct
abstract class TaskCreator: TaskCreatorListener {
private val log = KotlinLogging.logger {}
@Autowired
lateinit var producer: CoordinatorProducer
@Autowired
lateinit var coordinator: Coordinator
@Autowired
lateinit var socketMessage: SimpMessagingTemplate
open val requiredEvents: List<KafkaEvents> = listOf()
open fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
open fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events.filter { e -> e.event in requiredEvents }.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
open fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
abstract fun getListener(): Tasks
open fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
private val context: MutableMap<String, Any> = mutableMapOf()
private val context_key_reference = "reference"
private val context_key_producesEvent = "event"
final override fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>) {
context[context_key_reference] = referenceId
getListener().producesEvent.let {
context[context_key_producesEvent] = it
}
if (prerequisitesRequired(events).all { it.invoke() }) {
val result = onProcessEvents(event, events)
if (result != null) {
onResult(result)
}
} else {
log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
}
}
abstract fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper?
private fun onResult(data: MessageDataWrapper) {
producer.sendMessage(
referenceId = context[context_key_reference] as String,
event = context[context_key_producesEvent] as KafkaEvents,
data = data
)
}
@PostConstruct
fun postConstruct() {
coordinator.listeners.add(getListener())
}
}
fun interface Prerequisite {
fun execute(value: Any): Boolean
}
interface TaskCreatorListener {
fun onEventReceived(referenceId: String, event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): Unit
}

View File

@ -0,0 +1,12 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
data class FfmpegDecodedProgress(
val progress: Int = -1,
val time: String,
val duration: String,
val speed: String,
val estimatedCompletionSeconds: Long = -1,
val estimatedCompletion: String = "Unknown",
)
data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0)

View File

@ -0,0 +1,157 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import java.lang.StringBuilder
import java.time.LocalTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.TimeUnit
import kotlin.math.floor
class FfmpegProgressDecoder {
data class DecodedProgressData(
val frame: Int?,
val fps: Double?,
val stream_0_0_q: Double?,
val bitrate: String?,
val total_size: Int?,
val out_time_us: Long?,
val out_time_ms: Long?,
val out_time: String?,
val dup_frames: Int?,
val drop_frames: Int?,
val speed: Double?,
val progress: String?
)
val expectedKeys = listOf<String>(
"frame=",
"fps=",
"stream_0_0_q=",
"bitrate=",
"total_size=",
"out_time_us=",
"out_time_ms=",
"out_time=",
"dup_frames=",
"drop_frames=",
"speed=",
"progress="
)
var duration: Int? = null
set(value) {
if (field == null || field == 0)
field = value
}
var durationTime: String = "NA"
fun parseVideoProgress(lines: List<String>): DecodedProgressData? {
var frame: Int? = null
var progress: String? = null
val metadataMap = mutableMapOf<String, String>()
try {
val eqValue = Regex("=")
for (line in lines) {
val keyValuePairs = Regex("=\\s*").replace(line, "=").split(" ").filter { it.isNotBlank() }.filter { eqValue.containsMatchIn(it) }
for (keyValuePair in keyValuePairs) {
val (key, value) = keyValuePair.split("=")
metadataMap[key] = value
}
if (frame == null) {
frame = metadataMap["frame"]?.toIntOrNull()
}
progress = metadataMap["progress"]
}
} catch (e: Exception) {
e.printStackTrace()
}
return if (progress != null) {
// When "progress" is found, build and return the VideoMetadata object
DecodedProgressData(
frame, metadataMap["fps"]?.toDoubleOrNull(), metadataMap["stream_0_0_q"]?.toDoubleOrNull(),
metadataMap["bitrate"], metadataMap["total_size"]?.toIntOrNull(), metadataMap["out_time_us"]?.toLongOrNull(),
metadataMap["out_time_ms"]?.toLongOrNull(), metadataMap["out_time"], metadataMap["dup_frames"]?.toIntOrNull(),
metadataMap["drop_frames"]?.toIntOrNull(), metadataMap["speed"]?.replace("x", "", ignoreCase = true)?.toDoubleOrNull(), progress
)
} else {
null // If "progress" is not found, return null
}
}
fun isDuration(value: String): Boolean {
return value.contains("Duration", ignoreCase = true)
}
fun setDuration(value: String) {
val results = Regex("Duration:\\s*([^,]+),").find(value)?.groupValues?.getOrNull(1) // group 1 holds the timestamp itself
durationTime = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(results.toString())?.value ?: "NA"
duration = timeSpanToSeconds(results)
}
private fun timeSpanToSeconds(time: String?): Int?
{
time ?: return null
val timeString = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(time) ?: return null
val strippedMS = Regex("[0-9]+:[0-9]+:[0-9]+").find(timeString.value) ?: return null
val outTime = LocalTime.parse(strippedMS.value, DateTimeFormatter.ofPattern("HH:mm:ss"))
return outTime.toSecondOfDay()
}
fun getProgress(decoded: DecodedProgressData): FfmpegDecodedProgress {
if (duration == null)
return FfmpegDecodedProgress(duration = durationTime, time = "NA", speed = "NA")
val progressTime = timeSpanToSeconds(decoded.out_time) ?: 0
val progress = floor((progressTime.toDouble() / duration!!.toDouble()) *100).toInt()
val ect = getEstimatedTimeRemaining(decoded)
return FfmpegDecodedProgress(
progress = progress,
estimatedCompletionSeconds = ect,
estimatedCompletion = getETA(ect),
duration = durationTime,
time = decoded.out_time ?: "NA",
speed = decoded.speed?.toString() ?: "NA"
)
}
fun getEstimatedTimeRemaining(decoded: DecodedProgressData): Long {
val position = timeSpanToSeconds(decoded.out_time) ?: 0
return if(duration == null || decoded.speed == null) -1 else
Math.round(Math.round(duration!!.toDouble() - position.toDouble()) / decoded.speed)
}
fun getECT(time: Long): ECT {
var seconds = time
val day = TimeUnit.SECONDS.toDays(seconds)
seconds -= java.util.concurrent.TimeUnit.DAYS.toSeconds(day)
val hour = TimeUnit.SECONDS.toHours(seconds)
seconds -= java.util.concurrent.TimeUnit.HOURS.toSeconds(hour)
val minute = TimeUnit.SECONDS.toMinutes(seconds)
seconds -= java.util.concurrent.TimeUnit.MINUTES.toSeconds(minute)
return ECT(day.toInt(), hour.toInt(), minute.toInt(), seconds.toInt())
}
private fun getETA(time: Long): String {
if (time < 0) return "Unknown" // getEstimatedTimeRemaining reports -1 when duration or speed is unknown
val etc = getECT(time)
val str = StringBuilder()
if (etc.day > 0) {
str.append("${etc.day}d").append(" ")
}
if (etc.hour > 0) {
str.append("${etc.hour}h").append(" ")
}
if (etc.day == 0 && etc.minute > 0) {
str.append("${etc.minute}m").append(" ")
}
if (etc.hour == 0 && etc.second > 0) {
str.append("${etc.second}s").append(" ")
}
return str.toString().trim()
}
}
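
A minimal sketch of the decoder in use; the sample lines imitate the key=value blocks ffmpeg emits when launched with "-progress pipe:1" (all values invented):

fun main() {
    val decoder = FfmpegProgressDecoder()
    decoder.setDuration("  Duration: 00:40:00.00, start: 0.000000, bitrate: 4500 kb/s")
    val lines = listOf(
        "frame=1200",
        "out_time=00:10:00.000000",
        "speed=2.0x",
        "progress=continue"
    )
    decoder.parseVideoProgress(lines)?.let { decoded ->
        // 10 of 40 minutes done at 2.0x speed: progress = 25, estimated completion "15m"
        println(decoder.getProgress(decoded))
    }
}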

View File

@ -0,0 +1,102 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import com.google.gson.Gson
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter
class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents) {
val scope = Coroutines.io()
val decoder = FfmpegProgressDecoder()
private val outputCache = mutableListOf<String>()
val logFile = ProcesserEnv.logDirectory.using("$eventId-${File(info.outFile).nameWithoutExtension}.log")
val getOutputCache: List<String> get() = outputCache.toList() // snapshot on access, not at construction
data class FfmpegWorkerArgumentsBuilder(
private val mutableList: MutableList<String> = mutableListOf()
) {
private val defaultArguments = listOf(
"-nostdin",
"-hide_banner"
)
private val progressArguments = listOf("-progress", "pipe:1")
fun using(info: FfmpegWorkRequestCreated) = apply {
this.mutableList.add(info.inputFile)
this.mutableList.addAll(info.arguments)
this.mutableList.add(info.outFile)
}
fun build(): List<String> {
return (if (ProcesserEnv.allowOverwrite) listOf("-y") else emptyList()) + defaultArguments + listOf("-i") + mutableList
}
fun buildWithProgress(): List<String> {
// -progress is a global option and must precede the output file; ffmpeg ignores trailing options
return (if (ProcesserEnv.allowOverwrite) listOf("-y") else emptyList()) + defaultArguments + progressArguments + listOf("-i") + mutableList
}
}
suspend fun run() {
val args = FfmpegWorkerArgumentsBuilder().using(info).build()
execute(args)
}
suspend fun runWithProgress() {
val args = FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress()
execute(args)
}
private suspend fun execute(args: List<String>) {
listener.onStarted(info)
val processOp = process(ProcesserEnv.ffmpeg, *args.toTypedArray(),
stdout = Redirect.CAPTURE,
stderr = Redirect.CAPTURE,
consumer = {
onOutputChanged(it)
},
destroyForcibly = true)
val result = processOp
println(Gson().toJson(result))
if (result.resultCode != 0) {
listener.onError(info, result.output.joinToString("\n"))
} else {
listener.onCompleted(info)
}
}
fun onOutputChanged(line: String) {
outputCache.add(line)
writeToLog(line)
if (decoder.isDuration(line)) {
decoder.setDuration(line)
}
// toList is needed to prevent mutability.
decoder.parseVideoProgress(outputCache.toList())?.let { decoded ->
listener.onProgressChanged(info, decoder.getProgress(decoded))
}
}
fun writeToLog(line: String) {
// Append the line to the log file; use() closes the writers and frees the resources
BufferedWriter(FileWriter(logFile, true)).use { writer ->
writer.write(line)
writer.newLine()
}
}
}
interface FfmpegWorkerEvents {
fun onStarted(info: FfmpegWorkRequestCreated)
fun onCompleted(info: FfmpegWorkRequestCreated)
fun onError(info: FfmpegWorkRequestCreated, errorMessage: String)
fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress)
}
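
A minimal sketch of the argument assembly (paths invented; FfmpegWorkRequestCreated is assumed to carry inputFile, arguments and outFile, as used above):

import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated

fun exampleArguments(): List<String> {
    val request = FfmpegWorkRequestCreated(
        inputFile = "/data/input/movie.mkv",
        arguments = listOf("-vcodec", "copy", "-c:a", "eac3", "-map", "0:v:0", "-map", "0:a:0"),
        outFile = "/data/out/movie.mp4"
    )
    // With ALLOW_OVERWRITE unset this yields:
    // -nostdin -hide_banner -progress pipe:1 -i /data/input/movie.mkv -vcodec copy -c:a eac3 -map 0:v:0 -map 0:a:0 /data/out/movie.mp4
    return FfmpegWorker.FfmpegWorkerArgumentsBuilder().using(request).buildWithProgress()
}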

View File

@ -0,0 +1,37 @@
package no.iktdev.mediaprocessing.processer.services
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class ClaimsService() {
private val log = KotlinLogging.logger {}
@Autowired
lateinit var coordinator: Coordinator
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = PersistentDataStore()
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" }
}
}
coordinator.readAllAvailableInQueue()
}
}

View File

@ -0,0 +1,178 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.Tasks
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.processer.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class EncodeService: TaskCreator() {
private val log = KotlinLogging.logger {}
val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null
val encodeServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
log.info { "Starting encode service with id: $encodeServiceId" }
}
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_WORK_ENCODE_CREATED)
override fun getListener(): Tasks {
return Tasks(producesEvent, this)
}
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper? {
if (event.data !is FfmpegWorkRequestCreated) {
return MessageDataWrapper(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runnerJob?.isActive != true) {
startEncode(event)
} else {
log.warn { "Worker is already running.." }
}
// This should never return anything other than continue or skipped
return null
}
fun startEncode(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs()
if (!ProcesserEnv.logDirectory.exists()) {
ProcesserEnv.logDirectory.mkdirs()
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents)
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
return
}
runnerJob = scope.launch {
runner!!.runWithProgress()
}
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
}
}
val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" }
return
}
log.info { "Encode started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId)
sendProgress(info, null, false)
scope.launch {
while (runnerJob?.isActive == true) {
delay(java.time.Duration.ofMinutes(5).toMillis())
PersistentDataStore().updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, encodeServiceId)
}
}
}
override fun onCompleted(info: FfmpegWorkRequestCreated) {
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" }
return
}
log.info { "Encode completed for ${runner.referenceId}" }
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, encodeServiceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
}
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
clearWorker()
}
}
override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) {
val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" }
return
}
log.info { "Encode failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
sendProgress(info = info, ended = true)
clearWorker()
}
override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
sendProgress(info, progress, false)
}
}
fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) {
}
fun clearWorker() {
this.runner?.scope?.cancel()
this.runner = null
}
@PreDestroy
fun shutdown() {
scope.cancel()
runner?.scope?.cancel("Stopping application")
}
}

View File

@ -0,0 +1,184 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.processer.TaskCreator
import no.iktdev.mediaprocessing.processer.Tasks
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents
import no.iktdev.mediaprocessing.processer.getComputername
import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
import javax.annotation.PreDestroy
@Service
class ExtractService: TaskCreator() {
private val log = KotlinLogging.logger {}
val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null
val extractServiceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
log.info { "Starting extract service with id: $extractServiceId" }
}
override fun getListener(): Tasks {
return Tasks(producesEvent, this)
}
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
isPrerequisiteDataPresent(events)
}
}
override fun onProcessEvents(event: PersistentProcessDataMessage, events: List<PersistentProcessDataMessage>): MessageDataWrapper? {
if (event.data !is FfmpegWorkRequestCreated) {
return MessageDataWrapper(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
}
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runnerJob?.isActive != true) {
startExtract(event)
} else {
log.warn { "Worker is already running.." }
}
// This should never return anything other than continue or skipped
return null
}
fun startExtract(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs()
if (!ProcesserEnv.logDirectory.exists()) {
ProcesserEnv.logDirectory.mkdirs()
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents)
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
return
}
runnerJob = scope.launch {
runner!!.run()
}
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
}
}
val ffmpegWorkerEvents = object : FfmpegWorkerEvents {
override fun onStarted(info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce start message when the referenceId is not present" }
return
}
log.info { "Extract started for ${runner.referenceId}" }
PersistentDataStore().setProcessEventClaim(runner.referenceId, runner.eventId, extractServiceId)
sendState(info, false)
}
override fun onCompleted(info: FfmpegWorkRequestCreated) {
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce completion message when the referenceId is not present" }
return
}
log.info { "Extract completed for ${runner.referenceId}" }
var consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId)
runBlocking {
delay(1000)
limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) {
consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(runner.referenceId, runner.eventId, extractServiceId)
}
log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" }
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId)
limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) {
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, extractServiceId)
log.info { "Readback reported consumed = $readbackIsSuccess" }
}
log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
)
log.info { "Extract is releasing worker" }
clearWorker()
}
}
override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) {
val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" }
return
}
log.info { "Extract failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
)
sendState(info, ended= true)
clearWorker()
}
override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) {
// No-op: extract jobs are run without progress reporting
}
}
fun sendState(info: FfmpegWorkRequestCreated, ended: Boolean) {
}
fun clearWorker() {
this.runner?.scope?.cancel()
this.runner = null
}
@PreDestroy
fun shutdown() {
scope.cancel()
runner?.scope?.cancel("Stopping application")
}
}

View File

@ -1,4 +0,0 @@
package no.mediaprocessing.apps.processer
class ProcesserApplication {
}

View File

@ -0,0 +1,3 @@
spring.output.ansi.enabled=always
logging.level.org.apache.kafka=WARN
logging.level.root=INFO

View File

@ -1,39 +0,0 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import kotlin.reflect.KClass
class DeserializingRegistry {
companion object {
val deserializables = mutableListOf<Pair<KafkaEvents, KClass<out MessageDataWrapper>?>>(
KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class,
KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to null,
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to null,
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to null,
KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null,
KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to null,
KafkaEvents.EVENT_WORK_ENCODE_CREATED to null,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED to null,
KafkaEvents.EVENT_WORK_CONVERT_CREATED to null,
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to null,
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to null,
KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null,
KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null,
KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null,
KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null,
KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null,
)
}
}

View File

@ -8,22 +8,30 @@ private val log = KotlinLogging.logger {}
class Preference {
companion object {
private var prevPreference: PreferenceDto? = null
fun getPreference(): PreferenceDto {
val preference = readOrDefaultPreference()
if (preference != prevPreference) {
log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec }
log.info { "[Audio]: Language = " + preference.encodePreference.audio.language }
log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels }
log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate }
log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected }
log.info { "[Video]: Codec = " + preference.encodePreference.video.codec }
log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat }
log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") }
log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold }
}
return preference.also { prevPreference = it }
}
private fun readOrDefaultPreference(): PreferenceDto {
val preference = readPreferenceFromFile() ?: PreferenceDto()
log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec }
log.info { "[Audio]: Language = " + preference.encodePreference.audio.language }
log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels }
log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate }
log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected }
log.info { "[Video]: Codec = " + preference.encodePreference.video.codec }
log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat }
log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") }
log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold }
return preference
}
private fun readPreferenceFromFile(): PreferenceDto? {
val prefFile = SharedConfig.preference
if (!prefFile.exists()) {

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.delay
import mu.KotlinLogging
import java.io.File
import java.io.RandomAccessFile
@ -19,3 +20,13 @@ fun isFileAvailable(file: File): Boolean {
}
return false
}
suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) {
var elapsedDelay = 0L
do {
block.invoke()
elapsedDelay += delayed
delay(delayed)
} while (condition.invoke() && elapsedDelay < maxDuration)
}
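
A minimal usage sketch for limitedWhile: poll a condition roughly once per second and give up after ten seconds, mirroring how the processer services re-check database readback:

suspend fun awaitConsumed(isConsumed: () -> Boolean) {
    var consumed = isConsumed()
    limitedWhile({ !consumed }, maxDuration = 1000 * 10, delayed = 1000) {
        consumed = isConsumed()
    }
}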

View File

@ -31,6 +31,7 @@ fun <T> insertWithSuccess(block: () -> T): Boolean {
transaction {
try {
block()
commit()
} catch (e: Exception) {
e.printStackTrace()
// log the error here or handle the exception as needed
@ -49,10 +50,13 @@ fun <T> executeOrException(block: () -> T): Exception? {
transaction {
try {
block()
commit()
null
} catch (e: Exception) {
// log the error here or handle the exception as needed
rollback()
e
}
}
} catch (e: Exception) {
@ -66,6 +70,7 @@ fun <T> executeWithStatus(block: () -> T): Boolean {
transaction {
try {
block()
commit()
} catch (e: Exception) {
e.printStackTrace()
// log the error here or handle the exception as needed

View File

@ -29,7 +29,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp
else -> sanitizedName
}
val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped
return MovieInfo(title = cleanup(nonResolutioned), fullName = cleanup(nonResolutioned))
}
private fun determineSerieFileName(): EpisodeInfo? {
@ -58,7 +58,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp
}
} else title
val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim()
return EpisodeInfo(title = title, episode = episodeNumber.toInt(), season = seasonNumber.toInt(), episodeTitle = episodeTitle, fullName = fullName)
}
private fun determineUndefinedFileName(): VideoInfo? {

View File

@ -2,9 +2,9 @@ package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.jetbrains.exposed.sql.*
import java.time.LocalDateTime
class PersistentDataReader {
val dzz = DeserializingRegistry()
@ -25,4 +25,62 @@ class PersistentDataReader {
} ?: emptyList()
}
fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withTransaction {
events.selectAll()
.andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event }
.groupBy { it[events.referenceId] }
.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } }
} ?: emptyList()
return result
}
fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean {
val result = withTransaction {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }.singleOrNull()
}
return result?.claimed ?: true
}
fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimedBy eq claimedBy)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
}?.singleOrNull()?.consumed ?: false
}
fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction {
processerEvents.select {
(processerEvents.claimed eq false) and
(processerEvents.consumed eq false)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
}
fun getExpiredClaimsProcessEvents(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
val entries = withTransaction {
processerEvents.select {
(processerEvents.claimed eq true) and
(processerEvents.consumed neq true)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline }
}
fun getProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction {
processerEvents.selectAll()
.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
} ?: emptyList()
}
} }

View File

@ -2,13 +2,18 @@ package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.update
import java.sql.SQLIntegrityConstraintViolationException
open class PersistentDataStore {
fun storeEventDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException {
events.insert {
it[events.referenceId] = message.referenceId
@ -28,4 +33,78 @@ open class PersistentDataStore {
}
}
fun storeProcessDataMessage(event: String, message: Message<*>): Boolean {
val exception = executeOrException {
processerEvents.insert {
it[processerEvents.referenceId] = message.referenceId
it[processerEvents.eventId] = message.eventId
it[processerEvents.event] = event
it[processerEvents.data] = message.dataAsJson()
}
}
return if (exception == null) true else {
if (exception.cause is SQLIntegrityConstraintViolationException) {
(exception as ExposedSQLException).errorCode == 1062
}
else {
exception.printStackTrace()
false
}
}
}
fun setProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq false)
}) {
it[processerEvents.claimedBy] = claimedBy
it[lastCheckIn] = CurrentDateTime
it[claimed] = true
}
} == 1
}
fun setProcessEventCompleted(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withTransaction {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimedBy eq claimedBy) and
(processerEvents.claimed eq true)
}) {
it[processerEvents.consumed] = true
}
} == 1
}
fun updateCurrentProcessEventClaim(referenceId: String, eventId: String, claimedBy: String): Boolean {
return executeWithStatus {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq true) and // a row being refreshed is necessarily still claimed
(processerEvents.claimedBy eq claimedBy)
}) {
it[lastCheckIn] = CurrentDateTime
}
}
}
fun releaseProcessEventClaim(referenceId: String, eventId: String): Boolean {
val exception = executeOrException {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[claimedBy] = null
it[lastCheckIn] = null
it[claimed] = false
}
}
return exception == null
}
}
}
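
A minimal sketch of the claim lifecycle these methods implement, as driven by the processer services (identifiers invented):

fun exampleClaimLifecycle() {
    val store = PersistentDataStore()
    val serviceId = "host::EncodeService::invented-uuid"
    if (store.setProcessEventClaim(referenceId = "ref-1", eventId = "evt-1", claimedBy = serviceId)) {
        // long-running work happens here; refresh lastCheckIn periodically so
        // ClaimsService does not treat the claim as expired (15 minutes in this commit)
        store.updateCurrentProcessEventClaim("ref-1", "evt-1", serviceId)
        // mark the row consumed once the work has finished
        store.setProcessEventCompleted("ref-1", "evt-1", serviceId)
    }
    // if the claim was not obtained, another instance holds it; ClaimsService
    // will release it again should the holder stop checking in
}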

View File

@ -10,10 +10,19 @@ data class PersistentMessage(
val referenceId: String,
val eventId: String,
val event: KafkaEvents,
//val metadata: Metadata,
val data: MessageDataWrapper,
val created: LocalDateTime
)
data class Metadata(
val createdBy: String
)
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event
}
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try {
KafkaEvents.toEvent(row[events.event])

View File

@ -0,0 +1,49 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimed
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.claimedBy
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.consumed
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.created
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.data
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.event
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.eventId
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.lastCheckIn
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents.referenceId
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime
data class PersistentProcessDataMessage(
val referenceId: String,
val eventId: String,
val event: KafkaEvents,
val data: MessageDataWrapper,
val created: LocalDateTime,
val claimedBy: String? = null,
val claimed: Boolean = false,
val consumed: Boolean = false,
val lastCheckIn: LocalDateTime? = null
)
fun fromRowToPersistentProcessDataMessage(row: ResultRow, dez: DeserializingRegistry): PersistentProcessDataMessage? {
val kev = try {
KafkaEvents.toEvent(row[event])
} catch (e: IllegalArgumentException) {
e.printStackTrace()
return null
}?: return null
val dzdata = dez.deserializeData(kev, row[data])
return PersistentProcessDataMessage(
referenceId = row[referenceId],
eventId = row[eventId],
event = kev,
data = dzdata,
created = row[created],
claimed = row[claimed],
claimedBy = row[claimedBy],
consumed = row[consumed],
lastCheckIn = row[lastCheckIn]
)
}

View File

@ -9,7 +9,7 @@ import java.time.LocalDateTime
object events: IntIdTable() {
val referenceId: Column<String> = varchar("referenceId", 50)
val eventId: Column<String> = varchar("eventId", 50)
val event: Column<String> = varchar("event",100)
val data: Column<String> = text("data")
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)

View File

@@ -2,9 +2,22 @@ package no.iktdev.mediaprocessing.shared.common.persistance
 import org.jetbrains.exposed.dao.id.IntIdTable
 import org.jetbrains.exposed.sql.Column
+import org.jetbrains.exposed.sql.javatime.CurrentDateTime
+import org.jetbrains.exposed.sql.javatime.datetime
+import java.time.LocalDateTime
 
 object processerEvents: IntIdTable() {
-    val claimed: Column<Boolean> = bool("claimed")
+    val referenceId: Column<String> = varchar("referenceId", 50)
+    val claimed: Column<Boolean> = bool("claimed").default(false)
+    val claimedBy: Column<String?> = varchar("claimedBy", 100).nullable()
+    val event: Column<String> = varchar("event",100)
+    val eventId: Column<String> = varchar("eventId", 50)
     val data: Column<String> = text("data")
+    val consumed: Column<Boolean> = bool("consumed").default(false)
+    val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
+    val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
+
+    init {
+        uniqueIndex(referenceId, event)
+    }
 }
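
The new uniqueIndex makes (referenceId, event) single-shot: inserting the same work item twice for a reference fails at the database instead of producing a duplicate row for processers to race over. A minimal sketch (column values illustrative):

    import org.jetbrains.exposed.exceptions.ExposedSQLException
    import org.jetbrains.exposed.sql.insert
    import org.jetbrains.exposed.sql.transactions.transaction

    transaction {
        try {
            processerEvents.insert {
                it[referenceId] = "ref-1"
                it[event] = KafkaEvents.EVENT_WORK_ENCODE_CREATED.event
                it[eventId] = "evt-1"
                it[data] = "{}"
            }
        } catch (e: ExposedSQLException) {
            // (referenceId, event) already registered; the work item exists
        }
    }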

View File

@@ -2,14 +2,15 @@ package no.iktdev.mediaprocessing.shared.kafka.core
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
+import mu.KotlinLogging
 import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import java.lang.reflect.Type
-import kotlin.reflect.KClass
 
 class DeserializingRegistry {
+    private val log = KotlinLogging.logger {}
+
     companion object {
         val deserializables = mutableMapOf(
             KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class.java,
@@ -17,18 +18,19 @@ class DeserializingRegistry {
             KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java,
             KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java,
             KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class.java,
-            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to null,
-            KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to null,
-            KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to null,
+            KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to VideoInfoPerformed::class.java,
+            KafkaEvents.EVENT_MEDIA_READ_OUT_COVER to CoverInfoPerformed::class.java,
+            KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
+            KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
             KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null,
             KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to null,
-            KafkaEvents.EVENT_WORK_ENCODE_CREATED to null,
-            KafkaEvents.EVENT_WORK_EXTRACT_CREATED to null,
+            KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java,
+            KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
             KafkaEvents.EVENT_WORK_CONVERT_CREATED to null,
-            KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to null,
-            KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to null,
+            KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java,
+            KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java,
             KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null,
             KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null,
@@ -41,6 +43,9 @@ class DeserializingRegistry {
     fun deserialize(event: KafkaEvents, json: String): Message<out MessageDataWrapper> {
         val gson = Gson()
         val dezClazz = deserializables[event]
+        if (dezClazz == null) {
+            log.warn { "${event.event} will be deserialized with default!" }
+        }
         dezClazz?.let { eventClass ->
             try {
                 val type = TypeToken.getParameterized(Message::class.java, eventClass).type
@@ -51,7 +56,7 @@ class DeserializingRegistry {
         }
         // Fallback
         val type = object : TypeToken<Message<out MessageDataWrapper>>() {}.type
-        return gson.fromJson<Message<MessageDataWrapper>>(json, type)
+        return gson.fromJson<Message<SimpleMessageData>>(json, type)
     }
 
     fun deserializeData(event: KafkaEvents, json: String): MessageDataWrapper {
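
With the registry now mapping the worker events to concrete classes, consumed records come back typed; events still mapped to null log a warning and fall through to the fallback. A minimal sketch, assuming `json` holds a raw record value:

    val registry = DeserializingRegistry()
    // Registered event: data deserializes as FfmpegWorkRequestCreated
    val typed = registry.deserialize(KafkaEvents.EVENT_WORK_ENCODE_CREATED, json)
    val request = typed.data as? FfmpegWorkRequestCreated
    // Unregistered event (still null in the map): warns, then uses the fallback type
    val fallback = registry.deserialize(KafkaEvents.EVENT_WORK_CONVERT_CREATED, json)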

View File

@@ -8,6 +8,7 @@ enum class KafkaEvents(val event: String) {
     EVENT_MEDIA_READ_BASE_INFO_PERFORMED("event:media-read-base-info:performed"),
     EVENT_MEDIA_METADATA_SEARCH_PERFORMED("event:media-metadata-search:performed"),
     EVENT_MEDIA_READ_OUT_NAME_AND_TYPE("event:media-read-out-name-and-type:performed"),
+    EVENT_MEDIA_READ_OUT_COVER("event:media-read-out-cover:performed"),
     EVENT_MEDIA_ENCODE_PARAMETER_CREATED("event:media-encode-parameter:created"),
     EVENT_MEDIA_EXTRACT_PARAMETER_CREATED("event:media-extract-parameter:created"),

View File

@@ -25,7 +25,7 @@ open class KafkaImplementation {
         config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
         config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
         config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
-        log.info { config }
+        //log.info { config }
         return DefaultKafkaProducerFactory(config)
     }
 
     @Bean
@@ -43,7 +43,7 @@ open class KafkaImplementation {
         config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
         config[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = KafkaEnv.sessionTimeOutMilliseconds
         config[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = KafkaEnv.heartbeatIntervalMilliseconds
-        log.info { config }
+        //log.info { config }
         return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
     }
 }

View File

@@ -1,11 +1,6 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto
 
-import com.google.gson.Gson
-import no.iktdev.mediaprocessing.shared.contract.ProcessType
 import no.iktdev.streamit.library.kafka.dto.Status
-import java.io.Serializable
-import java.lang.reflect.Type
-import java.util.*
 
 open class MessageDataWrapper(
@@ -15,7 +10,7 @@ open class MessageDataWrapper(
 
 data class SimpleMessageData(
     override val status: Status,
-    override val message: String?
+    override val message: String? = null
 ) : MessageDataWrapper(status, message)
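
The added default lets callers omit the message for success-style payloads. A tiny sketch (the Status value names are assumed, not confirmed by this diff):

    val ok = SimpleMessageData(Status.COMPLETED)                   // message defaults to null
    val failed = SimpleMessageData(Status.ERROR, "ffprobe exited with 1")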

View File

@@ -0,0 +1,17 @@
+package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
+
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.streamit.library.kafka.dto.Status
+
+@KafkaBelongsToEvent(
+    KafkaEvents.EVENT_WORK_ENCODE_PERFORMED,
+    KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
+)
+data class FfmpegWorkPerformed(
+    override val status: Status,
+    override val message: String? = null,
+    val producedBy: String,
+    val derivedFromEventId: String
+): MessageDataWrapper(status, message)
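
A hedged example of a processer reporting back with the new payload; the Status value and field contents are illustrative:

    val performed = FfmpegWorkPerformed(
        status = Status.COMPLETED,          // assumed Status value
        producedBy = "processer-1",         // hypothetical instance name
        derivedFromEventId = workEventId    // eventId of the EVENT_WORK_*_CREATED message being answered
    )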

View File

@@ -1,17 +1,29 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 
+import com.google.gson.Gson
+import com.google.gson.JsonObject
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.streamit.library.kafka.dto.Status
 
 data class VideoInfoPerformed(
     override val status: Status,
-    val info: VideoInfo
+    val info: JsonObject,
+    val outDirectory: String
 )
-    : MessageDataWrapper(status)
+    : MessageDataWrapper(status) {
+    fun toValueObject(): VideoInfo? {
+        val type = info.get("type").asString
+        return when (type) {
+            "movie" -> Gson().fromJson(info.toString(), MovieInfo::class.java)
+            "serie" -> Gson().fromJson(info.toString(), EpisodeInfo::class.java)
+            else -> null
+        }
+    }
+}
 
 data class EpisodeInfo(
-    override val type: String,
+    override val type: String = "serie",
     val title: String,
     val episode: Int,
     val season: Int,
@@ -20,7 +32,7 @@ data class EpisodeInfo(
 ): VideoInfo(type, fullName)
 
 data class MovieInfo(
-    override val type: String,
+    override val type: String = "movie",
     val title: String,
     override val fullName: String
 ) : VideoInfo(type, fullName)
@@ -34,4 +46,8 @@ data class SubtitleInfo(
 open class VideoInfo(
     @Transient open val type: String,
     @Transient open val fullName: String
-)
+) {
+    fun toJsonObject(): JsonObject {
+        return Gson().toJsonTree(this).asJsonObject
+    }
+}
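
A round-trip sketch for the new JsonObject payload (titles, paths, and the Status value name are illustrative). Since the base properties are @Transient, the serialized form depends on the overriding subclass fields, so the sketch defensively ensures the "type" discriminator that toValueObject() dispatches on is present:

    val movie = MovieInfo(title = "Heat", fullName = "Heat (1995)")
    val payload = movie.toJsonObject().apply {
        if (!has("type")) addProperty("type", movie.type)   // make sure the discriminator survives serialization
    }
    val performed = VideoInfoPerformed(Status.COMPLETED, info = payload, outDirectory = "/out/movies")
    val restored = performed.toValueObject()                // MovieInfo when "type" == "movie"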