Adjustments

This commit is contained in:
bskjon 2024-04-13 17:54:53 +02:00
parent b5a7aa0f36
commit a3b83bc8a1
4 changed files with 6 additions and 1 deletions

View File

@ -158,6 +158,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired
} }
override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR)
val runner = this@EncodeService.runner val runner = this@EncodeService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" } log.error { "Can't produce error message when the referenceId is not present" }

View File

@ -153,6 +153,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator, @Autowire
} }
override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) {
eventManager.setProcessEventCompleted(referenceId, eventId, Status.ERROR)
val runner = this@ExtractService.runner val runner = this@ExtractService.runner
if (runner == null || runner.referenceId.isBlank()) { if (runner == null || runner.referenceId.isBlank()) {
log.error { "Can't produce error message when the referenceId is not present" } log.error { "Can't produce error message when the referenceId is not present" }

View File

@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.datasource.*
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
@ -219,7 +220,7 @@ class PersistentEventManager(private val dataSource: DataSource) {
} }
} }
fun setProcessEventCompleted(referenceId: String, eventId: String): Boolean { fun setProcessEventCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean {
return executeWithStatus(dataSource) { return executeWithStatus(dataSource) {
processerEvents.update({ processerEvents.update({
(processerEvents.referenceId eq referenceId) and (processerEvents.referenceId eq referenceId) and
@ -227,6 +228,7 @@ class PersistentEventManager(private val dataSource: DataSource) {
}) { }) {
it[consumed] = true it[consumed] = true
it[claimed] = true it[claimed] = true
it[processerEvents.status] = status.name
} }
} }
} }

View File

@ -8,6 +8,7 @@ import java.time.LocalDateTime
object processerEvents: IntIdTable() { object processerEvents: IntIdTable() {
val referenceId: Column<String> = varchar("referenceId", 50) val referenceId: Column<String> = varchar("referenceId", 50)
val status: Column<String> = varchar("status", 10)
val claimed: Column<Boolean> = bool("claimed").default(false) val claimed: Column<Boolean> = bool("claimed").default(false)
val claimedBy: Column<String?> = varchar("claimedBy", 100).nullable() val claimedBy: Column<String?> = varchar("claimedBy", 100).nullable()
val event: Column<String> = varchar("event",100) val event: Column<String> = varchar("event",100)