diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 94a25f7f..35eb1ddf 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,6 @@ - + \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt index 58d81c72..7aa3e161 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Implementations.kt @@ -1,16 +1,19 @@ package no.iktdev.mediaprocessing.processer -import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation +import no.iktdev.mediaprocessing.shared.common.Defaults import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import -@Configuration -class SocketLocalInit: SocketImplementation() +//@Configuration +//class SocketLocalInit: SocketImplementation() @Configuration @Import(CoordinatorProducer::class, DefaultMessageListener::class) class KafkaLocalInit: KafkaImplementation() { -} \ No newline at end of file +} + +@Configuration +class DefaultConfiguration: Defaults() diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt new file mode 100644 index 00000000..5eaa69b9 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/Reporter.kt @@ -0,0 +1,28 @@ +package no.iktdev.mediaprocessing.processer + +import no.iktdev.mediaprocessing.shared.common.SharedConfig +import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service +import org.springframework.web.client.RestTemplate + +@Service +class Reporter(@Autowired private val restTemplate: RestTemplate) { + + fun sendEncodeProgress(progress: ProcesserEventInfo) { + try { + restTemplate.postForEntity(SharedConfig.uiUrl + "/encode/progress", progress, String::class.java) + } catch (e: Exception) { + e.printStackTrace() + } + } + + fun sendExtractProgress(progress: ProcesserEventInfo) { + try { + restTemplate.postForEntity(SharedConfig.uiUrl + "/extract/progress", progress, String::class.java) + } catch (e: Exception) { + e.printStackTrace() + } + } + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt index aba6950b..c1f008d6 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegDecodedProgress.kt @@ -1,5 +1,7 @@ package no.iktdev.mediaprocessing.processer.ffmpeg +import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserProgress + data class FfmpegDecodedProgress( val progress: Int = -1, val time: String, @@ -7,6 +9,16 @@ data class FfmpegDecodedProgress( val speed: String, val estimatedCompletionSeconds: Long = -1, val estimatedCompletion: String = "Unknown", -) +) { + fun toProcessProgress(): ProcesserProgress { + return ProcesserProgress( + progress = this.progress, + speed = this.speed, + timeWorkedOn = this.time, + timeLeft = this.estimatedCompletion + ) + + } +} data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0) \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt index fab7eea3..f88fbf72 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegWorker.kt @@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.processer.ffmpeg import com.github.pgreze.process.Redirect import com.github.pgreze.process.process -import com.google.gson.Gson import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import mu.KotlinLogging @@ -59,7 +58,7 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe withContext(Dispatchers.IO) { logFile.createNewFile() } - listener.onStarted(info) + listener.onStarted(referenceId, eventId, info) val processOp = process( ProcesserEnv.ffmpeg, *args.toTypedArray(), stdout = Redirect.CAPTURE, @@ -73,17 +72,29 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe val result = processOp onOutputChanged("Received exit code: ${result.resultCode}") if (result.resultCode != 0) { - listener.onError(info, result.output.joinToString("\n")) + listener.onError(referenceId, eventId, info, result.output.joinToString("\n")) } else { - listener.onCompleted(info) + listener.onCompleted(referenceId, eventId, info) } } + private var progress: FfmpegDecodedProgress? = null fun onOutputChanged(line: String) { outputCache.add(line) writeToLog(line) // toList is needed to prevent mutability. - val progress = decoder.parseVideoProgress(outputCache.toList()) + decoder.parseVideoProgress(outputCache.toList())?.let { decoded -> + try { + val _progress = decoder.getProgress(decoded) + if (progress == null || _progress.progress > (progress?.progress ?: -1) ) { + progress = _progress + listener.onProgressChanged(referenceId, eventId, info, _progress) + } + } catch (e: Exception) { + e.printStackTrace() + } + } + } fun writeToLog(line: String) { @@ -96,8 +107,8 @@ class FfmpegWorker(val referenceId: String, val eventId: String, val info: Ffmpe } interface FfmpegWorkerEvents { - fun onStarted(info: FfmpegWorkRequestCreated,) - fun onCompleted(info: FfmpegWorkRequestCreated) - fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) - fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) + fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated,) + fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) + fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) + fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) } \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 80e35e0e..fe735877 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -7,13 +7,13 @@ import no.iktdev.mediaprocessing.processer.* import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.common.getComputername +import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo +import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed @@ -24,7 +24,7 @@ import java.util.* import javax.annotation.PreDestroy @Service -class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { +class EncodeService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) { private val log = KotlinLogging.logger {} private val logDir = ProcesserEnv.encodeLogDirectory @@ -83,7 +83,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents ) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { - ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") + ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) return @@ -98,7 +98,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat } val ffmpegWorkerEvents = object : FfmpegWorkerEvents { - override fun onStarted(info: FfmpegWorkRequestCreated) { + override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) { val runner = this@EncodeService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce start message when the referenceId is not present" } @@ -106,7 +106,13 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat } log.info { "Encode started for ${runner.referenceId}" } persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) - sendProgress(info, null, false) + sendProgress(referenceId, eventId, status = WorkStatus.Started, info, FfmpegDecodedProgress( + progress = 0, + time = "Unkown", + duration = "Unknown", + speed = "0", + ) + ) scope.launch { while (runnerJob?.isActive == true) { @@ -116,7 +122,7 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat } } - override fun onCompleted(info: FfmpegWorkRequestCreated) { + override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) { val runner = this@EncodeService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce completion message when the referenceId is not present" } @@ -139,12 +145,18 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, data = ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = serviceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile) ) + sendProgress(referenceId, eventId, status = WorkStatus.Completed, info, FfmpegDecodedProgress( + progress = 100, + time = "", + duration = "", + speed = "0", + )) clearWorker() } } - override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) { + override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { val runner = this@EncodeService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce error message when the referenceId is not present" } @@ -154,18 +166,31 @@ class EncodeService(@Autowired override var coordinator: Coordinator): TaskCreat producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, data = ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) ) - sendProgress(info = info, ended = true) + sendProgress(referenceId, eventId, status = WorkStatus.Failed, info = info, progress = FfmpegDecodedProgress( + progress = 0, + time = "", + duration = "", + speed = "0", + )) clearWorker() } - override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { - sendProgress(info, progress, false) + override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { + sendProgress(referenceId, eventId, WorkStatus.Working, info, progress) } } - fun sendProgress(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null, ended: Boolean) { - // TODO: Implementation + fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) { + val processerEventInfo = ProcesserEventInfo( + referenceId = referenceId, + eventId = eventId, + status = status, + inputFile = info.inputFile, + outputFiles = listOf(info.outFile), + progress = progress?.toProcessProgress() + ) + reporter.sendEncodeProgress(processerEventInfo) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index d796b845..aeb37615 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -8,13 +8,13 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegDecodedProgress import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorker import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegWorkerEvents import no.iktdev.mediaprocessing.shared.common.limitedWhile -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated import no.iktdev.mediaprocessing.shared.common.getComputername +import no.iktdev.mediaprocessing.shared.contract.dto.ProcesserEventInfo +import no.iktdev.mediaprocessing.shared.contract.dto.WorkStatus import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.Status import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed @@ -25,7 +25,7 @@ import java.util.* import javax.annotation.PreDestroy @Service -class ExtractService(@Autowired override var coordinator: Coordinator): TaskCreator(coordinator) { +class ExtractService(@Autowired override var coordinator: Coordinator, @Autowired private val reporter: Reporter): TaskCreator(coordinator) { private val log = KotlinLogging.logger {} private val logDir = ProcesserEnv.extractLogDirectory @@ -88,7 +88,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea runner = FfmpegWorker(event.referenceId, event.eventId, info = ffwrc, logDir = logDir, listener = ffmpegWorkerEvents) if (File(ffwrc.outFile).exists() && ffwrc.arguments.firstOrNull() != "-y") { - ffmpegWorkerEvents.onError(ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") + ffmpegWorkerEvents.onError(event.referenceId, event.eventId, ffwrc, "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outFile}") // Setting consumed to prevent spamming persistentWriter.setProcessEventCompleted(event.referenceId, event.eventId, serviceId) return @@ -103,7 +103,7 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea } val ffmpegWorkerEvents = object : FfmpegWorkerEvents { - override fun onStarted(info: FfmpegWorkRequestCreated) { + override fun onStarted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) { val runner = this@ExtractService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce start message when the referenceId is not present" } @@ -111,10 +111,10 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea } log.info { "Extract started for ${runner.referenceId}" } persistentWriter.setProcessEventClaim(runner.referenceId, runner.eventId, serviceId) - sendState(info, false) + sendProgress(referenceId, eventId, WorkStatus.Started, info) } - override fun onCompleted(info: FfmpegWorkRequestCreated) { + override fun onCompleted(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated) { val runner = this@ExtractService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce completion message when the referenceId is not present" } @@ -149,12 +149,13 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea derivedFromEventId = runner.eventId, outFile = runner.info.outFile) ) + sendProgress(referenceId, eventId, WorkStatus.Completed, info) log.info { "Extract is releasing worker" } clearWorker() } } - override fun onError(info: FfmpegWorkRequestCreated, errorMessage: String) { + override fun onError(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, errorMessage: String) { val runner = this@ExtractService.runner if (runner == null || runner.referenceId.isBlank()) { log.error { "Can't produce error message when the referenceId is not present" } @@ -164,18 +165,26 @@ class ExtractService(@Autowired override var coordinator: Coordinator): TaskCrea producer.sendMessage(referenceId = runner.referenceId, event = producesEvent, ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = serviceId, derivedFromEventId = runner.eventId) ) - sendState(info, ended= true) + sendProgress(referenceId, eventId, WorkStatus.Failed, info) clearWorker() } - override fun onProgressChanged(info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { - // None as this will not be running with progress + override fun onProgressChanged(referenceId: String, eventId: String, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress) { + sendProgress(referenceId, eventId, WorkStatus.Working, info, progress) } } - fun sendState(info: FfmpegWorkRequestCreated, ended: Boolean) { - + fun sendProgress(referenceId: String, eventId: String, status: WorkStatus, info: FfmpegWorkRequestCreated, progress: FfmpegDecodedProgress? = null) { + val processerEventInfo = ProcesserEventInfo( + referenceId = referenceId, + eventId = eventId, + status = status, + inputFile = info.inputFile, + outputFiles = listOf(info.outFile), + progress = progress?.toProcessProgress() + ) + reporter.sendExtractProgress(processerEventInfo) } diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt index 2b5777cb..17061515 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Configuration.kt @@ -1,12 +1,16 @@ package no.iktdev.mediaprocessing.ui +import no.iktdev.mediaprocessing.shared.common.Defaults import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import org.springframework.beans.factory.annotation.Value import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory import org.springframework.boot.web.server.WebServerFactoryCustomizer import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.messaging.simp.config.MessageBrokerRegistry +import org.springframework.context.annotation.Import import org.springframework.web.bind.annotation.RestController import org.springframework.web.client.RestTemplate import org.springframework.web.method.HandlerTypePredicate @@ -14,9 +18,6 @@ import org.springframework.web.servlet.config.annotation.CorsRegistry import org.springframework.web.servlet.config.annotation.PathMatchConfigurer import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry import org.springframework.web.servlet.config.annotation.WebMvcConfigurer -import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker -import org.springframework.web.socket.config.annotation.StompEndpointRegistry -import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer @Configuration @@ -63,3 +64,11 @@ class ApiCommunicationConfig { @Configuration class SocketImplemented: SocketImplementation() { } + +@Configuration +class DefaultConfiguration: Defaults() + +@Configuration +@Import(CoordinatorProducer::class, DefaultMessageListener::class) +class KafkaLocalInit: KafkaImplementation() { +} \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt index 3b140048..b08d20d8 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/Coordinator.kt @@ -1,7 +1,6 @@ package no.iktdev.mediaprocessing.ui import no.iktdev.mediaprocessing.shared.common.CoordinatorBase -import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage import no.iktdev.mediaprocessing.shared.contract.ProcessType @@ -9,22 +8,24 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.ui.coordinator.PersistentEventBasedMessageListener +import no.iktdev.mediaprocessing.ui.dto.EventSummary import no.iktdev.mediaprocessing.ui.dto.EventSummarySubItem import no.iktdev.mediaprocessing.ui.dto.SummaryState +import no.iktdev.mediaprocessing.ui.socket.EventbasedTopic import org.springframework.beans.factory.annotation.Autowired -import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service @Service @EnableScheduling -class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : CoordinatorBase() { +class Coordinator(@Autowired private val eventbasedTopic: EventbasedTopic) : CoordinatorBase() { override val listeners = PersistentEventBasedMessageListener() - val dbReader = PersistentDataReader(getEventsDatabase()) override fun onCoordinatorReady() { @@ -41,14 +42,6 @@ class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : Coo ) { } - fun readAllEvents() { - val messages = persistentReader.getAllMessages() - } - - fun readAllProcesserEvents() { - val messages = persistentReader.getProcessEvents() - } - @Scheduled(fixedDelay = (5_000)) fun refreshDatabaseData() { @@ -106,15 +99,62 @@ class Coordinator(@Autowired private val template: SimpMessagingTemplate?) : Coo return SummaryState.Preparing } - // EVENT_MEDIA_METADATA_SEARCH_PERFORMED + val analyzed2 = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) } + if (analyzed2 != null) { + return SummaryState.Analyzing + } + val waitingForMeta = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED } + if (waitingForMeta != null) { + return SummaryState.Metadata + } + + val analyzed = events.findLast { it.event in listOf(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) } + if (analyzed != null) { + return SummaryState.Analyzing + } + + val readEvent = events.findLast { it.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED } + if (readEvent != null) { + return SummaryState.Read + } return SummaryState.Started } fun buildSummaries() { + val processerMessages = persistentReader.getProcessEvents().groupBy { it.referenceId } val messages = persistentReader.getAllMessages() + val mapped = messages.mapNotNull { it -> + val referenceId = it.firstOrNull()?.referenceId + if (referenceId != null) { + val procM = processerMessages.getOrDefault(referenceId, emptyList()) + val processesStatuses = getCurrentStateFromProcesserEvents(procM) + val messageStatus = getCurrentState(it, processesStatuses) + + val baseNameEvent = it.lastOrNull {ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED }?.data.let { data -> + if (data is BaseInfoPerformed) data else null + } + val mediaNameEvent = it.lastOrNull { ke -> ke.event == KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE }?.data.let { data -> + if (data is VideoInfoPerformed) data else null + } + + val baseName = if (mediaNameEvent == null) baseNameEvent?.sanitizedName else mediaNameEvent.toValueObject()?.fullName + + EventSummary( + referenceId = referenceId, + baseName = baseName, + collection = mediaNameEvent?.toValueObject()?.title, + events = it.map { ke -> ke.event }, + status = messageStatus, + activeEvens = processesStatuses + ) + + } else null + } + + } } \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt index db1717af..59433437 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/UIApplication.kt @@ -7,19 +7,15 @@ import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.observableMapOf import no.iktdev.mediaprocessing.shared.common.DatabaseEnvConfig -import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.toEventsDatabase -import no.iktdev.mediaprocessing.ui.dto.EventDataObject import no.iktdev.mediaprocessing.ui.dto.ExplorerItem -import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationContext import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit private val logger = KotlinLogging.logger {} @@ -44,8 +40,6 @@ fun getContext(): ApplicationContext? { return context } -val memSimpleConvertedEventsMap: ObservableMap = observableMapOf() -val memActiveEventMap: ObservableMap = observableMapOf() val fileRegister: ObservableMap = observableMapOf() fun main(args: Array) { diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/dto/EventSummary.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/dto/EventSummary.kt index f06ff592..cf969e35 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/dto/EventSummary.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/dto/EventSummary.kt @@ -4,10 +4,10 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents data class EventSummary( val referenceId: String, - val baseName: String, - val collection: String, - val events: List, - val status: SummaryState, + val baseName: String? = null, + val collection: String? = null, + val events: List = emptyList(), + val status: SummaryState = SummaryState.Started, val activeEvens: Map ) @@ -26,7 +26,7 @@ enum class SummaryState { Preparing, Metadata, Analyzing, - Reading, + Read, Started } \ No newline at end of file diff --git a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UISocketService.kt b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventbasedTopic.kt similarity index 70% rename from apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UISocketService.kt rename to apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventbasedTopic.kt index 1a686fa9..025ca1f5 100644 --- a/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/UISocketService.kt +++ b/apps/ui/src/main/kotlin/no/iktdev/mediaprocessing/ui/socket/EventbasedTopic.kt @@ -1,21 +1,26 @@ package no.iktdev.mediaprocessing.ui.socket import mu.KotlinLogging +import no.iktdev.exfl.observable.ObservableList import no.iktdev.exfl.observable.ObservableMap +import no.iktdev.exfl.observable.observableListOf +import no.iktdev.exfl.observable.observableMapOf import no.iktdev.mediaprocessing.ui.dto.EventDataObject +import no.iktdev.mediaprocessing.ui.dto.EventSummary import no.iktdev.mediaprocessing.ui.dto.SimpleEventDataObject -import no.iktdev.mediaprocessing.ui.memActiveEventMap -import no.iktdev.mediaprocessing.ui.memSimpleConvertedEventsMap import org.springframework.beans.factory.annotation.Autowired import org.springframework.messaging.handler.annotation.MessageMapping import org.springframework.messaging.simp.SimpMessagingTemplate import org.springframework.stereotype.Controller @Controller -class UISocketService( +class EventbasedTopic( @Autowired private val template: SimpMessagingTemplate? ) { private val log = KotlinLogging.logger {} + val summaryList: ObservableList = observableListOf() + val memSimpleConvertedEventsMap: ObservableMap = observableMapOf() + val memActiveEventMap: ObservableMap = observableMapOf() init { memActiveEventMap.addListener(object : ObservableMap.Listener { @@ -38,6 +43,12 @@ class UISocketService( } } }) + summaryList.addListener(object: ObservableList.Listener { + override fun onListChanged(items: List) { + super.onListChanged(items) + template?.convertAndSend("/topic/summary", items) + } + }) } @MessageMapping("/items") diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt new file mode 100644 index 00000000..810e41a4 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Defaults.kt @@ -0,0 +1,15 @@ +package no.iktdev.mediaprocessing.shared.common + +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.web.client.RestTemplate + +@Configuration +open class Defaults { + + @Bean + fun restTemplate(): RestTemplate { + val restTemplate = RestTemplate() + return restTemplate + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index 153fc40b..9d5a4f9c 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -10,6 +10,7 @@ object SharedConfig { val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe" val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "ffmpeg" + val uiUrl: String = System.getenv("APP_URL_UI") ?: "http://ui:8080" val preference: File = File("/data/config/preference.json") } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index f846ca6c..f357d878 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -16,6 +16,7 @@ data class PersistentMessage( val created: LocalDateTime ) + fun PersistentMessage.isOfEvent(event: KafkaEvents): Boolean { return this.event == event } diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt index ab8b9f49..bffcaa7a 100644 --- a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.common +import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import org.h2.jdbcx.JdbcDataSource import java.io.PrintWriter @@ -8,16 +9,11 @@ import java.sql.SQLFeatureNotSupportedException import java.util.logging.Logger import javax.sql.DataSource -class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password) { - companion object { - fun fromDatabaseEnv(): H2DataSource { - if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'") - return H2DataSource( - JdbcDataSource(), - databaseName = DatabaseConfig.database!!, - ) - } - } +class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource( + DatabaseConnectionConfig( + databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password, port = null + ) +) { override fun getConnection(): Connection { return jdbcDataSource.connection } @@ -61,7 +57,7 @@ class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: Str } override fun createDatabaseStatement(): String { - return "CREATE SCHEMA $databaseName" + return "CREATE SCHEMA ${config.databaseName}" } override fun toConnectionUrl(): String { diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConverterEventInfo.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConverterEventInfo.kt new file mode 100644 index 00000000..f407fa7f --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ConverterEventInfo.kt @@ -0,0 +1,7 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +data class ConverterEventInfo( + val status: WorkStatus = WorkStatus.Pending, + val inputFile: String, + val outputFiles: List = emptyList() +) \ No newline at end of file diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt index fc2aaed5..9f687a80 100644 --- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/Enums.kt @@ -1,5 +1,13 @@ package no.iktdev.mediaprocessing.shared.contract.dto +enum class WorkStatus { + Pending, + Started, + Working, + Completed, + Failed +} + enum class SubtitleFormats { ASS, diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ProcesserEventInfo.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ProcesserEventInfo.kt new file mode 100644 index 00000000..2c5d92a3 --- /dev/null +++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/dto/ProcesserEventInfo.kt @@ -0,0 +1,17 @@ +package no.iktdev.mediaprocessing.shared.contract.dto + +data class ProcesserEventInfo( + val referenceId: String, + val eventId: String, + val status: WorkStatus = WorkStatus.Pending, + val progress: ProcesserProgress? = null, + val inputFile: String, + val outputFiles: List +) + +data class ProcesserProgress( + val progress: Int = -1, + val speed: String? = null, + val timeWorkedOn: String? = null, + val timeLeft: String? = "Unknown", // HH mm +) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt index d9eda56b..3f0f7154 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt @@ -27,18 +27,18 @@ data class VideoInfoPerformed( data class EpisodeInfo( override val type: String = "serie", - val title: String, + override val title: String, val episode: Int, val season: Int, val episodeTitle: String?, override val fullName: String -): VideoInfo(type, fullName) +): VideoInfo(type, title, fullName) data class MovieInfo( override val type: String = "movie", - val title: String, + override val title: String, override val fullName: String -) : VideoInfo(type, fullName) +) : VideoInfo(type, title, fullName) data class SubtitleInfo( val inputFile: String, @@ -49,6 +49,7 @@ data class SubtitleInfo( @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE) open class VideoInfo( @Transient open val type: String, + @Transient open val title: String, @Transient open val fullName: String ) { fun toJsonObject(): JsonObject {