Adjustment

This commit is contained in:
bskjon 2024-07-23 01:48:54 +02:00
parent bcc65e7da6
commit d08bac2c41
17 changed files with 124 additions and 25 deletions

View File

@ -19,6 +19,8 @@ import org.springframework.context.annotation.Bean
val log = KotlinLogging.logger {}
private lateinit var eventDatabase: EventsDatabase
private lateinit var eventsManager: EventsManager
lateinit var runnerManager: RunnerManager
@SpringBootApplication
class CoordinatorApplication {
@ -85,6 +87,10 @@ fun main(args: Array<String>) {
titles
)
storeDatabase.createTables(*tables)
runnerManager = RunnerManager(dataSource = eventDatabase.database, name = CoordinatorApplication::class.java.simpleName)
runnerManager.assignRunner()
runApplication<CoordinatorApplication>(*args)
log.info { "App Version: ${getAppVersion()}" }
}

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.eventi.data.eventId
import no.iktdev.eventi.implementations.ActiveMode
import no.iktdev.eventi.implementations.EventCoordinator
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.ProcessType
@ -82,4 +83,12 @@ class Coordinator(
data = message
))
}
override fun getActiveTaskMode(): ActiveMode {
if (runnerManager.iAmSuperseded()) {
// This will let the application complete but not consume new
taskMode = ActiveMode.Passive
}
return taskMode
}
}

View File

@ -36,6 +36,7 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val message = try {
readFileInfo(event.data as StartEventData, event.metadata.eventId)?.let {
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it)
@ -45,6 +46,7 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()))
}
onProduceEvent(message)
active = false
}

View File

@ -19,9 +19,13 @@ import no.iktdev.streamit.library.db.query.CatalogQuery
import no.iktdev.streamit.library.db.query.GenreQuery
import no.iktdev.streamit.library.db.query.SubtitleQuery
import no.iktdev.streamit.library.db.query.SummaryQuery
import no.iktdev.streamit.library.db.tables.catalog
import no.iktdev.streamit.library.db.tables.titles
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insertIgnore
import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.update
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@ -323,14 +327,34 @@ class CompletedTaskListener: CoordinatorEventListener() {
}
fun storeMetadata(catalogId: Int, metadata: MetadataDto) {
metadata.summary.forEach {
if (!metadata.cover.isNullOrBlank()) {
withTransaction(getStoreDatabase()) {
val storedCatalogCover = catalog.select {
(catalog.id eq catalogId)
}.map { it[catalog.cover] }.firstOrNull()
if (storedCatalogCover.isNullOrBlank()) {
catalog.update({
catalog.id eq catalogId
}) {
it[catalog.cover] = metadata.cover
}
}
}
}
metadata.summary.forEach {
val result = executeOrException(getStoreDatabase().database) {
SummaryQuery(
cid = catalogId,
language = it.language,
description = it.summary
).insert()
}
val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
if (!ignoreException) {
result?.printStackTrace()
}
}
}
@ -371,11 +395,9 @@ class CompletedTaskListener: CoordinatorEventListener() {
else -> throw RuntimeException("${videoDetails.type} is not supported!")
}
val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
return if (result == null || ignoreException ) {
return withTransaction(getStoreDatabase()) {
precreatedCatalogQuery.getId()
}
} else null
}
override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List<Event>): Boolean {
@ -388,6 +410,7 @@ class CompletedTaskListener: CoordinatorEventListener() {
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume() ?: return
active = true
val metadata = getMetadata(events)
val genres = getGenres(events)
@ -418,6 +441,7 @@ class CompletedTaskListener: CoordinatorEventListener() {
events.map { it.eventId() }
)
))
active = false
}
}

View File

@ -57,7 +57,9 @@ class ConvertWorkTaskListener: WorkTaskListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
if (!canStart(event, events)) {
active = false
return
}
@ -104,5 +106,6 @@ class ConvertWorkTaskListener: WorkTaskListener() {
)
}
}
active = false
}
}

View File

@ -35,7 +35,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val failedEventDefault = MediaCoverDownloadedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
@ -45,6 +45,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
if (data == null) {
log.error { "No valid data for use to obtain cover" }
onProduceEvent(failedEventDefault)
active = false
return
}
@ -84,6 +85,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
} else {
if (!result.exists() || !result.canRead()) {
onProduceEvent(failedEventDefault)
active = false
return
}
onProduceEvent(MediaCoverDownloadedEvent(
@ -91,6 +93,6 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
data = DownloadedCover(result.absolutePath)
))
}
active = false
}
}

View File

@ -50,11 +50,12 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az<BaseInfoEvent>()?.data
if (baseInfo == null) {
log.info { "No base info" }
active = false
return
}
@ -64,6 +65,7 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
val mediaOutInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()?.data
if (mediaOutInfo == null) {
log.info { "No Media out info" }
active = false
return
}
val videoInfo = mediaOutInfo.toValueObject()
@ -89,6 +91,6 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
)
}
onProduceEvent(result)
active = false
}
}

View File

