mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
additional message endpoints and tests
This commit is contained in:
parent
a66e875be2
commit
69a7705043
|
@ -139,3 +139,83 @@ class DatabaseManager:
|
|||
messages_dict = self.get_all_messages()
|
||||
messages_with_header = {'total_messages' : len(messages_dict), 'messages' : messages_dict}
|
||||
return json.dumps(messages_with_header) # Convert to JSON string
|
||||
|
||||
def get_message_by_id(self, message_id):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
||||
if message:
|
||||
return message.to_dict()
|
||||
else:
|
||||
return None
|
||||
except Exception as e:
|
||||
self.log(f"Error fetching message with ID {message_id}: {e}", isWarning=True)
|
||||
return None
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def get_message_by_id_json(self, message_id):
|
||||
message_dict = self.get_message_by_id(message_id)
|
||||
return json.dumps(message_dict) # Convert to JSON string
|
||||
|
||||
def delete_message(self, message_id):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
||||
if message:
|
||||
session.delete(message)
|
||||
session.commit()
|
||||
self.log(f"Deleted: {message_id}")
|
||||
return {'status': 'success', 'message': f'Message {message_id} deleted'}
|
||||
else:
|
||||
return {'status': 'failure', 'message': 'Message not found'}
|
||||
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
self.log(f"Error deleting message with ID {message_id}: {e}", isWarning=True)
|
||||
return {'status': 'failure', 'message': str(e)}
|
||||
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def update_message(self, message_id, update_data):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
||||
if message:
|
||||
# Update fields of the message as per update_data
|
||||
if 'body' in update_data:
|
||||
message.body = update_data['body']
|
||||
session.commit()
|
||||
self.log(f"Updated: {message_id}")
|
||||
return {'status': 'success', 'message': f'Message {message_id} updated'}
|
||||
else:
|
||||
return {'status': 'failure', 'message': 'Message not found'}
|
||||
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
self.log(f"Error updating message with ID {message_id}: {e}", isWarning=True)
|
||||
return {'status': 'failure', 'message': str(e)}
|
||||
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def get_attachments_by_message_id(self, message_id):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
# Query for the message with the given ID
|
||||
message = session.query(P2PMessage).filter_by(id=message_id).first()
|
||||
if message:
|
||||
attachments = [attachment.to_dict() for attachment in message.attachments]
|
||||
return attachments
|
||||
else:
|
||||
return []
|
||||
except Exception as e:
|
||||
self.log(f"Error fetching attachments for message ID {message_id}: {e}", isWarning=True)
|
||||
return []
|
||||
finally:
|
||||
session.remove()
|
||||
|
||||
def get_attachments_by_message_id_json(self, message_id):
|
||||
attachments = self.get_attachments_by_message_id(message_id)
|
||||
return json.dumps(attachments)
|
|
@ -247,6 +247,23 @@ def get_post_freedata_message():
|
|||
else:
|
||||
api_abort('Error executing command...', 500)
|
||||
|
||||
@app.route('/freedata/messages/<string:message_id>', methods=['GET', 'DELETE'])
|
||||
def handle_freedata_message(message_id):
|
||||
if request.method == 'GET':
|
||||
message = DatabaseManager(app.event_manager).get_message_by_id_json(message_id)
|
||||
return message
|
||||
|
||||
elif request.method == 'DELETE':
|
||||
result = DatabaseManager(app.event_manager).delete_message(message_id)
|
||||
return api_response(result)
|
||||
else:
|
||||
api_abort('Error executing command...', 500)
|
||||
|
||||
@app.route('/freedata/messages/<string:message_id>/attachments', methods=['GET'])
|
||||
def get_message_attachments(message_id):
|
||||
attachments = DatabaseManager(app.event_manager).get_attachments_by_message_id_json(message_id)
|
||||
return api_response(attachments)
|
||||
|
||||
# @app.route('/modem/arq_connect', methods=['POST'])
|
||||
# @app.route('/modem/arq_disconnect', methods=['POST'])
|
||||
# @app.route('/modem/send_raw', methods=['POST'])
|
||||
|
|
103
tests/test_message_database.py
Normal file
103
tests/test_message_database.py
Normal file
|
@ -0,0 +1,103 @@
|
|||
import sys
|
||||
sys.path.append('modem')
|
||||
import numpy as np
|
||||
|
||||
import unittest
|
||||
from config import CONFIG
|
||||
from message_p2p import MessageP2P
|
||||
from message_system_db_manager import DatabaseManager
|
||||
from event_manager import EventManager
|
||||
import queue
|
||||
|
||||
class TestDataFrameFactory(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
config_manager = CONFIG('modem/config.ini.example')
|
||||
cls.config = config_manager.read()
|
||||
|
||||
cls.event_queue = queue.Queue()
|
||||
cls.event_manager = EventManager([cls.event_queue])
|
||||
cls.mycall = f"{cls.config['STATION']['mycall']}-{cls.config['STATION']['myssid']}"
|
||||
cls.database_manager = DatabaseManager(cls.event_manager, uri='sqlite:///:memory:')
|
||||
|
||||
def testAddToDatabase(self):
|
||||
attachment = {
|
||||
'name': 'test.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment])
|
||||
payload = message.to_payload()
|
||||
received_message = MessageP2P.from_payload(payload)
|
||||
received_message_dict = MessageP2P.to_dict(received_message, received=True)
|
||||
self.database_manager.add_message(received_message_dict)
|
||||
|
||||
result = self.database_manager.get_all_messages()
|
||||
self.assertEqual(result[0]["destination"], message.destination)
|
||||
|
||||
def testDeleteFromDatabase(self):
|
||||
attachment = {
|
||||
'name': 'test.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment])
|
||||
payload = message.to_payload()
|
||||
received_message = MessageP2P.from_payload(payload)
|
||||
received_message_dict = MessageP2P.to_dict(received_message, received=True)
|
||||
self.database_manager.add_message(received_message_dict)
|
||||
|
||||
result = self.database_manager.get_all_messages()
|
||||
message_id = result[0]["id"]
|
||||
self.database_manager.delete_message(message_id)
|
||||
|
||||
result = self.database_manager.get_all_messages()
|
||||
self.assertNotIn(message_id, result)
|
||||
|
||||
def testUpdateMessage(self):
|
||||
attachment = {
|
||||
'name': 'test.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment])
|
||||
payload = message.to_payload()
|
||||
received_message = MessageP2P.from_payload(payload)
|
||||
received_message_dict = MessageP2P.to_dict(received_message, received=True)
|
||||
message_id = self.database_manager.add_message(received_message_dict)
|
||||
|
||||
self.database_manager.update_message(message_id, {'body' : 'hello123'})
|
||||
|
||||
result = self.database_manager.get_message_by_id(message_id)
|
||||
self.assertIn('hello123', result['body'])
|
||||
|
||||
def testGetAttachments(self):
|
||||
attachment1 = {
|
||||
'name': 'test1.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
attachment2 = {
|
||||
'name': 'test2.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
attachment3 = {
|
||||
'name': 'test3.gif',
|
||||
'type': 'image/gif',
|
||||
'data': np.random.bytes(1024)
|
||||
}
|
||||
message = MessageP2P(self.mycall, 'DJ2LS-3', 'Hello World!', [attachment1, attachment2, attachment3])
|
||||
payload = message.to_payload()
|
||||
received_message = MessageP2P.from_payload(payload)
|
||||
received_message_dict = MessageP2P.to_dict(received_message, received=True)
|
||||
message_id = self.database_manager.add_message(received_message_dict)
|
||||
result = self.database_manager.get_attachments_by_message_id(message_id)
|
||||
attachment_names = [attachment['name'] for attachment in result]
|
||||
self.assertIn('test1.gif', attachment_names)
|
||||
self.assertIn('test2.gif', attachment_names)
|
||||
self.assertIn('test3.gif', attachment_names)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in a new issue