WIP - Looping issue

This commit is contained in:
Brage 2023-12-13 18:26:16 +01:00
parent 1ba4c0ee6d
commit 3d119813dd
36 changed files with 627 additions and 185 deletions

View File

@ -11,23 +11,31 @@ version = "1.0-SNAPSHOT"
repositories { repositories {
mavenCentral() mavenCentral()
maven("https://jitpack.io")
maven {
url = uri("https://reposilite.iktdev.no/releases")
}
maven {
url = uri("https://reposilite.iktdev.no/snapshots")
}
} }
dependencies { dependencies {
/*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter:2.7.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
implementation(project(mapOf("path" to ":shared:kafka"))) implementation(project(mapOf("path" to ":shared:kafka")))
implementation(project(mapOf("path" to ":shared"))) implementation(project(mapOf("path" to ":shared")))
implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:contract")))
implementation(project(mapOf("path" to ":shared:common"))) implementation(project(mapOf("path" to ":shared:common")))
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
} }
tasks.test { tasks.test {

View File

@ -20,12 +20,13 @@ repositories {
} }
} }
val exposedVersion = "0.44.0" val exposedVersion = "0.44.0"
dependencies { dependencies {
/*Spring boot*/ /*Spring boot*/
implementation("org.springframework.boot:spring-boot-starter-web") implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter:2.7.0") implementation("org.springframework.boot:spring-boot-starter:3.2.0")
// implementation("org.springframework.kafka:spring-kafka:3.0.1") // implementation("org.springframework.kafka:spring-kafka:3.0.1")
implementation("org.springframework.kafka:spring-kafka:2.8.5") implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3") implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
@ -53,18 +54,35 @@ dependencies {
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
implementation(kotlin("stdlib-jdk8")) implementation(kotlin("stdlib-jdk8"))
testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1")
testImplementation("org.assertj:assertj-core:3.21.0") testImplementation("org.assertj:assertj-core:3.21.0")
testImplementation(project(mapOf("path" to ":shared:common")))
testImplementation("junit:junit:4.12")
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.13.0")
testImplementation("org.skyscreamer:jsonassert:1.5.0")
testImplementation("org.mockito:mockito-core:3.+")
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.mockito:mockito-core:3.+")
testImplementation("org.assertj:assertj-core:3.4.1")
/*testImplementation("org.junit.vintage:junit-vintage-engine")
testImplementation("org.junit.jupiter:junit-jupiter:5.10.1")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.10.1")
testRuntimeOnly ("org.junit.jupiter:junit-jupiter-engine:5.10.1")
testImplementation("org.mockito:mockito-core:5.8.0") // Oppdater versjonen hvis det er nyere tilgjengelig
testImplementation("org.mockito:mockito-junit-jupiter:5.8.0")
testImplementation(platform("org.junit:junit-bom:5.10.1"))
testImplementation("org.junit.platform:junit-platform-runner:1.10.1")*/
} }
tasks.test { tasks.withType<Test> {
useJUnitPlatform() useJUnitPlatform()
} }
kotlin { kotlin {
jvmToolchain(17) jvmToolchain(17)
} }

View File

@ -4,24 +4,32 @@ import com.google.gson.Gson
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataReader
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
import java.util.UUID import java.util.UUID
import javax.annotation.PostConstruct
@Service @Service
class Coordinator() { class Coordinator() {
val producer = CoordinatorProducer()
@Autowired
private lateinit var producer: CoordinatorProducer
@Autowired
private lateinit var listener: DefaultMessageListener
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
@ -97,18 +105,12 @@ class Coordinator() {
val io = Coroutines.io() val io = Coroutines.io()
private val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
val success = PersistentDataStore().storeMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database!" }
} else
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
fun readAllMessagesFor(referenceId: String, eventId: String) { fun readAllMessagesFor(referenceId: String, eventId: String) {
val messages = PersistentDataReader().getMessagesFor(referenceId)
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
io.launch { io.launch {
val messages = PersistentDataReader().getMessagesFor(referenceId)
createTasksBasedOnEventsAndPersistance(referenceId, eventId, messages)
buildModelBasedOnMessagesFor(referenceId, messages) buildModelBasedOnMessagesFor(referenceId, messages)
} }
} }
@ -120,40 +122,41 @@ class Coordinator() {
} }
fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List<PersistentMessage>) { fun createTasksBasedOnEventsAndPersistance(referenceId: String, eventId: String, messages: List<PersistentMessage>) {
io.launch { val triggered = messages.find { it.eventId == eventId }
val triggered = messages.find { it.eventId == eventId } ?: return@launch if (triggered == null) {
listeners.forEach { it.onEventReceived(referenceId, triggered, messages) } log.error { "Could not find $eventId in provided messages" }
if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) { return
val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted }
if (processStarted.type == ProcessType.FLOW) { listeners.forEach { it.onEventReceived(referenceId, triggered, messages) }
log.info { "Process for $referenceId was started from flow and will be processed" }
if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) { if (listOf(KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED, KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED).contains(triggered.event) && triggered.data.isSuccess()) {
produceEncodeWork(triggered) val processStarted = messages.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED }?.data as ProcessStarted
} else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
produceExtractWork(triggered) if (processStarted.type == ProcessType.FLOW) {
} log.info { "Process for $referenceId was started from flow and will be processed" }
} else { if (triggered.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED) {
log.info { "Process for $referenceId was started manually and will require user input for continuation" } produceEncodeWork(triggered)
} else if (triggered.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED) {
produceExtractWork(triggered)
} }
} else {
log.info { "Process for $referenceId was started manually and will require user input for continuation" }
} }
} }
} }
@PostConstruct
fun onReady() {
init { io.launch {
io.launch { listener.listen() } listener.onMessageReceived = { event ->
val success = PersistentDataStore().storeMessage(event.key.event, event.value)
if (!success) {
log.error { "Unable to store message: ${event.key.event} in database!" }
} else
readAllMessagesFor(event.value.referenceId, event.value.eventId)
}
listener.listen(KafkaEnv.kafkaTopic) }
} }
} }
abstract class TaskCreator: TaskCreatorListener {
val producer = CoordinatorProducer()
open fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
return true
}
}
interface TaskCreatorListener {
fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>): Unit
}

