diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt index c58ff211..2c59cde4 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/EncoderApplication.kt @@ -1,5 +1,7 @@ package no.iktdev.streamit.content.encode +import no.iktdev.exfl.observable.observableMapOf +import no.iktdev.streamit.content.encode.progress.Progress import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationContext @@ -8,6 +10,7 @@ import org.springframework.context.ApplicationContext class EncoderApplication private var context: ApplicationContext? = null +val progressMap = observableMapOf() @Suppress("unused") fun getContext(): ApplicationContext? { diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/controllers/ProgressController.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/controllers/ProgressController.kt new file mode 100644 index 00000000..29f5edee --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/controllers/ProgressController.kt @@ -0,0 +1,16 @@ +package no.iktdev.streamit.content.encode.controllers + +import com.google.gson.Gson +import no.iktdev.streamit.content.encode.progressMap +import org.springframework.web.bind.annotation.GetMapping +import org.springframework.web.bind.annotation.RestController +import javax.servlet.http.HttpServletResponse + +@RestController +class ProgressController { + @GetMapping("/progress") + fun getValue(response: HttpServletResponse): String { + response.setHeader("Refresh", "5") + return Gson().toJson(progressMap.values) + } +} diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressData.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressData.kt new file mode 100644 index 00000000..5705b35a --- /dev/null +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressData.kt @@ -0,0 +1,18 @@ +package no.iktdev.streamit.content.encode.progress + +data class DecodedProgressData( + val frame: Int?, + val fps: Double?, + val stream_0_0_q: Double?, + val bitrate: String?, + val total_size: Int?, + val out_time_us: Long?, + val out_time_ms: Long?, + val out_time: String?, + val dup_frames: Int?, + val drop_frames: Int?, + val speed: Double?, + val progress: String? +) + +data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0) diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt index aaec4eeb..1c4e721b 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/Progress.kt @@ -1,16 +1,8 @@ package no.iktdev.streamit.content.encode.progress data class Progress( - val frame: Int?, - val fps: Double?, - val stream_0_0_q: Double?, - val bitrate: String?, - val total_size: Int?, - val out_time_us: Long?, - val out_time_ms: Long?, - val out_time: String?, - val dup_frames: Int?, - val drop_frames: Int?, - val speed: Double?, - val progress: String? -) + val workId: String, + val outFileName: String, + val progress: Int = -1, + val estimatedCompletion: String = "Unknown", +) \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt index c52caabd..5c1c8a5c 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt @@ -1,7 +1,20 @@ package no.iktdev.streamit.content.encode.progress -class ProgressDecoder { - fun parseVideoProgress(lines: List): Progress? { +import no.iktdev.streamit.content.common.dto.reader.work.WorkBase +import java.io.File +import java.lang.StringBuilder +import java.time.LocalTime +import java.time.format.DateTimeFormatter +import java.util.concurrent.TimeUnit +import kotlin.math.floor + +class ProgressDecoder(val workBase: WorkBase) { + var duration: Int? = null + set(value) { + if (field == null || field == 0) + field = value + } + fun parseVideoProgress(lines: List): DecodedProgressData? { var frame: Int? = null var progress: String? = null val metadataMap = mutableMapOf() @@ -22,7 +35,7 @@ class ProgressDecoder { return if (progress != null) { // When "progress" is found, build and return the VideoMetadata object - Progress( + DecodedProgressData( frame, metadataMap["fps"]?.toDoubleOrNull(), metadataMap["stream_0_0_q"]?.toDoubleOrNull(), metadataMap["bitrate"], metadataMap["total_size"]?.toIntOrNull(), metadataMap["out_time_us"]?.toLongOrNull(), metadataMap["out_time_ms"]?.toLongOrNull(), metadataMap["out_time"], metadataMap["dup_frames"]?.toIntOrNull(), @@ -32,4 +45,80 @@ class ProgressDecoder { null // If "progress" is not found, return null } } + + + fun isDuration(value: String): Boolean { + return value.contains("Duration", ignoreCase = true) + } + fun setDuration(value: String) { + val results = Regex("Duration:\\s*([^,]+),").find(value)?.groupValues?.firstOrNull() + duration = timeSpanToSeconds(results) + } + + private fun timeSpanToSeconds(time: String?): Int? + { + time ?: return null + val timeString = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(time) ?: return null + val strippedMS = Regex("[0-9]+:[0-9]+:[0-9]+").find(timeString.value) ?: return null + val outTime = LocalTime.parse(strippedMS.value, DateTimeFormatter.ofPattern("HH:mm:ss")) + return outTime.toSecondOfDay() + } + + + private fun getProgressTime(time: Long?): Long { + if (time == null) return 0 + return time / 1000L + } + fun getProgress(decoded: DecodedProgressData): Progress { + if (duration == null) return Progress(workId = workBase.workId, outFileName = File(workBase.outFile).name) + val diff = getProgressTime(decoded.out_time_ms).toDouble() / duration!!.toDouble() + val progress = floor(diff*100).toInt() + + val ect = getEstimatedTimeRemaining(decoded) + + return Progress( + workId = workBase.workId, outFileName = File(workBase.outFile).name, + progress = progress, + estimatedCompletion = getETA(ect) + ) + } + + fun getEstimatedTimeRemaining(decoded: DecodedProgressData): Long { + val position = getProgressTime(decoded.out_time_ms) + return if(duration == null || decoded.speed == null) -1 else + Math.round(Math.round(duration!!.toDouble() - position.toDouble()) / decoded.speed) + } + + fun getECT(time: Long): ECT { + var seconds = time + val day = TimeUnit.SECONDS.toDays(seconds) + seconds -= java.util.concurrent.TimeUnit.DAYS.toSeconds(day) + + val hour = TimeUnit.SECONDS.toHours(seconds) + seconds -= java.util.concurrent.TimeUnit.HOURS.toSeconds(hour) + + val minute = TimeUnit.SECONDS.toMinutes(seconds) + seconds -= java.util.concurrent.TimeUnit.MINUTES.toSeconds(minute) + + return ECT(day.toInt(), hour.toInt(), minute.toInt(), seconds.toInt()) + } + private fun getETA(time: Long): String { + val etc = getECT(time) ?: return "Unknown" + val str = StringBuilder() + if (etc.day > 0) { + str.append("${etc.day}d").append(" ") + } + if (etc.hour > 0) { + str.append("${etc.hour}h").append(" ") + } + if (etc.day == 0 && etc.minute > 0) { + str.append("${etc.minute}m").append(" ") + } + if (etc.hour == 0 && etc.second > 0) { + str.append("${etc.second}s").append(" ") + } + return str.toString().trim() + } + + } \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt index 64306de2..8a7335ee 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt @@ -7,6 +7,7 @@ import no.iktdev.exfl.observable.observableListOf import no.iktdev.streamit.content.common.deamon.Daemon import no.iktdev.streamit.content.common.deamon.IDaemon import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.encode.progress.DecodedProgressData import no.iktdev.streamit.content.encode.progress.Progress import no.iktdev.streamit.content.encode.progress.ProgressDecoder import java.io.BufferedWriter @@ -19,11 +20,12 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte val logDir = File("/src/logs") lateinit var outLogFile: File var outputCache = observableListOf() - private val decoder = ProgressDecoder() + private val decoder = ProgressDecoder(work) private fun produceProgress(items: List) { try { - val progress = decoder.parseVideoProgress(items) - if (progress != null) { + val decodedProgress = decoder.parseVideoProgress(items) + if (decodedProgress != null) { + val progress = decoder.getProgress(decodedProgress) daemonInterface.onProgress(referenceId, work, progress) outputCache.clear() } @@ -73,6 +75,9 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte } override fun onOutputChanged(line: String) { super.onOutputChanged(line) + if (decoder.isDuration(line)) + decoder.setDuration(line) + outputCache.add(line) writeToLog(line) } diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt index eaaba085..ad8dcb39 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/ExtractDaemon.kt @@ -6,7 +6,7 @@ import no.iktdev.exfl.observable.observableListOf import no.iktdev.streamit.content.common.deamon.Daemon import no.iktdev.streamit.content.common.deamon.IDaemon import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork -import no.iktdev.streamit.content.encode.progress.Progress +import no.iktdev.streamit.content.encode.progress.DecodedProgressData import java.io.File private val logger = KotlinLogging.logger {} @@ -50,6 +50,6 @@ class ExtractDaemon(val referenceId: String, val work: ExtractWork, val daemonIn interface IExtractListener { fun onStarted(referenceId: String, work: ExtractWork) fun onError(referenceId: String, work: ExtractWork, code: Int) - fun onProgress(referenceId: String, work: ExtractWork, progress: Progress) {} + fun onProgress(referenceId: String, work: ExtractWork, progress: DecodedProgressData) {} fun onEnded(referenceId: String, work: ExtractWork) } \ No newline at end of file diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt index d8f5b08a..78b90988 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/RunnerCoordinator.kt @@ -3,23 +3,21 @@ package no.iktdev.streamit.content.encode.runner import com.google.gson.Gson import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.sync.Mutex import no.iktdev.streamit.content.encode.EncodeEnv import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork +import no.iktdev.streamit.content.encode.progress.DecodedProgressData import no.iktdev.streamit.content.encode.progress.Progress +import no.iktdev.streamit.content.encode.progressMap import no.iktdev.streamit.library.kafka.KafkaEvents import no.iktdev.streamit.library.kafka.dto.Message import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.StatusType import no.iktdev.streamit.library.kafka.producer.DefaultProducer -import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env import org.springframework.stereotype.Service -import java.util.concurrent.* import java.util.concurrent.atomic.AtomicInteger private val logger = KotlinLogging.logger {} @@ -157,6 +155,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) { override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { logger.info { "Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${Gson().toJson(progress)}" } + progressMap.put(work.workId, progress) } override fun onEnded(referenceId: String, work: EncodeWork) { diff --git a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt similarity index 78% rename from Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt rename to Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt index aba5d661..0d97be3e 100644 --- a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoderTest.kt +++ b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt @@ -1,14 +1,22 @@ package no.iktdev.streamit.content.encode.progress +import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow +import java.util.UUID -class ProgressDecoderTest { +class DecodedProgressDataDecoderTest { @Test fun test() { - val progress = ProgressDecoder() + val progress = ProgressDecoder(EncodeWork( + workId = UUID.randomUUID().toString(), + collection = "Demo", + inFile = "Demo.mkv", + outFile = "FancyDemo.mp4", + arguments = emptyList() + )) val lines = text.split("\n") val cache: MutableList = mutableListOf() lines.forEach {