diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index 89b15fb8..6a0c9921 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -3,12 +3,12 @@ import structlog import lzma import gzip -from message_p2p import MessageP2P -from message_system_db_manager import DatabaseManager +from message_p2p import message_received class ARQDataTypeHandler: - def __init__(self): + def __init__(self, event_manager): self.logger = structlog.get_logger(type(self).__name__) + self.event_manager = event_manager self.handlers = { "raw": { 'prepare': self.prepare_raw, @@ -82,9 +82,5 @@ class ARQDataTypeHandler: def handle_p2pmsg_lzma(self, data): decompressed_data = lzma.decompress(data) self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes") - decompressed_json_string = decompressed_data.decode('utf-8') - received_message_obj = MessageP2P.from_payload(decompressed_json_string) - received_message_dict = MessageP2P.to_dict(received_message_obj, received=True) - result = DatabaseManager().add_message(received_message_dict) - + message_received(self.event_manager, decompressed_data) return decompressed_data diff --git a/modem/arq_session.py b/modem/arq_session.py index 26756d90..370a81e6 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -46,7 +46,7 @@ class ARQSession(): self.frame_factory = data_frame_factory.DataFrameFactory(self.config) self.event_frame_received = threading.Event() - self.arq_data_type_handler = ARQDataTypeHandler() + self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager) self.id = None self.session_started = time.time() self.session_ended = 0 diff --git a/modem/command.py b/modem/command.py index 331e3fa8..1ef552e8 100644 --- a/modem/command.py +++ b/modem/command.py @@ -15,7 +15,7 @@ class TxCommand(): self.event_manager = event_manager self.set_params_from_api(apiParams) self.frame_factory = DataFrameFactory(config) - self.arq_data_type_handler = ARQDataTypeHandler() + self.arq_data_type_handler = ARQDataTypeHandler(event_manager) def set_params_from_api(self, apiParams): pass diff --git a/modem/command_message_send.py b/modem/command_message_send.py index 062230b1..6a7d670a 100644 --- a/modem/command_message_send.py +++ b/modem/command_message_send.py @@ -5,6 +5,7 @@ from queue import Queue from arq_session_iss import ARQSessionISS from message_p2p import MessageP2P from arq_data_type_handler import ARQDataTypeHandler +from message_system_db_manager import DatabaseManager class SendMessageCommand(TxCommand): """Command to send a P2P message using an ARQ transfer session @@ -16,9 +17,15 @@ class SendMessageCommand(TxCommand): def transmit(self, modem): # Convert JSON string to bytes (using UTF-8 encoding) + + DatabaseManager().add_message(self.message.to_dict()) + payload = self.message.to_payload().encode('utf-8') json_bytearray = bytearray(payload) data, data_type = self.arq_data_type_handler.prepare(json_bytearray, 'p2pmsg_lzma') + + + iss = ARQSessionISS(self.config, modem, self.message.destination, diff --git a/modem/event_manager.py b/modem/event_manager.py index 15983718..21482ee3 100644 --- a/modem/event_manager.py +++ b/modem/event_manager.py @@ -89,4 +89,7 @@ class EventManager: def modem_failed(self): event = {"modem": "failed"} - self.broadcast(event) \ No newline at end of file + self.broadcast(event) + + def freedata_message_db_change(self): + self.broadcast({"message-db": "changed"}) \ No newline at end of file diff --git a/modem/message_p2p.py b/modem/message_p2p.py index ef8d8680..9e25607d 100644 --- a/modem/message_p2p.py +++ b/modem/message_p2p.py @@ -2,6 +2,14 @@ import datetime import api_validations import base64 import json +from message_system_db_manager import DatabaseManager + + +def message_received(event_manager, data): + decompressed_json_string = data.decode('utf-8') + received_message_obj = MessageP2P.from_payload(decompressed_json_string) + received_message_dict = MessageP2P.to_dict(received_message_obj, received=True) + DatabaseManager(event_manager).add_message(received_message_dict) class MessageP2P: @@ -58,17 +66,12 @@ class MessageP2P: """Make a dictionary out of the message data """ - if received: - direction = 'receive' - else: - direction = 'transmit' - return { 'id': self.get_id(), 'origin': self.origin, 'destination': self.destination, 'body': self.body, - 'direction': direction, + 'direction': 'receive' if received else 'transmit', 'attachments': list(map(self.__encode_attachment__, self.attachments)), } @@ -76,3 +79,4 @@ class MessageP2P: """Make a byte array ready to be sent out of the message data""" json_string = json.dumps(self.to_dict()) return json_string + diff --git a/modem/message_system_db_manager.py b/modem/message_system_db_manager.py index d7598a6b..8e75f759 100644 --- a/modem/message_system_db_manager.py +++ b/modem/message_system_db_manager.py @@ -1,4 +1,5 @@ # database_manager.py +import sqlite3 from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker @@ -8,9 +9,10 @@ from datetime import datetime import json import structlog - class DatabaseManager: - def __init__(self, uri='sqlite:///freedata-messages.db'): + def __init__(self, event_manger, uri='sqlite:///freedata-messages.db'): + self.event_manager = event_manger + self.engine = create_engine(uri, echo=False) self.thread_local = local() self.session_factory = sessionmaker(bind=self.engine) @@ -18,6 +20,32 @@ class DatabaseManager: self.logger = structlog.get_logger(type(self).__name__) + def initialize_default_values(self): + session = self.get_thread_scoped_session() + try: + statuses = [ + "transmitting", + "transmitted", + "received", + "failed", + "failed_checksum", + "aborted" + ] + + # Add default statuses if they don't exist + for status_name in statuses: + existing_status = session.query(Status).filter_by(name=status_name).first() + if not existing_status: + new_status = Status(name=status_name) + session.add(new_status) + + session.commit() + self.log("Initialized database") + except Exception as e: + session.rollback() + self.log(f"An error occurred while initializing default values: {e}", isWarning=True) + finally: + session.remove() def log(self, message, isWarning=False): msg = f"[{type(self).__name__}]: {message}" @@ -59,7 +87,6 @@ class DatabaseManager: # Parse the timestamp from the message ID timestamp = datetime.fromisoformat(message_data['id'].split('_')[2]) - # Create the P2PMessage instance new_message = P2PMessage( id=message_data['id'], @@ -67,6 +94,7 @@ class DatabaseManager: destination_callsign=destination.callsign, body=message_data['body'], timestamp=timestamp, + direction=message_data['direction'], status_id=status.id if status else None ) @@ -83,6 +111,7 @@ class DatabaseManager: session.commit() self.log(f"Added data to database: {new_message.id}") + self.event_manager.freedata_message_db_change() return new_message.id except Exception as e: session.rollback() @@ -95,12 +124,19 @@ class DatabaseManager: try: messages = session.query(P2PMessage).all() return [message.to_dict() for message in messages] + except Exception as e: - raise e + self.log(f"error fetching database messages with error: {e}", isWarning=True) + self.log(f"---> please delete or update existing database", isWarning=True) + + return False + finally: session.remove() def get_all_messages_json(self): messages_dict = self.get_all_messages() - messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict} - return json.dumps(messages_with_header) # Convert to JSON string + if messages_dict: + messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict} + return json.dumps(messages_with_header) # Convert to JSON string + return json.dumps({'error': 'fetching messages from database'}) \ No newline at end of file diff --git a/modem/message_system_db_model.py b/modem/message_system_db_model.py index 8609c1af..6388e52e 100644 --- a/modem/message_system_db_model.py +++ b/modem/message_system_db_model.py @@ -8,8 +8,8 @@ Base = declarative_base() class Station(Base): __tablename__ = 'station' callsign = Column(String, primary_key=True) - location = Column(String, nullable=True) - info = Column(String, nullable=True) + location = Column(JSON, nullable=True) + info = Column(JSON, nullable=True) class Status(Base): __tablename__ = 'status' @@ -20,15 +20,14 @@ class P2PMessage(Base): __tablename__ = 'p2p_message' id = Column(String, primary_key=True) origin_callsign = Column(String, ForeignKey('station.callsign')) - via = Column(String, nullable=True) + via_callsign = Column(String, ForeignKey('station.callsign'), nullable=True) destination_callsign = Column(String, ForeignKey('station.callsign')) - body = Column(String) + body = Column(String, nullable=True) attachments = relationship('Attachment', backref='p2p_message') timestamp = Column(DateTime) - timestamp_sent = Column(DateTime, nullable=True) status_id = Column(Integer, ForeignKey('status.id'), nullable=True) status = relationship('Status', backref='p2p_messages') - direction = Column(String, nullable=True) + direction = Column(String) statistics = Column(JSON, nullable=True) def to_dict(self): @@ -36,12 +35,11 @@ class P2PMessage(Base): 'id': self.id, 'timestamp': self.timestamp.isoformat() if self.timestamp else None, 'origin': self.origin_callsign, - 'via': self.via, + 'via': self.via_callsign, 'destination': self.destination_callsign, 'direction': self.direction, 'body': self.body, 'attachments': [attachment.to_dict() for attachment in self.attachments], - 'timestamp_sent': self.timestamp_sent.isoformat() if self.timestamp_sent else None, 'status': self.status.name if self.status else None, 'statistics': self.statistics } diff --git a/modem/server.py b/modem/server.py index f13f39c4..c20cb597 100644 --- a/modem/server.py +++ b/modem/server.py @@ -240,7 +240,7 @@ def get_post_radio(): @app.route('/freedata/messages', methods=['POST', 'GET']) def get_post_freedata_message(): if request.method in ['GET']: - result = DatabaseManager().get_all_messages_json() + result = DatabaseManager(app.event_manager).get_all_messages_json() return api_response(result) if enqueue_tx_command(command_message_send.SendMessageCommand, request.json): return api_response(request.json) @@ -296,6 +296,8 @@ if __name__ == "__main__": app.service_manager = service_manager.SM(app) # start modem service app.modem_service.put("start") + # initialize databse default values + DatabaseManager(app.event_manager).initialize_default_values() wsm.startThreads(app) app.run() diff --git a/tests/test_data_type_handler.py b/tests/test_data_type_handler.py index b7b8cc26..652fd881 100644 --- a/tests/test_data_type_handler.py +++ b/tests/test_data_type_handler.py @@ -2,16 +2,21 @@ import sys sys.path.append('modem') import unittest +import queue from arq_data_type_handler import ARQDataTypeHandler +from event_manager import EventManager + class TestDispatcher(unittest.TestCase): @classmethod def setUpClass(cls): - cls.arq_data_type_handler = ARQDataTypeHandler() + cls.event_queue = queue.Queue() + cls.event_manager = EventManager([cls.event_queue]) + cls.arq_data_type_handler = ARQDataTypeHandler(cls.event_manager) - def testDataTypeHandlerRaw(self): + def testDataTypeHevent_managerandlerRaw(self): # Example usage example_data = b"Hello FreeDATA!" formatted_data, type_byte = self.arq_data_type_handler.prepare(example_data, "raw") diff --git a/tests/test_message_p2p.py b/tests/test_message_p2p.py index 3c0fe285..4cedfbaa 100755 --- a/tests/test_message_p2p.py +++ b/tests/test_message_p2p.py @@ -6,7 +6,8 @@ import unittest from config import CONFIG from message_p2p import MessageP2P from message_system_db_manager import DatabaseManager - +from event_manager import EventManager +import queue class TestDataFrameFactory(unittest.TestCase): @@ -14,8 +15,11 @@ class TestDataFrameFactory(unittest.TestCase): def setUpClass(cls): config_manager = CONFIG('modem/config.ini.example') cls.config = config_manager.read() + + cls.event_queue = queue.Queue() + cls.event_manager = EventManager([cls.event_queue]) cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}" - cls.database_manager = DatabaseManager(uri='sqlite:///:memory:') + cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:') def testFromApiParams(self): api_params = { @@ -34,7 +38,20 @@ class TestDataFrameFactory(unittest.TestCase): } message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment]) payload = message.to_payload() - print(payload) + received_message = MessageP2P.from_payload(payload) + self.assertEqual(message.origin, received_message.origin) + self.assertEqual(message.destination, received_message.destination) + self.assertCountEqual(message.attachments, received_message.attachments) + self.assertEqual(attachment['data'], received_message.attachments[0]['data']) + + def testToPayloadWithAttachmentAndDatabase(self): + attachment = { + 'name': 'test.gif', + 'type': 'image/gif', + 'data': np.random.bytes(1024) + } + message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment]) + payload = message.to_payload() received_message = MessageP2P.from_payload(payload) received_message_dict = MessageP2P.to_dict(received_message, received=True) self.database_manager.add_message(received_message_dict) @@ -47,5 +64,6 @@ class TestDataFrameFactory(unittest.TestCase): result = self.database_manager.get_all_messages() self.assertEqual(result[0]["destination"], message.destination) + if __name__ == '__main__': unittest.main()