This commit is contained in:
bskjon 2024-04-07 19:16:14 +02:00
parent 6090c2e8c0
commit f0a8e14aaa
65 changed files with 1206 additions and 437 deletions

View File

@ -18,13 +18,12 @@ class ClaimsService() {
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = persistentReader.getExpiredClaimsProcessEvents()
val expiredClaims = eventManager.getProcessEventsWithExpiredClaim()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = persistentWriter
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {

View File

@ -4,6 +4,7 @@ import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import org.springframework.boot.autoconfigure.SpringBootApplication
@ -20,8 +21,7 @@ fun getContext(): ApplicationContext? {
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
lateinit var eventManager: PersistentEventManager
private lateinit var eventsDatabase: MySqlDataSource
fun getEventsDatabase(): MySqlDataSource {
@ -33,8 +33,7 @@ fun main(args: Array<String>) {
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
eventManager = PersistentEventManager(eventsDatabase)
context = runApplication<ConvertApplication>(*args)
}

View File

@ -40,14 +40,14 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (event.key == KafkaEvents.EVENT_WORK_CONVERT_CREATED) {
val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value)
if (event.key == KafkaEvents.EventWorkConvertCreated) {
val success = eventManager.setProcessEvent(event.key, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}!" }
} else {
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
} else if (event.key == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED) {
} else if (event.key == KafkaEvents.EventWorkExtractPerformed) {
readAllInQueue()
} else {
log.debug { "Skipping ${event.key}" }
@ -55,7 +55,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
}
fun readAllInQueue() {
val messages = persistentReader.getAvailableProcessEvents()
val messages = eventManager.getProcessEventsClaimable()// persistentReader.getAvailableProcessEvents()
io.launch {
messages.forEach {
delay(1000)
@ -65,7 +65,7 @@ class ConverterCoordinator() : CoordinatorBase<PersistentProcessDataMessage, Per
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = persistentReader.getAvailableProcessEvents()
val messages = eventManager.getProcessEventsClaimable() // persistentReader.getAvailableProcessEvents()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}

View File

@ -3,11 +3,8 @@ package no.iktdev.mediaprocessing.converter.tasks
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.converter.ConverterCoordinator
import no.iktdev.mediaprocessing.converter.TaskCreator
import no.iktdev.mediaprocessing.converter.*
import no.iktdev.mediaprocessing.converter.convert.Converter
import no.iktdev.mediaprocessing.converter.persistentReader
import no.iktdev.mediaprocessing.converter.persistentWriter
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@ -37,18 +34,18 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
override val listensForEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED,
KafkaEvents.EVENT_WORK_CONVERT_CREATED
KafkaEvents.EventWorkExtractPerformed,
KafkaEvents.EventWorkConvertCreated
)
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_CONVERT_PERFORMED
get() = KafkaEvents.EventWorkConvertPerformed
fun getRequiredExtractProcessForContinuation(
referenceId: String,
requiresEventId: String
): PersistentProcessDataMessage? {
return persistentReader.getProcessEvent(referenceId, requiresEventId)
return eventManager.getProcessEventWith(referenceId, requiresEventId)
}
fun canConvert(extract: PersistentProcessDataMessage?): Boolean {
@ -61,7 +58,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
events: List<PersistentProcessDataMessage>
): MessageDataWrapper? {
val convertEvent =
events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED && it.data is ConvertWorkerRequest }
events.find { it.event == KafkaEvents.EventWorkConvertCreated && it.data is ConvertWorkerRequest }
if (convertEvent == null) {
// No convert here..
return null
@ -94,17 +91,16 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
}
val isAlreadyClaimed =
persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
val setClaim = persistentWriter.setProcessEventClaim(
val setClaim = eventManager.setProcessEventClaim(
referenceId = event.referenceId,
eventId = event.eventId,
claimedBy = serviceId
claimer = serviceId
)
if (!setClaim) {
return null
@ -133,20 +129,19 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
val consumedIsSuccessful =
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
}
delay(1000)
var readbackIsSuccess =
persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
var readbackIsSuccess = eventManager.isProcessEventCompleted(event.referenceId, event.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess =
persistentReader.isProcessEventDefinedAsConsumed(event.referenceId, event.eventId, serviceId)
eventManager.isProcessEventCompleted(event.referenceId, event.eventId)
}
}
return result
@ -208,13 +203,13 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
} catch (e: Exception) {
persistentWriter.setProcessEventCompleted(referenceId, event.eventId, serviceId)
eventManager.setProcessEventCompleted(referenceId, event.eventId)
failed.add(event)
log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." }
producer.sendMessage(
referenceId = referenceId,
event = producesEvent,
data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}")
data = SimpleMessageData(Status.SKIPPED, "Required event: ${ce?.requiresEventId} is not found. Skipping convert work for referenceId: ${referenceId}", derivedFromEventId = event.eventId)
)
}
}

View File

@ -7,16 +7,12 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.CoordinatorBase
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isOfEvent
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.ProcessStartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.RequestStartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.stereotype.Service
import java.io.File
import java.util.UUID
@ -30,13 +26,10 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
val success = persistentWriter.storeEventDataMessage(event.key.event, event.value)
val success = eventManager.setEvent(event.key, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().config.databaseName}" }
} else {
deleteOlderEventsIfSuperseded(event.key, event.value)
io.launch {
delay(1000) // Give the database a few sec to update
readAllMessagesFor(event.value.referenceId, event.value.eventId)
@ -78,7 +71,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
file = file.absolutePath,
type = type
)
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, processStartEvent)
producer.sendMessage(UUID.randomUUID().toString(), KafkaEvents.EventMediaProcessStarted, processStartEvent)
}
@ -96,14 +89,14 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
fun permitWorkToProceedOn(referenceId: String, message: String) {
producer.sendMessage(
referenceId = referenceId,
KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED,
SimpleMessageData(Status.COMPLETED, message)
KafkaEvents.EventMediaWorkProceedPermitted,
SimpleMessageData(Status.COMPLETED, message, null)
)
}
fun readAllUncompletedMessagesInQueue() {
val messages = persistentReader.getUncompletedMessages()
val messages = eventManager.getEventsUncompleted()
io.launch {
messages.forEach {
delay(1000)
@ -117,17 +110,17 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = persistentReader.getMessagesFor(referenceId)
val messages = eventManager.getEventsWith(referenceId)
if (messages.find { it.eventId == eventId && it.referenceId == referenceId } == null) {
log.warn { "EventId ($eventId) for ReferenceId ($referenceId) has not been made available in the database yet." }
io.launch {
val fixedDelay = 1000L
delay(fixedDelay)
var delayed = 0L
var msc = persistentReader.getMessagesFor(referenceId)
var msc = eventManager.getEventsWith(referenceId)
while (msc.find { it.eventId == eventId } != null || delayed < 1000 * 60) {
delayed += fixedDelay
msc = persistentReader.getMessagesFor(referenceId)
msc = eventManager.getEventsWith(referenceId)
}
operationToRunOnMessages(referenceId, eventId, msc)
}
@ -145,63 +138,7 @@ class Coordinator() : CoordinatorBase<PersistentMessage, PersistentEventBasedMes
}
fun getProcessStarted(messages: List<PersistentMessage>): MediaProcessStarted? {
return messages.find { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted
}
fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message<out MessageDataWrapper>) {
var existingMessages = persistentReader.getMessagesFor(value.referenceId)
if (!KafkaEvents.isOfWork(event)) {
val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId }
superseded.forEach {
persistentWriter.deleteStoredEventDataMessage(
referenceId = it.referenceId,
eventId = it.eventId,
event = it.event
)
}
}
existingMessages = persistentReader.getMessagesFor(value.referenceId)
val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) }
for (item: PersistentMessage in workItems) {
val originatorId = if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED) ||
item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
) {
val ec = item.data as FfmpegWorkRequestCreated
ec.derivedFromEventId
} else if (item.isOfEvent(KafkaEvents.EVENT_WORK_ENCODE_PERFORMED)) {
try {
(item.data as ProcesserEncodeWorkPerformed).derivedFromEventId
} catch (e: Exception) {
null
}
} else if (item.isOfEvent(KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED)) {
try {
(item.data as ProcesserExtractWorkPerformed).derivedFromEventId
} catch (e: Exception) {
null
}
} else null
originatorId?.let { originator ->
deleteEventsIfNoOriginator(item.referenceId, item.eventId, item.event, originator, existingMessages)
}
}
}
private fun deleteEventsIfNoOriginator(
referenceId: String,
eventId: String,
event: KafkaEvents,
originatorId: String,
existingMessages: List<PersistentMessage>
) {
val originator = existingMessages.find { it.eventId == originatorId }
if (originator == null) {
persistentWriter.deleteStoredEventDataMessage(referenceId, eventId, event)
}
return messages.find { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted
}
}

View File

@ -9,6 +9,7 @@ import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import no.iktdev.mediaprocessing.shared.common.toStoredDatabase
@ -46,8 +47,7 @@ fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
lateinit var eventManager: PersistentEventManager
fun main(args: Array<String>) {
Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
@ -57,16 +57,19 @@ fun main(args: Array<String>) {
})
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
storeDatabase = DatabaseEnvConfig.toStoredDatabase()
eventsDatabase.createDatabase()
storeDatabase = DatabaseEnvConfig.toStoredDatabase()
storeDatabase.createDatabase()
eventManager = PersistentEventManager(eventsDatabase)
val kafkaTables = listOf(
events, // For kafka
)
eventsDatabase.createTables(*kafkaTables.toTypedArray())
val tables = arrayOf(
catalog,
@ -83,9 +86,8 @@ fun main(args: Array<String>) {
)
storeDatabase.createTables(*tables)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
eventsDatabase.createTables(*kafkaTables.toTypedArray())
context = runApplication<CoordinatorApplication>(*args)
printSharedConfig()
}

View File

