adjusted db model, made databse handlers more modular

This commit is contained in:
DJ2LS 2024-02-06 09:22:55 +01:00
parent 77d95baf00
commit e445d23968
7 changed files with 87 additions and 232 deletions

View file

@ -6,6 +6,7 @@ from arq_session_iss import ARQSessionISS
from message_p2p import MessageP2P
from arq_data_type_handler import ARQ_SESSION_TYPES
from message_system_db_manager import DatabaseManager
from message_system_db_messages import DatabaseManagerMessages
class SendMessageCommand(TxCommand):
"""Command to send a P2P message using an ARQ transfer session
@ -18,7 +19,7 @@ class SendMessageCommand(TxCommand):
print(self.message.id)
print(self.message.to_dict())
print("--------------------------------------- set params from api")
DatabaseManager(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued')
DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued')
def transmit(self, modem):
@ -26,14 +27,14 @@ class SendMessageCommand(TxCommand):
self.log("Modem busy, waiting until ready...")
return
first_queued_message = DatabaseManager(self.event_manager).get_first_queued_message()
first_queued_message = DatabaseManagerMessages(self.event_manager).get_first_queued_message()
if not first_queued_message:
self.log("No queued message in database.")
return
self.log(f"Queued message found: {first_queued_message['id']}")
DatabaseManager(self.event_manager).update_message(first_queued_message["id"], update_data={'status': 'transmitting'})
message_dict = DatabaseManager(self.event_manager).get_message_by_id(first_queued_message["id"])
DatabaseManagerMessages(self.event_manager).update_message(first_queued_message["id"], update_data={'status': 'transmitting'})
message_dict = DatabaseManagerMessages(self.event_manager).get_message_by_id(first_queued_message["id"])
print(message_dict["id"])
message = MessageP2P.from_api_params(message_dict['origin'], message_dict)
print(message.id)

View file

@ -3,6 +3,7 @@ import api_validations
import base64
import json
from message_system_db_manager import DatabaseManager
from message_system_db_messages import DatabaseManagerMessages
#import command_message_send
@ -10,7 +11,7 @@ def message_received(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
received_message_obj = MessageP2P.from_payload(decompressed_json_string)
received_message_dict = MessageP2P.to_dict(received_message_obj)
DatabaseManager(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False)
DatabaseManagerMessages(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False)
def message_failed(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
@ -19,7 +20,7 @@ def message_failed(event_manager, state_manager, data):
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
payload_message = MessageP2P.to_dict(payload_message_obj)
print(payload_message)
DatabaseManager(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'})
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'})
class MessageP2P:
def __init__(self, id: str, origin: str, destination: str, body: str, attachments: list) -> None:

View file

@ -4,11 +4,12 @@ import sqlite3
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from threading import local
from message_system_db_model import Base, Station, Status, Attachment, P2PMessage
from message_system_db_model import Base, Station, Status, Attachment, P2PMessage, MessageAttachment
from datetime import datetime
import json
import structlog
import helpers
import hashlib
class DatabaseManager:
def __init__(self, event_manger, uri='sqlite:///freedata-messages.db'):
@ -111,212 +112,3 @@ class DatabaseManager:
session.flush() # To get the ID immediately
return status
def add_message(self, message_data, direction='receive', status=None, is_read=True):
session = self.get_thread_scoped_session()
try:
# Create and add the origin and destination Stations
origin = self.get_or_create_station(message_data['origin'], session)
destination = self.get_or_create_station(message_data['destination'], session)
# Create and add Status if provided
if status:
status = self.get_or_create_status(session, status)
# Parse the timestamp from the message ID
timestamp = datetime.fromisoformat(message_data['id'].split('_')[2])
# Create the P2PMessage instance
new_message = P2PMessage(
id=message_data['id'],
origin_callsign=origin.callsign,
destination_callsign=destination.callsign,
body=message_data['body'],
timestamp=timestamp,
direction=direction,
status_id=status.id if status else None,
is_read=is_read,
attempt=0
)
# Process and add attachments
for attachment_data in message_data.get('attachments', []):
attachment = Attachment(
name=attachment_data['name'],
data_type=attachment_data['type'],
data=attachment_data['data']
)
new_message.attachments.append(attachment)
session.add(new_message)
session.commit()
self.log(f"Added data to database: {new_message.id}")
self.event_manager.freedata_message_db_change()
return new_message.id
except Exception as e:
session.rollback()
self.log(f"error adding new message to databse with error: {e}", isWarning=True)
self.log(f"---> please delete or update existing database", isWarning=True)
finally:
session.remove()
def get_all_messages(self):
session = self.get_thread_scoped_session()
try:
messages = session.query(P2PMessage).all()
return [message.to_dict() for message in messages]
except Exception as e:
self.log(f"error fetching database messages with error: {e}", isWarning=True)
self.log(f"---> please delete or update existing database", isWarning=True)
return []
finally:
session.remove()
def get_all_messages_json(self):
messages_dict = self.get_all_messages()
messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict}
return messages_with_header
def get_message_by_id(self, message_id):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
return message.to_dict()
else:
return None
except Exception as e:
self.log(f"Error fetching message with ID {message_id}: {e}", isWarning=True)
return None
finally:
session.remove()
def get_message_by_id_json(self, message_id):
message_dict = self.get_message_by_id(message_id)
return json.dumps(message_dict) # Convert to JSON string
def delete_message(self, message_id):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
session.delete(message)
session.commit()
self.log(f"Deleted: {message_id}")
self.event_manager.freedata_message_db_change()
return {'status': 'success', 'message': f'Message {message_id} deleted'}
else:
return {'status': 'failure', 'message': 'Message not found'}
except Exception as e:
session.rollback()
self.log(f"Error deleting message with ID {message_id}: {e}", isWarning=True)
return {'status': 'failure', 'message': str(e)}
finally:
session.remove()
def update_message(self, message_id, update_data):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
# Update fields of the message as per update_data
if 'body' in update_data:
message.body = update_data['body']
if 'status' in update_data:
message.status = self.get_or_create_status(session, update_data['status'])
session.commit()
self.log(f"Updated: {message_id}")
self.event_manager.freedata_message_db_change()
return {'status': 'success', 'message': f'Message {message_id} updated'}
else:
return {'status': 'failure', 'message': 'Message not found'}
except Exception as e:
session.rollback()
self.log(f"Error updating message with ID {message_id}: {e}", isWarning=True)
return {'status': 'failure', 'message': str(e)}
finally:
session.remove()
def get_attachments_by_message_id(self, message_id):
session = self.get_thread_scoped_session()
try:
# Query for the message with the given ID
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
attachments = [attachment.to_dict() for attachment in message.attachments]
return attachments
else:
return []
except Exception as e:
self.log(f"Error fetching attachments for message ID {message_id}: {e}", isWarning=True)
return []
finally:
session.remove()
def get_attachments_by_message_id_json(self, message_id):
attachments = self.get_attachments_by_message_id(message_id)
return json.dumps(attachments)
def get_first_queued_message(self):
session = self.get_thread_scoped_session()
try:
# Find the status object for "queued"
queued_status = session.query(Status).filter_by(name='queued').first()
if queued_status:
# Query for the first (oldest) message with status "queued"
message = session.query(P2PMessage)\
.filter_by(status=queued_status)\
.order_by(P2PMessage.timestamp)\
.first()
if message:
self.log(f"Found queued message with ID {message.id}")
return message.to_dict()
else:
return None
else:
self.log("Queued status not found", isWarning=True)
return None
except Exception as e:
self.log(f"Error fetching the first queued message: {e}", isWarning=True)
return None
finally:
session.remove()
def increment_message_attempts(self, message_id):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
message.attempts += 1
session.commit()
self.log(f"Incremented attempt count for message {message_id}")
else:
self.log(f"Message with ID {message_id} not found")
except Exception as e:
session.rollback()
self.log(f"An error occurred while incrementing attempts for message {message_id}: {e}")
finally:
session.remove()
def mark_message_as_read(self, message_id):
session = self.get_thread_scoped_session()
try:
message = session.query(P2PMessage).filter_by(id=message_id).first()
if message:
message.is_read = True
session.commit()
self.log(f"Marked message {message_id} as read")
else:
self.log(f"Message with ID {message_id} not found")
except Exception as e:
session.rollback()
self.log(f"An error occurred while marking message {message_id} as read: {e}")
finally:
session.remove()

View file

@ -1,10 +1,30 @@
# models.py
from sqlalchemy import Index, Boolean, Column, String, Integer, JSON, ForeignKey, DateTime
from sqlalchemy import Index, Table, Boolean, Column, String, Integer, JSON, ForeignKey, DateTime
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class MessageAttachment(Base):
__tablename__ = 'message_attachment'
message_id = Column(String, ForeignKey('p2p_message.id', ondelete='CASCADE'), primary_key=True)
attachment_id = Column(Integer, ForeignKey('attachment.id', ondelete='CASCADE'), primary_key=True)
message = relationship('P2PMessage', back_populates='message_attachments')
attachment = relationship('Attachment', back_populates='message_attachments')
class Config(Base):
__tablename__ = 'config'
db_variable = Column(String, primary_key=True) # Unique identifier for the configuration setting
db_version = Column(String)
def to_dict(self):
return {
'db_variable': self.db_variable,
'db_settings': self.db_settings
}
class Beacon(Base):
__tablename__ = 'beacon'
id = Column(Integer, primary_key=True)
@ -45,7 +65,9 @@ class P2PMessage(Base):
via_callsign = Column(String, ForeignKey('station.callsign'), nullable=True)
destination_callsign = Column(String, ForeignKey('station.callsign'))
body = Column(String, nullable=True)
attachments = relationship('Attachment', backref='p2p_message')
message_attachments = relationship('MessageAttachment',
back_populates='message',
cascade='all, delete-orphan')
attempt = Column(Integer, default=0)
timestamp = Column(DateTime)
status_id = Column(Integer, ForeignKey('status.id'), nullable=True)
@ -58,6 +80,8 @@ class P2PMessage(Base):
Index('idx_p2p_message_origin_timestamp', 'origin_callsign', 'via_callsign', 'destination_callsign', 'timestamp', 'attachments')
def to_dict(self):
attachments_list = [ma.attachment.to_dict() for ma in self.message_attachments]
return {
'id': self.id,
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
@ -67,7 +91,7 @@ class P2PMessage(Base):
'destination': self.destination_callsign,
'direction': self.direction,
'body': self.body,
'attachments': [attachment.to_dict() for attachment in self.attachments],
'attachments': attachments_list,
'status': self.status.name if self.status else None,
'priority': self.priority,
'is_read': self.is_read,
@ -80,14 +104,18 @@ class Attachment(Base):
name = Column(String)
data_type = Column(String)
data = Column(String)
message_id = Column(String, ForeignKey('p2p_message.id'))
checksum_crc32 = Column(String)
hash_sha512 = Column(String)
message_attachments = relationship("MessageAttachment", back_populates="attachment")
Index('idx_attachments_id_message_id', 'id', 'message_id')
Index('idx_attachments_id_message_id', 'id', 'hash_sha512')
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'data_type': self.data_type,
'data': self.data # Be cautious with large binary data
'data': self.data,
'checksum_crc32': self.checksum_crc32,
'hash_sha512' : self.hash_sha512
}

View file

@ -20,6 +20,8 @@ import command_arq_raw
import command_message_send
import event_manager
from message_system_db_manager import DatabaseManager
from message_system_db_messages import DatabaseManagerMessages
from message_system_db_attachments import DatabaseManagerAttachments
from message_system_db_beacon import DatabaseManagerBeacon
from schedule_manager import ScheduleManager
@ -251,25 +253,25 @@ def get_post_freedata_message():
@app.route('/freedata/messages/<string:message_id>', methods=['GET', 'POST', 'PATCH', 'DELETE'])
def handle_freedata_message(message_id):
if request.method == 'GET':
message = DatabaseManager(app.event_manager).get_message_by_id_json(message_id)
message = DatabaseManagerMessages(app.event_manager).get_message_by_id_json(message_id)
return message
elif request.method == 'POST':
result = DatabaseManager(app.event_manager).update_message(message_id, update_data={'status': 'queued'})
DatabaseManager(app.event_manager).increment_message_attempts(message_id)
result = DatabaseManagerMessages(app.event_manager).update_message(message_id, update_data={'status': 'queued'})
DatabaseManagerMessages(app.event_manager).increment_message_attempts(message_id)
return api_response(result)
elif request.method == 'PATCH':
# Fixme We need to adjust this
result = DatabaseManager(app.event_manager).mark_message_as_read(message_id)
result = DatabaseManagerMessages(app.event_manager).mark_message_as_read(message_id)
return api_response(result)
elif request.method == 'DELETE':
result = DatabaseManager(app.event_manager).delete_message(message_id)
result = DatabaseManagerMessages(app.event_manager).delete_message(message_id)
return api_response(result)
else:
api_abort('Error executing command...', 500)
@app.route('/freedata/messages/<string:message_id>/attachments', methods=['GET'])
def get_message_attachments(message_id):
attachments = DatabaseManager(app.event_manager).get_attachments_by_message_id_json(message_id)
attachments = DatabaseManagerAttachments(app.event_manager).get_attachments_by_message_id_json(message_id)
return api_response(attachments)
@app.route('/freedata/beacons', methods=['GET'])

View file

@ -6,6 +6,9 @@ import unittest
from config import CONFIG
from message_p2p import MessageP2P
from message_system_db_manager import DatabaseManager
from message_system_db_messages import DatabaseManagerMessages
from message_system_db_attachments import DatabaseManagerAttachments
from event_manager import EventManager
import queue
import base64
@ -20,7 +23,8 @@ class TestDataFrameFactory(unittest.TestCase):
cls.event_queue = queue.Queue()
cls.event_manager = EventManager([cls.event_queue])
cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}"
cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:')
cls.database_manager = DatabaseManagerMessages(cls.event_manager)
cls.database_manager_attachments = DatabaseManagerAttachments(cls.event_manager)
def testAddToDatabase(self):
attachment = {
@ -70,7 +74,9 @@ class TestDataFrameFactory(unittest.TestCase):
payload = message.to_payload()
received_message = MessageP2P.from_payload(payload)
received_message_dict = MessageP2P.to_dict(received_message)
print(received_message_dict)
message_id = self.database_manager.add_message(received_message_dict, direction='receive')
print(message_id)
self.database_manager.update_message(message_id, {'body' : 'hello123'})
result = self.database_manager.get_message_by_id(message_id)
@ -98,11 +104,36 @@ class TestDataFrameFactory(unittest.TestCase):
received_message = MessageP2P.from_payload(payload)
received_message_dict = MessageP2P.to_dict(received_message)
message_id = self.database_manager.add_message(received_message_dict)
result = self.database_manager.get_attachments_by_message_id(message_id)
result = self.database_manager_attachments.get_attachments_by_message_id(message_id)
attachment_names = [attachment['name'] for attachment in result]
self.assertIn('test1.gif', attachment_names)
self.assertIn('test2.gif', attachment_names)
self.assertIn('test3.gif', attachment_names)
def testIncrementAttempts(self):
apiParams = {'destination': 'DJ2LS-3', 'body': 'Hello World!', 'attachments': []}
message = MessageP2P.from_api_params(self.mycall, apiParams)
payload = message.to_payload()
received_message = MessageP2P.from_payload(payload)
received_message_dict = MessageP2P.to_dict(received_message)
message_id = self.database_manager.add_message(received_message_dict)
self.database_manager.increment_message_attempts(message_id)
result = self.database_manager.get_message_by_id(message_id)
self.assertEqual(result["attempt"], 1)
def testMarkAsRead(self):
apiParams = {'destination': 'DJ2LS-3', 'body': 'Hello World!', 'attachments': []}
message = MessageP2P.from_api_params(self.mycall, apiParams)
payload = message.to_payload()
received_message = MessageP2P.from_payload(payload)
received_message_dict = MessageP2P.to_dict(received_message)
message_id = self.database_manager.add_message(received_message_dict, is_read=False)
self.database_manager.mark_message_as_read(message_id)
result = self.database_manager.get_message_by_id(message_id)
self.assertEqual(result["is_read"], True)
if __name__ == '__main__':
unittest.main()

View file

@ -5,7 +5,7 @@ import numpy as np
import unittest
from config import CONFIG
from message_p2p import MessageP2P
from message_system_db_manager import DatabaseManager
from message_system_db_messages import DatabaseManagerMessages
from event_manager import EventManager
import queue
import base64
@ -20,7 +20,7 @@ class TestDataFrameFactory(unittest.TestCase):
cls.event_queue = queue.Queue()
cls.event_manager = EventManager([cls.event_queue])
cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}"
cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:')
cls.database_manager = DatabaseManagerMessages(cls.event_manager)
def testFromApiParams(self):
api_params = {