diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt index 62d835cc..580cf1ee 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt @@ -64,6 +64,7 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { fun parseStreams(data: JsonObject?): ParsedMediaStreams { + val ignoreCodecs = listOf("png", "mjpeg") val gson = Gson() return try { val jStreams = data!!.getAsJsonArray("streams") @@ -72,17 +73,18 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { val audioStreams = mutableListOf() val subtitleStreams = mutableListOf() - jStreams.forEach { streamJson -> + for (streamJson in jStreams) { val streamObject = streamJson.asJsonObject - + if (!streamObject.has("codec_name")) continue + val codecName = streamObject.get("codec_name").asString val codecType = streamObject.get("codec_type").asString - if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") { - } else { - when (codecType) { - "video" -> videoStreams.add(gson.fromJson(streamObject, VideoStream::class.java)) - "audio" -> audioStreams.add(gson.fromJson(streamObject, AudioStream::class.java)) - "subtitle" -> subtitleStreams.add(gson.fromJson(streamObject, SubtitleStream::class.java)) - } + + if (codecName in ignoreCodecs) continue + + when (codecType) { + "video" -> videoStreams.add(gson.fromJson(streamObject, VideoStream::class.java)) + "audio" -> audioStreams.add(gson.fromJson(streamObject, AudioStream::class.java)) + "subtitle" -> subtitleStreams.add(gson.fromJson(streamObject, SubtitleStream::class.java)) } } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index 8d99d1cc..2fe50f41 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -15,6 +15,8 @@ abstract class EventCoordinator> { abstract var eventManager: E val pullDelay: AtomicLong = AtomicLong(1000) + val fastPullDelay: AtomicLong = AtomicLong(500) + val slowPullDelay: AtomicLong = AtomicLong(2500) //private val listeners: MutableList> = mutableListOf() @@ -43,8 +45,21 @@ abstract class EventCoordinator> { private fun onEventGroupsReceived(eventGroup: List>) { + val egRefIds = eventGroup.map { it.first().referenceId() } + val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key } + orphanedReferences.forEach { id -> referencePool.remove(id) } + val activePolls = referencePool.values.filter { it.isActive }.size - log.info { "Active polls $activePolls/${referencePool.values.size}" } + if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) { + log.info { "Last active references removed from pull pool, " } + } + + if (eventGroup.isNotEmpty()) { + log.info { "Active polls $activePolls/${referencePool.values.size}" } + } else { + return + } + eventGroup.forEach { val referenceId = it.first().referenceId() @@ -87,9 +102,15 @@ abstract class EventCoordinator> { val events = eventManager.readAvailableEvents() onEventGroupsReceived(events) if (events.isNotEmpty()) { - pullDelay.set(500) + if (pullDelay.get() != fastPullDelay.get()) { + log.info { "Available events found, switching to fast pull @ Delay -> ${fastPullDelay.get()}" } + } + pullDelay.set(fastPullDelay.get()) } else { - pullDelay.set(2500) + if (pullDelay.get() != slowPullDelay.get()) { + log.info { "None events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" } + } + pullDelay.set(slowPullDelay.get()) } referencePool.values.awaitAll() }