@ -2,8 +2,7 @@ package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.persistentReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.coordinator.eventManager
import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
@ -20,7 +19,7 @@ class ActionEventController(@Autowired var coordinator: Coordinator) {
@RequestMapping("/flow/proceed")
fun permitRunOnSequence(@RequestBody data: RequestWorkProceed): ResponseEntity<String> {
val set = persistentReader.getMessagesFor(data.referenceId)
val set = eventManager.getEventsWith(data.referenceId)
if (set.isEmpty()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(Gson().toJson(data))
}

View File

@ -30,10 +30,10 @@ class ProcessMapping(val events: List<PersistentMessage>) {
fun waitsForEncode(): Boolean {
val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED }
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED}
val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterEncodeCreated }
val created = events.filter { it.event == KafkaEvents.EventWorkEncodeCreated}
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED }
val performed = events.filter { it.event == KafkaEvents.EventWorkEncodePerformed }
val isSkipped = events.filter { it.isSkipped() }
return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
@ -41,18 +41,18 @@ class ProcessMapping(val events: List<PersistentMessage>) {
fun waitsForExtract(): Boolean {
// Check if message is declared as skipped with statis
val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }.filter { it.data.isSuccess() }
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
val arguments = events.filter { it.event == KafkaEvents.EventMediaParameterExtractCreated }.filter { it.data.isSuccess() }
val created = events.filter { it.event == KafkaEvents.EventWorkExtractCreated }
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED }
val performed = events.filter { it.event == KafkaEvents.EventWorkExtractPerformed }
val isSkipped = events.filter { it.isSkipped() }
return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
}
fun waitsForConvert(): Boolean {
val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED }
val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED }
val created = events.filter { it.event == KafkaEvents.EventWorkConvertCreated }
val performed = events.filter { it.event == KafkaEvents.EventWorkConvertPerformed }
val isSkipped = events.filter { it.isSkipped() }
return created.size > performed.size + isSkipped.size

View File

@ -21,9 +21,9 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
get() = KafkaEvents.EventMediaReadBaseInfoPerformed
override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED)
override val requiredEvents: List<KafkaEvents> = listOf(KafkaEvents.EventMediaProcessStarted)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
@ -34,22 +34,23 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null
return readFileInfo(selected.data as MediaProcessStarted)
val selected = events.lastOrSuccessOf(KafkaEvents.EventMediaProcessStarted) ?: return null
return readFileInfo(selected.data as MediaProcessStarted, event.eventId)
}
fun readFileInfo(started: MediaProcessStarted): MessageDataWrapper {
fun readFileInfo(started: MediaProcessStarted, eventId: String): MessageDataWrapper {
val result = try {
val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName)
BaseInfoPerformed(
Status.COMPLETED,
title = fileNameParser.guessDesiredTitle(),
sanitizedName = fileNameParser.guessDesiredFileName()
sanitizedName = fileNameParser.guessDesiredFileName(),
derivedFromEventId = eventId
)
} catch (e: Exception) {
e.printStackTrace()
SimpleMessageData(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
SimpleMessageData(Status.ERROR, e.message ?: "Unable to obtain proper info from file", eventId)
}
return result
}

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
@ -32,7 +33,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
override val requiredEvents: List<KafkaEvents> = listOf(
EVENT_MEDIA_PROCESS_STARTED,
EventMediaProcessStarted,
EVENT_MEDIA_PROCESS_COMPLETED
)
override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
@ -40,7 +41,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
val completed = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_COMPLETED) ?: return null
if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
return null
@ -65,20 +66,20 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
null
else
storeCatalog(metadata = meta,genres = genres, videoFile = videoFile, videoDetails = videoInfo)
} ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null")
} ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null", event.eventId)
mapped.metadata?.let {
storeMetadata(catalogId = catalogId, metadata = it)
}
return SimpleMessageData(Status.COMPLETED)
return SimpleMessageData(Status.COMPLETED, derivedFromEventId = event.eventId)
}
private fun storeSubtitles(collection: String, subtitles: List<String>): Boolean {
val result = subtitles.map { subtitle ->
val subtitleFile = File(subtitle)
val language = subtitleFile.parentFile.name
subtitle to executeWithStatus {
subtitle to executeWithStatus(getStoreDatabase()) {
SubtitleQuery(
collection = collection,
associatedWithVideo = subtitleFile.nameWithoutExtension,
@ -93,7 +94,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
private fun storeMetadata(catalogId: Int, metadata: MetadataDto) {
metadata.summary.forEach {
withTransaction {
withTransaction(getStoreDatabase()) {
SummaryQuery(
cid = catalogId,
language = it.language,
@ -104,7 +105,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
}
private fun storeAndGetGenres(genres: List<String>): String? {
return withTransaction {
return withTransaction(getStoreDatabase()) {
val gq = GenreQuery( *genres.toTypedArray() )
gq.insertAndGetIds()
gq.getIds().joinToString(",")
@ -141,7 +142,7 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
}
val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
return if (result == null || ignoreException ) {
return withTransaction {
return withTransaction(getStoreDatabase()) {
precreatedCatalogQuery.getId()
}
} else null

View File

@ -22,16 +22,16 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED
override val requiredEvents: List<KafkaEvents> = listOf(
EVENT_MEDIA_PROCESS_STARTED,
EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
EventMediaProcessStarted,
EventMediaReadBaseInfoPerformed,
EventMediaReadOutNameAndType
)
override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
if (!started.data.isSuccess()) {
return null
}
@ -40,9 +40,9 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val requiresOneOf = listOf(
EVENT_WORK_CONVERT_PERFORMED,
EVENT_WORK_EXTRACT_PERFORMED,
EVENT_WORK_ENCODE_PERFORMED
EventWorkConvertPerformed,
EventWorkExtractPerformed,
EventWorkEncodePerformed
)
if (requiresOneOf.none { it in receivedEvents }) {
@ -56,7 +56,7 @@ class CompleteMediaTask(@Autowired override var coordinator: Coordinator) : Task
val mapper = ProcessMapping(events)
if (mapper.canCollect()) {
return ProcessCompleted(Status.COMPLETED)
return ProcessCompleted(Status.COMPLETED, event.eventId)
}
return null
}

View File

@ -39,9 +39,9 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val requiresOneOf = listOf(
EVENT_WORK_CONVERT_PERFORMED,
EVENT_WORK_EXTRACT_PERFORMED,
EVENT_WORK_ENCODE_PERFORMED
EventWorkConvertPerformed,
EventWorkExtractPerformed,
EventWorkEncodePerformed
)
if (requiresOneOf.none { it in receivedEvents }) {
@ -55,7 +55,7 @@ class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : Ta
val mapper = ProcessMapping(events)
if (mapper.canCollect()) {
return ProcessCompleted(Status.COMPLETED)
return ProcessCompleted(Status.COMPLETED, event.eventId)
}
return null
}

View File

@ -16,11 +16,11 @@ import java.io.File
@Service
class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_CONVERT_CREATED
get() = KafkaEvents.EventWorkConvertCreated
override val requiredEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EVENT_WORK_EXTRACT_CREATED
KafkaEvents.EventWorkExtractCreated
// TODO: Add event for request as well
)
@ -30,7 +30,7 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
}
val eventData = event.data as FfmpegWorkRequestCreated? ?: return null
val requiredEventId = if (event.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED) {
val requiredEventId = if (event.event == KafkaEvents.EventWorkExtractCreated) {
event.eventId
} else null;
@ -41,7 +41,8 @@ class CreateConvertWorkTask(@Autowired override var coordinator: Coordinator) :
inputFile = eventData.outFile,
allowOverwrite = true,
outFileBaseName = outFile.nameWithoutExtension,
outDirectory = outFile.parentFile.absolutePath
outDirectory = outFile.parentFile.absolutePath,
derivedFromEventId = event.eventId
)
}

View File

@ -9,9 +9,9 @@ import org.springframework.stereotype.Service
@Service
class CreateEncodeWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_ENCODE_CREATED
get() = KafkaEvents.EventWorkEncodeCreated
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED)
get() = listOf(KafkaEvents.EventMediaParameterEncodeCreated)
}

View File

@ -9,8 +9,8 @@ import org.springframework.stereotype.Service
@Service
class CreateExtractWorkTask(@Autowired override var coordinator: Coordinator) : CreateProcesserWorkTask(coordinator) {
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_EXTRACT_CREATED
get() = KafkaEvents.EventWorkExtractCreated
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED)
get() = listOf(KafkaEvents.EventMediaParameterExtractCreated)
}

View File

@ -24,13 +24,13 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED
get() = KafkaEvents.EventWorkDownloadCoverPerformed
override val requiredEvents: List<KafkaEvents>
get() = listOf(
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_COVER,
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
KafkaEvents.EventMediaMetadataSearchPerformed,
KafkaEvents.EventMediaReadOutCover,
KafkaEvents.EventWorkEncodePerformed
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
@ -39,14 +39,14 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val cover = events.find { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_COVER }
val cover = events.find { it.event == KafkaEvents.EventMediaReadOutCover }
if (cover == null || cover.data !is CoverInfoPerformed) {
return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId")
return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId", event.eventId)
}
val coverData = cover.data as CoverInfoPerformed
val outDir = File(coverData.outDir)
if (!outDir.exists())
return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId")
return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId", event.eventId)
val client = DownloadClient(coverData.url, File(coverData.outDir), coverData.outFileBaseName)
@ -67,10 +67,10 @@ class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator
}
return if (result == null) {
SimpleMessageData(Status.ERROR, "Could not download cover, check logs")
SimpleMessageData(Status.ERROR, "Could not download cover, check logs", event.eventId)
} else {
val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR
CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath)
CoverDownloadWorkPerformed(status = status, message = message, coverFile = result.absolutePath, event.eventId)
}
}
}

View File

@ -20,12 +20,12 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER
get() = KafkaEvents.EventMediaReadOutCover
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
KafkaEvents.EventMediaReadBaseInfoPerformed,
KafkaEvents.EventMediaReadOutNameAndType,
KafkaEvents.EventMediaMetadataSearchPerformed
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
@ -50,7 +50,8 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi
status = Status.COMPLETED,
url = coverUrl,
outFileBaseName = baseInfo.title,
outDir = fileOut.outDirectory
outDir = fileOut.outDirectory,
derivedFromEventId = event.eventId
)
}
}

View File

