diff --git a/apps/converter/src/main/resources/application.yml b/apps/converter/src/main/resources/application.yml index 75920619..533d9e11 100644 --- a/apps/converter/src/main/resources/application.yml +++ b/apps/converter/src/main/resources/application.yml @@ -5,7 +5,7 @@ spring: flyway: enabled: true locations: classpath:flyway - baseline-on-migrate: true + baseline-on-migrate: false management: endpoints: @@ -22,3 +22,12 @@ logging: org.apache.kafka: INFO Exposed: OFF org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN + + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service \ No newline at end of file diff --git a/apps/converter/src/test/resources/application.yml b/apps/converter/src/test/resources/application.yml index 0edf3706..2c424c6a 100644 --- a/apps/converter/src/test/resources/application.yml +++ b/apps/converter/src/test/resources/application.yml @@ -30,4 +30,12 @@ management: - health endpoint: health: - show-details: always \ No newline at end of file + show-details: always + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index b0c64450..f0ae80af 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -6,11 +6,13 @@ import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.exfl.coroutines.CoroutinesDefault import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.observable.Observables +import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig import no.iktdev.mediaprocessing.shared.common.DatabaseApplication import no.iktdev.mediaprocessing.shared.common.MediaProcessingApp import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.getAppVersion +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.runApplication import org.springframework.context.annotation.Configuration @@ -52,3 +54,11 @@ open class ApplicationConfiguration() { } } } + +@Configuration +@EnableConfigurationProperties( + value = [ + ExecutablesConfig::class + ] +) +class CoordinatorConfig \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEnv.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEnv.kt index 8f397570..71281c1e 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEnv.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorEnv.kt @@ -1,17 +1,23 @@ package no.iktdev.mediaprocessing.coordinator +import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig +import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths +import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig +import org.springframework.stereotype.Service import java.io.File -class CoordinatorEnv { - companion object { - val streamitAddress = System.getenv("STREAMIT_ADDRESS") ?: "http://streamit.service" +@Service +class CoordinatorEnv( + val streamIt: StreamItConfig, + val exec: ExecutablesConfig, + val media: MediaPaths +) { + val streamitAddress = streamIt.address + val ffprobe = exec.ffprobe - val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe" + val cachedContent = File(media.cache) + val outgoingContent = File(media.outgoing) + val incomingContent = File(media.incoming) + val preference: File = File("/data/config/preference.json") - val preference: File = File("/data/config/preference.json") - - var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache") - val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") - - } -} \ No newline at end of file +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt index c5fb5a64..5ec1f85a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorService.kt @@ -15,4 +15,7 @@ class CoordinatorService { fun getProgress(taskId: String): ProgressUpdate? = progressMap[taskId] + + fun getProgress(): List = + progressMap.values.toList() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Preference.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Preference.kt index 027e4010..a64a16cb 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Preference.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Preference.kt @@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator import com.google.gson.Gson import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec +import org.springframework.stereotype.Component import java.io.File @@ -34,11 +35,11 @@ data class AudioPreference( val codec: AudioCodec ) - -object Preference { +@Component +class Preference(private val coordinatorEnv: CoordinatorEnv) { fun getProcesserPreference(): ProcesserPreference { var preference: ProcesserPreference = ProcesserPreference.default() - CoordinatorEnv.preference.ifExists({ + coordinatorEnv.preference.ifExists({ val text = readText() try { val result = Gson().fromJson(text, PeferenceConfig::class.java) @@ -47,7 +48,7 @@ object Preference { e.printStackTrace() } }, orElse = { - CoordinatorEnv.preference.writeText(Gson().toJson(PeferenceConfig(preference))) + coordinatorEnv.preference.writeText(Gson().toJson(PeferenceConfig(preference))) }) return preference } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RestTemplateConfig.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RestTemplateConfig.kt index f567966b..695c3194 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RestTemplateConfig.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/RestTemplateConfig.kt @@ -9,12 +9,14 @@ import org.springframework.web.client.RestTemplate class RestTemplateConfig { @Configuration - class RestTemplateConfig { + class RestTemplateConfig( + private val coordinatorEnv: CoordinatorEnv + ) { @Bean fun streamitRestTemplate(): RestTemplate { return RestTemplateBuilder() - .rootUri(CoordinatorEnv.streamitAddress) + .rootUri(coordinatorEnv.streamitAddress) .build() } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ExecutableConfig.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ExecutableConfig.kt new file mode 100644 index 00000000..c0f687a1 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/config/ExecutableConfig.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.coordinator.config + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "executables") +data class ExecutablesConfig( + val ffprobe: String +) \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt new file mode 100644 index 00000000..56e53170 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/EventsController.kt @@ -0,0 +1,31 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.services.EventPagingService +import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam +import org.springframework.web.bind.annotation.RestController +import java.util.* + +@RestController +@RequestMapping("/events") +class EventsController( + private val paging: EventPagingService +) { + + @GetMapping + fun getEvents( + @RequestParam referenceId: UUID, + @RequestParam(required = false) beforeEventId: UUID?, + @RequestParam(required = false) afterEventId: UUID?, + @RequestParam(defaultValue = "50") limit: Int + ): List { + return paging.getEvents( + referenceId = referenceId, + beforeEventId = beforeEventId, + afterEventId = afterEventId, + limit = limit + ) + } +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt index 12a6de90..a5ec2bb4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/InternalProcesserController.kt @@ -1,22 +1,34 @@ package no.iktdev.mediaprocessing.coordinator.controller import no.iktdev.mediaprocessing.coordinator.CoordinatorService +import no.iktdev.mediaprocessing.coordinator.services.SseHub import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate import org.springframework.http.ResponseEntity -import org.springframework.web.bind.annotation.PostMapping -import org.springframework.web.bind.annotation.RequestBody -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.RestController +import org.springframework.web.bind.annotation.* +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter @RestController @RequestMapping("/internal") class InternalProcesserController( - private val coordinator: CoordinatorService + private val coordinator: CoordinatorService, + private val hub: SseHub ) { @PostMapping("/progress") fun receiveProgress(@RequestBody update: ProgressUpdate): ResponseEntity { coordinator.updateProgress(update) + + hub.broadcast("progress", update) return ResponseEntity.ok().build() } + + @GetMapping("/progress") + fun getAllProgress(): List { + return coordinator.getProgress() + } + + @GetMapping("/sse") + fun stream(): SseEmitter { + return hub.createEmitter() + } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt new file mode 100644 index 00000000..8529d35a --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/OperationsController.kt @@ -0,0 +1,43 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.services.CommandService +import no.iktdev.mediaprocessing.shared.common.dto.requests.StartProcessRequest +import org.springframework.http.ResponseEntity +import org.springframework.web.bind.annotation.PostMapping +import org.springframework.web.bind.annotation.RequestBody +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/operations") +class OperationsController( + private val commandService: CommandService +) { + + @PostMapping("/start") + fun startProcess(@RequestBody req: StartProcessRequest): ResponseEntity> { + val referenceId = commandService.startProcess(req) + return when (val result = commandService.startProcess(req)) { + is CommandService.StartResult.Accepted -> ResponseEntity + .accepted() + .body( + mapOf( + "referenceId" to result.referenceId.toString(), + "status" to "accepted", + "message" to "Process accepted and StartedEvent created" + ) + ) + + is CommandService.StartResult.Rejected -> ResponseEntity + .badRequest() + .body( + mapOf( + "status" to "rejected", + "message" to result.reason + ) + ) + } + } + + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt new file mode 100644 index 00000000..d32798d0 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/controller/SequencesController.kt @@ -0,0 +1,27 @@ +package no.iktdev.mediaprocessing.coordinator.controller + +import no.iktdev.mediaprocessing.coordinator.services.SequenceAggregatorService +import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RequestMapping +import org.springframework.web.bind.annotation.RequestParam +import org.springframework.web.bind.annotation.RestController + +@RestController +@RequestMapping("/sequences") +class SequenceController( + private val aggregator: SequenceAggregatorService +) { + + @GetMapping("/active") + fun getActive(): List { + return aggregator.getActiveSequences() + } + + @GetMapping("/recent") + fun getRecent( + @RequestParam(defaultValue = "15") limit: Int + ): List { + return aggregator.getRecentSequences(limit) + } +} diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListener.kt index 853e2134..baa11153 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListener.kt @@ -3,16 +3,8 @@ package no.iktdev.mediaprocessing.coordinator.listeners.events import no.iktdev.eventi.events.EventListener import no.iktdev.eventi.models.Event import no.iktdev.mediaprocessing.coordinator.Preference -import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec -import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioTarget -import no.iktdev.mediaprocessing.ffmpeg.dsl.MediaPlan -import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec -import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoTarget -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent +import no.iktdev.mediaprocessing.ffmpeg.dsl.* +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import no.iktdev.mediaprocessing.shared.common.stores.TaskStore @@ -20,13 +12,15 @@ import org.springframework.stereotype.Component import java.io.File @Component -class MediaCreateEncodeTaskListener : EventListener() { +class MediaCreateEncodeTaskListener( + private val preference: Preference +) : EventListener() { override fun onEvent( event: Event, history: List ): Event? { - val preference = Preference.getProcesserPreference() + val preference = preference.getProcesserPreference() val startedEvent = history.filterIsInstance().firstOrNull() ?: return null if (startedEvent.data.operation.isNotEmpty()) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListener.kt index 7edceb37..80afa7b1 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListener.kt @@ -13,7 +13,9 @@ import no.iktdev.mediaprocessing.shared.common.stores.TaskStore import org.springframework.stereotype.Component @Component -class MigrateCreateStoreTaskListener: EventListener() { +class MigrateCreateStoreTaskListener( + private val coordinatorEnv: CoordinatorEnv, +): EventListener() { private val log = KotlinLogging.logger {} override fun onEvent( @@ -31,7 +33,7 @@ class MigrateCreateStoreTaskListener: EventListener() { log.warn { "One or more tasks have failed in ${event.referenceId}" } } - val migrateContentProjection = MigrateContentProject(useHistory, CoordinatorEnv.outgoingContent) + val migrateContentProjection = MigrateContentProject(useHistory, coordinatorEnv.outgoingContent) val collection = migrateContentProjection.useStore?.name ?: throw RuntimeException("No content store configured for migration in ${event.referenceId}") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt index ba43003c..554b7fa4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt @@ -15,7 +15,9 @@ import org.springframework.stereotype.Component import java.util.* @Component -class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { +class DownloadCoverTaskListener( + private val coordinatorEnv: CoordinatorEnv, +): TaskListener(TaskType.MIXED) { val log = KotlinLogging.logger {} override fun getWorkerId(): String { @@ -57,11 +59,11 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { } open fun getDownloadClient(): DownloadClient { - return DefaultDownloadClient() + return DefaultDownloadClient(coordinatorEnv) } - class DefaultDownloadClient() : DownloadClient( - outDir = CoordinatorEnv.cachedContent, + class DefaultDownloadClient(private val coordinatorEnv: CoordinatorEnv) : DownloadClient( + outDir = coordinatorEnv.cachedContent, connectionFactory = DefaultConnectionFactory(),) { override fun onCreate() { super.onCreate() diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt index 414fe36b..89d8e649 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt @@ -13,7 +13,9 @@ import org.springframework.stereotype.Component import java.util.* @Component -class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) { +class MediaStreamReadTaskListener( + private val coordinatorEnv: CoordinatorEnv +): FfprobeTaskListener(TaskType.CPU_INTENSIVE) { val log = KotlinLogging.logger {} override fun getWorkerId(): String { @@ -48,7 +50,7 @@ class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) { } override fun getFfprobe(): FFprobe { - return JsonFfinfo(CoordinatorEnv.ffprobe) + return JsonFfinfo(coordinatorEnv.ffprobe) } class JsonFfinfo(executable: String): FFprobe(executable) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CommandService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CommandService.kt new file mode 100644 index 00000000..368d9116 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/CommandService.kt @@ -0,0 +1,46 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import no.iktdev.mediaprocessing.shared.common.dto.requests.StartProcessRequest +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent +import no.iktdev.mediaprocessing.shared.common.notExist +import no.iktdev.mediaprocessing.shared.common.stores.EventStore +import org.springframework.stereotype.Service +import java.io.File +import java.util.* + +@Service +class CommandService { + + fun startProcess(request: StartProcessRequest): StartResult { + return try { + val file = File(request.fileUri) + if (file.notExist()) { + throw IllegalArgumentException("File does not exists at ${request.fileUri}") + } + if (!file.canRead()) { + throw IllegalStateException("File is not readable ${request.fileUri}") + } + + val startProcessingEvent = StartProcessingEvent( + data = StartData( + fileUri = request.fileUri, + operation = request.operationTypes, + flow = StartFlow.Manual + ) + ).newReferenceId() + EventStore.persist(startProcessingEvent) + StartResult.Accepted(startProcessingEvent.referenceId) + } catch (e: Exception) { + StartResult.Rejected("Failed to start process for file ${request.fileUri}, with the following reason: ${e.message}") + } + } + + + sealed class StartResult { + data class Accepted(val referenceId: UUID) : StartResult() + data class Rejected(val reason: String) : StartResult() + } + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt new file mode 100644 index 00000000..fbbc9b9b --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/EventPagingService.kt @@ -0,0 +1,41 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent +import no.iktdev.mediaprocessing.shared.common.dto.toDto +import no.iktdev.mediaprocessing.shared.common.stores.EventStore +import org.springframework.stereotype.Service +import java.util.* + +@Service +class EventPagingService { + + fun getEvents( + referenceId: UUID, + beforeEventId: UUID?, + afterEventId: UUID?, + limit: Int + ): List { + + val all = EventStore.getPersistedEventsFor(referenceId) + .sortedByDescending { it.persistedAt } + + val filtered = when { + beforeEventId != null -> + all.dropWhile { it.eventId != beforeEventId }.drop(1) + + afterEventId != null -> + all.takeWhile { it.eventId != afterEventId } + + else -> all + } + + return filtered + .take(limit) + .mapNotNull { persisted -> + val event = persisted.toEvent() ?: return@mapNotNull null + persisted.toDto(event) + } + } + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt new file mode 100644 index 00000000..571cd5c3 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SequenceAggregatorService.kt @@ -0,0 +1,61 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import no.iktdev.eventi.ZDS.toEvent +import no.iktdev.eventi.models.store.PersistedEvent +import no.iktdev.mediaprocessing.shared.common.LocalDateTimeEpoch +import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent +import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection +import no.iktdev.mediaprocessing.shared.common.stores.EventStore +import org.springframework.stereotype.Service + +@Service +class SequenceAggregatorService() { + + fun getActiveSequences(): List { + val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch) + + // Gruppér først, deserialiser senere + val grouped = allEvents.groupBy { it.referenceId } + + return grouped.values + // aktive = ingen CollectedEvent + .filter { events -> events.none { it.event == CollectedEvent::class.java.simpleName } } + .mapNotNull { events -> buildSummary(events) } + .sortedByDescending { it.lastEventTime } + } + + fun getRecentSequences(limit: Int): List { + val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch) + + val grouped = allEvents.groupBy { it.referenceId } + + return grouped.values + .mapNotNull { events -> buildSummary(events) } + .sortedByDescending { it.lastEventTime } + .take(limit) + } + + private fun buildSummary(events: List): SequenceSummary? { + val last = events.maxByOrNull { it.persistedAt } ?: return null + + // Deserialiser kun eventene for denne sekvensen + val domainEvents = events.mapNotNull { it.toEvent() } + + val projection = CollectProjection(domainEvents) + + return SequenceSummary( + referenceId = last.referenceId.toString(), + title = "", + inputFileName = projection.useFile?.name, + lastEventId = last.eventId.toString(), + lastEventTime = last.persistedAt, + metadataTaskStatus = projection.metadataTaskStatus, + encodeTaskStatus = projection.encodeTaskStatus, + extractTaskStatus = projection.extreactTaskStatus, + convertTaskStatus = projection.convertTaskStatus, + coverDownloadTaskStatus = projection.coverDownloadTaskStatus, + hasErrors = projection.getTaskStatus().any { it == CollectProjection.TaskStatus.Failed } + ) + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SseHub.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SseHub.kt new file mode 100644 index 00000000..4094036b --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/services/SseHub.kt @@ -0,0 +1,40 @@ +package no.iktdev.mediaprocessing.coordinator.services + +import org.springframework.stereotype.Service +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter +import java.util.concurrent.CopyOnWriteArrayList + +@Service +class SseHub { + + private val emitters = CopyOnWriteArrayList() + + fun createEmitter(): SseEmitter { + val emitter = SseEmitter(0L) // never timeout + emitters.add(emitter) + + emitter.onCompletion { emitters.remove(emitter) } + emitter.onTimeout { emitters.remove(emitter) } + emitter.onError { emitters.remove(emitter) } + + return emitter + } + + fun broadcast(eventName: String, data: Any) { + val dead = mutableListOf() + + emitters.forEach { emitter -> + try { + emitter.send( + SseEmitter.event() + .name(eventName) + .data(data) + ) + } catch (ex: Exception) { + dead.add(emitter) + } + } + + emitters.removeAll(dead.toSet()) + } +} diff --git a/apps/coordinator/src/main/resources/application.yml b/apps/coordinator/src/main/resources/application.yml index 406c1f65..8c9a3d42 100644 --- a/apps/coordinator/src/main/resources/application.yml +++ b/apps/coordinator/src/main/resources/application.yml @@ -5,7 +5,7 @@ spring: flyway: enabled: true locations: classpath:flyway - baseline-on-migrate: true + baseline-on-migrate: false management: endpoints: @@ -23,3 +23,14 @@ logging: org.apache.kafka: INFO Exposed: OFF org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service + +executables: + ffprobe: ffprobe diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/ListenerInformOrderTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/ListenerInformOrderTest.kt index 5fd47232..c8558dcb 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/ListenerInformOrderTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/ListenerInformOrderTest.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing +import jakarta.annotation.PostConstruct import no.iktdev.eventi.ListenerOrder import no.iktdev.eventi.events.EventListenerRegistry import no.iktdev.mediaprocessing.coordinator.CoordinatorApplication @@ -8,29 +9,34 @@ import no.iktdev.mediaprocessing.shared.common.config.DatasourceConfiguration import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -import org.junit.jupiter.api.TestInstance import org.junit.jupiter.api.extension.ExtendWith import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest +import org.springframework.boot.test.context.TestConfiguration import org.springframework.context.ApplicationContext -import org.springframework.context.annotation.ComponentScan +import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit.jupiter.SpringExtension @SpringBootTest( classes = [CoordinatorApplication::class, - DatasourceConfiguration::class], + DatasourceConfiguration::class, + ListenerInformOrderTest.RegistryResetConfig::class], webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT ) @TestPropertySource(properties = ["spring.flyway.enabled=true"]) -@TestInstance(TestInstance.Lifecycle.PER_CLASS) -@ComponentScan("no.iktdev.mediaprocessing.coordinator.listeners.events") +@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD) @ExtendWith(SpringExtension::class) class ListenerInformOrderTest(): TestBase() { @Autowired lateinit var ctx: ApplicationContext + @BeforeEach + fun reset() { + } + @Test fun verifyTaskRegistryIsNotEmpty() { assertThat { TaskRegistry.getTasks().isNotEmpty() } @@ -54,4 +60,13 @@ class ListenerInformOrderTest(): TestBase() { MediaCreateMetadataSearchTaskListener::class.java.simpleName, ) } + + @TestConfiguration + class RegistryResetConfig { + @PostConstruct + fun reset() { + EventListenerRegistry.wipe() + } + } + } \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt index 4cc36507..a499c2d4 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestBase.kt @@ -3,10 +3,7 @@ package no.iktdev.mediaprocessing import io.mockk.* import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task -import no.iktdev.mediaprocessing.coordinator.AudioPreference -import no.iktdev.mediaprocessing.coordinator.Preference -import no.iktdev.mediaprocessing.coordinator.ProcesserPreference -import no.iktdev.mediaprocessing.coordinator.VideoPreference +import no.iktdev.mediaprocessing.coordinator.* import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType @@ -21,17 +18,25 @@ open class TestBase { class DummyEvent: Event() class DummyTask: Task() + val preference: Preference = mockk(relaxed = true) + val coordinatorEnv = mockk(relaxed = true) + + @BeforeEach - fun setup() { + open fun setup() { mockkObject(TaskStore) every { TaskStore.persist(any()) } just Runs - mockkObject(Preference) - every { Preference.getProcesserPreference() } returns ProcesserPreference( + every { preference.getProcesserPreference() } returns ProcesserPreference( videoPreference = VideoPreference(codec = VideoCodec.Hevc()), audioPreference = AudioPreference(codec = AudioCodec.Aac(channels = 2)) ) + every { coordinatorEnv.outgoingContent } returns File("./tmp/output") + every { coordinatorEnv.incomingContent } returns File("./tmp/input") + every { coordinatorEnv.cachedContent } returns File("./tmp/cached") + every { coordinatorEnv.streamitAddress } returns "http://streamit.lan" } + fun mockkIO() { mockkConstructor(File::class) every { anyConstructed().exists() } returns true diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestUtil.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestUtil.kt new file mode 100644 index 00000000..efb44cc1 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/TestUtil.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing + +import no.iktdev.eventi.events.EventListener +import no.iktdev.eventi.events.EventListenerRegistry +import org.assertj.core.api.Assertions.assertThat +import java.lang.reflect.Field + +fun EventListenerRegistry.wipe() { + val field: Field = EventListenerRegistry::class.java + .superclass + .getDeclaredField("listeners") + field.isAccessible = true + + // Tøm map’en + val mutableList = field.get(EventListenerRegistry) as MutableList<*> + (mutableList as MutableList>).clear() + + // Verifiser at det er tomt + assertThat(EventListenerRegistry.getListeners().isEmpty()) +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListenerTest.kt index 17a1377a..f99a2f72 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MediaCreateEncodeTaskListenerTest.kt @@ -1,49 +1,38 @@ package no.iktdev.mediaprocessing.coordinator.listeners.events -import io.mockk.Runs -import io.mockk.every -import io.mockk.just -import io.mockk.mockkObject -import io.mockk.verify +import io.mockk.* +import no.iktdev.mediaprocessing.TestBase import no.iktdev.mediaprocessing.coordinator.AudioPreference -import no.iktdev.mediaprocessing.coordinator.Preference import no.iktdev.mediaprocessing.coordinator.ProcesserPreference import no.iktdev.mediaprocessing.coordinator.VideoPreference -import no.iktdev.mediaprocessing.ffmpeg.data.AudioStream -import no.iktdev.mediaprocessing.ffmpeg.data.Disposition -import no.iktdev.mediaprocessing.ffmpeg.data.ParsedMediaStreams -import no.iktdev.mediaprocessing.ffmpeg.data.Tags -import no.iktdev.mediaprocessing.ffmpeg.data.VideoStream +import no.iktdev.mediaprocessing.ffmpeg.data.* import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import no.iktdev.mediaprocessing.shared.common.stores.TaskStore -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test -class MediaCreateEncodeTaskListenerTest { +class MediaCreateEncodeTaskListenerTest: TestBase() { - private val listener = MediaCreateEncodeTaskListener() + private val listener = MediaCreateEncodeTaskListener(preference) @BeforeEach - fun setup() { + override fun setup() { mockkObject(TaskStore) every { TaskStore.persist(any()) } just Runs - mockkObject(Preference) - every { Preference.getProcesserPreference() } returns ProcesserPreference( + every { preference.getProcesserPreference() } returns ProcesserPreference( videoPreference = VideoPreference(codec = VideoCodec.Hevc()), audioPreference = AudioPreference(codec = AudioCodec.Aac(channels = 2)) ) } + + @Test @DisplayName(""" Hvis en video- og audio-track er valgt diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt index ff75c1b4..e267673f 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt @@ -23,7 +23,7 @@ import java.io.File class MigrateCreateStoreTaskListenerTest : TestBase() { - private val listener = MigrateCreateStoreTaskListener() + private val listener = MigrateCreateStoreTaskListener(coordinatorEnv) @Test @DisplayName( diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt index 12a53b76..5a3eaa14 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt @@ -7,6 +7,7 @@ import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.MockDownloadClient import no.iktdev.mediaprocessing.TestBase +import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv import no.iktdev.mediaprocessing.shared.common.DownloadClient import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask @@ -17,9 +18,9 @@ import java.io.File import java.util.* import kotlin.system.measureTimeMillis -class DownloadCoverTaskListenerTest { +class DownloadCoverTaskListenerTest: TestBase() { - class DownloadCoverTaskListenerTestImplementation : DownloadCoverTaskListener() { + class DownloadCoverTaskListenerTestImplementation(coordinatorEnv: CoordinatorEnv) : DownloadCoverTaskListener(coordinatorEnv) { fun getJob() = currentJob lateinit var client: DownloadClient @@ -43,7 +44,7 @@ class DownloadCoverTaskListenerTest { override fun publishEvent(event: Event) {} } - private var listener = DownloadCoverTaskListenerTestImplementation() + private var listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv) @Test @DisplayName( @@ -65,7 +66,7 @@ class DownloadCoverTaskListenerTest { ) ).newReferenceId() - listener = DownloadCoverTaskListenerTestImplementation().apply { + listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply { this.client = MockDownloadClient( delayMillis = delay, mockFile = File("/tmp/fancy.jpg") @@ -103,7 +104,7 @@ class DownloadCoverTaskListenerTest { ) ).newReferenceId() - listener = DownloadCoverTaskListenerTestImplementation().apply { + listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply { this.client = MockDownloadClient(throwException = true) } @@ -141,7 +142,7 @@ class DownloadCoverTaskListenerTest { fun onTask_produces_correct_output_path() = runTest { val mockFile = File("/tmp/expected.jpg") - listener = DownloadCoverTaskListenerTestImplementation().apply { + listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply { this.client = MockDownloadClient(mockFile = mockFile) } @@ -172,7 +173,7 @@ class DownloadCoverTaskListenerTest { fun accept_is_non_blocking() = runTest { val delay = 500L - listener = DownloadCoverTaskListenerTestImplementation().apply { + listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply { this.client = MockDownloadClient(delayMillis = delay, mockFile = File("/tmp/x.jpg")) } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt index 24f49a5c..d0316b99 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt @@ -9,6 +9,8 @@ import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.MockFFprobe +import no.iktdev.mediaprocessing.TestBase +import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv import no.iktdev.mediaprocessing.ffmpeg.FFprobe import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask @@ -19,9 +21,9 @@ import org.junit.jupiter.api.Test import java.util.* import kotlin.system.measureTimeMillis -class MediaStreamReadTaskListenerTest { +class MediaStreamReadTaskListenerTest: TestBase() { - class MediaStreamReadTaskListenerTestImplementation(): MediaStreamReadTaskListener() { + class MediaStreamReadTaskListenerTestImplementation(coordinatorEnv: CoordinatorEnv): MediaStreamReadTaskListener(coordinatorEnv) { fun getJob() = currentJob lateinit var probe: FFprobe @@ -50,11 +52,11 @@ class MediaStreamReadTaskListenerTest { } } - var listener = MediaStreamReadTaskListenerTestImplementation() + var listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv) @BeforeEach fun resetListener() { - listener = MediaStreamReadTaskListenerTestImplementation() + listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv) } @@ -66,7 +68,7 @@ class MediaStreamReadTaskListenerTest { val task = MediaReadTask(fileUri = "test.mp4").newReferenceId() - listener = MediaStreamReadTaskListenerTestImplementation().apply { + listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv).apply { this.probe = MockFFprobe.success(json, delay) } diff --git a/apps/coordinator/src/test/resources/application.yml b/apps/coordinator/src/test/resources/application.yml index 0edf3706..cf51c21a 100644 --- a/apps/coordinator/src/test/resources/application.yml +++ b/apps/coordinator/src/test/resources/application.yml @@ -30,4 +30,15 @@ management: - health endpoint: health: - show-details: always \ No newline at end of file + show-details: always + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service + +executables: + ffprobe: ffprobe diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt new file mode 100644 index 00000000..0d1e0855 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExecutableConfig.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.processer + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "executables") +data class ExecutablesConfig( + val ffmpeg: String +) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt index 24433e31..70c440c8 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.common.MediaProcessingApp import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.getAppVersion +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.runApplication import org.springframework.context.annotation.Configuration @@ -52,3 +53,11 @@ open class ApplicationConfiguration() { } } } + +@Configuration +@EnableConfigurationProperties( + value = [ + ExecutablesConfig::class + ] +) +class ProcesserConfig \ No newline at end of file diff --git a/apps/processer/src/main/resources/application.yml b/apps/processer/src/main/resources/application.yml index 75920619..032080fc 100644 --- a/apps/processer/src/main/resources/application.yml +++ b/apps/processer/src/main/resources/application.yml @@ -5,7 +5,7 @@ spring: flyway: enabled: true locations: classpath:flyway - baseline-on-migrate: true + baseline-on-migrate: false management: endpoints: @@ -22,3 +22,14 @@ logging: org.apache.kafka: INFO Exposed: OFF org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service + +executables: + ffmpeg: ffmpeg diff --git a/apps/processer/src/test/resources/application.yml b/apps/processer/src/test/resources/application.yml index 0edf3706..cfaabf57 100644 --- a/apps/processer/src/test/resources/application.yml +++ b/apps/processer/src/test/resources/application.yml @@ -30,4 +30,15 @@ management: - health endpoint: health: - show-details: always \ No newline at end of file + show-details: always + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service + +executables: + ffmpeg: ffmpeg diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DatabaseApplication.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DatabaseApplication.kt index 9ed6ade6..ef151215 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DatabaseApplication.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DatabaseApplication.kt @@ -1,11 +1,16 @@ package no.iktdev.mediaprocessing.shared.common import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths +import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig import org.jetbrains.exposed.sql.Database import org.springframework.beans.factory.InitializingBean import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.boot.runApplication import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import import org.springframework.stereotype.Component import javax.sql.DataSource @@ -34,4 +39,14 @@ class ExposedInitializer( @Retention(AnnotationRetention.RUNTIME) @SpringBootApplication @ComponentScan("no.iktdev.mediaprocessing") // sikrer at common beans blir plukket opp +@Import(SharedConfig::class) annotation class MediaProcessingApp + +@Configuration +@EnableConfigurationProperties( + value = [ + StreamItConfig::class, + MediaPaths::class + ] +) +class SharedConfig \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index c34c089d..b40840b6 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -10,6 +10,7 @@ import java.io.FileInputStream import java.io.RandomAccessFile import java.net.InetAddress import java.security.MessageDigest +import java.time.LocalDateTime import java.util.zip.CRC32 private val logger = KotlinLogging.logger {} @@ -214,3 +215,6 @@ fun File.resolveConflict(): File { return candidate } + +val LocalDateTimeEpoch: LocalDateTime = + LocalDateTime.of(1970, 1, 1, 0, 0, 0) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/configs/AppConfigurationProperties.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/configs/AppConfigurationProperties.kt new file mode 100644 index 00000000..6cd55167 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/configs/AppConfigurationProperties.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.shared.common.configs + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties(prefix = "streamit") +data class StreamItConfig( + val address: String +) + +@ConfigurationProperties(prefix = "media") +data class MediaPaths( + val cache: String, + val outgoing: String, + val incoming: String +) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceEvent.kt new file mode 100644 index 00000000..27addd3d --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceEvent.kt @@ -0,0 +1,44 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.store.PersistedEvent +import java.time.LocalDateTime +import java.util.* +import kotlin.reflect.KProperty1 + +data class SequenceEvent( + val eventId: UUID, + val referenceId: UUID, + val type: String, + val timestamp: LocalDateTime, + val metadata: MetadataDto, + val payload: Map? +) + +data class MetadataDto( + val derivedFromEventIds: Set?, + val createdAt: LocalDateTime +) + +fun Event.extractPayload(): Map? { + val ignored = setOf("referenceId", "eventId", "metadata") + + return this::class.members + .filterIsInstance>() + .filter { it.name !in ignored } + .associate { it.name to it.get(this) } +} + + +fun PersistedEvent.toDto(event: Event): SequenceEvent = + SequenceEvent( + eventId = this.eventId, + referenceId = this.referenceId, + type = this.event, + timestamp = this.persistedAt, + metadata = MetadataDto( + derivedFromEventIds = event.metadata.derivedFromId, + createdAt = event.metadata.created + ), + payload = event.extractPayload() + ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt new file mode 100644 index 00000000..9e93a142 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/SequenceSummary.kt @@ -0,0 +1,24 @@ +package no.iktdev.mediaprocessing.shared.common.dto + +import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection +import java.time.LocalDateTime + +data class SequenceSummary( + val referenceId: String, + val title: String, + val inputFileName: String?, + val type: ContextType = ContextType.Content, + val lastEventId: String, + val lastEventTime: LocalDateTime, + val metadataTaskStatus: CollectProjection.TaskStatus, + val encodeTaskStatus: CollectProjection.TaskStatus, + val extractTaskStatus: CollectProjection.TaskStatus, + val convertTaskStatus: CollectProjection.TaskStatus, + val coverDownloadTaskStatus: CollectProjection.TaskStatus, + val hasErrors: Boolean, +) + +enum class ContextType { + Content, + Metadata +} diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/requests/StartProcessRequest.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/requests/StartProcessRequest.kt new file mode 100644 index 00000000..7b1f2ba3 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/dto/requests/StartProcessRequest.kt @@ -0,0 +1,9 @@ +package no.iktdev.mediaprocessing.shared.common.dto.requests + +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType + +data class StartProcessRequest( + val fileUri: String, + val operationTypes: Set +) { +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt index dc43c8e0..fd063de0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/CollectProjection.kt @@ -1,20 +1,14 @@ package no.iktdev.mediaprocessing.shared.common.projection import no.iktdev.eventi.models.Event -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.* import no.iktdev.mediaprocessing.shared.common.model.MediaType import java.io.File class CollectProjection(val events: List) { - val startedWith: StartProjection by lazy { projectStartedWith() } + val useFile: File? by lazy { projectUseFile() } + val startedWith: StartProjection? by lazy { projectStartedWith() } var metadataTaskStatus: TaskStatus = TaskStatus.NotInitiated private set var encodeTaskStatus: TaskStatus = TaskStatus.NotInitiated @@ -47,8 +41,17 @@ class CollectProjection(val events: List) { coverDownloadTaskStatus ) - private fun projectStartedWith(): StartProjection { - val startEvent = events.filterIsInstance().first() + private fun projectUseFile(): File? { + val added = events.filterIsInstance().firstOrNull()?.data + val startEvent = projectStartedWith() + return added?.fileUri?.let { File(it) } ?: if (startedWith != null) { + startEvent?.inputFile + } else null + + } + + private fun projectStartedWith(): StartProjection? { + val startEvent = events.filterIsInstance().firstOrNull() ?: return null return StartProjection( inputFile = startEvent.data.fileUri.let { File(it) }, mode = startEvent.data.flow, @@ -57,13 +60,14 @@ class CollectProjection(val events: List) { } - private fun projectMetadata(): MetadataProjection? { val metadataEvent = events.filterIsInstance().lastOrNull() ?: return null - val coverDownloadResultEvents = events.filterIsInstance().filter { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed } - val coverFile = coverDownloadResultEvents.find { it -> it.data?.source == metadataEvent.recommended?.data?.source }?.data?.outputFile - ?.let { File(it) } + val coverDownloadResultEvents = events.filterIsInstance() + .filter { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed } + val coverFile = + coverDownloadResultEvents.find { it -> it.data?.source == metadataEvent.recommended?.data?.source }?.data?.outputFile + ?.let { File(it) } val result = metadataEvent.recommended ?: return null return MetadataProjection( title = result.data.title, @@ -76,22 +80,25 @@ class CollectProjection(val events: List) { } private fun projectProcessedMedia(): ProcessedMediaProjection? { - val encodeEvent = events.filterIsInstance().lastOrNull { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed } + val encodeEvent = events.filterIsInstance() + .lastOrNull { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed } ?: return null val extreactEvents = events.filterIsInstance() - val extractedFiles = if (extreactEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) { - extreactEvents.mapNotNull { it.data?.cachedOutputFile?.let { filePath -> File(filePath) } } - } else { - emptyList() - } + val extractedFiles = + if (extreactEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) { + extreactEvents.mapNotNull { it.data?.cachedOutputFile?.let { filePath -> File(filePath) } } + } else { + emptyList() + } val convertedEvents = events.filterIsInstance() - val convertedFiles = if (convertedEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) { - convertedEvents.flatMap { it.data?.outputFiles?.map { filePath -> File(filePath) } ?: emptyList() } - } else { - emptyList() - } + val convertedFiles = + if (convertedEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) { + convertedEvents.flatMap { it.data?.outputFiles?.map { filePath -> File(filePath) } ?: emptyList() } + } else { + emptyList() + } val encodedFile = encodeEvent.data?.cachedOutputFile?.let { File(it) } @@ -141,8 +148,6 @@ class CollectProjection(val events: List) { ) - - enum class TaskStatus { NotInitiated, Pending, @@ -151,43 +156,49 @@ class CollectProjection(val events: List) { } fun prettyPrint(): String = buildString { - appendLine("📦 Project snapshot") - appendLine("Started with: ${startedWith.inputFile.name} [mode=${startedWith.mode}, tasks=${startedWith.tasks}]") - appendLine("Task statuses:") - appendLine(" - Metadata: ${metadataTaskStatus.colored()}") - appendLine(" - Encode: ${encodeTaskStatus.colored()}") - appendLine(" - Extract: ${extreactTaskStatus.colored()}") - appendLine(" - Convert: ${convertTaskStatus.colored()}") - appendLine(" - Cover: ${coverDownloadTaskStatus.colored()}") + val startedContext = startedWith + if (startedContext != null) { + appendLine("📦 Project snapshot") + appendLine("Started with: ${startedContext.inputFile.name} [mode=${startedContext.mode}, tasks=${startedContext.tasks}]") + appendLine("Task statuses:") + appendLine(" - Metadata: ${metadataTaskStatus.colored()}") + appendLine(" - Encode: ${encodeTaskStatus.colored()}") + appendLine(" - Extract: ${extreactTaskStatus.colored()}") + appendLine(" - Convert: ${convertTaskStatus.colored()}") + appendLine(" - Cover: ${coverDownloadTaskStatus.colored()}") - metadata?.let { - appendLine("Metadata:") - appendLine(" • Title: ${it.title}") - appendLine(" • Genres: ${it.genres.joinToString()}") - appendLine(" • Source: ${it.source}") - appendLine(" • Cover: ${it.cover?.path ?: "none"}") + metadata?.let { + appendLine("Metadata:") + appendLine(" • Title: ${it.title}") + appendLine(" • Genres: ${it.genres.joinToString()}") + appendLine(" • Source: ${it.source}") + appendLine(" • Cover: ${it.cover?.path ?: "none"}") + } + + parsedFileInfo?.let { + appendLine("Parsed file info:") + appendLine(" • Name: ${it.name}") + appendLine(" • Collection: ${it.collection}") + appendLine(" • Type: ${it.mediaType}") + } + + processedMedia?.let { + appendLine("Processed media:") + appendLine(" • Encoded: ${it.encodedFile?.path ?: "none"}") + appendLine(" • Extracted: ${it.extractedFiles.joinToString { f -> f.name }}") + appendLine(" • Converted: ${it.convertedFiles.joinToString { f -> f.name }}") + } + } else { + appendLine("Start event is missing, should not evaluate!") } - parsedFileInfo?.let { - appendLine("Parsed file info:") - appendLine(" • Name: ${it.name}") - appendLine(" • Collection: ${it.collection}") - appendLine(" • Type: ${it.mediaType}") - } - - processedMedia?.let { - appendLine("Processed media:") - appendLine(" • Encoded: ${it.encodedFile?.path ?: "none"}") - appendLine(" • Extracted: ${it.extractedFiles.joinToString { f -> f.name }}") - appendLine(" • Converted: ${it.convertedFiles.joinToString { f -> f.name }}") - } } private fun TaskStatus.colored(): String = when (this) { TaskStatus.NotInitiated -> "\u001B[90m$this\u001B[0m" // grå - TaskStatus.Pending -> "\u001B[33m$this\u001B[0m" // gul - TaskStatus.Completed -> "\u001B[32m$this\u001B[0m" // grønn - TaskStatus.Failed -> "\u001B[31m$this\u001B[0m" // rød + TaskStatus.Pending -> "\u001B[33m$this\u001B[0m" // gul + TaskStatus.Completed -> "\u001B[32m$this\u001B[0m" // grønn + TaskStatus.Failed -> "\u001B[31m$this\u001B[0m" // rød } } \ No newline at end of file diff --git a/shared/common/src/main/resources/application.yml b/shared/common/src/main/resources/application.yml index bc93c80d..ff0f9282 100644 --- a/shared/common/src/main/resources/application.yml +++ b/shared/common/src/main/resources/application.yml @@ -8,3 +8,13 @@ spring: driver-class-name: org.h2.Driver username: sa password: + + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service + diff --git a/shared/common/src/test/resources/application.yml b/shared/common/src/test/resources/application.yml index 3df1331c..cf8e67bd 100644 --- a/shared/common/src/test/resources/application.yml +++ b/shared/common/src/test/resources/application.yml @@ -26,3 +26,12 @@ management: web: exposure: include: mappings + +media: + cache: /src/cache + outgoing: /src/output + incoming: /src/input + +streamit: + address: http://streamit.service +