FreeDATA/tests/test_protocols.py

86 lines
3 KiB
Python
Raw Normal View History

import sys
2024-04-17 20:49:08 +00:00
sys.path.append('freedata-server')
import unittest
from config import CONFIG
from frame_dispatcher import DISPATCHER
import helpers
import queue
from state_manager import StateManager
2024-01-06 14:43:16 +00:00
from event_manager import EventManager
from command_ping import PingCommand
2023-12-04 15:01:18 +00:00
from command_cq import CQCommand
2023-12-16 10:33:10 +00:00
import modem
2023-12-16 11:54:16 +00:00
import frame_handler
2024-01-12 14:51:23 +00:00
from radio_manager import RadioManager
2023-12-04 16:30:07 +00:00
class TestProtocols(unittest.TestCase):
@classmethod
def setUpClass(cls):
2024-04-17 20:49:08 +00:00
config_manager = CONFIG('freedata-server/config.ini.example')
cls.config = config_manager.read()
cls.state_manager_queue = queue.Queue()
cls.state_manager = StateManager(cls.state_manager_queue)
cls.event_queue = queue.Queue()
2024-01-06 14:43:16 +00:00
cls.event_manager = EventManager([cls.event_queue])
2024-01-12 14:51:23 +00:00
cls.radio_manager = RadioManager(cls.config, cls.state_manager, cls.event_manager)
cls.modem_transmit_queue = queue.Queue()
2024-01-12 14:51:23 +00:00
cls.modem = modem.RF(cls.config, cls.event_queue, queue.Queue(), queue.Queue(), cls.state_manager, cls.radio_manager)
2023-12-16 11:54:16 +00:00
modem.TESTMODE = True
frame_handler.TESTMODE = True
2024-04-17 20:49:08 +00:00
#cls.freedata-server.start_modem()
cls.frame_dispatcher = DISPATCHER(cls.config,
2024-01-06 14:43:16 +00:00
cls.event_manager,
2023-12-16 11:54:16 +00:00
cls.state_manager,
2023-12-16 10:33:10 +00:00
cls.modem)
2023-12-16 11:54:16 +00:00
def shortcutTransmission(self, frame_bytes):
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
2023-12-04 14:52:50 +00:00
def assertEventReceivedType(self, event_type):
ev = self.event_queue.get()
2024-01-06 14:43:16 +00:00
self.assertIn('type', ev)
self.assertIn('received', ev)
2023-12-04 14:52:50 +00:00
self.assertEqual(ev['received'], event_type)
2023-12-04 14:52:50 +00:00
def testPingWithAck(self):
# Run ping command
2024-02-19 19:08:05 +00:00
api_params = { "dxcall": "AA1AAA-1"}
2024-01-06 14:43:16 +00:00
ping_cmd = PingCommand(self.config, self.state_manager, self.event_manager, api_params)
2024-04-17 20:49:08 +00:00
#ping_cmd.run(self.event_queue, self.freedata-server)
2023-12-16 11:54:16 +00:00
frame = ping_cmd.test(self.event_queue)
2023-12-04 14:52:50 +00:00
# Shortcut the transmit queue directly to the frame dispatcher
2023-12-16 11:54:16 +00:00
self.shortcutTransmission(frame)
2023-12-04 14:52:50 +00:00
self.assertEventReceivedType('PING')
2023-12-16 11:54:16 +00:00
event_frame = self.event_queue.get()
2023-12-04 14:52:50 +00:00
# Check ACK
2023-12-16 11:54:16 +00:00
self.shortcutTransmission(event_frame)
self.assertEventReceivedType('PING_ACK')
print("PING/PING ACK CHECK SUCCESSFULLY")
2023-12-04 15:01:18 +00:00
def testCQWithQRV(self):
self.config['MODEM']['respond_to_cq'] = True
api_params = {}
2024-01-06 14:43:16 +00:00
cmd = CQCommand(self.config, self.state_manager, self.event_manager, api_params)
2024-04-17 20:49:08 +00:00
#cmd.run(self.event_queue, self.freedata-server)
2023-12-16 11:54:16 +00:00
frame = cmd.test(self.event_queue)
2023-12-04 15:01:18 +00:00
2023-12-16 11:54:16 +00:00
self.shortcutTransmission(frame)
2023-12-04 15:01:18 +00:00
self.assertEventReceivedType('CQ')
2023-12-16 11:54:16 +00:00
event_frame = self.event_queue.get()
# Check QRV
self.shortcutTransmission(event_frame)
2023-12-04 15:01:18 +00:00
self.assertEventReceivedType('QRV')
2023-12-16 11:54:16 +00:00
print("CQ/QRV CHECK SUCCESSFULLY")
2023-12-04 15:01:18 +00:00
if __name__ == '__main__':
unittest.main()