From 3300742d2bfbd0bc01742e58cd0e9cfcbbada91f Mon Sep 17 00:00:00 2001 From: Brage Date: Sun, 30 Jul 2023 01:18:36 +0200 Subject: [PATCH] Updated encode and corrected progress stuff --- CommonCode/build.gradle.kts | 2 +- Convert/build.gradle.kts | 2 +- Encode/build.gradle.kts | 2 +- .../encode/progress/ProgressDecoder.kt | 14 ++++++ .../content/encode/runner/EncodeDaemon.kt | 16 +++++-- .../streamit/content/encode/Resources.kt | 29 ++++++++++++ .../DecodedProgressDataDecoderTest.kt | 47 +++++++++++++++++++ Reader/build.gradle.kts | 2 +- 8 files changed, 105 insertions(+), 9 deletions(-) create mode 100644 Encode/src/test/kotlin/no/iktdev/streamit/content/encode/Resources.kt diff --git a/CommonCode/build.gradle.kts b/CommonCode/build.gradle.kts index 3479b317..ec7d1700 100644 --- a/CommonCode/build.gradle.kts +++ b/CommonCode/build.gradle.kts @@ -21,7 +21,7 @@ dependencies { implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80") - implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("com.google.code.gson:gson:2.8.9") implementation("org.json:json:20230227") diff --git a/Convert/build.gradle.kts b/Convert/build.gradle.kts index 809bdbf8..b712acfe 100644 --- a/Convert/build.gradle.kts +++ b/Convert/build.gradle.kts @@ -26,7 +26,7 @@ dependencies { implementation("no.iktdev.library:subtitle:1.7-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80") - implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") diff --git a/Encode/build.gradle.kts b/Encode/build.gradle.kts index 7b8edcb7..bc31250f 100644 --- a/Encode/build.gradle.kts +++ b/Encode/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation(project(":CommonCode")) implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80") - implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt index 5c1c8a5c..b4a48b9f 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/progress/ProgressDecoder.kt @@ -9,6 +9,20 @@ import java.util.concurrent.TimeUnit import kotlin.math.floor class ProgressDecoder(val workBase: WorkBase) { + val expectedKeys = listOf( + "frame=", + "fps=", + "stream_0_0_q=", + "bitrate=", + "total_size=", + "out_time_us=", + "out_time_ms=", + "out_time=", + "dup_frames=", + "drop_frames=", + "speed=", + "progress=" + ) var duration: Int? = null set(value) { if (field == null || field == 0) diff --git a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt index 8a7335ee..bed2b696 100644 --- a/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt +++ b/Encode/src/main/kotlin/no/iktdev/streamit/content/encode/runner/EncodeDaemon.kt @@ -21,13 +21,13 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte lateinit var outLogFile: File var outputCache = observableListOf() private val decoder = ProgressDecoder(work) - private fun produceProgress(items: List) { + fun produceProgress(items: List): Progress? { try { val decodedProgress = decoder.parseVideoProgress(items) if (decodedProgress != null) { val progress = decoder.getProgress(decodedProgress) - daemonInterface.onProgress(referenceId, work, progress) outputCache.clear() + return progress } } catch (e: IndexOutOfBoundsException) { // Do nothing @@ -35,12 +35,16 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte //logger.error { e.message } e.printStackTrace() } + return null } init { outputCache.addListener(object : ObservableList.Listener { override fun onAdded(item: String) { - produceProgress(outputCache) + val progress = produceProgress(outputCache) + progress?.let { + daemonInterface.onProgress(referenceId, work, progress) + } } }) logDir.mkdirs() @@ -73,12 +77,14 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte override fun onError(code: Int) { daemonInterface.onError(referenceId, work, code) } + override fun onOutputChanged(line: String) { super.onOutputChanged(line) if (decoder.isDuration(line)) decoder.setDuration(line) - - outputCache.add(line) + if (decoder.expectedKeys.any { line.startsWith(it) }) { + outputCache.add(line) + } writeToLog(line) } private fun writeToLog(line: String) { diff --git a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/Resources.kt b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/Resources.kt new file mode 100644 index 00000000..22bb2314 --- /dev/null +++ b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/Resources.kt @@ -0,0 +1,29 @@ +package no.iktdev.streamit.content.encode + +import org.apache.kafka.clients.consumer.ConsumerRecord + +open class Resources { + + fun getText(path: String): String? { + return this.javaClass.classLoader.getResource(path)?.readText() + } + + open class Streams(): Resources() { + fun all(): List { + return listOf( + getSample(0), + getSample(1), + getSample(2), + getSample(3), + getSample(4), + getSample(5), + getSample(6), + ) + } + + fun getSample(number: Int): String { + return getText("streams/sample$number.json")!! + } + } + +} \ No newline at end of file diff --git a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt index 0d97be3e..2331eae4 100644 --- a/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt +++ b/Encode/src/test/kotlin/no/iktdev/streamit/content/encode/progress/DecodedProgressDataDecoderTest.kt @@ -1,6 +1,9 @@ package no.iktdev.streamit.content.encode.progress import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork +import no.iktdev.streamit.content.encode.Resources +import no.iktdev.streamit.content.encode.runner.EncodeDaemon +import no.iktdev.streamit.content.encode.runner.IEncodeListener import org.assertj.core.api.Assertions.assertThat import org.junit.jupiter.api.Test import org.junit.jupiter.api.assertDoesNotThrow @@ -29,6 +32,50 @@ class DecodedProgressDataDecoderTest { assertThat(lines).isNotEmpty() } + + + @Test + fun testCanRead() { + val res = Resources() + val data = res.getText("Output1.txt") ?: "" + assertThat(data).isNotEmpty() + val lines = data.split("\n").map { it.trim() } + assertThat(lines).isNotEmpty() + + val encodeWork = EncodeWork( + workId = UUID.randomUUID().toString(), + collection = "Demo", + inFile = "Demo.mkv", + outFile = "FancyDemo.mp4", + arguments = emptyList() + ) + val decoder = ProgressDecoder(encodeWork) + lines.forEach { decoder.setDuration(it) } + assertThat(decoder.duration).isNotNull() + val produced = mutableListOf() + val encoder = EncodeDaemon(UUID.randomUUID().toString(), encodeWork, object : IEncodeListener { + override fun onStarted(referenceId: String, work: EncodeWork) { + } + override fun onError(referenceId: String, work: EncodeWork, code: Int) { + } + override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) { + produced.add(progress) + } + override fun onEnded(referenceId: String, work: EncodeWork) { + } + + }) + + + lines.forEach { + encoder.onOutputChanged(it) + } + assertThat(produced).isNotEmpty() + + + } + + val text = """ frame=16811 fps= 88 q=40.0 size= 9984kB time=00:x01:10.79 bitrate=1155.3kbits/s speed=3.71x fps=88.03 diff --git a/Reader/build.gradle.kts b/Reader/build.gradle.kts index 5231d6f4..4192a821 100644 --- a/Reader/build.gradle.kts +++ b/Reader/build.gradle.kts @@ -25,7 +25,7 @@ repositories { val exposedVersion = "0.38.2" dependencies { implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80") - implementation("no.iktdev:exfl:0.0.12-SNAPSHOT") + implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")