diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt index 3280d46d..48779783 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt @@ -1,6 +1,5 @@ package no.iktdev.mediaprocessing.coordinator -import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.coordination.PersistentEventBasedMessageListener import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.tasks.TaskCreatorImpl @@ -9,7 +8,6 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess abstract class TaskCreator(coordinator: Coordinator): TaskCreatorImpl(coordinator) { - val log = KotlinLogging.logger {} override fun isPrerequisiteEventsOk(events: List): Boolean { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt index 0b6d3768..6ec5b75c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/PersistentEventBasedMessageListener.kt @@ -26,7 +26,7 @@ class PersistentEventBasedMessageListener: EventBasedMessageListener): List> { - val nonCreators = listeners.filter { !events.filter { event -> !event.data.isSuccess() }.map { e -> e.event }.contains(it.producesEvent) } + val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) } return nonCreators } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index 638a5d2e..da2f2c8c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -1,8 +1,10 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.lastOrSuccess +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents @@ -16,6 +18,7 @@ import java.io.File @Service class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED @@ -31,7 +34,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } - val selected = events.filter { it.event == KafkaEvents.EVENT_PROCESS_STARTED }.lastOrSuccess() ?: return null + val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_PROCESS_STARTED) ?: return null return readFileInfo(selected.data as ProcessStarted) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt index c8980c01..5b3adf4d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt @@ -1,11 +1,13 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails @@ -24,6 +26,9 @@ import java.sql.SQLIntegrityConstraintViolationException @Service class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + + override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE override val requiredEvents: List = listOf( @@ -35,8 +40,8 @@ class CollectAndStoreTask(@Autowired override var coordinator: Coordinator) : Ta override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.find { it.event == EVENT_PROCESS_STARTED } ?: return null - val completed = events.find { it.event == EVENT_PROCESS_COMPLETED } ?: return null + val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null + val completed = events.lastOrSuccessOf(EVENT_PROCESS_COMPLETED) ?: return null if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) { return null } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt index f7bda0eb..e5ace5be 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt @@ -1,8 +1,10 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.* @@ -15,6 +17,8 @@ import org.springframework.stereotype.Service @Service class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED override val requiredEvents: List = listOf( @@ -27,18 +31,19 @@ class CompleteTask(@Autowired override var coordinator: Coordinator) : TaskCreat override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - val started = events.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED } ?: return null + val started = events.lastOrSuccessOf(EVENT_PROCESS_STARTED) ?: return null if (!started.data.isSuccess()) { return null } val receivedEvents = events.map { it.event } + // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. + val requiresOneOf = listOf( - EVENT_MEDIA_EXTRACT_PARAMETER_CREATED, - EVENT_MEDIA_ENCODE_PARAMETER_CREATED, - EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED, - EVENT_WORK_CONVERT_CREATED + EVENT_WORK_CONVERT_PERFORMED, + EVENT_WORK_EXTRACT_PERFORMED, + EVENT_WORK_ENCODE_PERFORMED ) if (!requiresOneOf.any { it in receivedEvents }) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt index daee2cc3..36c7ee63 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import kotlinx.coroutines.runBlocking +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.DownloadClient @@ -19,6 +20,8 @@ import java.util.* @Service class DownloadAndStoreCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}" override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index 87cdb593..a68469ce 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage @@ -15,6 +16,8 @@ import org.springframework.stereotype.Service @Service class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_OUT_COVER diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index c7212b84..8a4682d8 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -1,10 +1,12 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event +import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv @@ -28,6 +30,8 @@ import java.time.LocalDateTime @Service @EnableScheduling class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE @@ -41,15 +45,15 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } - val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? - val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? + val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed + val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed? // Only Return here as both baseInfo events are required to continue if (!baseInfo.isSuccess() || !baseInfo.hasValidData() || events.any { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }) { return null } if (baseInfo.isSuccess() && meta == null) { - log.info { "Sending ${baseInfo?.title} to waiting queue" } + log.info { "Sending ${baseInfo.title} to waiting queue" } if (!waitingProcessesForMeta.containsKey(event.referenceId)) { waitingProcessesForMeta[event.referenceId] = LocalDateTime.now() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index 269364a2..01776b19 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -1,8 +1,11 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import com.google.gson.Gson +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator +import no.iktdev.mediaprocessing.shared.common.lastOrSuccess +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams @@ -19,6 +22,8 @@ import org.springframework.stereotype.Service @Service class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED @@ -35,8 +40,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } - val desiredEvent = events.find { it.data is ReaderPerformed } ?: return null - + val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null return parseStreams(desiredEvent.data as ReaderPerformed) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index cf2caf4c..63109551 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import com.google.gson.Gson import com.google.gson.JsonObject import kotlinx.coroutines.runBlocking +import mu.KotlinLogging import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.SharedConfig @@ -20,6 +21,8 @@ import java.io.File @Service class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED @@ -35,11 +38,6 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T } } - override fun prerequisiteRequired(event: PersistentMessage): List<() -> Boolean> { - return listOf { - isEventOfSingle(event, KafkaEvents.EVENT_PROCESS_STARTED) - } - } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { log.info { "${this.javaClass.simpleName} triggered by ${event.event}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index ac9dae71..c247af01 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg +import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator @@ -17,6 +18,8 @@ import java.io.File @Service class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + val preference = Preference.getPreference() override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index 7d17ab52..45f96225 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event.ffmpeg +import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.TaskCreator @@ -20,6 +21,7 @@ import java.io.File @Service class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} val preference = Preference.getPreference() diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py index c4354b7b..5d7860b2 100644 --- a/apps/pyMetadata/app.py +++ b/apps/pyMetadata/app.py @@ -33,7 +33,7 @@ logging.basicConfig( logger = logging.getLogger(__name__) class ProducerDataValueSchema: - def __init__(self, referenceId, data): + def __init__(self, referenceId, data: DataResult): self.referenceId = referenceId self.data = data @@ -173,7 +173,7 @@ class MessageHandlerThread(threading.Thread): def compose_message(self, referenceId: str, result: DataResult) -> ProducerDataValueSchema: return ProducerDataValueSchema( referenceId=referenceId, - data=result.data + data=result ) diff --git a/apps/pyMetadata/sources/anii.py b/apps/pyMetadata/sources/anii.py index 86b7f774..03628193 100644 --- a/apps/pyMetadata/sources/anii.py +++ b/apps/pyMetadata/sources/anii.py @@ -29,12 +29,12 @@ class metadata(): usedTitle=self.name ) if (meta.title is None) or (meta.type is None): - return DataResult("COMPLETED", None, None) + return DataResult(status="COMPLETED", message= None, data= None) - return DataResult("COMPLETED", None, meta) + return DataResult(status="COMPLETED", message= None, data=meta) except IndexError as ingore: - return DataResult(statusType="COMPLETED", message=f"No result for {self.name}") + return DataResult(status="COMPLETED", message=f"No result for {self.name}") except Exception as e: - return DataResult(statusType="ERROR", message=str(e)) + return DataResult(status="ERROR", message=str(e)) \ No newline at end of file diff --git a/apps/pyMetadata/sources/imdb.py b/apps/pyMetadata/sources/imdb.py index 8c751d26..6afa6219 100644 --- a/apps/pyMetadata/sources/imdb.py +++ b/apps/pyMetadata/sources/imdb.py @@ -31,8 +31,8 @@ class metadata(): usedTitle=self.name ) if (meta.title is None) or (meta.type is None): - return DataResult("COMPLETED", None, None) + return DataResult(status="COMPLETED", message= None, data= None) - return DataResult("COMPLETED", None, meta) + return DataResult(status="COMPLETED", message= None, data= meta) except Exception as e: return DataResult(status="ERROR", data=None, message=str(e)) \ No newline at end of file diff --git a/apps/pyMetadata/sources/mal.py b/apps/pyMetadata/sources/mal.py index 6f405127..4593e385 100644 --- a/apps/pyMetadata/sources/mal.py +++ b/apps/pyMetadata/sources/mal.py @@ -29,8 +29,8 @@ class metadata(): usedTitle=self.name ) if (meta.title is None) or (meta.type is None): - return DataResult("COMPLETED", None, None) + return DataResult(status="COMPLETED", message = None, data = None) - return DataResult("COMPLETED", None, meta) + return DataResult(status = "COMPLETED", message = None, data = meta) except Exception as e: return DataResult(status="ERROR", message=str(e)) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index e515ca90..0e66bad0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -4,6 +4,7 @@ import kotlinx.coroutines.delay import mu.KotlinLogging import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import java.io.File import java.io.RandomAccessFile @@ -29,6 +30,16 @@ fun List.lastOrSuccess(): PersistentMessage? { return this.lastOrNull { it.data.isSuccess() } ?: this.lastOrNull() } +fun List.lastOrSuccessOf(event: KafkaEvents): PersistentMessage? { + val validEvents = this.filter { it.event == event } + return validEvents.lastOrNull { it.data.isSuccess() } ?: validEvents.lastOrNull() +} + +fun List.lastOrSuccessOf(event: KafkaEvents, predicate: (PersistentMessage) -> Boolean): PersistentMessage? { + val validEvents = this.filter { it.event == event && predicate(it) } + return validEvents.lastOrNull() +} + suspend fun limitedWhile(condition: () -> Boolean, maxDuration: Long = 500 * 60, delayed: Long = 500, block: () -> Unit) { var elapsedDelay = 0L diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 50c42ed8..71b98613 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -15,7 +15,6 @@ data class PersistentMessage( val created: LocalDateTime ) - fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { return this.event == event } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt index fe17395f..619bceb4 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt @@ -6,6 +6,8 @@ open class MessageDataWrapper( @Transient open val message: String? = null ) + + data class SimpleMessageData( override val status: Status, override val message: String? = null