Updated encode and corrected progress stuff
This commit is contained in:
parent
63a22da10c
commit
3300742d2b
@ -21,7 +21,7 @@ dependencies {
|
||||
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
|
||||
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80")
|
||||
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
|
||||
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
|
||||
|
||||
implementation("com.google.code.gson:gson:2.8.9")
|
||||
implementation("org.json:json:20230227")
|
||||
|
||||
@ -26,7 +26,7 @@ dependencies {
|
||||
implementation("no.iktdev.library:subtitle:1.7-SNAPSHOT")
|
||||
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80")
|
||||
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
|
||||
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
|
||||
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
|
||||
|
||||
|
||||
@ -24,7 +24,7 @@ dependencies {
|
||||
implementation(project(":CommonCode"))
|
||||
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80")
|
||||
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
|
||||
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
|
||||
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
|
||||
|
||||
|
||||
@ -9,6 +9,20 @@ import java.util.concurrent.TimeUnit
|
||||
import kotlin.math.floor
|
||||
|
||||
class ProgressDecoder(val workBase: WorkBase) {
|
||||
val expectedKeys = listOf<String>(
|
||||
"frame=",
|
||||
"fps=",
|
||||
"stream_0_0_q=",
|
||||
"bitrate=",
|
||||
"total_size=",
|
||||
"out_time_us=",
|
||||
"out_time_ms=",
|
||||
"out_time=",
|
||||
"dup_frames=",
|
||||
"drop_frames=",
|
||||
"speed=",
|
||||
"progress="
|
||||
)
|
||||
var duration: Int? = null
|
||||
set(value) {
|
||||
if (field == null || field == 0)
|
||||
|
||||
@ -21,13 +21,13 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
|
||||
lateinit var outLogFile: File
|
||||
var outputCache = observableListOf<String>()
|
||||
private val decoder = ProgressDecoder(work)
|
||||
private fun produceProgress(items: List<String>) {
|
||||
fun produceProgress(items: List<String>): Progress? {
|
||||
try {
|
||||
val decodedProgress = decoder.parseVideoProgress(items)
|
||||
if (decodedProgress != null) {
|
||||
val progress = decoder.getProgress(decodedProgress)
|
||||
daemonInterface.onProgress(referenceId, work, progress)
|
||||
outputCache.clear()
|
||||
return progress
|
||||
}
|
||||
} catch (e: IndexOutOfBoundsException) {
|
||||
// Do nothing
|
||||
@ -35,12 +35,16 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
|
||||
//logger.error { e.message }
|
||||
e.printStackTrace()
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
init {
|
||||
outputCache.addListener(object : ObservableList.Listener<String> {
|
||||
override fun onAdded(item: String) {
|
||||
produceProgress(outputCache)
|
||||
val progress = produceProgress(outputCache)
|
||||
progress?.let {
|
||||
daemonInterface.onProgress(referenceId, work, progress)
|
||||
}
|
||||
}
|
||||
})
|
||||
logDir.mkdirs()
|
||||
@ -73,12 +77,14 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
|
||||
override fun onError(code: Int) {
|
||||
daemonInterface.onError(referenceId, work, code)
|
||||
}
|
||||
|
||||
override fun onOutputChanged(line: String) {
|
||||
super.onOutputChanged(line)
|
||||
if (decoder.isDuration(line))
|
||||
decoder.setDuration(line)
|
||||
|
||||
outputCache.add(line)
|
||||
if (decoder.expectedKeys.any { line.startsWith(it) }) {
|
||||
outputCache.add(line)
|
||||
}
|
||||
writeToLog(line)
|
||||
}
|
||||
private fun writeToLog(line: String) {
|
||||
|
||||
@ -0,0 +1,29 @@
|
||||
package no.iktdev.streamit.content.encode
|
||||
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
|
||||
open class Resources {
|
||||
|
||||
fun getText(path: String): String? {
|
||||
return this.javaClass.classLoader.getResource(path)?.readText()
|
||||
}
|
||||
|
||||
open class Streams(): Resources() {
|
||||
fun all(): List<String> {
|
||||
return listOf<String>(
|
||||
getSample(0),
|
||||
getSample(1),
|
||||
getSample(2),
|
||||
getSample(3),
|
||||
getSample(4),
|
||||
getSample(5),
|
||||
getSample(6),
|
||||
)
|
||||
}
|
||||
|
||||
fun getSample(number: Int): String {
|
||||
return getText("streams/sample$number.json")!!
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,6 +1,9 @@
|
||||
package no.iktdev.streamit.content.encode.progress
|
||||
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.content.encode.Resources
|
||||
import no.iktdev.streamit.content.encode.runner.EncodeDaemon
|
||||
import no.iktdev.streamit.content.encode.runner.IEncodeListener
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertDoesNotThrow
|
||||
@ -29,6 +32,50 @@ class DecodedProgressDataDecoderTest {
|
||||
assertThat(lines).isNotEmpty()
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test
|
||||
fun testCanRead() {
|
||||
val res = Resources()
|
||||
val data = res.getText("Output1.txt") ?: ""
|
||||
assertThat(data).isNotEmpty()
|
||||
val lines = data.split("\n").map { it.trim() }
|
||||
assertThat(lines).isNotEmpty()
|
||||
|
||||
val encodeWork = EncodeWork(
|
||||
workId = UUID.randomUUID().toString(),
|
||||
collection = "Demo",
|
||||
inFile = "Demo.mkv",
|
||||
outFile = "FancyDemo.mp4",
|
||||
arguments = emptyList()
|
||||
)
|
||||
val decoder = ProgressDecoder(encodeWork)
|
||||
lines.forEach { decoder.setDuration(it) }
|
||||
assertThat(decoder.duration).isNotNull()
|
||||
val produced = mutableListOf<Progress>()
|
||||
val encoder = EncodeDaemon(UUID.randomUUID().toString(), encodeWork, object : IEncodeListener {
|
||||
override fun onStarted(referenceId: String, work: EncodeWork) {
|
||||
}
|
||||
override fun onError(referenceId: String, work: EncodeWork, code: Int) {
|
||||
}
|
||||
override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) {
|
||||
produced.add(progress)
|
||||
}
|
||||
override fun onEnded(referenceId: String, work: EncodeWork) {
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
|
||||
lines.forEach {
|
||||
encoder.onOutputChanged(it)
|
||||
}
|
||||
assertThat(produced).isNotEmpty()
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
||||
val text = """
|
||||
frame=16811 fps= 88 q=40.0 size= 9984kB time=00:x01:10.79 bitrate=1155.3kbits/s speed=3.71x
|
||||
fps=88.03
|
||||
|
||||
@ -25,7 +25,7 @@ repositories {
|
||||
val exposedVersion = "0.38.2"
|
||||
dependencies {
|
||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha80")
|
||||
implementation("no.iktdev:exfl:0.0.12-SNAPSHOT")
|
||||
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
|
||||
|
||||
implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha14")
|
||||
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
|
||||
|
||||
Loading…
Reference in New Issue
Block a user