Brage 2024-01-12 20:46:45 +01:00
parent 90e9d873f0
commit 858b66a883
63 changed files with 908 additions and 179 deletions

.github/workflows/v2.yml — vendored, new file (+253 lines)

@@ -0,0 +1,253 @@
name: Build V2

on:
  push:
    branches:
      - v2
  pull_request:
    branches:
      - v2
  workflow_dispatch:

jobs:
  pre-check:
    runs-on: ubuntu-latest
    outputs:
      pyMetadata: ${{ steps.filter.outputs.pyMetadata }}
      coordinator: ${{ steps.filter.outputs.coordinator }}
      processer: ${{ steps.filter.outputs.processer }}
      converter: ${{ steps.filter.outputs.converter }}
      shared: ${{ steps.filter.outputs.shared }}
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - uses: dorny/paths-filter@v2
        id: filter
        with:
          filters: |
            pyMetadata:
              - 'apps/pyMetadata/**'
            coordinator:
              - 'apps/coordinator/**'
            processer:
              - 'apps/processer/**'
            converter:
              - 'apps/converter/**'
            shared:
              - 'shared/**'
      # Step to print the outputs from the "filter" step
      - name: Print outputs from pre-check job
        run: |
          echo "Apps"
          echo "app:pyMetadata: ${{ steps.filter.outputs.pyMetadata }}"
          echo "app:coordinator: ${{ steps.filter.outputs.coordinator }}"
          echo "app:processer: ${{ steps.filter.outputs.processer }}"
          echo "app:converter: ${{ steps.filter.outputs.converter }}"
          echo "Shared"
          echo "shared: ${{ steps.filter.outputs.shared }}"
  build-shared:
    runs-on: ubuntu-latest
    needs: pre-check
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Cache Shared code Gradle dependencies
        id: cache-gradle
        uses: actions/cache@v2
        with:
          path: ~/.gradle/caches
          key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
      - name: Build Shared code
        if: steps.cache-gradle.outputs.cache-hit != 'true' || needs.pre-check.outputs.shared == 'true' || github.event_name == 'workflow_dispatch'
        run: |
          cd shared
          chmod +x ./gradlew
          ./gradlew build
  build-processer:
    needs: [pre-check, build-shared]
    if: ${{ needs.pre-check.outputs.processer == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Cache Shared Gradle dependencies
        id: cache-gradle
        uses: actions/cache@v2
        with:
          path: ~/.gradle/caches
          key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
      - name: Build Processer module
        id: build-processer
        run: |
          cd apps/processer
          chmod +x ./gradlew
          ./gradlew build
          echo "Build completed"
      - name: Generate Docker image tag
        id: docker-tag
        run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
      - name: Docker login
        uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
        with:
          username: ${{ secrets.DOCKER_HUB_NAME }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v2
        with:
          context: ./apps/processer
          push: true
          tags: |
            bskjon/mediaprocessing-processer:v2-latest
            bskjon/mediaprocessing-processer:v2-${{ github.sha }}
            bskjon/mediaprocessing-processer:v2-${{ steps.docker-tag.outputs.tag }}
  build-converter:
    needs: [pre-check, build-shared]
    if: ${{ needs.pre-check.outputs.converter == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Cache Shared Gradle dependencies
        id: cache-gradle
        uses: actions/cache@v2
        with:
          path: ~/.gradle/caches
          key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
      - name: Build Converter module
        id: build-converter
        run: |
          cd apps/converter
          chmod +x ./gradlew
          ./gradlew build
          echo "Build completed"
      - name: Generate Docker image tag
        id: docker-tag
        run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
      - name: Docker login
        uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
        with:
          username: ${{ secrets.DOCKER_HUB_NAME }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v2
        with:
          context: ./apps/converter
          push: true
          tags: |
            bskjon/mediaprocessing-converter:v2-latest
            bskjon/mediaprocessing-converter:v2-${{ github.sha }}
            bskjon/mediaprocessing-converter:v2-${{ steps.docker-tag.outputs.tag }}
  build-coordinator:
    needs: [pre-check, build-shared]
    if: ${{ needs.pre-check.outputs.coordinator == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }}
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Cache Shared Gradle dependencies
        id: cache-gradle
        uses: actions/cache@v2
        with:
          path: ~/.gradle/caches
          key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }}
      - name: Build Coordinator module
        id: build-coordinator
        run: |
          cd apps/coordinator
          chmod +x ./gradlew
          ./gradlew build
          echo "Build completed"
      - name: Generate Docker image tag
        id: docker-tag
        run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
      - name: Docker login
        uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
        with:
          username: ${{ secrets.DOCKER_HUB_NAME }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v2
        with:
          context: ./apps/coordinator
          push: true
          tags: |
            bskjon/mediaprocessing-coordinator:v2-latest
            bskjon/mediaprocessing-coordinator:v2-${{ github.sha }}
            bskjon/mediaprocessing-coordinator:v2-${{ steps.docker-tag.outputs.tag }}
  build-pymetadata:
    needs: pre-check
    if: ${{ needs.pre-check.outputs.pyMetadata == 'true' || github.event_name == 'workflow_dispatch' }}
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v2
      - name: Build pyMetadata module
        id: build-pymetadata
        run: |
          if [[ "${{ needs.pre-check.outputs.pyMetadata }}" == "true" || "${{ github.event_name }}" == "push" || "${{ github.event_name }}" == "workflow_dispatch" ]]; then
            cd apps/pyMetadata
            # Add the necessary build steps for your Python module here
            # (placeholder; e.g. pip install -r requirements.txt)
            echo "Build completed"
          else
            echo "pyMetadata has not changed. Skipping pyMetadata module build."
            echo "job_skipped=true" >> "$GITHUB_OUTPUT"
          fi
      - name: Generate Docker image tag
        id: docker-tag
        run: echo "tag=$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" >> "$GITHUB_OUTPUT"
      - name: Docker login
        uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
        with:
          username: ${{ secrets.DOCKER_HUB_NAME }}
          password: ${{ secrets.DOCKER_HUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v2
        with:
          context: ./apps/pyMetadata
          push: true
          tags: |
            bskjon/mediaprocessing-pymetadata:v2-latest
            bskjon/mediaprocessing-pymetadata:v2-${{ github.sha }}
            bskjon/mediaprocessing-pymetadata:v2-${{ steps.docker-tag.outputs.tag }}

.idea/gradle.xml — generated (-1 line)

@@ -11,7 +11,6 @@
     <option value="$PROJECT_DIR$/apps" />
     <option value="$PROJECT_DIR$/apps/converter" />
     <option value="$PROJECT_DIR$/apps/coordinator" />
-    <option value="$PROJECT_DIR$/apps/encoder" />
     <option value="$PROJECT_DIR$/apps/processer" />
     <option value="$PROJECT_DIR$/apps/ui" />
     <option value="$PROJECT_DIR$/shared" />

View File

@@ -16,9 +16,8 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkerRequest
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 import java.util.*
@@ -111,6 +110,8 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
     }

     fun skipConvertEvent(event: PersistentProcessDataMessage, requiresEventId: String) {
+        if (event.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED)
+            return
         val producesPayload = SimpleMessageData(status = Status.COMPLETED, message = "Convert event contains a payload stating that it waits for eventId: $requiresEventId with referenceId: ${event.referenceId}")
         coordinator.producer.sendMessage(
             referenceId = event.referenceId,
@@ -126,7 +127,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
                 status = Status.COMPLETED,
                 producedBy = serviceId,
                 derivedFromEventId = converter.eventId,
-                result = result.map { it.absolutePath }
+                outFiles = result.map { it.absolutePath }
             )
         } catch (e: Converter.FileUnavailableException) {
             e.printStackTrace()
@@ -135,7 +136,7 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
                 message = e.message,
                 producedBy = serviceId,
                 derivedFromEventId = converter.eventId,
-                result = emptyList()
+                outFiles = emptyList()
             )
         } catch (e: Converter.FileIsNullOrEmpty) {
             e.printStackTrace()
@@ -144,16 +145,8 @@ class ConvertService(@Autowired override var coordinator: ConverterCoordinator)
                 message = e.message,
                 producedBy = serviceId,
                 derivedFromEventId = converter.eventId,
-                result = emptyList()
+                outFiles = emptyList()
             )
         }
     }
-
-    data class PendingWorkerCache(
-        val referenceId: String,
-        val eventId: String,
-        val requiresEventId: String
-    )
 }

View File

@@ -36,6 +36,8 @@ dependencies {
     implementation("org.json:json:20210307")
     implementation("no.iktdev:exfl:0.0.13-SNAPSHOT")
+    implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha22")
     implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1")
     implementation("com.github.vishna:watchservice-ktx:master-SNAPSHOT")

View File

@@ -17,7 +17,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.DefaultMessageListener
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Service
 import java.io.File
@@ -41,7 +41,7 @@ class Coordinator() {
     public fun startProcess(file: File, type: ProcessType) {
         val processStartEvent = ProcessStarted(
-            status = Status.STARTED,
+            status = Status.COMPLETED,
             file = file.absolutePath,
             type = type
         )
@@ -184,6 +184,7 @@ class Coordinator() {
         val data = message.data as FfmpegWorkerArgumentsCreated
         data.entries.forEach {
             FfmpegWorkRequestCreated(
+                status = Status.COMPLETED,
                 inputFile = data.inputFile,
                 arguments = it.arguments,
                 outFile = it.outputFile
@@ -206,6 +207,7 @@ class Coordinator() {
         val data = message.data as FfmpegWorkerArgumentsCreated
         data.entries.forEach {
             FfmpegWorkRequestCreated(
+                status = Status.COMPLETED,
                 inputFile = data.inputFile,
                 arguments = it.arguments,
                 outFile = it.outputFile
@@ -218,6 +220,7 @@ class Coordinator() {
             }
             val outFile = File(it.outputFile)
             ConvertWorkerRequest(
+                status = Status.COMPLETED,
                 requiresEventId = message.eventId,
                 inputFile = it.outputFile,
                 true,

View File

@@ -1,14 +1,20 @@
 package no.iktdev.mediaprocessing.coordinator

-import kotlinx.coroutines.launch
 import mu.KotlinLogging
 import no.iktdev.exfl.coroutines.Coroutines
+import no.iktdev.exfl.observable.Observables
 import no.iktdev.mediaprocessing.shared.common.DatabaseConfig
 import no.iktdev.mediaprocessing.shared.common.SharedConfig
 import no.iktdev.mediaprocessing.shared.common.datasource.MySqlDataSource
 import no.iktdev.mediaprocessing.shared.common.persistance.events
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEnv
+import no.iktdev.streamit.library.db.tables.*
+import no.iktdev.streamit.library.db.tables.helper.cast_errors
+import no.iktdev.streamit.library.db.tables.helper.data_audio
+import no.iktdev.streamit.library.db.tables.helper.data_video
+import org.jetbrains.exposed.sql.SchemaUtils
+import org.jetbrains.exposed.sql.transactions.transaction
 import org.springframework.boot.autoconfigure.SpringBootApplication
 import org.springframework.boot.runApplication
 import org.springframework.context.ApplicationContext
@@ -27,13 +33,37 @@ fun getContext(): ApplicationContext? {
 }

 fun main(args: Array<String>) {
+    Coroutines.addListener(listener = object : Observables.ObservableValue.ValueListener<Throwable> {
+        override fun onUpdated(value: Throwable) {
+            value.printStackTrace()
+        }
+    })
+
     val dataSource = MySqlDataSource.fromDatabaseEnv();
-    Coroutines.default().launch {
-        dataSource.createDatabase()
-        dataSource.createTables(
-            events
-        )
-    }
+    dataSource.createDatabase()
+
+    val kafkaTables = listOf(
+        events, // For kafka
+    )
+    dataSource.createTables(*kafkaTables.toTypedArray())
+
+    val tables = arrayOf(
+        catalog,
+        genre,
+        movie,
+        serie,
+        subtitle,
+        summary,
+        users,
+        progress,
+        data_audio,
+        data_video,
+        cast_errors
+    )
+    transaction {
+        SchemaUtils.createMissingTablesAndColumns(*tables)
+    }
+
     context = runApplication<CoordinatorApplication>(*args)
     printSharedConfig()
 }
@@ -47,5 +77,5 @@ fun printSharedConfig() {
     log.info { "Database: ${DatabaseConfig.database}@${DatabaseConfig.address}:${DatabaseConfig.port}" }
     log.info { "Username: ${DatabaseConfig.username}" }
-    log.info { "Password: ${ if(DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
+    log.info { "Password: ${if (DatabaseConfig.password.isNullOrBlank()) "Is not set" else "Is set"}" }
 }

View File

@@ -1,15 +1,12 @@
 package no.iktdev.mediaprocessing.coordinator.mapping

 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
+import no.iktdev.mediaprocessing.shared.contract.reader.MetadataCoverDto
 import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
-import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
-import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.pyMetadata
+import no.iktdev.mediaprocessing.shared.contract.reader.SummaryInfo
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import java.io.File

 class MetadataMapping(val events: List<PersistentMessage>) {
@@ -17,19 +14,45 @@ class MetadataMapping(val events: List<PersistentMessage>) {
     fun map(): MetadataDto? {
         val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
+        val mediaReadOut = events.find { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
         val meta = events.find { it.data is MetadataPerformed }?.data as MetadataPerformed?
+        val coverDownloadTask = events.find { it.data is CoverInfoPerformed }?.data as CoverInfoPerformed?
+        val cover = events.find { it.data is CoverDownloadWorkPerformed }?.data as CoverDownloadWorkPerformed?

         if (!baseInfo.isSuccess()) {
             return null
         }

-        return null
-        /*return MetadataDto(
-            title = meta?.data?.title ?: return null,
-            type = meta?.data?.type ?: return null,
-        )*/
+        val videoInfo = mediaReadOut?.toValueObject()
+
+        val mediaCover = if (coverDownloadTask != null || cover != null) {
+            val coverFile = cover?.coverFile?.let { File(it) }
+            MetadataCoverDto(
+                cover = coverFile?.name,
+                coverFile = cover?.coverFile,
+                coverUrl = coverDownloadTask?.url
+            )
+        } else null
+
+        return if (meta != null || videoInfo != null) {
+            MetadataDto(
+                title = meta?.data?.title ?: videoInfo?.fullName ?: return null,
+                collection = baseInfo?.title ?: return null,
+                cover = mediaCover,
+                type = meta?.data?.type ?: videoInfo?.type ?: return null,
+                summary = meta?.data?.summary?.filter { it.summary != null }?.map { SummaryInfo(language = it.language, summary = it.summary!!) } ?: emptyList(),
+                genres = meta?.data?.genres ?: emptyList(),
+            )
+        } else null
+    }
+
+    fun getCollection(): String? {
+        val baseInfo = events.find { it.data is BaseInfoPerformed }?.data as BaseInfoPerformed?
+        if (!baseInfo.isSuccess()) {
+            return null
+        }
+        return baseInfo?.title
     }
 }

View File

@@ -0,0 +1,33 @@
package no.iktdev.mediaprocessing.coordinator.mapping

import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.OutputFilesDto
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess

class OutputFilesMapping(val events: List<PersistentMessage>) {

    fun mapTo(): OutputFilesDto {
        val videoResult = events.filter { it.data is ProcesserEncodeWorkPerformed }
            .map { it.data as ProcesserEncodeWorkPerformed }
        val subtitleResult = events.filter { it.data is ProcesserExtractWorkPerformed && it.data.isSuccess() }.map { it.data as ProcesserExtractWorkPerformed }
        val convertedSubtitleResult = events.filter { it.data is ConvertWorkPerformed && it.data.isSuccess() }.map { it.data as ConvertWorkPerformed }

        return OutputFilesDto(
            video = videoResult.lastOrNull { it.isSuccess() }?.outFile,
            subtitles = toSubtitleList(subtitleResult, convertedSubtitleResult)
        )
    }

    private fun toSubtitleList(extracted: List<ProcesserExtractWorkPerformed>, converted: List<ConvertWorkPerformed>): List<String> {
        val sub1 = extracted.mapNotNull { it.outFile }
        val sub2 = converted.flatMap { it.outFiles }
        return sub1 + sub2
    }
}

View File

@@ -10,12 +10,15 @@ class ProcessMapping(val events: List<PersistentMessage>) {
     fun map(): MediaProcessedDto? {
         val referenceId = events.firstOrNull()?.referenceId ?: return null
         val processStarted = getProcessStarted()
+        val meta = MetadataMapping(events)
         return MediaProcessedDto(
             referenceId = referenceId,
             process = processStarted?.type,
             inputFile = processStarted?.file,
-            metadata = MetadataMapping(events).map(),
-            outputFiles = null
+            collection = meta.getCollection(),
+            metadata = meta.map(),
+            videoDetails = VideoDetailsMapper(events).mapTo(),
+            outputFiles = OutputFilesMapping(events).mapTo()
         )
     }
@@ -23,29 +26,37 @@ class ProcessMapping(val events: List<PersistentMessage>) {
         return events.lastOrNull { it.data is ProcessStarted }?.data as ProcessStarted?
     }

     fun waitsForEncode(): Boolean {
-        val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED } != null
-        val performed = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED } != null
-        val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED } != null
-        return !(isSkipped || (arguments && performed))
+        val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_ENCODE_PARAMETER_CREATED }
+        val created = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_CREATED }
+        val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_PERFORMED }
+        val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_ENCODE_SKIPPED }
+        return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
     }

     fun waitsForExtract(): Boolean {
-        val arguments = events.find { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED } != null
-        val performed = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED } != null
-        val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED } != null
-        return !(isSkipped || (arguments && performed))
+        val arguments = events.filter { it.event == KafkaEvents.EVENT_MEDIA_EXTRACT_PARAMETER_CREATED }
+        val created = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_CREATED }
+        val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED }
+        val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED }
+        return (arguments.isNotEmpty() && created.isEmpty()) || created.size > performed.size + isSkipped.size
     }

     fun waitsForConvert(): Boolean {
-        val arguments = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED } != null
-        val performed = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED } != null
-        val isSkipped = events.find { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED } != null
-        return !(isSkipped || (arguments && performed))
+        val created = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_CREATED }
+        val performed = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_PERFORMED }
+        val isSkipped = events.filter { it.event == KafkaEvents.EVENT_WORK_CONVERT_SKIPPED }
+        return created.size > performed.size + isSkipped.size
     }

     fun canCollect(): Boolean {
-        return waitsForEncode() && waitsForExtract() && waitsForConvert()
+        return (!waitsForEncode() && !waitsForExtract() && !waitsForConvert())
     }
 }

View File

@@ -0,0 +1,27 @@
package no.iktdev.mediaprocessing.coordinator.mapping

import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.SerieInfo
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.EpisodeInfo
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed

class VideoDetailsMapper(val events: List<PersistentMessage>) {

    fun mapTo(): VideoDetails? {
        val mediaReadOut = events.lastOrNull { it.data is VideoInfoPerformed }?.data as VideoInfoPerformed?
        val proper = mediaReadOut?.toValueObject() ?: return null

        val details = VideoDetails(
            type = proper.type,
            fullName = proper.fullName,
            serieInfo = if (proper !is EpisodeInfo) null else SerieInfo(
                episodeTitle = proper.episodeTitle,
                episodeNumber = proper.episode,
                seasonNumber = proper.season,
                title = proper.title
            )
        )
        return details
    }
}

View File

@@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service
 import java.io.File

View File

@@ -0,0 +1,144 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event

import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.datasource.executeOrException
import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.contract.reader.MetadataDto
import no.iktdev.mediaprocessing.shared.contract.reader.VideoDetails
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import no.iktdev.streamit.library.db.query.*
import org.jetbrains.exposed.exceptions.ExposedSQLException
import org.springframework.stereotype.Service
import java.io.File
import java.sql.SQLIntegrityConstraintViolationException

@Service
class CollectAndStoreTask() : TaskCreator() {
    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_COLLECT_AND_STORE

    override val requiredEvents: List<KafkaEvents> = listOf(
        EVENT_PROCESS_STARTED,
        EVENT_PROCESS_COMPLETED
    )
    override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries

    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
        val started = events.find { it.event == EVENT_PROCESS_STARTED } ?: return null
        val completed = events.find { it.event == EVENT_PROCESS_COMPLETED } ?: return null
        if (!started.data.isSuccess() || !completed.data.isSuccess() && completed.data.status != Status.SKIPPED) {
            return null
        }

        val mapped = ProcessMapping(events).map() ?: return null
        val collection = mapped.collection ?: return null

        val subtitlesStored = mapped.outputFiles?.subtitles?.let {
            storeSubtitles(collection = collection, subtitles = it)
        } ?: false

        val videoFile = mapped.outputFiles?.video?.let { File(it).name }
        val videoInfo = mapped.videoDetails

        val genres = mapped.metadata?.genres?.let {
            storeAndGetGenres(it)
        }
        val catalogId = mapped.metadata?.let { meta ->
            if (videoInfo == null || videoFile == null)
                null
            else
                storeCatalog(metadata = meta, genres = genres, videoFile = videoFile, videoDetails = videoInfo)
        } ?: return SimpleMessageData(Status.ERROR, "Unable to store catalog when metadata is null")

        mapped.metadata?.let {
            storeMetadata(catalogId = catalogId, metadata = it)
        }
        return SimpleMessageData(Status.COMPLETED)
    }

    private fun storeSubtitles(collection: String, subtitles: List<String>): Boolean {
        val result = subtitles.map { subtitle ->
            val subtitleFile = File(subtitle)
            val language = subtitleFile.parentFile.name
            subtitle to executeWithStatus {
                SubtitleQuery(
                    collection = collection,
                    associatedWithVideo = subtitleFile.nameWithoutExtension,
                    language = language,
                    format = subtitleFile.extension.uppercase(),
                    file = subtitleFile.name
                ).insert()
            }
        }
        return result.none { !it.second }
    }

    private fun storeMetadata(catalogId: Int, metadata: MetadataDto) {
        metadata.summary.forEach {
            withTransaction {
                SummaryQuery(
                    cid = catalogId,
                    language = it.language,
                    description = it.summary
                ).insert()
            }
        }
    }

    private fun storeAndGetGenres(genres: List<String>): String? {
        return withTransaction {
            val gq = GenreQuery(*genres.toTypedArray())
            gq.insertAndGetIds()
            gq.getIds().joinToString(",")
        }
    }

    private fun storeCatalog(metadata: MetadataDto, videoDetails: VideoDetails, videoFile: String, genres: String?): Int? {
        val precreatedCatalogQuery = CatalogQuery(
            title = metadata.title,
            cover = metadata.cover?.cover,
            type = metadata.type,
            collection = metadata.collection,
            genres = genres
        )
        val result = when (videoDetails.type) {
            "serie" -> {
                val serieInfo = videoDetails.serieInfo ?: throw RuntimeException("SerieInfo missing in VideoDetails for Serie! $videoFile")
                executeOrException {
                    precreatedCatalogQuery.insertWithSerie(
                        episodeTitle = serieInfo.episodeTitle ?: "",
                        videoFile = videoFile,
                        episode = serieInfo.episodeNumber,
                        season = serieInfo.seasonNumber
                    )
                }
            }
            "movie" -> {
                executeOrException {
                    precreatedCatalogQuery.insertWithMovie(videoFile)
                }
            }
            else -> throw RuntimeException("${videoDetails.type} is not supported!")
        }
        val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062
        return if (result == null || ignoreException) {
            withTransaction {
                precreatedCatalogQuery.getId()
            }
        } else null
    }
}

View File

@@ -0,0 +1,56 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event

import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.coordinator.mapping.ProcessMapping
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents.*
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessCompleted
import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
import org.springframework.stereotype.Service

@Service
class CompleteTask() : TaskCreator() {
    override val producesEvent: KafkaEvents = KafkaEvents.EVENT_PROCESS_COMPLETED

    override val requiredEvents: List<KafkaEvents> = listOf(
        EVENT_PROCESS_STARTED,
        EVENT_MEDIA_READ_BASE_INFO_PERFORMED,
        EVENT_MEDIA_READ_OUT_NAME_AND_TYPE
    )
    override val listensForEvents: List<KafkaEvents> = KafkaEvents.entries

    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
        val started = events.find { it.event == KafkaEvents.EVENT_PROCESS_STARTED } ?: return null
        if (!started.data.isSuccess()) {
            return null
        }

        val receivedEvents = events.map { it.event }
        val requiresOneOf = listOf(
            EVENT_MEDIA_EXTRACT_PARAMETER_CREATED,
            EVENT_MEDIA_ENCODE_PARAMETER_CREATED,
            EVENT_MEDIA_DOWNLOAD_COVER_PARAMETER_CREATED,
            EVENT_WORK_CONVERT_CREATED
        )
        if (!requiresOneOf.any { it in receivedEvents }) {
            log.info { "Can't complete at this moment. Missing required event" }
            return null // SimpleMessageData(Status.SKIPPED, "Can't collect at this moment. Missing required event")
        }

        val mapper = ProcessMapping(events)
        if (mapper.canCollect()) {
            return ProcessCompleted(Status.COMPLETED)
        }
        return null
    }
}

View File

@@ -0,0 +1,57 @@
package no.iktdev.mediaprocessing.coordinator.tasks.event

import kotlinx.coroutines.runBlocking
import no.iktdev.mediaprocessing.coordinator.TaskCreator
import no.iktdev.mediaprocessing.shared.common.DownloadClient
import no.iktdev.mediaprocessing.shared.common.getComputername
import no.iktdev.mediaprocessing.shared.common.persistance.PersistentMessage
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverDownloadWorkPerformed
import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
import org.springframework.stereotype.Service
import java.io.File
import java.util.*

@Service
class DownloadAndStoreCoverTask: TaskCreator() {
    val serviceId = "${getComputername()}::${this.javaClass.simpleName}::${UUID.randomUUID()}"

    override val producesEvent: KafkaEvents
        get() = KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED

    override val requiredEvents: List<KafkaEvents>
        get() = listOf(
            KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED,
            KafkaEvents.EVENT_MEDIA_READ_OUT_COVER,
            KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
        )

    override fun prerequisitesRequired(events: List<PersistentMessage>): List<() -> Boolean> {
        return super.prerequisitesRequired(events) + listOf {
            isPrerequisiteDataPresent(events)
        }
    }

    override fun onProcessEvents(event: PersistentMessage, events: List<PersistentMessage>): MessageDataWrapper? {
        val cover = events.find { it.event == KafkaEvents.EVENT_MEDIA_READ_OUT_COVER }
        if (cover == null || cover.data !is CoverInfoPerformed) {
            return SimpleMessageData(Status.ERROR, "Wrong type triggered and caused an execution for $serviceId")
        }
        val coverData = cover.data as CoverInfoPerformed
        val outDir = File(coverData.outDir)
        if (!outDir.exists())
            return SimpleMessageData(Status.ERROR, "Check for output directory for cover storage failed for $serviceId")

        val client = DownloadClient(coverData.url, File(coverData.outDir), coverData.outFileBaseName)
        val result = runBlocking {
            client.download()
        }
        return if (result == null) {
            SimpleMessageData(Status.ERROR, "Could not download cover, check logs")
        } else {
            val status = if (result.exists() && result.canRead()) Status.COMPLETED else Status.ERROR
            CoverDownloadWorkPerformed(status = status, coverFile = result.absolutePath)
        }
    }
}

View File

@@ -8,7 +8,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerforme
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.CoverInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service

 @Service

View File

@@ -14,7 +14,7 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MetadataPerforme
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.VideoInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.hasValidData
 import no.iktdev.mediaprocessing.shared.kafka.dto.isSuccess
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.scheduling.annotation.EnableScheduling
 import org.springframework.scheduling.annotation.Scheduled
 import org.springframework.stereotype.Service

View File

@@ -11,7 +11,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.MediaStreamsParsePerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service

 @Service

View File

@@ -12,7 +12,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service
 import java.io.File

View File

@@ -9,7 +9,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.*
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service
 import java.io.File

View File

@@ -12,7 +12,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.SubtitleStream
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.springframework.stereotype.Service
 import java.io.File

View File

@@ -1,6 +1,7 @@
 package no.iktdev.mediaprocessing.coordinator.tasks.input.watcher

 import dev.vishna.watchservice.KWatchEvent.Kind.Deleted
+import dev.vishna.watchservice.KWatchEvent.Kind.Initialized
 import dev.vishna.watchservice.asWatchChannel
 import kotlinx.coroutines.channels.consumeEach
 import kotlinx.coroutines.launch
@@ -51,6 +52,7 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
             }
             when (it.kind) {
                 Deleted -> queue.removeFromQueue(it.file, this@InputDirectoryWatcher::onFileRemoved)
+                Initialized -> { /* Do nothing */ }
                 else -> {
                     if (it.file.isFile && it.file.isSupportedVideoFile()) {
                         queue.addToQueue(it.file, this@InputDirectoryWatcher::onFilePending, this@InputDirectoryWatcher::onFileAvailable)

View File

@@ -5,7 +5,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
 import no.iktdev.mediaprocessing.shared.contract.ProcessType
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Named
 import org.junit.jupiter.api.Test

View File

@@ -4,7 +4,7 @@ import com.google.gson.Gson
 import com.google.gson.JsonObject
 import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ReaderPerformed
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions.*

View File

@@ -14,10 +14,10 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
 import no.iktdev.mediaprocessing.processer.ProcesserEnv
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
 import org.springframework.stereotype.Service
 import java.io.File
 import java.util.*
@@ -136,7 +136,8 @@ class EncodeService: TaskCreator() {
             readbackIsSuccess = PersistentDataReader().isProcessEventDefinedAsConsumed(runner.referenceId, runner.eventId, encodeServiceId)
         }
         producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
-            FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
+            ProcesserEncodeWorkPerformed(status = Status.COMPLETED, producedBy = encodeServiceId, derivedFromEventId = runner.eventId, outFile = runner.info.outFile)
+        )
         clearWorker()
     }
@@ -150,7 +151,8 @@ class EncodeService: TaskCreator() {
         }
         log.info { "Encode failed for ${runner.referenceId}" }
         producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
-            FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId))
+            ProcesserEncodeWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = encodeServiceId, derivedFromEventId = runner.eventId)
+        )
         sendProgress(info = info, ended = true)
         clearWorker()
     }

View File

@@ -15,10 +15,10 @@ import no.iktdev.mediaprocessing.shared.common.persistance.PersistentDataStore
 import no.iktdev.mediaprocessing.shared.common.persistance.PersistentProcessDataMessage
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkPerformed
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.FfmpegWorkRequestCreated
 import no.iktdev.mediaprocessing.processer.ProcesserEnv
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed
 import org.springframework.stereotype.Service
 import java.io.File
 import java.util.*
@@ -141,7 +141,11 @@ class ExtractService: TaskCreator() {
         producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
-            FfmpegWorkPerformed(status = Status.COMPLETED, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
+            ProcesserExtractWorkPerformed(
+                status = Status.COMPLETED,
+                producedBy = extractServiceId,
+                derivedFromEventId = runner.eventId,
+                outFile = runner.info.outFile)
         )
         log.info { "Extract is releasing worker" }
         clearWorker()
@@ -156,7 +160,7 @@ class ExtractService: TaskCreator() {
         }
         log.info { "Extract failed for ${runner.referenceId}" }
         producer.sendMessage(referenceId = runner.referenceId, event = producesEvent,
-            FfmpegWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
+            ProcesserExtractWorkPerformed(status = Status.ERROR, message = errorMessage, producedBy = extractServiceId, derivedFromEventId = runner.eventId)
        )
         sendState(info, ended = true)
         clearWorker()

View File

@@ -7,6 +7,7 @@ import uuid
 import threading
 import json
 import time
+from kafka import KafkaConsumer, KafkaProducer
 from fuzzywuzzy import fuzz
 from sources.result import DataResult, Metadata
@@ -18,8 +19,8 @@ from sources.select import UseSource
 # Configure the Kafka connection
 bootstrap_servers = os.environ.get("KAFKA_BOOTSTRAP_SERVER") or "127.0.0.1:9092"
-consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"Metadata-{uuid.uuid4()}"
-kafka_topic = os.environ.get("KAFKA_TOPIC") or "127.0.0.1:9092"
+consumer_group = os.environ.get("KAFKA_CONSUMER_ID") or f"MetadataConsumer"
+kafka_topic = os.environ.get("KAFKA_TOPIC") or "mediaEvents"

 # Configure logging
 logging.basicConfig(
@@ -154,7 +155,7 @@ class MessageHandlerThread(threading.Thread):
                 logger.info("Not in cache: %s", name)
                 logger.info("Searching in sources for information about %s", name)
                 result: Optional[DataResult] = UseSource(title=name).select_result()
-                if (result.status == "SUCCESS"):
+                if (result.status == "COMPLETED"):
                     logger.info("Storing response for %s in in-memory cache", name)
                     ResultCache.add(name, result)
                 return result

View File

@@ -1,5 +1,5 @@
 from AnilistPython import Anilist
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary

 class metadata():
     name: str = None
@@ -17,19 +17,24 @@ class metadata():
                 title = result.get("name_english", None),
                 altTitle = [result.get("name_romaji", [])],
                 cover = result.get("cover_image", None),
-                summary = result.get("desc", None),
+                summary = [
+                    Summary(
+                        language = "eng",
+                        summary = result.get("desc", None)
+                    )
+                ],
                 type = 'movie' if result.get('airing_format', '').lower() == 'movie' else 'serie',
                 genres = result.get('genres', []),
                 source="anii",
                 usedTitle=self.name
             )
             if (meta.title is None) or (meta.type is None):
-                return DataResult("SUCCESS", None, None)
-            return DataResult("SUCCESS", None, meta)
+                return DataResult("COMPLETED", None, None)
+            return DataResult("COMPLETED", None, meta)
         except IndexError as ingore:
-            return DataResult(statusType="SUCCESS", message=f"No result for {self.name}")
+            return DataResult(statusType="COMPLETED", message=f"No result for {self.name}")
         except Exception as e:
             return DataResult(statusType="ERROR", message=str(e))
View File

@@ -1,5 +1,5 @@
 import imdb
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary

 class metadata():
     name: str = None
@@ -19,15 +19,20 @@ class metadata():
                 title = result.get("title", None),
                 altTitle = [result.get("localized title", [])],
                 cover = result.get("cover url", None),
-                summary = result.get("plot outline", None),
+                summary = [
+                    Summary(
+                        language = "eng",
+                        summary = result.get("plot outline", None)
+                    )
+                ],
                 type = 'movie' if result.get('kind', '').lower() == 'movie' else 'serie',
                 genres = result.get('genres', []),
                 source="imdb",
                 usedTitle=self.name
             )
             if (meta.title is None) or (meta.type is None):
-                return DataResult("SUCCESS", None, None)
-            return DataResult("SUCCESS", None, meta)
+                return DataResult("COMPLETED", None, None)
+            return DataResult("COMPLETED", None, meta)
         except Exception as e:
             return DataResult(status="ERROR", data=None, message=str(e))

View File

@@ -1,5 +1,5 @@
 from mal import *
-from .result import Metadata, DataResult
+from .result import Metadata, DataResult, Summary

 class metadata():
     name: str = None
@@ -11,21 +11,26 @@ class metadata():
         try:
             search = AnimeSearch(self.name)
             if (len(search.results) == 0):
-                return DataResult(status="SUCCESS", message="No results")
+                return DataResult(status="SKIPPED", message="No results")
             anime = Anime(search.results[0].mal_id)
             meta = Metadata(
                 title = anime.title,
                 altTitle = [altName for altName in [anime.title_english, *anime.title_synonyms] if altName],
                 cover = anime.image_url,
-                summary = anime.synopsis,
+                summary = [
+                    Summary(
+                        language = "eng",
+                        summary = anime.synopsis
+                    )
+                ],
                 type = 'movie' if anime.type.lower() == 'movie' else 'serie',
                 genres = anime.genres,
                 source="mal",
                 usedTitle=self.name
             )
             if (meta.title is None) or (meta.type is None):
-                return DataResult("SUCCESS", None, None)
-            return DataResult("SUCCESS", None, meta)
+                return DataResult("COMPLETED", None, None)
+            return DataResult("COMPLETED", None, meta)
         except Exception as e:
             return DataResult(status="ERROR", message=str(e))

View File

@@ -1,13 +1,21 @@
 from typing import List, Optional
 from dataclasses import dataclass, asdict

+@dataclass
+class Summary:
+    summary: str
+    language: str
+
+    def to_dict(self):
+        return asdict(self)
+
 @dataclass
 class Metadata:
     title: str
     altTitle: List[str]
     cover: str
     type: str  # Serie/Movie
-    summary: str
+    summary: List[Summary]
     genres: List[str]
     source: str
     usedTitle: str
@@ -15,6 +23,7 @@ class Metadata:
     def to_dict(self):
         return asdict(self)

 @dataclass
 class DataResult:
     status: str  # COMPLETED / ERROR
View File

@@ -28,11 +28,11 @@ class UseSource():
     mal = MalMetadata(title).lookup()
     result: List[WeightedData] = []
-    if (anii is not None) and (anii.status == "SUCCESS" and anii.data is not None):
+    if (anii is not None) and (anii.status == "COMPLETED" and anii.data is not None):
         result.append(WeightedData(anii, 4))
-    if (imdb is not None) and (imdb.status == "SUCCESS" and imdb.data is not None):
+    if (imdb is not None) and (imdb.status == "COMPLETED" and imdb.data is not None):
         result.append(WeightedData(imdb, 1))
-    if (mal is not None) and (mal.status == "SUCCESS" and mal.data is not None):
+    if (mal is not None) and (mal.status == "COMPLETED" and mal.data is not None):
         result.append(WeightedData(mal, 8))
     return result
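The weighting itself is unchanged: mal (8) outranks anii (4), which outranks imdb (1), and only sources that completed with data compete. A Kotlin sketch of the selection rule (this WeightedData is a hypothetical mirror of the Python class):

    data class WeightedData(val weight: Int, val source: String)

    // Highest weight wins; null when no source produced data.
    fun pickBest(candidates: List<WeightedData>): WeightedData? =
        candidates.maxByOrNull { it.weight }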

View File

@@ -3,20 +3,21 @@ plugins {
 }
 rootProject.name = "MediaProcessing"
 include("apps")
-include("apps:ui")
-include("apps:coordinator")
-include("apps:converter")
-include("apps:processer")
 include("shared")
 include("shared:kafka")
+findProject(":shared:kafka")?.name = "kafka"
+include("apps:coordinator")
+findProject(":apps:coordinator")?.name = "coordinator"
+include("apps:ui")
+findProject(":apps:ui")?.name = "ui"
+include("apps:encoder")
+findProject(":apps:encoder")?.name = "encoder"
+include("apps:converter")
+findProject(":apps:converter")?.name = "converter"
 include("shared:contract")
+findProject(":shared:contract")?.name = "contract"
 include("shared:common")
+findProject(":shared:common")?.name = "common"
-findProject(":apps:ui")?.name = "ui"
-findProject(":apps:coordinator")?.name = "coordinator"
-findProject(":apps:converter")?.name = "converter"
+include("apps:processer")
 findProject(":apps:processer")?.name = "processer"
-findProject(":shared:kafka")?.name = "kafka"
-findProject(":shared:contract")?.name = "contract"
-findProject(":shared:common")?.name = "common"

View File

@@ -1,5 +1,8 @@
 package no.iktdev.mediaprocessing.shared.common

+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.withContext
+import mu.KotlinLogging
 import no.iktdev.exfl.using
 import java.io.File
 import java.io.FileOutputStream
@@ -7,6 +10,7 @@ import java.net.HttpURLConnection
 import java.net.URL

 open class DownloadClient(val url: String, val outDir: File, val baseName: String) {
+    val log = KotlinLogging.logger {}
     protected val http: HttpURLConnection = openConnection()
     private val BUFFER_SIZE = 4096
@@ -27,10 +31,17 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin
         return ((read * 100) / total)
     }

-    suspend fun download(): File? {
+    suspend fun download(): File? = withContext(Dispatchers.IO) {
         val extension = getExtension()
             ?: throw UnsupportedFormatException("Provided url does not contain a supported file extension")
         val outFile = outDir.using("$baseName.$extension")
+        if (!outDir.exists())
+            return@withContext null
+        if (outFile.exists()) {
+            log.info { "${outFile.name} already exists. Download skipped!" }
+            return@withContext outFile
+        }

         val inputStream = http.inputStream
         val fos = FileOutputStream(outFile, false)
@@ -51,7 +62,7 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin
         }
         inputStream.close()
         fos.close()
-        return outFile
+        return@withContext outFile
     }

     open fun getExtension(): String? {
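With the move to Dispatchers.IO and the new guards, download() now returns null when the output directory is missing and short-circuits when the target file already exists. A usage sketch (URL and paths are illustrative):

    import java.io.File
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        val client = DownloadClient(
            url = "https://example.com/cover.jpg",
            outDir = File("/tmp/covers"),
            baseName = "cover"
        )
        // A second call logs "cover.jpg already exists. Download skipped!"
        // and returns the existing file instead of re-downloading.
        val file: File? = client.download()
        println(file?.absolutePath ?: "output directory does not exist")
    }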
open fun getExtension(): String? { open fun getExtension(): String? {

View File

@@ -4,7 +4,7 @@ import org.jetbrains.exposed.sql.Table
 import org.jetbrains.exposed.sql.transactions.transaction

-open class TableDefaultOperations<T: Table> {
+open class TableDefaultOperations<T : Table> {
 }
@@ -45,7 +45,7 @@ fun <T> insertWithSuccess(block: () -> T): Boolean {
     }
 }

-fun <T> executeOrException(block: () -> T): Exception? {
+fun <T> executeOrException(rollbackOnFailure: Boolean = false, block: () -> T): Exception? {
     return try {
         transaction {
             try {
@@ -54,7 +54,8 @@ fun <T> executeOrException(block: () -> T): Exception? {
                 null
             } catch (e: Exception) {
                 // log the error here or handle the exception as needed
-                rollback()
+                if (rollbackOnFailure)
+                    rollback()
                 e
             }
@@ -65,6 +66,25 @@ fun <T> executeOrException(block: () -> T): Exception? {
         }
     }
 }

+fun <T> executeWithResult(block: () -> T): Pair<T?, Exception?> {
+    return try {
+        transaction {
+            try {
+                val res = block()
+                commit()
+                res to null
+            } catch (e: Exception) {
+                // log the error here or handle the exception as needed
+                rollback()
+                null to e
+            }
+        }
+    } catch (e: Exception) {
+        e.printStackTrace()
+        return null to e
+    }
+}
+
 fun <T> executeWithStatus(block: () -> T): Boolean {
     return try {
         transaction {
View File

@@ -5,7 +5,6 @@ import no.iktdev.mediaprocessing.shared.common.datasource.executeWithStatus
 import no.iktdev.mediaprocessing.shared.common.datasource.withTransaction
 import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import org.jetbrains.exposed.exceptions.ExposedSQLException
-import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
 import org.jetbrains.exposed.sql.and
 import org.jetbrains.exposed.sql.insert
 import org.jetbrains.exposed.sql.javatime.CurrentDateTime
@@ -24,6 +23,7 @@ open class PersistentDataStore {
     }
     return if (exception == null) true else {
         if (exception.cause is SQLIntegrityConstraintViolationException) {
+            exception.printStackTrace()
             (exception as ExposedSQLException).errorCode == 1062
         }
         else {

View File

@@ -11,6 +11,7 @@ object events: IntIdTable() {
     val eventId: Column<String> = varchar("eventId", 50)
     val event: Column<String> = varchar("event",100)
     val data: Column<String> = text("data")
+    //val success: Column<Boolean> = bool("success").default(false)
     val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)

     init {

View File

@@ -5,7 +5,9 @@ import no.iktdev.mediaprocessing.shared.contract.ProcessType
 data class MediaProcessedDto(
     val referenceId: String,
     val process: ProcessType?,
+    val collection: String?,
     val inputFile: String?,
     val metadata: MetadataDto?,
+    val videoDetails: VideoDetails? = null,
     val outputFiles: OutputFilesDto?
 )

View File

@@ -2,14 +2,20 @@ package no.iktdev.mediaprocessing.shared.contract.reader
 data class MetadataDto(
     val title: String,
+    val collection: String,
     val type: String,
-    val cover: MetadataCoverDto,
-    val summary: String,
-    val genres: List<String>
+    val cover: MetadataCoverDto?,
+    val summary: List<SummaryInfo> = emptyList(),
+    val genres: List<String>,
+)
+
+data class SummaryInfo(
+    val summary: String,
+    val language: String = "eng"
 )

 data class MetadataCoverDto(
-    val cover: String,
-    val coverUrl: String,
-    val coverFile: String
+    val cover: String?, // ex Fancy.jpeg
+    val coverUrl: String?,
+    val coverFile: String?
 )
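Cover art is now optional and summaries are per-language entries. A construction sketch with illustrative values:

    val meta = MetadataDto(
        title = "Example Show",
        collection = "Example Show",
        type = "serie",
        cover = null, // cover may be absent
        summary = listOf(SummaryInfo(summary = "An example plot.", language = "eng")),
        genres = listOf("Drama")
    )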

View File

@@ -1,7 +1,7 @@
 package no.iktdev.mediaprocessing.shared.contract.reader

-class OutputFilesDto(
-    val videoFile: String,
-    val videoArguments: List<String>,
-    val subtitleFiles: List<String>
+data class OutputFilesDto(
+    val video: String?, // FullName (Path + name)
+    val subtitles: List<String> = emptyList() // (Path + Name)
 )

View File

@@ -0,0 +1,14 @@
package no.iktdev.mediaprocessing.shared.contract.reader
data class VideoDetails(
val serieInfo: SerieInfo? = null,
val type: String,
val fullName: String
)
data class SerieInfo(
val episodeTitle: String? = null,
val episodeNumber: Int,
val seasonNumber: Int,
val title: String
)
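Together with the new collection and videoDetails fields on MediaProcessedDto, a finished serie episode can now be described end to end (all values illustrative):

    val processed = MediaProcessedDto(
        referenceId = "ref-0001",
        process = null,
        collection = "Example Show",
        inputFile = "/input/Example Show - 01.mkv",
        metadata = null,
        videoDetails = VideoDetails(
            serieInfo = SerieInfo(
                episodeNumber = 1,
                seasonNumber = 1,
                title = "Example Show"
            ),
            type = "serie",
            fullName = "Example Show - S01E01"
        ),
        outputFiles = OutputFilesDto(
            video = "/output/Example Show/Example Show - S01E01.mp4",
            subtitles = emptyList()
        )
    )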

View File

@@ -7,6 +7,8 @@ import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.SimpleMessageData
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.*
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserEncodeWorkPerformed
+import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work.ProcesserExtractWorkPerformed

 class DeserializingRegistry {
     private val log = KotlinLogging.logger {}
@@ -29,14 +31,16 @@ class DeserializingRegistry {
     KafkaEvents.EVENT_WORK_EXTRACT_CREATED to FfmpegWorkRequestCreated::class.java,
     KafkaEvents.EVENT_WORK_CONVERT_CREATED to ConvertWorkerRequest::class.java,
-    KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to FfmpegWorkPerformed::class.java,
-    KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to FfmpegWorkPerformed::class.java,
-    KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to null,
-    KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to null,
+    KafkaEvents.EVENT_WORK_ENCODE_PERFORMED to ProcesserEncodeWorkPerformed::class.java,
+    KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED to ProcesserExtractWorkPerformed::class.java,
+    KafkaEvents.EVENT_WORK_CONVERT_PERFORMED to ConvertWorkPerformed::class.java,
+    KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED to CoverDownloadWorkPerformed::class.java,
     KafkaEvents.EVENT_WORK_ENCODE_SKIPPED to null,
     KafkaEvents.EVENT_WORK_EXTRACT_SKIPPED to null,
     KafkaEvents.EVENT_WORK_CONVERT_SKIPPED to null,
+    KafkaEvents.EVENT_PROCESS_COMPLETED to ProcessCompleted::class.java
 )
 }
@@ -55,7 +59,7 @@ class DeserializingRegistry {
     }
 }
 // Fallback
-val type = object : TypeToken<Message<out MessageDataWrapper>>() {}.type
+val type = object : TypeToken<Message<out SimpleMessageData>>() {}.type
 return gson.fromJson<Message<SimpleMessageData>>(json, type)
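The registry now resolves a concrete payload class per event and only falls back to SimpleMessageData when no mapping exists. The shape of that lookup, sketched with assumed internal names (gson and eventToClass stand in for members not shown here):

    // Resolve the registered class for the event; unmapped events fall
    // back to the generic SimpleMessageData wrapper.
    fun deserializeSketch(event: KafkaEvents, json: String): Message<out MessageDataWrapper> {
        val clazz = eventToClass[event]
        return if (clazz != null) {
            val type = TypeToken.getParameterized(Message::class.java, clazz).type
            gson.fromJson(json, type)
        } else {
            val fallback = object : TypeToken<Message<out SimpleMessageData>>() {}.type
            gson.fromJson(json, fallback)
        }
    }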

View File

@@ -35,7 +35,8 @@ enum class KafkaEvents(val event: String) {
     EVENT_STORE_COVER_PERFORMED("event:store-cover:performed"),
     EVENT_STORE_METADATA_PERFORMED("event:store-metadata:performed"),
-    EVENT_PROCESS_COMPLETED("event:process:completed");
+    EVENT_PROCESS_COMPLETED("event:process:completed"),
+    EVENT_COLLECT_AND_STORE("event::save");

     companion object {
         fun toEvent(event: String): KafkaEvents? {
View File

@@ -2,7 +2,6 @@ package no.iktdev.mediaprocessing.shared.kafka.dto
 import com.google.gson.Gson
 import com.google.gson.reflect.TypeToken
-import no.iktdev.streamit.library.kafka.dto.Status
 import java.lang.reflect.Type
 import java.util.*

View File

@@ -1,7 +1,5 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto

-import no.iktdev.streamit.library.kafka.dto.Status
-
 open class MessageDataWrapper(
     @Transient open val status: Status = Status.ERROR,

View File

@@ -1,7 +1,7 @@
-package no.iktdev.streamit.library.kafka.dto
+package no.iktdev.mediaprocessing.shared.kafka.dto

 enum class Status {
-    STARTED,
+    SKIPPED,
     COMPLETED,
     ERROR
 }

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_BASE_INFO_PERFORMED)
 data class BaseInfoPerformed(

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_PERFORMED)
 data class ConvertWorkPerformed(
@@ -11,5 +11,5 @@ data class ConvertWorkPerformed(
     override val message: String? = null,
     val producedBy: String,
     val derivedFromEventId: String,
-    val result: List<String>
+    val outFiles: List<String>
 ): MessageDataWrapper(status, message)

View File

@@ -3,12 +3,14 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
 data class ConvertWorkerRequest(
+    override val status: Status,
     val requiresEventId: String? = null,
     val inputFile: String,
     val allowOverwrite: Boolean,
     val outFileBaseName: String,
     val outDirectory: String
-): MessageDataWrapper()
+): MessageDataWrapper(status)
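Convert requests now carry an explicit Status like the other messages. An illustrative construction:

    val request = ConvertWorkerRequest(
        status = Status.COMPLETED, // illustrative; SKIPPED and ERROR also exist
        requiresEventId = null,
        inputFile = "/input/episode.mkv",
        allowOverwrite = false,
        outFileBaseName = "episode",
        outDirectory = "/output/Example Show"
    )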

View File

@@ -3,11 +3,11 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

-@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_CONVERT_CREATED)
-data class MediaConvertInfo(
+@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_DOWNLOAD_COVER_PERFORMED)
+data class CoverDownloadWorkPerformed(
     override val status: Status,
-    val arguments: List<List<String>>
-): MessageDataWrapper(status)
+    override val message: String? = null,
+    val coverFile: String
+): MessageDataWrapper(status, message)

View File

@@ -1,8 +1,11 @@
 package no.iktdev.mediaprocessing.shared.kafka.dto.events_result

+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
+import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

+@KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_OUT_COVER)
 data class CoverInfoPerformed(
     override val status: Status,
     val url: String,
View File

@@ -3,13 +3,15 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(
     KafkaEvents.EVENT_WORK_ENCODE_CREATED,
     KafkaEvents.EVENT_WORK_EXTRACT_CREATED
 )
 data class FfmpegWorkRequestCreated(
+    override val status: Status,
     val inputFile: String,
     val arguments: List<String>,
     val outFile: String
-): MessageDataWrapper()
+): MessageDataWrapper(status)

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 /**
  * @param status Status type

View File

@@ -1,13 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.streamit.library.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_ENCODE_CREATED)
data class MediaEncodeInfo(
override val status: Status,
val arguments: List<String>
) :
MessageDataWrapper(status)

View File

@@ -1,12 +0,0 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.streamit.library.kafka.dto.Status
@KafkaBelongsToEvent(KafkaEvents.EVENT_WORK_EXTRACT_CREATED)
data class MediaExtractInfo(
override val status: Status,
val arguments: List<List<String>>
) : MessageDataWrapper(status)

View File

@@ -4,7 +4,7 @@ import no.iktdev.mediaprocessing.shared.contract.ffmpeg.ParsedMediaStreams
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_PARSE_STREAM_PERFORMED)
 data class MediaStreamsParsePerformed(

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_METADATA_SEARCH_PERFORMED)
 data class MetadataPerformed(
@@ -17,6 +17,11 @@ data class pyMetadata(
     val altTitle: List<String> = emptyList(),
     val cover: String? = null,
     val type: String,
-    val summary: String? = null,
+    val summary: List<pySummary> = emptyList(),
     val genres: List<String> = emptyList()
 )
+
+data class pySummary(
+    val summary: String?,
+    val language: String = "eng"
+)
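The Kotlin-side mirror of the Python payload now carries a summary list instead of a single string. A deserialization sketch (payload illustrative; note that Gson bypasses Kotlin default values, so fields absent from the JSON can be null at runtime despite their defaults):

    import com.google.gson.Gson

    val json = """{"title": "Example", "type": "movie",
        "summary": [{"summary": "An example plot.", "language": "eng"}]}"""
    val parsed = Gson().fromJson(json, pyMetadata::class.java)
    println(parsed.summary.firstOrNull()?.summary) // prints "An example plot."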

View File

@@ -3,8 +3,10 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_COMPLETED)
-data class ProcessCompleted(override val status: Status) : MessageDataWrapper(status) {
+data class ProcessCompleted(
+    override val status: Status
+) : MessageDataWrapper(status) {
 }

View File

@@ -4,7 +4,7 @@ import no.iktdev.mediaprocessing.shared.contract.ProcessType
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_PROCESS_STARTED)
 data class ProcessStarted(

View File

@@ -4,7 +4,7 @@ import com.google.gson.JsonObject
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 @KafkaBelongsToEvent(KafkaEvents.EVENT_MEDIA_READ_STREAM_PERFORMED)
 data class ReaderPerformed(

View File

@@ -3,7 +3,7 @@ package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
 import com.google.gson.Gson
 import com.google.gson.JsonObject
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

 data class VideoInfoPerformed(
     override val status: Status,

View File

@@ -0,0 +1,18 @@
package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
import no.iktdev.mediaprocessing.shared.kafka.dto.Status
// Derived from ffmpeg work
@KafkaBelongsToEvent(
KafkaEvents.EVENT_WORK_ENCODE_PERFORMED
)
data class ProcesserEncodeWorkPerformed(
override val status: Status,
override val message: String? = null,
val producedBy: String,
val derivedFromEventId: String,
val outFile: String? = null
): MessageDataWrapper(status, message)

View File

@@ -1,17 +1,18 @@
-package no.iktdev.mediaprocessing.shared.kafka.dto.events_result
+package no.iktdev.mediaprocessing.shared.kafka.dto.events_result.work

 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaBelongsToEvent
 import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status

+// Derived from ffmpeg work
 @KafkaBelongsToEvent(
-    KafkaEvents.EVENT_WORK_ENCODE_PERFORMED,
     KafkaEvents.EVENT_WORK_EXTRACT_PERFORMED
 )
-data class FfmpegWorkPerformed(
+data class ProcesserExtractWorkPerformed(
     override val status: Status,
     override val message: String? = null,
     val producedBy: String,
-    val derivedFromEventId: String
+    val derivedFromEventId: String,
+    val outFile: String? = null
 ): MessageDataWrapper(status, message)
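With FfmpegWorkPerformed split into per-event classes, consumers can branch on the concrete type. A handling sketch:

    // Each performed-work payload names its producer and may carry the
    // produced file.
    fun describe(data: MessageDataWrapper): String = when (data) {
        is ProcesserEncodeWorkPerformed -> "encoded by ${data.producedBy}: ${data.outFile}"
        is ProcesserExtractWorkPerformed -> "extracted by ${data.producedBy}: ${data.outFile}"
        else -> "unhandled payload"
    }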

View File

@@ -7,7 +7,7 @@ import no.iktdev.mediaprocessing.shared.kafka.core.KafkaEvents
 import no.iktdev.mediaprocessing.shared.kafka.dto.Message
 import no.iktdev.mediaprocessing.shared.kafka.dto.MessageDataWrapper
 import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted
-import no.iktdev.streamit.library.kafka.dto.Status
+import no.iktdev.mediaprocessing.shared.kafka.dto.Status
 import org.junit.jupiter.api.Test
 import org.assertj.core.api.Assertions.assertThat