diff --git a/.github/workflows/v4.yml b/.github/workflows/v4.yml new file mode 100644 index 00000000..0244704d --- /dev/null +++ b/.github/workflows/v4.yml @@ -0,0 +1,295 @@ +name: Build v4 + +on: + push: + branches: + - v4 + pull_request: + branches: + - v4 + workflow_dispatch: + + +jobs: + pre-check: + runs-on: ubuntu-latest + outputs: + pyMetadata: ${{ steps.filter.outputs.pyMetadata }} + coordinator: ${{ steps.filter.outputs.coordinator }} + processer: ${{ steps.filter.outputs.processer }} + converter: ${{ steps.filter.outputs.converter }} + shared: ${{ steps.filter.outputs.shared }} + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - uses: dorny/paths-filter@v2 + id: filter + with: + filters: | + pyMetadata: + - 'apps/pyMetadata/**' + apps/coordinator: + - 'apps/coordinator/**' + apps/processer: + - 'apps/processer/**' + apps/converter: + - 'apps/converter/**' + + shared: + - 'shared/**' + # Step to print the outputs from "pre-check" job + - name: Print Outputs from pre-check job + run: | + echo "Apps\n" + echo "app:pyMetadata: ${{ needs.pre-check.outputs.pyMetadata }}" + echo "app:coordinator: ${{ needs.pre-check.outputs.coordinator }}" + echo "app:processer: ${{ needs.pre-check.outputs.processer }}" + echo "app:converter: ${{ needs.pre-check.outputs.converter }}" + + echo "Shared" + echo "shared: ${{ needs.pre-check.outputs.shared }}" + echo "\n" + echo "${{ needs.pre-check.outputs }}" + echo "${{ needs.pre-check }}" + + build-shared: + runs-on: ubuntu-latest + needs: pre-check + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Cache Shared code Gradle dependencies + id: cache-gradle + uses: actions/cache@v2 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }} + + - name: Build Shared code + if: steps.cache-gradle.outputs.cache-hit != 'true' || needs.pre-check.outputs.shared == 'true' || github.event_name == 'workflow_dispatch' + run: | + chmod +x ./gradlew + ./gradlew :shared:build --stacktrace --info + + + build-processer: + needs: build-shared + if: ${{ needs.pre-check.outputs.processer == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }} + runs-on: ubuntu-latest + #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Cache Shared Gradle dependencies + id: cache-gradle + uses: actions/cache@v2 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }} + + - name: Extract version from build.gradle.kts + id: extract_version + run: | + VERSION=$(cat ./apps/processer/build.gradle.kts | grep '^version\s*=\s*\".*\"' | sed 's/^version\s*=\s*\"\(.*\)\"/\1/') + echo "VERSION=$VERSION" + echo "VERSION=$VERSION" >> $GITHUB_ENV + + + - name: Build Processer module + id: build-processer + run: | + chmod +x ./gradlew + ./gradlew :apps:processer:bootJar --info --stacktrace + echo "Build completed" + + + - name: Generate Docker image tag + id: docker-tag + run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" + + - name: Docker login + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: . + file: ./dockerfiles/DebianJavaFfmpeg + build-args: | + MODULE_NAME=processer + PASS_APP_VERSION=${{ env.VERSION }} + push: true + tags: | + bskjon/mediaprocessing-processer:v4 + bskjon/mediaprocessing-processer:v4-${{ github.sha }} + bskjon/mediaprocessing-processer:v4-${{ steps.docker-tag.outputs.tag }} + + build-converter: + needs: build-shared + if: ${{ needs.pre-check.outputs.converter == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }} + runs-on: ubuntu-latest + #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Cache Shared Gradle dependencies + id: cache-gradle + uses: actions/cache@v3 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }} + + - name: Extract version from build.gradle.kts + id: extract_version + run: | + VERSION=$(cat ./apps/converter/build.gradle.kts | grep '^version\s*=\s*\".*\"' | sed 's/^version\s*=\s*\"\(.*\)\"/\1/') + echo "VERSION=$VERSION" + echo "VERSION=$VERSION" >> $GITHUB_ENV + + + - name: Build Converter module + id: build-converter + run: | + chmod +x ./gradlew + ./gradlew :apps:converter:bootJar --info --debug + echo "Build completed" + + + - name: Generate Docker image tag + id: docker-tag + run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" + + - name: Docker login + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: . + file: ./dockerfiles/DebianJava + build-args: | + MODULE_NAME=converter + PASS_APP_VERSION=${{ env.VERSION }} + push: true + tags: | + bskjon/mediaprocessing-converter:v4 + bskjon/mediaprocessing-converter:v4-${{ github.sha }} + bskjon/mediaprocessing-converter:v4-${{ steps.docker-tag.outputs.tag }} + + build-coordinator: + needs: build-shared + if: ${{ needs.pre-check.outputs.coordinator == 'true' || github.event_name == 'workflow_dispatch' || needs.pre-check.outputs.shared == 'true' }} + runs-on: ubuntu-latest + #if: ${{ github.event_name == 'push' || github.event_name == 'workflow_dispatch' }} + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Cache Shared Gradle dependencies + id: cache-gradle + uses: actions/cache@v2 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('shared/build.gradle.kts') }} + + - name: Extract version from build.gradle.kts + id: extract_version + run: | + VERSION=$(cat ./apps/coordinator/build.gradle.kts | grep '^version\s*=\s*\".*\"' | sed 's/^version\s*=\s*\"\(.*\)\"/\1/') + echo "VERSION=$VERSION" + echo "VERSION=$VERSION" >> $GITHUB_ENV + + - name: Build Coordinator module + id: build-coordinator + run: | + chmod +x ./gradlew + ./gradlew :apps:coordinator:bootJar + echo "Build completed" + + + - name: Generate Docker image tag + id: docker-tag + run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" + + - name: Docker login + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + - name: Debug Check extracted version + run: | + echo "Extracted version: ${{ env.VERSION }}" + + - name: Build and push Docker image + uses: docker/build-push-action@v2 + with: + context: . + file: ./dockerfiles/DebianJavaFfmpeg + build-args: | + MODULE_NAME=coordinator + PASS_APP_VERSION=${{ env.VERSION }} + push: true + tags: | + bskjon/mediaprocessing-coordinator:v4 + bskjon/mediaprocessing-coordinator:v4-${{ github.sha }} + bskjon/mediaprocessing-coordinator:v4-${{ steps.docker-tag.outputs.tag }} + + build-pymetadata: + needs: pre-check + if: ${{ needs.pre-check.outputs.pyMetadata == 'true' || github.event_name == 'workflow_dispatch' }} + runs-on: ubuntu-latest + + steps: + - name: Checkout repository + uses: actions/checkout@v2 + + - name: Build pyMetadata module + id: build-pymetadata + run: | + if [[ "${{ steps.check-pymetadata.outputs.changed }}" == "true" || "${{ github.event_name }}" == "push" || "${{ github.event_name }}" == "workflow_dispatch" ]]; then + cd apps/pyMetadata + # Add the necessary build steps for your Python module here + echo "Build completed" + else + echo "pyMetadata has not changed. Skipping pyMetadata module build." + echo "::set-output name=job_skipped::true" + fi + + - name: Generate Docker image tag + id: docker-tag + run: echo "::set-output name=tag::$(date -u +'%Y.%m.%d')-$(uuidgen | cut -c 1-8)" + + - name: Docker login + uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9 + with: + username: ${{ secrets.DOCKER_HUB_NAME }} + password: ${{ secrets.DOCKER_HUB_TOKEN }} + + - name: Build and push Docker image + uses: docker/build-push-action@v5.1.0 + with: + context: . + file: ./dockerfiles/Python + build-args: + MODULE_NAME=pyMetadata + push: true + tags: | + bskjon/mediaprocessing-pymetadata:v4 + bskjon/mediaprocessing-pymetadata:v4-${{ github.sha }} + bskjon/mediaprocessing-pymetadata:v4-${{ steps.docker-tag.outputs.tag }} diff --git a/.idea/workspace.xml b/.idea/workspace.xml index eb377d6e..2c8c4988 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -4,37 +4,44 @@ + @@ -105,28 +118,32 @@ - { - "keyToString": { - "Gradle.AudioArgumentsTest.executor": "Run", - "Gradle.AudioArgumentsTest.validateChecks1.executor": "Run", - "Gradle.AudioArgumentsTest.validateChecks3.executor": "Run", - "Gradle.Build MediaProcessing2.executor": "Run", - "Gradle.EncodeArgumentCreatorTaskTest.executor": "Run", - "Gradle.MediaProcessing2 [build].executor": "Run", - "Kotlin.UIApplicationKt.executor": "Run", - "RunOnceActivity.OpenProjectViewOnStart": "true", - "RunOnceActivity.ShowReadmeOnStart": "true", - "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrary": "JUnit5", - "com.intellij.testIntegration.createTest.CreateTestDialog.defaultLibrarySuperClass.JUnit5": "", - "git-widget-placeholder": "v3", - "ignore.virus.scanning.warn.message": "true", - "kotlin-language-version-configured": "true", - "last_opened_file_path": "D:/Workspace/MediaProcessing2/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract", - "project.structure.last.edited": "Modules", - "project.structure.proportion": "0.0", - "project.structure.side.proportion": "0.0" + +}]]> @@ -154,8 +171,8 @@ - - + + - + - + - - - + + + - - - - + + + + @@ -298,8 +315,201 @@ + + + + + + + + + + + + + + + + + + + + + + @@ -308,6 +518,61 @@ 23 diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt index 27dcc244..f94c82fc 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/tasks/ConvertService.kt @@ -72,6 +72,7 @@ class ConvertService( override fun onCompleted(inputFile: String, outputFiles: List) { val task = assignedTask ?: return + val taskData: ConvertData = task.data as ConvertData log.info { "Convert completed for ${task.referenceId}" } val claimSuccessful = taskManager.markTaskAsCompleted(task.referenceId, task.eventId) @@ -95,6 +96,7 @@ class ConvertService( source = getProducerName() ), data = ConvertedData( + language = taskData.language, outputFiles = outputFiles ) )) diff --git a/apps/coordinator/build.gradle.kts b/apps/coordinator/build.gradle.kts index d4352d8b..5966f70e 100644 --- a/apps/coordinator/build.gradle.kts +++ b/apps/coordinator/build.gradle.kts @@ -37,7 +37,7 @@ dependencies { implementation("org.json:json:20210307") implementation("no.iktdev:exfl:0.0.16-SNAPSHOT") - implementation("no.iktdev.streamit.library:streamit-library-db:0.0.6-alpha27") + implementation("no.iktdev.streamit.library:streamit-library-db:1.0.0-alpha11") implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.7.1") diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt index 47223269..9eef48d6 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CompletedTaskListener.kt @@ -7,22 +7,17 @@ import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.coordinator.getStoreDatabase import no.iktdev.eventi.database.executeOrException -import no.iktdev.eventi.database.executeWithStatus import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store.* +import no.iktdev.mediaprocessing.coordinator.tasksV2.validator.CompletionValidator import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.* -import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents -import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats import no.iktdev.mediaprocessing.shared.common.contract.reader.* -import no.iktdev.streamit.library.db.query.CatalogQuery -import no.iktdev.streamit.library.db.query.GenreQuery -import no.iktdev.streamit.library.db.query.SubtitleQuery import no.iktdev.streamit.library.db.query.SummaryQuery import no.iktdev.streamit.library.db.tables.catalog import no.iktdev.streamit.library.db.tables.titles import org.jetbrains.exposed.exceptions.ExposedSQLException -import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.insertIgnore import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.update @@ -32,7 +27,7 @@ import java.io.File import java.sql.SQLIntegrityConstraintViolationException @Service -class CompletedTaskListener: CoordinatorEventListener() { +class CompletedTaskListener : CoordinatorEventListener() { val log = KotlinLogging.logger {} var doNotProduceComplete = System.getenv("DISABLE_COMPLETE").toBoolean() ?: false @@ -64,104 +59,6 @@ class CompletedTaskListener: CoordinatorEventListener() { Events.EventWorkExtractPerformed ) - /** - * Checks whether it requires encode or extract or both, and it has created events with args - */ - fun req1(started: MediaProcessStartEvent, events: List): Boolean { - val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.ENCODE) == true) { - events.any { it.eventType == Events.EventMediaParameterEncodeCreated } - } else true - - val extractFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.EXTRACT) == true) { - events.any { it.eventType == Events.EventMediaParameterExtractCreated } - } else true - - if (!encodeFulfilledOrSkipped || !extractFulfilledOrSkipped) { - return false - } else return true - } - - /** - * Checks whether work that was supposed to be created has been created. - * Checks if all subtitles that can be processed has been created if convert is set. - */ - fun req2(operations: List, events: List): Boolean { - if (StartOperationEvents.ENCODE in operations) { - val encodeParamter = events.find { it.eventType == Events.EventMediaParameterEncodeCreated }?.az() - val encodeWork = events.find { it.eventType == Events.EventWorkEncodeCreated } - if (encodeParamter?.isSuccessful() == true && (encodeWork == null)) - return false - } - - val extractParamter = events.find { it.eventType == Events.EventMediaParameterExtractCreated }?.az() - val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } - if (StartOperationEvents.EXTRACT in operations) { - if (extractParamter?.isSuccessful() == true && extractParamter.data?.size != extractWork.size) - return false - } - - if (StartOperationEvents.CONVERT in operations) { - val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } - - val supportedSubtitleFormats = SubtitleFormats.entries.map { it.name } - val eventsSupportsConvert = extractWork.filter { it.data is ExtractArgumentData } - .filter { (it.dataAs()?.outputFile?.let { f -> File(f).extension.uppercase() } in supportedSubtitleFormats) } - - if (convertWork.size != eventsSupportsConvert.size) - return false - } - - return true - } - - /** - * Checks whether all work that has been created has been completed - */ - fun req3(operations: List, events: List): Boolean { - if (StartOperationEvents.ENCODE in operations) { - val encodeWork = events.filter { it.eventType == Events.EventWorkEncodeCreated } - val encodePerformed = events.filter { it.eventType == Events.EventWorkEncodePerformed } - if (encodePerformed.size < encodeWork.size) - return false - } - - if (StartOperationEvents.EXTRACT in operations) { - val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } - val extractPerformed = events.filter { it.eventType == Events.EventWorkExtractPerformed } - if (extractPerformed.size < extractWork.size) - return false - } - - if (StartOperationEvents.CONVERT in operations) { - val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } - val convertPerformed = events.filter { it.eventType == Events.EventWorkConvertPerformed } - if (convertPerformed.size < convertWork.size) - return false - } - - return true - } - - /** - * Checks if metadata has cover, if so, 2 events are expected - */ - fun req4(events: List): Boolean { - val metadata = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed } - if (metadata?.isSuccessful() != true) { - return true - } - - val hasCover = metadata.dataAs()?.cover != null - if (hasCover == false) { - return true - } - - if (events.any { it.eventType == Events.EventMediaReadOutCover } && events.any { it.eventType == Events.EventWorkDownloadCoverPerformed }) { - return true - } - return false - } - override fun isPrerequisitesFulfilled(incomingEvent: Event, events: List): Boolean { val started = events.find { it.eventType == Events.EventMediaProcessStarted }?.az() @@ -171,104 +68,28 @@ class CompletedTaskListener: CoordinatorEventListener() { } val viableEvents = events.filter { it.isSuccessful() } - - if (!req1(started, events)) { - //log.info { "${this::class.java.simpleName} Failed Req1" } + if (!CompletionValidator.req1(started, events)) { return false } - if (!req2(started.data?.operations ?: emptyList(), viableEvents)) { - //log.info { "${this::class.java.simpleName} Failed Req2" } + if (!CompletionValidator.req2(started.data?.operations ?: emptyList(), viableEvents)) { return false } - if (!req3(started.data?.operations ?: emptyList(), events)) { - //log.info { "${this::class.java.simpleName} Failed Req3" } + if (!CompletionValidator.req3(started.data?.operations ?: emptyList(), events)) { return false } - if (!req4(events)) { - log.info { "${this::class.java.simpleName} Failed Req4" } + if (!CompletionValidator.req4(events)) { return false } - return super.isPrerequisitesFulfilled(incomingEvent, events) } - fun getMetadata(events: List): MetadataDto? { - val baseInfo = events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az() - val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az() - val metadataInfo = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az() - val coverInfo = events.find { it.eventType == Events.EventWorkDownloadCoverPerformed }?.az() - val coverTask = events.find { it.eventType == Events.EventMediaReadOutCover }?.az() - - if (baseInfo == null) { - log.info { "Cant find BaseInfoEvent on ${Events.EventMediaReadBaseInfoPerformed}" } - return null - } - - if (mediaInfo == null) { - log.info { "Cant find MediaOutInformationConstructedEvent on ${Events.EventMediaReadOutNameAndType}" } - return null - } - - if (metadataInfo == null) { - log.info { "Cant find MediaMetadataReceivedEvent on ${Events.EventMediaMetadataSearchPerformed}" } - return null - } - - if (coverTask?.isSkipped() == false && coverInfo == null) { - log.info { "Cant find MediaCoverDownloadedEvent on ${Events.EventWorkDownloadCoverPerformed}" } - } - - val mediaInfoData = mediaInfo.data?.toValueObject() - val baseInfoData = baseInfo.data - val metadataInfoData = metadataInfo.data - - - val collection = mediaInfo.data?.outDirectory?.let { File(it).name } ?: baseInfoData?.title - - val coverFileName = coverInfo?.data?.absoluteFilePath?.let { - File(it).name - } - - return MetadataDto( - title = mediaInfoData?.title ?: baseInfoData?.title ?: metadataInfoData?.title ?: return null, - collection = collection ?: return null, - cover = coverFileName, - type = metadataInfoData?.type ?: mediaInfoData?.type ?: return null, - summary = metadataInfoData?.summary?.filter {it.summary != null }?.map { SummaryInfo(language = it.language, summary = it.summary!! ) } ?: emptyList(), - genres = metadataInfoData?.genres ?: emptyList(), - titles = (metadataInfoData?.altTitle ?: emptyList()) + listOfNotNull(mediaInfoData?.title, baseInfoData?.title) - ) - } - - fun getGenres(events: List): List { - val metadataInfo = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az() - return metadataInfo?.data?.genres ?: emptyList() - } - - fun getSubtitles(metadataDto: MetadataDto?, events: List): List { - val extracted = events.filter { it.eventType == Events.EventWorkExtractPerformed }.mapNotNull { it.dataAs() } - val converted = events.filter { it.eventType == Events.EventWorkConvertPerformed }.mapNotNull { it.dataAs() } - - val outFiles = extracted.map { it.outputFile } + converted.flatMap { it.outputFiles } - - return outFiles.map { - val subtitleFile = File(it) - SubtitlesDto( - collection = metadataDto?.collection ?: subtitleFile.parentFile.parentFile.name, - language = subtitleFile.parentFile.name, - subtitleFile = subtitleFile.name, - format = subtitleFile.extension.uppercase(), - associatedWithVideo = subtitleFile.nameWithoutExtension, - ) - } - } - fun getVideo(events: List): VideoDetails? { - val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType }?.az() + val mediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType } + ?.az() val encoded = events.find { it.eventType == Events.EventWorkEncodePerformed }?.dataAs()?.outputFile if (encoded == null) { log.warn { "No encode no video details!" } @@ -290,120 +111,8 @@ class CompletedTaskListener: CoordinatorEventListener() { return details } - fun storeSubtitles(subtitles: List) { - subtitles.forEach { subtitle -> - subtitle to executeWithStatus(getStoreDatabase()) { - SubtitleQuery( - collection = subtitle.collection, - associatedWithVideo = subtitle.associatedWithVideo, - language = subtitle.language, - format = subtitle.format, - file = subtitle.subtitleFile - ).insert() - } - } - } - fun storeTitles(usedTitle: String, metadata: MetadataDto) { - try { - withTransaction(getStoreDatabase()) { - titles.insertIgnore { - it[masterTitle] = metadata.collection - it[title] = NameHelper.normalize(usedTitle) - it[type] = 1 - } - titles.insertIgnore { - it[masterTitle] = usedTitle - it[title] = NameHelper.normalize(usedTitle) - it[type] = 2 - } - metadata.titles.forEach { title -> - titles.insertIgnore { - it[masterTitle] = usedTitle - it[titles.title] = title - } - } - } - } catch (e: Exception) { - e.printStackTrace() - } - } - - fun storeMetadata(catalogId: Int, metadata: MetadataDto) { - if (!metadata.cover.isNullOrBlank()) { - withTransaction(getStoreDatabase()) { - val storedCatalogCover = catalog.select { - (catalog.id eq catalogId) - }.map { it[catalog.cover] }.firstOrNull() - if (storedCatalogCover.isNullOrBlank()) { - catalog.update({ - catalog.id eq catalogId - }) { - it[catalog.cover] = metadata.cover - } - } - } - } - - - metadata.summary.forEach { - val result = executeOrException(getStoreDatabase().database) { - SummaryQuery( - cid = catalogId, - language = it.language, - description = it.summary - ).insert() - } - val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062 - if (!ignoreException) { - result?.printStackTrace() - } - } - } - - fun storeAndGetGenres(genres: List): String? { - return withTransaction(getStoreDatabase()) { - val gq = GenreQuery( *genres.toTypedArray() ) - gq.insertAndGetIds() - gq.getIds().joinToString(",") - } - } - - fun storeCatalog(metadata: MetadataDto, videoDetails: VideoDetails, genres: String?): Int? { - val precreatedCatalogQuery = CatalogQuery( - title = NameHelper.normalize(metadata.title), - cover = metadata.cover, - type = metadata.type, - collection = metadata.collection, - genres = genres - ) - - val result = when (videoDetails.type) { - "serie" -> { - val serieInfo = videoDetails.serieInfo ?: throw RuntimeException("SerieInfo missing in VideoDetails for Serie! ${videoDetails.fileName}") - executeOrException { - precreatedCatalogQuery.insertWithSerie( - episodeTitle = serieInfo.episodeTitle ?: "", - videoFile = videoDetails.fileName, - episode = serieInfo.episodeNumber, - season = serieInfo.seasonNumber - ) - } - } - "movie" -> { - executeOrException { - precreatedCatalogQuery.insertWithMovie(videoDetails.fileName) - } - } - else -> throw RuntimeException("${videoDetails.type} is not supported!") - } - val ignoreException = result?.cause is SQLIntegrityConstraintViolationException && (result as ExposedSQLException).errorCode == 1062 - return withTransaction(getStoreDatabase()) { - precreatedCatalogQuery.getId() - } - } - override fun shouldIProcessAndHandleEvent(incomingEvent: Event, events: List): Boolean { val result = super.shouldIProcessAndHandleEvent(incomingEvent, events) return result @@ -413,27 +122,63 @@ class CompletedTaskListener: CoordinatorEventListener() { val event = incomingEvent.consume() ?: return active = true - val metadata = getMetadata(events) - val genres = getGenres(events) - val subtitles = getSubtitles(metadata, events) - val video = getVideo(events) + val mediaInfo: ComposedMediaInfo = composeMediaInfo(events) ?: run { + log.error { "Unable to compose media info for ${event.referenceId()}" } + return + } + + val existingTitles = ContentTitleStore.findMasterTitles(mediaInfo.titles) + + val usableCollection: String = if (existingTitles.isNotEmpty()) + ContentCatalogStore.getCollectionByTitleAndType(mediaInfo.type, existingTitles) ?: run { + log.warn { "Did not receive collection based on titles provided in list ${existingTitles.joinToString(",")}, falling back to fallbackCollection: ${mediaInfo.fallbackCollection}" } + mediaInfo.fallbackCollection + } else mediaInfo.fallbackCollection + + val mover = ContentCompletionMover(usableCollection, events) + val newVideoPath = mover.moveVideo() + val newCoverPath = mover.moveCover() + val newSubtitles = mover.moveSubtitles() + + val genreIdsForCatalog = ContentGenresStore.storeAndGetIds(mediaInfo.genres) - val storedGenres = storeAndGetGenres(genres) - val catalogId = if (metadata != null && video != null) { - storeCatalog(metadata, video, storedGenres) - } else null + val catalogId = ContentCatalogStore.storeCatalog( + title = mediaInfo.title, + collection = usableCollection, + type = mediaInfo.type, + cover = newCoverPath?.second?.let { dp -> File(dp).name }, + genres = genreIdsForCatalog, + ) + + getVideo(events)?.let { video -> + ContentCatalogStore.storeMedia( + title = mediaInfo.title, + collection = usableCollection, + type = mediaInfo.type, + videoDetails = video + ) + } - - storeSubtitles(subtitles) - metadata?.let { - storeTitles(metadata = metadata, usedTitle = metadata.title) - catalogId?.let { id -> - storeMetadata(catalogId = id, metadata = it) + val storedSubtitles = newSubtitles?.let { subtitles -> + subtitles.mapNotNull { + ContentSubtitleStore.storeSubtitles( + collection = usableCollection, + language = it.language, + destinationFile = it.destination + ) } } + catalogId?.let { cid -> + mediaInfo.summaries.forEach { + ContentMetadataStore.storeSummary(cid, it) + } + ContentTitleStore.store(mediaInfo.title, mediaInfo.titles) + } + + if (!doNotProduceComplete) { onProduceEvent(MediaProcessCompletedEvent( metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), @@ -448,4 +193,57 @@ class CompletedTaskListener: CoordinatorEventListener() { active = false } + internal data class ComposedMediaInfo( + val title: String, + val fallbackCollection: String, + + val titles: List, + val type: String, + val summaries: List, + val genres: List + ) + + private fun composeMediaInfo(events: List): ComposedMediaInfo? { + val baseInfo = + events.find { it.eventType == Events.EventMediaReadBaseInfoPerformed }?.az()?.let { + it.data + } ?: run { + log.info { "Cant find BaseInfoEvent on ${Events.EventMediaReadBaseInfoPerformed}" } + return null + } + val metadataInfo = + events.find { it.eventType == Events.EventMediaMetadataSearchPerformed }?.az()?.data + ?: run { + log.info { "Cant find MediaMetadataReceivedEvent on ${Events.EventMediaMetadataSearchPerformed}" } + null + } + val mediaInfo: MediaInfo = events.find { it.eventType == Events.EventMediaReadOutNameAndType } + ?.az()?.let { + it.data?.toValueObject() + } ?: run { + log.info { "Cant find MediaOutInformationConstructedEvent on ${Events.EventMediaReadOutNameAndType}" } + return null + } + + val summaries = metadataInfo?.summary?.filter { it.summary != null } + ?.map { SummaryInfo(language = it.language, summary = it.summary!!) } ?: emptyList() + + val titles: MutableList = mutableListOf(mediaInfo.title) + metadataInfo?.let { + titles.addAll(it.altTitle) + titles.add(it.title) + titles.add(NameHelper.normalize(it.title)) + } + + return ComposedMediaInfo( + title = NameHelper.normalize(metadataInfo?.title ?: mediaInfo.title), + fallbackCollection = baseInfo.title, + titles = titles, + type = metadataInfo?.type ?: mediaInfo.type, + summaries = summaries, + genres = metadataInfo?.genres ?: emptyList() + ) + } + + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt index 38648fc0..55f13a16 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ConvertWorkTaskListener.kt @@ -63,8 +63,13 @@ class ConvertWorkTaskListener: WorkTaskListener() { return } + var language: String? = null + + val file = if (event.eventType == Events.EventWorkExtractPerformed) { - event.az()?.data?.outputFile + val foundEvent = event.az()?.data + language = foundEvent?.language + foundEvent?.outputFile } else if (event.eventType == Events.EventMediaProcessStarted) { val startEvent = event.az()?.data if (startEvent?.operations?.isOnly(StartOperationEvents.CONVERT) == true) { @@ -77,6 +82,15 @@ class ConvertWorkTaskListener: WorkTaskListener() { val convertFile = file?.let { File(it) } + if (language.isNullOrEmpty()) { + convertFile?.parentFile?.nameWithoutExtension?.let { + if (it.length == 3) { + language = it.lowercase() + } + } + } + + if (convertFile == null || !convertFile.exists()) { onProduceEvent(ConvertWorkCreatedEvent( metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()) @@ -84,6 +98,7 @@ class ConvertWorkTaskListener: WorkTaskListener() { return } else { val convertData = ConvertData( + language = language ?: "unk", inputFile = convertFile.absolutePath, outputFileName = convertFile.nameWithoutExtension, outputDirectory = convertFile.parentFile.absolutePath, diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt index f5231a55..7f37b29a 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverDownloadTaskListener.kt @@ -6,9 +6,11 @@ import no.iktdev.eventi.core.ConsumableEvent import no.iktdev.eventi.core.WGson import no.iktdev.eventi.data.EventStatus import no.iktdev.eventi.implementations.EventCoordinator +import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener import no.iktdev.mediaprocessing.shared.common.DownloadClient +import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.EventsListenerContract import no.iktdev.mediaprocessing.shared.common.contract.EventsManagerContract @@ -49,24 +51,13 @@ class CoverDownloadTaskListener : CoordinatorEventListener() { return } - val outDir = File(data.outDir) - .also { - if (!it.exists()) { - it.mkdirs() - } - } - if (!outDir.exists()) { - log.error { "Check for output directory for cover storage failed for ${event.metadata.eventId} " } - onProduceEvent(failedEventDefault) - } - - val client = DownloadClient(data.url, File(data.outDir), data.outFileBaseName) + val client = DownloadClient(data.url, SharedConfig.cachedContent, data.outFileBaseName) val outFile = runBlocking { client.getOutFile() } - val coversInDifferentFormats = outDir.listFiles { it -> it.isFile && it.extension.lowercase() in client.contentTypeToExtension().values } ?: emptyArray() + val coversInDifferentFormats = SharedConfig.cachedContent.listFiles { it -> it.isFile && it.extension.lowercase() in client.contentTypeToExtension().values } ?: emptyArray() val result = if (outFile?.exists() == true) { outFile diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt index ab0b0e49..0827d9b7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/CoverFromMetadataTaskListener.kt @@ -86,7 +86,6 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() { data = CoverDetails( url = coverUrl, outFileBaseName = NameHelper.normalize(coverTitle), - outDir = mediaOutInfo.outDirectory, ) ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt index 05e8cac1..af87ad37 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/EncodeWorkArgumentsTaskListener.kt @@ -84,7 +84,6 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() { val mapper = EncodeWorkArgumentsMapping( inputFile = inputFile, outFileFullName = mediaInfoData.fullName, - outFileAbsolutePathFile = mediaInfo.data?.outDirectory?.let { File(it) } ?: return, streams = streams, preference = preference.encodePreference ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt index 67e7932b..4888e1d4 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/ExtractWorkArgumentsTaskListener.kt @@ -77,7 +77,6 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() { val mapper = ExtractWorkArgumentsMapping( inputFile = inputFile, outFileFullName = mediaInfoData.fullName, - outFileAbsolutePathFile = mediaInfo.data?.outDirectory?.let { File(it) } ?: return, streams = streams ) diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt index 2225b22d..b7b13cfd 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/listeners/MediaOutInformationTaskListener.kt @@ -7,12 +7,10 @@ import no.iktdev.eventi.data.EventStatus import no.iktdev.exfl.using import no.iktdev.mediaprocessing.coordinator.Coordinator import no.iktdev.mediaprocessing.coordinator.CoordinatorEventListener -import no.iktdev.mediaprocessing.coordinator.utils.log +import no.iktdev.mediaprocessing.coordinator.log import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.parsing.FileNameDeterminate import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper -import no.iktdev.mediaprocessing.shared.common.parsing.Regexes -import no.iktdev.mediaprocessing.shared.common.parsing.isCharOnlyUpperCase import no.iktdev.mediaprocessing.shared.common.contract.Events import no.iktdev.mediaprocessing.shared.common.contract.data.* import no.iktdev.mediaprocessing.shared.common.contract.data.EpisodeInfo @@ -68,7 +66,6 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { val result = if (vi != null) { MediaInfoReceived( - outDirectory = pm.getOutputDirectory().absolutePath, info = vi ).let { MediaOutInformationConstructedEvent( metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), @@ -135,7 +132,6 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { val filteredMetaTitles = metaTitles.filter { it.lowercase().contains(baseInfo.title.lowercase()) || NameHelper.normalize(it).lowercase().contains(baseInfo.title.lowercase()) } - //val viableFileTitles = filteredMetaTitles.filter { !it.isCharOnlyUpperCase() } return if (collection == baseInfo.title) { collection @@ -162,10 +158,6 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() { } } - fun getOutputDirectory() = SharedConfig.outgoingContent.using(NameHelper.normalize(getCollection())) - - - } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMapping.kt index 6ecf2b5b..0e3575cc 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMapping.kt @@ -13,13 +13,11 @@ import java.io.File class EncodeWorkArgumentsMapping( val inputFile: String, val outFileFullName: String, - val outFileAbsolutePathFile: File, val streams: ParsedMediaStreams, val preference: EncodingPreference ) { fun getArguments(): EncodeArgumentData? { - val outVideoFileAbsolutePath = outFileAbsolutePathFile.using("${outFileFullName}.mp4").absolutePath val vaas = VideoAndAudioSelector(streams, preference) val vArg = vaas.getVideoStream() ?.let { VideoArguments(it, streams, preference.video).getVideoArguments() } @@ -32,7 +30,7 @@ class EncodeWorkArgumentsMapping( } else { EncodeArgumentData( inputFile = inputFile, - outputFile = outVideoFileAbsolutePath, + outputFileName = "${outFileFullName}.mp4", arguments = vaArgs ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/ExtractWorkArgumentsMapping.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/ExtractWorkArgumentsMapping.kt index 13895a66..31d7aa6b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/ExtractWorkArgumentsMapping.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/ExtractWorkArgumentsMapping.kt @@ -9,19 +9,18 @@ import java.io.File class ExtractWorkArgumentsMapping( val inputFile: String, val outFileFullName: String, - val outFileAbsolutePathFile: File, val streams: ParsedMediaStreams ) { fun getArguments(): List { - val subDir = outFileAbsolutePathFile.using("sub") val sArg = SubtitleArguments(streams.subtitleStream).getSubtitleArguments() val entries = sArg.map { ExtractArgumentData( inputFile = inputFile, + language = it.language, arguments = it.codecParameters + it.optionalParameters + listOf("-map", "0:s:${it.index}"), - outputFile = subDir.using(it.language, "${outFileFullName}.${it.format}").absolutePath + outputFileName = "${outFileFullName}.${it.language}.${it.format}" ) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCatalogStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCatalogStore.kt new file mode 100644 index 00000000..4c42b7ed --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCatalogStore.kt @@ -0,0 +1,129 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import mu.KotlinLogging +import no.iktdev.eventi.database.executeOrException +import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase +import no.iktdev.mediaprocessing.shared.common.contract.reader.MetadataDto +import no.iktdev.mediaprocessing.shared.common.contract.reader.VideoDetails +import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper +import no.iktdev.streamit.library.db.insertWithSuccess +import no.iktdev.streamit.library.db.query.CatalogQuery +import no.iktdev.streamit.library.db.query.MovieQuery +import no.iktdev.streamit.library.db.query.SerieQuery +import no.iktdev.streamit.library.db.tables.catalog +import no.iktdev.streamit.library.db.tables.serie +import org.jetbrains.exposed.exceptions.ExposedSQLException +import org.jetbrains.exposed.sql.* +import java.sql.SQLIntegrityConstraintViolationException + +object ContentCatalogStore { + val log = KotlinLogging.logger {} + + /** + * Given a list of titles and type, + * the codes purpose is to find the matching collection in the catalog by title + */ + fun getCollectionByTitleAndType(type: String, titles: List): String? { + return withTransaction(getStoreDatabase()) { + catalog.select { + (catalog.type eq type) and + ((catalog.title inList titles) or + (catalog.collection inList titles)) + }.map { + it[catalog.collection] + }.firstOrNull() + } + } + + private fun getCover(collection: String, type: String): String? { + return withTransaction(getStoreDatabase()) { + catalog.select { + (catalog.collection eq collection) and + (catalog.type eq type) + }.map { it[catalog.cover] }.firstOrNull() + } + } + + fun storeCatalog(title: String, collection: String, type: String, cover: String?, genres: String?): Int? { + withTransaction(getStoreDatabase()) { + val existingRow = catalog.select { + (catalog.collection eq collection) and + (catalog.type eq type) + }.firstOrNull() + + if (existingRow == null) { + catalog.insertIgnore { + it[catalog.title] = title + it[catalog.cover] = cover + it[catalog.type] = type + it[catalog.collection] = collection + it[catalog.genres] = genres + } + } else { + val id = existingRow[catalog.id] + val storedTitle = existingRow[catalog.title] + val useCover = existingRow[catalog.cover] ?: cover + val useGenres = existingRow[catalog.genres] ?: genres + + catalog.update({ + (catalog.id eq id) and + (catalog.collection eq collection) + }) { + it[catalog.cover] = useCover + it[catalog.genres] = useGenres + } + } + } + return getId(title, collection, type) + } + + private fun storeMovie(catalogId: Int, videoDetails: VideoDetails) { + val iid = MovieQuery(videoDetails.fileName).insertAndGetId() ?: run { + log.error { "Movie id was not returned!" } + return + } + withTransaction(getStoreDatabase()) { + catalog.update({ + (catalog.id eq catalogId) + }) { + it[catalog.iid] = iid + } + } + } + + private fun storeSerie(collection: String, videoDetails: VideoDetails) { + val serieInfo = videoDetails.serieInfo ?: run { + log.error { "serieInfo in videoDetails is null!" } + return + } + val insert = withTransaction(getStoreDatabase()) { + serie.insertIgnore { + it[title] = serieInfo.episodeTitle + it[episode] = serieInfo.episodeNumber + it[season] = serieInfo.seasonNumber + it[video] = videoDetails.fileName + it[serie.collection] = collection + } + } + } + + fun storeMedia(title: String, collection: String, type: String, videoDetails: VideoDetails) { + val catalogId = getId(title, collection, type) ?: return + when (type) { + "movie" -> storeMovie(catalogId, videoDetails) + "serie" -> storeSerie(collection, videoDetails) + } + } + + fun getId(title: String, collection: String, type: String): Int? { + return no.iktdev.streamit.library.db.withTransaction { + catalog.select { catalog.title eq title }.andWhere { + catalog.type eq type + }.map { it[catalog.id].value }.firstOrNull() + } + } + + + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCompletionMover.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCompletionMover.kt new file mode 100644 index 00000000..264a5649 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentCompletionMover.kt @@ -0,0 +1,102 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import mu.KotlinLogging +import no.iktdev.eventi.data.dataAs +import no.iktdev.exfl.using +import no.iktdev.mediaprocessing.shared.common.SharedConfig +import no.iktdev.mediaprocessing.shared.common.contract.Events +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.moveTo +import no.iktdev.mediaprocessing.shared.common.notExist +import java.io.File + +class ContentCompletionMover(val collection: String, val events: List) { + val log = KotlinLogging.logger {} + val storeFolder = SharedConfig.outgoingContent.using(collection) + + init { + if (storeFolder.notExist()) { + log.info { "Creating missing folders for path ${storeFolder.absolutePath}" } + storeFolder.mkdirs() + } + } + + + /** + * @return Pair or null if no file found + */ + fun moveVideo(): Pair? { + val encodedFile = events.find { it.eventType == Events.EventWorkEncodePerformed }?.dataAs()?.outputFile?.let { + File(it) + } ?: return null + if (!encodedFile.exists()) { + log.error { "Provided file ${encodedFile.absolutePath} does not exist at the given location" } + return null + } + val storeFile = storeFolder.using(encodedFile.name) + val result = encodedFile.moveTo(storeFile) { + + } + return if (result) Pair(encodedFile.absolutePath, storeFile.absolutePath) else null + } + + fun moveCover(): Pair? { + val coverFile = events.find { it.eventType == Events.EventWorkDownloadCoverPerformed }?. + az()?.data?.absoluteFilePath?.let { + File(it) + } ?: return null + + if (coverFile.notExist()) { + log.error { "Provided file ${coverFile.absolutePath} does not exist at the given location" } + return null + } + val storeFile = storeFolder.using(coverFile.name) + val result = coverFile.moveTo(storeFile) + return if (result) Pair(coverFile.absolutePath, storeFile.absolutePath) else null + } + + + fun getMovableSubtitles(): Map> { + val extracted = + events.filter { it.eventType == Events.EventWorkExtractPerformed }.mapNotNull { it.dataAs() } + val converted = + events.filter { it.eventType == Events.EventWorkConvertPerformed }.mapNotNull { it.dataAs() } + + return extracted.groupBy { it.language }.mapValues { v -> v.value.map { File(it.outputFile) } } + + converted.groupBy { it.language }.mapValues { v -> v.value.flatMap { it.outputFiles }.map { File(it) } } + } + + data class MovedSubtitle( + val language: String, + val source: File, + val destination: File + ) + + fun moveSubtitles(): List? { + val subtitleFolder = storeFolder.using("sub") + val moved: MutableList = mutableListOf() + + val subtitles = getMovableSubtitles() + if (subtitles.isEmpty() || subtitles.values.isEmpty()) { + return null + } + for ((lang, files) in subtitles) { + val languageFolder = subtitleFolder.using(lang).also { + if (it.notExist()) { + it.mkdirs() + } + } + for (file in files) { + val storeFile = languageFolder.using(file.name) + val success = file.moveTo(storeFile) + if (success) { + moved.add(MovedSubtitle(lang, file, storeFile)) + } + } + + } + return moved + } + + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentGenresStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentGenresStore.kt new file mode 100644 index 00000000..6c6e761a --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentGenresStore.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase +import no.iktdev.streamit.library.db.query.GenreQuery + +object ContentGenresStore { + fun storeAndGetIds(genres: List): String? { + return try { + withTransaction(getStoreDatabase()) { + val gq = GenreQuery( *genres.toTypedArray() ) + gq.insertAndGetIds() + gq.getIds().joinToString(",") + } + } catch (e: Exception) { + e.printStackTrace() + return null + } + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentMetadataStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentMetadataStore.kt new file mode 100644 index 00000000..3cff632d --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentMetadataStore.kt @@ -0,0 +1,20 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import no.iktdev.eventi.database.executeOrException +import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase +import no.iktdev.mediaprocessing.shared.common.contract.reader.SummaryInfo +import no.iktdev.streamit.library.db.query.SummaryQuery + +object ContentMetadataStore { + + fun storeSummary(catalogId: Int, summaryInfo: SummaryInfo) { + val result = executeOrException(getStoreDatabase().database) { + SummaryQuery( + cid = catalogId, + language = summaryInfo.language, + description = summaryInfo.summary + ).insert() + } + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentSubtitleStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentSubtitleStore.kt new file mode 100644 index 00000000..36b2cb5e --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentSubtitleStore.kt @@ -0,0 +1,24 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import no.iktdev.eventi.database.executeWithStatus +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase +import no.iktdev.streamit.library.db.query.SubtitleQuery +import no.iktdev.streamit.library.db.tables.subtitle +import org.jetbrains.exposed.sql.insert +import java.io.File + +object ContentSubtitleStore { + + fun storeSubtitles(collection: String, language: String, destinationFile: File): Boolean { + return executeWithStatus (getStoreDatabase()) { + subtitle.insert { + it[this.associatedWithVideo] = destinationFile.nameWithoutExtension + it[this.language] = language + it[this.collection] = collection + it[this.format] = destinationFile.extension.uppercase() + it[this.subtitle] = destinationFile.name + } + } + } + +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentTitleStore.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentTitleStore.kt new file mode 100644 index 00000000..637526b7 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/store/ContentTitleStore.kt @@ -0,0 +1,42 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.mapping.store + +import no.iktdev.eventi.database.withTransaction +import no.iktdev.mediaprocessing.coordinator.getStoreDatabase +import no.iktdev.mediaprocessing.shared.common.parsing.NameHelper +import no.iktdev.streamit.library.db.tables.titles +import org.jetbrains.exposed.sql.insertIgnore +import org.jetbrains.exposed.sql.or +import org.jetbrains.exposed.sql.select + +object ContentTitleStore { + + fun store(mainTitle: String, otherTitles: List) { + try { + withTransaction(getStoreDatabase()) { + val titlesToUse = otherTitles + listOf( + NameHelper.normalize(mainTitle) + ).filter { it != mainTitle } + + titlesToUse.forEach { t -> + titles.insertIgnore { + it[masterTitle] = mainTitle + it[alternativeTitle] = t + } + } + } + } catch (e: Exception) { + e.printStackTrace() + } + } + + fun findMasterTitles(titleList: List): List { + return withTransaction(getStoreDatabase()) { + titles.select { + (titles.alternativeTitle inList titleList) or + (titles.masterTitle inList titleList) + }.map { + it[titles.masterTitle] + }.distinctBy { it } + } ?: emptyList() + } +} \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt new file mode 100644 index 00000000..9033a1ac --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/validator/CompletionValidator.kt @@ -0,0 +1,113 @@ +package no.iktdev.mediaprocessing.coordinator.tasksV2.validator + +import no.iktdev.eventi.data.dataAs +import no.iktdev.eventi.data.isSuccessful +import no.iktdev.mediaprocessing.shared.common.contract.Events +import no.iktdev.mediaprocessing.shared.common.contract.data.* +import no.iktdev.mediaprocessing.shared.common.contract.dto.StartOperationEvents +import no.iktdev.mediaprocessing.shared.common.contract.dto.SubtitleFormats +import java.io.File + +/** + * Validates whether all the required work has been created and processed in accordance with expected behaviour and sequence + */ +object CompletionValidator { + + /** + * Checks whether it requires encode or extract or both, and it has created events with args + */ + fun req1(started: MediaProcessStartEvent, events: List): Boolean { + val encodeFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.ENCODE) == true) { + events.any { it.eventType == Events.EventMediaParameterEncodeCreated } + } else true + + val extractFulfilledOrSkipped = if (started.data?.operations?.contains(StartOperationEvents.EXTRACT) == true) { + events.any { it.eventType == Events.EventMediaParameterExtractCreated } + } else true + + if (!encodeFulfilledOrSkipped || !extractFulfilledOrSkipped) { + return false + } else return true + } + + /** + * Checks whether work that was supposed to be created has been created. + * Checks if all subtitles that can be processed has been created if convert is set. + */ + fun req2(operations: List, events: List): Boolean { + if (StartOperationEvents.ENCODE in operations) { + val encodeParamter = events.find { it.eventType == Events.EventMediaParameterEncodeCreated }?.az() + val encodeWork = events.find { it.eventType == Events.EventWorkEncodeCreated } + if (encodeParamter?.isSuccessful() == true && (encodeWork == null)) + return false + } + + val extractParamter = events.find { it.eventType == Events.EventMediaParameterExtractCreated }?.az() + val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } + if (StartOperationEvents.EXTRACT in operations) { + if (extractParamter?.isSuccessful() == true && extractParamter.data?.size != extractWork.size) + return false + } + + if (StartOperationEvents.CONVERT in operations) { + val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } + + val supportedSubtitleFormats = SubtitleFormats.entries.map { it.name } + val eventsSupportsConvert = extractWork.filter { it.data is ExtractArgumentData } + .filter { (it.dataAs()?.outputFileName?.let { f -> File(f).extension.uppercase() } in supportedSubtitleFormats) } + + if (convertWork.size != eventsSupportsConvert.size) + return false + } + + return true + } + + /** + * Checks whether all work that has been created has been completed + */ + fun req3(operations: List, events: List): Boolean { + if (StartOperationEvents.ENCODE in operations) { + val encodeWork = events.filter { it.eventType == Events.EventWorkEncodeCreated } + val encodePerformed = events.filter { it.eventType == Events.EventWorkEncodePerformed } + if (encodePerformed.size < encodeWork.size) + return false + } + + if (StartOperationEvents.EXTRACT in operations) { + val extractWork = events.filter { it.eventType == Events.EventWorkExtractCreated } + val extractPerformed = events.filter { it.eventType == Events.EventWorkExtractPerformed } + if (extractPerformed.size < extractWork.size) + return false + } + + if (StartOperationEvents.CONVERT in operations) { + val convertWork = events.filter { it.eventType == Events.EventWorkConvertCreated } + val convertPerformed = events.filter { it.eventType == Events.EventWorkConvertPerformed } + if (convertPerformed.size < convertWork.size) + return false + } + + return true + } + + /** + * Checks if metadata has cover, if so, 2 events are expected + */ + fun req4(events: List): Boolean { + val metadata = events.find { it.eventType == Events.EventMediaMetadataSearchPerformed } + if (metadata?.isSuccessful() != true) { + return true + } + + val hasCover = metadata.dataAs()?.cover != null + if (hasCover == false) { + return true + } + + if (events.any { it.eventType == Events.EventMediaReadOutCover } && events.any { it.eventType == Events.EventWorkDownloadCoverPerformed }) { + return true + } + return false + } +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt deleted file mode 100644 index 13fea9a5..00000000 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/reader/BaseInfoFromFileTest.kt +++ /dev/null @@ -1,100 +0,0 @@ -package no.iktdev.mediaprocessing.coordinator.reader -/* -import com.google.gson.Gson -import no.iktdev.mediaprocessing.shared.kafka.core.CoordinatorProducer -import no.iktdev.mediaprocessing.shared.common.contract.ProcessType -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.BaseInfoPerformed -import no.iktdev.mediaprocessing.shared.kafka.dto.events_result.ProcessStarted -import no.iktdev.mediaprocessing.shared.kafka.dto.Status -import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.Named -import org.junit.jupiter.api.Test -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource -import org.mockito.Mock -import org.skyscreamer.jsonassert.JSONAssert -import org.springframework.beans.factory.annotation.Autowired -import java.io.File - -class BaseInfoFromFileTest { - - @Autowired - private lateinit var testBase: KafkaTestBase - - @Mock - lateinit var coordinatorProducer: CoordinatorProducer - - - - val baseInfoFromFile = BaseInfoFromFile(coordinatorProducer) - - @Test - fun testReadFileInfo() { - val input = ProcessStarted( - Status.COMPLETED, ProcessType.FLOW, - File("/var/cache/[POTATO] Kage no Jitsuryokusha ni Naritakute! S2 - 01 [h265].mkv").absolutePath - ) - - val result = baseInfoFromFile.readFileInfo(input) - assertThat(result).isInstanceOf(BaseInfoPerformed::class.java) - val asResult = result as BaseInfoPerformed - assertThat(result.status).isEqualTo(Status.COMPLETED) - assertThat(asResult.title).isEqualTo("Kage no Jitsuryokusha ni Naritakute!") - assertThat(asResult.sanitizedName).isEqualTo("Kage no Jitsuryokusha ni Naritakute! S2 - 01") - } - - @ParameterizedTest - @MethodSource("names") - fun test(data: TestInfo) { - val gson = Gson() - val result = baseInfoFromFile.readFileInfo(data.input) - JSONAssert.assertEquals( - data.expected, - gson.toJson(result), - false - ) - } - - data class TestInfo( - val input: ProcessStarted, - val expected: String - ) - - companion object { - @JvmStatic - private fun names(): List> { - return listOf( - Named.of( - "Potato", TestInfo( - ProcessStarted( - Status.COMPLETED, ProcessType.FLOW, - "E:\\input\\Top Clown Findout.1080p.H264.AAC5.1.mkv" - ), - """ - { - "status": "COMPLETED", - "title": "Top Clown Findout", - "sanitizedName": "Top Clown Findout" - } - """.trimIndent() - ) - ), - Named.of("Filename with UHD wild tag", TestInfo( - ProcessStarted( - Status.COMPLETED, ProcessType.FLOW, - "E:\\input\\Wicked.Potato.Chapter.1.2023.UHD.BluRay.2160p.DDP.7.1.DV.HDR.x265.mp4" - ), - """ - { - "status": "COMPLETED", - "title": "Wicked Potato Chapter 1", - "sanitizedName": "Wicked Potato Chapter 1" - } - """.trimIndent() - ) - ) - ) - } - } - -}*/ \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMappingTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMappingTest.kt index e4301fec..2c9ee302 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMappingTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/tasksV2/mapping/EncodeWorkArgumentsMappingTest.kt @@ -20,7 +20,6 @@ class EncodeWorkArgumentsMappingTest { val parser = EncodeWorkArgumentsMapping( "potato.mkv", "potato.mp4", - File(".\\potato.mp4"), event.az()!!.data!!, EncodingPreference(VideoPreference(), AudioPreference()) ) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegTaskService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegTaskService.kt index df4fb982..71ee8d69 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegTaskService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/ffmpeg/FfmpegTaskService.kt @@ -2,8 +2,10 @@ package no.iktdev.mediaprocessing.processer.ffmpeg import kotlinx.coroutines.cancel import mu.KLogger +import no.iktdev.exfl.using import no.iktdev.mediaprocessing.processer.taskManager import no.iktdev.mediaprocessing.shared.common.ClaimableTask +import no.iktdev.mediaprocessing.shared.common.SharedConfig import no.iktdev.mediaprocessing.shared.common.TaskQueueListener import no.iktdev.mediaprocessing.shared.common.getComputername import no.iktdev.mediaprocessing.shared.common.services.TaskService @@ -18,6 +20,10 @@ abstract class FfmpegTaskService: TaskService(), FfmpegListener { protected var runner: FfmpegRunner? = null + fun getTemporaryStoreFile(fileName: String): File { + return SharedConfig.cachedContent.using(fileName) + } + override fun onTaskAvailable(data: ClaimableTask) { if (runner?.isWorking() == true) { //log.info { "Worker is already running.., will not consume" } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt index 7a682ac5..f3007856 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/EncodeService.kt @@ -62,7 +62,7 @@ class EncodeService( fun startEncode(event: Task) { val ffwrc = event.data as EncodeArgumentData - val outFile = File(ffwrc.outputFile) + val outFile = getTemporaryStoreFile(ffwrc.outputFileName) outFile.parentFile.mkdirs() if (!logDir.exists()) { logDir.mkdirs() @@ -75,7 +75,7 @@ class EncodeService( log.info { "Claim successful for ${event.referenceId} encode" } runner = FfmpegRunner( inputFile = ffwrc.inputFile, - outputFile = ffwrc.outputFile, + outputFile = outFile.absolutePath, arguments = ffwrc.arguments, logDir = logDir, listener = this ) @@ -83,7 +83,7 @@ class EncodeService( if (ffwrc.arguments.firstOrNull() != "-y") { this.onError( ffwrc.inputFile, - "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outputFile}" + "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outFile.absolutePath}" ) // Setting consumed to prevent spamming taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR) diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt index d6ef246d..987fdfd6 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/services/ExtractService.kt @@ -15,6 +15,7 @@ import no.iktdev.mediaprocessing.shared.common.limitedWhile import no.iktdev.mediaprocessing.shared.common.database.cal.Status import no.iktdev.mediaprocessing.shared.common.task.Task import no.iktdev.mediaprocessing.shared.common.contract.data.ExtractArgumentData +import no.iktdev.mediaprocessing.shared.common.contract.data.ExtractWorkCreatedEvent import no.iktdev.mediaprocessing.shared.common.contract.data.ExtractWorkPerformedEvent import no.iktdev.mediaprocessing.shared.common.contract.data.ExtractedData import no.iktdev.mediaprocessing.shared.common.contract.dto.ProcesserEventInfo @@ -60,9 +61,7 @@ class ExtractService( fun startExtract(event: Task) { val ffwrc = event.data as ExtractArgumentData - val outFile = File(ffwrc.outputFile).also { - it.parentFile.mkdirs() - } + val outputFile = getTemporaryStoreFile(ffwrc.outputFileName) if (!logDir.exists()) { logDir.mkdirs() } @@ -72,16 +71,16 @@ class ExtractService( log.info { "Claim successful for ${event.referenceId} extract" } runner = FfmpegRunner( inputFile = ffwrc.inputFile, - outputFile = ffwrc.outputFile, + outputFile = outputFile.absolutePath, arguments = ffwrc.arguments, logDir = logDir, listener = this ) - if (outFile.exists()) { + if (outputFile.exists()) { if (ffwrc.arguments.firstOrNull() != "-y") { this.onError( ffwrc.inputFile, - "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${ffwrc.outputFile}" + "${this::class.java.simpleName} identified the file as already existing, either allow overwrite or delete the offending file: ${outputFile.absolutePath}" ) // Setting consumed to prevent spamming taskManager.markTaskAsCompleted(event.referenceId, event.eventId, Status.ERROR) @@ -102,6 +101,8 @@ class ExtractService( override fun onCompleted(inputFile: String, outputFile: String) { val task = assignedTask ?: return + assert(task.data is ExtractArgumentData) { "Wrong data type found!" } + val taskData = task.data as ExtractArgumentData log.info { "Extract completed for ${task.referenceId}" } runBlocking { var successfulComplete = false @@ -119,6 +120,7 @@ class ExtractService( source = getProducerName() ), data = ExtractedData( + taskData.language, outputFile ) ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt index 60b7bc34..514525d3 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/SharedConfig.kt @@ -6,6 +6,7 @@ import java.io.File object SharedConfig { var incomingContent: File = if (!System.getenv("DIRECTORY_CONTENT_INCOMING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_INCOMING")) else File("/src/input") + var cachedContent: File = if (!System.getenv("DIRECTORY_CONTENT_CACHE").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_CACHE")) else File("/src/cache") val outgoingContent: File = if (!System.getenv("DIRECTORY_CONTENT_OUTGOING").isNullOrBlank()) File(System.getenv("DIRECTORY_CONTENT_OUTGOING")) else File("/src/output") val ffprobe: String = System.getenv("SUPPORTING_EXECUTABLE_FFPROBE") ?: "ffprobe" diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt index 2e2d6e48..78a6ccab 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/Utils.kt @@ -5,9 +5,14 @@ import mu.KotlinLogging import java.io.File import java.io.RandomAccessFile import java.net.InetAddress +import java.util.zip.CRC32 private val logger = KotlinLogging.logger {} +fun File.notExist(): Boolean { + return !this.exists() +} + fun isFileAvailable(file: File): Boolean { if (!file.exists()) return false var stream: RandomAccessFile? = null @@ -59,4 +64,66 @@ fun silentTry(code: () -> Unit) { try { code.invoke() } catch (_: Exception) {} +} + +fun File.getCRC32(): Long { + val crc = CRC32() + this.inputStream().use { input -> + val buffer = ByteArray(1024) + var bytesRead: Int + while (input.read(buffer).also { bytesRead = it } != -1) { + crc.update(buffer, 0, bytesRead) + } + } + return crc.value +} + +fun File.moveTo(destinationFile: File, onProgress: (Double) -> Unit = {}): Boolean { + require(this.exists()) + require(destinationFile.notExist()) + val tempDestinationFile = File(destinationFile.parentFile, "${destinationFile.name}.tmp") + + + val success: Boolean = run { + try { + val totalBytes = this.length() + var copiedBytes = 0L + + this.inputStream().use { input -> + tempDestinationFile.outputStream().use { output -> + val buffer = ByteArray(1024) + var bytesRead: Int + while (input.read(buffer).also { bytesRead = it } != -1) { + output.write(buffer, 0, bytesRead) + copiedBytes += bytesRead + onProgress(copiedBytes.toDouble() / totalBytes * 100) + } + } + } + true + } catch (e: Exception) { + e.printStackTrace() + false + } + } + + if (!success) { + return false + } + + val sourceHash = this.getCRC32() + val tempFileHash = tempDestinationFile.getCRC32() + + if (sourceHash == tempFileHash) { + if (!tempDestinationFile.renameTo(destinationFile)) { + logger.error { "${tempDestinationFile.name} failed to rename to ${destinationFile.name}" } + return false + } + this.delete() + } else { + logger.error { "${tempDestinationFile.name} failed integrity check" } + return false + } + + return true } \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkCreatedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkCreatedEvent.kt index c0153abc..e898b719 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkCreatedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkCreatedEvent.kt @@ -14,6 +14,7 @@ data class ConvertWorkCreatedEvent( data class ConvertData( override val inputFile: String, + val language: String, val outputDirectory: String, val outputFileName: String, val formats: List = emptyList(), diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkPerformed.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkPerformed.kt index 35780415..1efe16cd 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkPerformed.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ConvertWorkPerformed.kt @@ -12,5 +12,6 @@ class ConvertWorkPerformed( } data class ConvertedData( + val language: String, val outputFiles: List ) diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/EncodeArgumentCreatedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/EncodeArgumentCreatedEvent.kt index 72fed4e7..0bd68a00 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/EncodeArgumentCreatedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/EncodeArgumentCreatedEvent.kt @@ -14,6 +14,6 @@ data class EncodeArgumentCreatedEvent( data class EncodeArgumentData( val arguments: List, - val outputFile: String, + val outputFileName: String, override val inputFile: String ): TaskData() \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractArgumentCreatedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractArgumentCreatedEvent.kt index cc805907..cf257c26 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractArgumentCreatedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractArgumentCreatedEvent.kt @@ -13,6 +13,7 @@ data class ExtractArgumentCreatedEvent( data class ExtractArgumentData( val arguments: List, - val outputFile: String, + val language: String, + val outputFileName: String, override val inputFile: String ): TaskData() \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractWorkPerformedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractWorkPerformedEvent.kt index 51cf22e9..1bb82740 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractWorkPerformedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/ExtractWorkPerformedEvent.kt @@ -12,5 +12,6 @@ data class ExtractWorkPerformedEvent( } data class ExtractedData( + val language: String, val outputFile: String ) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaCoverInfoReceivedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaCoverInfoReceivedEvent.kt index 4d215503..296e9fb1 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaCoverInfoReceivedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaCoverInfoReceivedEvent.kt @@ -12,6 +12,5 @@ data class MediaCoverInfoReceivedEvent( data class CoverDetails( val url: String, - val outDir: String, val outFileBaseName: String, ) \ No newline at end of file diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaOutInformationConstructedEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaOutInformationConstructedEvent.kt index 7bbdeb39..d15bcc41 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaOutInformationConstructedEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/contract/data/MediaOutInformationConstructedEvent.kt @@ -14,7 +14,6 @@ data class MediaOutInformationConstructedEvent( data class MediaInfoReceived( val info: JsonObject, - val outDirectory: String, ) { fun toValueObject(): MediaInfo? { val type = info.get("type").asString diff --git a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/MockEventManager.kt b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/MockEventManager.kt index 77e7f0f7..3f777ed1 100644 --- a/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/MockEventManager.kt +++ b/shared/eventi/src/test/kotlin/no/iktdev/eventi/mock/MockEventManager.kt @@ -7,6 +7,10 @@ import org.springframework.stereotype.Component @Component class MockEventManager(dataSource: MockDataSource = MockDataSource()) : EventsManagerImpl(dataSource) { val events: MutableList = mutableListOf() + override fun getAvailableReferenceIds(): List { + throw RuntimeException("Not implemented") + } + override fun readAvailableEvents(): List> { return listOf(events) }