This commit is contained in:
bskjon 2024-07-20 12:03:00 +02:00
parent 199cee8594
commit 072cb2b192
25 changed files with 152 additions and 54 deletions

View File

@ -14,14 +14,18 @@ import no.iktdev.mediaprocessing.shared.contract.data.ConvertData
import no.iktdev.mediaprocessing.shared.contract.data.ConvertWorkPerformed
import no.iktdev.mediaprocessing.shared.contract.data.ConvertedData
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.stereotype.Service
@Service
class ConvertServiceV2(
class ConvertService(
@Autowired var tasks: TaskCoordinator,
) : TaskService(), ConvertListener, TaskCoordinator.TaskEvents {
fun getProducerName(): String {
return this::class.java.simpleName
}
override val log = KotlinLogging.logger {}
override val logDir = ConverterEnv.logDirectory
@ -86,7 +90,8 @@ class ConvertServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Success
status = EventStatus.Success,
source = getProducerName()
),
data = ConvertedData(
outputFiles = outputFiles
@ -105,7 +110,8 @@ class ConvertServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Failed
status = EventStatus.Failed,
source = getProducerName()
)
))
}

View File

@ -29,6 +29,10 @@ class Coordinator(
init {
}
fun getProducerName(): String {
return this::class.java.simpleName
}
public fun startProcess(file: File, type: ProcessType) {
val operations: List<StartOperationEvents> = listOf(
StartOperationEvents.ENCODE,
@ -43,7 +47,8 @@ class Coordinator(
val event = MediaProcessStartEvent(
metadata = EventMetadata(
referenceId = referenceId.toString(),
status = EventStatus.Success
status = EventStatus.Success,
source = getProducerName()
),
data = StartEventData(
file = file.absolutePath,
@ -71,7 +76,8 @@ class Coordinator(
metadata = EventMetadata(
referenceId = referenceId,
derivedFromEventId = eventToAttachTo.eventId(),
status = EventStatus.Success
status = EventStatus.Success,
source = getProducerName()
),
data = message
))

View File

@ -26,7 +26,9 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
override val produceEvent: Events = Events.EventMediaReadBaseInfoPerformed
override val listensForEvents: List<Events> = listOf(Events.EventMediaProcessStarted)
override fun getProducerName(): String {
return this::class.java.simpleName
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<Event>, events: List<Event>) {
val event = incomingEvent.consume()
@ -36,11 +38,11 @@ class BaseInfoFromFileTaskListener() : CoordinatorEventListener() {
}
val message = try {
readFileInfo(event.data as StartEventData, event.metadata.eventId)?.let {
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success), data = it)
} ?: BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed))
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()), data = it)
} ?: BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()))
} catch (e: Exception) {
e.printStackTrace()
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed))
BaseInfoEvent(metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName()))
}
onProduceEvent(message)
}

View File

