Improve protocol tests

This commit is contained in:
Pedro 2023-12-04 15:52:50 +01:00
parent 5a58ef97bb
commit a2edf4967a

View file

@ -30,6 +30,17 @@ class TestDataFrameFactory(unittest.TestCase):
cls.data_queue_received,
cls.modem_transmit_queue)
def shortcutTransmission(self):
transmission_item = self.modem_transmit_queue.get()
frame_bytes = bytes(transmission_item['frame'])
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
def assertEventReceivedType(self, event_type):
ev = self.event_queue.get()
self.assertIn('freedata', ev)
self.assertIn('received', ev)
self.assertEqual(ev['received'], event_type)
def testPingWithAck(self):
# Run ping command
api_params = { "dxcall": "XX1XXX-7" }
@ -37,25 +48,12 @@ class TestDataFrameFactory(unittest.TestCase):
ping_cmd.run(self.event_queue, self.modem_transmit_queue)
# Shortcut the transmit queue directly to the frame dispatcher
transmission_item = self.modem_transmit_queue.get()
frame_bytes = bytes(transmission_item['frame'])
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
# Expect to receive and event of the received PING request
ev = self.event_queue.get()
self.assertIn('freedata', ev)
self.assertIn('received', ev)
self.assertEqual(ev['received'], 'PING')
self.shortcutTransmission()
self.assertEventReceivedType('PING')
# Check ACK
transmission_item = self.modem_transmit_queue.get()
frame_bytes = bytes(transmission_item['frame'])
self.frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
ev = self.event_queue.get()
self.assertIn('freedata', ev)
self.assertIn('received', ev)
self.assertEqual(ev['received'], 'PING_ACK')
self.shortcutTransmission()
self.assertEventReceivedType('PING_ACK')
if __name__ == '__main__':
unittest.main()