Changed work creation

This commit is contained in:
Brage 2024-03-25 23:25:23 +01:00
parent 876d900e9b
commit 07111f7b98
8 changed files with 147 additions and 8 deletions

View File

@ -53,7 +53,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} }
listeners.forwardEventMessageToListeners(triggered, messages) listeners.forwardEventMessageToListeners(triggered, messages)
if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) { /*if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(messages)) {
if (getProcessStarted(messages)?.type == ProcessType.FLOW) { if (getProcessStarted(messages)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents( forwarder.produceAllMissingProcesserEvents(
producer = producer, producer = producer,
@ -62,14 +62,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} else { } else {
log.info { "Process for $referenceId was started manually and will require user input for continuation" } log.info { "Process for $referenceId was started manually and will require user input for continuation" }
} }
} }*/
} }
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
override val listeners = PersistentEventBasedMessageListener() override val listeners = PersistentEventBasedMessageListener()
private val forwarder = Forwarder() //private val forwarder = Forwarder()
public fun startProcess(file: File, type: ProcessType) { public fun startProcess(file: File, type: ProcessType) {
val processStartEvent = MediaProcessStarted( val processStartEvent = MediaProcessStarted(
@ -101,14 +101,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
} }
if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(it)) { /*if (forwarder.hasAnyRequiredEventToCreateProcesserEvents(it)) {
if (getProcessStarted(it)?.type == ProcessType.FLOW) { if (getProcessStarted(it)?.type == ProcessType.FLOW) {
forwarder.produceAllMissingProcesserEvents( forwarder.produceAllMissingProcesserEvents(
producer = producer, producer = producer,
messages = it messages = it
) )
} }
} }*/
} }
} }
} }
@ -157,7 +157,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
class Forwarder() { /*class Forwarder() {
val forwardOnEventReceived = listOf( val forwardOnEventReceived = listOf(
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
) )
@ -262,7 +262,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
} }
} }
} }
} }*/
} }

View File

@ -0,0 +1,48 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_CONVERT_CREATED
override val requiredEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EVENT_WORK_EXTRACT_CREATED
// TODO: Add event for request as well
)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
if (!event.data.isSuccess()) {
return null
}
val eventData = event.data as FfmpegWorkRequestCreated? ?: return null
val requiredEventId = if (event.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED) {
event.eventId
} else null;
val outFile = File(eventData.outFile)
return ConvertWorkerRequest(
status = Status.COMPLETED,
requiresEventId = requiredEventId,
inputFile = eventData.outFile,
allowOverwrite = true,
outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath
)
}
}

View File

@ -0,0 +1,17 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_ENCODE_CREATED
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED)
}

View File

@ -0,0 +1,16 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg.CreateProcesserWorkTask
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_EXTRACT_CREATED
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED)
}

View File

@ -0,0 +1,55 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.log
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) {
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted?
if (started == null) {
return null
}
if (!event.data.isSuccess()) {
return null
}
val proceed = events.find { it.event == KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED }
if (proceed == null && started.type == ProcessType.MANUAL) {
log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" }
return null
}
val earg = if (event.data is FfmpegWorkerArgumentsCreated) event.data as FfmpegWorkerArgumentsCreated? else return null
if (earg == null || earg.entries.isEmpty()) {
return null
}
val requestEvents = earg.entries.map {
FfmpegWorkRequestCreated(
status = Status.COMPLETED,
derivedFromEventId = event.eventId,
inputFile = earg.inputFile,
arguments = it.arguments,
outFile = it.outputFile
)
}
requestEvents.forEach {
super.onResult(it)
}
return null
}
}

View File

@ -92,7 +92,7 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
} }
} }
private fun onResult(data: MessageDataWrapper) { protected fun onResult(data: MessageDataWrapper) {
producer.sendMessage( producer.sendMessage(
referenceId = context[context_key_reference] as String, referenceId = context[context_key_reference] as String,
event = context[context_key_producesEvent] as KafkaEvents, event = context[context_key_producesEvent] as KafkaEvents,

View File

@ -16,6 +16,8 @@ enum class KafkaEvents(val event: String) {
EVENT_MEDIA_CONVERT_PARAMETER_CREATED("event:media-convert-parameter:created"), EVENT_MEDIA_CONVERT_PARAMETER_CREATED("event:media-convert-parameter:created"),
EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED("event:media-download-cover-parameter:created"), EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED("event:media-download-cover-parameter:created"),
EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted"),
EVENT_WORK_ENCODE_CREATED("event:work-encode:created"), EVENT_WORK_ENCODE_CREATED("event:work-encode:created"),
EVENT_WORK_EXTRACT_CREATED("event:work-extract:created"), EVENT_WORK_EXTRACT_CREATED("event:work-extract:created"),
EVENT_WORK_CONVERT_CREATED("event:work-convert:created"), EVENT_WORK_CONVERT_CREATED("event:work-convert:created"),

View File

@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
) )
data class FfmpegWorkRequestCreated( data class FfmpegWorkRequestCreated(
override val status: Status, override val status: Status,
val derivedFromEventId: String,
val inputFile: String, val inputFile: String,
val arguments: List<String>, val arguments: List<String>,
val outFile: String val outFile: String