diff --git a/modem/arq_data_formatter.py b/modem/arq_data_formatter.py new file mode 100644 index 00000000..d81b1fd6 --- /dev/null +++ b/modem/arq_data_formatter.py @@ -0,0 +1,24 @@ +# File: arq_data_formatter.py + +import json + +class ARQDataFormatter: + def __init__(self): + pass + + def encapsulate(self, data, type_key="p2pmsg"): + """Encapsulate data into the specified format with the given type key.""" + formatted_data = {type_key: data} + return json.dumps(formatted_data) + + def decapsulate(self, byte_data): + """Decapsulate data from the specified format, returning both the data and the type.""" + try: + json_data = byte_data.decode('utf-8') # Decode byte array to string + parsed_data = json.loads(json_data) + if parsed_data and isinstance(parsed_data, dict): + for key, value in parsed_data.items(): + return key, value + return "raw", byte_data + except (json.JSONDecodeError, UnicodeDecodeError): + return "raw", byte_data diff --git a/modem/arq_received_data_dispatcher.py b/modem/arq_received_data_dispatcher.py new file mode 100644 index 00000000..b8572841 --- /dev/null +++ b/modem/arq_received_data_dispatcher.py @@ -0,0 +1,35 @@ +# File: arq_received_data_dispatcher.py + +import structlog +from arq_data_formatter import ARQDataFormatter + +class ARQReceivedDataDispatcher: + def __init__(self): + self.logger = structlog.get_logger(type(self).__name__) + self.arq_data_formatter = ARQDataFormatter() + self.endpoints = { + "p2pmsg": self.handle_p2pmsg, + "test": self.handle_test, + } + + def log(self, message, isWarning=False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def dispatch(self, byte_data): + """Use the data formatter to decapsulate and then dispatch data to the appropriate endpoint.""" + type_key, data = self.arq_data_formatter.decapsulate(byte_data) + if type_key in self.endpoints: + self.endpoints[type_key](data) + else: + self.handle_raw(data) + + def handle_p2pmsg(self, data): + self.log(f"Handling p2pmsg: {data}") + + def handle_raw(self, data): + self.log(f"Handling raw data: {data}") + + def handle_test(self, data): + self.log(f"Handling test data: {data}") diff --git a/modem/arq_session.py b/modem/arq_session.py index 9df7cc68..71ef28e3 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -5,6 +5,8 @@ import structlog from event_manager import EventManager from modem_frametypes import FRAME_TYPE import time +from arq_received_data_dispatcher import ARQReceivedDataDispatcher + class ARQSession(): @@ -44,6 +46,7 @@ class ARQSession(): self.frame_factory = data_frame_factory.DataFrameFactory(self.config) self.event_frame_received = threading.Event() + self.arq_received_data_dispatcher = ARQReceivedDataDispatcher() self.id = None self.session_started = time.time() self.session_ended = 0 @@ -88,10 +91,13 @@ class ARQSession(): if self.state in self.STATE_TRANSITION: if frame_type in self.STATE_TRANSITION[self.state]: action_name = self.STATE_TRANSITION[self.state][frame_type] - getattr(self, action_name)(frame) + received_data = getattr(self, action_name)(frame) + if received_data: + self.arq_received_data_dispatcher.dispatch(received_data) + return - self.log(f"Ignoring unknow transition from state {self.state.name} with frame {frame['frame_type']}") + self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") def is_session_outdated(self): session_alivetime = time.time() - self.session_max_age diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index cbdb02e1..7eb7f821 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -5,7 +5,6 @@ from modem_frametypes import FRAME_TYPE from codec2 import FREEDV_MODE from enum import Enum import time -from data_dispatcher import DataDispatcher class IRS_State(Enum): NEW = 0 @@ -193,7 +192,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.set_state(IRS_State.ENDED) self.event_manager.send_arq_session_finished( False, self.id, self.dxcall, True, self.state.name, data=self.received_data, statistics=self.calculate_session_statistics()) - DataDispatcher().dispatch(self.received_data) + return self.received_data else: ack = self.frame_factory.build_arq_burst_ack(self.id, @@ -209,7 +208,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.set_state(IRS_State.FAILED) self.event_manager.send_arq_session_finished( False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics()) - + return False def calibrate_speed_settings(self): self.speed_level = 0 # for now stay at lowest speed level diff --git a/modem/data_dispatcher.py b/modem/data_dispatcher.py deleted file mode 100644 index f3593f05..00000000 --- a/modem/data_dispatcher.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -import structlog -class DataDispatcher: - def __init__(self): - self.logger = structlog.get_logger(type(self).__name__) - - # endpoints - self.endpoints = { - "p2pmsg": self.handle_p2pmsg, - "test": self.handle_test, - } - - def log(self, message, isWarning = False): - msg = f"[{type(self).__name__}]: {message}" - logger = self.logger.warn if isWarning else self.logger.info - logger(msg) - - def encapsulate(self, data, type_key="p2pmsg"): - """Encapsulate data into the specified format with the given type key.""" - formatted_data = {type_key: data} - return json.dumps(formatted_data) - - def decapsulate(self, byte_data): - """Decapsulate data from the specified format, returning both the data and the type.""" - try: - json_data = byte_data.decode('utf-8') # Decode byte array to string - parsed_data = json.loads(json_data) - if parsed_data and isinstance(parsed_data, dict): - for key, value in parsed_data.items(): - return key, value # Return type and data - return "raw", byte_data # Treat as raw data if no matching type is found - except (json.JSONDecodeError, UnicodeDecodeError): - return "raw", byte_data # Return original data as raw if there's an error - - def dispatch(self, byte_data): - """Decapsulate and dispatch data to the appropriate endpoint based on its type.""" - type_key, data = self.decapsulate(byte_data) - if type_key in self.endpoints: - self.endpoints[type_key](data) - else: - # Use the default handler for unrecognized types - self.handle_raw(data) - - def handle_p2pmsg(self, data): - self.log(f"Handling p2pmsg: {data}") - - def handle_raw(self, data): - self.log(f"Handling raw data: {data}") - - def handle_test(self, data): - self.log(f"Handling test data: {data}") \ No newline at end of file diff --git a/tests/test_data_dispatcher.py b/tests/test_data_dispatcher.py index 6b30c8e8..90b64fa9 100644 --- a/tests/test_data_dispatcher.py +++ b/tests/test_data_dispatcher.py @@ -2,21 +2,23 @@ import sys sys.path.append('modem') import unittest -from data_dispatcher import DataDispatcher +from arq_data_formatter import ARQDataFormatter +from arq_received_data_dispatcher import ARQReceivedDataDispatcher class TestDispatcher(unittest.TestCase): @classmethod def setUpClass(cls): - cls.data_dispatcher = DataDispatcher() + cls.data_dispatcher = ARQReceivedDataDispatcher() + cls.data_formatter = ARQDataFormatter() def testEncapsulator(self): message_type = "p2pmsg" message_data = {"message": "Hello, P2P World!"} - encapsulated = self.data_dispatcher.encapsulate(message_data, message_type) - type, decapsulated = self.data_dispatcher.decapsulate(encapsulated.encode('utf-8')) + encapsulated = self.data_formatter.encapsulate(message_data, message_type) + type, decapsulated = self.data_formatter.decapsulate(encapsulated.encode('utf-8')) self.assertEqual(type, message_type) self.assertEqual(decapsulated, message_data) @@ -24,7 +26,7 @@ class TestDispatcher(unittest.TestCase): message_type = "test" message_data = {"message": "Hello, P2P World!"} - encapsulated = self.data_dispatcher.encapsulate(message_data, message_type) + encapsulated = self.data_formatter.encapsulate(message_data, message_type) self.data_dispatcher.dispatch(encapsulated.encode('utf-8'))