Update
This commit is contained in:
parent
039542d27c
commit
9af35dbaa7
@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
|
|||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
data class ExecutionBlock(
|
data class ExecutionBlock(
|
||||||
|
val workId: String,
|
||||||
val type: String,
|
val type: String,
|
||||||
val work: suspend () -> Int
|
val work: suspend () -> Int
|
||||||
)
|
)
|
||||||
@ -56,8 +57,9 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
|
|||||||
val job = processWorkItem(workItem)
|
val job = processWorkItem(workItem)
|
||||||
inProgressJobs.add(job)
|
inProgressJobs.add(job)
|
||||||
job.invokeOnCompletion {
|
job.invokeOnCompletion {
|
||||||
|
logger.info { "OnCompletion invoked!\n\nWorkId: ${workItem.workId}-${workItem.type} \n\tCurrent active worksers: ${jobsInProgress.get()}" }
|
||||||
val workers = jobsInProgress.decrementAndGet()
|
val workers = jobsInProgress.decrementAndGet()
|
||||||
logger.info { "Available workers: ${workers}/${maxConcurrentJobs}" }
|
logger.info { "Worker Released: Available: ${workers}/${maxConcurrentJobs}" }
|
||||||
inProgressJobs.remove(job)
|
inProgressJobs.remove(job)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -86,7 +88,7 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
|
|||||||
logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" }
|
logger.info { "\nreferenceId: ${message.referenceId} \nStarting encoding. \nWorkId: ${data.workId}" }
|
||||||
encodeDaemon.runUsingWorkItem()
|
encodeDaemon.runUsingWorkItem()
|
||||||
}
|
}
|
||||||
val result = queue.trySend(ExecutionBlock("encode", workBlock))
|
val result = queue.trySend(ExecutionBlock(work.workId, "encode", workBlock))
|
||||||
val statusType = when (result.isClosed) {
|
val statusType = when (result.isClosed) {
|
||||||
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
|
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
|
||||||
false -> {
|
false -> {
|
||||||
@ -111,13 +113,14 @@ class RunnerCoordinator(private var maxConcurrentJobs: Int = 1) {
|
|||||||
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
|
producer.sendMessage(KafkaEvents.EVENT_ENCODER_SUBTITLE_FILE_QUEUED.event, message.withNewStatus(Status(StatusType.PENDING)))
|
||||||
try {
|
try {
|
||||||
if (message.data != null && message.data is ExtractWork) {
|
if (message.data != null && message.data is ExtractWork) {
|
||||||
|
val work = message.data as ExtractWork
|
||||||
val workBlock = suspend {
|
val workBlock = suspend {
|
||||||
val data: ExtractWork = message.data as ExtractWork
|
val data: ExtractWork = work
|
||||||
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
val extractDaemon = ExtractDaemon(message.referenceId, data, extractListener)
|
||||||
logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" }
|
logger.info { "\nreferenceId: ${message.referenceId} \nStarting extracting. \nWorkId: ${data.workId}" }
|
||||||
extractDaemon.runUsingWorkItem()
|
extractDaemon.runUsingWorkItem()
|
||||||
}
|
}
|
||||||
val result = queue.trySend(ExecutionBlock("extract", workBlock))
|
val result = queue.trySend(ExecutionBlock(work.workId,"extract", workBlock))
|
||||||
val statusType = when (result.isClosed) {
|
val statusType = when (result.isClosed) {
|
||||||
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
|
true -> StatusType.IGNORED // Køen er lukket, jobben ble ignorert
|
||||||
false -> {
|
false -> {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user