Props + controller

This commit is contained in:
Brage Skjønborg 2026-01-11 04:10:54 +01:00
parent b0ca3efc58
commit 3925fb758b
42 changed files with 736 additions and 154 deletions

View File

@ -5,7 +5,7 @@ spring:
flyway:
enabled: true
locations: classpath:flyway
baseline-on-migrate: true
baseline-on-migrate: false
management:
endpoints:
@ -22,3 +22,12 @@ logging:
org.apache.kafka: INFO
Exposed: OFF
org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service

View File

@ -31,3 +31,11 @@ management:
endpoint:
health:
show-details: always
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service

View File

@ -6,11 +6,13 @@ import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig
import no.iktdev.mediaprocessing.shared.common.DatabaseApplication
import no.iktdev.mediaprocessing.shared.common.MediaProcessingApp
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry
import no.iktdev.mediaprocessing.shared.common.getAppVersion
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Configuration
@ -52,3 +54,11 @@ open class ApplicationConfiguration() {
}
}
}
@Configuration
@EnableConfigurationProperties(
value = [
ExecutablesConfig::class
]
)
class CoordinatorConfig

View File

@ -1,17 +1,23 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig
import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths
import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig
import org.springframework.stereotype.Service
import java.io.File
class CoordinatorEnv {
companion object {
val streamitAddress = System.getenv("STREAMIT_ADDRESS") ?: "http://streamit.service"
val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe"
@Service
class CoordinatorEnv(
val streamIt: StreamItConfig,
val exec: ExecutablesConfig,
val media: MediaPaths
) {
val streamitAddress = streamIt.address
val ffprobe = exec.ffprobe
val cachedContent = File(media.cache)
val outgoingContent = File(media.outgoing)
val incomingContent = File(media.incoming)
val preference: File = File("/data/config/preference.json")
var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache")
val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")
}
}

View File

@ -15,4 +15,7 @@ class CoordinatorService {
fun getProgress(taskId: String): ProgressUpdate? =
progressMap[taskId]
fun getProgress(): List<ProgressUpdate> =
progressMap.values.toList()
}

View File

@ -3,6 +3,7 @@ package no.iktdev.mediaprocessing.coordinator
import com.google.gson.Gson
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec
import org.springframework.stereotype.Component
import java.io.File
@ -34,11 +35,11 @@ data class AudioPreference(
val codec: AudioCodec
)
object Preference {
@Component
class Preference(private val coordinatorEnv: CoordinatorEnv) {
fun getProcesserPreference(): ProcesserPreference {
var preference: ProcesserPreference = ProcesserPreference.default()
CoordinatorEnv.preference.ifExists({
coordinatorEnv.preference.ifExists({
val text = readText()
try {
val result = Gson().fromJson(text, PeferenceConfig::class.java)
@ -47,7 +48,7 @@ object Preference {
e.printStackTrace()
}
}, orElse = {
CoordinatorEnv.preference.writeText(Gson().toJson(PeferenceConfig(preference)))
coordinatorEnv.preference.writeText(Gson().toJson(PeferenceConfig(preference)))
})
return preference
}

View File

@ -9,12 +9,14 @@ import org.springframework.web.client.RestTemplate
class RestTemplateConfig {
@Configuration
class RestTemplateConfig {
class RestTemplateConfig(
private val coordinatorEnv: CoordinatorEnv
) {
@Bean
fun streamitRestTemplate(): RestTemplate {
return RestTemplateBuilder()
.rootUri(CoordinatorEnv.streamitAddress)
.rootUri(coordinatorEnv.streamitAddress)
.build()
}
}

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.config
import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "executables")
data class ExecutablesConfig(
val ffprobe: String
)

View File

@ -0,0 +1,31 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.services.EventPagingService
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
import java.util.*
@RestController
@RequestMapping("/events")
class EventsController(
private val paging: EventPagingService
) {
@GetMapping
fun getEvents(
@RequestParam referenceId: UUID,
@RequestParam(required = false) beforeEventId: UUID?,
@RequestParam(required = false) afterEventId: UUID?,
@RequestParam(defaultValue = "50") limit: Int
): List<SequenceEvent> {
return paging.getEvents(
referenceId = referenceId,
beforeEventId = beforeEventId,
afterEventId = afterEventId,
limit = limit
)
}
}

View File

@ -1,22 +1,34 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.CoordinatorService
import no.iktdev.mediaprocessing.coordinator.services.SseHub
import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.bind.annotation.*
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
@RestController
@RequestMapping("/internal")
class InternalProcesserController(
private val coordinator: CoordinatorService
private val coordinator: CoordinatorService,
private val hub: SseHub
) {
@PostMapping("/progress")
fun receiveProgress(@RequestBody update: ProgressUpdate): ResponseEntity<Void> {
coordinator.updateProgress(update)
hub.broadcast("progress", update)
return ResponseEntity.ok().build()
}
@GetMapping("/progress")
fun getAllProgress(): List<ProgressUpdate> {
return coordinator.getProgress()
}
@GetMapping("/sse")
fun stream(): SseEmitter {
return hub.createEmitter()
}
}

View File

@ -0,0 +1,43 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.services.CommandService
import no.iktdev.mediaprocessing.shared.common.dto.requests.StartProcessRequest
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/operations")
class OperationsController(
private val commandService: CommandService
) {
@PostMapping("/start")
fun startProcess(@RequestBody req: StartProcessRequest): ResponseEntity<Map<String, String>> {
val referenceId = commandService.startProcess(req)
return when (val result = commandService.startProcess(req)) {
is CommandService.StartResult.Accepted -> ResponseEntity
.accepted()
.body(
mapOf(
"referenceId" to result.referenceId.toString(),
"status" to "accepted",
"message" to "Process accepted and StartedEvent created"
)
)
is CommandService.StartResult.Rejected -> ResponseEntity
.badRequest()
.body(
mapOf(
"status" to "rejected",
"message" to result.reason
)
)
}
}
}

View File

@ -0,0 +1,27 @@
package no.iktdev.mediaprocessing.coordinator.controller
import no.iktdev.mediaprocessing.coordinator.services.SequenceAggregatorService
import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController
@RestController
@RequestMapping("/sequences")
class SequenceController(
private val aggregator: SequenceAggregatorService
) {
@GetMapping("/active")
fun getActive(): List<SequenceSummary> {
return aggregator.getActiveSequences()
}
@GetMapping("/recent")
fun getRecent(
@RequestParam(defaultValue = "15") limit: Int
): List<SequenceSummary> {
return aggregator.getRecentSequences(limit)
}
}

View File

@ -3,16 +3,8 @@ package no.iktdev.mediaprocessing.coordinator.listeners.events
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.coordinator.Preference
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioTarget
import no.iktdev.mediaprocessing.ffmpeg.dsl.MediaPlan
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoTarget
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.mediaprocessing.ffmpeg.dsl.*
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask
import no.iktdev.mediaprocessing.shared.common.stores.TaskStore
@ -20,13 +12,15 @@ import org.springframework.stereotype.Component
import java.io.File
@Component
class MediaCreateEncodeTaskListener : EventListener() {
class MediaCreateEncodeTaskListener(
private val preference: Preference
) : EventListener() {
override fun onEvent(
event: Event,
history: List<Event>
): Event? {
val preference = Preference.getProcesserPreference()
val preference = preference.getProcesserPreference()
val startedEvent = history.filterIsInstance<StartProcessingEvent>().firstOrNull() ?: return null
if (startedEvent.data.operation.isNotEmpty()) {

View File

@ -13,7 +13,9 @@ import no.iktdev.mediaprocessing.shared.common.stores.TaskStore
import org.springframework.stereotype.Component
@Component
class MigrateCreateStoreTaskListener: EventListener() {
class MigrateCreateStoreTaskListener(
private val coordinatorEnv: CoordinatorEnv,
): EventListener() {
private val log = KotlinLogging.logger {}
override fun onEvent(
@ -31,7 +33,7 @@ class MigrateCreateStoreTaskListener: EventListener() {
log.warn { "One or more tasks have failed in ${event.referenceId}" }
}
val migrateContentProjection = MigrateContentProject(useHistory, CoordinatorEnv.outgoingContent)
val migrateContentProjection = MigrateContentProject(useHistory, coordinatorEnv.outgoingContent)
val collection = migrateContentProjection.useStore?.name ?:
throw RuntimeException("No content store configured for migration in ${event.referenceId}")

View File

@ -15,7 +15,9 @@ import org.springframework.stereotype.Component
import java.util.*
@Component
class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) {
class DownloadCoverTaskListener(
private val coordinatorEnv: CoordinatorEnv,
): TaskListener(TaskType.MIXED) {
val log = KotlinLogging.logger {}
override fun getWorkerId(): String {
@ -57,11 +59,11 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) {
}
open fun getDownloadClient(): DownloadClient {
return DefaultDownloadClient()
return DefaultDownloadClient(coordinatorEnv)
}
class DefaultDownloadClient() : DownloadClient(
outDir = CoordinatorEnv.cachedContent,
class DefaultDownloadClient(private val coordinatorEnv: CoordinatorEnv) : DownloadClient(
outDir = coordinatorEnv.cachedContent,
connectionFactory = DefaultConnectionFactory(),) {
override fun onCreate() {
super.onCreate()

View File

@ -13,7 +13,9 @@ import org.springframework.stereotype.Component
import java.util.*
@Component
class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) {
class MediaStreamReadTaskListener(
private val coordinatorEnv: CoordinatorEnv
): FfprobeTaskListener(TaskType.CPU_INTENSIVE) {
val log = KotlinLogging.logger {}
override fun getWorkerId(): String {
@ -48,7 +50,7 @@ class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) {
}
override fun getFfprobe(): FFprobe {
return JsonFfinfo(CoordinatorEnv.ffprobe)
return JsonFfinfo(coordinatorEnv.ffprobe)
}
class JsonFfinfo(executable: String): FFprobe(executable) {

View File

@ -0,0 +1,46 @@
package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.mediaprocessing.shared.common.dto.requests.StartProcessRequest
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.mediaprocessing.shared.common.notExist
import no.iktdev.mediaprocessing.shared.common.stores.EventStore
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
@Service
class CommandService {
fun startProcess(request: StartProcessRequest): StartResult {
return try {
val file = File(request.fileUri)
if (file.notExist()) {
throw IllegalArgumentException("File does not exists at ${request.fileUri}")
}
if (!file.canRead()) {
throw IllegalStateException("File is not readable ${request.fileUri}")
}
val startProcessingEvent = StartProcessingEvent(
data = StartData(
fileUri = request.fileUri,
operation = request.operationTypes,
flow = StartFlow.Manual
)
).newReferenceId()
EventStore.persist(startProcessingEvent)
StartResult.Accepted(startProcessingEvent.referenceId)
} catch (e: Exception) {
StartResult.Rejected("Failed to start process for file ${request.fileUri}, with the following reason: ${e.message}")
}
}
sealed class StartResult {
data class Accepted(val referenceId: UUID) : StartResult()
data class Rejected(val reason: String) : StartResult()
}
}

View File

@ -0,0 +1,41 @@
package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.mediaprocessing.shared.common.dto.SequenceEvent
import no.iktdev.mediaprocessing.shared.common.dto.toDto
import no.iktdev.mediaprocessing.shared.common.stores.EventStore
import org.springframework.stereotype.Service
import java.util.*
@Service
class EventPagingService {
fun getEvents(
referenceId: UUID,
beforeEventId: UUID?,
afterEventId: UUID?,
limit: Int
): List<SequenceEvent> {
val all = EventStore.getPersistedEventsFor(referenceId)
.sortedByDescending { it.persistedAt }
val filtered = when {
beforeEventId != null ->
all.dropWhile { it.eventId != beforeEventId }.drop(1)
afterEventId != null ->
all.takeWhile { it.eventId != afterEventId }
else -> all
}
return filtered
.take(limit)
.mapNotNull { persisted ->
val event = persisted.toEvent() ?: return@mapNotNull null
persisted.toDto(event)
}
}
}

View File

@ -0,0 +1,61 @@
package no.iktdev.mediaprocessing.coordinator.services
import no.iktdev.eventi.ZDS.toEvent
import no.iktdev.eventi.models.store.PersistedEvent
import no.iktdev.mediaprocessing.shared.common.LocalDateTimeEpoch
import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
import no.iktdev.mediaprocessing.shared.common.stores.EventStore
import org.springframework.stereotype.Service
@Service
class SequenceAggregatorService() {
fun getActiveSequences(): List<SequenceSummary> {
val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch)
// Gruppér først, deserialiser senere
val grouped = allEvents.groupBy { it.referenceId }
return grouped.values
// aktive = ingen CollectedEvent
.filter { events -> events.none { it.event == CollectedEvent::class.java.simpleName } }
.mapNotNull { events -> buildSummary(events) }
.sortedByDescending { it.lastEventTime }
}
fun getRecentSequences(limit: Int): List<SequenceSummary> {
val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch)
val grouped = allEvents.groupBy { it.referenceId }
return grouped.values
.mapNotNull { events -> buildSummary(events) }
.sortedByDescending { it.lastEventTime }
.take(limit)
}
private fun buildSummary(events: List<PersistedEvent>): SequenceSummary? {
val last = events.maxByOrNull { it.persistedAt } ?: return null
// Deserialiser kun eventene for denne sekvensen
val domainEvents = events.mapNotNull { it.toEvent() }
val projection = CollectProjection(domainEvents)
return SequenceSummary(
referenceId = last.referenceId.toString(),
title = "",
inputFileName = projection.useFile?.name,
lastEventId = last.eventId.toString(),
lastEventTime = last.persistedAt,
metadataTaskStatus = projection.metadataTaskStatus,
encodeTaskStatus = projection.encodeTaskStatus,
extractTaskStatus = projection.extreactTaskStatus,
convertTaskStatus = projection.convertTaskStatus,
coverDownloadTaskStatus = projection.coverDownloadTaskStatus,
hasErrors = projection.getTaskStatus().any { it == CollectProjection.TaskStatus.Failed }
)
}
}

View File

@ -0,0 +1,40 @@
package no.iktdev.mediaprocessing.coordinator.services
import org.springframework.stereotype.Service
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.util.concurrent.CopyOnWriteArrayList
@Service
class SseHub {
private val emitters = CopyOnWriteArrayList<SseEmitter>()
fun createEmitter(): SseEmitter {
val emitter = SseEmitter(0L) // never timeout
emitters.add(emitter)
emitter.onCompletion { emitters.remove(emitter) }
emitter.onTimeout { emitters.remove(emitter) }
emitter.onError { emitters.remove(emitter) }
return emitter
}
fun broadcast(eventName: String, data: Any) {
val dead = mutableListOf<SseEmitter>()
emitters.forEach { emitter ->
try {
emitter.send(
SseEmitter.event()
.name(eventName)
.data(data)
)
} catch (ex: Exception) {
dead.add(emitter)
}
}
emitters.removeAll(dead.toSet())
}
}

View File

@ -5,7 +5,7 @@ spring:
flyway:
enabled: true
locations: classpath:flyway
baseline-on-migrate: true
baseline-on-migrate: false
management:
endpoints:
@ -23,3 +23,14 @@ logging:
org.apache.kafka: INFO
Exposed: OFF
org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service
executables:
ffprobe: ffprobe

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing
import jakarta.annotation.PostConstruct
import no.iktdev.eventi.ListenerOrder
import no.iktdev.eventi.events.EventListenerRegistry
import no.iktdev.mediaprocessing.coordinator.CoordinatorApplication
@ -8,29 +9,34 @@ import no.iktdev.mediaprocessing.shared.common.config.DatasourceConfiguration
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.junit.jupiter.api.extension.ExtendWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.context.TestConfiguration
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.ComponentScan
import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit.jupiter.SpringExtension
@SpringBootTest(
classes = [CoordinatorApplication::class,
DatasourceConfiguration::class],
DatasourceConfiguration::class,
ListenerInformOrderTest.RegistryResetConfig::class],
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT
)
@TestPropertySource(properties = ["spring.flyway.enabled=true"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ComponentScan("no.iktdev.mediaprocessing.coordinator.listeners.events")
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@ExtendWith(SpringExtension::class)
class ListenerInformOrderTest(): TestBase() {
@Autowired lateinit var ctx: ApplicationContext
@BeforeEach
fun reset() {
}
@Test
fun verifyTaskRegistryIsNotEmpty() {
assertThat { TaskRegistry.getTasks().isNotEmpty() }
@ -54,4 +60,13 @@ class ListenerInformOrderTest(): TestBase() {
MediaCreateMetadataSearchTaskListener::class.java.simpleName,
)
}
@TestConfiguration
class RegistryResetConfig {
@PostConstruct
fun reset() {
EventListenerRegistry.wipe()
}
}
}

View File

@ -3,10 +3,7 @@ package no.iktdev.mediaprocessing
import io.mockk.*
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.mediaprocessing.coordinator.AudioPreference
import no.iktdev.mediaprocessing.coordinator.Preference
import no.iktdev.mediaprocessing.coordinator.ProcesserPreference
import no.iktdev.mediaprocessing.coordinator.VideoPreference
import no.iktdev.mediaprocessing.coordinator.*
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
@ -21,17 +18,25 @@ open class TestBase {
class DummyEvent: Event()
class DummyTask: Task()
val preference: Preference = mockk(relaxed = true)
val coordinatorEnv = mockk<CoordinatorEnv>(relaxed = true)
@BeforeEach
fun setup() {
open fun setup() {
mockkObject(TaskStore)
every { TaskStore.persist(any()) } just Runs
mockkObject(Preference)
every { Preference.getProcesserPreference() } returns ProcesserPreference(
every { preference.getProcesserPreference() } returns ProcesserPreference(
videoPreference = VideoPreference(codec = VideoCodec.Hevc()),
audioPreference = AudioPreference(codec = AudioCodec.Aac(channels = 2))
)
every { coordinatorEnv.outgoingContent } returns File("./tmp/output")
every { coordinatorEnv.incomingContent } returns File("./tmp/input")
every { coordinatorEnv.cachedContent } returns File("./tmp/cached")
every { coordinatorEnv.streamitAddress } returns "http://streamit.lan"
}
fun mockkIO() {
mockkConstructor(File::class)
every { anyConstructed<File>().exists() } returns true

View File

@ -0,0 +1,20 @@
package no.iktdev.mediaprocessing
import no.iktdev.eventi.events.EventListener
import no.iktdev.eventi.events.EventListenerRegistry
import org.assertj.core.api.Assertions.assertThat
import java.lang.reflect.Field
fun EventListenerRegistry.wipe() {
val field: Field = EventListenerRegistry::class.java
.superclass
.getDeclaredField("listeners")
field.isAccessible = true
// Tøm mapen
val mutableList = field.get(EventListenerRegistry) as MutableList<*>
(mutableList as MutableList<Class<out EventListener>>).clear()
// Verifiser at det er tomt
assertThat(EventListenerRegistry.getListeners().isEmpty())
}

View File

@ -1,49 +1,38 @@
package no.iktdev.mediaprocessing.coordinator.listeners.events
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.mockkObject
import io.mockk.verify
import io.mockk.*
import no.iktdev.mediaprocessing.TestBase
import no.iktdev.mediaprocessing.coordinator.AudioPreference
import no.iktdev.mediaprocessing.coordinator.Preference
import no.iktdev.mediaprocessing.coordinator.ProcesserPreference
import no.iktdev.mediaprocessing.coordinator.VideoPreference
import no.iktdev.mediaprocessing.ffmpeg.data.AudioStream
import no.iktdev.mediaprocessing.ffmpeg.data.Disposition
import no.iktdev.mediaprocessing.ffmpeg.data.ParsedMediaStreams
import no.iktdev.mediaprocessing.ffmpeg.data.Tags
import no.iktdev.mediaprocessing.ffmpeg.data.VideoStream
import no.iktdev.mediaprocessing.ffmpeg.data.*
import no.iktdev.mediaprocessing.ffmpeg.dsl.AudioCodec
import no.iktdev.mediaprocessing.ffmpeg.dsl.VideoCodec
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaStreamParsedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaTracksEncodeSelectedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeTaskCreatedEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask
import no.iktdev.mediaprocessing.shared.common.stores.TaskStore
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
class MediaCreateEncodeTaskListenerTest {
class MediaCreateEncodeTaskListenerTest: TestBase() {
private val listener = MediaCreateEncodeTaskListener()
private val listener = MediaCreateEncodeTaskListener(preference)
@BeforeEach
fun setup() {
override fun setup() {
mockkObject(TaskStore)
every { TaskStore.persist(any()) } just Runs
mockkObject(Preference)
every { Preference.getProcesserPreference() } returns ProcesserPreference(
every { preference.getProcesserPreference() } returns ProcesserPreference(
videoPreference = VideoPreference(codec = VideoCodec.Hevc()),
audioPreference = AudioPreference(codec = AudioCodec.Aac(channels = 2))
)
}
@Test
@DisplayName("""
Hvis en video- og audio-track er valgt

View File

@ -23,7 +23,7 @@ import java.io.File
class MigrateCreateStoreTaskListenerTest : TestBase() {
private val listener = MigrateCreateStoreTaskListener()
private val listener = MigrateCreateStoreTaskListener(coordinatorEnv)
@Test
@DisplayName(

View File

@ -7,6 +7,7 @@ import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockDownloadClient
import no.iktdev.mediaprocessing.TestBase
import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv
import no.iktdev.mediaprocessing.shared.common.DownloadClient
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask
@ -17,9 +18,9 @@ import java.io.File
import java.util.*
import kotlin.system.measureTimeMillis
class DownloadCoverTaskListenerTest {
class DownloadCoverTaskListenerTest: TestBase() {
class DownloadCoverTaskListenerTestImplementation : DownloadCoverTaskListener() {
class DownloadCoverTaskListenerTestImplementation(coordinatorEnv: CoordinatorEnv) : DownloadCoverTaskListener(coordinatorEnv) {
fun getJob() = currentJob
lateinit var client: DownloadClient
@ -43,7 +44,7 @@ class DownloadCoverTaskListenerTest {
override fun publishEvent(event: Event) {}
}
private var listener = DownloadCoverTaskListenerTestImplementation()
private var listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv)
@Test
@DisplayName(
@ -65,7 +66,7 @@ class DownloadCoverTaskListenerTest {
)
).newReferenceId()
listener = DownloadCoverTaskListenerTestImplementation().apply {
listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply {
this.client = MockDownloadClient(
delayMillis = delay,
mockFile = File("/tmp/fancy.jpg")
@ -103,7 +104,7 @@ class DownloadCoverTaskListenerTest {
)
).newReferenceId()
listener = DownloadCoverTaskListenerTestImplementation().apply {
listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply {
this.client = MockDownloadClient(throwException = true)
}
@ -141,7 +142,7 @@ class DownloadCoverTaskListenerTest {
fun onTask_produces_correct_output_path() = runTest {
val mockFile = File("/tmp/expected.jpg")
listener = DownloadCoverTaskListenerTestImplementation().apply {
listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply {
this.client = MockDownloadClient(mockFile = mockFile)
}
@ -172,7 +173,7 @@ class DownloadCoverTaskListenerTest {
fun accept_is_non_blocking() = runTest {
val delay = 500L
listener = DownloadCoverTaskListenerTestImplementation().apply {
listener = DownloadCoverTaskListenerTestImplementation(coordinatorEnv).apply {
this.client = MockDownloadClient(delayMillis = delay, mockFile = File("/tmp/x.jpg"))
}

View File

@ -9,6 +9,8 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockFFprobe
import no.iktdev.mediaprocessing.TestBase
import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv
import no.iktdev.mediaprocessing.ffmpeg.FFprobe
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask
@ -19,9 +21,9 @@ import org.junit.jupiter.api.Test
import java.util.*
import kotlin.system.measureTimeMillis
class MediaStreamReadTaskListenerTest {
class MediaStreamReadTaskListenerTest: TestBase() {
class MediaStreamReadTaskListenerTestImplementation(): MediaStreamReadTaskListener() {
class MediaStreamReadTaskListenerTestImplementation(coordinatorEnv: CoordinatorEnv): MediaStreamReadTaskListener(coordinatorEnv) {
fun getJob() = currentJob
lateinit var probe: FFprobe
@ -50,11 +52,11 @@ class MediaStreamReadTaskListenerTest {
}
}
var listener = MediaStreamReadTaskListenerTestImplementation()
var listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv)
@BeforeEach
fun resetListener() {
listener = MediaStreamReadTaskListenerTestImplementation()
listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv)
}
@ -66,7 +68,7 @@ class MediaStreamReadTaskListenerTest {
val task = MediaReadTask(fileUri = "test.mp4").newReferenceId()
listener = MediaStreamReadTaskListenerTestImplementation().apply {
listener = MediaStreamReadTaskListenerTestImplementation(coordinatorEnv).apply {
this.probe = MockFFprobe.success(json, delay)
}

View File

@ -31,3 +31,14 @@ management:
endpoint:
health:
show-details: always
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service
executables:
ffprobe: ffprobe

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.processer
import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "executables")
data class ExecutablesConfig(
val ffmpeg: String
)

View File

@ -11,6 +11,7 @@ import no.iktdev.mediaprocessing.shared.common.MediaProcessingApp
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry
import no.iktdev.mediaprocessing.shared.common.getAppVersion
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Configuration
@ -52,3 +53,11 @@ open class ApplicationConfiguration() {
}
}
}
@Configuration
@EnableConfigurationProperties(
value = [
ExecutablesConfig::class
]
)
class ProcesserConfig

View File

@ -5,7 +5,7 @@ spring:
flyway:
enabled: true
locations: classpath:flyway
baseline-on-migrate: true
baseline-on-migrate: false
management:
endpoints:
@ -22,3 +22,14 @@ logging:
org.apache.kafka: INFO
Exposed: OFF
org.springframework.web.socket.config.WebSocketMessageBrokerStats: WARN
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service
executables:
ffmpeg: ffmpeg

View File

@ -31,3 +31,14 @@ management:
endpoint:
health:
show-details: always
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service
executables:
ffmpeg: ffmpeg

View File

@ -1,11 +1,16 @@
package no.iktdev.mediaprocessing.shared.common
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths
import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig
import org.jetbrains.exposed.sql.Database
import org.springframework.beans.factory.InitializingBean
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.stereotype.Component
import javax.sql.DataSource
@ -34,4 +39,14 @@ class ExposedInitializer(
@Retention(AnnotationRetention.RUNTIME)
@SpringBootApplication
@ComponentScan("no.iktdev.mediaprocessing") // sikrer at common beans blir plukket opp
@Import(SharedConfig::class)
annotation class MediaProcessingApp
@Configuration
@EnableConfigurationProperties(
value = [
StreamItConfig::class,
MediaPaths::class
]
)
class SharedConfig

View File

@ -10,6 +10,7 @@ import java.io.FileInputStream
import java.io.RandomAccessFile
import java.net.InetAddress
import java.security.MessageDigest
import java.time.LocalDateTime
import java.util.zip.CRC32
private val logger = KotlinLogging.logger {}
@ -214,3 +215,6 @@ fun File.resolveConflict(): File {
return candidate
}
val LocalDateTimeEpoch: LocalDateTime =
LocalDateTime.of(1970, 1, 1, 0, 0, 0)

View File

@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.shared.common.configs
import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "streamit")
data class StreamItConfig(
val address: String
)
@ConfigurationProperties(prefix = "media")
data class MediaPaths(
val cache: String,
val outgoing: String,
val incoming: String
)

View File

@ -0,0 +1,44 @@
package no.iktdev.mediaprocessing.shared.common.dto
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.PersistedEvent
import java.time.LocalDateTime
import java.util.*
import kotlin.reflect.KProperty1
data class SequenceEvent(
val eventId: UUID,
val referenceId: UUID,
val type: String,
val timestamp: LocalDateTime,
val metadata: MetadataDto,
val payload: Map<String, Any?>?
)
data class MetadataDto(
val derivedFromEventIds: Set<UUID>?,
val createdAt: LocalDateTime
)
fun Event.extractPayload(): Map<String, Any?>? {
val ignored = setOf("referenceId", "eventId", "metadata")
return this::class.members
.filterIsInstance<KProperty1<Event, *>>()
.filter { it.name !in ignored }
.associate { it.name to it.get(this) }
}
fun PersistedEvent.toDto(event: Event): SequenceEvent =
SequenceEvent(
eventId = this.eventId,
referenceId = this.referenceId,
type = this.event,
timestamp = this.persistedAt,
metadata = MetadataDto(
derivedFromEventIds = event.metadata.derivedFromId,
createdAt = event.metadata.created
),
payload = event.extractPayload()
)

View File

@ -0,0 +1,24 @@
package no.iktdev.mediaprocessing.shared.common.dto
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
import java.time.LocalDateTime
data class SequenceSummary(
val referenceId: String,
val title: String,
val inputFileName: String?,
val type: ContextType = ContextType.Content,
val lastEventId: String,
val lastEventTime: LocalDateTime,
val metadataTaskStatus: CollectProjection.TaskStatus,
val encodeTaskStatus: CollectProjection.TaskStatus,
val extractTaskStatus: CollectProjection.TaskStatus,
val convertTaskStatus: CollectProjection.TaskStatus,
val coverDownloadTaskStatus: CollectProjection.TaskStatus,
val hasErrors: Boolean,
)
enum class ContextType {
Content,
Metadata
}

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.shared.common.dto.requests
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
data class StartProcessRequest(
val fileUri: String,
val operationTypes: Set<OperationType>
) {
}

View File

@ -1,20 +1,14 @@
package no.iktdev.mediaprocessing.shared.common.projection
import no.iktdev.eventi.models.Event
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ConvertTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MediaParsedInfoEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MetadataSearchResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.OperationType
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartFlow
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.*
import no.iktdev.mediaprocessing.shared.common.model.MediaType
import java.io.File
class CollectProjection(val events: List<Event>) {
val startedWith: StartProjection by lazy { projectStartedWith() }
val useFile: File? by lazy { projectUseFile() }
val startedWith: StartProjection? by lazy { projectStartedWith() }
var metadataTaskStatus: TaskStatus = TaskStatus.NotInitiated
private set
var encodeTaskStatus: TaskStatus = TaskStatus.NotInitiated
@ -47,8 +41,17 @@ class CollectProjection(val events: List<Event>) {
coverDownloadTaskStatus
)
private fun projectStartedWith(): StartProjection {
val startEvent = events.filterIsInstance<no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StartProcessingEvent>().first()
private fun projectUseFile(): File? {
val added = events.filterIsInstance<FileAddedEvent>().firstOrNull()?.data
val startEvent = projectStartedWith()
return added?.fileUri?.let { File(it) } ?: if (startedWith != null) {
startEvent?.inputFile
} else null
}
private fun projectStartedWith(): StartProjection? {
val startEvent = events.filterIsInstance<StartProcessingEvent>().firstOrNull() ?: return null
return StartProjection(
inputFile = startEvent.data.fileUri.let { File(it) },
mode = startEvent.data.flow,
@ -57,12 +60,13 @@ class CollectProjection(val events: List<Event>) {
}
private fun projectMetadata(): MetadataProjection? {
val metadataEvent = events.filterIsInstance<MetadataSearchResultEvent>().lastOrNull()
?: return null
val coverDownloadResultEvents = events.filterIsInstance<CoverDownloadResultEvent>().filter { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }
val coverFile = coverDownloadResultEvents.find { it -> it.data?.source == metadataEvent.recommended?.data?.source }?.data?.outputFile
val coverDownloadResultEvents = events.filterIsInstance<CoverDownloadResultEvent>()
.filter { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }
val coverFile =
coverDownloadResultEvents.find { it -> it.data?.source == metadataEvent.recommended?.data?.source }?.data?.outputFile
?.let { File(it) }
val result = metadataEvent.recommended ?: return null
return MetadataProjection(
@ -76,18 +80,21 @@ class CollectProjection(val events: List<Event>) {
}
private fun projectProcessedMedia(): ProcessedMediaProjection? {
val encodeEvent = events.filterIsInstance<ProcesserEncodeResultEvent>().lastOrNull { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }
val encodeEvent = events.filterIsInstance<ProcesserEncodeResultEvent>()
.lastOrNull { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }
?: return null
val extreactEvents = events.filterIsInstance<ProcesserExtractResultEvent>()
val extractedFiles = if (extreactEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) {
val extractedFiles =
if (extreactEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) {
extreactEvents.mapNotNull { it.data?.cachedOutputFile?.let { filePath -> File(filePath) } }
} else {
emptyList()
}
val convertedEvents = events.filterIsInstance<ConvertTaskResultEvent>()
val convertedFiles = if (convertedEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) {
val convertedFiles =
if (convertedEvents.all { it.status == no.iktdev.eventi.models.store.TaskStatus.Completed }) {
convertedEvents.flatMap { it.data?.outputFiles?.map { filePath -> File(filePath) } ?: emptyList() }
} else {
emptyList()
@ -141,8 +148,6 @@ class CollectProjection(val events: List<Event>) {
)
enum class TaskStatus {
NotInitiated,
Pending,
@ -151,8 +156,10 @@ class CollectProjection(val events: List<Event>) {
}
fun prettyPrint(): String = buildString {
val startedContext = startedWith
if (startedContext != null) {
appendLine("📦 Project snapshot")
appendLine("Started with: ${startedWith.inputFile.name} [mode=${startedWith.mode}, tasks=${startedWith.tasks}]")
appendLine("Started with: ${startedContext.inputFile.name} [mode=${startedContext.mode}, tasks=${startedContext.tasks}]")
appendLine("Task statuses:")
appendLine(" - Metadata: ${metadataTaskStatus.colored()}")
appendLine(" - Encode: ${encodeTaskStatus.colored()}")
@ -181,6 +188,10 @@ class CollectProjection(val events: List<Event>) {
appendLine(" • Extracted: ${it.extractedFiles.joinToString { f -> f.name }}")
appendLine(" • Converted: ${it.convertedFiles.joinToString { f -> f.name }}")
}
} else {
appendLine("Start event is missing, should not evaluate!")
}
}
private fun TaskStatus.colored(): String = when (this) {

View File

@ -8,3 +8,13 @@ spring:
driver-class-name: org.h2.Driver
username: sa
password:
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service

View File

@ -26,3 +26,12 @@ management:
web:
exposure:
include: mappings
media:
cache: /src/cache
outgoing: /src/output
incoming: /src/input
streamit:
address: http://streamit.service