bskjon 2024-07-13 19:56:33 +02:00
parent 12f3f6e3ac
commit 8ae9cbc302
50 changed files with 37 additions and 1107 deletions

View File

@@ -1,15 +1,11 @@
 package no.iktdev.mediaprocessing
-import kotlinx.serialization.json.*
-import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
-import no.iktdev.mediaprocessing.shared.common.persistance.events
-import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import org.json.JSONArray
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
+/*
 class PersistentMessageFromJsonDump(events: String) {
     private var data: JsonArray?
@@ -53,4 +49,4 @@ class PersistentMessageFromJsonDump(events: String) {
     }
-}
+}*/

View File

@@ -1,14 +1,9 @@
 package no.iktdev.mediaprocessing.coordinator.tasks.event
-import no.iktdev.mediaprocessing.PersistentMessageFromJsonDump
 import no.iktdev.mediaprocessing.coordinator.tasksV2.listeners.MediaOutInformationTaskListener
-import no.iktdev.mediaprocessing.shared.common.lastOrSuccessOf
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
+/*
 class MetadataAndBaseInfoToFileOutTest {
     fun testData(): String {
@@ -42,4 +37,4 @@ class MetadataAndBaseInfoToFileOutTest {
         assertThat(vi).isNotNull()
     }
-}
+}*/

View File

@@ -6,8 +6,6 @@ import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.streams.AudioArgume
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioPreference
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
 import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
-import no.iktdev.mediaprocessing.shared.kafka.dto.Message
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Assertions.*
 import org.junit.jupiter.api.Test

View File

@@ -1,33 +0,0 @@
package no.iktdev.mediaprocessing.ui.coordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.tasks.EventBasedMessageListener
import no.iktdev.mediaprocessing.shared.common.tasks.ITaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.tasks.Tasks
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
class PersistentEventBasedMessageListener: EventBasedMessageListener<PersistentMessage>() {
override fun listenerWantingEvent(
event: PersistentMessage,
waitingListeners: List<Tasks<PersistentMessage>>
): List<Tasks<PersistentMessage>> {
return waitingListeners.filter { event.event in it.listensForEvents }
}
override fun onForward(
event: PersistentMessage,
history: List<PersistentMessage>,
listeners: List<ITaskCreatorListener<PersistentMessage>>
) {
listeners.forEach {
it.onEventReceived(referenceId = event.referenceId, event = event, events = history)
}
}
override fun waitingListeners(events: List<PersistentMessage>): List<Tasks<PersistentMessage>> {
val nonCreators = listeners.filter { !events.map { e -> e.event }.contains(it.producesEvent) }
return nonCreators
}
}

View File

@@ -1,56 +0,0 @@
package no.iktdev.mediaprocessing.ui.service
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.ui.getEventsDatabase
import org.jetbrains.exposed.sql.*
import org.springframework.stereotype.Service
import java.time.LocalDateTime
import javax.annotation.PostConstruct
@Service
class PersistentEventsTableService {
val dzz = DeserializingRegistry()
private var latestPull: LocalDateTime = LocalDateTime.MIN
val cachedEvents: MutableMap<String, List<EventsDto>> = mutableMapOf()
fun pullEvents() {
val pulled = withTransaction(getEventsDatabase()) {
val cached = latestPull
latestPull = LocalDateTime.now()
events.select {
(events.created greaterEq cached)
}
.orderBy(events.created, SortOrder.ASC)
.toEvent(dzz)
.groupBy { it.referenceId }
} ?: emptyMap()
pulled.forEach { (rid, events) ->
val cEvents = cachedEvents[rid] ?: emptyList()
cachedEvents[rid] = cEvents + events
}
}
fun Query?.toEvent(dzz: DeserializingRegistry): List<EventsDto> {
return this?.mapNotNull { fromRow(it, dzz) } ?: emptyList()
}
fun fromRow(row: ResultRow, dez: DeserializingRegistry): EventsDto? {
return EventsDto(
referenceId = row[events.referenceId],
eventId = row[events.eventId],
event = row[events.event],
data = row[events.data],
created = row[events.created]
)
}
@PostConstruct
fun onInitializationCompleted() {
pullEvents()
}
}

View File

@@ -1,6 +1,5 @@
 package no.iktdev.mediaprocessing.ui.socket
-import no.iktdev.mediaprocessing.ui.service.PersistentEventsTableService
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.messaging.handler.annotation.MessageMapping
 import org.springframework.messaging.simp.SimpMessagingTemplate

View File

@@ -8,6 +8,7 @@ findProject(":apps:coordinator")?.name = "coordinator"
 findProject(":apps:converter")?.name = "converter"
 findProject(":apps:processer")?.name = "processer"
 findProject(":shared")?.name = "shared"
 findProject(":shared:contract")?.name = "contract"
 findProject(":shared:common")?.name = "common"
@@ -23,4 +24,3 @@ include("shared")
 include("shared:contract")
 include("shared:common")
 include("shared:eventi")
-findProject(":shared:eventi")?.name = "eventi"

View File

@@ -1,14 +1,11 @@
 package no.iktdev.mediaprocessing.shared.common
 import kotlinx.serialization.json.*
-import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
 import no.iktdev.mediaprocessing.shared.common.persistance.events
-import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import org.json.JSONArray
 import java.time.LocalDateTime
 import java.time.format.DateTimeFormatter
+/*
 class PersistentMessageFromJsonDump(events: String) {
     private var data: JsonArray?
@@ -53,4 +50,4 @@ class PersistentMessageFromJsonDump(events: String) {
     }
-}
+}*/

View File

@@ -1,6 +1,6 @@
 package no.iktdev.mediaprocessing.shared.common.parsing
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
+import no.iktdev.mediaprocessing.shared.contract.data.EpisodeInfo
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Assertions.*
 import org.junit.jupiter.api.Test

View File

@@ -3,19 +3,13 @@ package no.iktdev.mediaprocessing.shared.common.tests
 import no.iktdev.mediaprocessing.shared.common.H2DataSource2
 import no.iktdev.mediaprocessing.shared.common.datasource.DatabaseConnectionConfig
 import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
-import no.iktdev.mediaprocessing.shared.common.persistance.PersistentEventManager
 import no.iktdev.mediaprocessing.shared.common.persistance.events
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.Message
-import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
-import no.iktdev.mediaprocessing.shared.kafka.dto.Status
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
 import org.junit.jupiter.api.Test
 import java.util.UUID
 import org.assertj.core.api.Assertions.assertThat
 import org.jetbrains.exposed.sql.deleteAll
+/*
 class PersistentEventMangerTestBase {
     val defaultReferenceId = UUID.randomUUID().toString()
     val dataSource = H2DataSource2(DatabaseConnectionConfig(
@@ -492,4 +486,4 @@ class PersistentEventMangerTestBase {
         )
     }
-}
+}*/

View File

@@ -45,7 +45,7 @@ open class EventiImplementationBase: EventiApplicationTests() {
         return condition()
     }
-    fun getEvents(): List<EventImpl> {
+    fun getEvents(): List<List<EventImpl>> {
         return coordinator?.eventManager?.readAvailableEvents() ?: emptyList()
     }
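Reviewer note: the hunk above widens getEvents() to List<List<EventImpl>> while the body is unchanged, which suggests readAvailableEvents() now yields events in groups (presumably one inner list per referenceId; the grouping key is not visible in this hunk). A minimal, self-contained sketch of how a call site might adapt, under that assumption; Event here is a stand-in for EventImpl:

// Illustration only: flattening grouped events before filtering.
data class Event(val referenceId: String, val eventType: String)

fun main() {
    val grouped: List<List<Event>> = listOf(
        listOf(Event("ref-a", "FirstEvent"), Event("ref-a", "SecondEvent")),
        listOf(Event("ref-b", "FirstEvent")),
    )
    // Callers that used to iterate a flat list can flatten the groups first.
    val firstEvents = grouped.flatten().filter { it.eventType == "FirstEvent" }
    println(firstEvents.size) // 2
}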

View File

@@ -32,13 +32,14 @@ class ForthEventListener() : MockDataEventListener() {
     }
     override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
-        if (!shouldIProcessAndHandleEvent(incomingEvent, events))
+        val event = incomingEvent.consume()
+        if (event == null)
             return
-        val info = incomingEvent.makeDerivedEventInfo(EventStatus.Success)
+        val info = event.makeDerivedEventInfo(EventStatus.Success)
         onProduceEvent(InitEvent(
             eventType = produceEvent,
             metadata = info,
-            data = incomingEvent.data as String
+            data = event as String
         ))
     }

View File

@@ -1,6 +1,7 @@
 package no.iktdev.eventi.mock.listeners
 import mu.KotlinLogging
+import no.iktdev.eventi.core.ConsumableEvent
 import no.iktdev.eventi.data.EventImpl
 import no.iktdev.eventi.data.EventStatus
 import no.iktdev.eventi.mock.MockDataEventListener
@@ -28,8 +29,11 @@ class SecondEventListener() : MockDataEventListener() {
         super.onProduceEvent(event)
     }
-    override fun onEventsReceived(incomingEvent: EventImpl, events: List<EventImpl>) {
-        val info = incomingEvent.makeDerivedEventInfo(EventStatus.Success)
+    override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
+        val event = incomingEvent.consume()
+        if (event == null)
+            return
+        val info = event.makeDerivedEventInfo(EventStatus.Success)
         onProduceEvent(SecondEvent(
             eventType = produceEvent,
             metadata = info

View File

@@ -1,6 +1,7 @@
 package no.iktdev.eventi.mock.listeners
 import mu.KotlinLogging
+import no.iktdev.eventi.core.ConsumableEvent
 import no.iktdev.eventi.implementations.EventCoordinator
 import no.iktdev.eventi.implementations.EventListenerImpl
 import no.iktdev.eventi.data.EventImpl
@@ -31,13 +32,15 @@ class ThirdEventListener() : MockDataEventListener() {
         super.onProduceEvent(event)
     }
-    override fun onEventsReceived(incomingEvent: EventImpl, events: List<EventImpl>) {
-        if (!shouldIProcessAndHandleEvent(incomingEvent, events))
+    override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
+        val event = incomingEvent.consume()
+        if (event == null)
             return
-        (incomingEvent as SecondEvent).data.elements.forEach { element ->
+        val info = event.makeDerivedEventInfo(EventStatus.Success)
+        (event as SecondEvent).data.elements.forEach { element ->
             onProduceEvent(ThirdEvent(
                 eventType = produceEvent,
-                metadata = incomingEvent.makeDerivedEventInfo(EventStatus.Success),
+                metadata = event.makeDerivedEventInfo(EventStatus.Success),
                 data = element
             )
         )
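Reviewer note: the three listener hunks above all migrate onEventsReceived from a raw EventImpl parameter to ConsumableEvent<EventImpl> with a consume-or-return guard. A minimal, self-contained sketch of that pattern, assuming consume() hands out the wrapped event exactly once and null thereafter; the real no.iktdev.eventi.core.ConsumableEvent is not part of this diff, so this stand-in is an illustration only:

// Illustration only: a stand-in for no.iktdev.eventi.core.ConsumableEvent.
class ConsumableEvent<T : Any>(private val payload: T) {
    private var consumed = false

    // Hands the wrapped event out exactly once; subsequent calls return null,
    // so a listener can bail out early instead of processing the same event twice.
    fun consume(): T? = if (consumed) null else payload.also { consumed = true }
}

fun main() {
    val incoming = ConsumableEvent("first-event")
    val event = incoming.consume() ?: return // mirrors the listeners' early return
    println(event)              // first-event
    println(incoming.consume()) // null: already consumed
}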

View File

@@ -9,7 +9,7 @@ import no.iktdev.eventi.mock.listeners.FirstEventListener
 import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
+/*
 class FirstEventListenerImplTestBase : EventiImplementationBase() {
@@ -51,4 +51,4 @@ class FirstEventListenerImplTestBase : EventiImplementationBase() {
         val events = coordinator?.eventManager?.readAvailableEvents() ?: emptyList()
         assertThat(events.filterIsInstance<FirstEvent>().distinctBy { it.metadata.referenceId }).hasSize(2)
     }
-}
+}*/

View File

@@ -12,7 +12,7 @@ import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
+/*
 class ForthEventListenerImplTestBase : EventiImplementationBase() {
@@ -40,4 +40,4 @@ class ForthEventListenerImplTestBase : EventiImplementationBase() {
         assertThat(events.filter { it.eventType == ForthEventListener::class.java.simpleName }).hasSize(
             ElementsToCreate().elements.size)
     }
-}
+}*/

View File

@@ -10,7 +10,7 @@ import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
+/*
 class SecondEventListenerImplTestBase : EventiImplementationBase() {
@@ -38,4 +38,4 @@ class SecondEventListenerImplTestBase : EventiImplementationBase() {
         assertThat(events.filterIsInstance<SecondEvent>()).hasSize(2)
         assertThat(events.filterIsInstance<SecondEvent>().distinctBy { it.metadata.referenceId }).hasSize(2)
     }
-}
+}*/

View File

@@ -11,7 +11,7 @@ import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.api.Test
+/*
 class ThirdEventListenerImplTestBase : EventiImplementationBase() {
@@ -38,4 +38,4 @@ class ThirdEventListenerImplTestBase : EventiImplementationBase() {
         assertThat(events).hasSize(3 + ElementsToCreate().elements.size)
         assertThat(events.filter { it.eventType == ThirdEventListener::class.java.simpleName }).hasSize(ElementsToCreate().elements.size)
     }
-}
+}*/

View File

@@ -1,50 +0,0 @@
plugins {
id("java")
kotlin("jvm")
}
group = "no.iktdev.mediaprocessing.shared"
version = "1.0-SNAPSHOT"
repositories {
mavenCentral()
}
dependencies {
implementation(kotlin("stdlib-jdk8"))
implementation("com.google.code.gson:gson:2.8.9")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation(project(mapOf("path" to ":shared:contract")))
implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("io.github.classgraph:classgraph:4.8.165")
testImplementation("org.springframework.kafka:spring-kafka-test:3.0.1")
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("junit:junit:4.13.2")
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.mockito:mockito-core:3.+")
testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.8.1")
testImplementation("org.mockito:mockito-core:3.10.0") // Oppdater versjonen hvis det er nyere tilgjengelig
testImplementation("org.mockito:mockito-junit-jupiter:3.10.0")
}
tasks.test {
useJUnitPlatform()
}
/*
kotlin {
jvmToolchain(17)
}*/

View File

@@ -1,39 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import java.io.File
import kotlin.reflect.KClass
class AnnotationFinder {
fun getClassesWithAnnotation(packageName: String, annotation: KClass<out Annotation>): List<KClass<*>> {
val packageToScan = packageName.replace('.', '/')
val classLoader = Thread.currentThread().contextClassLoader
val resources = classLoader.getResources(packageToScan)
val classes = mutableListOf<KClass<*>>()
while (resources.hasMoreElements()) {
val resource = resources.nextElement()
if (resource.protocol == "file") {
val file = File(resource.file)
if (file.isDirectory) {
val classNames = file.walkTopDown().filter { it.isFile && it.extension == "class" }
.map { it.toRelativeString(file).removeSuffix(".class").replace('/', '.') }
.toList()
classNames.forEach { className ->
try {
val loadedClass = Class.forName(className).kotlin
if (loadedClass.annotations.any { it.annotationClass == annotation }) {
classes.add(loadedClass)
}
} catch (e: ClassNotFoundException) {
// Handle exception if needed
}
}
}
}
}
return classes
}
}

View File

@@ -1,49 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
open class CoordinatorProducer() {
@Autowired
lateinit var kafkaTemplate: KafkaTemplate<String, String>
fun sendMessage(referenceId: String, event: KafkaEvents, data: MessageDataWrapper) {
send( KafkaEnv.kafkaTopic,
event.event, Message(
referenceId = referenceId,
data = data
)
)
}
fun sendMessage(referenceId: String, event: KafkaEvents, eventId: String, data: MessageDataWrapper) {
send( KafkaEnv.kafkaTopic,
event.event, Message(
referenceId = referenceId,
eventId = eventId,
data = data
)
)
}
open fun send(topic: String, key: String, message: Message<MessageDataWrapper>) {
val serializedMessage = serializeMessage(message)
try {
kafkaTemplate.send(ProducerRecord(topic, key, serializedMessage))
} catch (e: Exception) {
e.printStackTrace()
}
}
private fun serializeMessage(message: Message<MessageDataWrapper>): String {
val gson = Gson()
return gson.toJson(message)
}
}

View File

@@ -1,88 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.stereotype.Component
import java.lang.IllegalArgumentException
import java.util.*
@Component
open class DefaultMessageListener(
) : MessageListener<String, String> {
private val logger = KotlinLogging.logger {}
@Autowired
private lateinit var consumerFactory: ConsumerFactory<String, String>
var onMessageReceived: (DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) -> Unit = {
logger.warn { "onMessageReceived has no listener" }
}
private val deserializer = DeserializingRegistry()
protected var container: KafkaMessageListenerContainer<String, String>? = null
open fun listen(topic: String) {
val listener = ConcurrentKafkaListenerContainerFactory<String,String>().apply {
consumerFactory = this@DefaultMessageListener.consumerFactory
}
val containerProperties = ContainerProperties(topic).apply {
messageListener = this@DefaultMessageListener
}
container = KafkaMessageListenerContainer(listener.consumerFactory, containerProperties)
container?.start()
logger.info { "Listening to topic $topic" }
}
fun stop() {
container?.stop()
container = null
}
fun resume() = container?.resume()
fun pause() = container?.pause()
fun isPaused() = container?.isContainerPaused
fun isRunning() = container?.isRunning
override fun onMessage(data: ConsumerRecord<String, String>) {
val event = try {
KafkaEvents.toEvent(data.key())
} catch (e: IllegalArgumentException) {
logger.error { "${data.key()} is not a member of KafkaEvents" }
null
}
event?.let {
val deserialized = deserializer.deserialize(it, data.value())
val dz = data.toDeserializedConsumerRecord(it, deserialized)
onMessageReceived(dz)
}
}
}
private fun <K, V, KDez, VDez> ConsumerRecord<K, V>.toDeserializedConsumerRecord(
keyzz: KDez,
valuezz: VDez
): DeserializedConsumerRecord<KDez, VDez> {
return DeserializedConsumerRecord(
topic = this.topic(),
partition = this.partition(),
offset = this.offset(),
timestamp = this.timestamp(),
timestampType = this.timestampType(),
headers = this.headers(),
key = keyzz,
value = valuezz,
leaderEpoch = this.leaderEpoch().orElse(null)
)
}

View File

@@ -1,89 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import mu.KotlinLogging
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
class DeserializingRegistry {
private val log = KotlinLogging.logger {}
companion object {
val deserializables = mutableMapOf(
KafkaEvents.EventMediaProcessStarted to MediaProcessStarted::class.java,
KafkaEvents.EventMediaReadStreamPerformed to ReaderPerformed::class.java,
KafkaEvents.EventMediaParseStreamPerformed to MediaStreamsParsePerformed::class.java,
KafkaEvents.EventMediaReadBaseInfoPerformed to BaseInfoPerformed::class.java,
KafkaEvents.EventMediaMetadataSearchPerformed to MetadataPerformed::class.java,
KafkaEvents.EventMediaReadOutNameAndType to VideoInfoPerformed::class.java,
KafkaEvents.EventMediaReadOutCover to CoverInfoPerformed::class.java,
KafkaEvents.EventMediaParameterEncodeCreated to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EventMediaParameterExtractCreated to FfmpegWorkerArgumentsCreated::class.java,
KafkaEvents.EventMediaParameterConvertCreated to null,
KafkaEvents.EventMediaParameterDownloadCoverCreated to null,
KafkaEvents.EventNotificationOfWorkItemRemoval to NotificationOfDeletionPerformed::class.java,
KafkaEvents.EventWorkEncodeCreated to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EventWorkExtractCreated to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EventWorkConvertCreated to ConvertWorkerRequest::class.java,
KafkaEvents.EventWorkEncodePerformed to ProcesserEncodeWorkPerformed::class.java,
KafkaEvents.EventWorkExtractPerformed to ProcesserExtractWorkPerformed::class.java,
KafkaEvents.EventWorkConvertPerformed to ConvertWorkPerformed::class.java,
KafkaEvents.EventWorkDownloadCoverPerformed to CoverDownloadWorkPerformed::class.java,
KafkaEvents.EventMediaProcessCompleted to ProcessCompleted::class.java
)
}
fun deserialize(event: KafkaEvents, json: String): Message<out MessageDataWrapper> {
val gson = Gson()
val dezClazz = deserializables[event]
if (dezClazz == null) {
log.warn { "${event.event} will be deserialized with default!" }
}
dezClazz?.let { eventClass ->
try {
val type = TypeToken.getParameterized(Message::class.java, eventClass).type
return gson.fromJson<Message<MessageDataWrapper>>(json, type)
} catch (e: Exception) {
e.printStackTrace()
}
}
// Fallback
val type = object : TypeToken<Message<out SimpleMessageData>>() {}.type
return gson.fromJson<Message<SimpleMessageData>>(json, type)
}
fun deserializeData(event: KafkaEvents, json: String): MessageDataWrapper {
val gson = Gson()
val dezClazz = deserializables[event]
dezClazz?.let { eventClass ->
try {
val type = TypeToken.getParameterized(eventClass).type
return gson.fromJson<MessageDataWrapper>(json, type)
} catch (e: Exception) {
e.printStackTrace()
}
}
try {
// Fallback
val type = object : TypeToken<SimpleMessageData>() {}.type
return gson.fromJson<SimpleMessageData>(json, type)
} catch (e: Exception) {
e.printStackTrace()
}
// Default
val type = object : TypeToken<MessageDataWrapper>() {}.type
return gson.fromJson<MessageDataWrapper>(json, type)
}
}

View File

@@ -1,8 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import java.lang.annotation.ElementType
@Retention(AnnotationRetention.RUNTIME)
@Target(AnnotationTarget.CLASS)
annotation class KafkaBelongsToEvent(vararg val event: KafkaEvents)

View File

@@ -1,20 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import java.util.UUID
class KafkaEnv {
companion object {
val servers: String = System.getenv("KAFKA_BOOTSTRAP_SERVER") ?: "127.0.0.1:9092"
var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}"
val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest"
var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10
val heartbeatIntervalMilliseconds: Int = System.getenv("KAFKA_HEARTBEAT_INTERVAL_MS")?.toIntOrNull() ?: 2000
val sessionTimeOutMilliseconds: Int = System.getenv("KAFKA_SESSION_INACTIVITY_MS")?.toIntOrNull() ?: (listOf(
metadataTimeoutMinutes,
heartbeatIntervalMilliseconds
).max() * 60)
}
}

View File

@@ -1,66 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
enum class KafkaEvents(val event: String) {
EventMediaProcessStarted ("event:media-process:started"),
EventMediaReadStreamPerformed ("event:media-read-stream:performed"),
EventMediaParseStreamPerformed ("event:media-parse-stream:performed"),
EventMediaReadBaseInfoPerformed ("event:media-read-base-info:performed"),
EventMediaMetadataSearchPerformed ("event:media-metadata-search:performed"),
EventMediaReadOutNameAndType ("event:media-read-out-name-and-type:performed"),
EventMediaReadOutCover ("event:media-read-out-cover:performed"),
EventMediaParameterEncodeCreated ("event:media-encode-parameter:created"),
EventMediaParameterExtractCreated ("event:media-extract-parameter:created"),
EventMediaParameterConvertCreated ("event:media-convert-parameter:created"),
EventMediaParameterDownloadCoverCreated ("event:media-download-cover-parameter:created"),
EventMediaWorkProceedPermitted ("event:media-work-proceed:permitted"),
EventNotificationOfWorkItemRemoval("event:notification-work-item-removal"),
EventWorkEncodeCreated ("event:work-encode:created"),
EventWorkExtractCreated ("event:work-extract:created"),
EventWorkConvertCreated ("event:work-convert:created"),
EventWorkEncodePerformed ("event:work-encode:performed"),
EventWorkExtractPerformed ("event:work-extract:performed"),
EventWorkConvertPerformed ("event:work-convert:performed"),
EventWorkDownloadCoverPerformed ("event:work-download-cover:performed"),
EVENT_STORE_VIDEO_PERFORMED ("event:store-video:performed"),
EVENT_STORE_SUBTITLE_PERFORMED ("event:store-subtitle:performed"),
EVENT_STORE_COVER_PERFORMED ("event:store-cover:performed"),
EVENT_STORE_METADATA_PERFORMED ("event:store-metadata:performed"),
EventMediaProcessCompleted ("event:media-process:completed"),
EventCollectAndStore ("event::save"),
;
companion object {
fun toEvent(event: String): KafkaEvents? {
return KafkaEvents.entries.find { it.event == event }
}
fun isOfWork(event: KafkaEvents): Boolean {
return event in listOf(
EventWorkConvertCreated,
EventWorkExtractCreated,
EventWorkEncodeCreated,
EventWorkEncodePerformed,
EventWorkConvertPerformed,
EventWorkExtractPerformed
)
}
fun isOfFinalize(event: KafkaEvents): Boolean {
return event in listOf(
EventMediaProcessCompleted,
EventCollectAndStore
)
}
}
}

View File

@@ -1,49 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.core
import mu.KotlinLogging
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.*
@Configuration
open class KafkaImplementation {
private val log = KotlinLogging.logger {}
@Bean
open fun admin() = KafkaAdmin(mapOf(
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to KafkaEnv.servers
))
@Bean
open fun producerFactory(): ProducerFactory<String, String> {
val config: MutableMap<String, Any> = HashMap()
config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
//log.info { config }
return DefaultKafkaProducerFactory(config)
}
@Bean
open fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
open fun consumerFactory(): ConsumerFactory<String, String> {
val config: MutableMap<String, Any> = HashMap()
config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KafkaEnv.servers
config[ConsumerConfig.GROUP_ID_CONFIG] = KafkaEnv.consumerId
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
config[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = KafkaEnv.sessionTimeOutMilliseconds
config[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = KafkaEnv.heartbeatIntervalMilliseconds
//log.info { config }
return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
}
}

View File

@@ -1,7 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
import java.util.*
open class CollectionReference(
@Transient open val referenceId: String = UUID.randomUUID().toString(),
) {}

View File

@@ -1,16 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.record.TimestampType
data class DeserializedConsumerRecord<K, V>(
val topic: String,
val partition: Int,
val offset: Long,
val timestamp: Long,
val timestampType: TimestampType,
val headers: Headers,
val key: K,
val value: V,
val leaderEpoch: Int?
)

View File

@@ -1,37 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
import java.lang.reflect.Type
import java.util.*
open class Message<C: MessageDataWrapper>(
override val referenceId: String = UUID.randomUUID().toString(),
val eventId: String = UUID.randomUUID().toString(),
val data: C? = null
): CollectionReference() {
fun dataAsJson(): String = Gson().toJson(this.data)
fun <C> dataAs(clazz: C): C? {
return try {
val typeToken = object : TypeToken<C>() {}.type
val gson = Gson()
val json: String = gson.toJson(data)
gson.fromJson<C>(json, typeToken)
} catch (e: Exception) {
e.printStackTrace()
null
}
}
fun <C> dataAs(type: Type): C? {
return try {
val gson = Gson()
val json = dataAsJson()
gson.fromJson<C>(json, type)
} catch (e: Exception) {
null
}
}
}

View File

@@ -1,38 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
abstract class MessageDataWrapper(
@Transient open val status: Status = Status.ERROR,
@Transient open val message: String? = null,
@Transient open val derivedFromEventId: String? = null
)
@Suppress("UNCHECKED_CAST")
fun <T> MessageDataWrapper.az(): T? {
return try {
this as T
} catch (e: Exception) {
e.printStackTrace()
null
}
}
data class SimpleMessageData(
override val status: Status,
override val message: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, message, derivedFromEventId)
fun MessageDataWrapper?.isSuccess(): Boolean {
return this != null && this.status == Status.COMPLETED
}
fun MessageDataWrapper?.isFailed(): Boolean {
return if (this == null) true else this.status != Status.COMPLETED
}
fun MessageDataWrapper?.isSkipped(): Boolean {
return this != null && this.status != Status.SKIPPED
}

View File

@@ -1,11 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
enum class Status {
SKIPPED,
COMPLETED,
ERROR
}
fun Status.isCompleted(): Boolean {
return this == Status.COMPLETED
}

View File

@@ -1,19 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadBaseInfoPerformed)
data class BaseInfoPerformed(
override val status: Status,
val title: String,
val sanitizedName: String,
val searchTitles: List<String> = emptyList<String>(),
override val derivedFromEventId: String
) : MessageDataWrapper(status = status, derivedFromEventId = derivedFromEventId)
fun BaseInfoPerformed?.hasValidData(): Boolean {
return this != null && this.title.isNotBlank() && this.sanitizedName.isNotBlank()
}

View File

@@ -1,15 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertPerformed)
data class ConvertWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
override val derivedFromEventId: String,
val outFiles: List<String> = listOf()
): MessageDataWrapper(status, message, derivedFromEventId)

View File

@@ -1,19 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.dto.SubtitleFormats
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventWorkConvertCreated)
data class ConvertWorkerRequest(
override val status: Status,
val requiresEventId: String? = null,
override val derivedFromEventId: String? = null,
val inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,
val outDirectory: String,
val outFormats: List<SubtitleFormats> = listOf()
): MessageDataWrapper(status, derivedFromEventId = derivedFromEventId)

View File

@@ -1,14 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventWorkDownloadCoverPerformed)
data class CoverDownloadWorkPerformed(
override val status: Status,
override val message: String? = null,
val coverFile: String,
override val derivedFromEventId: String?
): MessageDataWrapper(status, message, derivedFromEventId = derivedFromEventId)

View File

@@ -1,15 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutCover)
data class CoverInfoPerformed(
override val status: Status,
val url: String,
val outDir: String,
val outFileBaseName: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,18 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(
KafkaEvents.EventWorkEncodeCreated,
KafkaEvents.EventWorkExtractCreated
)
data class FfmpegWorkRequestCreated(
override val status: Status,
val inputFile: String,
val arguments: List<String>,
val outFile: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,28 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
/**
* @param status Status type
* @param inputFile File.absolutePath
* @param outputFile File.absolutePath
* @param arguments Requires arguments, instructions for what ffmpeg should do
*/
@KafkaBelongsToEvent(
KafkaEvents.EventMediaParameterEncodeCreated,
KafkaEvents.EventMediaParameterExtractCreated
)
data class FfmpegWorkerArgumentsCreated(
override val status: Status,
val inputFile: String, // absolutePath
val entries: List<FfmpegWorkerArgument>,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)
data class FfmpegWorkerArgument(
val outputFile: String,
val arguments: List<String>
)

View File

@@ -1,20 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.contract.dto.StartOperationEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessStarted)
data class MediaProcessStarted(
override val status: Status,
val type: ProcessType = ProcessType.FLOW,
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
StartOperationEvents.EXTRACT,
StartOperationEvents.CONVERT
),
val file: String // AbsolutePath
) : MessageDataWrapper(status)

View File

@@ -1,14 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaParseStreamPerformed)
data class MediaStreamsParsePerformed(
override val status: Status,
val streams: ParsedMediaStreams,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,28 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaMetadataSearchPerformed)
data class MetadataPerformed(
override val status: Status,
override val message: String? = null,
val data: pyMetadata? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)
data class pyMetadata(
val title: String,
val altTitle: List<String> = emptyList(),
val cover: String? = null,
val type: String,
val summary: List<pySummary> = emptyList(),
val genres: List<String> = emptyList()
)
data class pySummary(
val summary: String?,
val language: String = "eng"
)

View File

@@ -1,15 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventNotificationOfWorkItemRemoval)
data class NotificationOfDeletionPerformed(
override val status: Status = Status.COMPLETED,
override val message: String? = null,
override val derivedFromEventId: String? = null, // Should never be set as derived
val deletedEventId: String,
val deletedEvent: KafkaEvents
): MessageDataWrapper()

View File

@@ -1,12 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaProcessCompleted)
data class ProcessCompleted(
override val status: Status,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,15 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadStreamPerformed)
data class ReaderPerformed(
override val status: Status,
val file: String, //AbsolutePath
val output: JsonObject,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,58 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import com.google.gson.Gson
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType)
data class VideoInfoPerformed(
override val status: Status,
val info: JsonObject,
val outDirectory: String,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId) {
fun toValueObject(): VideoInfo? {
val type = info.get("type").asString
return when (type) {
"movie" -> Gson().fromJson(info.toString(), MovieInfo::class.java)
"serie" -> Gson().fromJson(info.toString(), EpisodeInfo::class.java)
else -> null
}
}
}
data class EpisodeInfo(
override val type: String = "serie",
override val title: String,
val episode: Int,
val season: Int,
val episodeTitle: String?,
override val fullName: String
): VideoInfo(type, title, fullName)
data class MovieInfo(
override val type: String = "movie",
override val title: String,
override val fullName: String
) : VideoInfo(type, title, fullName)
data class SubtitleInfo(
val inputFile: String,
val collection: String,
val language: String
)
@KafkaBelongsToEvent(KafkaEvents.EventMediaReadOutNameAndType)
open class VideoInfo(
@Transient open val type: String,
@Transient open val title: String,
@Transient open val fullName: String
) {
fun toJsonObject(): JsonObject {
return Gson().toJsonTree(this).asJsonObject
}
}

View File

@@ -1,18 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
// Derived from ffmpeg work
@KafkaBelongsToEvent(
KafkaEvents.EventWorkEncodePerformed
)
data class ProcesserEncodeWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val outFile: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,18 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
// Derived from ffmpeg work
@KafkaBelongsToEvent(
KafkaEvents.EventWorkExtractPerformed
)
data class ProcesserExtractWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val outFile: String? = null,
override val derivedFromEventId: String?
) : MessageDataWrapper(status, derivedFromEventId)

View File

@@ -1,29 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import org.apache.kafka.clients.admin.AdminClient
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.InjectMocks
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.springframework.kafka.core.KafkaTemplate
@ExtendWith(MockitoExtension::class)
class KafkaTestBase {
@Mock
lateinit var kafkaTemplate: KafkaTemplate<String, String>
@Mock
lateinit var adminClient: AdminClient
/*@InjectMocks
lateinit var defaultProducer: DefaultProducer
@InjectMocks
lateinit var defaultConsumer: DefaultConsumer*/
@InjectMocks
lateinit var defaultListener: DefaultMessageListener
}

View File

@@ -1,46 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka
import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.junit.jupiter.api.Test
import org.assertj.core.api.Assertions.assertThat
class SerializationTest {
@Test
fun serialize() {
val gson = Gson()
val message = Message(
"d2fb1472-ebdd-4fce-9ffd-7202a1ad911d",
"01e4420d-f7ab-49b5-ac5b-8b0f4f4a600e",
data = MediaProcessStarted(
Status.COMPLETED,
ProcessType.MANUAL,
file = "Potato.mp4"
))
val json = gson.toJson(message)
val deserializer = DeserializingRegistry()
val result = deserializer.deserialize(KafkaEvents.EventMediaProcessStarted, json)
assertThat(result.data).isInstanceOf(MediaProcessStarted::class.java)
}
}
data class MockData(
override val status: Status,
val tekst: String
): MessageDataWrapper(status)