Fixed state reporting and producing

This commit is contained in:
Brage Skjønborg 2026-01-31 20:53:10 +01:00
parent 6dab106f2a
commit ed6bddb95f
22 changed files with 554 additions and 318 deletions

View File

@ -53,11 +53,11 @@ class DefaultTaskReporter() : TaskReporter {
}
override fun markFailed(taskId: UUID) {
override fun markFailed(referenceId: UUID, taskId: UUID) {
TaskStore.markConsumed(taskId, TaskStatus.Failed)
}
override fun markCancelled(taskId: UUID) {
override fun markCancelled(referenceId: UUID, taskId: UUID) {
TaskStore.markConsumed(taskId, TaskStatus.Cancelled)
}

View File

@ -33,6 +33,7 @@ open class ConvertTaskListener(): TaskListener(TaskType.CPU_INTENSIVE) {
return task is ConvertTask
}
override suspend fun onTask(task: Task): Event? {
if (task !is ConvertTask) {
throw IllegalArgumentException("Invalid task type: ${task::class.java.name}")
@ -68,6 +69,19 @@ open class ConvertTaskListener(): TaskListener(TaskType.CPU_INTENSIVE) {
}
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return ConvertTaskResultEvent(null, status, error = message)
}
open fun getConverter(): Converter {
return Converter2(getConverterEnvironment(), getListener())
}

View File

@ -108,8 +108,8 @@ class ConvertTaskListenerTest {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun markFailed(referenceId: UUID, taskId: UUID) {}
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {

View File

@ -52,11 +52,11 @@ class DefaultTaskReporter() : TaskReporter {
TaskStore.markConsumed(taskId, TaskStatus.Completed)
}
override fun markFailed(taskId: UUID) {
override fun markFailed(referenceId: UUID, taskId: UUID) {
TaskStore.markConsumed(taskId, TaskStatus.Failed)
}
override fun markCancelled(taskId: UUID) {
override fun markCancelled(referenceId: UUID, taskId: UUID) {
TaskStore.markConsumed(taskId, TaskStatus.Cancelled)
}

View File

@ -58,6 +58,19 @@ class DownloadCoverTaskListener(
}
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return CoverDownloadResultEvent(null, status, error = message)
}
open fun getDownloadClient(): DownloadClient {
return DefaultDownloadClient(coordinatorEnv)
}

View File

@ -49,6 +49,19 @@ class MediaStreamReadTaskListener(
}
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return CoordinatorReadStreamsResultEvent(null, status, error = message)
}
override fun getFfprobe(): FFprobe {
return JsonFfinfo(coordinatorEnv.ffprobe)
}

View File

@ -28,61 +28,89 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
override suspend fun onTask(task: Task): Event? {
val pickedTask = task as? MigrateToContentStoreTask ?: return null
val fs = getFileSystemService()
// Disse vil kaste exceptions hvis noe går galt
val videoStatus = migrateVideo(fs, pickedTask.data.videoContent)
val subtitleStatus = migrateSubtitle(fs, pickedTask.data.subtitleContent ?: emptyList())
val coverStatus = migrateCover(fs, pickedTask.data.coverContent ?: emptyList())
var status = TaskStatus.Completed
if (videoStatus.status != MigrateStatus.Failed &&
subtitleStatus.none { it.status == MigrateStatus.Failed } &&
coverStatus.none { it.status == MigrateStatus.Failed })
{
pickedTask.data.videoContent?.cachedUri?.let { File(it) }?.let {
silentTry { fs.delete(it) }
}
pickedTask.data.subtitleContent?.map { File(it.cachedUri) }?.forEach {
silentTry { fs.delete(it) }
}
pickedTask.data.coverContent?.map { File(it.cachedUri) }?.forEach {
silentTry { fs.delete(it) }
}
} else {
status = TaskStatus.Failed
}
// Hvis vi kommer hit, har ingen migrering kastet exceptions → alt OK
deleteCache(fs, pickedTask)
val completedEvent = MigrateContentToStoreTaskResultEvent(
status = status,
return MigrateContentToStoreTaskResultEvent(
status = TaskStatus.Completed,
migrateData = MigrateContentToStoreTaskResultEvent.MigrateData(
collection = pickedTask.data.collection,
videoMigrate = videoStatus,
subtitleMigrate = subtitleStatus,
coverMigrate = coverStatus
)
).producedFrom(task)
return completedEvent
}
@VisibleForTesting
internal fun migrateVideo(fs: FileSystemService, videoContent: MigrateToContentStoreTask.Data.SingleContent?): MigrateContentToStoreTaskResultEvent.FileMigration {
if (videoContent == null) return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return MigrateContentToStoreTaskResultEvent(null, status, error = message)
}
private fun deleteCache(fs: FileSystemService, task: MigrateToContentStoreTask) {
task.data.videoContent?.cachedUri?.let { silentTry { fs.delete(File(it)) } }
task.data.subtitleContent?.forEach { silentTry { fs.delete(File(it.cachedUri)) } }
task.data.coverContent?.forEach { silentTry { fs.delete(File(it.cachedUri)) } }
}
internal fun migrateVideo(
fs: FileSystemService,
videoContent: MigrateToContentStoreTask.Data.SingleContent?
): MigrateContentToStoreTaskResultEvent.FileMigration {
if (videoContent == null) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)
}
val source = File(videoContent.cachedUri)
val destination = File(videoContent.storeUri)
return try {
// 1. Hvis destinasjonen finnes, sjekk identitet
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
// Skip allerede migrert
return MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
} else {
throw IllegalStateException(
"Destination file already exists but is not identical: $destination"
)
}
}
// 2. Utfør kopiering
if (!fs.copy(source, destination)) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
throw IllegalStateException("File could not be copied to: $destination from $source")
}
// 3. Verifiser kopien (optional)
if (!fs.areIdentical(source, destination)) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
throw IllegalStateException("Copied file is not identical to source: $destination")
}
MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed)
} catch (e: Exception) {
MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.Failed)
}
return MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
}
@VisibleForTesting
@ -90,53 +118,116 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
fs: FileSystemService,
subtitleContents: List<MigrateToContentStoreTask.Data.SingleSubtitle>
): List<MigrateContentToStoreTaskResultEvent.SubtitleMigration> {
if (subtitleContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.SubtitleMigration(null, null, MigrateStatus.NotPresent))
val results = mutableListOf<MigrateContentToStoreTaskResultEvent.SubtitleMigration>()
for (subtitle in subtitleContents) {
val source = File(subtitle.cachedUri)
val destination = File(subtitle.storeUri)
try {
if (!fs.copy(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed))
continue
if (subtitleContents.isEmpty()) {
return listOf(
MigrateContentToStoreTaskResultEvent.SubtitleMigration(
language = null,
storedUri = null,
status = MigrateStatus.NotPresent
)
)
}
if (!fs.areIdentical(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language, destination.absolutePath, MigrateStatus.Failed))
return subtitleContents.map { subtitle ->
val source = File(subtitle.cachedUri)
val destination = File(subtitle.storeUri)
// 1. Hvis destinasjonen finnes
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
return@map MigrateContentToStoreTaskResultEvent.SubtitleMigration(
subtitle.language,
destination.absolutePath,
MigrateStatus.Completed
)
} else {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Completed))
}
} catch (e: Exception) {
results.add(MigrateContentToStoreTaskResultEvent.SubtitleMigration(subtitle.language,destination.absolutePath, MigrateStatus.Failed))
throw IllegalStateException(
"Destination subtitle exists but is not identical: ${destination.absolutePath}"
)
}
}
return results
// 2. Kopier
if (!fs.copy(source, destination)) {
throw IllegalStateException(
"Failed to copy subtitle ${subtitle.language} from $source to $destination"
)
}
// 3. Verifiser
if (!fs.areIdentical(source, destination)) {
throw IllegalStateException(
"Copied subtitle ${subtitle.language} is not identical: ${destination.absolutePath}"
)
}
// 4. OK
MigrateContentToStoreTaskResultEvent.SubtitleMigration(
subtitle.language,
destination.absolutePath,
MigrateStatus.Completed
)
}
}
@VisibleForTesting
internal fun migrateCover(fs: FileSystemService, coverContents: List<MigrateToContentStoreTask.Data.SingleContent>): List<MigrateContentToStoreTaskResultEvent.FileMigration> {
if (coverContents.isEmpty()) return listOf(MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent))
val results = mutableListOf<MigrateContentToStoreTaskResultEvent.FileMigration>()
for (cover in coverContents) {
internal fun migrateCover(
fs: FileSystemService,
coverContents: List<MigrateToContentStoreTask.Data.SingleContent>
): List<MigrateContentToStoreTaskResultEvent.FileMigration> {
if (coverContents.isEmpty()) {
return listOf(
MigrateContentToStoreTaskResultEvent.FileMigration(
storedUri = null,
status = MigrateStatus.NotPresent
)
)
}
return coverContents.map { cover ->
val source = File(cover.cachedUri)
val destination = File(cover.storeUri)
try {
if (!fs.copy(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed))
continue
}
if (!fs.areIdentical(source, destination)) {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed))
// 1. Hvis destinasjonen finnes
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
return@map MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
} else {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Completed))
}
} catch (e: Exception) {
results.add(MigrateContentToStoreTaskResultEvent.FileMigration(destination.absolutePath, MigrateStatus.Failed))
throw IllegalStateException(
"Destination cover exists but is not identical: ${destination.absolutePath}"
)
}
}
return results
// 2. Kopier
if (!fs.copy(source, destination)) {
throw IllegalStateException(
"Failed to copy cover from $source to $destination"
)
}
// 3. Verifiser
if (!fs.areIdentical(source, destination)) {
throw IllegalStateException(
"Copied cover is not identical: ${destination.absolutePath}"
)
}
// 4. OK
MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
}
}
open fun getFileSystemService(): FileSystemService {
return DefaultFileSystemService()
}

