Renaming events

This commit is contained in:
Brage 2024-03-25 15:21:06 +01:00
parent d6958ff0ce
commit 3e8924af8f
24 changed files with 168 additions and 63 deletions

View File

@ -8,8 +8,8 @@ import no.iktdev.library.subtitle.export.Export
import no.iktdev.library.subtitle.reader.BaseReader import no.iktdev.library.subtitle.reader.BaseReader
import no.iktdev.library.subtitle.reader.Reader import no.iktdev.library.subtitle.reader.Reader
import no.iktdev.mediaprocessing.converter.ConverterEnv import no.iktdev.mediaprocessing.converter.ConverterEnv
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.SubtitleFormats
import java.io.File import java.io.File
import kotlin.jvm.Throws import kotlin.jvm.Throws

View File

@ -12,17 +12,14 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.* import no.iktdev.mediaprocessing.shared.kafka.dto.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import java.util.UUID import java.util.UUID
import javax.annotation.PostConstruct
@Service @Service
class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() { class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMessageListener>() {
@ -75,12 +72,23 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
private val forwarder = Forwarder() private val forwarder = Forwarder()
public fun startProcess(file: File, type: ProcessType) { public fun startProcess(file: File, type: ProcessType) {
val processStartEvent = ProcessStarted( val processStartEvent = MediaProcessStarted(
status = Status.COMPLETED, status = Status.COMPLETED,
file = file.absolutePath, file = file.absolutePath,
type = type type = type
) )
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_PROCESS_STARTED, processStartEvent) producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, processStartEvent)
}
public fun startRequestProcess(file: File, operations: List<RequestStartOperationEvents>): UUID {
val referenceId: UUID = UUID.randomUUID()
val start = RequestProcessStarted(
status = Status.COMPLETED,
file = file.absolutePath,
operations = operations
)
producer.sendMessage(referenceId = referenceId.toString(), KafkaEvents.EVENT_REQUEST_PROCESS_STARTED, start)
return referenceId
} }
fun readAllUncompletedMessagesInQueue() { fun readAllUncompletedMessagesInQueue() {
@ -137,8 +145,8 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} }
} }
fun getProcessStarted(messages: List<PersistentMessage>): ProcessStarted? { fun getProcessStarted(messages: List<PersistentMessage>): MediaProcessStarted? {
return messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted
} }
suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List<PersistentMessage>) { suspend fun buildModelBasedOnMessagesFor(referenceId: String, messages: List<PersistentMessage>) {

View File

@ -0,0 +1,5 @@
package no.iktdev.mediaprocessing.coordinator
class RequestHandler {
}

View File

@ -0,0 +1,36 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.contract.dto.ConvertRequest
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.ResponseStatus
import java.io.File
@Controller
@RequestMapping(path = ["/request"])
class RequestEventController(@Autowired var coordinator: Coordinator) {
@PostMapping("/convert")
@ResponseStatus(HttpStatus.OK)
fun requestConvert(@RequestBody convert: ConvertRequest): ResponseEntity<String> {
try {
val file = File(convert.file)
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(convert.file)
}
val referenceId = coordinator.startRequestProcess(file, listOf(RequestStartOperationEvents.CONVERT))
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Gson().toJson(convert))
}
return ResponseEntity.ok(null)
}
}

View File

