Adjusted Collection

This commit is contained in:
Brage Skjønborg 2026-02-01 05:26:29 +01:00
parent 22627c387a
commit a13b949c9b
19 changed files with 229 additions and 77 deletions

View File

@ -6,17 +6,17 @@ import reactor.core.publisher.Mono
@Component
class ProcesserClient(
private val webClient: WebClient
private val processerWebClient: WebClient
) {
fun fetchLog(path: String): Mono<String> =
webClient.get()
processerWebClient.get()
.uri { it.path("/state/log").queryParam("path", path).build() }
.retrieve()
.bodyToMono(String::class.java)
fun ping(): Mono<String> =
webClient.get()
processerWebClient.get()
.uri("/actuator/health")
.retrieve()
.bodyToMono(String::class.java)

View File

@ -5,21 +5,8 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class ProcesserWebClientConfig {
@Bean
fun processerWebClient(
builder: WebClient.Builder,
props: ProcesserClientProperties
): WebClient =
builder
.baseUrl(props.baseUrl)
.build()
}
@ConfigurationProperties(prefix = "processer")
data class ProcesserClientProperties(
val baseUrl: String
)

View File

@ -0,0 +1,21 @@
package no.iktdev.mediaprocessing.coordinator.config
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class WebClients(
private val processerClientProperties: ProcesserClientProperties
) {
@Bean
fun webClient(): WebClient.Builder =
WebClient
.builder()
.codecs { it.defaultCodecs().maxInMemorySize(10 * 1024 * 1024) }
@Bean
fun processerWebClient(builder: WebClient.Builder): WebClient {
return builder.baseUrl(processerClientProperties.baseUrl).build()
}
}

View File

@ -0,0 +1,21 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.ProcesserClient
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import reactor.core.publisher.Mono
@RestController
@RequestMapping("/log")
class LogController(
private val processerClient: ProcesserClient
) {
@GetMapping
fun getLog(@RequestParam path: String): Mono<String> {
return processerClient.fetchLog(path)
}
}

View File

@ -26,20 +26,28 @@ class TaskController(
) {
@GetMapping("/active")
fun getActiveTasks(): List<CoordinatorTaskTransferDto> =
taskService.getActiveTasks().map { it.toCoordinatorTransferDto() }
fun getActiveTasks(): List<CoordinatorTaskTransferDto> {
val tasks = taskService.getActiveTasks()
val logEvents = eventService.getTaskEventResultsWithLogs(tasks.map { it.referenceId }.toSet())
return tasks.map { it.toCoordinatorTransferDto(logEvents) }
}
@GetMapping
fun getPagedTasks(query: TaskQuery): Paginated<CoordinatorTaskTransferDto> {
val paginatedTasks = taskService.getPagedTasks(query)
return paginatedTasks.map { it.toCoordinatorTransferDto() }
val logEvents = eventService.getTaskEventResultsWithLogs(paginatedTasks.items.map { it.referenceId }.toSet())
return paginatedTasks.map { it.toCoordinatorTransferDto(logEvents) }
}
@GetMapping("/{id}")
fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? =
taskService.getTaskById(id)?.toCoordinatorTransferDto()
fun getTask(@PathVariable id: UUID): CoordinatorTaskTransferDto? {
val tasks = taskService.getTaskById(id) ?: return null
val logEvents = eventService.getTaskEventResultsWithLogs(setOf(tasks.referenceId))
return tasks.toCoordinatorTransferDto(logEvents)
}
@GetMapping("/{taskId}/reset")

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.coordinator.dto
import java.util.*
data class LogAssociatedIds(
val referenceId: UUID,
val ids: Set<UUID>,
val logFile: String
)

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.dto.translate
import no.iktdev.eventi.models.store.PersistedTask
import no.iktdev.mediaprocessing.coordinator.dto.LogAssociatedIds
import no.iktdev.mediaprocessing.shared.common.rules.TaskLifecycleRules
import java.time.Instant
import java.util.*
@ -17,11 +18,16 @@ data class CoordinatorTaskTransferDto(
val consumed: Boolean,
val lastCheckIn: Instant?,
val persistedAt: Instant,
val logs: List<String> = emptyList(),
val abandoned: Boolean,
) {
}
fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto {
fun PersistedTask.toCoordinatorTransferDto(logs: List<LogAssociatedIds>): CoordinatorTaskTransferDto {
val matchingLogs = logs
.filter { log -> log.ids.contains(taskId) }
.map { it.logFile }
return CoordinatorTaskTransferDto(
id = id,
referenceId = referenceId,
@ -34,6 +40,7 @@ fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto {
consumed = consumed,
lastCheckIn = lastCheckIn,
persistedAt = persistedAt,
logs = matchingLogs,
abandoned = TaskLifecycleRules.isAbandoned(consumed, persistedAt, lastCheckIn)
)
}

View File

@ -8,35 +8,25 @@ import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
import org.springframework.stereotype.Component
@Component
class CollectEventsListener: EventListener() {
class CollectEventsListener : EventListener() {
private val log = KotlinLogging.logger {}
val undesiredStates = listOf(CollectProjection.TaskStatus.Failed, CollectProjection.TaskStatus.Pending)
override fun onEvent(
event: Event,
history: List<Event>
): Event? {
// Prevent Rouge trigger when replayed
override fun onEvent(event: Event, history: List<Event>): Event? {
// Avoid double-collection
if (event is CollectedEvent || history.any { it is CollectedEvent }) return null
val collectProjection = CollectProjection(history)
log.info { collectProjection.prettyPrint() }
val projection = CollectProjection(history)
val taskStatus = collectProjection.getTaskStatus()
if (taskStatus.all { it == CollectProjection.TaskStatus.NotInitiated }) {
// No work has been done, so we are not ready
return null
}
val statusAcceptable = taskStatus.none { it in undesiredStates }
if (!statusAcceptable) {
if (taskStatus.any { it == CollectProjection.TaskStatus.Failed }) {
log.warn { "One or more tasks have failed in ${event.referenceId}" }
} else {
log.info { "One or more tasks are still pending in ${event.referenceId}" }
}
return null
}
// Must have a StartProcessingEvent
if (projection.startedWith == null) return null
// Must be allowed to store (Auto or Manual + AllowCompletion)
if (!projection.isStorePermitted()) return null
// Must have all relevant tasks completed
if (!projection.isWorkflowComplete()) return null
return CollectedEvent(history.map { it.eventId }.toSet()).derivedOf(event)
}
}
}

View File

@ -4,12 +4,9 @@ import no.iktdev.eventi.ListenerOrder
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MetadataSearchTask
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
import org.jetbrains.annotations.VisibleForTesting
import org.springframework.stereotype.Component
import java.util.*
@ -30,6 +27,13 @@ class MediaCreateMetadataSearchTaskListener: EventListener() {
event: Event,
history: List<Event>
): Event? {
val startedEvent = history.filterIsInstance<StartProcessingEvent>().firstOrNull() ?: return null
if (startedEvent.data.operation.isNotEmpty()) {
if (!startedEvent.data.operation.contains(OperationType.Metadata))
return null
}
// For replay
if (event is MetadataSearchTaskCreatedEvent) {
val hasResult = history.filter { it is MetadataSearchResultEvent }

View File

@ -22,7 +22,8 @@ class StartedListener : EventListener() {
operation = setOf(
OperationType.ExtractSubtitles,
OperationType.ConvertSubtitles,
OperationType.Encode
OperationType.Encode,
OperationType.Metadata
)
)
)

View File

@ -2,11 +2,14 @@ package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.mediaprocessing.coordinator.dto.LogAssociatedIds
import no.iktdev.mediaprocessing.shared.common.dto.EventQuery
import no.iktdev.mediaprocessing.shared.common.dto.Paginated
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
import no.iktdev.mediaprocessing.shared.common.dto.toDto
import no.iktdev.mediaprocessing.shared.common.effectivePersisted
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEvent
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
import org.springframework.stereotype.Service
import java.time.Instant
@ -77,4 +80,32 @@ class EventService {
return EventStore.getDeletedSequences(referenceIds)
}
val taskResultEventTypes: List<String> =
EventRegistry.getEvents()
.filter { TaskResultEvent::class.java.isAssignableFrom(it) }
.map { it.simpleName }
fun getTaskEventResultsWithLogs(referenceIds: Set<UUID>): List<LogAssociatedIds> {
// 1. Hent persisted events som matcher TaskResultEvent-typene
val persisted = EventStore.getPersistedEventsFor(referenceIds, taskResultEventTypes)
// 2. Deserialiser til domeneklasse
val domainEvents = persisted.map { it.toEvent() }
// 3. Filtrer til TaskResultEvent-instansene som har logg
return domainEvents
.filterIsInstance<TaskResultEvent>()
.filter { it.logFile != null }
.map {
LogAssociatedIds(
referenceId = it.referenceId,
ids = setOf( it.eventId, *(it.metadata.derivedFromId?.toTypedArray() ?: emptyArray())),
logFile = it.logFile!!
)
}
}
}

View File

@ -46,7 +46,7 @@ open class TestBase {
fun defaultStartEvent(): StartProcessingEvent {
val start = StartProcessingEvent(
data = StartData(
operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles, OperationType.ConvertSubtitles),
operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles, OperationType.ConvertSubtitles, OperationType.Metadata),
fileUri = "file:///unit/${UUID.randomUUID()}.mkv"
)
)

View File

@ -67,71 +67,97 @@ class CollectEventsListenerTest : TestBase() {
@Test
@DisplayName(
"""
Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract
Når encode result kommer inn
:
Opprettes CollectEvent basert historikken
Hvis vi har kun encoded hendelse, men vi har sagt at vi også skal ha extract, men ikke har opprettet extract
Når encode result kommer inn
:
Opprettes CollectEvent basert historikken
"""
)
fun success2() {
val started = defaultStartEvent().let { ev ->
ev.copy(data = ev.data.copy(operation = setOf(OperationType.Encode, OperationType.ExtractSubtitles)))
ev.copy(
data = ev.data.copy(
operation = setOf(
OperationType.Metadata,
OperationType.Encode,
OperationType.ExtractSubtitles
)
)
)
}
val parsed = mediaParsedEvent(
collection = "MyCollection",
fileName = "MyCollection 1",
mediaType = MediaType.Movie
).derivedOf(started)
val metadata = metadataEvent(parsed).first()
val encode = encodeEvent("/tmp/video.mp4", parsed)
val history = listOf(
started,
parsed,
metadata,
*encode.toTypedArray(),
)
val result = listener.onEvent(history.last(), history)
assertThat(result).isNotNull()
assertThat {
result is CollectedEvent
}
assertThat(result).isNull()
}
@Test
@DisplayName(
"""
"""
Hvis vi har kun convert hendelse
Når convert har komment inn
Når convert har kommet inn
:
Opprettes CollectEvent basert historikken
"""
"""
)
fun success3() {
val started = defaultStartEvent().let { ev ->
ev.copy(data = ev.data.copy(operation = setOf(OperationType.ConvertSubtitles)))
ev.copy(
data = ev.data.copy(
operation = setOf(
OperationType.Metadata,
OperationType.ConvertSubtitles
)
)
)
}
val parsed = mediaParsedEvent(
collection = "MyCollection",
fileName = "MyCollection 1",
mediaType = MediaType.Movie
).derivedOf(started)
val convert = encodeEvent("/tmp/fancy.srt", parsed)
val metadata = metadataEvent(parsed)
val convert = convertEvent(
language = "en",
baseName = "sub1",
outputFiles = listOf("/tmp/sub1.vtt"),
derivedFrom = parsed
)
val history = listOf(
started,
parsed,
*metadata.toTypedArray(),
*convert.toTypedArray(),
)
val result = listener.onEvent(history.last(), history)
assertThat(result).isNotNull()
assertThat {
result is CollectedEvent
}
assertThat(result).isInstanceOf(CollectedEvent::class.java)
}
@Test
@DisplayName(
"""
@ -200,6 +226,8 @@ class CollectEventsListenerTest : TestBase() {
assertThat(result).isNull()
}
@Test
@DisplayName(
"""

View File

@ -8,5 +8,6 @@ import no.iktdev.eventi.models.store.TaskStatus
*/
open class TaskResultEvent(
val status: TaskStatus,
val error: String? = null
val error: String? = null,
val logFile: String? = null
) : Event()

View File

@ -5,10 +5,10 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve
class ProcesserEncodeResultEvent(
val data: EncodeResult? = null,
val logFile: String? = null,
logFile: String? = null,
status: TaskStatus,
error: String? = null
) : TaskResultEvent(status, error) {
) : TaskResultEvent(status, error, logFile) {
data class EncodeResult(
val cachedOutputFile: String? = null
)

View File

@ -6,8 +6,9 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve
class ProcesserExtractResultEvent(
val data: ExtractResult? = null,
status: TaskStatus,
error: String? = null
) : TaskResultEvent(status, error) {
error: String? = null,
logFile: String? = null,
) : TaskResultEvent(status, error, logFile) {
data class ExtractResult(
val language: String,
val cachedOutputFile: String

View File

@ -22,5 +22,6 @@ enum class StartFlow {
enum class OperationType {
ExtractSubtitles,
Encode,
ConvertSubtitles
ConvertSubtitles,
Metadata
}

View File

@ -49,6 +49,37 @@ class CollectProjection(val events: List<Event>) {
coverDownloadTaskStatus
)
fun getRelevantTaskStatuses(): List<TaskStatus> {
val required = startedWith?.tasks ?: emptySet()
val statusMap = mapOf(
OperationType.Encode to encodeTaskStatus,
OperationType.ExtractSubtitles to extreactTaskStatus,
OperationType.ConvertSubtitles to convertTaskStatus,
OperationType.Metadata to metadataTaskStatus,
)
return required.map { statusMap[it] ?: TaskStatus.NotInitiated }
}
fun isWorkflowComplete(): Boolean {
val statuses = getRelevantTaskStatuses()
if (statuses.isEmpty()) return false
val anyFailed = statuses.any { it == TaskStatus.Failed }
val anyPending = statuses.any { it == TaskStatus.Pending }
val allCompleted = statuses.all { it == TaskStatus.Completed }
if (anyFailed) return false
if (anyPending) return false
return allCompleted
}
fun isStorePermitted(): Boolean {
val start = events.filterIsInstance<StartProcessingEvent>().firstOrNull()
?: return false // ingen start → ingen store

View File

@ -88,6 +88,17 @@ object EventStore: EventStore {
return result.getOrDefault(emptyList())
}
fun getPersistedEventsFor(referenceId: Set<UUID>, eventNames: List<String>): List<PersistedEvent> {
val deleted = getDeletedSequences(referenceId).map { it.toString() }
val result = withTransaction {
EventsTable
.getWhere { (EventsTable.referenceId eq referenceId.toString()) and
(EventsTable.referenceId notInList deleted.toList()) and
(EventsTable.event inList eventNames )}
}
return result.getOrDefault(emptyList())
}
override fun persist(event: Event) {
val asData = ZDS.WGson.toJson(event)
val eventName = event::class.simpleName ?: run {