Files table
This commit is contained in:
parent
6269f625e3
commit
57cb1bc8fb
@ -7,6 +7,7 @@ import no.iktdev.mediaprocessing.coordinator.eventDatabase
|
||||
import no.iktdev.mediaprocessing.coordinator.getStoreDatabase
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.data.*
|
||||
import no.iktdev.mediaprocessing.shared.common.database.tables.processed
|
||||
import no.iktdev.mediaprocessing.shared.common.getChecksum
|
||||
import no.iktdev.streamit.library.db.executeOrException
|
||||
import no.iktdev.streamit.library.db.withTransaction
|
||||
import org.jetbrains.exposed.sql.insert
|
||||
@ -15,18 +16,19 @@ object ProcessedItemsStore {
|
||||
val log = KotlinLogging.logger {}
|
||||
|
||||
fun store(title: String, events: List<Event>, processedFiles: List<String>) {
|
||||
val inputFile = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.file ?: return
|
||||
|
||||
val inputFilePath = events.findFirstEventOf<MediaProcessStartEvent>()?.data?.file ?: return
|
||||
val checksum = getChecksum(inputFilePath)
|
||||
val isEncoded = events.findEventsOf<EncodeWorkPerformedEvent>().any { it.isSuccessful() }
|
||||
val isExtracted = events.findEventsOf<EncodeWorkPerformedEvent>().any { it.isSuccessful() }
|
||||
|
||||
withTransaction(eventDatabase.database.database, block = {
|
||||
processed.insert {
|
||||
it[this.title] = title
|
||||
it[this.fileName] = inputFile
|
||||
it[this.fileName] = inputFilePath
|
||||
it[this.processedFiles] = Gson().toJson(processedFiles)
|
||||
it[this.encoded] = isEncoded
|
||||
it[this.extracted] = isExtracted
|
||||
it[this.checksum] = checksum
|
||||
}
|
||||
}) {
|
||||
it.printStackTrace()
|
||||
|
||||
@ -7,11 +7,18 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.channels.consumeEach
|
||||
import kotlinx.coroutines.delay
|
||||
import mu.KotlinLogging
|
||||
import no.iktdev.eventi.database.executeWithResult
|
||||
import no.iktdev.eventi.database.withTransaction
|
||||
import no.iktdev.mediaprocessing.coordinator.*
|
||||
import no.iktdev.mediaprocessing.shared.common.SharedConfig
|
||||
import no.iktdev.mediaprocessing.shared.common.contract.ProcessType
|
||||
import no.iktdev.mediaprocessing.shared.common.database.tables.files
|
||||
import no.iktdev.mediaprocessing.shared.common.extended.isSupportedVideoFile
|
||||
import no.iktdev.mediaprocessing.shared.common.ifNotEmpty
|
||||
import no.iktdev.mediaprocessing.shared.common.md5
|
||||
import no.iktdev.streamit.library.db.executeOrException
|
||||
import no.iktdev.streamit.library.db.withTransaction
|
||||
import org.jetbrains.exposed.sql.insertIgnore
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.stereotype.Service
|
||||
import java.io.File
|
||||
@ -149,6 +156,14 @@ class InputDirectoryWatcher(@Autowired var coordinator: Coordinator): FileWatche
|
||||
logger.info { "File available ${file.file.name}" }
|
||||
|
||||
// This sends it to coordinator to start the process
|
||||
executeWithResult(eventDatabase.database.database) {
|
||||
files.insertIgnore {
|
||||
it[baseName] = file.file.nameWithoutExtension
|
||||
it[folder] = file.file.parentFile.absolutePath
|
||||
it[fileName] = file.file.absolutePath
|
||||
it[checksum] = file.file.md5()
|
||||
}
|
||||
}
|
||||
coordinator.startProcess(file.file, ProcessType.FLOW)
|
||||
}
|
||||
|
||||
|
||||
@ -3,8 +3,10 @@ package no.iktdev.mediaprocessing.shared.common
|
||||
import kotlinx.coroutines.delay
|
||||
import mu.KotlinLogging
|
||||
import java.io.File
|
||||
import java.io.FileInputStream
|
||||
import java.io.RandomAccessFile
|
||||
import java.net.InetAddress
|
||||
import java.security.MessageDigest
|
||||
import java.util.zip.CRC32
|
||||
|
||||
private val logger = KotlinLogging.logger {}
|
||||
@ -137,3 +139,27 @@ fun <T> List<T>.ifNotEmpty(block: (List<T>) -> Unit) {
|
||||
block(this)
|
||||
}
|
||||
}
|
||||
|
||||
fun File.md5(): String {
|
||||
return getChecksum(this.absolutePath)
|
||||
}
|
||||
|
||||
fun getChecksum(filePath: String): String {
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
val fis = FileInputStream(filePath)
|
||||
val byteArray = ByteArray(1024)
|
||||
var bytesCount: Int
|
||||
|
||||
while (fis.read(byteArray).also { bytesCount = it } != -1) {
|
||||
digest.update(byteArray, 0, bytesCount)
|
||||
}
|
||||
|
||||
fis.close()
|
||||
|
||||
val bytes = digest.digest()
|
||||
val sb = StringBuilder()
|
||||
for (byte in bytes) {
|
||||
sb.append(String.format("%02x", byte))
|
||||
}
|
||||
return sb.toString()
|
||||
}
|
||||
|
||||
@ -0,0 +1,11 @@
|
||||
package no.iktdev.mediaprocessing.shared.common.database.tables
|
||||
|
||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||
import org.jetbrains.exposed.sql.Column
|
||||
|
||||
object files: IntIdTable() {
|
||||
val baseName: Column<String> = varchar("baseName", 256)
|
||||
val folder: Column<String> = varchar("folder", 256)
|
||||
val fileName: Column<String> = varchar("fileName", 256)
|
||||
val checksum: Column<String> = varchar("checksum", 256).uniqueIndex()
|
||||
}
|
||||
@ -15,4 +15,5 @@ object processed: IntIdTable() {
|
||||
val encoded: Column<Boolean> = bool("encoded").default(false)
|
||||
val extracted: Column<Boolean> = bool("extracted").default(false)
|
||||
val created: Column<LocalDateTime> = datetime("created").defaultExpression(CurrentDateTime)
|
||||
val checksum: Column<String?> = varchar("checksum", 256).nullable()
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user