@ -33,6 +33,9 @@ class CompletedTaskListener: CoordinatorEventListener() {
var doNotProduceComplete = System.getenv("DISABLE_COMPLETE").toBoolean() ?: false
override fun getProducerName(): String {
return this::class.java.simpleName
}
override fun onReady() {
super.onReady()
@ -410,7 +413,7 @@ class CompletedTaskListener: CoordinatorEventListener() {
onProduceEvent(MediaProcessCompletedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = CompletedEventData(
events.map { it.eventId() }
)

View File

@ -24,6 +24,10 @@ import java.io.File
class ConvertWorkTaskListener: WorkTaskListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkConvertCreated
@ -73,7 +77,7 @@ class ConvertWorkTaskListener: WorkTaskListener() {
val convertFile = file?.let { File(it) }
if (convertFile == null || !convertFile.exists()) {
onProduceEvent(ConvertWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
))
return
} else {
@ -86,7 +90,7 @@ class ConvertWorkTaskListener: WorkTaskListener() {
ConvertWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = convertData
).also { event ->
onProduceEvent(event)

View File

@ -21,6 +21,10 @@ import java.io.File
class CoverDownloadTaskListener : CoordinatorEventListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkDownloadCoverPerformed
@ -34,7 +38,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
val failedEventDefault = MediaCoverDownloadedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
val data = event.az<MediaCoverInfoReceivedEvent>()?.data
@ -83,7 +87,7 @@ class CoverDownloadTaskListener : CoordinatorEventListener() {
return
}
onProduceEvent(MediaCoverDownloadedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = DownloadedCover(result.absolutePath)
))
}

View File

@ -18,6 +18,9 @@ import org.springframework.stereotype.Service
class CoverFromMetadataTaskListener: CoordinatorEventListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -73,11 +76,11 @@ class CoverFromMetadataTaskListener: CoordinatorEventListener() {
val result = if (coverUrl.isNullOrBlank()) {
log.warn { "No cover available for ${baseInfo.title}" }
MediaCoverInfoReceivedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Skipped)
metadata = event.makeDerivedEventInfo(EventStatus.Skipped, getProducerName())
)
} else {
MediaCoverInfoReceivedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = CoverDetails(
url = coverUrl,
outFileBaseName = NameHelper.normalize(coverTitle),

View File

@ -22,6 +22,9 @@ import java.io.File
class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -74,11 +77,11 @@ class EncodeWorkArgumentsTaskListener: CoordinatorEventListener() {
val result = mapper.getArguments()
if (result == null) {
onProduceEvent(EncodeArgumentCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
))
} else {
onProduceEvent(EncodeArgumentCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = result
))
}

View File

@ -22,6 +22,10 @@ import org.springframework.stereotype.Service
class EncodeWorkTaskListener : WorkTaskListener() {
private val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkEncodeCreated
@ -55,7 +59,7 @@ class EncodeWorkTaskListener : WorkTaskListener() {
return
}
EncodeWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = encodeArguments
).also { event ->
onProduceEvent(event)

View File

@ -21,6 +21,10 @@ import java.io.File
class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventMediaParameterExtractCreated
@ -68,11 +72,11 @@ class ExtractWorkArgumentsTaskListener: CoordinatorEventListener() {
val result = mapper.getArguments()
if (result.isEmpty()) {
onProduceEvent(ExtractArgumentCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Skipped)
metadata = event.makeDerivedEventInfo(EventStatus.Skipped, getProducerName())
))
} else {
onProduceEvent(ExtractArgumentCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = result
))
}

View File

@ -22,6 +22,10 @@ import org.springframework.stereotype.Service
class ExtractWorkTaskListener: WorkTaskListener() {
private val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
override val produceEvent: Events = Events.EventWorkExtractCreated
@ -62,14 +66,14 @@ class ExtractWorkTaskListener: WorkTaskListener() {
}
if (arguments.isEmpty()) {
ExtractWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
return
}
arguments.mapNotNull {
ExtractWorkCreatedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = it
)
}.forEach { event ->

View File

@ -27,6 +27,11 @@ import java.io.FileFilter
@Service
class MediaOutInformationTaskListener: CoordinatorEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -52,7 +57,7 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
log.error { "Required event ${Events.EventMediaReadBaseInfoPerformed} is not present" }
coordinator?.produceNewEvent(
MediaOutInformationConstructedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
)
return
@ -65,12 +70,12 @@ class MediaOutInformationTaskListener: CoordinatorEventListener() {
outDirectory = pm.getOutputDirectory().absolutePath,
info = vi
).let { MediaOutInformationConstructedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = it
) }
} else {
MediaOutInformationConstructedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
}
onProduceEvent(result)

View File

@ -29,6 +29,11 @@ val metadataTimeoutMinutes: Int = System.getenv("METADATA_TIMEOUT")?.toIntOrNull
@Service
@EnableScheduling
class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -101,7 +106,8 @@ class MetadataWaitOrDefaultTaskListener() : CoordinatorEventListener() {
metadata = EventMetadata(
referenceId = it.key,
derivedFromEventId = it.value.eventId,
status = EventStatus.Skipped
status = EventStatus.Skipped,
source = getProducerName()
)
)

View File

@ -27,6 +27,10 @@ import org.springframework.stereotype.Service
class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
val log = KotlinLogging.logger {}
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -50,13 +54,13 @@ class ParseMediaFileStreamsTaskListener() : CoordinatorEventListener() {
val readData = event.dataAs<JsonObject>()
val result = try {
MediaFileStreamsParsedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = parseStreams(readData)
)
} catch (e: Exception) {
e.printStackTrace()
MediaFileStreamsParsedEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
}
onProduceEvent(result)

View File

@ -27,6 +27,11 @@ import java.io.File
@Service
class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: Coordinator? = null
@ -57,13 +62,13 @@ class ReadMediaFileStreamsTaskListener() : CoordinatorEventListener() {
try {
val data = fileReadStreams(startEvent, event.metadata.eventId)
MediaFileStreamsReadEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = data
)
} catch (e: Exception) {
e.printStackTrace()
MediaFileStreamsReadEvent(
metadata = event.makeDerivedEventInfo(EventStatus.Failed)
metadata = event.makeDerivedEventInfo(EventStatus.Failed, getProducerName())
)
}
}

View File

@ -4,7 +4,6 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import no.iktdev.eventi.core.WGson
import no.iktdev.eventi.data.EventMetadata
import no.iktdev.eventi.data.EventStatus
import no.iktdev.mediaprocessing.processer.ProcesserEnv
@ -15,9 +14,7 @@ import no.iktdev.mediaprocessing.processer.ffmpeg.FfmpegTaskService
import no.iktdev.mediaprocessing.processer.ffmpeg.progress.FfmpegDecodedProgress
import no.iktdev.mediaprocessing.processer.taskManager
import no.iktdev.mediaprocessing.shared.common.persistance.Status
import no.iktdev.mediaprocessing.shared.common.persistance.events
import no.iktdev.mediaprocessing.shared.common.task.Task
import no.iktdev.mediaprocessing.shared.contract.Events
import no.iktdev.mediaprocessing.shared.contract.data.EncodeArgumentData
import no.iktdev.mediaprocessing.shared.contract.data.EncodeWorkPerformedEvent
import no.iktdev.mediaprocessing.shared.contract.data.EncodedData
@ -29,11 +26,15 @@ import java.io.File
import java.time.Duration
@Service
class EncodeServiceV2(
class EncodeService(
@Autowired var tasks: TaskCoordinator,
@Autowired private val reporter: Reporter
) : FfmpegTaskService(), TaskCoordinator.TaskEvents {
fun getProducerName(): String {
return this::class.java.simpleName
}
override val log = KotlinLogging.logger {}
override val logDir = ProcesserEnv.encodeLogDirectory
@ -139,7 +140,8 @@ class EncodeServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Success
status = EventStatus.Success,
source = getProducerName()
),
data = EncodedData(
outputFile
@ -168,7 +170,8 @@ class EncodeServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Failed
status = EventStatus.Failed,
source = getProducerName()
)
))
sendProgress(

View File

@ -24,11 +24,15 @@ import org.springframework.stereotype.Service
import java.io.File
@Service
class ExtractServiceV2(
class ExtractService(
@Autowired var tasks: TaskCoordinator,
@Autowired private val reporter: Reporter
) : FfmpegTaskService(), TaskCoordinator.TaskEvents {
fun getProducerName(): String {
return this::class.java.simpleName
}
override val log = KotlinLogging.logger {}
override val logDir = ProcesserEnv.encodeLogDirectory
@ -111,7 +115,8 @@ class ExtractServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Success
status = EventStatus.Success,
source = getProducerName()
),
data = ExtractedData(
outputFile
@ -141,7 +146,8 @@ class ExtractServiceV2(
metadata = EventMetadata(
referenceId = task.referenceId,
derivedFromEventId = task.eventId,
status = EventStatus.Failed
status = EventStatus.Failed,
source = getProducerName()
)
)
)

View File

@ -34,7 +34,7 @@ events_server_address = os.environ.get("DATABASE_ADDRESS") or "192.168.2.250" #
events_server_port = os.environ.get("DATABASE_PORT") or "3306"
events_server_database_name = os.environ.get("DATABASE_NAME_E") or "eventsV3" # "events"
events_server_username = os.environ.get("DATABASE_USERNAME") or "root"
events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root"
events_server_password = os.environ.get("DATABASE_PASSWORD") or "shFZ27eL2x2NoxyEDBMfDWkvFO" #"root" // default password
@ -65,8 +65,8 @@ class EventsPullerThread(threading.Thread):
GROUP BY referenceId
HAVING
SUM(event = 'event:media-read-base-info:performed') > 0
AND SUM(event = 'event:media-metadata-search:performed') != 0
AND SUM(event = 'event:media-process:completed') != 0
AND SUM(event = 'event:media-metadata-search:performed') = 0
AND SUM(event = 'event:media-process:completed') = 0
)
AND event = 'event:media-read-base-info:performed';
""")
@ -211,7 +211,8 @@ class MetadataEventHandler:
eventId=str(uuid.uuid4()),
derivedFromEventId=event.metadata.eventId,
status="Failed" if result is None else "Success",
created=datetime.now().isoformat()
created=datetime.now().isoformat(),
source="metadataApp"
),
data=result,
eventType="EventMediaMetadataSearchPerformed"

View File

@ -11,6 +11,7 @@ class EventMetadata:
referenceId: str
status: str
created: datetime
source: str
def to_dict(self):
return asdict(self)
@ -53,7 +54,8 @@ def json_to_media_event(json_data: str) -> MediaEvent:
eventId=metadata_dict['eventId'],
referenceId=metadata_dict['referenceId'],
status=metadata_dict['status'],
created=parse_datetime(metadata_dict['created'])
created=parse_datetime(metadata_dict['created']),
source=metadata_dict['source']
)
event_data = EventData(

View File

@ -24,7 +24,8 @@ data class EventMetadata(
val eventId: String = UUID.randomUUID().toString(),
val referenceId: String,
val status: EventStatus,
val created: LocalDateTime = LocalDateTime.now()
val created: LocalDateTime = LocalDateTime.now(),
val source: String = "Unknown producer"
)
enum class EventStatus {

View File

@ -19,6 +19,7 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
onReady()
}
abstract fun getProducerName(): String
protected open fun onProduceEvent(event: T) {
coordinator?.produceNewEvent(event) ?: {
@ -82,11 +83,12 @@ abstract class EventListenerImpl<T: EventImpl, E: EventsManagerImpl<T>> {
*/
abstract fun onEventsReceived(incomingEvent: ConsumableEvent<T>, events: List<T>)
fun T.makeDerivedEventInfo(status: EventStatus): EventMetadata {
fun T.makeDerivedEventInfo(status: EventStatus, source: String): EventMetadata {
return EventMetadata(
referenceId = this.metadata.referenceId,
derivedFromEventId = this.metadata.eventId,
status = status
status = status,
source = source
)
}

View File

@ -12,6 +12,11 @@ import org.springframework.stereotype.Service
@Service
class FirstEventListener() : MockDataEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: MockEventCoordinator? = null
@ -30,7 +35,7 @@ class FirstEventListener() : MockDataEventListener() {
}
override fun onEventsReceived(incomingEvent: ConsumableEvent<EventImpl>, events: List<EventImpl>) {
val info = incomingEvent.consume()!!.makeDerivedEventInfo(EventStatus.Success)
val info = incomingEvent.consume()!!.makeDerivedEventInfo(EventStatus.Success, getProducerName())
onProduceEvent(FirstEvent(
eventType = produceEvent,
metadata = info,

View File

@ -14,6 +14,11 @@ import org.springframework.stereotype.Service
@Service
class ForthEventListener() : MockDataEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: MockEventCoordinator? = null
@ -35,7 +40,7 @@ class ForthEventListener() : MockDataEventListener() {
val event = incomingEvent.consume()
if (event == null)
return
val info = event.makeDerivedEventInfo(EventStatus.Success)
val info = event.makeDerivedEventInfo(EventStatus.Success, getProducerName())
onProduceEvent(InitEvent(
eventType = produceEvent,
metadata = info,

View File

@ -12,6 +12,11 @@ import org.springframework.stereotype.Service
@Service
class SecondEventListener() : MockDataEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: MockEventCoordinator? = null
@ -33,7 +38,7 @@ class SecondEventListener() : MockDataEventListener() {
val event = incomingEvent.consume()
if (event == null)
return
val info = event.makeDerivedEventInfo(EventStatus.Success)
val info = event.makeDerivedEventInfo(EventStatus.Success,getProducerName())
onProduceEvent(SecondEvent(
eventType = produceEvent,
metadata = info

View File

@ -15,6 +15,11 @@ import org.springframework.stereotype.Service
@Service
class ThirdEventListener() : MockDataEventListener() {
override fun getProducerName(): String {
return this::class.java.simpleName
}
@Autowired
override var coordinator: MockEventCoordinator? = null
@ -36,11 +41,11 @@ class ThirdEventListener() : MockDataEventListener() {
val event = incomingEvent.consume()
if (event == null)
return
val info = event.makeDerivedEventInfo(EventStatus.Success)
val info = event.makeDerivedEventInfo(EventStatus.Success, getProducerName())
(event as SecondEvent).data.elements.forEach { element ->
onProduceEvent(ThirdEvent(
eventType = produceEvent,
metadata = event.makeDerivedEventInfo(EventStatus.Success),
metadata = event.makeDerivedEventInfo(EventStatus.Success, getProducerName()),
data = element
)
)