View File

@ -8,6 +8,7 @@ import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.events import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import org.springframework.boot.autoconfigure.SpringBootApplication import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext import org.springframework.context.ApplicationContext
@ -26,19 +27,19 @@ fun getContext(): ApplicationContext? {
} }
fun main(args: Array<String>) { fun main(args: Array<String>) {
// val dataSource = MySqlDataSource.fromDatabaseEnv(); val dataSource = MySqlDataSource.fromDatabaseEnv();
/*Coroutines.default().launch { Coroutines.default().launch {
dataSource.createDatabase() dataSource.createDatabase()
dataSource.createTables( dataSource.createTables(
events events
) )
}*/ }
context = runApplication<CoordinatorApplication>(*args) context = runApplication<CoordinatorApplication>(*args)
printSharedConfig() printSharedConfig()
} }
fun printSharedConfig() { fun printSharedConfig() {
log.info { "Kafka topic: ${SharedConfig.kafkaTopic}" } log.info { "Kafka topic: ${KafkaEnv.kafkaTopic}" }
log.info { "File Input: ${SharedConfig.incomingContent}" } log.info { "File Input: ${SharedConfig.incomingContent}" }
log.info { "File Output: ${SharedConfig.outgoingContent}" } log.info { "File Output: ${SharedConfig.outgoingContent}" }
log.info { "Ffprobe: ${SharedConfig.ffprobe}" } log.info { "Ffprobe: ${SharedConfig.ffprobe}" }

View File

@ -6,7 +6,6 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation import no.iktdev.mediaprocessing.shared.kafka.core.KafkaImplementation
import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import import org.springframework.context.annotation.Import
import org.springframework.stereotype.Component
@Configuration @Configuration
class SocketLocalInit: SocketImplementation() class SocketLocalInit: SocketImplementation()

View File

@ -0,0 +1,8 @@
package no.iktdev.mediaprocessing.coordinator
import org.springframework.stereotype.Service
@Service
class MessageOperator {
}

View File

@ -0,0 +1,18 @@
package no.iktdev.mediaprocessing.coordinator
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import org.springframework.beans.factory.annotation.Autowired
abstract class TaskCreator: TaskCreatorListener {
@Autowired
lateinit var producer: CoordinatorProducer
open fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
return true
}
}
interface TaskCreatorListener {
fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>): Unit
}

View File

@ -0,0 +1,5 @@
package no.iktdev.mediaprocessing.coordinator.coordination
class MessageSequence {
}

View File

