Introduced logging in processer
This commit is contained in:
parent
8aeae926fb
commit
4708252aea
@ -3,18 +3,18 @@ package no.iktdev.mediaprocessing.processer
|
|||||||
import kotlinx.coroutines.CoroutineScope
|
import kotlinx.coroutines.CoroutineScope
|
||||||
import kotlinx.coroutines.Dispatchers
|
import kotlinx.coroutines.Dispatchers
|
||||||
import kotlinx.coroutines.launch
|
import kotlinx.coroutines.launch
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
import no.iktdev.eventi.models.store.TaskStatus
|
||||||
import no.iktdev.eventi.tasks.TaskPollerImplementation
|
import no.iktdev.eventi.tasks.TaskPollerImplementation
|
||||||
import no.iktdev.eventi.tasks.TaskReporter
|
import no.iktdev.eventi.tasks.TaskReporter
|
||||||
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
import no.iktdev.mediaprocessing.shared.database.stores.EventStore
|
||||||
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
|
import no.iktdev.mediaprocessing.shared.database.stores.TaskStore
|
||||||
|
|
||||||
import org.springframework.boot.ApplicationArguments
|
import org.springframework.boot.ApplicationArguments
|
||||||
import org.springframework.boot.ApplicationRunner
|
import org.springframework.boot.ApplicationRunner
|
||||||
import org.springframework.stereotype.Component
|
import org.springframework.stereotype.Component
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
import java.util.UUID
|
import java.util.*
|
||||||
|
|
||||||
@Component
|
@Component
|
||||||
class PollerAdministrator(
|
class PollerAdministrator(
|
||||||
@ -41,7 +41,10 @@ class TaskPoller(
|
|||||||
|
|
||||||
@Component
|
@Component
|
||||||
class DefaultTaskReporter() : TaskReporter {
|
class DefaultTaskReporter() : TaskReporter {
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
override fun markClaimed(taskId: UUID, workerId: String) {
|
override fun markClaimed(taskId: UUID, workerId: String) {
|
||||||
|
log.info { "$workerId claiming task $taskId" }
|
||||||
TaskStore.claim(taskId, workerId)
|
TaskStore.claim(taskId, workerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,6 +53,7 @@ class DefaultTaskReporter() : TaskReporter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun markConsumed(taskId: UUID) {
|
override fun markConsumed(taskId: UUID) {
|
||||||
|
log.info { "Marking task $taskId as completed" }
|
||||||
TaskStore.markConsumed(taskId, TaskStatus.Completed)
|
TaskStore.markConsumed(taskId, TaskStatus.Completed)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,9 +1,10 @@
|
|||||||
package no.iktdev.mediaprocessing.processer.listeners
|
package no.iktdev.mediaprocessing.processer.listeners
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
import no.iktdev.eventi.models.store.TaskStatus
|
||||||
import no.iktdev.eventi.tasks.TaskListener
|
import no.iktdev.eventi.tasks.TaskReporter
|
||||||
import no.iktdev.eventi.tasks.TaskType
|
import no.iktdev.eventi.tasks.TaskType
|
||||||
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
|
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
|
||||||
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
|
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
|
||||||
@ -12,14 +13,25 @@ import no.iktdev.mediaprocessing.processer.Util
|
|||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
|
import no.iktdev.mediaprocessing.shared.common.event_task_contract.events.ProcesserExtractResultEvent
|
||||||
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask
|
import no.iktdev.mediaprocessing.shared.common.event_task_contract.tasks.ExtractSubtitleTask
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
import java.util.UUID
|
import java.util.*
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
|
class SubtitleTaskListener: FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
|
|
||||||
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
|
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
|
||||||
|
|
||||||
override fun supports(task: Task) = task is ExtractSubtitleTask
|
override fun supports(task: Task) = task is ExtractSubtitleTask
|
||||||
|
|
||||||
|
override fun accept(task: Task, reporter: TaskReporter): Boolean {
|
||||||
|
val accepts = super.accept(task, reporter)
|
||||||
|
if (accepts) {
|
||||||
|
log.info { "${getWorkerId()} accepts subtitle task ${task.taskId}" }
|
||||||
|
}
|
||||||
|
return accepts
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
val taskData = task as ExtractSubtitleTask
|
val taskData = task as ExtractSubtitleTask
|
||||||
|
|
||||||
|
|||||||
@ -1,8 +1,10 @@
|
|||||||
package no.iktdev.mediaprocessing.processer.listeners
|
package no.iktdev.mediaprocessing.processer.listeners
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
import no.iktdev.eventi.models.Event
|
import no.iktdev.eventi.models.Event
|
||||||
import no.iktdev.eventi.models.Task
|
import no.iktdev.eventi.models.Task
|
||||||
import no.iktdev.eventi.models.store.TaskStatus
|
import no.iktdev.eventi.models.store.TaskStatus
|
||||||
|
import no.iktdev.eventi.tasks.TaskReporter
|
||||||
import no.iktdev.eventi.tasks.TaskType
|
import no.iktdev.eventi.tasks.TaskType
|
||||||
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
|
import no.iktdev.mediaprocessing.ffmpeg.FFmpeg
|
||||||
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
|
import no.iktdev.mediaprocessing.ffmpeg.arguments.MpegArgument
|
||||||
@ -17,11 +19,20 @@ import java.util.*
|
|||||||
|
|
||||||
@Service
|
@Service
|
||||||
class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
|
class VideoTaskListener(private var coordinatorWebClient: CoordinatorClient): FfmpegTaskListener(TaskType.CPU_INTENSIVE) {
|
||||||
|
private val log = KotlinLogging.logger {}
|
||||||
|
|
||||||
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
|
override fun getWorkerId() = "${this::class.java.simpleName}-${taskType}-${UUID.randomUUID()}"
|
||||||
|
|
||||||
override fun supports(task: Task) = task is EncodeTask
|
override fun supports(task: Task) = task is EncodeTask
|
||||||
|
|
||||||
|
override fun accept(task: Task, reporter: TaskReporter): Boolean {
|
||||||
|
val accepts = super.accept(task, reporter)
|
||||||
|
if (accepts) {
|
||||||
|
log.info { "${getWorkerId()} accepts video task ${task.taskId}" }
|
||||||
|
}
|
||||||
|
return accepts
|
||||||
|
}
|
||||||
|
|
||||||
override suspend fun onTask(task: Task): Event? {
|
override suspend fun onTask(task: Task): Event? {
|
||||||
val taskData = task as EncodeTask
|
val taskData = task as EncodeTask
|
||||||
val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also {
|
val cachedOutFile = Util.getTemporaryStoreFile(taskData.data.outputFileName).also {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user