From 3d119813ddf5c8a1a9b2fdb2552067d3150cf2ac Mon Sep 17 00:00:00 2001 From: Brage Date: Wed, 13 Dec 2023 18:26:16 +0100 Subject: [PATCH] WIP - Looping issue --- apps/converter/build.gradle.kts | 14 +- apps/coordinator/build.gradle.kts | 32 +++- .../coordinator/Coordinator.kt | 83 ++++----- .../coordinator/CoordinatorApplication.kt | 9 +- .../coordinator/Implementations.kt | 1 - .../coordinator/MessageOperator.kt | 8 + .../mediaprocessing/coordinator/Task.kt | 18 ++ .../coordination/MessageSequence.kt | 5 + .../coordinator/reader/BaseInfoFromFile.kt | 41 +++-- .../coordinator/reader/MediaStreamsAnalyze.kt | 10 +- .../reader/ParseVideoFileStreams.kt | 43 ++--- .../reader/ReadVideoFileStreams.kt | 40 +++-- ...etadataAndBaseInfoToFileOutAndCoverTask.kt | 27 +-- .../event/OutNameToWorkArgumentCreator.kt | 17 +- .../src/main/resources/application.properties | 3 + .../reader/BaseInfoFromFileTest.kt | 84 +++++++-- .../coordinator/reader/KafkaTestBase.kt | 32 ++++ .../reader/ParseVideoFileStreamsTest.kt | 168 ++++++++++++++++++ .../processer/EncodeService.kt | 2 +- .../shared/common/ProcessingService.kt | 15 +- .../shared/common/SharedConfig.kt | 1 - .../datasource/TableDefaultOperations.kt | 17 ++ .../common/parsing/FileNameDeterminate.kt | 4 +- .../common/persistance/PersistentDataStore.kt | 14 +- .../common/persistance/PersistentMessage.kt | 4 +- shared/kafka/build.gradle.kts | 12 +- .../kafka/core/DefaultMessageListener.kt | 33 ++-- .../kafka/core/DeserializingRegistry.kt | 11 +- .../shared/kafka/core/KafkaEnv.kt | 7 +- .../shared/kafka/core/KafkaImplementation.kt | 2 + .../shared/kafka/dto/MessageDataWrapper.kt | 5 + .../MediaStreamsParsePerformed.kt | 3 +- .../dto/events_result/ReaderPerformed.kt | 3 +- .../dto/events_result/VideoInfoPerformed.kt | 11 +- .../shared/kafka/KafkaTestBase.kt | 29 +++ .../shared/kafka}/SerializationTest.kt | 4 +- 36 files changed, 627 insertions(+), 185 deletions(-) create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt create mode 100644 apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt create mode 100644 apps/coordinator/src/main/resources/application.properties create mode 100644 apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/KafkaTestBase.kt create mode 100644 apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt create mode 100644 shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/KafkaTestBase.kt rename shared/kafka/src/test/kotlin/{ => no/iktdev/mediaprocessing/shared/kafka}/SerializationTest.kt (91%) diff --git a/apps/converter/build.gradle.kts b/apps/converter/build.gradle.kts index 7adffbe5..d2ebe0bb 100644 --- a/apps/converter/build.gradle.kts +++ b/apps/converter/build.gradle.kts @@ -11,23 +11,31 @@ version = "1.0-SNAPSHOT" repositories { mavenCentral() + maven("https://jitpack.io") + maven { + url = uri("https://reposilite.iktdev.no/releases") + } + maven { + url = uri("https://reposilite.iktdev.no/snapshots") + } } dependencies { + /*Spring boot*/ implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter:2.7.0") + // implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") + implementation("no.iktdev:exfl:0.0.13-SNAPSHOT") + implementation(project(mapOf("path" to ":shared:kafka"))) implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:common"))) - - testImplementation(platform("org.junit:junit-bom:5.9.1")) - testImplementation("org.junit.jupiter:junit-jupiter") } tasks.test { diff --git a/apps/coordinator/build.gradle.kts b/apps/coordinator/build.gradle.kts index cfaca258..20868ef7 100644 --- a/apps/coordinator/build.gradle.kts +++ b/apps/coordinator/build.gradle.kts @@ -20,12 +20,13 @@ repositories { } } + val exposedVersion = "0.44.0" dependencies { /*Spring boot*/ implementation("org.springframework.boot:spring-boot-starter-web") - implementation("org.springframework.boot:spring-boot-starter:2.7.0") + implementation("org.springframework.boot:spring-boot-starter:3.2.0") // implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") @@ -53,18 +54,35 @@ dependencies { - testImplementation(platform("org.junit:junit-bom:5.9.1")) - testImplementation("org.junit.jupiter:junit-jupiter") implementation(kotlin("stdlib-jdk8")) - testImplementation("org.junit.jupiter:junit-jupiter:5.8.1") - testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1") testImplementation("org.assertj:assertj-core:3.21.0") - testImplementation(project(mapOf("path" to ":shared:common"))) + + testImplementation("junit:junit:4.12") + testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.0") + testImplementation("org.skyscreamer:jsonassert:1.5.0") + testImplementation("org.mockito:mockito-core:3.+") + + testImplementation(platform("org.junit:junit-bom:5.9.1")) + testImplementation("org.assertj:assertj-core:3.4.1") + testImplementation("org.mockito:mockito-core:3.+") + testImplementation("org.assertj:assertj-core:3.4.1") + + /*testImplementation("org.junit.vintage:junit-vintage-engine") + testImplementation("org.junit.jupiter:junit-jupiter:5.10.1") + testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1") + testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.1") + testRuntimeOnly ("org.junit.jupiter:junit-jupiter-engine:5.10.1") + testImplementation("org.mockito:mockito-core:5.8.0") // Oppdater versjonen hvis det er nyere tilgjengelig + testImplementation("org.mockito:mockito-junit-jupiter:5.8.0") + testImplementation(platform("org.junit:junit-bom:5.10.1")) + testImplementation("org.junit.platform:junit-platform-runner:1.10.1")*/ + } -tasks.test { +tasks.withType { useJUnitPlatform() } + kotlin { jvmToolchain(17) } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt index 2c2b2982..4af3cd2c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt @@ -4,24 +4,32 @@ import com.google.gson.Gson import kotlinx.coroutines.launch import mu.KotlinLogging import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ProcessType +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File import java.util.UUID +import javax.annotation.PostConstruct @Service class Coordinator() { - val producer = CoordinatorProducer() + + @Autowired + private lateinit var producer: CoordinatorProducer + + @Autowired + private lateinit var listener: DefaultMessageListener + private val log = KotlinLogging.logger {} @@ -97,18 +105,12 @@ class Coordinator() { val io = Coroutines.io() - private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> - val success = PersistentDataStore().storeMessage(event.key.event, event.value) - if (!success) { - log.error { "Unable to store message: ${event.key.event} in database!" } - } else - readAllMessagesFor(event.value.referenceId, event.value.eventId) - } fun readAllMessagesFor(referenceId: String, eventId: String) { + val messages = PersistentDataReader().getMessagesFor(referenceId) + createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages) + io.launch { - val messages = PersistentDataReader().getMessagesFor(referenceId) - createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages) buildModelBasedOnMessagesFor(referenceId, messages) } } @@ -120,40 +122,41 @@ class Coordinator() { } fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List) { - io.launch { - val triggered = messages.find { it.eventId == eventId } ?: return@launch - listeners.forEach { it.onEventReceived(referenceId, triggered, messages) } - if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) { - val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted - if (processStarted.type == ProcessType.FLOW) { - log.info { "Process for $referenceId was started from flow and will be processed" } - if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) { - produceEncodeWork(triggered) - } else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) { - produceExtractWork(triggered) - } - } else { - log.info { "Process for $referenceId was started manually and will require user input for continuation" } + val triggered = messages.find { it.eventId == eventId } + if (triggered == null) { + log.error { "Could not find $eventId in provided messages" } + return + } + listeners.forEach { it.onEventReceived(referenceId, triggered, messages) } + + if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) { + val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted + + if (processStarted.type == ProcessType.FLOW) { + log.info { "Process for $referenceId was started from flow and will be processed" } + if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) { + produceEncodeWork(triggered) + } else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) { + produceExtractWork(triggered) } + } else { + log.info { "Process for $referenceId was started manually and will require user input for continuation" } } } } - - - init { - io.launch { listener.listen() } + @PostConstruct + fun onReady() { + io.launch { + listener.onMessageReceived = { event -> + val success = PersistentDataStore().storeMessage(event.key.event, event.value) + if (!success) { + log.error { "Unable to store message: ${event.key.event} in database!" } + } else + readAllMessagesFor(event.value.referenceId, event.value.eventId) + } + listener.listen(KafkaEnv.kafkaTopic) } } } -abstract class TaskCreator: TaskCreatorListener { - val producer = CoordinatorProducer() - open fun isPrerequisitesOk(events: List): Boolean { - return true - } -} - -interface TaskCreatorListener { - fun onEventReceived(referenceId: String, event: PersistentMessage, events: List): Unit -} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt index 9bb256c8..cca8ee3b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt @@ -8,6 +8,7 @@ import no.iktdev.mediaprocessing.shared.common.DatabaseConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.persistance.events +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.runApplication import org.springframework.context.ApplicationContext @@ -26,19 +27,19 @@ fun getContext(): ApplicationContext? { } fun main(args: Array) { - // val dataSource = MySqlDataSource.fromDatabaseEnv(); - /*Coroutines.default().launch { + val dataSource = MySqlDataSource.fromDatabaseEnv(); + Coroutines.default().launch { dataSource.createDatabase() dataSource.createTables( events ) - }*/ + } context = runApplication(*args) printSharedConfig() } fun printSharedConfig() { - log.info { "Kafka topic: ${SharedConfig.kafkaTopic}" } + log.info { "Kafka topic: ${KafkaEnv.kafkaTopic}" } log.info { "File Input: ${SharedConfig.incomingContent}" } log.info { "File Output: ${SharedConfig.outgoingContent}" } log.info { "Ffprobe: ${SharedConfig.ffprobe}" } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt index 9212a982..6bdca54f 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Implementations.kt @@ -6,7 +6,6 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import -import org.springframework.stereotype.Component @Configuration class SocketLocalInit: SocketImplementation() diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt new file mode 100644 index 00000000..eaff68cf --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/MessageOperator.kt @@ -0,0 +1,8 @@ +package no.iktdev.mediaprocessing.coordinator + +import org.springframework.stereotype.Service + +@Service +class MessageOperator { + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt new file mode 100644 index 00000000..244a5196 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Task.kt @@ -0,0 +1,18 @@ +package no.iktdev.mediaprocessing.coordinator + +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents +import org.springframework.beans.factory.annotation.Autowired + +abstract class TaskCreator: TaskCreatorListener { + @Autowired + lateinit var producer: CoordinatorProducer + open fun isPrerequisitesOk(events: List): Boolean { + return true + } +} + +interface TaskCreatorListener { + fun onEventReceived(referenceId: String, event: PersistentMessage, events: List): Unit +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt new file mode 100644 index 00000000..99554637 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/coordination/MessageSequence.kt @@ -0,0 +1,5 @@ +package no.iktdev.mediaprocessing.coordinator.coordination + +class MessageSequence { + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt index bc637b2a..25b0ec13 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFile.kt @@ -1,41 +1,44 @@ package no.iktdev.mediaprocessing.coordinator.reader import kotlinx.coroutines.launch +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener import no.iktdev.mediaprocessing.shared.common.ProcessingService -import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File -class BaseInfoFromFile(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( - SharedConfig.kafkaTopic)): ProcessingService(producer, listener) { +@Service +class BaseInfoFromFile(@Autowired var coordinator: Coordinator): ProcessingService() { - init { - listener.onMessageReceived = { event -> - val message = event.value - if (message.data is ProcessStarted) { - io.launch { - val result = readFileInfo(message.data as ProcessStarted) - onResult(message.referenceId, result) - } - } - } - io.launch { - listener.listen() - } - } override fun onResult(referenceId: String, data: MessageDataWrapper) { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data) } + override fun onReady() { + coordinator.addListener(object : TaskCreatorListener { + override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { + if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) { + io.launch { + val result = readFileInfo(event.data as ProcessStarted) + onResult(referenceId, result) + } + } + } + + }) + } + fun readFileInfo(started: ProcessStarted): MessageDataWrapper { val result = try { val fileName = File(started.file).nameWithoutExtension diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt index 235f7486..62618270 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/MediaStreamsAnalyze.kt @@ -1,21 +1,17 @@ package no.iktdev.mediaprocessing.coordinator.reader import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener -import no.iktdev.streamit.library.kafka.dto.Status -import org.springframework.stereotype.Service + class MediaStreamsAnalyze { val io = Coroutines.io() - +/* val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) { if (event.value.data?.status == Status.COMPLETED) { } } - } + }*/ } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt index ffb5e473..b32ac133 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreams.kt @@ -1,51 +1,52 @@ package no.iktdev.mediaprocessing.coordinator.reader import com.google.gson.Gson -import com.google.gson.JsonObject import kotlinx.coroutines.launch +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener import no.iktdev.mediaprocessing.shared.common.ProcessingService -import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service - -class ParseVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( - SharedConfig.kafkaTopic)): ProcessingService(producer, listener) { +@Service +class ParseVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() { override fun onResult(referenceId: String, data: MessageDataWrapper) { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data) } - init { - listener.onMessageReceived = { event -> - val message = event.value - if (message.data is ReaderPerformed) { - io.launch { - val result = parseStreams(message.data as ReaderPerformed) - onResult(message.referenceId, result) + override fun onReady() { + coordinator.addListener(object : TaskCreatorListener { + override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { + if (event.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED && event.data.isSuccess()) { + io.launch { + val result = parseStreams(event.data as ReaderPerformed) + onResult(referenceId, result) + } } } - } - io.launch { - listener.listen() - } + + }) } fun parseStreams(data: ReaderPerformed): MessageDataWrapper { val gson = Gson() return try { - val jsonObject = gson.fromJson(data.output, JsonObject::class.java) - val jStreams = jsonObject.getAsJsonArray("streams") + val jStreams = data.output.getAsJsonArray("streams") val videoStreams = mutableListOf() val audioStreams = mutableListOf() @@ -70,7 +71,7 @@ class ParseVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer( audioStream = audioStreams, subtitleStream = subtitleStreams ) - MessageDataWrapper(Status.COMPLETED, gson.toJson(parsedStreams)) + MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams) } catch (e: Exception) { e.printStackTrace() diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt index 21307877..85690cd8 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ReadVideoFileStreams.kt @@ -1,48 +1,54 @@ package no.iktdev.mediaprocessing.coordinator.reader +import com.google.gson.Gson +import com.google.gson.JsonObject import kotlinx.coroutines.launch +import no.iktdev.mediaprocessing.coordinator.Coordinator +import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener import no.iktdev.mediaprocessing.shared.common.ProcessingService import no.iktdev.mediaprocessing.shared.common.SharedConfig -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed +import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.kafka.dto.Status +import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import java.io.File -class ReadVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( - SharedConfig.kafkaTopic) -): ProcessingService(producer, listener) { +@Service +class ReadVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() { override fun onResult(referenceId: String, data: MessageDataWrapper) { producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data) } - init { - listener.onMessageReceived = { event -> - val message = event.value - if (message.data is ProcessStarted) { - io.launch { - val result = fileReadStreams(message.data as ProcessStarted) - onResult(message.referenceId, result) + override fun onReady() { + coordinator.addListener(object : TaskCreatorListener { + override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { + if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) { + io.launch { + val result = fileReadStreams(event.data as ProcessStarted) + onResult(referenceId, result) + } } } - } - io.launch { - listener.listen() - } + + }) } suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper { val file = File(started.file) return if (file.exists() && file.isFile) { val result = readStreams(file) - ReaderPerformed(Status.COMPLETED, file = started.file, output = result.output.joinToString("\n")) + val joined = result.output.joinToString(" ") + val jsoned = Gson().fromJson(joined, JsonObject::class.java) + ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned) } else { MessageDataWrapper(Status.ERROR, "File in data is not a file or does not exist") } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt index 214fd881..4818162a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOutAndCoverTask.kt @@ -3,18 +3,20 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event import mu.KotlinLogging import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator -import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener +import no.iktdev.mediaprocessing.coordinator.TaskCreator import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage +import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.streamit.library.kafka.dto.Status import org.springframework.beans.factory.annotation.Autowired +import org.springframework.scheduling.annotation.EnableScheduling import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Service import java.time.LocalDateTime @@ -23,20 +25,17 @@ import java.time.LocalDateTime * */ @Service -class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreatorListener { +@EnableScheduling +class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreator() { private val log = KotlinLogging.logger {} init { coordinator.addListener(this) } - val producer = CoordinatorProducer() val waitingProcessesForMeta: MutableMap = mutableMapOf() override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { - if (!listOf( - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, - KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED) - .contains(event.event)) { + if (!listOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED).contains(event.event)) { return } @@ -48,8 +47,9 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord return } if (baseInfo.isSuccess() && meta == null) { + log.info { "Sending ${baseInfo?.title} to waiting queue" } if (!waitingProcessesForMeta.containsKey(referenceId)) { - waitingProcessesForMeta[referenceId] + waitingProcessesForMeta[referenceId] = LocalDateTime.now() } return } @@ -104,14 +104,15 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord } - @Scheduled(fixedDelay = (60_000)) + //@Scheduled(fixedDelay = (60_000)) + @Scheduled(fixedDelay = (1_000)) fun sendErrorMessageForMetadata() { - //val timeThresholdInMinutes = 10 * 60_000 val expired = waitingProcessesForMeta.filter { - LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + 10 * 60) + LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + KafkaEnv.metadataTimeoutMinutes * 60) } expired.forEach { - producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MessageDataWrapper(status = Status.ERROR, "Timed Out by: ${this::javaClass.name}")) + log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" } + producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOutAndCoverTask::class.simpleName}")) waitingProcessesForMeta.remove(it.key) } } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt index fc2e536f..5c6ea92b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/OutNameToWorkArgumentCreator.kt @@ -31,17 +31,19 @@ class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : Ta override fun isPrerequisitesOk(events: List): Boolean { val required = listOf( - KafkaEvents.EVENT_PROCESS_STARTED.event, - KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED.event, - KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED.event + KafkaEvents.EVENT_PROCESS_STARTED, + KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, + KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, + KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE ) - return events.filter { it.eventId in required }.all { it.data.isSuccess() } + val currentEvents = events.map { it.event } + val hasAllRequiredEvents = required.all { currentEvents.contains(it) } + val hasAllRequiredData = events.filter { e -> e.event in required }.all { it.data.isSuccess() } + return hasAllRequiredData && hasAllRequiredEvents } override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List) { val preference = Preference.getPreference() - if (event.event != KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED) - return if (!isPrerequisitesOk(events)) { return @@ -49,8 +51,7 @@ class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : Ta val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed - val serializedParsedStreams = - Gson().fromJson(readStreamsEvent.parsedAsJson, ParsedMediaStreams::class.java) + val serializedParsedStreams = readStreamsEvent.streams val outDir = SharedConfig.outgoingContent.using(baseInfo.title) diff --git a/apps/coordinator/src/main/resources/application.properties b/apps/coordinator/src/main/resources/application.properties new file mode 100644 index 00000000..6dd22f9b --- /dev/null +++ b/apps/coordinator/src/main/resources/application.properties @@ -0,0 +1,3 @@ +spring.output.ansi.enabled=always +logging.level.org.apache.kafka=INFO +logging.level.root=INFO diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt index 2eb76bf7..299e8ce9 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt @@ -1,25 +1,37 @@ package no.iktdev.mediaprocessing.coordinator.reader - -import TestKafka +/* +import com.google.gson.Gson +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.contract.ProcessType -import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents -import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.streamit.library.kafka.dto.Status import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Named import org.junit.jupiter.api.Test +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.skyscreamer.jsonassert.JSONAssert +import org.springframework.beans.factory.annotation.Autowired import java.io.File -import java.util.UUID class BaseInfoFromFileTest { - val referenceId = UUID.randomUUID().toString() - val baseInfoFromFile = BaseInfoFromFile(TestKafka.producer, TestKafka.listener) + + @Autowired + private lateinit var testBase: KafkaTestBase + + @Mock + lateinit var coordinatorProducer: CoordinatorProducer + + + + val baseInfoFromFile = BaseInfoFromFile(coordinatorProducer) @Test fun testReadFileInfo() { - val input = ProcessStarted(Status.COMPLETED, ProcessType.FLOW, + val input = ProcessStarted( + Status.COMPLETED, ProcessType.FLOW, File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath ) @@ -31,6 +43,58 @@ class BaseInfoFromFileTest { assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01") } + @ParameterizedTest + @MethodSource("names") + fun test(data: TestInfo) { + val gson = Gson() + val result = baseInfoFromFile.readFileInfo(data.input) + JSONAssert.assertEquals( + data.expected, + gson.toJson(result), + false + ) + } + data class TestInfo( + val input: ProcessStarted, + val expected: String + ) -} \ No newline at end of file + companion object { + @JvmStatic + private fun names(): List> { + return listOf( + Named.of( + "Potato", TestInfo( + ProcessStarted( + Status.COMPLETED, ProcessType.FLOW, + "E:\\input\\Top Clown Findout.1080p.H264.AAC5.1.mkv" + ), + """ + { + "status": "COMPLETED", + "title": "Top Clown Findout", + "sanitizedName": "Top Clown Findout" + } + """.trimIndent() + ) + ), + Named.of("Filename with UHD wild tag", TestInfo( + ProcessStarted( + Status.COMPLETED, ProcessType.FLOW, + "E:\\input\\Wicked.Potato.Chapter.1.2023.UHD.BluRay.2160p.DDP.7.1.DV.HDR.x265.mp4" + ), + """ + { + "status": "COMPLETED", + "title": "Wicked Potato Chapter 1", + "sanitizedName": "Wicked Potato Chapter 1" + } + """.trimIndent() + ) + ) + ) + } + } + +}*/ \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/KafkaTestBase.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/KafkaTestBase.kt new file mode 100644 index 00000000..391a51b6 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/KafkaTestBase.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing.coordinator.reader +/* +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer +import org.apache.kafka.clients.admin.AdminClient +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.InjectMocks +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.springframework.kafka.core.KafkaTemplate +import org.springframework.kafka.core.ProducerFactory + +@ExtendWith(MockitoExtension::class) +class KafkaTestBase { + + @Mock + lateinit var kafkaTemplate: KafkaTemplate + + @Mock + lateinit var adminClient: AdminClient + + @InjectMocks + lateinit var defaultProducer: DefaultProducer + + @InjectMocks + lateinit var defaultConsumer: DefaultConsumer + + @InjectMocks + lateinit var defaultListener: DefaultMessageListener + +}*/ \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt new file mode 100644 index 00000000..5484e410 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt @@ -0,0 +1,168 @@ +package no.iktdev.mediaprocessing.coordinator.reader +/* +import com.google.gson.Gson +import com.google.gson.JsonObject +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed +import no.iktdev.streamit.library.kafka.dto.Status +import org.junit.jupiter.api.Test + +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Named +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource +import org.mockito.Mock +import org.skyscreamer.jsonassert.JSONAssert +import org.springframework.beans.factory.annotation.Autowired + +class ParseVideoFileStreamsTest { + + @Autowired + private lateinit var testBase: KafkaTestBase + + @Mock + lateinit var coordinatorProducer: CoordinatorProducer + + val parseVideoStreams = ParseVideoFileStreams(coordinatorProducer) + + @ParameterizedTest + @MethodSource("streams") + fun parseStreams(data: TestInfo) { + val gson = Gson() + val converted = gson.fromJson(data.input, JsonObject::class.java) + val result = parseVideoStreams.parseStreams(ReaderPerformed( + Status.COMPLETED, + file = "ignore", + output = converted + )) + JSONAssert.assertEquals( + data.expected, + gson.toJson(result), + false + ) + + } + + data class TestInfo( + val input: String, + val expected: String + ) + + companion object { + @JvmStatic + fun streams(): List> { + return listOf( + Named.of( + "Top Clown streams", TestInfo( + """ + { + "streams": [ + { + "index": 0, + "codec_name": "h264", + "codec_long_name": "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10", + "profile": "Main", + "codec_type": "video", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "width": 1920, + "height": 1080, + "coded_width": 1920, + "coded_height": 1080, + "closed_captions": 0, + "film_grain": 0, + "has_b_frames": 0, + "sample_aspect_ratio": "1:1", + "display_aspect_ratio": "16:9", + "pix_fmt": "yuv420p", + "level": 40, + "chroma_location": "left", + "field_order": "progressive", + "refs": 1, + "is_avc": "true", + "nal_length_size": "4", + "r_frame_rate": "24000/1001", + "avg_frame_rate": "24000/1001", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "bits_per_raw_sample": "8", + "extradata_size": 55, + "disposition": { + "default": 1, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 0, + "hearing_impaired": 0, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + } + }, + { + "index": 1, + "codec_name": "aac", + "codec_long_name": "AAC (Advanced Audio Coding)", + "profile": "HE-AAC", + "codec_type": "audio", + "codec_tag_string": "[0][0][0][0]", + "codec_tag": "0x0000", + "sample_fmt": "fltp", + "sample_rate": "48000", + "channels": 6, + "channel_layout": "5.1", + "bits_per_sample": 0, + "initial_padding": 0, + "r_frame_rate": "0/0", + "avg_frame_rate": "0/0", + "time_base": "1/1000", + "start_pts": 0, + "start_time": "0.000000", + "extradata_size": 2, + "disposition": { + "default": 1, + "dub": 0, + "original": 0, + "comment": 0, + "lyrics": 0, + "karaoke": 0, + "forced": 0, + "hearing_impaired": 0, + "visual_impaired": 0, + "clean_effects": 0, + "attached_pic": 0, + "timed_thumbnails": 0, + "non_diegetic": 0, + "captions": 0, + "descriptions": 0, + "metadata": 0, + "dependent": 0, + "still_image": 0 + }, + "tags": { + "language": "eng" + } + } + ] + } + + """.trimIndent(), + """ + + """.trimIndent() + ) + ) + ) + } + } +}*/ \ No newline at end of file diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt index 24b9ac0f..75d84588 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/EncodeService.kt @@ -5,7 +5,7 @@ import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.mediaprocessing.shared.common.SharedConfig import org.springframework.stereotype.Service -//@Service +@Service class EncodeService { /*private val log = KotlinLogging.logger {} val io = Coroutines.io() diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt index 842eca67..e948b4cf 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/ProcessingService.kt @@ -1,14 +1,23 @@ package no.iktdev.mediaprocessing.shared.common import no.iktdev.exfl.coroutines.Coroutines -import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer +import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.context.annotation.Import import org.springframework.stereotype.Service +import javax.annotation.PostConstruct @Service -abstract class ProcessingService(var producer: CoordinatorProducer, var listener: DefaultMessageListener) { +@Import(DefaultMessageListener::class) +abstract class ProcessingService() { val io = Coroutines.io() - abstract fun onResult(referenceId: String, data: MessageDataWrapper) + @Autowired + lateinit var producer: CoordinatorProducer + + abstract fun onResult(referenceId: String, data: MessageDataWrapper) + @PostConstruct + abstract fun onReady(): Unit } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index e767447d..e831a2f0 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common import java.io.File object SharedConfig { - var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents" var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt index 740275f7..8a680882 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt @@ -44,6 +44,23 @@ fun insertWithSuccess(block: () -> T): Boolean { } } +fun executeOrException(block: () -> T): Exception? { + return try { + transaction { + try { + block() + null + } catch (e: Exception) { + // log the error here or handle the exception as needed + e + } + } + } catch (e: Exception) { + e.printStackTrace() + return e + } +} + fun executeWithStatus(block: () -> T): Boolean { return try { transaction { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt index 86c93e27..5481f648 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/parsing/FileNameDeterminate.kt @@ -29,7 +29,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp else -> sanitizedName } val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped - return MovieInfo(cleanup(nonResolutioned), cleanup(nonResolutioned)) + return MovieInfo(type = "movie", cleanup(nonResolutioned), cleanup(nonResolutioned)) } private fun determineSerieFileName(): EpisodeInfo? { @@ -58,7 +58,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp } } else title val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() - return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) + return EpisodeInfo(type= "serie", title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) } private fun determineUndefinedFileName(): VideoInfo? { diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt index 925373e6..295bcea9 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt @@ -1,12 +1,15 @@ package no.iktdev.mediaprocessing.shared.common.persistance +import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.kafka.dto.Message +import org.jetbrains.exposed.exceptions.ExposedSQLException import org.jetbrains.exposed.sql.insert +import java.sql.SQLIntegrityConstraintViolationException open class PersistentDataStore { fun storeMessage(event: String, message: Message<*>): Boolean { - return executeWithStatus { + val exception = executeOrException { events.insert { it[events.referenceId] = message.referenceId it[events.eventId] = message.eventId @@ -14,6 +17,15 @@ open class PersistentDataStore { it[events.data] = message.dataAsJson() } } + return if (exception == null) true else { + if (exception.cause is SQLIntegrityConstraintViolationException) { + (exception as ExposedSQLException).errorCode == 1062 + } + else { + exception.printStackTrace() + false + } + } } } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt index 2a30d67f..e36bd085 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentMessage.kt @@ -16,11 +16,11 @@ data class PersistentMessage( fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { val kev = try { - KafkaEvents.valueOf(row[events.event]) + KafkaEvents.toEvent(row[events.event]) } catch (e: IllegalArgumentException) { e.printStackTrace() return null - } + }?: return null val dzdata = dez.deserializeData(kev, row[events.data]) return PersistentMessage( referenceId = row[events.referenceId], diff --git a/shared/kafka/build.gradle.kts b/shared/kafka/build.gradle.kts index c772f154..61bc26aa 100644 --- a/shared/kafka/build.gradle.kts +++ b/shared/kafka/build.gradle.kts @@ -15,7 +15,7 @@ dependencies { implementation("com.google.code.gson:gson:2.8.9") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") - implementation("org.springframework.kafka:spring-kafka:3.0.1") + implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0") implementation(project(mapOf("path" to ":shared:contract"))) @@ -27,13 +27,15 @@ dependencies { testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("junit:junit:4.13.2") - testImplementation("org.junit.jupiter:junit-jupiter") - testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1") - testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1") + testImplementation("org.junit.jupiter:junit-jupiter:5.8.1") testImplementation("org.assertj:assertj-core:3.4.1") testImplementation("org.mockito:mockito-core:3.+") testImplementation("org.assertj:assertj-core:3.4.1") + testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1") + testImplementation("org.junit.jupiter:junit-jupiter-engine:5.8.1") + testImplementation("org.mockito:mockito-core:3.10.0") // Oppdater versjonen hvis det er nyere tilgjengelig + testImplementation("org.mockito:mockito-junit-jupiter:3.10.0") } tasks.test { @@ -41,4 +43,4 @@ tasks.test { } kotlin { jvmToolchain(17) -} \ No newline at end of file +} diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultMessageListener.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultMessageListener.kt index bcfde8e1..ac3f4bc2 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultMessageListener.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DefaultMessageListener.kt @@ -5,27 +5,37 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import org.apache.kafka.clients.consumer.ConsumerRecord +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory import org.springframework.kafka.listener.ContainerProperties import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.MessageListener +import org.springframework.stereotype.Component import java.lang.IllegalArgumentException import java.util.* +@Component open class DefaultMessageListener( - open val topic: String, - open val consumer: DefaultConsumer = DefaultConsumer(subId = UUID.randomUUID().toString()), - open var onMessageReceived: (DeserializedConsumerRecord>) -> Unit = {} -) - : MessageListener { - +) : MessageListener { private val logger = KotlinLogging.logger {} + + @Autowired + private lateinit var consumerFactory: ConsumerFactory + + var onMessageReceived: (DeserializedConsumerRecord>) -> Unit = { + logger.warn { "onMessageReceived has no listener" } + } + private val deserializer = DeserializingRegistry() protected var container: KafkaMessageListenerContainer? = null - open fun listen() { - val listener = consumer.consumerFactoryListener() + open fun listen(topic: String) { + val listener = ConcurrentKafkaListenerContainerFactory().apply { + consumerFactory = this@DefaultMessageListener.consumerFactory + } val containerProperties = ContainerProperties(topic).apply { messageListener = this@DefaultMessageListener } @@ -46,7 +56,7 @@ open class DefaultMessageListener( override fun onMessage(data: ConsumerRecord) { val event = try { - KafkaEvents.valueOf(data.key()) + KafkaEvents.toEvent(data.key()) } catch (e: IllegalArgumentException) { logger.error { "${data.key()} is not a member of KafkaEvents" } null @@ -60,7 +70,10 @@ open class DefaultMessageListener( } -private fun ConsumerRecord.toDeserializedConsumerRecord(keyzz: KDez, valuezz: VDez): DeserializedConsumerRecord { +private fun ConsumerRecord.toDeserializedConsumerRecord( + keyzz: KDez, + valuezz: VDez +): DeserializedConsumerRecord { return DeserializedConsumerRecord( topic = this.topic(), partition = this.partition(), diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt index 4d015808..73464e69 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt @@ -4,6 +4,7 @@ import com.google.gson.Gson import com.google.gson.reflect.TypeToken import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper +import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import java.lang.reflect.Type import kotlin.reflect.KClass @@ -64,7 +65,15 @@ class DeserializingRegistry { e.printStackTrace() } } - // Fallback + try { + // Fallback + val type = object : TypeToken() {}.type + return gson.fromJson(json, type) + } catch (e: Exception) { + e.printStackTrace() + } + + // Default val type = object : TypeToken() {}.type return gson.fromJson(json, type) } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt index 50a24e31..01728887 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEnv.kt @@ -6,10 +6,15 @@ class KafkaEnv { companion object { val servers: String = System.getenv("KAFKA_BOOTSTRAP_SERVER") ?: "127.0.0.1:9092" var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}" - var enabled: Boolean = System.getenv("KAFKA_ENABLED").toBoolean() val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest" var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents" + val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10 + val heartbeatIntervalMilliseconds: Int = System.getenv("KAFKA_HEARTBEAT_INTERVAL_MS")?.toIntOrNull() ?: 2000 + val sessionTimeOutMilliseconds: Int = System.getenv("KAFKA_SESSION_INACTIVITY_MS")?.toIntOrNull() ?: (listOf( + metadataTimeoutMinutes, + heartbeatIntervalMilliseconds + ).max() * 60) } } \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt index 2d7ce3aa..8acf5e43 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaImplementation.kt @@ -41,6 +41,8 @@ open class KafkaImplementation { config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages + config[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = KafkaEnv.sessionTimeOutMilliseconds + config[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = KafkaEnv.heartbeatIntervalMilliseconds log.info { config } return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer()) } diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt index efd7af47..ae993a55 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt @@ -1,6 +1,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto import com.google.gson.Gson +import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.streamit.library.kafka.dto.Status import java.io.Serializable import java.lang.reflect.Type @@ -12,6 +13,10 @@ open class MessageDataWrapper( @Transient open val message: String? = null ) +data class SimpleMessageData( + override val status: Status, + override val message: String? +) : MessageDataWrapper(status, message) fun MessageDataWrapper?.isSuccess(): Boolean { diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt index e8ee2a9a..649ccac2 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result +import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -8,6 +9,6 @@ import no.iktdev.streamit.library.kafka.dto.Status @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED) data class MediaStreamsParsePerformed( override val status: Status, - val parsedAsJson: String + val streams: ParsedMediaStreams ): MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt index 033e97ff..6e560514 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt @@ -1,5 +1,6 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result +import com.google.gson.JsonObject import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper @@ -9,5 +10,5 @@ import no.iktdev.streamit.library.kafka.dto.Status data class ReaderPerformed( override val status: Status, val file: String, //AbsolutePath - val output: String + val output: JsonObject ) : MessageDataWrapper(status) \ No newline at end of file diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt index e94ce7d9..6f9e7eb2 100644 --- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt +++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt @@ -5,23 +5,25 @@ import no.iktdev.streamit.library.kafka.dto.Status data class VideoInfoPerformed( override val status: Status, - val data: VideoInfo + val info: VideoInfo ) : MessageDataWrapper(status) data class EpisodeInfo( + override val type: String, val title: String, val episode: Int, val season: Int, val episodeTitle: String?, override val fullName: String -): VideoInfo(fullName) +): VideoInfo(type, fullName) data class MovieInfo( + override val type: String, val title: String, override val fullName: String -) : VideoInfo(fullName) +) : VideoInfo(type, fullName) data class SubtitleInfo( val inputFile: String, @@ -29,6 +31,7 @@ data class SubtitleInfo( val language: String ) -abstract class VideoInfo( +open class VideoInfo( + @Transient open val type: String, @Transient open val fullName: String ) \ No newline at end of file diff --git a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/KafkaTestBase.kt b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/KafkaTestBase.kt new file mode 100644 index 00000000..7812637c --- /dev/null +++ b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/KafkaTestBase.kt @@ -0,0 +1,29 @@ +package no.iktdev.mediaprocessing.shared.kafka + +import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener +import org.apache.kafka.clients.admin.AdminClient +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.InjectMocks +import org.mockito.Mock +import org.mockito.junit.jupiter.MockitoExtension +import org.springframework.kafka.core.KafkaTemplate + +@ExtendWith(MockitoExtension::class) +class KafkaTestBase { + + @Mock + lateinit var kafkaTemplate: KafkaTemplate + + @Mock + lateinit var adminClient: AdminClient + + /*@InjectMocks + lateinit var defaultProducer: DefaultProducer + + @InjectMocks + lateinit var defaultConsumer: DefaultConsumer*/ + + @InjectMocks + lateinit var defaultListener: DefaultMessageListener + +} \ No newline at end of file diff --git a/shared/kafka/src/test/kotlin/SerializationTest.kt b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt similarity index 91% rename from shared/kafka/src/test/kotlin/SerializationTest.kt rename to shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt index d3a8e389..4b132bdd 100644 --- a/shared/kafka/src/test/kotlin/SerializationTest.kt +++ b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt @@ -1,7 +1,7 @@ -import com.fasterxml.jackson.databind.ObjectMapper +package no.iktdev.mediaprocessing.shared.kafka + import com.google.gson.Gson import no.iktdev.mediaprocessing.shared.contract.ProcessType -import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.dto.Message