Test coverage + some rework

This commit is contained in:
Brage Skjønborg 2026-01-03 03:04:02 +01:00
parent 6d615abb0d
commit 6bc2ade681
13 changed files with 934 additions and 98 deletions

View File

@ -10,15 +10,16 @@ import no.iktdev.mediaprocessing.coordinator.CoordinatorEnv
import no.iktdev.mediaprocessing.shared.common.DownloadClient import no.iktdev.mediaprocessing.shared.common.DownloadClient
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask
import no.iktdev.mediaprocessing.shared.common.notExist
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import java.util.UUID import java.util.*
@Component @Component
class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) { class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) {
val log = KotlinLogging.logger {} val log = KotlinLogging.logger {}
override fun getWorkerId(): String { override fun getWorkerId(): String {
return "${this::class.java.simpleName}-${TaskType.CPU_INTENSIVE}-${UUID.randomUUID()}" return "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
} }
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {
@ -30,11 +31,15 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) {
log.info { "Downloading cover from ${pickedTask.data.url}" } log.info { "Downloading cover from ${pickedTask.data.url}" }
val taskData = pickedTask.data val taskData = pickedTask.data
val downloadClient = DownloadClient(taskData.url, CoordinatorEnv.cachedContent, taskData.outputFileName) val downloadClient = getDownloadClient()
val downloadedFile = downloadClient.download() val downloadResult = try {
downloadClient.download(taskData.url, taskData.outputFileName)
} catch (e: Exception) {
return CoverDownloadResultEvent(status = TaskStatus.Failed)
}
val downloadedFile = downloadResult.result
if (downloadResult.success && downloadedFile != null) {
if (downloadedFile?.exists() == true) {
log.info { "Downloaded cover to ${downloadedFile.absolutePath}" } log.info { "Downloaded cover to ${downloadedFile.absolutePath}" }
return CoverDownloadResultEvent( return CoverDownloadResultEvent(
status = TaskStatus.Completed, status = TaskStatus.Completed,
@ -51,5 +56,20 @@ class DownloadCoverTaskListener: TaskListener(TaskType.MIXED) {
} }
} }
open fun getDownloadClient(): DownloadClient {
return DefaultDownloadClient()
}
class DefaultDownloadClient() : DownloadClient(
outDir = CoordinatorEnv.cachedContent,
connectionFactory = DefaultConnectionFactory(),) {
override fun onCreate() {
super.onCreate()
if (outDir.notExist()) {
outDir.mkdirs()
}
}
}
} }

View File

@ -10,14 +10,14 @@ import no.iktdev.mediaprocessing.ffmpeg.FFprobe
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import java.util.UUID import java.util.*
@Component @Component
class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) { class MediaStreamReadTaskListener: FfprobeTaskListener(TaskType.CPU_INTENSIVE) {
val log = KotlinLogging.logger {} val log = KotlinLogging.logger {}
override fun getWorkerId(): String { override fun getWorkerId(): String {
return "${this::class.java.simpleName}-${TaskType.CPU_INTENSIVE}-${UUID.randomUUID()}" return "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
} }
override fun supports(task: Task): Boolean { override fun supports(task: Task): Boolean {

View File

@ -5,13 +5,15 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskListener import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskType import no.iktdev.eventi.tasks.TaskType
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
import org.jetbrains.annotations.VisibleForTesting
import org.springframework.stereotype.Component import org.springframework.stereotype.Component
import java.io.File import java.io.File
import java.nio.file.Files import java.nio.file.Files
import java.util.UUID import java.util.*
@Component @Component
class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) { class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
@ -26,18 +28,26 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
override suspend fun onTask(task: Task): Event? { override suspend fun onTask(task: Task): Event? {
val pickedTask = task as? MigrateToContentStoreTask ?: return null val pickedTask = task as? MigrateToContentStoreTask ?: return null
val videoStatus = migrateVideo(pickedTask.data.videoContent) val fs = getFileSystemService()
val subtitleStatus = migrateSubtitle(pickedTask.data.subtitleContent ?: emptyList())
val coverStatus = migrateCover(pickedTask.data.coverContent ?: emptyList()) val videoStatus = migrateVideo(fs, pickedTask.data.videoContent)
val subtitleStatus = migrateSubtitle(fs, pickedTask.data.subtitleContent ?: emptyList())
val coverStatus = migrateCover(fs, pickedTask.data.coverContent ?: emptyList())
var status = TaskStatus.Completed var status = TaskStatus.Completed
if (videoStatus.status != MigrateStatus.Failed && if (videoStatus.status != MigrateStatus.Failed &&
subtitleStatus.none { it.status == MigrateStatus.Failed } && subtitleStatus.none { it.status == MigrateStatus.Failed } &&
coverStatus.none { it.status == MigrateStatus.Failed }) coverStatus.none { it.status == MigrateStatus.Failed })
{ {
pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.deleteOnExit() pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.let {
pickedTask.data.subtitleContent?.forEach { File(it.cachedUri).deleteOnExit() } fs.delete(it)
pickedTask.data.coverContent?.forEach { File(it.cachedUri).deleteOnExit() } }
pickedTask.data.subtitleContent?.map { File(it.cachedUri) }?.forEach {
fs.delete(it)
}
pickedTask.data.coverContent?.map { File(it.cachedUri) }?.forEach {
fs.delete(it)
}
} else { } else {
status = TaskStatus.Failed status = TaskStatus.Failed
} }
@ -54,32 +64,43 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
return completedEvent return completedEvent
} }
private fun migrateVideo(videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration { @VisibleForTesting
internal fun migrateVideo(fs: FileSystemService, videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration {
if (videoContent == null) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent) if (videoContent == null) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)
val source = File(videoContent.cachedUri) val source = File(videoContent.cachedUri)
val destination = File(videoContent.storeUri) val destination = File(videoContent.storeUri)
return try { return try {
source.copyTo(destination, overwrite = true) if (!fs.copy(source, destination)) {
val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L
if (!identical) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
} }
if (!fs.areIdentical(source, destination)) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
}
MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed) MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed)
} catch (e: Exception) { } catch (e: Exception) {
MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed) MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
} }
} }
private fun migrateSubtitle(subtitleContents: List<MigrateToContentStoreTask.Data.SingleSubtitle>): List<MigrateContentToStoreTaskResultEvent.SubtitleMigration> { @VisibleForTesting
internal fun migrateSubtitle(
fs: FileSystemService,
subtitleContents: List<MigrateToContentStoreTask.Data.SingleSubtitle>
): List<MigrateContentToStoreTaskResultEvent.SubtitleMigration> {
if (subtitleContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.SubtitleMigration(null, null, MigrateStatus.NotPresent)) if (subtitleContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.SubtitleMigration(null, null, MigrateStatus.NotPresent))
val results = mutableListOf<MigrateContentToStoreTaskResultEvent.SubtitleMigration>() val results = mutableListOf<MigrateContentToStoreTaskResultEvent.SubtitleMigration>()
for (subtitle in subtitleContents) { for (subtitle in subtitleContents) {
val source = File(subtitle.cachedUri) val source = File(subtitle.cachedUri)
val destination = File(subtitle.storeUri) val destination = File(subtitle.storeUri)
try { try {
source.copyTo(destination, overwrite = true) if (!fs.copy(source, destination)) {
val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed))
if (!identical) { continue
}
if (!fs.areIdentical(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed)) results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed))
} else { } else {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Completed)) results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Completed))
@ -91,16 +112,19 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
return results return results
} }
private fun migrateCover(coverContents: List<MigrateToContentStoreTask.Data.SingleContent>): List<MigrateContentToStoreTaskResultEvent.FileMigration> { @VisibleForTesting
internal fun migrateCover(fs: FileSystemService, coverContents: List<MigrateToContentStoreTask.Data.SingleContent>): List<MigrateContentToStoreTaskResultEvent.FileMigration> {
if (coverContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)) if (coverContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent))
val results = mutableListOf<MigrateContentToStoreTaskResultEvent.FileMigration>() val results = mutableListOf<MigrateContentToStoreTaskResultEvent.FileMigration>()
for (cover in coverContents) { for (cover in coverContents) {
val source = File(cover.cachedUri) val source = File(cover.cachedUri)
val destination = File(cover.storeUri) val destination = File(cover.storeUri)
try { try {
source.copyTo(destination, overwrite = true) if (!fs.copy(source, destination)) {
val identical = Files.mismatch(source.toPath(), destination.toPath()) == -1L results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed))
if (!identical) { continue
}
if (!fs.areIdentical(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed)) results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed))
} else { } else {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed)) results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed))
@ -111,4 +135,28 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
} }
return results return results
} }
open fun getFileSystemService(): FileSystemService {
return DefaultFileSystemService()
}
class DefaultFileSystemService : FileSystemService {
override fun copy(source: File, destination: File): Boolean {
return try {
source.copyTo(destination, overwrite = true)
true
} catch (e: Exception) {
false
}
}
override fun areIdentical(a: File, b: File): Boolean {
return Files.mismatch(a.toPath(), b.toPath()) == -1L
}
override fun delete(file: File) {
file.delete()
}
}
} }

