This commit is contained in:
Brage 2024-01-15 01:42:55 +01:00
parent 6e1cc17235
commit 0283739e19
19 changed files with 72 additions and 32 deletions

View File

@ -1,6 +1,5 @@
package no.iktdev.mediaprocessing.coordinator
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl
@ -9,7 +8,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
abstract class TaskCreator(coordinator: Coordinator):
TaskCreatorImpl<Coordinator, PersistentMessage, PersistentEventBasedMessageListener>(coordinator) {
val log = KotlinLogging.logger {}
override fun isPrerequisiteEventsOk(events: List<PersistentMessage>): Boolean {

View File

@ -26,7 +26,7 @@ class PersistentEventBasedMessageListener: EventBasedMessageListener<PersistentM
}
override fun waitingListeners(events: List<PersistentMessage>): List<Tasks<PersistentMessage>> {
val nonCreators = listeners.filter { !events.filter { event -> !event.data.isSuccess() }.map { e -> e.event }.contains(it.producesEvent) }
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}

View File

@ -1,8 +1,10 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccess
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
@ -16,6 +18,7 @@ import java.io.File
@Service
class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED
@ -31,7 +34,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
val selected = events.filter { it.event == KafkaEvents.EVENT_PROCESS_STARTED }.lastOrSuccess() ?: return null
val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_PROCESS_STARTED) ?: return null
return readFileInfo(selected.data as ProcessStarted)
}

View File

@ -1,11 +1,13 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
@ -24,6 +26,9 @@ import java.sql.SQLIntegrityConstraintViolationException
@Service
class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
override val requiredEvents: List<KafkaEvents> = listOf(
@ -35,8 +40,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.find { it.event == EVENT_PROCESS_STARTED } ?: return null
val completed = events.find { it.event == EVENT_PROCESS_COMPLETED } ?: return null
val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null
val completed = events.lastOrSuccessOf(EVENT_PROCESS_COMPLETED) ?: return null
if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
return null
}

View File

@ -1,8 +1,10 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
@ -15,6 +17,8 @@ import org.springframework.stereotype.Service
@Service
class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED
override val requiredEvents: List<KafkaEvents> = listOf(
@ -27,18 +31,19 @@ class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreat
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
val started = events.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED } ?: return null
val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null
if (!started.data.isSuccess()) {
return null
}
val receivedEvents = events.map { it.event }
// TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event.
val requiresOneOf = listOf(
EVENT_MEDIA_EXTRACT_PARAMETER_CREATED,
EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED,
EVENT_WORK_CONVERT_CREATED
EVENT_WORK_CONVERT_PERFORMED,
EVENT_WORK_EXTRACT_PERFORMED,
EVENT_WORK_ENCODE_PERFORMED
)
if (!requiresOneOf.any { it in receivedEvents }) {

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.DownloadClient
@ -19,6 +20,8 @@ import java.util.*
@Service
class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
@ -15,6 +16,8 @@ import org.springframework.stereotype.Service
@Service
class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER

View File

@ -1,10 +1,12 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
@ -28,6 +30,8 @@ import java.time.LocalDateTime
@Service
@EnableScheduling
class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
@ -41,15 +45,15 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed?
val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed?
// Only Return here as both baseInfo events are required to continue
if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) {
return null
}
if (baseInfo.isSuccess() && meta == null) {
log.info { "Sending ${baseInfo?.title} to waiting queue" }
log.info { "Sending ${baseInfo.title} to waiting queue" }
if (!waitingProcessesForMeta.containsKey(event.referenceId)) {
waitingProcessesForMeta[event.referenceId] = LocalDateTime.now()
}

View File

@ -1,8 +1,11 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.lastOrSuccess
import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
@ -19,6 +22,8 @@ import org.springframework.stereotype.Service
@Service
class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED
@ -35,8 +40,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) :
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }
val desiredEvent = events.find { it.data is ReaderPerformed } ?: return null
val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null
return parseStreams(desiredEvent.data as ReaderPerformed)
}

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig
@ -20,6 +21,8 @@ import java.io.File
@Service
class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED
@ -35,11 +38,6 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T
}
}
override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> {
return listOf {
isEventOfSingle(event, KafkaEvents.EVENT_PROCESS_STARTED)
}
}
override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
log.info { "${this.javaClass.simpleName} triggered by ${event.event}" }

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
@ -17,6 +18,8 @@ import java.io.File
@Service
class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val preference = Preference.getPreference()
override val producesEvent: KafkaEvents
get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg
import mu.KotlinLogging
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator
@ -20,6 +21,7 @@ import java.io.File
@Service
class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) {
val log = KotlinLogging.logger {}
val preference = Preference.getPreference()

View File

@ -33,7 +33,7 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
class ProducerDataValueSchema:
def __init__(self, referenceId, data):
def __init__(self, referenceId, data: DataResult):
self.referenceId = referenceId
self.data = data
@ -173,7 +173,7 @@ class MessageHandlerThread(threading.Thread):
def compose_message(self, referenceId: str, result: DataResult) -> ProducerDataValueSchema:
return ProducerDataValueSchema(
referenceId=referenceId,
data=result.data
data=result
)

View File

@ -29,12 +29,12 @@ class metadata():
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult("COMPLETED", None, None)
return DataResult(status="COMPLETED", message= None, data= None)
return DataResult("COMPLETED", None, meta)
return DataResult(status="COMPLETED", message= None, data=meta)
except IndexError as ingore:
return DataResult(statusType="COMPLETED", message=f"No result for {self.name}")
return DataResult(status="COMPLETED", message=f"No result for {self.name}")
except Exception as e:
return DataResult(statusType="ERROR", message=str(e))
return DataResult(status="ERROR", message=str(e))

View File

@ -31,8 +31,8 @@ class metadata():
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult("COMPLETED", None, None)
return DataResult(status="COMPLETED", message= None, data= None)
return DataResult("COMPLETED", None, meta)
return DataResult(status="COMPLETED", message= None, data= meta)
except Exception as e:
return DataResult(status="ERROR", data=None, message=str(e))

View File

@ -29,8 +29,8 @@ class metadata():
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
return DataResult("COMPLETED", None, None)
return DataResult(status="COMPLETED", message = None, data = None)
return DataResult("COMPLETED", None, meta)
return DataResult(status = "COMPLETED", message = None, data = meta)
except Exception as e:
return DataResult(status="ERROR", message=str(e))

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.delay
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import java.io.File
import java.io.RandomAccessFile
@ -29,6 +30,16 @@ fun List<PersistentMessage>.lastOrSuccess(): PersistentMessage? {
return this.lastOrNull { it.data.isSuccess() } ?: this.lastOrNull()
}
fun List<PersistentMessage>.lastOrSuccessOf(event: KafkaEvents): PersistentMessage? {
val validEvents = this.filter { it.event == event }
return validEvents.lastOrNull { it.data.isSuccess() } ?: validEvents.lastOrNull()
}
fun List<PersistentMessage>.lastOrSuccessOf(event: KafkaEvents, predicate: (PersistentMessage) -> Boolean): PersistentMessage? {
val validEvents = this.filter { it.event == event && predicate(it) }
return validEvents.lastOrNull()
}
suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) {
var elapsedDelay = 0L

View File

@ -15,7 +15,6 @@ data class PersistentMessage(
val created: LocalDateTime
)
fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean {
return this.event == event
}

View File

@ -6,6 +6,8 @@ open class MessageDataWrapper(
@Transient open val message: String? = null
)
data class SimpleMessageData(
override val status: Status,
override val message: String? = null