Updated encode
This commit is contained in:
parent
9298115d49
commit
63a22da10c
@ -1,5 +1,7 @@
|
||||
package no.iktdev.streamit.content.encode
|
||||
|
||||
import no.iktdev.exfl.observable.observableMapOf
|
||||
import no.iktdev.streamit.content.encode.progress.Progress
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
import org.springframework.context.ApplicationContext
|
||||
@ -8,6 +10,7 @@ import org.springframework.context.ApplicationContext
|
||||
class EncoderApplication
|
||||
|
||||
private var context: ApplicationContext? = null
|
||||
val progressMap = observableMapOf<String, Progress>()
|
||||
|
||||
@Suppress("unused")
|
||||
fun getContext(): ApplicationContext? {
|
||||
|
||||
@ -0,0 +1,16 @@
|
||||
package no.iktdev.streamit.content.encode.controllers
|
||||
|
||||
import com.google.gson.Gson
|
||||
import no.iktdev.streamit.content.encode.progressMap
|
||||
import org.springframework.web.bind.annotation.GetMapping
|
||||
import org.springframework.web.bind.annotation.RestController
|
||||
import javax.servlet.http.HttpServletResponse
|
||||
|
||||
@RestController
|
||||
class ProgressController {
|
||||
@GetMapping("/progress")
|
||||
fun getValue(response: HttpServletResponse): String {
|
||||
response.setHeader("Refresh", "5")
|
||||
return Gson().toJson(progressMap.values)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,18 @@
|
||||
package no.iktdev.streamit.content.encode.progress
|
||||
|
||||
data class DecodedProgressData(
|
||||
val frame: Int?,
|
||||
val fps: Double?,
|
||||
val stream_0_0_q: Double?,
|
||||
val bitrate: String?,
|
||||
val total_size: Int?,
|
||||
val out_time_us: Long?,
|
||||
val out_time_ms: Long?,
|
||||
val out_time: String?,
|
||||
val dup_frames: Int?,
|
||||
val drop_frames: Int?,
|
||||
val speed: Double?,
|
||||
val progress: String?
|
||||
)
|
||||
|
||||
data class ECT(val day: Int = 0, val hour: Int = 0, val minute: Int = 0, val second: Int = 0)
|
||||
@ -1,16 +1,8 @@
|
||||
package no.iktdev.streamit.content.encode.progress
|
||||
|
||||
data class Progress(
|
||||
val frame: Int?,
|
||||
val fps: Double?,
|
||||
val stream_0_0_q: Double?,
|
||||
val bitrate: String?,
|
||||
val total_size: Int?,
|
||||
val out_time_us: Long?,
|
||||
val out_time_ms: Long?,
|
||||
val out_time: String?,
|
||||
val dup_frames: Int?,
|
||||
val drop_frames: Int?,
|
||||
val speed: Double?,
|
||||
val progress: String?
|
||||
)
|
||||
val workId: String,
|
||||
val outFileName: String,
|
||||
val progress: Int = -1,
|
||||
val estimatedCompletion: String = "Unknown",
|
||||
)
|
||||
@ -1,7 +1,20 @@
|
||||
package no.iktdev.streamit.content.encode.progress
|
||||
|
||||
class ProgressDecoder {
|
||||
fun parseVideoProgress(lines: List<String>): Progress? {
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.WorkBase
|
||||
import java.io.File
|
||||
import java.lang.StringBuilder
|
||||
import java.time.LocalTime
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.math.floor
|
||||
|
||||
class ProgressDecoder(val workBase: WorkBase) {
|
||||
var duration: Int? = null
|
||||
set(value) {
|
||||
if (field == null || field == 0)
|
||||
field = value
|
||||
}
|
||||
fun parseVideoProgress(lines: List<String>): DecodedProgressData? {
|
||||
var frame: Int? = null
|
||||
var progress: String? = null
|
||||
val metadataMap = mutableMapOf<String, String>()
|
||||
@ -22,7 +35,7 @@ class ProgressDecoder {
|
||||
|
||||
return if (progress != null) {
|
||||
// When "progress" is found, build and return the VideoMetadata object
|
||||
Progress(
|
||||
DecodedProgressData(
|
||||
frame, metadataMap["fps"]?.toDoubleOrNull(), metadataMap["stream_0_0_q"]?.toDoubleOrNull(),
|
||||
metadataMap["bitrate"], metadataMap["total_size"]?.toIntOrNull(), metadataMap["out_time_us"]?.toLongOrNull(),
|
||||
metadataMap["out_time_ms"]?.toLongOrNull(), metadataMap["out_time"], metadataMap["dup_frames"]?.toIntOrNull(),
|
||||
@ -32,4 +45,80 @@ class ProgressDecoder {
|
||||
null // If "progress" is not found, return null
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
fun isDuration(value: String): Boolean {
|
||||
return value.contains("Duration", ignoreCase = true)
|
||||
}
|
||||
fun setDuration(value: String) {
|
||||
val results = Regex("Duration:\\s*([^,]+),").find(value)?.groupValues?.firstOrNull()
|
||||
duration = timeSpanToSeconds(results)
|
||||
}
|
||||
|
||||
private fun timeSpanToSeconds(time: String?): Int?
|
||||
{
|
||||
time ?: return null
|
||||
val timeString = Regex("[0-9]+:[0-9]+:[0-9]+.[0-9]+").find(time) ?: return null
|
||||
val strippedMS = Regex("[0-9]+:[0-9]+:[0-9]+").find(timeString.value) ?: return null
|
||||
val outTime = LocalTime.parse(strippedMS.value, DateTimeFormatter.ofPattern("HH:mm:ss"))
|
||||
return outTime.toSecondOfDay()
|
||||
}
|
||||
|
||||
|
||||
private fun getProgressTime(time: Long?): Long {
|
||||
if (time == null) return 0
|
||||
return time / 1000L
|
||||
}
|
||||
fun getProgress(decoded: DecodedProgressData): Progress {
|
||||
if (duration == null) return Progress(workId = workBase.workId, outFileName = File(workBase.outFile).name)
|
||||
val diff = getProgressTime(decoded.out_time_ms).toDouble() / duration!!.toDouble()
|
||||
val progress = floor(diff*100).toInt()
|
||||
|
||||
val ect = getEstimatedTimeRemaining(decoded)
|
||||
|
||||
return Progress(
|
||||
workId = workBase.workId, outFileName = File(workBase.outFile).name,
|
||||
progress = progress,
|
||||
estimatedCompletion = getETA(ect)
|
||||
)
|
||||
}
|
||||
|
||||
fun getEstimatedTimeRemaining(decoded: DecodedProgressData): Long {
|
||||
val position = getProgressTime(decoded.out_time_ms)
|
||||
return if(duration == null || decoded.speed == null) -1 else
|
||||
Math.round(Math.round(duration!!.toDouble() - position.toDouble()) / decoded.speed)
|
||||
}
|
||||
|
||||
fun getECT(time: Long): ECT {
|
||||
var seconds = time
|
||||
val day = TimeUnit.SECONDS.toDays(seconds)
|
||||
seconds -= java.util.concurrent.TimeUnit.DAYS.toSeconds(day)
|
||||
|
||||
val hour = TimeUnit.SECONDS.toHours(seconds)
|
||||
seconds -= java.util.concurrent.TimeUnit.HOURS.toSeconds(hour)
|
||||
|
||||
val minute = TimeUnit.SECONDS.toMinutes(seconds)
|
||||
seconds -= java.util.concurrent.TimeUnit.MINUTES.toSeconds(minute)
|
||||
|
||||
return ECT(day.toInt(), hour.toInt(), minute.toInt(), seconds.toInt())
|
||||
}
|
||||
private fun getETA(time: Long): String {
|
||||
val etc = getECT(time) ?: return "Unknown"
|
||||
val str = StringBuilder()
|
||||
if (etc.day > 0) {
|
||||
str.append("${etc.day}d").append(" ")
|
||||
}
|
||||
if (etc.hour > 0) {
|
||||
str.append("${etc.hour}h").append(" ")
|
||||
}
|
||||
if (etc.day == 0 && etc.minute > 0) {
|
||||
str.append("${etc.minute}m").append(" ")
|
||||
}
|
||||
if (etc.hour == 0 && etc.second > 0) {
|
||||
str.append("${etc.second}s").append(" ")
|
||||
}
|
||||
return str.toString().trim()
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -7,6 +7,7 @@ import no.iktdev.exfl.observable.observableListOf
|
||||
import no.iktdev.streamit.content.common.deamon.Daemon
|
||||
import no.iktdev.streamit.content.common.deamon.IDaemon
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.content.encode.progress.DecodedProgressData
|
||||
import no.iktdev.streamit.content.encode.progress.Progress
|
||||
import no.iktdev.streamit.content.encode.progress.ProgressDecoder
|
||||
import java.io.BufferedWriter
|
||||
@ -19,11 +20,12 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
|
||||
val logDir = File("/src/logs")
|
||||
lateinit var outLogFile: File
|
||||
var outputCache = observableListOf<String>()
|
||||
private val decoder = ProgressDecoder()
|
||||
private val decoder = ProgressDecoder(work)
|
||||
private fun produceProgress(items: List<String>) {
|
||||
try {
|
||||
val progress = decoder.parseVideoProgress(items)
|
||||
if (progress != null) {
|
||||
val decodedProgress = decoder.parseVideoProgress(items)
|
||||
if (decodedProgress != null) {
|
||||
val progress = decoder.getProgress(decodedProgress)
|
||||
daemonInterface.onProgress(referenceId, work, progress)
|
||||
outputCache.clear()
|
||||
}
|
||||
@ -73,6 +75,9 @@ class EncodeDaemon(val referenceId: String, val work: EncodeWork, val daemonInte
|
||||
}
|
||||
override fun onOutputChanged(line: String) {
|
||||
super.onOutputChanged(line)
|
||||
if (decoder.isDuration(line))
|
||||
decoder.setDuration(line)
|
||||
|
||||
outputCache.add(line)
|
||||
writeToLog(line)
|
||||
}
|
||||
|
||||
@ -6,7 +6,7 @@ import no.iktdev.exfl.observable.observableListOf
|
||||
import no.iktdev.streamit.content.common.deamon.Daemon
|
||||
import no.iktdev.streamit.content.common.deamon.IDaemon
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
|
||||
import no.iktdev.streamit.content.encode.progress.Progress
|
||||
import no.iktdev.streamit.content.encode.progress.DecodedProgressData
|
||||
import java.io.File
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
@ -50,6 +50,6 @@ class ExtractDaemon(val referenceId: String, val work: ExtractWork, val daemonIn
|
||||
interface IExtractListener {
|
||||
fun onStarted(referenceId: String, work: ExtractWork)
|
||||
fun onError(referenceId: String, work: ExtractWork, code: Int)
|
||||
fun onProgress(referenceId: String, work: ExtractWork, progress: Progress) {}
|
||||
fun onProgress(referenceId: String, work: ExtractWork, progress: DecodedProgressData) {}
|
||||
fun onEnded(referenceId: String, work: ExtractWork)
|
||||
}
|
||||
@ -3,23 +3,21 @@ package no.iktdev.streamit.content.encode.runner
|
||||
import com.google.gson.Gson
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.consumeAsFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import no.iktdev.streamit.content.encode.EncodeEnv
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.ExtractWork
|
||||
import no.iktdev.streamit.content.encode.progress.DecodedProgressData
|
||||
import no.iktdev.streamit.content.encode.progress.Progress
|
||||
import no.iktdev.streamit.content.encode.progressMap
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||
import org.springframework.boot.autoconfigure.couchbase.CouchbaseProperties.Env
|
||||
import org.springframework.stereotype.Service
|
||||
import java.util.concurrent.*
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
@ -157,6 +155,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
|
||||
|
||||
override fun onProgress(referenceId: String, work: EncodeWork, progress: Progress) {
|
||||
logger.info { "Work progress for $referenceId with WorkId ${work.workId} @ ${work.outFile}: Progress: ${Gson().toJson(progress)}" }
|
||||
progressMap.put(work.workId, progress)
|
||||
}
|
||||
|
||||
override fun onEnded(referenceId: String, work: EncodeWork) {
|
||||
|
||||
@ -1,14 +1,22 @@
|
||||
package no.iktdev.streamit.content.encode.progress
|
||||
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Test
|
||||
import org.junit.jupiter.api.assertDoesNotThrow
|
||||
import java.util.UUID
|
||||
|
||||
class ProgressDecoderTest {
|
||||
class DecodedProgressDataDecoderTest {
|
||||
|
||||
@Test
|
||||
fun test() {
|
||||
val progress = ProgressDecoder()
|
||||
val progress = ProgressDecoder(EncodeWork(
|
||||
workId = UUID.randomUUID().toString(),
|
||||
collection = "Demo",
|
||||
inFile = "Demo.mkv",
|
||||
outFile = "FancyDemo.mp4",
|
||||
arguments = emptyList()
|
||||
))
|
||||
val lines = text.split("\n")
|
||||
val cache: MutableList<String> = mutableListOf()
|
||||
lines.forEach {
|
||||
Loading…
Reference in New Issue
Block a user