From 90e9d873f012ec3868e676004ead8df386c34927 Mon Sep 17 00:00:00 2001 From: Brage Date: Sat, 6 Jan 2024 23:29:37 +0100 Subject: [PATCH] Converter now works --- .idea/misc.xml | 4 +- apps/converter/build.gradle.kts | 23 ++- .../converter/ClaimsService.kt | 36 ++++ .../converter/ConverterApplication.kt | 11 ++ .../converter/ConverterCoordinator.kt | 73 ++++++++ .../mediaprocessing/converter/ConverterEnv.kt | 15 ++ .../converter/Implementations.kt | 16 ++ .../converter/convert/Converter.kt | 55 ++++++ .../flow/EventBasedProcessMessageListener.kt | 30 ++++ .../converter/flow/ProcesserTaskCreator.kt | 29 ++++ .../converter/tasks/ConvertService.kt | 159 ++++++++++++++++++ .../mediaprocessing/processer/ProcesserEnv.kt | 7 +- .../processer/ffmpeg/FfmpegWorker.kt | 9 +- .../processer/services/EncodeService.kt | 11 +- .../processer/services/ExtractService.kt | 10 +- shared/common/build.gradle.kts | 3 + .../shared/common/CoordinatorBase.kt | 37 ++++ .../mediaprocessing/shared/common/Utils.kt | 7 + .../persistance/PersistentDataReader.kt | 10 ++ .../common/persistance/PersistentMessage.kt | 5 +- .../common/tasks/EventBasedMessageListener.kt | 67 ++++++++ .../common/tasks/ITaskCreatorListener.kt | 6 + .../shared/common/tasks/TaskCreatorImpl.kt | 104 ++++++++++++ .../kafka/core/DeserializingRegistry.kt | 2 +- .../shared/kafka/dto/CollectionReference.kt | 4 +- .../dto/events_result/ConvertWorkPerformed.kt | 15 ++ .../dto/events_result/ConvertWorkerRequest.kt | 2 +- 27 files changed, 721 insertions(+), 29 deletions(-) create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterEnv.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/Implementations.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/EventBasedProcessMessageListener.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/EventBasedMessageListener.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/ITaskCreatorListener.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt create mode 100644 shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt diff --git a/.idea/misc.xml b/.idea/misc.xml index 787ddf6c..79db1727 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,7 +1,9 @@ + - + + diff --git a/apps/converter/build.gradle.kts b/apps/converter/build.gradle.kts index d2ebe0bb..0c974238 100644 --- a/apps/converter/build.gradle.kts +++ b/apps/converter/build.gradle.kts @@ -20,22 +20,39 @@ repositories { } } +val exposedVersion = "0.44.0" dependencies { /*Spring boot*/ implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter:2.7.0") // implementation("org.springframework.kafka:spring-kafka:3.0.1") - implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + implementation("org.springframework.kafka:spring-kafka:2.8.5") + + + implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion") + implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion") + implementation ("mysql:mysql-connector-java:8.0.29") + + implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") + implementation("com.google.code.gson:gson:2.8.9") + implementation("org.json:json:20210307") + implementation(project(mapOf("path" to ":shared"))) implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation("no.iktdev.library:subtitle:1.7.7-SNAPSHOT") - implementation(project(mapOf("path" to ":shared:kafka"))) - implementation(project(mapOf("path" to ":shared"))) + + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") + implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") + implementation("com.github.pgreze:kotlin-process:1.4.1") implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:common"))) + implementation(project(mapOf("path" to ":shared:kafka"))) } tasks.test { diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt new file mode 100644 index 00000000..38f6136f --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ClaimsService.kt @@ -0,0 +1,36 @@ +package no.iktdev.mediaprocessing.converter + +import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.scheduling.annotation.EnableScheduling +import org.springframework.scheduling.annotation.Scheduled +import org.springframework.stereotype.Service + +@Service +@EnableScheduling +class ClaimsService() { + private val log = KotlinLogging.logger {} + + @Autowired + lateinit var coordinator: ConverterCoordinator + + @Scheduled(fixedDelay = (300_000)) + fun validateClaims() { + val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents() + expiredClaims.forEach { + log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" } + } + val store = PersistentDataStore() + expiredClaims.forEach { + val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId) + if (result) { + log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" } + } else { + log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" } + } + } + coordinator.readAllInQueue() + } +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt index d937c34a..79ce293d 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -1,5 +1,9 @@ package no.iktdev.mediaprocessing.converter +import kotlinx.coroutines.launch +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationContext @@ -13,6 +17,13 @@ fun getContext(): ApplicationContext? { return context } fun main(args: Array) { + val dataSource = MySqlDataSource.fromDatabaseEnv() + Coroutines.default().launch { + dataSource.createDatabase() + dataSource.createTables( + processerEvents + ) + } context = runApplication(*args) } //private val logger = KotlinLogging.logger {} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt new file mode 100644 index 00000000..f3581946 --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterCoordinator.kt @@ -0,0 +1,73 @@ +package no.iktdev.mediaprocessing.converter + +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import mu.KotlinLogging +import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener +import no.iktdev.mediaprocessing.shared.common.CoordinatorBase +import no.iktdev.mediaprocessing.shared.common.DatabaseConfig +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.stereotype.Service + +@Service +class ConverterCoordinator() : CoordinatorBase() { + + private val log = KotlinLogging.logger {} + + override val listeners: EventBasedProcessMessageListener = EventBasedProcessMessageListener() + override fun createTasksBasedOnEventsAndPersistence( + referenceId: String, + eventId: String, + messages: List + ) { + val triggeredMessage = messages.find { it.eventId == eventId } + if (triggeredMessage == null) { + log.error { "Could not find $eventId in provided messages" } + return + } + listeners.forwardEventMessageToListeners(triggeredMessage, messages) + } + + fun readAllMessagesFor(referenceId: String, eventId: String) { + val messages = PersistentDataReader().getAvailableProcessEvents() + createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages) + } + + fun readAllInQueue() { + val messages = PersistentDataReader().getAvailableProcessEvents() + io.launch { + messages.forEach { + delay(1000) + createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages) + } + } + } + + override fun onCoordinatorReady() { + log.info { "Converter Coordinator is ready" } + readAllInQueue() + } + + override fun onMessageReceived(event: DeserializedConsumerRecord>) { + if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) { + val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value) + if (!success) { + log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" } + } else { + readAllMessagesFor(event.value.referenceId, event.value.eventId) + } + } else if (event.key == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED) { + readAllInQueue() + } else { + log.debug { "Skipping ${event.key}" } + } + //log.info { Gson().toJson(event.value) } + + } +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterEnv.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterEnv.kt new file mode 100644 index 00000000..8cf84efb --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterEnv.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.converter + +import no.iktdev.exfl.using +import java.io.File + +class ConverterEnv { + companion object { + val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false + val syncDialogs = System.getenv("SYNC_DIALOGS").toBoolean() + val outFormats: List = System.getenv("OUT_FORMATS")?.split(",")?.toList() ?: emptyList() + + val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else + File("data").using("logs", "convert") + } +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/Implementations.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/Implementations.kt new file mode 100644 index 00000000..ca82ce73 --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/Implementations.kt @@ -0,0 +1,16 @@ +package no.iktdev.mediaprocessing.converter + +import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import + +@Configuration +class SocketLocalInit: SocketImplementation() + +@Configuration +@Import(CoordinatorProducer::class, DefaultMessageListener::class) +class KafkaLocalInit: KafkaImplementation() { +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt new file mode 100644 index 00000000..5d11e19e --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt @@ -0,0 +1,55 @@ +package no.iktdev.mediaprocessing.converter.convert + +import no.iktdev.library.subtitle.Configuration +import no.iktdev.library.subtitle.Syncro +import no.iktdev.library.subtitle.classes.Dialog +import no.iktdev.library.subtitle.classes.DialogType +import no.iktdev.library.subtitle.export.Export +import no.iktdev.library.subtitle.reader.BaseReader +import no.iktdev.library.subtitle.reader.Reader +import no.iktdev.mediaprocessing.converter.ConverterEnv +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest +import java.io.File +import kotlin.jvm.Throws + +class Converter(val referenceId: String, val eventId: String, val data: ConvertWorkerRequest) { + + @Throws(FileUnavailableException::class) + private fun getReader(): BaseReader? { + val file = File(data.inputFile) + if (!file.canRead()) + throw FileUnavailableException("Can't open file for reading..") + return Reader(file).getSubtitleReader() + } + + private fun syncDialogs(input: List): List { + return if (ConverterEnv.syncDialogs) Syncro().sync(input) else input + } + + fun canRead(): Boolean { + try { + val reader = getReader() + return reader != null + } catch (e: FileUnavailableException) { + return false + } + } + + @Throws(FileUnavailableException::class, FileIsNullOrEmpty::class) + fun execute(): List { + val file = File(data.inputFile) + Configuration.exportJson = true + val read = getReader()?.read() ?: throw FileIsNullOrEmpty() + if (read.isEmpty()) + throw FileIsNullOrEmpty() + val filtered = read.filter { !it.ignore && it.type !in listOf(DialogType.SIGN_SONG, DialogType.CAPTION) } + val syncOrNotSync = syncDialogs(filtered) + + val exporter = Export(file, File(data.outDirectory), data.outFileBaseName) + return exporter.write(syncOrNotSync) + } + + + class FileIsNullOrEmpty(override val message: String? = "File read is null or empty"): RuntimeException() + class FileUnavailableException(override val message: String): RuntimeException() +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/EventBasedProcessMessageListener.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/EventBasedProcessMessageListener.kt new file mode 100644 index 00000000..1592bf14 --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/EventBasedProcessMessageListener.kt @@ -0,0 +1,30 @@ +package no.iktdev.mediaprocessing.converter.flow + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener +import no.iktdev.mediaprocessing.shared.common.tasks.Tasks + +class EventBasedProcessMessageListener: EventBasedMessageListener() { + override fun waitingListeners(events: List): List> { + val nonCreators = listeners + .filter { !events.map { e -> e.event } + .contains(it.producesEvent) } + return nonCreators + } + + override fun listenerWantingEvent(event: PersistentProcessDataMessage, waitingListeners: List>): List> { + return waitingListeners.filter { event.event in it.listensForEvents } + } + + override fun onForward( + event: PersistentProcessDataMessage, + history: List, + listeners: List> + ) { + listeners.forEach { + it.onEventReceived(referenceId = event.referenceId, event = event, events = history) + } + } + +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt new file mode 100644 index 00000000..64f34ba5 --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/flow/ProcesserTaskCreator.kt @@ -0,0 +1,29 @@ +package no.iktdev.mediaprocessing.converter.flow + +import no.iktdev.mediaprocessing.converter.ConverterCoordinator +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess + +abstract class ProcesserTaskCreator(coordinator: ConverterCoordinator): + TaskCreatorImpl(coordinator) { + + override fun isPrerequisiteEventsOk(events: List): Boolean { + val currentEvents = events.map { it.event } + return requiredEvents.all { currentEvents.contains(it) } + } + + override fun isPrerequisiteDataPresent(events: List): Boolean { + val failed = events + .filter { e -> e.event in requiredEvents } + .filter { !it.data.isSuccess() } + return failed.isEmpty() + } + + override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean { + return event.event == singleOne + } + +} \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt new file mode 100644 index 00000000..36934c8e --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -0,0 +1,159 @@ +package no.iktdev.mediaprocessing.converter.tasks + +import com.google.gson.Gson +import kotlinx.coroutines.delay +import kotlinx.coroutines.runBlocking +import mu.KotlinLogging +import no.iktdev.mediaprocessing.converter.ConverterCoordinator +import no.iktdev.mediaprocessing.converter.convert.Converter +import no.iktdev.mediaprocessing.converter.flow.ProcesserTaskCreator +import no.iktdev.mediaprocessing.shared.common.getComputername +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess +import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service +import java.util.* + +@Service +class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : ProcesserTaskCreator(coordinator) { + private val log = KotlinLogging.logger {} + val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" + + override val listensForEvents: List + get() = listOf( + KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED, + KafkaEvents.EVENT_WORK_CONVERT_CREATED + ) + override val producesEvent: KafkaEvents + get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED + + + fun getRequiredExtractProcessForContinuation(referenceId: String, requiresEventId: String): PersistentProcessDataMessage? { + return PersistentDataReader().getProcessEvent(referenceId, requiresEventId) + } + fun canConvert(extract: PersistentProcessDataMessage?): Boolean { + return extract?.consumed == true && extract.data.isSuccess() + } + + + override fun onProcessEvents( + event: PersistentProcessDataMessage, + events: List + ): MessageDataWrapper? { + if (event.data !is ConvertWorkerRequest) + return null + log.info { Gson().toJson(event) } + + val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId) + if (isAlreadyClaimed) { + log.warn { "Process is already claimed!" } + return null + } + + + val payload = event.data as ConvertWorkerRequest + val requiresEventId: String? = payload.requiresEventId + + val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) { + val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId) + if (existing == null) { + skipConvertEvent(event, requiresEventId) + return null + } + existing + } else null + + val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) { + Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload) + } else null + + + val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId) + if (!setClaim) { + return null + } + + if (converter == null || !converter.canRead()) { + // Make claim regardless but push to schedule + return null + } + + val result = try { + performConvert(converter) + } catch (e: Exception) { + SimpleMessageData(status = Status.ERROR, message = e.message) + } + + val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + runBlocking { + delay(1000) + if (!consumedIsSuccessful) { + PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId) + } + delay(1000) + var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + + while (!readbackIsSuccess) { + delay(1000) + readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId) + } + } + return result + } + + fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) { + val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}") + coordinator.producer.sendMessage( + referenceId = event.referenceId, + event = KafkaEvents.EVENT_WORK_CONVERT_SKIPPED, + data = producesPayload + ) + } + + fun performConvert(converter: Converter): ConvertWorkPerformed { + return try { + val result = converter.execute() + ConvertWorkPerformed( + status = Status.COMPLETED, + producedBy = serviceId, + derivedFromEventId = converter.eventId, + result = result.map { it.absolutePath } + ) + } catch (e: Converter.FileUnavailableException) { + e.printStackTrace() + ConvertWorkPerformed( + status = Status.ERROR, + message = e.message, + producedBy = serviceId, + derivedFromEventId = converter.eventId, + result = emptyList() + ) + } catch (e : Converter.FileIsNullOrEmpty) { + e.printStackTrace() + ConvertWorkPerformed( + status = Status.ERROR, + message = e.message, + producedBy = serviceId, + derivedFromEventId = converter.eventId, + result = emptyList() + ) + } + } + + + + data class PendingWorkerCache( + val referenceId: String, + val eventId: String, + val requiresEventId: String + ) +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt index c9e1f6c4..e80d3986 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt @@ -1,4 +1,4 @@ -package no.iktdev.streamit.content.encode +package no.iktdev.mediaprocessing.processer import no.iktdev.exfl.using import java.io.File @@ -7,10 +7,11 @@ class ProcesserEnv { companion object { val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false - val maxEncodeRunners: Int = try {System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1} - val maxExtractRunners: Int = try {System.getenv("SIMULTANEOUS_EXTRACT_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1} val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else File("data").using("logs") + + val encodeLogDirectory = logDirectory.using("encode") + val extractLogDirectory = logDirectory.using("extract") } } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt index e994d2c8..3c3ff5bf 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt @@ -6,16 +6,16 @@ import com.google.gson.Gson import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.using import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.streamit.content.encode.ProcesserEnv +import no.iktdev.mediaprocessing.processer.ProcesserEnv import java.io.BufferedWriter import java.io.File import java.io.FileWriter -class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents) { +class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents, val logDir: File) { val scope = Coroutines.io() val decoder = FfmpegProgressDecoder() private val outputCache = mutableListOf() - val logFile = ProcesserEnv.logDirectory.using("$eventId-${File(info.outFile).nameWithoutExtension}.log") + val logFile = logDir.using("$eventId-${File(info.outFile).nameWithoutExtension}.log") val getOutputCache = outputCache.toList() @@ -54,7 +54,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe private suspend fun execute(args: List) { listener.onStarted(info) - val processOp = process(ProcesserEnv.ffmpeg, *args.toTypedArray(), + val processOp = process( + ProcesserEnv.ffmpeg, *args.toTypedArray(), stdout = Redirect.CAPTURE, stderr = Redirect.CAPTURE, consumer = { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 92df6da2..dfa76e55 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -16,7 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.streamit.content.encode.ProcesserEnv +import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.streamit.library.kafka.dto.Status import org.springframework.stereotype.Service import java.io.File @@ -26,6 +26,7 @@ import javax.annotation.PreDestroy @Service class EncodeService: TaskCreator() { private val log = KotlinLogging.logger {} + private val logDir = ProcesserEnv.encodeLogDirectory val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED @@ -74,14 +75,14 @@ class EncodeService: TaskCreator() { fun startEncode(event: PersistentProcessDataMessage) { val ffwrc = event.data as FfmpegWorkRequestCreated File(ffwrc.outFile).parentFile.mkdirs() - if (!ProcesserEnv.logDirectory.exists()) { - ProcesserEnv.logDirectory.mkdirs() + if (!logDir.exists()) { + logDir.mkdirs() } val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} encode" } - runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents) + runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") return @@ -161,7 +162,7 @@ class EncodeService: TaskCreator() { } fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) { - + // TODO: Implementation } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 2759a4ec..cc979aef 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -17,7 +17,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated -import no.iktdev.streamit.content.encode.ProcesserEnv +import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.streamit.library.kafka.dto.Status import org.springframework.stereotype.Service import java.io.File @@ -27,6 +27,8 @@ import javax.annotation.PreDestroy @Service class ExtractService: TaskCreator() { private val log = KotlinLogging.logger {} + private val logDir = ProcesserEnv.extractLogDirectory + val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED @@ -75,15 +77,15 @@ class ExtractService: TaskCreator() { fun startExtract(event: PersistentProcessDataMessage) { val ffwrc = event.data as FfmpegWorkRequestCreated File(ffwrc.outFile).parentFile.mkdirs() - if (!ProcesserEnv.logDirectory.exists()) { - ProcesserEnv.logDirectory.mkdirs() + if (!logDir.exists()) { + logDir.mkdirs() } val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} extract" } - runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents) + runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") diff --git a/shared/common/build.gradle.kts b/shared/common/build.gradle.kts index bf3f3c4d..82860e8f 100644 --- a/shared/common/build.gradle.kts +++ b/shared/common/build.gradle.kts @@ -30,6 +30,9 @@ dependencies { implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20230227") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + implementation("org.springframework.kafka:spring-kafka:2.8.5") + + implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0") implementation("org.jetbrains.exposed:exposed-core:$exposedVersion") implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion") diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt new file mode 100644 index 00000000..1ba607f8 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/CoordinatorBase.kt @@ -0,0 +1,37 @@ +package no.iktdev.mediaprocessing.shared.common + +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.beans.factory.annotation.Autowired +import javax.annotation.PostConstruct + +abstract class CoordinatorBase> { + abstract val listeners: L + + val io = Coroutines.io() + + @Autowired + lateinit var producer: CoordinatorProducer + + @Autowired + private lateinit var listener: DefaultMessageListener + + abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List) + + abstract fun onCoordinatorReady() + abstract fun onMessageReceived(event: DeserializedConsumerRecord>) + @PostConstruct + fun onInitializationCompleted() { + onCoordinatorReady() + listener.onMessageReceived = { event -> onMessageReceived(event)} + listener.listen(KafkaEnv.kafkaTopic) + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 7f2fd755..a68f14f6 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -29,4 +29,11 @@ suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, elapsedDelay += delayed delay(delayed) } while (condition.invoke() && elapsedDelay < maxDuration) +} + +fun getComputername(): String { + return listOfNotNull( + System.getenv("hostname"), + System.getenv("computername") + ).firstOrNull() ?: "UNKNOWN_SYSTEM" } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index f326770b..4d89540f 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -75,6 +75,16 @@ class PersistentDataReader { return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline } } + fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? { + val message = withTransaction { + processerEvents.select { + (processerEvents.referenceId eq referenceId) and + (processerEvents.eventId eq eventId) + }.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } + }?.singleOrNull() + return message + } + fun getProcessEvents(): List { return withTransaction { processerEvents.selectAll() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 4166b36a..50c42ed8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -6,18 +6,15 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.jetbrains.exposed.sql.ResultRow import java.time.LocalDateTime + data class PersistentMessage( val referenceId: String, val eventId: String, val event: KafkaEvents, - //val metadata: Metadata, val data: MessageDataWrapper, val created: LocalDateTime ) -data class Metadata( - val createdBy: String -) fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { return this.event == event diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/EventBasedMessageListener.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/EventBasedMessageListener.kt new file mode 100644 index 00000000..33b7d15f --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/EventBasedMessageListener.kt @@ -0,0 +1,67 @@ +package no.iktdev.mediaprocessing.shared.common.tasks + +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents + +abstract class EventBasedMessageListener { + val listeners: MutableList> = mutableListOf() + + fun add(produces: KafkaEvents, listener: ITaskCreatorListener) { + listeners.add(Tasks(producesEvent = produces, taskHandler = listener)) + } + + fun add(task: Tasks) { + listeners.add(task) + } + + /** + * Example implementation + * + * fun waitingListeners(events: List): List { + * val nonCreators = listeners + * .filter { !events.map { e -> e.event } + * .contains(it.producesEvent) } + * return nonCreators + * } + */ + abstract fun waitingListeners(events: List): List> + + /** + * Example implementation + * + * fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List) + * : List + * { + * return waitingListeners.filter { event.event in it.listensForEvents } + * } + */ + abstract fun listenerWantingEvent(event: V, waitingListeners: List>): List> + + /** + * Send to taskHandler + */ + abstract fun onForward(event: V, history: List, listeners: List>) + + /** + * This will be called in sequence, thus some messages might be made a duplicate of. + */ + fun forwardEventMessageToListeners(newEvent: V, events: List) { + val waitingListeners = waitingListeners(events) + val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners) + onForward(event = newEvent, history = events, listeners = availableListeners.map { it.taskHandler }) + } + + /** + * This will be called with all messages at once, thus it should reflect kafka topic and database + */ + fun forwardBatchEventMessagesToListeners(events: List) { + val waitingListeners = waitingListeners(events) + onForward(event = events.last(), history = events, waitingListeners.map { it.taskHandler }) + } + +} + +data class Tasks( + val producesEvent: KafkaEvents, + val listensForEvents: List = listOf(), + val taskHandler: ITaskCreatorListener +) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/ITaskCreatorListener.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/ITaskCreatorListener.kt new file mode 100644 index 00000000..cf5bd930 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/ITaskCreatorListener.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.common.tasks + + +interface ITaskCreatorListener { + fun onEventReceived(referenceId: String, event: V, events: List): Unit +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt new file mode 100644 index 00000000..9d6c3d73 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -0,0 +1,104 @@ +package no.iktdev.mediaprocessing.shared.common.tasks + +import no.iktdev.mediaprocessing.shared.common.CoordinatorBase +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.beans.factory.annotation.Autowired +import javax.annotation.PostConstruct + +abstract class TaskCreatorImpl, V, L : EventBasedMessageListener>( + open var coordinator: C +) : ITaskCreatorListener { + + // Event that the implementer sets + abstract val producesEvent: KafkaEvents + + open val requiredEvents: List = listOf() + open val listensForEvents: List = listOf() + + @Autowired + lateinit var producer: CoordinatorProducer + fun getListener(): Tasks { + val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } + return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) + } + @PostConstruct + fun attachListener() { + coordinator.listeners.add(getListener()) + } + + + /** + * Example implementation + * + * open fun isPrerequisiteEventsOk(events: List): Boolean { + * val currentEvents = events.map { it.event } + * return requiredEvents.all { currentEvents.contains(it) } + * } + * + */ + abstract fun isPrerequisiteEventsOk(events: List): Boolean + + /** + * Example implementation + * + * open fun isPrerequisiteDataPresent(events: List): Boolean { + * val failed = events + * .filter { e -> e.event in requiredEvents } + * .filter { !it.data.isSuccess() } + * return failed.isEmpty() + * } + */ + abstract fun isPrerequisiteDataPresent(events: List): Boolean + + /** + * Example implementation + * + * open fun isEventOfSingle(event: V, singleOne: KafkaEvents): Boolean { + * return event.event == singleOne + * } + */ + abstract fun isEventOfSingle(event: V, singleOne: KafkaEvents): Boolean + + open fun prerequisitesRequired(events: List): List<() -> Boolean> { + return listOf { + isPrerequisiteEventsOk(events) + } + } + + open fun prerequisiteRequired(event: V): List<() -> Boolean> { + return listOf() + } + + private val context: MutableMap = mutableMapOf() + private val context_key_reference = "reference" + private val context_key_producesEvent = "event" + final override fun onEventReceived(referenceId: String, event: V, events: List) { + context[context_key_reference] = referenceId + getListener().producesEvent.let { + context[context_key_producesEvent] = it + } + + if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) { + val result = onProcessEvents(event, events) + if (result != null) { + onResult(result) + } + } else { + // TODO: Re-enable this + // log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" } + } + } + + private fun onResult(data: MessageDataWrapper) { + producer.sendMessage( + referenceId = context[context_key_reference] as String, + event = context[context_key_producesEvent] as KafkaEvents, + data = data + ) + } + + abstract fun onProcessEvents(event: V, events: List): MessageDataWrapper? + +} \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt index e88c0f8f..968372c0 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt @@ -27,7 +27,7 @@ class DeserializingRegistry { KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java, KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java, - KafkaEvents.EVENT_WORK_CONVERT_CREATED to null, + KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java, KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java, KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java, diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/CollectionReference.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/CollectionReference.kt index 453d6543..cd166be1 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/CollectionReference.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/CollectionReference.kt @@ -4,6 +4,4 @@ import java.util.* open class CollectionReference( @Transient open val referenceId: String = UUID.randomUUID().toString(), -) { - -} \ No newline at end of file +) {} \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt new file mode 100644 index 00000000..9254d13e --- /dev/null +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.shared.kafka.dto.events_result + +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.streamit.library.kafka.dto.Status + +@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED) +data class ConvertWorkPerformed( + override val status: Status, + override val message: String? = null, + val producedBy: String, + val derivedFromEventId: String, + val result: List +): MessageDataWrapper(status, message) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt index 90931a9d..1f5de38c 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt @@ -6,7 +6,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED) data class ConvertWorkerRequest( - val requiresEventId: String, + val requiresEventId: String? = null, val inputFile: String, val allowOverwrite: Boolean, val outFileBaseName: String,