From 5eb94df884283c868e769677451acc9f031dcfd7 Mon Sep 17 00:00:00 2001 From: bskjon Date: Thu, 11 Apr 2024 01:33:42 +0200 Subject: [PATCH] Updated --- .idea/misc.xml | 9 ++------- .../converter/tasks/ConvertService.kt | 17 ++++++----------- .../processer/Implementations.kt | 5 ++++- .../mediaprocessing/processer/Reporter.kt | 5 +++-- .../ui/socket/internal/EncoderReaderService.kt | 5 +---- .../mediaprocessing/shared/common/Defaults.kt | 2 +- .../shared/kafka/core/KafkaEvents.kt | 8 ++++++++ 7 files changed, 25 insertions(+), 26 deletions(-) diff --git a/.idea/misc.xml b/.idea/misc.xml index 79db1727..c13008c4 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,12 +1,7 @@ - - - - - - - + + \ No newline at end of file diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 3535041c..6bef21d4 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -182,12 +182,10 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) val scheduled_deferred_events: MutableMap> = mutableMapOf() @Scheduled(fixedDelay = (300_000)) fun validatePresenceOfRequiredEvent() { - val removal = mutableMapOf>() - + val continueDeferral: MutableMap> = mutableMapOf() for ((referenceId, eventList) in scheduled_deferred_events) { - val failed = mutableListOf() - + val keepable = mutableListOf() for (event in eventList) { val ce = if (event.event.data is ConvertWorkerRequest) event.event.data as ConvertWorkerRequest else null try { @@ -199,12 +197,12 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) throw RuntimeException("Iterated overshot") } else { event.iterated++ + keepable.add(event) "Iteration ${event.iterated} for event ${event.eventId} in deferred check" } } catch (e: Exception) { eventManager.setProcessEventCompleted(referenceId, event.eventId) - failed.add(event) log.error { "Canceling event ${event.eventId}\n\t by declaring it as consumed." } producer.sendMessage( referenceId = referenceId, @@ -213,14 +211,11 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator) ) } } - removal[referenceId] = failed + continueDeferral[referenceId] = keepable } - for ((referenceId, events) in removal) { - val list = scheduled_deferred_events[referenceId] ?: continue - list.toMutableList().removeAll(events) - scheduled_deferred_events[referenceId] = list - } + scheduled_deferred_events.clear() + scheduled_deferred_events.putAll(continueDeferral) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt index 7aa3e161..0789c8f9 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt @@ -4,8 +4,10 @@ import no.iktdev.mediaprocessing.shared.common.Defaults import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation +import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import +import org.springframework.web.client.RestTemplate //@Configuration //class SocketLocalInit: SocketImplementation() @@ -16,4 +18,5 @@ class KafkaLocalInit: KafkaImplementation() { } @Configuration -class DefaultConfiguration: Defaults() +public class DefaultProcesserConfiguration: Defaults() { +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt index 5eaa69b9..2544e514 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt @@ -7,8 +7,9 @@ import org.springframework.stereotype.Service import org.springframework.web.client.RestTemplate @Service -class Reporter(@Autowired private val restTemplate: RestTemplate) { - +class Reporter() { + @Autowired + lateinit var restTemplate: RestTemplate fun sendEncodeProgress(progress: ProcesserEventInfo) { try { restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/internal/EncoderReaderService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/internal/EncoderReaderService.kt index dce7fa2f..6463e0bb 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/internal/EncoderReaderService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/internal/EncoderReaderService.kt @@ -3,10 +3,7 @@ package no.iktdev.mediaprocessing.ui.socket.internal import com.google.gson.Gson import com.google.gson.reflect.TypeToken import mu.KotlinLogging -import no.iktdev.mediaprocessing.ui.UIEnv -import no.iktdev.mediaprocessing.ui.dto.EventDataObject -import no.iktdev.mediaprocessing.ui.memActiveEventMap -import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap + import org.springframework.messaging.simp.stomp.StompFrameHandler import org.springframework.messaging.simp.stomp.StompHeaders import org.springframework.messaging.simp.stomp.StompSession diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt index 810e41a4..1c372095 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt @@ -8,7 +8,7 @@ import org.springframework.web.client.RestTemplate open class Defaults { @Bean - fun restTemplate(): RestTemplate { + open fun restTemplate(): RestTemplate { val restTemplate = RestTemplate() return restTemplate } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt index 06adb2c8..7177df5b 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt @@ -57,5 +57,13 @@ enum class KafkaEvents(val event: String) { EventWorkExtractPerformed ) } + + fun isOfFinalize(event: KafkaEvents): Boolean { + return event in listOf( + EVENT_MEDIA_PROCESS_COMPLETED, + EVENT_REQUEST_PROCESS_COMPLETED, + EVENT_COLLECT_AND_STORE + ) + } } } \ No newline at end of file