V2 update 2
This commit is contained in:
parent
729bb03b70
commit
57800a1fba
@ -0,0 +1,18 @@
|
||||
package no.iktdev.mediaprocessing.converter
|
||||
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
import org.springframework.context.ApplicationContext
|
||||
|
||||
@SpringBootApplication
|
||||
class ConvertApplication
|
||||
|
||||
private var context: ApplicationContext? = null
|
||||
@Suppress("unused")
|
||||
fun getContext(): ApplicationContext? {
|
||||
return context
|
||||
}
|
||||
fun main(args: Array<String>) {
|
||||
context = runApplication<ConvertApplication>(*args)
|
||||
}
|
||||
//private val logger = KotlinLogging.logger {}
|
||||
@ -0,0 +1,7 @@
|
||||
package no.iktdev.mediaprocessing.coordinator
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
|
||||
|
||||
/*class SocketImplemented: SocketImplementation() {
|
||||
|
||||
}*/
|
||||
3
apps/coordinator/src/test/kotlin/FlowITTest.kt
Normal file
3
apps/coordinator/src/test/kotlin/FlowITTest.kt
Normal file
@ -0,0 +1,3 @@
|
||||
class FlowITTest {
|
||||
//val h2 = H2DataSource()
|
||||
}
|
||||
72
apps/coordinator/src/test/kotlin/TestKafka.kt
Normal file
72
apps/coordinator/src/test/kotlin/TestKafka.kt
Normal file
@ -0,0 +1,72 @@
|
||||
import com.google.gson.Gson
|
||||
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import org.apache.kafka.clients.consumer.ConsumerConfig
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.apache.kafka.clients.producer.ProducerConfig
|
||||
import org.apache.kafka.common.serialization.StringDeserializer
|
||||
import org.apache.kafka.common.serialization.StringSerializer
|
||||
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
|
||||
import org.springframework.kafka.core.DefaultKafkaProducerFactory
|
||||
import org.springframework.kafka.core.KafkaTemplate
|
||||
import org.springframework.kafka.core.ProducerFactory
|
||||
|
||||
class TestKafka {
|
||||
companion object {
|
||||
private var listen: Boolean = false
|
||||
private val topic = "nan"
|
||||
private val gson = Gson()
|
||||
|
||||
val consumer = object : DefaultConsumer() {
|
||||
override fun consumerFactory(): DefaultKafkaConsumerFactory<String, String> {
|
||||
val config: MutableMap<String, Any> = HashMap()
|
||||
config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId"
|
||||
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
|
||||
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
|
||||
config[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = autoCommit
|
||||
return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
|
||||
}
|
||||
}
|
||||
|
||||
val listener = object: DefaultMessageListener(topic, consumer) {
|
||||
override fun listen() {
|
||||
listen = true
|
||||
}
|
||||
}
|
||||
|
||||
val producer = object: CoordinatorProducer() {
|
||||
|
||||
val messages = mutableListOf<ConsumerRecord<String, String>>()
|
||||
|
||||
override fun usingKafkaTemplate(): KafkaTemplate<String, String> {
|
||||
val producerFactory: ProducerFactory<String, String> = DefaultKafkaProducerFactory(mapOf(
|
||||
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
|
||||
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java
|
||||
))
|
||||
return KafkaTemplate(producerFactory)
|
||||
}
|
||||
|
||||
|
||||
override fun sendMessage(key: String, message: Message<MessageDataWrapper>) {
|
||||
val mockRecord = ConsumerRecord(
|
||||
topic,
|
||||
0,
|
||||
messages.size.toLong(),
|
||||
key,
|
||||
gson.toJson(message)
|
||||
)
|
||||
if (listen) {
|
||||
messages.add(mockRecord)
|
||||
listener.onMessage(mockRecord)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
6
apps/coordinator/src/test/kotlin/TestMessageListener.kt
Normal file
6
apps/coordinator/src/test/kotlin/TestMessageListener.kt
Normal file
@ -0,0 +1,6 @@
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
||||
|
||||
class TestMessageListener: DefaultMessageListener("nan") {
|
||||
override fun listen() {
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,36 @@
|
||||
package no.iktdev.mediaprocessing.coordinator.reader
|
||||
|
||||
import TestKafka
|
||||
import no.iktdev.mediaprocessing.shared.contract.ProcessType
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
import org.assertj.core.api.Assertions.assertThat
|
||||
import org.junit.jupiter.api.Assertions.*
|
||||
import org.junit.jupiter.api.Test
|
||||
import java.io.File
|
||||
import java.util.UUID
|
||||
|
||||
class BaseInfoFromFileTest {
|
||||
val referenceId = UUID.randomUUID().toString()
|
||||
val baseInfoFromFile = BaseInfoFromFile(TestKafka.producer, TestKafka.listener)
|
||||
|
||||
@Test
|
||||
fun testReadFileInfo() {
|
||||
val input = ProcessStarted(Status.COMPLETED, ProcessType.FLOW,
|
||||
File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath
|
||||
)
|
||||
|
||||
val result = baseInfoFromFile.readFileInfo(input)
|
||||
assertThat(result).isInstanceOf(BaseInfoPerformed::class.java)
|
||||
val asResult = result as BaseInfoPerformed
|
||||
assertThat(result.status).isEqualTo(Status.COMPLETED)
|
||||
assertThat(asResult.title).isEqualTo("Kage no Jitsuryokusha ni Naritakute!")
|
||||
assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01")
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package no.iktdev.mediaprocessing.processer
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
//@Service
|
||||
class EncodeService {
|
||||
/*private val log = KotlinLogging.logger {}
|
||||
val io = Coroutines.io()
|
||||
|
||||
val producer = CoordinatorProducer()
|
||||
private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
|
||||
if (event.key == KafkaEvents.EVENT_WORK_ENCODE_CREATED) {
|
||||
|
||||
}
|
||||
}*/
|
||||
|
||||
}
|
||||
@ -0,0 +1,8 @@
|
||||
package no.iktdev.mediaprocessing.processer
|
||||
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class ExtractService {
|
||||
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package no.iktdev.mediaprocessing.processer
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||
import org.springframework.boot.runApplication
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
@SpringBootApplication
|
||||
class ProcesserApplication {
|
||||
}
|
||||
|
||||
fun main(args: Array<String>) {
|
||||
//val dataSource = MySqlDataSource.fromDatabaseEnv();
|
||||
val context = runApplication<ProcesserApplication>(*args)
|
||||
}
|
||||
|
||||
class SocketImplemented: SocketImplementation() {
|
||||
|
||||
}
|
||||
@ -0,0 +1,95 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.exfl.using
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.net.HttpURLConnection
|
||||
import java.net.URL
|
||||
|
||||
open class DownloadClient(val url: String, val outDir: File, val baseName: String) {
|
||||
protected val http: HttpURLConnection = openConnection()
|
||||
private val BUFFER_SIZE = 4096
|
||||
|
||||
private fun openConnection(): HttpURLConnection {
|
||||
try {
|
||||
return URL(url).openConnection() as HttpURLConnection
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
throw BadAddressException("Provided url is either not provided (null) or is not a valid http url")
|
||||
}
|
||||
}
|
||||
|
||||
protected fun getLength(): Int {
|
||||
return http.contentLength
|
||||
}
|
||||
|
||||
protected fun getProgress(read: Int, total: Int = getLength()): Int {
|
||||
return ((read * 100) / total)
|
||||
}
|
||||
|
||||
suspend fun download(): File? {
|
||||
val extension = getExtension()
|
||||
?: throw UnsupportedFormatException("Provided url does not contain a supported file extension")
|
||||
val outFile = outDir.using("$baseName.$extension")
|
||||
val inputStream = http.inputStream
|
||||
val fos = FileOutputStream(outFile, false)
|
||||
|
||||
var totalBytesRead = 0
|
||||
val buffer = ByteArray(BUFFER_SIZE)
|
||||
inputStream.apply {
|
||||
fos.use { fout ->
|
||||
run {
|
||||
var bytesRead = read(buffer)
|
||||
while (bytesRead >= 0) {
|
||||
fout.write(buffer, 0, bytesRead)
|
||||
totalBytesRead += bytesRead
|
||||
bytesRead = read(buffer)
|
||||
// System.out.println(getProgress(totalBytesRead))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
inputStream.close()
|
||||
fos.close()
|
||||
return outFile
|
||||
}
|
||||
|
||||
open fun getExtension(): String? {
|
||||
val possiblyExtension = url.lastIndexOf(".") + 1
|
||||
return if (possiblyExtension > 1) {
|
||||
return url.toString().substring(possiblyExtension)
|
||||
} else {
|
||||
val mimeType = http.contentType ?: null
|
||||
contentTypeToExtension()[mimeType]
|
||||
}
|
||||
}
|
||||
|
||||
open fun contentTypeToExtension(): Map<String, String> {
|
||||
return mapOf(
|
||||
"image/png" to "png",
|
||||
"image/jpeg" to "jpg",
|
||||
"image/webp" to "webp",
|
||||
"image/bmp" to "bmp",
|
||||
"image/tiff" to "tiff"
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
class BadAddressException : java.lang.Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
|
||||
class UnsupportedFormatException : Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
|
||||
class InvalidFileException : Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,54 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import com.google.gson.Gson
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.PreferenceDto
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
class Preference {
|
||||
|
||||
companion object {
|
||||
fun getPreference(): PreferenceDto {
|
||||
val preference = readPreferenceFromFile() ?: PreferenceDto()
|
||||
log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec }
|
||||
log.info { "[Audio]: Language = " + preference.encodePreference.audio.language }
|
||||
log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels }
|
||||
log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate }
|
||||
log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected }
|
||||
|
||||
log.info { "[Video]: Codec = " + preference.encodePreference.video.codec }
|
||||
log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat }
|
||||
log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") }
|
||||
log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold }
|
||||
|
||||
return preference
|
||||
}
|
||||
|
||||
private fun readPreferenceFromFile(): PreferenceDto? {
|
||||
val prefFile = SharedConfig.preference
|
||||
if (!prefFile.exists()) {
|
||||
log.info("Preference file: ${prefFile.absolutePath} does not exists...")
|
||||
log.info("Using default configuration")
|
||||
return null
|
||||
}
|
||||
else {
|
||||
log.info("Preference file: ${prefFile.absolutePath} found")
|
||||
}
|
||||
|
||||
return try {
|
||||
val instr = prefFile.inputStream()
|
||||
val text = instr.bufferedReader().use { it.readText() }
|
||||
Gson().fromJson(text, PreferenceDto::class.java)
|
||||
}
|
||||
catch (e: Exception) {
|
||||
log.error("Failed to read preference file: ${prefFile.absolutePath}.. Will use default configuration")
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@ -0,0 +1,14 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
abstract class ProcessingService(var producer: CoordinatorProducer, var listener: DefaultMessageListener) {
|
||||
val io = Coroutines.io()
|
||||
abstract fun onResult(referenceId: String, data: MessageDataWrapper)
|
||||
|
||||
}
|
||||
@ -0,0 +1,22 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import java.io.File
|
||||
|
||||
object SharedConfig {
|
||||
var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
|
||||
var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input")
|
||||
val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")
|
||||
|
||||
val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe"
|
||||
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "no/iktdev/mediaprocessing/shared/contract/ffmpeg"
|
||||
|
||||
val preference: File = File("/data/config/preference.json")
|
||||
}
|
||||
|
||||
object DatabaseConfig {
|
||||
val address: String? = System.getenv("DATABASE_ADDRESS")
|
||||
val port: String? = System.getenv("DATABASE_PORT")
|
||||
val username: String? = System.getenv("DATABASE_USERNAME")
|
||||
val password: String? = System.getenv("DATABASE_PASSWORD")
|
||||
val database: String? = System.getenv("DATABASE_NAME")
|
||||
}
|
||||
@ -0,0 +1,21 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import mu.KotlinLogging
|
||||
import java.io.File
|
||||
import java.io.RandomAccessFile
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
fun isFileAvailable(file: File): Boolean {
|
||||
if (!file.exists()) return false
|
||||
var stream: RandomAccessFile? = null
|
||||
try {
|
||||
stream = RandomAccessFile(file, "rw")
|
||||
stream.close()
|
||||
logger.info { "File ${file.name} is read and writable" }
|
||||
return true
|
||||
} catch (e: Exception) {
|
||||
stream?.close()
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import java.time.Instant
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneId
|
||||
import java.time.ZoneOffset
|
||||
|
||||
abstract class DataSource(val databaseName: String, val address: String, val port: String?, val username: String, val password: String) {
|
||||
|
||||
abstract fun createDatabase(): Database?
|
||||
|
||||
abstract fun createTables(vararg tables: Table)
|
||||
|
||||
abstract fun createDatabaseStatement(): String
|
||||
|
||||
abstract fun toConnectionUrl(): String
|
||||
|
||||
fun toPortedAddress(): String {
|
||||
return if (!address.contains(":") && port?.isBlank() != true) {
|
||||
"$address:$port"
|
||||
} else address
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun timestampToLocalDateTime(timestamp: Int): LocalDateTime {
|
||||
return Instant.ofEpochSecond(timestamp.toLong()).atZone(ZoneId.systemDefault()).toLocalDateTime()
|
||||
}
|
||||
|
||||
fun LocalDateTime.toEpochSeconds(): Long {
|
||||
return this.toEpochSecond(ZoneOffset.ofTotalSeconds(ZoneOffset.systemDefault().rules.getOffset(LocalDateTime.now()).totalSeconds))
|
||||
}
|
||||
@ -0,0 +1,87 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.SchemaUtils
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
|
||||
open class MySqlDataSource(databaseName: String, address: String, port: String = "", username: String, password: String): DataSource(databaseName = databaseName, address = address, port = port, username = username, password = password) {
|
||||
val log = KotlinLogging.logger {}
|
||||
companion object {
|
||||
fun fromDatabaseEnv(): MySqlDataSource {
|
||||
if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'")
|
||||
if (DatabaseConfig.username.isNullOrBlank()) throw RuntimeException("Database username is not defined in 'DATABASE_USERNAME'")
|
||||
if (DatabaseConfig.address.isNullOrBlank()) throw RuntimeException("Database address is not defined in 'DATABASE_ADDRESS'")
|
||||
return MySqlDataSource(
|
||||
databaseName = DatabaseConfig.database,
|
||||
address = DatabaseConfig.address,
|
||||
port = DatabaseConfig.port ?: "",
|
||||
username = DatabaseConfig.username,
|
||||
password = DatabaseConfig.password ?: ""
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun createDatabase(): Database? {
|
||||
val ok = transaction(toDatabaseServerConnection()) {
|
||||
val tmc = TransactionManager.current().connection
|
||||
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '$databaseName'"
|
||||
val stmt = tmc.prepareStatement(query, true)
|
||||
|
||||
val resultSet = stmt.executeQuery()
|
||||
val databaseExists = resultSet.next()
|
||||
|
||||
if (!databaseExists) {
|
||||
try {
|
||||
exec(createDatabaseStatement())
|
||||
log.info { "Database $databaseName created." }
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
} else {
|
||||
log.info { "Database $databaseName already exists." }
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
return if (ok) toDatabase() else null
|
||||
}
|
||||
|
||||
override fun createTables(vararg tables: Table) {
|
||||
transaction {
|
||||
SchemaUtils.createMissingTablesAndColumns(*tables)
|
||||
log.info { "Database transaction completed" }
|
||||
}
|
||||
}
|
||||
|
||||
override fun createDatabaseStatement(): String {
|
||||
return "CREATE DATABASE $databaseName"
|
||||
}
|
||||
|
||||
protected fun toDatabaseServerConnection(): Database {
|
||||
return Database.connect(
|
||||
toConnectionUrl(),
|
||||
user = username,
|
||||
password = password
|
||||
)
|
||||
}
|
||||
|
||||
fun toDatabase(): Database {
|
||||
return Database.connect(
|
||||
"${toConnectionUrl()}/$databaseName",
|
||||
user = username,
|
||||
password = password
|
||||
)
|
||||
}
|
||||
|
||||
override fun toConnectionUrl(): String {
|
||||
return "jdbc:mysql://${toPortedAddress()}"
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,67 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.datasource
|
||||
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
open class TableDefaultOperations<T: Table> {
|
||||
|
||||
}
|
||||
|
||||
fun <T> withTransaction(block: () -> T): T? {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> insertWithSuccess(block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeWithStatus(block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@ -0,0 +1,100 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.extended
|
||||
|
||||
import java.io.File
|
||||
|
||||
val validVideoFiles = listOf(
|
||||
"mkv",
|
||||
"avi",
|
||||
"mp4",
|
||||
"wmv",
|
||||
"webm",
|
||||
"mov"
|
||||
)
|
||||
|
||||
fun File.isSupportedVideoFile(): Boolean {
|
||||
return this.isFile && validVideoFiles.contains(this.extension)
|
||||
}
|
||||
|
||||
fun getSanitizedFileName(name: String): String {
|
||||
/**
|
||||
* Modifies the input value and removes "[Text]"
|
||||
* @param text "[TEST] Dummy - 01 [AZ 1080p] "
|
||||
*/
|
||||
fun removeBracketedText(text: String): String {
|
||||
return Regex("\\[.*?]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeParenthesizedText(text: String): String {
|
||||
return Regex("\\(.*?\\)").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeResolutionAndTags(text: String): String {
|
||||
return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ")
|
||||
}
|
||||
|
||||
fun removeInBetweenCharacters(text: String): String {
|
||||
return Regex("[.]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
* @param text "example text with extra spaces"
|
||||
* @return example text with extra spaces
|
||||
*/
|
||||
fun removeExtraWhiteSpace(text: String): String {
|
||||
return Regex("\\s{2,}").replace(text, " ")
|
||||
}
|
||||
|
||||
return name
|
||||
.let { removeBracketedText(it) }
|
||||
.let { removeParenthesizedText(it) }
|
||||
.let { removeResolutionAndTags(it) }
|
||||
.let { removeInBetweenCharacters(it) }
|
||||
.let { removeExtraWhiteSpace(it) }
|
||||
}
|
||||
|
||||
|
||||
fun File.getDesiredVideoFileName(): String? {
|
||||
if (!this.isSupportedVideoFile()) return null
|
||||
val cleanedFileName = getSanitizedFileName(this.nameWithoutExtension)
|
||||
val parts = cleanedFileName.split(" - ")
|
||||
return when {
|
||||
parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> {
|
||||
val title = parts[0]
|
||||
val year = parts[1]
|
||||
"$title ($year)"
|
||||
}
|
||||
|
||||
parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> {
|
||||
val title = parts[0]
|
||||
val episodeWithRevision = parts[2]
|
||||
val episodeParts = episodeWithRevision.split("v", "V")
|
||||
val episodeNumber = episodeParts[0].toInt()
|
||||
val revisionNumber = episodeParts[1].toInt()
|
||||
val seasonEpisode =
|
||||
"S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}"
|
||||
val episodeTitle = if (parts.size > 3) parts[3] else ""
|
||||
"$title - $seasonEpisode - $episodeTitle"
|
||||
}
|
||||
|
||||
else -> cleanedFileName
|
||||
}.trim()
|
||||
}
|
||||
|
||||
fun File.getGuessedVideoTitle(): String? {
|
||||
val desiredFileName = getDesiredVideoFileName() ?: return null
|
||||
val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE)
|
||||
if (seasonRegex.containsMatchIn(desiredFileName)) {
|
||||
return seasonRegex.replace(desiredFileName, "").trim()
|
||||
} else {
|
||||
val result = if (desiredFileName.contains(" - ")) {
|
||||
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
||||
} else desiredFileName
|
||||
return result.trim()
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.kafka
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
|
||||
open class CoordinatorProducer(): DefaultProducer(SharedConfig.kafkaTopic) {
|
||||
fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
|
||||
super.sendMessage(event.event, Message(
|
||||
referenceId = referenceId,
|
||||
data = data
|
||||
))
|
||||
}
|
||||
fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
|
||||
super.sendMessage(event.event, Message(
|
||||
referenceId = referenceId,
|
||||
eventId = eventId,
|
||||
data = data
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,160 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.parsing
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MovieInfo
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfo
|
||||
|
||||
|
||||
class FileNameDeterminate(val title: String, val sanitizedName: String, val ctype: ContentType = ContentType.UNDEFINED) {
|
||||
|
||||
enum class ContentType {
|
||||
MOVIE,
|
||||
SERIE,
|
||||
UNDEFINED
|
||||
}
|
||||
|
||||
fun getDeterminedVideoInfo(): VideoInfo? {
|
||||
return when (ctype) {
|
||||
ContentType.MOVIE -> determineMovieFileName()
|
||||
ContentType.SERIE -> determineSerieFileName()
|
||||
ContentType.UNDEFINED -> determineUndefinedFileName()
|
||||
}
|
||||
}
|
||||
|
||||
private fun determineMovieFileName(): MovieInfo? {
|
||||
val movieEx = MovieEx(title, sanitizedName)
|
||||
val stripped = when {
|
||||
movieEx.isDefinedWithYear() -> sanitizedName.replace(movieEx.yearRegex(), "").trim()
|
||||
movieEx.doesContainMovieKeywords() -> sanitizedName.replace(Regex("(?i)\\s*\\(\\s*movie\\s*\\)\\s*"), "").trim()
|
||||
else -> sanitizedName
|
||||
}
|
||||
val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped
|
||||
return MovieInfo(cleanup(nonResolutioned), cleanup(nonResolutioned))
|
||||
}
|
||||
|
||||
private fun determineSerieFileName(): EpisodeInfo? {
|
||||
val serieEx = SerieEx(title, sanitizedName)
|
||||
|
||||
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumberSingle = serieEx.findEpisodeNumber()
|
||||
|
||||
val seasonNumber = season ?: "1"
|
||||
val episodeNumber = episode ?: (episodeNumberSingle ?: return null)
|
||||
val seasonEpisodeCombined = serieEx.getSeasonEpisodeCombined(seasonNumber, episodeNumber)
|
||||
val episodeTitle = serieEx.findEpisodeTitle()
|
||||
|
||||
val useTitle = if (title == sanitizedName) {
|
||||
if (title.contains(" - ")) {
|
||||
title.split(" - ").firstOrNull() ?: title
|
||||
} else {
|
||||
val seasonNumberIndex = if (title.indexOf(seasonNumber) < 0) title.length -1 else title.indexOf(seasonNumber)
|
||||
val episodeNumberIndex = if (title.indexOf(episodeNumber) < 0) title.length -1 else title.indexOf(episodeNumber)
|
||||
val closest = listOf<Int>(seasonNumberIndex, episodeNumberIndex).min()
|
||||
val shrunkenTitle = title.substring(0, closest)
|
||||
if (closest - shrunkenTitle.lastIndexOf(" ") < 3) {
|
||||
title.substring(0, shrunkenTitle.lastIndexOf(" "))
|
||||
} else title.substring(0, closest)
|
||||
|
||||
}
|
||||
} else title
|
||||
val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim()
|
||||
return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName)
|
||||
}
|
||||
|
||||
private fun determineUndefinedFileName(): VideoInfo? {
|
||||
val serieEx = SerieEx(title, sanitizedName)
|
||||
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumber = serieEx.findEpisodeNumber()
|
||||
return if ((sanitizedName.contains(" - ") && episodeNumber != null) || season != null || episode != null) {
|
||||
determineSerieFileName()
|
||||
} else {
|
||||
determineMovieFileName()
|
||||
}
|
||||
}
|
||||
|
||||
private fun cleanup(input: String): String {
|
||||
val cleaned = Regex("(?<=\\w)[_.](?=\\w)").replace(input, " ")
|
||||
return Regex("\\s{2,}").replace(cleaned, " ")
|
||||
}
|
||||
|
||||
open internal class Base(val title: String, val sanitizedName: String) {
|
||||
fun getMatch(regex: String): String? {
|
||||
return Regex(regex, RegexOption.IGNORE_CASE).find(sanitizedName)?.value
|
||||
}
|
||||
|
||||
fun removeResolutionAndBeyond(input: String): String? {
|
||||
val removalValue = Regex("(i?)([0-9].*[pk]|[ ._-]+[UHD]+[ ._-])").find(input)?.value ?: return null
|
||||
return input.substring(0, input.indexOf(removalValue))
|
||||
}
|
||||
|
||||
fun yearRegex(): Regex {
|
||||
return Regex("[ .(][0-9]{4}[ .)]")
|
||||
}
|
||||
}
|
||||
|
||||
internal class MovieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||
/**
|
||||
* @return not null if matches " 2020 " or ".2020."
|
||||
*/
|
||||
fun isDefinedWithYear(): Boolean {
|
||||
return getMatch(yearRegex().pattern)?.isNotBlank() ?: false
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the filename contains the keyword movie, if so, default to movie
|
||||
*/
|
||||
fun doesContainMovieKeywords(): Boolean {
|
||||
return getMatch("[(](?<=\\()movie(?=\\))[)]")?.isNotBlank() ?: false
|
||||
}
|
||||
}
|
||||
|
||||
internal class SerieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||
|
||||
fun getSeasonEpisodeCombined(season: String, episode: String): String {
|
||||
return StringBuilder()
|
||||
.append("S")
|
||||
.append(if (season.length < 2) season.padStart(2, '0') else season)
|
||||
.append("E")
|
||||
.append(if (episode.length < 2) episode.padStart(2, '0') else episode)
|
||||
.toString().trim()
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sjekken matcher tekst som dette:
|
||||
* Cool - Season 1 Episode 13
|
||||
* Cool - s1e13
|
||||
* Cool - S1E13
|
||||
* Cool - S1 13
|
||||
*/
|
||||
fun findSeasonAndEpisode(inputText: String): Pair<String?, String?> {
|
||||
val regex = Regex("""(?i)\b(?:S|Season)\s*(\d+).*?(?:E|Episode)?\s*(\d+)\b""")
|
||||
val matchResult = regex.find(inputText)
|
||||
val season = matchResult?.groups?.get(1)?.value
|
||||
val episode = matchResult?.groups?.get(2)?.value
|
||||
return season to episode
|
||||
}
|
||||
|
||||
fun findEpisodeNumber(): String? {
|
||||
val regex = Regex("\\b(\\d+)\\b")
|
||||
val matchResult = regex.find(sanitizedName)
|
||||
return matchResult?.value?.trim()
|
||||
}
|
||||
|
||||
fun findEpisodeTitle(): String? {
|
||||
val seCombo = findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumber = findEpisodeNumber()
|
||||
|
||||
val startPosition = if (seCombo.second != null) sanitizedName.indexOf(seCombo.second!!)+ seCombo.second!!.length
|
||||
else if (episodeNumber != null) sanitizedName.indexOf(episodeNumber) + episodeNumber.length else 0
|
||||
val availableText = sanitizedName.substring(startPosition)
|
||||
|
||||
val cleanedEpisodeTitle = availableText.replace(Regex("""(?i)\b(?:season|episode|ep)\b"""), "")
|
||||
.replace(Regex("""^\s*-\s*"""), "")
|
||||
.replace(Regex("""\s+"""), " ")
|
||||
.trim()
|
||||
|
||||
return cleanedEpisodeTitle
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,95 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.parsing
|
||||
|
||||
class FileNameParser(val fileName: String) {
|
||||
var cleanedFileName: String
|
||||
private set
|
||||
|
||||
init {
|
||||
cleanedFileName = fileName
|
||||
.let { removeBracketedText(it) }
|
||||
.let { removeParenthesizedText(it) }
|
||||
.let { removeResolutionAndTags(it) }
|
||||
.let { removeInBetweenCharacters(it) }
|
||||
.let { removeExtraWhiteSpace(it) }
|
||||
|
||||
}
|
||||
|
||||
fun guessDesiredFileName(): String {
|
||||
val parts = cleanedFileName.split(" - ")
|
||||
return when {
|
||||
parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> {
|
||||
val title = parts[0]
|
||||
val year = parts[1]
|
||||
"$title ($year)"
|
||||
}
|
||||
|
||||
parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> {
|
||||
val title = parts[0]
|
||||
val episodeWithRevision = parts[2]
|
||||
val episodeParts = episodeWithRevision.split("v", "V")
|
||||
val episodeNumber = episodeParts[0].toInt()
|
||||
val revisionNumber = episodeParts[1].toInt()
|
||||
val seasonEpisode =
|
||||
"S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}"
|
||||
val episodeTitle = if (parts.size > 3) parts[3] else ""
|
||||
"$title - $seasonEpisode - $episodeTitle"
|
||||
}
|
||||
|
||||
else -> cleanedFileName
|
||||
}.trim()
|
||||
}
|
||||
|
||||
fun guessDesiredTitle(): String {
|
||||
val desiredFileName = guessDesiredFileName()
|
||||
val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE)
|
||||
if (seasonRegex.containsMatchIn(desiredFileName)) {
|
||||
return seasonRegex.replace(desiredFileName, "").trim()
|
||||
} else {
|
||||
val result = if (desiredFileName.contains(" - ")) {
|
||||
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
||||
} else desiredFileName
|
||||
return result.trim()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Modifies the input value and removes "[Text]"
|
||||
* @param text "[TEST] Dummy - 01 [AZ 1080p] "
|
||||
*/
|
||||
fun removeBracketedText(text: String): String {
|
||||
return Regex("\\[.*?]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeParenthesizedText(text: String): String {
|
||||
return Regex("\\(.*?\\)").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeResolutionAndTags(text: String): String {
|
||||
return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ")
|
||||
}
|
||||
|
||||
fun removeInBetweenCharacters(text: String): String {
|
||||
return Regex("[.]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
* @param text "example text with extra spaces"
|
||||
* @return example text with extra spaces
|
||||
*/
|
||||
fun removeExtraWhiteSpace(text: String): String {
|
||||
return Regex("\\s{2,}").replace(text, " ")
|
||||
}
|
||||
|
||||
|
||||
private fun getMatch(regex: String): String? {
|
||||
return Regex(regex).find(fileName)?.value ?: return null
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,28 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
|
||||
import org.jetbrains.exposed.sql.SortOrder
|
||||
import org.jetbrains.exposed.sql.select
|
||||
import org.jetbrains.exposed.sql.selectAll
|
||||
|
||||
class PersistentDataReader {
|
||||
val dzz = DeserializingRegistry()
|
||||
|
||||
fun getAllMessages(): List<List<PersistentMessage>> {
|
||||
val events = withTransaction {
|
||||
events.selectAll()
|
||||
.groupBy { it[events.referenceId] }
|
||||
}
|
||||
return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList()
|
||||
}
|
||||
|
||||
fun getMessagesFor(referenceId: String): List<PersistentMessage> {
|
||||
return withTransaction {
|
||||
events.select { events.referenceId eq referenceId }
|
||||
.orderBy(events.created, SortOrder.ASC)
|
||||
.mapNotNull { fromRowToPersistentMessage(it, dzz) }
|
||||
} ?: emptyList()
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
|
||||
open class PersistentDataStore {
|
||||
fun storeMessage(event: String, message: Message<*>): Boolean {
|
||||
return executeWithStatus {
|
||||
events.insert {
|
||||
it[events.referenceId] = message.referenceId
|
||||
it[events.eventId] = message.eventId
|
||||
it[events.event] = event
|
||||
it[events.data] = message.dataAsJson()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,32 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import java.time.LocalDateTime
|
||||
|
||||
data class PersistentMessage(
|
||||
val referenceId: String,
|
||||
val eventId: String,
|
||||
val event: KafkaEvents,
|
||||
val data: MessageDataWrapper,
|
||||
val created: LocalDateTime
|
||||
)
|
||||
|
||||
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
|
||||
val kev = try {
|
||||
KafkaEvents.valueOf(row[events.event])
|
||||
} catch (e: IllegalArgumentException) {
|
||||
e.printStackTrace()
|
||||
return null
|
||||
}
|
||||
val dzdata = dez.deserializeData(kev, row[events.data])
|
||||
return PersistentMessage(
|
||||
referenceId = row[events.referenceId],
|
||||
eventId = row[events.eventId],
|
||||
event = kev,
|
||||
data = dzdata,
|
||||
created = row[events.created]
|
||||
)
|
||||
}
|
||||
@ -0,0 +1,19 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||
import org.jetbrains.exposed.sql.Column
|
||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
||||
import org.jetbrains.exposed.sql.javatime.datetime
|
||||
import java.time.LocalDateTime
|
||||
|
||||
object events: IntIdTable() {
|
||||
val referenceId: Column<String> = varchar("referenceId", 50)
|
||||
val eventId: Column<String> = varchar("eventId", 50)
|
||||
val event: Column<String> = varchar("event1",100)
|
||||
val data: Column<String> = text("data")
|
||||
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
||||
|
||||
init {
|
||||
uniqueIndex(referenceId, eventId, event)
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,10 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.persistance
|
||||
|
||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||
import org.jetbrains.exposed.sql.Column
|
||||
|
||||
object processerEvents: IntIdTable() {
|
||||
|
||||
val claimed: Column<Boolean> = bool("claimed")
|
||||
val data: Column<String> = text("data")
|
||||
}
|
||||
@ -0,0 +1,13 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.runner
|
||||
|
||||
interface IRunner {
|
||||
|
||||
fun onStarted() {}
|
||||
|
||||
fun onOutputChanged(line: String) {}
|
||||
|
||||
fun onEnded() {}
|
||||
|
||||
fun onError(code: Int)
|
||||
|
||||
}
|
||||
@ -0,0 +1,20 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.runner
|
||||
|
||||
import com.github.pgreze.process.Redirect
|
||||
import com.github.pgreze.process.process
|
||||
|
||||
data class CodeToOutput(
|
||||
val statusCode: Int,
|
||||
val output: List<String>
|
||||
)
|
||||
|
||||
suspend fun getOutputUsing(executable: String, vararg arguments: String): CodeToOutput {
|
||||
val result: MutableList<String> = mutableListOf()
|
||||
val code = process(executable, *arguments,
|
||||
stderr = Redirect.CAPTURE,
|
||||
stdout = Redirect.CAPTURE,
|
||||
consumer = {
|
||||
result.add(it)
|
||||
}).resultCode
|
||||
return CodeToOutput(statusCode = code, result)
|
||||
}
|
||||
@ -0,0 +1,41 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.runner
|
||||
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
|
||||
open class Runner(open val executable: String, val daemonInterface: IRunner) {
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
val scope = Coroutines.io()
|
||||
var job: Job? = null
|
||||
var executor: com.github.pgreze.process.ProcessResult? = null
|
||||
open suspend fun run(parameters: List<String>): Int {
|
||||
daemonInterface.onStarted()
|
||||
logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" }
|
||||
job = scope.launch {
|
||||
executor = com.github.pgreze.process.process(executable, *parameters.toTypedArray(),
|
||||
stdout = com.github.pgreze.process.Redirect.CAPTURE,
|
||||
stderr = com.github.pgreze.process.Redirect.CAPTURE,
|
||||
consumer = {
|
||||
daemonInterface.onOutputChanged(it)
|
||||
})
|
||||
}
|
||||
job?.join()
|
||||
|
||||
val resultCode = executor?.resultCode ?: -1
|
||||
if (resultCode == 0) {
|
||||
daemonInterface.onEnded()
|
||||
} else daemonInterface.onError(resultCode)
|
||||
logger.info { "$executable result: $resultCode" }
|
||||
return resultCode
|
||||
}
|
||||
|
||||
suspend fun cancel() {
|
||||
job?.cancelAndJoin()
|
||||
scope.cancel("Cancel operation triggered!")
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.socket
|
||||
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
|
||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
|
||||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
|
||||
|
||||
@Configuration
|
||||
@EnableWebSocketMessageBroker
|
||||
open class SocketImplementation: WebSocketMessageBrokerConfigurer {
|
||||
|
||||
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
|
||||
registry.addEndpoint("/ws")
|
||||
.setAllowedOrigins("*://localhost:*/*", "http://localhost:3000/")
|
||||
.withSockJS()
|
||||
}
|
||||
|
||||
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
|
||||
registry.enableSimpleBroker("/topic")
|
||||
registry.setApplicationDestinationPrefixes("/app")
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,70 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
|
||||
import org.h2.jdbcx.JdbcDataSource
|
||||
import java.io.PrintWriter
|
||||
import java.sql.Connection
|
||||
import java.sql.SQLFeatureNotSupportedException
|
||||
import java.util.logging.Logger
|
||||
import javax.sql.DataSource
|
||||
|
||||
class H2DataSource(private val jdbcDataSource: JdbcDataSource, databaseName: String) : DataSource, MySqlDataSource(databaseName = databaseName, address = jdbcDataSource.getUrl(), username = jdbcDataSource.user, password = jdbcDataSource.password) {
|
||||
companion object {
|
||||
fun fromDatabaseEnv(): H2DataSource {
|
||||
if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'")
|
||||
return H2DataSource(
|
||||
JdbcDataSource(),
|
||||
databaseName = DatabaseConfig.database!!,
|
||||
)
|
||||
}
|
||||
}
|
||||
override fun getConnection(): Connection {
|
||||
return jdbcDataSource.connection
|
||||
}
|
||||
|
||||
override fun getConnection(username: String?, password: String?): Connection {
|
||||
return jdbcDataSource.getConnection(username, password)
|
||||
}
|
||||
|
||||
override fun setLoginTimeout(seconds: Int) {
|
||||
jdbcDataSource.loginTimeout = seconds
|
||||
}
|
||||
|
||||
override fun getLoginTimeout(): Int {
|
||||
return jdbcDataSource.loginTimeout
|
||||
}
|
||||
|
||||
override fun getLogWriter(): PrintWriter? {
|
||||
return jdbcDataSource.logWriter
|
||||
}
|
||||
|
||||
override fun setLogWriter(out: PrintWriter?) {
|
||||
jdbcDataSource.logWriter = out
|
||||
}
|
||||
|
||||
override fun getParentLogger(): Logger? {
|
||||
throw SQLFeatureNotSupportedException("getParentLogger is not supported")
|
||||
}
|
||||
|
||||
override fun <T : Any?> unwrap(iface: Class<T>?): T {
|
||||
if (iface != null && iface.isAssignableFrom(this.javaClass)) {
|
||||
return this as T
|
||||
}
|
||||
return jdbcDataSource.unwrap(iface)
|
||||
}
|
||||
|
||||
override fun isWrapperFor(iface: Class<*>?): Boolean {
|
||||
if (iface != null && iface.isAssignableFrom(this.javaClass)) {
|
||||
return true
|
||||
}
|
||||
return jdbcDataSource.isWrapperFor(iface)
|
||||
}
|
||||
|
||||
override fun createDatabaseStatement(): String {
|
||||
return "CREATE SCHEMA $databaseName"
|
||||
}
|
||||
|
||||
override fun toConnectionUrl(): String {
|
||||
return "jdbc:h2:mem:test;MODE=MySQL;DB_CLOSE_DELAY=-1;"
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user