Added health status
This commit is contained in:
parent
30b5d6907c
commit
768a162ac0
@ -0,0 +1,19 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.controller
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealth
|
||||
import no.iktdev.mediaprocessing.coordinator.services.CoordinatorHealthService
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
import org.springframework.web.bind.annotation.RequestMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/health")
|
||||
class HealthController(
|
||||
private val healthService: CoordinatorHealthService
|
||||
) {
|
||||
|
||||
@GetMapping
|
||||
fun getHealth(): CoordinatorHealth {
|
||||
return healthService.getHealth()
|
||||
}
|
||||
}
|
||||
@ -1,13 +1,15 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.controller
|
||||
|
||||
|
||||
import no.iktdev.eventi.models.store.PersistedTask
|
||||
import no.iktdev.mediaprocessing.coordinator.services.EventService
|
||||
import no.iktdev.mediaprocessing.coordinator.services.TaskService
|
||||
import no.iktdev.mediaprocessing.coordinator.translateDto.CoordinatorTaskTransferDto
|
||||
import no.iktdev.mediaprocessing.coordinator.translateDto.toCoordinatorTransferDto
|
||||
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.ResetTaskResponse
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.TaskQuery
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.map
|
||||
import org.springframework.http.HttpStatus
|
||||
import org.springframework.http.ResponseEntity
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
@ -24,18 +26,20 @@ class TaskController(
|
||||
) {
|
||||
|
||||
@GetMapping("/active")
|
||||
fun getActiveTasks(): List<PersistedTask> =
|
||||
taskService.getActiveTasks()
|
||||
fun getActiveTasks(): List<CoordinatorTaskTransferDto> =
|
||||
taskService.getActiveTasks().map { it.toCoordinatorTransferDto() }
|
||||
|
||||
@GetMapping
|
||||
fun getPagedTasks(query: TaskQuery): Paginated<PersistedTask> =
|
||||
taskService.getPagedTasks(query)
|
||||
fun getPagedTasks(query: TaskQuery): Paginated<CoordinatorTaskTransferDto> {
|
||||
val paginatedTasks = taskService.getPagedTasks(query)
|
||||
return paginatedTasks.map { it.toCoordinatorTransferDto() }
|
||||
}
|
||||
|
||||
|
||||
|
||||
@GetMapping("/{id}")
|
||||
fun getTask(@PathVariable id: UUID): PersistedTask? =
|
||||
taskService.getTaskById(id)
|
||||
fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? =
|
||||
taskService.getTaskById(id)?.toCoordinatorTransferDto()
|
||||
|
||||
|
||||
@GetMapping("/{taskId}/reset")
|
||||
|
||||
@ -0,0 +1,22 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.dto
|
||||
|
||||
import java.time.Instant
|
||||
|
||||
data class CoordinatorHealth(
|
||||
val status: CoordinatorHealthStatus,
|
||||
val abandonedTasks: Int,
|
||||
val stalledTasks: Int,
|
||||
val activeTasks: Int,
|
||||
val queuedTasks: Int,
|
||||
val lastActivity: Instant?,
|
||||
|
||||
// IDs for UI linking
|
||||
val abandonedTaskIds: List<String>,
|
||||
val stalledTaskIds: List<String>,
|
||||
val overdueSequenceIds: List<String>,
|
||||
|
||||
// Detailed sequence info
|
||||
val overdueSequences: List<SequenceHealth>,
|
||||
|
||||
val details: Map<String, Any?>
|
||||
)
|
||||
@ -0,0 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.dto
|
||||
|
||||
enum class CoordinatorHealthStatus {
|
||||
HEALTHY,
|
||||
DEGRADED,
|
||||
UNHEALTHY
|
||||
}
|
||||
@ -0,0 +1,12 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.dto
|
||||
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
data class SequenceHealth(
|
||||
val referenceId: String,
|
||||
val age: Duration,
|
||||
val expected: Duration,
|
||||
val lastEventAt: Instant,
|
||||
val eventCount: Int
|
||||
)
|
||||
@ -0,0 +1,89 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.services
|
||||
|
||||
import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealth
|
||||
import no.iktdev.mediaprocessing.coordinator.dto.CoordinatorHealthStatus
|
||||
import no.iktdev.mediaprocessing.coordinator.dto.SequenceHealth
|
||||
import no.iktdev.mediaprocessing.shared.common.rules.EventLifecycleRules
|
||||
import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules
|
||||
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
|
||||
import org.springframework.stereotype.Service
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
||||
@Service
|
||||
class CoordinatorHealthService(
|
||||
private val taskService: TaskService,
|
||||
private val eventService: EventService
|
||||
) {
|
||||
|
||||
fun getHealth(): CoordinatorHealth {
|
||||
val tasks = taskService.getActiveTasks()
|
||||
val incompleteSequences = eventService.getIncompleteSequences().groupBy { it.referenceId }.values
|
||||
|
||||
// --- TASK HEALTH ---
|
||||
val abandonedTaskIds = tasks
|
||||
.filter { TaskLifecycleRules.isAbandoned(it.consumed, it.lastCheckIn) }
|
||||
.map { it.taskId }
|
||||
|
||||
val stalledTaskIds = tasks
|
||||
.filter { TaskLifecycleRules.isStalled(it) }
|
||||
.map { it.taskId }
|
||||
|
||||
// --- SEQUENCE HEALTH ---
|
||||
val overdueSequences = incompleteSequences
|
||||
.filter { EventLifecycleRules.isOverdue(it) }
|
||||
.map { seq ->
|
||||
val refId = seq.first().referenceId
|
||||
val first = seq.minOf { it.persistedAt }
|
||||
val last = seq.maxOf { it.persistedAt }
|
||||
val expected = EventLifecycleRules.expectedCompletionTimeWindow(seq)
|
||||
val age = Duration.between(first, Instant.now())
|
||||
|
||||
SequenceHealth(
|
||||
referenceId = refId.toString(),
|
||||
age = age,
|
||||
expected = expected,
|
||||
lastEventAt = last,
|
||||
eventCount = seq.size
|
||||
)
|
||||
}
|
||||
|
||||
val overdueSequenceIds = overdueSequences.map { it.referenceId }
|
||||
|
||||
// --- AGGREGATED STATUS ---
|
||||
val status = when {
|
||||
abandonedTaskIds.isNotEmpty() ||
|
||||
stalledTaskIds.isNotEmpty() ||
|
||||
overdueSequenceIds.isNotEmpty() -> CoordinatorHealthStatus.DEGRADED
|
||||
|
||||
else -> CoordinatorHealthStatus.HEALTHY
|
||||
}
|
||||
|
||||
val eventsLastMinute = eventService.getEventsLast(1)
|
||||
val eventsLastFive = eventService.getEventsLast(5)
|
||||
|
||||
|
||||
return CoordinatorHealth(
|
||||
status = status,
|
||||
abandonedTasks = abandonedTaskIds.size,
|
||||
stalledTasks = stalledTaskIds.size,
|
||||
activeTasks = tasks.count { !it.consumed },
|
||||
queuedTasks = TaskStore.getPendingTasks().size,
|
||||
lastActivity = tasks.maxOfOrNull { it.persistedAt },
|
||||
|
||||
abandonedTaskIds = abandonedTaskIds.map { it.toString() },
|
||||
stalledTaskIds = stalledTaskIds.map { it.toString() },
|
||||
overdueSequenceIds = overdueSequenceIds,
|
||||
overdueSequences = overdueSequences,
|
||||
|
||||
details = mapOf(
|
||||
"oldestActiveTaskAgeMinutes" to tasks.minOfOrNull {
|
||||
Duration.between(it.persistedAt, Instant.now()).toMinutes()
|
||||
},
|
||||
"eventsLastMinute" to eventsLastMinute,
|
||||
"eventsLastFiveMinutes" to eventsLastFive,
|
||||
)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,6 +60,14 @@ class EventService {
|
||||
.effectivePersisted()
|
||||
}
|
||||
|
||||
fun getIncompleteSequences(): List<PersistedEvent> {
|
||||
return EventStore.getIncompletedEventSequence()
|
||||
}
|
||||
|
||||
fun getEventsLast(minutes: Long = 1): Long {
|
||||
return EventStore.eventsLast(minutes)
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,39 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.translateDto
|
||||
|
||||
import no.iktdev.eventi.models.store.PersistedTask
|
||||
import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
|
||||
data class CoordinatorTaskTransferDto(
|
||||
val id: Long,
|
||||
val referenceId: UUID,
|
||||
val status: String,
|
||||
val taskId: UUID,
|
||||
val task: String,
|
||||
val data: String,
|
||||
val claimed: Boolean,
|
||||
val claimedBy: String?,
|
||||
val consumed: Boolean,
|
||||
val lastCheckIn: Instant?,
|
||||
val persistedAt: Instant,
|
||||
val abandoned: Boolean,
|
||||
) {
|
||||
}
|
||||
|
||||
fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto {
|
||||
return CoordinatorTaskTransferDto(
|
||||
id = id,
|
||||
referenceId = referenceId,
|
||||
status = status.name,
|
||||
taskId = taskId,
|
||||
task = task,
|
||||
data = data,
|
||||
claimed = claimed,
|
||||
claimedBy = claimedBy,
|
||||
consumed = consumed,
|
||||
lastCheckIn = lastCheckIn,
|
||||
persistedAt = persistedAt,
|
||||
abandoned = TaskLifecycleRules.isAbandoned(consumed, lastCheckIn)
|
||||
)
|
||||
}
|
||||
@ -0,0 +1,35 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.util
|
||||
|
||||
import java.nio.file.FileSystems
|
||||
import java.nio.file.Files
|
||||
import java.nio.file.Paths.get
|
||||
|
||||
data class DiskInfo(
|
||||
val mount: String,
|
||||
val device: String,
|
||||
val totalBytes: Long,
|
||||
val freeBytes: Long
|
||||
)
|
||||
|
||||
fun getDiskInfoFor(mounts: List<String>): List<DiskInfo> {
|
||||
val fileStores = FileSystems.getDefault().fileStores
|
||||
|
||||
return mounts.mapNotNull { mount ->
|
||||
val path = get(mount)
|
||||
|
||||
val store = fileStores.find { fs ->
|
||||
try {
|
||||
Files.getFileStore(path) == fs
|
||||
} catch (e: Exception) {
|
||||
false
|
||||
}
|
||||
} ?: return@mapNotNull null
|
||||
|
||||
DiskInfo(
|
||||
mount = mount,
|
||||
device = store.name(),
|
||||
totalBytes = store.totalSpace,
|
||||
freeBytes = store.usableSpace
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -8,7 +8,6 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreCo
|
||||
import no.iktdev.mediaprocessing.shared.common.model.ContentExport
|
||||
import no.iktdev.mediaprocessing.shared.common.model.MediaType
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.BeforeEach
|
||||
import org.junit.jupiter.api.DisplayName
|
||||
import org.junit.jupiter.api.Test
|
||||
@ -116,7 +115,7 @@ class StoreContentAndMetadataTaskListenerTest {
|
||||
|
||||
assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java)
|
||||
val result = event as StoreContentAndMetadataTaskResultEvent
|
||||
assertThat(result.taskStatus).isEqualTo(TaskStatus.Completed)
|
||||
assertThat(result.status).isEqualTo(TaskStatus.Completed)
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -144,7 +143,7 @@ class StoreContentAndMetadataTaskListenerTest {
|
||||
|
||||
assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java)
|
||||
val result = event as StoreContentAndMetadataTaskResultEvent
|
||||
assertThat(result.taskStatus).isEqualTo(TaskStatus.Failed)
|
||||
assertThat(result.status).isEqualTo(TaskStatus.Failed)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@ -15,6 +15,7 @@ import java.net.InetAddress
|
||||
import java.security.MessageDigest
|
||||
import java.time.Instant
|
||||
import java.util.zip.CRC32
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
@ -249,3 +250,8 @@ fun List<PersistedEvent>.effectivePersisted(): List<PersistedEvent> {
|
||||
.sortedBy { it.persistedAt }
|
||||
}
|
||||
|
||||
fun <T : Any> KClass<T>.getName(): String =
|
||||
this.simpleName ?: this.java.simpleName
|
||||
|
||||
|
||||
|
||||
|
||||
@ -6,3 +6,12 @@ data class Paginated<T>(
|
||||
val size: Int,
|
||||
val total: Long
|
||||
)
|
||||
|
||||
fun <T, R> Paginated<T>.map(transform: (T) -> R): Paginated<R> {
|
||||
return Paginated(
|
||||
items = items.map(transform),
|
||||
page = page,
|
||||
size = size,
|
||||
total = total
|
||||
)
|
||||
}
|
||||
|
||||
@ -39,17 +39,12 @@ object EventRegistry {
|
||||
MigrateContentToStoreTaskCreatedEvent::class.java,
|
||||
MigrateContentToStoreTaskResultEvent::class.java,
|
||||
|
||||
ProcesserEncodePerformedEvent::class.java,
|
||||
ProcesserEncodeResultEvent::class.java,
|
||||
ProcesserEncodeTaskCreatedEvent::class.java,
|
||||
|
||||
ProcesserExtractPerformedEvent::class.java,
|
||||
ProcesserExtractResultEvent::class.java,
|
||||
ProcesserExtractTaskCreatedEvent::class.java,
|
||||
|
||||
ProcesserEncodeTaskCreatedEvent::class.java,
|
||||
ProcesserEncodeResultEvent::class.java,
|
||||
|
||||
StartProcessingEvent::class.java,
|
||||
|
||||
StoreContentAndMetadataTaskCreatedEvent::class.java,
|
||||
|
||||
@ -0,0 +1,12 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
|
||||
/**
|
||||
* Base class, should not be serialized into
|
||||
*/
|
||||
abstract class TaskResultEvent(
|
||||
open val status: TaskStatus,
|
||||
open val error: String? = null
|
||||
) : Event()
|
||||
@ -1,12 +1,12 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class ConvertTaskResultEvent(
|
||||
val data: ConvertedData?,
|
||||
val status: TaskStatus,
|
||||
): Event() {
|
||||
override val status: TaskStatus,
|
||||
): TaskResultEvent(status = status) {
|
||||
data class ConvertedData(
|
||||
val language: String,
|
||||
val baseName: String,
|
||||
|
||||
@ -1,11 +1,11 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import com.google.gson.JsonObject
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class CoordinatorReadStreamsResultEvent(
|
||||
val data: JsonObject? = null,
|
||||
val status: TaskStatus
|
||||
): Event() {
|
||||
}
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error)
|
||||
|
||||
@ -1,12 +1,13 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class CoverDownloadResultEvent(
|
||||
val data: CoverDownloadedData? = null,
|
||||
val status: TaskStatus
|
||||
): Event() {
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error){
|
||||
data class CoverDownloadedData(
|
||||
val source: String,
|
||||
val outputFile: String
|
||||
|
||||
@ -1,16 +1,17 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.Metadata
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.model.MediaType
|
||||
import java.util.*
|
||||
|
||||
data class MetadataSearchResultEvent(
|
||||
val results: List<SearchResult> = emptyList(),
|
||||
val recommended: SearchResult? = null,
|
||||
val status: TaskStatus
|
||||
): Event() {
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error) {
|
||||
data class SearchResult(
|
||||
val simpleScore: Int,
|
||||
val prefixScore: Int,
|
||||
|
||||
@ -1,16 +1,17 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
|
||||
|
||||
data class MigrateContentToStoreTaskResultEvent(
|
||||
val status: TaskStatus,
|
||||
val collection: String,
|
||||
val videoMigrate: FileMigration,
|
||||
val subtitleMigrate: List<SubtitleMigration>,
|
||||
val coverMigrate: List<FileMigration>
|
||||
) : Event() {
|
||||
val coverMigrate: List<FileMigration>,
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error) {
|
||||
data class FileMigration(
|
||||
val storedUri: String?,
|
||||
val status: MigrateStatus
|
||||
|
||||
@ -1,6 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
|
||||
class ProcesserEncodePerformedEvent: Event() {
|
||||
}
|
||||
@ -1,12 +1,13 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class ProcesserEncodeResultEvent(
|
||||
val data: EncodeResult? = null,
|
||||
val status: TaskStatus,
|
||||
): Event() {
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error) {
|
||||
data class EncodeResult(
|
||||
val cachedOutputFile: String? = null
|
||||
)
|
||||
|
||||
@ -1,6 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
|
||||
class ProcesserExtractPerformedEvent: Event() {
|
||||
}
|
||||
@ -1,12 +1,13 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class ProcesserExtractResultEvent(
|
||||
val status: TaskStatus,
|
||||
val data: ExtractResult? = null
|
||||
): Event() {
|
||||
val data: ExtractResult? = null,
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error) {
|
||||
data class ExtractResult(
|
||||
val language: String,
|
||||
val cachedOutputFile: String
|
||||
|
||||
@ -1,9 +1,10 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.event_task_contract.events
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.TaskStatus
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
|
||||
|
||||
data class StoreContentAndMetadataTaskResultEvent(
|
||||
val taskStatus: TaskStatus,
|
||||
) : Event() {
|
||||
override val status: TaskStatus,
|
||||
override val error: String? = null
|
||||
) : TaskResultEvent(status, error){
|
||||
}
|
||||
@ -101,7 +101,7 @@ class TaskProjection(val events: List<Event>) {
|
||||
fun projectStoreContentAndMetadataStatus(): TaskStatus {
|
||||
return projectStatus<StoreContentAndMetadataTaskCreatedEvent, StoreContentAndMetadataTaskResultEvent>(
|
||||
createdIds = { it.map { e -> e.taskId }},
|
||||
resultStatus = {it.taskStatus},
|
||||
resultStatus = {it.status},
|
||||
resultIds = { it.flatMap { e -> e.metadata.derivedFromId?.toList() ?: emptyList() } }
|
||||
)
|
||||
}
|
||||
|
||||
@ -0,0 +1,38 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.rules
|
||||
|
||||
import no.iktdev.eventi.models.Event
|
||||
import no.iktdev.eventi.models.store.PersistedEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
|
||||
import no.iktdev.mediaprocessing.shared.common.getName
|
||||
import java.time.Duration
|
||||
|
||||
object EventLifecycleRules {
|
||||
|
||||
fun isOverdue(events: List<PersistedEvent>): Boolean {
|
||||
if (events.isEmpty()) return false
|
||||
|
||||
val firstEventTime = events.minOf { it.persistedAt }
|
||||
val expectedWindow = expectedCompletionTimeWindow(events)
|
||||
|
||||
val deadline = firstEventTime.plus(expectedWindow)
|
||||
|
||||
return UtcNow().isAfter(deadline)
|
||||
}
|
||||
|
||||
fun expectedCompletionTimeWindow(events: List<PersistedEvent>): Duration =
|
||||
events.fold(Duration.ZERO) { acc, pe ->
|
||||
acc +
|
||||
pe.match<CoordinatorReadStreamsTaskCreatedEvent>(Duration.ofMinutes(5)) +
|
||||
pe.match<ConvertTaskCreatedEvent>(Duration.ofMinutes(5)) +
|
||||
pe.match<CoverDownloadTaskCreatedEvent>(Duration.ofMinutes(5)) +
|
||||
pe.match<MetadataSearchTaskCreatedEvent>(Duration.ofMinutes(10)) +
|
||||
pe.match<MigrateContentToStoreTaskCreatedEvent>(Duration.ofMinutes(5)) +
|
||||
pe.match<ProcesserEncodeTaskCreatedEvent>(Duration.ofHours(8)) +
|
||||
pe.match<ProcesserExtractTaskCreatedEvent>(Duration.ofMinutes(15)) +
|
||||
pe.match<StoreContentAndMetadataTaskCreatedEvent>(Duration.ofMinutes(5))
|
||||
}
|
||||
|
||||
inline fun <reified T : Event> PersistedEvent.match(duration: Duration): Duration =
|
||||
if (this.event == T::class.getName()) duration else Duration.ZERO
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.rules
|
||||
|
||||
import no.iktdev.eventi.models.store.PersistedTask
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.*
|
||||
import no.iktdev.mediaprocessing.shared.common.getName
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
|
||||
object TaskLifecycleRules {
|
||||
const val abandonedAfterMinutes = 15L
|
||||
|
||||
fun isAbandoned(
|
||||
consumed: Boolean,
|
||||
lastCheckIn: Instant?
|
||||
): Boolean {
|
||||
if (consumed) return false
|
||||
|
||||
val cutoff = Instant.now().minus(abandonedAfterMinutes, ChronoUnit.MINUTES)
|
||||
return lastCheckIn == null || lastCheckIn.isBefore(cutoff)
|
||||
}
|
||||
|
||||
fun isStalled(task: PersistedTask): Boolean {
|
||||
if (task.consumed) return false
|
||||
|
||||
val cutoff = stalledCutoffFor(task.task)
|
||||
return task.lastCheckIn?.isBefore(cutoff) ?: false
|
||||
}
|
||||
|
||||
|
||||
private fun stalledCutoffFor(taskName: String): Instant {
|
||||
return when (taskName) {
|
||||
MediaReadTask::class.getName() -> Instant.now().minus(5, ChronoUnit.MINUTES)
|
||||
EncodeTask::class.getName() -> Instant.now().minus(6, ChronoUnit.HOURS)
|
||||
ExtractSubtitleTask::class.getName() -> Instant.now().minus(15, ChronoUnit.MINUTES)
|
||||
ConvertTask::class.getName() -> Instant.now().minus(6, ChronoUnit.MINUTES)
|
||||
MetadataSearchTask::class.getName() -> Instant.now().minus(10, ChronoUnit.MINUTES)
|
||||
MigrateToContentStoreTask::class.getName() -> Instant.now().minus(30, ChronoUnit.MINUTES)
|
||||
StoreContentAndMetadataTask::class.getName() -> Instant.now().minus(5, ChronoUnit.MINUTES)
|
||||
else -> Instant.now().minus(30, ChronoUnit.MINUTES)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -9,14 +9,17 @@ import no.iktdev.eventi.stores.EventStore
|
||||
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
|
||||
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CompletedEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.DeletedTaskResultEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ForcedTaskResetAuditEvent
|
||||
import no.iktdev.mediaprocessing.shared.common.getName
|
||||
import no.iktdev.mediaprocessing.shared.database.likeAny
|
||||
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
||||
import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
|
||||
|
||||
@ -128,4 +131,25 @@ object EventStore: EventStore {
|
||||
persist(auditEvent)
|
||||
return auditEvent.eventId
|
||||
}
|
||||
|
||||
fun eventsLast(minutes: Long = 1): Long {
|
||||
val cutoff = Instant.now().minus(minutes, ChronoUnit.MINUTES)
|
||||
return withTransaction {
|
||||
EventsTable.select(EventsTable.eventId).where {
|
||||
EventsTable.persistedAt greater cutoff
|
||||
}.count()
|
||||
}.getOrDefault(-1)
|
||||
}
|
||||
|
||||
fun getIncompletedEventSequence(): List<PersistedEvent> {
|
||||
return withTransaction {
|
||||
val completedReferences = EventsTable.select(EventsTable.referenceId)
|
||||
.where { EventsTable.event eq CompletedEvent::class.getName() }
|
||||
.map { it[EventsTable.referenceId] }
|
||||
EventsTable.getWhere {
|
||||
EventsTable.referenceId notInList completedReferences
|
||||
}
|
||||
}.getOrDefault(emptyList())
|
||||
}
|
||||
|
||||
}
|
||||
@ -12,11 +12,10 @@ import no.iktdev.mediaprocessing.shared.database.likeAny
|
||||
import no.iktdev.mediaprocessing.shared.database.queries.pagedQuery
|
||||
import no.iktdev.mediaprocessing.shared.database.tables.TasksTable
|
||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||
import org.jetbrains.exposed.sql.and
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
import org.jetbrains.exposed.sql.selectAll
|
||||
import org.jetbrains.exposed.sql.update
|
||||
import org.jetbrains.exposed.sql.*
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
import java.time.temporal.ChronoUnit
|
||||
import java.util.*
|
||||
|
||||
object TaskStore: TaskStore {
|
||||
@ -202,8 +201,22 @@ object TaskStore: TaskStore {
|
||||
return withTransaction {
|
||||
TasksTable.getWhere {
|
||||
(TasksTable.consumed eq false) and
|
||||
(TasksTable.claimed eq false)
|
||||
(TasksTable.claimed eq false) and
|
||||
(TasksTable.status eq TaskStatus.Pending)
|
||||
}
|
||||
}.getOrDefault(emptyList())
|
||||
}
|
||||
|
||||
fun findAbandonedTasks(): List<PersistedTask> {
|
||||
val cutoff = Instant.now().minus(15, ChronoUnit.MINUTES)
|
||||
return withTransaction {
|
||||
TasksTable.getWhere {
|
||||
(TasksTable.lastCheckIn less cutoff or TasksTable.lastCheckIn.isNull()) and
|
||||
(TasksTable.consumed eq false) and
|
||||
(TasksTable.claimed eq true)
|
||||
}
|
||||
}.getOrDefault(emptyList())
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -38,4 +38,6 @@ object EventsTable: IntIdTable(name = "EVENTS") {
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user