v3 30
This commit is contained in:
parent
c8f3fa5102
commit
f5a3603a5a
@ -64,6 +64,7 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
|
|||||||
|
|
||||||
|
|
||||||
fun parseStreams(data: JsonObject?): ParsedMediaStreams {
|
fun parseStreams(data: JsonObject?): ParsedMediaStreams {
|
||||||
|
val ignoreCodecs = listOf("png", "mjpeg")
|
||||||
val gson = Gson()
|
val gson = Gson()
|
||||||
return try {
|
return try {
|
||||||
val jStreams = data!!.getAsJsonArray("streams")
|
val jStreams = data!!.getAsJsonArray("streams")
|
||||||
@ -72,19 +73,20 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
|
|||||||
val audioStreams = mutableListOf<AudioStream>()
|
val audioStreams = mutableListOf<AudioStream>()
|
||||||
val subtitleStreams = mutableListOf<SubtitleStream>()
|
val subtitleStreams = mutableListOf<SubtitleStream>()
|
||||||
|
|
||||||
jStreams.forEach { streamJson ->
|
for (streamJson in jStreams) {
|
||||||
val streamObject = streamJson.asJsonObject
|
val streamObject = streamJson.asJsonObject
|
||||||
|
if (!streamObject.has("codec_name")) continue
|
||||||
|
val codecName = streamObject.get("codec_name").asString
|
||||||
val codecType = streamObject.get("codec_type").asString
|
val codecType = streamObject.get("codec_type").asString
|
||||||
if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") {
|
|
||||||
} else {
|
if (codecName in ignoreCodecs) continue
|
||||||
|
|
||||||
when (codecType) {
|
when (codecType) {
|
||||||
"video" -> videoStreams.add(gson.fromJson(streamObject, VideoStream::class.java))
|
"video" -> videoStreams.add(gson.fromJson(streamObject, VideoStream::class.java))
|
||||||
"audio" -> audioStreams.add(gson.fromJson(streamObject, AudioStream::class.java))
|
"audio" -> audioStreams.add(gson.fromJson(streamObject, AudioStream::class.java))
|
||||||
"subtitle" -> subtitleStreams.add(gson.fromJson(streamObject, SubtitleStream::class.java))
|
"subtitle" -> subtitleStreams.add(gson.fromJson(streamObject, SubtitleStream::class.java))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
val parsedStreams = ParsedMediaStreams(
|
val parsedStreams = ParsedMediaStreams(
|
||||||
videoStream = videoStreams,
|
videoStream = videoStreams,
|
||||||
|
|||||||
@ -15,6 +15,8 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
|||||||
abstract var eventManager: E
|
abstract var eventManager: E
|
||||||
|
|
||||||
val pullDelay: AtomicLong = AtomicLong(1000)
|
val pullDelay: AtomicLong = AtomicLong(1000)
|
||||||
|
val fastPullDelay: AtomicLong = AtomicLong(500)
|
||||||
|
val slowPullDelay: AtomicLong = AtomicLong(2500)
|
||||||
|
|
||||||
//private val listeners: MutableList<EventListener<T>> = mutableListOf()
|
//private val listeners: MutableList<EventListener<T>> = mutableListOf()
|
||||||
|
|
||||||
@ -43,8 +45,21 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
|||||||
|
|
||||||
|
|
||||||
private fun onEventGroupsReceived(eventGroup: List<List<T>>) {
|
private fun onEventGroupsReceived(eventGroup: List<List<T>>) {
|
||||||
|
val egRefIds = eventGroup.map { it.first().referenceId() }
|
||||||
|
val orphanedReferences = referencePool.filter { !it.value.isActive }.filter { id -> id.key !in egRefIds }.map { it.key }
|
||||||
|
orphanedReferences.forEach { id -> referencePool.remove(id) }
|
||||||
|
|
||||||
val activePolls = referencePool.values.filter { it.isActive }.size
|
val activePolls = referencePool.values.filter { it.isActive }.size
|
||||||
|
if (orphanedReferences.isNotEmpty() && referencePool.isEmpty()) {
|
||||||
|
log.info { "Last active references removed from pull pool, " }
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eventGroup.isNotEmpty()) {
|
||||||
log.info { "Active polls $activePolls/${referencePool.values.size}" }
|
log.info { "Active polls $activePolls/${referencePool.values.size}" }
|
||||||
|
} else {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
eventGroup.forEach {
|
eventGroup.forEach {
|
||||||
val referenceId = it.first().referenceId()
|
val referenceId = it.first().referenceId()
|
||||||
|
|
||||||
@ -87,9 +102,15 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
|
|||||||
val events = eventManager.readAvailableEvents()
|
val events = eventManager.readAvailableEvents()
|
||||||
onEventGroupsReceived(events)
|
onEventGroupsReceived(events)
|
||||||
if (events.isNotEmpty()) {
|
if (events.isNotEmpty()) {
|
||||||
pullDelay.set(500)
|
if (pullDelay.get() != fastPullDelay.get()) {
|
||||||
|
log.info { "Available events found, switching to fast pull @ Delay -> ${fastPullDelay.get()}" }
|
||||||
|
}
|
||||||
|
pullDelay.set(fastPullDelay.get())
|
||||||
} else {
|
} else {
|
||||||
pullDelay.set(2500)
|
if (pullDelay.get() != slowPullDelay.get()) {
|
||||||
|
log.info { "None events available, switching to slow pull @ Delay -> ${slowPullDelay.get()}" }
|
||||||
|
}
|
||||||
|
pullDelay.set(slowPullDelay.get())
|
||||||
}
|
}
|
||||||
referencePool.values.awaitAll()
|
referencePool.values.awaitAll()
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user