View File

@ -0,0 +1,9 @@
package no.iktdev.mediaprocessing.coordinator.util
import java.io.File
interface FileSystemService {
fun copy(source: File, destination: File): Boolean
fun areIdentical(a: File, b: File): Boolean
fun delete(file: File)
}

View File

@ -0,0 +1,32 @@
package no.iktdev.mediaprocessing
import kotlinx.coroutines.delay
import no.iktdev.mediaprocessing.shared.common.DownloadClient
import java.io.File
import java.net.HttpURLConnection
import java.net.URI
class MockDownloadClient(
private val delayMillis: Long = 0,
private val throwException: Boolean = false,
private val mockFile: File? = null
) : DownloadClient(
outDir = File("/null"),
connectionFactory = object : ConnectionFactory {
override fun open(uri: URI): HttpURLConnection {
throw UnsupportedOperationException("MockDownloadClient does not open real connections")
}
}
) {
override suspend fun download(useUrl: String, useBaseName: String): DownloadResult {
if (delayMillis > 0) delay(delayMillis)
if (throwException) throw RuntimeException("Simulated download failure")
return if (mockFile != null) {
DownloadResult(success = true, result = mockFile, error = null)
} else {
DownloadResult(success = false, result = null, error = "No mock file configured")
}
}
}

