diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/HealthController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/HealthController.kt new file mode 100644 index 00000000..bf9d71f4 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/HealthController.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealth +import no.iktdev.mediaprocessing.coordinator.services.CoordinatorHealthService +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/health") +class HealthController( + private val healthService: CoordinatorHealthService +) { + + @GetMapping + fun getHealth(): CoordinatorHealth { + return healthService.getHealth() + } +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt index 475b48de..7b42fb68 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/TaskController.kt @@ -1,13 +1,15 @@ package no.iktdev.mediaprocessing.coordinator.controller -import no.iktdev.eventi.models.store.PersistedTask import no.iktdev.mediaprocessing.coordinator.services.EventService import no.iktdev.mediaprocessing.coordinator.services.TaskService +import no.iktdev.mediaprocessing.coordinator.translateDto.CoordinatorTaskTransferDto +import no.iktdev.mediaprocessing.coordinator.translateDto.toCoordinatorTransferDto import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.Paginated import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery +import no.iktdev.mediaprocessing.shared.common.dto.map import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity import org.springframework.web.bind.annotation.GetMapping @@ -24,18 +26,20 @@ class TaskController( ) { @GetMapping("/active") - fun getActiveTasks(): List = - taskService.getActiveTasks() + fun getActiveTasks(): List = + taskService.getActiveTasks().map { it.toCoordinatorTransferDto() } @GetMapping - fun getPagedTasks(query: TaskQuery): Paginated = - taskService.getPagedTasks(query) + fun getPagedTasks(query: TaskQuery): Paginated { + val paginatedTasks = taskService.getPagedTasks(query) + return paginatedTasks.map { it.toCoordinatorTransferDto() } + } @GetMapping("/{id}") - fun getTask(@PathVariable id: UUID): PersistedTask? = - taskService.getTaskById(id) + fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? = + taskService.getTaskById(id)?.toCoordinatorTransferDto() @GetMapping("/{taskId}/reset") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealth.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealth.kt new file mode 100644 index 00000000..41a9473c --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealth.kt @@ -0,0 +1,22 @@ +package no.iktdev.mediaprocessing.coordinator.dto + +import java.time.Instant + +data class CoordinatorHealth( + val status: CoordinatorHealthStatus, + val abandonedTasks: Int, + val stalledTasks: Int, + val activeTasks: Int, + val queuedTasks: Int, + val lastActivity: Instant?, + + // IDs for UI linking + val abandonedTaskIds: List, + val stalledTaskIds: List, + val overdueSequenceIds: List, + + // Detailed sequence info + val overdueSequences: List, + + val details: Map +) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealthStatus.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealthStatus.kt new file mode 100644 index 00000000..9291b43f --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/CoordinatorHealthStatus.kt @@ -0,0 +1,7 @@ +package no.iktdev.mediaprocessing.coordinator.dto + +enum class CoordinatorHealthStatus { + HEALTHY, + DEGRADED, + UNHEALTHY +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/SequenceHealth.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/SequenceHealth.kt new file mode 100644 index 00000000..33ecc908 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/dto/SequenceHealth.kt @@ -0,0 +1,12 @@ +package no.iktdev.mediaprocessing.coordinator.dto + +import java.time.Duration +import java.time.Instant + +data class SequenceHealth( + val referenceId: String, + val age: Duration, + val expected: Duration, + val lastEventAt: Instant, + val eventCount: Int +) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt new file mode 100644 index 00000000..2bc67c2a --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CoordinatorHealthService.kt @@ -0,0 +1,89 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealth +import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealthStatus +import no.iktdev.mediaprocessing.coordinator.dto.SequenceHealth +import no.iktdev.mediaprocessing.shared.common.rules.EventLifecycleRules +import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules +import no.iktdev.mediaprocessing.shared.database.stores.TaskStore +import org.springframework.stereotype.Service +import java.time.Duration +import java.time.Instant + +@Service +class CoordinatorHealthService( + private val taskService: TaskService, + private val eventService: EventService +) { + + fun getHealth(): CoordinatorHealth { + val tasks = taskService.getActiveTasks() + val incompleteSequences = eventService.getIncompleteSequences().groupBy { it.referenceId }.values + + // --- TASK HEALTH --- + val abandonedTaskIds = tasks + .filter { TaskLifecycleRules.isAbandoned(it.consumed, it.lastCheckIn) } + .map { it.taskId } + + val stalledTaskIds = tasks + .filter { TaskLifecycleRules.isStalled(it) } + .map { it.taskId } + + // --- SEQUENCE HEALTH --- + val overdueSequences = incompleteSequences + .filter { EventLifecycleRules.isOverdue(it) } + .map { seq -> + val refId = seq.first().referenceId + val first = seq.minOf { it.persistedAt } + val last = seq.maxOf { it.persistedAt } + val expected = EventLifecycleRules.expectedCompletionTimeWindow(seq) + val age = Duration.between(first, Instant.now()) + + SequenceHealth( + referenceId = refId.toString(), + age = age, + expected = expected, + lastEventAt = last, + eventCount = seq.size + ) + } + + val overdueSequenceIds = overdueSequences.map { it.referenceId } + + // --- AGGREGATED STATUS --- + val status = when { + abandonedTaskIds.isNotEmpty() || + stalledTaskIds.isNotEmpty() || + overdueSequenceIds.isNotEmpty() -> CoordinatorHealthStatus.DEGRADED + + else -> CoordinatorHealthStatus.HEALTHY + } + + val eventsLastMinute = eventService.getEventsLast(1) + val eventsLastFive = eventService.getEventsLast(5) + + + return CoordinatorHealth( + status = status, + abandonedTasks = abandonedTaskIds.size, + stalledTasks = stalledTaskIds.size, + activeTasks = tasks.count { !it.consumed }, + queuedTasks = TaskStore.getPendingTasks().size, + lastActivity = tasks.maxOfOrNull { it.persistedAt }, + + abandonedTaskIds = abandonedTaskIds.map { it.toString() }, + stalledTaskIds = stalledTaskIds.map { it.toString() }, + overdueSequenceIds = overdueSequenceIds, + overdueSequences = overdueSequences, + + details = mapOf( + "oldestActiveTaskAgeMinutes" to tasks.minOfOrNull { + Duration.between(it.persistedAt, Instant.now()).toMinutes() + }, + "eventsLastMinute" to eventsLastMinute, + "eventsLastFiveMinutes" to eventsLastFive, + ) + ) + } +} + diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt index 0563e686..d5ea8939 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventService.kt @@ -60,6 +60,14 @@ class EventService { .effectivePersisted() } + fun getIncompleteSequences(): List { + return EventStore.getIncompletedEventSequence() + } + + fun getEventsLast(minutes: Long = 1): Long { + return EventStore.eventsLast(minutes) + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/translateDto/CoordinatorTaskTransferDto.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/translateDto/CoordinatorTaskTransferDto.kt new file mode 100644 index 00000000..bdf3d4ce --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/translateDto/CoordinatorTaskTransferDto.kt @@ -0,0 +1,39 @@ +package no.iktdev.mediaprocessing.coordinator.translateDto + +import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules +import java.time.Instant +import java.util.* + +data class CoordinatorTaskTransferDto( + val id: Long, + val referenceId: UUID, + val status: String, + val taskId: UUID, + val task: String, + val data: String, + val claimed: Boolean, + val claimedBy: String?, + val consumed: Boolean, + val lastCheckIn: Instant?, + val persistedAt: Instant, + val abandoned: Boolean, +) { +} + +fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto { + return CoordinatorTaskTransferDto( + id = id, + referenceId = referenceId, + status = status.name, + taskId = taskId, + task = task, + data = data, + claimed = claimed, + claimedBy = claimedBy, + consumed = consumed, + lastCheckIn = lastCheckIn, + persistedAt = persistedAt, + abandoned = TaskLifecycleRules.isAbandoned(consumed, lastCheckIn) + ) +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/DiskInfo.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/DiskInfo.kt new file mode 100644 index 00000000..c44136db --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/DiskInfo.kt @@ -0,0 +1,35 @@ +package no.iktdev.mediaprocessing.coordinator.util + +import java.nio.file.FileSystems +import java.nio.file.Files +import java.nio.file.Paths.get + +data class DiskInfo( + val mount: String, + val device: String, + val totalBytes: Long, + val freeBytes: Long +) + +fun getDiskInfoFor(mounts: List): List { + val fileStores = FileSystems.getDefault().fileStores + + return mounts.mapNotNull { mount -> + val path = get(mount) + + val store = fileStores.find { fs -> + try { + Files.getFileStore(path) == fs + } catch (e: Exception) { + false + } + } ?: return@mapNotNull null + + DiskInfo( + mount = mount, + device = store.name(), + totalBytes = store.totalSpace, + freeBytes = store.usableSpace + ) + } +} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt index efda1b4f..e9a3ff62 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt @@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreCo import no.iktdev.mediaprocessing.shared.common.model.ContentExport import no.iktdev.mediaprocessing.shared.common.model.MediaType import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test @@ -116,7 +115,7 @@ class StoreContentAndMetadataTaskListenerTest { assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java) val result = event as StoreContentAndMetadataTaskResultEvent - assertThat(result.taskStatus).isEqualTo(TaskStatus.Completed) + assertThat(result.status).isEqualTo(TaskStatus.Completed) } @Test @@ -144,7 +143,7 @@ class StoreContentAndMetadataTaskListenerTest { assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java) val result = event as StoreContentAndMetadataTaskResultEvent - assertThat(result.taskStatus).isEqualTo(TaskStatus.Failed) + assertThat(result.status).isEqualTo(TaskStatus.Failed) } @Test diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 300ef74f..c0d82236 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -15,6 +15,7 @@ import java.net.InetAddress import java.security.MessageDigest import java.time.Instant import java.util.zip.CRC32 +import kotlin.reflect.KClass private val logger = KotlinLogging.logger {} @@ -249,3 +250,8 @@ fun List.effectivePersisted(): List { .sortedBy { it.persistedAt } } +fun KClass.getName(): String = + this.simpleName ?: this.java.simpleName + + + diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt index 3b31acd3..7fed8753 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/Paginated.kt @@ -5,4 +5,13 @@ data class Paginated( val page: Int, val size: Int, val total: Long -) \ No newline at end of file +) + +fun Paginated.map(transform: (T) -> R): Paginated { + return Paginated( + items = items.map(transform), + page = page, + size = size, + total = total + ) +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt index 6dc5e5c1..a82016d1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/EventRegistry.kt @@ -39,17 +39,12 @@ object EventRegistry { MigrateContentToStoreTaskCreatedEvent::class.java, MigrateContentToStoreTaskResultEvent::class.java, - ProcesserEncodePerformedEvent::class.java, ProcesserEncodeResultEvent::class.java, ProcesserEncodeTaskCreatedEvent::class.java, - ProcesserExtractPerformedEvent::class.java, ProcesserExtractResultEvent::class.java, ProcesserExtractTaskCreatedEvent::class.java, - ProcesserEncodeTaskCreatedEvent::class.java, - ProcesserEncodeResultEvent::class.java, - StartProcessingEvent::class.java, StoreContentAndMetadataTaskCreatedEvent::class.java, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt new file mode 100644 index 00000000..bea868f3 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/TaskResultEvent.kt @@ -0,0 +1,12 @@ +package no.iktdev.mediaprocessing.shared.common.event_task_contract + +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.TaskStatus + +/** + * Base class, should not be serialized into + */ +abstract class TaskResultEvent( + open val status: TaskStatus, + open val error: String? = null +) : Event() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ConvertTaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ConvertTaskResultEvent.kt index 9ebcdb8a..0cde1827 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ConvertTaskResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ConvertTaskResultEvent.kt @@ -1,12 +1,12 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class ConvertTaskResultEvent( val data: ConvertedData?, - val status: TaskStatus, -): Event() { + override val status: TaskStatus, +): TaskResultEvent(status = status) { data class ConvertedData( val language: String, val baseName: String, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoordinatorReadStreamsResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoordinatorReadStreamsResultEvent.kt index 124c1b58..939406b4 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoordinatorReadStreamsResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoordinatorReadStreamsResultEvent.kt @@ -1,11 +1,11 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events import com.google.gson.JsonObject -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class CoordinatorReadStreamsResultEvent( val data: JsonObject? = null, - val status: TaskStatus -): Event() { -} \ No newline at end of file + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoverDownloadResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoverDownloadResultEvent.kt index 144147d3..f0203ca4 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoverDownloadResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/CoverDownloadResultEvent.kt @@ -1,12 +1,13 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class CoverDownloadResultEvent( val data: CoverDownloadedData? = null, - val status: TaskStatus -): Event() { + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error){ data class CoverDownloadedData( val source: String, val outputFile: String diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MetadataSearchResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MetadataSearchResultEvent.kt index 60a2c6e7..bdacf9e2 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MetadataSearchResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MetadataSearchResultEvent.kt @@ -1,16 +1,17 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Metadata import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent import no.iktdev.mediaprocessing.shared.common.model.MediaType import java.util.* data class MetadataSearchResultEvent( val results: List = emptyList(), val recommended: SearchResult? = null, - val status: TaskStatus -): Event() { + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error) { data class SearchResult( val simpleScore: Int, val prefixScore: Int, diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt index 35daac2d..b876bc53 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt @@ -1,16 +1,17 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus data class MigrateContentToStoreTaskResultEvent( - val status: TaskStatus, val collection: String, val videoMigrate: FileMigration, val subtitleMigrate: List, - val coverMigrate: List -) : Event() { + val coverMigrate: List, + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error) { data class FileMigration( val storedUri: String?, val status: MigrateStatus diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodePerformedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodePerformedEvent.kt deleted file mode 100644 index 3de1e776..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodePerformedEvent.kt +++ /dev/null @@ -1,6 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common.event_task_contract.events - -import no.iktdev.eventi.models.Event - -class ProcesserEncodePerformedEvent: Event() { -} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt index 792d4bb2..b3be6ab8 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserEncodeResultEvent.kt @@ -1,12 +1,13 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class ProcesserEncodeResultEvent( val data: EncodeResult? = null, - val status: TaskStatus, -): Event() { + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error) { data class EncodeResult( val cachedOutputFile: String? = null ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractPerformedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractPerformedEvent.kt deleted file mode 100644 index 3cc871b3..00000000 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractPerformedEvent.kt +++ /dev/null @@ -1,6 +0,0 @@ -package no.iktdev.mediaprocessing.shared.common.event_task_contract.events - -import no.iktdev.eventi.models.Event - -class ProcesserExtractPerformedEvent: Event() { -} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt index d6eb1528..60106ea1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/ProcesserExtractResultEvent.kt @@ -1,12 +1,13 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class ProcesserExtractResultEvent( - val status: TaskStatus, - val data: ExtractResult? = null -): Event() { + val data: ExtractResult? = null, + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error) { data class ExtractResult( val language: String, val cachedOutputFile: String diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StoreContentAndMetadataTaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StoreContentAndMetadataTaskResultEvent.kt index c3de0f27..42cfe188 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StoreContentAndMetadataTaskResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/StoreContentAndMetadataTaskResultEvent.kt @@ -1,9 +1,10 @@ package no.iktdev.mediaprocessing.shared.common.event_task_contract.events -import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent data class StoreContentAndMetadataTaskResultEvent( - val taskStatus: TaskStatus, -) : Event() { + override val status: TaskStatus, + override val error: String? = null +) : TaskResultEvent(status, error){ } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt index 11a55d2b..43689ff2 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/TaskProjection.kt @@ -101,7 +101,7 @@ class TaskProjection(val events: List) { fun projectStoreContentAndMetadataStatus(): TaskStatus { return projectStatus( createdIds = { it.map { e -> e.taskId }}, - resultStatus = {it.taskStatus}, + resultStatus = {it.status}, resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } } ) } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/EventLifecycleRules.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/EventLifecycleRules.kt new file mode 100644 index 00000000..84c2514f --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/EventLifecycleRules.kt @@ -0,0 +1,38 @@ +package no.iktdev.mediaprocessing.shared.common.rules + +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.mediaprocessing.shared.common.UtcNow +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* +import no.iktdev.mediaprocessing.shared.common.getName +import java.time.Duration + +object EventLifecycleRules { + + fun isOverdue(events: List): Boolean { + if (events.isEmpty()) return false + + val firstEventTime = events.minOf { it.persistedAt } + val expectedWindow = expectedCompletionTimeWindow(events) + + val deadline = firstEventTime.plus(expectedWindow) + + return UtcNow().isAfter(deadline) + } + + fun expectedCompletionTimeWindow(events: List): Duration = + events.fold(Duration.ZERO) { acc, pe -> + acc + + pe.match(Duration.ofMinutes(5)) + + pe.match(Duration.ofMinutes(5)) + + pe.match(Duration.ofMinutes(5)) + + pe.match(Duration.ofMinutes(10)) + + pe.match(Duration.ofMinutes(5)) + + pe.match(Duration.ofHours(8)) + + pe.match(Duration.ofMinutes(15)) + + pe.match(Duration.ofMinutes(5)) + } + + inline fun PersistedEvent.match(duration: Duration): Duration = + if (this.event == T::class.getName()) duration else Duration.ZERO +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt new file mode 100644 index 00000000..15180b54 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/rules/TaskLifecycleRules.kt @@ -0,0 +1,43 @@ +package no.iktdev.mediaprocessing.shared.common.rules + +import no.iktdev.eventi.models.store.PersistedTask +import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.* +import no.iktdev.mediaprocessing.shared.common.getName +import java.time.Instant +import java.time.temporal.ChronoUnit + +object TaskLifecycleRules { + const val abandonedAfterMinutes = 15L + + fun isAbandoned( + consumed: Boolean, + lastCheckIn: Instant? + ): Boolean { + if (consumed) return false + + val cutoff = Instant.now().minus(abandonedAfterMinutes, ChronoUnit.MINUTES) + return lastCheckIn == null || lastCheckIn.isBefore(cutoff) + } + + fun isStalled(task: PersistedTask): Boolean { + if (task.consumed) return false + + val cutoff = stalledCutoffFor(task.task) + return task.lastCheckIn?.isBefore(cutoff) ?: false + } + + + private fun stalledCutoffFor(taskName: String): Instant { + return when (taskName) { + MediaReadTask::class.getName() -> Instant.now().minus(5, ChronoUnit.MINUTES) + EncodeTask::class.getName() -> Instant.now().minus(6, ChronoUnit.HOURS) + ExtractSubtitleTask::class.getName() -> Instant.now().minus(15, ChronoUnit.MINUTES) + ConvertTask::class.getName() -> Instant.now().minus(6, ChronoUnit.MINUTES) + MetadataSearchTask::class.getName() -> Instant.now().minus(10, ChronoUnit.MINUTES) + MigrateToContentStoreTask::class.getName() -> Instant.now().minus(30, ChronoUnit.MINUTES) + StoreContentAndMetadataTask::class.getName() -> Instant.now().minus(5, ChronoUnit.MINUTES) + else -> Instant.now().minus(30, ChronoUnit.MINUTES) + } + } + +} diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt index 5c2fda97..9798dead 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/EventStore.kt @@ -9,14 +9,17 @@ import no.iktdev.eventi.stores.EventStore import no.iktdev.mediaprocessing.shared.common.UtcNow import no.iktdev.mediaprocessing.shared.common.dto.EventQuery import no.iktdev.mediaprocessing.shared.common.dto.Paginated +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent +import no.iktdev.mediaprocessing.shared.common.getName import no.iktdev.mediaprocessing.shared.database.likeAny import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.EventsTable import no.iktdev.mediaprocessing.shared.database.withTransaction import org.jetbrains.exposed.sql.insert import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.* @@ -128,4 +131,25 @@ object EventStore: EventStore { persist(auditEvent) return auditEvent.eventId } + + fun eventsLast(minutes: Long = 1): Long { + val cutoff = Instant.now().minus(minutes, ChronoUnit.MINUTES) + return withTransaction { + EventsTable.select(EventsTable.eventId).where { + EventsTable.persistedAt greater cutoff + }.count() + }.getOrDefault(-1) + } + + fun getIncompletedEventSequence(): List { + return withTransaction { + val completedReferences = EventsTable.select(EventsTable.referenceId) + .where { EventsTable.event eq CompletedEvent::class.getName() } + .map { it[EventsTable.referenceId] } + EventsTable.getWhere { + EventsTable.referenceId notInList completedReferences + } + }.getOrDefault(emptyList()) + } + } \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt index 67c84fb5..b844c5fe 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/stores/TaskStore.kt @@ -12,11 +12,10 @@ import no.iktdev.mediaprocessing.shared.database.likeAny import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery import no.iktdev.mediaprocessing.shared.database.tables.TasksTable import no.iktdev.mediaprocessing.shared.database.withTransaction -import org.jetbrains.exposed.sql.and -import org.jetbrains.exposed.sql.insert -import org.jetbrains.exposed.sql.selectAll -import org.jetbrains.exposed.sql.update +import org.jetbrains.exposed.sql.* import java.time.Duration +import java.time.Instant +import java.time.temporal.ChronoUnit import java.util.* object TaskStore: TaskStore { @@ -202,8 +201,22 @@ object TaskStore: TaskStore { return withTransaction { TasksTable.getWhere { (TasksTable.consumed eq false) and - (TasksTable.claimed eq false) + (TasksTable.claimed eq false) and + (TasksTable.status eq TaskStatus.Pending) } }.getOrDefault(emptyList()) } + + fun findAbandonedTasks(): List { + val cutoff = Instant.now().minus(15, ChronoUnit.MINUTES) + return withTransaction { + TasksTable.getWhere { + (TasksTable.lastCheckIn less cutoff or TasksTable.lastCheckIn.isNull()) and + (TasksTable.consumed eq false) and + (TasksTable.claimed eq true) + } + }.getOrDefault(emptyList()) + } + + } \ No newline at end of file diff --git a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt index cedb26e8..e3e98418 100644 --- a/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt +++ b/shared/database/src/main/kotlin/no/iktdev/mediaprocessing/shared/database/tables/EventsTable.kt @@ -38,4 +38,6 @@ object EventsTable: IntIdTable(name = "EVENTS") { ) } } + + } \ No newline at end of file