V2 update

This commit is contained in:
Brage 2023-12-07 23:34:48 +01:00
parent 1b83bec7c0
commit 729bb03b70
27 changed files with 277 additions and 226 deletions

1
.idea/gradle.xml generated
View File

@ -5,7 +5,6 @@
<option name="linkedExternalProjectsSettings"> <option name="linkedExternalProjectsSettings">
<GradleProjectSettings> <GradleProjectSettings>
<option name="externalProjectPath" value="$PROJECT_DIR$" /> <option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="gradleHome" value="" />
<option name="modules"> <option name="modules">
<set> <set>
<option value="$PROJECT_DIR$" /> <option value="$PROJECT_DIR$" />

2
.idea/misc.xml generated
View File

@ -1,11 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="EntryPointsManager"> <component name="EntryPointsManager">
<list size="1"> <list size="1">
<item index="0" class="java.lang.String" itemvalue="org.springframework.scheduling.annotation.Scheduled" /> <item index="0" class="java.lang.String" itemvalue="org.springframework.scheduling.annotation.Scheduled" />
</list> </list>
</component> </component>
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK"> <component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" /> <output url="file://$PROJECT_DIR$/out" />
</component> </component>

View File

@ -1,5 +1,9 @@
plugins { plugins {
id("java") id("java")
kotlin("jvm")
kotlin("plugin.spring") version "1.5.31"
id("org.springframework.boot") version "2.5.5"
id("io.spring.dependency-management") version "1.0.11.RELEASE"
} }
group = "no.iktdev.mediaprocessing" group = "no.iktdev.mediaprocessing"
@ -10,6 +14,18 @@ repositories {
} }
dependencies { dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
} }

View File

@ -19,31 +19,46 @@ repositories {
} }
} }
val exposedVersion = "0.44.0"
dependencies { dependencies {
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") /*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter:2.7.0")
implementation("org.springframework.kafka:spring-kafka:3.0.1")
// implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.google.code.gson:gson:2.8.9") implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307") implementation("org.json:json:20210307")
implementation(project(mapOf("path" to ":shared")))
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT") implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
implementation(project(mapOf("path" to ":shared:kafka"))) implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared")))
implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation ("mysql:mysql-connector-java:8.0.29")
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
implementation(kotlin("stdlib-jdk8")) implementation(kotlin("stdlib-jdk8"))
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1")
testImplementation("org.assertj:assertj-core:3.21.0")
testImplementation(project(mapOf("path" to ":shared:common")))
} }
tasks.test { tasks.test {

View File

@ -4,24 +4,23 @@ import com.google.gson.Gson
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.persistance.events
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import java.util.UUID import java.util.UUID
@Service @Service
class Coordinator { class Coordinator() {
val producer = CoordinatorProducer() val producer = CoordinatorProducer()
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}

View File

@ -1,25 +1,50 @@
package no.iktdev.mediaprocessing.coordinator package no.iktdev.mediaprocessing.coordinator
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.persistance.events import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.events
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext
val log = KotlinLogging.logger {}
@SpringBootApplication @SpringBootApplication
class CoordinatorApplication { class CoordinatorApplication {
} }
private var context: ApplicationContext? = null
@Suppress("unused")
fun getContext(): ApplicationContext? {
return context
}
fun main(args: Array<String>) { fun main(args: Array<String>) {
val dataSource = MySqlDataSource.fromDatabaseEnv(); /*val dataSource = MySqlDataSource.fromDatabaseEnv();
Coroutines.default().launch {
dataSource.createDatabase() dataSource.createDatabase()
dataSource.createTables( dataSource.createTables(
events events
) )
}*/
context = runApplication<CoordinatorApplication>(*args)
printSharedConfig()
} }
fun printSharedConfig() {
log.info { "Kafka topic: ${SharedConfig.kafkaTopic}" }
log.info { "File Input: ${SharedConfig.incomingContent}" }
log.info { "File Output: ${SharedConfig.outgoingContent}" }
log.info { "Ffprobe: ${SharedConfig.ffprobe}" }
log.info { "Ffmpeg: ${SharedConfig.ffmpeg}" }
class SocketImplemented: SocketImplementation() { log.info { "Database: ${DatabaseConfig.database}@${DatabaseConfig.address}:${DatabaseConfig.port}" }
log.info { "Username: ${DatabaseConfig.username}" }
log.info { "Password: ${ if(DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
} }

View File

@ -1,13 +1,13 @@
package no.iktdev.mediaprocessing.coordinator.mapping package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.dto.MetadataDto import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.pyMetadata import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.pyMetadata
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.persistance.PersistentMessage
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
@ -15,21 +15,21 @@ import no.iktdev.streamit.library.kafka.dto.Status
class MetadataMapping(val events: List<PersistentMessage>) { class MetadataMapping(val events: List<PersistentMessage>) {
fun map(): MetadataDto { fun map(): MetadataDto? {
val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed? val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
val meta = events.find { it.data is MetadataPerformed }?.data as MetadataPerformed? val meta = events.find { it.data is MetadataPerformed }?.data as MetadataPerformed?
if (!baseInfo.isSuccess()) { if (!baseInfo.isSuccess()) {
return return null
} }
return null
return MetadataDto( /*return MetadataDto(
title = meta.data?.title, title = meta?.data?.title ?: return null,
type = meta.data.type, type = meta?.data?.type ?: return null,
)
)*/
} }
} }

View File

@ -1,8 +1,8 @@
package no.iktdev.mediaprocessing.coordinator.mapping package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto import no.iktdev.mediaprocessing.shared.contract.reader.MediaProcessedDto
class ProcessMapping(val events: List<PersistentMessage>) { class ProcessMapping(val events: List<PersistentMessage>) {
@ -24,23 +24,23 @@ class ProcessMapping(val events: List<PersistentMessage>) {
} }
fun waitsForEncode(): Boolean { fun waitsForEncode(): Boolean {
val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED.event } != null val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } != null
val performed = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED.event } != null val performed = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } != null
val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED.event } != null val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED } != null
return !(isSkipped || (arguments && performed)) return !(isSkipped || (arguments && performed))
} }
fun waitsForExtract(): Boolean { fun waitsForExtract(): Boolean {
val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED.event } != null val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED } != null
val performed = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED.event } != null val performed = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } != null
val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED.event } != null val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED } != null
return !(isSkipped || (arguments && performed)) return !(isSkipped || (arguments && performed))
} }
fun waitsForConvert(): Boolean { fun waitsForConvert(): Boolean {
val arguments = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED.event } != null val arguments = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } != null
val performed = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED.event } != null val performed = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } != null
val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED.event } != null val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED } != null
return !(isSkipped || (arguments && performed)) return !(isSkipped || (arguments && performed))
} }

