mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
ARQ WIP - Fix data factory tests!
This commit is contained in:
parent
4c3b3d79af
commit
227bc40a00
|
@ -2,23 +2,6 @@ from modem_frametypes import FRAME_TYPE as FR_TYPE
|
||||||
import helpers
|
import helpers
|
||||||
import codec2
|
import codec2
|
||||||
|
|
||||||
"""
|
|
||||||
How to use this class:
|
|
||||||
|
|
||||||
builder = DataFrameFactory()
|
|
||||||
payload = {
|
|
||||||
"origin" : helpers.callsign_to_bytes("DJ2LS-9"),
|
|
||||||
"gridsquare": helpers.encode_grid("JN49ea"),
|
|
||||||
"data": bytes(4)
|
|
||||||
}
|
|
||||||
|
|
||||||
frame = builder.construct(FR_TYPE.CQ, payload)
|
|
||||||
decoded_frame = builder.deconstruct(frame)
|
|
||||||
decoded_frame: {'frame_type': 'CQ', 'origin': b'DJ2LS-9', 'gridsquare': 'JN49EA', 'data': bytearray(b'\x00\x00\x00\x00')}
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
class DataFrameFactory:
|
class DataFrameFactory:
|
||||||
|
|
||||||
LENGTH_SIG0_FRAME = 14
|
LENGTH_SIG0_FRAME = 14
|
||||||
|
@ -135,7 +118,7 @@ class DataFrameFactory:
|
||||||
|
|
||||||
# arq burst frame
|
# arq burst frame
|
||||||
self.template_list[FR_TYPE.BURST_FRAME.value] = {
|
self.template_list[FR_TYPE.BURST_FRAME.value] = {
|
||||||
"frame_length": "dynamic",
|
"frame_length": None,
|
||||||
"session_id": 1,
|
"session_id": 1,
|
||||||
"offset": 4,
|
"offset": 4,
|
||||||
"data": "dynamic",
|
"data": "dynamic",
|
||||||
|
@ -161,21 +144,17 @@ class DataFrameFactory:
|
||||||
"snr": 1,
|
"snr": 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
def construct(self, frametype, content, frame_length=LENGTH_SIG1_FRAME):
|
def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME):
|
||||||
# frame_length: can be set manually for data frames, whose length can be dynamic regarding corresponding mode
|
|
||||||
# data bursts have a frame type range of 01-50
|
|
||||||
if frametype in range(1, 50):
|
|
||||||
frame_template = self.template_list[frametype]
|
|
||||||
frame = bytearray(frame_length)
|
|
||||||
# override "dynamic" value of payload length
|
|
||||||
self.template_list[frametype]["data"] = frame_length - 3
|
|
||||||
frame[:1] = bytes([frametype])
|
|
||||||
|
|
||||||
|
frame_template = self.template_list[frametype.value]
|
||||||
|
|
||||||
|
if isinstance(frame_template["frame_length"], int):
|
||||||
|
length = frame_template["frame_length"]
|
||||||
else:
|
else:
|
||||||
frame_template = self.template_list[frametype.value]
|
length = frame_length
|
||||||
frame_length = frame_template["frame_length"]
|
|
||||||
frame = bytearray(frame_length)
|
frame = bytearray(frame_length)
|
||||||
frame[:1] = bytes([frametype.value])
|
frame[:1] = bytes([frametype.value])
|
||||||
|
|
||||||
buffer_position = 1
|
buffer_position = 1
|
||||||
for key, item_length in frame_template.items():
|
for key, item_length in frame_template.items():
|
||||||
|
@ -198,53 +177,44 @@ class DataFrameFactory:
|
||||||
extracted_data = {"frame_type": FR_TYPE(frametype).name, "frame_type_int": frametype}
|
extracted_data = {"frame_type": FR_TYPE(frametype).name, "frame_type_int": frametype}
|
||||||
|
|
||||||
for key, item_length in frame_template.items():
|
for key, item_length in frame_template.items():
|
||||||
if key != "frame_length":
|
if key == "frame_length":
|
||||||
# data is always on the last payload slots
|
continue
|
||||||
if item_length in ["dynamic"] and key in["data"]:
|
|
||||||
data = frame[buffer_position:]
|
|
||||||
item_length = len(data)
|
|
||||||
else:
|
|
||||||
data = frame[buffer_position: buffer_position + item_length]
|
|
||||||
|
|
||||||
# Process the data based on the key
|
# data is always on the last payload slots
|
||||||
if key in ["origin", "destination"]:
|
if item_length in ["dynamic"] and key in["data"]:
|
||||||
extracted_data[key] = helpers.bytes_to_callsign(data).decode()
|
data = frame[buffer_position:]
|
||||||
|
item_length = len(data)
|
||||||
|
else:
|
||||||
|
data = frame[buffer_position: buffer_position + item_length]
|
||||||
|
|
||||||
if key in ["origin_crc", "destination_crc"]:
|
# Process the data based on the key
|
||||||
extracted_data[key] = data.hex()
|
if key in ["origin", "destination"]:
|
||||||
|
extracted_data[key] = helpers.bytes_to_callsign(data).decode()
|
||||||
|
|
||||||
elif key == "gridsquare":
|
elif key in ["origin_crc", "destination_crc"]:
|
||||||
extracted_data[key] = helpers.decode_grid(data)
|
extracted_data[key] = data.hex()
|
||||||
|
|
||||||
elif key in ["session_id", "speed_level",
|
elif key == "gridsquare":
|
||||||
"frames_per_burst", "version",
|
extracted_data[key] = helpers.decode_grid(data)
|
||||||
"snr"]:
|
|
||||||
extracted_data[key] = int.from_bytes(data, 'big')
|
|
||||||
|
|
||||||
else:
|
elif key in ["session_id", "speed_level",
|
||||||
extracted_data[key] = data
|
"frames_per_burst", "version",
|
||||||
|
"snr"]:
|
||||||
|
extracted_data[key] = int.from_bytes(data, 'big')
|
||||||
|
|
||||||
buffer_position += item_length
|
else:
|
||||||
|
extracted_data[key] = data
|
||||||
|
|
||||||
|
buffer_position += item_length
|
||||||
|
|
||||||
return extracted_data
|
return extracted_data
|
||||||
|
|
||||||
|
|
||||||
def get_bytes_per_frame(mode: int) -> int:
|
def get_bytes_per_frame(mode: int) -> int:
|
||||||
"""
|
|
||||||
Provide bytes per frame information for accessing from data handler
|
|
||||||
|
|
||||||
:param mode: Codec2 mode to query
|
|
||||||
:type mode: int or str
|
|
||||||
:return: Bytes per frame of the supplied codec2 data mode
|
|
||||||
:rtype: int
|
|
||||||
"""
|
|
||||||
freedv = codec2.open_instance(mode)
|
freedv = codec2.open_instance(mode)
|
||||||
# get number of bytes per frame for mode
|
|
||||||
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
|
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
|
||||||
# TODO add close session
|
|
||||||
return bytes_per_frame
|
return bytes_per_frame
|
||||||
|
|
||||||
|
|
||||||
def build_ping(self, destination):
|
def build_ping(self, destination):
|
||||||
payload = {
|
payload = {
|
||||||
"destination_crc": helpers.get_crc_24(destination),
|
"destination_crc": helpers.get_crc_24(destination),
|
||||||
|
@ -353,13 +323,13 @@ class DataFrameFactory:
|
||||||
}
|
}
|
||||||
return self.construct(FR_TYPE.ARQ_SESSION_INFO_ACK, payload)
|
return self.construct(FR_TYPE.ARQ_SESSION_INFO_ACK, payload)
|
||||||
|
|
||||||
def build_arq_burst_frame(self, session_id: int, offset: int, data: bytes):
|
def build_arq_burst_frame(self, freedv_mode: int, session_id: int, offset: int, data: bytes):
|
||||||
payload = {
|
payload = {
|
||||||
"session_id": session_id.to_bytes(1, 'big'),
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
"offset": offset.to_bytes(4, 'big'),
|
"offset": offset.to_bytes(4, 'big'),
|
||||||
"data": data,
|
"data": data,
|
||||||
}
|
}
|
||||||
return self.construct(FR_TYPE.BURST_FRAME, payload)
|
return self.construct(FR_TYPE.BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode))
|
||||||
|
|
||||||
def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int,
|
def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int,
|
||||||
frames_per_burst: int, snr: int):
|
frames_per_burst: int, snr: int):
|
||||||
|
|
|
@ -25,16 +25,15 @@ class TestDataFrameFactory(unittest.TestCase):
|
||||||
ping_frame = self.factory.build_ping(dxcall)
|
ping_frame = self.factory.build_ping(dxcall)
|
||||||
ping_data = self.factory.deconstruct(ping_frame)
|
ping_data = self.factory.deconstruct(ping_frame)
|
||||||
self.assertEqual(ping_data['origin'], self.factory.myfullcall)
|
self.assertEqual(ping_data['origin'], self.factory.myfullcall)
|
||||||
self.assertEqual(ping_data['destination_crc'], helpers.get_crc_24(dxcall))
|
self.assertEqual(ping_data['destination_crc'], helpers.get_crc_24(dxcall).hex())
|
||||||
|
|
||||||
def testARQConnectWide(self):
|
def testARQConnect(self):
|
||||||
dxcall = "DJ2LS-4"
|
dxcall = "DJ2LS-4"
|
||||||
session_id = 123
|
session_id = 123
|
||||||
frame = self.factory.build_arq_connect(True, dxcall, session_id)
|
frame = self.factory.build_arq_session_open(dxcall, session_id)
|
||||||
frame_data = self.factory.deconstruct(frame)
|
frame_data = self.factory.deconstruct(frame)
|
||||||
|
|
||||||
self.assertEqual(frame_data['origin'], self.factory.myfullcall)
|
self.assertEqual(frame_data['origin'], self.factory.myfullcall)
|
||||||
|
|
||||||
self.assertEqual(frame_data['session_id'] , session_id)
|
self.assertEqual(frame_data['session_id'] , session_id)
|
||||||
|
|
||||||
def testCQ(self):
|
def testCQ(self):
|
||||||
|
|
Loading…
Reference in a new issue