Moved to props + fixing abandoned issue

This commit is contained in:
Brage Skjønborg 2026-02-01 03:48:33 +01:00
parent 7fc45625bd
commit 22627c387a
29 changed files with 320 additions and 90 deletions

View File

@ -7,6 +7,8 @@ import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig import no.iktdev.mediaprocessing.coordinator.config.ExecutablesConfig
import no.iktdev.mediaprocessing.coordinator.config.ProcesserClientProperties
import no.iktdev.mediaprocessing.shared.common.configs.StreamItConfig
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry
import no.iktdev.mediaprocessing.shared.common.getAppVersion import no.iktdev.mediaprocessing.shared.common.getAppVersion
@ -58,7 +60,9 @@ open class ApplicationConfiguration() {
@Configuration @Configuration
@EnableConfigurationProperties( @EnableConfigurationProperties(
value = [ value = [
ExecutablesConfig::class ExecutablesConfig::class,
StreamItConfig::class,
ProcesserClientProperties::class
] ]
) )
class CoordinatorConfig class CoordinatorConfig

View File

@ -0,0 +1,23 @@
package no.iktdev.mediaprocessing.coordinator
import org.springframework.stereotype.Component
import org.springframework.web.reactive.function.client.WebClient
import reactor.core.publisher.Mono
@Component
class ProcesserClient(
private val webClient: WebClient
) {
fun fetchLog(path: String): Mono<String> =
webClient.get()
.uri { it.path("/state/log").queryParam("path", path).build() }
.retrieve()
.bodyToMono(String::class.java)
fun ping(): Mono<String> =
webClient.get()
.uri("/actuator/health")
.retrieve()
.bodyToMono(String::class.java)
}

View File

@ -0,0 +1,25 @@
package no.iktdev.mediaprocessing.coordinator.config
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient
@Configuration
class ProcesserWebClientConfig {
@Bean
fun processerWebClient(
builder: WebClient.Builder,
props: ProcesserClientProperties
): WebClient =
builder
.baseUrl(props.baseUrl)
.build()
}
@ConfigurationProperties(prefix = "processer")
data class ProcesserClientProperties(
val baseUrl: String
)

View File

@ -34,6 +34,6 @@ fun PersistedTask.toCoordinatorTransferDto(): CoordinatorTaskTransferDto {
consumed = consumed, consumed = consumed,
lastCheckIn = lastCheckIn, lastCheckIn = lastCheckIn,
persistedAt = persistedAt, persistedAt = persistedAt,
abandoned = TaskLifecycleRules.isAbandoned(consumed, lastCheckIn) abandoned = TaskLifecycleRules.isAbandoned(consumed, persistedAt, lastCheckIn)
) )
} }

View File