View File

@ -21,8 +21,9 @@ class MockFFprobe(
} }
companion object { companion object {
fun success(json: JsonObject) = MockFFprobe( fun success(json: JsonObject, delay: Long = 0) = MockFFprobe(
result = FFinfoOutput(success = true, data = json, error = null) result = FFinfoOutput(success = true, data = json, error = null),
delayMillis = delay
) )
fun failure(errorMsg: String) = MockFFprobe( fun failure(errorMsg: String) = MockFFprobe(
result = FFinfoOutput(success = false, data = null, error = errorMsg) result = FFinfoOutput(success = false, data = null, error = errorMsg)

View File

@ -0,0 +1,24 @@
package no.iktdev.mediaprocessing
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import java.io.File
class MockFileSystemService : FileSystemService {
var copyShouldFail = false
var identical = true
val copied = mutableListOf<Pair<File, File>>()
val deleted = mutableListOf<File>()
override fun copy(source: File, destination: File): Boolean {
copied += source to destination
return !copyShouldFail
}
override fun areIdentical(a: File, b: File): Boolean {
return identical
}
override fun delete(file: File) {
deleted += file
}
}

View File

@ -0,0 +1,194 @@
package no.iktdev.mediaprocessing.coordinator.listeners.tasks
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockDownloadClient
import no.iktdev.mediaprocessing.TestBase
import no.iktdev.mediaprocessing.shared.common.DownloadClient
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoverDownloadResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.CoverDownloadTask
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.io.File
import java.util.*
import kotlin.system.measureTimeMillis
class DownloadCoverTaskListenerTest {
class DownloadCoverTaskListenerTestImplementation : DownloadCoverTaskListener() {
fun getJob() = currentJob
lateinit var client: DownloadClient
override fun getDownloadClient(): DownloadClient = client
private var _result: Event? = null
fun getResult(): Event? = _result
override fun onComplete(task: Task, result: Event?) {
super.onComplete(task, result)
this._result = result
}
}
private val overrideReporter = object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markConsumed(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {}
}
private var listener = DownloadCoverTaskListenerTestImplementation()
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis nedlasting tar tid
:
venter listener til jobben er ferdig og returnerer Completed-event
"""
)
fun onTask_waits_for_runner_to_complete() = runTest {
val delay = 1000L
val task = CoverDownloadTask(
CoverDownloadTask.CoverDownloadData(
url = "http://example.com/fancy.jpg",
outputFileName = "potatoland",
source = "fancy"
)
).newReferenceId()
listener = DownloadCoverTaskListenerTestImplementation().apply {
this.client = MockDownloadClient(
delayMillis = delay,
mockFile = File("/tmp/fancy.jpg")
)
}
val time = measureTimeMillis {
listener.accept(task, overrideReporter)
listener.getJob()?.join()
val event = listener.getResult()
assertTrue(event is CoverDownloadResultEvent)
assertEquals(TaskStatus.Completed, (event as CoverDownloadResultEvent).status)
}
assertTrue(time >= delay, "Expected at least $delay ms, got $time ms")
assertTrue(time <= delay * 2, "Expected less than ${delay * 2} ms, got $time ms")
}
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis download-klienten kaster en exception
:
returneres Failed-event
"""
)
fun onTask_returns_failed_on_exception() = runTest {
val task = CoverDownloadTask(
CoverDownloadTask.CoverDownloadData(
url = "http://example.com/fancy.jpg",
outputFileName = "potatoland",
source = "fancy"
)
).newReferenceId()
listener = DownloadCoverTaskListenerTestImplementation().apply {
this.client = MockDownloadClient(throwException = true)
}
listener.accept(task, overrideReporter)
listener.getJob()?.join()
val event = listener.getResult()
assertTrue(event is CoverDownloadResultEvent)
assertEquals(TaskStatus.Failed, (event as CoverDownloadResultEvent).status)
}
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis task ikke er av typen CoverDownloadTask
:
returneres null
"""
)
fun onTask_returns_null_for_unsupported_task() = runTest {
val event = listener.onTask(TestBase.DummyTask()) // fake unsupported task
assertNull(event)
}
@Test
@DisplayName(
"""
Når onTask produserer event
Hvis nedlasting lykkes
:
inneholder event korrekt filsti
"""
)
fun onTask_produces_correct_output_path() = runTest {
val mockFile = File("/tmp/expected.jpg")
listener = DownloadCoverTaskListenerTestImplementation().apply {
this.client = MockDownloadClient(mockFile = mockFile)
}
val task = CoverDownloadTask(
CoverDownloadTask.CoverDownloadData(
url = "http://example.com/img.jpg",
outputFileName = "expected",
source = "unit-test"
)
).newReferenceId()
listener.accept(task, overrideReporter)
listener.getJob()?.join()
val event = listener.getResult() as CoverDownloadResultEvent
assertEquals(mockFile.absolutePath, event.data!!.outputFile)
}
@Test
@DisplayName(
"""
Når accept kalles
Hvis nedlasting skjer asynkront
:
blokkerer ikke tråden
"""
)
fun accept_is_non_blocking() = runTest {
val delay = 500L
listener = DownloadCoverTaskListenerTestImplementation().apply {
this.client = MockDownloadClient(delayMillis = delay, mockFile = File("/tmp/x.jpg"))
}
val task = CoverDownloadTask(
CoverDownloadTask.CoverDownloadData(
url = "http://example.com/img.jpg",
outputFileName = "x",
source = "unit-test"
)
).newReferenceId()
val time = measureTimeMillis {
listener.accept(task, overrideReporter)
// intentionally NOT joining here
}
assertTrue(time < 50, "accept() should return immediately, got $time ms")
}
}

View File

@ -4,27 +4,90 @@ import com.google.gson.JsonObject
import io.mockk.mockk import io.mockk.mockk
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.runTest import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockFFprobe import no.iktdev.mediaprocessing.MockFFprobe
import no.iktdev.mediaprocessing.ffmpeg.FFprobe import no.iktdev.mediaprocessing.ffmpeg.FFprobe
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CoordinatorReadStreamsResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MediaReadTask
import org.junit.jupiter.api.Assertions.* import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test import org.junit.jupiter.api.Test
import java.util.*
import kotlin.system.measureTimeMillis
class MediaStreamReadTaskListenerTest { class MediaStreamReadTaskListenerTest {
class MediaStreamReadTaskListenerTestImplementation(): MediaStreamReadTaskListener() { class MediaStreamReadTaskListenerTestImplementation(): MediaStreamReadTaskListener() {
fun getJob() = currentJob
lateinit var probe: FFprobe lateinit var probe: FFprobe
override fun getFfprobe(): FFprobe { override fun getFfprobe(): FFprobe {
return probe return probe
} }
private var _result: Event? = null
fun getResult(): Event? {
return _result
}
override fun onComplete(task: Task, result: Event?) {
super.onComplete(task, result)
this._result = result
}
}
val overrideReporter = object : TaskReporter {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markConsumed(taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {
}
}
var listener = MediaStreamReadTaskListenerTestImplementation()
@BeforeEach
fun resetListener() {
listener = MediaStreamReadTaskListenerTestImplementation()
}
@Test
fun `onTask waits for runner to complete`() = runTest {
val delay = 1000L
val json = JsonObject().apply { addProperty("codec_type", "video") }
val task = MediaReadTask(fileUri = "test.mp4").newReferenceId()
listener = MediaStreamReadTaskListenerTestImplementation().apply {
this.probe = MockFFprobe.success(json, delay)
}
val time = measureTimeMillis {
listener.accept(task, overrideReporter)
listener.getJob()?.join()
val event = listener.getResult()
assertTrue(event is CoordinatorReadStreamsResultEvent)
val result = event as CoordinatorReadStreamsResultEvent
assertEquals(json, result.data)
assertEquals(TaskStatus.Completed, result.status)
assertEquals("test.mp4", (listener.probe as MockFFprobe).lastInputFile)
assertEquals(TaskStatus.Completed, (event as CoordinatorReadStreamsResultEvent).status)
}
assertTrue(time >= delay, "Expected onTask to wait at least $delay ms, waited for $time ms")
assertTrue(time <= (delay*2), "Expected onTask to wait less than ${(delay*2)} ms, waited for $time ms")
} }
private val listener = MediaStreamReadTaskListenerTestImplementation()
@Test @Test
@DisplayName( @DisplayName(
@ -84,7 +147,6 @@ class MediaStreamReadTaskListenerTest {
Skal MediaStreamReadEvent produseres med data Skal MediaStreamReadEvent produseres med data
""") """)
fun verifyEventProducedOnValidJson() = runTest { fun verifyEventProducedOnValidJson() = runTest {
val listener = MediaStreamReadTaskListenerTestImplementation()
val json = JsonObject().apply { addProperty("codec_type", "video") } val json = JsonObject().apply { addProperty("codec_type", "video") }
listener.probe = MockFFprobe.success(json) listener.probe = MockFFprobe.success(json)
@ -107,7 +169,6 @@ class MediaStreamReadTaskListenerTest {
Skal onTask returnere null og ikke kaste unntak Skal onTask returnere null og ikke kaste unntak
""") """)
fun verifyNullOnParsingError() = runTest { fun verifyNullOnParsingError() = runTest {
val listener = MediaStreamReadTaskListenerTestImplementation()
listener.probe = MockFFprobe.failure("Could not parse") listener.probe = MockFFprobe.failure("Could not parse")
val task = MediaReadTask(fileUri = "corrupt.mp4").newReferenceId() val task = MediaReadTask(fileUri = "corrupt.mp4").newReferenceId()
@ -127,7 +188,6 @@ class MediaStreamReadTaskListenerTest {
Skal onTask returnere null og logge feilen Skal onTask returnere null og logge feilen
""") """)
fun verifyExceptionHandling() = runTest { fun verifyExceptionHandling() = runTest {
val listener = MediaStreamReadTaskListenerTestImplementation()
listener.probe = MockFFprobe.exception() listener.probe = MockFFprobe.exception()
val task = MediaReadTask(fileUri = "broken.mp4").newReferenceId() val task = MediaReadTask(fileUri = "broken.mp4").newReferenceId()
@ -146,7 +206,6 @@ class MediaStreamReadTaskListenerTest {
Skal supports returnere false og onTask returnere null Skal supports returnere false og onTask returnere null
""") """)
fun verifySupportsOnlyMediaReadTask() = runTest { fun verifySupportsOnlyMediaReadTask() = runTest {
val listener = MediaStreamReadTaskListenerTestImplementation()
listener.probe = MockFFprobe.failure("Not used") listener.probe = MockFFprobe.failure("Not used")
val otherTask = object : Task() {}.newReferenceId() val otherTask = object : Task() {}.newReferenceId()

View File

@ -0,0 +1,247 @@
package no.iktdev.mediaprocessing.coordinator.listeners.tasks
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.mediaprocessing.MockFileSystemService
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
class MigrateContentToStoreTaskListenerTest {
class MigrateContentToStoreTaskListenerTestImplementation: MigrateContentToStoreTaskListener() {
var fs: FileSystemService? = null
override fun getFileSystemService(): FileSystemService {
return fs!!
}
}
val listener = MigrateContentToStoreTaskListenerTestImplementation()
// -------------------------------------------------------------------------
// migrateVideo
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Når migrateVideo kjøres
Hvis copy lykkes og filene er identiske
:
returneres Completed
"""
)
fun migrateVideo_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Completed, result.status)
assertEquals("/tmp/dest.mp4", result.storedUri)
assertEquals(1, fs.copied.size)
}
@Test
@DisplayName(
"""
Når migrateVideo kjøres
Hvis copy feiler
:
returneres Failed
"""
)
fun migrateVideo_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also {
listener.fs = it
}
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Failed, result.status)
}
@Test
@DisplayName(
"""
Når migrateVideo kjøres
Hvis copy lykkes men filene ikke er identiske
:
returneres Failed
"""
)
fun migrateVideo_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also {
listener.fs = it
}
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Failed, result.status)
}
// -------------------------------------------------------------------------
// migrateSubtitle
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Når migrateSubtitle kjøres
Hvis listen er tom
:
returneres NotPresent
"""
)
fun migrateSubtitle_empty() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val result = listener.migrateSubtitle(fs, emptyList())
assertEquals(1, result.size)
assertEquals(MigrateStatus.NotPresent, result.first().status)
}
@Test
@DisplayName(
"""
Når migrateSubtitle kjøres
Hvis copy lykkes og filene er identiske
:
returneres Completed
"""
)
fun migrateSubtitle_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val sub = MigrateToContentStoreTask.Data.SingleSubtitle(
language = "en",
cachedUri = "/tmp/a.srt",
storeUri = "/tmp/b.srt"
)
val result = listener.migrateSubtitle(fs, listOf(sub))
assertEquals(MigrateStatus.Completed, result.first().status)
}
// -------------------------------------------------------------------------
// migrateCover
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Når migrateCover kjøres
Hvis copy lykkes og filene er identiske
:
returneres Completed
"""
)
fun migrateCover_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val cover = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/c.jpg",
storeUri = "/tmp/c2.jpg"
)
val result = listener.migrateCover(fs, listOf(cover))
assertEquals(MigrateStatus.Completed, result.first().status)
}
// -------------------------------------------------------------------------
// onTask
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis alle migreringer lykkes
:
returneres Completed-event og cache slettes
"""
)
fun onTask_success() = runTest {
val fs = MockFileSystemService().also {
listener.fs = it
}
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = listOf(
MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/s", "/tmp/s2")
),
coverContent = listOf(
MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
)
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
assertEquals(TaskStatus.Completed, event.status)
assertEquals(3, fs.deleted.size)
}
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis en migrering feiler
:
returneres Failed-event og ingenting slettes
"""
)
fun onTask_failure() = runTest {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also {
listener.fs = it
}
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = emptyList(),
coverContent = emptyList()
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
assertEquals(TaskStatus.Failed, event.status)
assertEquals(0, fs.deleted.size)
}
}