@ -39,23 +39,23 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
get() = KafkaEvents.EventMediaReadOutNameAndType
val waitingProcessesForMeta: MutableMap<String, LocalDateTime> = mutableMapOf()
val waitingProcessesForMeta: MutableMap<String, MetadataTriggerData> = mutableMapOf()
override val listensForEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED
KafkaEvents.EventMediaReadBaseInfoPerformed,
KafkaEvents.EventMediaMetadataSearchPerformed
)
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed?
val baseInfo = events.lastOrSuccessOf(KafkaEvents.EventMediaReadBaseInfoPerformed) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
val meta = events.lastOrSuccessOf(KafkaEvents.EventMediaMetadataSearchPerformed) { it.data is MetadataPerformed }?.data as MetadataPerformed?
// Only Return here as both baseInfo events are required to continue
if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) {
if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EventMediaReadOutNameAndType }) {
return null
}
if (baseInfo.isSuccess() && meta == null) {
@ -65,7 +65,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
log.info { "Sending ${baseInfo?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
if (!waitingProcessesForMeta.containsKey(event.referenceId)) {
waitingProcessesForMeta[event.referenceId] = LocalDateTime.now()
waitingProcessesForMeta[event.referenceId] = MetadataTriggerData(event.eventId, LocalDateTime.now())
}
return null
}
@ -92,9 +92,9 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
val vi = fileDeterminate.getDeterminedVideoInfo()?.toJsonObject()
return if (vi != null) {
VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath)
VideoInfoPerformed(Status.COMPLETED, vi, outDirectory = outputDirectory.absolutePath, event.eventId)
} else {
SimpleMessageData(Status.ERROR, "No VideoInfo found...")
SimpleMessageData(Status.ERROR, "No VideoInfo found...", event.eventId)
}
}
@ -103,13 +103,15 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
@Scheduled(fixedDelay = (1_000))
fun sendErrorMessageForMetadata() {
val expired = waitingProcessesForMeta.filter {
LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + metadataTimeout)
LocalDateTime.now().toEpochSeconds() > (it.value.executed.toEpochSeconds() + metadataTimeout)
}
expired.forEach {
log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}"))
producer.sendMessage(it.key, KafkaEvents.EventMediaMetadataSearchPerformed, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOut::class.simpleName}", derivedFromEventId = it.value.eventId))
waitingProcessesForMeta.remove(it.key)
}
}
data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime)
}

View File

@ -25,10 +25,10 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED
get() = KafkaEvents.EventMediaParseStreamPerformed
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
KafkaEvents.EventMediaReadStreamPerformed
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
@ -39,11 +39,11 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null
return parseStreams(desiredEvent.data as ReaderPerformed)
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EventMediaReadStreamPerformed) ?: return null
return parseStreams(desiredEvent.data as ReaderPerformed, desiredEvent.eventId)
}
fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
fun parseStreams(data: ReaderPerformed, eventId: String): MessageDataWrapper {
val gson = Gson()
return try {
val jStreams = data.output.getAsJsonArray("streams")
@ -71,11 +71,11 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
audioStream = audioStreams,
subtitleStream = subtitleStreams
)
MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams)
MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams, eventId)
} catch (e: Exception) {
e.printStackTrace()
SimpleMessageData(Status.ERROR, message = e.message)
SimpleMessageData(Status.ERROR, message = e.message, eventId)
}
}

View File

@ -26,10 +26,10 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
get() = KafkaEvents.EventMediaReadStreamPerformed
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_PROCESS_STARTED
KafkaEvents.EventMediaProcessStarted
)
@ -43,18 +43,18 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null
return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) }
return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted, desiredEvent.eventId) }
}
suspend fun fileReadStreams(started: MediaProcessStarted): MessageDataWrapper {
suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): MessageDataWrapper {
val file = File(started.file)
return if (file.exists() && file.isFile) {
val result = readStreams(file)
val joined = result.output.joinToString(" ")
val jsoned = Gson().fromJson(joined, JsonObject::class.java)
ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned)
ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned, derivedFromEventId = eventId)
} else {
SimpleMessageData(Status.ERROR, "File in data is not a file or does not exist")
SimpleMessageData(Status.ERROR, "File in data is not a file or does not exist", eventId)
}
}

View File

@ -12,12 +12,11 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkReques
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkerArgumentsCreated
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) : TaskCreator(coordinator) {
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }?.data as MediaProcessStarted?
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }?.data as MediaProcessStarted?
if (started == null) {
return null
}
@ -26,7 +25,7 @@ abstract class CreateProcesserWorkTask(override var coordinator: Coordinator) :
return null
}
val proceed = events.find { it.event == KafkaEvents.EVENT_MEDIA_WORK_PROCEED_PERMITTED }
val proceed = events.find { it.event == KafkaEvents.EventMediaWorkProceedPermitted }
if (proceed == null && started.type == ProcessType.MANUAL) {
log.warn { "${event.referenceId} waiting for Proceed event due to Manual process" }
return null

View File

@ -24,14 +24,14 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
val preference = Preference.getPreference()
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED
get() = KafkaEvents.EventMediaParameterEncodeCreated
override val requiredEvents: List<KafkaEvents> =
listOf(
KafkaEvents.EVENT_MEDIA_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
KafkaEvents.EventMediaProcessStarted,
KafkaEvents.EventMediaReadBaseInfoPerformed,
KafkaEvents.EventMediaParseStreamPerformed,
KafkaEvents.EventMediaReadOutNameAndType
)
override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
@ -61,7 +61,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
}
if (videoInfoWrapper == null || videoInfo == null) {
log.error { "${KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE} result is read as null" }
log.error { "${KafkaEvents.EventMediaReadOutNameAndType} result is read as null" }
return null
}
@ -74,7 +74,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
outDir = File(videoInfoWrapper.outDirectory),
preference = preference.encodePreference,
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
serializedParsedStreams = serializedParsedStreams,
eventId = event.eventId
)
}
@ -84,7 +85,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
outDir: File,
preference: EncodingPreference,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
serializedParsedStreams: ParsedMediaStreams,
eventId: String
): MessageDataWrapper {
val outVideoFile = outDir.using("${outFullName}.mp4").absolutePath
@ -97,7 +99,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
val vaArgs = toFfmpegWorkerArguments(vArg, aArg)
return if (vaArgs.isEmpty()) {
SimpleMessageData(Status.ERROR, message = "Unable to produce arguments")
SimpleMessageData(Status.ERROR, message = "Unable to produce arguments", derivedFromEventId = eventId)
} else {
FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
@ -107,7 +109,8 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator
outputFile = outVideoFile,
arguments = vaArgs
)
)
),
derivedFromEventId = eventId
)
}
}

View File

@ -28,13 +28,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
val preference = Preference.getPreference()
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
get() = KafkaEvents.EventMediaParameterExtractCreated
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_MEDIA_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
KafkaEvents.EventMediaProcessStarted,
KafkaEvents.EventMediaReadBaseInfoPerformed,
KafkaEvents.EventMediaParseStreamPerformed,
KafkaEvents.EventMediaReadOutNameAndType
)
@ -64,7 +64,7 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
val videoInfo = videoInfoWrapper?.toValueObject()
if (videoInfoWrapper == null || videoInfo == null) {
log.error { "${KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE} result is read as null" }
log.error { "${KafkaEvents.EventMediaReadOutNameAndType} result is read as null" }
return null
}
@ -73,7 +73,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
outFullName = videoInfo.fullName,
outDir = File(videoInfoWrapper.outDirectory),
baseInfo = baseInfo,
serializedParsedStreams = serializedParsedStreams
serializedParsedStreams = serializedParsedStreams,
eventId = event.eventId
)
}
@ -82,7 +83,8 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
outFullName: String,
outDir: File,
baseInfo: BaseInfoPerformed,
serializedParsedStreams: ParsedMediaStreams
serializedParsedStreams: ParsedMediaStreams,
eventId: String
): MessageDataWrapper? {
val subRootDir = outDir.using("sub")
val sArg = SubtitleArguments(serializedParsedStreams.subtitleStream).getSubtitleArguments()
@ -94,12 +96,13 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato
)
}
if (entries.isEmpty()) {
return SimpleMessageData(status = Status.SKIPPED, "No entries found!")
return SimpleMessageData(status = Status.SKIPPED, "No entries found!", derivedFromEventId = eventId)
}
return FfmpegWorkerArgumentsCreated(
status = Status.COMPLETED,
inputFile = inputFile,
entries = entries
entries = entries,
derivedFromEventId = eventId
)
}

View File

