From 22627c387a3a8910f1e02c5d70154082de40b128 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sun, 1 Feb 2026 03:48:33 +0100 Subject: [PATCH] Moved to props + fixing abandoned issue --- .../coordinator/CoordinatorApplication.kt | 6 +- .../coordinator/ProcesserClient.kt | 23 ++++++++ .../config/ProcesserClientConfig.kt | 25 +++++++++ .../translate/CoordinatorTaskTransferDto.kt | 2 +- .../services/CoordinatorHealthService.kt | 2 +- .../src/main/resources/application.yml | 5 +- .../src/test/resources/application.yml | 3 + .../processer/CoordinatorClient.kt | 29 ++++++++-- .../mediaprocessing/processer/FileUtil.kt | 18 ++++++ .../processer/LocalProgressCache.kt | 19 +++++++ .../processer/ProcesserApplication.kt | 9 ++- .../mediaprocessing/processer/ProcesserEnv.kt | 25 --------- .../iktdev/mediaprocessing/processer/Util.kt | 10 ---- .../{ => config}/CoordinatorClientConfig.kt | 9 ++- .../processer/config/DirectoryProperties.kt | 8 +++ .../{ => config}/ExecutableConfig.kt | 2 +- .../processer/config/ProcesserProperties.kt | 10 ++++ .../processer/controller/StateController.kt | 36 ++++++++++++ .../listeners/SubtitleTaskListener.kt | 33 +++++++---- .../processer/listeners/VideoTaskListener.kt | 56 ++++++++++++++----- .../src/main/resources/application.yml | 13 +++-- .../mediaprocessing/processer/TestUtils.kt | 27 +++++++++ .../processer/listeners/MockFFmpeg.kt | 1 + .../listeners/SubtitleTaskListenerTest.kt | 6 +- .../listeners/VideoTaskListenerTest.kt | 10 +++- .../src/test/resources/application.yml | 17 ++++-- .../common/MediaProcessingApplication.kt | 1 - .../events/ProcesserEncodeResultEvent.kt | 1 + .../shared/common/rules/TaskLifecycleRules.kt | 4 +- 29 files changed, 320 insertions(+), 90 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/FileUtil.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/LocalProgressCache.kt delete mode 100755 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt delete mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Util.kt rename apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/{ => config}/CoordinatorClientConfig.kt (66%) create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/DirectoryProperties.kt rename apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/{ => config}/ExecutableConfig.kt (77%) create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ProcesserProperties.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/StateController.kt create mode 100644 apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/TestUtils.kt diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index 4735ebb4..a5a37a8d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -7,6 +7,8 @@ import no.iktdev.exfl.coroutines.CoroutinesDefault import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.Observables import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig +import no.iktdev.mediaprocessing.coordinator.config.ProcesserClientProperties +import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.getAppVersion @@ -58,7 +60,9 @@ open class ApplicationConfiguration() { @Configuration @EnableConfigurationProperties( value = [ - ExecutablesConfig::class + ExecutablesConfig::class, + StreamItConfig::class, + ProcesserClientProperties::class ] ) class CoordinatorConfig \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt new file mode 100644 index 00000000..0e14dbbc --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/ProcesserClient.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.coordinator + +import org.springframework.stereotype.Component +import org.springframework.web.reactive.function.client.WebClient +import reactor.core.publisher.Mono + +@Component +class ProcesserClient( + private val webClient: WebClient +) { + + fun fetchLog(path: String): Mono = + webClient.get() + .uri { it.path("/state/log").queryParam("path", path).build() } + .retrieve() + .bodyToMono(String::class.java) + + fun ping(): Mono = + webClient.get() + .uri("/actuator/health") + .retrieve() + .bodyToMono(String::class.java) +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt new file mode 100644 index 00000000..61c4dca9 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ProcesserClientConfig.kt @@ -0,0 +1,25 @@ +package no.iktdev.mediaprocessing.coordinator.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.web.reactive.function.client.WebClient + +@Configuration +class ProcesserWebClientConfig { + + @Bean + fun processerWebClient( + builder: WebClient.Builder, + props: ProcesserClientProperties + ): WebClient = + builder + .baseUrl(props.baseUrl) + .build() +} + + +@ConfigurationProperties(prefix = "processer") +data class ProcesserClientProperties( + val baseUrl: String +) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt index 62c6a649..483151d6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/translate/CoordinatorTaskTransferDto.kt @@ -34,6 +34,6 @@ fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto { consumed = consumed, lastCheckIn = lastCheckIn, persistedAt = persistedAt, - abandoned = TaskLifecycleRules.isAbandoned(consumed, lastCheckIn) + abandoned = TaskLifecycleRules.isAbandoned(consumed, persistedAt, lastCheckIn) ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt index a739e54c..c804d063 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt @@ -31,7 +31,7 @@ class CoordinatorHealthService( // --- TASK HEALTH --- val abandonedTaskIds = tasks - .filter { TaskLifecycleRules.isAbandoned(it.consumed, it.lastCheckIn) } + .filter { TaskLifecycleRules.isAbandoned(it.consumed, it.persistedAt, it.lastCheckIn) } .map { it.taskId } val stalledTaskIds = tasks diff --git a/apps/coordinator/src/main/resources/application.yml b/apps/coordinator/src/main/resources/application.yml index 22585c6c..c79f9d6b 100644 --- a/apps/coordinator/src/main/resources/application.yml +++ b/apps/coordinator/src/main/resources/application.yml @@ -31,7 +31,10 @@ media: incoming: /src/input streamit: - address: http://streamit.service + base-url: ${PROCESSER_URL:http://processer:8080} + +processer: + base-url: http://processer.service:8080 executables: ffprobe: ffprobe diff --git a/apps/coordinator/src/test/resources/application.yml b/apps/coordinator/src/test/resources/application.yml index cf51c21a..f9128c6a 100644 --- a/apps/coordinator/src/test/resources/application.yml +++ b/apps/coordinator/src/test/resources/application.yml @@ -40,5 +40,8 @@ media: streamit: address: http://streamit.service +processer: + base-url: http://processer.service:8080 + executables: ffprobe: ffprobe diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt index af5a267e..d2e59ceb 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClient.kt @@ -1,6 +1,9 @@ package no.iktdev.mediaprocessing.processer +import jakarta.annotation.PostConstruct +import mu.KotlinLogging import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress +import no.iktdev.mediaprocessing.processer.config.ProcesserProperties import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate import org.springframework.http.MediaType import org.springframework.stereotype.Component @@ -8,17 +11,35 @@ import org.springframework.web.reactive.function.client.WebClient @Component class CoordinatorClient( + private val processerProperties: ProcesserProperties, private val webClient: WebClient ) { + private val log = KotlinLogging.logger {} + + @PostConstruct + fun pingCoordinator() { + if (!processerProperties.coordinatorPingOnStartup) { + log.info { "Coordinator ping on startup is disabled" } + return + } + try { + val result = webClient.get() + .uri("/actuator/health") + .retrieve() + .bodyToMono(String::class.java) + .block() + + log.info { "Coordinator reachable. Health: $result" } + } catch (e: Exception) { + log.error(e) { "Coordinator NOT reachable at startup" } + } + } fun reportProgress(referenceId: String, taskId: String, percent: FfmpegDecodedProgress, message: String?) = webClient.post() .uri("/internal/progress") .contentType(MediaType.APPLICATION_JSON) - .bodyValue( - ProgressUpdate(referenceId, taskId, percent, message) - ) + .bodyValue(ProgressUpdate(referenceId, taskId, percent, message)) .retrieve() .toBodilessEntity() - } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/FileUtil.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/FileUtil.kt new file mode 100644 index 00000000..9aa0106f --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/FileUtil.kt @@ -0,0 +1,18 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.exfl.using +import no.iktdev.mediaprocessing.processer.config.DirectoryProperties +import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths +import org.springframework.stereotype.Component +import java.io.File + +@Component +class FileUtil( + private val dirs: DirectoryProperties, + private val mediaPaths: MediaPaths +) { + fun getTemporaryStoreFile(fileName: String): File = + File(mediaPaths.cache).using(fileName) + + fun getLogDirectory(): File = File(dirs.logs) +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/LocalProgressCache.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/LocalProgressCache.kt new file mode 100644 index 00000000..7721eded --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/LocalProgressCache.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress +import org.springframework.stereotype.Component +import java.util.* +import java.util.concurrent.ConcurrentHashMap + +@Component +class LocalProgressCache { + private val cache = ConcurrentHashMap() + + fun update(taskId: UUID, progress: FfmpegDecodedProgress) { + cache[taskId] = progress + } + + fun get(taskId: UUID): FfmpegDecodedProgress? = cache[taskId] + + fun getAll(): Map = cache.toMap() +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index 58a8bfbf..6ab6cbd3 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -6,6 +6,10 @@ import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.exfl.coroutines.CoroutinesDefault import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.Observables +import no.iktdev.mediaprocessing.processer.config.DirectoryProperties +import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig +import no.iktdev.mediaprocessing.processer.config.ProcesserProperties +import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.getAppVersion @@ -57,7 +61,10 @@ open class ApplicationConfiguration() { @Configuration @EnableConfigurationProperties( value = [ - ExecutablesConfig::class + ExecutablesConfig::class, + DirectoryProperties::class, + ProcesserProperties::class, + MediaPaths::class ] ) class ProcesserConfig \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt deleted file mode 100755 index d96cca75..00000000 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserEnv.kt +++ /dev/null @@ -1,25 +0,0 @@ -package no.iktdev.mediaprocessing.processer - -import no.iktdev.exfl.using -import java.io.File - -class ProcesserEnv { - companion object { - val coordinatorUrl = System.getenv("COORDINATOR_URL") ?: "http://coordinator:8080" - - val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" - val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false - - var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache") - - - val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else - File("data").using("logs") - - val encodeLogDirectory = logDirectory.using("encode") - val extractLogDirectory = logDirectory.using("extract") - val subtitleExtractLogDirectory = logDirectory.using("subtitles") - - val fullLogging = System.getenv("FullLogging").toBoolean() - } -} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Util.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Util.kt deleted file mode 100644 index 9c2e075a..00000000 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Util.kt +++ /dev/null @@ -1,10 +0,0 @@ -package no.iktdev.mediaprocessing.processer - -import no.iktdev.exfl.using -import java.io.File - -object Util { - fun getTemporaryStoreFile(fileName: String): File { - return ProcesserEnv.cachedContent.using(fileName) - } -} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/CoordinatorClientConfig.kt similarity index 66% rename from apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt rename to apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/CoordinatorClientConfig.kt index 63350e2d..099a7ad6 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/CoordinatorClientConfig.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/CoordinatorClientConfig.kt @@ -1,15 +1,17 @@ -package no.iktdev.mediaprocessing.processer +package no.iktdev.mediaprocessing.processer.config import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.web.reactive.function.client.WebClient @Configuration -class CoordinatorClientConfig { +class CoordinatorClientConfig( + private val processerProperties: ProcesserProperties, +) { @Bean fun coordinatorWebClient(builder: WebClient.Builder): WebClient { - val baseUrl = ProcesserEnv.coordinatorUrl + val baseUrl = processerProperties.coordinatorUrl ?: error("COORDINATOR_URL must be set") return builder @@ -17,3 +19,4 @@ class CoordinatorClientConfig { .build() } } + diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/DirectoryProperties.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/DirectoryProperties.kt new file mode 100644 index 00000000..6b98d860 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/DirectoryProperties.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.processer.config + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "directories") +data class DirectoryProperties( + val logs: String, +) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ExecutableConfig.kt similarity index 77% rename from apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt rename to apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ExecutableConfig.kt index 0d1e0855..e1b31ae0 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ExecutableConfig.kt @@ -1,4 +1,4 @@ -package no.iktdev.mediaprocessing.processer +package no.iktdev.mediaprocessing.processer.config import org.springframework.boot.context.properties.ConfigurationProperties diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ProcesserProperties.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ProcesserProperties.kt new file mode 100644 index 00000000..81ec4870 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/config/ProcesserProperties.kt @@ -0,0 +1,10 @@ +package no.iktdev.mediaprocessing.processer.config + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "processer") +data class ProcesserProperties( + val coordinatorUrl: String, + val coordinatorPingOnStartup: Boolean, + val allowOverwrite: Boolean +) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/StateController.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/StateController.kt new file mode 100644 index 00000000..ececc0f4 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/controller/StateController.kt @@ -0,0 +1,36 @@ +package no.iktdev.mediaprocessing.processer.controller + +import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress +import no.iktdev.mediaprocessing.processer.LocalProgressCache +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.* +import java.io.File +import java.util.* + +@RestController +@RequestMapping("/state") +class StateController( + private val localProgress: LocalProgressCache +) { + + @GetMapping("/progress") + fun allProgress(): Map = + localProgress.getAll() + + @GetMapping("/progress/{taskId}") + fun progress(@PathVariable taskId: UUID): FfmpegDecodedProgress? = + localProgress.get(taskId) + + + @GetMapping("/log") + fun getLog(@RequestParam path: String): ResponseEntity { + val file = File(path) + return if (file.exists()) { + ResponseEntity.ok(file.readText()) + } else { + ResponseEntity.notFound().build() + } + } + + +} diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt index 43202538..2b852fd5 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt @@ -6,17 +6,22 @@ import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskType +import no.iktdev.exfl.using import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument -import no.iktdev.mediaprocessing.processer.ProcesserEnv -import no.iktdev.mediaprocessing.processer.Util +import no.iktdev.mediaprocessing.processer.FileUtil +import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask import org.springframework.stereotype.Service +import java.io.File import java.util.* @Service -class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { +class SubtitleTaskListener( + private val executableConfig: ExecutablesConfig, + private val fileUtil: FileUtil +) : FfmpegTaskListener(TaskType.CPU_INTENSIVE) { private val log = KotlinLogging.logger {} @@ -35,16 +40,18 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { override suspend fun onTask(task: Task): Event? { val taskData = task as ExtractSubtitleTask - val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also { + val cachedOutFile = fileUtil.getTemporaryStoreFile(taskData.data.outputFileName).also { if (!it.parentFile.exists()) { it.parentFile.mkdirs() } } if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") { - reporter?.publishEvent(ProcesserExtractResultEvent( - status = TaskStatus.Failed - ).producedFrom(task)) + reporter?.publishEvent( + ProcesserExtractResultEvent( + status = TaskStatus.Failed + ).producedFrom(task) + ) throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite") } @@ -58,7 +65,7 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { reporter?.updateLastSeen(task.taskId) } result.run(arguments) - if (result.result.resultCode != 0 ) { + if (result.result.resultCode != 0) { return ProcesserExtractResultEvent(status = TaskStatus.Failed).producedFrom(task) } @@ -85,15 +92,17 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { } override fun getFfmpeg(): FFmpeg { - return SubtitleFFmpeg() + val logDirectory = fileUtil.getLogDirectory().using("subtitles") + return SubtitleFFmpeg(null, executableConfig.ffmpeg, logDirectory) } - class SubtitleFFmpeg(override val listener: Listener? = null): FFmpeg(executable = ProcesserEnv.ffmpeg, logDir = ProcesserEnv.subtitleExtractLogDirectory ) { + class SubtitleFFmpeg(override val listener: Listener? = null, private val executablePath: String, val logDirectory: File) : + FFmpeg(executable = executablePath, logDir = logDirectory) { override fun onCreate() { - if (!ProcesserEnv.subtitleExtractLogDirectory.exists()) { - ProcesserEnv.subtitleExtractLogDirectory.mkdirs() + if (!logDir.exists()) { + logDir.mkdirs() } } } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt index 2989290b..30c38202 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt @@ -6,19 +6,27 @@ import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskType +import no.iktdev.exfl.using import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.CoordinatorClient -import no.iktdev.mediaprocessing.processer.ProcesserEnv -import no.iktdev.mediaprocessing.processer.Util +import no.iktdev.mediaprocessing.processer.FileUtil +import no.iktdev.mediaprocessing.processer.LocalProgressCache +import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import org.springframework.stereotype.Service +import java.io.File import java.util.* @Service -class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) { +class VideoTaskListener( + private var coordinatorWebClient: CoordinatorClient, + private val localProgress: LocalProgressCache, + private val executableConfig: ExecutablesConfig, + private val fileUtil: FileUtil, +) : FfmpegTaskListener(TaskType.CPU_INTENSIVE) { private val log = KotlinLogging.logger {} override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" @@ -35,15 +43,17 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff override suspend fun onTask(task: Task): Event? { val taskData = task as EncodeTask - val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also { + val cachedOutFile = fileUtil.getTemporaryStoreFile(taskData.data.outputFileName).also { if (!it.parentFile.exists()) { it.parentFile.mkdirs() } } if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") { - reporter?.publishEvent(ProcesserEncodeResultEvent( - status = TaskStatus.Failed - ).producedFrom(task)) + reporter?.publishEvent( + ProcesserEncodeResultEvent( + status = TaskStatus.Failed + ).producedFrom(task) + ) throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite") } @@ -58,12 +68,16 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff reporter?.updateLastSeen(task.taskId) } result.run(arguments) - if (result.result.resultCode != 0 ) { - return ProcesserEncodeResultEvent(status = TaskStatus.Failed).producedFrom(task) + if (result.result.resultCode != 0) { + return ProcesserEncodeResultEvent( + status = TaskStatus.Failed, + logFile = result.logFile.absolutePath + ).producedFrom(task) } return ProcesserEncodeResultEvent( status = TaskStatus.Completed, + logFile = result.logFile.absolutePath, data = ProcesserEncodeResultEvent.EncodeResult( cachedOutputFile = cachedOutFile.absolutePath ) @@ -80,12 +94,15 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff TaskStatus.Cancelled -> "Canceled" else -> "" } - return ProcesserEncodeResultEvent(null, status, error = message) + return ProcesserEncodeResultEvent(null, null, status, error = message) } override fun getFfmpeg(): FFmpeg { - return VideoFFmpeg(object : FFmpeg.Listener { + val logDirectory = fileUtil.getLogDirectory().using("encode") + return VideoFFmpeg(execPath = executableConfig.ffmpeg, + logDirectory = logDirectory, + listener = object : FFmpeg.Listener { var lastProgress: FfmpegDecodedProgress? = null override fun onStarted(inputFile: String) { } @@ -95,7 +112,14 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff coordinatorWebClient.reportProgress( referenceId = it.referenceId.toString(), taskId = it.taskId.toString(), - percent = FfmpegDecodedProgress(100, "", lastProgress?.duration ?: "", "0", estimatedCompletion = "", estimatedCompletionSeconds = 0), + percent = FfmpegDecodedProgress( + 100, + "", + lastProgress?.duration ?: "", + "0", + estimatedCompletion = "", + estimatedCompletionSeconds = 0 + ), "" ) } @@ -107,6 +131,7 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff ) { lastProgress = progress currentTask?.let { + localProgress.update(it.taskId, progress) coordinatorWebClient.reportProgress( referenceId = it.referenceId.toString(), taskId = it.taskId.toString(), @@ -120,12 +145,13 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff } - class VideoFFmpeg(override val listener: Listener? = null): FFmpeg(executable = ProcesserEnv.ffmpeg, logDir = ProcesserEnv.encodeLogDirectory) { + class VideoFFmpeg(override val listener: Listener? = null, private val execPath: String, val logDirectory: File) : + FFmpeg(executable = execPath, logDir = logDirectory) { override fun onCreate() { super.onCreate() - if (!ProcesserEnv.encodeLogDirectory.exists()) { - ProcesserEnv.encodeLogDirectory.mkdirs() + if (!logDirectory.exists()) { + logDirectory.mkdirs() } } } diff --git a/apps/processer/src/main/resources/application.yml b/apps/processer/src/main/resources/application.yml index 7579499d..604aaeae 100644 --- a/apps/processer/src/main/resources/application.yml +++ b/apps/processer/src/main/resources/application.yml @@ -26,12 +26,17 @@ logging: media: - cache: /src/cache + cache: ${DIRECTORY_CONTENT_CACHE:/src/cache} outgoing: /src/output incoming: /src/input -streamit: - address: http://streamit.service +processer: + coordinator-url: ${COORDINATOR_URL:http://coordinator:8080} + coordinator-ping-on-startup: true + allow-overwrite: ${ALLOW_OVERWRITE:false} executables: - ffmpeg: ffmpeg + ffmpeg: ${SUPPORTING_EXECUTABLE_FFMPEG:ffmpeg} + +directories: + logs: ${LOG_DIR:/data/logs} diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/TestUtils.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/TestUtils.kt new file mode 100644 index 00000000..4d2b7783 --- /dev/null +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/TestUtils.kt @@ -0,0 +1,27 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.processer.config.DirectoryProperties +import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig +import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths + +object TestUtils { + fun getFileUtil(): FileUtil { + val dirs = DirectoryProperties( + logs = "build/test-logs", + ) + val mediaPaths = MediaPaths( + cache = "build/test-cache", + incoming = "build/test-input", + outgoing = "build/test-output" + ) + + return FileUtil(dirs, mediaPaths) + } + + fun getExecutableConfig(): ExecutablesConfig { + return ExecutablesConfig( + ffmpeg = "ffmpeg" + ) + } + +} \ No newline at end of file diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/MockFFmpeg.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/MockFFmpeg.kt index f22deb81..118e5f16 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/MockFFmpeg.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/MockFFmpeg.kt @@ -19,6 +19,7 @@ class MockFFmpeg(override val listener: Listener, val delayMillis: Long = 500, p } override suspend fun run(argument: MpegArgument) { + logFile = File("build/test-log/file.json") inputFile = argument.inputFile!! listener.onStarted(argument.inputFile!!) delay(delayMillis) diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt index f0a6d23e..efa5f75f 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt @@ -7,6 +7,7 @@ import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.mediaprocessing.ffmpeg.FFmpeg +import no.iktdev.mediaprocessing.processer.TestUtils import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask @@ -19,7 +20,10 @@ import kotlin.system.measureTimeMillis class SubtitleTaskListenerTest { - class TestListener(val delay: Long): SubtitleTaskListener() { + class TestListener(val delay: Long): SubtitleTaskListener( + fileUtil = TestUtils.getFileUtil(), + executableConfig = TestUtils.getExecutableConfig(), + ) { fun getJob() = currentJob private var _result: Event? = null diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt index 6441981e..69795931 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt @@ -9,6 +9,8 @@ import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.processer.CoordinatorClient +import no.iktdev.mediaprocessing.processer.LocalProgressCache +import no.iktdev.mediaprocessing.processer.TestUtils import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask @@ -21,7 +23,13 @@ import kotlin.system.measureTimeMillis class VideoTaskListenerTest { - class TestListener(val delay: Long, coordinatorClient: CoordinatorClient): VideoTaskListener(coordinatorClient) { + + class TestListener(val delay: Long, coordinatorClient: CoordinatorClient): + VideoTaskListener(coordinatorWebClient = coordinatorClient, + localProgress = LocalProgressCache(), + fileUtil = TestUtils.getFileUtil(), + executableConfig = TestUtils.getExecutableConfig(), + ) { fun getJob() = currentJob private var _result: Event? = null diff --git a/apps/processer/src/test/resources/application.yml b/apps/processer/src/test/resources/application.yml index cfaabf57..40c39f35 100644 --- a/apps/processer/src/test/resources/application.yml +++ b/apps/processer/src/test/resources/application.yml @@ -33,12 +33,17 @@ management: show-details: always media: - cache: /src/cache - outgoing: /src/output - incoming: /src/input - -streamit: - address: http://streamit.service + cache: build/test-cache + outgoing: build/test-output + incoming: build/test-input executables: ffmpeg: ffmpeg + +processer: + coordinator-url: ${COORDINATOR_URL:http://coordinator:8080} + coordinator-ping-on-startup: false + allow-overwrite: ${ALLOW_OVERWRITE:false} + +directories: + logs: build/test-logs \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/MediaProcessingApplication.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/MediaProcessingApplication.kt index 9da62a9f..c2589419 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/MediaProcessingApplication.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/MediaProcessingApplication.kt @@ -29,7 +29,6 @@ annotation class MediaProcessingApp @Configuration @EnableConfigurationProperties( value = [ - StreamItConfig::class, MediaPaths::class ] ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt index 20f69c70..28e58f8d 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt @@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve class ProcesserEncodeResultEvent( val data: EncodeResult? = null, + val logFile: String? = null, status: TaskStatus, error: String? = null ) : TaskResultEvent(status, error) { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt index 9e309a97..411c01da 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt @@ -11,14 +11,14 @@ object TaskLifecycleRules { fun isAbandoned( consumed: Boolean, - createdAt: Instant, + persistedAt: Instant, lastCheckIn: Instant? ): Boolean { if (consumed) return false val cutoff = Instant.now().minus(abandonedAfterMinutes, ChronoUnit.MINUTES) - val reference = lastCheckIn ?: createdAt + val reference = lastCheckIn ?: persistedAt return reference.isBefore(cutoff) }