@ -31,7 +31,7 @@ class CoordinatorHealthService(
// --- TASK HEALTH --- // --- TASK HEALTH ---
val abandonedTaskIds = tasks val abandonedTaskIds = tasks
.filter { TaskLifecycleRules.isAbandoned(it.consumed, it.lastCheckIn) } .filter { TaskLifecycleRules.isAbandoned(it.consumed, it.persistedAt, it.lastCheckIn) }
.map { it.taskId } .map { it.taskId }
val stalledTaskIds = tasks val stalledTaskIds = tasks

View File

@ -31,7 +31,10 @@ media:
incoming: /src/input incoming: /src/input
streamit: streamit:
address: http://streamit.service base-url: ${PROCESSER_URL:http://processer:8080}
processer:
base-url: http://processer.service:8080
executables: executables:
ffprobe: ffprobe ffprobe: ffprobe

View File

@ -40,5 +40,8 @@ media:
streamit: streamit:
address: http://streamit.service address: http://streamit.service
processer:
base-url: http://processer.service:8080
executables: executables:
ffprobe: ffprobe ffprobe: ffprobe

View File

@ -1,6 +1,9 @@
package no.iktdev.mediaprocessing.processer package no.iktdev.mediaprocessing.processer
import jakarta.annotation.PostConstruct
import mu.KotlinLogging
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.config.ProcesserProperties
import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate import no.iktdev.mediaprocessing.shared.common.model.ProgressUpdate
import org.springframework.http.MediaType import org.springframework.http.MediaType
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
@ -8,17 +11,35 @@ import org.springframework.web.reactive.function.client.WebClient
@Component @Component
class CoordinatorClient( class CoordinatorClient(
private val processerProperties: ProcesserProperties,
private val webClient: WebClient private val webClient: WebClient
) { ) {
private val log = KotlinLogging.logger {}
@PostConstruct
fun pingCoordinator() {
if (!processerProperties.coordinatorPingOnStartup) {
log.info { "Coordinator ping on startup is disabled" }
return
}
try {
val result = webClient.get()
.uri("/actuator/health")
.retrieve()
.bodyToMono(String::class.java)
.block()
log.info { "Coordinator reachable. Health: $result" }
} catch (e: Exception) {
log.error(e) { "Coordinator NOT reachable at startup" }
}
}
fun reportProgress(referenceId: String, taskId: String, percent: FfmpegDecodedProgress, message: String?) = fun reportProgress(referenceId: String, taskId: String, percent: FfmpegDecodedProgress, message: String?) =
webClient.post() webClient.post()
.uri("/internal/progress") .uri("/internal/progress")
.contentType(MediaType.APPLICATION_JSON) .contentType(MediaType.APPLICATION_JSON)
.bodyValue( .bodyValue(ProgressUpdate(referenceId, taskId, percent, message))
ProgressUpdate(referenceId, taskId, percent, message)
)
.retrieve() .retrieve()
.toBodilessEntity() .toBodilessEntity()
} }

View File

@ -0,0 +1,18 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.processer.config.DirectoryProperties
import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths
import org.springframework.stereotype.Component
import java.io.File
@Component
class FileUtil(
private val dirs: DirectoryProperties,
private val mediaPaths: MediaPaths
) {
fun getTemporaryStoreFile(fileName: String): File =
File(mediaPaths.cache).using(fileName)
fun getLogDirectory(): File = File(dirs.logs)
}

View File

@ -0,0 +1,19 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import org.springframework.stereotype.Component
import java.util.*
import java.util.concurrent.ConcurrentHashMap
@Component
class LocalProgressCache {
private val cache = ConcurrentHashMap<UUID, FfmpegDecodedProgress>()
fun update(taskId: UUID, progress: FfmpegDecodedProgress) {
cache[taskId] = progress
}
fun get(taskId: UUID): FfmpegDecodedProgress? = cache[taskId]
fun getAll(): Map<UUID, FfmpegDecodedProgress> = cache.toMap()
}

View File

@ -6,6 +6,10 @@ import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.exfl.coroutines.CoroutinesDefault import no.iktdev.exfl.coroutines.CoroutinesDefault
import no.iktdev.exfl.coroutines.CoroutinesIO import no.iktdev.exfl.coroutines.CoroutinesIO
import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.processer.config.DirectoryProperties
import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig
import no.iktdev.mediaprocessing.processer.config.ProcesserProperties
import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths
import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.EventRegistry
import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskRegistry
import no.iktdev.mediaprocessing.shared.common.getAppVersion import no.iktdev.mediaprocessing.shared.common.getAppVersion
@ -57,7 +61,10 @@ open class ApplicationConfiguration() {
@Configuration @Configuration
@EnableConfigurationProperties( @EnableConfigurationProperties(
value = [ value = [
ExecutablesConfig::class ExecutablesConfig::class,
DirectoryProperties::class,
ProcesserProperties::class,
MediaPaths::class
] ]
) )
class ProcesserConfig class ProcesserConfig

View File

@ -1,25 +0,0 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.exfl.using
import java.io.File
class ProcesserEnv {
companion object {
val coordinatorUrl = System.getenv("COORDINATOR_URL") ?: "http://coordinator:8080"
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg"
val allowOverwrite = System.getenv("ALLOW_OVERWRITE").toBoolean() ?: false
var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache")
val logDirectory = if (!System.getenv("LOG_DIR").isNullOrBlank()) File(System.getenv("LOG_DIR")) else
File("data").using("logs")
val encodeLogDirectory = logDirectory.using("encode")
val extractLogDirectory = logDirectory.using("extract")
val subtitleExtractLogDirectory = logDirectory.using("subtitles")
val fullLogging = System.getenv("FullLogging").toBoolean()
}
}

View File

@ -1,10 +0,0 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.exfl.using
import java.io.File
object Util {
fun getTemporaryStoreFile(fileName: String): File {
return ProcesserEnv.cachedContent.using(fileName)
}
}

View File

@ -1,15 +1,17 @@
package no.iktdev.mediaprocessing.processer package no.iktdev.mediaprocessing.processer.config
import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.web.reactive.function.client.WebClient import org.springframework.web.reactive.function.client.WebClient
@Configuration @Configuration
class CoordinatorClientConfig { class CoordinatorClientConfig(
private val processerProperties: ProcesserProperties,
) {
@Bean @Bean
fun coordinatorWebClient(builder: WebClient.Builder): WebClient { fun coordinatorWebClient(builder: WebClient.Builder): WebClient {
val baseUrl = ProcesserEnv.coordinatorUrl val baseUrl = processerProperties.coordinatorUrl
?: error("COORDINATOR_URL must be set") ?: error("COORDINATOR_URL must be set")
return builder return builder
@ -17,3 +19,4 @@ class CoordinatorClientConfig {
.build() .build()
} }
} }

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.processer.config
import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "directories")
data class DirectoryProperties(
val logs: String,
)

View File

@ -1,4 +1,4 @@
package no.iktdev.mediaprocessing.processer package no.iktdev.mediaprocessing.processer.config
import org.springframework.boot.context.properties.ConfigurationProperties import org.springframework.boot.context.properties.ConfigurationProperties

View File

@ -0,0 +1,10 @@
package no.iktdev.mediaprocessing.processer.config
import org.springframework.boot.context.properties.ConfigurationProperties
@ConfigurationProperties(prefix = "processer")
data class ProcesserProperties(
val coordinatorUrl: String,
val coordinatorPingOnStartup: Boolean,
val allowOverwrite: Boolean
)

View File

@ -0,0 +1,36 @@
package no.iktdev.mediaprocessing.processer.controller
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.LocalProgressCache
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.*
import java.io.File
import java.util.*
@RestController
@RequestMapping("/state")
class StateController(
private val localProgress: LocalProgressCache
) {
@GetMapping("/progress")
fun allProgress(): Map<UUID, FfmpegDecodedProgress> =
localProgress.getAll()
@GetMapping("/progress/{taskId}")
fun progress(@PathVariable taskId: UUID): FfmpegDecodedProgress? =
localProgress.get(taskId)
@GetMapping("/log")
fun getLog(@RequestParam path: String): ResponseEntity<String> {
val file = File(path)
return if (file.exists()) {
ResponseEntity.ok(file.readText())
} else {
ResponseEntity.notFound().build()
}
}
}

View File

@ -6,17 +6,22 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.eventi.tasks.TaskType import no.iktdev.eventi.tasks.TaskType
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.FileUtil
import no.iktdev.mediaprocessing.processer.Util import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File
import java.util.* import java.util.*
@Service @Service
class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { class SubtitleTaskListener(
private val executableConfig: ExecutablesConfig,
private val fileUtil: FileUtil
) : FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
@ -35,16 +40,18 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
val taskData = task as ExtractSubtitleTask val taskData = task as ExtractSubtitleTask
val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also { val cachedOutFile = fileUtil.getTemporaryStoreFile(taskData.data.outputFileName).also {
if (!it.parentFile.exists()) { if (!it.parentFile.exists()) {
it.parentFile.mkdirs() it.parentFile.mkdirs()
} }
} }
if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") { if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") {
reporter?.publishEvent(ProcesserExtractResultEvent( reporter?.publishEvent(
ProcesserExtractResultEvent(
status = TaskStatus.Failed status = TaskStatus.Failed
).producedFrom(task)) ).producedFrom(task)
)
throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite") throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite")
} }
@ -58,7 +65,7 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
reporter?.updateLastSeen(task.taskId) reporter?.updateLastSeen(task.taskId)
} }
result.run(arguments) result.run(arguments)
if (result.result.resultCode != 0 ) { if (result.result.resultCode != 0) {
return ProcesserExtractResultEvent(status = TaskStatus.Failed).producedFrom(task) return ProcesserExtractResultEvent(status = TaskStatus.Failed).producedFrom(task)
} }
@ -85,15 +92,17 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
} }
override fun getFfmpeg(): FFmpeg { override fun getFfmpeg(): FFmpeg {
return SubtitleFFmpeg() val logDirectory = fileUtil.getLogDirectory().using("subtitles")
return SubtitleFFmpeg(null, executableConfig.ffmpeg, logDirectory)
} }
class SubtitleFFmpeg(override val listener: Listener? = null): FFmpeg(executable = ProcesserEnv.ffmpeg, logDir = ProcesserEnv.subtitleExtractLogDirectory ) { class SubtitleFFmpeg(override val listener: Listener? = null, private val executablePath: String, val logDirectory: File) :
FFmpeg(executable = executablePath, logDir = logDirectory) {
override fun onCreate() { override fun onCreate() {
if (!ProcesserEnv.subtitleExtractLogDirectory.exists()) { if (!logDir.exists()) {
ProcesserEnv.subtitleExtractLogDirectory.mkdirs() logDir.mkdirs()
} }
} }
} }