@ -1,41 +1,44 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.ProcessingService import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser import no.iktdev.mediaprocessing.shared.common.parsing.FileNameParser
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
class BaseInfoFromFile(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( @Service
SharedConfig.kafkaTopic)): ProcessingService(producer, listener) { class BaseInfoFromFile(@Autowired var coordinator: Coordinator): ProcessingService() {
init {
listener.onMessageReceived = { event ->
val message = event.value
if (message.data is ProcessStarted) {
io.launch {
val result = readFileInfo(message.data as ProcessStarted)
onResult(message.referenceId, result)
}
}
}
io.launch {
listener.listen()
}
}
override fun onResult(referenceId: String, data: MessageDataWrapper) { override fun onResult(referenceId: String, data: MessageDataWrapper) {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data) producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, data)
} }
override fun onReady() {
coordinator.addListener(object : TaskCreatorListener {
override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
io.launch {
val result = readFileInfo(event.data as ProcessStarted)
onResult(referenceId, result)
}
}
}
})
}
fun readFileInfo(started: ProcessStarted): MessageDataWrapper { fun readFileInfo(started: ProcessStarted): MessageDataWrapper {
val result = try { val result = try {
val fileName = File(started.file).nameWithoutExtension val fileName = File(started.file).nameWithoutExtension

View File

@ -1,21 +1,17 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.stereotype.Service
class MediaStreamsAnalyze { class MediaStreamsAnalyze {
val io = Coroutines.io() val io = Coroutines.io()
/*
val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event -> val listener = DefaultMessageListener(SharedConfig.kafkaTopic) { event ->
if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) { if (event.key == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED) {
if (event.value.data?.status == Status.COMPLETED) { if (event.value.data?.status == Status.COMPLETED) {
} }
} }
} }*/
} }

View File

@ -1,51 +1,52 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import com.google.gson.Gson import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.ProcessingService import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.AudioStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream import no.iktdev.mediaprocessing.shared.contract.ffmpeg.VideoStream
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
@Service
class ParseVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( class ParseVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
SharedConfig.kafkaTopic)): ProcessingService(producer, listener) {
override fun onResult(referenceId: String, data: MessageDataWrapper) { override fun onResult(referenceId: String, data: MessageDataWrapper) {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data) producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED, data)
} }
init { override fun onReady() {
listener.onMessageReceived = { event -> coordinator.addListener(object : TaskCreatorListener {
val message = event.value override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
if (message.data is ReaderPerformed) { if (event.event == KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED && event.data.isSuccess()) {
io.launch { io.launch {
val result = parseStreams(message.data as ReaderPerformed) val result = parseStreams(event.data as ReaderPerformed)
onResult(message.referenceId, result) onResult(referenceId, result)
}
} }
} }
}
io.launch { })
listener.listen()
}
} }
fun parseStreams(data: ReaderPerformed): MessageDataWrapper { fun parseStreams(data: ReaderPerformed): MessageDataWrapper {
val gson = Gson() val gson = Gson()
return try { return try {
val jsonObject = gson.fromJson(data.output, JsonObject::class.java) val jStreams = data.output.getAsJsonArray("streams")
val jStreams = jsonObject.getAsJsonArray("streams")
val videoStreams = mutableListOf<VideoStream>() val videoStreams = mutableListOf<VideoStream>()
val audioStreams = mutableListOf<AudioStream>() val audioStreams = mutableListOf<AudioStream>()
@ -70,7 +71,7 @@ class ParseVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(
audioStream = audioStreams, audioStream = audioStreams,
subtitleStream = subtitleStreams subtitleStream = subtitleStreams
) )
MessageDataWrapper(Status.COMPLETED, gson.toJson(parsedStreams)) MediaStreamsParsePerformed(Status.COMPLETED, parsedStreams)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() e.printStackTrace()

View File

@ -1,48 +1,54 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
import com.google.gson.Gson
import com.google.gson.JsonObject
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener
import no.iktdev.mediaprocessing.shared.common.ProcessingService import no.iktdev.mediaprocessing.shared.common.ProcessingService
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput import no.iktdev.mediaprocessing.shared.common.runner.CodeToOutput
import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing import no.iktdev.mediaprocessing.shared.common.runner.getOutputUsing
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.io.File import java.io.File
class ReadVideoFileStreams(producer: CoordinatorProducer = CoordinatorProducer(), listener: DefaultMessageListener = DefaultMessageListener( @Service
SharedConfig.kafkaTopic) class ReadVideoFileStreams(@Autowired var coordinator: Coordinator): ProcessingService() {
): ProcessingService(producer, listener) {
override fun onResult(referenceId: String, data: MessageDataWrapper) { override fun onResult(referenceId: String, data: MessageDataWrapper) {
producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data) producer.sendMessage(referenceId, KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED, data)
} }
init { override fun onReady() {
listener.onMessageReceived = { event -> coordinator.addListener(object : TaskCreatorListener {
val message = event.value override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
if (message.data is ProcessStarted) { if (event.event == KafkaEvents.EVENT_PROCESS_STARTED && event.data.isSuccess()) {
io.launch { io.launch {
val result = fileReadStreams(message.data as ProcessStarted) val result = fileReadStreams(event.data as ProcessStarted)
onResult(message.referenceId, result) onResult(referenceId, result)
}
} }
} }
}
io.launch { })
listener.listen()
}
} }
suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper { suspend fun fileReadStreams(started: ProcessStarted): MessageDataWrapper {
val file = File(started.file) val file = File(started.file)
return if (file.exists() && file.isFile) { return if (file.exists() && file.isFile) {
val result = readStreams(file) val result = readStreams(file)
ReaderPerformed(Status.COMPLETED, file = started.file, output = result.output.joinToString("\n")) val joined = result.output.joinToString(" ")
val jsoned = Gson().fromJson(joined, JsonObject::class.java)
ReaderPerformed(Status.COMPLETED, file = started.file, output = jsoned)
} else { } else {
MessageDataWrapper(Status.ERROR, "File in data is not a file or does not exist") MessageDataWrapper(Status.ERROR, "File in data is not a file or does not exist")
} }

View File

@ -3,18 +3,20 @@ package no.iktdev.mediaprocessing.coordinator.tasks.event
import mu.KotlinLogging import mu.KotlinLogging
import no.iktdev.exfl.using import no.iktdev.exfl.using
import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.Coordinator
import no.iktdev.mediaprocessing.coordinator.TaskCreatorListener import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds import no.iktdev.mediaprocessing.shared.common.datasource.toEpochSeconds
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import java.time.LocalDateTime import java.time.LocalDateTime
@ -23,20 +25,17 @@ import java.time.LocalDateTime
* *
*/ */
@Service @Service
class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreatorListener { @EnableScheduling
class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coordinator): TaskCreator() {
private val log = KotlinLogging.logger {} private val log = KotlinLogging.logger {}
init { init {
coordinator.addListener(this) coordinator.addListener(this)
} }
val producer = CoordinatorProducer()
val waitingProcessesForMeta: MutableMap<String, LocalDateTime> = mutableMapOf() val waitingProcessesForMeta: MutableMap<String, LocalDateTime> = mutableMapOf()
override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) { override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
if (!listOf( if (!listOf(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED).contains(event.event)) {
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED)
.contains(event.event)) {
return return
} }
@ -48,8 +47,9 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord
return return
} }
if (baseInfo.isSuccess() && meta == null) { if (baseInfo.isSuccess() && meta == null) {
log.info { "Sending ${baseInfo?.title} to waiting queue" }
if (!waitingProcessesForMeta.containsKey(referenceId)) { if (!waitingProcessesForMeta.containsKey(referenceId)) {
waitingProcessesForMeta[referenceId] waitingProcessesForMeta[referenceId] = LocalDateTime.now()
} }
return return
} }
@ -104,14 +104,15 @@ class MetadataAndBaseInfoToFileOutAndCoverTask(@Autowired var coordinator: Coord
} }
@Scheduled(fixedDelay = (60_000)) //@Scheduled(fixedDelay = (60_000))
@Scheduled(fixedDelay = (1_000))
fun sendErrorMessageForMetadata() { fun sendErrorMessageForMetadata() {
//val timeThresholdInMinutes = 10 * 60_000
val expired = waitingProcessesForMeta.filter { val expired = waitingProcessesForMeta.filter {
LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + 10 * 60) LocalDateTime.now().toEpochSeconds() > (it.value.toEpochSeconds() + KafkaEnv.metadataTimeoutMinutes * 60)
} }
expired.forEach { expired.forEach {
producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MessageDataWrapper(status = Status.ERROR, "Timed Out by: ${this::javaClass.name}")) log.info { "Producing timeout for ${it.key} ${LocalDateTime.now()}" }
producer.sendMessage(it.key, KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED, MetadataPerformed(status = Status.ERROR, "Timed Out by: ${this@MetadataAndBaseInfoToFileOutAndCoverTask::class.simpleName}"))
waitingProcessesForMeta.remove(it.key) waitingProcessesForMeta.remove(it.key)
} }
} }

View File

@ -31,17 +31,19 @@ class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : Ta
override fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean { override fun isPrerequisitesOk(events: List<PersistentMessage>): Boolean {
val required = listOf( val required = listOf(
KafkaEvents.EVENT_PROCESS_STARTED.event, KafkaEvents.EVENT_PROCESS_STARTED,
KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED.event, KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED.event KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED,
KafkaEvents.EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
) )
return events.filter { it.eventId in required }.all { it.data.isSuccess() } val currentEvents = events.map { it.event }
val hasAllRequiredEvents = required.all { currentEvents.contains(it) }
val hasAllRequiredData = events.filter { e -> e.event in required }.all { it.data.isSuccess() }
return hasAllRequiredData && hasAllRequiredEvents
} }
override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) { override fun onEventReceived(referenceId: String, event: PersistentMessage, events: List<PersistentMessage>) {
val preference = Preference.getPreference() val preference = Preference.getPreference()
if (event.event != KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED)
return
if (!isPrerequisitesOk(events)) { if (!isPrerequisitesOk(events)) {
return return
@ -49,8 +51,7 @@ class OutNameToWorkArgumentCreator(@Autowired var coordinator: Coordinator) : Ta
val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted val inputFile = events.find { it.data is ProcessStarted }?.data as ProcessStarted
val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed val baseInfo = events.findLast { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed
val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed val readStreamsEvent = events.find { it.data is MediaStreamsParsePerformed }?.data as MediaStreamsParsePerformed
val serializedParsedStreams = val serializedParsedStreams = readStreamsEvent.streams
Gson().fromJson<ParsedMediaStreams>(readStreamsEvent.parsedAsJson, ParsedMediaStreams::class.java)
val outDir = SharedConfig.outgoingContent.using(baseInfo.title) val outDir = SharedConfig.outgoingContent.using(baseInfo.title)

View File

@ -0,0 +1,3 @@
spring.output.ansi.enabled=always
logging.level.org.apache.kafka=INFO
logging.level.root=INFO

View File

@ -1,25 +1,37 @@
package no.iktdev.mediaprocessing.coordinator.reader package no.iktdev.mediaprocessing.coordinator.reader
/*
import TestKafka import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Named
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mock
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.beans.factory.annotation.Autowired
import java.io.File import java.io.File
import java.util.UUID
class BaseInfoFromFileTest { class BaseInfoFromFileTest {
val referenceId = UUID.randomUUID().toString()
val baseInfoFromFile = BaseInfoFromFile(TestKafka.producer, TestKafka.listener) @Autowired
private lateinit var testBase: KafkaTestBase
@Mock
lateinit var coordinatorProducer: CoordinatorProducer
val baseInfoFromFile = BaseInfoFromFile(coordinatorProducer)
@Test @Test
fun testReadFileInfo() { fun testReadFileInfo() {
val input = ProcessStarted(Status.COMPLETED, ProcessType.FLOW, val input = ProcessStarted(
Status.COMPLETED, ProcessType.FLOW,
File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath
) )
@ -31,6 +43,58 @@ class BaseInfoFromFileTest {
assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01") assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01")
} }
@ParameterizedTest
@MethodSource("names")
fun test(data: TestInfo) {
val gson = Gson()
val result = baseInfoFromFile.readFileInfo(data.input)
JSONAssert.assertEquals(
data.expected,
gson.toJson(result),
false
)
}
data class TestInfo(
val input: ProcessStarted,
val expected: String
)
} companion object {
@JvmStatic
private fun names(): List<Named<TestInfo>> {
return listOf(
Named.of(
"Potato", TestInfo(
ProcessStarted(
Status.COMPLETED, ProcessType.FLOW,
"E:\\input\\Top Clown Findout.1080p.H264.AAC5.1.mkv"
),
"""
{
"status": "COMPLETED",
"title": "Top Clown Findout",
"sanitizedName": "Top Clown Findout"
}
""".trimIndent()
)
),
Named.of("Filename with UHD wild tag", TestInfo(
ProcessStarted(
Status.COMPLETED, ProcessType.FLOW,
"E:\\input\\Wicked.Potato.Chapter.1.2023.UHD.BluRay.2160p.DDP.7.1.DV.HDR.x265.mp4"
),
"""
{
"status": "COMPLETED",
"title": "Wicked Potato Chapter 1",
"sanitizedName": "Wicked Potato Chapter 1"
}
""".trimIndent()
)
)
)
}
}
}*/

View File

@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing.coordinator.reader
/*
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultProducer
import org.apache.kafka.clients.admin.AdminClient
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.InjectMocks
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
@ExtendWith(MockitoExtension::class)
class KafkaTestBase {
@Mock
lateinit var kafkaTemplate: KafkaTemplate<String, String>
@Mock
lateinit var adminClient: AdminClient
@InjectMocks
lateinit var defaultProducer: DefaultProducer
@InjectMocks
lateinit var defaultConsumer: DefaultConsumer
@InjectMocks
lateinit var defaultListener: DefaultMessageListener
}*/

View File

@ -0,0 +1,168 @@
package no.iktdev.mediaprocessing.coordinator.reader
/*
import com.google.gson.Gson
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
import no.iktdev.streamit.library.kafka.dto.Status
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Named
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import org.mockito.Mock
import org.skyscreamer.jsonassert.JSONAssert
import org.springframework.beans.factory.annotation.Autowired
class ParseVideoFileStreamsTest {
@Autowired
private lateinit var testBase: KafkaTestBase
@Mock
lateinit var coordinatorProducer: CoordinatorProducer
val parseVideoStreams = ParseVideoFileStreams(coordinatorProducer)
@ParameterizedTest
@MethodSource("streams")
fun parseStreams(data: TestInfo) {
val gson = Gson()
val converted = gson.fromJson(data.input, JsonObject::class.java)
val result = parseVideoStreams.parseStreams(ReaderPerformed(
Status.COMPLETED,
file = "ignore",
output = converted
))
JSONAssert.assertEquals(
data.expected,
gson.toJson(result),
false
)
}
data class TestInfo(
val input: String,
val expected: String
)
companion object {
@JvmStatic
fun streams(): List<Named<TestInfo>> {
return listOf(
Named.of(
"Top Clown streams", TestInfo(
"""
{
"streams": [
{
"index": 0,
"codec_name": "h264",
"codec_long_name": "H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10",
"profile": "Main",
"codec_type": "video",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"width": 1920,
"height": 1080,
"coded_width": 1920,
"coded_height": 1080,
"closed_captions": 0,
"film_grain": 0,
"has_b_frames": 0,
"sample_aspect_ratio": "1:1",
"display_aspect_ratio": "16:9",
"pix_fmt": "yuv420p",
"level": 40,
"chroma_location": "left",
"field_order": "progressive",
"refs": 1,
"is_avc": "true",
"nal_length_size": "4",
"r_frame_rate": "24000/1001",
"avg_frame_rate": "24000/1001",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"bits_per_raw_sample": "8",
"extradata_size": 55,
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
}
},
{
"index": 1,
"codec_name": "aac",
"codec_long_name": "AAC (Advanced Audio Coding)",
"profile": "HE-AAC",
"codec_type": "audio",
"codec_tag_string": "[0][0][0][0]",
"codec_tag": "0x0000",
"sample_fmt": "fltp",
"sample_rate": "48000",
"channels": 6,
"channel_layout": "5.1",
"bits_per_sample": 0,
"initial_padding": 0,
"r_frame_rate": "0/0",
"avg_frame_rate": "0/0",
"time_base": "1/1000",
"start_pts": 0,
"start_time": "0.000000",
"extradata_size": 2,
"disposition": {
"default": 1,
"dub": 0,
"original": 0,
"comment": 0,
"lyrics": 0,
"karaoke": 0,
"forced": 0,
"hearing_impaired": 0,
"visual_impaired": 0,
"clean_effects": 0,
"attached_pic": 0,
"timed_thumbnails": 0,
"non_diegetic": 0,
"captions": 0,
"descriptions": 0,
"metadata": 0,
"dependent": 0,
"still_image": 0
},
"tags": {
"language": "eng"
}
}
]
}
""".trimIndent(),
"""
""".trimIndent()
)
)
)
}
}
}*/

View File

@ -5,7 +5,7 @@ import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.SharedConfig
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
//@Service @Service
class EncodeService { class EncodeService {
/*private val log = KotlinLogging.logger {} /*private val log = KotlinLogging.logger {}
val io = Coroutines.io() val io = Coroutines.io()

View File

@ -1,14 +1,23 @@
package no.iktdev.mediaprocessing.shared.common package no.iktdev.mediaprocessing.shared.common
import no.iktdev.exfl.coroutines.Coroutines import no.iktdev.exfl.coroutines.Coroutines
import no.iktdev.mediaprocessing.shared.common.kafka.CoordinatorProducer import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.annotation.Import
import org.springframework.stereotype.Service import org.springframework.stereotype.Service
import javax.annotation.PostConstruct
@Service @Service
abstract class ProcessingService(var producer: CoordinatorProducer, var listener: DefaultMessageListener) { @Import(DefaultMessageListener::class)
abstract class ProcessingService() {
val io = Coroutines.io() val io = Coroutines.io()
abstract fun onResult(referenceId: String, data: MessageDataWrapper)
@Autowired
lateinit var producer: CoordinatorProducer
abstract fun onResult(referenceId: String, data: MessageDataWrapper)
@PostConstruct
abstract fun onReady(): Unit
} }

View File

@ -3,7 +3,6 @@ package no.iktdev.mediaprocessing.shared.common
import java.io.File import java.io.File
object SharedConfig { object SharedConfig {
var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input")
val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output")

View File

@ -44,6 +44,23 @@ fun <T> insertWithSuccess(block: () -> T): Boolean {
} }
} }
fun <T> executeOrException(block: () -> T): Exception? {
return try {
transaction {
try {
block()
null
} catch (e: Exception) {
// log the error here or handle the exception as needed
e
}
}
} catch (e: Exception) {
e.printStackTrace()
return e
}
}
fun <T> executeWithStatus(block: () -> T): Boolean { fun <T> executeWithStatus(block: () -> T): Boolean {
return try { return try {
transaction { transaction {

View File

@ -29,7 +29,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp
else -> sanitizedName else -> sanitizedName
} }
val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped val nonResolutioned = movieEx.removeResolutionAndBeyond(stripped) ?: stripped
return MovieInfo(cleanup(nonResolutioned), cleanup(nonResolutioned)) return MovieInfo(type = "movie", cleanup(nonResolutioned), cleanup(nonResolutioned))
} }
private fun determineSerieFileName(): EpisodeInfo? { private fun determineSerieFileName(): EpisodeInfo? {
@ -58,7 +58,7 @@ class FileNameDeterminate(val title: String, val sanitizedName: String, val ctyp
} }
} else title } else title
val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim() val fullName = "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim()
return EpisodeInfo(title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName) return EpisodeInfo(type= "serie", title, episodeNumber.toInt(), seasonNumber.toInt(), episodeTitle, fullName)
} }
private fun determineUndefinedFileName(): VideoInfo? { private fun determineUndefinedFileName(): VideoInfo? {

View File

@ -1,12 +1,15 @@
package no.iktdev.mediaprocessing.shared.common.persistance package no.iktdev.mediaprocessing.shared.common.persistance
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.jetbrains.exposed.sql.insert import org.jetbrains.exposed.sql.insert
import java.sql.SQLIntegrityConstraintViolationException
open class PersistentDataStore { open class PersistentDataStore {
fun storeMessage(event: String, message: Message<*>): Boolean { fun storeMessage(event: String, message: Message<*>): Boolean {
return executeWithStatus { val exception = executeOrException {
events.insert { events.insert {
it[events.referenceId] = message.referenceId it[events.referenceId] = message.referenceId
it[events.eventId] = message.eventId it[events.eventId] = message.eventId
@ -14,6 +17,15 @@ open class PersistentDataStore {
it[events.data] = message.dataAsJson() it[events.data] = message.dataAsJson()
} }
} }
return if (exception == null) true else {
if (exception.cause is SQLIntegrityConstraintViolationException) {
(exception as ExposedSQLException).errorCode == 1062
}
else {
exception.printStackTrace()
false
}
}
} }
} }

View File

@ -16,11 +16,11 @@ data class PersistentMessage(
fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? { fun fromRowToPersistentMessage(row: ResultRow, dez: DeserializingRegistry): PersistentMessage? {
val kev = try { val kev = try {
KafkaEvents.valueOf(row[events.event]) KafkaEvents.toEvent(row[events.event])
} catch (e: IllegalArgumentException) { } catch (e: IllegalArgumentException) {
e.printStackTrace() e.printStackTrace()
return null return null
} }?: return null
val dzdata = dez.deserializeData(kev, row[events.data]) val dzdata = dez.deserializeData(kev, row[events.data])
return PersistentMessage( return PersistentMessage(
referenceId = row[events.referenceId], referenceId = row[events.referenceId],

View File

@ -15,7 +15,7 @@ dependencies {
implementation("com.google.code.gson:gson:2.8.9") implementation("com.google.code.gson:gson:2.8.9")
implementation("io.github.microutils:kotlin-logging-jvm:2.0.11") implementation("io.github.microutils:kotlin-logging-jvm:2.0.11")
implementation("org.springframework.kafka:spring-kafka:3.0.1") implementation("org.springframework.kafka:spring-kafka:2.8.5")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0") implementation("com.fasterxml.jackson.core:jackson-databind:2.13.0")
implementation(project(mapOf("path" to ":shared:contract"))) implementation(project(mapOf("path" to ":shared:contract")))
@ -27,13 +27,15 @@ dependencies {
testImplementation(platform("org.junit:junit-bom:5.9.1")) testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("junit:junit:4.13.2") testImplementation("junit:junit:4.13.2")
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-params:5.8.1")
testImplementation("org.assertj:assertj-core:3.4.1") testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.mockito:mockito-core:3.+") testImplementation("org.mockito:mockito-core:3.+")
testImplementation("org.assertj:assertj-core:3.4.1") testImplementation("org.assertj:assertj-core:3.4.1")
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.8.1")
testImplementation("org.mockito:mockito-core:3.10.0") // Oppdater versjonen hvis det er nyere tilgjengelig
testImplementation("org.mockito:mockito-junit-jupiter:3.10.0")
} }
tasks.test { tasks.test {

View File

@ -5,27 +5,37 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.DeserializedConsumerRecord
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecord
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.listener.ContainerProperties import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.listener.MessageListener
import org.springframework.stereotype.Component
import java.lang.IllegalArgumentException import java.lang.IllegalArgumentException
import java.util.* import java.util.*
@Component
open class DefaultMessageListener( open class DefaultMessageListener(
open val topic: String, ) : MessageListener<String, String> {
open val consumer: DefaultConsumer = DefaultConsumer(subId = UUID.randomUUID().toString()),
open var onMessageReceived: (DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) -> Unit = {}
)
: MessageListener<String, String> {
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
@Autowired
private lateinit var consumerFactory: ConsumerFactory<String, String>
var onMessageReceived: (DeserializedConsumerRecord<KafkaEvents, Message<out MessageDataWrapper>>) -> Unit = {
logger.warn { "onMessageReceived has no listener" }
}
private val deserializer = DeserializingRegistry() private val deserializer = DeserializingRegistry()
protected var container: KafkaMessageListenerContainer<String, String>? = null protected var container: KafkaMessageListenerContainer<String, String>? = null
open fun listen() { open fun listen(topic: String) {
val listener = consumer.consumerFactoryListener() val listener = ConcurrentKafkaListenerContainerFactory<String,String>().apply {
consumerFactory = this@DefaultMessageListener.consumerFactory
}
val containerProperties = ContainerProperties(topic).apply { val containerProperties = ContainerProperties(topic).apply {
messageListener = this@DefaultMessageListener messageListener = this@DefaultMessageListener
} }
@ -46,7 +56,7 @@ open class DefaultMessageListener(
override fun onMessage(data: ConsumerRecord<String, String>) { override fun onMessage(data: ConsumerRecord<String, String>) {
val event = try { val event = try {
KafkaEvents.valueOf(data.key()) KafkaEvents.toEvent(data.key())
} catch (e: IllegalArgumentException) { } catch (e: IllegalArgumentException) {
logger.error { "${data.key()} is not a member of KafkaEvents" } logger.error { "${data.key()} is not a member of KafkaEvents" }
null null
@ -60,7 +70,10 @@ open class DefaultMessageListener(
} }
private fun <K, V, KDez, VDez> ConsumerRecord<K, V>.toDeserializedConsumerRecord(keyzz: KDez, valuezz: VDez): DeserializedConsumerRecord<KDez, VDez> { private fun <K, V, KDez, VDez> ConsumerRecord<K, V>.toDeserializedConsumerRecord(
keyzz: KDez,
valuezz: VDez
): DeserializedConsumerRecord<KDez, VDez> {
return DeserializedConsumerRecord( return DeserializedConsumerRecord(
topic = this.topic(), topic = this.topic(),
partition = this.partition(), partition = this.partition(),

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson
import com.google.gson.reflect.TypeToken import com.google.gson.reflect.TypeToken
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.* import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import java.lang.reflect.Type import java.lang.reflect.Type
import kotlin.reflect.KClass import kotlin.reflect.KClass
@ -64,7 +65,15 @@ class DeserializingRegistry {
e.printStackTrace() e.printStackTrace()
} }
} }
// Fallback try {
// Fallback
val type = object : TypeToken<SimpleMessageData>() {}.type
return gson.fromJson<SimpleMessageData>(json, type)
} catch (e: Exception) {
e.printStackTrace()
}
// Default
val type = object : TypeToken<MessageDataWrapper>() {}.type val type = object : TypeToken<MessageDataWrapper>() {}.type
return gson.fromJson<MessageDataWrapper>(json, type) return gson.fromJson<MessageDataWrapper>(json, type)
} }

View File

@ -6,10 +6,15 @@ class KafkaEnv {
companion object { companion object {
val servers: String = System.getenv("KAFKA_BOOTSTRAP_SERVER") ?: "127.0.0.1:9092" val servers: String = System.getenv("KAFKA_BOOTSTRAP_SERVER") ?: "127.0.0.1:9092"
var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}" var consumerId: String = System.getenv("KAFKA_CONSUMER_ID") ?: "LibGenerated-${UUID.randomUUID()}"
var enabled: Boolean = System.getenv("KAFKA_ENABLED").toBoolean()
val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest" val loadMessages: String = System.getenv("KAFKA_MESSAGES_USE") ?: "earliest"
var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents" var kafkaTopic: String = System.getenv("KAFKA_TOPIC") ?: "contentEvents"
val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull() ?: 10
val heartbeatIntervalMilliseconds: Int = System.getenv("KAFKA_HEARTBEAT_INTERVAL_MS")?.toIntOrNull() ?: 2000
val sessionTimeOutMilliseconds: Int = System.getenv("KAFKA_SESSION_INACTIVITY_MS")?.toIntOrNull() ?: (listOf(
metadataTimeoutMinutes,
heartbeatIntervalMilliseconds
).max() * 60)
} }
} }

View File

@ -41,6 +41,8 @@ open class KafkaImplementation {
config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages config[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = KafkaEnv.loadMessages
config[ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG] = KafkaEnv.sessionTimeOutMilliseconds
config[ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG] = KafkaEnv.heartbeatIntervalMilliseconds
log.info { config } log.info { config }
return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer()) return DefaultKafkaConsumerFactory(config, StringDeserializer(), StringDeserializer())
} }

View File

@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.shared.kafka.dto package no.iktdev.mediaprocessing.shared.kafka.dto
import com.google.gson.Gson import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.streamit.library.kafka.dto.Status import no.iktdev.streamit.library.kafka.dto.Status
import java.io.Serializable import java.io.Serializable
import java.lang.reflect.Type import java.lang.reflect.Type
@ -12,6 +13,10 @@ open class MessageDataWrapper(
@Transient open val message: String? = null @Transient open val message: String? = null
) )
data class SimpleMessageData(
override val status: Status,
override val message: String?
) : MessageDataWrapper(status, message)
fun MessageDataWrapper?.isSuccess(): Boolean { fun MessageDataWrapper?.isSuccess(): Boolean {

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -8,6 +9,6 @@ import no.iktdev.streamit.library.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED) @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED)
data class MediaStreamsParsePerformed( data class MediaStreamsParsePerformed(
override val status: Status, override val status: Status,
val parsedAsJson: String val streams: ParsedMediaStreams
): MessageDataWrapper(status) ): MessageDataWrapper(status)

View File

@ -1,5 +1,6 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
@ -9,5 +10,5 @@ import no.iktdev.streamit.library.kafka.dto.Status
data class ReaderPerformed( data class ReaderPerformed(
override val status: Status, override val status: Status,
val file: String, //AbsolutePath val file: String, //AbsolutePath
val output: String val output: JsonObject
) : MessageDataWrapper(status) ) : MessageDataWrapper(status)

View File

@ -5,23 +5,25 @@ import no.iktdev.streamit.library.kafka.dto.Status
data class VideoInfoPerformed( data class VideoInfoPerformed(
override val status: Status, override val status: Status,
val data: VideoInfo val info: VideoInfo
) )
: MessageDataWrapper(status) : MessageDataWrapper(status)
data class EpisodeInfo( data class EpisodeInfo(
override val type: String,
val title: String, val title: String,
val episode: Int, val episode: Int,
val season: Int, val season: Int,
val episodeTitle: String?, val episodeTitle: String?,
override val fullName: String override val fullName: String
): VideoInfo(fullName) ): VideoInfo(type, fullName)
data class MovieInfo( data class MovieInfo(
override val type: String,
val title: String, val title: String,
override val fullName: String override val fullName: String
) : VideoInfo(fullName) ) : VideoInfo(type, fullName)
data class SubtitleInfo( data class SubtitleInfo(
val inputFile: String, val inputFile: String,
@ -29,6 +31,7 @@ data class SubtitleInfo(
val language: String val language: String
) )
abstract class VideoInfo( open class VideoInfo(
@Transient open val type: String,
@Transient open val fullName: String @Transient open val fullName: String
) )

View File

@ -0,0 +1,29 @@
package no.iktdev.mediaprocessing.shared.kafka
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import org.apache.kafka.clients.admin.AdminClient
import org.junit.jupiter.api.extension.ExtendWith
import org.mockito.InjectMocks
import org.mockito.Mock
import org.mockito.junit.jupiter.MockitoExtension
import org.springframework.kafka.core.KafkaTemplate
@ExtendWith(MockitoExtension::class)
class KafkaTestBase {
@Mock
lateinit var kafkaTemplate: KafkaTemplate<String, String>
@Mock
lateinit var adminClient: AdminClient
/*@InjectMocks
lateinit var defaultProducer: DefaultProducer
@InjectMocks
lateinit var defaultConsumer: DefaultConsumer*/
@InjectMocks
lateinit var defaultListener: DefaultMessageListener
}

View File

@ -1,7 +1,7 @@
import com.fasterxml.jackson.databind.ObjectMapper package no.iktdev.mediaprocessing.shared.kafka
import com.google.gson.Gson import com.google.gson.Gson
import no.iktdev.mediaprocessing.shared.contract.ProcessType import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.DefaultConsumer
import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry import no.iktdev.mediaprocessing.shared.kafka.core.DeserializingRegistry
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message import no.iktdev.mediaprocessing.shared.kafka.dto.Message