From 4708252aea712336d93a52aa6c9e05cb6cb67a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Thu, 22 Jan 2026 00:58:10 +0100 Subject: [PATCH] Introduced logging in processer --- .../mediaprocessing/processer/TaskPoller.kt | 8 ++++++-- .../processer/listeners/SubtitleTaskListener.kt | 16 ++++++++++++++-- .../processer/listeners/VideoTaskListener.kt | 11 +++++++++++ 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt index d7d65c91..04dbb7d4 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt @@ -3,18 +3,18 @@ package no.iktdev.mediaprocessing.processer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.launch +import mu.KotlinLogging import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskPollerImplementation import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.shared.database.stores.EventStore import no.iktdev.mediaprocessing.shared.database.stores.TaskStore - import org.springframework.boot.ApplicationArguments import org.springframework.boot.ApplicationRunner import org.springframework.stereotype.Component import org.springframework.stereotype.Service -import java.util.UUID +import java.util.* @Component class PollerAdministrator( @@ -41,7 +41,10 @@ class TaskPoller( @Component class DefaultTaskReporter() : TaskReporter { + private val log = KotlinLogging.logger {} + override fun markClaimed(taskId: UUID, workerId: String) { + log.info { "$workerId claiming task $taskId" } TaskStore.claim(taskId, workerId) } @@ -50,6 +53,7 @@ class DefaultTaskReporter() : TaskReporter { } override fun markConsumed(taskId: UUID) { + log.info { "Marking task $taskId as completed" } TaskStore.markConsumed(taskId, TaskStatus.Completed) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt index 251ae19b..da89d4d1 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt @@ -1,9 +1,10 @@ package no.iktdev.mediaprocessing.processer.listeners +import mu.KotlinLogging import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus -import no.iktdev.eventi.tasks.TaskListener +import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskType import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument @@ -12,14 +13,25 @@ import no.iktdev.mediaprocessing.processer.Util import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask import org.springframework.stereotype.Service -import java.util.UUID +import java.util.* @Service class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { + private val log = KotlinLogging.logger {} + + override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" override fun supports(task: Task) = task is ExtractSubtitleTask + override fun accept(task: Task, reporter: TaskReporter): Boolean { + val accepts = super.accept(task, reporter) + if (accepts) { + log.info { "${getWorkerId()} accepts subtitle task ${task.taskId}" } + } + return accepts + } + override suspend fun onTask(task: Task): Event? { val taskData = task as ExtractSubtitleTask diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt index f9ba8bdd..67c93469 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt @@ -1,8 +1,10 @@ package no.iktdev.mediaprocessing.processer.listeners +import mu.KotlinLogging import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskType import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument @@ -17,11 +19,20 @@ import java.util.* @Service class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) { + private val log = KotlinLogging.logger {} override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" override fun supports(task: Task) = task is EncodeTask + override fun accept(task: Task, reporter: TaskReporter): Boolean { + val accepts = super.accept(task, reporter) + if (accepts) { + log.info { "${getWorkerId()} accepts video task ${task.taskId}" } + } + return accepts + } + override suspend fun onTask(task: Task): Event? { val taskData = task as EncodeTask val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also {