View File

@ -1,39 +1,42 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.parsing.FileNameParser
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
@Service class BaseInfoFromFile(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener(
class BaseInfoFromFile { SharedConfig.kafkaTopic)): ProcessingService(producer, listener) {
val io = Coroutines.io()
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
val message = event.value()
if (message.data is ProcessStarted) {
io.launch {
readFileInfo(message.referenceId, message.data as ProcessStarted)
}
}
}
val producer = CoordinatorProducer()
init { init {
listener.onMessageReceived = { event ->
val message = event.value
if (message.data is ProcessStarted) {
io.launch {
val result = readFileInfo(message.data as ProcessStarted)
onResult(message.referenceId, result)
}
}
}
io.launch { io.launch {
listener.listen() listener.listen()
} }
} }
suspend fun readFileInfo(referenceId: String, started: ProcessStarted) { override fun onResult(referenceId: String, data: MessageDataWrapper) {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data)
}
fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
val result = try { val result = try {
val fileName = File(started.file).nameWithoutExtension val fileName = File(started.file).nameWithoutExtension
val fileNameParser = FileNameParser(fileName) val fileNameParser = FileNameParser(fileName)
@ -46,6 +49,6 @@ class BaseInfoFromFile {
e.printStackTrace() e.printStackTrace()
MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file") MessageDataWrapper(Status.ERROR, e.message ?: "Unable to obtain proper info from file")
} }
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, result) return result
} }
} }

View File