@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
@ -23,8 +23,8 @@ class ProcessMapping(val events: List<PersistentMessage>) {
) )
} }
fun getProcessStarted(): ProcessStarted? { fun getProcessStarted(): MediaProcessStarted? {
return events.lastOrNull { it.data is ProcessStarted }?.data as ProcessStarted? return events.lastOrNull { it.data is MediaProcessStarted }?.data as MediaProcessStarted?
} }

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccess
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@ -11,7 +10,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -24,7 +23,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
override val producesEvent: KafkaEvents override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EVENT_PROCESS_STARTED) override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> { override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
@ -35,11 +34,11 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_PROCESS_STARTED) ?: return null val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null
return readFileInfo(selected.data as ProcessStarted) return readFileInfo(selected.data as MediaProcessStarted)
} }
fun readFileInfo(started: ProcessStarted): MessageDataWrapper { fun readFileInfo(started: MediaProcessStarted): MessageDataWrapper {
val result = try { val result = try {
val fileName = File(started.file).nameWithoutExtension val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName) val fileNameParser = FileNameParser(fileName)

View File

@ -32,16 +32,16 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
override val requiredEvents: List<KafkaEvents> = listOf( override val requiredEvents: List<KafkaEvents> = listOf(
EVENT_PROCESS_STARTED, EVENT_MEDIA_PROCESS_STARTED,
EVENT_PROCESS_COMPLETED EVENT_MEDIA_PROCESS_COMPLETED
) )
override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null
val completed = events.lastOrSuccessOf(EVENT_PROCESS_COMPLETED) ?: return null val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null
if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) { if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
return null return null
} }

View File

@ -16,13 +16,13 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service @Service
class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {} val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED
override val requiredEvents: List<KafkaEvents> = listOf( override val requiredEvents: List<KafkaEvents> = listOf(
EVENT_PROCESS_STARTED, EVENT_MEDIA_PROCESS_STARTED,
EVENT_MEDIA_READ_BASE_INFO_PERFORMED, EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
EVENT_MEDIA_READ_OUT_NAME_AND_TYPE EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
) )
@ -31,7 +31,7 @@ class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreat
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null
if (!started.data.isSuccess()) { if (!started.data.isSuccess()) {
return null return null
} }

View File

@ -4,7 +4,6 @@ import com.google.gson.Gson
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccess
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
@ -17,7 +16,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service

View File

@ -13,7 +13,7 @@ import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
@ -29,7 +29,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
override val requiredEvents: List<KafkaEvents> = listOf( override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_PROCESS_STARTED KafkaEvents.EVENT_MEDIA_PROCESS_STARTED
) )
@ -42,11 +42,11 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.find { it.data is ProcessStarted } ?: return null val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null
return runBlocking { fileReadStreams(desiredEvent.data as ProcessStarted) } return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) }
} }
suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper { suspend fun fileReadStreams(started: MediaProcessStarted): MessageDataWrapper {
val file = File(started.file) val file = File(started.file)
return if (file.exists() && file.isFile) { return if (file.exists() && file.isFile) {
val result = readStreams(file) val result = readStreams(file)

View File

@ -1,12 +1,13 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import com.google.gson.Gson
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.using import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.* import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -27,7 +28,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
override val requiredEvents: List<KafkaEvents> = override val requiredEvents: List<KafkaEvents> =
listOf( listOf(
KafkaEvents.EVENT_PROCESS_STARTED, KafkaEvents.EVENT_MEDIA_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
@ -41,8 +42,13 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? { override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" }
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) {
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" }
return null
}
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted val inputFile = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed? val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed?
val serializedParsedStreams = readStreamsEvent?.streams val serializedParsedStreams = readStreamsEvent?.streams

View File

@ -1,13 +1,14 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import com.google.gson.Gson
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.using import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.* import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.ExtractArgumentCreatorTask.SubtitleArguments.SubtitleType.*
import no.iktdev.mediaprocessing.shared.common.Preference import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleArgumentsDto
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
@ -30,7 +31,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
override val requiredEvents: List<KafkaEvents> = listOf( override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_PROCESS_STARTED, KafkaEvents.EVENT_MEDIA_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
@ -49,8 +50,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
log.info { "${this.javaClass.simpleName} ignores ${event.event} @ ${event.eventId}" } log.info { "${this.javaClass.simpleName} ignores ${event.event} @ ${event.eventId}" }
return null return null
} }
val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
if (!started.operations.contains(ProcessStartOperationEvents.EXTRACT)) {
log.info { "Couldn't find operation event ${ProcessStartOperationEvents.EXTRACT} in ${Gson().toJson(started.operations)}\n\tExtract Arguments will not be created" }
return null
}
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted val inputFile = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
val serializedParsedStreams = readStreamsEvent.streams val serializedParsedStreams = readStreamsEvent.streams

View File

@ -24,9 +24,10 @@ kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"
suppress_ignore: List[str] = [ suppress_ignore: List[str] = [
"event:process:started", "event:media-process:started",
"event:request-process:started",
"event::save", "event::save",
"event:process:completed", "event:media-process:completed",
"event:work-encode:created", "event:work-encode:created",
"event:work-extract:created", "event:work-extract:created",
"event:work-convert:created", "event:work-convert:created",

View File

@ -29,7 +29,7 @@ class PersistentDataReader {
fun getUncompletedMessages(): List<List<PersistentMessage>> { fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withDirtyRead { val result = withDirtyRead {
events.selectAll() events.selectAll()
.andWhere { events.event neq KafkaEvents.EVENT_PROCESS_COMPLETED.event } .andWhere { events.event neq KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED.event }
.groupBy { it[events.referenceId] } .groupBy { it[events.referenceId] }
.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } .mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } }
} ?: emptyList() } ?: emptyList()

View File

@ -20,8 +20,9 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
@Autowired @Autowired
lateinit var producer: CoordinatorProducer lateinit var producer: CoordinatorProducer
fun getListener(): Tasks<V> { fun getListener(): Tasks<V> {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents } val reactableEvents: Set<KafkaEvents> = requiredEvents.toSet() + listensForEvents.toSet()
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter) //val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents.toList())
} }
@PostConstruct @PostConstruct
fun attachListener() { fun attachListener() {

View File

@ -0,0 +1,6 @@
package no.iktdev.mediaprocessing.shared.contract.dto
data class ConvertRequest(
val file: String, // FullPath
val formats: List<SubtitleFormats>
)

View File

@ -0,0 +1,19 @@
package no.iktdev.mediaprocessing.shared.contract.dto
enum class SubtitleFormats {
ASS,
SRT,
VTT,
SMI
}
enum class ProcessStartOperationEvents {
ENCODE,
EXTRACT,
CONVERT
}
enum class RequestStartOperationEvents {
CONVERT
}

View File

@ -15,7 +15,7 @@ class DeserializingRegistry {
companion object { companion object {
val deserializables = mutableMapOf( val deserializables = mutableMapOf(
KafkaEvents.EVENT_PROCESS_STARTED to ProcessStarted::class.java, KafkaEvents.EVENT_MEDIA_PROCESS_STARTED to MediaProcessStarted::class.java,
KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class.java, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class.java,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java,
@ -40,7 +40,7 @@ class DeserializingRegistry {
KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null, KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null,
KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null, KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null,
KafkaEvents.EVENT_PROCESS_COMPLETED to ProcessCompleted::class.java KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java
) )
} }

View File

@ -1,7 +1,8 @@
package no.iktdev.mediaprocessing.shared.kafka.core package no.iktdev.mediaprocessing.shared.kafka.core
enum class KafkaEvents(val event: String) { enum class KafkaEvents(val event: String) {
EVENT_PROCESS_STARTED("event:process:started"), EVENT_MEDIA_PROCESS_STARTED("event:media-process:started"),
EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"),
EVENT_MEDIA_READ_STREAM_PERFORMED("event:media-read-stream:performed"), EVENT_MEDIA_READ_STREAM_PERFORMED("event:media-read-stream:performed"),
EVENT_MEDIA_PARSE_STREAM_PERFORMED("event:media-parse-stream:performed"), EVENT_MEDIA_PARSE_STREAM_PERFORMED("event:media-parse-stream:performed"),
@ -35,7 +36,8 @@ enum class KafkaEvents(val event: String) {
EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"), EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"), EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
EVENT_PROCESS_COMPLETED("event:process:completed"), EVENT_MEDIA_PROCESS_COMPLETED("event:media-process:completed"),
EVENT_REQUEST_PROCESS_COMPLETED("event:request-process:completed"),
EVENT_COLLECT_AND_STORE("event::save"); EVENT_COLLECT_AND_STORE("event::save");
companion object { companion object {

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -14,11 +15,4 @@ data class ConvertWorkerRequest(
val outFileBaseName: String, val outFileBaseName: String,
val outDirectory: String, val outDirectory: String,
val outFormats: List<SubtitleFormats> = listOf() val outFormats: List<SubtitleFormats> = listOf()
): MessageDataWrapper(status) ): MessageDataWrapper(status)
enum class SubtitleFormats {
ASS,
SRT,
VTT,
SMI
}

View File

@ -1,14 +1,20 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_STARTED) @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED)
data class ProcessStarted( data class MediaProcessStarted(
override val status: Status, override val status: Status,
val type: ProcessType = ProcessType.FLOW, val type: ProcessType = ProcessType.FLOW,
val operations: List<ProcessStartOperationEvents> = listOf(
ProcessStartOperationEvents.ENCODE,
ProcessStartOperationEvents.EXTRACT,
ProcessStartOperationEvents.CONVERT
),
val file: String // AbsolutePath val file: String // AbsolutePath
) : MessageDataWrapper(status) ) : MessageDataWrapper(status)

View File

@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_COMPLETED) @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED)
data class ProcessCompleted( data class ProcessCompleted(
override val status: Status override val status: Status
) : MessageDataWrapper(status) { ) : MessageDataWrapper(status) {

View File

@ -0,0 +1,18 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_REQUEST_PROCESS_STARTED)
data class RequestProcessStarted(
override val status: Status,
val operations: List<RequestStartOperationEvents> = listOf(
RequestStartOperationEvents.CONVERT
),
val file: String // AbsolutePath
) : MessageDataWrapper(status)

View File

@ -6,7 +6,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
@ -20,7 +20,7 @@ class SerializationTest {
val message = Message( val message = Message(
"d2fb1472-ebdd-4fce-9ffd-7202a1ad911d", "d2fb1472-ebdd-4fce-9ffd-7202a1ad911d",
"01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e", "01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e",
data = ProcessStarted( data = MediaProcessStarted(
Status.COMPLETED, Status.COMPLETED,
ProcessType.MANUAL, ProcessType.MANUAL,
file = "Potato.mp4" file = "Potato.mp4"
@ -28,8 +28,8 @@ class SerializationTest {
val json = gson.toJson(message) val json = gson.toJson(message)
val deserializer = DeserializingRegistry() val deserializer = DeserializingRegistry()
val result = deserializer.deserialize(KafkaEvents.EVENT_PROCESS_STARTED, json) val result = deserializer.deserialize(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, json)
assertThat(result.data).isInstanceOf(ProcessStarted::class.java) assertThat(result.data).isInstanceOf(MediaProcessStarted::class.java)
} }