diff --git a/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt index 76e88548..8279dbd1 100644 --- a/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt +++ b/CommonCode/src/main/java/no/iktdev/streamit/content/common/DefaultKafkaReader.kt @@ -33,10 +33,18 @@ abstract class DefaultKafkaReader(val subId: String) { messageProducer.sendMessage(event.event, message) } - fun produceMessage(event: KafkaEvents, baseMessage: Message, status: StatusType = StatusType.SUCCESS, data: Any?) { + fun produceMessage(event: KafkaEvents, baseMessage: Message, data: Any?) { val message = Message( referenceId = baseMessage.referenceId, - Status(statusType = status), + baseMessage.status, + data = data + ) + messageProducer.sendMessage(event.event, message) + } + fun produceSuccessMessage(event: KafkaEvents, referenceId: String, data: Any? = null) { + val message = Message( + referenceId = referenceId, + status = Status(StatusType.SUCCESS), data = data ) messageProducer.sendMessage(event.event, message) diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt index 36f6f3fc..e2b362ff 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/contentDeterminator/ContentDeterminate.kt @@ -77,13 +77,13 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM if (videoInfo is EpisodeInfo) { - produceMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, initMessage, videoInfo) + produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, referenceId, videoInfo) } else if (videoInfo is MovieInfo) { - produceMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, initMessage, videoInfo) + produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, referenceId, videoInfo) } val out = ContentOutName(videoInfo.fullName) - produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out) + produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, referenceId, out) } final override fun loadDeserializers(): Map> { diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt index ef736a39..6ec0cea1 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/analyzer/encoding/EncodedStreams.kt @@ -68,7 +68,7 @@ class EncodedStreams : DefaultKafkaReader("streamSelector"), ISequentialMessageE produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "Failed to generate Video Arguments Bundle") return } - produceMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, Message(referenceId, Status(StatusType.SUCCESS)), videoInstructions) + produceSuccessMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, videoInstructions) } diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt index d7b892d8..122097e4 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ConvertedSubtitleConsumer.kt @@ -46,7 +46,7 @@ class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConverted val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) if (status) { - produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, data.value().referenceId) logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } } else { produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt index 03b81145..3452f2e9 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/EncodedVideoConsumer.kt @@ -117,8 +117,7 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"), } } - val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) - produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", null) logger.info { "Stored ${encodeWork.outFile} video" } } } \ No newline at end of file diff --git a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt index 49828125..912abde0 100644 --- a/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt +++ b/Reader/src/main/kotlin/no/iktdev/streamit/content/reader/collector/ExtractedSubtitleConsumer.kt @@ -43,7 +43,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) if (status) { - produceMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message, null) + produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, data.value().referenceId) logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } } else { produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")