View File

@ -63,18 +63,21 @@ dependencies {
implementation(project(":shared:ffmpeg")) implementation(project(":shared:ffmpeg"))
implementation("no.iktdev:eventi:1.0-rc16") implementation("no.iktdev:eventi:1.0-rc16")
testImplementation(kotlin("test"))
testImplementation(platform("org.junit:junit-bom:5.10.0")) testImplementation(platform("org.junit:junit-bom:5.10.0"))
testImplementation("org.junit.jupiter:junit-jupiter") testImplementation("org.junit.jupiter:junit-jupiter")
testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("io.mockk:mockk:1.12.0")
implementation("com.h2database:h2:2.2.220") implementation("com.h2database:h2:2.2.220")
testImplementation("org.assertj:assertj-core:3.24.2") testImplementation("org.assertj:assertj-core:3.24.2")
testImplementation("io.kotest:kotest-assertions-core:5.7.2") testImplementation("io.kotest:kotest-assertions-core:5.7.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0") testImplementation("org.jetbrains.kotlinx:kotlinx-serialization-json:1.3.0")
testImplementation("io.github.classgraph:classgraph:4.8.184") testImplementation("io.github.classgraph:classgraph:4.8.184")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
testImplementation("io.mockk:mockk:1.13.9")
} }

View File