@ -1,19 +1,18 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service
class MediaStreamsAnalyze { class MediaStreamsAnalyze {
val io = Coroutines.io() val io = Coroutines.io()
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
if (event.key() == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED.event) { if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) {
if (event.value().data?.status == Status.COMPLETED) { if (event.value.data?.status == Status.COMPLETED) {
} }
} }

View File

@ -3,13 +3,13 @@ package no.iktdev.mediaprocessing.coordinator.reader
import com.google.gson.Gson import com.google.gson.Gson
import com.google.gson.JsonObject import com.google.gson.JsonObject
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.ffmpeg.SubtitleStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.ffmpeg.VideoStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
import no.iktdev.mediaprocessing.shared.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -18,28 +18,32 @@ import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service class ParseVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener(
class ParseVideoFileStreams { SharedConfig.kafkaTopic)): ProcessingService(producer, listener) {
val io = Coroutines.io()
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> override fun onResult(referenceId: String, data: MessageDataWrapper) {
val message = event.value() producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data)
if (message.data is ReaderPerformed) {
io.launch {
parseStreams(message.referenceId, message.data as ReaderPerformed)
} }
}
}
val producer = CoordinatorProducer()
init { init {
listener.onMessageReceived = { event ->
val message = event.value
if (message.data is ReaderPerformed) {
io.launch {
val result = parseStreams(message.data as ReaderPerformed)
onResult(message.referenceId, result)
}
}
}
io.launch { io.launch {
listener.listen() listener.listen()
} }
} }
suspend fun parseStreams(referenceId: String, data: ReaderPerformed) {
fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
val gson = Gson() val gson = Gson()
try { return try {
val jsonObject = gson.fromJson(data.output, JsonObject::class.java) val jsonObject = gson.fromJson(data.output, JsonObject::class.java)
val jStreams = jsonObject.getAsJsonArray("streams") val jStreams = jsonObject.getAsJsonArray("streams")
@ -66,16 +70,11 @@ class ParseVideoFileStreams {
audioStream = audioStreams, audioStream = audioStreams,
subtitleStream = subtitleStreams subtitleStream = subtitleStreams
) )
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, MessageDataWrapper(Status.COMPLETED, gson.toJson(parsedStreams))
MessageDataWrapper(Status.COMPLETED, gson.toJson(parsedStreams)
)
)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
MessageDataWrapper(Status.ERROR, message = e.message) MessageDataWrapper(Status.ERROR, message = e.message)
)
} }
} }

View File

@ -1,52 +1,50 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.mediaprocessing.shared.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.runner.getOutputUsing
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
@Service class ReadVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener(
class ReadVideoFileStreams { SharedConfig.kafkaTopic)
val io = Coroutines.io() ): ProcessingService(producer, listener) {
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
val message = event.value() override fun onResult(referenceId: String, data: MessageDataWrapper) {
if (message.data is ProcessStarted) { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data)
io.launch {
fileReadStreams(message.referenceId, message.data as ProcessStarted)
} }
}
}
val producer = CoordinatorProducer()
init { init {
listener.onMessageReceived = { event ->
val message = event.value
if (message.data is ProcessStarted) {
io.launch {
val result = fileReadStreams(message.data as ProcessStarted)
onResult(message.referenceId, result)
}
}
}
io.launch { io.launch {
listener.listen() listener.listen()
} }
} }
suspend fun fileReadStreams(referenceId: String, started: ProcessStarted) { suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {
val file = File(started.file) val file = File(started.file)
if (file.exists() && file.isFile) { return if (file.exists() && file.isFile) {
val result = readStreams(file) val result = readStreams(file)
producer.sendMessage(
referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED,
ReaderPerformed(Status.COMPLETED, file = started.file, output = result.output.joinToString("\n")) ReaderPerformed(Status.COMPLETED, file = started.file, output = result.output.joinToString("\n"))
)
} else { } else {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED,
MessageDataWrapper(Status.ERROR, "File in data is not a file or does not exist") MessageDataWrapper(Status.ERROR, "File in data is not a file or does not exist")
)
} }
} }

View File

