This commit is contained in:
bskjon 2024-05-05 17:57:05 +02:00
parent 4d7af0dedb
commit e776e00375
9 changed files with 101 additions and 25 deletions

View File

@ -4,12 +4,16 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.lastOf
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.isOnly
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.az
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
@ -23,41 +27,51 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
override val requiredEvents: List<KafkaEvents> override val requiredEvents: List<KafkaEvents>
get() = listOf( get() = listOf(
KafkaEvents.EventWorkExtractCreated KafkaEvents.EventWorkExtractCreated
// TODO: Add event for request as well
) )
override val listensForEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EventMediaProcessStarted)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" }
val startedEvent = events.lastOf(KafkaEvents.EventMediaProcessStarted)
// Check what it is and create based on it val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az<MediaProcessStarted>()
if (startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
val derivedInfoObject = if (event.event in requiredEvents) { val subtitleFile = File(startedEventData.file)
DerivedInfoObject.fromExtractWorkCreated(event) return produceConvertWorkRequest(subtitleFile, null, startedEvent?.eventId)
} else { } else {
val extractEvent = events.findLast { it.event == KafkaEvents.EventWorkExtractCreated } val derivedInfoObject = if (event.event in requiredEvents) {
extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) } DerivedInfoObject.fromExtractWorkCreated(event)
} ?: return null } else {
val extractEvent = events.lastOf(KafkaEvents.EventWorkExtractCreated)
extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) }
} ?: return null
val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) {
event.eventId event.eventId
} else null; } else null;
val outFile = File(derivedInfoObject.outputFile) val outFile = File(derivedInfoObject.outputFile)
return produceConvertWorkRequest(outFile, requiredEventId, event.eventId)
}
}
private fun produceConvertWorkRequest(file: File, requiresEventId: String?, derivedFromEventId: String?): ConvertWorkerRequest {
return ConvertWorkerRequest( return ConvertWorkerRequest(
status = Status.COMPLETED, status = Status.COMPLETED,
requiresEventId = requiredEventId, requiresEventId = requiresEventId,
inputFile = derivedInfoObject.outputFile, inputFile = file.absolutePath,
allowOverwrite = true, allowOverwrite = true,
outFileBaseName = outFile.nameWithoutExtension, outFileBaseName = file.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath, outDirectory = file.parentFile.absolutePath,
derivedFromEventId = event.eventId derivedFromEventId = derivedFromEventId
) )
} }
private data class DerivedInfoObject( private data class DerivedInfoObject(
val outputFile: String, val outputFile: String,
val derivedFromEventId: String, val derivedFromEventId: String,

View File

@ -6,6 +6,7 @@ import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
@ -42,7 +43,8 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null
return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId) val data = desiredEvent.data as ReaderPerformed
return parseStreams(data, desiredEvent.eventId)
} }
fun parseStreams(data: ReaderPerformed, eventId: String): MessageDataWrapper { fun parseStreams(data: ReaderPerformed, eventId: String): MessageDataWrapper {

View File

@ -10,6 +10,7 @@ import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
@ -23,7 +24,7 @@ import java.io.File
@Service @Service
class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {} val log = KotlinLogging.logger {}
val requiredOperations = listOf(StartOperationEvents.ENCODE, StartOperationEvents.EXTRACT)
override val producesEvent: KafkaEvents override val producesEvent: KafkaEvents
get() = KafkaEvents.EventMediaReadStreamPerformed get() = KafkaEvents.EventMediaReadStreamPerformed
@ -42,10 +43,14 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events) super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" } log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null
return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) } val data = desiredEvent.data as MediaProcessStarted
if (!data.operations.any { it in requiredOperations }) {
log.info { "${event.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" }
return null
}
return runBlocking { fileReadStreams(data, desiredEvent.eventId) }
} }
suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): MessageDataWrapper { suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): MessageDataWrapper {

View File

@ -20,6 +20,8 @@ repositories {
} }
} }
val exposedVersion = "0.44.0"
dependencies { dependencies {
implementation(kotlin("stdlib-jdk8")) implementation(kotlin("stdlib-jdk8"))
@ -35,6 +37,11 @@ dependencies {
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation ("mysql:mysql-connector-java:8.0.29")
implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation("no.iktdev:exfl:0.0.16-SNAPSHOT")
implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared")))

View File

@ -0,0 +1,20 @@
package no.iktdev.mediaprocessing.ui.socket
import no.iktdev.mediaprocessing.ui.service.PersistentEventsTableService
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.MessageMapping
import org.springframework.messaging.simp.SimpMessagingTemplate
import org.springframework.stereotype.Controller
@Controller
class EventsTableTopic(
@Autowired private val template: SimpMessagingTemplate?,
@Autowired private val persistentEventsTableService: PersistentEventsTableService
): TopicSupport() {
@MessageMapping("/persistent/events")
fun readbackEvents() {
template?.convertAndSend("/topic/persistent/events", persistentEventsTableService.cachedEvents)
}
}

View File

@ -17,6 +17,10 @@ data class PersistentMessage(
val created: LocalDateTime val created: LocalDateTime
) )
fun List<PersistentMessage>.lastOf(event: KafkaEvents): PersistentMessage? {
return this.lastOrNull { it.event == event && it.isSuccess() }
}
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event return this.event == event

View File

@ -20,4 +20,8 @@ enum class StartOperationEvents {
ENCODE, ENCODE,
EXTRACT, EXTRACT,
CONVERT CONVERT
} }
fun List<StartOperationEvents>.isOnly(expected: StartOperationEvents): Boolean {
return this.size == 1 && this.firstOrNull { it == expected } != null
}

View File

@ -0,0 +1,11 @@
package no.iktdev.mediaprocessing.shared.contract.dto
import java.time.LocalDateTime
data class EventsDto(
val referenceId: String,
val eventId: String,
val event: String,
val data: String,
val created: LocalDateTime
)

View File

@ -7,6 +7,15 @@ abstract class MessageDataWrapper(
@Transient open val derivedFromEventId: String? = null @Transient open val derivedFromEventId: String? = null
) )
@Suppress("UNCHECKED_CAST")
fun <T> MessageDataWrapper.az(): T? {
return try {
this as T
} catch (e: Exception) {
e.printStackTrace()
null
}
}
data class SimpleMessageData( data class SimpleMessageData(