mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
crazy commit with --memory-- database stuff...
This commit is contained in:
parent
01714c7691
commit
eb3a74e146
|
@ -3,6 +3,8 @@
|
|||
import structlog
|
||||
import lzma
|
||||
import gzip
|
||||
from message_p2p import MessageP2P
|
||||
from message_system_db_manager import DatabaseManager
|
||||
|
||||
class ARQDataTypeHandler:
|
||||
def __init__(self):
|
||||
|
@ -80,4 +82,8 @@ class ARQDataTypeHandler:
|
|||
def handle_p2pmsg_lzma(self, data):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
|
||||
received_message_obj = MessageP2P.from_payload(decompressed_data)
|
||||
received_message_dict = MessageP2P.to_dict(received_message_obj, received=True)
|
||||
DatabaseManager(uri='sqlite:///:memory:').add_message(received_message_dict)
|
||||
|
||||
return decompressed_data
|
||||
|
|
|
@ -42,7 +42,7 @@ class MessageP2P:
|
|||
payload_message['body'], attachments)
|
||||
|
||||
def get_id(self) -> str:
|
||||
return f"{self.origin}.{self.destination}.{self.timestamp}"
|
||||
return f"{self.origin}_{self.destination}_{self.timestamp}"
|
||||
|
||||
def __encode_attachment__(self, binary_attachment: dict):
|
||||
encoded_attachment = binary_attachment.copy()
|
||||
|
@ -54,14 +54,21 @@ class MessageP2P:
|
|||
decoded_attachment['data'] = base64.b64decode(encoded_attachment['data'])
|
||||
return decoded_attachment
|
||||
|
||||
def to_dict(self):
|
||||
def to_dict(self, received=False):
|
||||
"""Make a dictionary out of the message data
|
||||
"""
|
||||
|
||||
if received:
|
||||
direction = 'receive'
|
||||
else:
|
||||
direction = 'transmit'
|
||||
|
||||
return {
|
||||
'id': self.get_id(),
|
||||
'origin': self.origin,
|
||||
'destination': self.destination,
|
||||
'body': self.body,
|
||||
'direction': direction,
|
||||
'attachments': list(map(self.__encode_attachment__, self.attachments)),
|
||||
}
|
||||
|
||||
|
|
94
modem/message_system_db_manager.py
Normal file
94
modem/message_system_db_manager.py
Normal file
|
@ -0,0 +1,94 @@
|
|||
# database_manager.py
|
||||
|
||||
from sqlalchemy import create_engine
|
||||
from sqlalchemy.orm import scoped_session, sessionmaker
|
||||
from threading import local
|
||||
from message_system_db_model import Base, Station, Status, Attachment, P2PMessage
|
||||
from datetime import datetime
|
||||
import json
|
||||
|
||||
class DatabaseManager:
|
||||
def __init__(self, uri='sqlite:///freedata-messages.db'):
|
||||
self.engine = create_engine(uri, echo=False)
|
||||
self.thread_local = local()
|
||||
self.session_factory = sessionmaker(bind=self.engine)
|
||||
Base.metadata.create_all(self.engine)
|
||||
|
||||
def get_thread_scoped_session(self):
|
||||
if not hasattr(self.thread_local, "session"):
|
||||
self.thread_local.session = scoped_session(self.session_factory)
|
||||
return self.thread_local.session
|
||||
|
||||
def get_or_create_station(self, session, callsign):
|
||||
station = session.query(Station).filter_by(callsign=callsign).first()
|
||||
if not station:
|
||||
station = Station(callsign=callsign)
|
||||
session.add(station)
|
||||
session.flush() # To get the callsign immediately
|
||||
return station
|
||||
|
||||
def get_or_create_status(self, session, status_name):
|
||||
status = session.query(Status).filter_by(name=status_name).first()
|
||||
if not status:
|
||||
status = Status(name=status_name)
|
||||
session.add(status)
|
||||
session.flush() # To get the ID immediately
|
||||
return status
|
||||
|
||||
def add_message(self, message_data):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
# Create and add the origin and destination Stations
|
||||
origin = self.get_or_create_station(session, message_data['origin'])
|
||||
destination = self.get_or_create_station(session, message_data['destination'])
|
||||
|
||||
# Create and add Status if provided
|
||||
status = None
|
||||
if 'status' in message_data:
|
||||
status = self.get_or_create_status(session, message_data['status'])
|
||||
|
||||
# Parse the timestamp from the message ID
|
||||
timestamp = datetime.fromisoformat(message_data['id'].split('_')[2])
|
||||
|
||||
# Create the P2PMessage instance
|
||||
new_message = P2PMessage(
|
||||
id=message_data['id'],
|
||||
origin_callsign=origin.callsign,
|
||||
destination_callsign=destination.callsign,
|
||||
body=message_data['body'],
|
||||
timestamp=timestamp,
|
||||
status_id=status.id if status else None
|
||||
)
|
||||
|
||||
# Process and add attachments
|
||||
for attachment_data in message_data.get('attachments', []):
|
||||
attachment = Attachment(
|
||||
name=attachment_data['name'],
|
||||
data_type=attachment_data['type'],
|
||||
data=attachment_data['data']
|
||||
)
|
||||
new_message.attachments.append(attachment)
|
||||
|
||||
session.add(new_message)
|
||||
session.commit()
|
||||
return new_message.id
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
raise e
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def get_all_messages(self):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
messages = session.query(P2PMessage).all()
|
||||
return [message.to_dict() for message in messages]
|
||||
except Exception as e:
|
||||
raise e
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def get_all_messages_json(self):
|
||||
messages_dict = self.get_all_messages()
|
||||
messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict}
|
||||
return json.dumps(messages_with_header) # Convert to JSON string
|
63
modem/message_system_db_model.py
Normal file
63
modem/message_system_db_model.py
Normal file
|
@ -0,0 +1,63 @@
|
|||
# models.py
|
||||
|
||||
from sqlalchemy import Column, String, Integer, JSON, ForeignKey, DateTime
|
||||
from sqlalchemy.orm import declarative_base, relationship
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
class Station(Base):
|
||||
__tablename__ = 'station'
|
||||
callsign = Column(String, primary_key=True)
|
||||
location = Column(String, nullable=True)
|
||||
info = Column(String, nullable=True)
|
||||
|
||||
class Status(Base):
|
||||
__tablename__ = 'status'
|
||||
id = Column(Integer, primary_key=True)
|
||||
name = Column(String, unique=True)
|
||||
|
||||
class P2PMessage(Base):
|
||||
__tablename__ = 'p2p_message'
|
||||
id = Column(String, primary_key=True)
|
||||
origin_callsign = Column(String, ForeignKey('station.callsign'))
|
||||
via = Column(String, nullable=True)
|
||||
destination_callsign = Column(String, ForeignKey('station.callsign'))
|
||||
body = Column(String)
|
||||
attachments = relationship('Attachment', backref='p2p_message')
|
||||
timestamp = Column(DateTime)
|
||||
timestamp_sent = Column(DateTime, nullable=True)
|
||||
status_id = Column(Integer, ForeignKey('status.id'), nullable=True)
|
||||
status = relationship('Status', backref='p2p_messages')
|
||||
direction = Column(String, nullable=True)
|
||||
statistics = Column(JSON, nullable=True)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'timestamp': self.timestamp.isoformat() if self.timestamp else None,
|
||||
'origin': self.origin_callsign,
|
||||
'via': self.via,
|
||||
'destination': self.destination_callsign,
|
||||
'direction': self.direction,
|
||||
'body': self.body,
|
||||
'attachments': [attachment.to_dict() for attachment in self.attachments],
|
||||
'timestamp_sent': self.timestamp_sent.isoformat() if self.timestamp_sent else None,
|
||||
'status': self.status.name if self.status else None,
|
||||
'statistics': self.statistics
|
||||
}
|
||||
|
||||
class Attachment(Base):
|
||||
__tablename__ = 'attachment'
|
||||
id = Column(Integer, primary_key=True)
|
||||
name = Column(String)
|
||||
data_type = Column(String)
|
||||
data = Column(String)
|
||||
message_id = Column(String, ForeignKey('p2p_message.id'))
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'id': self.id,
|
||||
'name': self.name,
|
||||
'data_type': self.data_type,
|
||||
'data': self.data # Be cautious with large binary data
|
||||
}
|
|
@ -18,6 +18,8 @@ import command_test
|
|||
import command_arq_raw
|
||||
import command_message_send
|
||||
import event_manager
|
||||
from message_system_db_manager import DatabaseManager
|
||||
|
||||
|
||||
app = Flask(__name__)
|
||||
CORS(app)
|
||||
|
@ -235,8 +237,11 @@ def get_post_radio():
|
|||
elif request.method == 'GET':
|
||||
return api_response(app.state_manager.get_radio_status())
|
||||
|
||||
@app.route('/freedata/messages', methods=['POST'])
|
||||
@app.route('/freedata/messages', methods=['POST', 'GET'])
|
||||
def post_freedata_message():
|
||||
if request.method in ['GET']:
|
||||
result = DatabaseManager(uri='sqlite:///:memory:').get_all_messages_json()
|
||||
return api_response(result)
|
||||
if enqueue_tx_command(command_message_send.SendMessageCommand, request.json):
|
||||
return api_response(request.json)
|
||||
else:
|
||||
|
|
|
@ -27,4 +27,5 @@ pytest-cov
|
|||
pytest-cover
|
||||
pytest-coverage
|
||||
pytest-rerunfailures
|
||||
pick
|
||||
pick
|
||||
sqlalchemy
|
|
@ -5,6 +5,8 @@ import numpy as np
|
|||
import unittest
|
||||
from config import CONFIG
|
||||
from message_p2p import MessageP2P
|
||||
from message_system_db_manager import DatabaseManager
|
||||
|
||||
|
||||
class TestDataFrameFactory(unittest.TestCase):
|
||||
|
||||
|
@ -13,7 +15,7 @@ class TestDataFrameFactory(unittest.TestCase):
|
|||
config_manager = CONFIG('modem/config.ini.example')
|
||||
cls.config = config_manager.read()
|
||||
cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}"
|
||||
|
||||
cls.database_manager = DatabaseManager(uri='sqlite:///:memory:')
|
||||
|
||||
def testFromApiParams(self):
|
||||
api_params = {
|
||||
|
@ -34,10 +36,16 @@ class TestDataFrameFactory(unittest.TestCase):
|
|||
payload = message.to_payload()
|
||||
|
||||
received_message = MessageP2P.from_payload(payload)
|
||||
received_message_dict = MessageP2P.to_dict(received_message, received=True)
|
||||
self.database_manager.add_message(received_message_dict)
|
||||
|
||||
self.assertEqual(message.origin, received_message.origin)
|
||||
self.assertEqual(message.destination, received_message.destination)
|
||||
self.assertCountEqual(message.attachments, received_message.attachments)
|
||||
self.assertEqual(attachment['data'], received_message.attachments[0]['data'])
|
||||
|
||||
result = self.database_manager.get_all_messages()
|
||||
self.assertEqual(result[0]["destination"], message.destination)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in a new issue