bskjon 2024-07-13 19:49:13 +02:00
parent 72b2d30fab
commit 12f3f6e3ac
99 changed files with 1498 additions and 1164 deletions

View File

@@ -1,4 +1,4 @@
name: Build V2
name: Build V3
on:
push:

.idea/.name generated Normal file
View File

@@ -0,0 +1 @@
MediaProcessing

.idea/gradle.xml generated
View File

@@ -17,7 +17,6 @@
<option value="$PROJECT_DIR$/shared/common" />
<option value="$PROJECT_DIR$/shared/contract" />
<option value="$PROJECT_DIR$/shared/eventi" />
<option value="$PROJECT_DIR$/shared/kafka" />
</set>
</option>
</GradleProjectSettings>

View File

@@ -0,0 +1,26 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="CoordinatorApplicationKt" type="JetRunConfigurationType" nameIsGenerated="true">
<envs>
<env name="DATABASE_ADDRESS" value="192.168.2.250" />
<env name="DATABASE_NAME_E" value="eventsDevV3" />
<env name="DATABASE_NAME_S" value="streamitDev3" />
<env name="DATABASE_PASSWORD" value="shFZ27eL2x2NoxyEDBMfDWkvFO" />
<env name="DATABASE_PORT" value="3306" />
<env name="DATABASE_USERNAME" value="root" />
<env name="DIRECTORY_CONTENT_INCOMING" value="G:\MediaProcessingPlayground\input" />
<env name="DIRECTORY_CONTENT_OUTGOING" value="G:\MediaProcessingPlayground\output" />
<env name="KAFKA_BOOTSTRAP_SERVER" value="192.168.2.250:19092" />
<env name="KAFKA_CONSUMER_ID" value="testConsumer" />
<env name="KAFKA_TOPIC" value="mediaEventsDev" />
<env name="METADATA_TIMEOUT" value="0" />
<env name="SUPPORTING_EXECUTABLE_FFMPEG" value="G:\MediaProcessingPlayground\ffmpeg.exe" />
<env name="SUPPORTING_EXECUTABLE_FFPROBE" value="G:\MediaProcessingPlayground\ffprobe.exe" />
</envs>
<option name="MAIN_CLASS_NAME" value="no.iktdev.mediaprocessing.coordinator.CoordinatorApplicationKt" />
<module name="MediaProcessing.apps.coordinator.main" />
<shortenClasspath name="NONE" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>

View File

@@ -0,0 +1,27 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="ProcesserApplicationKt" type="JetRunConfigurationType" singleton="false" nameIsGenerated="true">
<envs>
<env name="DATABASE_ADDRESS" value="192.168.2.250" />
<env name="DATABASE_NAME_E" value="eventsDevV3" />
<env name="DATABASE_NAME_S" value="streamitDev" />
<env name="DATABASE_PASSWORD" value="shFZ27eL2x2NoxyEDBMfDWkvFO" />
<env name="DATABASE_PORT" value="3306" />
<env name="DATABASE_USERNAME" value="root" />
<env name="DIRECTORY_CONTENT_INCOMING" value="G:\MediaProcessingPlayground\input" />
<env name="DIRECTORY_CONTENT_OUTGOING" value="G:\MediaProcessingPlayground\output" />
<env name="KAFKA_BOOTSTRAP_SERVER" value="192.168.2.250:19092" />
<env name="KAFKA_CONSUMER_ID" value="testConsumer" />
<env name="KAFKA_TOPIC" value="mediaEventsDev" />
<env name="METADATA_TIMEOUT" value="0" />
<env name="SUPPORTING_EXECUTABLE_FFMPEG" value="G:\MediaProcessingPlayground\ffmpeg.exe" />
<env name="SUPPORTING_EXECUTABLE_FFPROBE" value="G:\MediaProcessingPlayground\ffprobe.exe" />
<env name="server.port" value="8081" />
</envs>
<option name="MAIN_CLASS_NAME" value="no.iktdev.mediaprocessing.processer.ProcesserApplicationKt" />
<module name="MediaProcessing.apps.processer.main" />
<shortenClasspath name="NONE" />
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
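
Both run configurations above inject the same set of environment variables. As a minimal sketch, assuming the applications resolve them with plain System.getenv lookups (the helper below and its defaults are hypothetical, not part of this commit):

object AppEnv {
    private fun env(name: String, default: String? = null): String =
        System.getenv(name) ?: default ?: error("Missing required environment variable: $name")

    val databaseAddress = env("DATABASE_ADDRESS", "127.0.0.1")  // database host
    val databasePort = env("DATABASE_PORT", "3306").toInt()     // database port
    val eventsDatabaseName = env("DATABASE_NAME_E")             // events schema
    val storeDatabaseName = env("DATABASE_NAME_S")              // media store schema
    val ffmpeg = env("SUPPORTING_EXECUTABLE_FFMPEG")            // path to ffmpeg binary
    val ffprobe = env("SUPPORTING_EXECUTABLE_FFPROBE")          // path to ffprobe binary
}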

View File

@@ -58,7 +58,8 @@ dependencies {
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:eventi")))
implementation(kotlin("stdlib-jdk8"))
}

View File

@@ -18,11 +18,6 @@ class ConvertApplication
val ioCoroutine = CoroutinesIO()
val defaultCoroutine = CoroutinesDefault()
private var context: ApplicationContext? = null
@Suppress("unused")
fun getContext(): ApplicationContext? {
return context
}
lateinit var taskManager: TasksManager
lateinit var runnerManager: RunnerManager
@@ -36,6 +31,9 @@ fun getEventsDatabase(): MySqlDataSource {
}
fun main(args: Array<String>) {
runApplication<ConvertApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
@@ -56,8 +54,5 @@ fun main(args: Array<String>) {
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName)
runnerManager.assignRunner()
context = runApplication<ConvertApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
}
//private val logger = KotlinLogging.logger {}

View File

@@ -1,16 +1,8 @@
package no.iktdev.mediaprocessing.converter
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@Configuration
class SocketLocalInit: SocketImplementation()
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}

View File

@@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.*
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.persistance.RunnerManager
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.data.Event
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@@ -13,6 +14,10 @@ import org.springframework.stereotype.Service
@EnableScheduling
class TaskCoordinator(): TaskCoordinatorBase() {
private val log = KotlinLogging.logger {}
override fun onProduceEvent(event: Event) {
taskManager.produceEvent(event)
}
override fun onCoordinatorReady() {
super.onCoordinatorReady()
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName)

View File

@@ -8,15 +8,13 @@ import no.iktdev.library.subtitle.export.Export
import no.iktdev.library.subtitle.reader.BaseReader
import no.iktdev.library.subtitle.reader.Reader
import no.iktdev.mediaprocessing.converter.ConverterEnv
import no.iktdev.mediaprocessing.shared.common.task.ConvertTaskData
import no.iktdev.mediaprocessing.shared.contract.data.ConvertData
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
import java.io.File
import kotlin.jvm.Throws
class Converter2(val data: ConvertTaskData,
private val listener: ConvertListener
) {
class Converter2(val data: ConvertData,
private val listener: ConvertListener) {
@Throws(FileUnavailableException::class)
private fun getReader(): BaseReader? {
@@ -51,19 +49,19 @@ class Converter2(val data: ConvertTaskData,
val filtered = read.filter { !it.ignore && it.type !in listOf(DialogType.SIGN_SONG, DialogType.CAPTION) }
val syncOrNotSync = syncDialogs(filtered)
val exporter = Export(file, File(data.outDirectory), data.outFileBaseName)
val exporter = Export(file, File(data.outputDirectory), data.outputFileName)
val outFiles = if (data.outFormats.isEmpty()) {
val outFiles = if (data.formats.isEmpty()) {
exporter.write(syncOrNotSync)
} else {
val exported = mutableListOf<File>()
if (data.outFormats.contains(SubtitleFormats.SRT)) {
if (data.formats.contains(SubtitleFormats.SRT)) {
exported.add(exporter.writeSrt(syncOrNotSync))
}
if (data.outFormats.contains(SubtitleFormats.SMI)) {
if (data.formats.contains(SubtitleFormats.SMI)) {
exported.add(exporter.writeSmi(syncOrNotSync))
}
if (data.outFormats.contains(SubtitleFormats.VTT)) {
if (data.formats.contains(SubtitleFormats.VTT)) {
exported.add(exporter.writeVtt(syncOrNotSync))
}
exported
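
The rename from ConvertTaskData to ConvertData also renames the fields used above: outDirectory becomes outputDirectory, outFileBaseName becomes outputFileName, and outFormats becomes formats. A sketch of the shape ConvertData appears to have, reconstructed from the call sites in this diff (the exact declaration and the defaults are assumptions):

data class ConvertData(
    val inputFile: String,                             // subtitle file to convert
    val outputDirectory: String,                       // directory Export writes into
    val outputFileName: String,                        // base name for exported files
    val formats: List<SubtitleFormats> = emptyList(),  // empty means export every format
    val allowOverwrite: Boolean = false,
)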

View File

@@ -3,15 +3,16 @@ package no.iktdev.mediaprocessing.converter.tasks
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.converter.*
import no.iktdev.mediaprocessing.converter.convert.ConvertListener
import no.iktdev.mediaprocessing.converter.convert.Converter2
import no.iktdev.mediaprocessing.shared.common.services.TaskService
import no.iktdev.mediaprocessing.shared.common.task.ConvertTaskData
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.contract.data.ConvertData
import no.iktdev.mediaprocessing.shared.contract.data.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.contract.data.ConvertedData
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@@ -51,7 +52,7 @@ class ConvertServiceV2(
}
fun startConvert(task: Task) {
val convert = task.data as ConvertTaskData
val convert = task.data as ConvertData
worker = Converter2(convert, this)
worker?.execute()
}
@@ -81,15 +82,16 @@ class ConvertServiceV2(
readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkConvertPerformed,
data = ConvertWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
tasks.onProduceEvent(ConvertWorkPerformed(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
outFiles = outputFiles
)
status = EventStatus.Success
),
data = ConvertedData(
outputFiles = outputFiles
)
))
onClearTask()
}
}
@@ -99,17 +101,13 @@ class ConvertServiceV2(
super.onError(inputFile, message)
log.info { "Convert error for ${task.referenceId}" }
val data = ConvertWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
tasks.onProduceEvent(ConvertWorkPerformed(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
outFiles = emptyList()
)
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkConvertPerformed,
data = data
status = EventStatus.Failed
)
))
}

View File

@@ -44,7 +44,6 @@ dependencies {
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
//implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))

View File

@@ -8,7 +8,6 @@ import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.*
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.streamit.library.db.tables.*
import no.iktdev.streamit.library.db.tables.helper.cast_errors
import no.iktdev.streamit.library.db.tables.helper.data_audio
@@ -44,6 +43,10 @@ fun getStoreDatabase(): MySqlDataSource {
lateinit var taskManager: TasksManager
fun main(args: Array<String>) {
printSharedConfig()
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
value.printStackTrace()
@@ -82,16 +85,11 @@ fun main(args: Array<String>) {
titles
)
storeDatabase.createTables(*tables)
runApplication<CoordinatorApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
printSharedConfig()
}
fun printSharedConfig() {
log.info { "Kafka topic: ${KafkaEnv.kafkaTopic}" }
log.info { "File Input: ${SharedConfig.incomingContent}" }
log.info { "File Output: ${SharedConfig.outgoingContent}" }
log.info { "Ffprobe: ${SharedConfig.ffprobe}" }

View File

@@ -1,14 +1,80 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.data.MediaProcessStartEvent
import no.iktdev.mediaprocessing.shared.contract.data.PermitWorkCreationEvent
import no.iktdev.mediaprocessing.shared.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Component
import java.io.File
import java.util.*
@Component
class Coordinator(
@Autowired
override var applicationContext: ApplicationContext,
@Autowired
override var eventManager: EventsManager
) : EventCoordinator<Event, EventsManager>()
) : EventCoordinator<Event, EventsManager>() {
init {
pullDelay.set(100)
}
public fun startProcess(file: File, type: ProcessType) {
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
)
startProcess(file, type, operations)
}
fun startProcess(file: File, type: ProcessType, operations: List<StartOperationEvents>): UUID {
val referenceId: UUID = UUID.randomUUID()
val event = MediaProcessStartEvent(
metadata = EventMetadata(
referenceId = referenceId.toString(),
status = EventStatus.Success
),
data = StartEventData(
file = file.absolutePath,
type = type,
operations = operations
)
)
produceNewEvent(event)
return referenceId
}
fun permitWorkToProceedOn(referenceId: String, events: List<Event>, message: String) {
val defaultRequiredBy = listOf(Events.EventMediaParameterEncodeCreated, Events.EventMediaParameterExtractCreated)
val eventToAttachTo = if (events.any { it.eventType in defaultRequiredBy }) {
events.findLast { it.eventType in defaultRequiredBy }
} else events.find { it.eventType == Events.EventMediaProcessStarted }
if (eventToAttachTo == null) {
log.error { "No event to attach permit to" }
return
}
produceNewEvent(PermitWorkCreationEvent(
metadata = EventMetadata(
referenceId = referenceId,
derivedFromEventId = eventToAttachTo.eventId(),
status = EventStatus.Success
),
data = message
))
}
}
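
As a usage sketch, a caller holding the Coordinator bean can start a restricted run and keep the returned referenceId for correlating later events. The file path below is an example and the helper is hypothetical; ProcessType is simply passed through:

import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import java.io.File
import java.util.UUID

// Hypothetical caller: start a convert-only run and keep the referenceId.
fun startConvertOnly(coordinator: Coordinator, type: ProcessType): UUID {
    val file = File("/data/input/example.mkv") // example path, not from this commit
    return coordinator.startProcess(file, type, listOf(StartOperationEvents.CONVERT))
}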

View File

@@ -1,36 +1,176 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.implementations.EventsManagerImpl
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.eventi.core.PersistentMessageHelper
import no.iktdev.eventi.data.derivedFromEventId
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.data.referenceId
import no.iktdev.eventi.data.toJson
import no.iktdev.mediaprocessing.shared.common.datasource.*
import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.fromJsonWithDeserializer
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
class EventsManager(dataSource: DataSource) : EventsManagerContract(dataSource) {
override fun readAvailableEvents(): List<Event> {
TODO("Not yet implemented")
override fun storeEvent(event: Event): Boolean {
withTransaction(dataSource.database) {
allEvents.insert {
it[referenceId] = event.referenceId()
it[eventId] = event.eventId()
it[events.event] = event.eventType.event
it[data] = event.toJson()
}
}
val existing = getEventsWith(event.referenceId())
val derivedId = event.metadata.derivedFromEventId
if (derivedId != null) {
val isNewEventOrphan = existing.none { it.eventId() == derivedId }
if (isNewEventOrphan) {
log.warn { "Message not saved! ${event.referenceId()} with eventId(${event.eventId()}) for event ${event.eventType} has derivedEventId($derivedId) which does not exist!" }
return false
}
}
val exception = executeOrException(dataSource.database) {
events.insert {
it[referenceId] = event.referenceId()
it[eventId] = event.eventId()
it[events.event] = event.eventType.event
it[data] = event.toJson()
}
}
val success = if (exception != null) {
if (exception.isExposedSqlException()) {
if ((exception as ExposedSQLException).isCausedByDuplicateError()) {
log.debug { "Error is of SQLIntegrityConstraintViolationException" }
log.error { exception.message }
exception.printStackTrace()
} else {
log.debug { "Error code is: ${exception.errorCode}" }
log.error { exception.message }
exception.printStackTrace()
}
} else {
log.error { exception.message }
exception.printStackTrace()
}
false
} else {
true
}
if (success) {
deleteSupersededEvents(referenceId = event.referenceId(), eventId = event.eventId(), event = event.eventType, derivedFromId = event.derivedFromEventId())
}
return success
}
private val exemptedFromSingleEvent = listOf(
Events.EventWorkConvertCreated,
Events.EventWorkExtractCreated,
Events.EventWorkConvertPerformed,
Events.EventWorkExtractPerformed
)
private fun isExempted(event: Events): Boolean {
return event in exemptedFromSingleEvent
}
override fun readAvailableEvents(): List<List<Event>> {
return withTransaction (dataSource.database) {
events.selectAll()
.groupBy { it[events.referenceId] }
.mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }
} ?: emptyList()
}
override fun readAvailableEventsFor(referenceId: String): List<Event> {
TODO("Not yet implemented")
val events = withTransaction(dataSource.database) {
events.select { events.referenceId eq referenceId }
.mapNotNull { it.toEvent() }
} ?: emptyList()
return if (events.any { it.eventType == Events.EventMediaProcessCompleted }) emptyList() else events
}
override fun storeEvent(event: Event): Boolean {
TODO("Not yet implemented")
override fun getAllEvents(): List<List<Event>> {
val events = withTransaction(dataSource.database) {
events.selectAll()
.groupBy { it[events.referenceId] }
.mapNotNull { it.value.mapNotNull { v -> v.toEvent() } }
} ?: emptyList()
return events.filter { it.none { it.eventType == Events.EventMediaProcessCompleted } }
}
override fun getEventsWith(referenceId: String): List<Event> {
return withTransaction(dataSource.database) {
events.select {
(events.referenceId eq referenceId)
}
.orderBy(events.created, SortOrder.ASC)
.mapNotNull { it.toEvent() }
} ?: emptyList()
}
/**
* @param referenceId Reference ID that groups the events
* @param eventId Current eventId for the message, required to prevent deletion of itself
* @param event Current event for the message
* @param derivedFromId EventId the current message was derived from, if any
*/
private fun deleteSupersededEvents(referenceId: String, eventId: String, event: Events, derivedFromId: String?) {
val forRemoval = mutableListOf<Event>()
val present = getEventsWith(referenceId).filter { it.metadata.derivedFromEventId != null }
val helper = PersistentMessageHelper<Event>(present)
val replaced = if (!isExempted(event)) present.find { it.eventId() != eventId && it.eventType == event } else null
val orphaned = replaced?.let { helper.getEventsRelatedTo(it.eventId()) }?.toMutableSet() ?: mutableSetOf()
//orphaned.addAll(helper.findOrphanedEvents())
forRemoval.addAll(orphaned)
deleteSupersededEvents(forRemoval)
}
/**
* Deletes the superseded events
*/
private fun deleteSupersededEvents(superseded: List<Event>) {
withTransaction(dataSource) {
superseded.forEach { duplicate ->
events.deleteWhere {
(events.referenceId eq duplicate.referenceId()) and
(events.eventId eq duplicate.eventId()) and
(events.event eq duplicate.eventType.event)
}
}
}
}
class MockEventManager(dataSource: DataSource) : EventsManagerImpl<EventImpl>(dataSource) {
val events: MutableList<EventImpl> = mutableListOf()
override fun readAvailableEvents(): List<EventImpl> {
return events.toList()
private fun ResultRow.toEvent(): Event? {
val kev = try {
Events.toEvent(this[events.event])
} catch (e: IllegalArgumentException) {
e.printStackTrace()
return null
}?: return null
return this[events.data].fromJsonWithDeserializer(kev)
}
override fun readAvailableEventsFor(referenceId: String): List<EventImpl> {
return events.filter { it.metadata.referenceId == referenceId }
}
override fun storeEvent(event: EventImpl): Boolean {
return events.add(event)
}
}
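
storeEvent above refuses to persist a derived event whose parent is unknown. A minimal in-memory analogue of just that orphan rule (simplified stand-in types, not the Exposed-backed implementation from this commit):

data class StoredEvent(val referenceId: String, val eventId: String, val derivedFromEventId: String?)

class InMemoryEventStore {
    private val events = mutableListOf<StoredEvent>()

    fun store(event: StoredEvent): Boolean {
        val existing = events.filter { it.referenceId == event.referenceId }
        val derivedId = event.derivedFromEventId
        // Same rule as storeEvent: a derived event whose parent eventId is not
        // already stored for this referenceId is an orphan and is rejected.
        if (derivedId != null && existing.none { it.eventId == derivedId }) {
            return false
        }
        return events.add(event)
    }
}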

View File

@@ -1,9 +1,6 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@@ -12,8 +9,3 @@ class SocketLocalInit: SocketImplementation() {
}
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}

View File

@@ -1,7 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.eventManager
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.EventsManager
import no.iktdev.mediaprocessing.shared.contract.dto.RequestWorkProceed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpStatus
@@ -12,17 +13,17 @@ import org.springframework.web.bind.annotation.RequestMapping
@Controller
@RequestMapping(path = ["/action"])
class ActionEventController(@Autowired var coordinator: EventCoordinatorDep) {
class ActionEventController(@Autowired var coordinator: Coordinator, @Autowired var eventsManager: EventsManager) {
@RequestMapping("/flow/proceed")
fun permitRunOnSequence(@RequestBody data: RequestWorkProceed): ResponseEntity<String> {
val set = eventManager.getEventsWith(data.referenceId)
val set = eventsManager.getEventsWith(data.referenceId)
if (set.isEmpty()) {
return ResponseEntity.status(HttpStatus.NO_CONTENT).body(Gson().toJson(data))
}
coordinator.permitWorkToProceedOn(data.referenceId, "Requested by ${data.source}")
coordinator.permitWorkToProceedOn(data.referenceId, set, "Requested by ${data.source}")
//EVENT_MEDIA_WORK_PROCEED_PERMITTED("event:media-work-proceed:permitted")
return ResponseEntity.ok(null)
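
From its usage here, RequestWorkProceed carries at least a referenceId and a source. A hedged sketch of the request body this endpoint expects (the real DTO in shared:contract may have more fields; the name below is suffixed to mark it as an assumption):

// Assumed minimal shape of the DTO posted to /action/flow/proceed.
data class RequestWorkProceedSketch(
    val referenceId: String, // process to permit work on
    val source: String,      // requester, echoed into "Requested by ..."
)

fun main() {
    // Example body; both values are placeholders.
    println(com.google.gson.Gson().toJson(RequestWorkProceedSketch("some-reference-id", "webui")))
}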

View File

@@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.controller
import com.google.gson.Gson
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.EventRequest
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
@@ -16,7 +17,7 @@ import java.io.File
@Controller
@RequestMapping(path = ["/request"])
class RequestEventController(@Autowired var coordinator: EventCoordinatorDep) {
class RequestEventController(@Autowired var coordinator: Coordinator) {
@PostMapping("/convert")
@ResponseStatus(HttpStatus.OK)

View File

@@ -1,33 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.coordination
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
class PersistentEventBasedMessageListener: EventBasedMessageListener<PersistentMessage>() {
override fun listenerWantingEvent(
event: PersistentMessage,
waitingListeners: List<Tasks<PersistentMessage>>
): List<Tasks<PersistentMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentMessage,
history: List<PersistentMessage>,
listeners: List<ITaskCreatorListener<PersistentMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentMessage>): List<Tasks<PersistentMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@@ -1,22 +1,19 @@
package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataCoverDto
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
import no.iktdev.mediaprocessing.shared.contract.reader.SummaryInfo
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import java.io.File
class MetadataMapping(val events: List<PersistentMessage>) {
var collection: String?
class MetadataMapping(val events: List<Event>) {
//var collection: String?
init {
collection = findCollection()
// collection = findCollection()
}
fun map(): MetadataDto? {
/* fun map(): MetadataDto? {
val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
val mediaReadOut = events.find { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
val meta = events.find { it.data is MetadataPerformed }?.data as MetadataPerformed?
@@ -61,6 +58,6 @@ class MetadataMapping(val events: List<PersistentMessage>) {
return null
}
return mediaReadOut?.outDirectory?.let { File(it).name } ?: baseInfo?.title
}
}*/
}

View File

@@ -1,16 +1,11 @@
package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.reader.OutputFilesDto
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
class OutputFilesMapping(val events: List<PersistentMessage>) {
class OutputFilesMapping(val events: List<Event>) {
fun mapTo(): OutputFilesDto {
/*fun mapTo(): OutputFilesDto {
val videoResult = events.filter { it.data is ProcesserEncodeWorkPerformed }
.map { it.data as ProcesserEncodeWorkPerformed }
@@ -38,5 +33,5 @@ class OutputFilesMapping(val events: List<PersistentMessage>) {
val sub1 = extracted.mapNotNull { it.outFile }
val sub2 = converted.flatMap { it.outFiles }
return sub1 + sub2
}
}*/
}

View File

@@ -1,16 +1,11 @@
package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.isSkipped
import no.iktdev.mediaprocessing.shared.common.persistance.isSuccess
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
class ProcessMapping(val events: List<PersistentMessage>) {
class ProcessMapping(val events: List<Event>) {
fun map(): MediaProcessedDto? {
/* fun map(): MediaProcessedDto? {
val referenceId = events.firstOrNull()?.referenceId ?: return null
val processStarted = getProcessStarted()
val meta = MetadataMapping(events)
@@ -68,6 +63,6 @@ class ProcessMapping(val events: List<PersistentMessage>) {
fun canCollect(): Boolean {
return (!waitsForEncode() && !waitsForExtract() && !waitsForConvert())
}
}*/
}

View File

@@ -1,14 +1,12 @@
package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.reader.SerieInfo
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
class VideoDetailsMapper(val events: List<PersistentMessage>) {
class VideoDetailsMapper(val events: List<Event>) {
fun mapTo(): VideoDetails? {
/* fun mapTo(): VideoDetails? {
val mediaReadOut = events.lastOrNull { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
val proper = mediaReadOut?.toValueObject() ?: return null
@@ -23,5 +21,5 @@ class VideoDetailsMapper(val events: List<PersistentMessage>) {
)
)
return details
}
}*/
}

View File

@@ -1,23 +1,14 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.db.query.*
import no.iktdev.streamit.library.db.tables.titles
import org.jetbrains.exposed.exceptions.ExposedSQLException
@@ -26,24 +17,24 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.sql.SQLIntegrityConstraintViolationException
/*
@Service
class CollectAndStoreTask() : TaskCreator(null) {
class CollectAndStoreTask() {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EventCollectAndStore
val producesEvent: KafkaEvents = KafkaEvents.EventCollectAndStore
override val requiredEvents: List<KafkaEvents> = listOf(
val requiredEvents: List<KafkaEvents> = listOf(
EventMediaProcessStarted,
EventMediaProcessCompleted
)
override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events)
fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${event.referenceId} triggered by ${event.event}" }
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
@@ -180,4 +171,4 @@ class CollectAndStoreTask() : TaskCreator(null) {
}
}
}*/

View File

@@ -1,135 +0,0 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingPrecondition
import no.iktdev.mediaprocessing.coordinator.utils.isAwaitingTask
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
@Service
class CompleteMediaTask() : TaskCreator(null) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EventMediaProcessCompleted
override val requiredEvents: List<KafkaEvents> = listOf(
EventMediaProcessStarted,
EventMediaReadBaseInfoPerformed,
EventMediaReadOutNameAndType
)
override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
super.onProcessEventsAccepted(event, events)
log.info { "${event.referenceId} triggered by ${event.event}" }
val started = events.lastOrSuccessOf(EventMediaProcessStarted) ?: return null
if (!started.data.isSuccess()) {
return null
}
val receivedEvents = events.map { it.event }
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val ffmpegEvents = listOf(KafkaEvents.EventMediaParameterEncodeCreated, EventMediaParameterExtractCreated)
if (ffmpegEvents.any { receivedEvents.contains(it) } && events.none { e -> KafkaEvents.isOfWork(e.event) }) {
return null
}
val startedData: MediaProcessStarted? = started.data as MediaProcessStarted?
if (startedData == null) {
log.error { "${event.referenceId} contains a started event without proper data object" }
return null
}
val ch = CompleteHandler(events)
val chEvents = ch.getMissingCompletions()
if (chEvents.isNotEmpty()) {
log.info { "Waiting for ${chEvents.joinToString(",")}" }
log.warn { "Waiting report: ${Gson().toJson(chEvents)}" }
return null
}
val taskEvents = startedData.operations.map {
when(it) {
StartOperationEvents.ENCODE -> TaskType.Encode
StartOperationEvents.EXTRACT -> TaskType.Extract
StartOperationEvents.CONVERT -> TaskType.Convert
}
}
val isWaitingForPrecondition = isAwaitingPrecondition(taskEvents, events)
if (isWaitingForPrecondition.isNotEmpty()) {
log.info { "Waiting for preconditions: ${isWaitingForPrecondition.keys.joinToString(",") }" }
return null
}
val isWaiting = taskEvents.map {
isAwaitingTask(it, events)
}.any { it }
//val mapper = ProcessMapping(events)
//if (mapper.canCollect()) {
if (!isWaiting) {
return ProcessCompleted(Status.COMPLETED, event.eventId)
} else {
log.info { "Is waiting for task.." }
}
return null
}
class CompleteHandler(val events: List<PersistentMessage>) {
var report: Map<KafkaEvents, Int> = listOf(
EventReport.fromEvents(EventWorkEncodeCreated, events),
EventReport.fromEvents(EventWorkExtractCreated, events),
EventReport.fromEvents(EventWorkConvertCreated, events),
EventReport.fromEvents(EventWorkEncodePerformed, events),
EventReport.fromEvents(EventWorkExtractPerformed, events),
EventReport.fromEvents(EventWorkConvertPerformed, events)
).associate { it.event to it.count }
fun getMissingCompletions(): List<StartOperationEvents> {
val missings = mutableListOf<StartOperationEvents>()
if ((report[EventWorkEncodeCreated]?: 0) > (report[EventWorkEncodePerformed] ?: 0))
missings.add(StartOperationEvents.ENCODE)
if ((report[EventWorkExtractCreated] ?: 0) > (report[EventWorkExtractPerformed] ?: 0))
missings.add(StartOperationEvents.EXTRACT)
if ((report[EventWorkConvertCreated] ?: 0) > (report[EventWorkConvertPerformed] ?: 0))
missings.add(StartOperationEvents.CONVERT)
return missings
}
data class EventReport(val event: KafkaEvents, val count: Int) {
companion object {
fun fromEvents(event: KafkaEvents, events: List<PersistentMessage>): EventReport {
return EventReport(event = event, count = events.filter { it.event == event }.size)
}
}
}
}
}

View File

@@ -1,6 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.Coordinator
@@ -9,7 +11,7 @@ import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.data.BaseInfo
import no.iktdev.mediaprocessing.shared.contract.data.BaseInfoEvent
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.contract.data.StartEventData
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@@ -26,20 +28,26 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val message = try {
readFileInfo(incomingEvent.data as MediaProcessStarted, incomingEvent.metadata.eventId)?.let {
BaseInfoEvent(metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success), data = it)
} ?: BaseInfoEvent(metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed))
readFileInfo(event.data as StartEventData, event.metadata.eventId)?.let {
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success), data = it)
} ?: BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed))
} catch (e: Exception) {
BaseInfoEvent(metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed))
e.printStackTrace()
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed))
}
onProduceEvent(message)
}
@Throws(Exception::class)
fun readFileInfo(started: MediaProcessStarted, eventId: String): BaseInfo? {
fun readFileInfo(started: StartEventData, eventId: String): BaseInfo? {
return try {
val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName)

View File

@@ -0,0 +1,375 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.*
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.data.*
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.contract.reader.*
import no.iktdev.streamit.library.db.query.CatalogQuery
import no.iktdev.streamit.library.db.query.GenreQuery
import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.db.query.SummaryQuery
import no.iktdev.streamit.library.db.tables.titles
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.insertIgnore
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
import java.sql.SQLIntegrityConstraintViolationException
@Service
class CompletedTaskListener: CoordinatorEventListener() {
val log = KotlinLogging.logger {}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventMediaProcessCompleted
override val listensForEvents: List<Events> = listOf(
Events.EventWorkDownloadCoverPerformed,
Events.EventWorkConvertPerformed,
Events.EventWorkEncodePerformed,
Events.EventWorkExtractPerformed
)
/**
* Checks whether the process requires encode, extract, or both, and whether the corresponding parameter-created events exist
*/
fun req1(started: MediaProcessStartEvent, events: List<Event>): Boolean {
val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.ENCODE) == true) {
events.any { it.eventType == Events.EventMediaParameterEncodeCreated }
} else true
val extractFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.EXTRACT) == true) {
events.any { it.eventType == Events.EventMediaParameterExtractCreated }
} else true
if (!encodeFulfilledOrSkipped || !extractFulfilledOrSkipped) {
return false
} else return true
}
/**
* Checks whether all work that was supposed to be created has been created,
* and, if convert is set, whether convert work exists for every subtitle that can be processed.
*/
fun req2(operations: List<StartOperationEvents>, events: List<Event>): Boolean {
if (StartOperationEvents.ENCODE in operations) {
val encodeParameter = events.find { it.eventType == Events.EventMediaParameterEncodeCreated }?.az<EncodeArgumentCreatedEvent>()
val encodeWork = events.find { it.eventType == Events.EventWorkEncodeCreated }
if (encodeParameter?.isSuccessful() == true && (encodeWork == null))
return false
}
val extractParameter = events.find { it.eventType == Events.EventMediaParameterExtractCreated }?.az<ExtractArgumentCreatedEvent>()
val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated }
if (StartOperationEvents.EXTRACT in operations) {
if (extractParameter?.isSuccessful() == true && extractParameter.data?.size != extractWork.size)
return false
}
if (StartOperationEvents.CONVERT in operations) {
val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated }
val supportedSubtitleFormats = SubtitleFormats.entries.map { it.name }
val eventsSupportsConvert = extractWork.filter { it.data is ExtractArgumentData }
.filter { (it.dataAs<ExtractArgumentData>()?.outputFile?.let { f -> File(f).extension.uppercase() } in supportedSubtitleFormats) }
if (convertWork.size != eventsSupportsConvert.size)
return false
}
return true
}
/**
* Checks whether all work that has been created has been completed
*/
fun req3(operations: List<StartOperationEvents>, events: List<Event>): Boolean {
if (StartOperationEvents.ENCODE in operations) {
val encodeWork = events.filter { it.eventType == Events.EventWorkEncodeCreated }
val encodePerformed = events.filter { it.eventType == Events.EventWorkEncodePerformed }
if (encodePerformed.size < encodeWork.size)
return false
}
if (StartOperationEvents.EXTRACT in operations) {
val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated }
val extractPerformed = events.filter { it.eventType == Events.EventWorkExtractPerformed }
if (extractPerformed.size < extractWork.size)
return false
}
if (StartOperationEvents.CONVERT in operations) {
val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated }
val convertPerformed = events.filter { it.eventType == Events.EventWorkConvertPerformed }
if (convertPerformed.size < convertWork.size)
return false
}
return true
}
override fun isPrerequisitesFulfilled(incomingEvent: Event, events: List<Event>): Boolean {
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>()
if (started == null) {
log.info { "No Start event" }
return false
}
val viableEvents = events.filter { it.isSuccessful() }
if (!req1(started, viableEvents)) {
log.info { "${this::class.java.simpleName} Failed Req1" }
return false
}
if (!req2(started.data?.operations ?: emptyList(), viableEvents)) {
log.info { "${this::class.java.simpleName} Failed Req2" }
return false
}
if (!req3(started.data?.operations ?: emptyList(), viableEvents)) {
log.info { "${this::class.java.simpleName} Failed Req3" }
return false
}
return super.isPrerequisitesFulfilled(incomingEvent, events)
}
fun getMetadata(events: List<Event>): MetadataDto? {
val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az<BaseInfoEvent>()
val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()
val metadataInfo = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az<MediaMetadataReceivedEvent>()
val coverInfo = events.find { it.eventType == Events.EventWorkDownloadCoverPerformed }?.az<MediaCoverDownloadedEvent>()
val coverTask = events.find { it.eventType == Events.EventMediaReadOutCover }?.az<MediaCoverInfoReceivedEvent>()
if (baseInfo == null) {
log.info { "Cant find BaseInfoEvent on ${Events.EventMediaReadBaseInfoPerformed}" }
return null
}
if (mediaInfo == null) {
log.info { "Cant find MediaOutInformationConstructedEvent on ${Events.EventMediaReadOutNameAndType}" }
return null
}
if (metadataInfo == null) {
log.info { "Cant find MediaMetadataReceivedEvent on ${Events.EventMediaMetadataSearchPerformed}" }
return null
}
if (coverTask?.isSkipped() == false && coverInfo == null) {
log.info { "Cant find MediaCoverDownloadedEvent on ${Events.EventWorkDownloadCoverPerformed}" }
}
val mediaInfoData = mediaInfo.data?.toValueObject()
val baseInfoData = baseInfo.data
val metadataInfoData = metadataInfo.data
val collection = mediaInfo.data?.outDirectory?.let { File(it).name } ?: baseInfoData?.title
return MetadataDto(
title = mediaInfoData?.title ?: baseInfoData?.title ?: metadataInfoData?.title ?: return null,
collection = collection ?: return null,
cover = coverInfo?.data?.absoluteFilePath,
type = metadataInfoData?.type ?: mediaInfoData?.type ?: return null,
summary = metadataInfoData?.summary?.filter {it.summary != null }?.map { SummaryInfo(language = it.language, summary = it.summary!! ) } ?: emptyList(),
genres = metadataInfoData?.genres ?: emptyList(),
titles = (metadataInfoData?.altTitle ?: emptyList()) + listOfNotNull(mediaInfoData?.title, baseInfoData?.title)
)
}
fun getGenres(events: List<Event>): List<String> {
val metadataInfo = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az<MediaMetadataReceivedEvent>()
return metadataInfo?.data?.genres ?: emptyList()
}
fun getSubtitles(metadataDto: MetadataDto?, events: List<Event>): List<SubtitlesDto> {
val extracted = events.filter { it.eventType == Events.EventWorkExtractPerformed }.mapNotNull { it.dataAs<ExtractedData>() }
val converted = events.filter { it.eventType == Events.EventWorkConvertPerformed }.mapNotNull { it.dataAs<ConvertedData>() }
val outFiles = extracted.map { it.outputFile } + converted.flatMap { it.outputFiles }
return outFiles.map {
val subtitleFile = File(it)
SubtitlesDto(
collection = metadataDto?.collection ?: subtitleFile.parentFile.parentFile.name,
language = subtitleFile.parentFile.name,
subtitleFile = subtitleFile.name,
format = subtitleFile.extension.uppercase(),
associatedWithVideo = subtitleFile.nameWithoutExtension,
)
}
}
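// Worked example (hypothetical path) of the directory convention getSubtitles
// relies on when metadata is missing: <collection>/<language>/<subtitle file>.
// File("/output/MyShow/eng/MyShow.S01E01.srt")
//   .parentFile.parentFile.name  -> "MyShow"        (collection fallback)
//   .parentFile.name             -> "eng"           (language)
//   .extension.uppercase()       -> "SRT"           (format)
//   .nameWithoutExtension        -> "MyShow.S01E01" (associatedWithVideo)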
fun getVideo(events: List<Event>): VideoDetails? {
val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()
val encoded = events.find { it.eventType == Events.EventWorkEncodePerformed }?.dataAs<EncodedData>()?.outputFile
if (encoded == null) {
log.warn { "No encode no video details!" }
return null
}
val proper = mediaInfo?.data?.toValueObject() ?: return null
val details = VideoDetails(
type = proper.type,
fileName = File(encoded).name,
serieInfo = if (proper !is EpisodeInfo) null else SerieInfo(
episodeTitle = proper.episodeTitle,
episodeNumber = proper.episode,
seasonNumber = proper.season,
title = proper.title
)
)
return details
}
fun storeSubtitles(subtitles: List<SubtitlesDto>) {
subtitles.forEach { subtitle ->
subtitle to executeWithStatus(getStoreDatabase()) {
SubtitleQuery(
collection = subtitle.collection,
associatedWithVideo = subtitle.associatedWithVideo,
language = subtitle.language,
format = subtitle.format,
file = subtitle.subtitleFile
).insert()
}
}
}
fun storeTitles(usedTitle: String, metadata: MetadataDto) {
try {
withTransaction(getStoreDatabase()) {
titles.insertIgnore {
it[titles.masterTitle] = metadata.collection
it[titles.title] = NameHelper.normalize(usedTitle)
it[titles.type] = 1
}
titles.insertIgnore {
it[titles.masterTitle] = usedTitle
it[titles.title] = NameHelper.normalize(usedTitle)
it[titles.type] = 2
}
metadata.titles.forEach { title ->
titles.insertIgnore {
it[titles.masterTitle] = usedTitle
it[titles.title] = title
}
}
}
} catch (e: Exception) {
e.printStackTrace()
}
}
fun storeMetadata(catalogId: Int, metadata: MetadataDto) {
metadata.summary.forEach {
withTransaction(getStoreDatabase()) {
SummaryQuery(
cid = catalogId,
language = it.language,
description = it.summary
).insert()
}
}
}
fun storeAndGetGenres(genres: List<String>): String? {
return withTransaction(getStoreDatabase()) {
val gq = GenreQuery( *genres.toTypedArray() )
gq.insertAndGetIds()
gq.getIds().joinToString(",")
}
}
fun storeCatalog(metadata: MetadataDto, videoDetails: VideoDetails, genres: String?): Int? {
val precreatedCatalogQuery = CatalogQuery(
title = NameHelper.normalize(metadata.title),
cover = metadata.cover,
type = metadata.type,
collection = metadata.collection,
genres = genres
)
val result = when (videoDetails.type) {
"serie" -> {
val serieInfo = videoDetails.serieInfo ?: throw RuntimeException("SerieInfo missing in VideoDetails for Serie! ${videoDetails.fileName}")
executeOrException {
precreatedCatalogQuery.insertWithSerie(
episodeTitle = serieInfo.episodeTitle ?: "",
videoFile = videoDetails.fileName,
episode = serieInfo.episodeNumber,
season = serieInfo.seasonNumber
)
}
}
"movie" -> {
executeOrException {
precreatedCatalogQuery.insertWithMovie(videoDetails.fileName)
}
}
else -> throw RuntimeException("${videoDetails.type} is not supported!")
}
val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
return if (result == null || ignoreException ) {
return withTransaction(getStoreDatabase()) {
precreatedCatalogQuery.getId()
}
} else null
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val result = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return result
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume() ?: return
val metadata = getMetadata(events)
val genres = getGenres(events)
val subtitles = getSubtitles(metadata, events)
val video = getVideo(events)
val storedGenres = storeAndGetGenres(genres)
val catalogId = if (metadata != null && video != null) {
storeCatalog(metadata, video, storedGenres)
} else null
storeSubtitles(subtitles)
metadata?.let {
storeTitles(metadata = metadata, usedTitle = metadata.title)
catalogId?.let { id ->
storeMetadata(catalogId = id, metadata = it)
}
}
onProduceEvent(MediaProcessCompletedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = CompletedEventData(
events.map { it.eventId() }
)
))
}
}
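
Taken together, the listener fires only once three gates pass: req1 (parameter events exist for the requested operations), req2 (work was created for every successful parameter set), and req3 (every created work item has a matching performed event). A toy sketch of the req3 counting rule, using plain counts instead of Event instances:

// Toy illustration of the req3 rule: completion stays blocked while any
// created work item lacks a matching performed event.
fun allCreatedWorkPerformed(created: Int, performed: Int): Boolean = performed >= created

fun main() {
    println(allCreatedWorkPerformed(created = 2, performed = 1)) // false: still waiting
    println(allCreatedWorkPerformed(created = 2, performed = 2)) // true: gate passes
}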

View File

@@ -2,7 +2,9 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.*
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
@@ -29,15 +31,33 @@ class ConvertWorkTaskListener: WorkTaskListener() {
Events.EventWorkExtractPerformed
)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
if (!canStart(incomingEvent, events)) {
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
if (!isOfEventsIListenFor(incomingEvent))
return false
if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) {
return false
}
val producedEvents = events.filter { it.eventType == produceEvent }
val shouldIHandleAndProduce = producedEvents.none { it.derivedFromEventId() == incomingEvent.eventId() }
if (shouldIHandleAndProduce) {
log.info { "Permitting handling of event: ${incomingEvent.dataAs<ExtractedData>()?.outputFile}" }
}
return shouldIHandleAndProduce
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
if (!canStart(event, events)) {
return
}
val file = if (incomingEvent.eventType == Events.EventWorkExtractPerformed) {
incomingEvent.az<ExtractWorkPerformedEvent>()?.data?.outputFile
} else if (incomingEvent.eventType == Events.EventMediaProcessStarted) {
val startEvent = incomingEvent.az<MediaProcessStartEvent>()?.data
val file = if (event.eventType == Events.EventWorkExtractPerformed) {
event.az<ExtractWorkPerformedEvent>()?.data?.outputFile
} else if (event.eventType == Events.EventMediaProcessStarted) {
val startEvent = event.az<MediaProcessStartEvent>()?.data
if (startEvent?.operations?.isOnly(StartOperationEvents.CONVERT) == true) {
startEvent.file
} else null
@@ -50,7 +70,7 @@ class ConvertWorkTaskListener: WorkTaskListener() {
val convertFile = file?.let { File(it) }
if (convertFile == null || !convertFile.exists()) {
onProduceEvent(ConvertWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
))
return
} else {
@@ -61,24 +81,21 @@ class ConvertWorkTaskListener: WorkTaskListener() {
allowOverwrite = true
)
val status = taskManager.createTask(
referenceId = incomingEvent.referenceId(),
eventId = incomingEvent.eventId(),
task = TaskType.Convert,
derivedFromEventId = incomingEvent.eventId(),
data = Gson().toJson(convertData),
inputFile = convertFile.absolutePath
)
if (!status) {
log.error { "Failed to create Convert task on ${incomingEvent.referenceId()}@${incomingEvent.eventId()}" }
return
}
onProduceEvent(ConvertWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
ConvertWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = convertData
))
).also { event ->
onProduceEvent(event)
taskManager.createTask(
referenceId = event.referenceId(),
eventId = event.eventId(),
derivedFromEventId = event.derivedFromEventId(),
task = TaskType.Convert,
data = WGson.gson.toJson(event.data!!),
inputFile = event.data!!.inputFile
)
}
}
}
}
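
A recurring change in these listeners is the switch from a raw Event parameter to ConsumableEvent<Event>, whose consume() hands the event out at most once. A simplified sketch of that consume-once contract, assuming the real class in no.iktdev.eventi.core behaves along these lines:

import java.util.concurrent.atomic.AtomicReference

// Simplified consume-once wrapper modeled on how the listeners use
// ConsumableEvent: the first consume() wins, later calls return null,
// which is why each listener logs and bails out when it sees null.
class ConsumableOnce<T : Any>(value: T) {
    private val ref = AtomicReference<T?>(value)
    fun consume(): T? = ref.getAndSet(null)
}

fun main() {
    val wrapped = ConsumableOnce("payload")
    println(wrapped.consume()) // payload
    println(wrapped.consume()) // null
}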

View File

@@ -2,6 +2,8 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
@@ -23,12 +25,19 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkDownloadCoverPerformed
override val listensForEvents: List<Events> = listOf(Events.EventMediaReadOutCover)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val failedEventDefault = MediaCoverDownloadedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
val data = incomingEvent.az<MediaCoverInfoReceivedEvent>()?.data
val data = event.az<MediaCoverInfoReceivedEvent>()?.data
if (data == null) {
log.error { "No valid data for use to obtain cover" }
onProduceEvent(failedEventDefault)
@ -37,7 +46,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
val outDir = File(data.outDir)
if (!outDir.exists()) {
log.error { "Check for output directory for cover storage failed for ${incomingEvent.metadata.eventId} " }
log.error { "Check for output directory for cover storage failed for ${event.metadata.eventId} " }
onProduceEvent(failedEventDefault)
}
@ -62,14 +71,14 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
}
if (result == null) {
log.error { "Could not download cover, check logs ${incomingEvent.metadata.eventId} " }
log.error { "Could not download cover, check logs ${event.metadata.eventId} " }
} else {
if (!result.exists() || !result.canRead()) {
onProduceEvent(failedEventDefault)
return
}
onProduceEvent(MediaCoverDownloadedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = DownloadedCover(result.absolutePath)
))
}

View File

@ -1,6 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
@ -25,7 +27,14 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
override val produceEvent: Events = Events.EventMediaReadOutCover
override val listensForEvents: List<Events> = listOf(Events.EventMediaMetadataSearchPerformed)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az<BaseInfoEvent>()?.data ?: return
val metadata = events.findLast { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az<MediaMetadataReceivedEvent>()?.data ?: return
val mediaOutInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()?.data ?: return
@ -39,11 +48,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
val result = if (coverUrl.isNullOrBlank()) {
log.warn { "No cover available for ${baseInfo.title}" }
MediaCoverInfoReceivedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Skipped)
metadata = event.makeDerivedEventInfo(EventStatus.Skipped)
)
} else {
MediaCoverInfoReceivedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = CoverDetails(
url = coverUrl,
outFileBaseName = NameHelper.normalize(coverTitle),

View File

@ -1,5 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
@ -17,6 +20,9 @@ import java.io.File
@Service
class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
val log = KotlinLogging.logger {}
@Autowired
override var coordinator: Coordinator? = null
@ -28,8 +34,19 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
)
val preference = Preference.getPreference()
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
val eventType = events.map { it.eventType }
return state && eventType.containsAll(listensForEvents)
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>() ?: return
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.ENCODE) == false) {
return
@ -57,11 +74,11 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
val result = mapper.getArguments()
if (result == null) {
onProduceEvent(EncodeArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
))
} else {
onProduceEvent(EncodeArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = result
))
}
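The added shouldIProcessAndHandleEvent override changes the trigger condition from "any listened-for event arrived" to "all listened-for events have arrived". A tiny self-contained illustration of that gate, with placeholder event sets:

// Sketch: the gate only opens once every prerequisite event type is present.
fun ready(seen: List<Events>, required: List<Events>): Boolean = seen.containsAll(required)

// ready(seen = listOf(Events.EventMediaProcessStarted), required) == false  -> keep waiting
// ready(seen = required, required) == true                                  -> build the encode arguments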

View File

@ -1,10 +1,17 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.derivedFromEventId
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.data.referenceId
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.coordinator.tasksV2.implementations.WorkTaskListener
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.contract.data.*
@ -23,27 +30,41 @@ class EncodeWorkTaskListener : WorkTaskListener() {
Events.EventMediaWorkProceedPermitted
)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
if (!canStart(incomingEvent, events)) {
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val encodeArguments = if (incomingEvent.eventType == Events.EventMediaParameterEncodeCreated) {
incomingEvent.az<EncodeArgumentCreatedEvent>()?.data
if (!canStart(event, events)) {
return
}
val encodeArguments = if (event.eventType == Events.EventMediaParameterEncodeCreated) {
event.az<EncodeArgumentCreatedEvent>()?.data
} else {
events.find { it.eventType == Events.EventMediaParameterEncodeCreated }
?.az<EncodeArgumentCreatedEvent>()?.data
}
if (encodeArguments == null) {
log.error { "No Encode arguments found.. referenceId: ${incomingEvent.referenceId()}" }
log.error { "No Encode arguments found.. referenceId: ${event.referenceId()}" }
return
}
onProduceEvent(
EncodeWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = encodeArguments
)
).also { event ->
onProduceEvent(event)
taskManager.createTask(
referenceId = event.referenceId(),
eventId = event.eventId(),
derivedFromEventId = event.derivedFromEventId(),
task = TaskType.Encode,
data = WGson.gson.toJson(event.data!!),
inputFile = event.data!!.inputFile
)
}
}
}

View File

@ -1,6 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
@ -26,7 +28,19 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
Events.EventMediaParseStreamPerformed,
Events.EventMediaReadOutNameAndType
)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
val eventType = events.map { it.eventType }
return state && eventType.containsAll(listensForEvents)
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>() ?: return
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.EXTRACT) == false) {
return
@ -54,11 +68,11 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
val result = mapper.getArguments()
if (result.isEmpty()) {
onProduceEvent(ExtractArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Skipped)
metadata = event.makeDerivedEventInfo(EventStatus.Skipped)
))
} else {
onProduceEvent(ExtractArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = result
))
}

View File

@ -1,10 +1,17 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.derivedFromEventId
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.data.referenceId
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.taskManager
import no.iktdev.mediaprocessing.coordinator.tasksV2.implementations.WorkTaskListener
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.contract.data.*
@ -17,41 +24,60 @@ class ExtractWorkTaskListener: WorkTaskListener() {
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkEncodeCreated
override val produceEvent: Events = Events.EventWorkExtractCreated
override val listensForEvents: List<Events> = listOf(
Events.EventMediaParameterEncodeCreated,
Events.EventMediaParameterExtractCreated,
Events.EventMediaWorkProceedPermitted
)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
if (!canStart(incomingEvent, events)) {
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val state = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return state
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val arguments = if (incomingEvent.eventType == Events.EventMediaParameterExtractCreated) {
incomingEvent.az<ExtractArgumentCreatedEvent>()?.data
if (!canStart(event, events)) {
return
}
val arguments = if (event.eventType == Events.EventMediaParameterExtractCreated) {
event.az<ExtractArgumentCreatedEvent>()?.data
} else {
events.find { it.eventType == Events.EventMediaParameterExtractCreated }
?.az<ExtractArgumentCreatedEvent>()?.data
}
if (arguments == null) {
log.error { "No Extract arguments found.. referenceId: ${incomingEvent.referenceId()}" }
log.error { "No Extract arguments found.. referenceId: ${event.referenceId()}" }
return
}
if (arguments.isEmpty()) {
ExtractWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
return
}
arguments.mapNotNull {
ExtractWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = it
)
}.forEach { event ->
onProduceEvent(event)
taskManager.createTask(
referenceId = event.referenceId(),
eventId = event.eventId(),
derivedFromEventId = event.derivedFromEventId(),
task = TaskType.Extract,
data = WGson.gson.toJson(event.data!!),
inputFile = event.data!!.inputFile
)
}
}
}
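Note the fan-out: where the encode listener emits a single work event, the extract listener produces one ExtractWorkCreatedEvent, and persists one claimable task, per argument set, so several extract jobs (for instance one per subtitle stream) can be claimed and processed in parallel by different runners. A condensed sketch of that shape, with invented argument data:

// Invented data: two subtitle streams yield two independent, claimable tasks.
val arguments = listOf(engSubArgs, norSubArgs) // hypothetical ExtractArgumentData instances
arguments.map { args ->
    ExtractWorkCreatedEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success), data = args)
}.forEach { created ->
    onProduceEvent(created)
    // one taskManager.createTask(...) per produced event, as in the listing above
}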

View File

@ -1,6 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import com.google.gson.JsonObject
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.exfl.using
@ -33,14 +35,24 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
Events.EventMediaMetadataSearchPerformed
)
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
val metadataResult = incomingEvent.az<MediaMetadataReceivedEvent>()
override fun shouldIHandleFailedEvents(incomingEvent: Event): Boolean {
return (incomingEvent.eventType in listensForEvents)
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val metadataResult = event.az<MediaMetadataReceivedEvent>()
val mediaBaseInfo = events.findLast { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az<BaseInfoEvent>()?.data
if (mediaBaseInfo == null) {
log.error { "Required event ${Events.EventMediaReadBaseInfoPerformed} is not present" }
coordinator?.produceNewEvent(
MediaOutInformationConstructedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
)
return
@ -53,12 +65,12 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
outDirectory = pm.getOutputDirectory().absolutePath,
info = vi
).let { MediaOutInformationConstructedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = it
) }
} else {
MediaOutInformationConstructedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
}
onProduceEvent(result)

View File

@ -1,6 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
@ -11,7 +13,6 @@ import no.iktdev.mediaprocessing.shared.contract.data.BaseInfoEvent
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.data.MediaMetadataReceivedEvent
import no.iktdev.mediaprocessing.shared.contract.data.az
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
@ -21,6 +22,9 @@ import java.time.ZoneOffset
import java.time.format.DateTimeFormatter
import java.util.*
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10
@Service
@EnableScheduling
class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
@ -37,16 +41,28 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
)
val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60
val metadataTimeout = metadataTimeoutMinutes * 60
val waitingProcessesForMeta: MutableMap<String, MetadataTriggerData> = mutableMapOf()
/**
* This one gets special treatment: since it will only produce a timeout, it does not need to use the incoming event
*/
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
if (incomingEvent.eventType == Events.EventMediaReadBaseInfoPerformed &&
events.none { it.eventType == Events.EventMediaMetadataSearchPerformed }) {
val baseInfo = incomingEvent.az<BaseInfoEvent>()?.data
if (events.any { it.eventType == Events.EventMediaReadBaseInfoPerformed } &&
events.none { it.eventType == Events.EventMediaMetadataSearchPerformed } &&
!waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) {
val consumedIncoming = incomingEvent.consume()
if (consumedIncoming == null) {
log.error { "Event is null and should not be available nor provided! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed}?.az<BaseInfoEvent>()?.data
if (baseInfo == null) {
log.error { "BaseInfoEvent is null for referenceId: ${incomingEvent.metadata.referenceId} on eventId: ${incomingEvent.metadata.eventId}" }
log.error { "BaseInfoEvent is null for referenceId: ${consumedIncoming.metadata.referenceId} on eventId: ${consumedIncoming.metadata.eventId}" }
return
}
@ -54,17 +70,16 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC)
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH)
if (!waitingProcessesForMeta.containsKey(consumedIncoming.metadata.referenceId)) {
waitingProcessesForMeta[consumedIncoming.metadata.referenceId] =
MetadataTriggerData(consumedIncoming.metadata.eventId, LocalDateTime.now())
log.info { "Sending ${baseInfo.title} to waiting queue. Expiry ${dateTime.format(formatter)}" }
if (!waitingProcessesForMeta.containsKey(incomingEvent.metadata.referenceId)) {
waitingProcessesForMeta[incomingEvent.metadata.referenceId] =
MetadataTriggerData(incomingEvent.metadata.eventId, LocalDateTime.now())
}
}
if (incomingEvent.eventType == Events.EventMediaMetadataSearchPerformed) {
if (waitingProcessesForMeta.containsKey(incomingEvent.metadata.referenceId)) {
waitingProcessesForMeta.remove(incomingEvent.metadata.referenceId)
}
if (events.any { it.eventType == Events.EventMediaMetadataSearchPerformed }
&& waitingProcessesForMeta.containsKey(incomingEvent.metadata().referenceId)) {
waitingProcessesForMeta.remove(incomingEvent.metadata().referenceId)
}
}
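The @Scheduled sweep that expires entries in waitingProcessesForMeta is not part of this hunk. A minimal sketch of the shape such a check could take, assuming MetadataTriggerData carries the eventId and a registration timestamp, and that a timed-out reference should produce the default/timeout metadata event (the method name, fixedDelay value, and field names are assumptions, not code from this commit):

import java.time.Duration
import java.time.LocalDateTime

// Hypothetical sweep: runs once a minute and drops entries older than metadataTimeout seconds.
@Scheduled(fixedDelay = 60_000)
fun expireWaitingMetadata() {
    val now = LocalDateTime.now()
    waitingProcessesForMeta.filterValues { Duration.between(it.registered, now).seconds > metadataTimeout }
        .keys.toList()
        .forEach { referenceId ->
            waitingProcessesForMeta.remove(referenceId)
            // produce the metadata-timeout event for referenceId here
        }
}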

View File

@ -3,6 +3,8 @@ package no.iktdev.mediaprocessing.coordinator.tasksV2.listeners
import com.google.gson.Gson
import com.google.gson.JsonObject
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.dataAs
import no.iktdev.eventi.implementations.EventCoordinator
@ -34,21 +36,30 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
Events.EventMediaReadStreamPerformed
)
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
return super.shouldIProcessAndHandleEvent(incomingEvent, events)
}
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
// MediaFileStreamsReadEvent
val readData = incomingEvent.dataAs<MediaFileStreamsReadEvent>()?.data
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val readData = event.dataAs<JsonObject>()
val result = try {
MediaFileStreamsParsedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = parseStreams(readData)
)
} catch (e: Exception) {
e.printStackTrace()
MediaFileStreamsParsedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
}
onProduceEvent(result)
}

View File

@ -4,6 +4,8 @@ import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.dataAs
import no.iktdev.eventi.implementations.EventCoordinator
@ -17,8 +19,8 @@ import no.iktdev.mediaprocessing.shared.contract.EventsListenerContract
import no.iktdev.mediaprocessing.shared.contract.EventsManagerContract
import no.iktdev.mediaprocessing.shared.contract.data.Event
import no.iktdev.mediaprocessing.shared.contract.data.MediaFileStreamsReadEvent
import no.iktdev.mediaprocessing.shared.contract.data.StartEventData
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -34,24 +36,34 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
override val produceEvent: Events = Events.EventMediaReadStreamPerformed
override val listensForEvents: List<Events> = listOf(Events.EventMediaProcessStarted)
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
val status = super.shouldIProcessAndHandleEvent(incomingEvent, events)
return status
}
override fun onEventsReceived(incomingEvent: Event, events: List<Event>) {
val startEvent = incomingEvent.dataAs<MediaProcessStarted>() ?: return
if (!startEvent.operations.any { it in requiredOperations }) {
log.info { "${incomingEvent.metadata.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" }
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val startEvent = event.dataAs<StartEventData>()
if (startEvent == null || !startEvent.operations.any { it in requiredOperations }) {
log.info { "${event.metadata.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" }
return
}
val result = runBlocking {
try {
val data = fileReadStreams(startEvent, incomingEvent.metadata.eventId)
val data = fileReadStreams(startEvent, event.metadata.eventId)
MediaFileStreamsReadEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success),
data = data
)
} catch (e: Exception) {
e.printStackTrace()
MediaFileStreamsReadEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
)
}
}
@ -59,7 +71,7 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
}
suspend fun fileReadStreams(started: MediaProcessStarted, eventId: String): JsonObject? {
suspend fun fileReadStreams(started: StartEventData, eventId: String): JsonObject? {
val file = File(started.file)
return if (file.exists() && file.isFile) {
val result = readStreams(file)
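readStreams itself is outside this diff; given the SUPPORTING_EXECUTABLE_FFPROBE setting used elsewhere in the project, a plausible sketch of what the stream readout could look like with ffprobe (the function name, executable lookup, and parsing here are assumptions, not the project's actual implementation):

import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

// Hypothetical sketch: shell out to ffprobe and parse its JSON stream listing.
suspend fun probeStreams(ffprobe: String, file: File): JsonObject? = withContext(Dispatchers.IO) {
    val process = ProcessBuilder(
        ffprobe, "-v", "quiet", "-print_format", "json", "-show_streams", file.absolutePath
    ).redirectErrorStream(true).start()
    val output = process.inputStream.bufferedReader().readText()
    if (process.waitFor() == 0) Gson().fromJson(output, JsonObject::class.java) else null
}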

View File

@ -2,44 +2,44 @@ package no.iktdev.mediaprocessing.coordinator.utils
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.*
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.contract.data.Event
val log = KotlinLogging.logger {}
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage>): Map<TaskType, Boolean> {
/*
fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<Event>): Map<TaskType, Boolean> {
val response = mutableMapOf<TaskType, Boolean>()
if (tasks.any { it == TaskType.Encode }) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterEncodeCreated
Events.EventMediaParameterEncodeCreated
) } == null) {
response[TaskType.Encode] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterEncodeCreated}" }
log.info { "Waiting for ${Events.EventMediaParameterEncodeCreated}" }
}
}
val convertEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventWorkConvertCreated) }
val convertEvent = events.lastOrNull { it.isOfEvent(Events.EventWorkConvertCreated) }
if (tasks.any { it == TaskType.Convert } && tasks.none { it == TaskType.Extract }) {
if (convertEvent == null) {
response[TaskType.Convert] = true
log.info { "Waiting for ${KafkaEvents.EventWorkConvertCreated}" }
log.info { "Waiting for ${Events.EventWorkConvertCreated}" }
}
} else if (tasks.any { it == TaskType.Convert }) {
val extractEvent = events.lastOrNull { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }
val extractEvent = events.lastOrNull { it.isOfEvent(Events.EventMediaParameterExtractCreated) }
if (extractEvent == null || extractEvent.isSuccess() && convertEvent == null) {
response[TaskType.Convert] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterExtractCreated}" }
log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" }
}
}
if (tasks.contains(TaskType.Extract)) {
if (events.lastOrNull { it.isOfEvent(
KafkaEvents.EventMediaParameterExtractCreated
Events.EventMediaParameterExtractCreated
) } == null) {
response[TaskType.Extract] = true
log.info { "Waiting for ${KafkaEvents.EventMediaParameterExtractCreated}" }
log.info { "Waiting for ${Events.EventMediaParameterExtractCreated}" }
}
}
@ -49,12 +49,12 @@ fun isAwaitingPrecondition(tasks: List<TaskType>, events: List<PersistentMessage
}
fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
fun isAwaitingTask(task: TaskType, events: List<Event>): Boolean {
val taskStatus = when (task) {
TaskType.Encode -> {
val argumentEvent = KafkaEvents.EventMediaParameterEncodeCreated
val taskCreatedEvent = KafkaEvents.EventWorkEncodeCreated
val taskCompletedEvent = KafkaEvents.EventWorkEncodePerformed
val argumentEvent = Events.EventMediaParameterEncodeCreated
val taskCreatedEvent = Events.EventWorkEncodeCreated
val taskCompletedEvent = Events.EventWorkEncodePerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true
if (!argument.isSuccess()) return false
@ -71,9 +71,9 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
waiting
}
TaskType.Extract -> {
val argumentEvent = KafkaEvents.EventMediaParameterExtractCreated
val taskCreatedEvent = KafkaEvents.EventWorkExtractCreated
val taskCompletedEvent = KafkaEvents.EventWorkExtractPerformed
val argumentEvent = Events.EventMediaParameterExtractCreated
val taskCreatedEvent = Events.EventWorkExtractCreated
val taskCompletedEvent = Events.EventWorkExtractPerformed
val argument = events.findLast { it.event == argumentEvent } ?: return true
if (!argument.isSuccess()) return false
@ -89,12 +89,12 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
}
TaskType.Convert -> {
val extractEvents = events.findLast { it.isOfEvent(KafkaEvents.EventMediaParameterExtractCreated) }
val extractEvents = events.findLast { it.isOfEvent(Events.EventMediaParameterExtractCreated) }
if (extractEvents == null || extractEvents.isSkipped()) {
false
} else {
val taskCreatedEvent = KafkaEvents.EventWorkConvertCreated
val taskCompletedEvent = KafkaEvents.EventWorkConvertPerformed
val taskCreatedEvent = Events.EventWorkConvertCreated
val taskCompletedEvent = Events.EventWorkConvertPerformed
val argument = events.findLast { it.event == taskCreatedEvent } ?: return true
if (!argument.isSuccess()) return false
@ -114,4 +114,4 @@ fun isAwaitingTask(task: TaskType, events: List<PersistentMessage>): Boolean {
log.info { "isAwaiting for $task" }
}
return taskStatus
}
}*/

View File

@ -34,7 +34,7 @@ interface FileWatcherEvents {
@Service
class InputDirectoryWatcher(@Autowired var coordinator: EventCoordinatorDep): FileWatcherEvents {
class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatcherEvents {
private val logger = KotlinLogging.logger {}
val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val queue = FileWatcherQueue()

View File

@ -51,7 +51,8 @@ dependencies {
//implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:eventi")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))

View File

@ -2,9 +2,6 @@ package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.Defaults
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
@ -12,11 +9,6 @@ import org.springframework.web.client.RestTemplate
@Configuration
@Import(CoordinatorProducer::class, DefaultMessageListener::class)
class KafkaLocalInit: KafkaImplementation() {
}
@Configuration
public class DefaultProcesserConfiguration: Defaults() {
}

View File

@ -31,6 +31,7 @@ fun getEventsDatabase(): MySqlDataSource {
return eventsDatabase
}
lateinit var taskManager: TasksManager
lateinit var runnerManager: RunnerManager
@ -38,6 +39,8 @@ private val log = KotlinLogging.logger {}
fun main(args: Array<String>) {
runApplication<ProcesserApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
ioCoroutine.addListener(listener = object: Observables.ObservableValue.ValueListener<Throwable> {
override fun onUpdated(value: Throwable) {
@ -59,8 +62,7 @@ fun main(args: Array<String>) {
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName)
runnerManager.assignRunner()
val context = runApplication<ProcesserApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
}

View File

@ -2,9 +2,16 @@ package no.iktdev.mediaprocessing.processer
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.*
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.persistance.RunnerManager
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
import no.iktdev.mediaprocessing.shared.common.persistance.toTask
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.contract.data.Event
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.select
import org.springframework.beans.factory.annotation.Value
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@ -64,6 +71,10 @@ class TaskCoordinator(): TaskCoordinatorBase() {
}
}
override fun onProduceEvent(event: Event) {
taskManager.produceEvent(event)
}
override fun clearExpiredClaims() {
val expiredClaims = taskManager.getTasksWithExpiredClaim().filter { it.task in listOf(TaskType.Encode, TaskType.Extract) }
expiredClaims.forEach {
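The loop body is cut off by this hunk; presumably each expired Encode/Extract claim is released so another processer instance can re-claim the task. A sketch under that assumption (markTaskAsUnclaimed is a hypothetical name, not a confirmed TasksManager method):

// Hypothetical continuation of clearExpiredClaims():
expiredClaims.forEach { task ->
    log.info { "Releasing expired claim on ${task.referenceId}@${task.eventId}" }
    taskManager.markTaskAsUnclaimed(task.referenceId, task.eventId)
}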

View File

@ -1,158 +0,0 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import com.github.pgreze.process.Redirect
import com.github.pgreze.process.process
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegProgressDecoder
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import java.io.File
import java.time.Duration
import java.util.UUID
class FfmpegWorker(
val referenceId: String,
val eventId: String,
val info: FfmpegWorkRequestCreated,
val listener: FfmpegWorkerEvents,
val logDir: File
) {
private val scope = CoroutineScope(Dispatchers.IO + Job())
private var job: Job? = null
fun isWorking(): Boolean {
return job != null && (job?.isCompleted != true) && scope.isActive
}
val decoder = FfmpegProgressDecoder()
private val outputCache = mutableListOf<String>()
private val log = KotlinLogging.logger {}
val logFile = logDir.using("$eventId-=-${File(info.outFile).nameWithoutExtension}.log")
val getOutputCache: List<String> get() = outputCache.toList()
data class FfmpegWorkerArgumentsBuilder(
private val mutableList: MutableList<String> = mutableListOf()
) {
private val defaultArguments = listOf(
"-nostdin",
"-hide_banner"
)
private val progressArguments = listOf("-progress", "pipe:1")
fun using(info: FfmpegWorkRequestCreated) = apply {
this.mutableList.add(info.inputFile)
this.mutableList.addAll(info.arguments)
this.mutableList.add(info.outFile)
}
fun build(): List<String> {
return (if (ProcesserEnv.allowOverwrite) listOf("-y") else emptyList()) + defaultArguments + listOf("-i") + mutableList
}
fun buildWithProgress(): List<String> {
return build() + progressArguments
}
}
fun run() {
log.info { "Starting ffmpeg ReferenceId: $referenceId, eventId $eventId for file ${info.outFile}, debugId: ${UUID.randomUUID().toString()}" }
val args = FfmpegWorkerArgumentsBuilder().using(info).build()
job = scope.launch {
execute(args)
}
}
fun runWithProgress() {
log.info { "Starting ffmpeg ReferenceId: $referenceId, eventId $eventId for file ${info.outFile}, debugId: ${UUID.randomUUID().toString()}" }
val args = FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress()
job = scope.launch {
execute(args)
}
}
private suspend fun startIAmAlive() {
scope.launch {
while (scope.isActive && job?.isCompleted != true) {
delay(Duration.ofMinutes(5).toMillis())
listener.onIAmAlive(referenceId, eventId)
}
}
}
fun cancel(message: String = "Work was interrupted as requested") {
job?.cancel()
scope.cancel(message)
listener.onError(referenceId, eventId, info, message)
}
private suspend fun execute(args: List<String>) {
withContext(Dispatchers.IO) {
logFile.createNewFile()
}
startIAmAlive()
listener.onStarted(referenceId, eventId, info)
val processOp = process(
ProcesserEnv.ffmpeg, *args.toTypedArray(),
stdout = Redirect.CAPTURE,
stderr = Redirect.CAPTURE,
consumer = {
//log.info { it }
onOutputChanged(it)
},
destroyForcibly = true
)
val result = processOp
onOutputChanged("Received exit code: ${result.resultCode}")
if (result.resultCode != 0) {
listener.onError(referenceId, eventId, info, result.output.joinToString("\n"))
} else {
listener.onCompleted(referenceId, eventId, info)
}
}
private var progress: FfmpegDecodedProgress? = null
fun onOutputChanged(line: String) {
outputCache.add(line)
writeToLog(line)
// toList takes a snapshot so the cache cannot be mutated while it is being parsed.
decoder.parseVideoProgress(outputCache.toList())?.let { decoded ->
try {
val _progress = decoder.getProgress(decoded)
if (progress == null || _progress.progress > (progress?.progress ?: -1)) {
progress = _progress
listener.onProgressChanged(referenceId, eventId, info, _progress)
}
} catch (e: Exception) {
e.printStackTrace()
}
}
}
fun writeToLog(line: String) {
// printWriter() would truncate the log file on every call; append instead
logFile.appendText(line + System.lineSeparator())
}
}
interface FfmpegWorkerEvents {
fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated)
fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated)
fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String)
fun onProgressChanged(
referenceId: String,
eventId: String,
info: FfmpegWorkRequestCreated,
progress: FfmpegDecodedProgress
)
fun onIAmAlive(referenceId: String, eventId: String) {}
}
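For reference, the deleted builder assembled its command line as overwrite flag + default flags + "-i" + input/arguments/output, with the progress flags appended last. A worked example of the list it produced (paths and codec arguments invented):

// With ProcesserEnv.allowOverwrite = true and
// info = FfmpegWorkRequestCreated(inputFile = "/in/movie.mkv",
//                                 arguments = listOf("-c:v", "libx264"),
//                                 outFile = "/out/movie.mp4"):
FfmpegWorkerArgumentsBuilder().using(info).buildWithProgress()
// -> ["-y", "-nostdin", "-hide_banner", "-i",
//     "/in/movie.mkv", "-c:v", "libx264", "/out/movie.mp4",
//     "-progress", "pipe:1"]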

View File

@ -4,6 +4,9 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.Reporter
import no.iktdev.mediaprocessing.processer.TaskCoordinator
@ -11,14 +14,15 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegRunner
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.ClaimableTask
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.persistance.Status
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.data.EncodeArgumentData
import no.iktdev.mediaprocessing.shared.contract.data.EncodeWorkPerformedEvent
import no.iktdev.mediaprocessing.shared.contract.data.EncodedData
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -56,20 +60,21 @@ class EncodeServiceV2(
fun startEncode(event: Task) {
val ffwrc = event.data as FfmpegTaskData
val outFile = File(ffwrc.outFile)
val ffwrc = event.data as EncodeArgumentData
val outFile = File(ffwrc.outputFile)
outFile.parentFile.mkdirs()
if (!logDir.exists()) {
logDir.mkdirs()
}
val setClaim = taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
val setClaim =
taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
if (setClaim) {
log.info { "Claim successful for ${event.referenceId} encode" }
runner = FfmpegRunner(
inputFile = ffwrc.inputFile,
outputFile = ffwrc.outFile,
outputFile = ffwrc.outputFile,
arguments = ffwrc.arguments,
logDir = logDir, listener = this
)
@ -77,7 +82,7 @@ class EncodeServiceV2(
if (ffwrc.arguments.firstOrNull() != "-y") {
this.onError(
ffwrc.inputFile,
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}"
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outputFile}"
)
// Setting consumed to prevent spamming
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
@ -129,13 +134,16 @@ class EncodeServiceV2(
readbackIsSuccess = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkEncodePerformed,
data = ProcesserEncodeWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = task.derivedFromEventId,
outFile = outputFile
tasks.onProduceEvent(
EncodeWorkPerformedEvent(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Success
),
data = EncodedData(
outputFile
)
)
)
sendProgress(
@ -156,15 +164,13 @@ class EncodeServiceV2(
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkEncodePerformed,
data = ProcesserEncodeWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
derivedFromEventId = task.derivedFromEventId,
)
tasks.onProduceEvent(EncodeWorkPerformedEvent(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Failed
)
))
sendProgress(
task.referenceId, task.eventId, status = WorkStatus.Failed, progress = FfmpegDecodedProgress(
progress = 0,
@ -183,7 +189,6 @@ class EncodeServiceV2(
}
fun sendProgress(
referenceId: String,
eventId: String,

View File

@ -1,8 +1,9 @@
package no.iktdev.mediaprocessing.processer.services
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.processer.ProcesserEnv
import no.iktdev.mediaprocessing.processer.Reporter
import no.iktdev.mediaprocessing.processer.TaskCoordinator
@ -11,13 +12,13 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.limitedWhile
import no.iktdev.mediaprocessing.shared.common.task.FfmpegTaskData
import no.iktdev.mediaprocessing.shared.common.persistance.Status
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.contract.data.ExtractArgumentData
import no.iktdev.mediaprocessing.shared.contract.data.ExtractWorkPerformedEvent
import no.iktdev.mediaprocessing.shared.contract.data.ExtractedData
import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo
import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -54,8 +55,8 @@ class ExtractServiceV2(
fun startExtract(event: Task) {
val ffwrc = event.data as FfmpegTaskData
val outFile = File(ffwrc.outFile).also {
val ffwrc = event.data as ExtractArgumentData
val outFile = File(ffwrc.outputFile).also {
it.parentFile.mkdirs()
}
if (!logDir.exists()) {
@ -67,7 +68,7 @@ class ExtractServiceV2(
log.info { "Claim successful for ${event.referenceId} extract" }
runner = FfmpegRunner(
inputFile = ffwrc.inputFile,
outputFile = ffwrc.outFile,
outputFile = ffwrc.outputFile,
arguments = ffwrc.arguments,
logDir = logDir,
listener = this
@ -76,7 +77,7 @@ class ExtractServiceV2(
if (ffwrc.arguments.firstOrNull() != "-y") {
this.onError(
ffwrc.inputFile,
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}"
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outputFile}"
)
// Setting consumed to prevent spamming
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
@ -105,13 +106,16 @@ class ExtractServiceV2(
successfulComplete = taskManager.isTaskCompleted(task.referenceId, task.eventId)
}
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkExtractPerformed,
data = ProcesserExtractWorkPerformed(
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = task.derivedFromEventId,
outFile = outputFile
tasks.onProduceEvent(
ExtractWorkPerformedEvent(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Success
),
data = ExtractedData(
outputFile
)
)
)
sendProgress(
@ -132,13 +136,13 @@ class ExtractServiceV2(
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
log.info { "Encode failed for ${task.referenceId}\n$message" }
tasks.producer.sendMessage(
referenceId = task.referenceId, event = KafkaEvents.EventWorkExtractPerformed,
data = ProcesserExtractWorkPerformed(
status = Status.ERROR,
message = message,
producedBy = serviceId,
derivedFromEventId = task.derivedFromEventId,
tasks.onProduceEvent(
ExtractWorkPerformedEvent(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Failed
)
)
)
sendProgress(

View File

@ -47,7 +47,6 @@ dependencies {
implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:common")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:kafka")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))

View File

@ -9,7 +9,6 @@ import no.iktdev.exfl.observable.Observables
import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.toEventsDatabase
import no.iktdev.mediaprocessing.ui.dto.ExplorerItem

View File

@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.ui.service
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.contract.dto.EventsDto
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.ui.getEventsDatabase
import org.jetbrains.exposed.sql.*

View File

@ -9,7 +9,6 @@ findProject(":apps:converter")?.name = "converter"
findProject(":apps:processer")?.name = "processer"
findProject(":shared")?.name = "shared"
findProject(":shared:kafka")?.name = "kafka"
findProject(":shared:contract")?.name = "contract"
findProject(":shared:common")?.name = "common"
findProject(":shared:eventi")?.name = "eventi"
@ -21,7 +20,6 @@ include("apps:converter")
include("apps:processer")
include("shared")
include("shared:kafka")
include("shared:contract")
include("shared:common")
include("shared:eventi")

View File

@ -42,8 +42,8 @@ dependencies {
implementation("org.apache.commons:commons-lang3:3.12.0")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:eventi")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")

View File

@ -1,21 +0,0 @@
package no.iktdev.mediaprocessing.shared.common
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Import
import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service
@Import(DefaultMessageListener::class)
abstract class ProcessingService() {
@Autowired
lateinit var producer: CoordinatorProducer
abstract fun onResult(referenceId: String, data: MessageDataWrapper)
@PostConstruct
abstract fun onReady(): Unit
}

View File

@ -4,7 +4,7 @@ import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.ActiveMode
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.contract.data.Event
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
@ -17,8 +17,8 @@ abstract class TaskCoordinatorBase() {
private var ready: Boolean = false
fun isReady() = ready
@Autowired
lateinit var producer: CoordinatorProducer
abstract fun onProduceEvent(event: Event)
abstract val taskAvailabilityEventListener: MutableMap<TaskType, MutableList<TaskQueueListener>>

View File

@ -2,9 +2,6 @@ package no.iktdev.mediaprocessing.shared.common
import kotlinx.coroutines.delay
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import java.io.File
import java.io.RandomAccessFile
import java.net.InetAddress
@ -32,20 +29,6 @@ fun getAppVersion(): Int {
return Integer.parseInt(parsed)
}
fun List<PersistentMessage>.lastOrSuccess(): PersistentMessage? {
return this.lastOrNull { it.data.isSuccess() } ?: this.lastOrNull()
}
fun List<PersistentMessage>.lastOrSuccessOf(event: KafkaEvents): PersistentMessage? {
val validEvents = this.filter { it.event == event }
return validEvents.lastOrNull { it.data.isSuccess() } ?: validEvents.lastOrNull()
}
fun List<PersistentMessage>.lastOrSuccessOf(event: KafkaEvents, predicate: (PersistentMessage) -> Boolean): PersistentMessage? {
val validEvents = this.filter { it.event == event && predicate(it) }
return validEvents.lastOrNull()
}
suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) {
var elapsedDelay = 0L

View File

@ -1,47 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.jetbrains.exposed.sql.*
import java.time.LocalDateTime
class PersistentDataReader(var dataSource: DataSource) {
val dzz = DeserializingRegistry()
@Deprecated("Use PersistentEventManager.getAllEventsGrouped")
fun getAllMessages(): List<List<PersistentMessage>> {
val events = withTransaction(dataSource.database) {
events.selectAll()
.groupBy { it[events.referenceId] }
}
return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getEvetnsWith")
fun getMessagesFor(referenceId: String): List<PersistentMessage> {
return withTransaction(dataSource.database) {
events.select { events.referenceId eq referenceId }
.orderBy(events.created, SortOrder.ASC)
.mapNotNull { fromRowToPersistentMessage(it, dzz) }
} ?: emptyList()
}
@Deprecated("Use PersistentEventManager.getEventsUncompleted")
fun getUncompletedMessages(): List<List<PersistentMessage>> {
val result = withDirtyRead(dataSource.database) {
events.selectAll()
.andWhere { events.event neq KafkaEvents.EventMediaProcessCompleted.event }
.groupBy { it[events.referenceId] }
.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } }
} ?: emptyList()
return result
}
}

View File

@ -1,183 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.*
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
import java.security.MessageDigest
import java.time.LocalDateTime
import kotlin.text.Charsets.UTF_8
private val log = KotlinLogging.logger {}
class PersistentEventManager(private val dataSource: DataSource) {
val dzz = DeserializingRegistry()
/**
* Deletes the given superseded events from the events table
*/
private fun deleteSupersededEvents(superseded: List<PersistentMessage>) {
withTransaction(dataSource) {
superseded.forEach { duplicate ->
events.deleteWhere {
(events.referenceId eq duplicate.referenceId) and
(events.eventId eq duplicate.eventId) and
(events.event eq duplicate.event.event)
}
}
}
}
private val exemptedFromSingleEvent = listOf(
KafkaEvents.EventWorkConvertCreated,
KafkaEvents.EventWorkExtractCreated,
KafkaEvents.EventWorkConvertPerformed,
KafkaEvents.EventWorkExtractPerformed
)
private fun isExempted(event: KafkaEvents): Boolean {
return event in exemptedFromSingleEvent
}
/**
* @param referenceId Reference
* @param eventId Current eventId for the message, required to prevent deletion of itself
* @param event Current event for the message
*/
private fun deleteSupersededEvents(referenceId: String, eventId: String, event: KafkaEvents, derivedFromId: String?) {
val forRemoval = mutableListOf<PersistentMessage>()
val present = getEventsWith(referenceId).filter { it.data.derivedFromEventId != null }
val helper = PersistentMessageHelper(present)
val replaced = if (!isExempted(event)) present.find { it.eventId != eventId && it.event == event } else null
val orphaned = replaced?.let { helper.getEventsRelatedTo(it.eventId) } ?: emptyList()
forRemoval.addAll(orphaned)
//superseded.filter { !notSuperseded.contains(it) }.forEach { availableForRemoval.addAll(helper.getEventsRelatedTo(it.eventId)) }
deleteSupersededEvents(forRemoval)
}
//region Database read
fun getEventsWith(referenceId: String): List<PersistentMessage> {
return withDirtyRead(dataSource.database) {
events.select {
(events.referenceId eq referenceId)
}
.orderBy(events.created, SortOrder.ASC)
.toPersistentMessage(dzz)
} ?: emptyList()
}
fun getAllEvents(): List<PersistentMessage> {
return withDirtyRead(dataSource.database) {
events.selectAll()
.toPersistentMessage(dzz)
} ?: emptyList()
}
fun getAllEventsGrouped(): List<List<PersistentMessage>> {
return getAllEvents().toGrouped()
}
fun getEventsUncompleted(): List<List<PersistentMessage>> {
val identifiesAsCompleted = listOf(
KafkaEvents.EventCollectAndStore
)
val all = getAllEventsGrouped()
return all.filter { entry -> entry.none { it.event in identifiesAsCompleted } }
}
//endregion
//region Database write
/**
* Stores the Kafka event and its data in the database as a PersistentMessage
* @param event KafkaEvents
* @param message Kafka message object
*/
fun setEvent(event: KafkaEvents, message: Message<*>): Boolean {
withTransaction(dataSource.database) {
allEvents.insert {
it[referenceId] = message.referenceId
it[eventId] = message.eventId
it[events.event] = event.event
it[data] = message.dataAsJson()
}
}
val existing = getEventsWith(message.referenceId)
val derivedId = message.data?.derivedFromEventId
if (derivedId != null) {
val isNewEventOrphan = existing.none { it.eventId == derivedId }
if (isNewEventOrphan) {
log.warn { "Message not saved! ${message.referenceId} with eventId(${message.eventId}) for event ${event.event} has derivedEventId($derivedId) which does not exist!" }
return false
}
}
val exception = executeOrException(dataSource.database) {
events.insert {
it[referenceId] = message.referenceId
it[eventId] = message.eventId
it[events.event] = event.event
it[data] = message.dataAsJson()
}
}
val success = if (exception != null) {
if (exception.isExposedSqlException()) {
if ((exception as ExposedSQLException).isCausedByDuplicateError()) {
log.debug { "Error is of SQLIntegrityConstraintViolationException" }
log.error { exception.message }
exception.printStackTrace()
} else {
log.debug { "Error code is: ${exception.errorCode}" }
log.error { exception.message }
exception.printStackTrace()
}
} else {
log.error { exception.message }
exception.printStackTrace()
}
false
} else {
true
}
if (success) {
deleteSupersededEvents(referenceId = message.referenceId, eventId = message.eventId, event = event, derivedFromId = message.data?.derivedFromEventId)
}
return success
}
//endregion
}
fun List<PersistentMessage>?.toGrouped(): List<List<PersistentMessage>> {
return this?.groupBy { it.referenceId }?.mapNotNull { it.value } ?: emptyList()
}
fun Query?.toPersistentMessage(dzz: DeserializingRegistry): List<PersistentMessage> {
return this?.mapNotNull { fromRowToPersistentMessage(it, dzz) } ?: emptyList()
}

View File

@ -1,95 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.isSkipped
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.jetbrains.exposed.sql.ResultRow
import java.time.LocalDateTime
data class PersistentMessage(
val referenceId: String,
val eventId: String,
val event: KafkaEvents,
val data: MessageDataWrapper,
val created: LocalDateTime
)
fun List<PersistentMessage>.lastOf(event: KafkaEvents): PersistentMessage? {
return this.lastOrNull { it.event == event && it.isSuccess() }
}
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event
}
fun PersistentMessage.isSuccess(): Boolean {
return try {
this.data.isSuccess()
} catch (e: Exception) {
false
}
}
fun PersistentMessage.isSkipped(): Boolean {
return try {
this.data.isSkipped()
} catch (e: Exception) {
false
}
}
class PersistentMessageHelper(val messages: List<PersistentMessage>) {
fun findOrphanedEvents(): List<PersistentMessage> {
val withDerivedId = messages.filter { it.data.derivedFromEventId != null }
val idsFlat = messages.map { it.eventId }
return withDerivedId.filter { it.data.derivedFromEventId !in idsFlat }
}
fun getEventsRelatedTo(eventId: String): List<PersistentMessage> {
val triggered = messages.firstOrNull { it.eventId == eventId } ?: return emptyList()
val usableEvents = messages.filter { it.eventId != eventId && it.data.derivedFromEventId != null }
val derivedEventsMap = mutableMapOf<String, MutableList<String>>()
for (event in usableEvents) {
derivedEventsMap.getOrPut(event.data.derivedFromEventId!!) { mutableListOf() }.add(event.eventId)
}
val eventsToFind = mutableSetOf<String>()
        // Perform a DFS to find all derived events that should be deleted
dfs(triggered.eventId, derivedEventsMap, eventsToFind)
return messages.filter { it.eventId in eventsToFind }
}
/**
* @param eventId Initial eventId
*/
private fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToFind: MutableSet<String>) {
eventsToFind.add(eventId)
derivedEventsMap[eventId]?.forEach { derivedEventId ->
dfs(derivedEventId, derivedEventsMap, eventsToFind)
}
}
}
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try {
KafkaEvents.toEvent(row[events.event])
} catch (e: IllegalArgumentException) {
e.printStackTrace()
return null
    } ?: return null
val dzdata = dez.deserializeData(kev, row[events.data])
return PersistentMessage(
referenceId = row[events.referenceId],
eventId = row[events.eventId],
event = kev,
data = dzdata,
created = row[events.created]
)
}

View File

@ -1,14 +1,15 @@
package no.iktdev.mediaprocessing.shared.common.persistance
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withDirtyRead
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.data.referenceId
import no.iktdev.eventi.data.toJson
import no.iktdev.mediaprocessing.shared.common.datasource.*
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.common.task.TaskType
import no.iktdev.mediaprocessing.shared.common.task.TaskDoz
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.contract.data.Event
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
@ -16,6 +17,12 @@ import java.security.MessageDigest
import java.time.LocalDateTime
import java.util.*
enum class Status {
SKIPPED,
COMPLETED,
ERROR
}
class TasksManager(private val dataSource: DataSource) {
private val log = KotlinLogging.logger {}
@ -136,6 +143,38 @@ class TasksManager(private val dataSource: DataSource) {
}
}
fun produceEvent(event: Event): Boolean {
val exception = executeOrException(dataSource.database) {
events.insert {
it[referenceId] = event.referenceId()
it[eventId] = event.eventId()
it[events.event] = event.eventType.event
it[data] = event.toJson()
}
}
        val success = if (exception != null) {
            if (exception.isExposedSqlException()) {
                val sqlException = exception as ExposedSQLException
                if (sqlException.isCausedByDuplicateError()) {
                    log.debug { "Error is of SQLIntegrityConstraintViolationException" }
                } else {
                    log.debug { "Error code is: ${sqlException.errorCode}" }
                }
            }
            log.error { exception.message }
            exception.printStackTrace()
            false
        } else {
            true
        }
return success
}
}
val digest = MessageDigest.getInstance("MD5")

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.common.task
import no.iktdev.mediaprocessing.shared.contract.dto.tasks.TaskData
import java.time.LocalDateTime
data class Task(

View File

@ -1,28 +0,0 @@
package no.iktdev.mediaprocessing.shared.common.task
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import org.jetbrains.exposed.sql.ResultRow
import java.io.Serializable
open class TaskData(
val inputFile: String,
): Serializable {
}
class FfmpegTaskData(
inputFile: String,
val arguments: List<String>,
val outFile: String
): TaskData(inputFile = inputFile)
class ConvertTaskData(
inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,
val outDirectory: String,
val outFormats: List<SubtitleFormats> = listOf()
): TaskData(inputFile = inputFile)

View File

@ -4,6 +4,10 @@ import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import com.mysql.cj.xdevapi.RowResult
import no.iktdev.mediaprocessing.shared.common.persistance.tasks
import no.iktdev.mediaprocessing.shared.contract.data.ConvertData
import no.iktdev.mediaprocessing.shared.contract.data.EncodeArgumentData
import no.iktdev.mediaprocessing.shared.contract.data.ExtractArgumentData
import no.iktdev.mediaprocessing.shared.contract.dto.tasks.TaskData
import org.jetbrains.exposed.sql.ResultRow
class TaskDoz {
@ -11,12 +15,9 @@ class TaskDoz {
fun <T: TaskData> dzdata(type: TaskType, data: String): T? {
val clazz: Class<out TaskData> = when(type) {
TaskType.Encode, TaskType.Extract -> {
FfmpegTaskData::class.java
}
TaskType.Convert -> {
ConvertTaskData::class.java
}
TaskType.Encode -> EncodeArgumentData::class.java
TaskType.Extract -> ExtractArgumentData::class.java
TaskType.Convert -> ConvertData::class.java
else -> TaskData::class.java
}
val type = TypeToken.getParameterized(clazz).type

View File

@ -14,6 +14,7 @@ dependencies {
implementation(project(mapOf("path" to ":shared:eventi")))
implementation("com.google.code.gson:gson:2.8.9")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("org.springframework.boot:spring-boot-starter:2.7.0")

View File

@ -0,0 +1,59 @@
package no.iktdev.mediaprocessing.shared.contract
import com.google.gson.reflect.TypeToken
import mu.KotlinLogging
import no.iktdev.eventi.core.WGson
import no.iktdev.mediaprocessing.shared.contract.data.*
private val log = KotlinLogging.logger {}
object EventToClazzTable {
val table = mutableMapOf(
Events.EventMediaProcessStarted to MediaProcessStartEvent::class.java,
Events.EventMediaReadBaseInfoPerformed to BaseInfoEvent::class.java,
Events.EventMediaReadStreamPerformed to MediaFileStreamsReadEvent::class.java,
Events.EventMediaParseStreamPerformed to MediaFileStreamsParsedEvent::class.java,
Events.EventWorkConvertCreated to ConvertWorkCreatedEvent::class.java,
Events.EventWorkConvertPerformed to ConvertWorkPerformed::class.java,
Events.EventMediaParameterEncodeCreated to EncodeArgumentCreatedEvent::class.java,
Events.EventWorkEncodeCreated to EncodeWorkCreatedEvent::class.java,
Events.EventWorkEncodePerformed to EncodeWorkPerformedEvent::class.java,
Events.EventMediaParameterExtractCreated to ExtractArgumentCreatedEvent::class.java,
Events.EventWorkExtractCreated to ExtractWorkCreatedEvent::class.java,
Events.EventWorkExtractPerformed to ExtractWorkPerformedEvent::class.java,
Events.EventWorkDownloadCoverPerformed to MediaCoverDownloadedEvent::class.java,
Events.EventMediaReadOutCover to MediaCoverInfoReceivedEvent::class.java,
Events.EventMediaMetadataSearchPerformed to MediaMetadataReceivedEvent::class.java,
Events.EventMediaReadOutNameAndType to MediaOutInformationConstructedEvent::class.java,
Events.EventMediaWorkProceedPermitted to PermitWorkCreationEvent::class.java,
Events.EventMediaProcessCompleted to MediaProcessCompletedEvent::class.java
)
}
fun String.fromJsonWithDeserializer(event: Events): Event {
val clazz = EventToClazzTable.table[event]
clazz?.let { eventClass ->
try {
val type = TypeToken.getParameterized(eventClass).type
return WGson.gson.fromJson<Event>(this, type)
} catch (e: Exception) {
e.printStackTrace()
}
}
try {
// Fallback
val type = object : TypeToken<Event>() {}.type
return WGson.gson.fromJson(this, type)
} catch (e: Exception) {
e.printStackTrace()
}
// Default
val type = object : TypeToken<Event>() {}.type
log.error { "Failed to convert event: $event and data: $this to proper type!" }
return WGson.gson.fromJson<Event>(this, type)
}
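As a minimal sketch of the lookup-table pattern used above (map a type tag to a concrete class, then let Gson parse); Animal and Dog are hypothetical types, not part of the contract module:

import com.google.gson.Gson

open class Animal(val kind: String? = null)
class Dog(val name: String? = null) : Animal("dog")

// Tag-to-class table, analogous to EventToClazzTable.
val table = mapOf("dog" to Dog::class.java)

fun fromJsonByTag(json: String, tag: String): Animal {
    val clazz: Class<out Animal> = table[tag] ?: Animal::class.java // fall back to the base type
    return Gson().fromJson(json, clazz)
}

fun main() {
    val animal = fromJsonByTag("""{"name":"Rex"}""", "dog")
    println((animal as Dog).name) // Rex
}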

View File

@ -12,7 +12,6 @@ enum class Events(val event: String) {
EventMediaParameterEncodeCreated ("event:media-encode-parameter:created"),
EventMediaParameterExtractCreated ("event:media-extract-parameter:created"),
EventMediaParameterConvertCreated ("event:media-convert-parameter:created"),
EventMediaParameterDownloadCoverCreated ("event:media-download-cover-parameter:created"),
EventMediaWorkProceedPermitted ("event:media-work-proceed:permitted"),

View File

@ -4,9 +4,9 @@ import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
class BaseInfoEvent(
override val eventType: Events = Events.EventMediaReadBaseInfoPerformed,
data class BaseInfoEvent(
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaReadBaseInfoPerformed,
override val data: BaseInfo? = null
) : Event()

View File

@ -2,18 +2,20 @@ package no.iktdev.mediaprocessing.shared.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.contract.dto.tasks.TaskData
data class ConvertWorkCreatedEvent(
override val eventType: Events = Events.EventWorkConvertCreated,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkConvertCreated,
override val data: ConvertData? = null
) : Event() {
}
data class ConvertData(
val inputFile: String,
override val inputFile: String,
val outputDirectory: String,
val outputFileName: String,
val formats: List<String> = emptyList(),
val formats: List<SubtitleFormats> = emptyList(),
val allowOverwrite: Boolean
)
): TaskData()

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
class ConvertWorkPerformed(
override val eventType: Events = Events.EventWorkConvertPerformed,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkConvertPerformed,
override val data: ConvertedData? = null,
val message: String? = null
) : Event() {

View File

@ -2,16 +2,18 @@ package no.iktdev.mediaprocessing.shared.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.dto.tasks.TaskData
data class EncodeArgumentCreatedEvent(
override val eventType: Events = Events.EventMediaParameterEncodeCreated,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaParameterEncodeCreated,
override val data: EncodeArgumentData? = null
) : Event() {
}
data class EncodeArgumentData(
val arguments: List<String>,
val outputFile: String,
val inputFile: String
)
override val inputFile: String
): TaskData()

View File

@ -4,7 +4,7 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class EncodeWorkCreatedEvent(
override val eventType: Events = Events.EventWorkEncodeCreated,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkEncodeCreated,
override val data: EncodeArgumentData? = null
) : Event()

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class EncodeWorkPerformedEvent(
override val eventType: Events = Events.EventWorkEncodePerformed,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkEncodePerformed,
override val data: EncodedData? = null,
val message: String? = null
) : Event() {

View File

@ -14,11 +14,3 @@ inline fun <reified T: Event> Event.az(): T? {
null
} else this
}
fun Event.referenceId(): String {
return this.metadata.referenceId
}
fun Event.eventId(): String {
return this.metadata.eventId
}

View File

@ -2,10 +2,11 @@ package no.iktdev.mediaprocessing.shared.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.dto.tasks.TaskData
data class ExtractArgumentCreatedEvent(
override val eventType: Events = Events.EventMediaParameterExtractCreated,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaParameterExtractCreated,
override val data: List<ExtractArgumentData>? = null
): Event()
@ -13,5 +14,5 @@ data class ExtractArgumentCreatedEvent(
data class ExtractArgumentData(
val arguments: List<String>,
val outputFile: String,
val inputFile: String
)
override val inputFile: String
): TaskData()

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class ExtractWorkCreatedEvent(
override val eventType: Events = Events.EventWorkExtractCreated,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkExtractCreated,
override val data: ExtractArgumentData? = null
) : Event() {
}

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class ExtractWorkPerformedEvent(
override val eventType: Events = Events.EventWorkExtractPerformed,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkExtractPerformed,
override val data: ExtractedData? = null,
val message: String? = null
) : Event() {

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class MediaCoverDownloadedEvent(
override val eventType: Events = Events.EventWorkDownloadCoverPerformed,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventWorkDownloadCoverPerformed,
override val data: DownloadedCover? = null
) : Event() {
}

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class MediaCoverInfoReceivedEvent(
override val eventType: Events = Events.EventMediaReadOutCover,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaReadOutCover,
override val data: CoverDetails? = null
) : Event() {
}

View File

@ -4,8 +4,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class MediaMetadataReceivedEvent(
override val eventType: Events = Events.EventMediaMetadataSearchPerformed,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaMetadataSearchPerformed,
override val data: pyMetadata? = null,
): Event() {
}

View File

@ -6,8 +6,8 @@ import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class MediaOutInformationConstructedEvent(
override val eventType: Events = Events.EventMediaReadOutNameAndType,
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaReadOutNameAndType,
override val data: MediaInfoReceived? = null
) : Event() {
}

View File

@ -0,0 +1,16 @@
package no.iktdev.mediaprocessing.shared.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class MediaProcessCompletedEvent(
override val metadata: EventMetadata,
override val data: CompletedEventData?,
override val eventType: Events = Events.EventMediaProcessCompleted
): Event()
data class CompletedEventData(
val eventIdsCollected: List<String>
)

View File

@ -0,0 +1,11 @@
package no.iktdev.mediaprocessing.shared.contract.data
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.mediaprocessing.shared.contract.Events
data class PermitWorkCreationEvent(
override val metadata: EventMetadata,
override val eventType: Events = Events.EventMediaWorkProceedPermitted,
override val data: String?
) : Event() {
}

View File

@ -1,7 +0,0 @@
package no.iktdev.mediaprocessing.shared.contract.dto
data class ConverterEventInfo(
val status: WorkStatus = WorkStatus.Pending,
val inputFile: String,
val outputFiles: List<String> = emptyList()
)

View File

@ -1,11 +0,0 @@
package no.iktdev.mediaprocessing.shared.contract.dto
import java.time.LocalDateTime
data class EventsDto(
val referenceId: String,
val eventId: String,
val event: String,
val data: String,
val created: LocalDateTime
)

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.shared.contract.dto.tasks
import java.io.Serializable
abstract class TaskData : Serializable {
abstract val inputFile: String
}

View File

@ -4,7 +4,7 @@ data class MetadataDto(
val title: String,
val collection: String,
val type: String,
val cover: MetadataCoverDto?,
val cover: String?,
val summary: List<SummaryInfo> = emptyList(),
val genres: List<String>,
val titles: List<String> = emptyList()
@ -14,9 +14,3 @@ data class SummaryInfo(
val summary: String,
val language: String = "eng"
)
data class MetadataCoverDto(
val cover: String?, // ex Fancy.jpeg
val coverUrl: String?,
val coverFile: String?
)

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.shared.contract.reader
data class SubtitlesDto(
val collection: String,
val language: String,
val subtitleFile: String,
val format: String,
val associatedWithVideo: String
)

View File

@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.contract.reader
data class VideoDetails(
val serieInfo: SerieInfo? = null,
val type: String,
val fullName: String
val fileName: String
)
data class SerieInfo(

View File

@ -30,6 +30,7 @@ dependencies {
implementation ("mysql:mysql-connector-java:8.0.29")
implementation("org.apache.commons:commons-lang3:3.12.0")
implementation("com.google.code.gson:gson:2.8.9")
testImplementation("org.springframework.boot:spring-boot-starter-test:2.7.0")

View File

@ -0,0 +1,19 @@
package no.iktdev.eventi.core
import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.data.EventMetadata
/**
 * Wraps an event so it can be consumed at most once; any later consume() call returns null.
 */
class ConsumableEvent<T: EventImpl>(private var event: T) {
var isConsumed: Boolean = false
private set
fun consume(): T? {
return if (!isConsumed) {
isConsumed = true
event
} else null
}
fun metadata(): EventMetadata {
return event.metadata
}
}
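The class above gives at-most-once delivery to listeners. A standalone sketch of the same pattern, generic over any payload rather than EventImpl:

class ConsumeOnce<T : Any>(private val value: T) {
    var isConsumed: Boolean = false
        private set

    // First call returns the payload; every later call returns null.
    fun consume(): T? = if (!isConsumed) { isConsumed = true; value } else null
}

fun main() {
    val event = ConsumeOnce("payload")
    println(event.consume()) // payload
    println(event.consume()) // null -- a second listener gets nothing
}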

View File

@ -0,0 +1,21 @@
package no.iktdev.eventi.core
import com.google.gson.*
import java.lang.reflect.Type
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
class LocalDateTimeAdapter : JsonSerializer<LocalDateTime>, JsonDeserializer<LocalDateTime> {
private val formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME
override fun serialize(
src: LocalDateTime, typeOfSrc: Type, context: JsonSerializationContext
): JsonElement {
return JsonPrimitive(src.format(formatter))
}
override fun deserialize(
json: JsonElement, typeOfT: Type, context: JsonDeserializationContext
): LocalDateTime {
return LocalDateTime.parse(json.asString, formatter)
}
}

View File

@ -0,0 +1,38 @@
package no.iktdev.eventi.core
import no.iktdev.eventi.data.EventImpl
class PersistentMessageHelper<T: EventImpl>(val messages: List<T>) {
fun findOrphanedEvents(): List<T> {
val withDerivedId = messages.filter { it.metadata.derivedFromEventId != null }
val idsFlat = messages.map { it.metadata.eventId }
return withDerivedId.filter { it.metadata.derivedFromEventId !in idsFlat }
}
fun getEventsRelatedTo(eventId: String): List<T> {
val triggered = messages.firstOrNull { it.metadata.eventId == eventId } ?: return emptyList()
val usableEvents = messages.filter { it.metadata.eventId != eventId && it.metadata.derivedFromEventId != null }
val derivedEventsMap = mutableMapOf<String, MutableList<String>>()
for (event in usableEvents) {
derivedEventsMap.getOrPut(event.metadata.derivedFromEventId!!) { mutableListOf() }.add(event.metadata.eventId)
}
val eventsToFind = mutableSetOf<String>()
        // Perform a DFS to find all derived events that should be deleted
dfs(triggered.metadata.eventId, derivedEventsMap, eventsToFind)
return messages.filter { it.metadata.eventId in eventsToFind }
}
/**
* @param eventId Initial eventId
*/
private fun dfs(eventId: String, derivedEventsMap: Map<String, List<String>>, eventsToFind: MutableSet<String>) {
eventsToFind.add(eventId)
derivedEventsMap[eventId]?.forEach { derivedEventId ->
dfs(derivedEventId, derivedEventsMap, eventsToFind)
}
}
}
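The cascade computed by getEventsRelatedTo can be shown with plain string ids; the map below plays the role of derivedEventsMap:

// Hypothetical ids: b and c derive from a, d derives from b.
fun collect(eventId: String, children: Map<String, List<String>>, acc: MutableSet<String>) {
    acc.add(eventId)
    children[eventId]?.forEach { collect(it, children, acc) }
}

fun main() {
    val children = mapOf("a" to listOf("b", "c"), "b" to listOf("d"))
    val related = mutableSetOf<String>()
    collect("a", children, related)
    println(related) // [a, b, c, d] -- removing "a" orphans the whole chain
}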

View File

@ -0,0 +1,10 @@
package no.iktdev.eventi.core
import com.google.gson.GsonBuilder
import java.time.LocalDateTime
object WGson {
val gson = GsonBuilder()
.registerTypeAdapter(LocalDateTime::class.java, LocalDateTimeAdapter())
.create()
}
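A quick round trip through WGson, assuming the LocalDateTimeAdapter and WGson objects above are in scope:

import java.time.LocalDateTime

fun main() {
    val created = LocalDateTime.of(2024, 7, 13, 19, 49, 13)
    val json = WGson.gson.toJson(created)
    println(json) // "2024-07-13T19:49:13"
    val parsed = WGson.gson.fromJson(json, LocalDateTime::class.java)
    println(parsed == created) // true
}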

View File

@ -1,5 +1,7 @@
package no.iktdev.eventi.data
import com.google.gson.Gson
import no.iktdev.eventi.core.WGson
import java.time.LocalDateTime
import java.util.*
@ -13,11 +15,14 @@ fun <T> EventImpl.dataAs(): T? {
return this.data as T
}
fun EventImpl.toJson(): String {
return WGson.gson.toJson(this)
}
data class EventMetadata(
val referenceId: String,
val eventId: String = UUID.randomUUID().toString(),
val derivedFromEventId: String? = null, // Can be null, but should only be for the init event
val eventId: String = UUID.randomUUID().toString(),
val referenceId: String,
val status: EventStatus,
val created: LocalDateTime = LocalDateTime.now()
)
@ -39,3 +44,15 @@ fun EventImpl.isSkipped(): Boolean {
fun EventImpl.isFailed(): Boolean {
return this.metadata.status == EventStatus.Failed
}
fun EventImpl.referenceId(): String {
return this.metadata.referenceId
}
fun EventImpl.eventId(): String {
return this.metadata.eventId
}
fun EventImpl.derivedFromEventId(): String? {
return this.metadata.derivedFromEventId
}

View File

@ -2,14 +2,18 @@ package no.iktdev.eventi.implementations
import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.EventImpl
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Service
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
abstract var applicationContext: ApplicationContext
abstract var eventManager: E
val pullDelay: AtomicLong = AtomicLong(5000)
//private val listeners: MutableList<EventListener<T>> = mutableListOf()
@ -30,23 +34,27 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
var taskMode: ActiveMode = ActiveMode.Active
private fun onEventsReceived(list: List<T>) = runBlocking {
private var newEventProduced: Boolean = false
private fun onEventsReceived(events: List<T>) = runBlocking {
val listeners = getListeners()
list.groupBy { it.metadata.referenceId }.forEach { (referenceId, events) ->
launch {
events.forEach { event ->
listeners.forEach { listener ->
if (listener.shouldIProcessAndHandleEvent(event, events))
listener.onEventsReceived(event, events)
}
if (listener.shouldIProcessAndHandleEvent(event, events)) {
val consumableEvent = ConsumableEvent(event)
listener.onEventsReceived(consumableEvent, events)
if (consumableEvent.isConsumed) {
log.info { "Consumption detected for ${listener::class.java.simpleName} on event ${event.eventType}" }
return@launch
}
}
}
}
}
}
private var newItemReceived: Boolean = false
private var newEventsProducedOnReferenceId: AtomicReference<List<String>> = AtomicReference(emptyList())
private fun pullForEvents() {
coroutine.launch {
while (taskMode == ActiveMode.Active) {
@ -54,16 +62,18 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
if (events == null) {
log.warn { "EventManager is not loaded!" }
} else {
onEventsReceived(events)
events.forEach { group ->
onEventsReceived(group)
}
waitForConditionOrTimeout(5000) { newItemReceived }.also {
newItemReceived = false
}
waitForConditionOrTimeout(pullDelay.get()) { newEventProduced }.also {
newEventProduced = false
}
}
}
}
private var cachedListeners: List<String> = emptyList()
fun getListeners(): List<EventListenerImpl<T, *>> {
val serviceBeans: Map<String, Any> = applicationContext.getBeansWithAnnotation(Service::class.java)
@ -71,7 +81,15 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
.filter { bean: Any? -> bean is EventListenerImpl<*, *> }
.map { it as EventListenerImpl<*, *> }
.toList()
return beans as List<EventListenerImpl<T, *>>
val eventListeners = beans as List<EventListenerImpl<T, *>>
val listenerNames = eventListeners.map { it::class.java.name }
if (listenerNames != cachedListeners) {
listenerNames.filter { it !in cachedListeners }.forEach {
log.info { "Registered new listener $it" }
}
}
cachedListeners = listenerNames
return eventListeners
}
@ -79,9 +97,12 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
 * @return true if it's stored
*/
fun produceNewEvent(event: T): Boolean {
val isStored = eventManager?.storeEvent(event) ?: false
val isStored = eventManager.storeEvent(event)
if (isStored) {
newItemReceived = true
log.info { "Stored event: ${event.eventType}" }
newEventProduced = true
} else {
log.error { "Failed to store event: ${event.eventType}" }
}
return isStored
}
@ -89,6 +110,7 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
suspend fun waitForConditionOrTimeout(timeout: Long, condition: () -> Boolean) {
val startTime = System.currentTimeMillis()
try {
withTimeout(timeout) {
while (!condition()) {
delay(100)
@ -97,6 +119,11 @@ abstract class EventCoordinator<T: EventImpl, E: EventsManagerImpl<T>> {
}
}
}
} catch (e: TimeoutCancellationException) {
// Do nothing
} catch (e: Exception) {
e.printStackTrace()
}
}
}
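The poll-until-flag-or-timeout pattern the coordinator uses can be sketched standalone with the same withTimeout/delay combination:

import kotlinx.coroutines.*

fun main() = runBlocking {
    var newEventProduced = false
    launch { delay(300); newEventProduced = true } // simulate a listener producing an event
    try {
        withTimeout(5000) {
            while (!newEventProduced) delay(100) // poll the flag
        }
    } catch (e: TimeoutCancellationException) {
        // timed out without new events; the next pull happens regardless
    }
    println("woke early: $newEventProduced")
}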

View File

@ -1,9 +1,7 @@
package no.iktdev.eventi.implementations
import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.isSuccessful
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.*
abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
abstract val coordinator: EventCoordinator<T, E>?
@ -21,21 +19,40 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
return listensForEvents.any { it == event.eventType }
}
open fun <T: EventImpl> isPrerequisitesFulfilled(incomingEvent: T, events: List<T>): Boolean {
open fun isPrerequisitesFulfilled(incomingEvent: T, events: List<T>): Boolean {
return true
}
open fun shouldIHandleFailedEvents(incomingEvent: T): Boolean {
return false
}
open fun haveProducedExpectedMessageBasedOnEvent(incomingEvent: T, events: List<T>): Boolean {
val eventsProducedByListener = events.filter { it.eventType == produceEvent }
val triggeredBy = events.filter { it.eventType in listensForEvents }
return eventsProducedByListener.any { it.derivedFromEventId() in triggeredBy.map { t -> t.eventId() } }
}
open fun shouldIProcessAndHandleEvent(incomingEvent: T, events: List<T>): Boolean {
if (!isOfEventsIListenFor(incomingEvent))
return false
if (!isPrerequisitesFulfilled(incomingEvent, events)) {
return false
}
if (!incomingEvent.isSuccessful()) {
if (!incomingEvent.isSuccessful() && !shouldIHandleFailedEvents(incomingEvent)) {
return false
}
val isDerived = events.any { it.metadata.derivedFromEventId == incomingEvent.metadata.eventId } // && incomingEvent.eventType == produceEvent
return !isDerived
val childOf = events.filter { it.derivedFromEventId() == incomingEvent.eventId() }
val haveListenerProduced = childOf.any { it.eventType == produceEvent }
if (haveListenerProduced)
return false
if (haveProducedExpectedMessageBasedOnEvent(incomingEvent, events))
return false
//val isDerived = events.any { it.metadata.derivedFromEventId == incomingEvent.metadata.eventId } // && incomingEvent.eventType == produceEvent
return true
}
/**
@ -43,7 +60,7 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
 * @param events All available events in the collection sharing the same referenceId
* @return boolean if read or not
*/
abstract fun onEventsReceived(incomingEvent: T, events: List<T>)
abstract fun onEventsReceived(incomingEvent: ConsumableEvent<T>, events: List<T>)
fun T.makeDerivedEventInfo(status: EventStatus): EventMetadata {
return EventMetadata(

View File

@ -7,9 +7,11 @@ import no.iktdev.mediaprocessing.shared.common.datasource.DataSource
* Interacts with the database, needs to be within the Coordinator
*/
abstract class EventsManagerImpl<T: EventImpl>(val dataSource: DataSource) {
abstract fun readAvailableEvents(): List<T>
abstract fun readAvailableEvents(): List<List<T>>
abstract fun readAvailableEventsFor(referenceId: String): List<T>
abstract fun getAllEvents(): List<List<T>>
abstract fun getEventsWith(referenceId: String): List<T>
abstract fun storeEvent(event: T): Boolean
}

View File

@ -7,14 +7,22 @@ import org.springframework.stereotype.Component
@Component
class MockEventManager(dataSource: MockDataSource = MockDataSource()) : EventsManagerImpl<EventImpl>(dataSource) {
val events: MutableList<EventImpl> = mutableListOf()
override fun readAvailableEvents(): List<EventImpl> {
return events.toList()
override fun readAvailableEvents(): List<List<EventImpl>> {
return listOf(events)
}
override fun readAvailableEventsFor(referenceId: String): List<EventImpl> {
return events.filter { it.metadata.referenceId == referenceId }
}
override fun getAllEvents(): List<List<EventImpl>> {
return listOf(events)
}
override fun getEventsWith(referenceId: String): List<EventImpl> {
return events
}
override fun storeEvent(event: EventImpl): Boolean {
return events.add(event)
}

View File

@ -1,6 +1,7 @@
package no.iktdev.eventi.mock.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.mock.MockDataEventListener
@ -28,8 +29,8 @@ class FirstEventListener() : MockDataEventListener() {
super.onProduceEvent(event)
}
override fun onEventsReceived(incomingEvent: EventImpl, events: List<EventImpl>) {
val info = incomingEvent.makeDerivedEventInfo(EventStatus.Success)
override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
val info = incomingEvent.consume()!!.makeDerivedEventInfo(EventStatus.Success)
onProduceEvent(FirstEvent(
eventType = produceEvent,
metadata = info,
@ -37,5 +38,6 @@ class FirstEventListener() : MockDataEventListener() {
))
}
}

View File

@ -1,6 +1,7 @@
package no.iktdev.eventi.mock.listeners
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.eventi.implementations.EventListenerImpl
import no.iktdev.eventi.data.EventImpl
@ -30,7 +31,7 @@ class ForthEventListener() : MockDataEventListener() {
super.onProduceEvent(event)
}
override fun onEventsReceived(incomingEvent: EventImpl, events: List<EventImpl>) {
override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
    // Consume first; the coordinator has already run shouldIProcessAndHandleEvent before dispatching.
    val event = incomingEvent.consume() ?: return
    if (!shouldIProcessAndHandleEvent(event, events))
        return
    val info = event.makeDerivedEventInfo(EventStatus.Success)