@ -53,10 +53,17 @@ dependencies {
implementation(project(mapOf("path" to ":shared:kafka")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("io.mockk:mockk:1.12.0")
testImplementation("com.h2database:h2:1.4.200")
testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2")
testImplementation("io.kotlintest:kotlintest-assertions:3.3.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0")
implementation(kotlin("stdlib-jdk8"))
}

View File

@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.NotificationOfDeletionPerformed
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
@ -22,6 +23,15 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
val io = Coroutines.io()
override val listeners = PersistentEventProcessBasedMessageListener()
private val coordinatorEventListeners: MutableList<CoordinatorEvents> = mutableListOf()
fun getRegisteredEventListeners() = coordinatorEventListeners.toList()
fun addCoordinatorEventListener(listener: CoordinatorEvents) {
coordinatorEventListeners.add(listener)
}
fun removeCoordinatorEventListener(listener: CoordinatorEvents) {
coordinatorEventListeners.remove(listener)
}
override fun createTasksBasedOnEventsAndPersistence(
referenceId: String,
eventId: String,
@ -40,16 +50,18 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
}
override fun onMessageReceived(event: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (!processKafkaEvents.contains(event.key)) {
if (!acceptEvents.contains(event.key)) {
return
}
if (event.key == KafkaEvents.EventNotificationOfWorkItemRemoval) {
handleDeletionOfEvents(event)
return
}
val success = persistentWriter.storeProcessDataMessage(event.key.event, event.value)
val success = eventManager.setProcessEvent(event.key, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database ${getEventsDatabase().database}" }
} else {
deleteOlderEventsIfSuperseded(event.key, event.value)
io.launch {
delay(500)
readAllMessagesFor(event.value.referenceId, event.value.eventId)
@ -57,29 +69,20 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
}
}
fun deleteOlderEventsIfSuperseded(event: KafkaEvents, value: Message<out MessageDataWrapper>) {
val existingMessages = persistentReader.getMessagesFor(value.referenceId)
val workItems = existingMessages.filter { KafkaEvents.isOfWork(it.event) }
if (KafkaEvents.isOfWork(event)) {
// Here i would need to list all of the work events, then proceed to check which one of the derivedId does not correspond to a entry
// Nonmatching has been superseded
val superseded = existingMessages.filter { it.event == event && it.eventId != value.eventId }
superseded.forEach {
persistentWriter.deleteStoredEventDataMessage(referenceId = it.referenceId, eventId = it.eventId, event= it.event )
private fun handleDeletionOfEvents(kafkaPayload: DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) {
if (kafkaPayload.value.data is NotificationOfDeletionPerformed) {
val data = kafkaPayload.value.data as NotificationOfDeletionPerformed
if (data.deletedEvent in processKafkaEvents) {
coordinatorEventListeners.forEach { it.onCancelOrStopProcess(data.deletedEventId) }
eventManager.deleteProcessEvent(kafkaPayload.value.referenceId, data.deletedEventId)
}
} else {
log.warn { "Deletion handling was triggered with wrong data" }
}
}
fun readAllAvailableInQueue() {
val messages = persistentReader.getAvailableProcessEvents()
val messages = eventManager.getProcessEventsClaimable()
io.launch {
messages.forEach {
delay(1000)
@ -89,15 +92,19 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
}
fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = persistentReader.getAvailableProcessEvents()
val messages = eventManager.getProcessEventsClaimable()
createTasksBasedOnEventsAndPersistence(referenceId, eventId, messages)
}
val processKafkaEvents = listOf(
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
private final val processKafkaEvents = listOf(
KafkaEvents.EventWorkEncodeCreated,
KafkaEvents.EventWorkExtractCreated,
)
private final val acceptEvents = listOf(
KafkaEvents.EventNotificationOfWorkItemRemoval
) + processKafkaEvents
@Scheduled(fixedDelay = (5_000))
fun checkForWork() {
@ -105,4 +112,8 @@ class Coordinator(): CoordinatorBase<PersistentProcessDataMessage, PersistentEve
readAllAvailableInQueue()
}
interface CoordinatorEvents {
fun onCancelOrStopProcess(eventId: String)
}
}

View File

@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.processerEvents
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
@ -25,16 +26,17 @@ fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var persistentReader: PersistentDataReader
lateinit var persistentWriter: PersistentDataStore
lateinit var eventManager: PersistentEventManager
fun main(args: Array<String>) {
eventsDatabase = DatabaseEnvConfig.toEventsDatabase()
eventsDatabase.createDatabase()
eventsDatabase.createTables(processerEvents)
persistentReader = PersistentDataReader(eventsDatabase)
persistentWriter = PersistentDataStore(eventsDatabase)
eventManager = PersistentEventManager(eventsDatabase)
val context = runApplication<ProcesserApplication>(*args)
}

View File

@ -0,0 +1,23 @@
package no.iktdev.mediaprocessing.processer.controller
import no.iktdev.mediaprocessing.processer.Coordinator
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.stereotype.Controller
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
@Controller
class CancelController(@Autowired var coordinator: Coordinator) {
@RequestMapping(path = ["/cancel"])
fun cancelProcess(@RequestBody eventId: String? = null): ResponseEntity<String> {
if (eventId.isNullOrBlank()) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body("No eventId provided!")
}
coordinator.getRegisteredEventListeners().forEach { it.onCancelOrStopProcess(eventId) }
return ResponseEntity.ok(null)
}
}

View File

@ -2,17 +2,30 @@ package no.iktdev.mediaprocessing.processer.ffmpeg
import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.eventManager
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import java.io.File
import java.time.Duration
class FfmpegWorker(
val referenceId: String,
val eventId: String,
val info: FfmpegWorkRequestCreated,
val listener: FfmpegWorkerEvents,
val logDir: File
) {
private val scope = Coroutines.io()
private var job: Job? = null
fun isWorking(): Boolean {
return job != null && (job?.isCompleted != true) && scope.isActive
}
class FfmpegWorker(val referenceId: String, val eventId: String, val info: FfmpegWorkRequestCreated, val listener: FfmpegWorkerEvents, val logDir: File) {
val scope = Coroutines.io()
val decoder = FfmpegProgressDecoder()
private val outputCache = mutableListOf<String>()
private val log = KotlinLogging.logger {}
@ -44,20 +57,41 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
}
}
suspend fun run() {
fun run() {
val args = FfmpegWorkerArgumentsBuilder().using(info).build()
execute(args)
job = scope.launch {
execute(args)
}
}
suspend fun runWithProgress() {
fun runWithProgress() {
val args = FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress()
execute(args)
job = scope.launch {
execute(args)
}
}
private suspend fun startIAmAlive() {
scope.launch {
while (scope.isActive && job?.isCompleted != true) {
delay(Duration.ofMinutes(5).toMillis())
listener.onIAmAlive(referenceId, eventId)
}
}
}
fun cancel(message: String = "Work was interrupted as requested") {
job?.cancel()
scope.cancel(message)
listener.onError(referenceId, eventId, info, message)
}
private suspend fun execute(args: List<String>) {
withContext(Dispatchers.IO) {
logFile.createNewFile()
}
startIAmAlive()
listener.onStarted(referenceId, eventId, info)
val processOp = process(
ProcesserEnv.ffmpeg, *args.toTypedArray(),
@ -67,7 +101,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
//log.info { it }
onOutputChanged(it)
},
destroyForcibly = true)
destroyForcibly = true
)
val result = processOp
onOutputChanged("Received exit code: ${result.resultCode}")
@ -86,7 +121,7 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
decoder.parseVideoProgress(outputCache.toList())?.let { decoded ->
try {
val _progress = decoder.getProgress(decoded)
if (progress == null || _progress.progress > (progress?.progress ?: -1) ) {
if (progress == null || _progress.progress > (progress?.progress ?: -1)) {
progress = _progress
listener.onProgressChanged(referenceId, eventId, info, _progress)
}
@ -107,8 +142,14 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe
}
interface FfmpegWorkerEvents {
fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated,)
fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated)
fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated)
fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String)
fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress)
fun onProgressChanged(
referenceId: String,
eventId: String,
info: FfmpegWorkRequestCreated,
progress: FfmpegDecodedProgress
)
fun onIAmAlive(referenceId: String, eventId: String) {}
}

View File

@ -2,10 +2,7 @@ package no.iktdev.mediaprocessing.processer.services
import mu.KotlinLogging
import no.iktdev.mediaprocessing.processer.Coordinator
import no.iktdev.mediaprocessing.processer.persistentReader
import no.iktdev.mediaprocessing.processer.persistentWriter
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.processer.eventManager
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
@ -21,13 +18,12 @@ class ClaimsService() {
@Scheduled(fixedDelay = (300_000))
fun validateClaims() {
val expiredClaims = persistentReader.getExpiredClaimsProcessEvents()
val expiredClaims = eventManager.getProcessEventsWithExpiredClaim()
expiredClaims.forEach {
log.info { "Found event with expired claim: ${it.referenceId}::${it.eventId}::${it.event}" }
}
val store = persistentWriter
expiredClaims.forEach {
val result = store.releaseProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
val result = eventManager.deleteProcessEventClaim(referenceId = it.referenceId, eventId = it.eventId)
if (result) {
log.info { "Released claim on ${it.referenceId}::${it.eventId}::${it.event}" }
} else {

View File

@ -28,19 +28,29 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
private val log = KotlinLogging.logger {}
private val logDir = ProcesserEnv.encodeLogDirectory
override val producesEvent = KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
override val producesEvent = KafkaEvents.EventWorkEncodePerformed
override val requiredEvents: List<KafkaEvents> = listOf(
KafkaEvents.EVENT_WORK_ENCODE_CREATED
KafkaEvents.EventWorkEncodeCreated
)
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
private final val coordinatorEvents = object: Coordinator.CoordinatorEvents {
override fun onCancelOrStopProcess(eventId: String) {
cancelWorkIfRunning(eventId)
}
}
init {
log.info { "Starting with id: $serviceId" }
}
override fun attachListener() {
super.attachListener()
coordinator.addCoordinatorEventListener(listener = coordinatorEvents)
}
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
@ -53,16 +63,16 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
return null
}
if (event.data !is FfmpegWorkRequestCreated) {
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId)
}
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runnerJob?.isActive != true) {
if (runner?.isWorking() != true) {
startEncode(event)
} else {
log.warn { "Worker is already running.." }
@ -78,19 +88,17 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
logDir.mkdirs()
}
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents )
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
return
}
runnerJob = scope.launch {
runner!!.runWithProgress()
}
runner?.runWithProgress()
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
@ -105,7 +113,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
return
}
log.info { "Encode started for ${runner.referenceId}" }
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(referenceId, eventId, status = WorkStatus.Started, info, FfmpegDecodedProgress(
progress = 0,
time = "Unkown",
@ -113,13 +121,6 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
speed = "0",
)
)
scope.launch {
while (runnerJob?.isActive == true) {
delay(java.time.Duration.ofMinutes(5).toMillis())
persistentWriter.updateCurrentProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
}
}
}
override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) {
@ -129,18 +130,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
return
}
log.info { "Encode completed for ${runner.referenceId}" }
val consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
val consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
runBlocking {
delay(1000)
if (!consumedIsSuccessful) {
persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
}
delay(1000)
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
while (!readbackIsSuccess) {
delay(1000)
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
}
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
@ -179,6 +180,10 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
sendProgress(referenceId, eventId, WorkStatus.Working, info, progress)
}
override fun onIAmAlive(referenceId: String, eventId: String) {
super.onIAmAlive(referenceId, eventId)
eventManager.setProcessEventClaimRefresh(referenceId, eventId, serviceId)
}
}
fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) {
@ -195,13 +200,20 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
fun clearWorker() {
this.runner?.scope?.cancel()
this.runner = null
}
@PreDestroy
fun shutdown() {
scope.cancel()
runner?.scope?.cancel("Stopping application")
runner?.cancel("Stopping application")
}
fun cancelWorkIfRunning(eventId: String) {
if (runner?.eventId == eventId) {
runner?.cancel()
}
}
}

View File

