Updated with file out name
This commit is contained in:
parent
3a837cef0c
commit
31f4851401
@ -36,36 +36,16 @@ class Naming(val fileName: String) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
else -> cleanedFileName
|
else -> cleanedFileName
|
||||||
}
|
}.trim()
|
||||||
}
|
}
|
||||||
|
|
||||||
fun guessDesiredTitle(): String {
|
fun guessDesiredTitle(): String {
|
||||||
val desiredFileName = guessDesiredFileName()
|
val desiredFileName = guessDesiredFileName()
|
||||||
return if (desiredFileName.contains(" - ")) {
|
val result = if (desiredFileName.contains(" - ")) {
|
||||||
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
return desiredFileName.split(" - ").firstOrNull() ?: desiredFileName
|
||||||
} else desiredFileName
|
} else desiredFileName
|
||||||
}
|
|
||||||
|
|
||||||
|
return result.trim()
|
||||||
/**
|
|
||||||
* Checks whether the filename contains the keyword movie, if so, default to movie
|
|
||||||
*/
|
|
||||||
fun doesContainMovieKeywords(): Boolean {
|
|
||||||
return getMatch("[(](?<=\\()movie(?=\\))[)]")?.isBlank() ?: false
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return not null if matches "S01E01"
|
|
||||||
*/
|
|
||||||
fun isSeasonEpisodeDefined(): String? {
|
|
||||||
return getMatch("(?i)S[0-9]+E[0-9]+(?i)")
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @return not null if matches " 2020 " or ".2020."
|
|
||||||
*/
|
|
||||||
fun isDefinedWithYear(): String? {
|
|
||||||
return getMatch("[ .][0-9]{4}[ .]")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,5 @@
|
|||||||
|
package no.iktdev.streamit.content.common.dto
|
||||||
|
|
||||||
|
data class ContentOutName(
|
||||||
|
val baseName: String
|
||||||
|
)
|
||||||
@ -0,0 +1,10 @@
|
|||||||
|
package no.iktdev.streamit.content.common.dto
|
||||||
|
|
||||||
|
data class Metadata(
|
||||||
|
val title: String,
|
||||||
|
val altTitle: List<String> = emptyList(),
|
||||||
|
val cover: String? = null,
|
||||||
|
val type: String,
|
||||||
|
val summary: String? = null,
|
||||||
|
val genres: List<String> = emptyList()
|
||||||
|
)
|
||||||
@ -23,7 +23,7 @@ repositories {
|
|||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha45")
|
implementation("no.iktdev.streamit.library:streamit-library-kafka:0.0.2-alpha47")
|
||||||
implementation("no.iktdev:exfl:0.0.8-SNAPSHOT")
|
implementation("no.iktdev:exfl:0.0.8-SNAPSHOT")
|
||||||
|
|
||||||
|
|
||||||
@ -39,6 +39,7 @@ dependencies {
|
|||||||
implementation("org.springframework.kafka:spring-kafka:2.8.5")
|
implementation("org.springframework.kafka:spring-kafka:2.8.5")
|
||||||
|
|
||||||
implementation(project(":CommonCode"))
|
implementation(project(":CommonCode"))
|
||||||
|
|
||||||
testImplementation("junit:junit:4.13.2")
|
testImplementation("junit:junit:4.13.2")
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter")
|
testImplementation("org.junit.jupiter:junit-jupiter")
|
||||||
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
|
testImplementation("org.junit.jupiter:junit-jupiter-api:5.8.1")
|
||||||
@ -48,6 +49,7 @@ dependencies {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
tasks.test {
|
tasks.test {
|
||||||
useJUnitPlatform()
|
useJUnitPlatform()
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,33 @@
|
|||||||
|
package no.iktdev.streamit.content.reader
|
||||||
|
|
||||||
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.Status
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
|
|
||||||
|
abstract class DefaultKafkaReader(val subId: String) {
|
||||||
|
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
|
||||||
|
val defaultConsumer = DefaultConsumer(subId = subId)
|
||||||
|
|
||||||
|
fun produceErrorMessage(baseMessage: Message, reason: String) {
|
||||||
|
val message = Message(
|
||||||
|
referenceId = baseMessage.referenceId,
|
||||||
|
actionType = baseMessage.actionType,
|
||||||
|
Status(statusType = StatusType.ERROR, message = reason)
|
||||||
|
)
|
||||||
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
fun produceMessage(event: KafkaEvents, baseMessage: Message, data: Any?) {
|
||||||
|
val message = Message(
|
||||||
|
referenceId = baseMessage.referenceId,
|
||||||
|
actionType = baseMessage.actionType,
|
||||||
|
Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED),
|
||||||
|
data = data
|
||||||
|
)
|
||||||
|
messageProducer.sendMessage(event.event, message)
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,6 +1,6 @@
|
|||||||
package no.iktdev.streamit.content.reader
|
package no.iktdev.streamit.content.reader
|
||||||
|
|
||||||
import no.iktdev.streamit.content.reader.analyzer.encoding.PreferenceReader
|
import no.iktdev.streamit.content.reader.analyzer.encoding.helpers.PreferenceReader
|
||||||
import org.springframework.boot.autoconfigure.SpringBootApplication
|
import org.springframework.boot.autoconfigure.SpringBootApplication
|
||||||
import org.springframework.boot.runApplication
|
import org.springframework.boot.runApplication
|
||||||
import org.springframework.context.ApplicationContext
|
import org.springframework.context.ApplicationContext
|
||||||
@ -10,6 +10,8 @@ class ReaderApplication
|
|||||||
|
|
||||||
val preference = PreferenceReader().getPreference()
|
val preference = PreferenceReader().getPreference()
|
||||||
private var context: ApplicationContext? = null
|
private var context: ApplicationContext? = null
|
||||||
|
|
||||||
|
@Suppress("unused")
|
||||||
fun getContext(): ApplicationContext? {
|
fun getContext(): ApplicationContext? {
|
||||||
return context
|
return context
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,47 +0,0 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer
|
|
||||||
|
|
||||||
import com.google.gson.Gson
|
|
||||||
import com.google.gson.JsonObject
|
|
||||||
import com.google.gson.reflect.TypeToken
|
|
||||||
import no.iktdev.streamit.content.common.streams.*
|
|
||||||
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import java.io.File
|
|
||||||
|
|
||||||
class EncodeStreamsMessageParser {
|
|
||||||
fun getFileNameFromEvent(records: MutableList<ConsumerRecord<String, Message>>): FileWatcher.FileResult? {
|
|
||||||
val file = records.find { it.key() == KnownEvents.EVENT_READER_RECEIVED_FILE.event } ?: return null
|
|
||||||
if (file.value().status.statusType != StatusType.SUCCESS) return null
|
|
||||||
return file.value().dataAs(FileWatcher.FileResult::class.java)
|
|
||||||
}
|
|
||||||
|
|
||||||
fun getMediaStreamsFromJsonString(streamAsJson: String): MediaStreams? {
|
|
||||||
val gson = Gson()
|
|
||||||
/*return gson.fromJson(streams.value().data as String, MediaStreams::class.java)*/
|
|
||||||
val jsonObject = gson.fromJson(streamAsJson, JsonObject::class.java)
|
|
||||||
|
|
||||||
val streamsJsonArray = jsonObject.getAsJsonArray("streams")
|
|
||||||
|
|
||||||
val rstreams = streamsJsonArray.mapNotNull { streamJson ->
|
|
||||||
val streamObject = streamJson.asJsonObject
|
|
||||||
|
|
||||||
val codecType = streamObject.get("codec_type").asString
|
|
||||||
if (streamObject.has("codec_name") && streamObject.get("codec_name").asString == "mjpeg") {
|
|
||||||
null
|
|
||||||
} else {
|
|
||||||
when (codecType) {
|
|
||||||
"video" -> gson.fromJson(streamObject, VideoStream::class.java)
|
|
||||||
"audio" -> gson.fromJson(streamObject, AudioStream::class.java)
|
|
||||||
"subtitle" -> gson.fromJson(streamObject, SubtitleStream::class.java)
|
|
||||||
else -> null //throw IllegalArgumentException("Unknown stream type: $codecType")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return MediaStreams(rstreams)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -0,0 +1,71 @@
|
|||||||
|
package no.iktdev.streamit.content.reader.analyzer.contentDeterminator
|
||||||
|
|
||||||
|
import mu.KotlinLogging
|
||||||
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
|
import no.iktdev.streamit.content.common.dto.ContentOutName
|
||||||
|
import no.iktdev.streamit.content.common.dto.Metadata
|
||||||
|
import no.iktdev.streamit.content.reader.DefaultKafkaReader
|
||||||
|
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
||||||
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.Status
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
|
import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
|
||||||
|
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
|
||||||
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
|
@Service
|
||||||
|
class ContentDeterminate: DefaultKafkaReader("contentDeterminate"), ISequentialMessageEvent {
|
||||||
|
|
||||||
|
final val mainListener = object : SequentialMessageListener(
|
||||||
|
topic = CommonConfig.kafkaTopic,
|
||||||
|
consumer = defaultConsumer,
|
||||||
|
accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event,
|
||||||
|
subAccepts = listOf(KafkaEvents.EVENT_METADATA_OBTAINED.event),
|
||||||
|
deserializers = Deserializers().getDeserializers(),
|
||||||
|
this
|
||||||
|
) {}
|
||||||
|
|
||||||
|
init {
|
||||||
|
mainListener.listen()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
override fun getRequiredMessages(): List<String> {
|
||||||
|
return listOf(KafkaEvents.EVENT_READER_RECEIVED_FILE.event, KafkaEvents.EVENT_METADATA_OBTAINED.event)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
|
||||||
|
logger.info { "All messages are received" }
|
||||||
|
|
||||||
|
val initMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]
|
||||||
|
if (initMessage == null) {
|
||||||
|
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val fileResult = initMessage.data as FileWatcher.FileResult?
|
||||||
|
if (fileResult == null) {
|
||||||
|
produceErrorMessage(initMessage, "FileResult is either null or not deserializable!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val metadataMessage = result[KafkaEvents.EVENT_METADATA_OBTAINED.event]
|
||||||
|
val metadata = if (metadataMessage?.status?.statusType == StatusType.SUCCESS) metadataMessage.data as Metadata? else null
|
||||||
|
|
||||||
|
val baseFileName = if (metadata?.type == null) {
|
||||||
|
FileNameDeterminate(fileResult.title, fileResult.sanitizedName).getDeterminedFileName()
|
||||||
|
} else if (metadata.type.lowercase() == "movie") {
|
||||||
|
FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.MOVIE).getDeterminedFileName()
|
||||||
|
} else {
|
||||||
|
FileNameDeterminate(fileResult.title, fileResult.sanitizedName, FileNameDeterminate.ContentType.SERIE).getDeterminedFileName()
|
||||||
|
}
|
||||||
|
|
||||||
|
val out = ContentOutName(baseFileName)
|
||||||
|
produceMessage(KafkaEvents.EVENT_READER_DETERMINED_FILENAME, initMessage, out)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,37 @@
|
|||||||
|
package no.iktdev.streamit.content.reader.analyzer.contentDeterminator
|
||||||
|
|
||||||
|
import no.iktdev.streamit.content.common.dto.Metadata
|
||||||
|
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
||||||
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
|
import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization
|
||||||
|
|
||||||
|
class Deserializers {
|
||||||
|
|
||||||
|
val fileReceived = object : IMessageDataDeserialization<FileWatcher.FileResult> {
|
||||||
|
override fun deserialize(incomingMessage: Message): FileWatcher.FileResult? {
|
||||||
|
if (incomingMessage.status.statusType != StatusType.SUCCESS) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
return incomingMessage.dataAs(FileWatcher.FileResult::class.java)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val metadataReceived = object: IMessageDataDeserialization<Metadata> {
|
||||||
|
override fun deserialize(incomingMessage: Message): Metadata? {
|
||||||
|
if (incomingMessage.status.statusType != StatusType.SUCCESS) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
return incomingMessage.dataAs(Metadata::class.java)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
fun getDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||||
|
return mutableMapOf(
|
||||||
|
KafkaEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived,
|
||||||
|
KafkaEvents.EVENT_METADATA_OBTAINED.event to metadataReceived
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@ -0,0 +1,138 @@
|
|||||||
|
package no.iktdev.streamit.content.reader.analyzer.contentDeterminator
|
||||||
|
|
||||||
|
class FileNameDeterminate(val title: String, val sanitizedName: String, val ctype: ContentType = ContentType.UNDEFINED) {
|
||||||
|
|
||||||
|
enum class ContentType {
|
||||||
|
MOVIE,
|
||||||
|
SERIE,
|
||||||
|
UNDEFINED
|
||||||
|
}
|
||||||
|
|
||||||
|
fun getDeterminedFileName(): String {
|
||||||
|
return when (ctype) {
|
||||||
|
ContentType.MOVIE -> determineMovieFileName()
|
||||||
|
ContentType.SERIE -> determineSerieFileName()
|
||||||
|
ContentType.UNDEFINED -> determineUndefinedFileName()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun determineMovieFileName(): String {
|
||||||
|
val movieEx = MovieEx(title, sanitizedName)
|
||||||
|
val result = when {
|
||||||
|
movieEx.isDefinedWithYear() != null -> sanitizedName.replace(movieEx.isDefinedWithYear()!!, "").trim()
|
||||||
|
movieEx.doesContainMovieKeywords() -> sanitizedName.replace(Regex("(?i)\\s*\\(\\s*movie\\s*\\)\\s*"), "").trim()
|
||||||
|
else -> title
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun determineSerieFileName(): String {
|
||||||
|
val serieEx = SerieEx(title, sanitizedName)
|
||||||
|
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||||
|
val episodeNumberSingle = serieEx.findEpisodeNumber()
|
||||||
|
|
||||||
|
val seasonNumber = season ?: "1"
|
||||||
|
val episodeNumber = episode ?: (episodeNumberSingle ?: return sanitizedName)
|
||||||
|
val seasonEpisodeCombined = serieEx.getSeasonEpisodeCombined(seasonNumber, episodeNumber)
|
||||||
|
val episodeTitle = serieEx.findEpisodeTitle()
|
||||||
|
|
||||||
|
val useTitle = if (title == sanitizedName) {
|
||||||
|
if (title.contains(" - ")) {
|
||||||
|
title.split(" - ").firstOrNull() ?: title
|
||||||
|
} else {
|
||||||
|
val seasonNumberIndex = if (title.indexOf(seasonNumber) < 0) title.length -1 else title.indexOf(seasonNumber)
|
||||||
|
val episodeNumberIndex = if (title.indexOf(episodeNumber) < 0) title.length -1 else title.indexOf(episodeNumber)
|
||||||
|
val closest = listOf<Int>(seasonNumberIndex, episodeNumberIndex).min()
|
||||||
|
val shrunkenTitle = title.substring(0, closest)
|
||||||
|
if (closest - shrunkenTitle.lastIndexOf(" ") < 3) {
|
||||||
|
title.substring(0, shrunkenTitle.lastIndexOf(" "))
|
||||||
|
} else title.substring(0, closest)
|
||||||
|
|
||||||
|
}
|
||||||
|
} else title
|
||||||
|
|
||||||
|
return "${useTitle.trim()} - $seasonEpisodeCombined ${if (episodeTitle.isNullOrEmpty()) "" else "- $episodeTitle"}".trim()
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun determineUndefinedFileName(): String {
|
||||||
|
val serieEx = SerieEx(title, sanitizedName)
|
||||||
|
val (season, episode) = serieEx.findSeasonAndEpisode(sanitizedName)
|
||||||
|
return if (sanitizedName.contains(" - ") || season != null || episode != null) {
|
||||||
|
determineSerieFileName()
|
||||||
|
} else {
|
||||||
|
determineMovieFileName()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
open internal class Base(val title: String, val sanitizedName: String) {
|
||||||
|
fun getMatch(regex: String): String? {
|
||||||
|
return Regex(regex, RegexOption.IGNORE_CASE).find(sanitizedName)?.value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class MovieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||||
|
/**
|
||||||
|
* @return not null if matches " 2020 " or ".2020."
|
||||||
|
*/
|
||||||
|
fun isDefinedWithYear(): String? {
|
||||||
|
return getMatch("[ .][0-9]{4}[ .]")
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks whether the filename contains the keyword movie, if so, default to movie
|
||||||
|
*/
|
||||||
|
fun doesContainMovieKeywords(): Boolean {
|
||||||
|
return getMatch("[(](?<=\\()movie(?=\\))[)]")?.isNotBlank() ?: false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
internal class SerieEx(title: String, sanitizedName: String) : Base(title, sanitizedName) {
|
||||||
|
|
||||||
|
fun getSeasonEpisodeCombined(season: String, episode: String): String {
|
||||||
|
return StringBuilder()
|
||||||
|
.append("S")
|
||||||
|
.append(if (season.length < 2) season.padStart(2, '0') else season)
|
||||||
|
.append("E")
|
||||||
|
.append(if (episode.length < 2) episode.padStart(2, '0') else episode)
|
||||||
|
.toString().trim()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sjekken matcher tekst som dette:
|
||||||
|
* Cool - Season 1 Episode 13
|
||||||
|
* Cool - s1e13
|
||||||
|
* Cool - S1E13
|
||||||
|
* Cool - S1 13
|
||||||
|
*/
|
||||||
|
fun findSeasonAndEpisode(inputText: String): Pair<String?, String?> {
|
||||||
|
val regex = Regex("""(?i)\b(?:S|Season)\s*(\d+).*?(?:E|Episode)?\s*(\d+)\b""")
|
||||||
|
val matchResult = regex.find(inputText)
|
||||||
|
val season = matchResult?.groups?.get(1)?.value
|
||||||
|
val episode = matchResult?.groups?.get(2)?.value
|
||||||
|
return season to episode
|
||||||
|
}
|
||||||
|
|
||||||
|
fun findEpisodeNumber(): String? {
|
||||||
|
val regex = Regex("\\b(\\d+)\\b")
|
||||||
|
val matchResult = regex.find(sanitizedName)
|
||||||
|
return matchResult?.value?.trim()
|
||||||
|
}
|
||||||
|
|
||||||
|
fun findEpisodeTitle(): String? {
|
||||||
|
val seCombo = findSeasonAndEpisode(sanitizedName)
|
||||||
|
val episodeNumber = findEpisodeNumber()
|
||||||
|
|
||||||
|
val startPosition = if (seCombo.second != null) sanitizedName.indexOf(seCombo.second!!)+ seCombo.second!!.length
|
||||||
|
else if (episodeNumber != null) sanitizedName.indexOf(episodeNumber) + episodeNumber.length else 0
|
||||||
|
val availableText = sanitizedName.substring(startPosition)
|
||||||
|
|
||||||
|
val cleanedEpisodeTitle = availableText.replace(Regex("""(?i)\b(?:season|episode|ep)\b"""), "")
|
||||||
|
.replace(Regex("""^\s*-\s*"""), "")
|
||||||
|
.replace(Regex("""\s+"""), " ")
|
||||||
|
.trim()
|
||||||
|
|
||||||
|
return cleanedEpisodeTitle
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,14 +1,14 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer
|
package no.iktdev.streamit.content.reader.analyzer.encoding
|
||||||
|
|
||||||
import com.google.gson.Gson
|
import com.google.gson.Gson
|
||||||
import com.google.gson.JsonObject
|
import com.google.gson.JsonObject
|
||||||
import com.sun.net.httpserver.Authenticator.Success
|
import no.iktdev.streamit.content.common.dto.ContentOutName
|
||||||
import no.iktdev.streamit.content.common.streams.AudioStream
|
import no.iktdev.streamit.content.common.streams.AudioStream
|
||||||
import no.iktdev.streamit.content.common.streams.MediaStreams
|
import no.iktdev.streamit.content.common.streams.MediaStreams
|
||||||
import no.iktdev.streamit.content.common.streams.SubtitleStream
|
import no.iktdev.streamit.content.common.streams.SubtitleStream
|
||||||
import no.iktdev.streamit.content.common.streams.VideoStream
|
import no.iktdev.streamit.content.common.streams.VideoStream
|
||||||
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization
|
import no.iktdev.streamit.library.kafka.listener.sequential.IMessageDataDeserialization
|
||||||
@ -25,6 +25,17 @@ class EncodedDeserializers {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
val determinedFileNameReceived = object: IMessageDataDeserialization<ContentOutName> {
|
||||||
|
override fun deserialize(incomingMessage: Message): ContentOutName? {
|
||||||
|
if (incomingMessage.status.statusType != StatusType.SUCCESS) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
return incomingMessage.dataAs(ContentOutName::class.java)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
val mediaStreams = object : IMessageDataDeserialization<MediaStreams> {
|
val mediaStreams = object : IMessageDataDeserialization<MediaStreams> {
|
||||||
override fun deserialize(incomingMessage: Message): MediaStreams? {
|
override fun deserialize(incomingMessage: Message): MediaStreams? {
|
||||||
return try {
|
return try {
|
||||||
@ -66,8 +77,9 @@ class EncodedDeserializers {
|
|||||||
|
|
||||||
fun getDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
fun getDeserializers(): Map<String, IMessageDataDeserialization<*>> {
|
||||||
return mutableMapOf(
|
return mutableMapOf(
|
||||||
KnownEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived,
|
KafkaEvents.EVENT_READER_RECEIVED_FILE.event to fileReceived,
|
||||||
KnownEvents.EVENT_READER_RECEIVED_STREAMS.event to mediaStreams
|
KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event to mediaStreams,
|
||||||
|
KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event to determinedFileNameReceived
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1,17 +1,17 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer
|
package no.iktdev.streamit.content.reader.analyzer.encoding
|
||||||
|
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
|
import no.iktdev.streamit.content.common.dto.ContentOutName
|
||||||
import no.iktdev.streamit.content.common.streams.MediaStreams
|
import no.iktdev.streamit.content.common.streams.MediaStreams
|
||||||
import no.iktdev.streamit.content.reader.analyzer.encoding.EncodeArgumentSelector
|
|
||||||
import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation
|
import no.iktdev.streamit.content.reader.analyzer.encoding.dto.EncodeInformation
|
||||||
|
import no.iktdev.streamit.content.reader.analyzer.encoding.helpers.EncodeArgumentSelector
|
||||||
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
import no.iktdev.streamit.library.kafka.dto.Status
|
import no.iktdev.streamit.library.kafka.dto.Status
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.ActionType
|
|
||||||
import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
|
import no.iktdev.streamit.library.kafka.listener.sequential.ISequentialMessageEvent
|
||||||
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
|
import no.iktdev.streamit.library.kafka.listener.sequential.SequentialMessageListener
|
||||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
@ -25,16 +25,17 @@ class EncodedStreams : ISequentialMessageEvent {
|
|||||||
|
|
||||||
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
|
val messageProducer = DefaultProducer(CommonConfig.kafkaTopic)
|
||||||
|
|
||||||
final val defaultConsumer = DefaultConsumer(subId = "encodedStreams").apply {
|
final val defaultConsumer = DefaultConsumer(subId = "encodedStreams")
|
||||||
autoCommit = false
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
final val mainListener = object : SequentialMessageListener(
|
final val mainListener = object : SequentialMessageListener(
|
||||||
topic = CommonConfig.kafkaTopic,
|
topic = CommonConfig.kafkaTopic,
|
||||||
consumer = defaultConsumer,
|
consumer = defaultConsumer,
|
||||||
accept = KnownEvents.EVENT_READER_RECEIVED_FILE.event,
|
accept = KafkaEvents.EVENT_READER_RECEIVED_FILE.event,
|
||||||
subAccepts = listOf(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event),
|
subAccepts = listOf(
|
||||||
|
KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event,
|
||||||
|
KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event
|
||||||
|
),
|
||||||
deserializers = EncodedDeserializers().getDeserializers(),
|
deserializers = EncodedDeserializers().getDeserializers(),
|
||||||
this
|
this
|
||||||
) {}
|
) {}
|
||||||
@ -44,13 +45,64 @@ class EncodedStreams : ISequentialMessageEvent {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
override fun getRequiredMessages(): List<String> {
|
||||||
|
return listOf(KafkaEvents.EVENT_READER_RECEIVED_FILE.event, KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event)
|
||||||
|
}
|
||||||
|
|
||||||
|
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
|
||||||
|
logger.info { "All messages are received" }
|
||||||
|
val baseMessage = result[KafkaEvents.EVENT_READER_RECEIVED_FILE.event]
|
||||||
|
if (baseMessage == null) {
|
||||||
|
produceErrorMessage(
|
||||||
|
Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)),
|
||||||
|
"Initiator message not found!"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result.values.any { it?.status?.statusType != StatusType.SUCCESS }) {
|
||||||
|
produceErrorMessage(
|
||||||
|
Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)),
|
||||||
|
"Failed messages found!"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val fileResult = baseMessage.data as FileWatcher.FileResult?
|
||||||
|
if (fileResult == null) {
|
||||||
|
produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val determinedfnm = result[KafkaEvents.EVENT_READER_DETERMINED_FILENAME.event]
|
||||||
|
val determinedFileName = determinedfnm?.data as ContentOutName
|
||||||
|
|
||||||
|
val outFileName = if (determinedfnm.status.statusType == StatusType.SUCCESS)
|
||||||
|
determinedFileName.baseName
|
||||||
|
else fileResult.sanitizedName.ifBlank { File(fileResult.file).nameWithoutExtension }
|
||||||
|
|
||||||
|
|
||||||
|
val streams = result[KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams?
|
||||||
|
if (streams == null) {
|
||||||
|
produceErrorMessage(baseMessage, "No streams received!")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
val encodeInformation =
|
||||||
|
EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
|
||||||
|
produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments())
|
||||||
|
encodeInformation.getSubtitleArguments().forEach { s ->
|
||||||
|
produceEncodeMessage(baseMessage, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
private fun produceErrorMessage(baseMessage: Message, reason: String) {
|
private fun produceErrorMessage(baseMessage: Message, reason: String) {
|
||||||
val message = Message(
|
val message = Message(
|
||||||
referenceId = baseMessage.referenceId,
|
referenceId = baseMessage.referenceId,
|
||||||
actionType = baseMessage.actionType,
|
actionType = baseMessage.actionType,
|
||||||
Status(statusType = StatusType.ERROR, message = reason)
|
Status(statusType = StatusType.ERROR, message = reason)
|
||||||
)
|
)
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun produceEncodeMessage(baseMessage: Message, data: EncodeInformation?) {
|
private fun produceEncodeMessage(baseMessage: Message, data: EncodeInformation?) {
|
||||||
@ -60,51 +112,7 @@ class EncodedStreams : ISequentialMessageEvent {
|
|||||||
Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED),
|
Status(statusType = if (data != null) StatusType.SUCCESS else StatusType.IGNORED),
|
||||||
data = data
|
data = data
|
||||||
)
|
)
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_ENCODE_GENERATED.event, message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_ENCODE_GENERATED.event, message)
|
||||||
}
|
|
||||||
|
|
||||||
override fun areAllMessagesPresent(currentEvents: List<String>): Boolean {
|
|
||||||
val expected = listOf(KnownEvents.EVENT_READER_RECEIVED_FILE.event, KnownEvents.EVENT_READER_RECEIVED_STREAMS.event)
|
|
||||||
val waitingFor = expected.filter { !currentEvents.contains(it) }
|
|
||||||
return if (waitingFor.isEmpty()) {
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
logger.info { "Waiting for events: \n ${waitingFor.joinToString("\n\t")}" }
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override fun onAllMessagesProcessed(referenceId: String, result: Map<String, Message?>) {
|
|
||||||
logger.info { "All messages are received" }
|
|
||||||
val baseMessage = result[KnownEvents.EVENT_READER_RECEIVED_FILE.event]
|
|
||||||
if (baseMessage == null) {
|
|
||||||
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Initiator message not found!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result.values.any { it?.status?.statusType != StatusType.SUCCESS }) {
|
|
||||||
produceErrorMessage(Message(referenceId = referenceId, status = Status(statusType = StatusType.ERROR)), "Failed messages found!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
val fileResult = baseMessage.data as FileWatcher.FileResult?
|
|
||||||
if (fileResult == null) {
|
|
||||||
produceErrorMessage(baseMessage, "FileResult is either null or not deserializable!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
val outFileName = fileResult.desiredNewName.ifBlank { File(fileResult.file).nameWithoutExtension }
|
|
||||||
|
|
||||||
val streams = result[KnownEvents.EVENT_READER_RECEIVED_STREAMS.event]?.data as MediaStreams?
|
|
||||||
if (streams == null) {
|
|
||||||
produceErrorMessage(baseMessage, "No streams received!")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
val encodeInformation = EncodeArgumentSelector(inputFile = fileResult.file, streams = streams, outFileName = outFileName)
|
|
||||||
produceEncodeMessage(baseMessage, encodeInformation.getVideoAndAudioArguments())
|
|
||||||
encodeInformation.getSubtitleArguments().forEach { s ->
|
|
||||||
produceEncodeMessage(baseMessage, s)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer.encoding
|
package no.iktdev.streamit.content.reader.analyzer.encoding.helpers
|
||||||
|
|
||||||
import no.iktdev.streamit.content.common.streams.AudioStream
|
import no.iktdev.streamit.content.common.streams.AudioStream
|
||||||
import no.iktdev.streamit.content.common.streams.MediaStreams
|
import no.iktdev.streamit.content.common.streams.MediaStreams
|
||||||
@ -1,4 +1,4 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer.encoding
|
package no.iktdev.streamit.content.reader.analyzer.encoding.helpers
|
||||||
|
|
||||||
import com.google.gson.Gson
|
import com.google.gson.Gson
|
||||||
import no.iktdev.streamit.content.reader.ReaderEnv
|
import no.iktdev.streamit.content.reader.ReaderEnv
|
||||||
@ -10,13 +10,11 @@ import no.iktdev.exfl.coroutines.Coroutines
|
|||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
import no.iktdev.streamit.content.common.Naming
|
import no.iktdev.streamit.content.common.Naming
|
||||||
|
|
||||||
import no.iktdev.streamit.content.reader.ReaderEnv
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
import no.iktdev.streamit.library.kafka.dto.Status
|
import no.iktdev.streamit.library.kafka.dto.Status
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
|
|
||||||
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
@ -53,7 +51,7 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
object : SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KnownEvents.REQUEST_FILE_READ.event)) {
|
object : SimpleMessageListener(CommonConfig.kafkaTopic, defaultConsumer, listOf(KafkaEvents.REQUEST_FILE_READ.event)) {
|
||||||
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
override fun onMessageReceived(data: ConsumerRecord<String, Message>) {
|
||||||
if (data.value().status.statusType == StatusType.SUCCESS) {
|
if (data.value().status.statusType == StatusType.SUCCESS) {
|
||||||
if (data.value().data is String) {
|
if (data.value().data is String) {
|
||||||
@ -78,10 +76,10 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
val message = Message(
|
val message = Message(
|
||||||
referenceId = file.id,
|
referenceId = file.id,
|
||||||
status = Status(StatusType.SUCCESS),
|
status = Status(StatusType.SUCCESS),
|
||||||
data = FileResult(file = file.file.absolutePath, title = naming.guessDesiredTitle(), desiredNewName = naming.guessDesiredFileName())
|
data = FileResult(file = file.file.absolutePath, title = naming.guessDesiredTitle(), sanitizedName = naming.guessDesiredFileName())
|
||||||
)
|
)
|
||||||
logger.debug { "Producing message: ${Gson().toJson(message)}" }
|
logger.debug { "Producing message: ${Gson().toJson(message)}" }
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event, message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_FILE.event, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFilePending(file: PendingFile) {
|
override fun onFilePending(file: PendingFile) {
|
||||||
@ -89,7 +87,7 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
status = Status(StatusType.PENDING),
|
status = Status(StatusType.PENDING),
|
||||||
data = FileResult(file = file.file.absolutePath)
|
data = FileResult(file = file.file.absolutePath)
|
||||||
)
|
)
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFileFailed(file: PendingFile) {
|
override fun onFileFailed(file: PendingFile) {
|
||||||
@ -97,7 +95,7 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
status = Status(StatusType.ERROR),
|
status = Status(StatusType.ERROR),
|
||||||
data = file.file.absolutePath
|
data = file.file.absolutePath
|
||||||
)
|
)
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onFileRemoved(file: PendingFile) {
|
override fun onFileRemoved(file: PendingFile) {
|
||||||
@ -105,13 +103,13 @@ class FileWatcher: FileWatcherEvents {
|
|||||||
status = Status(StatusType.IGNORED),
|
status = Status(StatusType.IGNORED),
|
||||||
data = file.file.absolutePath
|
data = file.file.absolutePath
|
||||||
)
|
)
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_FILE.event , message)
|
||||||
}
|
}
|
||||||
|
|
||||||
data class FileResult(
|
data class FileResult(
|
||||||
val file: String,
|
val file: String,
|
||||||
val title: String = "",
|
val title: String = "",
|
||||||
val desiredNewName: String = ""
|
val sanitizedName: String = ""
|
||||||
)
|
)
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -1,6 +1,5 @@
|
|||||||
package no.iktdev.streamit.content.reader.streams
|
package no.iktdev.streamit.content.reader.streams
|
||||||
|
|
||||||
import com.google.gson.Gson
|
|
||||||
import kotlinx.coroutines.runBlocking
|
import kotlinx.coroutines.runBlocking
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import no.iktdev.streamit.content.common.CommonConfig
|
import no.iktdev.streamit.content.common.CommonConfig
|
||||||
@ -8,19 +7,19 @@ import no.iktdev.streamit.content.common.deamon.Daemon
|
|||||||
import no.iktdev.streamit.content.common.deamon.IDaemon
|
import no.iktdev.streamit.content.common.deamon.IDaemon
|
||||||
import no.iktdev.streamit.content.reader.ReaderEnv
|
import no.iktdev.streamit.content.reader.ReaderEnv
|
||||||
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
import no.iktdev.streamit.content.reader.fileWatcher.FileWatcher
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
import no.iktdev.streamit.library.kafka.KafkaEvents
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents.EVENT_READER_RECEIVED_FILE
|
import no.iktdev.streamit.library.kafka.KafkaEvents.EVENT_READER_RECEIVED_FILE
|
||||||
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
import no.iktdev.streamit.library.kafka.dto.Message
|
||||||
import no.iktdev.streamit.library.kafka.dto.Status
|
import no.iktdev.streamit.library.kafka.dto.Status
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
import no.iktdev.streamit.library.kafka.dto.StatusType
|
||||||
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
|
||||||
import no.iktdev.streamit.library.kafka.listener.EventMessageListener
|
|
||||||
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
import no.iktdev.streamit.library.kafka.listener.SimpleMessageListener
|
||||||
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
import no.iktdev.streamit.library.kafka.producer.DefaultProducer
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||||
import org.springframework.stereotype.Service
|
import org.springframework.stereotype.Service
|
||||||
|
|
||||||
private val logger = KotlinLogging.logger {}
|
private val logger = KotlinLogging.logger {}
|
||||||
|
|
||||||
@Service
|
@Service
|
||||||
class StreamsReader {
|
class StreamsReader {
|
||||||
|
|
||||||
@ -68,7 +67,7 @@ class StreamsReader {
|
|||||||
}
|
}
|
||||||
|
|
||||||
val message = Message(referenceId = data.value().referenceId, status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
|
val message = Message(referenceId = data.value().referenceId, status = Status( statusType = if (resultCode == 0) StatusType.SUCCESS else StatusType.ERROR), data = output.joinToString("\n"))
|
||||||
messageProducer.sendMessage(KnownEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
|
messageProducer.sendMessage(KafkaEvents.EVENT_READER_RECEIVED_STREAMS.event, message)
|
||||||
}
|
}
|
||||||
|
|
||||||
}.listen()
|
}.listen()
|
||||||
|
|||||||
@ -1,34 +0,0 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer
|
|
||||||
|
|
||||||
import no.iktdev.streamit.content.reader.Resources
|
|
||||||
import no.iktdev.streamit.library.kafka.KnownEvents
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.Message
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.Status
|
|
||||||
import no.iktdev.streamit.library.kafka.dto.StatusType
|
|
||||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
|
||||||
import org.junit.jupiter.api.Test
|
|
||||||
|
|
||||||
import org.junit.jupiter.api.Assertions.*
|
|
||||||
|
|
||||||
class EncodeStreamsMessageParserTest {
|
|
||||||
val parser = EncodeStreamsMessageParser()
|
|
||||||
val baseEvent = Message(status = Status( statusType = StatusType.SUCCESS))
|
|
||||||
|
|
||||||
/*@Test
|
|
||||||
fun getFileNameFromEvent() {
|
|
||||||
val payload = Resources.Streams().getSample(3)
|
|
||||||
assertDoesNotThrow {
|
|
||||||
val msg = baseEvent.copy(data = payload)
|
|
||||||
val result = parser.getMediaStreamsFromEvent(mutableListOf(
|
|
||||||
Resources().getConsumerRecord(
|
|
||||||
KnownEvents.EVENT_READER_RECEIVED_STREAMS.event,
|
|
||||||
msg
|
|
||||||
)
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
fun getMediaStreamsFromEvent() {
|
|
||||||
}*/
|
|
||||||
}
|
|
||||||
@ -1,6 +1,7 @@
|
|||||||
package no.iktdev.streamit.content.reader.analyzer
|
package no.iktdev.streamit.content.reader.analyzer
|
||||||
|
|
||||||
import no.iktdev.streamit.content.common.streams.MediaStreams
|
import no.iktdev.streamit.content.common.streams.MediaStreams
|
||||||
|
import no.iktdev.streamit.content.reader.analyzer.encoding.EncodedDeserializers
|
||||||
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
import no.iktdev.streamit.library.kafka.consumers.DefaultConsumer
|
||||||
import org.assertj.core.api.Assertions.assertThat
|
import org.assertj.core.api.Assertions.assertThat
|
||||||
import org.junit.jupiter.api.Assertions.*
|
import org.junit.jupiter.api.Assertions.*
|
||||||
|
|||||||
@ -0,0 +1,125 @@
|
|||||||
|
package no.iktdev.streamit.content.reader.analyzer.contentDeterminator
|
||||||
|
|
||||||
|
import org.assertj.core.api.AssertionsForInterfaceTypes.assertThat
|
||||||
|
import org.junit.jupiter.api.Assertions.assertEquals
|
||||||
|
import org.junit.jupiter.api.Named
|
||||||
|
import org.junit.jupiter.api.Test
|
||||||
|
import org.junit.jupiter.params.ParameterizedTest
|
||||||
|
import org.junit.jupiter.params.provider.MethodSource
|
||||||
|
|
||||||
|
|
||||||
|
class FileNameDeterminateTest {
|
||||||
|
|
||||||
|
data class TestData(
|
||||||
|
val expected: String,
|
||||||
|
val input: String
|
||||||
|
)
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@MethodSource("serieTestCases")
|
||||||
|
fun testDetermineFileNameForSerie(namedTestData: TestData) {
|
||||||
|
val fileNameDeterminate =
|
||||||
|
FileNameDeterminate("Iseleve", namedTestData.input, FileNameDeterminate.ContentType.SERIE)
|
||||||
|
assertEquals(
|
||||||
|
namedTestData.expected,
|
||||||
|
fileNameDeterminate.getDeterminedFileName(),
|
||||||
|
"Test case: ${namedTestData.input}"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest(name = "{0}")
|
||||||
|
@MethodSource("movieTestCases")
|
||||||
|
fun testDetermineFileNameForMovie(namedTestData: TestData) {
|
||||||
|
val fileNameDeterminate = FileNameDeterminate(
|
||||||
|
namedTestData.input, namedTestData.input, FileNameDeterminate.ContentType.MOVIE
|
||||||
|
)
|
||||||
|
assertEquals(
|
||||||
|
namedTestData.expected,
|
||||||
|
fileNameDeterminate.getDeterminedFileName(),
|
||||||
|
"Test case: ${namedTestData.input}"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest()
|
||||||
|
@MethodSource("undefinedTestCases")
|
||||||
|
fun testDetermineFileNameForUndefined(namedTestData: TestData) {
|
||||||
|
val fileNameDeterminate = FileNameDeterminate(
|
||||||
|
namedTestData.input, namedTestData.input, FileNameDeterminate.ContentType.UNDEFINED
|
||||||
|
)
|
||||||
|
assertThat(fileNameDeterminate.getDeterminedFileName()).isEqualTo(namedTestData.expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
fun test() {
|
||||||
|
val namedTestData = TestData("Game of Thrones - S01E01", "Game of Thrones - 01")
|
||||||
|
val fileNameDeterminate = FileNameDeterminate(
|
||||||
|
namedTestData.input, namedTestData.input, FileNameDeterminate.ContentType.UNDEFINED
|
||||||
|
)
|
||||||
|
assertThat(fileNameDeterminate.getDeterminedFileName()).isEqualTo(namedTestData.expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
companion object {
|
||||||
|
@JvmStatic
|
||||||
|
fun serieTestCases(): List<Named<TestData>> {
|
||||||
|
return listOf(
|
||||||
|
Named.of("Is defined", TestData("Iseleve - S01E13", "Iseleve - 13")),
|
||||||
|
Named.of("Contains episode title", TestData("Iseleve - S01E13 - potetmos", "Iseleve - 13 potetmos")),
|
||||||
|
Named.of("Season and Episode in S01E01 format", TestData("Iseleve - S01E13", "Iseleve - S1E13")),
|
||||||
|
Named.of(
|
||||||
|
"Season and Episode with episode title",
|
||||||
|
TestData("Iseleve - S01E13 - potetmos", "Iseleve - S1E13 potetmos")
|
||||||
|
),
|
||||||
|
Named.of("Season and Episode with space separator", TestData("Iseleve - S01E13", "Iseleve - S1 13")),
|
||||||
|
Named.of(
|
||||||
|
"Season and Episode with space separator and episode title",
|
||||||
|
TestData("Iseleve - S01E13 - potetos", "Iseleve - S1 13 potetos")
|
||||||
|
),
|
||||||
|
Named.of("Lowercase season and episode", TestData("Iseleve - S01E13", "Iseleve - s1e13")),
|
||||||
|
Named.of(
|
||||||
|
"Episode title with Season and Episode in text",
|
||||||
|
TestData("Iseleve - S01E13", "Iseleve - Season 1 Episode 13")
|
||||||
|
),
|
||||||
|
Named.of(
|
||||||
|
"Episode title with Season and Episode in text and episode title",
|
||||||
|
TestData("Iseleve - S01E13 - Potetmos", "Iseleve - Season 1 Episode 13 Potetmos")
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@JvmStatic
|
||||||
|
fun movieTestCases(): List<Named<TestData>> {
|
||||||
|
return listOf(
|
||||||
|
Named.of("Movie with year", TestData("Some Movie (2012)", "Some Movie (2012)")),
|
||||||
|
Named.of("Movie without year", TestData("Another Movie", "Another Movie")),
|
||||||
|
Named.of("Movie with year and additional info", TestData("Awesome Movie (2012) - Part 1", "Awesome Movie (2012) - Part 1")),
|
||||||
|
//Named.of("Movie with year and spaces", TestData("Space Odyssey (2010)", "Space Odyssey (2010)")),
|
||||||
|
//Named.of("Movie with year and parentheses", TestData("Sci-Fi Movie (2015)", "Sci-Fi Movie (((2015)))")),
|
||||||
|
//Named.of("Movie with year and hyphen", TestData("Action Flick (2008)", "Action Flick - 2008")),
|
||||||
|
//Named.of("Movie with year and brackets", TestData("Blockbuster (2011)", "Blockbuster [2011]")),
|
||||||
|
//Named.of("Movie with year and period", TestData("Time Travelers. (2022)", "Time Travelers. .2022.")),
|
||||||
|
//Named.of("Movie with year and underscores", TestData("Hidden Gem (1999)", "Hidden Gem _1999_")),
|
||||||
|
Named.of("Movie with title as '2012'", TestData("2012", "2012")),
|
||||||
|
Named.of("Movie with title as '2020'", TestData("2020 (2012)", "2020 (2012)")),
|
||||||
|
Named.of("Movie with title as '2049'", TestData("2049 (2017)", "2049 (2017)")),
|
||||||
|
Named.of("Movie with title as '3000'", TestData("3000 (2000)", "3000 (2000)"))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
@JvmStatic
|
||||||
|
fun undefinedTestCases(): List<Named<TestData>> {
|
||||||
|
return listOf(
|
||||||
|
Named.of("Undefined - Movie", TestData("Avengers - Endgame", "Avengers - Endgame")),
|
||||||
|
Named.of("Undefined - Series", TestData("Stranger Things", "Stranger Things")),
|
||||||
|
Named.of("Undefined - Movie with Year", TestData("Inception (2010)", "Inception (2010)")),
|
||||||
|
Named.of("Undefined - Series with Year", TestData("Friends (1994)", "Friends (1994)")),
|
||||||
|
Named.of("Undefined - Movie with Genre", TestData("The Dark Knight", "The Dark Knight")),
|
||||||
|
Named.of("Undefined - Series with Genre", TestData("Breaking Bad", "Breaking Bad")),
|
||||||
|
Named.of("Undefined - Movie with Keywords", TestData("The Lord of the Rings", "The Lord of the Rings (Movie)")),
|
||||||
|
Named.of("Undefined - Series with Keywords", TestData("Game of Thrones 01", "Game of Thrones 01")),
|
||||||
|
Named.of("Undefined - Series with number", TestData("Game of Thrones - S01E01", "Game of Thrones - 01")),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user