diff --git a/.github/workflows/v2.yml b/.github/workflows/v2.yml
new file mode 100644
index 00000000..367f5f6b
--- /dev/null
+++ b/.github/workflows/v2.yml
@@ -0,0 +1,253 @@
+name: Build V2
+
+on:
+ push:
+ branches:
+ - v2
+ pull_request:
+ branches:
+ - v2
+ workflow_dispatch:
+
+
+jobs:
+ pre-check:
+ runs-on: ubuntu-latest
+ outputs:
+ pyMetadata: ${{ steps.filter.outputs.pyMetadata }}
+ coordinator: ${{ steps.filter.outputs.coordinator }}
+ processer: ${{ steps.filter.outputs.processer }}
+ converter: ${{ steps.filter.outputs.converter }}
+ shared: ${{ steps.filter.outputs.shared }}
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - uses: dorny/paths-filter@v2
+ id: filter
+ with:
+ filters: |
+ pyMetadata:
+ - 'apps/pyMetadata/**'
+ coordinator:
+ - 'apps/coordinator/**'
+ processer:
+ - 'apps/processer/**'
+ converter:
+ - 'apps/converter/**'
+
+ shared:
+ - 'shared/**'
+ # Debug: print the resolved path-filter outputs for this run
+ - name: Print path filter outputs
+ run: |
+ echo "Apps"
+ echo "app:pyMetadata: ${{ steps.filter.outputs.pyMetadata }}"
+ echo "app:coordinator: ${{ steps.filter.outputs.coordinator }}"
+ echo "app:processer: ${{ steps.filter.outputs.processer }}"
+ echo "app:converter: ${{ steps.filter.outputs.converter }}"
+
+ echo "Shared"
+ echo "shared: ${{ steps.filter.outputs.shared }}"
+
+ build-shared:
+ runs-on: ubuntu-latest
+ needs: pre-check
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - name: Cache Shared code Gradle dependencies
+ id: cache-gradle
+ uses: actions/cache@v2
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
+
+ - name: Build Shared code
+ if: steps.cache-gradle.outputs.cache-hit != 'true' || needs.pre-check.outputs.shared == 'true' || github.event_name == 'workflow_dispatch'
+ run: |
+ cd shared
+ chmod +x ./gradlew
+ ./gradlew build
+
+
+ build-processer:
+ needs: [pre-check, build-shared]
+ if: ${{ needs.pre-check.outputs.processer == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
+ runs-on: ubuntu-latest
+ #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - name: Cache Shared Gradle dependencies
+ id: cache-gradle
+ uses: actions/cache@v2
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
+
+ - name: Build Processer module
+ id: build-processer
+ run: |
+ cd apps/processer
+ chmod +x ./gradlew
+ ./gradlew build
+ echo "Build completed"
+
+
+ - name: Generate Docker image tag
+ id: docker-tag
+ run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
+
+ - name: Docker login
+ uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+ with:
+ username: ${{ secrets.DOCKER_HUB_NAME }}
+ password: ${{ secrets.DOCKER_HUB_TOKEN }}
+
+ - name: Build and push Docker image
+ uses: docker/build-push-action@v2
+ with:
+ context: ./apps/processer
+ push: true
+ tags: |
+ bskjon/mediaprocessing-processer:v2-latest
+ bskjon/mediaprocessing-processer:v2-${{ github.sha }}
+ bskjon/mediaprocessing-processer:v2-${{ steps.docker-tag.outputs.tag }}
+
+ build-converter:
+ needs: [pre-check, build-shared]
+ if: ${{ needs.pre-check.outputs.converter == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
+ runs-on: ubuntu-latest
+ #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - name: Cache Shared Gradle dependencies
+ id: cache-gradle
+ uses: actions/cache@v2
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
+
+ - name: Build Converter module
+ id: build-converter
+ run: |
+ cd apps/converter
+ chmod +x ./gradlew
+ ./gradlew build
+ echo "Build completed"
+
+
+ - name: Generate Docker image tag
+ id: docker-tag
+ run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
+
+ - name: Docker login
+ uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+ with:
+ username: ${{ secrets.DOCKER_HUB_NAME }}
+ password: ${{ secrets.DOCKER_HUB_TOKEN }}
+
+ - name: Build and push Docker image
+ uses: docker/build-push-action@v2
+ with:
+ context: ./apps/converter
+ push: true
+ tags: |
+ bskjon/mediaprocessing-converter:v2-latest
+ bskjon/mediaprocessing-converter:v2-${{ github.sha }}
+ bskjon/mediaprocessing-converter:v2-${{ steps.docker-tag.outputs.tag }}
+
+ build-coordinator:
+ needs: [pre-check, build-shared]
+ if: ${{ needs.pre-check.outputs.coordinator == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
+ runs-on: ubuntu-latest
+ #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }}
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - name: Cache Shared Gradle dependencies
+ id: cache-gradle
+ uses: actions/cache@v2
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
+
+ - name: Build Coordinator module
+ id: build-coordinator
+ run: |
+ cd apps/coordinator
+ chmod +x ./gradlew
+ ./gradlew build
+ echo "Build completed"
+
+
+ - name: Generate Docker image tag
+ id: docker-tag
+ run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
+
+ - name: Docker login
+ uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+ with:
+ username: ${{ secrets.DOCKER_HUB_NAME }}
+ password: ${{ secrets.DOCKER_HUB_TOKEN }}
+
+ - name: Build and push Docker image
+ uses: docker/build-push-action@v2
+ with:
+ context: ./apps/coordinator
+ push: true
+ tags: |
+ bskjon/mediaprocessing-coordinator:v2-latest
+ bskjon/mediaprocessing-coordinator:v2-${{ github.sha }}
+ bskjon/mediaprocessing-coordinator:v2-${{ steps.docker-tag.outputs.tag }}
+
+ build-pymetadata:
+ needs: pre-check
+ if: ${{ needs.pre-check.outputs.pyMetadata == 'true' || github.event_name == 'workflow_dispatch' }}
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v2
+
+ - name: Build pyMetadata module
+ id: build-pymetadata
+ run: |
+ if [[ "${{ needs.pre-check.outputs.pyMetadata }}" == "true" || "${{ github.event_name }}" == "push" || "${{ github.event_name }}" == "workflow_dispatch" ]]; then
+ cd apps/pyMetadata
+ # Add the necessary build steps for your Python module here
+ echo "Build completed"
+ else
+ echo "pyMetadata has not changed. Skipping pyMetadata module build."
+ echo "job_skipped=true" >> "$GITHUB_OUTPUT"
+ fi
+
+ - name: Generate Docker image tag
+ id: docker-tag
+ run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
+
+ - name: Docker login
+ uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
+ with:
+ username: ${{ secrets.DOCKER_HUB_NAME }}
+ password: ${{ secrets.DOCKER_HUB_TOKEN }}
+
+ - name: Build and push Docker image
+ uses: docker/build-push-action@v2
+ with:
+ context: ./apps/pyMetadata
+ push: true
+ tags: |
+ bskjon/mediaprocessing-pymetadata:v2-latest
+ bskjon/mediaprocessing-pymetadata:v2-${{ github.sha }}
+ bskjon/mediaprocessing-pymetadata:v2-${{ steps.docker-tag.outputs.tag }}
\ No newline at end of file
diff --git a/.idea/gradle.xml b/.idea/gradle.xml
index 64cf64d5..b74e6df4 100644
--- a/.idea/gradle.xml
+++ b/.idea/gradle.xml
@@ -11,7 +11,6 @@
-
diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt
index 36934c8e..5948d465 100644
--- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt
+++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt
@@ -16,9 +16,8 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.util.*
@@ -111,6 +110,8 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
}
fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {
+ if (event.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED)
+ return
val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}")
coordinator.producer.sendMessage(
referenceId = event.referenceId,
@@ -126,7 +127,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
status = Status.COMPLETED,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
- result = result.map { it.absolutePath }
+ outFiles = result.map { it.absolutePath }
)
} catch (e: Converter.FileUnavailableException) {
e.printStackTrace()
@@ -135,7 +136,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
- result = emptyList()
+ outFiles = emptyList()
)
} catch (e : Converter.FileIsNullOrEmpty) {
e.printStackTrace()
@@ -144,16 +145,8 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
message = e.message,
producedBy = serviceId,
derivedFromEventId = converter.eventId,
- result = emptyList()
+ outFiles = emptyList()
)
}
}
-
-
-
- data class PendingWorkerCache(
- val referenceId: String,
- val eventId: String,
- val requiresEventId: String
- )
}
\ No newline at end of file
diff --git a/apps/coordinator/build.gradle.kts b/apps/coordinator/build.gradle.kts
index 20868ef7..34e82abf 100644
--- a/apps/coordinator/build.gradle.kts
+++ b/apps/coordinator/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
implementation("org.json:json:20210307")
implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
+ implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha22")
+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
index 29a7d898..967d4781 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/Coordinator.kt
@@ -17,7 +17,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Service
import java.io.File
@@ -41,7 +41,7 @@ class Coordinator() {
public fun startProcess(file: File, type: ProcessType) {
val processStartEvent = ProcessStarted(
- status = Status.STARTED,
+ status = Status.COMPLETED,
file = file.absolutePath,
type = type
)
@@ -184,6 +184,7 @@ class Coordinator() {
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
+ status = Status.COMPLETED,
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
@@ -206,6 +207,7 @@ class Coordinator() {
val data = message.data as FfmpegWorkerArgumentsCreated
data.entries.forEach {
FfmpegWorkRequestCreated(
+ status = Status.COMPLETED,
inputFile = data.inputFile,
arguments = it.arguments,
outFile = it.outputFile
@@ -218,6 +220,7 @@ class Coordinator() {
}
val outFile = File(it.outputFile)
ConvertWorkerRequest(
+ status = Status.COMPLETED,
requiresEventId = message.eventId,
inputFile = it.outputFile,
true,
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt
index cca8ee3b..fed76449 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/CoordinatorApplication.kt
@@ -1,14 +1,20 @@
package no.iktdev.mediaprocessing.coordinator
-import kotlinx.coroutines.launch
import mu.KotlinLogging
import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.exfl.observable.Observables
import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
import no.iktdev.mediaprocessing.shared.common.SharedConfig
import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
+import no.iktdev.streamit.library.db.tables.*
+import no.iktdev.streamit.library.db.tables.helper.cast_errors
+import no.iktdev.streamit.library.db.tables.helper.data_audio
+import no.iktdev.streamit.library.db.tables.helper.data_video
+import org.jetbrains.exposed.sql.SchemaUtils
+import org.jetbrains.exposed.sql.transactions.transaction
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.ApplicationContext
@@ -27,13 +33,37 @@ fun getContext(): ApplicationContext? {
}
fun main(args: Array) {
+ Coroutines.addListener(listener = object: Observables.ObservableValue.ValueListener {
+ override fun onUpdated(value: Throwable) {
+ value.printStackTrace()
+ }
+ })
val dataSource = MySqlDataSource.fromDatabaseEnv();
- Coroutines.default().launch {
- dataSource.createDatabase()
- dataSource.createTables(
- events
- )
+ dataSource.createDatabase()
+
+ val kafkaTables = listOf(
+ events, // For kafka
+ )
+
+ dataSource.createTables(*kafkaTables.toTypedArray())
+
+ val tables = arrayOf(
+ catalog,
+ genre,
+ movie,
+ serie,
+ subtitle,
+ summary,
+ users,
+ progress,
+ data_audio,
+ data_video,
+ cast_errors
+ )
+ transaction {
+ SchemaUtils.createMissingTablesAndColumns(*tables)
}
+
context = runApplication(*args)
printSharedConfig()
}
@@ -47,5 +77,5 @@ fun printSharedConfig() {
log.info { "Database: ${DatabaseConfig.database}@${DatabaseConfig.address}:${DatabaseConfig.port}" }
log.info { "Username: ${DatabaseConfig.username}" }
- log.info { "Password: ${ if(DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
+ log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
}
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/MetadataMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/MetadataMapping.kt
index 5b4394c9..ac65dab7 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/MetadataMapping.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/MetadataMapping.kt
@@ -1,15 +1,12 @@
package no.iktdev.mediaprocessing.coordinator.mapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.reader.MetadataCoverDto
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.pyMetadata
+import no.iktdev.mediaprocessing.shared.contract.reader.SummaryInfo
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
-
+import java.io.File
class MetadataMapping(val events: List) {
@@ -17,19 +14,45 @@ class MetadataMapping(val events: List) {
fun map(): MetadataDto? {
val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
+ val mediaReadOut = events.find { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
val meta = events.find { it.data is MetadataPerformed }?.data as MetadataPerformed?
+ val coverDownloadTask = events.find { it.data is CoverInfoPerformed }?.data as CoverInfoPerformed?
+ val cover = events.find { it.data is CoverDownloadWorkPerformed }?.data as CoverDownloadWorkPerformed?
+
if (!baseInfo.isSuccess()) {
return null
}
- return null
- /*return MetadataDto(
- title = meta?.data?.title ?: return null,
- type = meta?.data?.type ?: return null,
+ val videoInfo = mediaReadOut?.toValueObject()
+ val mediaCover = if (coverDownloadTask != null || cover != null) {
+ val coverFile = cover?.coverFile?.let { File(it) }
+ MetadataCoverDto(
+ cover = coverFile?.name,
+ coverFile = cover?.coverFile,
+ coverUrl = coverDownloadTask?.url
+ )
+ } else null
- )*/
+ return if (meta != null || videoInfo != null) {
+ MetadataDto(
+ title = meta?.data?.title ?: videoInfo?.fullName ?: return null,
+ collection = baseInfo?.title ?: return null,
+ cover = mediaCover,
+ type = meta?.data?.type ?: videoInfo?.type ?: return null,
+ summary = meta?.data?.summary?.filter {it.summary != null }?.map { SummaryInfo(language = it.language, summary = it.summary!! ) } ?: emptyList(),
+ genres = meta?.data?.genres ?: emptyList(),
+ )
+ } else null
+ }
+
+ fun getCollection(): String? {
+ val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
+ if (!baseInfo.isSuccess()) {
+ return null
+ }
+ return baseInfo?.title
}
}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/OutputFilesMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/OutputFilesMapping.kt
new file mode 100644
index 00000000..da72553a
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/OutputFilesMapping.kt
@@ -0,0 +1,33 @@
+package no.iktdev.mediaprocessing.coordinator.mapping
+
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.reader.OutputFilesDto
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
+
+class OutputFilesMapping(val events: List) {
+
+ fun mapTo(): OutputFilesDto {
+
+ val videoResult = events.filter { it.data is ProcesserEncodeWorkPerformed }
+ .map { it.data as ProcesserEncodeWorkPerformed }
+
+ val subtitleResult = events.filter { it.data is ProcesserExtractWorkPerformed && it.data.isSuccess() }.map { it.data as ProcesserExtractWorkPerformed }
+ val convertedSubtitleResult = events.filter { it.data is ConvertWorkPerformed && it.data.isSuccess() }.map { it.data as ConvertWorkPerformed }
+
+
+
+ return OutputFilesDto(
+ video = videoResult.lastOrNull { it.isSuccess() }?.outFile,
+ subtitles = toSubtitleList(subtitleResult, convertedSubtitleResult)
+ )
+ }
+
+ private fun toSubtitleList(extracted: List, converted: List): List {
+ val sub1 = extracted.mapNotNull { it.outFile }
+ val sub2 = converted.flatMap { it.outFiles }
+ return sub1 + sub2
+ }
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt
index a492de20..56115ae7 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/ProcessMapping.kt
@@ -10,12 +10,15 @@ class ProcessMapping(val events: List) {
fun map(): MediaProcessedDto? {
val referenceId = events.firstOrNull()?.referenceId ?: return null
val processStarted = getProcessStarted()
+ val meta = MetadataMapping(events)
return MediaProcessedDto(
referenceId = referenceId,
process = processStarted?.type,
inputFile = processStarted?.file,
- metadata = MetadataMapping(events).map(),
- outputFiles = null
+ collection = meta.getCollection(),
+ metadata = meta.map(),
+ videoDetails = VideoDetailsMapper(events).mapTo(),
+ outputFiles = OutputFilesMapping(events).mapTo()
)
}
@@ -23,29 +26,37 @@ class ProcessMapping(val events: List) {
return events.lastOrNull { it.data is ProcessStarted }?.data as ProcessStarted?
}
+
fun waitsForEncode(): Boolean {
- val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } != null
- val performed = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } != null
- val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED } != null
- return !(isSkipped || (arguments && performed))
+ val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED }
+ val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED}
+
+ val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED }
+ val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED }
+
+ return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
}
fun waitsForExtract(): Boolean {
- val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED } != null
- val performed = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } != null
- val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED } != null
- return !(isSkipped || (arguments && performed))
+ val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }
+ val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
+
+ val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED }
+ val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED }
+
+ return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
}
fun waitsForConvert(): Boolean {
- val arguments = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } != null
- val performed = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } != null
- val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED } != null
- return !(isSkipped || (arguments && performed))
+ val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED }
+ val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED }
+ val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED }
+
+ return created.size > performed.size + isSkipped.size
}
fun canCollect(): Boolean {
- return waitsForEncode() && waitsForExtract() && waitsForConvert()
+ return (!waitsForEncode() && !waitsForExtract() && !waitsForConvert())
}
}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/VideoDetailsMapper.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/VideoDetailsMapper.kt
new file mode 100644
index 00000000..36cf1a5e
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/mapping/VideoDetailsMapper.kt
@@ -0,0 +1,27 @@
+package no.iktdev.mediaprocessing.coordinator.mapping
+
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.reader.SerieInfo
+import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
+
+class VideoDetailsMapper(val events: List) {
+
+ fun mapTo(): VideoDetails? {
+ val mediaReadOut = events.lastOrNull { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
+ val proper = mediaReadOut?.toValueObject() ?: return null
+
+ val details = VideoDetails(
+ type = proper.type,
+ fullName = proper.fullName,
+ serieInfo = if (proper !is EpisodeInfo) null else SerieInfo(
+ episodeTitle = proper.episodeTitle,
+ episodeNumber = proper.episode,
+ seasonNumber = proper.season,
+ title = proper.title
+ )
+ )
+ return details
+ }
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt
index 5c680a8c..40e30de7 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/BaseInfoFromFile.kt
@@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt
new file mode 100644
index 00000000..e6ef7b9b
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CollectAndStoreTask.kt
@@ -0,0 +1,144 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
+import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
+import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
+import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
+import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
+import no.iktdev.streamit.library.db.query.*
+import org.jetbrains.exposed.exceptions.ExposedSQLException
+import org.springframework.stereotype.Service
+import java.io.File
+import java.sql.SQLIntegrityConstraintViolationException
+
+@Service
+class CollectAndStoreTask() : TaskCreator() {
+ override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE
+
+ override val requiredEvents: List = listOf(
+ EVENT_PROCESS_STARTED,
+ EVENT_PROCESS_COMPLETED
+ )
+ override val listensForEvents: List = KafkaEvents.entries
+
+
+
+ override fun onProcessEvents(event: PersistentMessage, events: List): MessageDataWrapper? {
+ val started = events.find { it.event == EVENT_PROCESS_STARTED } ?: return null
+ val completed = events.find { it.event == EVENT_PROCESS_COMPLETED } ?: return null
+ if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
+ return null
+ }
+ val mapped = ProcessMapping(events).map() ?: return null
+ val collection = mapped.collection ?: return null
+
+ val subtitlesStored = mapped.outputFiles?.subtitles?.let {
+ storeSubtitles(collection = collection, subtitles = it)
+ } ?: false
+
+ val videoFile = mapped.outputFiles?.video?.let { File(it).name }
+ val videoInfo = mapped.videoDetails
+
+
+ val genres = mapped.metadata?.genres?.let {
+ storeAndGetGenres(it)
+ }
+
+ val catalogId = mapped.metadata?.let { meta ->
+ if (videoInfo == null || videoFile == null)
+ null
+ else
+ storeCatalog(metadata = meta,genres = genres, videoFile = videoFile, videoDetails = videoInfo)
+ } ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null")
+
+ mapped.metadata?.let {
+ storeMetadata(catalogId = catalogId, metadata = it)
+ }
+
+ return SimpleMessageData(Status.COMPLETED)
+ }
+
+ private fun storeSubtitles(collection: String, subtitles: List): Boolean {
+ val result = subtitles.map { subtitle ->
+ val subtitleFile = File(subtitle)
+ val language = subtitleFile.parentFile.name
+ subtitle to executeWithStatus {
+ SubtitleQuery(
+ collection = collection,
+ associatedWithVideo = subtitleFile.nameWithoutExtension,
+ language = language,
+ format = subtitleFile.extension.uppercase(),
+ file = subtitleFile.name
+ ).insert()
+ }
+ }
+ return result.none { !it.second }
+ }
+
+ private fun storeMetadata(catalogId: Int, metadata: MetadataDto) {
+ metadata.summary.forEach {
+ withTransaction {
+ SummaryQuery(
+ cid = catalogId,
+ language = it.language,
+ description = it.summary
+ ).insert()
+ }
+ }
+ }
+
+ private fun storeAndGetGenres(genres: List): String? {
+ return withTransaction {
+ val gq = GenreQuery( *genres.toTypedArray() )
+ gq.insertAndGetIds()
+ gq.getIds().joinToString(",")
+ }
+ }
+
+ private fun storeCatalog(metadata: MetadataDto, videoDetails: VideoDetails, videoFile: String, genres: String?): Int? {
+ val precreatedCatalogQuery = CatalogQuery(
+ title = metadata.title,
+ cover = metadata.cover?.cover,
+ type = metadata.type,
+ collection = metadata.collection,
+ genres = genres
+ )
+
+ val result = when (videoDetails.type) {
+ "serie" -> {
+ val serieInfo = videoDetails.serieInfo ?: throw RuntimeException("SerieInfo missing in VideoDetails for Serie! $videoFile")
+ executeOrException {
+ precreatedCatalogQuery.insertWithSerie(
+ episodeTitle = serieInfo.episodeTitle ?: "",
+ videoFile = videoFile,
+ episode = serieInfo.episodeNumber,
+ season = serieInfo.seasonNumber
+ )
+ }
+ }
+ "movie" -> {
+ executeOrException {
+ precreatedCatalogQuery.insertWithMovie(videoFile)
+ }
+ }
+ else -> throw RuntimeException("${videoDetails.type} is not supported!")
+ }
+ val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
+ return if (result == null || ignoreException ) {
+ return withTransaction {
+ precreatedCatalogQuery.getId()
+ }
+ } else null
+ }
+
+
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt
new file mode 100644
index 00000000..a973dcf7
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/CompleteTask.kt
@@ -0,0 +1,56 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
+import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
+import org.springframework.stereotype.Service
+
+@Service
+class CompleteTask() : TaskCreator() {
+ override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED
+
+ override val requiredEvents: List<KafkaEvents> = listOf(
+ EVENT_PROCESS_STARTED,
+ EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
+ EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
+ )
+ override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries
+
+
+
+ override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+ val started = events.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED } ?: return null
+ if (!started.data.isSuccess()) {
+ return null
+ }
+
+ val receivedEvents = events.map { it.event }
+
+ val requiresOneOf = listOf(
+ EVENT_MEDIA_EXTRACT_PARAMETER_CREATED,
+ EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
+ EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED,
+ EVENT_WORK_CONVERT_CREATED
+ )
+
+ if (!requiresOneOf.any { it in receivedEvents }) {
+ log.info { "Can't complete at this moment. Missing required event" }
+ return null //SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event")
+ }
+
+
+
+
+ val mapper = ProcessMapping(events)
+ if (mapper.canCollect()) {
+ return ProcessCompleted(Status.COMPLETED)
+ }
+ return null
+ }
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt
new file mode 100644
index 00000000..95a4d31f
--- /dev/null
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/DownloadAndStoreCoverTask.kt
@@ -0,0 +1,57 @@
+package no.iktdev.mediaprocessing.coordinator.tasks.event
+
+import kotlinx.coroutines.runBlocking
+import no.iktdev.mediaprocessing.coordinator.TaskCreator
+import no.iktdev.mediaprocessing.shared.common.DownloadClient
+import no.iktdev.mediaprocessing.shared.common.getComputername
+import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverDownloadWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
+import org.springframework.stereotype.Service
+import java.io.File
+import java.util.*
+
+@Service
+class DownloadAndStoreCoverTask: TaskCreator() {
+ val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"
+ override val producesEvent: KafkaEvents
+ get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED
+
+ override val requiredEvents: List<KafkaEvents>
+ get() = listOf(
+ KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED,
+ KafkaEvents.EVENT_MEDIA_READ_OUT_COVER,
+ KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
+ )
+ override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
+ return super.prerequisitesRequired(events) + listOf {
+ isPrerequisiteDataPresent(events)
+ }
+ }
+
+ override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
+ val cover = events.find { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_COVER }
+ if (cover == null || cover.data !is CoverInfoPerformed) {
+ return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId")
+ }
+ val coverData = cover.data as CoverInfoPerformed
+ val outDir = File(coverData.outDir)
+ if (!outDir.exists())
+ return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId")
+
+ val client = DownloadClient(coverData.url, File(coverData.outDir), coverData.outFileBaseName)
+ val result = runBlocking {
+ client.download()
+ }
+ return if (result == null) {
+ SimpleMessageData(Status.ERROR, "Could not download cover, check logs")
+ } else {
+ val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR
+ CoverDownloadWorkPerformed(status = status, coverFile = result.absolutePath)
+ }
+ }
+}
\ No newline at end of file
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt
index a34a3f91..2d61e9fa 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToCoverTask.kt
@@ -8,7 +8,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerforme
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
@Service
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
index 0e220cdf..916fb7b6 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/MetadataAndBaseInfoToFileOut.kt
@@ -14,7 +14,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerforme
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Service
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
index c1ead14c..dab956fa 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ParseVideoFileStreams.kt
@@ -11,7 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
@Service
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
index f017ded5..24800d0c 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ReadVideoFileStreams.kt
@@ -12,7 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt
index b77fd762..0c877b92 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/EncodeArgumentCreatorTask.kt
@@ -9,7 +9,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt
index 52678033..c7c87a8b 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/event/ffmpeg/ExtractArgumentCreatorTask.kt
@@ -12,7 +12,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.springframework.stereotype.Service
import java.io.File
diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
index 1ee60ca6..5672c595 100644
--- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
+++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasks/input/watcher/InputDirectoryWatcher.kt
@@ -1,6 +1,7 @@
package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher
import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
+import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
import dev.vishna.watchservice.asWatchChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
@@ -51,6 +52,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
}
when (it.kind) {
Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved)
+ Initialized -> { /* Do nothing */ }
else -> {
if (it.file.isFile && it.file.isSupportedVideoFile()) {
queue.addToQueue(it.file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)
diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt
index 299e8ce9..2e94927c 100644
--- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt
+++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt
@@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Named
import org.junit.jupiter.api.Test
diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt
index 5484e410..da0b05f9 100644
--- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt
+++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/ParseVideoFileStreamsTest.kt
@@ -4,7 +4,7 @@ import com.google.gson.Gson
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.Assertions.*
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt
index dfa76e55..839809aa 100644
--- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt
@@ -14,10 +14,10 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
@@ -136,7 +136,8 @@ class EncodeService: TaskCreator() {
readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
}
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
- FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
+ ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
+ )
clearWorker()
}
@@ -150,7 +151,8 @@ class EncodeService: TaskCreator() {
}
log.info { "Encode failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
- FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
+ ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId)
+ )
sendProgress(info = info, ended = true)
clearWorker()
}
diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt
index cc979aef..7404e825 100644
--- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt
+++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt
@@ -15,10 +15,10 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
import no.iktdev.mediaprocessing.processer.ProcesserEnv
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import org.springframework.stereotype.Service
import java.io.File
import java.util.*
@@ -141,7 +141,11 @@ class ExtractService: TaskCreator() {
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
- FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
+ ProcesserExtractWorkPerformed(
+ status = Status.COMPLETED,
+ producedBy = extractServiceId,
+ derivedFromEventId = runner.eventId,
+ outFile = runner.info.outFile)
)
log.info { "Extract is releasing worker" }
clearWorker()
@@ -156,7 +160,7 @@ class ExtractService: TaskCreator() {
}
log.info { "Extract failed for ${runner.referenceId}" }
producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
- FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
+ ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
)
sendState(info, ended= true)
clearWorker()
diff --git a/apps/pyMetadata/app.py b/apps/pyMetadata/app.py
index 60bbd83d..ceb9ef58 100644
--- a/apps/pyMetadata/app.py
+++ b/apps/pyMetadata/app.py
@@ -7,6 +7,7 @@ import uuid
import threading
import json
import time
+
from kafka import KafkaConsumer, KafkaProducer
from fuzzywuzzy import fuzz
from sources.result import DataResult, Metadata
@@ -18,8 +19,8 @@ from sources.select import UseSource
# Konfigurer Kafka-forbindelsen
bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
-consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"Metadata-{uuid.uuid4()}"
-kafka_topic = os.environ.get("KAFKA_TOPIC") or "127.0.0.1:9092"
+consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"MetadataConsumer"
+kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"
# Konfigurer logging
logging.basicConfig(
@@ -154,7 +155,7 @@ class MessageHandlerThread(threading.Thread):
logger.info("Not in cache: %s", name)
logger.info("Searching in sources for information about %s", name)
result: Optional[DataResult] = UseSource(title=name).select_result()
- if (result.status == "SUCCESS"):
+ if (result.status == "COMPLETED"):
logger.info("Storing response for %s in in-memory cache", name)
ResultCache.add(name, result)
return result
diff --git a/apps/pyMetadata/sources/anii.py b/apps/pyMetadata/sources/anii.py
index eaf9576e..86b7f774 100644
--- a/apps/pyMetadata/sources/anii.py
+++ b/apps/pyMetadata/sources/anii.py
@@ -1,5 +1,5 @@
from AnilistPython import Anilist
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary
class metadata():
name: str = None
@@ -17,19 +17,24 @@ class metadata():
title = result.get("name_english", None),
altTitle = [result.get("name_romaji", [])],
cover = result.get("cover_image", None),
- summary = result.get("desc", None),
+ summary = [
+ Summary(
+ language = "eng",
+ summary = result.get("desc", None)
+ )
+ ],
type = 'movie' if result.get('airing_format', '').lower() == 'movie' else 'serie',
genres = result.get('genres', []),
source="anii",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
- return DataResult("SUCCESS", None, None)
+ return DataResult("COMPLETED", None, None)
- return DataResult("SUCCESS", None, meta)
+ return DataResult("COMPLETED", None, meta)
except IndexError as ingore:
- return DataResult(statusType="SUCCESS", message=f"No result for {self.name}")
+ return DataResult(statusType="COMPLETED", message=f"No result for {self.name}")
except Exception as e:
return DataResult(statusType="ERROR", message=str(e))
\ No newline at end of file
diff --git a/apps/pyMetadata/sources/imdb.py b/apps/pyMetadata/sources/imdb.py
index d60ddad5..8c751d26 100644
--- a/apps/pyMetadata/sources/imdb.py
+++ b/apps/pyMetadata/sources/imdb.py
@@ -1,5 +1,5 @@
import imdb
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary
class metadata():
name: str = None
@@ -19,15 +19,20 @@ class metadata():
title = result.get("title", None),
altTitle = [result.get("localized title", [])],
cover = result.get("cover url", None),
- summary = result.get("plot outline", None),
+ summary = [
+ Summary(
+ language = "eng",
+ summary = result.get("plot outline", None)
+ )
+ ],
type = 'movie' if result.get('kind', '').lower() == 'movie' else 'serie',
genres = result.get('genres', []),
source="imdb",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
- return DataResult("SUCCESS", None, None)
+ return DataResult("COMPLETED", None, None)
- return DataResult("SUCCESS", None, meta)
+ return DataResult("COMPLETED", None, meta)
except Exception as e:
return DataResult(status="ERROR", data=None, message=str(e))
\ No newline at end of file
diff --git a/apps/pyMetadata/sources/mal.py b/apps/pyMetadata/sources/mal.py
index 48062010..6f405127 100644
--- a/apps/pyMetadata/sources/mal.py
+++ b/apps/pyMetadata/sources/mal.py
@@ -1,5 +1,5 @@
from mal import *
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary
class metadata():
name: str = None
@@ -11,21 +11,26 @@ class metadata():
try:
search = AnimeSearch(self.name)
if (len(search.results) == 0):
- return DataResult(status="SUCCESS", message="No results")
+ return DataResult(status="SKIPPED", message="No results")
anime = Anime(search.results[0].mal_id)
meta = Metadata(
title = anime.title,
altTitle = [altName for altName in [anime.title_english, *anime.title_synonyms] if altName],
cover = anime.image_url,
- summary = anime.synopsis,
+ summary = [
+ Summary(
+ language = "eng",
+ summary = anime.synopsis
+ )
+ ],
type = 'movie' if anime.type.lower() == 'movie' else 'serie',
genres = anime.genres,
source="mal",
usedTitle=self.name
)
if (meta.title is None) or (meta.type is None):
- return DataResult("SUCCESS", None, None)
+ return DataResult("COMPLETED", None, None)
- return DataResult("SUCCESS", None, meta)
+ return DataResult("COMPLETED", None, meta)
except Exception as e:
return DataResult(status="ERROR", message=str(e))
\ No newline at end of file
diff --git a/apps/pyMetadata/sources/result.py b/apps/pyMetadata/sources/result.py
index a5bfcbbc..13457cad 100644
--- a/apps/pyMetadata/sources/result.py
+++ b/apps/pyMetadata/sources/result.py
@@ -1,13 +1,21 @@
from typing import List, Optional
from dataclasses import dataclass, asdict
+@dataclass
+class Summary:
+ summary: str
+ language: str
+
+ def to_dict(self):
+ return asdict(self)
+
@dataclass
class Metadata:
title: str
altTitle: List[str]
cover: str
type: str # Serie/Movie
- summary: str
+ summary: List[Summary]
genres: List[str]
source: str
usedTitle: str
@@ -15,6 +23,7 @@ class Metadata:
def to_dict(self):
return asdict(self)
+
@dataclass
class DataResult:
status: str # COMPLETED / ERROR
diff --git a/apps/pyMetadata/sources/select.py b/apps/pyMetadata/sources/select.py
index 1cd8c596..a24c479c 100644
--- a/apps/pyMetadata/sources/select.py
+++ b/apps/pyMetadata/sources/select.py
@@ -28,11 +28,11 @@ class UseSource():
mal = MalMetadata(title).lookup()
result: List[WeightedData] = []
- if (anii is not None) and (anii.status == "SUCCESS" and anii.data is not None):
+ if (anii is not None) and (anii.status == "COMPLETED" and anii.data is not None):
result.append(WeightedData(anii, 4))
- if (imdb is not None) and (imdb.status == "SUCCESS" and imdb.data is not None):
+ if (imdb is not None) and (imdb.status == "COMPLETED" and imdb.data is not None):
result.append(WeightedData(imdb, 1))
- if (mal is not None) and (mal.status == "SUCCESS" and mal.data is not None):
+ if (mal is not None) and (mal.status == "COMPLETED" and mal.data is not None):
result.append(WeightedData(mal, 8))
return result
diff --git a/settings.gradle.kts b/settings.gradle.kts
index 1cc5e456..4e0fbf1f 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -3,20 +3,21 @@ plugins {
}
rootProject.name = "MediaProcessing"
include("apps")
+include("apps:ui")
+include("apps:coordinator")
+include("apps:converter")
+include("apps:processer")
+
include("shared")
include("shared:kafka")
-findProject(":shared:kafka")?.name = "kafka"
-include("apps:coordinator")
-findProject(":apps:coordinator")?.name = "coordinator"
-include("apps:ui")
-findProject(":apps:ui")?.name = "ui"
-include("apps:encoder")
-findProject(":apps:encoder")?.name = "encoder"
-include("apps:converter")
-findProject(":apps:converter")?.name = "converter"
include("shared:contract")
-findProject(":shared:contract")?.name = "contract"
include("shared:common")
-findProject(":shared:common")?.name = "common"
-include("apps:processer")
+
+findProject(":apps:ui")?.name = "ui"
+findProject(":apps:coordinator")?.name = "coordinator"
+findProject(":apps:converter")?.name = "converter"
findProject(":apps:processer")?.name = "processer"
+
+findProject(":shared:kafka")?.name = "kafka"
+findProject(":shared:contract")?.name = "contract"
+findProject(":shared:common")?.name = "common"
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt
index f7c33d26..05728756 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt
@@ -1,5 +1,8 @@
package no.iktdev.mediaprocessing.shared.common
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+import mu.KotlinLogging
import no.iktdev.exfl.using
import java.io.File
import java.io.FileOutputStream
@@ -7,6 +10,7 @@ import java.net.HttpURLConnection
import java.net.URL
open class DownloadClient(val url: String, val outDir: File, val baseName: String) {
+ val log = KotlinLogging.logger {}
protected val http: HttpURLConnection = openConnection()
private val BUFFER_SIZE = 4096
@@ -27,10 +31,17 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin
return ((read * 100) / total)
}
- suspend fun download(): File? {
+ suspend fun download(): File? = withContext(Dispatchers.IO) {
val extension = getExtension()
?: throw UnsupportedFormatException("Provided url does not contain a supported file extension")
val outFile = outDir.using("$baseName.$extension")
+ if (!outDir.exists())
+ return@withContext null
+ if (outFile.exists()) {
+ log.info { "${outFile.name} already exists. Download skipped!" }
+ return@withContext outFile
+ }
+
val inputStream = http.inputStream
val fos = FileOutputStream(outFile, false)
@@ -51,7 +62,7 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin
}
inputStream.close()
fos.close()
- return outFile
+ return@withContext outFile
}
open fun getExtension(): String? {
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt
index f6966bf6..e08d9422 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/datasource/TableDefaultOperations.kt
@@ -4,7 +4,7 @@ import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.transactions.transaction
-open class TableDefaultOperations {
+open class TableDefaultOperations {
}
@@ -45,7 +45,7 @@ fun <T> insertWithSuccess(block: () -> T): Boolean {
}
}
-fun <T> executeOrException(block: () -> T): Exception? {
+fun <T> executeOrException(rollbackOnFailure: Boolean = false, block: () -> T): Exception? {
return try {
transaction {
try {
@@ -54,7 +54,8 @@ fun executeOrException(block: () -> T): Exception? {
null
} catch (e: Exception) {
// log the error here or handle the exception as needed
- rollback()
+ if (rollbackOnFailure)
+ rollback()
e
}
@@ -65,6 +66,25 @@ fun executeOrException(block: () -> T): Exception? {
}
}
+fun <T> executeWithResult(block: () -> T): Pair<T?, Exception?> {
+ return try {
+ transaction {
+ try {
+ val res = block()
+ commit()
+ res to null
+ } catch (e: Exception) {
+ // log the error here or handle the exception as needed
+ rollback()
+ null to e
+ }
+ }
+ } catch (e: Exception) {
+ e.printStackTrace()
+ return null to e
+ }
+}
+
fun <T> executeWithStatus(block: () -> T): Boolean {
return try {
transaction {
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
index 0c990e0b..6fa21c15 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/PersistentDataStore.kt
@@ -5,7 +5,6 @@ import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import org.jetbrains.exposed.exceptions.ExposedSQLException
-import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.insert
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
@@ -24,6 +23,7 @@ open class PersistentDataStore {
}
return if (exception == null) true else {
if (exception.cause is SQLIntegrityConstraintViolationException) {
+ exception.printStackTrace()
(exception as ExposedSQLException).errorCode == 1062
}
else {
diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
index 4490bef9..ca7ed292 100644
--- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
+++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/persistance/events.kt
@@ -11,6 +11,7 @@ object events: IntIdTable() {
val eventId: Column<String> = varchar("eventId", 50)
val event: Column<String> = varchar("event",100)
val data: Column<String> = text("data")
+ //val success: Column<Boolean> = bool("success").default(false)
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
init {
diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MediaProcessedDto.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MediaProcessedDto.kt
index 74a8b9c6..d9f06d58 100644
--- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MediaProcessedDto.kt
+++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MediaProcessedDto.kt
@@ -5,7 +5,9 @@ import no.iktdev.mediaprocessing.shared.contract.ProcessType
data class MediaProcessedDto(
val referenceId: String,
val process: ProcessType?,
+ val collection: String?,
val inputFile: String?,
val metadata: MetadataDto?,
+ val videoDetails: VideoDetails? = null,
val outputFiles: OutputFilesDto?
)
\ No newline at end of file
diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MetadataDto.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MetadataDto.kt
index 11af4a1b..ecf2b204 100644
--- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MetadataDto.kt
+++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/MetadataDto.kt
@@ -2,14 +2,20 @@ package no.iktdev.mediaprocessing.shared.contract.reader
data class MetadataDto(
val title: String,
+ val collection: String,
val type: String,
- val cover: MetadataCoverDto,
+ val cover: MetadataCoverDto?,
+ val summary: List<SummaryInfo> = emptyList(),
+ val genres: List<String>,
+)
+
+data class SummaryInfo(
val summary: String,
- val genres: List
+ val language: String = "eng"
)
data class MetadataCoverDto(
- val cover: String,
- val coverUrl: String,
- val coverFile: String
+ val cover: String?, // ex Fancy.jpeg
+ val coverUrl: String?,
+ val coverFile: String?
)
\ No newline at end of file
diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/OutputFilesDto.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/OutputFilesDto.kt
index af00615a..14375732 100644
--- a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/OutputFilesDto.kt
+++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/OutputFilesDto.kt
@@ -1,7 +1,7 @@
package no.iktdev.mediaprocessing.shared.contract.reader
-class OutputFilesDto(
- val videoFile: String,
- val videoArguments: List<String>,
- val subtitleFiles: List<String>
-)
\ No newline at end of file
+
+data class OutputFilesDto(
+ val video: String?, // FullName (Path + name)
+ val subtitles: List<String> = emptyList() // (Path + Name)
+)
diff --git a/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/VideoDetails.kt b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/VideoDetails.kt
new file mode 100644
index 00000000..9fcb6ffa
--- /dev/null
+++ b/shared/contract/src/main/kotlin/no/iktdev/mediaprocessing/shared/contract/reader/VideoDetails.kt
@@ -0,0 +1,14 @@
+package no.iktdev.mediaprocessing.shared.contract.reader
+
+data class VideoDetails(
+ val serieInfo: SerieInfo? = null,
+ val type: String,
+ val fullName: String
+)
+
+data class SerieInfo(
+ val episodeTitle: String? = null,
+ val episodeNumber: Int,
+ val seasonNumber: Int,
+ val title: String
+)
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
index 968372c0..63877af6 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/DeserializingRegistry.kt
@@ -7,6 +7,8 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
class DeserializingRegistry {
private val log = KotlinLogging.logger {}
@@ -29,14 +31,16 @@ class DeserializingRegistry {
KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java,
- KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java,
- KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java,
- KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null,
- KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null,
+ KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to ProcesserEncodeWorkPerformed::class.java,
+ KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to ProcesserExtractWorkPerformed::class.java,
+ KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to ConvertWorkPerformed::class.java,
+ KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to CoverDownloadWorkPerformed::class.java,
KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null,
KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null,
KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null,
+
+ KafkaEvents.EVENT_PROCESS_COMPLETED to ProcessCompleted::class.java
)
}
@@ -55,7 +59,7 @@ class DeserializingRegistry {
}
}
// Fallback
- val type = object : TypeToken>() {}.type
+ val type = object : TypeToken>() {}.type
return gson.fromJson<Message<MessageDataWrapper>>(json, type)
}
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
index e334da6c..87db5aea 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/core/KafkaEvents.kt
@@ -35,7 +35,8 @@ enum class KafkaEvents(val event: String) {
EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
- EVENT_PROCESS_COMPLETED("event:process:completed");
+ EVENT_PROCESS_COMPLETED("event:process:completed"),
+ EVENT_COLLECT_AND_STORE("event::save");
companion object {
fun toEvent(event: String): KafkaEvents? {
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Message.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Message.kt
index 8d7b55dd..7d1789fc 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Message.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Message.kt
@@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.shared.kafka.dto
import com.google.gson.Gson
import com.google.gson.reflect.TypeToken
-import no.iktdev.streamit.library.kafka.dto.Status
import java.lang.reflect.Type
import java.util.*
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
index 928bd772..fe17395f 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/MessageDataWrapper.kt
@@ -1,7 +1,5 @@
package no.iktdev.mediaprocessing.shared.kafka.dto
-import no.iktdev.streamit.library.kafka.dto.Status
-
open class MessageDataWrapper(
@Transient open val status: Status = Status.ERROR,
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt
index 84544f6b..0e95023c 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/Status.kt
@@ -1,7 +1,7 @@
-package no.iktdev.streamit.library.kafka.dto
+package no.iktdev.mediaprocessing.shared.kafka.dto
enum class Status {
- STARTED,
+ SKIPPED,
COMPLETED,
ERROR
}
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt
index adb3cc36..b3e19fd9 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/BaseInfoPerformed.kt
@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED)
data class BaseInfoPerformed(
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt
index 9254d13e..4754292e 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkPerformed.kt
@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED)
data class ConvertWorkPerformed(
@@ -11,5 +11,5 @@ data class ConvertWorkPerformed(
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
- val result: List<String>
+ val outFiles: List<String>
): MessageDataWrapper(status, message)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt
index 1f5de38c..43b01fb0 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ConvertWorkerRequest.kt
@@ -3,12 +3,14 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
data class ConvertWorkerRequest(
+ override val status: Status,
val requiresEventId: String? = null,
val inputFile: String,
val allowOverwrite: Boolean,
val outFileBaseName: String,
val outDirectory: String
-): MessageDataWrapper()
\ No newline at end of file
+): MessageDataWrapper(status)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaConvertInfo.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt
similarity index 52%
rename from shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaConvertInfo.kt
rename to shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt
index dc7c23ae..4565bf8f 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaConvertInfo.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverDownloadWorkPerformed.kt
@@ -3,11 +3,11 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
-@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
-data class MediaConvertInfo(
+@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED)
+data class CoverDownloadWorkPerformed(
override val status: Status,
- val arguments: List<List<String>>
-
-): MessageDataWrapper(status)
\ No newline at end of file
+ override val message: String? = null,
+ val coverFile: String
+): MessageDataWrapper(status, message)
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt
index 0fe775e7..851d76d6 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/CoverInfoPerformed.kt
@@ -1,8 +1,11 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_COVER)
data class CoverInfoPerformed(
override val status: Status,
val url: String,
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt
index b7c968ce..7778d425 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkRequestCreated.kt
@@ -3,13 +3,15 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(
KafkaEvents.EVENT_WORK_ENCODE_CREATED,
KafkaEvents.EVENT_WORK_EXTRACT_CREATED
)
data class FfmpegWorkRequestCreated(
+ override val status: Status,
val inputFile: String,
val arguments: List<String>,
val outFile: String
-): MessageDataWrapper()
\ No newline at end of file
+): MessageDataWrapper(status)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt
index 50dec32d..e0b8e355 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkerArgumentsCreated.kt
@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
/**
* @param status Status type
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaEncodeInfo.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaEncodeInfo.kt
deleted file mode 100644
index 20f1c5f7..00000000
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaEncodeInfo.kt
+++ /dev/null
@@ -1,13 +0,0 @@
-package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
-
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
-
-@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED)
-data class MediaEncodeInfo(
- override val status: Status,
- val arguments: List<String>
-) :
- MessageDataWrapper(status)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaExtractInfo.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaExtractInfo.kt
deleted file mode 100644
index 9fd7b49a..00000000
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaExtractInfo.kt
+++ /dev/null
@@ -1,12 +0,0 @@
-package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
-
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
-
-@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
-data class MediaExtractInfo(
- override val status: Status,
- val arguments: List<List<String>>
-) : MessageDataWrapper(status)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt
index 649ccac2..7164a584 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MediaStreamsParsePerformed.kt
@@ -4,7 +4,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED)
data class MediaStreamsParsePerformed(
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt
index b9c41a2a..f7133b61 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/MetadataPerformed.kt
@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED)
data class MetadataPerformed(
@@ -17,6 +17,11 @@ data class pyMetadata(
val altTitle: List<String> = emptyList(),
val cover: String? = null,
val type: String,
- val summary: String? = null,
+ val summary: List<pySummary> = emptyList(),
val genres: List<String> = emptyList()
+)
+
+data class pySummary(
+ val summary: String?,
+ val language: String = "eng"
)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt
index 4e12bc5a..f53662ae 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessCompleted.kt
@@ -3,8 +3,10 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_COMPLETED)
-data class ProcessCompleted(override val status: Status) : MessageDataWrapper(status) {
+data class ProcessCompleted(
+ override val status: Status
+) : MessageDataWrapper(status) {
}
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt
index 0f250c2e..5cd48614 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ProcessStarted.kt
@@ -4,7 +4,7 @@ import no.iktdev.mediaprocessing.shared.contract.ProcessType
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_STARTED)
data class ProcessStarted(
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt
index 6e560514..8382fd42 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/ReaderPerformed.kt
@@ -4,7 +4,7 @@ import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED)
data class ReaderPerformed(
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
index a033e472..0ec9191a 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/VideoInfoPerformed.kt
@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import com.google.gson.Gson
import com.google.gson.JsonObject
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
data class VideoInfoPerformed(
override val status: Status,
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt
new file mode 100644
index 00000000..eed1469e
--- /dev/null
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserEncodeWorkPerformed.kt
@@ -0,0 +1,18 @@
+package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work
+
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
+import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+
+// Derived from ffmpeg work
+@KafkaBelongsToEvent(
+ KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
+)
+data class ProcesserEncodeWorkPerformed(
+ override val status: Status,
+ override val message: String? = null,
+ val producedBy: String,
+ val derivedFromEventId: String,
+ val outFile: String? = null
+): MessageDataWrapper(status, message)
\ No newline at end of file
diff --git a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt
similarity index 70%
rename from shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt
rename to shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt
index 00eaf027..8a5a4d8f 100644
--- a/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/FfmpegWorkPerformed.kt
+++ b/shared/kafka/src/main/kotlin/no/iktdev/mediaprocessing/shared/kafka/dto/events_result/work/ProcesserExtractWorkPerformed.kt
@@ -1,17 +1,18 @@
-package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
+package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+// Derived from ffmpeg work
@KafkaBelongsToEvent(
- KafkaEvents.EVENT_WORK_ENCODE_PERFORMED,
KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
)
-data class FfmpegWorkPerformed(
+data class ProcesserExtractWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
- val derivedFromEventId: String
+ val derivedFromEventId: String,
+ val outFile: String? = null
): MessageDataWrapper(status, message)
\ No newline at end of file
diff --git a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt
index 4b132bdd..ea6716d1 100644
--- a/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt
+++ b/shared/kafka/src/test/kotlin/no/iktdev/mediaprocessing/shared/kafka/SerializationTest.kt
@@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.Message
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import org.junit.jupiter.api.Test
import org.assertj.core.api.Assertions.assertThat