Merge pull request #618 from DJ2LS/dev-data-dispatcher

DJ2LS 2024-01-23 09:28:44 +01:00 committed by GitHub
commit e7cbb7b514
10 changed files with 168 additions and 21 deletions

View file

@@ -0,0 +1,83 @@
# File: arq_data_type_handler.py
import structlog
import lzma
import gzip


class ARQDataTypeHandler:
    def __init__(self):
        self.logger = structlog.get_logger(type(self).__name__)
        self.handlers = {
            "raw": {
                'prepare': self.prepare_raw,
                'handle': self.handle_raw
            },
            "raw_lzma": {
                'prepare': self.prepare_raw_lzma,
                'handle': self.handle_raw_lzma
            },
            "raw_gzip": {
                'prepare': self.prepare_raw_gzip,
                'handle': self.handle_raw_gzip
            },
            "p2pmsg_lzma": {
                'prepare': self.prepare_p2pmsg_lzma,
                'handle': self.handle_p2pmsg_lzma
            },
        }

    def dispatch(self, type_byte: int, data: bytearray):
        endpoint_name = list(self.handlers.keys())[type_byte]
        if endpoint_name in self.handlers and 'handle' in self.handlers[endpoint_name]:
            return self.handlers[endpoint_name]['handle'](data)
        else:
            self.log(f"Unknown handling endpoint: {endpoint_name}", isWarning=True)

    def prepare(self, data: bytearray, endpoint_name="raw"):
        if endpoint_name in self.handlers and 'prepare' in self.handlers[endpoint_name]:
            return self.handlers[endpoint_name]['prepare'](data), list(self.handlers.keys()).index(endpoint_name)
        else:
            self.log(f"Unknown preparation endpoint: {endpoint_name}", isWarning=True)

    def log(self, message, isWarning=False):
        msg = f"[{type(self).__name__}]: {message}"
        logger = self.logger.warn if isWarning else self.logger.info
        logger(msg)

    def prepare_raw(self, data):
        self.log(f"Preparing uncompressed data: {len(data)} Bytes")
        return data

    def handle_raw(self, data):
        self.log(f"Handling uncompressed data: {len(data)} Bytes")
        return data

    def prepare_raw_lzma(self, data):
        compressed_data = lzma.compress(data)
        self.log(f"Preparing LZMA compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
        return compressed_data

    def handle_raw_lzma(self, data):
        decompressed_data = lzma.decompress(data)
        self.log(f"Handling LZMA compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
        return decompressed_data

    def prepare_raw_gzip(self, data):
        compressed_data = gzip.compress(data)
        self.log(f"Preparing GZIP compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
        return compressed_data

    def handle_raw_gzip(self, data):
        decompressed_data = gzip.decompress(data)
        self.log(f"Handling GZIP compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
        return decompressed_data

    def prepare_p2pmsg_lzma(self, data):
        compressed_data = lzma.compress(data)
        self.log(f"Preparing LZMA compressed P2PMSG data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
        return compressed_data

    def handle_p2pmsg_lzma(self, data):
        decompressed_data = lzma.decompress(data)
        self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
        return decompressed_data
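Taken together, prepare() and dispatch() form a symmetric pair: prepare() runs the selected endpoint's preparation step and returns the endpoint's position in the handlers dict as the type byte that later goes on the air, while dispatch() uses that index to pick the matching handling step on the receiving side. A minimal round trip, using only the methods defined above, might look like this:

    from arq_data_type_handler import ARQDataTypeHandler

    handler = ARQDataTypeHandler()

    # prepare() compresses the payload (here via LZMA) and returns the
    # handler index as the on-air type byte
    payload, type_byte = handler.prepare(b"Hello FreeDATA!", "raw_lzma")

    # dispatch() maps the type byte back to the endpoint and reverses the step
    assert handler.dispatch(type_byte, payload) == b"Hello FreeDATA!"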

View file

@@ -5,6 +5,8 @@ import structlog
 from event_manager import EventManager
 from modem_frametypes import FRAME_TYPE
 import time
+from arq_data_type_handler import ARQDataTypeHandler

 class ARQSession():
@@ -44,6 +46,7 @@ class ARQSession():
         self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
         self.event_frame_received = threading.Event()
+        self.arq_data_type_handler = ARQDataTypeHandler()

         self.id = None
         self.session_started = time.time()
         self.session_ended = 0
@@ -88,10 +91,13 @@ class ARQSession():
         if self.state in self.STATE_TRANSITION:
             if frame_type in self.STATE_TRANSITION[self.state]:
                 action_name = self.STATE_TRANSITION[self.state][frame_type]
-                getattr(self, action_name)(frame)
+                received_data, type_byte = getattr(self, action_name)(frame)
+                if isinstance(received_data, bytearray) and isinstance(type_byte, int):
+                    self.arq_data_type_handler.dispatch(type_byte, received_data)
                 return

-        self.log(f"Ignoring unknow transition from state {self.state.name} with frame {frame['frame_type']}")
+        self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")

     def is_session_outdated(self):
         session_alivetime = time.time() - self.session_max_age
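The hunk above also changes the contract of every state-transition action: each one now returns a (data, type_byte) tuple, and only a completed transfer — a bytearray payload together with an integer type byte — is forwarded to the dispatcher. A reduced sketch of that control flow (illustrative only; the real method carries additional checks and logging):

    def on_frame_received(self, frame):
        # illustrative reduction of the logic added above
        action_name = self.STATE_TRANSITION[self.state][frame['frame_type']]
        received_data, type_byte = getattr(self, action_name)(frame)

        # intermediate steps return (None, None); only a finished, CRC-checked
        # transfer hands real payload data plus its type byte to the dispatcher
        if isinstance(received_data, bytearray) and isinstance(type_byte, int):
            self.arq_data_type_handler.dispatch(type_byte, received_data)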

View file

@@ -5,6 +5,7 @@ from modem_frametypes import FRAME_TYPE
 from codec2 import FREEDV_MODE
 from enum import Enum
 import time

 class IRS_State(Enum):
     NEW = 0
     OPEN_ACK_SENT = 1
@@ -68,6 +69,7 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.state = IRS_State.NEW
         self.state_enum = IRS_State # needed for access State enum from outside
+        self.type_byte = None
         self.total_length = 0
         self.total_crc = ''
         self.received_data = None
@@ -114,6 +116,7 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
         if not self.abort:
             self.set_state(IRS_State.OPEN_ACK_SENT)
+        return None, None

     def send_info_ack(self, info_frame):
         # Get session info from ISS
@@ -121,6 +124,7 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.total_length = info_frame['total_length']
         self.total_crc = info_frame['total_crc']
         self.dx_snr.append(info_frame['snr'])
+        self.type_byte = info_frame['type']

         self.log(f"New transfer of {self.total_length} bytes")
         self.event_manager.send_arq_session_new(False, self.id, self.dxcall, self.total_length, self.state.name)
@@ -134,7 +138,7 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
         if not self.abort:
             self.set_state(IRS_State.INFO_ACK_SENT)
+        return None, None

     def process_incoming_data(self, frame):
         if frame['offset'] != self.received_bytes:
@@ -174,7 +178,7 @@ class ARQSessionIRS(arq_session.ARQSession):
             # self.transmitted_acks += 1
             self.set_state(IRS_State.BURST_REPLY_SENT)
             self.launch_transmit_and_wait(ack, self.TIMEOUT_DATA, mode=FREEDV_MODE.signalling)
-            return
+            return None, None

         if self.final_crc_matches():
             self.log("All data received successfully!")
@@ -192,6 +196,7 @@ class ARQSessionIRS(arq_session.ARQSession):
             self.event_manager.send_arq_session_finished(
                 False, self.id, self.dxcall, True, self.state.name, data=self.received_data, statistics=self.calculate_session_statistics())
+            return self.received_data, self.type_byte

         else:
             ack = self.frame_factory.build_arq_burst_ack(self.id,
@@ -207,7 +212,7 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.set_state(IRS_State.FAILED)
         self.event_manager.send_arq_session_finished(
             False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
+        return False, False

     def calibrate_speed_settings(self):
         self.speed_level = 0 # for now stay at lowest speed level
@@ -231,3 +236,4 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.set_state(IRS_State.ABORTED)
         self.event_manager.send_arq_session_finished(
             False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
+        return None, None

View file

@@ -53,13 +53,13 @@ class ARQSessionISS(arq_session.ARQSession):
         }
     }

-    def __init__(self, config: dict, modem, dxcall: str, data: bytearray, state_manager):
+    def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes):
         super().__init__(config, modem, dxcall)
         self.state_manager = state_manager
         self.data = data
         self.total_length = len(data)
         self.data_crc = ''
+        self.type_byte = type_byte

         self.confirmed_bytes = 0
         self.state = ISS_State.NEW
@@ -119,11 +119,13 @@ class ARQSessionISS(arq_session.ARQSession):
         info_frame = self.frame_factory.build_arq_session_info(self.id, self.total_length,
                                                                helpers.get_crc_32(self.data),
-                                                               self.snr[0])
+                                                               self.snr[0], self.type_byte)
         self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
         self.set_state(ISS_State.INFO_SENT)
+        return None, None

     def send_data(self, irs_frame):
         self.set_speed_and_frames_per_burst(irs_frame)
@@ -137,15 +139,15 @@ class ARQSessionISS(arq_session.ARQSession):
         # check if we received an abort flag
         if irs_frame["flag"]["ABORT"]:
             self.transmission_aborted(irs_frame)
-            return
+            return None, None

         if irs_frame["flag"]["FINAL"]:
             if self.confirmed_bytes == self.total_length and irs_frame["flag"]["CHECKSUM"]:
                 self.transmission_ended(irs_frame)
-                return
             else:
                 self.transmission_failed()
-                return
+            return None, None

         payload_size = self.get_data_payload_size()
         burst = []
@@ -158,6 +160,7 @@ class ARQSessionISS(arq_session.ARQSession):
             burst.append(data_frame)
         self.launch_twr(burst, self.TIMEOUT_TRANSFER, self.RETRIES_CONNECT, mode='auto')
         self.set_state(ISS_State.BURST_SENT)
+        return None, None

     def transmission_ended(self, irs_frame):
         # final function for sucessfully ended transmissions
@@ -166,6 +169,7 @@ class ARQSessionISS(arq_session.ARQSession):
         self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
         self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics())
         self.state_manager.remove_arq_iss_session(self.id)
+        return None, None

     def transmission_failed(self, irs_frame=None):
         # final function for failed transmissions
@@ -173,6 +177,7 @@ class ARQSessionISS(arq_session.ARQSession):
         self.set_state(ISS_State.FAILED)
         self.log(f"Transmission failed!")
         self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,False, self.state.name, statistics=self.calculate_session_statistics())
+        return None, None

     def abort_transmission(self, irs_frame=None):
         # function for starting the abort sequence
@@ -202,4 +207,5 @@ class ARQSessionISS(arq_session.ARQSession):
         self.event_manager.send_arq_session_finished(
             True, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
         self.state_manager.remove_arq_iss_session(self.id)
+        return None, None

View file

@@ -3,6 +3,8 @@ import queue
 from codec2 import FREEDV_MODE
 import structlog
 from state_manager import StateManager
+from arq_data_type_handler import ARQDataTypeHandler

 class TxCommand():
@@ -13,6 +15,7 @@ class TxCommand():
         self.event_manager = event_manager
         self.set_params_from_api(apiParams)
         self.frame_factory = DataFrameFactory(config)
+        self.arq_data_type_handler = ARQDataTypeHandler()

     def set_params_from_api(self, apiParams):
         pass

View file

@@ -13,13 +13,20 @@ class ARQRawCommand(TxCommand):
         if not api_validations.validate_freedata_callsign(self.dxcall):
             self.dxcall = f"{self.dxcall}-0"

+        try:
+            self.type = apiParams['type']
+        except KeyError:
+            self.type = "raw"
+
         self.data = base64.b64decode(apiParams['data'])

     def run(self, event_queue: Queue, modem):
         self.emit_event(event_queue)
         self.logger.info(self.log_message())
-        iss = ARQSessionISS(self.config, modem, self.dxcall, self.data, self.state_manager)
+        prepared_data, type_byte = self.arq_data_type_handler.prepare(self.data, self.type)
+        iss = ARQSessionISS(self.config, modem, self.dxcall, self.state_manager, prepared_data, type_byte)

         if iss.id:
             self.state_manager.register_arq_iss_session(iss)
             iss.start()
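For API users this means the raw transmit command gains an optional 'type' field that selects the ARQDataTypeHandler endpoint and falls back to "raw" when omitted. A parameter dict, in the shape used by the tests further down in this diff, would look roughly like this (base64 import assumed):

    params = {
        'dxcall': "XX1XXX-1",
        'data': base64.b64encode(b"Hello world!"),
        'type': "raw_lzma",
    }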

View file

@@ -15,7 +15,6 @@ class DataFrameFactory:
         'FINAL': 0, # Bit-position for indicating the FINAL state
         'ABORT': 1, # Bit-position for indicating the ABORT request
         'CHECKSUM': 2, # Bit-position for indicating the CHECKSUM is correct or not
-        'ENABLE_COMPRESSION': 3 # Bit-position for indicating compression is enabled
     }

     def __init__(self, config):
@@ -118,6 +117,7 @@ class DataFrameFactory:
             "total_crc": 4,
             "snr": 1,
             "flag": 1,
+            "type": 1,
         }

         self.template_list[FR_TYPE.ARQ_SESSION_INFO_ACK.value] = {
@@ -218,7 +218,7 @@ class DataFrameFactory:
             elif key in ["session_id", "speed_level",
                          "frames_per_burst", "version",
-                         "offset", "total_length", "state"]:
+                         "offset", "total_length", "state", "type"]:
                 extracted_data[key] = int.from_bytes(data, 'big')

             elif key in ["snr"]:
@@ -350,10 +350,8 @@ class DataFrameFactory:
         }
         return self.construct(FR_TYPE.ARQ_SESSION_OPEN_ACK, payload)

-    def build_arq_session_info(self, session_id: int, total_length: int, total_crc: bytes, snr, flag_compression=False):
+    def build_arq_session_info(self, session_id: int, total_length: int, total_crc: bytes, snr, type):
         flag = 0b00000000
-        if flag_compression:
-            flag = helpers.set_flag(flag, 'ENABLE_COMPRESSION', True, self.ARQ_FLAGS)

         payload = {
             "session_id": session_id.to_bytes(1, 'big'),
@@ -361,6 +359,7 @@ class DataFrameFactory:
             "total_crc": total_crc,
             "snr": helpers.snr_to_bytes(1),
             "flag": flag.to_bytes(1, 'big'),
+            "type": type.to_bytes(1, 'big'),
         }
         return self.construct(FR_TYPE.ARQ_SESSION_INFO, payload)
@@ -377,7 +376,6 @@ class DataFrameFactory:
         }
         return self.construct(FR_TYPE.ARQ_STOP_ACK, payload)

     def build_arq_session_info_ack(self, session_id, total_crc, snr, speed_level, frames_per_burst, flag_final=False, flag_abort=False):
         flag = 0b00000000
         if flag_final:
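With the ENABLE_COMPRESSION flag removed, the data type now travels as a dedicated one-byte field of the ARQ_SESSION_INFO frame and is decoded back into an integer on the receiving side. A hedged usage sketch of the new builder signature follows; config and data are assumed to exist, and the type value is the index returned by ARQDataTypeHandler.prepare():

    factory = DataFrameFactory(config)
    info_frame = factory.build_arq_session_info(
        session_id=42,
        total_length=1000,
        total_crc=helpers.get_crc_32(data),
        snr=10,
        type=1,  # handler index from ARQDataTypeHandler.prepare()
    )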

View file

@@ -31,7 +31,6 @@ class FrameHandler():
     def is_frame_for_me(self):
         call_with_ssid = self.config['STATION']['mycall'] + "-" + str(self.config['STATION']['myssid'])
         ft = self.details['frame']['frame_type']
-        print(self.details)
         valid = False
         # Check for callsign checksum
         if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK']:

View file

@@ -126,12 +126,13 @@ class TestARQSession(unittest.TestCase):
     def testARQSessionSmallPayload(self):
         # set Packet Error Rate (PER) / frame loss probability
-        self.loss_probability = 50
+        self.loss_probability = 0

         self.establishChannels()
         params = {
             'dxcall': "XX1XXX-1",
             'data': base64.b64encode(bytes("Hello world!", encoding="utf-8")),
+            'type': "raw_lzma"
         }
         cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
         cmd.run(self.iss_event_queue, self.iss_modem)
@@ -146,6 +147,7 @@ class TestARQSession(unittest.TestCase):
         params = {
             'dxcall': "XX1XXX-1",
             'data': base64.b64encode(np.random.bytes(1000)),
+            'type': "raw_lzma"
         }
         cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
         cmd.run(self.iss_event_queue, self.iss_modem)

View file

@@ -0,0 +1,37 @@
import sys
sys.path.append('modem')

import unittest
from arq_data_type_handler import ARQDataTypeHandler


class TestDispatcher(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.arq_data_type_handler = ARQDataTypeHandler()

    def testDataTypeHandlerRaw(self):
        # Example usage
        example_data = b"Hello FreeDATA!"
        formatted_data, type_byte = self.arq_data_type_handler.prepare(example_data, "raw")
        dispatched_data = self.arq_data_type_handler.dispatch(type_byte, formatted_data)
        self.assertEqual(example_data, dispatched_data)

    def testDataTypeHandlerLZMA(self):
        # Example usage
        example_data = b"Hello FreeDATA!"
        formatted_data, type_byte = self.arq_data_type_handler.prepare(example_data, "raw_lzma")
        dispatched_data = self.arq_data_type_handler.dispatch(type_byte, formatted_data)
        self.assertEqual(example_data, dispatched_data)

    def testDataTypeHandlerGZIP(self):
        # Example usage
        example_data = b"Hello FreeDATA!"
        formatted_data, type_byte = self.arq_data_type_handler.prepare(example_data, "raw_gzip")
        dispatched_data = self.arq_data_type_handler.dispatch(type_byte, formatted_data)
        self.assertEqual(example_data, dispatched_data)


if __name__ == '__main__':
    unittest.main()