From ed6bddb95fb5190dc06003a9f184782ecbdae75c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Brage=20Skj=C3=B8nborg?= Date: Sat, 31 Jan 2026 20:53:10 +0100 Subject: [PATCH] Fixed state reporting and producing --- .../mediaprocessing/converter/TaskPoller.kt | 4 +- .../listeners/ConvertTaskListener.kt | 14 + .../listeners/ConvertTaskListenerTest.kt | 4 +- .../mediaprocessing/coordinator/TaskPoller.kt | 4 +- .../tasks/DownloadCoverTaskListener.kt | 13 + .../tasks/MediaStreamReadTaskListener.kt | 13 + .../MigrateContentToStoreTaskListener.kt | 229 ++++++++++---- .../StoreContentAndMetadataTaskListener.kt | 51 ++- .../MigrateCreateStoreTaskListenerTest.kt | 34 +- .../StoreContentAndMetadataListenerTest.kt | 34 +- .../tasks/DownloadCoverTaskListenerTest.kt | 4 +- .../tasks/MediaStreamReadTaskListenerTest.kt | 4 +- .../MigrateContentToStoreTaskListenerTest.kt | 297 +++++++++--------- ...StoreContentAndMetadataTaskListenerTest.kt | 106 +++++-- .../mediaprocessing/processer/TaskPoller.kt | 4 +- .../listeners/SubtitleTaskListener.kt | 13 + .../processer/listeners/VideoTaskListener.kt | 14 + .../listeners/SubtitleTaskListenerTest.kt | 4 +- .../listeners/VideoTaskListenerTest.kt | 4 +- gradle/libs.versions.toml | 2 +- .../MigrateContentToStoreTaskResultEvent.kt | 13 +- .../common/projection/StoreProjection.kt | 7 +- 22 files changed, 554 insertions(+), 318 deletions(-) diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt index f1d43d54..946e88ab 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/TaskPoller.kt @@ -53,11 +53,11 @@ class DefaultTaskReporter() : TaskReporter { } - override fun markFailed(taskId: UUID) { + override fun markFailed(referenceId: UUID, taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Failed) } - override fun markCancelled(taskId: UUID) { + override fun markCancelled(referenceId: UUID, taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Cancelled) } diff --git a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListener.kt b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListener.kt index b4e040b2..2fc97013 100644 --- a/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListener.kt +++ b/apps/converter/src/main/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListener.kt @@ -33,6 +33,7 @@ open class ConvertTaskListener(): TaskListener(TaskType.CPU_INTENSIVE) { return task is ConvertTask } + override suspend fun onTask(task: Task): Event? { if (task !is ConvertTask) { throw IllegalArgumentException("Invalid task type: ${task::class.java.name}") @@ -68,6 +69,19 @@ open class ConvertTaskListener(): TaskListener(TaskType.CPU_INTENSIVE) { } } + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return ConvertTaskResultEvent(null, status, error = message) + } + open fun getConverter(): Converter { return Converter2(getConverterEnvironment(), getListener()) } diff --git a/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt b/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt index 0b7aa85d..afb0edad 100644 --- a/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt +++ b/apps/converter/src/test/kotlin/no/iktdev/mediaprocessing/converter/listeners/ConvertTaskListenerTest.kt @@ -108,8 +108,8 @@ class ConvertTaskListenerTest { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} override fun markCompleted(taskId: UUID) {} - override fun markFailed(taskId: UUID) {} - override fun markCancelled(taskId: UUID) {} + override fun markFailed(referenceId: UUID, taskId: UUID) {} + override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt index 88983326..5328ae9d 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/TaskPoller.kt @@ -52,11 +52,11 @@ class DefaultTaskReporter() : TaskReporter { TaskStore.markConsumed(taskId, TaskStatus.Completed) } - override fun markFailed(taskId: UUID) { + override fun markFailed(referenceId: UUID, taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Failed) } - override fun markCancelled(taskId: UUID) { + override fun markCancelled(referenceId: UUID, taskId: UUID) { TaskStore.markConsumed(taskId, TaskStatus.Cancelled) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt index 554b7fa4..ad38d1ae 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt @@ -58,6 +58,19 @@ class DownloadCoverTaskListener( } } + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return CoverDownloadResultEvent(null, status, error = message) + } + open fun getDownloadClient(): DownloadClient { return DefaultDownloadClient(coordinatorEnv) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt index 89d8e649..0ac227d9 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt @@ -49,6 +49,19 @@ class MediaStreamReadTaskListener( } } + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return CoordinatorReadStreamsResultEvent(null, status, error = message) + } + override fun getFfprobe(): FFprobe { return JsonFfinfo(coordinatorEnv.ffprobe) } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt index 603220df..4f07b427 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt @@ -28,61 +28,89 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { override suspend fun onTask(task: Task): Event? { val pickedTask = task as? MigrateToContentStoreTask ?: return null - val fs = getFileSystemService() + // Disse vil kaste exceptions hvis noe går galt val videoStatus = migrateVideo(fs, pickedTask.data.videoContent) val subtitleStatus = migrateSubtitle(fs, pickedTask.data.subtitleContent ?: emptyList()) val coverStatus = migrateCover(fs, pickedTask.data.coverContent ?: emptyList()) - var status = TaskStatus.Completed - if (videoStatus.status != MigrateStatus.Failed && - subtitleStatus.none { it.status == MigrateStatus.Failed } && - coverStatus.none { it.status == MigrateStatus.Failed }) - { - pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.let { - silentTry { fs.delete(it) } - } - pickedTask.data.subtitleContent?.map { File(it.cachedUri) }?.forEach { - silentTry { fs.delete(it) } - } - pickedTask.data.coverContent?.map { File(it.cachedUri) }?.forEach { - silentTry { fs.delete(it) } - } - } else { - status = TaskStatus.Failed - } + // Hvis vi kommer hit, har ingen migrering kastet exceptions → alt OK + deleteCache(fs, pickedTask) - - val completedEvent = MigrateContentToStoreTaskResultEvent( - status = status, - collection = pickedTask.data.collection, - videoMigrate = videoStatus, - subtitleMigrate = subtitleStatus, - coverMigrate = coverStatus + return MigrateContentToStoreTaskResultEvent( + status = TaskStatus.Completed, + migrateData = MigrateContentToStoreTaskResultEvent.MigrateData( + collection = pickedTask.data.collection, + videoMigrate = videoStatus, + subtitleMigrate = subtitleStatus, + coverMigrate = coverStatus + ) ).producedFrom(task) - - return completedEvent } - @VisibleForTesting - internal fun migrateVideo(fs: FileSystemService, videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration { - if (videoContent == null) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent) + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return MigrateContentToStoreTaskResultEvent(null, status, error = message) + } + + private fun deleteCache(fs: FileSystemService, task: MigrateToContentStoreTask) { + task.data.videoContent?.cachedUri?.let { silentTry { fs.delete(File(it)) } } + task.data.subtitleContent?.forEach { silentTry { fs.delete(File(it.cachedUri)) } } + task.data.coverContent?.forEach { silentTry { fs.delete(File(it.cachedUri)) } } + } + + + internal fun migrateVideo( + fs: FileSystemService, + videoContent: MigrateToContentStoreTask.Data.SingleContent? + ): MigrateContentToStoreTaskResultEvent.FileMigration { + + if (videoContent == null) { + return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent) + } + val source = File(videoContent.cachedUri) val destination = File(videoContent.storeUri) - return try { - if (!fs.copy(source, destination)) { - return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) - } - if (!fs.areIdentical(source, destination)) { - return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) + // 1. Hvis destinasjonen finnes, sjekk identitet + if (destination.exists()) { + if (fs.areIdentical(source, destination)) { + // Skip – allerede migrert + return MigrateContentToStoreTaskResultEvent.FileMigration( + destination.absolutePath, + MigrateStatus.Completed + ) + } else { + throw IllegalStateException( + "Destination file already exists but is not identical: $destination" + ) } - - MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed) - } catch (e: Exception) { - MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) } + + // 2. Utfør kopiering + if (!fs.copy(source, destination)) { + throw IllegalStateException("File could not be copied to: $destination from $source") + } + + // 3. Verifiser kopien (optional) + if (!fs.areIdentical(source, destination)) { + throw IllegalStateException("Copied file is not identical to source: $destination") + } + + return MigrateContentToStoreTaskResultEvent.FileMigration( + destination.absolutePath, + MigrateStatus.Completed + ) } @VisibleForTesting @@ -90,53 +118,116 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { fs: FileSystemService, subtitleContents: List ): List { - if (subtitleContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.SubtitleMigration(null, null, MigrateStatus.NotPresent)) - val results = mutableListOf() - for (subtitle in subtitleContents) { + + if (subtitleContents.isEmpty()) { + return listOf( + MigrateContentToStoreTaskResultEvent.SubtitleMigration( + language = null, + storedUri = null, + status = MigrateStatus.NotPresent + ) + ) + } + + return subtitleContents.map { subtitle -> val source = File(subtitle.cachedUri) val destination = File(subtitle.storeUri) - try { - if (!fs.copy(source, destination)) { - results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed)) - continue - } - if (!fs.areIdentical(source, destination)) { - results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed)) + // 1. Hvis destinasjonen finnes + if (destination.exists()) { + if (fs.areIdentical(source, destination)) { + return@map MigrateContentToStoreTaskResultEvent.SubtitleMigration( + subtitle.language, + destination.absolutePath, + MigrateStatus.Completed + ) } else { - results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Completed)) + throw IllegalStateException( + "Destination subtitle exists but is not identical: ${destination.absolutePath}" + ) } - } catch (e: Exception) { - results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Failed)) } + + // 2. Kopier + if (!fs.copy(source, destination)) { + throw IllegalStateException( + "Failed to copy subtitle ${subtitle.language} from $source to $destination" + ) + } + + // 3. Verifiser + if (!fs.areIdentical(source, destination)) { + throw IllegalStateException( + "Copied subtitle ${subtitle.language} is not identical: ${destination.absolutePath}" + ) + } + + // 4. OK + MigrateContentToStoreTaskResultEvent.SubtitleMigration( + subtitle.language, + destination.absolutePath, + MigrateStatus.Completed + ) } - return results } @VisibleForTesting - internal fun migrateCover(fs: FileSystemService, coverContents: List): List { - if (coverContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)) - val results = mutableListOf() - for (cover in coverContents) { + internal fun migrateCover( + fs: FileSystemService, + coverContents: List + ): List { + + if (coverContents.isEmpty()) { + return listOf( + MigrateContentToStoreTaskResultEvent.FileMigration( + storedUri = null, + status = MigrateStatus.NotPresent + ) + ) + } + + return coverContents.map { cover -> val source = File(cover.cachedUri) val destination = File(cover.storeUri) - try { - if (!fs.copy(source, destination)) { - results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) - continue - } - if (!fs.areIdentical(source, destination)) { - results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) + + // 1. Hvis destinasjonen finnes + if (destination.exists()) { + if (fs.areIdentical(source, destination)) { + return@map MigrateContentToStoreTaskResultEvent.FileMigration( + destination.absolutePath, + MigrateStatus.Completed + ) } else { - results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed)) + throw IllegalStateException( + "Destination cover exists but is not identical: ${destination.absolutePath}" + ) } - } catch (e: Exception) { - results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) } + + // 2. Kopier + if (!fs.copy(source, destination)) { + throw IllegalStateException( + "Failed to copy cover from $source to $destination" + ) + } + + // 3. Verifiser + if (!fs.areIdentical(source, destination)) { + throw IllegalStateException( + "Copied cover is not identical: ${destination.absolutePath}" + ) + } + + // 4. OK + MigrateContentToStoreTaskResultEvent.FileMigration( + destination.absolutePath, + MigrateStatus.Completed + ) } - return results } + + open fun getFileSystemService(): FileSystemService { return DefaultFileSystemService() } diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListener.kt index 31868008..7cfa6d93 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListener.kt @@ -6,7 +6,6 @@ import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskListener import no.iktdev.eventi.tasks.TaskType import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent -import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.HttpEntity @@ -15,10 +14,11 @@ import org.springframework.http.HttpMethod import org.springframework.http.MediaType import org.springframework.stereotype.Component import org.springframework.web.client.RestTemplate -import java.util.UUID +import java.util.* @Component -class StoreContentAndMetadataTaskListener: TaskListener(TaskType.MIXED) { +class StoreContentAndMetadataTaskListener : TaskListener(TaskType.MIXED) { + @Autowired lateinit var streamitRestTemplate: RestTemplate @@ -36,21 +36,38 @@ class StoreContentAndMetadataTaskListener: TaskListener(TaskType.MIXED) { val headers = HttpHeaders().apply { contentType = MediaType.APPLICATION_JSON } val entity = HttpEntity(pickedTask.data, headers) - val response = try { - val res = streamitRestTemplate.exchange( - "open/api/mediaprocesser/import", - HttpMethod.POST, - entity, - Void::class.java, - ) - res.statusCode.is2xxSuccessful - } catch (e: Exception) { - false + // ❗ Ikke fang exceptions — la TaskListener håndtere dem + val response = streamitRestTemplate.exchange( + "/open/api/mediaprocesser/import", + HttpMethod.POST, + entity, + Void::class.java, + ) + + if (!response.statusCode.is2xxSuccessful) { + throw IllegalStateException("StreamIt returned ${response.statusCode}") + } + + // Hvis vi kommer hit → alt OK + return StoreContentAndMetadataTaskResultEvent( + status = TaskStatus.Completed + ).producedFrom(task) + } + + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" } return StoreContentAndMetadataTaskResultEvent( - if (response) TaskStatus.Completed else TaskStatus.Failed - ) + status = status, + error = message + ).producedFrom(task) } - -} \ No newline at end of file +} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt index 48479550..51555abe 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/MigrateCreateStoreTaskListenerTest.kt @@ -279,25 +279,27 @@ class MigrateCreateStoreTaskListenerTest : TestBase() { subtitleUris: List ) = MigrateContentToStoreTaskResultEvent( status = TaskStatus.Completed, - collection = collection, - videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration( - storedUri = videoUri, - status = if (videoUri != null) MigrateStatus.Completed else MigrateStatus.Failed - ), - subtitleMigrate = subtitleUris.map { - MigrateContentToStoreTaskResultEvent.SubtitleMigration( - language = "en", - storedUri = it, - status = MigrateStatus.Completed - ) - }, - coverMigrate = listOfNotNull( - coverUri?.let { - MigrateContentToStoreTaskResultEvent.FileMigration( + migrateData = MigrateContentToStoreTaskResultEvent.MigrateData( + collection = collection, + videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration( + storedUri = videoUri, + status = if (videoUri != null) MigrateStatus.Completed else MigrateStatus.Failed + ), + subtitleMigrate = subtitleUris.map { + MigrateContentToStoreTaskResultEvent.SubtitleMigration( + language = "en", storedUri = it, status = MigrateStatus.Completed ) - } + }, + coverMigrate = listOfNotNull( + coverUri?.let { + MigrateContentToStoreTaskResultEvent.FileMigration( + storedUri = it, + status = MigrateStatus.Completed + ) + } + ) ) ) } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListenerTest.kt index 61ddb1a7..3850d7b2 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/events/StoreContentAndMetadataListenerTest.kt @@ -153,25 +153,27 @@ class StoreContentAndMetadataListenerTest : TestBase() { ): MigrateContentToStoreTaskResultEvent { return MigrateContentToStoreTaskResultEvent( status = status, - collection = collection, - videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration( - storedUri = videoUri, - status = if (videoUri != null) MigrateStatus.Completed else MigrateStatus.Failed - ), - subtitleMigrate = subtitleUris.map { - MigrateContentToStoreTaskResultEvent.SubtitleMigration( - language = "en", - storedUri = it, - status = MigrateStatus.Completed - ) - }, - coverMigrate = listOfNotNull( - coverUri?.let { - MigrateContentToStoreTaskResultEvent.FileMigration( + migrateData = MigrateContentToStoreTaskResultEvent.MigrateData( + collection = collection, + videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration( + storedUri = videoUri, + status = if (videoUri != null) MigrateStatus.Completed else MigrateStatus.Failed + ), + subtitleMigrate = subtitleUris.map { + MigrateContentToStoreTaskResultEvent.SubtitleMigration( + language = "en", storedUri = it, status = MigrateStatus.Completed ) - } + }, + coverMigrate = listOfNotNull( + coverUri?.let { + MigrateContentToStoreTaskResultEvent.FileMigration( + storedUri = it, + status = MigrateStatus.Completed + ) + } + ) ) ) } diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt index 42c2d0ff..9c7802ef 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt @@ -39,8 +39,8 @@ class DownloadCoverTaskListenerTest: TestBase() { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} override fun markCompleted(taskId: UUID) {} - override fun markCancelled(taskId: UUID) {} - override fun markFailed(taskId: UUID) {} + override fun markCancelled(referenceId: UUID, taskId: UUID) {} + override fun markFailed(referenceId: UUID, taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) {} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt index 07be4afe..c276550e 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt @@ -45,8 +45,8 @@ class MediaStreamReadTaskListenerTest: TestBase() { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} override fun markCompleted(taskId: UUID) {} - override fun markFailed(taskId: UUID) {} - override fun markCancelled(taskId: UUID) {} + override fun markFailed(referenceId: UUID, taskId: UUID) {} + override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt index d33b2923..0cd90759 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt @@ -1,30 +1,58 @@ package no.iktdev.mediaprocessing.coordinator.listeners.tasks import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.MockFileSystemService import no.iktdev.mediaprocessing.coordinator.util.FileSystemService import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows +import java.util.* class MigrateContentToStoreTaskListenerTest { - class MigrateContentToStoreTaskListenerTestImplementation: MigrateContentToStoreTaskListener() { + // ------------------------------------------------------------------------- + // Fake Reporter + // ------------------------------------------------------------------------- - var fs: FileSystemService? = null - override fun getFileSystemService(): FileSystemService { - return fs!! + class FakeTaskReporter : TaskReporter { + val events = mutableListOf() + var completed = false + var failed = false + + override fun markClaimed(taskId: UUID, workerId: String) {} + override fun updateLastSeen(taskId: UUID) {} + override fun markCompleted(taskId: UUID) { completed = true } + override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true } + override fun markCancelled(referenceId: UUID, taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} + override fun log(taskId: UUID, message: String) {} + + override fun publishEvent(event: Event) { + events.add(event) } } - val listener = MigrateContentToStoreTaskListenerTestImplementation() + // ------------------------------------------------------------------------- + // Listener with injectable FS + // ------------------------------------------------------------------------- + + class TestListener : MigrateContentToStoreTaskListener() { + var fs: FileSystemService? = null + override fun getFileSystemService(): FileSystemService = fs!! + } + + private val listener = TestListener() // ------------------------------------------------------------------------- - // migrateVideo + // migrateVideo (direct tests) // ------------------------------------------------------------------------- @Test @@ -37,19 +65,14 @@ class MigrateContentToStoreTaskListenerTest { """ ) fun migrateVideo_success() { - val fs = MockFileSystemService().also { - listener.fs = it - } + val fs = MockFileSystemService().also { listener.fs = it } - val content = MigrateToContentStoreTask.Data.SingleContent( - cachedUri = "/tmp/source.mp4", - storeUri = "/tmp/dest.mp4" - ) + val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest") val result = listener.migrateVideo(fs, content) assertEquals(MigrateStatus.Completed, result.status) - assertEquals("/tmp/dest.mp4", result.storedUri) + assertEquals("/tmp/dest", result.storedUri) assertEquals(1, fs.copied.size) } @@ -59,47 +82,36 @@ class MigrateContentToStoreTaskListenerTest { Når migrateVideo kjøres Hvis copy feiler Så: - returneres Failed + kastes exception """ ) fun migrateVideo_copyFails() { - val fs = MockFileSystemService().apply { copyShouldFail = true }.also { - listener.fs = it + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it } + + val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest") + + assertThrows { + listener.migrateVideo(fs, content) } - - - val content = MigrateToContentStoreTask.Data.SingleContent( - cachedUri = "/tmp/source.mp4", - storeUri = "/tmp/dest.mp4" - ) - - val result = listener.migrateVideo(fs, content) - - assertEquals(MigrateStatus.Failed, result.status) } @Test @DisplayName( """ Når migrateVideo kjøres - Hvis copy lykkes men filene ikke er identiske + Hvis filene ikke er identiske Så: - returneres Failed + kastes exception """ ) fun migrateVideo_mismatch() { - val fs = MockFileSystemService().apply { identical = false }.also { - listener.fs = it + val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it } + + val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest") + + assertThrows { + listener.migrateVideo(fs, content) } - - val content = MigrateToContentStoreTask.Data.SingleContent( - cachedUri = "/tmp/source.mp4", - storeUri = "/tmp/dest.mp4" - ) - - val result = listener.migrateVideo(fs, content) - - assertEquals(MigrateStatus.Failed, result.status) } @Test @@ -112,9 +124,7 @@ class MigrateContentToStoreTaskListenerTest { """ ) fun migrateVideo_null() { - val fs = MockFileSystemService().also { - listener.fs = it - } + val fs = MockFileSystemService().also { listener.fs = it } val result = listener.migrateVideo(fs, null) @@ -122,7 +132,7 @@ class MigrateContentToStoreTaskListenerTest { } // ------------------------------------------------------------------------- - // migrateSubtitle + // migrateSubtitle (direct tests) // ------------------------------------------------------------------------- @Test @@ -135,9 +145,7 @@ class MigrateContentToStoreTaskListenerTest { """ ) fun migrateSubtitle_empty() { - val fs = MockFileSystemService().also { - listener.fs = it - } + val fs = MockFileSystemService().also { listener.fs = it } val result = listener.migrateSubtitle(fs, emptyList()) @@ -155,15 +163,9 @@ class MigrateContentToStoreTaskListenerTest { """ ) fun migrateSubtitle_success() { - val fs = MockFileSystemService().also { - listener.fs = it - } + val fs = MockFileSystemService().also { listener.fs = it } - val sub = MigrateToContentStoreTask.Data.SingleSubtitle( - language = "en", - cachedUri = "/tmp/a.srt", - storeUri = "/tmp/b.srt" - ) + val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b") val result = listener.migrateSubtitle(fs, listOf(sub)) @@ -174,34 +176,42 @@ class MigrateContentToStoreTaskListenerTest { @DisplayName( """ Når migrateSubtitle kjøres - Hvis én lykkes og én feiler + Hvis filene ikke er identiske Så: - returneres både Completed og Failed + kastes exception """ ) - fun migrateSubtitle_mixed() { - val fs = MockFileSystemService().apply { - // first OK, second fails - copyShouldFail = false - }.also { listener.fs = it } + fun migrateSubtitle_mismatch() { + val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it } - val subs = listOf( - MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b"), - MigrateToContentStoreTask.Data.SingleSubtitle("no", "/tmp/c", "/tmp/d") - ) + val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b") - // simulate second failing - fs.copyShouldFail = false - val result1 = listener.migrateSubtitle(fs, listOf(subs[0])) - fs.copyShouldFail = true - val result2 = listener.migrateSubtitle(fs, listOf(subs[1])) + assertThrows { + listener.migrateSubtitle(fs, listOf(sub)) + } + } - assertEquals(MigrateStatus.Completed, result1.first().status) - assertEquals(MigrateStatus.Failed, result2.first().status) + @Test + @DisplayName( + """ + Når migrateSubtitle kjøres + Hvis copy feiler + Så: + kastes exception + """ + ) + fun migrateSubtitle_copyFails() { + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it } + + val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b") + + assertThrows { + listener.migrateSubtitle(fs, listOf(sub)) + } } // ------------------------------------------------------------------------- - // migrateCover + // migrateCover (direct tests) // ------------------------------------------------------------------------- @Test @@ -214,14 +224,9 @@ class MigrateContentToStoreTaskListenerTest { """ ) fun migrateCover_success() { - val fs = MockFileSystemService().also { - listener.fs = it - } + val fs = MockFileSystemService().also { listener.fs = it } - val cover = MigrateToContentStoreTask.Data.SingleContent( - cachedUri = "/tmp/c.jpg", - storeUri = "/tmp/c2.jpg" - ) + val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2") val result = listener.migrateCover(fs, listOf(cover)) @@ -232,51 +237,60 @@ class MigrateContentToStoreTaskListenerTest { @DisplayName( """ Når migrateCover kjøres - Hvis flere covers og én feiler + Hvis filene ikke er identiske Så: - returneres både Completed og Failed + kastes exception """ ) - fun migrateCover_mixed() { - val fs = MockFileSystemService().also { listener.fs = it } + fun migrateCover_mismatch() { + val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it } - val covers = listOf( - MigrateToContentStoreTask.Data.SingleContent("/tmp/a", "/tmp/b"), - MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/d") - ) + val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2") - // first OK, second mismatch - fs.identical = true - val ok = listener.migrateCover(fs, listOf(covers[0])) + assertThrows { + listener.migrateCover(fs, listOf(cover)) + } + } - fs.identical = false - val fail = listener.migrateCover(fs, listOf(covers[1])) + @Test + @DisplayName( + """ + Når migrateCover kjøres + Hvis copy feiler + Så: + kastes exception + """ + ) + fun migrateCover_copyFails() { + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it } - assertEquals(MigrateStatus.Completed, ok.first().status) - assertEquals(MigrateStatus.Failed, fail.first().status) + val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2") + + assertThrows { + listener.migrateCover(fs, listOf(cover)) + } } // ------------------------------------------------------------------------- - // onTask + // accept() — full TaskListener flow // ------------------------------------------------------------------------- @Test @DisplayName( """ - Når onTask kjøres + Når accept() kjøres Hvis alle migreringer lykkes Så: - returneres Completed-event og cache slettes + publiseres Completed-event og cache slettes """ ) - fun onTask_success() = runTest { - val fs = MockFileSystemService().also { - listener.fs = it - } + fun accept_success() = runTest { + val fs = MockFileSystemService().also { listener.fs = it } + val reporter = FakeTaskReporter() val task = MigrateToContentStoreTask( MigrateToContentStoreTask.Data( - collection = "col", + "col", videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), subtitleContent = listOf( MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/s", "/tmp/s2") @@ -287,8 +301,12 @@ class MigrateContentToStoreTaskListenerTest { ) ).newReferenceId() - val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent + listener.accept(task, reporter) + listener.currentJob?.join() + val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent + + assertTrue(reporter.completed) assertEquals(TaskStatus.Completed, event.status) assertEquals(3, fs.deleted.size) } @@ -296,28 +314,31 @@ class MigrateContentToStoreTaskListenerTest { @Test @DisplayName( """ - Når onTask kjøres - Hvis en migrering feiler + Når accept() kjøres + Hvis migrateVideo kaster exception Så: - returneres Failed-event og ingenting slettes + publiseres Failed-event og cache slettes ikke """ ) - fun onTask_failure() = runTest { - val fs = MockFileSystemService().apply { copyShouldFail = true }.also { - listener.fs = it - } + fun accept_failure() = runTest { + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it } + val reporter = FakeTaskReporter() val task = MigrateToContentStoreTask( MigrateToContentStoreTask.Data( - collection = "col", + "col", videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), subtitleContent = emptyList(), coverContent = emptyList() ) ).newReferenceId() - val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent + listener.accept(task, reporter) + listener.currentJob?.join() + val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent + + assertTrue(reporter.failed) assertEquals(TaskStatus.Failed, event.status) assertEquals(0, fs.deleted.size) } @@ -325,57 +346,31 @@ class MigrateContentToStoreTaskListenerTest { @Test @DisplayName( """ - Når onTask kjøres - Hvis video feiler - Så: - returneres Failed og ingenting slettes - """ - ) - fun onTask_videoFails() = runTest { - val fs = MockFileSystemService().apply { copyShouldFail = true } - .also { listener.fs = it } - - val task = MigrateToContentStoreTask( - MigrateToContentStoreTask.Data( - collection = "col", - videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), - subtitleContent = emptyList(), - coverContent = emptyList() - ) - ).newReferenceId() - - val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent - - assertEquals(TaskStatus.Failed, event.status) - assertEquals(0, fs.deleted.size) - } - - @Test - @DisplayName( - """ - Når onTask kjøres + Når accept() kjøres Hvis sletting feiler Så: - returneres fortsatt Completed + publiseres fortsatt Completed-event """ ) - fun onTask_deleteFails() = runTest { - val fs = MockFileSystemService().apply { deleteShouldFail = true } - .also { listener.fs = it } - + fun accept_deleteFails() = runTest { + val fs = MockFileSystemService().apply { deleteShouldFail = true }.also { listener.fs = it } + val reporter = FakeTaskReporter() val task = MigrateToContentStoreTask( MigrateToContentStoreTask.Data( - collection = "col", + "col", videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), subtitleContent = emptyList(), coverContent = emptyList() ) ).newReferenceId() - val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent + listener.accept(task, reporter) + listener.currentJob?.join() + val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent + + assertTrue(reporter.completed) assertEquals(TaskStatus.Completed, event.status) } } - diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt index e9a3ff62..1a9acf2f 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/StoreContentAndMetadataTaskListenerTest.kt @@ -1,8 +1,10 @@ package no.iktdev.mediaprocessing.coordinator.listeners.tasks import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask import no.iktdev.mediaprocessing.shared.common.model.ContentExport @@ -21,10 +23,37 @@ import org.springframework.http.HttpMethod import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity import org.springframework.web.client.RestTemplate +import java.util.* @ExtendWith(MockitoExtension::class) class StoreContentAndMetadataTaskListenerTest { + // ------------------------------------------------------------------------- + // Fake Reporter + // ------------------------------------------------------------------------- + + class FakeTaskReporter : TaskReporter { + val events = mutableListOf() + var completed = false + var failed = false + + override fun markClaimed(taskId: UUID, workerId: String) {} + override fun updateLastSeen(taskId: UUID) {} + override fun markCompleted(taskId: UUID) { completed = true } + override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true } + override fun markCancelled(referenceId: UUID, taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} + override fun log(taskId: UUID, message: String) {} + + override fun publishEvent(event: Event) { + events.add(event) + } + } + + // ------------------------------------------------------------------------- + // Setup + // ------------------------------------------------------------------------- + @Mock lateinit var restTemplate: RestTemplate @@ -39,10 +68,19 @@ class StoreContentAndMetadataTaskListenerTest { private fun sampleContentExport(): ContentExport { return ContentExport( collection = "series", - episodeInfo = ContentExport.EpisodeInfo(episodeNumber = 1, seasonNumber = 1, episodeTitle = "Pilot"), + episodeInfo = ContentExport.EpisodeInfo( + episodeNumber = 1, + seasonNumber = 1, + episodeTitle = "Pilot" + ), media = ContentExport.MediaExport( videoFile = "bb.s01e01.mkv", - subtitles = listOf(ContentExport.MediaExport.Subtitle(subtitleFile = "bb.en.srt", language = "en")) + subtitles = listOf( + ContentExport.MediaExport.Subtitle( + subtitleFile = "bb.en.srt", + language = "en" + ) + ) ), metadata = ContentExport.MetadataExport( title = "Breaking Bad", @@ -55,6 +93,9 @@ class StoreContentAndMetadataTaskListenerTest { ) } + // ------------------------------------------------------------------------- + // supports() + // ------------------------------------------------------------------------- @Test @DisplayName( @@ -63,7 +104,7 @@ class StoreContentAndMetadataTaskListenerTest { Når supports() kalles Så: Returnerer true - """ + """ ) fun supports_returnsTrueForCorrectTask() { val task = StoreContentAndMetadataTask(data = sampleContentExport()) @@ -80,7 +121,7 @@ class StoreContentAndMetadataTaskListenerTest { Når supports() kalles Så: Returnerer false - """ + """ ) fun supports_returnsFalseForWrongTask() { val task = object : Task() {} @@ -90,45 +131,53 @@ class StoreContentAndMetadataTaskListenerTest { assertThat(result).isFalse() } + // ------------------------------------------------------------------------- + // accept() — full TaskListener flow + // ------------------------------------------------------------------------- + @Test @DisplayName( """ Gitt at RestTemplate returnerer 200 OK - Når onTask() kalles + Når accept() kjøres Så: - Returnerer Completed-event - """ + Publiseres Completed-event + """ ) - fun onTask_returnsCompletedOnSuccess() = runTest { - val task = StoreContentAndMetadataTask(data = sampleContentExport()) + fun accept_returnsCompletedOnSuccess() = runTest { + val reporter = FakeTaskReporter() + val task = StoreContentAndMetadataTask(data = sampleContentExport()).newReferenceId() whenever( restTemplate.exchange( - eq("open/api/mediaprocesser/import"), + eq("/open/api/mediaprocesser/import"), eq(HttpMethod.POST), any(), eq(Void::class.java) ) ).thenReturn(ResponseEntity(HttpStatus.OK)) - val event = listener.onTask(task) + listener.accept(task, reporter) + listener.currentJob?.join() - assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java) - val result = event as StoreContentAndMetadataTaskResultEvent - assertThat(result.status).isEqualTo(TaskStatus.Completed) + val event = reporter.events.first() as StoreContentAndMetadataTaskResultEvent + + assertThat(reporter.completed).isTrue() + assertThat(event.status).isEqualTo(TaskStatus.Completed) } @Test @DisplayName( """ Gitt at RestTemplate kaster exception - Når onTask() kalles + Når accept() kjøres Så: - Returnerer Failed-event - """ + Publiseres Failed-event via createIncompleteStateTaskEvent() + """ ) - fun onTask_returnsFailedOnException() = runTest { - val task = StoreContentAndMetadataTask(data = sampleContentExport()) + fun accept_returnsFailedOnException() = runTest { + val reporter = FakeTaskReporter() + val task = StoreContentAndMetadataTask(data = sampleContentExport()).newReferenceId() whenever( restTemplate.exchange( @@ -139,13 +188,20 @@ class StoreContentAndMetadataTaskListenerTest { ) ).thenThrow(RuntimeException("boom")) - val event = listener.onTask(task) + listener.accept(task, reporter) + listener.currentJob?.join() - assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java) - val result = event as StoreContentAndMetadataTaskResultEvent - assertThat(result.status).isEqualTo(TaskStatus.Failed) + val event = reporter.events.first() as StoreContentAndMetadataTaskResultEvent + + assertThat(reporter.failed).isTrue() + assertThat(event.status).isEqualTo(TaskStatus.Failed) + assertThat(event.error).contains("boom") } + // ------------------------------------------------------------------------- + // workerId() + // ------------------------------------------------------------------------- + @Test @DisplayName( """ @@ -153,12 +209,12 @@ class StoreContentAndMetadataTaskListenerTest { Når getWorkerId() kalles Så: Returnerer en streng som inneholder klassenavn, tasktype og UUID - """ + """ ) fun workerId_hasCorrectFormat() { val id = listener.getWorkerId() assertThat(id).contains("StoreContentAndMetadataTaskListener-MIXED-") - assertThat(id.split("-").last().length).isGreaterThan(10) // UUID-ish + assertThat(id.split("-").last().length).isGreaterThan(10) } } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt index 27d79a00..6b771f5f 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/TaskPoller.kt @@ -57,12 +57,12 @@ class DefaultTaskReporter() : TaskReporter { TaskStore.markConsumed(taskId, TaskStatus.Completed) } - override fun markFailed(taskId: UUID) { + override fun markFailed(referenceId: UUID, taskId: UUID) { log.info { "Marking task $taskId as failed" } TaskStore.markConsumed(taskId, TaskStatus.Failed) } - override fun markCancelled(taskId: UUID) { + override fun markCancelled(referenceId: UUID, taskId: UUID) { log.info { "Margin task $taskId as cancelled"} TaskStore.markConsumed(taskId, TaskStatus.Cancelled) } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt index da89d4d1..43202538 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListener.kt @@ -71,6 +71,19 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) { ).producedFrom(task) } + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return ProcesserExtractResultEvent(null, status, error = message) + } + override fun getFfmpeg(): FFmpeg { return SubtitleFFmpeg() } diff --git a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt index 67c93469..2989290b 100644 --- a/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt +++ b/apps/processer/src/main/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListener.kt @@ -70,6 +70,20 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff ).producedFrom(task) } + override fun createIncompleteStateTaskEvent( + task: Task, + status: TaskStatus, + exception: Exception? + ): Event { + val message = when (status) { + TaskStatus.Failed -> exception?.message ?: "Unknown error, see log" + TaskStatus.Cancelled -> "Canceled" + else -> "" + } + return ProcesserEncodeResultEvent(null, status, error = message) + } + + override fun getFfmpeg(): FFmpeg { return VideoFFmpeg(object : FFmpeg.Listener { var lastProgress: FfmpegDecodedProgress? = null diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt index 7a9bccdc..f0a6d23e 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/SubtitleTaskListenerTest.kt @@ -39,8 +39,8 @@ class SubtitleTaskListenerTest { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} override fun markCompleted(taskId: UUID) {} - override fun markFailed(taskId: UUID) {} - override fun markCancelled(taskId: UUID) {} + override fun markFailed(referenceId: UUID, taskId: UUID) {} + override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} diff --git a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt index 917d7495..6441981e 100644 --- a/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt +++ b/apps/processer/src/test/kotlin/no/iktdev/mediaprocessing/processer/listeners/VideoTaskListenerTest.kt @@ -41,8 +41,8 @@ class VideoTaskListenerTest { override fun markClaimed(taskId: UUID, workerId: String) {} override fun updateLastSeen(taskId: UUID) {} override fun markCompleted(taskId: UUID) {} - override fun markFailed(taskId: UUID) {} - override fun markCancelled(taskId: UUID) {} + override fun markFailed(referenceId: UUID, taskId: UUID) {} + override fun markCancelled(referenceId: UUID, taskId: UUID) {} override fun updateProgress(taskId: UUID, progress: Int) {} override fun log(taskId: UUID, message: String) {} override fun publishEvent(event: Event) { diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index ddd474fd..a2f6a9b8 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,5 @@ [versions] -eventi = "1.0-rc36" +eventi = "1.0-rc38" exfl = "1.0-rc1" [libraries] diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt index e12d5f93..3f1f5c3f 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/event_task_contract/events/MigrateContentToStoreTaskResultEvent.kt @@ -5,13 +5,18 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus class MigrateContentToStoreTaskResultEvent( - val collection: String, - val videoMigrate: FileMigration, - val subtitleMigrate: List, - val coverMigrate: List, + val migrateData: MigrateData? = null, status: TaskStatus, error: String? = null ) : TaskResultEvent(status, error) { + + data class MigrateData( + val collection: String, + val videoMigrate: FileMigration, + val subtitleMigrate: List, + val coverMigrate: List, + ) + data class FileMigration( val storedUri: String?, val status: MigrateStatus diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt index 6db1bcb5..4d1294ff 100644 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/projection/StoreProjection.kt @@ -14,7 +14,8 @@ class StoreProjection(val events: List) { val metadata = CollectProjection(events).metadata if (metadata != null) { val useCover = if (metadata.cover != null) { - val migrated = events.filterIsInstance().lastOrNull { it.status == TaskStatus.Completed }?.coverMigrate ?: emptyList() + val migrateData = events.filterIsInstance().lastOrNull { it.status == TaskStatus.Completed }?.migrateData + val migrated = migrateData?.coverMigrate ?: emptyList() migrated.filter { it.status == MigrateStatus.Completed && it.storedUri != null } .map { File(it.storedUri!!).name } .find { it == metadata.cover.name } @@ -47,7 +48,7 @@ class StoreProjection(val events: List) { } fun projectMediaFiles(): ContentExport.MediaExport? { - val migrated = events.filterIsInstance().lastOrNull { it.status == TaskStatus.Completed } + val migrated = events.filterIsInstance().lastOrNull { it.status == TaskStatus.Completed }?.migrateData return ContentExport.MediaExport( videoFile = migrated?.videoMigrate?.let { video -> if (video.status == MigrateStatus.Completed) File(video.storedUri!!).name else null @@ -59,7 +60,7 @@ class StoreProjection(val events: List) { fun getCollection(): String? { val migrated = events.filterIsInstance().lastOrNull { it.status == TaskStatus.Completed } ?: return null - return if (migrated.status == TaskStatus.Completed) migrated.collection else null + return if (migrated.status == TaskStatus.Completed) migrated.migrateData?.collection else null } } \ No newline at end of file