Cleanup
This commit is contained in:
parent
57800a1fba
commit
dd214c8ff9
2
.idea/vcs.xml
generated
2
.idea/vcs.xml
generated
@ -1,6 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="" vcs="Git" />
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
||||
@ -1,47 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka
|
||||
|
||||
import com.google.gson.Gson
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.content.common.DefaultKafkaReader
|
||||
import no.iktdev.streamit.content.ui.kafka.converter.EventDataConverter
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.listener.ManualAcknowledgeMessageListener
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.kafka.listener.ContainerProperties
|
||||
import org.springframework.stereotype.Service
|
||||
|
||||
@Service
|
||||
class EventConsumer: DefaultKafkaReader() {
|
||||
@Autowired private lateinit var converter: EventDataConverter
|
||||
companion object {
|
||||
val idAndEvents: MutableMap<String, MutableMap<String, Message>> = mutableMapOf()
|
||||
}
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
|
||||
private final val listener = object : ManualAcknowledgeMessageListener(
|
||||
topic = CommonConfig.kafkaTopic,
|
||||
consumer = defaultConsumer,
|
||||
accepts = listOf()
|
||||
) {
|
||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||
applyUpdate(data.value().referenceId, data.key(), data.value())
|
||||
log.info { data.key() + Gson().toJson(data.value()) }
|
||||
converter.convertEventToObject(data.value().referenceId)
|
||||
}
|
||||
}
|
||||
|
||||
private fun applyUpdate(referenceId: String, eventKey: String, value: Message) {
|
||||
val existingData = idAndEvents[referenceId] ?: mutableMapOf()
|
||||
existingData[eventKey] = value
|
||||
idAndEvents[referenceId] = existingData
|
||||
}
|
||||
|
||||
init {
|
||||
defaultConsumer.autoCommit = false
|
||||
defaultConsumer.ackModeOverride = ContainerProperties.AckMode.MANUAL
|
||||
listener.listen()
|
||||
}
|
||||
}
|
||||
@ -1,33 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.content.ui.memActiveEventMap
|
||||
import no.iktdev.streamit.content.ui.kafka.EventConsumer
|
||||
import no.iktdev.streamit.content.ui.memSimpleConvertedEventsMap
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
@Component
|
||||
class EventDataConverter {
|
||||
|
||||
@Autowired private lateinit var detailsConverter: EventDataDetailsSubConverter
|
||||
@Autowired private lateinit var encodeConverter: EventDataEncodeSubConverter
|
||||
@Autowired private lateinit var metadataConverter: EventDataMetadataSubConverter
|
||||
@Autowired private lateinit var fileNameConverter: EventDataFilenameAndTypeDeterminerSubConverter
|
||||
|
||||
fun convertEventToObject(eventReferenceId: String) {
|
||||
val data = memActiveEventMap[eventReferenceId] ?: EventDataObject(id = eventReferenceId)
|
||||
val collection = EventConsumer.idAndEvents[eventReferenceId] ?: emptyMap<String, Message>()
|
||||
|
||||
detailsConverter.convertAndUpdate(data, collection.toMap())
|
||||
encodeConverter.convertAndUpdate(data, collection.toMap())
|
||||
metadataConverter.convertAndUpdate(data, collection.toMap())
|
||||
fileNameConverter.convertAndUpdate(data, collection.toMap())
|
||||
|
||||
|
||||
memActiveEventMap[eventReferenceId] = data
|
||||
memSimpleConvertedEventsMap[eventReferenceId] = data.toSimple()
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,31 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.reader.FileResult
|
||||
import no.iktdev.streamit.content.ui.dto.Details
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
@Component
|
||||
class EventDataDetailsSubConverter : EventDataSubConverterBase() {
|
||||
override fun convertEvents(eventData: EventDataObject, events: Map<String, Message>) {
|
||||
val event = events.entries
|
||||
.asSequence()
|
||||
.filter { it.key == KafkaEvents.EVENT_READER_RECEIVED_FILE.event }
|
||||
.filter { it.value.isSuccessful() }
|
||||
.map { DeserializerRegistry.getDeserializerForEvent(it.key)?.deserialize(it.value) }
|
||||
.filterIsInstance<FileResult>()
|
||||
.lastOrNull() ?: return
|
||||
|
||||
val deserialized = Details(
|
||||
name = event.title,
|
||||
file = event.file,
|
||||
sanitizedName = event.sanitizedName
|
||||
)
|
||||
eventData.details = deserialized
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -1,56 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.State
|
||||
import no.iktdev.streamit.content.common.dto.reader.work.EncodeWork
|
||||
import no.iktdev.streamit.content.ui.dto.Encode
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.content.ui.dto.IO
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
@Component
|
||||
class EventDataEncodeSubConverter : EventDataSubConverterBase() {
|
||||
override fun convertEvents(eventData: EventDataObject, events: Map<String, Message>) {
|
||||
val filteredEvents = events.entries
|
||||
.asSequence()
|
||||
.filter {
|
||||
listOf(
|
||||
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_QUEUED.event,
|
||||
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event,
|
||||
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event
|
||||
).contains(it.key)
|
||||
}
|
||||
|
||||
val event = filteredEvents
|
||||
.map { DeserializerRegistry.getDeserializerForEvent(it.key)?.deserialize(it.value) }
|
||||
.filterIsInstance<EncodeWork>()
|
||||
.lastOrNull() ?: return
|
||||
|
||||
event.let {
|
||||
eventData.details?.apply { this.collection = it.collection }
|
||||
}
|
||||
|
||||
eventData.io = IO(event.inFile, event.outFile)
|
||||
eventData.encode =
|
||||
if (eventData.encode != null)
|
||||
eventData.encode?.apply { state = getState(filteredEvents).name }
|
||||
else
|
||||
Encode(state = getState(filteredEvents).name)
|
||||
|
||||
}
|
||||
|
||||
private fun getState(events: Sequence<Map.Entry<String, Message>>): State {
|
||||
val last = events.lastOrNull()
|
||||
?: return State.QUEUED
|
||||
if (!last.value.isSuccessful()) return State.FAILURE
|
||||
return when (last.key) {
|
||||
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_STARTED.event -> State.STARTED
|
||||
KafkaEvents.EVENT_ENCODER_VIDEO_FILE_ENDED.event -> State.ENDED
|
||||
else -> State.QUEUED
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -1,40 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.streamit.content.common.deserializers.DeserializerRegistry
|
||||
import no.iktdev.streamit.content.common.dto.ContentOutName
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
@Component
|
||||
class EventDataFilenameAndTypeDeterminerSubConverter : EventDataSubConverterBase() {
|
||||
override fun convertEvents(eventData: EventDataObject, events: Map<String, Message>) {
|
||||
|
||||
val convertedFileNameEvent = events.entries
|
||||
.asSequence()
|
||||
.filter { it.key == KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event }
|
||||
.filter { it.value.isSuccessful() }
|
||||
.map { DeserializerRegistry.getDeserializerForEvent(it.key)?.deserialize(it.value) }
|
||||
.filterIsInstance<ContentOutName>()
|
||||
.lastOrNull() ?: return
|
||||
|
||||
val convertedType = events.entries
|
||||
.asSequence()
|
||||
.filter { it -> listOf(KafkaEvents.EVENT_READER_DETERMINED_SERIE.event, KafkaEvents.EVENT_READER_DETERMINED_MOVIE.event).contains(it.key) }
|
||||
.lastOrNull()
|
||||
?.toPair()
|
||||
val type = when(convertedType?.first) {
|
||||
KafkaEvents.EVENT_READER_DETERMINED_SERIE.event -> "serie"
|
||||
KafkaEvents.EVENT_READER_DETERMINED_MOVIE.event -> "movie"
|
||||
else -> null
|
||||
}
|
||||
|
||||
eventData.details = eventData.details?.apply {
|
||||
this.title = convertedFileNameEvent.baseName
|
||||
this.type = type
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
@ -1,14 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import org.springframework.stereotype.Component
|
||||
|
||||
|
||||
@Component
|
||||
class EventDataMetadataSubConverter: EventDataSubConverterBase() {
|
||||
override fun convertEvents(eventData: EventDataObject, events: Map<String, Message>) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,17 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.kafka.converter
|
||||
|
||||
import no.iktdev.streamit.content.ui.dto.EventDataObject
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
|
||||
abstract class EventDataSubConverterBase {
|
||||
|
||||
protected abstract fun convertEvents(eventData: EventDataObject, events: Map<String, Message>)
|
||||
|
||||
fun convertAndUpdate(eventData: EventDataObject, events: Map<String, Message>) {
|
||||
try {
|
||||
convertEvents(eventData, events)
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,47 +0,0 @@
|
||||
package no.iktdev.streamit.content.ui.socket
|
||||
|
||||
import no.iktdev.streamit.content.common.CommonConfig
|
||||
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||
import no.iktdev.streamit.library.kafka.dto.Message
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.messaging.handler.annotation.MessageMapping
|
||||
import org.springframework.messaging.handler.annotation.Payload
|
||||
import org.springframework.messaging.simp.SimpMessagingTemplate
|
||||
import org.springframework.stereotype.Controller
|
||||
import java.io.File
|
||||
|
||||
@Controller
|
||||
class RequestTopic(
|
||||
@Autowired private val template: SimpMessagingTemplate?
|
||||
) {
|
||||
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
|
||||
|
||||
@MessageMapping("/request/start")
|
||||
fun requestStartOn(@Payload fullName: String) {
|
||||
val file = File(fullName)
|
||||
if (file.exists()) {
|
||||
try {
|
||||
val message = Message(
|
||||
status = Status(Status.SUCCESS),
|
||||
data = fullName
|
||||
)
|
||||
messageProducer.sendMessage(KafkaEvents.REQUEST_FILE_READ.event, message)
|
||||
template?.convertAndSend("/response/request", RequestResponse(true, fullName))
|
||||
|
||||
} catch (e: Exception) {
|
||||
template?.convertAndSend("/response/request", RequestResponse(false, fullName))
|
||||
|
||||
}
|
||||
} else {
|
||||
template?.convertAndSend("/response/request", RequestResponse(false, fullName))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
data class RequestResponse(
|
||||
val success: Boolean,
|
||||
val file: String
|
||||
)
|
||||
@ -1,5 +1,10 @@
|
||||
package no.iktdev.mediaprocessing.shared.common
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
|
||||
import kotlin.reflect.KClass
|
||||
|
||||
class DeserializingRegistry {
|
||||
companion object {
|
||||
val deserializables = mutableListOf<Pair<KafkaEvents, KClass<out MessageDataWrapper>?>>(
|
||||
|
||||
@ -1,19 +0,0 @@
|
||||
import mu.KotlinLogging
|
||||
import java.io.File
|
||||
import java.io.RandomAccessFile
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
fun isFileAvailable(file: File): Boolean {
|
||||
if (!file.exists()) return false
|
||||
var stream: RandomAccessFile? = null
|
||||
try {
|
||||
stream = RandomAccessFile(file, "rw")
|
||||
stream.close()
|
||||
logger.info { "File ${file.name} is read and writable" }
|
||||
return true
|
||||
} catch (e: Exception) {
|
||||
stream?.close()
|
||||
}
|
||||
return false
|
||||
}
|
||||
@ -1,95 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared
|
||||
|
||||
import no.iktdev.exfl.using
|
||||
import java.io.File
|
||||
import java.io.FileOutputStream
|
||||
import java.net.HttpURLConnection
|
||||
import java.net.URL
|
||||
|
||||
open class DownloadClient(val url: String, val outDir: File, val baseName: String) {
|
||||
protected val http: HttpURLConnection = openConnection()
|
||||
private val BUFFER_SIZE = 4096
|
||||
|
||||
private fun openConnection(): HttpURLConnection {
|
||||
try {
|
||||
return URL(url).openConnection() as HttpURLConnection
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
throw BadAddressException("Provided url is either not provided (null) or is not a valid http url")
|
||||
}
|
||||
}
|
||||
|
||||
protected fun getLength(): Int {
|
||||
return http.contentLength
|
||||
}
|
||||
|
||||
protected fun getProgress(read: Int, total: Int = getLength()): Int {
|
||||
return ((read * 100) / total)
|
||||
}
|
||||
|
||||
suspend fun download(): File? {
|
||||
val extension = getExtension()
|
||||
?: throw UnsupportedFormatException("Provided url does not contain a supported file extension")
|
||||
val outFile = outDir.using("$baseName.$extension")
|
||||
val inputStream = http.inputStream
|
||||
val fos = FileOutputStream(outFile, false)
|
||||
|
||||
var totalBytesRead = 0
|
||||
val buffer = ByteArray(BUFFER_SIZE)
|
||||
inputStream.apply {
|
||||
fos.use { fout ->
|
||||
run {
|
||||
var bytesRead = read(buffer)
|
||||
while (bytesRead >= 0) {
|
||||
fout.write(buffer, 0, bytesRead)
|
||||
totalBytesRead += bytesRead
|
||||
bytesRead = read(buffer)
|
||||
// System.out.println(getProgress(totalBytesRead))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
inputStream.close()
|
||||
fos.close()
|
||||
return outFile
|
||||
}
|
||||
|
||||
open fun getExtension(): String? {
|
||||
val possiblyExtension = url.lastIndexOf(".") + 1
|
||||
return if (possiblyExtension > 1) {
|
||||
return url.toString().substring(possiblyExtension)
|
||||
} else {
|
||||
val mimeType = http.contentType ?: null
|
||||
contentTypeToExtension()[mimeType]
|
||||
}
|
||||
}
|
||||
|
||||
open fun contentTypeToExtension(): Map<String, String> {
|
||||
return mapOf(
|
||||
"image/png" to "png",
|
||||
"image/jpeg" to "jpg",
|
||||
"image/webp" to "webp",
|
||||
"image/bmp" to "bmp",
|
||||
"image/tiff" to "tiff"
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
class BadAddressException : java.lang.Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
|
||||
class UnsupportedFormatException : Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
|
||||
class InvalidFileException : Exception {
|
||||
constructor() : super() {}
|
||||
constructor(message: String?) : super(message) {}
|
||||
constructor(message: String?, cause: Throwable?) : super(message, cause) {}
|
||||
}
|
||||
}
|
||||
@ -1,55 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared
|
||||
|
||||
import com.google.gson.Gson
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.mediaprocessing.shared.dto.PreferenceDto
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
private val log = KotlinLogging.logger {}
|
||||
class Preference {
|
||||
|
||||
companion object {
|
||||
fun getPreference(): PreferenceDto {
|
||||
val preference = readPreferenceFromFile() ?: PreferenceDto()
|
||||
log.info { "[Audio]: Codec = " + preference.encodePreference.audio.codec }
|
||||
log.info { "[Audio]: Language = " + preference.encodePreference.audio.language }
|
||||
log.info { "[Audio]: Channels = " + preference.encodePreference.audio.channels }
|
||||
log.info { "[Audio]: Sample rate = " + preference.encodePreference.audio.sample_rate }
|
||||
log.info { "[Audio]: Use EAC3 for surround = " + preference.encodePreference.audio.defaultToEAC3OnSurroundDetected }
|
||||
|
||||
log.info { "[Video]: Codec = " + preference.encodePreference.video.codec }
|
||||
log.info { "[Video]: Pixel format = " + preference.encodePreference.video.pixelFormat }
|
||||
log.info { "[Video]: Pixel format pass-through = " + preference.encodePreference.video.pixelFormatPassthrough.joinToString(", ") }
|
||||
log.info { "[Video]: Threshold = " + preference.encodePreference.video.threshold }
|
||||
|
||||
return preference
|
||||
}
|
||||
|
||||
private fun readPreferenceFromFile(): PreferenceDto? {
|
||||
val prefFile = SharedConfig.preference
|
||||
if (!prefFile.exists()) {
|
||||
log.info("Preference file: ${prefFile.absolutePath} does not exists...")
|
||||
log.info("Using default configuration")
|
||||
return null
|
||||
}
|
||||
else {
|
||||
log.info("Preference file: ${prefFile.absolutePath} found")
|
||||
}
|
||||
|
||||
return try {
|
||||
val instr = prefFile.inputStream()
|
||||
val text = instr.bufferedReader().use { it.readText() }
|
||||
Gson().fromJson(text, PreferenceDto::class.java)
|
||||
}
|
||||
catch (e: Exception) {
|
||||
log.error("Failed to read preference file: ${prefFile.absolutePath}.. Will use default configuration")
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
}
|
||||
@ -1,22 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared
|
||||
|
||||
import java.io.File
|
||||
|
||||
object SharedConfig {
|
||||
var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
|
||||
var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input")
|
||||
val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")
|
||||
|
||||
val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe"
|
||||
val ffmpeg: String = System.getenv("SUPPORTING_EXECUTABLE_FFMPEG") ?: "no/iktdev/mediaprocessing/shared/contract/ffmpeg"
|
||||
|
||||
val preference: File = File("/data/config/preference.json")
|
||||
}
|
||||
|
||||
object DatabaseConfig {
|
||||
val address: String? = System.getenv("DATABASE_ADDRESS")
|
||||
val port: String? = System.getenv("DATABASE_PORT")
|
||||
val username: String? = System.getenv("DATABASE_USERNAME")
|
||||
val password: String? = System.getenv("DATABASE_PASSWORD")
|
||||
val database: String? = System.getenv("DATABASE_NAME")
|
||||
}
|
||||
@ -1,34 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.datasource
|
||||
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import java.time.Instant
|
||||
import java.time.LocalDateTime
|
||||
import java.time.ZoneId
|
||||
import java.time.ZoneOffset
|
||||
|
||||
abstract class DataSource(val databaseName: String, val address: String, val port: String?, val username: String, val password: String) {
|
||||
|
||||
abstract fun createDatabase(): Database?
|
||||
|
||||
abstract fun createTables(vararg tables: Table)
|
||||
|
||||
abstract fun createDatabaseStatement(): String
|
||||
|
||||
abstract fun toConnectionUrl(): String
|
||||
|
||||
fun toPortedAddress(): String {
|
||||
return if (!address.contains(":") && port?.isBlank() != true) {
|
||||
"$address:$port"
|
||||
} else address
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
fun timestampToLocalDateTime(timestamp: Int): LocalDateTime {
|
||||
return Instant.ofEpochSecond(timestamp.toLong()).atZone(ZoneId.systemDefault()).toLocalDateTime()
|
||||
}
|
||||
|
||||
fun LocalDateTime.toEpochSeconds(): Long {
|
||||
return this.toEpochSecond(ZoneOffset.ofTotalSeconds(ZoneOffset.systemDefault().rules.getOffset(LocalDateTime.now()).totalSeconds))
|
||||
}
|
||||
@ -1,84 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.datasource
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.DatabaseConfig
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.SchemaUtils
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import org.jetbrains.exposed.sql.transactions.TransactionManager
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
open class MySqlDataSource(databaseName: String, address: String, port: String = "", username: String, password: String): DataSource(databaseName = databaseName, address = address, port = port, username = username, password = password) {
|
||||
companion object {
|
||||
fun fromDatabaseEnv(): MySqlDataSource {
|
||||
if (DatabaseConfig.database.isNullOrBlank()) throw RuntimeException("Database name is not defined in 'DATABASE_NAME'")
|
||||
if (DatabaseConfig.username.isNullOrBlank()) throw RuntimeException("Database username is not defined in 'DATABASE_USERNAME'")
|
||||
if (DatabaseConfig.address.isNullOrBlank()) throw RuntimeException("Database address is not defined in 'DATABASE_ADDRESS'")
|
||||
return MySqlDataSource(
|
||||
databaseName = DatabaseConfig.database,
|
||||
address = DatabaseConfig.address,
|
||||
port = DatabaseConfig.port ?: "",
|
||||
username = DatabaseConfig.username,
|
||||
password = DatabaseConfig.password ?: ""
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
override fun createDatabase(): Database? {
|
||||
val ok = transaction(toDatabaseServerConnection()) {
|
||||
val tmc = TransactionManager.current().connection
|
||||
val query = "SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '$databaseName'"
|
||||
val stmt = tmc.prepareStatement(query, true)
|
||||
|
||||
val resultSet = stmt.executeQuery()
|
||||
val databaseExists = resultSet.next()
|
||||
|
||||
if (!databaseExists) {
|
||||
try {
|
||||
exec(createDatabaseStatement())
|
||||
println("Database $databaseName created.")
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
} else {
|
||||
println("Database $databaseName already exists.")
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
return if (ok) toDatabase() else null
|
||||
}
|
||||
|
||||
override fun createTables(vararg tables: Table) {
|
||||
transaction {
|
||||
SchemaUtils.createMissingTablesAndColumns(*tables)
|
||||
println("Database transaction completed")
|
||||
}
|
||||
}
|
||||
|
||||
override fun createDatabaseStatement(): String {
|
||||
return "CREATE DATABASE $databaseName"
|
||||
}
|
||||
|
||||
protected fun toDatabaseServerConnection(): Database {
|
||||
return Database.connect(
|
||||
toConnectionUrl(),
|
||||
user = username,
|
||||
password = password
|
||||
)
|
||||
}
|
||||
|
||||
fun toDatabase(): Database {
|
||||
return Database.connect(
|
||||
"${toConnectionUrl()}/$databaseName",
|
||||
user = username,
|
||||
password = password
|
||||
)
|
||||
}
|
||||
|
||||
override fun toConnectionUrl(): String {
|
||||
return "jdbc:mysql://${toPortedAddress()}"
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,72 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.datasource
|
||||
|
||||
import org.jetbrains.exposed.dao.id.EntityID
|
||||
import org.jetbrains.exposed.sql.Column
|
||||
import org.jetbrains.exposed.sql.Table
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
import org.jetbrains.exposed.sql.statements.InsertStatement
|
||||
import org.jetbrains.exposed.sql.update
|
||||
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
|
||||
open class TableDefaultOperations<T: Table> {
|
||||
|
||||
}
|
||||
|
||||
fun <T> withTransaction(block: () -> T): T? {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> insertWithSuccess(block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
fun <T> executeWithStatus(block: () -> T): Boolean {
|
||||
return try {
|
||||
transaction {
|
||||
try {
|
||||
block()
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
// log the error here or handle the exception as needed
|
||||
throw e // Optionally, you can rethrow the exception if needed
|
||||
}
|
||||
}
|
||||
true
|
||||
} catch (e: Exception) {
|
||||
e.printStackTrace()
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@ -1,100 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.extended
|
||||
|
||||
import java.io.File
|
||||
|
||||
val validVideoFiles = listOf(
|
||||
"mkv",
|
||||
"avi",
|
||||
"mp4",
|
||||
"wmv",
|
||||
"webm",
|
||||
"mov"
|
||||
)
|
||||
|
||||
fun File.isSupportedVideoFile(): Boolean {
|
||||
return this.isFile && validVideoFiles.contains(this.extension)
|
||||
}
|
||||
|
||||
fun getSanitizedFileName(name: String): String {
|
||||
/**
|
||||
* Modifies the input value and removes "[Text]"
|
||||
* @param text "[TEST] Dummy - 01 [AZ 1080p] "
|
||||
*/
|
||||
fun removeBracketedText(text: String): String {
|
||||
return Regex("\\[.*?]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeParenthesizedText(text: String): String {
|
||||
return Regex("\\(.*?\\)").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeResolutionAndTags(text: String): String {
|
||||
return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ")
|
||||
}
|
||||
|
||||
fun removeInBetweenCharacters(text: String): String {
|
||||
return Regex("[.]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
* @param text "example text with extra spaces"
|
||||
* @return example text with extra spaces
|
||||
*/
|
||||
fun removeExtraWhiteSpace(text: String): String {
|
||||
return Regex("\\s{2,}").replace(text, " ")
|
||||
}
|
||||
|
||||
return name
|
||||
.let { removeBracketedText(it) }
|
||||
.let { removeParenthesizedText(it) }
|
||||
.let { removeResolutionAndTags(it) }
|
||||
.let { removeInBetweenCharacters(it) }
|
||||
.let { removeExtraWhiteSpace(it) }
|
||||
}
|
||||
|
||||
|
||||
fun File.getDesiredVideoFileName(): String? {
|
||||
if (!this.isSupportedVideoFile()) return null
|
||||
val cleanedFileName = getSanitizedFileName(this.nameWithoutExtension)
|
||||
val parts = cleanedFileName.split(" - ")
|
||||
return when {
|
||||
parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> {
|
||||
val title = parts[0]
|
||||
val year = parts[1]
|
||||
"$title ($year)"
|
||||
}
|
||||
|
||||
parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> {
|
||||
val title = parts[0]
|
||||
val episodeWithRevision = parts[2]
|
||||
val episodeParts = episodeWithRevision.split("v", "V")
|
||||
val episodeNumber = episodeParts[0].toInt()
|
||||
val revisionNumber = episodeParts[1].toInt()
|
||||
val seasonEpisode =
|
||||
"S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}"
|
||||
val episodeTitle = if (parts.size > 3) parts[3] else ""
|
||||
"$title - $seasonEpisode - $episodeTitle"
|
||||
}
|
||||
|
||||
else -> cleanedFileName
|
||||
}.trim()
|
||||
}
|
||||
|
||||
fun File.getGuessedVideoTitle(): String? {
|
||||
val desiredFileName = getDesiredVideoFileName() ?: return null
|
||||
val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE)
|
||||
if (seasonRegex.containsMatchIn(desiredFileName)) {
|
||||
return seasonRegex.replace(desiredFileName, "").trim()
|
||||
} else {
|
||||
val result = if (desiredFileName.contains(" - ")) {
|
||||
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
||||
} else desiredFileName
|
||||
return result.trim()
|
||||
}
|
||||
}
|
||||
@ -1,24 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.kafka
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.SharedConfig
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import no.iktdev.streamit.library.kafka.dto.Status
|
||||
|
||||
class CoordinatorProducer(): DefaultProducer(SharedConfig.kafkaTopic) {
|
||||
fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
|
||||
super.sendMessage(event.event, Message(
|
||||
referenceId = referenceId,
|
||||
data = data
|
||||
))
|
||||
}
|
||||
fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
|
||||
super.sendMessage(event.event, Message(
|
||||
referenceId = referenceId,
|
||||
eventId = eventId,
|
||||
data = data
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -1,160 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.parsing
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MovieInfo
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfo
|
||||
|
||||
|
||||
class FileNameDeterminate(val title: String, val sanitizedName: String, val ctype: ContentType = ContentType.UNDEFINED) {
|
||||
|
||||
enum class ContentType {
|
||||
MOVIE,
|
||||
SERIE,
|
||||
UNDEFINED
|
||||
}
|
||||
|
||||
fun getDeterminedVideoInfo(): VideoInfo? {
|
||||
return when (ctype) {
|
||||
ContentType.MOVIE -> determineMovieFileName()
|
||||
ContentType.SERIE -> determineSerieFileName()
|
||||
ContentType.UNDEFINED -> determineUndefinedFileName()
|
||||
}
|
||||
}
|
||||
|
||||
private fun determineMovieFileName(): MovieInfo? {
|
||||
val movieEx = MovieEx(title, sanitizedName)
|
||||
val stripped = when {
|
||||
movieEx.isDefinedWithYear() -> sanitizedName.replace(movieEx.yearRegex(), "").trim()
|
||||
movieEx.doesContainMovieKeywords() -> sanitizedName.replace(Regex("(?i)\\s*\\(\\s*movie\\s*\\)\\s*"), "").trim()
|
||||
else -> sanitizedName
|
||||
}
|
||||
val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped
|
||||
return MovieInfo(cleanup(nonResolutioned), cleanup(nonResolutioned))
|
||||
}
|
||||
|
||||
private fun determineSerieFileName(): EpisodeInfo? {
|
||||
val serieEx = SerieEx(title, sanitizedName)
|
||||
|
||||
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumberSingle = serieEx.findEpisodeNumber()
|
||||
|
||||
val seasonNumber = season ?: "1"
|
||||
val episodeNumber = episode ?: (episodeNumberSingle ?: return null)
|
||||
val seasonEpisodeCombined = serieEx.getSeasonEpisodeCombined(seasonNumber, episodeNumber)
|
||||
val episodeTitle = serieEx.findEpisodeTitle()
|
||||
|
||||
val useTitle = if (title == sanitizedName) {
|
||||
if (title.contains(" - ")) {
|
||||
title.split(" - ").firstOrNull() ?: title
|
||||
} else {
|
||||
val seasonNumberIndex = if (title.indexOf(seasonNumber) < 0) title.length -1 else title.indexOf(seasonNumber)
|
||||
val episodeNumberIndex = if (title.indexOf(episodeNumber) < 0) title.length -1 else title.indexOf(episodeNumber)
|
||||
val closest = listOf<Int>(seasonNumberIndex, episodeNumberIndex).min()
|
||||
val shrunkenTitle = title.substring(0, closest)
|
||||
if (closest - shrunkenTitle.lastIndexOf(" ") < 3) {
|
||||
title.substring(0, shrunkenTitle.lastIndexOf(" "))
|
||||
} else title.substring(0, closest)
|
||||
|
||||
}
|
||||
} else title
|
||||
val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim()
|
||||
return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName)
|
||||
}
|
||||
|
||||
private fun determineUndefinedFileName(): VideoInfo? {
|
||||
val serieEx = SerieEx(title, sanitizedName)
|
||||
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumber = serieEx.findEpisodeNumber()
|
||||
return if ((sanitizedName.contains(" - ") && episodeNumber != null) || season != null || episode != null) {
|
||||
determineSerieFileName()
|
||||
} else {
|
||||
determineMovieFileName()
|
||||
}
|
||||
}
|
||||
|
||||
private fun cleanup(input: String): String {
|
||||
val cleaned = Regex("(?<=\\w)[_.](?=\\w)").replace(input, " ")
|
||||
return Regex("\\s{2,}").replace(cleaned, " ")
|
||||
}
|
||||
|
||||
open internal class Base(val title: String, val sanitizedName: String) {
|
||||
fun getMatch(regex: String): String? {
|
||||
return Regex(regex, RegexOption.IGNORE_CASE).find(sanitizedName)?.value
|
||||
}
|
||||
|
||||
fun removeResolutionAndBeyond(input: String): String? {
|
||||
val removalValue = Regex("(i?)([0-9].*[pk]|[ ._-]+[UHD]+[ ._-])").find(input)?.value ?: return null
|
||||
return input.substring(0, input.indexOf(removalValue))
|
||||
}
|
||||
|
||||
fun yearRegex(): Regex {
|
||||
return Regex("[ .(][0-9]{4}[ .)]")
|
||||
}
|
||||
}
|
||||
|
||||
internal class MovieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||
/**
|
||||
* @return not null if matches " 2020 " or ".2020."
|
||||
*/
|
||||
fun isDefinedWithYear(): Boolean {
|
||||
return getMatch(yearRegex().pattern)?.isNotBlank() ?: false
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the filename contains the keyword movie, if so, default to movie
|
||||
*/
|
||||
fun doesContainMovieKeywords(): Boolean {
|
||||
return getMatch("[(](?<=\\()movie(?=\\))[)]")?.isNotBlank() ?: false
|
||||
}
|
||||
}
|
||||
|
||||
internal class SerieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||
|
||||
fun getSeasonEpisodeCombined(season: String, episode: String): String {
|
||||
return StringBuilder()
|
||||
.append("S")
|
||||
.append(if (season.length < 2) season.padStart(2, '0') else season)
|
||||
.append("E")
|
||||
.append(if (episode.length < 2) episode.padStart(2, '0') else episode)
|
||||
.toString().trim()
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sjekken matcher tekst som dette:
|
||||
* Cool - Season 1 Episode 13
|
||||
* Cool - s1e13
|
||||
* Cool - S1E13
|
||||
* Cool - S1 13
|
||||
*/
|
||||
fun findSeasonAndEpisode(inputText: String): Pair<String?, String?> {
|
||||
val regex = Regex("""(?i)\b(?:S|Season)\s*(\d+).*?(?:E|Episode)?\s*(\d+)\b""")
|
||||
val matchResult = regex.find(inputText)
|
||||
val season = matchResult?.groups?.get(1)?.value
|
||||
val episode = matchResult?.groups?.get(2)?.value
|
||||
return season to episode
|
||||
}
|
||||
|
||||
fun findEpisodeNumber(): String? {
|
||||
val regex = Regex("\\b(\\d+)\\b")
|
||||
val matchResult = regex.find(sanitizedName)
|
||||
return matchResult?.value?.trim()
|
||||
}
|
||||
|
||||
fun findEpisodeTitle(): String? {
|
||||
val seCombo = findSeasonAndEpisode(sanitizedName)
|
||||
val episodeNumber = findEpisodeNumber()
|
||||
|
||||
val startPosition = if (seCombo.second != null) sanitizedName.indexOf(seCombo.second!!)+ seCombo.second!!.length
|
||||
else if (episodeNumber != null) sanitizedName.indexOf(episodeNumber) + episodeNumber.length else 0
|
||||
val availableText = sanitizedName.substring(startPosition)
|
||||
|
||||
val cleanedEpisodeTitle = availableText.replace(Regex("""(?i)\b(?:season|episode|ep)\b"""), "")
|
||||
.replace(Regex("""^\s*-\s*"""), "")
|
||||
.replace(Regex("""\s+"""), " ")
|
||||
.trim()
|
||||
|
||||
return cleanedEpisodeTitle
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,95 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.parsing
|
||||
|
||||
class FileNameParser(val fileName: String) {
|
||||
var cleanedFileName: String
|
||||
private set
|
||||
|
||||
init {
|
||||
cleanedFileName = fileName
|
||||
.let { removeBracketedText(it) }
|
||||
.let { removeParenthesizedText(it) }
|
||||
.let { removeResolutionAndTags(it) }
|
||||
.let { removeInBetweenCharacters(it) }
|
||||
.let { removeExtraWhiteSpace(it) }
|
||||
|
||||
}
|
||||
|
||||
fun guessDesiredFileName(): String {
|
||||
val parts = cleanedFileName.split(" - ")
|
||||
return when {
|
||||
parts.size == 2 && parts[1].matches(Regex("\\d{4}")) -> {
|
||||
val title = parts[0]
|
||||
val year = parts[1]
|
||||
"$title ($year)"
|
||||
}
|
||||
|
||||
parts.size >= 3 && parts[1].matches(Regex("S\\d+")) && parts[2].matches(Regex("\\d+[vV]\\d+")) -> {
|
||||
val title = parts[0]
|
||||
val episodeWithRevision = parts[2]
|
||||
val episodeParts = episodeWithRevision.split("v", "V")
|
||||
val episodeNumber = episodeParts[0].toInt()
|
||||
val revisionNumber = episodeParts[1].toInt()
|
||||
val seasonEpisode =
|
||||
"S${episodeNumber.toString().padStart(2, '0')}E${revisionNumber.toString().padStart(2, '0')}"
|
||||
val episodeTitle = if (parts.size > 3) parts[3] else ""
|
||||
"$title - $seasonEpisode - $episodeTitle"
|
||||
}
|
||||
|
||||
else -> cleanedFileName
|
||||
}.trim()
|
||||
}
|
||||
|
||||
fun guessDesiredTitle(): String {
|
||||
val desiredFileName = guessDesiredFileName()
|
||||
val seasonRegex = Regex("\\sS[0-9]+(\\s- [0-9]+|\\s[0-9]+)", RegexOption.IGNORE_CASE)
|
||||
if (seasonRegex.containsMatchIn(desiredFileName)) {
|
||||
return seasonRegex.replace(desiredFileName, "").trim()
|
||||
} else {
|
||||
val result = if (desiredFileName.contains(" - ")) {
|
||||
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
||||
} else desiredFileName
|
||||
return result.trim()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Modifies the input value and removes "[Text]"
|
||||
* @param text "[TEST] Dummy - 01 [AZ 1080p] "
|
||||
*/
|
||||
fun removeBracketedText(text: String): String {
|
||||
return Regex("\\[.*?]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeParenthesizedText(text: String): String {
|
||||
return Regex("\\(.*?\\)").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
fun removeResolutionAndTags(text: String): String {
|
||||
return Regex("(.*?)(?=\\d+[pk]\\b)").replace(text, " ")
|
||||
}
|
||||
|
||||
fun removeInBetweenCharacters(text: String): String {
|
||||
return Regex("[.]").replace(text, " ")
|
||||
}
|
||||
|
||||
/**
|
||||
* @param text "example text with extra spaces"
|
||||
* @return example text with extra spaces
|
||||
*/
|
||||
fun removeExtraWhiteSpace(text: String): String {
|
||||
return Regex("\\s{2,}").replace(text, " ")
|
||||
}
|
||||
|
||||
|
||||
private fun getMatch(regex: String): String? {
|
||||
return Regex(regex).find(fileName)?.value ?: return null
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,33 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.persistance
|
||||
|
||||
import com.google.gson.Gson
|
||||
import no.iktdev.mediaprocessing.shared.datasource.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import org.jetbrains.exposed.sql.SortOrder
|
||||
import org.jetbrains.exposed.sql.select
|
||||
import org.jetbrains.exposed.sql.selectAll
|
||||
import java.time.LocalDateTime
|
||||
|
||||
class PersistentDataReader {
|
||||
val dzz = DeserializingRegistry()
|
||||
|
||||
fun getAllMessages(): List<List<PersistentMessage>> {
|
||||
val events = withTransaction {
|
||||
events.selectAll()
|
||||
.groupBy { it[events.referenceId] }
|
||||
}
|
||||
return events?.mapNotNull { it.value.mapNotNull { v -> fromRowToPersistentMessage(v, dzz) } } ?: emptyList()
|
||||
}
|
||||
|
||||
fun getMessagesFor(referenceId: String): List<PersistentMessage> {
|
||||
return withTransaction {
|
||||
events.select { events.referenceId eq referenceId }
|
||||
.orderBy(events.created, SortOrder.ASC)
|
||||
.mapNotNull { fromRowToPersistentMessage(it, dzz) }
|
||||
} ?: emptyList()
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,20 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.persistance
|
||||
|
||||
import no.iktdev.mediaprocessing.shared.datasource.executeWithStatus
|
||||
import no.iktdev.mediaprocessing.shared.datasource.withTransaction
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
|
||||
open class PersistentDataStore {
|
||||
fun storeMessage(event: String, message: Message<*>): Boolean {
|
||||
return executeWithStatus {
|
||||
events.insert {
|
||||
it[referenceId] = message.referenceId
|
||||
it[eventId] = message.eventId
|
||||
it[events.event] = event
|
||||
it[data] = message.dataAsJson()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -1,33 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.persistance
|
||||
|
||||
import com.google.gson.Gson
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
|
||||
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
|
||||
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
|
||||
import org.jetbrains.exposed.sql.ResultRow
|
||||
import java.time.LocalDateTime
|
||||
|
||||
data class PersistentMessage(
|
||||
val referenceId: String,
|
||||
val eventId: String,
|
||||
val event: KafkaEvents,
|
||||
val data: MessageDataWrapper,
|
||||
val created: LocalDateTime
|
||||
)
|
||||
|
||||
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
|
||||
val kev = try {
|
||||
KafkaEvents.valueOf(row[events.event])
|
||||
} catch (e: IllegalArgumentException) {
|
||||
e.printStackTrace()
|
||||
return null
|
||||
}
|
||||
val dzdata = dez.deserializeData(kev, row[events.data])
|
||||
return PersistentMessage(
|
||||
referenceId = row[events.referenceId],
|
||||
eventId = row[events.eventId],
|
||||
event = kev,
|
||||
data = dzdata,
|
||||
created = row[events.created]
|
||||
)
|
||||
}
|
||||
@ -1,19 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.persistance
|
||||
|
||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||
import org.jetbrains.exposed.sql.Column
|
||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
||||
import org.jetbrains.exposed.sql.javatime.datetime
|
||||
import java.time.LocalDateTime
|
||||
|
||||
object events: IntIdTable() {
|
||||
val referenceId: Column<String> = varchar("referenceId", 50)
|
||||
val eventId: Column<String> = varchar("referenceId", 50)
|
||||
val event: Column<String> = varchar("event1",100)
|
||||
val data: Column<String> = text("data")
|
||||
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
||||
|
||||
init {
|
||||
uniqueIndex(referenceId, eventId, event)
|
||||
}
|
||||
}
|
||||
@ -1,13 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.runner
|
||||
|
||||
interface IRunner {
|
||||
|
||||
fun onStarted() {}
|
||||
|
||||
fun onOutputChanged(line: String) {}
|
||||
|
||||
fun onEnded() {}
|
||||
|
||||
fun onError(code: Int)
|
||||
|
||||
}
|
||||
@ -1,20 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.runner
|
||||
|
||||
import com.github.pgreze.process.Redirect
|
||||
import com.github.pgreze.process.process
|
||||
|
||||
data class CodeToOutput(
|
||||
val statusCode: Int,
|
||||
val output: List<String>
|
||||
)
|
||||
|
||||
suspend fun getOutputUsing(executable: String, vararg arguments: String): CodeToOutput {
|
||||
val result: MutableList<String> = mutableListOf()
|
||||
val code = process(executable, *arguments,
|
||||
stderr = Redirect.CAPTURE,
|
||||
stdout = Redirect.CAPTURE,
|
||||
consumer = {
|
||||
result.add(it)
|
||||
}).resultCode
|
||||
return CodeToOutput(statusCode = code, result)
|
||||
}
|
||||
@ -1,41 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.runner
|
||||
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.cancel
|
||||
import kotlinx.coroutines.cancelAndJoin
|
||||
import kotlinx.coroutines.launch
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.exfl.coroutines.Coroutines
|
||||
|
||||
open class Runner(open val executable: String, val daemonInterface: IRunner) {
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
val scope = Coroutines.io()
|
||||
var job: Job? = null
|
||||
var executor: com.github.pgreze.process.ProcessResult? = null
|
||||
open suspend fun run(parameters: List<String>): Int {
|
||||
daemonInterface.onStarted()
|
||||
logger.info { "\nDaemon arguments: $executable \nParamters:\n${parameters.joinToString(" ")}" }
|
||||
job = scope.launch {
|
||||
executor = com.github.pgreze.process.process(executable, *parameters.toTypedArray(),
|
||||
stdout = com.github.pgreze.process.Redirect.CAPTURE,
|
||||
stderr = com.github.pgreze.process.Redirect.CAPTURE,
|
||||
consumer = {
|
||||
daemonInterface.onOutputChanged(it)
|
||||
})
|
||||
}
|
||||
job?.join()
|
||||
|
||||
val resultCode = executor?.resultCode ?: -1
|
||||
if (resultCode == 0) {
|
||||
daemonInterface.onEnded()
|
||||
} else daemonInterface.onError(resultCode)
|
||||
logger.info { "$executable result: $resultCode" }
|
||||
return resultCode
|
||||
}
|
||||
|
||||
suspend fun cancel() {
|
||||
job?.cancelAndJoin()
|
||||
scope.cancel("Cancel operation triggered!")
|
||||
}
|
||||
}
|
||||
@ -1,23 +0,0 @@
|
||||
package no.iktdev.mediaprocessing.shared.socket
|
||||
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.messaging.simp.config.MessageBrokerRegistry
|
||||
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
|
||||
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
|
||||
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
|
||||
|
||||
@Configuration
|
||||
@EnableWebSocketMessageBroker
|
||||
open class SocketImplementation: WebSocketMessageBrokerConfigurer {
|
||||
|
||||
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
|
||||
registry.addEndpoint("/ws")
|
||||
.setAllowedOrigins("*://localhost:*/*", "http://localhost:3000/")
|
||||
.withSockJS()
|
||||
}
|
||||
|
||||
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
|
||||
registry.enableSimpleBroker("/topic")
|
||||
registry.setApplicationDestinationPrefixes("/app")
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user