@ -49,23 +49,38 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>() ?: return
active = true
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>()
if (started == null) {
active = false
return
}
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.ENCODE) == false) {
active = false
return
}
val streams = events.find { it.eventType == Events.EventMediaParseStreamPerformed }?.az<MediaFileStreamsParsedEvent>()?.data
if (streams == null) {
active = false
return
}
val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()
if (mediaInfo?.data == null) {
active = false
return
}
val mediaInfoData = mediaInfo.data?.toValueObject()
if (mediaInfoData == null) {
active = false
return
}
val mediaInfoData = mediaInfo.data?.toValueObject() ?: return
val inputFile = started.data?.file ?: return
val inputFile = started.data?.file
if (inputFile == null) {
active = false
return
}
val mapper = EncodeWorkArgumentsMapping(
inputFile = inputFile,
outFileFullName = mediaInfoData.fullName,
@ -85,7 +100,6 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
data = result
))
}
active = false
}
}

View File

@ -43,8 +43,9 @@ class EncodeWorkTaskListener : WorkTaskListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
if (!canStart(event, events)) {
active = false
return
}
@ -56,6 +57,7 @@ class EncodeWorkTaskListener : WorkTaskListener() {
}
if (encodeArguments == null) {
log.error { "No Encode arguments found.. referenceId: ${event.referenceId()}" }
active = false
return
}
EncodeWorkCreatedEvent(
@ -72,6 +74,6 @@ class EncodeWorkTaskListener : WorkTaskListener() {
inputFile = event.data!!.inputFile
)
}
active = false
}
}

View File

@ -45,22 +45,34 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az<MediaProcessStartEvent>() ?: return
if (started.data == null || started.data?.operations?.contains(StartOperationEvents.EXTRACT) == false) {
active = false
return
}
val streams = events.find { it.eventType == Events.EventMediaParseStreamPerformed }?.az<MediaFileStreamsParsedEvent>()?.data
if (streams == null) {
active = false
return
}
val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az<MediaOutInformationConstructedEvent>()
if (mediaInfo?.data == null) {
active = false
return
}
val mediaInfoData = mediaInfo.data?.toValueObject()
if (mediaInfoData == null) {
active = false
return
}
val mediaInfoData = mediaInfo.data?.toValueObject() ?: return
val inputFile = started.data?.file ?: return
val inputFile = started.data?.file
if (inputFile == null) {
active = false
return
}
val mapper = ExtractWorkArgumentsMapping(
inputFile = inputFile,
@ -80,6 +92,6 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
data = result
))
}
active = false
}
}

View File

@ -47,10 +47,14 @@ class ExtractWorkTaskListener: WorkTaskListener() {
val event = incomingEvent.consume()
if (event == null) {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
active = false
return
}
active = true
if (!canStart(event, events)) {
active = false
return
}
@ -62,12 +66,11 @@ class ExtractWorkTaskListener: WorkTaskListener() {
}
if (arguments == null) {
log.error { "No Extract arguments found.. referenceId: ${event.referenceId()}" }
active = false
return
}
if (arguments.isEmpty()) {
ExtractWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
active = false
return
}
@ -87,5 +90,6 @@ class ExtractWorkTaskListener: WorkTaskListener() {
inputFile = event.data!!.inputFile
)
}
active = false
}
}

View File

@ -49,6 +49,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val metadataResult = event.az<MediaMetadataReceivedEvent>()
val mediaBaseInfo = events.findLast { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az<BaseInfoEvent>()?.data
@ -59,6 +60,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
)
active = false
return
}
val pm = ProcessMediaInfoAndMetadata(mediaBaseInfo, metadataResult?.data)
@ -78,6 +80,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
)
}
onProduceEvent(result)
active = false
}
class ProcessMediaInfoAndMetadata(val baseInfo: BaseInfo, val metadata: pyMetadata? = null) {

View File

@ -114,6 +114,7 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
)
waitingProcessesForMeta.remove(it.key)
}
active = expired.isNotEmpty()
}
data class MetadataTriggerData(val eventId: String, val executed: LocalDateTime)

View File

@ -50,6 +50,8 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val readData = event.dataAs<JsonObject>()
val result = try {
@ -64,6 +66,7 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
)
}
onProduceEvent(result)
active = false
}

View File

@ -52,10 +52,12 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
log.error { "Event is null and should not be available! ${WGson.gson.toJson(incomingEvent.metadata())}" }
return
}
active = true
val startEvent = event.dataAs<StartEventData>()
if (startEvent == null || !startEvent.operations.any { it in requiredOperations }) {
log.info { "${event.metadata.referenceId} does not contain a operation in ${requiredOperations.joinToString(",") { it.name }}" }
active = false
return
}
val result = runBlocking {
@ -73,6 +75,7 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
}
}
onProduceEvent(result)
active = false
}

View File

@ -43,6 +43,9 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
private var newEventProduced: Boolean = false
abstract fun getActiveTaskMode(): ActiveMode
private fun onEventGroupsReceived(eventGroup: List<List<T>>) {
val egRefIds = eventGroup.map { it.first().referenceId() }
@ -108,7 +111,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
pullDelay.set(fastPullDelay.get())
} else {
if (pullDelay.get() != slowPullDelay.get()) {
log.info { "None events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" }
log.info { "No events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" }
}
pullDelay.set(slowPullDelay.get())
}
@ -118,6 +121,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
}
newEventProduced = false
}
taskMode = getActiveTaskMode()
}
}

View File

@ -10,6 +10,11 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
abstract val produceEvent: Any
abstract val listensForEvents: List<Any>
protected var active: Boolean = false
open fun isActive(): Boolean {
return active
}
open fun onReady() {
}