Improve DataFrameFactory and tests

This commit is contained in:
Pedro 2023-11-27 23:08:00 +01:00
parent e9c07731b7
commit f5415ce7cd
3 changed files with 52 additions and 9 deletions

View file

@ -33,6 +33,7 @@ class DataFrameFactory:
self._load_broadcast_templates()
self._load_ping_templates()
self._load_fec_templates()
self._load_arq_templates()
def _load_broadcast_templates(self):
# cq frame
@ -87,6 +88,21 @@ class DataFrameFactory:
"mycallsign": 6
}
def _load_arq_templates(self):
# same structure for narrow and wide types
arq_dc_open_ack = {
"frame_length": self.LENGTH_SIG0_FRAME,
"dxcallsign_crc": 3,
"mycallsign_crc": 3,
"mycallsign": 6,
"session_id": 1,
}
# arq connect frames
self.template_list[FR_TYPE.ARQ_DC_OPEN_ACK_N.value] = arq_dc_open_ack
self.template_list[FR_TYPE.ARQ_DC_OPEN_ACK_W.value] = arq_dc_open_ack
def construct(self, frametype, content):
frame_template = self.template_list[frametype.value]
frame_length = frame_template["frame_length"]
@ -121,14 +137,8 @@ class DataFrameFactory:
data = frame[buffer_position: buffer_position + item_length]
# Process the data based on the key
if key == "mycallsign":
# we are overriding the tempaltes mycallsign, because it will become
# the dxcallsign when receiving
extracted_data["dxcallsign"] = helpers.bytes_to_callsign(data)
elif key == "dxcallsign":
# we are overriding the tempaltes dxcallsign, because it will become
# the mycallsign when receiving
extracted_data["mycallsign"] = helpers.bytes_to_callsign(data)
if key in ["mycallsign", "dxcallsign"]:
extracted_data[key] = helpers.bytes_to_callsign(data).decode()
elif key == "gridsquare":
extracted_data[key] = helpers.decode_grid(data)
@ -200,6 +210,7 @@ class DataFrameFactory:
}
return self.construct(FR_TYPE.FEC_WAKEUP, payload)
def build_fec(self, mode, payload):
mode_int = codec2.freedv_get_mode_value_by_name(mode)
payload_per_frame = codec2.get_bytes_per_frame(mode_int) - 2
@ -213,3 +224,21 @@ class DataFrameFactory:
test_frame = bytearray(126)
test_frame[:1] = bytes([FR_TYPE.TEST_FRAME.value])
return test_frame
def build_arq_connect(self, isWideband, dxcallsign, session_id):
payload = {
"dxcallsign_crc": helpers.get_crc_24(dxcallsign),
"mycallsign_crc": helpers.get_crc_24(self.myfullcall),
"mycallsign": helpers.callsign_to_bytes(self.myfullcall),
"session_id": session_id.to_bytes(1, 'big'),
}
if isWideband:
type = FR_TYPE.ARQ_DC_OPEN_ACK_W
else:
type = FR_TYPE.ARQ_DC_OPEN_ACK_N
return self.construct(type, payload)

View file

@ -168,6 +168,7 @@ class BROADCAST(DATA):
# here we add the received station to the heard stations buffer
beacon_callsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
self.dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8")
self.event_manager.send_custom_event(
freedata="modem-message",
beacon="received",

View file

@ -4,6 +4,7 @@ sys.path.append('modem')
import unittest
from config import CONFIG
import data_frame_factory
import helpers
class TestDataFrameFactory(unittest.TestCase):
@ -17,13 +18,25 @@ class TestDataFrameFactory(unittest.TestCase):
beacon_frame = self.factory.build_beacon()
beacon_data = self.factory.deconstruct(beacon_frame)
self.assertEqual(beacon_data['mycallsign'], self.factory.myfullcall.upper())
self.assertEqual(beacon_data['mygridsquare'], self.factory.mygrid.upper())
self.assertEqual(beacon_data['gridsquare'], self.factory.mygrid.upper())
def testPing(self):
dxcall = "DJ2LS-3"
ping_frame = self.factory.build_ping(dxcall)
ping_data = self.factory.deconstruct(ping_frame)
self.assertEqual(ping_data['mycallsign'], self.factory.myfullcall)
self.assertEqual(ping_data['dxcallsign_crc'], helpers.get_crc_24(dxcall))
def testARQConnectWide(self):
dxcall = "DJ2LS-4"
session_id = 123
frame = self.factory.build_arq_connect(True, dxcall, session_id)
frame_data = self.factory.deconstruct(frame)
self.assertEqual(frame_data['mycallsign'], self.factory.myfullcall)
frame_session_id = int.from_bytes(frame_data['session_id'], 'big')
self.assertEqual(frame_session_id , session_id)
if __name__ == '__main__':
unittest.main()