From a3b83bc8a1f5000fde2158b1322ca3b413d1b1e4 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 13 Apr 2024 17:54:53 +0200 Subject: [PATCH] Adjustments --- .../mediaprocessing/processer/services/EncodeService.kt | 1 + .../mediaprocessing/processer/services/ExtractService.kt | 1 + .../shared/common/persistance/PersistentEventManager.kt | 4 +++- .../shared/common/persistance/processerEvents.kt | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 89a792d0..d2e4dd4a 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -158,6 +158,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired } override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { + eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR) val runner = this@EncodeService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce error message when the referenceId is not present" } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index 47e39c74..7a8686b7 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -153,6 +153,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire } override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { + eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR) val runner = this@ExtractService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce error message when the referenceId is not present" } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt index e53f755a..037327f8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentEventManager.kt @@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.datasource.* import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.Status import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq @@ -219,7 +220,7 @@ class PersistentEventManager(private val dataSource: DataSource) { } } - fun setProcessEventCompleted(referenceId: String, eventId: String): Boolean { + fun setProcessEventCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean { return executeWithStatus(dataSource) { processerEvents.update({ (processerEvents.referenceId eq referenceId) and @@ -227,6 +228,7 @@ class PersistentEventManager(private val dataSource: DataSource) { }) { it[consumed] = true it[claimed] = true + it[processerEvents.status] = status.name } } } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt index cab0a06a..a9766a03 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt @@ -8,6 +8,7 @@ import java.time.LocalDateTime object processerEvents: IntIdTable() { val referenceId: Column = varchar("referenceId", 50) + val status: Column = varchar("status", 10) val claimed: Column = bool("claimed").default(false) val claimedBy: Column = varchar("claimedBy", 100).nullable() val event: Column = varchar("event",100)