@ -8,97 +8,102 @@ import java.io.File
import java.io.FileOutputStream import java.io.FileOutputStream
import java.net.HttpURLConnection import java.net.HttpURLConnection
import java.net.URI import java.net.URI
import java.net.URL import java.nio.file.Files
import java.util.UUID import java.nio.file.StandardCopyOption
import kotlin.apply import java.util.*
import kotlin.io.use
import kotlin.run
import kotlin.text.lastIndexOf
import kotlin.text.substring
import kotlin.to
open class DownloadClient(val url: String, val outDir: File, val baseName: String) { open class DownloadClient(val outDir: File, private val connectionFactory: ConnectionFactory) {
val log = KotlinLogging.logger {} val log = KotlinLogging.logger {}
protected val http: HttpURLConnection = openConnection()
private val BUFFER_SIZE = 4096 private val BUFFER_SIZE = 4096
private fun openConnection(): HttpURLConnection { open fun onCreate() {}
try {
return URI(url).toURL().openConnection() as HttpURLConnection fun HttpURLConnection.getMetadata(): DownloadMetadata {
return DownloadMetadata(
this.url.toURI(),
this.contentType.also {
if (it.isNullOrBlank()) {
log.error { "Unable to determine mime type for $url" }
} else {
log.info { "Downloading file from $url with mime type $it" }
}
},
this.contentLengthLong
)
}
protected fun getProgress(read: Int, total: Int): Int {
return if (total == 0) 0 else ((read * 100) / total)
}
open suspend fun download(useUrl: String, useBaseName: String): DownloadResult {
return try {
val connection = connectionFactory.open(URI(useUrl))
val metadata = connection.getMetadata()
val downloadedFile = downloadFile(connection)
val resultFile = downloadedFile?.let { file ->
finalizeDownload(file, useBaseName, metadata)
}
DownloadResult(resultFile?.exists() == true, resultFile, null)
} catch (e: Exception) { } catch (e: Exception) {
e.printStackTrace() DownloadResult(false, null, e.message)
throw BadAddressException("Provided url is either not provided (null) or is not a valid http url")
} }
} }
protected fun getLength(): Int = http.contentLength open suspend fun downloadFile(useConnection: HttpURLConnection) = withContext(Dispatchers.IO) {
protected fun getProgress(read: Int, total: Int = getLength()): Int {
return ((read * 100) / total)
}
suspend fun download(): File? = withContext(Dispatchers.IO) {
val downloadFile = outDir.using(UUID.randomUUID().toString() + ".downloading") val downloadFile = outDir.using(UUID.randomUUID().toString() + ".downloading")
if (downloadFile.exists()) { if (downloadFile.exists()) {
log.info { "${downloadFile.name} already exists. Download skipped!" } log.info { "${downloadFile.name} already exists. Download skipped!" }
return@withContext null return@withContext null
} }
val inputStream = http.inputStream
val mimeType: String? = http.contentType
if (mimeType == null) {
log.error { "Unable to determine mime type for $url" }
} else {
log.info { "Downloading file from $url with mime type $mimeType" }
}
val fos = FileOutputStream(downloadFile, false)
var totalBytesRead = 0 var totalBytesRead = 0
val buffer = ByteArray(BUFFER_SIZE) val buffer = ByteArray(BUFFER_SIZE)
inputStream.apply { useConnection.inputStream.use { input ->
fos.use { fout -> FileOutputStream(downloadFile).use { output ->
run { var bytesRead = input.read(buffer)
var bytesRead = read(buffer)
while (bytesRead >= 0) { while (bytesRead >= 0) {
fout.write(buffer, 0, bytesRead) output.write(buffer, 0, bytesRead)
totalBytesRead += bytesRead totalBytesRead += bytesRead
bytesRead = read(buffer) bytesRead = input.read(buffer)
// System.out.println(getProgress(totalBytesRead)) // System.out.println(getProgress(totalBytesRead))
} }
} }
} }
}
inputStream.close()
fos.close()
val extension = getExtension(downloadFile, mimeType ?: "") downloadFile
}
open suspend fun finalizeDownload(tempFile: File, baseName: String, metadata: DownloadMetadata): File = withContext(
Dispatchers.IO) {
val extension = getExtension(tempFile, metadata)
?: throw UnsupportedFormatException("Downloaded file does not contain a supported file extension") ?: throw UnsupportedFormatException("Downloaded file does not contain a supported file extension")
val outFile = outDir.using("$baseName.$extension") val outFile = outDir.using("$baseName.$extension")
val renamed = downloadFile.renameTo(outFile)
if (!renamed) { try {
log.error { "Failed to rename ${downloadFile.name} to ${outFile.name}" } Files.move(
throw InvalidFileException("Failed to rename downloaded file") tempFile.toPath(),
outFile.toPath(),
StandardCopyOption.ATOMIC_MOVE
)
} catch (e: Exception) {
log.error { "Failed to atomically move ${tempFile.name} to ${outFile.name}" }
throw InvalidFileException("Failed to finalize downloaded file")
} }
return@withContext outFile outFile
} }
open fun getExtension(outFile: File, mimeType: String): String? {
val extensionFormat = mimeToExtension(mimeType) ?: outFile.getFileType()
if (extensionFormat == null) { open fun getExtension(outFile: File, metadata: DownloadMetadata): String? {
val possiblyExtension = url.lastIndexOf(".") + 1 return mimeToExtension(metadata.mimeType)
if (possiblyExtension > 1) { ?: outFile.getFileType()
return url.substring(possiblyExtension)
}
}
return null
} }
fun mimeToExtension(mimeType: String): String? {
fun mimeToExtension(mimeType: String?): String? {
return when (mimeType) { return when (mimeType) {
"image/png" -> "png" "image/png" -> "png"
"image/jpg", "image/jpeg" -> "jpg" "image/jpg", "image/jpeg" -> "jpg"
@ -160,4 +165,32 @@ open class DownloadClient(val url: String, val outDir: File, val baseName: Strin
constructor(message: String?) : super(message) {} constructor(message: String?) : super(message) {}
constructor(message: String?, cause: Throwable?) : super(message, cause) {} constructor(message: String?, cause: Throwable?) : super(message, cause) {}
} }
data class DownloadMetadata(
val uri: URI,
val mimeType: String?,
val length: Long
)
data class DownloadResult(
val success: Boolean,
val result: File? = null,
val error: String? = null
)
interface ConnectionFactory {
fun open(uri: URI): HttpURLConnection
}
class DefaultConnectionFactory : ConnectionFactory {
override fun open(uri: URI): HttpURLConnection {
try {
return uri.toURL().openConnection() as HttpURLConnection
} catch (e: Exception) {
e.printStackTrace()
throw BadAddressException("Provided url is either not provided (null) or is not a valid http url")
}
}
}
} }

View File

@ -0,0 +1,166 @@
package no.iktdev.mediaprocessing.shared.common
import io.mockk.every
import io.mockk.mockk
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import java.io.File
import java.net.HttpURLConnection
import java.net.URI
import kotlin.io.path.createTempDirectory
import kotlin.test.assertFailsWith
class DownloadClientTest {
private fun tempDir(): File = createTempDirectory().toFile()
private fun fakeConnection(data: ByteArray): HttpURLConnection {
val mock = mockk<HttpURLConnection>()
every { mock.inputStream } returns data.inputStream()
return mock
}
private fun fakeMetadata(ext: String = "jpg") = DownloadClient.DownloadMetadata(
uri = URI("http://example.com/file.$ext"),
mimeType = "image/$ext",
length = 10
)
private fun client(outDir: File) = object : DownloadClient(
outDir = outDir,
connectionFactory = mockk()
) {}
// ------------------------------------------------------------
// 1. downloadFile
// ------------------------------------------------------------
@Test
@DisplayName(
"""
Når downloadFile kjøres
Hvis input stream inneholder bytes
:
skrives filen til disk og returneres
"""
)
fun downloadFile_writes_file() = runTest {
val outDir = tempDir()
val data = "hello".toByteArray()
val connection = fakeConnection(data)
val client = client(outDir)
val file = client.downloadFile(connection)
assertNotNull(file)
assertTrue(file!!.exists())
assertEquals("hello", file.readText())
}
// ------------------------------------------------------------
// 2. finalizeDownload (happy path)
// ------------------------------------------------------------
@Test
@DisplayName(
"""
Når finalizeDownload kjøres
Hvis atomic move lykkes
:
flyttes temp-filen til endelig filnavn og temp-filen slettes
"""
)
fun finalizeDownload_moves_file_atomically() = runTest {
val outDir = tempDir()
val tempFile = File(outDir, "temp.downloading").apply { writeText("hello") }
val client = client(outDir)
val metadata = fakeMetadata("jpg")
val result = client.finalizeDownload(tempFile, "final", metadata)
assertTrue(result.exists())
assertEquals("final.jpg", result.name)
assertFalse(tempFile.exists())
}
// ------------------------------------------------------------
// 3. finalizeDownload (move failure)
// ------------------------------------------------------------
@Test
@DisplayName(
"""
Når finalizeDownload kjøres
Hvis atomic move feiler
:
kastes InvalidFileException
"""
)
fun finalizeDownload_throws_on_failure() = runTest {
val outDir = tempDir()
val tempFile = File(outDir, "temp.downloading").apply { writeText("hello") }
// Gjør katalogen skrivebeskyttet for å tvinge move-feil
outDir.setWritable(false)
val client = client(outDir)
val metadata = fakeMetadata("jpg")
assertFailsWith<DownloadClient.InvalidFileException> {
client.finalizeDownload(tempFile, "final", metadata)
}
}
// ------------------------------------------------------------
// 5. getExtension
// ------------------------------------------------------------
@Test
@DisplayName(
"""
Når getExtension kjøres
Hvis metadata inneholder MIME-type
:
returneres riktig filendelse
"""
)
fun getExtension_from_metadata() = runTest {
val outDir = tempDir()
val client = client(outDir)
val ext = client.getExtension(File("x"), fakeMetadata("png"))
assertEquals("png", ext)
}
@Test
@DisplayName(
"""
Når getExtension kjøres
Hvis metadata mangler MIME-type
:
returneres null
"""
)
fun getExtension_returns_null_without_mime() = runTest {
val outDir = tempDir()
val client = client(outDir)
val tempFile = File(outDir, "dummy").apply { writeText("irrelevant") }
val metadata = DownloadClient.DownloadMetadata(
uri = URI("http://example.com/file"),
mimeType = null,
length = 10
)
val ext = client.getExtension(tempFile, metadata)
assertNull(ext)
}
}