Database connect if not connected
This commit is contained in:
parent
4f9de1220d
commit
15e719fe8e
@ -95,24 +95,41 @@ class EventsPullerThread(threading.Thread):
|
||||
logger.error("Error inserting into database: %s", err)
|
||||
return False
|
||||
|
||||
def __connect_to_datasource(self):
|
||||
try:
|
||||
self.connection = mysql.connector.connect(
|
||||
host=events_server_address,
|
||||
port=events_server_port,
|
||||
database=events_server_database_name,
|
||||
user=events_server_username,
|
||||
password=events_server_password
|
||||
)
|
||||
if self.connection.is_connected():
|
||||
logging.info(f"Successfully connected to {events_server_database_name} at {events_server_address}:{events_server_port}")
|
||||
except Exception as e:
|
||||
logging.error(f"Error while connecting to database: {e}")
|
||||
self.connection = None
|
||||
|
||||
def run(self) -> None:
|
||||
logger.info(f"Using {events_server_address}:{events_server_port} on table: {events_server_database_name}")
|
||||
while not self.shutdown.is_set():
|
||||
producedMessage: bool = False
|
||||
|
||||
try:
|
||||
connection = mysql.connector.connect(
|
||||
host=events_server_address,
|
||||
port=events_server_port,
|
||||
database=events_server_database_name,
|
||||
user=events_server_username,
|
||||
password=events_server_password
|
||||
)
|
||||
except:
|
||||
logging.error(f"Unable to connect to {events_server_address}:{events_server_port}. Either the server or database: {events_server_database_name} is not present yet!")
|
||||
while not self.shutdown.is_set():
|
||||
if self.connection is None or not self.connection.is_connected():
|
||||
logging.info("Attempting to reconnect to the database...")
|
||||
self.__connect_to_datasource()
|
||||
|
||||
if self.connection is None: # Connection failed, wait and retry
|
||||
time.sleep(5) # Wait 5 seconds before retrying
|
||||
continue # Go back to the start of the connection loop
|
||||
else:
|
||||
# If connection is successful, exit the connection loop and proceed
|
||||
break
|
||||
|
||||
|
||||
try:
|
||||
rows = self.getEventsAvailable(connection=connection)
|
||||
rows = self.getEventsAvailable(connection=self.connection)
|
||||
for row in rows:
|
||||
if (row is not None):
|
||||
try:
|
||||
@ -139,7 +156,7 @@ Producing message
|
||||
============================================================================\n"""
|
||||
logger.info(producedMessage)
|
||||
|
||||
producedEvent = self.storeProducedEvent(connection=connection, event=producedEvent)
|
||||
producedEvent = self.storeProducedEvent(connection=self.connection, event=producedEvent)
|
||||
|
||||
except Exception as e:
|
||||
"""Produce failure here"""
|
||||
@ -156,14 +173,14 @@ Producing message
|
||||
data=None,
|
||||
eventType="EventMediaMetadataSearchPerformed"
|
||||
)
|
||||
self.storeProducedEvent(connection=connection, event=producedEvent)
|
||||
self.storeProducedEvent(connection=self.connection, event=producedEvent)
|
||||
|
||||
except mysql.connector.Error as err:
|
||||
logger.error("Database error: %s", err)
|
||||
finally:
|
||||
if connection:
|
||||
connection.close()
|
||||
connection = None
|
||||
if self.connection:
|
||||
self.connection.close()
|
||||
self.connection = None
|
||||
# Introduce a small sleep to reduce CPU usage
|
||||
time.sleep(2)
|
||||
if (self.shutdown.is_set()):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user