@ -30,12 +30,11 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
private val logDir = ProcesserEnv.extractLogDirectory
override val producesEvent = KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
override val producesEvent = KafkaEvents.EventWorkExtractPerformed
val scope = Coroutines.io()
private var runner: FfmpegWorker? = null
private var runnerJob: Job? = null
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
init {
@ -43,7 +42,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
}
override val requiredEvents: List<KafkaEvents>
get() = listOf(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
get() = listOf(KafkaEvents.EventWorkExtractCreated)
override fun prerequisitesRequired(events: List<PersistentProcessDataMessage>): List<() -> Boolean> {
return super.prerequisitesRequired(events) + listOf {
@ -56,16 +55,16 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
return null
}
if (event.data !is FfmpegWorkRequestCreated) {
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}")
return SimpleMessageData(status = Status.ERROR, message = "Invalid data (${event.data.javaClass.name}) passed for ${event.event.event}", event.eventId)
}
val isAlreadyClaimed = persistentReader.isProcessEventAlreadyClaimed(referenceId = event.referenceId, eventId = event.eventId)
val isAlreadyClaimed = eventManager.isProcessEventClaimed(referenceId = event.referenceId, eventId = event.eventId)
if (isAlreadyClaimed) {
log.warn { "Process is already claimed!" }
return null
}
if (runnerJob?.isActive != true) {
if (runner?.isWorking() != true) {
startExtract(event)
} else {
log.warn { "Worker is already running.." }
@ -82,7 +81,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
}
val setClaim = persistentWriter.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimedBy = serviceId)
val setClaim = eventManager.setProcessEventClaim(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents)
@ -90,12 +89,10 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") {
ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}")
// Setting consumed to prevent spamming
persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId)
eventManager.setProcessEventCompleted(event.referenceId, event.eventId)
return
}
runnerJob = scope.launch {
runner!!.run()
}
runner!!.run()
} else {
log.error { "Failed to set claim on referenceId: ${event.referenceId} on event ${event.event}" }
}
@ -110,7 +107,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
return
}
log.info { "Extract started for ${runner.referenceId}" }
persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
eventManager.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId)
sendProgress(referenceId, eventId, WorkStatus.Started, info)
}
@ -121,12 +118,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
return
}
log.info { "Extract completed for ${runner.referenceId}" }
var consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
var consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
runBlocking {
delay(1000)
limitedWhile({!consumedIsSuccessful}, 1000 * 10, 1000) {
consumedIsSuccessful = persistentWriter.setProcessEventCompleted(runner.referenceId, runner.eventId, serviceId)
consumedIsSuccessful = eventManager.setProcessEventCompleted(runner.referenceId, runner.eventId)
}
log.info { "Database is reporting extract on ${runner.referenceId} as ${if (consumedIsSuccessful) "CONSUMED" else "NOT CONSUMED"}" }
@ -134,9 +131,9 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
var readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
var readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
limitedWhile({!readbackIsSuccess}, 1000 * 30, 1000) {
readbackIsSuccess = persistentReader.isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, serviceId)
readbackIsSuccess = eventManager.isProcessEventCompleted(runner.referenceId, runner.eventId)
log.info { readbackIsSuccess }
}
log.info { "Database is reporting readback for extract on ${runner.referenceId} as ${if (readbackIsSuccess) "CONSUMED" else "NOT CONSUMED"}" }
@ -189,13 +186,12 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
fun clearWorker() {
this.runner?.scope?.cancel()
this.runner = null
}
@PreDestroy
fun shutdown() {
scope.cancel()
runner?.scope?.cancel("Stopping application")
runner?.cancel("Stopping application")
}
}

View File

@ -0,0 +1,4 @@
package no.iktdev.mediaprocessing.processer.services
class EncodeServiceTest {
}

View File

@ -59,7 +59,7 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
private fun getCurrentState(events: List<PersistentMessage>, processes: Map<String, EventSummarySubItem>): SummaryState {
val stored = events.findLast { it.event == KafkaEvents.EVENT_COLLECT_AND_STORE }
val started = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_STARTED }
val started = events.findLast { it.event == KafkaEvents.EventMediaProcessStarted }
val completedMediaEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED }
val completedRequestEvent = events.findLast { it.event == KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED }
@ -79,9 +79,9 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
}
val workPrepared = events.filter { it.event in listOf(
KafkaEvents.EVENT_WORK_EXTRACT_CREATED,
KafkaEvents.EVENT_WORK_CONVERT_CREATED,
KafkaEvents.EVENT_WORK_ENCODE_CREATED
KafkaEvents.EventWorkExtractCreated,
KafkaEvents.EventWorkConvertCreated,
KafkaEvents.EventWorkEncodeCreated
) }
if (workPrepared.isNotEmpty()) {
return SummaryState.Pending
@ -92,29 +92,29 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
}
val perparation = events.filter { it.event in listOf(
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED,
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
KafkaEvents.EventMediaParameterExtractCreated,
KafkaEvents.EventMediaParameterEncodeCreated,
) }
if (perparation.isNotEmpty()) {
return SummaryState.Preparing
}
val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) }
val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EventMediaReadOutNameAndType) }
if (analyzed2 != null) {
return SummaryState.Analyzing
}
val waitingForMeta = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED }
val waitingForMeta = events.findLast { it.event == KafkaEvents.EventMediaMetadataSearchPerformed }
if (waitingForMeta != null) {
return SummaryState.Metadata
}
val analyzed = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) }
val analyzed = events.findLast { it.event in listOf(KafkaEvents.EventMediaParseStreamPerformed, KafkaEvents.EventMediaReadBaseInfoPerformed, KafkaEvents.EventMediaReadOutNameAndType) }
if (analyzed != null) {
return SummaryState.Analyzing
}
val readEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED }
val readEvent = events.findLast { it.event == KafkaEvents.EventMediaReadStreamPerformed }
if (readEvent != null) {
return SummaryState.Read
}
@ -133,10 +133,10 @@ class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : Coo
val processesStatuses = getCurrentStateFromProcesserEvents(procM)
val messageStatus = getCurrentState(it, processesStatuses)
val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED }?.data.let { data ->
val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EventMediaReadBaseInfoPerformed }?.data.let { data ->
if (data is BaseInfoPerformed) data else null
}
val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }?.data.let { data ->
val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EventMediaReadOutNameAndType }?.data.let { data ->
if (data is VideoInfoPerformed) data else null
}

View File

