enabled mode

This commit is contained in:
bskjon 2025-04-27 01:58:08 +02:00
parent 7f014a494c
commit 9c6aa304a8
14 changed files with 271 additions and 10 deletions

View File

@ -50,7 +50,7 @@ fun main(args: Array<String>) {
eventsDatabase.createTables(tasks, runners)
taskManager = TasksManager(eventsDatabase)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ConvertApplication::class.java.simpleName)
runnerManager.assignRunner()
runApplication<ConvertApplication>(*args)

View File

@ -19,7 +19,7 @@ class TaskCoordinator(): TaskCoordinatorBase() {
override fun onCoordinatorReady() {
super.onCoordinatorReady()
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ConvertApplication::class.java.simpleName)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ConvertApplication::class.java.simpleName)
runnerManager.assignRunner()
}
@ -77,6 +77,10 @@ class TaskCoordinator(): TaskCoordinatorBase() {
}
}
override fun getEnabledState(): Boolean {
return runnerManager.amIEnabled()
}
interface TaskEvents {
fun onCancelOrStopProcess(eventId: String)
}

View File

@ -104,7 +104,7 @@ fun main(args: Array<String>) {
)
storeDatabase.createTables(*tables)
runnerManager = RunnerManager(dataSource = eventDatabase.database, name = CoordinatorApplication::class.java.simpleName)
runnerManager = RunnerManager(dataSource = eventDatabase.database, applicationName = CoordinatorApplication::class.java.simpleName)
runnerManager.assignRunner()
runApplication<CoordinatorApplication>(*args)

View File

@ -98,4 +98,10 @@ class Coordinator(
}
return taskMode
}
override fun updateEnabledState(): Boolean {
isEnabled = runnerManager.amIEnabled()
return isEnabled
}
}

View File

@ -59,7 +59,7 @@ fun main(args: Array<String>) {
taskManager = TasksManager(eventsDatabase)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ProcesserApplication::class.java.simpleName)
runnerManager.assignRunner()
runApplication<ProcesserApplication>(*args)

View File

