Wrapped in transactions

This commit is contained in:
Brage 2023-07-25 01:22:58 +02:00
parent dc8d5c44e4
commit 0b9e08c22a
3 changed files with 47 additions and 27 deletions

View File

@ -34,6 +34,7 @@ class ConvertRunner(val referenceId: String, val listener: IConvertListener) {
val syncedDialogs = Syncro().sync(dialogs) val syncedDialogs = Syncro().sync(dialogs)
try {
val converted = Export(inFile, syncedDialogs).write() val converted = Export(inFile, syncedDialogs).write()
converted.forEach { converted.forEach {
val item = ConvertWork( val item = ConvertWork(
@ -46,6 +47,12 @@ class ConvertRunner(val referenceId: String, val listener: IConvertListener) {
listener.onEnded(referenceId, subtitleInfo, work = item) listener.onEnded(referenceId, subtitleInfo, work = item)
} }
} }
} catch (e: Exception) {
e.printStackTrace()
withContext(Dispatchers.Default) {
listener.onError(referenceId, subtitleInfo, "See log")
}
}
} }

View File

@ -14,6 +14,7 @@ import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener import no.iktdev.streamit.library.kafka.listener.collector.CollectorMessageListener
import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent import no.iktdev.streamit.library.kafka.listener.collector.ICollectedMessagesEvent
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
@ -66,10 +67,15 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
val contentType = metadata?.type ?: return val contentType = metadata?.type ?: return
val iid = if (contentType == "movie") MovieQuery(videoFileNameWithExtension).insertAndGetId() else null val iid = if (contentType == "movie") transaction {
MovieQuery(videoFileNameWithExtension).insertAndGetId()
} else null
if (serieData != null) { if (serieData != null) {
val success = SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension).insertAndGetStatus() val success = transaction {
SerieQuery(serieData.title, serieData.episode, serieData.season, fileData.title, videoFileNameWithExtension)
.insertAndGetStatus()
}
if (!success) if (!success)
return return
} }
@ -86,8 +92,10 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
} }
val metaGenre = metadata.genres val metaGenre = metadata.genres
val gq = GenreQuery(*metaGenre.toTypedArray()) val gq = GenreQuery(*metaGenre.toTypedArray())
transaction {
gq.insertAndGetIds() gq.insertAndGetIds()
val gids = gq.getIds().joinToString(",") }
val gids = transaction { gq.getIds().joinToString(",") }
val cq = CatalogQuery( val cq = CatalogQuery(
title = fileData.title, title = fileData.title,
@ -97,15 +105,17 @@ class EncodedVideoConsumer: DefaultKafkaReader("collectorConsumerEncodedVideo"),
iid = iid, iid = iid,
genres = gids genres = gids
) )
val cid = cq.insertAndGetId() ?: cq.getId() ?: return val cid = transaction { cq.insertAndGetId() ?: cq.getId() } ?: return
if (!metadata.summary.isNullOrBlank()) { if (!metadata.summary.isNullOrBlank()) {
val summary = metadata.summary ?: return val summary = metadata.summary ?: return
transaction {
SummaryQuery( SummaryQuery(
cid = cid, cid = cid,
language = "eng", // TODO: Fix later, language = "eng", // TODO: Fix later,
description = summary description = summary
) )
} }
}
val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS)) val message = Message(referenceId = collection.getReferenceId() ?: "M.I.A", status = Status(StatusType.SUCCESS))
produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null) produceMessage(KafkaEvents.EVENT_COLLECTOR_VIDEO_STORED, message, null)

View File

@ -10,6 +10,7 @@ import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization import no.iktdev.streamit.library.kafka.listener.deserializer.IMessageDataDeserialization
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.jetbrains.exposed.sql.transactions.transaction
import java.io.File import java.io.File
class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") { class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtractedSubtitle") {
@ -26,6 +27,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted
} }
val of = File(workResult.outFile) val of = File(workResult.outFile)
transaction {
SubtitleQuery( SubtitleQuery(
title = of.nameWithoutExtension, title = of.nameWithoutExtension,
language = workResult.language, language = workResult.language,
@ -34,6 +36,7 @@ class ExtractedSubtitleConsumer : DefaultKafkaReader("collectorConsumerExtracted
).insertAndGetStatus() ).insertAndGetStatus()
} }
} }
}
init { init {
listener.listen() listener.listen()