@ -1,7 +1,7 @@
plugins {
id("java")
kotlin("jvm")
id("org.jetbrains.kotlin.plugin.serialization") version "1.5.0" // Legg til Kotlin Serialization-plugin
}
group = "no.iktdev.mediaprocessing.shared"
@ -48,9 +48,13 @@ dependencies {
testImplementation("io.mockk:mockk:1.12.0")
testImplementation("com.h2database:h2:1.4.200")
testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2")
testImplementation("io.kotlintest:kotlintest-assertions:3.3.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0")
}
tasks.test {

View File

@ -20,6 +20,8 @@ abstract class DataSource(val config: DatabaseConnectionConfig) {
abstract fun toConnectionUrl(): String
abstract fun toDatabaseConnectionUrl(database: String): String
fun toPortedAddress(): String {
var baseAddress = config.address
if (!config.port.isNullOrBlank()) {

View File

@ -17,7 +17,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
override fun createDatabase(): Database? {
val ok = transaction(toDatabaseServerConnection()) {
val tmc = TransactionManager.current().connection
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}'"
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '${config.databaseName}';"
val stmt = tmc.prepareStatement(query, true)
val resultSet = stmt.executeQuery()
@ -52,7 +52,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
}
override fun createDatabaseStatement(): String {
return "CREATE DATABASE ${config.databaseName}"
return "CREATE DATABASE ${config.databaseName};"
}
protected fun toDatabaseServerConnection(): Database {
@ -66,7 +66,7 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
override fun toDatabase(): Database {
val database = Database.connect(
"${toConnectionUrl()}/${config.databaseName}",
toDatabaseConnectionUrl(config.databaseName),
user = config.username,
password = config.password
)
@ -74,6 +74,10 @@ open class MySqlDataSource(conf: DatabaseConnectionConfig): DataSource(conf) {
return database
}
override fun toDatabaseConnectionUrl(database: String): String {
return toConnectionUrl() + "/$database"
}
override fun toConnectionUrl(): String {
return "jdbc:mysql://${toPortedAddress()}"
}

View File

@ -1,10 +1,12 @@
package no.iktdev.mediaprocessing.shared.common.datasource
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.transactions.transaction
import java.sql.Connection
import java.sql.SQLIntegrityConstraintViolationException
open class TableDefaultOperations<T : Table> {
@ -46,6 +48,11 @@ fun <T> withTransaction(db: Database? = null, block: () -> T): T? {
null
}
}
fun <T> withTransaction(db: DataSource? = null, block: () -> T): T? {
return withTransaction(db?.database, block)
}
fun <T> insertWithSuccess(db: Database? = null, block: () -> T): Boolean {
return try {
@ -125,6 +132,18 @@ fun <T> executeWithStatus(db: Database? = null, block: () -> T): Boolean {
}
}
fun <T> executeWithStatus(db: DataSource? = null, block: () -> T): Boolean {
return executeWithStatus(db?.database, block)
}
fun Exception.isExposedSqlException(): Boolean {
return this is ExposedSQLException
}
fun ExposedSQLException.isCausedByDuplicateError(): Boolean {
return if (this.cause is SQLIntegrityConstraintViolationException) {
return this.errorCode == 1062
} else false
}

View File

@ -11,6 +11,7 @@ import java.time.LocalDateTime
class PersistentDataReader(var dataSource: DataSource) {
val dzz = DeserializingRegistry()
@Deprecated("Use PersistentEventManager.getAllEventsGrouped")
fun getAllMessages(): List<List<PersistentMessage>> {
val events = withTransaction(dataSource.database) {
events.selectAll()
@ -19,6 +20,7 @@ class PersistentDataReader(var dataSource: DataSource) {
return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getEvetnsWith")
fun getMessagesFor(referenceId: String): List<PersistentMessage> {
return withTransaction(dataSource.database) {
events.select { events.referenceId eq referenceId }
@ -27,6 +29,7 @@ class PersistentDataReader(var dataSource: DataSource) {
} ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getEventsUncompleted")
fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withDirtyRead(dataSource.database) {
events.selectAll()
@ -37,6 +40,7 @@ class PersistentDataReader(var dataSource: DataSource) {
return result
}
@Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted")
fun isProcessEventAlreadyClaimed(referenceId: String, eventId: String): Boolean {
val result = withDirtyRead(dataSource.database) {
processerEvents.select {
@ -47,6 +51,7 @@ class PersistentDataReader(var dataSource: DataSource) {
return result?.claimed ?: true
}
@Deprecated(message = "Use PersistentEventManager.isProcessEventCompleted")
fun isProcessEventDefinedAsConsumed(referenceId: String, eventId: String, claimedBy: String): Boolean {
return withDirtyRead(dataSource.database) {
processerEvents.select {
@ -57,6 +62,7 @@ class PersistentDataReader(var dataSource: DataSource) {
}?.singleOrNull()?.consumed ?: false
}
@Deprecated(message = "Use PersistentEventManager.getProcessEventsClaimable")
fun getAvailableProcessEvents(): List<PersistentProcessDataMessage> {
return withDirtyRead(dataSource.database) {
processerEvents.select {
@ -66,6 +72,7 @@ class PersistentDataReader(var dataSource: DataSource) {
} ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getProcessEventsWithExpiredClaim")
fun getExpiredClaimsProcessEvents(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
val entries = withTransaction(dataSource.database) {
@ -77,6 +84,7 @@ class PersistentDataReader(var dataSource: DataSource) {
return entries.filter { it.lastCheckIn == null || it.lastCheckIn.plusMinutes(15) < deadline }
}
@Deprecated("Use PersistentEventManager.getProcessEventWith")
fun getProcessEvent(referenceId: String, eventId: String): PersistentProcessDataMessage? {
val message = withDirtyRead(dataSource.database) {
processerEvents.select {
@ -87,6 +95,7 @@ class PersistentDataReader(var dataSource: DataSource) {
return message
}
@Deprecated("Use PersistentEventManager.getAllEventsProcesser")
fun getProcessEvents(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.selectAll()

View File

@ -0,0 +1,292 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.datasource.*
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import java.sql.SQLIntegrityConstraintViolationException
import java.time.LocalDateTime
import javax.xml.crypto.Data
import kotlin.coroutines.coroutineContext
private val log = KotlinLogging.logger {}
class PersistentEventManager(private val dataSource: DataSource) {
val dzz = DeserializingRegistry()
/**
* Deletes the events
*/
private fun deleteSupersededEvents(superseded: List<PersistentMessage>) {
withTransaction(dataSource) {
superseded.forEach { duplicate ->
events.deleteWhere {
(events.referenceId eq duplicate.referenceId) and
(events.eventId eq duplicate.eventId) and
(events.event eq duplicate.event.event)
}
}
}
}
/**
* @param referenceId Reference
* @param eventId Current eventId for the message, required to prevent deletion of itself
* @param event Current event for the message
*/
private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents) {
val present = getEventsWith(referenceId).filter { it.eventId != eventId }
val superseded = present.filter { it.event == event && it.eventId != eventId }
val availableForRemoval = mutableListOf<PersistentMessage>()
val helper = PersistentMessageHelper(present)
superseded.forEach { availableForRemoval.addAll(helper.getCascadingFrom(it.eventId)) }
deleteSupersededEvents(availableForRemoval)
}
//region Database read
fun getEventsWith(referenceId: String): List<PersistentMessage> {
return withDirtyRead(dataSource.database) {
events.select {
(events.referenceId eq referenceId)
}
.orderBy(events.created, SortOrder.ASC)
.toPersistentMessage(dzz)
} ?: emptyList()
}
fun getProcessEventWith(referenceId: String, eventId: String): PersistentProcessDataMessage? {
return withDirtyRead(dataSource.database) {
processerEvents.select {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}.toPersistentProcesserMessage(dzz)
}?.singleOrNull()
}
fun getAllEvents(): List<PersistentMessage> {
return withDirtyRead(dataSource.database) {
events.selectAll()
.toPersistentMessage(dzz)
} ?: emptyList()
}
fun getAllEventsGrouped(): List<List<PersistentMessage>> {
return getAllEvents().toGrouped()
}
fun getAllProcessEvents(): List<PersistentProcessDataMessage> {
return withDirtyRead(dataSource.database) {
processerEvents.selectAll()
.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getEventsUncompleted(): List<List<PersistentMessage>> {
val identifiesAsCompleted = listOf(
KafkaEvents.EVENT_REQUEST_PROCESS_COMPLETED,
KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED,
KafkaEvents.EVENT_COLLECT_AND_STORE
)
val all = getAllEventsGrouped()
return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } }
}
fun getProcessEventsUncompleted(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.consumed eq false)
}.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getProcessEventsClaimable(): List<PersistentProcessDataMessage> {
return withTransaction(dataSource.database) {
processerEvents.select {
(processerEvents.consumed eq false) and
(processerEvents.claimed eq false)
}.toPersistentProcesserMessage(dzz)
} ?: emptyList()
}
fun getProcessEventsWithExpiredClaim(): List<PersistentProcessDataMessage> {
val deadline = LocalDateTime.now()
return getProcessEventsUncompleted()
.filter { it.claimed && if (it.lastCheckIn != null) it.lastCheckIn.plusMinutes(15) < deadline else true }
}
fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean {
return getProcessEventWith(referenceId, eventId)?.claimed ?: false
}
fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean {
return getProcessEventWith(referenceId, eventId)?.consumed ?: false
}
//endregion
//region Database write
/**
* Stores the kafka event and its data in the database as PersistentMessage
* @param event KafkaEvents
* @param message Kafka message object
*/
fun setEvent(event: KafkaEvents, message: Message<*>): Boolean {
val existing = getEventsWith(message.referenceId)
val derivedId = message.data?.derivedFromEventId
if (derivedId != null) {
val isNewEventOrphan = existing.none { it.eventId == derivedId }
if (isNewEventOrphan) {
log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) has derivedEventId($derivedId) which does not exist!" }
return false
}
}
val exception = executeOrException(dataSource.database) {
events.insert {
it[referenceId] = message.referenceId
it[eventId] = message.eventId
it[events.event] = event.event
it[data] = message.dataAsJson()
}
}
val success = if (exception != null) {
if (exception.isExposedSqlException()) {
if ((exception as ExposedSQLException).isCausedByDuplicateError()) {
log.info { "Error is of SQLIntegrityConstraintViolationException" }
} else {
log.info { "Error code is: ${exception.errorCode}" }
exception.printStackTrace()
}
} else {
exception.printStackTrace()
}
false
} else {
true
}
if (success) {
deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event)
}
return success
}
fun setProcessEvent(event: KafkaEvents, message: Message<*>): Boolean {
val exception = executeOrException(dataSource.database) {
processerEvents.insert {
it[processerEvents.referenceId] = message.referenceId
it[processerEvents.eventId] = message.eventId
it[processerEvents.event] = event.event
it[processerEvents.data] = message.dataAsJson()
}
}
return if (exception != null) {
if (exception.isExposedSqlException()) {
if ((exception as ExposedSQLException).isCausedByDuplicateError()) {
log.info { "Error is of SQLIntegrityConstraintViolationException" }
} else {
log.info { "Error code is: ${exception.errorCode}" }
exception.printStackTrace()
}
}
false
} else {
true
}
}
fun setProcessEventClaim(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource.database) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq false) and
(processerEvents.consumed eq false)
}) {
it[claimedBy] = claimer
it[lastCheckIn] = CurrentDateTime
it[claimed] = true
}
}
}
fun setProcessEventCompleted(referenceId: String, eventId: String): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[consumed] = true
it[claimed] = true
}
}
}
fun setProcessEventClaimRefresh(referenceId: String, eventId: String, claimer: String): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId) and
(processerEvents.claimed eq true) and
(processerEvents.claimedBy eq claimer)
}) {
it[lastCheckIn] = CurrentDateTime
}
}
}
/**
* Removes the claim set on the process event
*/
fun deleteProcessEventClaim(referenceId: String, eventId: String): Boolean {
return executeWithStatus(dataSource) {
processerEvents.update({
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}) {
it[claimed] = false
it[claimedBy] = null
it[lastCheckIn] = null
}
}
}
fun deleteProcessEvent(referenceId: String, eventId: String): Boolean {
return executeWithStatus (dataSource) {
processerEvents.deleteWhere {
(processerEvents.referenceId eq referenceId) and
(processerEvents.eventId eq eventId)
}
}
}
//endregion
}
fun List<PersistentMessage>?.toGrouped(): List<List<PersistentMessage>> {
return this?.groupBy { it.referenceId }?.mapNotNull { it.value } ?: emptyList()
}
fun Query?.toPersistentMessage(dzz: DeserializingRegistry): List<PersistentMessage> {
return this?.mapNotNull { fromRowToPersistentMessage(it, dzz) } ?: emptyList()
}
fun Query?.toPersistentProcesserMessage(dzz: DeserializingRegistry): List<PersistentProcessDataMessage> {
return this?.mapNotNull { fromRowToPersistentProcessDataMessage(it, dzz) } ?: emptyList()
}

View File

@ -38,7 +38,40 @@ fun PersistentMessage.isSkipped(): Boolean {
}
}
class PersistentMessageHelper(val messages: List<PersistentMessage>) {
fun findOrphanedEvents(): List<PersistentMessage> {
val withDerivedId = messages.filter { it.data.derivedFromEventId != null }
val idsFlat = messages.map { it.eventId }
return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat }
}
fun getCascadingFrom(eventId: String): List<PersistentMessage> {
val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList()
val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null }
val derivedEventsMap = mutableMapOf<String, MutableList<String>>()
for (event in usableEvents) {
derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId)
}
val eventsToDelete = mutableSetOf<String>()
// Utfør DFS for å finne alle avledede hendelser som skal slettes
dfs(triggered.eventId, derivedEventsMap, eventsToDelete)
return messages.filter { it.eventId in eventsToDelete }
}
/**
* @param eventId Initial eventId
*/
fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToDelete: MutableSet<String>) {
eventsToDelete.add(eventId)
derivedEventsMap[eventId]?.forEach { derivedEventId ->
dfs(derivedEventId, derivedEventsMap, eventsToDelete)
}
}
}
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try {

View File

@ -25,7 +25,7 @@ abstract class TaskCreatorImpl<C : CoordinatorBase<V, L>, V, L : EventBasedMessa
return Tasks(taskHandler = this, producesEvent = producesEvent, listensForEvents = reactableEvents)
}
@PostConstruct
fun attachListener() {
open fun attachListener() {
coordinator.listeners.add(getListener())
}

View File

@ -11,9 +11,21 @@ import javax.sql.DataSource
class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(
DatabaseConnectionConfig(
databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password, port = null
databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = "", port = null
)
) {
companion object {
val connectionUrl = "jdbc:h2:test;MODE=MySQL" //"jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;"
fun getDatasource(): JdbcDataSource {
val ds = JdbcDataSource()
ds.setUrl(connectionUrl)
ds.user = "test"
ds.password = ""
return ds
}
}
override fun getConnection(): Connection {
return jdbcDataSource.connection
}
@ -61,6 +73,6 @@ class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: Str
}
override fun toConnectionUrl(): String {
return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;"
return connectionUrl // "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;"
}
}

View File

@ -0,0 +1,23 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import org.jetbrains.exposed.sql.Database
class H2DataSource2(conf: DatabaseConnectionConfig): MySqlDataSource(conf) {
override fun createDatabaseStatement(): String {
return "CREATE SCHEMA ${config.databaseName};"
}
override fun toDatabaseConnectionUrl(database: String): String {
return toConnectionUrl()
}
override fun toDatabase(): Database {
return super.toDatabase()
}
override fun toConnectionUrl(): String {
return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;CASE_INSENSITIVE_IDENTIFIERS=TRUE;"
}
}

View File

@ -0,0 +1,56 @@
package no.iktdev.mediaprocessing.shared.common
import kotlinx.serialization.json.*
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.json.JSONArray
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
class PersistentMessageFromJsonDump(events: String) {
private var data: JsonArray?
init {
val jsonArray = Json.parseToJsonElement(events) as JsonArray
data = jsonArray.firstOrNull { it.jsonObject["data"] != null }?.jsonObject?.get("data") as? JsonArray
}
fun getPersistentMessages(): List<PersistentMessage> {
return data?.mapNotNull {
try {
mapToPersistentMessage(it)
} catch (e: Exception) {
System.err.print(it.toString())
e.printStackTrace()
null
}
} ?: emptyList()
}
val dzz = DeserializingRegistry()
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
private fun mapToPersistentMessage(e: JsonElement): PersistentMessage? {
val referenceId: String = e.jsonObject["referenceId"]?.jsonPrimitive?.content ?: throw RuntimeException("No ReferenceId found")
val eventId: String = e.jsonObject["eventId"]?.jsonPrimitive?.content ?: throw RuntimeException("No EventId")
val event: String = e.jsonObject["event"]?.jsonPrimitive?.content ?: throw RuntimeException("No Event")
val data: String = e.jsonObject["data"]?.jsonPrimitive?.content ?: throw RuntimeException("No data")
val created: String = e.jsonObject["created"]?.jsonPrimitive?.content ?: throw RuntimeException("No Created date time found")
val kev = KafkaEvents.toEvent(event) ?: throw RuntimeException("Not able to convert event to Enum")
val dzdata = dzz.deserializeData(kev, data)
return PersistentMessage(
referenceId = referenceId,
eventId = eventId,
event = kev,
data = dzdata,
created = LocalDateTime.parse(created, formatter)
)
}
}

View File

@ -0,0 +1,257 @@
package no.iktdev.mediaprocessing.shared.common.tests
import no.iktdev.mediaprocessing.shared.common.H2DataSource2
import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import org.junit.jupiter.api.Test
import java.util.UUID
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.exposed.sql.deleteAll
class PersistentEventMangerTest {
val defaultReferenceId = UUID.randomUUID().toString()
val dataSource = H2DataSource2(DatabaseConnectionConfig(
address = "",
username = "",
password = "",
databaseName = "test",
port = null
))
val eventManager: PersistentEventManager = PersistentEventManager(dataSource)
init {
val kafkaTables = listOf(
events, // For kafka
)
dataSource.createDatabase()
dataSource.createTables(*kafkaTables.toTypedArray())
}
@Test
fun testDatabaseIsCreated() {
val success = dataSource.createDatabase()
assertThat(success).isNotNull()
}
@Test
fun testDatabaseInit() {
val referenceId = UUID.randomUUID().toString()
val mStart = Message<MediaProcessStarted>(
referenceId = referenceId,
eventId = UUID.randomUUID().toString(),
data = MediaProcessStarted(
status = Status.COMPLETED,
file = "Nan"
)
)
eventManager.setEvent(KafkaEvents.EventMediaProcessStarted, mStart)
val stored = eventManager.getEventsWith(referenceId);
assertThat(stored).isNotEmpty()
}
@Test
fun testSuperseded1() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage())
val oldStack = listOf(
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
)
eventManager.setEvent(startEvent.event, startEvent.message)
for (entry in oldStack) {
eventManager.setEvent(entry.event, entry.message)
}
val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId)
assertThat(currentTableWithOldStack).hasSize(oldStack.size +1)
val supersedingStack = listOf(
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c"))
).forEach {entry -> eventManager.setEvent(entry.event, entry.message)}
// Final check
val result = eventManager.getEventsWith(defaultReferenceId)
val idsThatShouldBeRemoved = listOf(
"9e8f2e04-4950-437f-a203-cfd566203078",
"af7f2519-0f1d-4679-82bd-0314d1b97b68"
)
val search = result.filter { it.eventId in idsThatShouldBeRemoved }
assertThat(search).isEmpty()
val expectedInList = listOf(
startEvent.message.eventId,
"48c72454-6c7b-406b-b598-fc0a961dabde",
"1d8d995d-a7e4-4d6e-a501-fe82f521cf72",
"f6cae204-7c8e-4003-b598-f7b4e566d03e",
"cbb1e871-e9a5-496d-a655-db719ac4903c",
"98a39721-41ff-4d79-905e-ced260478524",
"2c3a40bb-2225-4dd4-a8c3-32c6356f8764"
)
val searchForExpected = result.map { it.eventId }
assertThat(expectedInList).isEqualTo(searchForExpected)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test
fun testSuperseded2() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {
eventManager.setEvent(it.event, it.message)
}
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)),
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val toBeReplaced = listOf(
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId)
assertThat(currentTableWithOldStack).hasSize(keepStack.size + toBeReplaced.size +1)
val supersedingStack = listOf(
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c"))
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
// Final check
val result = eventManager.getEventsWith(defaultReferenceId)
val idsRemoved = toBeReplaced.map { it.message.eventId }
val search = result.filter { it.eventId in idsRemoved }
assertThat(search).isEmpty()
val expectedInList = listOf(startEvent.message.eventId) + keepStack.map { it.message.eventId } + supersedingStack.map { it.message.eventId }
val searchForExpected = result.map { it.eventId }
assertThat(expectedInList).isEqualTo(searchForExpected)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test
fun testSuperseded3() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {
eventManager.setEvent(it.event, it.message)
}
val keepStack = listOf(
EventToMessage(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(eventId = "48c72454-6c7b-406b-b598-fc0a961dabde", derivedFromEventId = startEvent.message.eventId)),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val toBeReplaced = listOf(
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "1d8d995d-a7e4-4d6e-a501-fe82f521cf72", derivedFromEventId ="48c72454-6c7b-406b-b598-fc0a961dabde")),
EventToMessage(KafkaEvents.EventMediaReadBaseInfoPerformed,
createMessage(eventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e", derivedFromEventId ="1d8d995d-a7e4-4d6e-a501-fe82f521cf72")),
EventToMessage(KafkaEvents.EventMediaMetadataSearchPerformed,
createMessage(eventId = "cbb1e871-e9a5-496d-a655-db719ac4903c", derivedFromEventId = "f6cae204-7c8e-4003-b598-f7b4e566d03e")),
EventToMessage(KafkaEvents.EventMediaReadOutCover,
createMessage(eventId = "98a39721-41ff-4d79-905e-ced260478524", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaReadOutNameAndType,
createMessage(eventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9", derivedFromEventId = "cbb1e871-e9a5-496d-a655-db719ac4903c")),
EventToMessage(KafkaEvents.EventMediaParameterEncodeCreated,
createMessage(eventId = "9e8f2e04-4950-437f-a203-cfd566203078", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
EventToMessage(KafkaEvents.EventMediaParameterExtractCreated,
createMessage(eventId = "af7f2519-0f1d-4679-82bd-0314d1b97b68", derivedFromEventId = "3f376b72-f55a-4dd7-af87-fb1755ba4ad9")),
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
val currentTableWithOldStack = eventManager.getEventsWith(defaultReferenceId)
assertThat(currentTableWithOldStack).hasSize(keepStack.size + toBeReplaced.size +1)
val supersedingStack = listOf(
EventToMessage(KafkaEvents.EventMediaParseStreamPerformed,
createMessage(eventId = "2c3a40bb-2225-4dd4-a8c3-32c6356f8764", derivedFromEventId = "48c72454-6c7b-406b-b598-fc0a961dabde"))
).onEach { entry -> eventManager.setEvent(entry.event, entry.message) }
// Final check
val result = eventManager.getEventsWith(defaultReferenceId)
val idsRemoved = toBeReplaced.map { it.message.eventId }
val search = result.filter { it.eventId in idsRemoved }
assertThat(search).isEmpty()
val expectedInList = listOf(startEvent.message.eventId) + keepStack.map { it.message.eventId } + supersedingStack.map { it.message.eventId }
val searchForExpected = result.map { it.eventId }
assertThat(expectedInList).isEqualTo(searchForExpected)
withTransaction(dataSource) {
events.deleteAll()
}
}
@Test
fun testDerivedOrphanNotInserted() {
val startEvent = EventToMessage(KafkaEvents.EventMediaProcessStarted, createMessage()).also {
eventManager.setEvent(it.event, it.message)
}
val result = eventManager.setEvent(KafkaEvents.EventMediaReadStreamPerformed,
createMessage(derivedFromEventId = UUID.randomUUID().toString()))
assertThat(result).isFalse()
}
data class EventToMessage(val event: KafkaEvents, val message: Message<*>)
private fun createMessage(referenceId: String = defaultReferenceId, eventId: String = UUID.randomUUID().toString(), derivedFromEventId: String? = null): Message<SimpleMessageData>{
return Message<SimpleMessageData>(
referenceId = referenceId,
eventId = eventId,
data = SimpleMessageData(
status = Status.COMPLETED,
message = "Potato",
derivedFromEventId = derivedFromEventId
)
)
}
}

View File

@ -15,30 +15,29 @@ class DeserializingRegistry {
companion object {
val deserializables = mutableMapOf(
KafkaEvents.EVENT_MEDIA_PROCESS_STARTED to MediaProcessStarted::class.java,
KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED to ReaderPerformed::class.java,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED to MediaStreamsParsePerformed::class.java,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED to BaseInfoPerformed::class.java,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED to MetadataPerformed::class.java,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE to VideoInfoPerformed::class.java,
KafkaEvents.EVENT_MEDIA_READ_OUT_COVER to CoverInfoPerformed::class.java,
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EVENT_MEDIA_CONVERT_PARAMETER_CREATED to null,
KafkaEvents.EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED to null,
KafkaEvents.EventMediaProcessStarted to MediaProcessStarted::class.java,
KafkaEvents.EventMediaReadStreamPerformed to ReaderPerformed::class.java,
KafkaEvents.EventMediaParseStreamPerformed to MediaStreamsParsePerformed::class.java,
KafkaEvents.EventMediaReadBaseInfoPerformed to BaseInfoPerformed::class.java,
KafkaEvents.EventMediaMetadataSearchPerformed to MetadataPerformed::class.java,
KafkaEvents.EventMediaReadOutNameAndType to VideoInfoPerformed::class.java,
KafkaEvents.EventMediaReadOutCover to CoverInfoPerformed::class.java,
KafkaEvents.EventMediaParameterEncodeCreated to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EventMediaParameterExtractCreated to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EventMediaParameterConvertCreated to null,
KafkaEvents.EventMediaParameterDownloadCoverCreated to null,
KafkaEvents.EVENT_WORK_ENCODE_CREATED to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java,
KafkaEvents.EventNotificationOfWorkItemRemoval to NotificationOfDeletionPerformed::class.java,
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to ProcesserEncodeWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to ProcesserExtractWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to ConvertWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to CoverDownloadWorkPerformed::class.java,
KafkaEvents.EventWorkEncodeCreated to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EventWorkExtractCreated to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EventWorkConvertCreated to ConvertWorkerRequest::class.java,
KafkaEvents.EventWorkEncodePerformed to ProcesserEncodeWorkPerformed::class.java,
KafkaEvents.EventWorkExtractPerformed to ProcesserExtractWorkPerformed::class.java,
KafkaEvents.EventWorkConvertPerformed to ConvertWorkPerformed::class.java,
KafkaEvents.EventWorkDownloadCoverPerformed to CoverDownloadWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null,
KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null,
KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null,
KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED to ProcessCompleted::class.java
)

View File

@ -1,31 +1,34 @@
package no.iktdev.mediaprocessing.shared.kafka.core
enum class KafkaEvents(val event: String) {
EVENT_MEDIA_PROCESS_STARTED("event:media-process:started"),
EventMediaProcessStarted("event:media-process:started"),
EVENT_REQUEST_PROCESS_STARTED("event:request-process:started"),
EVENT_MEDIA_READ_STREAM_PERFORMED("event:media-read-stream:performed"),
EVENT_MEDIA_PARSE_STREAM_PERFORMED("event:media-parse-stream:performed"),
EVENT_MEDIA_READ_BASE_INFO_PERFORMED("event:media-read-base-info:performed"),
EVENT_MEDIA_METADATA_SEARCH_PERFORMED("event:media-metadata-search:performed"),
EVENT_MEDIA_READ_OUT_NAME_AND_TYPE("event:media-read-out-name-and-type:performed"),
EVENT_MEDIA_READ_OUT_COVER("event:media-read-out-cover:performed"),
EventMediaReadStreamPerformed("event:media-read-stream:performed"),
EventMediaParseStreamPerformed("event:media-parse-stream:performed"),
EventMediaReadBaseInfoPerformed("event:media-read-base-info:performed"),
EventMediaMetadataSearchPerformed("event:media-metadata-search:performed"),
EventMediaReadOutNameAndType("event:media-read-out-name-and-type:performed"),
EventMediaReadOutCover("event:media-read-out-cover:performed"),
EVENT_MEDIA_ENCODE_PARAMETER_CREATED("event:media-encode-parameter:created"),
EVENT_MEDIA_EXTRACT_PARAMETER_CREATED("event:media-extract-parameter:created"),
EVENT_MEDIA_CONVERT_PARAMETER_CREATED("event:media-convert-parameter:created"),
EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED("event:media-download-cover-parameter:created"),
EventMediaParameterEncodeCreated("event:media-encode-parameter:created"),
EventMediaParameterExtractCreated("event:media-extract-parameter:created"),
EventMediaParameterConvertCreated("event:media-convert-parameter:created"),
EventMediaParameterDownloadCoverCreated("event:media-download-cover-parameter:created"),
EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted"),
EventMediaWorkProceedPermitted("event:media-work-proceed:permitted"),
EVENT_WORK_ENCODE_CREATED("event:work-encode:created"),
EVENT_WORK_EXTRACT_CREATED("event:work-extract:created"),
EVENT_WORK_CONVERT_CREATED("event:work-convert:created"),
// This event is to be used for commuincating across all appss taht an event has ben removed and to rterminate existint events
EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"),
EVENT_WORK_ENCODE_PERFORMED("event:work-encode:performed"),
EVENT_WORK_EXTRACT_PERFORMED("event:work-extract:performed"),
EVENT_WORK_CONVERT_PERFORMED("event:work-convert:performed"),
EVENT_WORK_DOWNLOAD_COVER_PERFORMED("event:work-download-cover:performed"),
EventWorkEncodeCreated("event:work-encode:created"),
EventWorkExtractCreated("event:work-extract:created"),
EventWorkConvertCreated("event:work-convert:created"),
EventWorkEncodePerformed("event:work-encode:performed"),
EventWorkExtractPerformed("event:work-extract:performed"),
EventWorkConvertPerformed("event:work-convert:performed"),
EventWorkDownloadCoverPerformed("event:work-download-cover:performed"),
EVENT_STORE_VIDEO_PERFORMED("event:store-video:performed"),
@ -45,13 +48,13 @@ enum class KafkaEvents(val event: String) {
fun isOfWork(event: KafkaEvents): Boolean {
return event in listOf(
EVENT_WORK_CONVERT_CREATED,
EVENT_WORK_EXTRACT_CREATED,
EVENT_WORK_ENCODE_CREATED,
EventWorkConvertCreated,
EventWorkExtractCreated,
EventWorkEncodeCreated,
EVENT_WORK_ENCODE_PERFORMED,
EVENT_WORK_CONVERT_PERFORMED,
EVENT_WORK_EXTRACT_PERFORMED
EventWorkEncodePerformed,
EventWorkConvertPerformed,
EventWorkExtractPerformed
)
}
}

View File

@ -3,15 +3,17 @@ package no.iktdev.mediaprocessing.shared.kafka.dto
abstract class MessageDataWrapper(
@Transient open val status: Status = Status.ERROR,
@Transient open val message: String? = null
@Transient open val message: String? = null,
@Transient open val derivedFromEventId: String? = null
)
data class SimpleMessageData(
override val status: Status,
override val message: String? = null
) : MessageDataWrapper(status, message)
override val message: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, message, derivedFromEventId)
fun MessageDataWrapper?.isSuccess(): Boolean {

View File

@ -5,12 +5,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadBaseInfoPerformed)
data class BaseInfoPerformed(
override val status: Status,
val title: String,
val sanitizedName: String
) : MessageDataWrapper(status)
val sanitizedName: String,
override val derivedFromEventId: String
) : MessageDataWrapper(status = status, derivedFromEventId = derivedFromEventId)
fun BaseInfoPerformed?.hasValidData(): Boolean {
return this != null && this.title.isNotBlank() && this.sanitizedName.isNotBlank()

View File

@ -5,11 +5,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertPerformed)
data class ConvertWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
override val derivedFromEventId: String,
val outFiles: List<String> = listOf()
): MessageDataWrapper(status, message)
): MessageDataWrapper(status, message, derivedFromEventId)

View File

@ -6,13 +6,14 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertCreated)
data class ConvertWorkerRequest(
override val status: Status,
val requiresEventId: String? = null,
override val derivedFromEventId: String? = null,
val inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,
val outDirectory: String,
val outFormats: List<SubtitleFormats> = listOf()
): MessageDataWrapper(status)
): MessageDataWrapper(status, derivedFromEventId = derivedFromEventId)

View File

@ -5,9 +5,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventWorkDownloadCoverPerformed)
data class CoverDownloadWorkPerformed(
override val status: Status,
override val message: String? = null,
val coverFile: String
): MessageDataWrapper(status, message)
val coverFile: String,
override val derivedFromEventId: String?
): MessageDataWrapper(status, message, derivedFromEventId = derivedFromEventId)

View File

@ -5,11 +5,11 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_COVER)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutCover)
data class CoverInfoPerformed(
override val status: Status,
val url: String,
val outDir: String,
val outFileBaseName: String
)
: MessageDataWrapper(status)
val outFileBaseName: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -6,13 +6,13 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED
KafkaEvents.EventWorkEncodeCreated,
KafkaEvents.EventWorkExtractCreated
)
data class FfmpegWorkRequestCreated(
override val status: Status,
val derivedFromEventId: String,
val inputFile: String,
val arguments: List<String>,
val outFile: String
): MessageDataWrapper(status)
val outFile: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -12,15 +12,15 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
* @param arguments Requires arguments, instructions for what ffmpeg should do
*/
@KafkaBelongsToEvent(
KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED
KafkaEvents.EventMediaParameterEncodeCreated,
KafkaEvents.EventMediaParameterExtractCreated
)
data class FfmpegWorkerArgumentsCreated(
override val status: Status,
val inputFile: String, // absolutePath
val entries: List<FfmpegWorkerArgument>
):
MessageDataWrapper(status)
val entries: List<FfmpegWorkerArgument>,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)
data class FfmpegWorkerArgument(
val outputFile: String,

View File

@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED)
@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessStarted)
data class MediaProcessStarted(
override val status: Status,
val type: ProcessType = ProcessType.FLOW,

View File

@ -6,9 +6,9 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventMediaParseStreamPerformed)
data class MediaStreamsParsePerformed(
override val status: Status,
val streams: ParsedMediaStreams
): MessageDataWrapper(status)
val streams: ParsedMediaStreams,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -5,12 +5,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventMediaMetadataSearchPerformed)
data class MetadataPerformed(
override val status: Status,
override val message: String? = null,
val data: pyMetadata? = null
) : MessageDataWrapper(status, message)
val data: pyMetadata? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)
data class pyMetadata(
val title: String,

View File

@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventNotificationOfWorkItemRemoval)
data class NotificationOfDeletionPerformed(
override val status: Status = Status.COMPLETED,
override val message: String? = null,
override val derivedFromEventId: String? = null, // Skal aldri settes derived
val deletedEventId: String,
val deletedEvent: KafkaEvents
): MessageDataWrapper()

