Publishing reason for failure
This commit is contained in:
parent
d7555b825f
commit
b1061b3c9e
@ -110,7 +110,7 @@ class ConvertService(
|
|||||||
super.onError(inputFile, message)
|
super.onError(inputFile, message)
|
||||||
log.info { "Convert error for ${task.referenceId}\nmessage: $message" }
|
log.info { "Convert error for ${task.referenceId}\nmessage: $message" }
|
||||||
|
|
||||||
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
|
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message)
|
||||||
|
|
||||||
tasks.onProduceEvent(ConvertWorkPerformed(
|
tasks.onProduceEvent(ConvertWorkPerformed(
|
||||||
metadata = EventMetadata(
|
metadata = EventMetadata(
|
||||||
|
|||||||
@ -82,13 +82,14 @@ class EncodeService(
|
|||||||
logDir = logDir, listener = this
|
logDir = logDir, listener = this
|
||||||
)
|
)
|
||||||
if (outFile.exists()) {
|
if (outFile.exists()) {
|
||||||
|
val reason = "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outFile.absolutePath}"
|
||||||
if (ffwrc.arguments.firstOrNull() != "-y") {
|
if (ffwrc.arguments.firstOrNull() != "-y") {
|
||||||
this.onError(
|
this.onError(
|
||||||
ffwrc.inputFile,
|
ffwrc.inputFile,
|
||||||
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outFile.absolutePath}"
|
reason
|
||||||
)
|
)
|
||||||
// Setting consumed to prevent spamming
|
// Setting consumed to prevent spamming
|
||||||
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
|
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR, reason)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -165,7 +166,7 @@ class EncodeService(
|
|||||||
override fun onError(inputFile: String, message: String) {
|
override fun onError(inputFile: String, message: String) {
|
||||||
val task = assignedTask ?: return
|
val task = assignedTask ?: return
|
||||||
|
|
||||||
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
|
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message)
|
||||||
|
|
||||||
log.error { "Encode failed for ${task.referenceId}\n$message" }
|
log.error { "Encode failed for ${task.referenceId}\n$message" }
|
||||||
tasks.onProduceEvent(EncodeWorkPerformedEvent(
|
tasks.onProduceEvent(EncodeWorkPerformedEvent(
|
||||||
|
|||||||
@ -67,7 +67,8 @@ class ExtractService(
|
|||||||
logDir.mkdirs()
|
logDir.mkdirs()
|
||||||
}
|
}
|
||||||
|
|
||||||
val setClaim = taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
|
val setClaim =
|
||||||
|
taskManager.markTaskAsClaimed(referenceId = event.referenceId, eventId = event.eventId, claimer = serviceId)
|
||||||
if (setClaim) {
|
if (setClaim) {
|
||||||
log.info { "Claim successful for ${event.referenceId} extract" }
|
log.info { "Claim successful for ${event.referenceId} extract" }
|
||||||
runner = FfmpegRunner(
|
runner = FfmpegRunner(
|
||||||
@ -78,13 +79,15 @@ class ExtractService(
|
|||||||
listener = this
|
listener = this
|
||||||
)
|
)
|
||||||
if (outputFile.exists()) {
|
if (outputFile.exists()) {
|
||||||
|
val reason =
|
||||||
|
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outputFile.absolutePath}"
|
||||||
if (ffwrc.arguments.firstOrNull() != "-y") {
|
if (ffwrc.arguments.firstOrNull() != "-y") {
|
||||||
this.onError(
|
this.onError(
|
||||||
ffwrc.inputFile,
|
ffwrc.inputFile,
|
||||||
"${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outputFile.absolutePath}"
|
reason
|
||||||
)
|
)
|
||||||
// Setting consumed to prevent spamming
|
// Setting consumed to prevent spamming
|
||||||
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR)
|
taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR, reason)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -107,7 +110,7 @@ class ExtractService(
|
|||||||
log.info { "Extract completed for ${task.referenceId}" }
|
log.info { "Extract completed for ${task.referenceId}" }
|
||||||
runBlocking {
|
runBlocking {
|
||||||
var successfulComplete = false
|
var successfulComplete = false
|
||||||
limitedWhile({!successfulComplete}, 1000 * 10, 1000) {
|
limitedWhile({ !successfulComplete }, 1000 * 10, 1000) {
|
||||||
taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
|
taskManager.markTaskAsCompleted(task.referenceId, task.eventId)
|
||||||
successfulComplete = taskManager.isTaskCompleted(task.referenceId, task.eventId)
|
successfulComplete = taskManager.isTaskCompleted(task.referenceId, task.eventId)
|
||||||
}
|
}
|
||||||
@ -142,7 +145,7 @@ class ExtractService(
|
|||||||
override fun onError(inputFile: String, message: String) {
|
override fun onError(inputFile: String, message: String) {
|
||||||
val task = assignedTask ?: return
|
val task = assignedTask ?: return
|
||||||
|
|
||||||
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR)
|
taskManager.markTaskAsCompleted(task.referenceId, task.eventId, Status.ERROR, message)
|
||||||
|
|
||||||
log.error { "Extract failed for ${task.referenceId}\n$message" }
|
log.error { "Extract failed for ${task.referenceId}\n$message" }
|
||||||
tasks.onProduceEvent(
|
tasks.onProduceEvent(
|
||||||
@ -172,7 +175,12 @@ class ExtractService(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, progress: FfmpegDecodedProgress? = null) {
|
fun sendProgress(
|
||||||
|
referenceId: String,
|
||||||
|
eventId: String,
|
||||||
|
status: WorkStatus,
|
||||||
|
progress: FfmpegDecodedProgress? = null
|
||||||
|
) {
|
||||||
val runner = runner ?: return
|
val runner = runner ?: return
|
||||||
|
|
||||||
val processerEventInfo = ProcesserEventInfo(
|
val processerEventInfo = ProcesserEventInfo(
|
||||||
|
|||||||
@ -94,7 +94,7 @@ class TasksManager(private val dataSource: DataSource) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fun markTaskAsCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED): Boolean {
|
fun markTaskAsCompleted(referenceId: String, eventId: String, status: Status = Status.COMPLETED, message: String? = null): Boolean {
|
||||||
return executeWithStatus(dataSource) {
|
return executeWithStatus(dataSource) {
|
||||||
tasks.update({
|
tasks.update({
|
||||||
(tasks.referenceId eq referenceId) and
|
(tasks.referenceId eq referenceId) and
|
||||||
@ -103,6 +103,9 @@ class TasksManager(private val dataSource: DataSource) {
|
|||||||
it[consumed] = true
|
it[consumed] = true
|
||||||
it[claimed] = true
|
it[claimed] = true
|
||||||
it[tasks.status] = status.name
|
it[tasks.status] = status.name
|
||||||
|
message?.let { msg ->
|
||||||
|
it[tasks.taskResult] = msg
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import org.jetbrains.exposed.dao.id.IntIdTable
|
|||||||
import org.jetbrains.exposed.sql.Column
|
import org.jetbrains.exposed.sql.Column
|
||||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
||||||
import org.jetbrains.exposed.sql.javatime.datetime
|
import org.jetbrains.exposed.sql.javatime.datetime
|
||||||
|
import org.w3c.dom.Text
|
||||||
import java.time.LocalDateTime
|
import java.time.LocalDateTime
|
||||||
|
|
||||||
object tasks: IntIdTable() {
|
object tasks: IntIdTable() {
|
||||||
@ -19,6 +20,8 @@ object tasks: IntIdTable() {
|
|||||||
val data: Column<String> = text("data")
|
val data: Column<String> = text("data")
|
||||||
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
||||||
val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
|
val lastCheckIn: Column<LocalDateTime?> = datetime("lastCheckIn").nullable()
|
||||||
|
val taskResult: Column<String?> = text("taskResult").nullable()
|
||||||
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
uniqueIndex(referenceId, task, eventId)
|
uniqueIndex(referenceId, task, eventId)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user