@ -17,7 +17,7 @@ class TaskCoordinator(): TaskCoordinatorBase() {
override fun onCoordinatorReady() {
super.onCoordinatorReady()
runnerManager = RunnerManager(dataSource = getEventsDatabase(), name = ProcesserApplication::class.java.simpleName)
runnerManager = RunnerManager(dataSource = getEventsDatabase(), applicationName = ProcesserApplication::class.java.simpleName)
runnerManager.assignRunner()
}
@ -68,6 +68,10 @@ class TaskCoordinator(): TaskCoordinatorBase() {
taskManager.produceEvent(event)
}
override fun getEnabledState(): Boolean {
return runnerManager.amIEnabled()
}
override fun clearExpiredClaims() {
val expiredClaims = taskManager.getTasksWithExpiredClaim().filter { it.task in listOf(TaskType.Encode, TaskType.Extract) }
expiredClaims.forEach {

View File

@ -0,0 +1,20 @@
package no.iktdev.mediaprocessing.processer
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.jsonToEvent
import org.json.JSONArray
enum class Files(val fileName: String) {
Output1("encodeProgress1.txt")
}
fun Files.getAsList(): List<String> {
return this.javaClass.classLoader.getResource(this.fileName)?.readText()?.lines() ?: emptyList()
}
fun Files.getContent(): String? {
return this.javaClass.classLoader.getResource(this.fileName)?.readText()
}

View File

@ -0,0 +1,15 @@
package no.iktdev.mediaprocessing.processer.ffmpeg
import no.iktdev.mediaprocessing.processer.Files
import no.iktdev.mediaprocessing.processer.getAsList
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
class FfmpegRunnerTest {
}

View File

@ -0,0 +1,26 @@
package no.iktdev.mediaprocessing.processer.ffmpeg.progress
import no.iktdev.mediaprocessing.processer.Files
import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegRunner
import no.iktdev.mediaprocessing.processer.getAsList
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
class FfmpegProgressDecoderTest {
@Test
@DisplayName("Verify that progress can be decoded")
fun parseReadout1() {
val lines = Files.Output1.getAsList()
val decoder = FfmpegProgressDecoder()
lines.forEach {
decoder.defineDuration(it)
}
val result = decoder.parseVideoProgress(lines)
assertThat(result?.progress).isNotNull()
val progress = decoder.getProgress(result!!)
assertThat(progress.progress).isGreaterThanOrEqualTo(0)
}
}

View File

@ -0,0 +1,141 @@
Guessed Channel Layout for Input Stream #0.1 : 5.1
Input #0, matroska,webm, from '/src/input/completed/standalone/Potato.mkv':
Metadata:
encoder : libebml v1.3.10 + libmatroska v1.5.2
creation_time : 2020-02-17T16:42:19.000000Z
Duration: 00:29:09.75, start: 0.000000, bitrate: 9882 kb/s
Stream #0:0: Video: h264 (High), yuv420p(tv, bt709, progressive), 1920x1080 [SAR 1:1 DAR 16:9], 23.98 fps, 23.98 tbr, 1k tbn, 47.95 tbc (default)
Metadata:
BPS-eng : 9239506
DURATION-eng : 00:29:09.748000000
NUMBER_OF_FRAMES-eng: 41952
NUMBER_OF_BYTES-eng: 2020851044
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
Stream #0:1(eng): Audio: eac3, 48000 Hz, 5.1, fltp (default)
Metadata:
BPS-eng : 640000
DURATION-eng : 00:29:09.728000000
NUMBER_OF_FRAMES-eng: 54679
NUMBER_OF_BYTES-eng: 139978240
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
Stream #0:2(eng): Subtitle: subrip
Metadata:
BPS-eng : 120
DURATION-eng : 00:28:08.917000000
NUMBER_OF_FRAMES-eng: 718
NUMBER_OF_BYTES-eng: 25445
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
Stream #0:3(eng): Subtitle: subrip
Metadata:
title : SDH
BPS-eng : 120
DURATION-eng : 00:28:08.917000000
NUMBER_OF_FRAMES-eng: 718
NUMBER_OF_BYTES-eng: 25445
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
Stream mapping:
Stream #0:0 -> #0:0 (h264 (native) -> hevc (libx265))
Stream #0:1 -> #0:1 (copy)
x265 [info]: HEVC encoder version 3.4
x265 [info]: build info [Linux][GCC 9.3.0][64 bit] 8bit+10bit+12bit
x265 [info]: using cpu capabilities: MMX2 SSE2Fast LZCNT SSSE3 SSE4.2 AVX FMA3 BMI2 AVX2
x265 [info]: Main profile, Level-4 (Main tier)
x265 [info]: Thread pool created using 16 threads
x265 [info]: Slices : 1
x265 [info]: frame threads / pool features : 4 / wpp(17 rows)
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
set_mempolicy: Operation not permitted
x265 [info]: Coding QT: max CU size, min CU size : 64 / 8
x265 [info]: Residual QT: max TU size, max depth : 32 / 1 inter / 1 intra
x265 [info]: ME / range / subpel / merge : hex / 57 / 2 / 3
x265 [info]: Keyframe min / max / scenecut / bias : 23 / 250 / 40 / 5.00
x265 [info]: Lookahead / bframes / badapt : 20 / 4 / 2
x265 [info]: b-pyramid / weightp / weightb : 1 / 1 / 0
x265 [info]: References / ref-limit cu / depth : 3 / off / on
x265 [info]: AQ: mode / str / qg-size / cu-tree : 2 / 1.0 / 32 / 1
x265 [info]: Rate Control / qCompress : CRF-16.0 / 0.60
x265 [info]: tools: rd=3 psy-rd=2.00 early-skip rskip mode=1 signhide tmvp
x265 [info]: tools: b-intra strong-intra-smoothing lslices=6 deblock sao
[mp4 @ 0x64a098f4e140] track 1: codec frame size is not set
Output #0, mp4, to '/src/cache/Potato.work.mp4':
Metadata:
encoder : Lavf58.45.100
Stream #0:0: Video: hevc (libx265) (hev1 / 0x31766568), yuv420p, 1920x1080 [SAR 1:1 DAR 16:9], q=-1--1, 23.98 fps, 24k tbn, 23.98 tbc (default)
Metadata:
BPS-eng : 9239506
DURATION-eng : 00:29:09.748000000
NUMBER_OF_FRAMES-eng: 41952
NUMBER_OF_BYTES-eng: 2020851044
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
encoder : Lavc58.91.100 libx265
Side data:
cpb: bitrate max/min/avg: 0/0/0 buffer size: 0 vbv_delay: N/A
Stream #0:1(eng): Audio: eac3 (ec-3 / 0x332D6365), 48000 Hz, 5.1, fltp (default)
Metadata:
BPS-eng : 640000
DURATION-eng : 00:29:09.728000000
NUMBER_OF_FRAMES-eng: 54679
NUMBER_OF_BYTES-eng: 139978240
_STATISTICS_WRITING_APP-eng: mkvmerge v43.0.0 ('The Quartermaster') 64-bit
_STATISTICS_WRITING_DATE_UTC-eng: 2020-02-17 16:42:19
_STATISTICS_TAGS-eng: BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES
frame=34
fps=0.00
stream_0_0_q=21.9
bitrate= 0.2kbits/s
total_size=44
out_time_us=2198000
out_time_ms=2198000
out_time=00:00:02.198000
dup_frames=0
drop_frames=0
speed=4.04x
progress=continue

View File

@ -13,6 +13,7 @@ import javax.annotation.PostConstruct
abstract class TaskCoordinatorBase() {
private val log = KotlinLogging.logger {}
var taskMode: ActiveMode = ActiveMode.Active
var isEnabled: Boolean = true
private var ready: Boolean = false
fun isReady() = ready
@ -43,7 +44,7 @@ abstract class TaskCoordinatorBase() {
@Scheduled(fixedDelay = (5_000))
fun pullAvailable() {
if (taskMode != ActiveMode.Active) {
if (taskMode != ActiveMode.Active || !isEnabled) {
return
}
pullForAvailableTasks()
@ -51,6 +52,20 @@ abstract class TaskCoordinatorBase() {
abstract fun clearExpiredClaims()
abstract fun getEnabledState(): Boolean
@Scheduled(fixedDelay = 10_000)
fun pullEnabledState() {
val prevState = isEnabled
isEnabled = getEnabledState()
if (prevState != isEnabled) {
log.info { "State changed for coordinator: $prevState -> $isEnabled" }
if (isEnabled) {
log.info { "New tasks will now be processed" }
}
}
}
@Scheduled(fixedDelay = (300_000))
fun resetExpiredClaims() {
if (taskMode != ActiveMode.Active) {

View File

@ -11,24 +11,35 @@ import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.select
import java.util.UUID
class RunnerManager(private val dataSource: DataSource, private val startId: String = UUID.randomUUID().toString(), val name: String) {
class RunnerManager(private val dataSource: DataSource, val startId: String = UUID.randomUUID().toString(), val applicationName: String) {
private val log = KotlinLogging.logger {}
fun assignRunner(): Boolean {
return executeOrException(dataSource.database) {
runners.insert {
it[runners.startId] = this@RunnerManager.startId
it[runners.application] = this@RunnerManager.name
it[runners.application] = this@RunnerManager.applicationName
it[runners.version] = getAppVersion()
}
} == null
}
fun amIEnabled(): Boolean {
return withDirtyRead(dataSource.database) {
runners.select {
(runners.application eq applicationName) and
(runners.startId eq startId)
}.singleOrNull()?.get(runners.enabled)
} ?: run {
log.error { "Failed to get a response, reporting false for enabled" }
false
}
}
fun iAmSuperseded(): Boolean {
return withDirtyRead(dataSource.database) {
val runnerVersionCodes = runners.select {
(runners.application eq this@RunnerManager.name) and
(runners.application eq this@RunnerManager.applicationName) and
(runners.startId neq this@RunnerManager.startId)
}.map { it[runners.version] }

View File

@ -11,4 +11,5 @@ object runners: IntIdTable() {
val application: Column<String> = varchar("application", 50)
val version: Column<Int> = integer("version")
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
val enabled: Column<Boolean> = bool("enabled").default(true)
}

View File

@ -36,6 +36,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
open var taskMode: ActiveMode = ActiveMode.Active
open var isEnabled: Boolean = true
private val referencePool: MutableMap<String, Deferred<Boolean>> = mutableMapOf()
private fun referencePoolIsReadyForEvents(): Boolean {
return (referencePool.isEmpty() || referencePool.any { !it.value.isActive })
@ -44,6 +45,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
private var newEventProduced: Boolean = false
abstract fun getActiveTaskMode(): ActiveMode
abstract fun updateEnabledState(): Boolean
private var activePolls: Int = 0
data class PollStats(val active: Int, val total: Int)
@ -119,7 +121,8 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
var cachedReferenceList: MutableList<String> = mutableListOf()
private fun pullForEvents() {
coroutine.launch {
while (taskMode == ActiveMode.Active && coroutine.isActive) {
while (taskMode == ActiveMode.Active && coroutine.isActive && isEnabled) {
if (referencePoolIsReadyForEvents()) {
log.debug { "New pull on database" }
val referenceIdsAvailable = eventManager.getAvailableReferenceIds()
@ -155,6 +158,21 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
}
taskMode = getActiveTaskMode()
}
coroutine.launch {
while (taskMode == ActiveMode.Active && coroutine.isActive) {
val previousState = isEnabled
val iminentStatus = updateEnabledState()
if (previousState != iminentStatus) {
log.info { "Observed change to enabled state, changed $previousState -> $iminentStatus" }
if (iminentStatus) {
log.info { "Ovserved change to enabled state, new state is enabled\n\tRestarting pullForEvents!" }
onReady()
return@launch
}
}
delay(10_000)
}
}
}
private var cachedListeners: List<String> = emptyList()