View File

@ -6,7 +6,6 @@ import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskType
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.HttpEntity
@ -15,10 +14,11 @@ import org.springframework.http.HttpMethod
import org.springframework.http.MediaType
import org.springframework.stereotype.Component
import org.springframework.web.client.RestTemplate
import java.util.UUID
import java.util.*
@Component
class StoreContentAndMetadataTaskListener: TaskListener(TaskType.MIXED) {
class StoreContentAndMetadataTaskListener : TaskListener(TaskType.MIXED) {
@Autowired
lateinit var streamitRestTemplate: RestTemplate
@ -36,21 +36,38 @@ class StoreContentAndMetadataTaskListener: TaskListener(TaskType.MIXED) {
val headers = HttpHeaders().apply { contentType = MediaType.APPLICATION_JSON }
val entity = HttpEntity(pickedTask.data, headers)
val response = try {
val res = streamitRestTemplate.exchange(
"open/api/mediaprocesser/import",
// ❗ Ikke fang exceptions — la TaskListener håndtere dem
val response = streamitRestTemplate.exchange(
"/open/api/mediaprocesser/import",
HttpMethod.POST,
entity,
Void::class.java,
)
res.statusCode.is2xxSuccessful
} catch (e: Exception) {
false
if (!response.statusCode.is2xxSuccessful) {
throw IllegalStateException("StreamIt returned ${response.statusCode}")
}
// Hvis vi kommer hit → alt OK
return StoreContentAndMetadataTaskResultEvent(
status = TaskStatus.Completed
).producedFrom(task)
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return StoreContentAndMetadataTaskResultEvent(
if (response) TaskStatus.Completed else TaskStatus.Failed
)
status = status,
error = message
).producedFrom(task)
}
}

View File

@ -279,6 +279,7 @@ class MigrateCreateStoreTaskListenerTest : TestBase() {
subtitleUris: List<String>
) = MigrateContentToStoreTaskResultEvent(
status = TaskStatus.Completed,
migrateData = MigrateContentToStoreTaskResultEvent.MigrateData(
collection = collection,
videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration(
storedUri = videoUri,
@ -300,4 +301,5 @@ class MigrateCreateStoreTaskListenerTest : TestBase() {
}
)
)
)
}

View File

@ -153,6 +153,7 @@ class StoreContentAndMetadataListenerTest : TestBase() {
): MigrateContentToStoreTaskResultEvent {
return MigrateContentToStoreTaskResultEvent(
status = status,
migrateData = MigrateContentToStoreTaskResultEvent.MigrateData(
collection = collection,
videoMigrate = MigrateContentToStoreTaskResultEvent.FileMigration(
storedUri = videoUri,
@ -174,6 +175,7 @@ class StoreContentAndMetadataListenerTest : TestBase() {
}
)
)
)
}
class DummyEvent : Event()

View File

@ -39,8 +39,8 @@ class DownloadCoverTaskListenerTest: TestBase() {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun markFailed(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {}

View File

@ -45,8 +45,8 @@ class MediaStreamReadTaskListenerTest: TestBase() {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun markFailed(referenceId: UUID, taskId: UUID) {}
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {

View File

@ -1,30 +1,58 @@
package no.iktdev.mediaprocessing.coordinator.listeners.tasks
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockFileSystemService
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertThrows
import java.util.*
class MigrateContentToStoreTaskListenerTest {
class MigrateContentToStoreTaskListenerTestImplementation: MigrateContentToStoreTaskListener() {
// -------------------------------------------------------------------------
// Fake Reporter
// -------------------------------------------------------------------------
var fs: FileSystemService? = null
override fun getFileSystemService(): FileSystemService {
return fs!!
class FakeTaskReporter : TaskReporter {
val events = mutableListOf<Event>()
var completed = false
var failed = false
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {
events.add(event)
}
}
val listener = MigrateContentToStoreTaskListenerTestImplementation()
// -------------------------------------------------------------------------
// migrateVideo
// Listener with injectable FS
// -------------------------------------------------------------------------
class TestListener : MigrateContentToStoreTaskListener() {
var fs: FileSystemService? = null
override fun getFileSystemService(): FileSystemService = fs!!
}
private val listener = TestListener()
// -------------------------------------------------------------------------
// migrateVideo (direct tests)
// -------------------------------------------------------------------------
@Test
@ -37,19 +65,14 @@ class MigrateContentToStoreTaskListenerTest {
"""
)
fun migrateVideo_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val fs = MockFileSystemService().also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Completed, result.status)
assertEquals("/tmp/dest.mp4", result.storedUri)
assertEquals("/tmp/dest", result.storedUri)
assertEquals(1, fs.copied.size)
}
@ -59,47 +82,36 @@ class MigrateContentToStoreTaskListenerTest {
Når migrateVideo kjøres
Hvis copy feiler
:
returneres Failed
kastes exception
"""
)
fun migrateVideo_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also {
listener.fs = it
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
assertThrows<IllegalStateException> {
listener.migrateVideo(fs, content)
}
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Failed, result.status)
}
@Test
@DisplayName(
"""
Når migrateVideo kjøres
Hvis copy lykkes men filene ikke er identiske
Hvis filene ikke er identiske
:
returneres Failed
kastes exception
"""
)
fun migrateVideo_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also {
listener.fs = it
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
assertThrows<IllegalStateException> {
listener.migrateVideo(fs, content)
}
val content = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/source.mp4",
storeUri = "/tmp/dest.mp4"
)
val result = listener.migrateVideo(fs, content)
assertEquals(MigrateStatus.Failed, result.status)
}
@Test
@ -112,9 +124,7 @@ class MigrateContentToStoreTaskListenerTest {
"""
)
fun migrateVideo_null() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val fs = MockFileSystemService().also { listener.fs = it }
val result = listener.migrateVideo(fs, null)
@ -122,7 +132,7 @@ class MigrateContentToStoreTaskListenerTest {
}
// -------------------------------------------------------------------------
// migrateSubtitle
// migrateSubtitle (direct tests)
// -------------------------------------------------------------------------
@Test
@ -135,9 +145,7 @@ class MigrateContentToStoreTaskListenerTest {
"""
)
fun migrateSubtitle_empty() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val fs = MockFileSystemService().also { listener.fs = it }
val result = listener.migrateSubtitle(fs, emptyList())
@ -155,15 +163,9 @@ class MigrateContentToStoreTaskListenerTest {
"""
)
fun migrateSubtitle_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val fs = MockFileSystemService().also { listener.fs = it }
val sub = MigrateToContentStoreTask.Data.SingleSubtitle(
language = "en",
cachedUri = "/tmp/a.srt",
storeUri = "/tmp/b.srt"
)
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
val result = listener.migrateSubtitle(fs, listOf(sub))
@ -174,34 +176,42 @@ class MigrateContentToStoreTaskListenerTest {
@DisplayName(
"""
Når migrateSubtitle kjøres
Hvis én lykkes og én feiler
Hvis filene ikke er identiske
:
returneres både Completed og Failed
kastes exception
"""
)
fun migrateSubtitle_mixed() {
val fs = MockFileSystemService().apply {
// first OK, second fails
copyShouldFail = false
}.also { listener.fs = it }
fun migrateSubtitle_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val subs = listOf(
MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b"),
MigrateToContentStoreTask.Data.SingleSubtitle("no", "/tmp/c", "/tmp/d")
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
assertThrows<IllegalStateException> {
listener.migrateSubtitle(fs, listOf(sub))
}
}
@Test
@DisplayName(
"""
Når migrateSubtitle kjøres
Hvis copy feiler
:
kastes exception
"""
)
fun migrateSubtitle_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
// simulate second failing
fs.copyShouldFail = false
val result1 = listener.migrateSubtitle(fs, listOf(subs[0]))
fs.copyShouldFail = true
val result2 = listener.migrateSubtitle(fs, listOf(subs[1]))
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
assertEquals(MigrateStatus.Completed, result1.first().status)
assertEquals(MigrateStatus.Failed, result2.first().status)
assertThrows<IllegalStateException> {
listener.migrateSubtitle(fs, listOf(sub))
}
}
// -------------------------------------------------------------------------
// migrateCover
// migrateCover (direct tests)
// -------------------------------------------------------------------------
@Test
@ -214,14 +224,9 @@ class MigrateContentToStoreTaskListenerTest {
"""
)
fun migrateCover_success() {
val fs = MockFileSystemService().also {
listener.fs = it
}
val fs = MockFileSystemService().also { listener.fs = it }
val cover = MigrateToContentStoreTask.Data.SingleContent(
cachedUri = "/tmp/c.jpg",
storeUri = "/tmp/c2.jpg"
)
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
val result = listener.migrateCover(fs, listOf(cover))
@ -232,51 +237,60 @@ class MigrateContentToStoreTaskListenerTest {
@DisplayName(
"""
Når migrateCover kjøres
Hvis flere covers og én feiler
Hvis filene ikke er identiske
:
returneres både Completed og Failed
kastes exception
"""
)
fun migrateCover_mixed() {
val fs = MockFileSystemService().also { listener.fs = it }
fun migrateCover_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val covers = listOf(
MigrateToContentStoreTask.Data.SingleContent("/tmp/a", "/tmp/b"),
MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/d")
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
assertThrows<IllegalStateException> {
listener.migrateCover(fs, listOf(cover))
}
}
@Test
@DisplayName(
"""
Når migrateCover kjøres
Hvis copy feiler
:
kastes exception
"""
)
fun migrateCover_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
// first OK, second mismatch
fs.identical = true
val ok = listener.migrateCover(fs, listOf(covers[0]))
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
fs.identical = false
val fail = listener.migrateCover(fs, listOf(covers[1]))
assertEquals(MigrateStatus.Completed, ok.first().status)
assertEquals(MigrateStatus.Failed, fail.first().status)
assertThrows<IllegalStateException> {
listener.migrateCover(fs, listOf(cover))
}
}
// -------------------------------------------------------------------------
// onTask
// accept() — full TaskListener flow
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Når onTask kjøres
Når accept() kjøres
Hvis alle migreringer lykkes
:
returneres Completed-event og cache slettes
publiseres Completed-event og cache slettes
"""
)
fun onTask_success() = runTest {
val fs = MockFileSystemService().also {
listener.fs = it
}
fun accept_success() = runTest {
val fs = MockFileSystemService().also { listener.fs = it }
val reporter = FakeTaskReporter()
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
"col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = listOf(
MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/s", "/tmp/s2")
@ -287,8 +301,12 @@ class MigrateContentToStoreTaskListenerTest {
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
listener.accept(task, reporter)
listener.currentJob?.join()
val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent
assertTrue(reporter.completed)
assertEquals(TaskStatus.Completed, event.status)
assertEquals(3, fs.deleted.size)
}
@ -296,28 +314,31 @@ class MigrateContentToStoreTaskListenerTest {
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis en migrering feiler
Når accept() kjøres
Hvis migrateVideo kaster exception
:
returneres Failed-event og ingenting slettes
publiseres Failed-event og cache slettes ikke
"""
)
fun onTask_failure() = runTest {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also {
listener.fs = it
}
fun accept_failure() = runTest {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
val reporter = FakeTaskReporter()
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
"col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = emptyList(),
coverContent = emptyList()
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
listener.accept(task, reporter)
listener.currentJob?.join()
val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent
assertTrue(reporter.failed)
assertEquals(TaskStatus.Failed, event.status)
assertEquals(0, fs.deleted.size)
}
@ -325,57 +346,31 @@ class MigrateContentToStoreTaskListenerTest {
@Test
@DisplayName(
"""
Når onTask kjøres
Hvis video feiler
:
returneres Failed og ingenting slettes
"""
)
fun onTask_videoFails() = runTest {
val fs = MockFileSystemService().apply { copyShouldFail = true }
.also { listener.fs = it }
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = emptyList(),
coverContent = emptyList()
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
assertEquals(TaskStatus.Failed, event.status)
assertEquals(0, fs.deleted.size)
}
@Test
@DisplayName(
"""
Når onTask kjøres
Når accept() kjøres
Hvis sletting feiler
:
returneres fortsatt Completed
publiseres fortsatt Completed-event
"""
)
fun onTask_deleteFails() = runTest {
val fs = MockFileSystemService().apply { deleteShouldFail = true }
.also { listener.fs = it }
fun accept_deleteFails() = runTest {
val fs = MockFileSystemService().apply { deleteShouldFail = true }.also { listener.fs = it }
val reporter = FakeTaskReporter()
val task = MigrateToContentStoreTask(
MigrateToContentStoreTask.Data(
collection = "col",
"col",
videoContent = MigrateToContentStoreTask.Data.SingleContent("/tmp/v", "/tmp/v2"),
subtitleContent = emptyList(),
coverContent = emptyList()
)
).newReferenceId()
val event = listener.onTask(task) as MigrateContentToStoreTaskResultEvent
listener.accept(task, reporter)
listener.currentJob?.join()
val event = reporter.events.first() as MigrateContentToStoreTaskResultEvent
assertTrue(reporter.completed)
assertEquals(TaskStatus.Completed, event.status)
}
}

View File

@ -1,8 +1,10 @@
package no.iktdev.mediaprocessing.coordinator.listeners.tasks
import kotlinx.coroutines.test.runTest
import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.StoreContentAndMetadataTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.StoreContentAndMetadataTask
import no.iktdev.mediaprocessing.shared.common.model.ContentExport
@ -21,10 +23,37 @@ import org.springframework.http.HttpMethod
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.client.RestTemplate
import java.util.*
@ExtendWith(MockitoExtension::class)
class StoreContentAndMetadataTaskListenerTest {
// -------------------------------------------------------------------------
// Fake Reporter
// -------------------------------------------------------------------------
class FakeTaskReporter : TaskReporter {
val events = mutableListOf<Event>()
var completed = false
var failed = false
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) { completed = true }
override fun markFailed(referenceId: UUID, taskId: UUID) { failed = true }
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {
events.add(event)
}
}
// -------------------------------------------------------------------------
// Setup
// -------------------------------------------------------------------------
@Mock
lateinit var restTemplate: RestTemplate
@ -39,10 +68,19 @@ class StoreContentAndMetadataTaskListenerTest {
private fun sampleContentExport(): ContentExport {
return ContentExport(
collection = "series",
episodeInfo = ContentExport.EpisodeInfo(episodeNumber = 1, seasonNumber = 1, episodeTitle = "Pilot"),
episodeInfo = ContentExport.EpisodeInfo(
episodeNumber = 1,
seasonNumber = 1,
episodeTitle = "Pilot"
),
media = ContentExport.MediaExport(
videoFile = "bb.s01e01.mkv",
subtitles = listOf(ContentExport.MediaExport.Subtitle(subtitleFile = "bb.en.srt", language = "en"))
subtitles = listOf(
ContentExport.MediaExport.Subtitle(
subtitleFile = "bb.en.srt",
language = "en"
)
)
),
metadata = ContentExport.MetadataExport(
title = "Breaking Bad",
@ -55,6 +93,9 @@ class StoreContentAndMetadataTaskListenerTest {
)
}
// -------------------------------------------------------------------------
// supports()
// -------------------------------------------------------------------------
@Test
@DisplayName(
@ -90,45 +131,53 @@ class StoreContentAndMetadataTaskListenerTest {
assertThat(result).isFalse()
}
// -------------------------------------------------------------------------
// accept() — full TaskListener flow
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
Gitt at RestTemplate returnerer 200 OK
Når onTask() kalles
Når accept() kjøres
:
Returnerer Completed-event
Publiseres Completed-event
"""
)
fun onTask_returnsCompletedOnSuccess() = runTest {
val task = StoreContentAndMetadataTask(data = sampleContentExport())
fun accept_returnsCompletedOnSuccess() = runTest {
val reporter = FakeTaskReporter()
val task = StoreContentAndMetadataTask(data = sampleContentExport()).newReferenceId()
whenever(
restTemplate.exchange(
eq("open/api/mediaprocesser/import"),
eq("/open/api/mediaprocesser/import"),
eq(HttpMethod.POST),
any(),
eq(Void::class.java)
)
).thenReturn(ResponseEntity(HttpStatus.OK))
val event = listener.onTask(task)
listener.accept(task, reporter)
listener.currentJob?.join()
assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java)
val result = event as StoreContentAndMetadataTaskResultEvent
assertThat(result.status).isEqualTo(TaskStatus.Completed)
val event = reporter.events.first() as StoreContentAndMetadataTaskResultEvent
assertThat(reporter.completed).isTrue()
assertThat(event.status).isEqualTo(TaskStatus.Completed)
}
@Test
@DisplayName(
"""
Gitt at RestTemplate kaster exception
Når onTask() kalles
Når accept() kjøres
:
Returnerer Failed-event
Publiseres Failed-event via createIncompleteStateTaskEvent()
"""
)
fun onTask_returnsFailedOnException() = runTest {
val task = StoreContentAndMetadataTask(data = sampleContentExport())
fun accept_returnsFailedOnException() = runTest {
val reporter = FakeTaskReporter()
val task = StoreContentAndMetadataTask(data = sampleContentExport()).newReferenceId()
whenever(
restTemplate.exchange(
@ -139,13 +188,20 @@ class StoreContentAndMetadataTaskListenerTest {
)
).thenThrow(RuntimeException("boom"))
val event = listener.onTask(task)
listener.accept(task, reporter)
listener.currentJob?.join()
assertThat(event).isInstanceOf(StoreContentAndMetadataTaskResultEvent::class.java)
val result = event as StoreContentAndMetadataTaskResultEvent
assertThat(result.status).isEqualTo(TaskStatus.Failed)
val event = reporter.events.first() as StoreContentAndMetadataTaskResultEvent
assertThat(reporter.failed).isTrue()
assertThat(event.status).isEqualTo(TaskStatus.Failed)
assertThat(event.error).contains("boom")
}
// -------------------------------------------------------------------------
// workerId()
// -------------------------------------------------------------------------
@Test
@DisplayName(
"""
@ -159,6 +215,6 @@ class StoreContentAndMetadataTaskListenerTest {
val id = listener.getWorkerId()
assertThat(id).contains("StoreContentAndMetadataTaskListener-MIXED-")
assertThat(id.split("-").last().length).isGreaterThan(10) // UUID-ish
assertThat(id.split("-").last().length).isGreaterThan(10)
}
}

View File

@ -57,12 +57,12 @@ class DefaultTaskReporter() : TaskReporter {
TaskStore.markConsumed(taskId, TaskStatus.Completed)
}
override fun markFailed(taskId: UUID) {
override fun markFailed(referenceId: UUID, taskId: UUID) {
log.info { "Marking task $taskId as failed" }
TaskStore.markConsumed(taskId, TaskStatus.Failed)
}
override fun markCancelled(taskId: UUID) {
override fun markCancelled(referenceId: UUID, taskId: UUID) {
log.info { "Margin task $taskId as cancelled"}
TaskStore.markConsumed(taskId, TaskStatus.Cancelled)
}

View File

@ -71,6 +71,19 @@ class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
).producedFrom(task)
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return ProcesserExtractResultEvent(null, status, error = message)
}
override fun getFfmpeg(): FFmpeg {
return SubtitleFFmpeg()
}

View File

@ -70,6 +70,20 @@ class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): Ff
).producedFrom(task)
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return ProcesserEncodeResultEvent(null, status, error = message)
}
override fun getFfmpeg(): FFmpeg {
return VideoFFmpeg(object : FFmpeg.Listener {
var lastProgress: FfmpegDecodedProgress? = null

View File

@ -39,8 +39,8 @@ class SubtitleTaskListenerTest {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun markFailed(referenceId: UUID, taskId: UUID) {}
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}

View File

@ -41,8 +41,8 @@ class VideoTaskListenerTest {
override fun markClaimed(taskId: UUID, workerId: String) {}
override fun updateLastSeen(taskId: UUID) {}
override fun markCompleted(taskId: UUID) {}
override fun markFailed(taskId: UUID) {}
override fun markCancelled(taskId: UUID) {}
override fun markFailed(referenceId: UUID, taskId: UUID) {}
override fun markCancelled(referenceId: UUID, taskId: UUID) {}
override fun updateProgress(taskId: UUID, progress: Int) {}
override fun log(taskId: UUID, message: String) {}
override fun publishEvent(event: Event) {

View File

@ -1,5 +1,5 @@
[versions]
eventi = "1.0-rc36"
eventi = "1.0-rc38"
exfl = "1.0-rc1"
[libraries]

View File

@ -5,13 +5,18 @@ import no.iktdev.mediaprocessing.shared.common.event_task_contract.TaskResultEve
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
class MigrateContentToStoreTaskResultEvent(
val migrateData: MigrateData? = null,
status: TaskStatus,
error: String? = null
) : TaskResultEvent(status, error) {
data class MigrateData(
val collection: String,
val videoMigrate: FileMigration,
val subtitleMigrate: List<SubtitleMigration>,
val coverMigrate: List<FileMigration>,
status: TaskStatus,
error: String? = null
) : TaskResultEvent(status, error) {
)
data class FileMigration(
val storedUri: String?,
val status: MigrateStatus

View File

@ -14,7 +14,8 @@ class StoreProjection(val events: List<Event>) {
val metadata = CollectProjection(events).metadata
if (metadata != null) {
val useCover = if (metadata.cover != null) {
val migrated = events.filterIsInstance<MigrateContentToStoreTaskResultEvent>().lastOrNull { it.status == TaskStatus.Completed }?.coverMigrate ?: emptyList()
val migrateData = events.filterIsInstance<MigrateContentToStoreTaskResultEvent>().lastOrNull { it.status == TaskStatus.Completed }?.migrateData
val migrated = migrateData?.coverMigrate ?: emptyList()
migrated.filter { it.status == MigrateStatus.Completed && it.storedUri != null }
.map { File(it.storedUri!!).name }
.find { it == metadata.cover.name }
@ -47,7 +48,7 @@ class StoreProjection(val events: List<Event>) {
}
fun projectMediaFiles(): ContentExport.MediaExport? {
val migrated = events.filterIsInstance<MigrateContentToStoreTaskResultEvent>().lastOrNull { it.status == TaskStatus.Completed }
val migrated = events.filterIsInstance<MigrateContentToStoreTaskResultEvent>().lastOrNull { it.status == TaskStatus.Completed }?.migrateData
return ContentExport.MediaExport(
videoFile = migrated?.videoMigrate?.let { video ->
if (video.status == MigrateStatus.Completed) File(video.storedUri!!).name else null
@ -59,7 +60,7 @@ class StoreProjection(val events: List<Event>) {
fun getCollection(): String? {
val migrated = events.filterIsInstance<MigrateContentToStoreTaskResultEvent>().lastOrNull { it.status == TaskStatus.Completed } ?: return null
return if (migrated.status == TaskStatus.Completed) migrated.collection else null
return if (migrated.status == TaskStatus.Completed) migrated.migrateData?.collection else null
}
}