View File

@ -7,6 +7,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED)
data class ProcessCompleted(
override val status: Status
) : MessageDataWrapper(status) {
}
override val status: Status,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -6,9 +6,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadStreamPerformed)
data class ReaderPerformed(
override val status: Status,
val file: String, //AbsolutePath
val output: JsonObject
) : MessageDataWrapper(status)
val output: JsonObject,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -7,13 +7,13 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType)
data class VideoInfoPerformed(
override val status: Status,
val info: JsonObject,
val outDirectory: String
)
: MessageDataWrapper(status) {
val outDirectory: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId) {
fun toValueObject(): VideoInfo? {
val type = info.get("type").asString
return when (type) {
@ -46,7 +46,7 @@ data class SubtitleInfo(
val language: String
)
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType)
open class VideoInfo(
@Transient open val type: String,
@Transient open val title: String,

View File

@ -7,12 +7,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
// Derived from ffmpeg work
@KafkaBelongsToEvent(
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
KafkaEvents.EventWorkEncodePerformed
)
data class ProcesserEncodeWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
val outFile: String? = null
): MessageDataWrapper(status, message)
val outFile: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -7,12 +7,12 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Status
// Derived from ffmpeg work
@KafkaBelongsToEvent(
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
KafkaEvents.EventWorkExtractPerformed
)
data class ProcesserExtractWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
val outFile: String? = null
): MessageDataWrapper(status, message)
val outFile: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@ -28,7 +28,7 @@ class SerializationTest {
val json = gson.toJson(message)
val deserializer = DeserializingRegistry()
val result = deserializer.deserialize(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED, json)
val result = deserializer.deserialize(KafkaEvents.EventMediaProcessStarted, json)
assertThat(result.data).isInstanceOf(MediaProcessStarted::class.java)