From fc5bb6a71c31cba1ea71f2c2bb53101f18ee8fba Mon Sep 17 00:00:00 2001 From: Brage Date: Mon, 25 Mar 2024 16:38:24 +0100 Subject: [PATCH] Removed class from log --- .../tasks/event/BaseInfoFromFile.kt | 2 +- .../tasks/event/CompleteRequestTask.kt | 62 +++++++++++++++++++ .../event/MetadataAndBaseInfoToCoverTask.kt | 2 +- .../event/MetadataAndBaseInfoToFileOut.kt | 17 +++-- .../tasks/event/ParseVideoFileStreams.kt | 2 +- .../tasks/event/ReadVideoFileStreams.kt | 2 +- .../event/ffmpeg/EncodeArgumentCreatorTask.kt | 2 +- .../ffmpeg/ExtractArgumentCreatorTask.kt | 4 +- 8 files changed, 82 insertions(+), 11 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt index d2332be4..a5c3548c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt @@ -33,7 +33,7 @@ class BaseInfoFromFile(@Autowired override var coordinator: Coordinator) : TaskC } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val selected = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_PROCESS_STARTED) ?: return null return readFileInfo(selected.data as MediaProcessStarted) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt new file mode 100644 index 00000000..1551ab73 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteRequestTask.kt @@ -0,0 +1,62 @@ +package no.iktdev.mediaprocessing.coordinator.tasks.event + +import mu.KotlinLogging +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.TaskCreator +import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping +import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.* +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.Status +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service + +@Service +class CompleteRequestTask(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { + val log = KotlinLogging.logger {} + + override val producesEvent: KafkaEvents = KafkaEvents.EVENT_MEDIA_PROCESS_COMPLETED + + override val requiredEvents: List = listOf( + EVENT_REQUEST_PROCESS_STARTED, + ) + override val listensForEvents: List = KafkaEvents.entries + + + + override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { + val started = events.lastOrSuccessOf(EVENT_MEDIA_PROCESS_STARTED) ?: return null + if (!started.data.isSuccess()) { + return null + } + + val receivedEvents = events.map { it.event } + + // TODO: Add filter in case a metadata request was performed or a cover download was performed. for now, for base functionality, it requires a performed event. + + val requiresOneOf = listOf( + EVENT_WORK_CONVERT_PERFORMED, + EVENT_WORK_EXTRACT_PERFORMED, + EVENT_WORK_ENCODE_PERFORMED + ) + + if (requiresOneOf.none { it in receivedEvents }) { + val missing = requiresOneOf.filter { !receivedEvents.contains(it) } + log.info { "Can't complete at this moment. Missing required event(s)" + missing.joinToString("\n\t") } + return null //SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event") + } + + + + + val mapper = ProcessMapping(events) + if (mapper.canCollect()) { + return ProcessCompleted(Status.COMPLETED) + } + return null + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt index b5da9895..65895bd0 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt @@ -35,7 +35,7 @@ class MetadataAndBaseInfoToCoverTask(@Autowired override var coordinator: Coordi } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val meta = events.findLast { it.data is MetadataPerformed }?.data as MetadataPerformed? ?: return null diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt index 801bebb0..34003a32 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt @@ -13,17 +13,21 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData +import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess -import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.springframework.beans.factory.annotation.Autowired import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import java.time.LocalDateTime +import java.time.ZoneOffset +import java.time.format.DateTimeFormatter +import java.util.* + /** * @@ -32,6 +36,7 @@ import java.time.LocalDateTime @EnableScheduling class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordinator) : TaskCreator(coordinator) { val log = KotlinLogging.logger {} + val metadataTimeout = KafkaEnv.metadataTimeoutMinutes * 60 override val producesEvent: KafkaEvents get() = KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE @@ -44,7 +49,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina ) override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val baseInfo = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED) { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? val meta = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) { it.data is MetadataPerformed }?.data as MetadataPerformed? @@ -54,7 +59,11 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina return null } if (baseInfo.isSuccess() && meta == null) { - log.info { "Sending ${baseInfo?.title} to waiting queue" } + val estimatedTimeout = LocalDateTime.now().toEpochSeconds() + metadataTimeout + val dateTime = LocalDateTime.ofEpochSecond(estimatedTimeout, 0, ZoneOffset.UTC) + + val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm", Locale.ENGLISH) + log.info { "Sending ${baseInfo?.title} to waiting queue. Expiry ${dateTime.format(formatter)}" } if (!waitingProcessesForMeta.containsKey(event.referenceId)) { waitingProcessesForMeta[event.referenceId] = LocalDateTime.now() } @@ -94,7 +103,7 @@ class MetadataAndBaseInfoToFileOut(@Autowired override var coordinator: Coordina @Scheduled(fixedDelay = (1_000)) fun sendErrorMessageForMetadata() { val expired = waitingProcessesForMeta.filter { - LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + KafkaEnv.metadataTimeoutMinutes * 60) + LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + metadataTimeout) } expired.forEach { log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt index e1f55c5a..d61703eb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt @@ -38,7 +38,7 @@ class ParseVideoFileStreams(@Autowired override var coordinator: Coordinator) : } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.lastOrSuccessOf(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) ?: return null return parseStreams(desiredEvent.data as ReaderPerformed) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt index 6c49731c..99214955 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt @@ -41,7 +41,7 @@ class ReadVideoFileStreams(@Autowired override var coordinator: Coordinator) : T override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val desiredEvent = events.find { it.data is MediaProcessStarted } ?: return null return runBlocking { fileReadStreams(desiredEvent.data as MediaProcessStarted) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt index 565dd7da..6cec7169 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt @@ -41,7 +41,7 @@ class EncodeArgumentCreatorTask(@Autowired override var coordinator: Coordinator } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted if (!started.operations.contains(ProcessStartOperationEvents.ENCODE)) { log.info { "Couldn't find operation event ${ProcessStartOperationEvents.ENCODE} in ${Gson().toJson(started.operations)}\n\tEncode Arguments will not be created" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt index cb50775f..ef3fcfaa 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt @@ -45,9 +45,9 @@ class ExtractArgumentCreatorTask(@Autowired override var coordinator: Coordinato } override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? { - log.info { "${this.javaClass.simpleName} @ ${event.referenceId} triggered by ${event.event}" } + log.info { "${event.referenceId} triggered by ${event.event}" } if (!requiredEvents.contains(event.event)) { - log.info { "${this.javaClass.simpleName} ignores ${event.event} @ ${event.eventId}" } + log.info { "Ignored ${event.event} @ ${event.eventId}" } return null } val started = events.find { it.data is MediaProcessStarted }?.data as MediaProcessStarted