mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
Add first protocol test for ping - ping ack
This commit is contained in:
parent
6877e950ad
commit
5a58ef97bb
|
@ -5,7 +5,7 @@ modemport = 3050
|
|||
mycall = XX1XXX
|
||||
mygrid = JN18cv
|
||||
myssid = 6
|
||||
ssid_list = []
|
||||
ssid_list = [1, 7]
|
||||
enable_explorer = True
|
||||
enable_stats = True
|
||||
|
||||
|
|
61
tests/test_protocols.py
Executable file
61
tests/test_protocols.py
Executable file
|
@ -0,0 +1,61 @@
|
|||
import sys
|
||||
sys.path.append('modem')
|
||||
|
||||
import unittest
|
||||
from config import CONFIG
|
||||
from frame_dispatcher import DISPATCHER
|
||||
import helpers
|
||||
import queue
|
||||
from state_manager import StateManager
|
||||
from command_ping import PingCommand
|
||||
|
||||
class TestDataFrameFactory(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
config_manager = CONFIG('modem/config.ini.example')
|
||||
cls.config = config_manager.read()
|
||||
|
||||
cls.state_manager_queue = queue.Queue()
|
||||
cls.state_manager = StateManager(cls.state_manager_queue)
|
||||
|
||||
cls.event_queue = queue.Queue()
|
||||
|
||||
cls.data_queue_received = queue.Queue()
|
||||
cls.modem_transmit_queue = queue.Queue()
|
||||
|
||||
cls.frame_dispatcher = DISPATCHER(cls.config,
|
||||
cls.event_queue,
|
||||
cls.state_manager,
|
||||
cls.data_queue_received,
|
||||
cls.modem_transmit_queue)
|
||||
|
||||
def testPingWithAck(self):
|
||||
# Run ping command
|
||||
api_params = { "dxcall": "XX1XXX-7" }
|
||||
ping_cmd = PingCommand(self.config, self.state_manager, self.event_queue, api_params)
|
||||
ping_cmd.run(self.event_queue, self.modem_transmit_queue)
|
||||
|
||||
# Shortcut the transmit queue directly to the frame dispatcher
|
||||
transmission_item = self.modem_transmit_queue.get()
|
||||
frame_bytes = bytes(transmission_item['frame'])
|
||||
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
|
||||
|
||||
# Expect to receive and event of the received PING request
|
||||
ev = self.event_queue.get()
|
||||
self.assertIn('freedata', ev)
|
||||
self.assertIn('received', ev)
|
||||
self.assertEqual(ev['received'], 'PING')
|
||||
|
||||
# Check ACK
|
||||
transmission_item = self.modem_transmit_queue.get()
|
||||
frame_bytes = bytes(transmission_item['frame'])
|
||||
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
|
||||
|
||||
ev = self.event_queue.get()
|
||||
self.assertIn('freedata', ev)
|
||||
self.assertIn('received', ev)
|
||||
self.assertEqual(ev['received'], 'PING_ACK')
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in a new issue