2023-12-04 14:04:14 +00:00
|
|
|
import sys
|
|
|
|
sys.path.append('modem')
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
from config import CONFIG
|
|
|
|
from frame_dispatcher import DISPATCHER
|
|
|
|
import helpers
|
|
|
|
import queue
|
|
|
|
from state_manager import StateManager
|
|
|
|
from command_ping import PingCommand
|
2023-12-04 15:01:18 +00:00
|
|
|
from command_cq import CQCommand
|
2023-12-16 10:33:10 +00:00
|
|
|
import modem
|
2023-12-16 11:54:16 +00:00
|
|
|
import frame_handler
|
|
|
|
|
2023-12-04 14:04:14 +00:00
|
|
|
|
2023-12-04 16:30:07 +00:00
|
|
|
class TestProtocols(unittest.TestCase):
    """Round-trip tests for the PING/PING_ACK and CQ/QRV exchanges.

    Each test builds a frame with a command's ``test()`` method, hands it
    directly to the frame dispatcher and then checks the events that show up
    on the shared event queue. The modem object is created but never started.
    """

    # Upper bound, in seconds, for any wait on the event queue. Without a
    # bound, a missing event blocks the test run forever instead of failing.
    EVENT_TIMEOUT = 10

    @classmethod
    def setUpClass(cls):
        """Create the config, queues, state manager, modem and dispatcher.

        Everything is stored on the class, so all test methods share the same
        objects (including the same config mapping).
        """
        config_manager = CONFIG('modem/config.ini.example')
        cls.config = config_manager.read()

        cls.state_manager_queue = queue.Queue()
        cls.state_manager = StateManager(cls.state_manager_queue)

        cls.event_queue = queue.Queue()

        cls.modem_transmit_queue = queue.Queue()

        cls.modem = modem.RF(
            cls.config,
            cls.event_queue,
            queue.Queue(),
            queue.Queue(),
            cls.state_manager,
        )

        # NOTE(review): both flags are set after modem.RF() was constructed;
        # confirm that nothing reads TESTMODE during construction.
        modem.TESTMODE = True
        frame_handler.TESTMODE = True

        # start_modem() is intentionally not called: the tests feed frames
        # straight into the dispatcher via shortcutTransmission().
        cls.frame_dispatcher = DISPATCHER(
            cls.config,
            cls.event_queue,
            cls.state_manager,
            cls.modem,
        )

    def _nextEvent(self):
        """Return the next item from the event queue.

        Fails the running test if nothing arrives within ``EVENT_TIMEOUT``
        seconds, rather than blocking indefinitely.
        """
        try:
            return self.event_queue.get(timeout=self.EVENT_TIMEOUT)
        except queue.Empty:
            self.fail(
                "no item arrived on the event queue within {} seconds".format(
                    self.EVENT_TIMEOUT
                )
            )

    def shortcutTransmission(self, frame_bytes):
        """Pass ``frame_bytes`` to the dispatcher as if it had been received.

        Only the frame and its length carry information here; the remaining
        arguments are fixed placeholders (``None``, ``0``, ``0``).
        """
        self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)

    def assertEventReceivedType(self, event_type):
        """Assert that the next queued event reports ``event_type`` as received.

        The event must contain the keys ``'freedata'`` and ``'received'``, and
        ``event['received']`` must equal ``event_type``.
        """
        ev = self._nextEvent()
        self.assertIn('freedata', ev)
        self.assertIn('received', ev)
        self.assertEqual(ev['received'], event_type)

    def testPingWithAck(self):
        """A dispatched PING yields a PING event, and its reply a PING_ACK."""
        # Build the ping frame; test() returns it instead of transmitting.
        api_params = {"dxcall": "XX1XXX-7"}
        ping_cmd = PingCommand(self.config, self.state_manager, self.event_queue, api_params)
        frame = ping_cmd.test(self.event_queue)

        # Shortcut the transmit queue directly to the frame dispatcher
        self.shortcutTransmission(frame)
        self.assertEventReceivedType('PING')

        # The next queue item is fed back in as the reply frame.
        # NOTE(review): presumably the response frame is placed on the event
        # queue because TESTMODE is enabled -- confirm against frame_handler.
        event_frame = self._nextEvent()

        # Check ACK
        self.shortcutTransmission(event_frame)
        self.assertEventReceivedType('PING_ACK')
        print("PING/PING ACK CHECK SUCCESSFULLY")

    def testCQWithQRV(self):
        """A dispatched CQ yields a CQ event, and its reply a QRV event."""
        # The config mapping is shared by every test in this class, so put the
        # original 'respond_to_cq' state back once this test has finished.
        modem_config = self.config['MODEM']
        if 'respond_to_cq' in modem_config:
            self.addCleanup(
                modem_config.__setitem__, 'respond_to_cq', modem_config['respond_to_cq']
            )
        else:
            self.addCleanup(modem_config.pop, 'respond_to_cq', None)
        modem_config['respond_to_cq'] = True

        api_params = {}
        cmd = CQCommand(self.config, self.state_manager, self.event_queue, api_params)
        frame = cmd.test(self.event_queue)

        self.shortcutTransmission(frame)
        self.assertEventReceivedType('CQ')

        # The next queue item is fed back in as the reply frame.
        event_frame = self._nextEvent()

        # Check QRV
        self.shortcutTransmission(event_frame)
        self.assertEventReceivedType('QRV')
        print("CQ/QRV CHECK SUCCESSFULLY")
|
2023-12-04 15:01:18 +00:00
|
|
|
|
2023-12-04 14:04:14 +00:00
|
|
|
# Run the test cases in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|