2024-01-25 14:17:38 +00:00
|
|
|
# database_manager.py
|
2024-01-27 11:07:07 +00:00
|
|
|
import sqlite3
|
2024-01-25 14:17:38 +00:00
|
|
|
|
|
|
|
from sqlalchemy import create_engine
|
|
|
|
from sqlalchemy.orm import scoped_session, sessionmaker
|
|
|
|
from threading import local
|
|
|
|
from message_system_db_model import Base, Station, Status, Attachment, P2PMessage
|
|
|
|
from datetime import datetime
|
|
|
|
import json
|
2024-01-25 14:48:00 +00:00
|
|
|
import structlog
|
2024-01-30 20:22:04 +00:00
|
|
|
import helpers
|
2024-01-25 14:48:00 +00:00
|
|
|
|
2024-01-25 14:17:38 +00:00
|
|
|
class DatabaseManager:
|
2024-01-27 11:07:07 +00:00
|
|
|
def __init__(self, event_manger, uri='sqlite:///freedata-messages.db'):
|
|
|
|
self.event_manager = event_manger
|
|
|
|
|
2024-01-25 14:17:38 +00:00
|
|
|
self.engine = create_engine(uri, echo=False)
|
|
|
|
self.thread_local = local()
|
|
|
|
self.session_factory = sessionmaker(bind=self.engine)
|
|
|
|
Base.metadata.create_all(self.engine)
|
|
|
|
|
2024-01-25 14:49:37 +00:00
|
|
|
self.logger = structlog.get_logger(type(self).__name__)
|
|
|
|
|
2024-01-27 11:07:07 +00:00
|
|
|
def initialize_default_values(self):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
statuses = [
|
|
|
|
"transmitting",
|
|
|
|
"transmitted",
|
|
|
|
"received",
|
|
|
|
"failed",
|
|
|
|
"failed_checksum",
|
2024-02-02 18:37:02 +00:00
|
|
|
"aborted",
|
|
|
|
"queued"
|
2024-01-27 11:07:07 +00:00
|
|
|
]
|
|
|
|
|
|
|
|
# Add default statuses if they don't exist
|
|
|
|
for status_name in statuses:
|
|
|
|
existing_status = session.query(Status).filter_by(name=status_name).first()
|
|
|
|
if not existing_status:
|
|
|
|
new_status = Status(name=status_name)
|
|
|
|
session.add(new_status)
|
|
|
|
|
|
|
|
session.commit()
|
|
|
|
self.log("Initialized database")
|
|
|
|
except Exception as e:
|
|
|
|
session.rollback()
|
|
|
|
self.log(f"An error occurred while initializing default values: {e}", isWarning=True)
|
|
|
|
finally:
|
|
|
|
session.remove()
|
2024-01-25 14:49:37 +00:00
|
|
|
|
2024-01-25 14:48:00 +00:00
|
|
|
def log(self, message, isWarning=False):
|
|
|
|
msg = f"[{type(self).__name__}]: {message}"
|
|
|
|
logger = self.logger.warn if isWarning else self.logger.info
|
|
|
|
logger(msg)
|
|
|
|
|
2024-01-25 14:17:38 +00:00
|
|
|
def get_thread_scoped_session(self):
|
|
|
|
if not hasattr(self.thread_local, "session"):
|
|
|
|
self.thread_local.session = scoped_session(self.session_factory)
|
|
|
|
return self.thread_local.session
|
|
|
|
|
2024-01-30 20:22:04 +00:00
|
|
|
def get_or_create_station(self, callsign, session=None):
|
|
|
|
own_session = False
|
|
|
|
if not session:
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
own_session = True
|
|
|
|
|
|
|
|
try:
|
|
|
|
station = session.query(Station).filter_by(callsign=callsign).first()
|
|
|
|
if not station:
|
|
|
|
self.log(f"Updating station list with {callsign}")
|
|
|
|
station = Station(callsign=callsign, checksum=helpers.get_crc_24(callsign).hex())
|
|
|
|
session.add(station)
|
|
|
|
session.flush()
|
|
|
|
|
|
|
|
if own_session:
|
|
|
|
session.commit() # Only commit if we created the session
|
|
|
|
|
|
|
|
return station
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
if own_session:
|
|
|
|
session.rollback()
|
|
|
|
|
|
|
|
finally:
|
|
|
|
if own_session:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_callsign_by_checksum(self, checksum):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
station = session.query(Station).filter_by(checksum=checksum).first()
|
|
|
|
if station:
|
|
|
|
self.log(f"Found callsign [{station.callsign}] for checksum [{station.checksum}]")
|
|
|
|
return station.callsign
|
|
|
|
else:
|
|
|
|
self.log(f"No callsign found for [{checksum}]")
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
|
|
self.log(f"Error fetching callsign for checksum {checksum}: {e}", isWarning=True)
|
|
|
|
return {'status': 'failure', 'message': str(e)}
|
|
|
|
finally:
|
|
|
|
session.remove()
|
2024-01-25 14:17:38 +00:00
|
|
|
|
|
|
|
def get_or_create_status(self, session, status_name):
|
|
|
|
status = session.query(Status).filter_by(name=status_name).first()
|
|
|
|
if not status:
|
|
|
|
status = Status(name=status_name)
|
|
|
|
session.add(status)
|
|
|
|
session.flush() # To get the ID immediately
|
|
|
|
return status
|
|
|
|
|
2024-01-29 16:50:28 +00:00
|
|
|
def add_message(self, message_data, direction='receive', status=None):
|
2024-01-25 14:17:38 +00:00
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
# Create and add the origin and destination Stations
|
2024-01-30 20:22:04 +00:00
|
|
|
origin = self.get_or_create_station(message_data['origin'], session)
|
|
|
|
destination = self.get_or_create_station(message_data['destination'], session)
|
2024-01-25 14:17:38 +00:00
|
|
|
|
|
|
|
# Create and add Status if provided
|
2024-01-29 16:50:28 +00:00
|
|
|
if status:
|
|
|
|
status = self.get_or_create_status(session, status)
|
2024-01-25 14:17:38 +00:00
|
|
|
|
|
|
|
# Parse the timestamp from the message ID
|
|
|
|
timestamp = datetime.fromisoformat(message_data['id'].split('_')[2])
|
|
|
|
# Create the P2PMessage instance
|
|
|
|
new_message = P2PMessage(
|
|
|
|
id=message_data['id'],
|
|
|
|
origin_callsign=origin.callsign,
|
|
|
|
destination_callsign=destination.callsign,
|
|
|
|
body=message_data['body'],
|
|
|
|
timestamp=timestamp,
|
2024-01-29 16:50:28 +00:00
|
|
|
direction=direction,
|
2024-01-25 14:17:38 +00:00
|
|
|
status_id=status.id if status else None
|
|
|
|
)
|
|
|
|
|
|
|
|
# Process and add attachments
|
|
|
|
for attachment_data in message_data.get('attachments', []):
|
|
|
|
attachment = Attachment(
|
|
|
|
name=attachment_data['name'],
|
|
|
|
data_type=attachment_data['type'],
|
|
|
|
data=attachment_data['data']
|
|
|
|
)
|
|
|
|
new_message.attachments.append(attachment)
|
|
|
|
|
|
|
|
session.add(new_message)
|
|
|
|
session.commit()
|
2024-01-25 14:48:00 +00:00
|
|
|
|
|
|
|
self.log(f"Added data to database: {new_message.id}")
|
2024-01-27 11:07:07 +00:00
|
|
|
self.event_manager.freedata_message_db_change()
|
2024-01-25 14:17:38 +00:00
|
|
|
return new_message.id
|
|
|
|
except Exception as e:
|
|
|
|
session.rollback()
|
2024-01-27 11:17:31 +00:00
|
|
|
self.log(f"error adding new message to databse with error: {e}", isWarning=True)
|
|
|
|
self.log(f"---> please delete or update existing database", isWarning=True)
|
2024-01-25 14:17:38 +00:00
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_all_messages(self):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
messages = session.query(P2PMessage).all()
|
|
|
|
return [message.to_dict() for message in messages]
|
2024-01-27 11:07:07 +00:00
|
|
|
|
2024-01-25 14:17:38 +00:00
|
|
|
except Exception as e:
|
2024-01-27 11:07:07 +00:00
|
|
|
self.log(f"error fetching database messages with error: {e}", isWarning=True)
|
|
|
|
self.log(f"---> please delete or update existing database", isWarning=True)
|
|
|
|
|
2024-01-27 11:09:59 +00:00
|
|
|
return []
|
2024-01-27 11:07:07 +00:00
|
|
|
|
2024-01-25 14:17:38 +00:00
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_all_messages_json(self):
|
|
|
|
messages_dict = self.get_all_messages()
|
2024-01-27 11:09:59 +00:00
|
|
|
messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict}
|
|
|
|
return json.dumps(messages_with_header) # Convert to JSON string
|
2024-01-28 11:08:55 +00:00
|
|
|
|
|
|
|
def get_message_by_id(self, message_id):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
|
|
|
if message:
|
|
|
|
return message.to_dict()
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
|
|
self.log(f"Error fetching message with ID {message_id}: {e}", isWarning=True)
|
|
|
|
return None
|
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_message_by_id_json(self, message_id):
|
|
|
|
message_dict = self.get_message_by_id(message_id)
|
|
|
|
return json.dumps(message_dict) # Convert to JSON string
|
|
|
|
|
|
|
|
def delete_message(self, message_id):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
|
|
|
if message:
|
|
|
|
session.delete(message)
|
|
|
|
session.commit()
|
|
|
|
self.log(f"Deleted: {message_id}")
|
2024-01-28 11:32:16 +00:00
|
|
|
self.event_manager.freedata_message_db_change()
|
2024-01-28 11:08:55 +00:00
|
|
|
return {'status': 'success', 'message': f'Message {message_id} deleted'}
|
|
|
|
else:
|
|
|
|
return {'status': 'failure', 'message': 'Message not found'}
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
session.rollback()
|
|
|
|
self.log(f"Error deleting message with ID {message_id}: {e}", isWarning=True)
|
|
|
|
return {'status': 'failure', 'message': str(e)}
|
|
|
|
|
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def update_message(self, message_id, update_data):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
|
|
|
if message:
|
|
|
|
# Update fields of the message as per update_data
|
|
|
|
if 'body' in update_data:
|
|
|
|
message.body = update_data['body']
|
2024-01-29 16:50:28 +00:00
|
|
|
if 'status' in update_data:
|
|
|
|
message.status = self.get_or_create_status(session, update_data['status'])
|
|
|
|
|
2024-01-28 11:08:55 +00:00
|
|
|
session.commit()
|
|
|
|
self.log(f"Updated: {message_id}")
|
2024-01-28 11:32:16 +00:00
|
|
|
self.event_manager.freedata_message_db_change()
|
2024-01-28 11:08:55 +00:00
|
|
|
return {'status': 'success', 'message': f'Message {message_id} updated'}
|
|
|
|
else:
|
|
|
|
return {'status': 'failure', 'message': 'Message not found'}
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
session.rollback()
|
|
|
|
self.log(f"Error updating message with ID {message_id}: {e}", isWarning=True)
|
|
|
|
return {'status': 'failure', 'message': str(e)}
|
|
|
|
|
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_attachments_by_message_id(self, message_id):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
# Query for the message with the given ID
|
|
|
|
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
|
|
|
if message:
|
|
|
|
attachments = [attachment.to_dict() for attachment in message.attachments]
|
|
|
|
return attachments
|
|
|
|
else:
|
|
|
|
return []
|
|
|
|
except Exception as e:
|
|
|
|
self.log(f"Error fetching attachments for message ID {message_id}: {e}", isWarning=True)
|
|
|
|
return []
|
|
|
|
finally:
|
|
|
|
session.remove()
|
|
|
|
|
|
|
|
def get_attachments_by_message_id_json(self, message_id):
|
|
|
|
attachments = self.get_attachments_by_message_id(message_id)
|
2024-02-02 18:37:02 +00:00
|
|
|
return json.dumps(attachments)
|
|
|
|
|
|
|
|
def get_first_queued_message(self):
|
|
|
|
session = self.get_thread_scoped_session()
|
|
|
|
try:
|
|
|
|
# Find the status object for "queued"
|
|
|
|
queued_status = session.query(Status).filter_by(name='queued').first()
|
|
|
|
if queued_status:
|
|
|
|
# Query for the first (oldest) message with status "queued"
|
|
|
|
message = session.query(P2PMessage)\
|
|
|
|
.filter_by(status=queued_status)\
|
|
|
|
.order_by(P2PMessage.timestamp)\
|
|
|
|
.first()
|
|
|
|
if message:
|
|
|
|
self.log(f"Found queued message with ID {message.id}")
|
|
|
|
return message.to_dict()
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
else:
|
|
|
|
self.log("Queued status not found", isWarning=True)
|
|
|
|
return None
|
|
|
|
except Exception as e:
|
|
|
|
self.log(f"Error fetching the first queued message: {e}", isWarning=True)
|
|
|
|
return None
|
|
|
|
finally:
|
|
|
|
session.remove()
|