Updated reader

This commit is contained in:
Brage 2023-07-18 17:09:12 +02:00
parent 6c5b3b88bc
commit 33190a8a62
9 changed files with 89 additions and 26 deletions

View File

@ -23,7 +23,7 @@ repositories {
}
dependencies {
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha13")
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha16")
implementation("no.iktdev:exfl:0.0.4-SNAPSHOT")
implementation("com.github.pgreze:kotlin-process:1.3.1")

View File

@ -6,8 +6,8 @@ import com.google.gson.reflect.TypeToken
import no.iktdev.streamit.content.common.streams.*
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.StatusType
import org.apache.kafka.clients.consumer.ConsumerRecord
import java.io.File

View File

@ -6,9 +6,9 @@ import no.iktdev.streamit.content.common.streams.MediaStreams
import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeInformation
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.Status
import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.pooled.IPooledEvents
import no.iktdev.streamit.library.kafka.listener.pooled.PooledEventMessageListener
@ -43,7 +43,7 @@ class EncodeStreamsProducer: IPooledEvents.OnEventsReceived {
private fun produceErrorMessage(referenceId: String, reason: String) {
val message = Message(referenceId = referenceId,
Status(statusType = StatusType.ERROR, errorMessage = reason)
Status(statusType = StatusType.ERROR, message = reason)
)
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
}

View File

@ -12,9 +12,9 @@ import no.iktdev.streamit.content.common.Naming
import no.iktdev.streamit.content.reader.ReaderEnv
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.Status
import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer

View File

@ -10,9 +10,9 @@ import no.iktdev.streamit.content.reader.ReaderEnv
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.KnownEvents.EVENT_READER_RECEIVED_FILE
import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.Status
import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
@ -34,14 +34,11 @@ class StreamsReader {
if (data.value().status.statusType != StatusType.SUCCESS) {
logger.info { "Ignoring event: ${data.key()} as status is not Success!" }
return
} else if (data.value().data !is String) {
logger.info { "Ignoring event: ${data.key()} as values is not of expected type!, ${data.value().data}" }
return
}
val dataValue = try {
Gson().fromJson(data.value().data as String, FileWatcher.FileResult::class.java)
} catch (e: Exception) {
logger.info { "Ignoring event: ${data.key()} as value failed to be converted" }
val dataValue = data.value().dataAs(FileWatcher.FileResult::class.java)
if (dataValue == null) {
logger.info { "Ignoring event: ${data.key()} as values is not of expected type!, ${data.value().data}" }
return
}
logger.info { "Preparing Probe for ${dataValue.file}" }

View File

@ -2,9 +2,9 @@ package no.iktdev.streamit.content.reader.analyzer
import no.iktdev.streamit.content.reader.Resources
import no.iktdev.streamit.library.kafka.KnownEvents
import no.iktdev.streamit.library.kafka.Message
import no.iktdev.streamit.library.kafka.Status
import no.iktdev.streamit.library.kafka.StatusType
import no.iktdev.streamit.library.kafka.dto.Message
import no.iktdev.streamit.library.kafka.dto.Status
import no.iktdev.streamit.library.kafka.dto.StatusType
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.jupiter.api.Test

View File

@ -0,0 +1,34 @@
package no.iktdev.streamit.content.reader.streams
import com.google.gson.Gson
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
import no.iktdev.streamit.library.kafka.dto.Message
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
import org.junit.jupiter.api.Assertions.*
import org.junit.jupiter.api.Test
class StreamsReaderTest {
@Test
fun testDecode() {
val data = """
{
"referenceId": "7b332099-c663-4158-84d0-9972770316bb",
"status": {
"statusType": "SUCCESS"
},
"data": {
"file": "/src/input/[AAA] Iseleve - 13 [1080p HEVC][00000].mkv",
"title": "Iseleve",
"desiredNewName": "Iseleve - 13 "
}
}
""".trimIndent()
assertDoesNotThrow {
val message = Gson().fromJson(data, Message::class.java)
val result = message.dataAs(FileWatcher.FileResult::class.java)
assertThat(result?.title).isEqualTo("Iseleve")
}
}
}

View File

@ -1,7 +1,12 @@
plugins {
id("java")
id("org.springframework.boot") version "2.7.4"
id("io.spring.dependency-management") version "1.0.14.RELEASE"
kotlin("jvm") version "1.6.21"
kotlin("plugin.spring") version "1.6.21"
}
base.archivesBaseName = "ui"
group = "no.iktdev.streamit.content"
version = "1.0-SNAPSHOT"
@ -10,8 +15,14 @@ repositories {
}
dependencies {
testImplementation(platform("org.junit:junit-bom:5.9.1"))
testImplementation("org.junit.jupiter:junit-jupiter")
implementation("org.springframework.boot:spring-boot-starter-graphql:2.7.4")
implementation("org.springframework.boot:spring-boot-starter-web:3.0.4")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.14.2")
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("com.google.code.gson:gson:2.9.0")
implementation("org.springframework.boot:spring-boot-starter-websocket:2.6.3")
}
tasks.test {

View File

@ -0,0 +1,21 @@
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.simp.config.MessageBrokerRegistry
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker
import org.springframework.web.socket.config.annotation.StompEndpointRegistry
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer
@Configuration
@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {
override fun registerStompEndpoints(registry: StompEndpointRegistry) {
registry.addEndpoint("/ws")
// .setAllowedOrigins("*")
.withSockJS()
}
override fun configureMessageBroker(registry: MessageBrokerRegistry) {
registry.enableSimpleBroker("/topic")
registry.setApplicationDestinationPrefixes("/app")
}
}