View File

@ -6,19 +6,27 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.eventi.tasks.TaskType import no.iktdev.eventi.tasks.TaskType
import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.CoordinatorClient import no.iktdev.mediaprocessing.processer.CoordinatorClient
import no.iktdev.mediaprocessing.processer.ProcesserEnv import no.iktdev.mediaprocessing.processer.FileUtil
import no.iktdev.mediaprocessing.processer.Util import no.iktdev.mediaprocessing.processer.LocalProgressCache
import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File
import java.util.* import java.util.*
@Service @Service
class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) { class VideoTaskListener(
private var coordinatorWebClient: CoordinatorClient,
private val localProgress: LocalProgressCache,
private val executableConfig: ExecutablesConfig,
private val fileUtil: FileUtil,
) : FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
@ -35,15 +43,17 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
val taskData = task as EncodeTask val taskData = task as EncodeTask
val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also { val cachedOutFile = fileUtil.getTemporaryStoreFile(taskData.data.outputFileName).also {
if (!it.parentFile.exists()) { if (!it.parentFile.exists()) {
it.parentFile.mkdirs() it.parentFile.mkdirs()
} }
} }
if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") { if (cachedOutFile.exists() && taskData.data.arguments.firstOrNull() != "-y") {
reporter?.publishEvent(ProcesserEncodeResultEvent( reporter?.publishEvent(
ProcesserEncodeResultEvent(
status = TaskStatus.Failed status = TaskStatus.Failed
).producedFrom(task)) ).producedFrom(task)
)
throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite") throw IllegalStateException("${cachedOutFile.absolutePath} does already exist, and arguments does not permit overwrite")
} }
@ -58,12 +68,16 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
reporter?.updateLastSeen(task.taskId) reporter?.updateLastSeen(task.taskId)
} }
result.run(arguments) result.run(arguments)
if (result.result.resultCode != 0 ) { if (result.result.resultCode != 0) {
return ProcesserEncodeResultEvent(status = TaskStatus.Failed).producedFrom(task) return ProcesserEncodeResultEvent(
status = TaskStatus.Failed,
logFile = result.logFile.absolutePath
).producedFrom(task)
} }
return ProcesserEncodeResultEvent( return ProcesserEncodeResultEvent(
status = TaskStatus.Completed, status = TaskStatus.Completed,
logFile = result.logFile.absolutePath,
data = ProcesserEncodeResultEvent.EncodeResult( data = ProcesserEncodeResultEvent.EncodeResult(
cachedOutputFile = cachedOutFile.absolutePath cachedOutputFile = cachedOutFile.absolutePath
) )
@ -80,12 +94,15 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
TaskStatus.Cancelled -> "Canceled" TaskStatus.Cancelled -> "Canceled"
else -> "" else -> ""
} }
return ProcesserEncodeResultEvent(null, status, error = message) return ProcesserEncodeResultEvent(null, null, status, error = message)
} }
override fun getFfmpeg(): FFmpeg { override fun getFfmpeg(): FFmpeg {
return VideoFFmpeg(object : FFmpeg.Listener { val logDirectory = fileUtil.getLogDirectory().using("encode")
return VideoFFmpeg(execPath = executableConfig.ffmpeg,
logDirectory = logDirectory,
listener = object : FFmpeg.Listener {
var lastProgress: FfmpegDecodedProgress? = null var lastProgress: FfmpegDecodedProgress? = null
override fun onStarted(inputFile: String) { override fun onStarted(inputFile: String) {
} }
@ -95,7 +112,14 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
coordinatorWebClient.reportProgress( coordinatorWebClient.reportProgress(
referenceId = it.referenceId.toString(), referenceId = it.referenceId.toString(),
taskId = it.taskId.toString(), taskId = it.taskId.toString(),
percent = FfmpegDecodedProgress(100, "", lastProgress?.duration ?: "", "0", estimatedCompletion = "", estimatedCompletionSeconds = 0), percent = FfmpegDecodedProgress(
100,
"",
lastProgress?.duration ?: "",
"0",
estimatedCompletion = "",
estimatedCompletionSeconds = 0
),
"" ""
) )
} }
@ -107,6 +131,7 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
) { ) {
lastProgress = progress lastProgress = progress
currentTask?.let { currentTask?.let {
localProgress.update(it.taskId, progress)
coordinatorWebClient.reportProgress( coordinatorWebClient.reportProgress(
referenceId = it.referenceId.toString(), referenceId = it.referenceId.toString(),
taskId = it.taskId.toString(), taskId = it.taskId.toString(),
@ -120,12 +145,13 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
} }
class VideoFFmpeg(override val listener: Listener? = null): FFmpeg(executable = ProcesserEnv.ffmpeg, logDir = ProcesserEnv.encodeLogDirectory) { class VideoFFmpeg(override val listener: Listener? = null, private val execPath: String, val logDirectory: File) :
FFmpeg(executable = execPath, logDir = logDirectory) {
override fun onCreate() { override fun onCreate() {
super.onCreate() super.onCreate()
if (!ProcesserEnv.encodeLogDirectory.exists()) { if (!logDirectory.exists()) {
ProcesserEnv.encodeLogDirectory.mkdirs() logDirectory.mkdirs()
} }
} }
} }

View File

@ -26,12 +26,17 @@ logging:
media: media:
cache: /src/cache cache: ${DIRECTORY_CONTENT_CACHE:/src/cache}
outgoing: /src/output outgoing: /src/output
incoming: /src/input incoming: /src/input
streamit: processer:
address: http://streamit.service coordinator-url: ${COORDINATOR_URL:http://coordinator:8080}
coordinator-ping-on-startup: true
allow-overwrite: ${ALLOW_OVERWRITE:false}
executables: executables:
ffmpeg: ffmpeg ffmpeg: ${SUPPORTING_EXECUTABLE_FFMPEG:ffmpeg}
directories:
logs: ${LOG_DIR:/data/logs}

View File

@ -0,0 +1,27 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.processer.config.DirectoryProperties
import no.iktdev.mediaprocessing.processer.config.ExecutablesConfig
import no.iktdev.mediaprocessing.shared.common.configs.MediaPaths
object TestUtils {
fun getFileUtil(): FileUtil {
val dirs = DirectoryProperties(
logs = "build/test-logs",
)
val mediaPaths = MediaPaths(
cache = "build/test-cache",
incoming = "build/test-input",
outgoing = "build/test-output"
)
return FileUtil(dirs, mediaPaths)
}
fun getExecutableConfig(): ExecutablesConfig {
return ExecutablesConfig(
ffmpeg = "ffmpeg"
)
}
}

View File

@ -19,6 +19,7 @@ class MockFFmpeg(override val listener: Listener, val delayMillis: Long = 500, p
} }
override suspend fun run(argument: MpegArgument) { override suspend fun run(argument: MpegArgument) {
logFile = File("build/test-log/file.json")
inputFile = argument.inputFile!! inputFile = argument.inputFile!!
listener.onStarted(argument.inputFile!!) listener.onStarted(argument.inputFile!!)
delay(delayMillis) delay(delayMillis)

View File

@ -7,6 +7,7 @@ import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.processer.TestUtils
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask
@ -19,7 +20,10 @@ import kotlin.system.measureTimeMillis
class SubtitleTaskListenerTest { class SubtitleTaskListenerTest {
class TestListener(val delay: Long): SubtitleTaskListener() { class TestListener(val delay: Long): SubtitleTaskListener(
fileUtil = TestUtils.getFileUtil(),
executableConfig = TestUtils.getExecutableConfig(),
) {
fun getJob() = currentJob fun getJob() = currentJob
private var _result: Event? = null private var _result: Event? = null

View File

@ -9,6 +9,8 @@ import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.eventi.tasks.TaskTypeRegistry import no.iktdev.eventi.tasks.TaskTypeRegistry
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
import no.iktdev.mediaprocessing.processer.CoordinatorClient import no.iktdev.mediaprocessing.processer.CoordinatorClient
import no.iktdev.mediaprocessing.processer.LocalProgressCache
import no.iktdev.mediaprocessing.processer.TestUtils
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserEncodeResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeData
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.EncodeTask
@ -21,7 +23,13 @@ import kotlin.system.measureTimeMillis
class VideoTaskListenerTest { class VideoTaskListenerTest {
class TestListener(val delay: Long, coordinatorClient: CoordinatorClient): VideoTaskListener(coordinatorClient) {
class TestListener(val delay: Long, coordinatorClient: CoordinatorClient):
VideoTaskListener(coordinatorWebClient = coordinatorClient,
localProgress = LocalProgressCache(),
fileUtil = TestUtils.getFileUtil(),
executableConfig = TestUtils.getExecutableConfig(),
) {
fun getJob() = currentJob fun getJob() = currentJob
private var _result: Event? = null private var _result: Event? = null

View File

@ -33,12 +33,17 @@ management:
show-details: always show-details: always
media: media:
cache: /src/cache cache: build/test-cache
outgoing: /src/output outgoing: build/test-output
incoming: /src/input incoming: build/test-input
streamit:
address: http://streamit.service
executables: executables:
ffmpeg: ffmpeg ffmpeg: ffmpeg
processer:
coordinator-url: ${COORDINATOR_URL:http://coordinator:8080}
coordinator-ping-on-startup: false
allow-overwrite: ${ALLOW_OVERWRITE:false}
directories:
logs: build/test-logs

View File

@ -29,7 +29,6 @@ annotation class MediaProcessingApp
@Configuration @Configuration
@EnableConfigurationProperties( @EnableConfigurationProperties(
value = [ value = [
StreamItConfig::class,
MediaPaths::class MediaPaths::class
] ]
) )

View File

@ -5,6 +5,7 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve
class ProcesserEncodeResultEvent( class ProcesserEncodeResultEvent(
val data: EncodeResult? = null, val data: EncodeResult? = null,
val logFile: String? = null,
status: TaskStatus, status: TaskStatus,
error: String? = null error: String? = null
) : TaskResultEvent(status, error) { ) : TaskResultEvent(status, error) {

View File

@ -11,14 +11,14 @@ object TaskLifecycleRules {
fun isAbandoned( fun isAbandoned(
consumed: Boolean, consumed: Boolean,
createdAt: Instant, persistedAt: Instant,
lastCheckIn: Instant? lastCheckIn: Instant?
): Boolean { ): Boolean {
if (consumed) return false if (consumed) return false
val cutoff = Instant.now().minus(abandonedAfterMinutes, ChronoUnit.MINUTES) val cutoff = Instant.now().minus(abandonedAfterMinutes, ChronoUnit.MINUTES)
val reference = lastCheckIn ?: createdAt val reference = lastCheckIn ?: persistedAt
return reference.isBefore(cutoff) return reference.isBefore(cutoff)
} }