From e445d239685721ffb50b4eebdeac079ef2af3b68 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 6 Feb 2024 09:22:55 +0100 Subject: [PATCH] adjusted db model, made databse handlers more modular --- modem/command_message_send.py | 9 +- modem/message_p2p.py | 5 +- modem/message_system_db_manager.py | 212 +---------------------------- modem/message_system_db_model.py | 40 +++++- modem/server.py | 14 +- tests/test_message_database.py | 35 ++++- tests/test_message_p2p.py | 4 +- 7 files changed, 87 insertions(+), 232 deletions(-) diff --git a/modem/command_message_send.py b/modem/command_message_send.py index 3507d54d..8f28f0f8 100644 --- a/modem/command_message_send.py +++ b/modem/command_message_send.py @@ -6,6 +6,7 @@ from arq_session_iss import ARQSessionISS from message_p2p import MessageP2P from arq_data_type_handler import ARQ_SESSION_TYPES from message_system_db_manager import DatabaseManager +from message_system_db_messages import DatabaseManagerMessages class SendMessageCommand(TxCommand): """Command to send a P2P message using an ARQ transfer session @@ -18,7 +19,7 @@ class SendMessageCommand(TxCommand): print(self.message.id) print(self.message.to_dict()) print("--------------------------------------- set params from api") - DatabaseManager(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued') + DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued') def transmit(self, modem): @@ -26,14 +27,14 @@ class SendMessageCommand(TxCommand): self.log("Modem busy, waiting until ready...") return - first_queued_message = DatabaseManager(self.event_manager).get_first_queued_message() + first_queued_message = DatabaseManagerMessages(self.event_manager).get_first_queued_message() if not first_queued_message: self.log("No queued message in database.") return self.log(f"Queued message found: {first_queued_message['id']}") - DatabaseManager(self.event_manager).update_message(first_queued_message["id"], update_data={'status': 'transmitting'}) - message_dict = DatabaseManager(self.event_manager).get_message_by_id(first_queued_message["id"]) + DatabaseManagerMessages(self.event_manager).update_message(first_queued_message["id"], update_data={'status': 'transmitting'}) + message_dict = DatabaseManagerMessages(self.event_manager).get_message_by_id(first_queued_message["id"]) print(message_dict["id"]) message = MessageP2P.from_api_params(message_dict['origin'], message_dict) print(message.id) diff --git a/modem/message_p2p.py b/modem/message_p2p.py index 735aea34..5bb363c5 100644 --- a/modem/message_p2p.py +++ b/modem/message_p2p.py @@ -3,6 +3,7 @@ import api_validations import base64 import json from message_system_db_manager import DatabaseManager +from message_system_db_messages import DatabaseManagerMessages #import command_message_send @@ -10,7 +11,7 @@ def message_received(event_manager, state_manager, data): decompressed_json_string = data.decode('utf-8') received_message_obj = MessageP2P.from_payload(decompressed_json_string) received_message_dict = MessageP2P.to_dict(received_message_obj) - DatabaseManager(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False) + DatabaseManagerMessages(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False) def message_failed(event_manager, state_manager, data): decompressed_json_string = data.decode('utf-8') @@ -19,7 +20,7 @@ def message_failed(event_manager, state_manager, data): payload_message_obj = MessageP2P.from_payload(decompressed_json_string) payload_message = MessageP2P.to_dict(payload_message_obj) print(payload_message) - DatabaseManager(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'}) + DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'}) class MessageP2P: def __init__(self, id: str, origin: str, destination: str, body: str, attachments: list) -> None: diff --git a/modem/message_system_db_manager.py b/modem/message_system_db_manager.py index 0e117472..69faf113 100644 --- a/modem/message_system_db_manager.py +++ b/modem/message_system_db_manager.py @@ -4,11 +4,12 @@ import sqlite3 from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker from threading import local -from message_system_db_model import Base, Station, Status, Attachment, P2PMessage +from message_system_db_model import Base, Station, Status, Attachment, P2PMessage, MessageAttachment from datetime import datetime import json import structlog import helpers +import hashlib class DatabaseManager: def __init__(self, event_manger, uri='sqlite:///freedata-messages.db'): @@ -111,212 +112,3 @@ class DatabaseManager: session.flush() # To get the ID immediately return status - def add_message(self, message_data, direction='receive', status=None, is_read=True): - session = self.get_thread_scoped_session() - try: - # Create and add the origin and destination Stations - origin = self.get_or_create_station(message_data['origin'], session) - destination = self.get_or_create_station(message_data['destination'], session) - - # Create and add Status if provided - if status: - status = self.get_or_create_status(session, status) - - # Parse the timestamp from the message ID - timestamp = datetime.fromisoformat(message_data['id'].split('_')[2]) - # Create the P2PMessage instance - new_message = P2PMessage( - id=message_data['id'], - origin_callsign=origin.callsign, - destination_callsign=destination.callsign, - body=message_data['body'], - timestamp=timestamp, - direction=direction, - status_id=status.id if status else None, - is_read=is_read, - attempt=0 - ) - - # Process and add attachments - for attachment_data in message_data.get('attachments', []): - attachment = Attachment( - name=attachment_data['name'], - data_type=attachment_data['type'], - data=attachment_data['data'] - ) - new_message.attachments.append(attachment) - - session.add(new_message) - session.commit() - - self.log(f"Added data to database: {new_message.id}") - self.event_manager.freedata_message_db_change() - return new_message.id - except Exception as e: - session.rollback() - self.log(f"error adding new message to databse with error: {e}", isWarning=True) - self.log(f"---> please delete or update existing database", isWarning=True) - finally: - session.remove() - - def get_all_messages(self): - session = self.get_thread_scoped_session() - try: - messages = session.query(P2PMessage).all() - return [message.to_dict() for message in messages] - - except Exception as e: - self.log(f"error fetching database messages with error: {e}", isWarning=True) - self.log(f"---> please delete or update existing database", isWarning=True) - - return [] - - finally: - session.remove() - - def get_all_messages_json(self): - messages_dict = self.get_all_messages() - messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict} - return messages_with_header - - def get_message_by_id(self, message_id): - session = self.get_thread_scoped_session() - try: - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - return message.to_dict() - else: - return None - except Exception as e: - self.log(f"Error fetching message with ID {message_id}: {e}", isWarning=True) - return None - finally: - session.remove() - - def get_message_by_id_json(self, message_id): - message_dict = self.get_message_by_id(message_id) - return json.dumps(message_dict) # Convert to JSON string - - def delete_message(self, message_id): - session = self.get_thread_scoped_session() - try: - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - session.delete(message) - session.commit() - self.log(f"Deleted: {message_id}") - self.event_manager.freedata_message_db_change() - return {'status': 'success', 'message': f'Message {message_id} deleted'} - else: - return {'status': 'failure', 'message': 'Message not found'} - - except Exception as e: - session.rollback() - self.log(f"Error deleting message with ID {message_id}: {e}", isWarning=True) - return {'status': 'failure', 'message': str(e)} - - finally: - session.remove() - - def update_message(self, message_id, update_data): - session = self.get_thread_scoped_session() - try: - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - # Update fields of the message as per update_data - if 'body' in update_data: - message.body = update_data['body'] - if 'status' in update_data: - message.status = self.get_or_create_status(session, update_data['status']) - - session.commit() - self.log(f"Updated: {message_id}") - self.event_manager.freedata_message_db_change() - return {'status': 'success', 'message': f'Message {message_id} updated'} - else: - return {'status': 'failure', 'message': 'Message not found'} - - except Exception as e: - session.rollback() - self.log(f"Error updating message with ID {message_id}: {e}", isWarning=True) - return {'status': 'failure', 'message': str(e)} - - finally: - session.remove() - - def get_attachments_by_message_id(self, message_id): - session = self.get_thread_scoped_session() - try: - # Query for the message with the given ID - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - attachments = [attachment.to_dict() for attachment in message.attachments] - return attachments - else: - return [] - except Exception as e: - self.log(f"Error fetching attachments for message ID {message_id}: {e}", isWarning=True) - return [] - finally: - session.remove() - - def get_attachments_by_message_id_json(self, message_id): - attachments = self.get_attachments_by_message_id(message_id) - return json.dumps(attachments) - - def get_first_queued_message(self): - session = self.get_thread_scoped_session() - try: - # Find the status object for "queued" - queued_status = session.query(Status).filter_by(name='queued').first() - if queued_status: - # Query for the first (oldest) message with status "queued" - message = session.query(P2PMessage)\ - .filter_by(status=queued_status)\ - .order_by(P2PMessage.timestamp)\ - .first() - if message: - self.log(f"Found queued message with ID {message.id}") - return message.to_dict() - else: - return None - else: - self.log("Queued status not found", isWarning=True) - return None - except Exception as e: - self.log(f"Error fetching the first queued message: {e}", isWarning=True) - return None - finally: - session.remove() - - def increment_message_attempts(self, message_id): - session = self.get_thread_scoped_session() - try: - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - message.attempts += 1 - session.commit() - self.log(f"Incremented attempt count for message {message_id}") - else: - self.log(f"Message with ID {message_id} not found") - except Exception as e: - session.rollback() - self.log(f"An error occurred while incrementing attempts for message {message_id}: {e}") - finally: - session.remove() - - def mark_message_as_read(self, message_id): - session = self.get_thread_scoped_session() - try: - message = session.query(P2PMessage).filter_by(id=message_id).first() - if message: - message.is_read = True - session.commit() - self.log(f"Marked message {message_id} as read") - else: - self.log(f"Message with ID {message_id} not found") - except Exception as e: - session.rollback() - self.log(f"An error occurred while marking message {message_id} as read: {e}") - finally: - session.remove() \ No newline at end of file diff --git a/modem/message_system_db_model.py b/modem/message_system_db_model.py index 5b8eb096..57ad8696 100644 --- a/modem/message_system_db_model.py +++ b/modem/message_system_db_model.py @@ -1,10 +1,30 @@ # models.py -from sqlalchemy import Index, Boolean, Column, String, Integer, JSON, ForeignKey, DateTime +from sqlalchemy import Index, Table, Boolean, Column, String, Integer, JSON, ForeignKey, DateTime from sqlalchemy.orm import declarative_base, relationship Base = declarative_base() +class MessageAttachment(Base): + __tablename__ = 'message_attachment' + message_id = Column(String, ForeignKey('p2p_message.id', ondelete='CASCADE'), primary_key=True) + attachment_id = Column(Integer, ForeignKey('attachment.id', ondelete='CASCADE'), primary_key=True) + + message = relationship('P2PMessage', back_populates='message_attachments') + attachment = relationship('Attachment', back_populates='message_attachments') + +class Config(Base): + __tablename__ = 'config' + db_variable = Column(String, primary_key=True) # Unique identifier for the configuration setting + db_version = Column(String) + + def to_dict(self): + return { + 'db_variable': self.db_variable, + 'db_settings': self.db_settings + } + + class Beacon(Base): __tablename__ = 'beacon' id = Column(Integer, primary_key=True) @@ -45,7 +65,9 @@ class P2PMessage(Base): via_callsign = Column(String, ForeignKey('station.callsign'), nullable=True) destination_callsign = Column(String, ForeignKey('station.callsign')) body = Column(String, nullable=True) - attachments = relationship('Attachment', backref='p2p_message') + message_attachments = relationship('MessageAttachment', + back_populates='message', + cascade='all, delete-orphan') attempt = Column(Integer, default=0) timestamp = Column(DateTime) status_id = Column(Integer, ForeignKey('status.id'), nullable=True) @@ -58,6 +80,8 @@ class P2PMessage(Base): Index('idx_p2p_message_origin_timestamp', 'origin_callsign', 'via_callsign', 'destination_callsign', 'timestamp', 'attachments') def to_dict(self): + attachments_list = [ma.attachment.to_dict() for ma in self.message_attachments] + return { 'id': self.id, 'timestamp': self.timestamp.isoformat() if self.timestamp else None, @@ -67,7 +91,7 @@ class P2PMessage(Base): 'destination': self.destination_callsign, 'direction': self.direction, 'body': self.body, - 'attachments': [attachment.to_dict() for attachment in self.attachments], + 'attachments': attachments_list, 'status': self.status.name if self.status else None, 'priority': self.priority, 'is_read': self.is_read, @@ -80,14 +104,18 @@ class Attachment(Base): name = Column(String) data_type = Column(String) data = Column(String) - message_id = Column(String, ForeignKey('p2p_message.id')) + checksum_crc32 = Column(String) + hash_sha512 = Column(String) + message_attachments = relationship("MessageAttachment", back_populates="attachment") - Index('idx_attachments_id_message_id', 'id', 'message_id') + Index('idx_attachments_id_message_id', 'id', 'hash_sha512') def to_dict(self): return { 'id': self.id, 'name': self.name, 'data_type': self.data_type, - 'data': self.data # Be cautious with large binary data + 'data': self.data, + 'checksum_crc32': self.checksum_crc32, + 'hash_sha512' : self.hash_sha512 } diff --git a/modem/server.py b/modem/server.py index 754fd1ef..42266159 100644 --- a/modem/server.py +++ b/modem/server.py @@ -20,6 +20,8 @@ import command_arq_raw import command_message_send import event_manager from message_system_db_manager import DatabaseManager +from message_system_db_messages import DatabaseManagerMessages +from message_system_db_attachments import DatabaseManagerAttachments from message_system_db_beacon import DatabaseManagerBeacon from schedule_manager import ScheduleManager @@ -251,25 +253,25 @@ def get_post_freedata_message(): @app.route('/freedata/messages/', methods=['GET', 'POST', 'PATCH', 'DELETE']) def handle_freedata_message(message_id): if request.method == 'GET': - message = DatabaseManager(app.event_manager).get_message_by_id_json(message_id) + message = DatabaseManagerMessages(app.event_manager).get_message_by_id_json(message_id) return message elif request.method == 'POST': - result = DatabaseManager(app.event_manager).update_message(message_id, update_data={'status': 'queued'}) - DatabaseManager(app.event_manager).increment_message_attempts(message_id) + result = DatabaseManagerMessages(app.event_manager).update_message(message_id, update_data={'status': 'queued'}) + DatabaseManagerMessages(app.event_manager).increment_message_attempts(message_id) return api_response(result) elif request.method == 'PATCH': # Fixme We need to adjust this - result = DatabaseManager(app.event_manager).mark_message_as_read(message_id) + result = DatabaseManagerMessages(app.event_manager).mark_message_as_read(message_id) return api_response(result) elif request.method == 'DELETE': - result = DatabaseManager(app.event_manager).delete_message(message_id) + result = DatabaseManagerMessages(app.event_manager).delete_message(message_id) return api_response(result) else: api_abort('Error executing command...', 500) @app.route('/freedata/messages//attachments', methods=['GET']) def get_message_attachments(message_id): - attachments = DatabaseManager(app.event_manager).get_attachments_by_message_id_json(message_id) + attachments = DatabaseManagerAttachments(app.event_manager).get_attachments_by_message_id_json(message_id) return api_response(attachments) @app.route('/freedata/beacons', methods=['GET']) diff --git a/tests/test_message_database.py b/tests/test_message_database.py index 8c32a63e..0e0ebf25 100644 --- a/tests/test_message_database.py +++ b/tests/test_message_database.py @@ -6,6 +6,9 @@ import unittest from config import CONFIG from message_p2p import MessageP2P from message_system_db_manager import DatabaseManager +from message_system_db_messages import DatabaseManagerMessages +from message_system_db_attachments import DatabaseManagerAttachments + from event_manager import EventManager import queue import base64 @@ -20,7 +23,8 @@ class TestDataFrameFactory(unittest.TestCase): cls.event_queue = queue.Queue() cls.event_manager = EventManager([cls.event_queue]) cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}" - cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:') + cls.database_manager = DatabaseManagerMessages(cls.event_manager) + cls.database_manager_attachments = DatabaseManagerAttachments(cls.event_manager) def testAddToDatabase(self): attachment = { @@ -70,7 +74,9 @@ class TestDataFrameFactory(unittest.TestCase): payload = message.to_payload() received_message = MessageP2P.from_payload(payload) received_message_dict = MessageP2P.to_dict(received_message) + print(received_message_dict) message_id = self.database_manager.add_message(received_message_dict, direction='receive') + print(message_id) self.database_manager.update_message(message_id, {'body' : 'hello123'}) result = self.database_manager.get_message_by_id(message_id) @@ -98,11 +104,36 @@ class TestDataFrameFactory(unittest.TestCase): received_message = MessageP2P.from_payload(payload) received_message_dict = MessageP2P.to_dict(received_message) message_id = self.database_manager.add_message(received_message_dict) - result = self.database_manager.get_attachments_by_message_id(message_id) + result = self.database_manager_attachments.get_attachments_by_message_id(message_id) attachment_names = [attachment['name'] for attachment in result] self.assertIn('test1.gif', attachment_names) self.assertIn('test2.gif', attachment_names) self.assertIn('test3.gif', attachment_names) + def testIncrementAttempts(self): + apiParams = {'destination': 'DJ2LS-3', 'body': 'Hello World!', 'attachments': []} + message = MessageP2P.from_api_params(self.mycall, apiParams) + payload = message.to_payload() + received_message = MessageP2P.from_payload(payload) + received_message_dict = MessageP2P.to_dict(received_message) + message_id = self.database_manager.add_message(received_message_dict) + self.database_manager.increment_message_attempts(message_id) + + + result = self.database_manager.get_message_by_id(message_id) + self.assertEqual(result["attempt"], 1) + + def testMarkAsRead(self): + apiParams = {'destination': 'DJ2LS-3', 'body': 'Hello World!', 'attachments': []} + message = MessageP2P.from_api_params(self.mycall, apiParams) + payload = message.to_payload() + received_message = MessageP2P.from_payload(payload) + received_message_dict = MessageP2P.to_dict(received_message) + message_id = self.database_manager.add_message(received_message_dict, is_read=False) + self.database_manager.mark_message_as_read(message_id) + + result = self.database_manager.get_message_by_id(message_id) + self.assertEqual(result["is_read"], True) + if __name__ == '__main__': unittest.main() diff --git a/tests/test_message_p2p.py b/tests/test_message_p2p.py index b5b665c7..5930fbbc 100755 --- a/tests/test_message_p2p.py +++ b/tests/test_message_p2p.py @@ -5,7 +5,7 @@ import numpy as np import unittest from config import CONFIG from message_p2p import MessageP2P -from message_system_db_manager import DatabaseManager +from message_system_db_messages import DatabaseManagerMessages from event_manager import EventManager import queue import base64 @@ -20,7 +20,7 @@ class TestDataFrameFactory(unittest.TestCase): cls.event_queue = queue.Queue() cls.event_manager = EventManager([cls.event_queue]) cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}" - cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:') + cls.database_manager = DatabaseManagerMessages(cls.event_manager) def testFromApiParams(self): api_params = {