From b245cf6f942139f9610c2fb5a7b82abc7f707658 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sun, 21 Apr 2024 20:43:03 +0200 Subject: [PATCH] Changed behaviour --- .../mediaprocessing/processer/Reporter.kt | 8 +++++-- .../processer/services/EncodeService.kt | 21 ++++++++++++------- .../persistance/PersistentEventManager.kt | 3 ++- 3 files changed, 22 insertions(+), 10 deletions(-) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt index 8b85ff8d..f64bfefa 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.processer +import mu.KotlinLogging import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo import org.springframework.beans.factory.annotation.Autowired @@ -13,12 +14,15 @@ class Reporter() { lateinit var restTemplate: RestTemplate @Autowired lateinit var messageTemplate: SimpMessagingTemplate + + private val log = KotlinLogging.logger {} + fun sendEncodeProgress(progress: ProcesserEventInfo) { try { restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) messageTemplate.convertAndSend("/topic/encode/progress", progress) } catch (e: Exception) { - e.printStackTrace() + log.error { e.message } } } @@ -27,7 +31,7 @@ class Reporter() { restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java) messageTemplate.convertAndSend("/topic/extract/progress", progress) } catch (e: Exception) { - e.printStackTrace() + log.error { e.message } } } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 8cc884f8..16db1151 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -83,7 +83,8 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired fun startEncode(event: PersistentProcessDataMessage) { val ffwrc = event.data as FfmpegWorkRequestCreated - File(ffwrc.outFile).parentFile.mkdirs() + val outFile = File(ffwrc.outFile) + outFile.parentFile.mkdirs() if (!logDir.exists()) { logDir.mkdirs() } @@ -92,11 +93,13 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired if (setClaim) { log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) - if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { - ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") - // Setting consumed to prevent spamming - eventManager.setProcessEventCompleted(event.referenceId, event.eventId) - return + if (outFile.exists()) { + if (ffwrc.arguments.firstOrNull() != "-y") { + ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") + // Setting consumed to prevent spamming + eventManager.setProcessEventCompleted(event.referenceId, event.eventId, Status.ERROR) + return + } } runner?.runWithProgress() @@ -196,7 +199,11 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired outputFiles = listOf(info.outFile), progress = progress?.toProcessProgress() ) - reporter.sendEncodeProgress(processerEventInfo) + try { + reporter.sendEncodeProgress(processerEventInfo) + } catch (e: Exception) { + e.printStackTrace() + } } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index 5277440e..fe5dd848 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -142,7 +142,8 @@ class PersistentEventManager(private val dataSource: DataSource) { } fun isProcessEventClaimed(referenceId: String, eventId: String): Boolean { - return getProcessEventWith(referenceId, eventId)?.claimed ?: false + val info = getProcessEventWith(referenceId, eventId) + return info?.claimed ?: true && info?.consumed ?: true } fun isProcessEventCompleted(referenceId: String, eventId: String): Boolean {