Converter now works

Brage 2024-01-06 23:29:37 +01:00
parent 4e9cdb10a4
commit 90e9d873f0
27 changed files with 721 additions and 29 deletions

.idea/misc.xml (generated)

@ -1,7 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="EntryPointsManager">
<list size="1">
<list size="2">
<item index="0" class="java.lang.String" itemvalue="org.springframework.scheduling.annotation.Scheduled" />
<item index="1" class="java.lang.String" itemvalue="org.springframework.stereotype.Service" />
</list>
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">


@ -20,22 +20,39 @@ repositories {
}
}
val exposedVersion = "0.44.0"
dependencies {
/*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation ("mysql:mysql-connector-java:8.0.29")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
implementation(project(mapOf("path" to ":shared")))
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("no.iktdev.library:subtitle:1.7.7-SNAPSHOT")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared")))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.4.1")
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:kafka")))
}
tasks.test {


@ -0,0 +1,36 @@
package no.iktdev.mediaprocessing.converter
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@Service
@EnableScheduling
class ClaimsService() {
private val log = KotlinLogging.logger {}
@Autowired
lateinit var coordinator: ConverterCoordinator
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = PersistentDataReader().getExpiredClaimsProcessEvents()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = PersistentDataStore()
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {
log.error { "Failed to release claim on ${it.referenceId}::${it.eventId}::${it.event}" }
}
}
coordinator.readAllInQueue()
}
}
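
For context, the service above releases claims the reader considers expired; a minimal, self-contained sketch of that expiry rule, assuming the 15-minute window used by PersistentDataReader further down (the Claim type here is a hypothetical stand-in, not the real persistence model):

import java.time.LocalDateTime

// Stand-in for a persisted process-event claim (hypothetical type, for illustration only).
data class Claim(
    val referenceId: String,
    val eventId: String,
    val claimedBy: String?,
    val lastCheckIn: LocalDateTime?
)

// Mirrors the assumed expiry rule: expired when it never checked in, or when the
// last check-in is older than the 15-minute window.
fun expiredClaims(claims: List<Claim>, deadline: LocalDateTime = LocalDateTime.now()): List<Claim> =
    claims.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline }

fun main() {
    val stale = Claim("ref-1", "evt-1", "worker-a", LocalDateTime.now().minusMinutes(30))
    val fresh = Claim("ref-2", "evt-2", "worker-b", LocalDateTime.now())
    println(expiredClaims(listOf(stale, fresh)).map { it.eventId }) // prints [evt-1]
}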


@ -1,5 +1,9 @@
package no.iktdev.mediaprocessing.converter
import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext
@ -13,6 +17,13 @@ fun getContext(): ApplicationContext? {
return context
}
fun main(args: Array<String>) {
val dataSource = MySqlDataSource.fromDatabaseEnv()
Coroutines.default().launch {
dataSource.createDatabase()
dataSource.createTables(
processerEvents
)
}
context = runApplication<ConvertApplication>(*args)
}
//private val logger = KotlinLogging.logger {}


@ -0,0 +1,73 @@
package no.iktdev.mediaprocessing.converter
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.flow.EventBasedProcessMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.stereotype.Service
@Service
class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, EventBasedProcessMessageListener>() {
private val log = KotlinLogging.logger {}
override val listeners: EventBasedProcessMessageListener = EventBasedProcessMessageListener()
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
messages: List<PersistentProcessDataMessage>
) {
val triggeredMessage = messages.find { it.eventId == eventId }
if (triggeredMessage == null) {
log.error { "Could not find $eventId in provided messages" }
return
}
listeners.forwardEventMessageToListeners(triggeredMessage, messages)
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
fun readAllInQueue() {
val messages = PersistentDataReader().getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
createTasksBasedOnEventsAndPersistence(referenceId = it.referenceId, eventId = it.eventId, messages)
}
}
}
override fun onCoordinatorReady() {
log.info { "Converter Coordinator is ready" }
readAllInQueue()
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) {
val success = PersistentDataStore().storeProcessDataMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${DatabaseConfig.database}!" }
} else {
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
} else if (event.key == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED) {
readAllInQueue()
} else {
log.debug { "Skipping ${event.key}" }
}
//log.info { Gson().toJson(event.value) }
}
}


@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.converter
import no.iktdev.exfl.using
import java.io.File
class ConverterEnv {
companion object {
val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false
val syncDialogs = System.getenv("SYNC_DIALOGS").toBoolean()
val outFormats: List<String> = System.getenv("OUT_FORMATS")?.split(",")?.toList() ?: emptyList()
val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else
File("data").using("logs", "convert")
}
}
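
A hedged, self-contained sketch of the same lookups written null-safely; the directory fallback only approximates the exfl using() helper, and all values and paths are placeholders:

import java.io.File

fun main() {
    // Null-safe variants of the lookups above; behaviour when a variable is unset is made explicit.
    val allowOverwrite = System.getenv("ALLOW_OVERWRITE")?.toBoolean() ?: false
    val syncDialogs = System.getenv("SYNC_DIALOGS")?.toBoolean() ?: false
    val outFormats = System.getenv("OUT_FORMATS")?.split(",")?.filter { it.isNotBlank() } ?: emptyList()
    val logDirectory = System.getenv("LOG_DIR")?.takeIf { it.isNotBlank() }?.let { File(it) }
        ?: File("data/logs/convert") // approximation of File("data").using("logs", "convert")
    println("overwrite=$allowOverwrite sync=$syncDialogs formats=$outFormats logs=$logDirectory")
}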


@ -0,0 +1,16 @@
package no.iktdev.mediaprocessing.converter
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@Configuration
class SocketLocalInit: SocketImplementation()
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}


@ -0,0 +1,55 @@
package no.iktdev.mediaprocessing.converter.convert
import no.iktdev.library.subtitle.Configuration
import no.iktdev.library.subtitle.Syncro
import no.iktdev.library.subtitle.classes.Dialog
import no.iktdev.library.subtitle.classes.DialogType
import no.iktdev.library.subtitle.export.Export
import no.iktdev.library.subtitle.reader.BaseReader
import no.iktdev.library.subtitle.reader.Reader
import no.iktdev.mediaprocessing.converter.ConverterEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import java.io.File
import kotlin.jvm.Throws
class Converter(val referenceId: String, val eventId: String, val data: ConvertWorkerRequest) {
@Throws(FileUnavailableException::class)
private fun getReader(): BaseReader? {
val file = File(data.inputFile)
if (!file.canRead())
throw FileUnavailableException("Can't open file for reading.")
return Reader(file).getSubtitleReader()
}
private fun syncDialogs(input: List<Dialog>): List<Dialog> {
return if (ConverterEnv.syncDialogs) Syncro().sync(input) else input
}
fun canRead(): Boolean {
try {
val reader = getReader()
return reader != null
} catch (e: FileUnavailableException) {
return false
}
}
@Throws(FileUnavailableException::class, FileIsNullOrEmpty::class)
fun execute(): List<File> {
val file = File(data.inputFile)
Configuration.exportJson = true
val read = getReader()?.read() ?: throw FileIsNullOrEmpty()
if (read.isEmpty())
throw FileIsNullOrEmpty()
val filtered = read.filter { !it.ignore && it.type !in listOf(DialogType.SIGN_SONG, DialogType.CAPTION) }
val syncOrNotSync = syncDialogs(filtered)
val exporter = Export(file, File(data.outDirectory), data.outFileBaseName)
return exporter.write(syncOrNotSync)
}
class FileIsNullOrEmpty(override val message: String? = "File read is null or empty"): RuntimeException()
class FileUnavailableException(override val message: String): RuntimeException()
}
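
A hedged usage sketch of the class above, assuming this module is on the classpath and that the supplied ConvertWorkerRequest carries the fields Converter reads (inputFile, outDirectory, outFileBaseName); the caller obtains the request the same way the coordinator does:

import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import java.io.File

// Sketch only: run the conversion when the input is readable, otherwise return nothing.
fun convertIfReadable(referenceId: String, eventId: String, request: ConvertWorkerRequest): List<File> {
    val converter = Converter(referenceId = referenceId, eventId = eventId, data = request)
    if (!converter.canRead()) return emptyList()      // file missing or unreadable
    return try {
        converter.execute()                           // produced subtitle files
    } catch (e: Converter.FileIsNullOrEmpty) {
        emptyList()
    } catch (e: Converter.FileUnavailableException) {
        emptyList()
    }
}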


@ -0,0 +1,30 @@
package no.iktdev.mediaprocessing.converter.flow
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
class EventBasedProcessMessageListener: EventBasedMessageListener<PersistentProcessDataMessage>() {
override fun waitingListeners(events: List<PersistentProcessDataMessage>): List<Tasks<PersistentProcessDataMessage>> {
val nonCreators = listeners
.filter { !events.map { e -> e.event }
.contains(it.producesEvent) }
return nonCreators
}
override fun listenerWantingEvent(event: PersistentProcessDataMessage, waitingListeners: List<Tasks<PersistentProcessDataMessage>>): List<Tasks<PersistentProcessDataMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentProcessDataMessage,
history: List<PersistentProcessDataMessage>,
listeners: List<ITaskCreatorListener<PersistentProcessDataMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
}


@ -0,0 +1,29 @@
package no.iktdev.mediaprocessing.converter.flow
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class ProcesserTaskCreator(coordinator: ConverterCoordinator):
TaskCreatorImpl<ConverterCoordinator, PersistentProcessDataMessage, EventBasedProcessMessageListener>(coordinator) {
override fun isPrerequisiteEventsOk(events: List<PersistentProcessDataMessage>): Boolean {
val currentEvents = events.map { it.event }
return requiredEvents.all { currentEvents.contains(it) }
}
override fun isPrerequisiteDataPresent(events: List<PersistentProcessDataMessage>): Boolean {
val failed = events
.filter { e -> e.event in requiredEvents }
.filter { !it.data.isSuccess() }
return failed.isEmpty()
}
override fun isEventOfSingle(event: PersistentProcessDataMessage, singleOne: KafkaEvents): Boolean {
return event.event == singleOne
}
}


@ -0,0 +1,159 @@
package no.iktdev.mediaprocessing.converter.tasks
import com.google.gson.Gson
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.converter.flow.ProcesserTaskCreator
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.util.*
@Service
class ConvertService(@Autowired override var coordinator: ConverterCoordinator) : ProcesserTaskCreator(coordinator) {
private val log = KotlinLogging.logger {}
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
override val listensForEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED,
KafkaEvents.EVENT_WORK_CONVERT_CREATED
)
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED
fun getRequiredExtractProcessForContinuation(referenceId: String, requiresEventId: String): PersistentProcessDataMessage? {
return PersistentDataReader().getProcessEvent(referenceId, requiresEventId)
}
fun canConvert(extract: PersistentProcessDataMessage?): Boolean {
return extract?.consumed == true && extract.data.isSuccess()
}
override fun onProcessEvents(
event: PersistentProcessDataMessage,
events: List<PersistentProcessDataMessage>
): MessageDataWrapper? {
if (event.data !is ConvertWorkerRequest)
return null
log.info { Gson().toJson(event) }
val isAlreadyClaimed = PersistentDataReader().isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val payload = event.data as ConvertWorkerRequest
val requiresEventId: String? = payload.requiresEventId
val awaitingFor: PersistentProcessDataMessage? = if (requiresEventId != null) {
val existing = getRequiredExtractProcessForContinuation(referenceId = event.referenceId, requiresEventId = requiresEventId)
if (existing == null) {
skipConvertEvent(event, requiresEventId)
return null
}
existing
} else null
val converter = if (requiresEventId.isNullOrBlank() || canConvert(awaitingFor)) {
Converter(referenceId = event.referenceId, eventId = event.eventId, data = payload)
} else null
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
if (!setClaim) {
return null
}
if (converter == null || !converter.canRead()) {
// Make claim regardless but push to schedule
return null
}
val result = try {
performConvert(converter)
} catch (e: Exception) {
SimpleMessageData(status = Status.ERROR, message = e.message)
}
val consumedIsSuccessful = PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
PersistentDataStore().setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
}
delay(1000)
var readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
}
}
return result
}
fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {
val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}")
coordinator.producer.sendMessage(
referenceId = event.referenceId,
event = KafkaEvents.EVENT_WORK_CONVERT_SKIPPED,
data = producesPayload
)
}
fun performConvert(converter: Converter): ConvertWorkPerformed {
return try {
val result = converter.execute()
ConvertWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
result = result.map { it.absolutePath }
)
} catch (e: Converter.FileUnavailableException) {
e.printStackTrace()
ConvertWorkPerformed(
status = Status.ERROR,
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
result = emptyList()
)
} catch (e : Converter.FileIsNullOrEmpty) {
e.printStackTrace()
ConvertWorkPerformed(
status = Status.ERROR,
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
result = emptyList()
)
}
}
data class PendingWorkerCache(
val referenceId: String,
val eventId: String,
val requiresEventId: String
)
}
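
onProcessEvents marks the event completed and then polls until the readback agrees; a self-contained sketch of that confirm loop, where the in-memory store is a stand-in for PersistentDataStore/PersistentDataReader and the one-second delays mirror the code above:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentHashMap

// Stand-in for the persistence layer used above: eventId -> consumed flag.
class FakeEventStore {
    private val consumed = ConcurrentHashMap<String, Boolean>()
    fun setCompleted(eventId: String): Boolean { consumed[eventId] = true; return true }
    fun isConsumed(eventId: String): Boolean = consumed[eventId] == true
}

fun main() = runBlocking {
    val store = FakeEventStore()
    val eventId = "evt-1"

    // First attempt to mark the event completed; retry once after a short delay if it failed.
    if (!store.setCompleted(eventId)) {
        delay(1000)
        store.setCompleted(eventId)
    }

    // Poll until the readback confirms the completed state is visible.
    while (!store.isConsumed(eventId)) {
        delay(1000)
    }
    println("$eventId confirmed consumed")
}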


@ -1,4 +1,4 @@
package no.iktdev.streamit.content.encode
package no.iktdev.mediaprocessing.processer
import no.iktdev.exfl.using
import java.io.File
@ -7,10 +7,11 @@ class ProcesserEnv {
companion object {
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false
val maxEncodeRunners: Int = try {System.getenv("SIMULTANEOUS_ENCODE_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1}
val maxExtractRunners: Int = try {System.getenv("SIMULTANEOUS_EXTRACT_RUNNERS").toIntOrNull() ?: 1 } catch (e: Exception) {1}
val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else
File("data").using("logs")
val encodeLogDirectory = logDirectory.using("encode")
val extractLogDirectory = logDirectory.using("extract")
}
}


@ -6,16 +6,16 @@ import com.google.gson.Gson
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import java.io.BufferedWriter
import java.io.File
import java.io.FileWriter
class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents) {
class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents, val logDir: File) {
val scope = Coroutines.io()
val decoder = FfmpegProgressDecoder()
private val outputCache = mutableListOf<String>()
val logFile = ProcesserEnv.logDirectory.using("$eventId-${File(info.outFile).nameWithoutExtension}.log")
val logFile = logDir.using("$eventId-${File(info.outFile).nameWithoutExtension}.log")
val getOutputCache = outputCache.toList()
@ -54,7 +54,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
private suspend fun execute(args: List<String>) {
listener.onStarted(info)
val processOp = process(ProcesserEnv.ffmpeg, *args.toTypedArray(),
val processOp = process(
ProcesserEnv.ffmpeg, *args.toTypedArray(),
stdout = Redirect.CAPTURE,
stderr = Redirect.CAPTURE,
consumer = {


@ -16,7 +16,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
@ -26,6 +26,7 @@ import javax.annotation.PreDestroy
@Service
class EncodeService: TaskCreator() {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.encodeLogDirectory
val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
@ -74,14 +75,14 @@ class EncodeService: TaskCreator() {
fun startEncode(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs()
if (!ProcesserEnv.logDirectory.exists()) {
ProcesserEnv.logDirectory.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = encodeServiceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents)
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
return
@ -161,7 +162,7 @@ class EncodeService: TaskCreator() {
}
fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) {
// TODO: Implementation
}


@ -17,7 +17,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.streamit.content.encode.ProcesserEnv
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
@ -27,6 +27,8 @@ import javax.annotation.PreDestroy
@Service
class ExtractService: TaskCreator() {
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.extractLogDirectory
val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
@ -75,15 +77,15 @@ class ExtractService: TaskCreator() {
fun startExtract(event: PersistentProcessDataMessage) {
val ffwrc = event.data as FfmpegWorkRequestCreated
File(ffwrc.outFile).parentFile.mkdirs()
if (!ProcesserEnv.logDirectory.exists()) {
ProcesserEnv.logDirectory.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = PersistentDataStore().setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = extractServiceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, listener = ffmpegWorkerEvents)
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")


@ -30,6 +30,9 @@ dependencies {
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20230227")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.jetbrains.kotlinx:kotlinx-datetime:0.5.0")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")


@ -0,0 +1,37 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import javax.annotation.PostConstruct
abstract class CoordinatorBase<V, L: EventBasedMessageListener<V>> {
abstract val listeners: L
val io = Coroutines.io()
@Autowired
lateinit var producer: CoordinatorProducer
@Autowired
private lateinit var listener: DefaultMessageListener
abstract fun createTasksBasedOnEventsAndPersistence(referenceId: String, eventId: String, messages: List<V>)
abstract fun onCoordinatorReady()
abstract fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>)
@PostConstruct
fun onInitializationCompleted() {
onCoordinatorReady()
listener.onMessageReceived = { event -> onMessageReceived(event)}
listener.listen(KafkaEnv.kafkaTopic)
}
}


@ -30,3 +30,10 @@ suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60,
delay(delayed)
} while (condition.invoke() && elapsedDelay < maxDuration)
}
fun getComputername(): String {
return listOfNotNull(
System.getenv("hostname"),
System.getenv("computername")
).firstOrNull() ?: "UNKNOWN_SYSTEM"
}
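
Environment lookups are case-sensitive on Linux, so a more defensive variant (hypothetical, not part of this commit) might also try the upper-case names and finally fall back to the resolved local host name:

import java.net.InetAddress

// Hypothetical defensive variant of getComputername(); names and fallback order are assumptions.
fun computerNameOrUnknown(): String =
    listOf("hostname", "HOSTNAME", "computername", "COMPUTERNAME")
        .firstNotNullOfOrNull { name -> System.getenv(name)?.takeIf { it.isNotBlank() } }
        ?: runCatching { InetAddress.getLocalHost().hostName }.getOrNull()
        ?: "UNKNOWN_SYSTEM"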


@ -75,6 +75,16 @@ class PersistentDataReader {
return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline }
}
fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? {
val message = withTransaction {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) }
}?.singleOrNull()
return message
}
fun getProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction {
processerEvents.selectAll()


@ -6,18 +6,15 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime
data class PersistentMessage(
val referenceId: String,
val eventId: String,
val event: KafkaEvents,
//val metadata: Metadata,
val data: MessageDataWrapper,
val created: LocalDateTime
)
data class Metadata(
val createdBy: String
)
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event


@ -0,0 +1,67 @@
package no.iktdev.mediaprocessing.shared.common.tasks
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
abstract class EventBasedMessageListener<V> {
val listeners: MutableList<Tasks<V>> = mutableListOf()
fun add(produces: KafkaEvents, listener: ITaskCreatorListener<V>) {
listeners.add(Tasks(producesEvent = produces, taskHandler = listener))
}
fun add(task: Tasks<V>) {
listeners.add(task)
}
/**
* Example implementation
*
* fun waitingListeners(events: List<PersistentMessage>): List<Tasks> {
* val nonCreators = listeners
* .filter { !events.map { e -> e.event }
* .contains(it.producesEvent) }
* return nonCreators
* }
*/
abstract fun waitingListeners(events: List<V>): List<Tasks<V>>
/**
* Example implementation
*
* fun listenerWantingEvent(event: PersistentMessage, waitingListeners: List<Tasks>)
* : List<Tasks>
* {
* return waitingListeners.filter { event.event in it.listensForEvents }
* }
*/
abstract fun listenerWantingEvent(event: V, waitingListeners: List<Tasks<V>>): List<Tasks<V>>
/**
* Send to taskHandler
*/
abstract fun onForward(event: V, history: List<V>, listeners: List<ITaskCreatorListener<V>>)
/**
* This will be called in sequence, so duplicate messages may be produced.
*/
fun forwardEventMessageToListeners(newEvent: V, events: List<V>) {
val waitingListeners = waitingListeners(events)
val availableListeners = listenerWantingEvent(event = newEvent, waitingListeners = waitingListeners)
onForward(event = newEvent, history = events, listeners = availableListeners.map { it.taskHandler })
}
/**
* This will be called with all messages at once, so it should reflect the state of the Kafka topic and the database.
*/
fun forwardBatchEventMessagesToListeners(events: List<V>) {
val waitingListeners = waitingListeners(events)
onForward(event = events.last(), history = events, waitingListeners.map { it.taskHandler })
}
}
data class Tasks<V>(
val producesEvent: KafkaEvents,
val listensForEvents: List<KafkaEvents> = listOf(),
val taskHandler: ITaskCreatorListener<V>
)


@ -0,0 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.tasks
interface ITaskCreatorListener<V> {
fun onEventReceived(referenceId: String, event: V, events: List<V>): Unit
}


@ -0,0 +1,104 @@
package no.iktdev.mediaprocessing.shared.common.tasks
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import javax.annotation.PostConstruct
abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessageListener<V>>(
open var coordinator: C
) : ITaskCreatorListener<V> {
// Event that the implementer sets
abstract val producesEvent: KafkaEvents
open val requiredEvents: List<KafkaEvents> = listOf()
open val listensForEvents: List<KafkaEvents> = listOf()
@Autowired
lateinit var producer: CoordinatorProducer
fun getListener(): Tasks<V> {
val eventListenerFilter = listensForEvents.ifEmpty { requiredEvents }
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = eventListenerFilter)
}
@PostConstruct
fun attachListener() {
coordinator.listeners.add(getListener())
}
/**
* Example implementation
*
* open fun isPrerequisiteEventsOk(events: List<V>): Boolean {
* val currentEvents = events.map { it.event }
* return requiredEvents.all { currentEvents.contains(it) }
* }
*
*/
abstract fun isPrerequisiteEventsOk(events: List<V>): Boolean
/**
* Example implementation
*
* open fun isPrerequisiteDataPresent(events: List<V>): Boolean {
* val failed = events
* .filter { e -> e.event in requiredEvents }
* .filter { !it.data.isSuccess() }
* return failed.isEmpty()
* }
*/
abstract fun isPrerequisiteDataPresent(events: List<V>): Boolean
/**
* Example implementation
*
* open fun isEventOfSingle(event: V, singleOne: KafkaEvents): Boolean {
* return event.event == singleOne
* }
*/
abstract fun isEventOfSingle(event: V, singleOne: KafkaEvents): Boolean
open fun prerequisitesRequired(events: List<V>): List<() -> Boolean> {
return listOf {
isPrerequisiteEventsOk(events)
}
}
open fun prerequisiteRequired(event: V): List<() -> Boolean> {
return listOf()
}
private val context: MutableMap<String, Any> = mutableMapOf()
private val context_key_reference = "reference"
private val context_key_producesEvent = "event"
final override fun onEventReceived(referenceId: String, event: V, events: List<V>) {
context[context_key_reference] = referenceId
getListener().producesEvent.let {
context[context_key_producesEvent] = it
}
if (prerequisitesRequired(events).all { it.invoke() } && prerequisiteRequired(event).all { it.invoke() }) {
val result = onProcessEvents(event, events)
if (result != null) {
onResult(result)
}
} else {
// TODO: Re-enable this
// log.info { "Skipping: ${event.event} as it does not fulfill the requirements for ${context[context_key_producesEvent]}" }
}
}
private fun onResult(data: MessageDataWrapper) {
producer.sendMessage(
referenceId = context[context_key_reference] as String,
event = context[context_key_producesEvent] as KafkaEvents,
data = data
)
}
abstract fun onProcessEvents(event: V, events: List<V>): MessageDataWrapper?
}


@ -27,7 +27,7 @@ class DeserializingRegistry {
KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EVENT_WORK_CONVERT_CREATED to null,
KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java,
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java,


@ -4,6 +4,4 @@ import java.util.*
open class CollectionReference(
@Transient open val referenceId: String = UUID.randomUUID().toString(),
) {
}
) {}


@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.streamit.library.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED)
data class ConvertWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
val result: List<String>
): MessageDataWrapper(status, message)
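
A hedged construction example for the new payload; all values are placeholders, shaped after how ConvertService fills the fields above:

import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.streamit.library.kafka.dto.Status

// Placeholder values; the real producedBy/derivedFromEventId come from ConvertService.
fun examplePayload(): ConvertWorkPerformed = ConvertWorkPerformed(
    status = Status.COMPLETED,
    producedBy = "host::ConvertService::<uuid>",
    derivedFromEventId = "evt-1",
    result = listOf("/data/out/episode.no.srt") // absolute paths of the produced files
)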


@ -6,7 +6,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
data class ConvertWorkerRequest(
val requiresEventId: String,
val requiresEventId: String? = null,
val inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,