New completion

This commit is contained in:
bskjon 2025-02-21 22:48:02 +01:00
parent a4c1b6978d
commit f8a69ee620
15 changed files with 141 additions and 83 deletions

View File

@ -11,7 +11,7 @@ import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaProcessStartEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.PermitWorkCreationEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.database.cal.EventsManager
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
@ -36,15 +36,15 @@ class Coordinator(
}
public fun startProcess(file: File, type: ProcessType) {
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
val operations: List<OperationEvents> = listOf(
OperationEvents.ENCODE,
OperationEvents.EXTRACT,
OperationEvents.CONVERT
)
startProcess(file, type, operations)
}
fun startProcess(file: File, type: ProcessType, operations: List<StartOperationEvents>): UUID {
fun startProcess(file: File, type: ProcessType, operations: List<OperationEvents>): UUID {
val referenceId: UUID = UUID.randomUUID()
val event = MediaProcessStartEvent(
metadata = EventMetadata(

View File

@ -11,7 +11,7 @@ class EventsDatabase() {
allEvents,
tasks,
runners,
processed,
processedFile,
files
)

View File

@ -4,7 +4,7 @@ import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.common.contract.ProcessType
import no.iktdev.mediaprocessing.shared.common.contract.dto.EventRequest
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
@ -30,7 +30,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) {
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(convert.file)
}
referenceId = coordinator.startProcess(file, ProcessType.FLOW, listOf(StartOperationEvents.CONVERT)).toString()
referenceId = coordinator.startProcess(file, ProcessType.FLOW, listOf(OperationEvents.CONVERT)).toString()
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(Gson().toJson(convert))
@ -49,7 +49,7 @@ class RequestEventController(@Autowired var coordinator: Coordinator) {
if (!file.exists()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(payload)
}
referenceId = coordinator.startProcess(file, ProcessType.MANUAL, listOf(StartOperationEvents.EXTRACT)).toString()
referenceId = coordinator.startProcess(file, ProcessType.MANUAL, listOf(OperationEvents.EXTRACT)).toString()
} catch (e: Exception) {
return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(payload)

View File

@ -5,26 +5,16 @@ import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.*
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.eventi.database.executeOrException
import no.iktdev.eventi.database.withTransaction
import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.EventsSummaryMapping
import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store.*
import no.iktdev.mediaprocessing.coordinator.tasksV2.validator.CompletionValidator
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.reader.*
import no.iktdev.streamit.library.db.query.SummaryQuery
import no.iktdev.streamit.library.db.tables.catalog
import no.iktdev.streamit.library.db.tables.titles
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.insertIgnore
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.update
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.sql.SQLIntegrityConstraintViolationException
@Service
class CompletedTaskListener : CoordinatorEventListener() {
@ -186,10 +176,13 @@ class CompletedTaskListener : CoordinatorEventListener() {
e.printStackTrace()
}
ProcessedItemsStore.store(
val summary = EventsSummaryMapping().map(events)
ProcessedFileStore.store(
mediaInfo.title,
events,
(listOfNotNull(newVideoPath?.second) + (newSubtitles?.map { it.destination } ?: emptyList()))
summary
)

View File

@ -1,20 +1,16 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.*
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.coordinator.tasksV2.implementations.WorkTaskListener
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.isOnly
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@ -74,7 +70,7 @@ class ConvertWorkTaskListener: WorkTaskListener() {
foundEvent?.outputFile
} else if (event.eventType == Events.EventMediaProcessStarted) {
val startEvent = event.az<MediaProcessStartEvent>()?.data
if (startEvent?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
if (startEvent?.operations?.isOnly(OperationEvents.CONVERT) == true) {
startEvent.file
} else null
} else {

View File

@ -4,19 +4,15 @@ import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.EncodeWorkArgumentsMapping
import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract
import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
@ -55,7 +51,7 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
active = false
return
}
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.ENCODE) == false) {
if (started.data == null || started.data?.operations?.contains(OperationEvents.ENCODE) == false) {
active = false
return
}

View File

@ -4,18 +4,14 @@ import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.ExtractWorkArgumentsMapping
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract
import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@Service
class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
@ -47,7 +43,7 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
}
active = true
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>() ?: return
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.EXTRACT) == false) {
if (started.data == null || started.data?.operations?.contains(OperationEvents.EXTRACT) == false) {
active = false
return
}

View File

@ -8,19 +8,16 @@ import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.dataAs
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract
import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaFileStreamsReadEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -36,7 +33,7 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
override var coordinator: Coordinator? = null
val log = KotlinLogging.logger {}
val requiredOperations = listOf(StartOperationEvents.ENCODE, StartOperationEvents.EXTRACT)
val requiredOperations = listOf(OperationEvents.ENCODE, OperationEvents.EXTRACT)
override val produceEvent: Events = Events.EventMediaReadStreamPerformed
override val listensForEvents: List<Events> = listOf(Events.EventMediaProcessStarted)

View File

@ -0,0 +1,68 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping
import no.iktdev.eventi.data.dataAs
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.EventSummary
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationsSummary
import no.iktdev.mediaprocessing.shared.common.contract.dto.OutputFiles
import no.iktdev.mediaprocessing.shared.common.getChecksum
class EventsSummaryMapping {
fun map(events: List<Event>): EventSummary {
val startOperations = events.find { it.eventType == Events.EventMediaProcessStarted }?.dataAs<StartEventData>() ?: throw RuntimeException("No start event found")
val successOperations = listOfNotNull(
if (isEncodedSuccessful(events)) OperationEvents.ENCODE else null,
if (isExtractedSuccessful(events)) OperationEvents.EXTRACT else null,
if (isConvertedSuccessful(events)) OperationEvents.CONVERT else null
)
return EventSummary(
inputFile = startOperations.file,
inputFileChecksum = getChecksum(startOperations.file),
operationsSummary = OperationsSummary(
requestedOperations = startOperations.operations,
completedOperations = successOperations
),
outputFiles = getProducesFiles(events)
)
}
fun isEncodedSuccessful(events: List<Event>) = events.filter { it.eventType == Events.EventWorkEncodePerformed }.any { it.isSuccessful() }
fun isExtractedSuccessful(events: List<Event>) = events.filter { it.eventType == Events.EventWorkExtractPerformed }.any { it.isSuccessful() }
fun isConvertedSuccessful(events: List<Event>) = events.filter { it.eventType == Events.EventWorkConvertPerformed }.any { it.isSuccessful() }
fun getProducesFiles(events: List<Event>): OutputFiles {
val encoded = if (isEncodedSuccessful(events)) {
events.filter { it.eventType == Events.EventWorkEncodePerformed }
.filter { it.isSuccessful() }
.mapNotNull { it.dataAs<EncodedData>()?.outputFile }
} else emptyList()
val extracted = if (isExtractedSuccessful(events)) {
events.filter { it.eventType == Events.EventWorkExtractPerformed }
.filter { it.isSuccessful() }
.mapNotNull { it.dataAs<ExtractedData>() }
.map { it.outputFile }
} else emptyList()
val converted = if (isConvertedSuccessful(events)) {
events.filter { it.eventType == Events.EventWorkConvertPerformed }
.filter { it.isSuccessful() }
.mapNotNull { it.dataAs<ConvertedData>() }
.flatMap { it.outputFiles }
} else emptyList()
return OutputFiles(
encoded = encoded,
extracted = extracted,
converted = converted
)
}
}

View File

@ -4,30 +4,26 @@ import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.mediaprocessing.coordinator.eventDatabase
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.database.tables.processed
import no.iktdev.mediaprocessing.shared.common.contract.dto.EventSummary
import no.iktdev.mediaprocessing.shared.common.database.tables.processedFile
import no.iktdev.mediaprocessing.shared.common.getChecksum
import no.iktdev.streamit.library.db.executeOrException
import no.iktdev.streamit.library.db.withTransaction
import org.jetbrains.exposed.sql.insert
object ProcessedItemsStore {
object ProcessedFileStore {
val log = KotlinLogging.logger {}
fun store(title: String, events: List<Event>, processedFiles: List<String>) {
fun store(title: String, events: List<Event>, summary: EventSummary) {
val inputFilePath = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.file ?: return
val checksum = getChecksum(inputFilePath)
val isEncoded = events.findEventsOf<EncodeWorkPerformedEvent>().any { it.isSuccessful() }
val isExtracted = events.findEventsOf<EncodeWorkPerformedEvent>().any { it.isSuccessful() }
withTransaction(eventDatabase.database.database, block = {
processed.insert {
processedFile.insert {
it[this.title] = title
it[this.fileName] = inputFilePath
it[this.processedFiles] = Gson().toJson(processedFiles)
it[this.encoded] = isEncoded
it[this.extracted] = isExtracted
it[this.inputFile] = inputFilePath
it[this.data] = Gson().toJson(summary)
it[this.checksum] = checksum
}
}) {

View File

@ -4,7 +4,7 @@ import no.iktdev.eventi.data.dataAs
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.*
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats
import java.io.File
@ -17,11 +17,11 @@ object CompletionValidator {
* Checks whether it requires encode or extract or both, and it has created events with args
*/
fun req1(started: MediaProcessStartEvent, events: List<Event>): Boolean {
val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.ENCODE) == true) {
val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(OperationEvents.ENCODE) == true) {
events.any { it.eventType == Events.EventMediaParameterEncodeCreated }
} else true
val extractFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.EXTRACT) == true) {
val extractFulfilledOrSkipped = if (started.data?.operations?.contains(OperationEvents.EXTRACT) == true) {
events.any { it.eventType == Events.EventMediaParameterExtractCreated }
} else true
@ -34,8 +34,8 @@ object CompletionValidator {
* Checks whether work that was supposed to be created has been created.
* Checks if all subtitles that can be processed has been created if convert is set.
*/
fun req2(operations: List<StartOperationEvents>, events: List<Event>): Boolean {
if (StartOperationEvents.ENCODE in operations) {
fun req2(operations: List<OperationEvents>, events: List<Event>): Boolean {
if (OperationEvents.ENCODE in operations) {
val encodeParamter = events.find { it.eventType == Events.EventMediaParameterEncodeCreated }?.az<EncodeArgumentCreatedEvent>()
val encodeWork = events.find { it.eventType == Events.EventWorkEncodeCreated }
if (encodeParamter?.isSuccessful() == true && (encodeWork == null))
@ -44,12 +44,12 @@ object CompletionValidator {
val extractParamter = events.find { it.eventType == Events.EventMediaParameterExtractCreated }?.az<ExtractArgumentCreatedEvent>()
val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated }
if (StartOperationEvents.EXTRACT in operations) {
if (OperationEvents.EXTRACT in operations) {
if (extractParamter?.isSuccessful() == true && extractParamter.data?.size != extractWork.size)
return false
}
if (StartOperationEvents.CONVERT in operations) {
if (OperationEvents.CONVERT in operations) {
val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated }
val supportedSubtitleFormats = SubtitleFormats.entries.map { it.name }
@ -66,22 +66,22 @@ object CompletionValidator {
/**
* Checks whether all work that has been created has been completed
*/
fun req3(operations: List<StartOperationEvents>, events: List<Event>): Boolean {
if (StartOperationEvents.ENCODE in operations) {
fun req3(operations: List<OperationEvents>, events: List<Event>): Boolean {
if (OperationEvents.ENCODE in operations) {
val encodeWork = events.filter { it.eventType == Events.EventWorkEncodeCreated }
val encodePerformed = events.filter { it.eventType == Events.EventWorkEncodePerformed }
if (encodePerformed.size < encodeWork.size)
return false
}
if (StartOperationEvents.EXTRACT in operations) {
if (OperationEvents.EXTRACT in operations) {
val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated }
val extractPerformed = events.filter { it.eventType == Events.EventWorkExtractPerformed }
if (extractPerformed.size < extractWork.size)
return false
}
if (StartOperationEvents.CONVERT in operations) {
if (OperationEvents.CONVERT in operations) {
val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated }
val convertPerformed = events.filter { it.eventType == Events.EventWorkConvertPerformed }
if (convertPerformed.size < convertWork.size)

View File

@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.common.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.ProcessType
import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.common.contract.dto.OperationEvents
data class MediaProcessStartEvent(
override val metadata: EventMetadata,
@ -13,10 +13,10 @@ data class MediaProcessStartEvent(
data class StartEventData(
val type: ProcessType = ProcessType.FLOW,
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
val operations: List<OperationEvents> = listOf(
OperationEvents.ENCODE,
OperationEvents.EXTRACT,
OperationEvents.CONVERT
),
val file: String // AbsolutePath
)

View File

@ -16,12 +16,12 @@ enum class SubtitleFormats {
SMI
}
enum class StartOperationEvents {
enum class OperationEvents {
ENCODE,
EXTRACT,
CONVERT
}
fun List<StartOperationEvents>.isOnly(expected: StartOperationEvents): Boolean {
fun List<OperationEvents>.isOnly(expected: OperationEvents): Boolean {
return this.size == 1 && this.firstOrNull { it == expected } != null
}

View File

@ -0,0 +1,20 @@
package no.iktdev.mediaprocessing.shared.common.contract.dto
data class EventSummary(
val operationsSummary: OperationsSummary,
val inputFile: String,
val inputFileChecksum: String,
val outputFiles: OutputFiles
)
data class OperationsSummary(
val requestedOperations: List<OperationEvents>,
val completedOperations: List<OperationEvents>,
)
data class OutputFiles(
val encoded: List<String>,
val extracted: List<String>,
val converted: List<String>
)

View File

@ -1,19 +1,15 @@
package no.iktdev.mediaprocessing.shared.common.database.tables
import no.iktdev.mediaprocessing.shared.common.database.tables.runners.defaultExpression
import no.iktdev.mediaprocessing.shared.common.database.tables.tasks.default
import org.jetbrains.exposed.dao.id.IntIdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import org.jetbrains.exposed.sql.javatime.datetime
import java.time.LocalDateTime
object processed: IntIdTable() {
object processedFile: IntIdTable() {
val title: Column<String> = varchar("title", 256)
val fileName: Column<String> = varchar("fileName", 256)
val processedFiles: Column<String> = text("processedFilesJson")
val encoded: Column<Boolean> = bool("encoded").default(false)
val extracted: Column<Boolean> = bool("extracted").default(false)
val inputFile: Column<String> = varchar("fileName", 512)
val data: Column<String> = text("data")
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
val checksum: Column<String?> = varchar("checksum", 256).nullable()
}