FreeDATA/modem/data_frame_factory.py

429 lines
15 KiB
Python
Raw Normal View History

from modem_frametypes import FRAME_TYPE as FR_TYPE
import helpers
2023-11-23 15:59:53 +00:00
import codec2
2023-11-26 09:25:13 +00:00
"""
How to use this class:
builder = DataFrameFactory()
payload = {
2023-11-28 19:34:33 +00:00
"origin" : helpers.callsign_to_bytes("DJ2LS-9"),
2023-11-26 09:25:13 +00:00
"gridsquare": helpers.encode_grid("JN49ea"),
"data": bytes(4)
}
frame = builder.construct(FR_TYPE.CQ, payload)
decoded_frame = builder.deconstruct(frame)
2023-11-28 19:34:33 +00:00
decoded_frame: {'frame_type': 'CQ', 'origin': b'DJ2LS-9', 'gridsquare': 'JN49EA', 'data': bytearray(b'\x00\x00\x00\x00')}
2023-11-26 09:25:13 +00:00
"""
class DataFrameFactory:
LENGTH_SIG0_FRAME = 14
2023-11-28 19:43:41 +00:00
LENGTH_SIG1_FRAME = 14
def __init__(self, config):
self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}"
self.mygrid = config['STATION']['mygrid']
2023-11-26 09:33:06 +00:00
# table for holding our frame templates
self.template_list = {}
2023-11-26 09:25:13 +00:00
self._load_broadcast_templates()
self._load_ping_templates()
self._load_fec_templates()
2023-11-27 22:08:00 +00:00
self._load_arq_templates()
2023-11-26 09:25:13 +00:00
def _load_broadcast_templates(self):
# cq frame
self.template_list[FR_TYPE.CQ.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"origin": 6,
2023-11-26 09:25:13 +00:00
"gridsquare": 4
}
# qrv frame
self.template_list[FR_TYPE.QRV.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"origin": 6,
2023-11-26 09:25:13 +00:00
"gridsquare": 4,
"snr": 1
}
# beacon frame
self.template_list[FR_TYPE.BEACON.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"origin": 6,
2023-11-26 09:25:13 +00:00
"gridsquare": 4
}
def _load_ping_templates(self):
# ping frame
self.template_list[FR_TYPE.PING.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"destination_crc": 3,
"origin_crc": 3,
"origin": 6
2023-11-26 09:25:13 +00:00
}
2023-11-29 23:15:16 +00:00
# ping ack
self.template_list[FR_TYPE.PING_ACK.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
"destination_crc": 3,
"origin_crc": 3,
"gridsquare": 4,
"snr": 1,
}
2023-11-26 09:25:13 +00:00
def _load_fec_templates(self):
# fec wakeup frame
self.template_list[FR_TYPE.FEC_WAKEUP.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"origin": 6,
2023-11-26 09:25:13 +00:00
"mode": 1,
"n_bursts": 1,
}
# fec frame
self.template_list[FR_TYPE.FEC.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
"data": self.LENGTH_SIG0_FRAME - 1
2023-11-26 09:25:13 +00:00
}
# fec is writing frame
self.template_list[FR_TYPE.IS_WRITING.value] = {
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"origin": 6
2023-11-26 09:25:13 +00:00
}
2023-11-27 22:08:00 +00:00
def _load_arq_templates(self):
# same structure for narrow and wide types
2023-11-30 19:35:07 +00:00
2023-12-08 10:42:38 +00:00
arq_session_open = {
2023-11-27 22:08:00 +00:00
"frame_length": self.LENGTH_SIG0_FRAME,
2023-11-28 19:34:33 +00:00
"destination_crc": 3,
"origin_crc": 3,
"origin": 6,
2023-11-27 22:08:00 +00:00
"session_id": 1,
}
# arq connect frames
2023-12-08 10:42:38 +00:00
self.template_list[FR_TYPE.ARQ_SESSION_OPEN_N.value] = arq_session_open
self.template_list[FR_TYPE.ARQ_SESSION_OPEN_W.value] = arq_session_open
2023-11-30 19:35:07 +00:00
# same structure for narrow and wide types
2023-12-08 10:42:38 +00:00
arq_session_open_ack = {
2023-11-30 19:35:07 +00:00
"frame_length": self.LENGTH_SIG0_FRAME,
"session_id": 1,
"speed_level": 1,
"arq_protocol_version": 1
}
# arq connect ack frames
2023-12-08 10:42:38 +00:00
self.template_list[FR_TYPE.ARQ_SESSION_OPEN_ACK_N.value] = arq_session_open_ack
self.template_list[FR_TYPE.ARQ_SESSION_OPEN_ACK_W.value] = arq_session_open_ack
2023-11-27 22:08:00 +00:00
2023-11-30 19:35:07 +00:00
2023-12-08 10:42:38 +00:00
# arq data frame
# register n frames
2023-12-09 12:28:32 +00:00
for n_frame in range(1,5):
self.template_list[FR_TYPE.BURST_01.value + (n_frame-1)] = {
2023-12-08 10:42:38 +00:00
"frame_length": "dynamic",
"n_frames_per_burst": 1,
"session_id": 1,
2023-12-09 11:31:08 +00:00
"data": "dynamic",
2023-12-08 10:42:38 +00:00
}
# arq burst ack
2023-11-28 19:34:33 +00:00
self.template_list[FR_TYPE.BURST_ACK.value] = {
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"snr":1,
"speed_level": 1,
"len_arq_rx_frame_buffer": 4
}
# arq frame ack TODO We should rename this to "session ack"
self.template_list[FR_TYPE.FR_ACK.value] = {
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"snr":1
}
2023-11-28 19:34:33 +00:00
# arq burst nack
self.template_list[FR_TYPE.BURST_NACK.value] = {
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"snr": 1,
"speed_level": 1,
"len_arq_rx_frame_buffer": 4,
"n_frames_per_burst": 1
}
# arq frame nack --> TODO We should rename this to "session nack"
self.template_list[FR_TYPE.FR_NACK.value] = {
"frame_length": self.LENGTH_SIG1_FRAME,
"session_id": 1,
"snr": 1,
"speed_level": 1,
"len_arq_rx_frame_buffer": 4,
"n_frames_per_burst": 1
}
2023-12-08 10:42:38 +00:00
def construct(self, frametype, content, frame_length=LENGTH_SIG1_FRAME):
# frame_length: can be set manually for data frames, whose length can be dynamic regarding corresponding mode
# data bursts have a frame type range of 01-50
if frametype in range(1, 50):
2023-12-09 11:31:08 +00:00
frame_template = self.template_list[frametype]
2023-12-08 10:42:38 +00:00
frame = bytearray(frame_length)
# override "dynamic" value of payload length
2023-12-09 11:31:08 +00:00
self.template_list[frametype]["data"] = frame_length - 3
frame[:1] = bytes([frametype])
2023-12-08 10:42:38 +00:00
else:
frame_template = self.template_list[frametype.value]
frame_length = frame_template["frame_length"]
frame = bytearray(frame_length)
2023-12-09 11:31:08 +00:00
frame[:1] = bytes([frametype.value])
2023-11-26 09:25:13 +00:00
buffer_position = 1
for key, item_length in frame_template.items():
if key != "frame_length":
frame[buffer_position: buffer_position + item_length] = content[key]
buffer_position += item_length
return frame
def deconstruct(self, frame):
buffer_position = 1
# Extract frametype and get the corresponding template
frametype = int.from_bytes(frame[:1], "big")
frame_template = self.template_list.get(frametype)
if not frame_template:
# Handle the case where the frame type is not recognized
raise ValueError(f"Unknown frame type: {frametype}")
2023-11-28 19:00:39 +00:00
extracted_data = {"frame_type": FR_TYPE(frametype).name, "frame_type_int": frametype}
2023-11-26 09:25:13 +00:00
for key, item_length in frame_template.items():
if key != "frame_length":
2023-12-09 11:31:08 +00:00
# data is always on the last payload slots
if item_length in ["dynamic"] and key in["data"]:
data = frame[buffer_position:]
item_length = len(data)
else:
data = frame[buffer_position: buffer_position + item_length]
2023-11-26 09:25:13 +00:00
# Process the data based on the key
2023-11-28 19:34:33 +00:00
if key in ["origin", "destination"]:
2023-11-27 22:08:00 +00:00
extracted_data[key] = helpers.bytes_to_callsign(data).decode()
2023-12-09 12:31:19 +00:00
if key in ["origin_crc", "destination_crc"]:
extracted_data[key] = data.hex()
2023-11-26 09:25:13 +00:00
elif key == "gridsquare":
extracted_data[key] = helpers.decode_grid(data)
2023-12-05 17:50:39 +00:00
2023-12-10 08:59:02 +00:00
elif key in ["session_id", "speed_level", "n_frames_per_burst", "arq_protocol_version"]:
2023-12-05 17:50:39 +00:00
extracted_data[key] = int.from_bytes(data, 'big')
2023-11-26 09:25:13 +00:00
else:
extracted_data[key] = data
buffer_position += item_length
return extracted_data
def get_bytes_per_frame(mode: int) -> int:
"""
Provide bytes per frame information for accessing from data handler
:param mode: Codec2 mode to query
:type mode: int or str
:return: Bytes per frame of the supplied codec2 data mode
:rtype: int
"""
freedv = codec2.open_instance(mode)
# get number of bytes per frame for mode
2023-12-08 10:42:38 +00:00
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
# TODO add close session
return bytes_per_frame
2023-11-28 19:34:33 +00:00
def build_ping(self, destination):
2023-11-26 09:25:13 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"destination_crc": helpers.get_crc_24(destination),
"origin_crc": helpers.get_crc_24(self.myfullcall),
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
}
return self.construct(FR_TYPE.PING, payload)
2023-11-29 23:15:16 +00:00
def build_ping_ack(self, destination, snr):
payload = {
"destination_crc": helpers.get_crc_24(destination),
"origin_crc": helpers.get_crc_24(self.myfullcall),
"gridsquare": helpers.encode_grid(self.mygrid),
"snr": helpers.snr_to_bytes(snr)
}
return self.construct(FR_TYPE.PING_ACK, payload)
def build_cq(self):
2023-11-26 09:25:13 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
"gridsquare": helpers.encode_grid(self.mygrid)
}
return self.construct(FR_TYPE.CQ, payload)
2023-11-24 09:46:51 +00:00
def build_qrv(self, snr):
2023-11-26 09:25:13 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
"gridsquare": helpers.encode_grid(self.mygrid),
"snr": helpers.snr_to_bytes(snr)
}
return self.construct(FR_TYPE.QRV, payload)
def build_beacon(self):
2023-11-26 09:25:13 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
"gridsquare": helpers.encode_grid(self.mygrid)
}
return self.construct(FR_TYPE.BEACON, payload)
def build_fec_is_writing(self):
payload = {
2023-11-28 19:34:33 +00:00
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
}
return self.construct(FR_TYPE.IS_WRITING, payload)
def build_fec_wakeup(self, mode):
2023-11-25 22:22:31 +00:00
mode_int = codec2.freedv_get_mode_value_by_name(mode)
2023-11-26 09:25:13 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-26 09:25:13 +00:00
"mode": bytes([mode_int]),
"n_bursts": bytes([1]) # n payload bursts,
}
return self.construct(FR_TYPE.FEC_WAKEUP, payload)
2023-11-25 22:22:31 +00:00
def build_fec(self, mode, payload):
mode_int = codec2.freedv_get_mode_value_by_name(mode)
payload_per_frame = codec2.get_bytes_per_frame(mode_int) - 2
fec_payload_length = payload_per_frame - 1
fec_frame = bytearray(payload_per_frame)
fec_frame[:1] = bytes([FR_TYPE.FEC.value])
fec_frame[1:payload_per_frame] = bytes(payload[:fec_payload_length])
return fec_frame
def build_test(self):
test_frame = bytearray(126)
test_frame[:1] = bytes([FR_TYPE.TEST_FRAME.value])
2023-11-26 13:25:14 +00:00
return test_frame
2023-11-27 22:08:00 +00:00
def build_arq_session_connect(self, isWideband, destination, session_id):
2023-12-09 11:31:08 +00:00
2023-11-27 22:08:00 +00:00
payload = {
2023-11-28 19:34:33 +00:00
"destination_crc": helpers.get_crc_24(destination),
"origin_crc": helpers.get_crc_24(self.myfullcall),
"origin": helpers.callsign_to_bytes(self.myfullcall),
2023-11-27 22:08:00 +00:00
"session_id": session_id.to_bytes(1, 'big'),
}
channel_type = FR_TYPE.ARQ_SESSION_OPEN_W if isWideband else FR_TYPE.ARQ_SESSION_OPEN_N
2023-11-30 19:35:07 +00:00
return self.construct(channel_type, payload)
def build_arq_session_connect_ack(self, isWideband, session_id, speed_level,arq_protocol_version):
2023-11-30 19:35:07 +00:00
#connection_frame = bytearray(self.length_sig0_frame)
#connection_frame[:1] = frametype
#connection_frame[1:2] = self.session_id
#connection_frame[8:9] = bytes([self.speed_level])
#connection_frame[13:14] = bytes([self.arq_protocol_version])
payload = {
"session_id": session_id.to_bytes(1, 'big'),
"speed_level": bytes([speed_level]),
"arq_protocol_version": bytes([arq_protocol_version]),
}
channel_type = FR_TYPE.ARQ_SESSION_OPEN_ACK_W if isWideband else FR_TYPE.ARQ_SESSION_OPEN_ACK_N
2023-12-05 14:40:04 +00:00
return self.construct(channel_type, payload)
2023-11-30 19:35:07 +00:00
2023-12-08 10:42:38 +00:00
def build_arq_data_frame(self, session_id: bytes, n_frames_per_burst: int, max_size: int, n_frame: int, frame_payload: bytes):
payload = {
"n_frames_per_burst": bytes([n_frames_per_burst]),
2023-12-09 11:31:08 +00:00
"session_id": session_id.to_bytes(1, 'big'),
2023-12-08 10:42:38 +00:00
"data": frame_payload
}
2023-12-09 11:31:08 +00:00
return self.construct(FR_TYPE.BURST_01.value + (n_frame-1), payload, frame_length=max_size)
2023-12-08 10:42:38 +00:00
2023-11-28 19:34:33 +00:00
def build_arq_burst_ack(self, session_id: bytes, snr: int, speed_level: int, len_arq_rx_frame_buffer: int):
# ack_frame = bytearray(self.length_sig1_frame)
# ack_frame[:1] = bytes([FR_TYPE.BURST_ACK.value])
# ack_frame[1:2] = self.session_id
# ack_frame[2:3] = helpers.snr_to_bytes(snr)
# ack_frame[3:4] = bytes([int(self.speed_level)])
# ack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
payload = {
"session_id": session_id,
"snr": helpers.snr_to_bytes(snr),
"speed_level": bytes([speed_level]),
"len_arq_rx_frame_buffer": bytes([len_arq_rx_frame_buffer])
}
return self.construct(FR_TYPE.BURST_ACK, payload)
def build_arq_frame_ack(self, session_id: bytes, snr: int):
# ack_frame = bytearray(self.length_sig1_frame)
# ack_frame[:1] = bytes([FR_TYPE.FR_ACK.value])
# ack_frame[1:2] = self.session_id
# ack_frame[2:3] = helpers.snr_to_bytes(snr)
payload = {
"session_id": session_id,
"snr": helpers.snr_to_bytes(snr)
}
return self.construct(FR_TYPE.FR_ACK, payload)
2023-11-27 22:08:00 +00:00
2023-11-28 19:34:33 +00:00
def build_arq_burst_nack(self, session_id: bytes, snr: int, speed_level: int, len_arq_rx_frame_buffer: int, n_frames_per_burst: int):
# nack_frame = bytearray(self.length_sig1_frame)
# nack_frame[:1] = bytes([FR_TYPE.BURST_NACK.value])
# nack_frame[1:2] = self.session_id
# nack_frame[2:3] = helpers.snr_to_bytes(0)
# nack_frame[3:4] = bytes([int(self.speed_level)])
# nack_frame[4:5] = bytes([int(tx_n_frames_per_burst)])
# nack_frame[5:9] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
payload = {
"session_id": session_id,
"snr": helpers.snr_to_bytes(snr),
"speed_level": bytes([speed_level]),
"len_arq_rx_frame_buffer": bytes([len_arq_rx_frame_buffer]),
"n_frames_per_burst": bytes([n_frames_per_burst])
}
return self.construct(FR_TYPE.BURST_NACK, payload)
def build_arq_frame_nack(self, session_id: bytes, snr: int, speed_level: int, len_arq_rx_frame_buffer: int, n_frames_per_burst: int):
# nack_frame = bytearray(self.length_sig1_frame)
# nack_frame[:1] = bytes([FR_TYPE.FR_NACK.value])
# nack_frame[1:2] = self.session_id
# nack_frame[2:3] = helpers.snr_to_bytes(snr)
# nack_frame[3:4] = bytes([int(self.speed_level)])
# nack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
payload = {
"session_id": session_id,
"snr": helpers.snr_to_bytes(snr),
"speed_level": bytes([speed_level]),
"len_arq_rx_frame_buffer": bytes([len_arq_rx_frame_buffer]),
"n_frames_per_burst": bytes([n_frames_per_burst])
}
return self.construct(FR_TYPE.FR_NACK, payload)