Fixed in migration

This commit is contained in:
Brage Skjønborg 2026-02-01 00:43:49 +01:00
parent 7cc7679746
commit 6b47327f87
4 changed files with 140 additions and 167 deletions

View File

@ -5,62 +5,63 @@ import no.iktdev.eventi.models.Task
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskListener
import no.iktdev.eventi.tasks.TaskType
import no.iktdev.mediaprocessing.coordinator.util.FileServiceException
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
import no.iktdev.mediaprocessing.shared.common.model.MigrateStatus
import no.iktdev.mediaprocessing.shared.common.silentTry
import org.jetbrains.annotations.VisibleForTesting
import org.springframework.stereotype.Component
import java.io.File
import java.nio.file.Files
import java.util.*
@Component
class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
override fun getWorkerId(): String {
return "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
}
class MigrateContentToStoreTaskListener : TaskListener(TaskType.IO_INTENSIVE) {
override fun supports(task: Task): Boolean {
return task is MigrateToContentStoreTask
}
override fun getWorkerId(): String =
"${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
override fun supports(task: Task): Boolean =
task is MigrateToContentStoreTask
override suspend fun onTask(task: Task): Event? {
val pickedTask = task as? MigrateToContentStoreTask ?: return null
val picked = task as? MigrateToContentStoreTask ?: return null
val fs = getFileSystemService()
// Disse vil kaste exceptions hvis noe går galt
val videoStatus = migrateVideo(fs, pickedTask.data.videoContent)
val subtitleStatus = migrateSubtitle(fs, pickedTask.data.subtitleContent ?: emptyList())
val coverStatus = migrateCover(fs, pickedTask.data.coverContent ?: emptyList())
val video = migrateVideo(fs, picked.data.videoContent)
val subs = migrateSubtitle(fs, picked.data.subtitleContent ?: emptyList())
val covers = migrateCover(fs, picked.data.coverContent ?: emptyList())
// Hvis vi kommer hit, har ingen migrering kastet exceptions → alt OK
deleteCache(fs, pickedTask)
deleteCache(fs, picked)
return MigrateContentToStoreTaskResultEvent(
status = TaskStatus.Completed,
migrateData = MigrateContentToStoreTaskResultEvent.MigrateData(
collection = pickedTask.data.collection,
videoMigrate = videoStatus,
subtitleMigrate = subtitleStatus,
coverMigrate = coverStatus
collection = picked.data.collection,
videoMigrate = video,
subtitleMigrate = subs,
coverMigrate = covers
)
).producedFrom(task)
}
override fun createIncompleteStateTaskEvent(
task: Task,
status: TaskStatus,
exception: Exception?
): Event {
val message = when (status) {
TaskStatus.Failed -> exception?.message ?: "Unknown error, see log"
TaskStatus.Failed -> exception?.message ?: "Unknown error"
TaskStatus.Cancelled -> "Canceled"
else -> ""
}
return MigrateContentToStoreTaskResultEvent(null, status, error = message)
return MigrateContentToStoreTaskResultEvent(
migrateData = null,
status = status,
error = message
).producedFrom(task)
}
private fun deleteCache(fs: FileSystemService, task: MigrateToContentStoreTask) {
@ -69,57 +70,51 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
task.data.coverContent?.forEach { silentTry { fs.delete(File(it.cachedUri)) } }
}
// -------------------------------------------------------------------------
// MIGRATION HELPERS
// -------------------------------------------------------------------------
private fun migrateFile(fs: FileSystemService, source: File, destination: File) {
if (destination.exists()) {
try {
fs.verifyIdentical(source, destination)
return
} catch (e: FileServiceException.VerificationFailed) {
throw FileServiceException.DestinationExistsButDifferent(source, destination)
}
}
fs.copy(source, destination)
fs.verifyIdentical(source, destination)
}
internal fun migrateVideo(
fs: FileSystemService,
videoContent: MigrateToContentStoreTask.Data.SingleContent?
content: MigrateToContentStoreTask.Data.SingleContent?
): MigrateContentToStoreTaskResultEvent.FileMigration {
if (videoContent == null) {
if (content == null) {
return MigrateContentToStoreTaskResultEvent.FileMigration(null, MigrateStatus.NotPresent)
}
val source = File(videoContent.cachedUri)
val destination = File(videoContent.storeUri)
val source = File(content.cachedUri)
val dest = File(content.storeUri)
// 1. Hvis destinasjonen finnes, sjekk identitet
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
// Skip allerede migrert
return MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
} else {
throw IllegalStateException(
"Destination file already exists but is not identical: $destination"
)
}
}
// 2. Utfør kopiering
if (!fs.copy(source, destination)) {
throw IllegalStateException("File could not be copied to: $destination from $source")
}
// 3. Verifiser kopien (optional)
if (!fs.areIdentical(source, destination)) {
throw IllegalStateException("Copied file is not identical to source: $destination")
}
migrateFile(fs, source, dest)
return MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
storedUri = dest.absolutePath,
status = MigrateStatus.Completed
)
}
@VisibleForTesting
internal fun migrateSubtitle(
fs: FileSystemService,
subtitleContents: List<MigrateToContentStoreTask.Data.SingleSubtitle>
subs: List<MigrateToContentStoreTask.Data.SingleSubtitle>
): List<MigrateContentToStoreTaskResultEvent.SubtitleMigration> {
if (subtitleContents.isEmpty()) {
if (subs.isEmpty()) {
return listOf(
MigrateContentToStoreTaskResultEvent.SubtitleMigration(
language = null,
@ -129,55 +124,26 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
)
}
return subtitleContents.map { subtitle ->
val source = File(subtitle.cachedUri)
val destination = File(subtitle.storeUri)
return subs.map { sub ->
val source = File(sub.cachedUri)
val dest = File(sub.storeUri)
// 1. Hvis destinasjonen finnes
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
return@map MigrateContentToStoreTaskResultEvent.SubtitleMigration(
subtitle.language,
destination.absolutePath,
MigrateStatus.Completed
)
} else {
throw IllegalStateException(
"Destination subtitle exists but is not identical: ${destination.absolutePath}"
)
}
}
migrateFile(fs, source, dest)
// 2. Kopier
if (!fs.copy(source, destination)) {
throw IllegalStateException(
"Failed to copy subtitle ${subtitle.language} from $source to $destination"
)
}
// 3. Verifiser
if (!fs.areIdentical(source, destination)) {
throw IllegalStateException(
"Copied subtitle ${subtitle.language} is not identical: ${destination.absolutePath}"
)
}
// 4. OK
MigrateContentToStoreTaskResultEvent.SubtitleMigration(
subtitle.language,
destination.absolutePath,
MigrateStatus.Completed
language = sub.language,
storedUri = dest.absolutePath,
status = MigrateStatus.Completed
)
}
}
@VisibleForTesting
internal fun migrateCover(
fs: FileSystemService,
coverContents: List<MigrateToContentStoreTask.Data.SingleContent>
covers: List<MigrateToContentStoreTask.Data.SingleContent>
): List<MigrateContentToStoreTaskResultEvent.FileMigration> {
if (coverContents.isEmpty()) {
if (covers.isEmpty()) {
return listOf(
MigrateContentToStoreTaskResultEvent.FileMigration(
storedUri = null,
@ -186,64 +152,41 @@ class MigrateContentToStoreTaskListener: TaskListener(TaskType.IO_INTENSIVE) {
)
}
return coverContents.map { cover ->
return covers.map { cover ->
val source = File(cover.cachedUri)
val destination = File(cover.storeUri)
val dest = File(cover.storeUri)
// 1. Hvis destinasjonen finnes
if (destination.exists()) {
if (fs.areIdentical(source, destination)) {
return@map MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
)
} else {
throw IllegalStateException(
"Destination cover exists but is not identical: ${destination.absolutePath}"
)
}
}
migrateFile(fs, source, dest)
// 2. Kopier
if (!fs.copy(source, destination)) {
throw IllegalStateException(
"Failed to copy cover from $source to $destination"
)
}
// 3. Verifiser
if (!fs.areIdentical(source, destination)) {
throw IllegalStateException(
"Copied cover is not identical: ${destination.absolutePath}"
)
}
// 4. OK
MigrateContentToStoreTaskResultEvent.FileMigration(
destination.absolutePath,
MigrateStatus.Completed
storedUri = dest.absolutePath,
status = MigrateStatus.Completed
)
}
}
open fun getFileSystemService(): FileSystemService {
return DefaultFileSystemService()
}
open fun getFileSystemService(): FileSystemService =
DefaultFileSystemService()
class DefaultFileSystemService : FileSystemService {
override fun copy(source: File, destination: File): Boolean {
return try {
override fun copy(source: File, destination: File) {
if (!source.exists()) {
throw FileServiceException.SourceMissing(source)
}
try {
source.copyTo(destination, overwrite = true)
true
} catch (e: Exception) {
false
throw FileServiceException.CopyFailed(source, destination, e)
}
}
override fun areIdentical(a: File, b: File): Boolean {
return Files.mismatch(a.toPath(), b.toPath()) == -1L
override fun verifyIdentical(source: File, destination: File) {
val mismatch = Files.mismatch(source.toPath(), destination.toPath())
if (mismatch != -1L) {
throw FileServiceException.VerificationFailed(source, destination)
}
}
override fun delete(file: File) {

View File

@ -3,7 +3,23 @@ package no.iktdev.mediaprocessing.coordinator.util
import java.io.File
interface FileSystemService {
fun copy(source: File, destination: File): Boolean
fun areIdentical(a: File, b: File): Boolean
fun copy(source: File, destination: File)
fun verifyIdentical(original: File, target: File)
fun delete(file: File)
}
sealed class FileServiceException(message: String, cause: Throwable? = null) : RuntimeException(message, cause) {
class SourceMissing(val source: File) :
FileServiceException("Source file does not exist: ${source.absolutePath}")
class DestinationExistsButDifferent(val source: File, val destination: File) :
FileServiceException("Destination exists but differs: ${destination.absolutePath}")
class CopyFailed(val source: File, val destination: File, cause: Throwable?) :
FileServiceException("Failed to copy ${source.absolutePath}${destination.absolutePath}", cause)
class VerificationFailed(val source: File, val destination: File) :
FileServiceException("Copied file is not identical: ${destination.absolutePath}")
}

View File

@ -1,27 +1,48 @@
package no.iktdev.mediaprocessing
import no.iktdev.mediaprocessing.coordinator.util.FileServiceException
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import java.io.File
class MockFileSystemService : FileSystemService {
// Controls
var copyShouldFail = false
var deleteShouldFail = false
var identical = true
var sourceExists = true
// Tracking
val copied = mutableListOf<Pair<File, File>>()
val verified = mutableListOf<Pair<File, File>>()
val deleted = mutableListOf<File>()
override fun copy(source: File, destination: File): Boolean {
override fun copy(source: File, destination: File) {
copied += source to destination
return !copyShouldFail
if (!sourceExists) {
throw FileServiceException.SourceMissing(source)
}
override fun areIdentical(a: File, b: File): Boolean {
return identical
if (copyShouldFail) {
throw FileServiceException.CopyFailed(source, destination, RuntimeException("copy failed"))
}
// Simulate successful copy by doing nothing
}
override fun verifyIdentical(source: File, destination: File) {
verified += source to destination
if (!identical) {
throw FileServiceException.VerificationFailed(source, destination)
}
}
override fun delete(file: File) {
if (deleteShouldFail) throw RuntimeException("delete failed")
if (deleteShouldFail) {
throw RuntimeException("delete failed")
}
deleted += file
}
}

View File

@ -5,6 +5,7 @@ import no.iktdev.eventi.models.Event
import no.iktdev.eventi.models.store.TaskStatus
import no.iktdev.eventi.tasks.TaskReporter
import no.iktdev.mediaprocessing.MockFileSystemService
import no.iktdev.mediaprocessing.coordinator.util.FileServiceException
import no.iktdev.mediaprocessing.coordinator.util.FileSystemService
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.MigrateContentToStoreTaskResultEvent
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.MigrateToContentStoreTask
@ -52,7 +53,7 @@ class MigrateContentToStoreTaskListenerTest {
private val listener = TestListener()
// -------------------------------------------------------------------------
// migrateVideo (direct tests)
// migrateVideo
// -------------------------------------------------------------------------
@Test
@ -66,7 +67,6 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateVideo_success() {
val fs = MockFileSystemService().also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
val result = listener.migrateVideo(fs, content)
@ -74,6 +74,7 @@ class MigrateContentToStoreTaskListenerTest {
assertEquals(MigrateStatus.Completed, result.status)
assertEquals("/tmp/dest", result.storedUri)
assertEquals(1, fs.copied.size)
assertEquals(1, fs.verified.size)
}
@Test
@ -87,10 +88,9 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateVideo_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.CopyFailed> {
listener.migrateVideo(fs, content)
}
}
@ -106,10 +106,9 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateVideo_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val content = MigrateToContentStoreTask.Data.SingleContent("/tmp/source", "/tmp/dest")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.VerificationFailed> {
listener.migrateVideo(fs, content)
}
}
@ -132,7 +131,7 @@ class MigrateContentToStoreTaskListenerTest {
}
// -------------------------------------------------------------------------
// migrateSubtitle (direct tests)
// migrateSubtitle
// -------------------------------------------------------------------------
@Test
@ -164,7 +163,6 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateSubtitle_success() {
val fs = MockFileSystemService().also { listener.fs = it }
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
val result = listener.migrateSubtitle(fs, listOf(sub))
@ -183,10 +181,9 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateSubtitle_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.VerificationFailed> {
listener.migrateSubtitle(fs, listOf(sub))
}
}
@ -202,16 +199,15 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateSubtitle_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
val sub = MigrateToContentStoreTask.Data.SingleSubtitle("en", "/tmp/a", "/tmp/b")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.CopyFailed> {
listener.migrateSubtitle(fs, listOf(sub))
}
}
// -------------------------------------------------------------------------
// migrateCover (direct tests)
// migrateCover
// -------------------------------------------------------------------------
@Test
@ -225,7 +221,6 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateCover_success() {
val fs = MockFileSystemService().also { listener.fs = it }
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
val result = listener.migrateCover(fs, listOf(cover))
@ -244,10 +239,9 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateCover_mismatch() {
val fs = MockFileSystemService().apply { identical = false }.also { listener.fs = it }
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.VerificationFailed> {
listener.migrateCover(fs, listOf(cover))
}
}
@ -263,16 +257,15 @@ class MigrateContentToStoreTaskListenerTest {
)
fun migrateCover_copyFails() {
val fs = MockFileSystemService().apply { copyShouldFail = true }.also { listener.fs = it }
val cover = MigrateToContentStoreTask.Data.SingleContent("/tmp/c", "/tmp/c2")
assertThrows<IllegalStateException> {
assertThrows<FileServiceException.CopyFailed> {
listener.migrateCover(fs, listOf(cover))
}
}
// -------------------------------------------------------------------------
// accept() — full TaskListener flow
// accept()
// -------------------------------------------------------------------------
@Test