diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt index 9ae05fde..f2602ddc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CreateConvertWorkTask.kt @@ -4,12 +4,16 @@ import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.lastOf +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.isOnly import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.az import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File @@ -23,41 +27,51 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : override val requiredEvents: List get() = listOf( KafkaEvents.EventWorkExtractCreated - // TODO: Add event for request as well ) + override val listensForEvents: List + get() = listOf(KafkaEvents.EventMediaProcessStarted) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { super.onProcessEventsAccepted(event, events) log.info { "${event.referenceId} @ ${event.eventId} triggered by ${event.event}" } - - // Check what it is and create based on it - - val derivedInfoObject = if (event.event in requiredEvents) { - DerivedInfoObject.fromExtractWorkCreated(event) + val startedEvent = events.lastOf(KafkaEvents.EventMediaProcessStarted) + val startedEventData = events.lastOf(KafkaEvents.EventMediaProcessStarted)?.data?.az() + if (startedEventData?.operations?.isOnly(StartOperationEvents.CONVERT) == true) { + val subtitleFile = File(startedEventData.file) + return produceConvertWorkRequest(subtitleFile, null, startedEvent?.eventId) } else { - val extractEvent = events.findLast { it.event == KafkaEvents.EventWorkExtractCreated } - extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) } - } ?: return null + val derivedInfoObject = if (event.event in requiredEvents) { + DerivedInfoObject.fromExtractWorkCreated(event) + } else { + val extractEvent = events.lastOf(KafkaEvents.EventWorkExtractCreated) + extractEvent?.let { it -> DerivedInfoObject.fromExtractWorkCreated(it) } + } ?: return null - val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { - event.eventId - } else null; + val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) { + event.eventId + } else null; - val outFile = File(derivedInfoObject.outputFile) + val outFile = File(derivedInfoObject.outputFile) + return produceConvertWorkRequest(outFile, requiredEventId, event.eventId) + } + } + + private fun produceConvertWorkRequest(file: File, requiresEventId: String?, derivedFromEventId: String?): ConvertWorkerRequest { return ConvertWorkerRequest( status = Status.COMPLETED, - requiresEventId = requiredEventId, - inputFile = derivedInfoObject.outputFile, + requiresEventId = requiresEventId, + inputFile = file.absolutePath, allowOverwrite = true, - outFileBaseName = outFile.nameWithoutExtension, - outDirectory = outFile.parentFile.absolutePath, - derivedFromEventId = event.eventId + outFileBaseName = file.nameWithoutExtension, + outDirectory = file.parentFile.absolutePath, + derivedFromEventId = derivedFromEventId ) - } + + private data class DerivedInfoObject( val outputFile: String, val derivedFromEventId: String, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index 9731f50f..8c766513 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -6,6 +6,7 @@ import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream @@ -42,7 +43,8 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null - return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId) + val data = desiredEvent.data as ReaderPerformed + return parseStreams(data, desiredEvent.eventId) } fun parseStreams(data: ReaderPerformed, eventId: String): MessageDataWrapper { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 99238761..0be70c90 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -10,6 +10,7 @@ import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing +import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData @@ -23,7 +24,7 @@ import java.io.File @Service class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val log = KotlinLogging.logger {} - + val requiredOperations = listOf(StartOperationEvents.ENCODE, StartOperationEvents.EXTRACT) override val producesEvent: KafkaEvents get() = KafkaEvents.EventMediaReadStreamPerformed @@ -42,10 +43,14 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { super.onProcessEventsAccepted(event, events) - log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null - return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) } + val data = desiredEvent.data as MediaProcessStarted + if (!data.operations.any { it in requiredOperations }) { + log.info { "${event.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" } + return null + } + return runBlocking { fileReadStreams(data, desiredEvent.eventId) } } suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): MessageDataWrapper { diff --git a/apps/ui/build.gradle.kts b/apps/ui/build.gradle.kts index 9cf74347..15811d7b 100644 --- a/apps/ui/build.gradle.kts +++ b/apps/ui/build.gradle.kts @@ -20,6 +20,8 @@ repositories { } } +val exposedVersion = "0.44.0" + dependencies { implementation(kotlin("stdlib-jdk8")) @@ -35,6 +37,11 @@ dependencies { implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") + implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion") + implementation ("mysql:mysql-connector-java:8.0.29") implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") implementation(project(mapOf("path" to ":shared"))) diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventsTableTopic.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventsTableTopic.kt new file mode 100644 index 00000000..7845c812 --- /dev/null +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventsTableTopic.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.ui.socket + +import no.iktdev.mediaprocessing.ui.service.PersistentEventsTableService +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.messaging.handler.annotation.MessageMapping +import org.springframework.messaging.simp.SimpMessagingTemplate +import org.springframework.stereotype.Controller + +@Controller +class EventsTableTopic( + @Autowired private val template: SimpMessagingTemplate?, + @Autowired private val persistentEventsTableService: PersistentEventsTableService +): TopicSupport() { + + @MessageMapping("/persistent/events") + fun readbackEvents() { + template?.convertAndSend("/topic/persistent/events", persistentEventsTableService.cachedEvents) + } + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 932d019d..12e51f3b 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -17,6 +17,10 @@ data class PersistentMessage( val created: LocalDateTime ) +fun List.lastOf(event: KafkaEvents): PersistentMessage? { + return this.lastOrNull { it.event == event && it.isSuccess() } +} + fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { return this.event == event diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt index ce7b69d1..c55645e4 100644 --- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt @@ -20,4 +20,8 @@ enum class StartOperationEvents { ENCODE, EXTRACT, CONVERT -} \ No newline at end of file +} + +fun List.isOnly(expected: StartOperationEvents): Boolean { + return this.size == 1 && this.firstOrNull { it == expected } != null +} diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/EventsDto.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/EventsDto.kt new file mode 100644 index 00000000..e7ae3a74 --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/EventsDto.kt @@ -0,0 +1,11 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +import java.time.LocalDateTime + +data class EventsDto( + val referenceId: String, + val eventId: String, + val event: String, + val data: String, + val created: LocalDateTime +) diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt index e16e623c..d214de81 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt @@ -7,6 +7,15 @@ abstract class MessageDataWrapper( @Transient open val derivedFromEventId: String? = null ) +@Suppress("UNCHECKED_CAST") +fun MessageDataWrapper.az(): T? { + return try { + this as T + } catch (e: Exception) { + e.printStackTrace() + null + } +} data class SimpleMessageData(