This commit is contained in:
Brage 2023-07-26 00:05:45 +02:00
parent e9f972f8b6
commit c20ce36c41
6 changed files with 17 additions and 10 deletions

View File

@ -33,10 +33,18 @@ abstract class DefaultKafkaReader(val subId: String) {
messageProducer.sendMessage(event.event, message) messageProducer.sendMessage(event.event, message)
} }
fun produceMessage(event: KafkaEvents, baseMessage: Message, status: StatusType = StatusType.SUCCESS, data: Any?) { fun produceMessage(event: KafkaEvents, baseMessage: Message, data: Any?) {
val message = Message( val message = Message(
referenceId = baseMessage.referenceId, referenceId = baseMessage.referenceId,
Status(statusType = status), baseMessage.status,
data = data
)
messageProducer.sendMessage(event.event, message)
}
fun produceSuccessMessage(event: KafkaEvents, referenceId: String, data: Any? = null) {
val message = Message(
referenceId = referenceId,
status = Status(StatusType.SUCCESS),
data = data data = data
) )
messageProducer.sendMessage(event.event, message) messageProducer.sendMessage(event.event, message)

View File

@ -77,13 +77,13 @@ class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialM
if (videoInfo is EpisodeInfo) { if (videoInfo is EpisodeInfo) {
produceMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, initMessage, videoInfo) produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_SERIE, referenceId, videoInfo)
} else if (videoInfo is MovieInfo) { } else if (videoInfo is MovieInfo) {
produceMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, initMessage, videoInfo) produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_MOVIE, referenceId, videoInfo)
} }
val out = ContentOutName(videoInfo.fullName) val out = ContentOutName(videoInfo.fullName)
produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out) produceSuccessMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, referenceId, out)
} }
final override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> { final override fun loadDeserializers(): Map<String, IMessageDataDeserialization<*>> {

View File

@ -68,7 +68,7 @@ class EncodedStreams : DefaultKafkaReader("streamSelector"), ISequentialMessageE
produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "Failed to generate Video Arguments Bundle") produceErrorMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, "Failed to generate Video Arguments Bundle")
return return
} }
produceMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, Message(referenceId, Status(StatusType.SUCCESS)), videoInstructions) produceSuccessMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED_VIDEO, referenceId, videoInstructions)
} }

View File

@ -46,7 +46,7 @@ class ConvertedSubtitleConsumer : DefaultKafkaReader("collectorConsumerConverted
val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
if (status) { if (status) {
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, data.value().referenceId)
logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" }
} else { } else {
produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")

View File

@ -117,8 +117,7 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
} }
} }
val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, collection.getReferenceId() ?: "M.I.A", null)
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)
logger.info { "Stored ${encodeWork.outFile} video" } logger.info { "Stored ${encodeWork.outFile} video" }
} }
} }

View File

@ -43,7 +43,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted
val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS)) val message = Message(referenceId = data.value()?.referenceId ?: "M.I.A", status = Status(statusType = StatusType.SUCCESS))
if (status) { if (status) {
produceMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message, null) produceSuccessMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, data.value().referenceId)
logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" } logger.info { "Stored ${File(workResult.outFile).absolutePath} subtitle" }
} else { } else {
produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log") produceErrorMessage(KafkaEvents.EVENT_COLLECTOR_SUBTITLE_STORED, message.withNewStatus(status = Status(statusType = StatusType.ERROR)), "Unknown, see log")