From dca730afc27cd7210ca9bddc481743c7fd5be7a2 Mon Sep 17 00:00:00 2001 From: bskjon Date: Sat, 19 Apr 2025 01:38:37 +0200 Subject: [PATCH] Added filter for commentary + failing broken process --- .../listeners/BaseInfoFromFileTaskListener.kt | 6 + .../listeners/CompletedTaskListener.kt | 9 + .../listeners/ConvertWorkTaskListener.kt | 7 + .../listeners/CoverDownloadTaskListener.kt | 7 + .../CoverFromMetadataTaskListener.kt | 7 + .../EncodeWorkArgumentsTaskListener.kt | 6 + .../listeners/EncodeWorkTaskListener.kt | 7 + .../ExtractWorkArgumentsTaskListener.kt | 7 + .../listeners/ExtractWorkTaskListener.kt | 7 + .../MediaOutInformationTaskListener.kt | 7 + .../MetadataWaitOrDefaultTaskListener.kt | 9 +- .../ParseMediaFileStreamsTaskListener.kt | 9 + .../listeners/PersistContentTaskListener.kt | 9 + .../ReadMediaFileStreamsTaskListener.kt | 9 + .../mapping/streams/SubtitleArguments.kt | 8 +- .../mapping/streams/SubtitleArgumentsTest.kt | 196 ++++++++++++++++++ .../implementations/EventCoordinator.kt | 21 +- .../implementations/EventListenerImpl.kt | 4 + 18 files changed, 332 insertions(+), 3 deletions(-) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt index f1540206..a71dbeb3 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/BaseInfoFromFileTaskListener.kt @@ -81,5 +81,11 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() { } } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(BaseInfoEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + )) + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index 7861b441..a99ab29c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -217,6 +217,13 @@ class CompletedTaskListener : CoordinatorEventListener() { active = false } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(MediaProcessCompletedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } + internal data class ComposedMediaInfo( val title: String, val fallbackCollection: String, @@ -289,4 +296,6 @@ class CompletedTaskListener : CoordinatorEventListener() { return data.data } + + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index 82fe2007..af4fc5cf 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -139,4 +139,11 @@ class ConvertWorkTaskListener: WorkTaskListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(ConvertWorkCreatedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt index 1c8568a5..6186aa07 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt @@ -82,4 +82,11 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(MediaCoverDownloadedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index 2a37f086..030ba309 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -92,4 +92,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { onProduceEvent(result) active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(MediaCoverInfoReceivedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt index 3f71a957..4b979281 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt @@ -97,4 +97,10 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(EncodeArgumentCreatedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt index e4be4894..a886b376 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkTaskListener.kt @@ -74,4 +74,11 @@ class EncodeWorkTaskListener : WorkTaskListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(EncodeWorkCreatedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 4a2186ce..5376fe95 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -92,4 +92,11 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(ExtractArgumentCreatedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt index 8a6d2f23..ff6c145f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkTaskListener.kt @@ -83,4 +83,11 @@ class ExtractWorkTaskListener: WorkTaskListener() { } active = false } + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(ExtractWorkCreatedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt index 288d5ef4..cef7a5f5 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt @@ -78,6 +78,13 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { active = false } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(MediaOutInformationConstructedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } + class ProcessMediaInfoAndMetadata(val baseInfo: BaseInfo, val metadata: pyMetadata? = null) { var metadataDeterminedContentType: FileNameDeterminate.ContentType = metadata?.type?.let { contentType -> when (contentType) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt index c98eb72a..02b9d9fc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MetadataWaitOrDefaultTaskListener.kt @@ -113,7 +113,14 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() { timeoutJobs[digestEvent.referenceId()] = ttsc } } - + + override fun produceFailure(incomingEvent: Event) { + onProduceEvent(MediaMetadataReceivedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + )) + } + suspend fun createTimeout(referenceId: String, eventId: String, baseInfo: BaseInfoEvent) { val expiryTime = (Instant.now().epochSecond + metadataTimeout) val dateTime = LocalDateTime.ofEpochSecond(expiryTime, 0, ZoneOffset.UTC) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt index 35241bb6..e8d900cf 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ParseMediaFileStreamsTaskListener.kt @@ -12,6 +12,7 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.Event import no.iktdev.mediaprocessing.shared.common.contract.data.MediaFileStreamsParsedEvent +import no.iktdev.mediaprocessing.shared.common.contract.data.MediaMetadataReceivedEvent import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.common.contract.ffmpeg.SubtitleStream @@ -65,6 +66,14 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() { active = false } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent( + MediaFileStreamsParsedEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + ) + ) + } fun parseStreams(data: JsonObject?): ParsedMediaStreams { val ignoreCodecs = listOf("png", "mjpeg") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt index b033952b..a038115f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/PersistContentTaskListener.kt @@ -125,6 +125,15 @@ class PersistContentTaskListener : CoordinatorEventListener() { active = false } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent( + PersistedContentEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + ) + ) + } + internal data class ComposedMediaInfo( val title: String, val fallbackCollection: String, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt index c8fde36d..30912222 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ReadMediaFileStreamsTaskListener.kt @@ -78,6 +78,15 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() { active = false } + override fun produceFailure(incomingEvent: Event) { + onProduceEvent( + MediaFileStreamsReadEvent( + metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Failed, getProducerName()), + data = null + ) + ) + } + suspend fun fileReadStreams(started: StartEventData, eventId: String): JsonObject? { val file = File(started.file) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArguments.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArguments.kt index 75b55216..c7135556 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArguments.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArguments.kt @@ -45,8 +45,14 @@ class SubtitleArguments(val subtitleStreams: List) { return keywords.any { title.contains(it) } } + private fun SubtitleStream.isNonDialog(): Boolean { + val title = this.tags.title?.lowercase() ?: return false + val keywords = listOf("commentary") + return keywords.any { title.contains(it) } + } + private fun getSubtitleType(stream: SubtitleStream): SubtitleType { - return if (stream.isSignOrSong()) + return if (stream.isSignOrSong() || stream.isNonDialog()) SubtitleType.NON_DIALOGUE else if (stream.isSHD()) { SubtitleType.SHD diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArgumentsTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArgumentsTest.kt index 5770dd05..4e8bb597 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArgumentsTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/streams/SubtitleArgumentsTest.kt @@ -50,6 +50,13 @@ class SubtitleArgumentsTest { assertThat(args.firstOrNull()?.index).isEqualTo(0) } + @Test + fun assertThatCommentaryIsNotSelected() { + val data = Gson().fromJson>(streamsWithCommentary, type) + val args = SubtitleArguments(data).getSubtitleArguments() + assertThat(args).hasSize(1) + assertThat(args.firstOrNull()?.mediaIndex).isEqualTo(4) + } val multipleSubtitleStreamsWithSameLanguage = """ [{ @@ -367,4 +374,193 @@ class SubtitleArgumentsTest { } ] """.trimIndent() + + val streamsWithCommentary = """ + [ + { + "index": 4, + "codec_name": "subrip", + "codec_long_name": "SubRip subtitle", + "codec_type": "subtitle", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "r_frame_rate": "0/0", + "avg_frame_rate": "0/0", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "duration_ts": 5501856, + "duration": "5501.856000", + "disposition": { + "default": 0, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 1, + "hearing_impaired": 0, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + }, + "tags": { + "language": "eng", + "title": "English (Forced)", + "BPS": "4", + "DURATION": "00:50:23.770000000", + "NUMBER_OF_FRAMES": "67", + "NUMBER_OF_BYTES": "1591", + "_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit", + "_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52", + "_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES" + } + }, + { + "index": 5, + "codec_name": "subrip", + "codec_long_name": "SubRip subtitle", + "codec_type": "subtitle", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "r_frame_rate": "0/0", + "avg_frame_rate": "0/0", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "duration_ts": 5501856, + "duration": "5501.856000", + "disposition": { + "default": 0, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 0, + "hearing_impaired": 1, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + }, + "tags": { + "language": "eng", + "title": "English (SDH)", + "BPS": "54", + "DURATION": "01:28:15.462000000", + "NUMBER_OF_FRAMES": "1302", + "NUMBER_OF_BYTES": "35817", + "_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit", + "_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52", + "_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES" + } + }, + { + "index": 8, + "codec_name": "subrip", + "codec_long_name": "SubRip subtitle", + "codec_type": "subtitle", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "r_frame_rate": "0/0", + "avg_frame_rate": "0/0", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "duration_ts": 5501856, + "duration": "5501.856000", + "disposition": { + "default": 0, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 0, + "hearing_impaired": 0, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + }, + "tags": { + "language": "eng", + "title": "English (Commentary #1)", + "BPS": "124", + "DURATION": "01:30:35.847000000", + "NUMBER_OF_FRAMES": "1596", + "NUMBER_OF_BYTES": "84444", + "_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit", + "_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52", + "_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES" + } + }, + { + "index": 9, + "codec_name": "subrip", + "codec_long_name": "SubRip subtitle", + "codec_type": "subtitle", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "r_frame_rate": "0/0", + "avg_frame_rate": "0/0", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "duration_ts": 5501856, + "duration": "5501.856000", + "disposition": { + "default": 0, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 0, + "hearing_impaired": 0, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + }, + "tags": { + "language": "eng", + "title": "English (Commentary #2)", + "BPS": "134", + "DURATION": "01:31:22.561000000", + "NUMBER_OF_FRAMES": "1646", + "NUMBER_OF_BYTES": "92269", + "_STATISTICS_WRITING_APP": "mkvmerge v90.0 ('Hanging On') 64-bit", + "_STATISTICS_WRITING_DATE_UTC": "2025-03-12 18:54:52", + "_STATISTICS_TAGS": "BPS DURATION NUMBER_OF_FRAMES NUMBER_OF_BYTES" + } + } + ] + """.trimIndent() } \ No newline at end of file diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt index 08a2fd40..eef67ec2 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventCoordinator.kt @@ -71,6 +71,7 @@ abstract class EventCoordinator> { } + val bashingReferenceObject: MutableMap>> = mutableMapOf() private suspend fun onEventsReceived(events: List): Boolean = coroutineScope { val listeners = getListeners() @@ -80,18 +81,36 @@ abstract class EventCoordinator> { val consumableEvent = ConsumableEvent(event) listener.onEventsReceived(consumableEvent, events) if (consumableEvent.isConsumed) { - // 🚨 Suppress logging hvis det er en deadlock val referenceId = events.first().referenceId() val listenerName = listener::class.java.simpleName + val eventId = consumableEvent.metadata().eventId + val bashingId = "$eventId-$listenerName" + bashingReferenceObject.computeIfAbsent(referenceId) { mutableListOf() }.add(Pair(bashingId, System.currentTimeMillis())) + + // 🚨 Suppress logging hvis det er en deadlock if (EventDeadlockDetector.detect(referenceId, listenerName, event.eventType.toString())) { log.info { "Consumption detected for $referenceId -> $listenerName on event ${event.eventType}" } EventDeadlockDetector.resolve(referenceId, listenerName, event.eventType.toString()) } + + val bashed = bashingReferenceObject[referenceId]?.takeLast(10) ?: emptyList() + if (bashed.any { it == bashed.first() }) { + // We have entered a deadlock here + // Due to the nature of the deadlock the event will be determined to be dead + log.error { "Producing Failure on $referenceId on event ${event.eventType} due to deadlock in $listenerName" } + listener.produceFailure(event) + } + return@coroutineScope true } } } } + val threshold = System.currentTimeMillis() - 300_000 + bashingReferenceObject.entries.removeIf { entry -> + entry.value.removeIf { it.second < threshold } + entry.value.isEmpty() // Fjern oppføringen hvis listen nå er tom + } log.debug { "No consumption detected for ${events.first().referenceId()}" } false } diff --git a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt index e1d28592..c5588769 100644 --- a/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt +++ b/shared/eventi/src/main/kotlin/no/iktdev/eventi/implementations/EventListenerImpl.kt @@ -32,6 +32,9 @@ abstract class EventListenerImpl> { } } + abstract fun produceFailure(incomingEvent: T) + + open fun isOfEventsIListenFor(event: T): Boolean { return listensForEvents.any { it == event.eventType } } @@ -98,6 +101,7 @@ abstract class EventListenerImpl> { return true } + /** * @param incomingEvent Can be a new event or iterated form sequence in order to re-produce events * @param events Will be all available events for collection with the same reference id