Updated ffmpeg stuff

This commit is contained in:
bskjon 2024-07-02 18:14:48 +02:00
parent 423c6707e9
commit 504b11ba5d
3 changed files with 68 additions and 36 deletions

View File

@ -14,6 +14,7 @@ import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@ -29,13 +30,22 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" }
if (events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterEncodeCreated) }?.isSuccess() != true) {
return null
}
if (!isPermittedToCreateTasks(events)) {
log.warn { "Cannot continue until permitted event is present" }
}
val batchEvents = createMessagesByArgs(event)
val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterEncodeCreated) {
val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterEncodeCreated }
if (sevent != null) {
@ -46,22 +56,24 @@ class CreateEncodeWorkTask(@Autowired override var coordinator: EventCoordinator
sevent ?: event
} else event
forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.let {
val entries = it.entries.firstOrNull() ?: return@let
val ffmpegTask = FfmpegTaskData(
inputFile = it.inputFile,
outFile = entries.outputFile,
arguments = entries.arguments
)
val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Encode, Gson().toJson(ffmpegTask))
batchEvents.forEach { e ->
val createdTask = if (e is FfmpegWorkRequestCreated) {
FfmpegTaskData(
inputFile = e.inputFile,
outFile = e.outFile,
arguments = e.arguments
).let { task ->
val status = taskManager.createTask(referenceId = event.referenceId, task= TaskType.Encode, data = Gson().toJson(task))
if (!status) {
log.error { "Failed to create Encode task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" }
}
status
}
return super.onProcessEvents(forwardEvent, events)
} else false
if (createdTask)
onResult(e)
}
return null
}
}

View File

@ -13,6 +13,7 @@ import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
@ -31,10 +32,17 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato
super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" }
if (events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }?.isSuccess() != true) {
return null
}
if (!isPermittedToCreateTasks(events)) {
log.warn { "Cannot continue until permitted event is present" }
}
val batchEvents = createMessagesByArgs(event)
val forwardEvent = if (event.event != KafkaEvents.EventMediaParameterExtractCreated) {
val sevent = events.findLast { it.event == KafkaEvents.EventMediaParameterExtractCreated }
if (sevent != null) {
@ -45,20 +53,23 @@ class CreateExtractWorkTask(@Autowired override var coordinator: EventCoordinato
sevent ?: event
} else event
forwardEvent.data.az<FfmpegWorkerArgumentsCreated>()?.takeIf { it.isSuccess() }?.let {
it.entries.forEach { argsGroup ->
val ffmpegTask = FfmpegTaskData(
inputFile = it.inputFile,
outFile = argsGroup.outputFile,
arguments = argsGroup.arguments
)
val status = taskManager.createTask(event.referenceId, forwardEvent.eventId, TaskType.Extract, Gson().toJson(ffmpegTask))
batchEvents.forEach { e ->
val createdTask = if (e is FfmpegWorkRequestCreated) {
FfmpegTaskData(
inputFile = e.inputFile,
outFile = e.outFile,
arguments = e.arguments
).let { task ->
val status = taskManager.createTask(referenceId = event.referenceId, task= TaskType.Encode, data = Gson().toJson(task))
if (!status) {
log.error { "Failed to create Extract task on ${forwardEvent.referenceId}@${forwardEvent.eventId}" }
}
status
}
} else false
if (createdTask)
onResult(e)
}
return super.onProcessEvents(forwardEvent, events)
return null
}
}

View File

@ -14,26 +14,34 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStar
abstract class CreateProcesserWorkTask(override var coordinator: EventCoordinator) : TaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
open fun isPermittedToCreateTasks(events: List<PersistentMessage>): Boolean {
val event = events.firstOrNull() ?: return false
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted?
if (started == null) {
log.info { "${event.referenceId} couldn't find start event" }
return null
return false
} else if (started.type == ProcessType.MANUAL) {
val proceed = events.find { it.event == KafkaEvents.EventMediaWorkProceedPermitted }
if (proceed == null) {
log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" }
return null
return false
} else {
log.warn { "${event.referenceId} registered proceed permitted" }
}
}
return true
}
val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return null
fun createMessagesByArgs(event: PersistentMessage): List<MessageDataWrapper> {
val events: MutableList<MessageDataWrapper> = mutableListOf()
val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return events
if (earg == null || earg.entries.isEmpty()) {
log.info { "${event.referenceId} ffargument is empty" }
return null
return events
}
val requestEvents = earg.entries.map {
@ -47,8 +55,9 @@ abstract class CreateProcesserWorkTask(override var coordinator: EventCoordinato
}
requestEvents.forEach {
log.info { "${event.referenceId} creating work request based on ${it.derivedFromEventId}" }
super.onResult(it)
events.add(it)
}
return null
return events
}
}