From 57800a1fba03cc917ba5dc1d4310f96ee7784b90 Mon Sep 17 00:00:00 2001 From: Brage Date: Thu, 7 Dec 2023 23:35:05 +0100 Subject: [PATCH] V2 update 2 --- .../converter/ConverterApplication.kt | 18 ++ .../coordinator/Implementations.kt | 7 + .../coordinator/src/test/kotlin/FlowITTest.kt | 3 + apps/coordinator/src/test/kotlin/TestKafka.kt | 72 ++++++++ .../src/test/kotlin/TestMessageListener.kt | 6 + .../reader/BaseInfoFromFileTest.kt | 36 ++++ .../processer/EncodeService.kt | 20 +++ .../processer/ExtractService.kt | 8 + .../processer/ProcesserApplication.kt | 22 +++ .../shared/common/DownloadClient.kt | 95 +++++++++++ .../shared/common/Preference.kt | 54 ++++++ .../shared/common/ProcessingService.kt | 14 ++ .../shared/common/SharedConfig.kt | 22 +++ .../mediaprocessing/shared/common/Utils.kt | 21 +++ .../shared/common/datasource/DataSource.kt | 34 ++++ .../common/datasource/MySqlDataSource.kt | 87 ++++++++++ .../datasource/TableDefaultOperations.kt | 67 ++++++++ .../shared/common/extended/FileExt.kt | 100 +++++++++++ .../common/kafka/CoordinatorProducer.kt | 24 +++ .../common/parsing/FileNameDeterminate.kt | 160 ++++++++++++++++++ .../shared/common/parsing/FileNameParser.kt | 95 +++++++++++ .../persistance/PersistentDataReader.kt | 28 +++ .../common/persistance/PersistentDataStore.kt | 19 +++ .../common/persistance/PersistentMessage.kt | 32 ++++ .../shared/common/persistance/events.kt | 19 +++ .../common/persistance/processerEvents.kt | 10 ++ .../shared/common/runner/IRunner.kt | 13 ++ .../shared/common/runner/ResultRunner.kt | 20 +++ .../shared/common/runner/Runner.kt | 41 +++++ .../common/socket/SocketImplementation.kt | 23 +++ .../shared/common/H2DataSource.kt | 70 ++++++++ 31 files changed, 1240 insertions(+) create mode 100644 apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt create mode 100644 apps/coordinator/src/test/kotlin/FlowITTest.kt create mode 100644 apps/coordinator/src/test/kotlin/TestKafka.kt create mode 100644 apps/coordinator/src/test/kotlin/TestMessageListener.kt create mode 100644 apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt create mode 100644 apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/extended/FileExt.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameParser.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/IRunner.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/ResultRunner.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt create mode 100644 shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt create mode 100644 shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt new file mode 100644 index 00000000..d937c34a --- /dev/null +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/ConverterApplication.kt @@ -0,0 +1,18 @@ +package no.iktdev.mediaprocessing.converter + +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.runApplication +import org.springframework.context.ApplicationContext + +@SpringBootApplication +class ConvertApplication + +private var context: ApplicationContext? = null +@Suppress("unused") +fun getContext(): ApplicationContext? { + return context +} +fun main(args: Array) { + context = runApplication(*args) +} +//private val logger = KotlinLogging.logger {} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt new file mode 100644 index 00000000..14f8fe69 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt @@ -0,0 +1,7 @@ +package no.iktdev.mediaprocessing.coordinator + +import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation + +/*class SocketImplemented: SocketImplementation() { + +}*/ \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/FlowITTest.kt b/apps/coordinator/src/test/kotlin/FlowITTest.kt new file mode 100644 index 00000000..56c4fe80 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/FlowITTest.kt @@ -0,0 +1,3 @@ +class FlowITTest { + //val h2 = H2DataSource() +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/TestKafka.kt b/apps/coordinator/src/test/kotlin/TestKafka.kt new file mode 100644 index 00000000..f1829611 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/TestKafka.kt @@ -0,0 +1,72 @@ +import com.google.gson.Gson +import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.apache.kafka.common.serialization.StringSerializer +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory + +class TestKafka { + companion object { + private var listen: Boolean = false + private val topic = "nan" + private val gson = Gson() + + val consumer = object : DefaultConsumer() { + override fun consumerFactory(): DefaultKafkaConsumerFactory { + val config: MutableMap = HashMap() + config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId" + config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java + config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit + return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer()) + } + } + + val listener = object: DefaultMessageListener(topic, consumer) { + override fun listen() { + listen = true + } + } + + val producer = object: CoordinatorProducer() { + + val messages = mutableListOf>() + + override fun usingKafkaTemplate(): KafkaTemplate { + val producerFactory: ProducerFactory = DefaultKafkaProducerFactory(mapOf( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java + )) + return KafkaTemplate(producerFactory) + } + + + override fun sendMessage(key: String, message: Message) { + val mockRecord = ConsumerRecord( + topic, + 0, + messages.size.toLong(), + key, + gson.toJson(message) + ) + if (listen) { + messages.add(mockRecord) + listener.onMessage(mockRecord) + } + } + } + } + + +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/TestMessageListener.kt b/apps/coordinator/src/test/kotlin/TestMessageListener.kt new file mode 100644 index 00000000..bdf0a596 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/TestMessageListener.kt @@ -0,0 +1,6 @@ +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener + +class TestMessageListener: DefaultMessageListener("nan") { + override fun listen() { + } +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt new file mode 100644 index 00000000..2eb76bf7 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt @@ -0,0 +1,36 @@ +package no.iktdev.mediaprocessing.coordinator.reader + +import TestKafka +import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.streamit.library.kafka.dto.Status +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Test +import java.io.File +import java.util.UUID + +class BaseInfoFromFileTest { + val referenceId = UUID.randomUUID().toString() + val baseInfoFromFile = BaseInfoFromFile(TestKafka.producer, TestKafka.listener) + + @Test + fun testReadFileInfo() { + val input = ProcessStarted(Status.COMPLETED, ProcessType.FLOW, + File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath + ) + + val result = baseInfoFromFile.readFileInfo(input) + assertThat(result).isInstanceOf(BaseInfoPerformed::class.java) + val asResult = result as BaseInfoPerformed + assertThat(result.status).isEqualTo(Status.COMPLETED) + assertThat(asResult.title).isEqualTo("Kage no Jitsuryokusha ni Naritakute!") + assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01") + } + + + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt new file mode 100644 index 00000000..24b9ac0f --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.processer + +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.SharedConfig +import org.springframework.stereotype.Service + +//@Service +class EncodeService { + /*private val log = KotlinLogging.logger {} + val io = Coroutines.io() + + val producer = CoordinatorProducer() + private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> + if (event.key == KafkaEvents.EVENT_WORK_ENCODE_CREATED) { + + } + }*/ + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt new file mode 100644 index 00000000..7429d816 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ExtractService.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.processer + +import org.springframework.stereotype.Service + +@Service +class ExtractService { + +} \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt new file mode 100644 index 00000000..19ae6ea1 --- /dev/null +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ProcesserApplication.kt @@ -0,0 +1,22 @@ +package no.iktdev.mediaprocessing.processer + +import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation +import org.springframework.boot.autoconfigure.SpringBootApplication +import org.springframework.boot.runApplication + +private val logger = KotlinLogging.logger {} + +@SpringBootApplication +class ProcesserApplication { +} + +fun main(args: Array) { + //val dataSource = MySqlDataSource.fromDatabaseEnv(); + val context = runApplication(*args) +} + +class SocketImplemented: SocketImplementation() { + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt new file mode 100644 index 00000000..f7c33d26 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt @@ -0,0 +1,95 @@ +package no.iktdev.mediaprocessing.shared.common + +import no.iktdev.exfl.using +import java.io.File +import java.io.FileOutputStream +import java.net.HttpURLConnection +import java.net.URL + +open class DownloadClient(val url: String, val outDir: File, val baseName: String) { + protected val http: HttpURLConnection = openConnection() + private val BUFFER_SIZE = 4096 + + private fun openConnection(): HttpURLConnection { + try { + return URL(url).openConnection() as HttpURLConnection + } catch (e: Exception) { + e.printStackTrace() + throw BadAddressException("Provided url is either not provided (null) or is not a valid http url") + } + } + + protected fun getLength(): Int { + return http.contentLength + } + + protected fun getProgress(read: Int, total: Int = getLength()): Int { + return ((read * 100) / total) + } + + suspend fun download(): File? { + val extension = getExtension() + ?: throw UnsupportedFormatException("Provided url does not contain a supported file extension") + val outFile = outDir.using("$baseName.$extension") + val inputStream = http.inputStream + val fos = FileOutputStream(outFile, false) + + var totalBytesRead = 0 + val buffer = ByteArray(BUFFER_SIZE) + inputStream.apply { + fos.use { fout -> + run { + var bytesRead = read(buffer) + while (bytesRead >= 0) { + fout.write(buffer, 0, bytesRead) + totalBytesRead += bytesRead + bytesRead = read(buffer) + // System.out.println(getProgress(totalBytesRead)) + } + } + } + } + inputStream.close() + fos.close() + return outFile + } + + open fun getExtension(): String? { + val possiblyExtension = url.lastIndexOf(".") + 1 + return if (possiblyExtension > 1) { + return url.toString().substring(possiblyExtension) + } else { + val mimeType = http.contentType ?: null + contentTypeToExtension()[mimeType] + } + } + + open fun contentTypeToExtension(): Map { + return mapOf( + "image/png" to "png", + "image/jpeg" to "jpg", + "image/webp" to "webp", + "image/bmp" to "bmp", + "image/tiff" to "tiff" + ) + } + + + class BadAddressException : java.lang.Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } + + class UnsupportedFormatException : Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } + + class InvalidFileException : Exception { + constructor() : super() {} + constructor(message: String?) : super(message) {} + constructor(message: String?, cause: Throwable?) : super(message, cause) {} + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt new file mode 100644 index 00000000..d4c11736 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Preference.kt @@ -0,0 +1,54 @@ +package no.iktdev.mediaprocessing.shared.common + +import com.google.gson.Gson +import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.contract.ffmpeg.PreferenceDto + +private val log = KotlinLogging.logger {} +class Preference { + + companion object { + fun getPreference(): PreferenceDto { + val preference = readPreferenceFromFile() ?: PreferenceDto() + log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec } + log.info { "[Audio]: Language = " + preference.encodePreference.audio.language } + log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels } + log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate } + log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected } + + log.info { "[Video]: Codec = " + preference.encodePreference.video.codec } + log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat } + log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") } + log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold } + + return preference + } + + private fun readPreferenceFromFile(): PreferenceDto? { + val prefFile = SharedConfig.preference + if (!prefFile.exists()) { + log.info("Preference file: ${prefFile.absolutePath} does not exists...") + log.info("Using default configuration") + return null + } + else { + log.info("Preference file: ${prefFile.absolutePath} found") + } + + return try { + val instr = prefFile.inputStream() + val text = instr.bufferedReader().use { it.readText() } + Gson().fromJson(text, PreferenceDto::class.java) + } + catch (e: Exception) { + log.error("Failed to read preference file: ${prefFile.absolutePath}.. Will use default configuration") + null + } + } + } + + + + + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt new file mode 100644 index 00000000..842eca67 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt @@ -0,0 +1,14 @@ +package no.iktdev.mediaprocessing.shared.common + +import no.iktdev.exfl.coroutines.Coroutines +import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.stereotype.Service + +@Service +abstract class ProcessingService(var producer: CoordinatorProducer, var listener: DefaultMessageListener) { + val io = Coroutines.io() + abstract fun onResult(referenceId: String, data: MessageDataWrapper) + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt new file mode 100644 index 00000000..e767447d --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -0,0 +1,22 @@ +package no.iktdev.mediaprocessing.shared.common + +import java.io.File + +object SharedConfig { + var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents" + var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") + val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") + + val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe" + val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "no/iktdev/mediaprocessing/shared/contract/ffmpeg" + + val preference: File = File("/data/config/preference.json") +} + +object DatabaseConfig { + val address: String? = System.getenv("DATABASE_ADDRESS") + val port: String? = System.getenv("DATABASE_PORT") + val username: String? = System.getenv("DATABASE_USERNAME") + val password: String? = System.getenv("DATABASE_PASSWORD") + val database: String? = System.getenv("DATABASE_NAME") +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt new file mode 100644 index 00000000..86842154 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -0,0 +1,21 @@ +package no.iktdev.mediaprocessing.shared.common + +import mu.KotlinLogging +import java.io.File +import java.io.RandomAccessFile + +private val logger = KotlinLogging.logger {} + +fun isFileAvailable(file: File): Boolean { + if (!file.exists()) return false + var stream: RandomAccessFile? = null + try { + stream = RandomAccessFile(file, "rw") + stream.close() + logger.info { "File ${file.name} is read and writable" } + return true + } catch (e: Exception) { + stream?.close() + } + return false +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt new file mode 100644 index 00000000..b737135b --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/DataSource.kt @@ -0,0 +1,34 @@ +package no.iktdev.mediaprocessing.shared.common.datasource + +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.Table +import java.time.Instant +import java.time.LocalDateTime +import java.time.ZoneId +import java.time.ZoneOffset + +abstract class DataSource(val databaseName: String, val address: String, val port: String?, val username: String, val password: String) { + + abstract fun createDatabase(): Database? + + abstract fun createTables(vararg tables: Table) + + abstract fun createDatabaseStatement(): String + + abstract fun toConnectionUrl(): String + + fun toPortedAddress(): String { + return if (!address.contains(":") && port?.isBlank() != true) { + "$address:$port" + } else address + } + +} + +fun timestampToLocalDateTime(timestamp: Int): LocalDateTime { + return Instant.ofEpochSecond(timestamp.toLong()).atZone(ZoneId.systemDefault()).toLocalDateTime() +} + +fun LocalDateTime.toEpochSeconds(): Long { + return this.toEpochSecond(ZoneOffset.ofTotalSeconds(ZoneOffset.systemDefault().rules.getOffset(LocalDateTime.now()).totalSeconds)) +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt new file mode 100644 index 00000000..6373310b --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/MySqlDataSource.kt @@ -0,0 +1,87 @@ +package no.iktdev.mediaprocessing.shared.common.datasource + +import mu.KotlinLogging +import no.iktdev.mediaprocessing.shared.common.DatabaseConfig +import org.jetbrains.exposed.sql.Database +import org.jetbrains.exposed.sql.SchemaUtils +import org.jetbrains.exposed.sql.Table +import org.jetbrains.exposed.sql.transactions.TransactionManager +import org.jetbrains.exposed.sql.transactions.transaction + + +open class MySqlDataSource(databaseName: String, address: String, port: String = "", username: String, password: String): DataSource(databaseName = databaseName, address = address, port = port, username = username, password = password) { + val log = KotlinLogging.logger {} + companion object { + fun fromDatabaseEnv(): MySqlDataSource { + if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'") + if (DatabaseConfig.username.isNullOrBlank()) throw RuntimeException("Database username is not defined in 'DATABASE_USERNAME'") + if (DatabaseConfig.address.isNullOrBlank()) throw RuntimeException("Database address is not defined in 'DATABASE_ADDRESS'") + return MySqlDataSource( + databaseName = DatabaseConfig.database, + address = DatabaseConfig.address, + port = DatabaseConfig.port ?: "", + username = DatabaseConfig.username, + password = DatabaseConfig.password ?: "" + ) + } + } + + override fun createDatabase(): Database? { + val ok = transaction(toDatabaseServerConnection()) { + val tmc = TransactionManager.current().connection + val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '$databaseName'" + val stmt = tmc.prepareStatement(query, true) + + val resultSet = stmt.executeQuery() + val databaseExists = resultSet.next() + + if (!databaseExists) { + try { + exec(createDatabaseStatement()) + log.info { "Database $databaseName created." } + true + } catch (e: Exception) { + e.printStackTrace() + false + } + } else { + log.info { "Database $databaseName already exists." } + true + } + } + + return if (ok) toDatabase() else null + } + + override fun createTables(vararg tables: Table) { + transaction { + SchemaUtils.createMissingTablesAndColumns(*tables) + log.info { "Database transaction completed" } + } + } + + override fun createDatabaseStatement(): String { + return "CREATE DATABASE $databaseName" + } + + protected fun toDatabaseServerConnection(): Database { + return Database.connect( + toConnectionUrl(), + user = username, + password = password + ) + } + + fun toDatabase(): Database { + return Database.connect( + "${toConnectionUrl()}/$databaseName", + user = username, + password = password + ) + } + + override fun toConnectionUrl(): String { + return "jdbc:mysql://${toPortedAddress()}" + } + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt new file mode 100644 index 00000000..740275f7 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -0,0 +1,67 @@ +package no.iktdev.mediaprocessing.shared.common.datasource + +import org.jetbrains.exposed.sql.Table + +import org.jetbrains.exposed.sql.transactions.transaction + +open class TableDefaultOperations { + +} + +fun withTransaction(block: () -> T): T? { + return try { + transaction { + try { + block() + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + throw e // Optionally, you can rethrow the exception if needed + } + } + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + null + } +} + +fun insertWithSuccess(block: () -> T): Boolean { + return try { + transaction { + try { + block() + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + throw e // Optionally, you can rethrow the exception if needed + } + } + true + } catch (e: Exception) { + e.printStackTrace() + false + } +} + +fun executeWithStatus(block: () -> T): Boolean { + return try { + transaction { + try { + block() + } catch (e: Exception) { + e.printStackTrace() + // log the error here or handle the exception as needed + throw e // Optionally, you can rethrow the exception if needed + } + } + true + } catch (e: Exception) { + e.printStackTrace() + false + } +} + + + + diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/extended/FileExt.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/extended/FileExt.kt new file mode 100644 index 00000000..af8e0fb4 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/extended/FileExt.kt @@ -0,0 +1,100 @@ +package no.iktdev.mediaprocessing.shared.common.extended + +import java.io.File + +val validVideoFiles = listOf( + "mkv", + "avi", + "mp4", + "wmv", + "webm", + "mov" +) + +fun File.isSupportedVideoFile(): Boolean { + return this.isFile && validVideoFiles.contains(this.extension) +} + +fun getSanitizedFileName(name: String): String { + /** + * Modifies the input value and removes "[Text]" + * @param text "[TEST] Dummy - 01 [AZ 1080p] " + */ + fun removeBracketedText(text: String): String { + return Regex("\\[.*?]").replace(text, " ") + } + + /** + * + */ + fun removeParenthesizedText(text: String): String { + return Regex("\\(.*?\\)").replace(text, " ") + } + + /** + * + */ + fun removeResolutionAndTags(text: String): String { + return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ") + } + + fun removeInBetweenCharacters(text: String): String { + return Regex("[.]").replace(text, " ") + } + + /** + * @param text "example text with extra spaces" + * @return example text with extra spaces + */ + fun removeExtraWhiteSpace(text: String): String { + return Regex("\\s{2,}").replace(text, " ") + } + + return name + .let { removeBracketedText(it) } + .let { removeParenthesizedText(it) } + .let { removeResolutionAndTags(it) } + .let { removeInBetweenCharacters(it) } + .let { removeExtraWhiteSpace(it) } +} + + +fun File.getDesiredVideoFileName(): String? { + if (!this.isSupportedVideoFile()) return null + val cleanedFileName = getSanitizedFileName(this.nameWithoutExtension) + val parts = cleanedFileName.split(" - ") + return when { + parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> { + val title = parts[0] + val year = parts[1] + "$title ($year)" + } + + parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> { + val title = parts[0] + val episodeWithRevision = parts[2] + val episodeParts = episodeWithRevision.split("v", "V") + val episodeNumber = episodeParts[0].toInt() + val revisionNumber = episodeParts[1].toInt() + val seasonEpisode = + "S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}" + val episodeTitle = if (parts.size > 3) parts[3] else "" + "$title - $seasonEpisode - $episodeTitle" + } + + else -> cleanedFileName + }.trim() +} + +fun File.getGuessedVideoTitle(): String? { + val desiredFileName = getDesiredVideoFileName() ?: return null + val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE) + if (seasonRegex.containsMatchIn(desiredFileName)) { + return seasonRegex.replace(desiredFileName, "").trim() + } else { + val result = if (desiredFileName.contains(" - ")) { + return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName + } else desiredFileName + return result.trim() + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt new file mode 100644 index 00000000..62dac523 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/kafka/CoordinatorProducer.kt @@ -0,0 +1,24 @@ +package no.iktdev.mediaprocessing.shared.common.kafka + +import no.iktdev.mediaprocessing.shared.common.SharedConfig +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.streamit.library.kafka.dto.Status + +open class CoordinatorProducer(): DefaultProducer(SharedConfig.kafkaTopic) { + fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) { + super.sendMessage(event.event, Message( + referenceId = referenceId, + data = data + )) + } + fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) { + super.sendMessage(event.event, Message( + referenceId = referenceId, + eventId = eventId, + data = data + )) + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt new file mode 100644 index 00000000..86c93e27 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt @@ -0,0 +1,160 @@ +package no.iktdev.mediaprocessing.shared.common.parsing + +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MovieInfo +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfo + + +class FileNameDeterminate(val title: String, val sanitizedName: String, val ctype: ContentType = ContentType.UNDEFINED) { + + enum class ContentType { + MOVIE, + SERIE, + UNDEFINED + } + + fun getDeterminedVideoInfo(): VideoInfo? { + return when (ctype) { + ContentType.MOVIE -> determineMovieFileName() + ContentType.SERIE -> determineSerieFileName() + ContentType.UNDEFINED -> determineUndefinedFileName() + } + } + + private fun determineMovieFileName(): MovieInfo? { + val movieEx = MovieEx(title, sanitizedName) + val stripped = when { + movieEx.isDefinedWithYear() -> sanitizedName.replace(movieEx.yearRegex(), "").trim() + movieEx.doesContainMovieKeywords() -> sanitizedName.replace(Regex("(?i)\\s*\\(\\s*movie\\s*\\)\\s*"), "").trim() + else -> sanitizedName + } + val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped + return MovieInfo(cleanup(nonResolutioned), cleanup(nonResolutioned)) + } + + private fun determineSerieFileName(): EpisodeInfo? { + val serieEx = SerieEx(title, sanitizedName) + + val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName) + val episodeNumberSingle = serieEx.findEpisodeNumber() + + val seasonNumber = season ?: "1" + val episodeNumber = episode ?: (episodeNumberSingle ?: return null) + val seasonEpisodeCombined = serieEx.getSeasonEpisodeCombined(seasonNumber, episodeNumber) + val episodeTitle = serieEx.findEpisodeTitle() + + val useTitle = if (title == sanitizedName) { + if (title.contains(" - ")) { + title.split(" - ").firstOrNull() ?: title + } else { + val seasonNumberIndex = if (title.indexOf(seasonNumber) < 0) title.length -1 else title.indexOf(seasonNumber) + val episodeNumberIndex = if (title.indexOf(episodeNumber) < 0) title.length -1 else title.indexOf(episodeNumber) + val closest = listOf(seasonNumberIndex, episodeNumberIndex).min() + val shrunkenTitle = title.substring(0, closest) + if (closest - shrunkenTitle.lastIndexOf(" ") < 3) { + title.substring(0, shrunkenTitle.lastIndexOf(" ")) + } else title.substring(0, closest) + + } + } else title + val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() + return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) + } + + private fun determineUndefinedFileName(): VideoInfo? { + val serieEx = SerieEx(title, sanitizedName) + val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName) + val episodeNumber = serieEx.findEpisodeNumber() + return if ((sanitizedName.contains(" - ") && episodeNumber != null) || season != null || episode != null) { + determineSerieFileName() + } else { + determineMovieFileName() + } + } + + private fun cleanup(input: String): String { + val cleaned = Regex("(?<=\\w)[_.](?=\\w)").replace(input, " ") + return Regex("\\s{2,}").replace(cleaned, " ") + } + + open internal class Base(val title: String, val sanitizedName: String) { + fun getMatch(regex: String): String? { + return Regex(regex, RegexOption.IGNORE_CASE).find(sanitizedName)?.value + } + + fun removeResolutionAndBeyond(input: String): String? { + val removalValue = Regex("(i?)([0-9].*[pk]|[ ._-]+[UHD]+[ ._-])").find(input)?.value ?: return null + return input.substring(0, input.indexOf(removalValue)) + } + + fun yearRegex(): Regex { + return Regex("[ .(][0-9]{4}[ .)]") + } + } + + internal class MovieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) { + /** + * @return not null if matches " 2020 " or ".2020." + */ + fun isDefinedWithYear(): Boolean { + return getMatch(yearRegex().pattern)?.isNotBlank() ?: false + } + + /** + * Checks whether the filename contains the keyword movie, if so, default to movie + */ + fun doesContainMovieKeywords(): Boolean { + return getMatch("[(](?<=\\()movie(?=\\))[)]")?.isNotBlank() ?: false + } + } + + internal class SerieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) { + + fun getSeasonEpisodeCombined(season: String, episode: String): String { + return StringBuilder() + .append("S") + .append(if (season.length < 2) season.padStart(2, '0') else season) + .append("E") + .append(if (episode.length < 2) episode.padStart(2, '0') else episode) + .toString().trim() + } + + + /** + * Sjekken matcher tekst som dette: + * Cool - Season 1 Episode 13 + * Cool - s1e13 + * Cool - S1E13 + * Cool - S1 13 + */ + fun findSeasonAndEpisode(inputText: String): Pair { + val regex = Regex("""(?i)\b(?:S|Season)\s*(\d+).*?(?:E|Episode)?\s*(\d+)\b""") + val matchResult = regex.find(inputText) + val season = matchResult?.groups?.get(1)?.value + val episode = matchResult?.groups?.get(2)?.value + return season to episode + } + + fun findEpisodeNumber(): String? { + val regex = Regex("\\b(\\d+)\\b") + val matchResult = regex.find(sanitizedName) + return matchResult?.value?.trim() + } + + fun findEpisodeTitle(): String? { + val seCombo = findSeasonAndEpisode(sanitizedName) + val episodeNumber = findEpisodeNumber() + + val startPosition = if (seCombo.second != null) sanitizedName.indexOf(seCombo.second!!)+ seCombo.second!!.length + else if (episodeNumber != null) sanitizedName.indexOf(episodeNumber) + episodeNumber.length else 0 + val availableText = sanitizedName.substring(startPosition) + + val cleanedEpisodeTitle = availableText.replace(Regex("""(?i)\b(?:season|episode|ep)\b"""), "") + .replace(Regex("""^\s*-\s*"""), "") + .replace(Regex("""\s+"""), " ") + .trim() + + return cleanedEpisodeTitle + } + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameParser.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameParser.kt new file mode 100644 index 00000000..32a6c226 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameParser.kt @@ -0,0 +1,95 @@ +package no.iktdev.mediaprocessing.shared.common.parsing + +class FileNameParser(val fileName: String) { + var cleanedFileName: String + private set + + init { + cleanedFileName = fileName + .let { removeBracketedText(it) } + .let { removeParenthesizedText(it) } + .let { removeResolutionAndTags(it) } + .let { removeInBetweenCharacters(it) } + .let { removeExtraWhiteSpace(it) } + + } + + fun guessDesiredFileName(): String { + val parts = cleanedFileName.split(" - ") + return when { + parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> { + val title = parts[0] + val year = parts[1] + "$title ($year)" + } + + parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> { + val title = parts[0] + val episodeWithRevision = parts[2] + val episodeParts = episodeWithRevision.split("v", "V") + val episodeNumber = episodeParts[0].toInt() + val revisionNumber = episodeParts[1].toInt() + val seasonEpisode = + "S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}" + val episodeTitle = if (parts.size > 3) parts[3] else "" + "$title - $seasonEpisode - $episodeTitle" + } + + else -> cleanedFileName + }.trim() + } + + fun guessDesiredTitle(): String { + val desiredFileName = guessDesiredFileName() + val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE) + if (seasonRegex.containsMatchIn(desiredFileName)) { + return seasonRegex.replace(desiredFileName, "").trim() + } else { + val result = if (desiredFileName.contains(" - ")) { + return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName + } else desiredFileName + return result.trim() + } + } + + + /** + * Modifies the input value and removes "[Text]" + * @param text "[TEST] Dummy - 01 [AZ 1080p] " + */ + fun removeBracketedText(text: String): String { + return Regex("\\[.*?]").replace(text, " ") + } + + /** + * + */ + fun removeParenthesizedText(text: String): String { + return Regex("\\(.*?\\)").replace(text, " ") + } + + /** + * + */ + fun removeResolutionAndTags(text: String): String { + return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ") + } + + fun removeInBetweenCharacters(text: String): String { + return Regex("[.]").replace(text, " ") + } + + /** + * @param text "example text with extra spaces" + * @return example text with extra spaces + */ + fun removeExtraWhiteSpace(text: String): String { + return Regex("\\s{2,}").replace(text, " ") + } + + + private fun getMatch(regex: String): String? { + return Regex(regex).find(fileName)?.value ?: return null + } + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt new file mode 100644 index 00000000..6ce12f05 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataReader.kt @@ -0,0 +1,28 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction +import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry +import org.jetbrains.exposed.sql.SortOrder +import org.jetbrains.exposed.sql.select +import org.jetbrains.exposed.sql.selectAll + +class PersistentDataReader { + val dzz = DeserializingRegistry() + + fun getAllMessages(): List> { + val events = withTransaction { + events.selectAll() + .groupBy { it[events.referenceId] } + } + return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList() + } + + fun getMessagesFor(referenceId: String): List { + return withTransaction { + events.select { events.referenceId eq referenceId } + .orderBy(events.created, SortOrder.ASC) + .mapNotNull { fromRowToPersistentMessage(it, dzz) } + } ?: emptyList() + } + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt new file mode 100644 index 00000000..925373e6 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus +import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import org.jetbrains.exposed.sql.insert + +open class PersistentDataStore { + fun storeMessage(event: String, message: Message<*>): Boolean { + return executeWithStatus { + events.insert { + it[events.referenceId] = message.referenceId + it[events.eventId] = message.eventId + it[events.event] = event + it[events.data] = message.dataAsJson() + } + } + } + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt new file mode 100644 index 00000000..2a30d67f --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.jetbrains.exposed.sql.ResultRow +import java.time.LocalDateTime + +data class PersistentMessage( + val referenceId: String, + val eventId: String, + val event: KafkaEvents, + val data: MessageDataWrapper, + val created: LocalDateTime +) + +fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { + val kev = try { + KafkaEvents.valueOf(row[events.event]) + } catch (e: IllegalArgumentException) { + e.printStackTrace() + return null + } + val dzdata = dez.deserializeData(kev, row[events.data]) + return PersistentMessage( + referenceId = row[events.referenceId], + eventId = row[events.eventId], + event = kev, + data = dzdata, + created = row[events.created] + ) +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt new file mode 100644 index 00000000..4dcfaaf0 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt @@ -0,0 +1,19 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import org.jetbrains.exposed.dao.id.IntIdTable +import org.jetbrains.exposed.sql.Column +import org.jetbrains.exposed.sql.javatime.CurrentDateTime +import org.jetbrains.exposed.sql.javatime.datetime +import java.time.LocalDateTime + +object events: IntIdTable() { + val referenceId: Column = varchar("referenceId", 50) + val eventId: Column = varchar("eventId", 50) + val event: Column = varchar("event1",100) + val data: Column = text("data") + val created: Column = datetime("created").defaultExpression(CurrentDateTime) + + init { + uniqueIndex(referenceId, eventId, event) + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt new file mode 100644 index 00000000..77bf86e8 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/processerEvents.kt @@ -0,0 +1,10 @@ +package no.iktdev.mediaprocessing.shared.common.persistance + +import org.jetbrains.exposed.dao.id.IntIdTable +import org.jetbrains.exposed.sql.Column + +object processerEvents: IntIdTable() { + + val claimed: Column = bool("claimed") + val data: Column = text("data") +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/IRunner.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/IRunner.kt new file mode 100644 index 00000000..0dc998fa --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/IRunner.kt @@ -0,0 +1,13 @@ +package no.iktdev.mediaprocessing.shared.common.runner + +interface IRunner { + + fun onStarted() {} + + fun onOutputChanged(line: String) {} + + fun onEnded() {} + + fun onError(code: Int) + +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/ResultRunner.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/ResultRunner.kt new file mode 100644 index 00000000..7b411bdc --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/ResultRunner.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.shared.common.runner + +import com.github.pgreze.process.Redirect +import com.github.pgreze.process.process + +data class CodeToOutput( + val statusCode: Int, + val output: List +) + +suspend fun getOutputUsing(executable: String, vararg arguments: String): CodeToOutput { + val result: MutableList = mutableListOf() + val code = process(executable, *arguments, + stderr = Redirect.CAPTURE, + stdout = Redirect.CAPTURE, + consumer = { + result.add(it) + }).resultCode + return CodeToOutput(statusCode = code, result) +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt new file mode 100644 index 00000000..a38a716a --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/runner/Runner.kt @@ -0,0 +1,41 @@ +package no.iktdev.mediaprocessing.shared.common.runner + +import kotlinx.coroutines.Job +import kotlinx.coroutines.cancel +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.launch +import mu.KotlinLogging +import no.iktdev.exfl.coroutines.Coroutines + +open class Runner(open val executable: String, val daemonInterface: IRunner) { + private val logger = KotlinLogging.logger {} + + val scope = Coroutines.io() + var job: Job? = null + var executor: com.github.pgreze.process.ProcessResult? = null + open suspend fun run(parameters: List): Int { + daemonInterface.onStarted() + logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" } + job = scope.launch { + executor = com.github.pgreze.process.process(executable, *parameters.toTypedArray(), + stdout = com.github.pgreze.process.Redirect.CAPTURE, + stderr = com.github.pgreze.process.Redirect.CAPTURE, + consumer = { + daemonInterface.onOutputChanged(it) + }) + } + job?.join() + + val resultCode = executor?.resultCode ?: -1 + if (resultCode == 0) { + daemonInterface.onEnded() + } else daemonInterface.onError(resultCode) + logger.info { "$executable result: $resultCode" } + return resultCode + } + + suspend fun cancel() { + job?.cancelAndJoin() + scope.cancel("Cancel operation triggered!") + } +} \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt new file mode 100644 index 00000000..f96296e1 --- /dev/null +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/socket/SocketImplementation.kt @@ -0,0 +1,23 @@ +package no.iktdev.mediaprocessing.shared.common.socket + +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.simp.config.MessageBrokerRegistry +import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker +import org.springframework.web.socket.config.annotation.StompEndpointRegistry +import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer + +@Configuration +@EnableWebSocketMessageBroker +open class SocketImplementation: WebSocketMessageBrokerConfigurer { + + override fun registerStompEndpoints(registry: StompEndpointRegistry) { + registry.addEndpoint("/ws") + .setAllowedOrigins("*://localhost:*/*", "http://localhost:3000/") + .withSockJS() + } + + override fun configureMessageBroker(registry: MessageBrokerRegistry) { + registry.enableSimpleBroker("/topic") + registry.setApplicationDestinationPrefixes("/app") + } +} \ No newline at end of file diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt new file mode 100644 index 00000000..ab8b9f49 --- /dev/null +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/H2DataSource.kt @@ -0,0 +1,70 @@ +package no.iktdev.mediaprocessing.shared.common + +import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource +import org.h2.jdbcx.JdbcDataSource +import java.io.PrintWriter +import java.sql.Connection +import java.sql.SQLFeatureNotSupportedException +import java.util.logging.Logger +import javax.sql.DataSource + +class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password) { + companion object { + fun fromDatabaseEnv(): H2DataSource { + if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'") + return H2DataSource( + JdbcDataSource(), + databaseName = DatabaseConfig.database!!, + ) + } + } + override fun getConnection(): Connection { + return jdbcDataSource.connection + } + + override fun getConnection(username: String?, password: String?): Connection { + return jdbcDataSource.getConnection(username, password) + } + + override fun setLoginTimeout(seconds: Int) { + jdbcDataSource.loginTimeout = seconds + } + + override fun getLoginTimeout(): Int { + return jdbcDataSource.loginTimeout + } + + override fun getLogWriter(): PrintWriter? { + return jdbcDataSource.logWriter + } + + override fun setLogWriter(out: PrintWriter?) { + jdbcDataSource.logWriter = out + } + + override fun getParentLogger(): Logger? { + throw SQLFeatureNotSupportedException("getParentLogger is not supported") + } + + override fun unwrap(iface: Class?): T { + if (iface != null && iface.isAssignableFrom(this.javaClass)) { + return this as T + } + return jdbcDataSource.unwrap(iface) + } + + override fun isWrapperFor(iface: Class<*>?): Boolean { + if (iface != null && iface.isAssignableFrom(this.javaClass)) { + return true + } + return jdbcDataSource.isWrapperFor(iface) + } + + override fun createDatabaseStatement(): String { + return "CREATE SCHEMA $databaseName" + } + + override fun toConnectionUrl(): String { + return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;" + } +}