ARQ WIP - Burst Frame tests ok

This commit is contained in:
Pedro 2023-12-13 09:56:58 +01:00
parent 227bc40a00
commit d17878b123
2 changed files with 30 additions and 9 deletions

View file

@ -158,9 +158,17 @@ class DataFrameFactory:
buffer_position = 1
for key, item_length in frame_template.items():
if key != "frame_length":
frame[buffer_position: buffer_position + item_length] = content[key]
buffer_position += item_length
if key == "frame_length":
continue
if not isinstance(item_length, int):
item_length = len(content[key])
if buffer_position + item_length > frame_length:
raise RuntimeError("Frame data overflow!")
frame[buffer_position: buffer_position + item_length] = content[key]
buffer_position += item_length
return frame
@ -199,7 +207,7 @@ class DataFrameFactory:
elif key in ["session_id", "speed_level",
"frames_per_burst", "version",
"snr"]:
"snr", "offset"]:
extracted_data[key] = int.from_bytes(data, 'big')
else:
@ -210,8 +218,8 @@ class DataFrameFactory:
return extracted_data
def get_bytes_per_frame(mode: int) -> int:
freedv = codec2.open_instance(mode)
def get_bytes_per_frame(self, mode: codec2.FREEDV_MODE) -> int:
freedv = codec2.open_instance(mode.value)
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
return bytes_per_frame
@ -323,7 +331,7 @@ class DataFrameFactory:
}
return self.construct(FR_TYPE.ARQ_SESSION_INFO_ACK, payload)
def build_arq_burst_frame(self, freedv_mode: int, session_id: int, offset: int, data: bytes):
def build_arq_burst_frame(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, offset: int, data: bytes):
payload = {
"session_id": session_id.to_bytes(1, 'big'),
"offset": offset.to_bytes(4, 'big'),

View file

@ -3,7 +3,8 @@ sys.path.append('modem')
import unittest
from config import CONFIG
import data_frame_factory
from data_frame_factory import DataFrameFactory
from codec2 import FREEDV_MODE
import helpers
class TestDataFrameFactory(unittest.TestCase):
@ -12,7 +13,7 @@ class TestDataFrameFactory(unittest.TestCase):
def setUpClass(cls):
config_manager = CONFIG('modem/config.ini.example')
config = config_manager.read()
cls.factory = data_frame_factory.DataFrameFactory(config)
cls.factory = DataFrameFactory(config)
def testBeacon(self):
beacon_frame = self.factory.build_beacon()
@ -41,6 +42,18 @@ class TestDataFrameFactory(unittest.TestCase):
frame_data = self.factory.deconstruct(frame)
self.assertEqual(frame_data['origin'], self.factory.myfullcall)
self.assertEqual(frame_data['gridsquare'], self.factory.mygrid.upper())
def testBurstDataFrames(self):
session_id = 123
offset = 40
payload = b'Hello World!'
frame = self.factory.build_arq_burst_frame(FREEDV_MODE.datac3,
session_id, offset, payload)
frame_data = self.factory.deconstruct(frame)
self.assertEqual(frame_data['session_id'], session_id)
self.assertEqual(frame_data['offset'], offset)
data = frame_data['data'][:len(payload)]
self.assertEqual(data, payload)
if __name__ == '__main__':
unittest.main()