From b1061b3c9e5b3b1c436e32facf553ac40806b1f0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Wed, 20 Aug 2025 00:02:09 +0200 Subject: [PATCH] Publishing reason for failure --- .../converter/tasks/ConvertService.kt | 2 +- .../processer/services/EncodeService.kt | 7 ++++--- .../processer/services/ExtractService.kt | 20 +++++++++++++------ .../common/database/cal/TasksManager.kt | 5 ++++- .../shared/common/database/tables/tasks.kt | 3 +++ 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index a9e6aea6..4dba80ec 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -110,7 +110,7 @@ class ConvertService( super.onError(inputFile, message) log.info { "Convert error for ${task.referenceId}\nmessage: $message" } - taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR) + taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message) tasks.onProduceEvent(ConvertWorkPerformed( metadata = EventMetadata( diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index f6ebe019..77022241 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -82,13 +82,14 @@ class EncodeService( logDir = logDir, listener = this ) if (outFile.exists()) { + val reason = "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outFile.absolutePath}" if (ffwrc.arguments.firstOrNull() != "-y") { this.onError( ffwrc.inputFile, - "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outFile.absolutePath}" + reason ) // Setting consumed to prevent spamming - taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR) + taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR, reason) return } } @@ -165,7 +166,7 @@ class EncodeService( override fun onError(inputFile: String, message: String) { val task = assignedTask ?: return - taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR) + taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message) log.error { "Encode failed for ${task.referenceId}\n$message" } tasks.onProduceEvent(EncodeWorkPerformedEvent( diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index d26f337b..a77cb4de 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -67,7 +67,8 @@ class ExtractService( logDir.mkdirs() } - val setClaim = taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId) + val setClaim = + taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId) if (setClaim) { log.info { "Claim successful for ${event.referenceId} extract" } runner = FfmpegRunner( @@ -78,13 +79,15 @@ class ExtractService( listener = this ) if (outputFile.exists()) { + val reason = + "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outputFile.absolutePath}" if (ffwrc.arguments.firstOrNull() != "-y") { this.onError( ffwrc.inputFile, - "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outputFile.absolutePath}" + reason ) // Setting consumed to prevent spamming - taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR) + taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR, reason) return } } @@ -107,7 +110,7 @@ class ExtractService( log.info { "Extract completed for ${task.referenceId}" } runBlocking { var successfulComplete = false - limitedWhile({!successfulComplete}, 1000 * 10, 1000) { + limitedWhile({ !successfulComplete }, 1000 * 10, 1000) { taskManager.markTaskAsCompleted(task.referenceId, task.eventId) successfulComplete = taskManager.isTaskCompleted(task.referenceId, task.eventId) } @@ -142,7 +145,7 @@ class ExtractService( override fun onError(inputFile: String, message: String) { val task = assignedTask ?: return - taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR) + taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message) log.error { "Extract failed for ${task.referenceId}\n$message" } tasks.onProduceEvent( @@ -172,7 +175,12 @@ class ExtractService( } - fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, progress: FfmpegDecodedProgress? = null) { + fun sendProgress( + referenceId: String, + eventId: String, + status: WorkStatus, + progress: FfmpegDecodedProgress? = null + ) { val runner = runner ?: return val processerEventInfo = ProcesserEventInfo( diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/TasksManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/TasksManager.kt index 420cf12c..7c665dc8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/TasksManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/cal/TasksManager.kt @@ -94,7 +94,7 @@ class TasksManager(private val dataSource: DataSource) { } } - fun markTaskAsCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean { + fun markTaskAsCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED, message: String? = null): Boolean { return executeWithStatus(dataSource) { tasks.update({ (tasks.referenceId eq referenceId) and @@ -103,6 +103,9 @@ class TasksManager(private val dataSource: DataSource) { it[consumed] = true it[claimed] = true it[tasks.status] = status.name + message?.let { msg -> + it[tasks.taskResult] = msg + } } } } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/tasks.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/tasks.kt index 153e568f..dea13686 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/tasks.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/database/tables/tasks.kt @@ -4,6 +4,7 @@ import org.jetbrains.exposed.dao.id.IntIdTable import org.jetbrains.exposed.sql.Column import org.jetbrains.exposed.sql.javatime.CurrentDateTime import org.jetbrains.exposed.sql.javatime.datetime +import org.w3c.dom.Text import java.time.LocalDateTime object tasks: IntIdTable() { @@ -19,6 +20,8 @@ object tasks: IntIdTable() { val data: Column = text("data") val created: Column = datetime("created").defaultExpression(CurrentDateTime) val lastCheckIn: Column = datetime("lastCheckIn").nullable() + val taskResult: Column = text("taskResult").nullable() + init { uniqueIndex(referenceId, task, eventId)