This commit is contained in:
bskjon 2024-07-14 01:41:32 +02:00
parent 3a32f30838
commit 038037a15e
2 changed files with 10 additions and 6 deletions

View File

@ -82,8 +82,7 @@ class EventsPullerThread(threading.Thread):
handler_thread = MessageHandlerThread(row)
handler_thread.start()
# Introduce a small sleep to reduce CPU usage
time.sleep(5000)
except mysql.connector.Error as err:
logger.error("Database error: %s", err)
finally:
@ -91,6 +90,8 @@ class EventsPullerThread(threading.Thread):
cursor.close()
if connection:
connection.close()
# Introduce a small sleep to reduce CPU usage
time.sleep(1000)
def stop(self):
self.shutdown.set()
@ -144,12 +145,12 @@ class MessageHandlerThread(threading.Thread):
created= datetime.now().isoformat()
),
data=result,
eventType="event:media-metadata-search:performed"
eventType="EventMediaMetadataSearchPerformed"
)
logger.info("<== Outgoing message: %s \n%s", event.eventType, event_data_to_json(producedEvent))
self.insert_into_database(producedEvent)
self.insert_into_database(producedEvent, "event:media-metadata-search:performed")
@ -178,7 +179,7 @@ class MessageHandlerThread(threading.Thread):
return prefixSelector
return None
def insert_into_database(self, event: MediaEvent):
def insert_into_database(self, event: MediaEvent, eventKey: str):
try:
connection = mysql.connector.connect(
host=events_server_address,
@ -196,7 +197,7 @@ class MessageHandlerThread(threading.Thread):
cursor.execute(query, (
event.metadata.referenceId,
event.metadata.eventId,
event.eventType,
eventKey,
event_data_to_json(event)
))
connection.commit()

View File

@ -4,6 +4,7 @@ import kotlinx.coroutines.*
import mu.KotlinLogging
import no.iktdev.eventi.core.ConsumableEvent
import no.iktdev.eventi.data.EventImpl
import no.iktdev.eventi.data.referenceId
import org.springframework.context.ApplicationContext
import org.springframework.stereotype.Service
import java.util.concurrent.atomic.AtomicLong
@ -50,6 +51,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
}
}
}
log.debug { "No consumption detected for ${events.first().referenceId()}" }
}
}
@ -58,6 +60,7 @@ abstract class EventCoordinator<T : EventImpl, E : EventsManagerImpl<T>> {
private fun pullForEvents() {
coroutine.launch {
while (taskMode == ActiveMode.Active) {
log.debug { "New pull on database" }
val events = eventManager?.readAvailableEvents()
if (events == null) {
log.warn { "EventManager is not loaded!" }