diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt index e7729c58..ba43003c 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListener.kt @@ -10,15 +10,16 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv import no.iktdev.mediaprocessing.shared.common.DownloadClient import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask +import no.iktdev.mediaprocessing.shared.common.notExist import org.springframework.stereotype.Component -import java.util.UUID +import java.util.* @Component class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { val log = KotlinLogging.logger {} override fun getWorkerId(): String { - return "${this::class.java.simpleName}-${TaskType.CPU_INTENSIVE}-${UUID.randomUUID()}" + return "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" } override fun supports(task: Task): Boolean { @@ -30,11 +31,15 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { log.info { "Downloading cover from ${pickedTask.data.url}" } val taskData = pickedTask.data - val downloadClient = DownloadClient(taskData.url, CoordinatorEnv.cachedContent, taskData.outputFileName) - val downloadedFile = downloadClient.download() + val downloadClient = getDownloadClient() + val downloadResult = try { + downloadClient.download(taskData.url, taskData.outputFileName) + } catch (e: Exception) { + return CoverDownloadResultEvent(status = TaskStatus.Failed) + } + val downloadedFile = downloadResult.result - - if (downloadedFile?.exists() == true) { + if (downloadResult.success && downloadedFile != null) { log.info { "Downloaded cover to ${downloadedFile.absolutePath}" } return CoverDownloadResultEvent( status = TaskStatus.Completed, @@ -51,5 +56,20 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { } } + open fun getDownloadClient(): DownloadClient { + return DefaultDownloadClient() + } + + class DefaultDownloadClient() : DownloadClient( + outDir = CoordinatorEnv.cachedContent, + connectionFactory = DefaultConnectionFactory(),) { + override fun onCreate() { + super.onCreate() + if (outDir.notExist()) { + outDir.mkdirs() + } + } + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt index 6bd4badb..414fe36b 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListener.kt @@ -10,14 +10,14 @@ import no.iktdev.mediaprocessing.ffmpeg.FFprobe import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask import org.springframework.stereotype.Component -import java.util.UUID +import java.util.* @Component class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) { val log = KotlinLogging.logger {} override fun getWorkerId(): String { - return "${this::class.java.simpleName}-${TaskType.CPU_INTENSIVE}-${UUID.randomUUID()}" + return "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}" } override fun supports(task: Task): Boolean { diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt index 2e0eb1b1..dab59ed7 100644 --- a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListener.kt @@ -5,13 +5,15 @@ import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.tasks.TaskListener import no.iktdev.eventi.tasks.TaskType +import no.iktdev.mediaprocessing.coordinator.util.FileSystemService import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus +import org.jetbrains.annotations.VisibleForTesting import org.springframework.stereotype.Component import java.io.File import java.nio.file.Files -import java.util.UUID +import java.util.* @Component class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { @@ -26,18 +28,26 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { override suspend fun onTask(task: Task): Event? { val pickedTask = task as? MigrateToContentStoreTask ?: return null - val videoStatus = migrateVideo(pickedTask.data.videoContent) - val subtitleStatus = migrateSubtitle(pickedTask.data.subtitleContent ?: emptyList()) - val coverStatus = migrateCover(pickedTask.data.coverContent ?: emptyList()) + val fs = getFileSystemService() + + val videoStatus = migrateVideo(fs, pickedTask.data.videoContent) + val subtitleStatus = migrateSubtitle(fs, pickedTask.data.subtitleContent ?: emptyList()) + val coverStatus = migrateCover(fs, pickedTask.data.coverContent ?: emptyList()) var status = TaskStatus.Completed if (videoStatus.status != MigrateStatus.Failed && subtitleStatus.none { it.status == MigrateStatus.Failed } && coverStatus.none { it.status == MigrateStatus.Failed }) { - pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.deleteOnExit() - pickedTask.data.subtitleContent?.forEach { File(it.cachedUri).deleteOnExit() } - pickedTask.data.coverContent?.forEach { File(it.cachedUri).deleteOnExit() } + pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.let { + fs.delete(it) + } + pickedTask.data.subtitleContent?.map { File(it.cachedUri) }?.forEach { + fs.delete(it) + } + pickedTask.data.coverContent?.map { File(it.cachedUri) }?.forEach { + fs.delete(it) + } } else { status = TaskStatus.Failed } @@ -54,32 +64,43 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { return completedEvent } - private fun migrateVideo(videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration { + @VisibleForTesting + internal fun migrateVideo(fs: FileSystemService, videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration { if (videoContent == null) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent) val source = File(videoContent.cachedUri) val destination = File(videoContent.storeUri) return try { - source.copyTo(destination, overwrite = true) - val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L - if (!identical) { + if (!fs.copy(source, destination)) { return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) } + + if (!fs.areIdentical(source, destination)) { + return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) + } + MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed) } catch (e: Exception) { MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) } } - private fun migrateSubtitle(subtitleContents: List): List { + @VisibleForTesting + internal fun migrateSubtitle( + fs: FileSystemService, + subtitleContents: List + ): List { if (subtitleContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.SubtitleMigration(null, null, MigrateStatus.NotPresent)) val results = mutableListOf() for (subtitle in subtitleContents) { val source = File(subtitle.cachedUri) val destination = File(subtitle.storeUri) try { - source.copyTo(destination, overwrite = true) - val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L - if (!identical) { + if (!fs.copy(source, destination)) { + results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed)) + continue + } + + if (!fs.areIdentical(source, destination)) { results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed)) } else { results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Completed)) @@ -91,16 +112,19 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { return results } - private fun migrateCover(coverContents: List): List { + @VisibleForTesting + internal fun migrateCover(fs: FileSystemService, coverContents: List): List { if (coverContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)) val results = mutableListOf() for (cover in coverContents) { val source = File(cover.cachedUri) val destination = File(cover.storeUri) try { - source.copyTo(destination, overwrite = true) - val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L - if (!identical) { + if (!fs.copy(source, destination)) { + results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) + continue + } + if (!fs.areIdentical(source, destination)) { results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) } else { results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed)) @@ -111,4 +135,28 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { } return results } + + open fun getFileSystemService(): FileSystemService { + return DefaultFileSystemService() + } + + class DefaultFileSystemService : FileSystemService { + override fun copy(source: File, destination: File): Boolean { + return try { + source.copyTo(destination, overwrite = true) + true + } catch (e: Exception) { + false + } + } + + override fun areIdentical(a: File, b: File): Boolean { + return Files.mismatch(a.toPath(), b.toPath()) == -1L + } + + override fun delete(file: File) { + file.delete() + } + } + } \ No newline at end of file diff --git a/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/FileSystemService.kt b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/FileSystemService.kt new file mode 100644 index 00000000..e5301a82 --- /dev/null +++ b/apps/coordinator/src/main/kotlin/no/iktdev/mediaprocessing/coordinator/util/FileSystemService.kt @@ -0,0 +1,9 @@ +package no.iktdev.mediaprocessing.coordinator.util + +import java.io.File + +interface FileSystemService { + fun copy(source: File, destination: File): Boolean + fun areIdentical(a: File, b: File): Boolean + fun delete(file: File) +} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockDownloadClient.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockDownloadClient.kt new file mode 100644 index 00000000..05e57427 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockDownloadClient.kt @@ -0,0 +1,32 @@ +package no.iktdev.mediaprocessing + +import kotlinx.coroutines.delay +import no.iktdev.mediaprocessing.shared.common.DownloadClient +import java.io.File +import java.net.HttpURLConnection +import java.net.URI + +class MockDownloadClient( + private val delayMillis: Long = 0, + private val throwException: Boolean = false, + private val mockFile: File? = null +) : DownloadClient( + outDir = File("/null"), + connectionFactory = object : ConnectionFactory { + override fun open(uri: URI): HttpURLConnection { + throw UnsupportedOperationException("MockDownloadClient does not open real connections") + } + } +) { + + override suspend fun download(useUrl: String, useBaseName: String): DownloadResult { + if (delayMillis > 0) delay(delayMillis) + if (throwException) throw RuntimeException("Simulated download failure") + + return if (mockFile != null) { + DownloadResult(success = true, result = mockFile, error = null) + } else { + DownloadResult(success = false, result = null, error = "No mock file configured") + } + } +} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFFprobe.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFFprobe.kt index b6776a74..16b7b3b7 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFFprobe.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFFprobe.kt @@ -21,8 +21,9 @@ class MockFFprobe( } companion object { - fun success(json: JsonObject) = MockFFprobe( - result = FFinfoOutput(success = true, data = json, error = null) + fun success(json: JsonObject, delay: Long = 0) = MockFFprobe( + result = FFinfoOutput(success = true, data = json, error = null), + delayMillis = delay ) fun failure(errorMsg: String) = MockFFprobe( result = FFinfoOutput(success = false, data = null, error = errorMsg) diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFileSystemService.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFileSystemService.kt new file mode 100644 index 00000000..65fb637d --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/MockFileSystemService.kt @@ -0,0 +1,24 @@ +package no.iktdev.mediaprocessing + +import no.iktdev.mediaprocessing.coordinator.util.FileSystemService +import java.io.File + +class MockFileSystemService : FileSystemService { + var copyShouldFail = false + var identical = true + val copied = mutableListOf>() + val deleted = mutableListOf() + + override fun copy(source: File, destination: File): Boolean { + copied += source to destination + return !copyShouldFail + } + + override fun areIdentical(a: File, b: File): Boolean { + return identical + } + + override fun delete(file: File) { + deleted += file + } +} \ No newline at end of file diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt new file mode 100644 index 00000000..12a53b76 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/DownloadCoverTaskListenerTest.kt @@ -0,0 +1,194 @@ +package no.iktdev.mediaprocessing.coordinator.listeners.tasks + +import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.models.Event +import no.iktdev.eventi.models.Task +import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.eventi.tasks.TaskReporter +import no.iktdev.mediaprocessing.MockDownloadClient +import no.iktdev.mediaprocessing.TestBase +import no.iktdev.mediaprocessing.shared.common.DownloadClient +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import java.io.File +import java.util.* +import kotlin.system.measureTimeMillis + +class DownloadCoverTaskListenerTest { + + class DownloadCoverTaskListenerTestImplementation : DownloadCoverTaskListener() { + fun getJob() = currentJob + + lateinit var client: DownloadClient + override fun getDownloadClient(): DownloadClient = client + + private var _result: Event? = null + fun getResult(): Event? = _result + + override fun onComplete(task: Task, result: Event?) { + super.onComplete(task, result) + this._result = result + } + } + + private val overrideReporter = object : TaskReporter { + override fun markClaimed(taskId: UUID, workerId: String) {} + override fun updateLastSeen(taskId: UUID) {} + override fun markConsumed(taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} + override fun log(taskId: UUID, message: String) {} + override fun publishEvent(event: Event) {} + } + + private var listener = DownloadCoverTaskListenerTestImplementation() + + @Test + @DisplayName( + """ + Når onTask kjøres + Hvis nedlasting tar tid + Så: + venter listener til jobben er ferdig og returnerer Completed-event + """ + ) + fun onTask_waits_for_runner_to_complete() = runTest { + val delay = 1000L + + val task = CoverDownloadTask( + CoverDownloadTask.CoverDownloadData( + url = "http://example.com/fancy.jpg", + outputFileName = "potatoland", + source = "fancy" + ) + ).newReferenceId() + + listener = DownloadCoverTaskListenerTestImplementation().apply { + this.client = MockDownloadClient( + delayMillis = delay, + mockFile = File("/tmp/fancy.jpg") + ) + } + + val time = measureTimeMillis { + listener.accept(task, overrideReporter) + listener.getJob()?.join() + + val event = listener.getResult() + assertTrue(event is CoverDownloadResultEvent) + assertEquals(TaskStatus.Completed, (event as CoverDownloadResultEvent).status) + } + + assertTrue(time >= delay, "Expected at least $delay ms, got $time ms") + assertTrue(time <= delay * 2, "Expected less than ${delay * 2} ms, got $time ms") + } + + @Test + @DisplayName( + """ + Når onTask kjøres + Hvis download-klienten kaster en exception + Så: + returneres Failed-event + """ + ) + fun onTask_returns_failed_on_exception() = runTest { + val task = CoverDownloadTask( + CoverDownloadTask.CoverDownloadData( + url = "http://example.com/fancy.jpg", + outputFileName = "potatoland", + source = "fancy" + ) + ).newReferenceId() + + listener = DownloadCoverTaskListenerTestImplementation().apply { + this.client = MockDownloadClient(throwException = true) + } + + listener.accept(task, overrideReporter) + listener.getJob()?.join() + + val event = listener.getResult() + assertTrue(event is CoverDownloadResultEvent) + assertEquals(TaskStatus.Failed, (event as CoverDownloadResultEvent).status) + } + + @Test + @DisplayName( + """ + Når onTask kjøres + Hvis task ikke er av typen CoverDownloadTask + Så: + returneres null + """ + ) + fun onTask_returns_null_for_unsupported_task() = runTest { + val event = listener.onTask(TestBase.DummyTask()) // fake unsupported task + assertNull(event) + } + + @Test + @DisplayName( + """ + Når onTask produserer event + Hvis nedlasting lykkes + Så: + inneholder event korrekt filsti + """ + ) + fun onTask_produces_correct_output_path() = runTest { + val mockFile = File("/tmp/expected.jpg") + + listener = DownloadCoverTaskListenerTestImplementation().apply { + this.client = MockDownloadClient(mockFile = mockFile) + } + + val task = CoverDownloadTask( + CoverDownloadTask.CoverDownloadData( + url = "http://example.com/img.jpg", + outputFileName = "expected", + source = "unit-test" + ) + ).newReferenceId() + + listener.accept(task, overrideReporter) + listener.getJob()?.join() + + val event = listener.getResult() as CoverDownloadResultEvent + assertEquals(mockFile.absolutePath, event.data!!.outputFile) + } + + @Test + @DisplayName( + """ + Når accept kalles + Hvis nedlasting skjer asynkront + Så: + blokkerer ikke tråden + """ + ) + fun accept_is_non_blocking() = runTest { + val delay = 500L + + listener = DownloadCoverTaskListenerTestImplementation().apply { + this.client = MockDownloadClient(delayMillis = delay, mockFile = File("/tmp/x.jpg")) + } + + val task = CoverDownloadTask( + CoverDownloadTask.CoverDownloadData( + url = "http://example.com/img.jpg", + outputFileName = "x", + source = "unit-test" + ) + ).newReferenceId() + + val time = measureTimeMillis { + listener.accept(task, overrideReporter) + // intentionally NOT joining here + } + + assertTrue(time < 50, "accept() should return immediately, got $time ms") + } +} diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt index 63d80a9f..24f49a5c 100644 --- a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MediaStreamReadTaskListenerTest.kt @@ -4,27 +4,90 @@ import com.google.gson.JsonObject import io.mockk.mockk import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.models.Event import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.eventi.tasks.TaskReporter import no.iktdev.mediaprocessing.MockFFprobe import no.iktdev.mediaprocessing.ffmpeg.FFprobe import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.Test +import java.util.* +import kotlin.system.measureTimeMillis class MediaStreamReadTaskListenerTest { class MediaStreamReadTaskListenerTestImplementation(): MediaStreamReadTaskListener() { + fun getJob() = currentJob lateinit var probe: FFprobe override fun getFfprobe(): FFprobe { return probe } + + private var _result: Event? = null + fun getResult(): Event? { + return _result + } + override fun onComplete(task: Task, result: Event?) { + super.onComplete(task, result) + this._result = result + } + } + + val overrideReporter = object : TaskReporter { + override fun markClaimed(taskId: UUID, workerId: String) {} + override fun updateLastSeen(taskId: UUID) {} + override fun markConsumed(taskId: UUID) {} + override fun updateProgress(taskId: UUID, progress: Int) {} + override fun log(taskId: UUID, message: String) {} + override fun publishEvent(event: Event) { + + } + } + + var listener = MediaStreamReadTaskListenerTestImplementation() + + @BeforeEach + fun resetListener() { + listener = MediaStreamReadTaskListenerTestImplementation() + } + + + @Test + fun `onTask waits for runner to complete`() = runTest { + val delay = 1000L + + val json = JsonObject().apply { addProperty("codec_type", "video") } + + val task = MediaReadTask(fileUri = "test.mp4").newReferenceId() + + listener = MediaStreamReadTaskListenerTestImplementation().apply { + this.probe = MockFFprobe.success(json, delay) + } + + + val time = measureTimeMillis { + listener.accept(task, overrideReporter) + listener.getJob()?.join() + val event = listener.getResult() + assertTrue(event is CoordinatorReadStreamsResultEvent) + val result = event as CoordinatorReadStreamsResultEvent + assertEquals(json, result.data) + assertEquals(TaskStatus.Completed, result.status) + assertEquals("test.mp4", (listener.probe as MockFFprobe).lastInputFile) + assertEquals(TaskStatus.Completed, (event as CoordinatorReadStreamsResultEvent).status) + } + + assertTrue(time >= delay, "Expected onTask to wait at least $delay ms, waited for $time ms") + assertTrue(time <= (delay*2), "Expected onTask to wait less than ${(delay*2)} ms, waited for $time ms") + } - private val listener = MediaStreamReadTaskListenerTestImplementation() @Test @DisplayName( @@ -84,7 +147,6 @@ class MediaStreamReadTaskListenerTest { Skal MediaStreamReadEvent produseres med data """) fun verifyEventProducedOnValidJson() = runTest { - val listener = MediaStreamReadTaskListenerTestImplementation() val json = JsonObject().apply { addProperty("codec_type", "video") } listener.probe = MockFFprobe.success(json) @@ -107,7 +169,6 @@ class MediaStreamReadTaskListenerTest { Skal onTask returnere null og ikke kaste unntak """) fun verifyNullOnParsingError() = runTest { - val listener = MediaStreamReadTaskListenerTestImplementation() listener.probe = MockFFprobe.failure("Could not parse") val task = MediaReadTask(fileUri = "corrupt.mp4").newReferenceId() @@ -127,7 +188,6 @@ class MediaStreamReadTaskListenerTest { Skal onTask returnere null og logge feilen """) fun verifyExceptionHandling() = runTest { - val listener = MediaStreamReadTaskListenerTestImplementation() listener.probe = MockFFprobe.exception() val task = MediaReadTask(fileUri = "broken.mp4").newReferenceId() @@ -146,7 +206,6 @@ class MediaStreamReadTaskListenerTest { Skal supports returnere false og onTask returnere null """) fun verifySupportsOnlyMediaReadTask() = runTest { - val listener = MediaStreamReadTaskListenerTestImplementation() listener.probe = MockFFprobe.failure("Not used") val otherTask = object : Task() {}.newReferenceId() diff --git a/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt new file mode 100644 index 00000000..8cc05e13 --- /dev/null +++ b/apps/coordinator/src/test/kotlin/no/iktdev/mediaprocessing/coordinator/listeners/tasks/MigrateContentToStoreTaskListenerTest.kt @@ -0,0 +1,247 @@ +package no.iktdev.mediaprocessing.coordinator.listeners.tasks + +import kotlinx.coroutines.test.runTest +import no.iktdev.eventi.models.store.TaskStatus +import no.iktdev.mediaprocessing.MockFileSystemService +import no.iktdev.mediaprocessing.coordinator.util.FileSystemService +import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent +import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask +import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class MigrateContentToStoreTaskListenerTest { + + class MigrateContentToStoreTaskListenerTestImplementation: MigrateContentToStoreTaskListener() { + + var fs: FileSystemService? = null + override fun getFileSystemService(): FileSystemService { + return fs!! + } + } + + val listener = MigrateContentToStoreTaskListenerTestImplementation() + + // ------------------------------------------------------------------------- + // migrateVideo + // ------------------------------------------------------------------------- + + @Test + @DisplayName( + """ + Når migrateVideo kjøres + Hvis copy lykkes og filene er identiske + Så: + returneres Completed + """ + ) + fun migrateVideo_success() { + val fs = MockFileSystemService().also { + listener.fs = it + } + + val content = MigrateToContentStoreTask.Data.SingleContent( + cachedUri = "/tmp/source.mp4", + storeUri = "/tmp/dest.mp4" + ) + + val result = listener.migrateVideo(fs, content) + + assertEquals(MigrateStatus.Completed, result.status) + assertEquals("/tmp/dest.mp4", result.storedUri) + assertEquals(1, fs.copied.size) + } + + @Test + @DisplayName( + """ + Når migrateVideo kjøres + Hvis copy feiler + Så: + returneres Failed + """ + ) + fun migrateVideo_copyFails() { + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { + listener.fs = it + } + + + val content = MigrateToContentStoreTask.Data.SingleContent( + cachedUri = "/tmp/source.mp4", + storeUri = "/tmp/dest.mp4" + ) + + val result = listener.migrateVideo(fs, content) + + assertEquals(MigrateStatus.Failed, result.status) + } + + @Test + @DisplayName( + """ + Når migrateVideo kjøres + Hvis copy lykkes men filene ikke er identiske + Så: + returneres Failed + """ + ) + fun migrateVideo_mismatch() { + val fs = MockFileSystemService().apply { identical = false }.also { + listener.fs = it + } + + val content = MigrateToContentStoreTask.Data.SingleContent( + cachedUri = "/tmp/source.mp4", + storeUri = "/tmp/dest.mp4" + ) + + val result = listener.migrateVideo(fs, content) + + assertEquals(MigrateStatus.Failed, result.status) + } + + // ------------------------------------------------------------------------- + // migrateSubtitle + // ------------------------------------------------------------------------- + + @Test + @DisplayName( + """ + Når migrateSubtitle kjøres + Hvis listen er tom + Så: + returneres NotPresent + """ + ) + fun migrateSubtitle_empty() { + val fs = MockFileSystemService().also { + listener.fs = it + } + + val result = listener.migrateSubtitle(fs, emptyList()) + + assertEquals(1, result.size) + assertEquals(MigrateStatus.NotPresent, result.first().status) + } + + @Test + @DisplayName( + """ + Når migrateSubtitle kjøres + Hvis copy lykkes og filene er identiske + Så: + returneres Completed + """ + ) + fun migrateSubtitle_success() { + val fs = MockFileSystemService().also { + listener.fs = it + } + + val sub = MigrateToContentStoreTask.Data.SingleSubtitle( + language = "en", + cachedUri = "/tmp/a.srt", + storeUri = "/tmp/b.srt" + ) + + val result = listener.migrateSubtitle(fs, listOf(sub)) + + assertEquals(MigrateStatus.Completed, result.first().status) + } + + // ------------------------------------------------------------------------- + // migrateCover + // ------------------------------------------------------------------------- + + @Test + @DisplayName( + """ + Når migrateCover kjøres + Hvis copy lykkes og filene er identiske + Så: + returneres Completed + """ + ) + fun migrateCover_success() { + val fs = MockFileSystemService().also { + listener.fs = it + } + + val cover = MigrateToContentStoreTask.Data.SingleContent( + cachedUri = "/tmp/c.jpg", + storeUri = "/tmp/c2.jpg" + ) + + val result = listener.migrateCover(fs, listOf(cover)) + + assertEquals(MigrateStatus.Completed, result.first().status) + } + + // ------------------------------------------------------------------------- + // onTask + // ------------------------------------------------------------------------- + + @Test + @DisplayName( + """ + Når onTask kjøres + Hvis alle migreringer lykkes + Så: + returneres Completed-event og cache slettes + """ + ) + fun onTask_success() = runTest { + val fs = MockFileSystemService().also { + listener.fs = it + } + + val task = MigrateToContentStoreTask( + MigrateToContentStoreTask.Data( + collection = "col", + videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), + subtitleContent = listOf( + MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/s", "/tmp/s2") + ), + coverContent = listOf( + MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2") + ) + ) + ).newReferenceId() + + val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent + + assertEquals(TaskStatus.Completed, event.status) + assertEquals(3, fs.deleted.size) + } + + @Test + @DisplayName( + """ + Når onTask kjøres + Hvis en migrering feiler + Så: + returneres Failed-event og ingenting slettes + """ + ) + fun onTask_failure() = runTest { + val fs = MockFileSystemService().apply { copyShouldFail = true }.also { + listener.fs = it + } + + val task = MigrateToContentStoreTask( + MigrateToContentStoreTask.Data( + collection = "col", + videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"), + subtitleContent = emptyList(), + coverContent = emptyList() + ) + ).newReferenceId() + + val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent + + assertEquals(TaskStatus.Failed, event.status) + assertEquals(0, fs.deleted.size) + } +} diff --git a/shared/common/build.gradle.kts b/shared/common/build.gradle.kts index db7c6240..cac2a14e 100644 --- a/shared/common/build.gradle.kts +++ b/shared/common/build.gradle.kts @@ -63,18 +63,21 @@ dependencies { implementation(project(":shared:ffmpeg")) implementation("no.iktdev:eventi:1.0-rc16") + testImplementation(kotlin("test")) testImplementation(platform("org.junit:junit-bom:5.10.0")) testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.springframework.boot:spring-boot-starter-test") - testImplementation("io.mockk:mockk:1.12.0") implementation("com.h2database:h2:2.2.220") testImplementation("org.assertj:assertj-core:3.24.2") testImplementation("io.kotest:kotest-assertions-core:5.7.2") testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0") testImplementation("io.github.classgraph:classgraph:4.8.184") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2") + testImplementation("io.mockk:mockk:1.13.9") + } diff --git a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt index 113a4f1e..37e040ac 100755 --- a/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt +++ b/shared/common/src/main/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClient.kt @@ -8,98 +8,103 @@ import java.io.File import java.io.FileOutputStream import java.net.HttpURLConnection import java.net.URI -import java.net.URL -import java.util.UUID -import kotlin.apply -import kotlin.io.use -import kotlin.run -import kotlin.text.lastIndexOf -import kotlin.text.substring -import kotlin.to +import java.nio.file.Files +import java.nio.file.StandardCopyOption +import java.util.* -open class DownloadClient(val url: String, val outDir: File, val baseName: String) { +open class DownloadClient(val outDir: File, private val connectionFactory: ConnectionFactory) { val log = KotlinLogging.logger {} - protected val http: HttpURLConnection = openConnection() private val BUFFER_SIZE = 4096 - private fun openConnection(): HttpURLConnection { - try { - return URI(url).toURL().openConnection() as HttpURLConnection + open fun onCreate() {} + + fun HttpURLConnection.getMetadata(): DownloadMetadata { + return DownloadMetadata( + this.url.toURI(), + this.contentType.also { + if (it.isNullOrBlank()) { + log.error { "Unable to determine mime type for $url" } + } else { + log.info { "Downloading file from $url with mime type $it" } + } + }, + this.contentLengthLong + ) + } + + protected fun getProgress(read: Int, total: Int): Int { + return if (total == 0) 0 else ((read * 100) / total) + } + + open suspend fun download(useUrl: String, useBaseName: String): DownloadResult { + return try { + val connection = connectionFactory.open(URI(useUrl)) + val metadata = connection.getMetadata() + val downloadedFile = downloadFile(connection) + val resultFile = downloadedFile?.let { file -> + finalizeDownload(file, useBaseName, metadata) + } + DownloadResult(resultFile?.exists() == true, resultFile, null) } catch (e: Exception) { - e.printStackTrace() - throw BadAddressException("Provided url is either not provided (null) or is not a valid http url") + DownloadResult(false, null, e.message) } } - protected fun getLength(): Int = http.contentLength - - - protected fun getProgress(read: Int, total: Int = getLength()): Int { - return ((read * 100) / total) - } - - suspend fun download(): File? = withContext(Dispatchers.IO) { + open suspend fun downloadFile(useConnection: HttpURLConnection) = withContext(Dispatchers.IO) { val downloadFile = outDir.using(UUID.randomUUID().toString() + ".downloading") - if (downloadFile.exists()) { log.info { "${downloadFile.name} already exists. Download skipped!" } return@withContext null } - val inputStream = http.inputStream - val mimeType: String? = http.contentType - if (mimeType == null) { - log.error { "Unable to determine mime type for $url" } - } else { - log.info { "Downloading file from $url with mime type $mimeType" } - } - - val fos = FileOutputStream(downloadFile, false) - var totalBytesRead = 0 val buffer = ByteArray(BUFFER_SIZE) - inputStream.apply { - fos.use { fout -> - run { - var bytesRead = read(buffer) - while (bytesRead >= 0) { - fout.write(buffer, 0, bytesRead) - totalBytesRead += bytesRead - bytesRead = read(buffer) - // System.out.println(getProgress(totalBytesRead)) - } + useConnection.inputStream.use { input -> + FileOutputStream(downloadFile).use { output -> + var bytesRead = input.read(buffer) + while (bytesRead >= 0) { + output.write(buffer, 0, bytesRead) + totalBytesRead += bytesRead + bytesRead = input.read(buffer) + // System.out.println(getProgress(totalBytesRead)) } } } - inputStream.close() - fos.close() - val extension = getExtension(downloadFile, mimeType ?: "") + downloadFile + } + + open suspend fun finalizeDownload(tempFile: File, baseName: String, metadata: DownloadMetadata): File = withContext( + Dispatchers.IO) { + val extension = getExtension(tempFile, metadata) ?: throw UnsupportedFormatException("Downloaded file does not contain a supported file extension") val outFile = outDir.using("$baseName.$extension") - val renamed = downloadFile.renameTo(outFile) - if (!renamed) { - log.error { "Failed to rename ${downloadFile.name} to ${outFile.name}" } - throw InvalidFileException("Failed to rename downloaded file") + + try { + Files.move( + tempFile.toPath(), + outFile.toPath(), + StandardCopyOption.ATOMIC_MOVE + ) + } catch (e: Exception) { + log.error { "Failed to atomically move ${tempFile.name} to ${outFile.name}" } + throw InvalidFileException("Failed to finalize downloaded file") } - return@withContext outFile + outFile } - open fun getExtension(outFile: File, mimeType: String): String? { - val extensionFormat = mimeToExtension(mimeType) ?: outFile.getFileType() - if (extensionFormat == null) { - val possiblyExtension = url.lastIndexOf(".") + 1 - if (possiblyExtension > 1) { - return url.substring(possiblyExtension) - } - } - return null + + + open fun getExtension(outFile: File, metadata: DownloadMetadata): String? { + return mimeToExtension(metadata.mimeType) + ?: outFile.getFileType() } - fun mimeToExtension(mimeType: String): String? { - return when(mimeType) { + + fun mimeToExtension(mimeType: String?): String? { + return when (mimeType) { "image/png" -> "png" "image/jpg", "image/jpeg" -> "jpg" "image/webp" -> "webp" @@ -160,4 +165,32 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin constructor(message: String?) : super(message) {} constructor(message: String?, cause: Throwable?) : super(message, cause) {} } + + data class DownloadMetadata( + val uri: URI, + val mimeType: String?, + val length: Long + ) + + data class DownloadResult( + val success: Boolean, + val result: File? = null, + val error: String? = null + ) + + interface ConnectionFactory { + fun open(uri: URI): HttpURLConnection + } + + class DefaultConnectionFactory : ConnectionFactory { + override fun open(uri: URI): HttpURLConnection { + try { + return uri.toURL().openConnection() as HttpURLConnection + } catch (e: Exception) { + e.printStackTrace() + throw BadAddressException("Provided url is either not provided (null) or is not a valid http url") + } + } + + } } \ No newline at end of file diff --git a/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClientTest.kt b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClientTest.kt new file mode 100644 index 00000000..96fce81d --- /dev/null +++ b/shared/common/src/test/kotlin/no/iktdev/mediaprocessing/shared/common/DownloadClientTest.kt @@ -0,0 +1,166 @@ +package no.iktdev.mediaprocessing.shared.common + +import io.mockk.every +import io.mockk.mockk +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import java.io.File +import java.net.HttpURLConnection +import java.net.URI +import kotlin.io.path.createTempDirectory +import kotlin.test.assertFailsWith + +class DownloadClientTest { + + private fun tempDir(): File = createTempDirectory().toFile() + + private fun fakeConnection(data: ByteArray): HttpURLConnection { + val mock = mockk() + every { mock.inputStream } returns data.inputStream() + return mock + } + + private fun fakeMetadata(ext: String = "jpg") = DownloadClient.DownloadMetadata( + uri = URI("http://example.com/file.$ext"), + mimeType = "image/$ext", + length = 10 + ) + + private fun client(outDir: File) = object : DownloadClient( + outDir = outDir, + connectionFactory = mockk() + ) {} + + // ------------------------------------------------------------ + // 1. downloadFile + // ------------------------------------------------------------ + + @Test + @DisplayName( + """ + Når downloadFile kjøres + Hvis input stream inneholder bytes + Så: + skrives filen til disk og returneres + """ + ) + fun downloadFile_writes_file() = runTest { + val outDir = tempDir() + val data = "hello".toByteArray() + + val connection = fakeConnection(data) + val client = client(outDir) + + val file = client.downloadFile(connection) + + assertNotNull(file) + assertTrue(file!!.exists()) + assertEquals("hello", file.readText()) + } + + // ------------------------------------------------------------ + // 2. finalizeDownload (happy path) + // ------------------------------------------------------------ + + @Test + @DisplayName( + """ + Når finalizeDownload kjøres + Hvis atomic move lykkes + Så: + flyttes temp-filen til endelig filnavn og temp-filen slettes + """ + ) + fun finalizeDownload_moves_file_atomically() = runTest { + val outDir = tempDir() + val tempFile = File(outDir, "temp.downloading").apply { writeText("hello") } + + val client = client(outDir) + val metadata = fakeMetadata("jpg") + + val result = client.finalizeDownload(tempFile, "final", metadata) + + assertTrue(result.exists()) + assertEquals("final.jpg", result.name) + assertFalse(tempFile.exists()) + } + + // ------------------------------------------------------------ + // 3. finalizeDownload (move failure) + // ------------------------------------------------------------ + + @Test + @DisplayName( + """ + Når finalizeDownload kjøres + Hvis atomic move feiler + Så: + kastes InvalidFileException + """ + ) + fun finalizeDownload_throws_on_failure() = runTest { + val outDir = tempDir() + val tempFile = File(outDir, "temp.downloading").apply { writeText("hello") } + + // Gjør katalogen skrivebeskyttet for å tvinge move-feil + outDir.setWritable(false) + + val client = client(outDir) + val metadata = fakeMetadata("jpg") + + assertFailsWith { + client.finalizeDownload(tempFile, "final", metadata) + } + } + + // ------------------------------------------------------------ + // 5. getExtension + // ------------------------------------------------------------ + + @Test + @DisplayName( + """ + Når getExtension kjøres + Hvis metadata inneholder MIME-type + Så: + returneres riktig filendelse + """ + ) + fun getExtension_from_metadata() = runTest { + val outDir = tempDir() + val client = client(outDir) + + val ext = client.getExtension(File("x"), fakeMetadata("png")) + + assertEquals("png", ext) + } + + @Test + @DisplayName( + """ + Når getExtension kjøres + Hvis metadata mangler MIME-type + Så: + returneres null + """ + ) + fun getExtension_returns_null_without_mime() = runTest { + val outDir = tempDir() + val client = client(outDir) + + val tempFile = File(outDir, "dummy").apply { writeText("irrelevant") } + + val metadata = DownloadClient.DownloadMetadata( + uri = URI("http://example.com/file"), + mimeType = null, + length = 10 + ) + + val ext = client.getExtension(tempFile, metadata) + + assertNull(ext) + } + +}