@ -4,15 +4,15 @@ import mu.KotlinLogging
import no.iktdev.exfl.using import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.datasource.toEpochSeconds import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
import no.iktdev.mediaprocessing.shared.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.parsing.FileNameDeterminate
import no.iktdev.mediaprocessing.shared.persistance.PersistentMessage
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.Scheduled import org.springframework.scheduling.annotation.Scheduled
@ -23,7 +23,7 @@ import java.time.LocalDateTime
* *
*/ */
@Service @Service
class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired coordinator: Coordinator): TaskCreatorListener { class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreatorListener {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
init { init {
coordinator.addListener(this) coordinator.addListener(this)

View File

@ -5,14 +5,14 @@ import mu.KotlinLogging
import no.iktdev.exfl.using import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.Preference import no.iktdev.mediaprocessing.shared.common.Preference
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.* import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.mediaprocessing.shared.persistance.PersistentMessage
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -22,7 +22,7 @@ import java.io.File
* Is to be called or to run with the result from FileOout * Is to be called or to run with the result from FileOout
*/ */
@Service @Service
class OutNameToWorkArgumentCreator(@Autowired coordinator: Coordinator) : TaskCreator() { class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : TaskCreator() {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
init { init {

View File

@ -1,10 +1,10 @@
package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher
import isFileAvailable
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.isFileAvailable
import java.io.File import java.io.File
import java.util.UUID import java.util.UUID

View File

@ -7,9 +7,9 @@ import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.extended.isSupportedVideoFile
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service

View File

@ -11,22 +11,43 @@ version = "1.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
maven("https://jitpack.io")
maven {
url = uri("https://reposilite.iktdev.no/releases")
}
maven {
url = uri("https://reposilite.iktdev.no/snapshots")
}
} }
dependencies { dependencies {
implementation(kotlin("stdlib-jdk8")) /*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.springframework.boot:spring-boot-starter-web:3.0.4")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared")))
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("com.google.code.gson:gson:2.9.0") implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20210307")
implementation(project(mapOf("path" to ":shared")))
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
//implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common")))
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
implementation(kotlin("stdlib-jdk8"))
} }
tasks.test { tasks.test {

View File

@ -25,7 +25,6 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-web:3.0.4") implementation("org.springframework.boot:spring-boot-starter-web:3.0.4")
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.14.2") implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.14.2")
implementation("org.jetbrains.kotlin:kotlin-reflect") implementation("org.jetbrains.kotlin:kotlin-reflect")
@ -38,8 +37,8 @@ dependencies {
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:common")))
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")

View File

@ -1,6 +1,6 @@
package no.iktdev.streamit.content.ui package no.iktdev.streamit.content.ui
import no.iktdev.mediaprocessing.shared.socket.SocketImplementation import no.iktdev.mediaprocessing.shared.common.socket.SocketImplementation
import org.springframework.beans.factory.annotation.Value import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory
import org.springframework.boot.web.server.WebServerFactoryCustomizer import org.springframework.boot.web.server.WebServerFactoryCustomizer

View File

@ -6,15 +6,11 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.exfl.observable.ObservableMap import no.iktdev.exfl.observable.ObservableMap
import no.iktdev.exfl.observable.Observables import no.iktdev.exfl.observable.Observables
import no.iktdev.exfl.observable.observableMapOf import no.iktdev.exfl.observable.observableMapOf
import no.iktdev.mediaprocessing.shared.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.streamit.content.ui.dto.EventDataObject import no.iktdev.streamit.content.ui.dto.EventDataObject
import no.iktdev.streamit.content.ui.dto.ExplorerItem import no.iktdev.streamit.content.ui.dto.ExplorerItem
import no.iktdev.streamit.content.ui.dto.SimpleEventDataObject import no.iktdev.streamit.content.ui.dto.SimpleEventDataObject
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext import org.springframework.context.ApplicationContext
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -47,7 +43,7 @@ fun main(args: Array<String>) {
}) })
try { try {
val admincli = AdminClient.create(mapOf( /*val admincli = AdminClient.create(mapOf(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to KafkaEnv.servers, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to KafkaEnv.servers,
AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG to "1000", AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG to "1000",
AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG to "5000" AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG to "5000"
@ -60,7 +56,7 @@ fun main(args: Array<String>) {
deleteResult.all().whenComplete { result, throwable -> deleteResult.all().whenComplete { result, throwable ->
kafkaClearedLatch.countDown() kafkaClearedLatch.countDown()
} }
} }*/
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()
@ -71,7 +67,7 @@ fun main(args: Array<String>) {
kafkaClearedLatch.await(5, TimeUnit.MINUTES) kafkaClearedLatch.await(5, TimeUnit.MINUTES)
logger.info { "Offset cleared!" } logger.info { "Offset cleared!" }
Thread.sleep(10000) Thread.sleep(10000)
context = runApplication<UIApplication>(*args)
} }

View File

@ -5,7 +5,7 @@ import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.streamit.content.common.CommonConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.streamit.content.ui.explorer.ExplorerCore import no.iktdev.streamit.content.ui.explorer.ExplorerCore
import no.iktdev.streamit.content.ui.fileRegister import no.iktdev.streamit.content.ui.fileRegister
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@ -15,7 +15,7 @@ import java.security.MessageDigest
@Service @Service
class FileRegisterService { class FileRegisterService {
val watcherChannel = CommonConfig.incomingContent.asWatchChannel() val watcherChannel = SharedConfig.incomingContent.asWatchChannel()
val core = ExplorerCore() val core = ExplorerCore()
fun fid(name: String): String { fun fid(name: String): String {

View File

@ -3,7 +3,6 @@ package no.iktdev.streamit.content.ui.socket.internal
import com.google.gson.Gson import com.google.gson.Gson
import com.google.gson.reflect.TypeToken import com.google.gson.reflect.TypeToken
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.streamit.content.common.dto.WorkOrderItem
import no.iktdev.streamit.content.ui.UIEnv import no.iktdev.streamit.content.ui.UIEnv
import no.iktdev.streamit.content.ui.dto.EventDataObject import no.iktdev.streamit.content.ui.dto.EventDataObject
import no.iktdev.streamit.content.ui.memActiveEventMap import no.iktdev.streamit.content.ui.memActiveEventMap
@ -22,7 +21,7 @@ class EncoderReaderService {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
fun startSubscription(session: StompSession) { /*fun startSubscription(session: StompSession) {
session.subscribe("/topic/encoder/workorder", object : StompFrameHandler { session.subscribe("/topic/encoder/workorder", object : StompFrameHandler {
override fun getPayloadType(headers: StompHeaders): Type { override fun getPayloadType(headers: StompHeaders): Type {
return object : TypeToken<WorkOrderItem>() {}.type return object : TypeToken<WorkOrderItem>() {}.type
@ -76,6 +75,6 @@ class EncoderReaderService {
init { init {
client.connect(UIEnv.socketEncoder, sessionHandler) client.connect(UIEnv.socketEncoder, sessionHandler)
} }*/
} }

View File

@ -9,11 +9,44 @@ version = "1.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
maven("https://jitpack.io")
maven {
url = uri("https://reposilite.iktdev.no/releases")
}
maven {
url = uri("https://reposilite.iktdev.no/snapshots")
}
} }
val exposedVersion = "0.44.0"
dependencies { dependencies {
implementation("com.github.pgreze:kotlin-process:1.3.1")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.google.code.gson:gson:2.8.9")
implementation("org.json:json:20230227")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("org.jetbrains.exposed:exposed-core:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-dao:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-jdbc:$exposedVersion")
implementation("org.jetbrains.exposed:exposed-java-time:$exposedVersion")
implementation ("mysql:mysql-connector-java:8.0.29")
implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared:contract")))
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("io.mockk:mockk:1.12.0")
testImplementation("com.h2database:h2:1.4.200")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.7.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.7.2")
testImplementation("io.kotlintest:kotlintest-assertions:3.3.2")
} }
tasks.test { tasks.test {

View File

@ -22,7 +22,7 @@ open class DefaultConsumer(val subId: String = UUID.randomUUID().toString()) {
var autoCommit: Boolean = true var autoCommit: Boolean = true
var ackModeOverride: AckMode? = null var ackModeOverride: AckMode? = null
fun consumerFactory(): DefaultKafkaConsumerFactory<String, String> { open fun consumerFactory(): DefaultKafkaConsumerFactory<String, String> {
val config: MutableMap<String, Any> = HashMap() val config: MutableMap<String, Any> = HashMap()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId" config[ConsumerConfig.GROUP_ID_CONFIG] = "${KafkaEnv.consumerId}:$subId"
@ -41,57 +41,6 @@ open class DefaultConsumer(val subId: String = UUID.randomUUID().toString()) {
ackModeOverride?.let { ackModeOverride?.let {
factory.containerProperties.ackMode = it factory.containerProperties.ackMode = it
} }
return factory return factory
} }
class GsonDeserializer : org.apache.kafka.common.serialization.Deserializer<Message<out MessageDataWrapper>> {
private val gson = Gson()
val log = KotlinLogging.logger {}
fun getAnnotatedClasses(): List<Pair<KafkaEvents, KClass<*>>> {
val classesWithAnnotation = AnnotationFinder().getClassesWithAnnotation("no.iktdev.mediaprocessing.shared.kafka.dto.events_result", KafkaBelongsToEvent::class)
.mapNotNull { clazz ->
val annotation = clazz.findAnnotation<KafkaBelongsToEvent>()
annotation?.event?.let { kafkaEvent ->
kafkaEvent to clazz
}
}
classesWithAnnotation.forEach { (event, clazz) ->
println("Event: $event, Class: $clazz")
}
return classesWithAnnotation
}
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
// Ingen ekstra konfigurasjon kreves
}
override fun deserialize(topic: String, data: ByteArray): Message<out MessageDataWrapper> {
val jsonString = try { String(data) } catch (e: Exception) {e.printStackTrace(); null}
return deserialiseJsonString(jsonString)
}
fun deserialiseJsonString(json: String?): Message<out MessageDataWrapper> {
if (json.isNullOrBlank()) {
log.error { "Data is null or empty" }
}
try {
val type = object : TypeToken<Message<out MessageDataWrapper>>() {}.type
return gson.fromJson<Message<MessageDataWrapper>>(json, Message::class.java)
} catch (e: Exception) {
e.printStackTrace()
}
val type = object : TypeToken<Message<out MessageDataWrapper>>() {}.type
return gson.fromJson(json, type)
}
override fun close() {
// Ingen ressurser å lukke
}
}
} }

View File

@ -24,7 +24,7 @@ open class DefaultMessageListener(
protected var container: KafkaMessageListenerContainer<String, String>? = null protected var container: KafkaMessageListenerContainer<String, String>? = null
fun listen() { open fun listen() {
val listener = consumer.consumerFactoryListener() val listener = consumer.consumerFactoryListener()
val containerProperties = ContainerProperties(topic).apply { val containerProperties = ContainerProperties(topic).apply {
messageListener = this@DefaultMessageListener messageListener = this@DefaultMessageListener

View File

@ -11,23 +11,27 @@ import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory import org.springframework.kafka.core.ProducerFactory
open class DefaultProducer(val topic: String) { open class DefaultProducer(val topic: String) {
private val producerFactory: ProducerFactory<String, String> private var kafkaTemplate: KafkaTemplate<String, String>? = null
open fun createKafkaTemplate(): KafkaTemplate<String, String> {
val producerFactory: ProducerFactory<String, String>
init {
val config: MutableMap<String, Any> = HashMap() val config: MutableMap<String, Any> = HashMap()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
producerFactory = DefaultKafkaProducerFactory(config) producerFactory = DefaultKafkaProducerFactory(config)
}
fun createKafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory) return KafkaTemplate(producerFactory)
} }
fun sendMessage(key: String, message: Message<MessageDataWrapper>) { open fun usingKafkaTemplate(): KafkaTemplate<String, String> {
val kafkaTemplate = createKafkaTemplate() return kafkaTemplate ?: createKafkaTemplate().also { kafkaTemplate = it }
}
open fun sendMessage(key: String, message: Message<MessageDataWrapper>) {
val kafkaTemplate = usingKafkaTemplate()
val serializedMessage = serializeMessage(message) val serializedMessage = serializeMessage(message)
kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage)) kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
} }

View File

@ -1,8 +1,12 @@
import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.ObjectMapper
import com.google.gson.Gson import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
@ -16,25 +20,20 @@ class SerializationTest {
val message = Message( val message = Message(
"d2fb1472-ebdd-4fce-9ffd-7202a1ad911d", "d2fb1472-ebdd-4fce-9ffd-7202a1ad911d",
"01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e", "01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e",
data = MockData( data = ProcessStarted(
Status.COMPLETED, Status.COMPLETED,
"Test" ProcessType.MANUAL,
file = "Potato.mp4"
)) ))
val json = gson.toJson(message) val json = gson.toJson(message)
val objectMapper = ObjectMapper() val deserializer = DeserializingRegistry()
val result = objectMapper.readValue(json, Message::class.java) val result = deserializer.deserialize(KafkaEvents.EVENT_PROCESS_STARTED, json)
assertThat(result.data).isInstanceOf(MockData::class.java) assertThat(result.data).isInstanceOf(ProcessStarted::class.java)
} }
@Test
fun getAnnotatedClasses() {
val serializer = DefaultConsumer.GsonDeserializer()
val result = serializer.getAnnotatedClasses()
assertThat(result).isNotEmpty()
}