diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt index b499eb96..758ab6ec 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt @@ -9,12 +9,11 @@ import no.iktdev.eventi.tasks.TaskPollerImplementation import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.shared.database.stores.EventStore import no.iktdev.mediaprocessing.shared.database.stores.TaskStore - import org.springframework.boot.ApplicationArguments import org.springframework.boot.ApplicationRunner import org.springframework.stereotype.Component import org.springframework.stereotype.Service -import java.util.UUID +import java.util.* @Component class PollerAdministrator( @@ -49,10 +48,16 @@ class DefaultTaskReporter() : TaskReporter { TaskStore.heartbeat(taskId) } - override fun markConsumed(taskId: UUID) { + override fun markCompleted(taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Completed) + } + override fun markFailed(taskId: UUID) { + TaskStore.markConsumed(taskId, TaskStatus.Failed) + } + + override fun updateProgress(taskId: UUID, progress: Int) { // Not to be implemented for this application } diff --git a/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt b/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt index 96a622c7..de887413 100644 --- a/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt +++ b/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt @@ -107,7 +107,9 @@ class ConvertTaskListenerTest { val overrideReporter = object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt index 953521f0..12269d78 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt @@ -48,10 +48,14 @@ class DefaultTaskReporter() : TaskReporter { TaskStore.heartbeat(taskId) } - override fun markConsumed(taskId: UUID) { + override fun markCompleted(taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Completed) } + override fun markFailed(taskId: UUID) { + TaskStore.markConsumed(taskId, TaskStatus.Failed) + } + override fun updateProgress(taskId: UUID, progress: Int) { // Not to be implemented for this application } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt index 5a3eaa14..7458d7b2 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt @@ -38,7 +38,8 @@ class DownloadCoverTaskListenerTest: TestBase() { private val overrideReporter = object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) {} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt index d0316b99..b790a1d7 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt @@ -44,7 +44,8 @@ class MediaStreamReadTaskListenerTest: TestBase() { val overrideReporter = object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt index 04dbb7d4..846c8de4 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt @@ -52,11 +52,16 @@ class DefaultTaskReporter() : TaskReporter { TaskStore.heartbeat(taskId) } - override fun markConsumed(taskId: UUID) { + override fun markCompleted(taskId: UUID) { log.info { "Marking task $taskId as completed" } TaskStore.markConsumed(taskId, TaskStatus.Completed) } + override fun markFailed(taskId: UUID) { + log.info { "Marking task $taskId as failed" } + TaskStore.markConsumed(taskId, TaskStatus.Failed) + } + override fun updateProgress(taskId: UUID, progress: Int) { // Not to be implemented for this application } diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt index a4b27ec9..05fedb0c 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt @@ -38,7 +38,8 @@ class SubtitleTaskListenerTest { val overrideReporter = object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt index 37274057..ce6ffeef 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt @@ -40,7 +40,8 @@ class VideoTaskListenerTest { val overrideReporter = object : TaskReporter { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} - override fun markConsumed(taskId: UUID) {} + override fun markCompleted(taskId: UUID) {} + override fun markFailed(taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 427b30b4..43da9a38 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,5 @@ [versions] -eventi = "1.0-rc30" +eventi = "1.0-rc31" exfl = "1.0-rc1" [libraries] diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt index 62e00648..90dc6c3a 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt @@ -180,6 +180,7 @@ object TaskStore: TaskStore { it[claimed] = true it[claimedBy] = workerId it[lastCheckIn] = UtcNow() + it[status] = TaskStatus.InProgress } }.isSuccess }