diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt index c4f95680..f8fe8732 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/convert/Converter.kt @@ -8,8 +8,8 @@ import no.iktdev.library.subtitle.export.Export import no.iktdev.library.subtitle.reader.BaseReader import no.iktdev.library.subtitle.reader.Reader import no.iktdev.mediaprocessing.converter.ConverterEnv +import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.SubtitleFormats import java.io.File import kotlin.jvm.Throws diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index d63ac995..7c6d4be3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -12,17 +12,14 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* -import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.util.UUID -import javax.annotation.PostConstruct @Service class Coordinator() : CoordinatorBase() { @@ -75,12 +72,23 @@ class Coordinator() : CoordinatorBase): UUID { + val referenceId: UUID = UUID.randomUUID() + val start = RequestProcessStarted( + status = Status.COMPLETED, + file = file.absolutePath, + operations = operations + ) + producer.sendMessage(referenceId = referenceId.toString(), KafkaEvents.EVENT_REQUEST_PROCESS_STARTED, start) + return referenceId } fun readAllUncompletedMessagesInQueue() { @@ -137,8 +145,8 @@ class Coordinator() : CoordinatorBase): ProcessStarted? { - return messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted + fun getProcessStarted(messages: List): MediaProcessStarted? { + return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted } suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RequestHandler.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RequestHandler.kt new file mode 100644 index 00000000..f0420a51 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RequestHandler.kt @@ -0,0 +1,5 @@ +package no.iktdev.mediaprocessing.coordinator + + +class RequestHandler { +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt new file mode 100644 index 00000000..a43b23c0 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/RequestEventController.kt @@ -0,0 +1,36 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import com.google.gson.Gson +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest +import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.HttpStatus +import org.springframework.http.ResponseEntity +import org.springframework.stereotype.Controller +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.ResponseStatus +import java.io.File + +@Controller +@RequestMapping(path = ["/request"]) +class RequestEventController(@Autowired var coordinator: Coordinator) { + + @PostMapping("/convert") + @ResponseStatus(HttpStatus.OK) + fun requestConvert(@RequestBody convert: ConvertRequest): ResponseEntity { + try { + val file = File(convert.file) + if (!file.exists()) { + return ResponseEntity.status(HttpStatus.NO_CONTENT).body(convert.file) + } + val referenceId = coordinator.startRequestProcess(file, listOf(RequestStartOperationEvents.CONVERT)) + + } catch (e: Exception) { + return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Gson().toJson(convert)) + } + return ResponseEntity.ok(null) + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt index e1648c6a..d32a357c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt @@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.mapping import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess @@ -23,8 +23,8 @@ class ProcessMapping(val events: List) { ) } - fun getProcessStarted(): ProcessStarted? { - return events.lastOrNull { it.data is ProcessStarted }?.data as ProcessStarted? + fun getProcessStarted(): MediaProcessStarted? { + return events.lastOrNull { it.data is MediaProcessStarted }?.data as MediaProcessStarted? } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index bd2e92b5..d2332be4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator -import no.iktdev.mediaprocessing.shared.common.lastOrSuccess import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -11,7 +10,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @@ -24,7 +23,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED - override val requiredEvents: List = listOf(KafkaEvents.EVENT_PROCESS_STARTED) + override val requiredEvents: List = listOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) override fun prerequisitesRequired(events: List): List<() -> Boolean> { @@ -35,11 +34,11 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } - val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_PROCESS_STARTED) ?: return null - return readFileInfo(selected.data as ProcessStarted) + val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null + return readFileInfo(selected.data as MediaProcessStarted) } - fun readFileInfo(started: ProcessStarted): MessageDataWrapper { + fun readFileInfo(started: MediaProcessStarted): MessageDataWrapper { val result = try { val fileName = File(started.file).nameWithoutExtension val fileNameParser = FileNameParser(fileName) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index 5b3adf4d..80301b58 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -32,16 +32,16 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE override val requiredEvents: List = listOf( - EVENT_PROCESS_STARTED, - EVENT_PROCESS_COMPLETED + EVENT_MEDIA_PROCESS_STARTED, + EVENT_MEDIA_PROCESS_COMPLETED ) override val listensForEvents: List = KafkaEvents.entries override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null - val completed = events.lastOrSuccessOf(EVENT_PROCESS_COMPLETED) ?: return null + val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null + val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) { return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt similarity index 90% rename from apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt rename to apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt index 04d6a181..ae86ab0c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteMediaTask.kt @@ -16,13 +16,13 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service @Service -class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { +class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val log = KotlinLogging.logger {} - override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED + override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED override val requiredEvents: List = listOf( - EVENT_PROCESS_STARTED, + EVENT_MEDIA_PROCESS_STARTED, EVENT_MEDIA_READ_BASE_INFO_PERFORMED, EVENT_MEDIA_READ_OUT_NAME_AND_TYPE ) @@ -31,7 +31,7 @@ class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreat override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null + val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null if (!started.data.isSuccess()) { return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index 0c0a57c5..e1f55c5a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -4,7 +4,6 @@ import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator -import no.iktdev.mediaprocessing.shared.common.lastOrSuccess import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream @@ -17,7 +16,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index a5995f10..6c49731c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -13,7 +13,7 @@ import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.springframework.beans.factory.annotation.Autowired @@ -29,7 +29,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED override val requiredEvents: List = listOf( - KafkaEvents.EVENT_PROCESS_STARTED + KafkaEvents.EVENT_MEDIA_PROCESS_STARTED ) @@ -42,11 +42,11 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } - val desiredEvent = events.find { it.data is ProcessStarted } ?: return null - return runBlocking { fileReadStreams(desiredEvent.data as ProcessStarted) } + val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null + return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) } } - suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper { + suspend fun fileReadStreams(started: MediaProcessStarted): MessageDataWrapper { val file = File(started.file) return if (file.exists() && file.isFile) { val result = readStreams(file) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 9e26b4f5..565dd7da 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -1,12 +1,13 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.Preference -import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.ffmpeg.* import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -27,7 +28,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator override val requiredEvents: List = listOf( - KafkaEvents.EVENT_PROCESS_STARTED, + KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE @@ -41,8 +42,13 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted + if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) { + log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" } + return null + } - val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted + val inputFile = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed? val serializedParsedStreams = readStreamsEvent?.streams diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index 6fddfbd4..cb50775f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -1,13 +1,14 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg +import com.google.gson.Gson import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.* import no.iktdev.mediaprocessing.shared.common.Preference -import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream @@ -30,7 +31,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED override val requiredEvents: List = listOf( - KafkaEvents.EVENT_PROCESS_STARTED, + KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE @@ -49,8 +50,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato log.info { "${this.javaClass.simpleName} ignores ${event.event} @ ${event.eventId}" } return null } + val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted + if (!started.operations.contains(ProcessStartOperationEvents.EXTRACT)) { + log.info { "Couldn't find operation event ${ProcessStartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" } + return null + } - val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted + val inputFile = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed val serializedParsedStreams = readStreamsEvent.streams diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index ea4f662e..ea9a284d 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -24,9 +24,10 @@ kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents" suppress_ignore: List[str] = [ - "event:process:started", + "event:media-process:started", + "event:request-process:started", "event::save", - "event:process:completed", + "event:media-process:completed", "event:work-encode:created", "event:work-extract:created", "event:work-convert:created", diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt index f8e4e7dd..5ce7ffdb 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -29,7 +29,7 @@ class PersistentDataReader { fun getUncompletedMessages(): List> { val result = withDirtyRead { events.selectAll() - .andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event } + .andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event } .groupBy { it[events.referenceId] } .mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } } ?: emptyList() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt index 9d6c3d73..34d5da64 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/tasks/TaskCreatorImpl.kt @@ -20,8 +20,9 @@ abstract class TaskCreatorImpl, V, L : EventBasedMessa @Autowired lateinit var producer: CoordinatorProducer fun getListener(): Tasks { - val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } - return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) + val reactableEvents: Set = requiredEvents.toSet() + listensForEvents.toSet() + //val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } + return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents.toList()) } @PostConstruct fun attachListener() { diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt new file mode 100644 index 00000000..4dafcf7d --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConvertRequest.kt @@ -0,0 +1,6 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +data class ConvertRequest( + val file: String, // FullPath + val formats: List +) \ No newline at end of file diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt new file mode 100644 index 00000000..562f1d73 --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + + +enum class SubtitleFormats { + ASS, + SRT, + VTT, + SMI +} + +enum class ProcessStartOperationEvents { + ENCODE, + EXTRACT, + CONVERT +} + +enum class RequestStartOperationEvents { + CONVERT +} \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt index 63877af6..eec256e3 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt @@ -15,7 +15,7 @@ class DeserializingRegistry { companion object { val deserializables = mutableMapOf( - KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class.java, + KafkaEvents.EVENT_MEDIA_PROCESS_STARTED to MediaProcessStarted::class.java, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class.java, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java, @@ -40,7 +40,7 @@ class DeserializingRegistry { KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null, KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null, - KafkaEvents.EVENT_PROCESS_COMPLETED to ProcessCompleted::class.java + KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java ) } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 87db5aea..09cc5fd2 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -1,7 +1,8 @@ package no.iktdev.mediaprocessing.shared.kafka.core enum class KafkaEvents(val event: String) { - EVENT_PROCESS_STARTED("event:process:started"), + EVENT_MEDIA_PROCESS_STARTED("event:media-process:started"), + EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"), EVENT_MEDIA_READ_STREAM_PERFORMED("event:media-read-stream:performed"), EVENT_MEDIA_PARSE_STREAM_PERFORMED("event:media-parse-stream:performed"), @@ -35,7 +36,8 @@ enum class KafkaEvents(val event: String) { EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"), - EVENT_PROCESS_COMPLETED("event:process:completed"), + EVENT_MEDIA_PROCESS_COMPLETED("event:media-process:completed"), + EVENT_REQUEST_PROCESS_COMPLETED("event:request-process:completed"), EVENT_COLLECT_AND_STORE("event::save"); companion object { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt index 3e9f5205..38d32b95 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result +import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -14,11 +15,4 @@ data class ConvertWorkerRequest( val outFileBaseName: String, val outDirectory: String, val outFormats: List = listOf() -): MessageDataWrapper(status) - -enum class SubtitleFormats { - ASS, - SRT, - VTT, - SMI -} \ No newline at end of file +): MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt similarity index 58% rename from shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt rename to shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt index 5cd48614..de05cebd 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaProcessStarted.kt @@ -1,14 +1,20 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_STARTED) -data class ProcessStarted( +@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) +data class MediaProcessStarted( override val status: Status, val type: ProcessType = ProcessType.FLOW, + val operations: List = listOf( + ProcessStartOperationEvents.ENCODE, + ProcessStartOperationEvents.EXTRACT, + ProcessStartOperationEvents.CONVERT + ), val file: String // AbsolutePath ) : MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt index f53662ae..f356c213 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt @@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.Status -@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_COMPLETED) +@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED) data class ProcessCompleted( override val status: Status ) : MessageDataWrapper(status) { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt new file mode 100644 index 00000000..1392f722 --- /dev/null +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/RequestProcessStarted.kt @@ -0,0 +1,18 @@ +package no.iktdev.mediaprocessing.shared.kafka.dto.events_result + +import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents +import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.Status + +@KafkaBelongsToEvent(KafkaEvents.EVENT_REQUEST_PROCESS_STARTED) +data class RequestProcessStarted( + override val status: Status, + val operations: List = listOf( + RequestStartOperationEvents.CONVERT + ), + val file: String // AbsolutePath +) : MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt index ea6716d1..71d96c53 100644 --- a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt +++ b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt @@ -6,7 +6,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.junit.jupiter.api.Test import org.assertj.core.api.Assertions.assertThat @@ -20,7 +20,7 @@ class SerializationTest { val message = Message( "d2fb1472-ebdd-4fce-9ffd-7202a1ad911d", "01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e", - data = ProcessStarted( + data = MediaProcessStarted( Status.COMPLETED, ProcessType.MANUAL, file = "Potato.mp4" @@ -28,8 +28,8 @@ class SerializationTest { val json = gson.toJson(message) val deserializer = DeserializingRegistry() - val result = deserializer.deserialize(KafkaEvents.EVENT_PROCESS_STARTED, json) - assertThat(result.data).isInstanceOf(ProcessStarted::class.java) + val result = deserializer.deserialize(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, json) + assertThat(result.data).isInstanceOf(MediaProcessStarted::class.java) }