Added filter for commentary + failing broken process

This commit is contained in:
bskjon 2025-04-19 01:38:37 +02:00
parent 816e268b2d
commit dca730afc2
18 changed files with 332 additions and 3 deletions

View File

@ -81,5 +81,11 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
}
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(BaseInfoEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
))
}
}

View File

@ -217,6 +217,13 @@ class CompletedTaskListener : CoordinatorEventListener() {
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(MediaProcessCompletedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
internal data class ComposedMediaInfo(
val title: String,
val fallbackCollection: String,
@ -289,4 +296,6 @@ class CompletedTaskListener : CoordinatorEventListener() {
return data.data
}
}

View File

@ -139,4 +139,11 @@ class ConvertWorkTaskListener: WorkTaskListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(ConvertWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -82,4 +82,11 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(MediaCoverDownloadedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -92,4 +92,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
onProduceEvent(result)
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(MediaCoverInfoReceivedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -97,4 +97,10 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(EncodeArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
))
}
}

View File

@ -74,4 +74,11 @@ class EncodeWorkTaskListener : WorkTaskListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(EncodeWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -92,4 +92,11 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(ExtractArgumentCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -83,4 +83,11 @@ class ExtractWorkTaskListener: WorkTaskListener() {
}
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(ExtractWorkCreatedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
}

View File

@ -78,6 +78,13 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(MediaOutInformationConstructedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
class ProcessMediaInfoAndMetadata(val baseInfo: BaseInfo, val metadata: pyMetadata? = null) {
var metadataDeterminedContentType: FileNameDeterminate.ContentType = metadata?.type?.let { contentType ->
when (contentType) {

View File

@ -114,6 +114,13 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
}
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(MediaMetadataReceivedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
))
}
suspend fun createTimeout(referenceId: String, eventId: String, baseInfo: BaseInfoEvent) {
val expiryTime = (Instant.now().epochSecond + metadataTimeout)
val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC)

View File

@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener
import no.iktdev.mediaprocessing.shared.common.contract.Events
import no.iktdev.mediaprocessing.shared.common.contract.data.Event
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaFileStreamsParsedEvent
import no.iktdev.mediaprocessing.shared.common.contract.data.MediaMetadataReceivedEvent
import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.SubtitleStream
@ -65,6 +66,14 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(
MediaFileStreamsParsedEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
)
)
}
fun parseStreams(data: JsonObject?): ParsedMediaStreams {
val ignoreCodecs = listOf("png", "mjpeg")

View File

@ -125,6 +125,15 @@ class PersistContentTaskListener : CoordinatorEventListener() {
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(
PersistedContentEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
)
)
}
internal data class ComposedMediaInfo(
val title: String,
val fallbackCollection: String,

View File

@ -78,6 +78,15 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
active = false
}
override fun produceFailure(incomingEvent: Event) {
onProduceEvent(
MediaFileStreamsReadEvent(
metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()),
data = null
)
)
}
suspend fun fileReadStreams(started: StartEventData, eventId: String): JsonObject? {
val file = File(started.file)

View File

@ -45,8 +45,14 @@ class SubtitleArguments(val subtitleStreams: List<SubtitleStream>) {
return keywords.any { title.contains(it) }
}
private fun SubtitleStream.isNonDialog(): Boolean {
val title = this.tags.title?.lowercase() ?: return false
val keywords = listOf("commentary")
return keywords.any { title.contains(it) }
}
private fun getSubtitleType(stream: SubtitleStream): SubtitleType {
return if (stream.isSignOrSong())
return if (stream.isSignOrSong() || stream.isNonDialog())
SubtitleType.NON_DIALOGUE
else if (stream.isSHD()) {
SubtitleType.SHD

View File

@ -50,6 +50,13 @@ class SubtitleArgumentsTest {
assertThat(args.firstOrNull()?.index).isEqualTo(0)
}
@Test
fun assertThatCommentaryIsNotSelected() {
val data = Gson().fromJson<List<SubtitleStream>>(streamsWithCommentary, type)
val args = SubtitleArguments(data).getSubtitleArguments()
assertThat(args).hasSize(1)
assertThat(args.firstOrNull()?.mediaIndex).isEqualTo(4)
}
val multipleSubtitleStreamsWithSameLanguage = """
[{
@ -367,4 +374,193 @@ class SubtitleArgumentsTest {
}
]
""".trimIndent()
val streamsWithCommentary = """
[
{
"index": 4,
"codec_name": "subrip",
"codec_long_name": "SubRip subtitle",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 5501856,
"duration": "5501.856000",
"disposition": {
"default": 0,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 1,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "eng",
"title": "English (Forced)",
"BPS": "4",
"DURATION": "00:50:23.770000000",
"NUMBER_OF_FRAMES": "67",
"NUMBER_OF_BYTES": "1591",
"_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit",
"_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52",
"_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES"
}
},
{
"index": 5,
"codec_name": "subrip",
"codec_long_name": "SubRip subtitle",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 5501856,
"duration": "5501.856000",
"disposition": {
"default": 0,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 1,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "eng",
"title": "English (SDH)",
"BPS": "54",
"DURATION": "01:28:15.462000000",
"NUMBER_OF_FRAMES": "1302",
"NUMBER_OF_BYTES": "35817",
"_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit",
"_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52",
"_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES"
}
},
{
"index": 8,
"codec_name": "subrip",
"codec_long_name": "SubRip subtitle",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 5501856,
"duration": "5501.856000",
"disposition": {
"default": 0,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "eng",
"title": "English (Commentary #1)",
"BPS": "124",
"DURATION": "01:30:35.847000000",
"NUMBER_OF_FRAMES": "1596",
"NUMBER_OF_BYTES": "84444",
"_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit",
"_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52",
"_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES"
}
},
{
"index": 9,
"codec_name": "subrip",
"codec_long_name": "SubRip subtitle",
"codec_type": "subtitle",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"duration_ts": 5501856,
"duration": "5501.856000",
"disposition": {
"default": 0,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "eng",
"title": "English (Commentary #2)",
"BPS": "134",
"DURATION": "01:31:22.561000000",
"NUMBER_OF_FRAMES": "1646",
"NUMBER_OF_BYTES": "92269",
"_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit",
"_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52",
"_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES"
}
}
]
""".trimIndent()
}

View File

@ -71,6 +71,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
}
val bashingReferenceObject: MutableMap<String, MutableList<Pair<String, Long>>> = mutableMapOf()
private suspend fun onEventsReceived(events: List<T>): Boolean = coroutineScope {
val listeners = getListeners()
@ -80,18 +81,36 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
val consumableEvent = ConsumableEvent(event)
listener.onEventsReceived(consumableEvent, events)
if (consumableEvent.isConsumed) {
// 🚨 Suppress logging hvis det er en deadlock
val referenceId = events.first().referenceId()
val listenerName = listener::class.java.simpleName
val eventId = consumableEvent.metadata().eventId
val bashingId = "$eventId-$listenerName"
bashingReferenceObject.computeIfAbsent(referenceId) { mutableListOf() }.add(Pair(bashingId, System.currentTimeMillis()))
// 🚨 Suppress logging hvis det er en deadlock
if (EventDeadlockDetector.detect(referenceId, listenerName, event.eventType.toString())) {
log.info { "Consumption detected for $referenceId -> $listenerName on event ${event.eventType}" }
EventDeadlockDetector.resolve(referenceId, listenerName, event.eventType.toString())
}
val bashed = bashingReferenceObject[referenceId]?.takeLast(10) ?: emptyList()
if (bashed.any { it == bashed.first() }) {
// We have entered a deadlock here
// Due to the nature of the deadlock the event will be determined to be dead
log.error { "Producing Failure on $referenceId on event ${event.eventType} due to deadlock in $listenerName" }
listener.produceFailure(event)
}
return@coroutineScope true
}
}
}
}
val threshold = System.currentTimeMillis() - 300_000
bashingReferenceObject.entries.removeIf { entry ->
entry.value.removeIf { it.second < threshold }
entry.value.isEmpty() // Fjern oppføringen hvis listen nå er tom
}
log.debug { "No consumption detected for ${events.first().referenceId()}" }
false
}

View File

@ -32,6 +32,9 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
}
}
abstract fun produceFailure(incomingEvent: T)
open fun isOfEventsIListenFor(event: T): Boolean {
return listensForEvents.any { it == event.eventType }
}
@ -98,6 +101,7 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
return true
}
/**
* @param incomingEvent Can be a new event or iterated form sequence in order to re-produce events
* @param events Will be all available events for collection with the same reference id