Migrated to instant
This commit is contained in:
parent
ab21fc76d9
commit
a86347aaa5
@ -2,18 +2,18 @@ package no.iktdev.mediaprocessing.coordinator.services
|
|||||||
|
|
||||||
import no.iktdev.eventi.ZDS.toEvent
|
import no.iktdev.eventi.ZDS.toEvent
|
||||||
import no.iktdev.eventi.models.store.PersistedEvent
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
import no.iktdev.mediaprocessing.shared.common.LocalDateTimeEpoch
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary
|
import no.iktdev.mediaprocessing.shared.common.dto.SequenceSummary
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
|
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.CollectedEvent
|
||||||
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
|
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
|
||||||
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
import java.time.Instant
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class SequenceAggregatorService() {
|
class SequenceAggregatorService() {
|
||||||
|
|
||||||
fun getActiveSequences(): List<SequenceSummary> {
|
fun getActiveSequences(): List<SequenceSummary> {
|
||||||
val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch)
|
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
||||||
|
|
||||||
// Gruppér først, deserialiser senere
|
// Gruppér først, deserialiser senere
|
||||||
val grouped = allEvents.groupBy { it.referenceId }
|
val grouped = allEvents.groupBy { it.referenceId }
|
||||||
@ -26,7 +26,7 @@ class SequenceAggregatorService() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fun getRecentSequences(limit: Int): List<SequenceSummary> {
|
fun getRecentSequences(limit: Int): List<SequenceSummary> {
|
||||||
val allEvents = EventStore.getPersistedEventsAfter(LocalDateTimeEpoch)
|
val allEvents = EventStore.getPersistedEventsAfter(Instant.EPOCH)
|
||||||
|
|
||||||
val grouped = allEvents.groupBy { it.referenceId }
|
val grouped = allEvents.groupBy { it.referenceId }
|
||||||
|
|
||||||
|
|||||||
@ -2,9 +2,8 @@ from datetime import datetime, timezone
|
|||||||
|
|
||||||
def utc_now():
|
def utc_now():
|
||||||
"""
|
"""
|
||||||
Returnerer en UTC-basert LocalDateTime uten Z eller offset,
|
Matcher nøyaktig formatet Kotlin/Exposed skriver til databasen:
|
||||||
med nanosekund-lignende presisjon (mikrosekunder + padding).
|
yyyy-MM-dd HH:mm:ss.SSSSSS (UTC)
|
||||||
"""
|
"""
|
||||||
dt = datetime.now(timezone.utc).replace(tzinfo=None)
|
dt = datetime.now(timezone.utc)
|
||||||
return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond:06d}000"
|
return dt.strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||||
|
|
||||||
|
|||||||
@ -2,9 +2,8 @@ from datetime import datetime, timezone
|
|||||||
|
|
||||||
def utc_now():
|
def utc_now():
|
||||||
"""
|
"""
|
||||||
Returnerer en UTC-basert LocalDateTime uten Z eller offset,
|
Matcher nøyaktig formatet Kotlin/Exposed skriver til databasen:
|
||||||
med nanosekund-lignende presisjon (mikrosekunder + padding).
|
yyyy-MM-dd HH:mm:ss.SSSSSS (UTC)
|
||||||
"""
|
"""
|
||||||
dt = datetime.now(timezone.utc).replace(tzinfo=None)
|
dt = datetime.now(timezone.utc)
|
||||||
return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond:06d}000"
|
return dt.strftime("%Y-%m-%d %H:%M:%S.%f")
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
[versions]
|
[versions]
|
||||||
eventi = "1.0-rc29"
|
eventi = "1.0-rc30"
|
||||||
exfl = "1.0-rc1"
|
exfl = "1.0-rc1"
|
||||||
|
|
||||||
[libraries]
|
[libraries]
|
||||||
|
|||||||
@ -10,8 +10,7 @@ import java.io.FileInputStream
|
|||||||
import java.io.RandomAccessFile
|
import java.io.RandomAccessFile
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
import java.security.MessageDigest
|
import java.security.MessageDigest
|
||||||
import java.time.Clock
|
import java.time.Instant
|
||||||
import java.time.LocalDateTime
|
|
||||||
import java.util.zip.CRC32
|
import java.util.zip.CRC32
|
||||||
|
|
||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
@ -217,9 +216,4 @@ fun File.resolveConflict(): File {
|
|||||||
return candidate
|
return candidate
|
||||||
}
|
}
|
||||||
|
|
||||||
val LocalDateTimeEpoch: LocalDateTime =
|
fun UtcNow(): Instant = Instant.now()
|
||||||
LocalDateTime.of(1970, 1, 1, 0, 0, 0)
|
|
||||||
|
|
||||||
fun UtcNow(): LocalDateTime {
|
|
||||||
return LocalDateTime.now(Clock.systemUTC())
|
|
||||||
}
|
|
||||||
@ -1,10 +1,10 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.common.dto
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
import java.time.LocalDateTime
|
import java.time.Instant
|
||||||
|
|
||||||
data class FileTableItem(
|
data class FileTableItem(
|
||||||
val name: String,
|
val name: String,
|
||||||
val uri: String,
|
val uri: String,
|
||||||
val checksum: String,
|
val checksum: String,
|
||||||
val identifiedAt: LocalDateTime,
|
val identifiedAt: Instant,
|
||||||
)
|
)
|
||||||
@ -2,7 +2,7 @@ package no.iktdev.mediaprocessing.shared.common.dto
|
|||||||
|
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.store.PersistedEvent
|
import no.iktdev.eventi.models.store.PersistedEvent
|
||||||
import java.time.LocalDateTime
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
import kotlin.reflect.KProperty1
|
import kotlin.reflect.KProperty1
|
||||||
|
|
||||||
@ -10,14 +10,14 @@ data class SequenceEvent(
|
|||||||
val eventId: UUID,
|
val eventId: UUID,
|
||||||
val referenceId: UUID,
|
val referenceId: UUID,
|
||||||
val type: String,
|
val type: String,
|
||||||
val timestamp: LocalDateTime,
|
val timestamp: Instant,
|
||||||
val metadata: MetadataDto,
|
val metadata: MetadataDto,
|
||||||
val payload: Map<String, Any?>?
|
val payload: Map<String, Any?>?
|
||||||
)
|
)
|
||||||
|
|
||||||
data class MetadataDto(
|
data class MetadataDto(
|
||||||
val derivedFromEventIds: Set<UUID>?,
|
val derivedFromEventIds: Set<UUID>?,
|
||||||
val createdAt: LocalDateTime
|
val createdAt: Instant
|
||||||
)
|
)
|
||||||
|
|
||||||
fun Event.extractPayload(): Map<String, Any?>? {
|
fun Event.extractPayload(): Map<String, Any?>? {
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.common.dto
|
package no.iktdev.mediaprocessing.shared.common.dto
|
||||||
|
|
||||||
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
|
import no.iktdev.mediaprocessing.shared.common.projection.CollectProjection
|
||||||
import java.time.LocalDateTime
|
import java.time.Instant
|
||||||
|
|
||||||
data class SequenceSummary(
|
data class SequenceSummary(
|
||||||
val referenceId: String,
|
val referenceId: String,
|
||||||
@ -9,7 +9,7 @@ data class SequenceSummary(
|
|||||||
val inputFileName: String?,
|
val inputFileName: String?,
|
||||||
val type: ContextType = ContextType.Content,
|
val type: ContextType = ContextType.Content,
|
||||||
val lastEventId: String,
|
val lastEventId: String,
|
||||||
val lastEventTime: LocalDateTime,
|
val lastEventTime: Instant,
|
||||||
val metadataTaskStatus: CollectProjection.TaskStatus,
|
val metadataTaskStatus: CollectProjection.TaskStatus,
|
||||||
val encodeTaskStatus: CollectProjection.TaskStatus,
|
val encodeTaskStatus: CollectProjection.TaskStatus,
|
||||||
val extractTaskStatus: CollectProjection.TaskStatus,
|
val extractTaskStatus: CollectProjection.TaskStatus,
|
||||||
|
|||||||
@ -9,11 +9,11 @@ import no.iktdev.mediaprocessing.shared.database.tables.EventsTable
|
|||||||
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
import no.iktdev.mediaprocessing.shared.database.withTransaction
|
||||||
import org.jetbrains.exposed.sql.insert
|
import org.jetbrains.exposed.sql.insert
|
||||||
import org.jetbrains.exposed.sql.selectAll
|
import org.jetbrains.exposed.sql.selectAll
|
||||||
import java.time.LocalDateTime
|
import java.time.Instant
|
||||||
import java.util.*
|
import java.util.*
|
||||||
|
|
||||||
object EventStore: EventStore {
|
object EventStore: EventStore {
|
||||||
override fun getPersistedEventsAfter(timestamp: LocalDateTime): List<PersistedEvent> {
|
override fun getPersistedEventsAfter(timestamp: Instant): List<PersistedEvent> {
|
||||||
val result = withTransaction {
|
val result = withTransaction {
|
||||||
EventsTable.selectAll()
|
EventsTable.selectAll()
|
||||||
.where { EventsTable.persistedAt greater timestamp }
|
.where { EventsTable.persistedAt greater timestamp }
|
||||||
|
|||||||
@ -1,17 +1,18 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.database.tables
|
package no.iktdev.mediaprocessing.shared.database.tables
|
||||||
|
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||||
import org.jetbrains.exposed.sql.Column
|
import org.jetbrains.exposed.sql.Column
|
||||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
import org.jetbrains.exposed.sql.javatime.timestamp
|
||||||
import org.jetbrains.exposed.sql.javatime.datetime
|
|
||||||
import java.time.LocalDateTime
|
|
||||||
|
|
||||||
object EventsTable: IntIdTable(name = "EVENTS") {
|
object EventsTable: IntIdTable(name = "EVENTS") {
|
||||||
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
|
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
|
||||||
val eventId: Column<String> = varchar("EVENT_ID", 36)
|
val eventId: Column<String> = varchar("EVENT_ID", 36)
|
||||||
val event: Column<String> = varchar("EVENT",100)
|
val event: Column<String> = varchar("EVENT",100)
|
||||||
val data: Column<String> = text("DATA")
|
val data: Column<String> = text("DATA")
|
||||||
val persistedAt: Column<LocalDateTime> = datetime("PERSISTED_AT").defaultExpression(CurrentDateTime)
|
val persistedAt = timestamp("PERSISTED_AT")
|
||||||
|
.clientDefault { UtcNow() }
|
||||||
|
|
||||||
|
|
||||||
init {
|
init {
|
||||||
uniqueIndex(referenceId, eventId, event)
|
uniqueIndex(referenceId, eventId, event)
|
||||||
|
|||||||
@ -2,12 +2,12 @@ package no.iktdev.mediaprocessing.shared.database.tables
|
|||||||
|
|
||||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||||
import org.jetbrains.exposed.sql.Column
|
import org.jetbrains.exposed.sql.Column
|
||||||
import org.jetbrains.exposed.sql.javatime.datetime
|
import org.jetbrains.exposed.sql.javatime.timestamp
|
||||||
import java.time.LocalDateTime
|
import java.time.Instant
|
||||||
|
|
||||||
object FilesTable: IntIdTable("FILES") {
|
object FilesTable: IntIdTable("FILES") {
|
||||||
val name: Column<String> = varchar("NAME", 255)
|
val name: Column<String> = varchar("NAME", 255)
|
||||||
val uri: Column<String> = text("URI")
|
val uri: Column<String> = text("URI")
|
||||||
val checksum: Column<String> = char("CHECKSUM", 64)
|
val checksum: Column<String> = char("CHECKSUM", 64)
|
||||||
val identifiedAt: Column<LocalDateTime> = datetime("IDENTIFIED_AT")
|
val identifiedAt: Column<Instant> = timestamp("IDENTIFIED_AT")
|
||||||
}
|
}
|
||||||
@ -1,11 +1,11 @@
|
|||||||
package no.iktdev.mediaprocessing.shared.database.tables
|
package no.iktdev.mediaprocessing.shared.database.tables
|
||||||
|
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
import no.iktdev.eventi.models.store.TaskStatus
|
||||||
|
import no.iktdev.mediaprocessing.shared.common.UtcNow
|
||||||
import org.jetbrains.exposed.dao.id.IntIdTable
|
import org.jetbrains.exposed.dao.id.IntIdTable
|
||||||
import org.jetbrains.exposed.sql.Column
|
import org.jetbrains.exposed.sql.Column
|
||||||
import org.jetbrains.exposed.sql.javatime.CurrentDateTime
|
import org.jetbrains.exposed.sql.javatime.timestamp
|
||||||
import org.jetbrains.exposed.sql.javatime.datetime
|
import java.time.Instant
|
||||||
import java.time.LocalDateTime
|
|
||||||
|
|
||||||
object TasksTable: IntIdTable(name = "TASKS") {
|
object TasksTable: IntIdTable(name = "TASKS") {
|
||||||
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
|
val referenceId: Column<String> = varchar("REFERENCE_ID", 36)
|
||||||
@ -16,6 +16,7 @@ object TasksTable: IntIdTable(name = "TASKS") {
|
|||||||
val claimed: Column<Boolean> = bool("CLAIMED").default(false)
|
val claimed: Column<Boolean> = bool("CLAIMED").default(false)
|
||||||
val claimedBy: Column<String?> = varchar("CLAIMED_BY",100).nullable()
|
val claimedBy: Column<String?> = varchar("CLAIMED_BY",100).nullable()
|
||||||
val consumed: Column<Boolean> = bool("CONSUMED").default(false)
|
val consumed: Column<Boolean> = bool("CONSUMED").default(false)
|
||||||
val lastCheckIn: Column<LocalDateTime?> = datetime("LAST_CHECK_IN").nullable()
|
val lastCheckIn: Column<Instant?> = timestamp("LAST_CHECK_IN").nullable()
|
||||||
val persistedAt: Column<LocalDateTime> = datetime("PERSISTED_AT").defaultExpression(CurrentDateTime)
|
val persistedAt = timestamp("PERSISTED_AT")
|
||||||
|
.clientDefault { UtcNow() }
|
||||||
}
|
}
|
||||||
@ -10,6 +10,7 @@ import no.iktdev.mediaprocessing.ffmpeg.decoder.FfmpegProgressDecoder
|
|||||||
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
|
import no.iktdev.mediaprocessing.ffmpeg.util.UtcNow
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.io.FileOutputStream
|
import java.io.FileOutputStream
|
||||||
|
import java.time.ZoneId
|
||||||
import java.time.format.DateTimeFormatter
|
import java.time.format.DateTimeFormatter
|
||||||
|
|
||||||
open class FFmpeg(val executable: String, val logDir: File) {
|
open class FFmpeg(val executable: String, val logDir: File) {
|
||||||
@ -20,9 +21,12 @@ open class FFmpeg(val executable: String, val logDir: File) {
|
|||||||
private val outputCache = mutableListOf<String>()
|
private val outputCache = mutableListOf<String>()
|
||||||
|
|
||||||
//region Log File formatting
|
//region Log File formatting
|
||||||
val currentDateTime = UtcNow()
|
val currentDateTime = UtcNow() // Instant, alltid UTC
|
||||||
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd.HH.mm")
|
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd.HH.mm")
|
||||||
val formattedDateTime = currentDateTime.format(formatter)
|
val formattedDateTime = currentDateTime
|
||||||
|
.atZone(ZoneId.systemDefault())
|
||||||
|
.format(formatter)
|
||||||
|
|
||||||
//endregion
|
//endregion
|
||||||
lateinit var logFile: File
|
lateinit var logFile: File
|
||||||
|
|
||||||
|
|||||||
@ -1,8 +1,5 @@
|
|||||||
package no.iktdev.mediaprocessing.ffmpeg.util
|
package no.iktdev.mediaprocessing.ffmpeg.util
|
||||||
|
|
||||||
import java.time.Clock
|
import java.time.Instant
|
||||||
import java.time.LocalDateTime
|
|
||||||
|
|
||||||
fun UtcNow(): LocalDateTime {
|
fun UtcNow(): Instant = Instant.now()
|
||||||
return LocalDateTime.now(Clock.systemUTC())
|
|
||||||
}
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user