mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
adjusted and splitted dispatcher
This commit is contained in:
parent
47363b2521
commit
26478ef0a4
6 changed files with 76 additions and 61 deletions
24
modem/arq_data_formatter.py
Normal file
24
modem/arq_data_formatter.py
Normal file
|
@ -0,0 +1,24 @@
|
||||||
|
# File: arq_data_formatter.py
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
class ARQDataFormatter:
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def encapsulate(self, data, type_key="p2pmsg"):
|
||||||
|
"""Encapsulate data into the specified format with the given type key."""
|
||||||
|
formatted_data = {type_key: data}
|
||||||
|
return json.dumps(formatted_data)
|
||||||
|
|
||||||
|
def decapsulate(self, byte_data):
|
||||||
|
"""Decapsulate data from the specified format, returning both the data and the type."""
|
||||||
|
try:
|
||||||
|
json_data = byte_data.decode('utf-8') # Decode byte array to string
|
||||||
|
parsed_data = json.loads(json_data)
|
||||||
|
if parsed_data and isinstance(parsed_data, dict):
|
||||||
|
for key, value in parsed_data.items():
|
||||||
|
return key, value
|
||||||
|
return "raw", byte_data
|
||||||
|
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||||
|
return "raw", byte_data
|
35
modem/arq_received_data_dispatcher.py
Normal file
35
modem/arq_received_data_dispatcher.py
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
# File: arq_received_data_dispatcher.py
|
||||||
|
|
||||||
|
import structlog
|
||||||
|
from arq_data_formatter import ARQDataFormatter
|
||||||
|
|
||||||
|
class ARQReceivedDataDispatcher:
|
||||||
|
def __init__(self):
|
||||||
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
|
self.arq_data_formatter = ARQDataFormatter()
|
||||||
|
self.endpoints = {
|
||||||
|
"p2pmsg": self.handle_p2pmsg,
|
||||||
|
"test": self.handle_test,
|
||||||
|
}
|
||||||
|
|
||||||
|
def log(self, message, isWarning=False):
|
||||||
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
|
def dispatch(self, byte_data):
|
||||||
|
"""Use the data formatter to decapsulate and then dispatch data to the appropriate endpoint."""
|
||||||
|
type_key, data = self.arq_data_formatter.decapsulate(byte_data)
|
||||||
|
if type_key in self.endpoints:
|
||||||
|
self.endpoints[type_key](data)
|
||||||
|
else:
|
||||||
|
self.handle_raw(data)
|
||||||
|
|
||||||
|
def handle_p2pmsg(self, data):
|
||||||
|
self.log(f"Handling p2pmsg: {data}")
|
||||||
|
|
||||||
|
def handle_raw(self, data):
|
||||||
|
self.log(f"Handling raw data: {data}")
|
||||||
|
|
||||||
|
def handle_test(self, data):
|
||||||
|
self.log(f"Handling test data: {data}")
|
|
@ -5,6 +5,8 @@ import structlog
|
||||||
from event_manager import EventManager
|
from event_manager import EventManager
|
||||||
from modem_frametypes import FRAME_TYPE
|
from modem_frametypes import FRAME_TYPE
|
||||||
import time
|
import time
|
||||||
|
from arq_received_data_dispatcher import ARQReceivedDataDispatcher
|
||||||
|
|
||||||
|
|
||||||
class ARQSession():
|
class ARQSession():
|
||||||
|
|
||||||
|
@ -44,6 +46,7 @@ class ARQSession():
|
||||||
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
||||||
self.event_frame_received = threading.Event()
|
self.event_frame_received = threading.Event()
|
||||||
|
|
||||||
|
self.arq_received_data_dispatcher = ARQReceivedDataDispatcher()
|
||||||
self.id = None
|
self.id = None
|
||||||
self.session_started = time.time()
|
self.session_started = time.time()
|
||||||
self.session_ended = 0
|
self.session_ended = 0
|
||||||
|
@ -88,10 +91,13 @@ class ARQSession():
|
||||||
if self.state in self.STATE_TRANSITION:
|
if self.state in self.STATE_TRANSITION:
|
||||||
if frame_type in self.STATE_TRANSITION[self.state]:
|
if frame_type in self.STATE_TRANSITION[self.state]:
|
||||||
action_name = self.STATE_TRANSITION[self.state][frame_type]
|
action_name = self.STATE_TRANSITION[self.state][frame_type]
|
||||||
getattr(self, action_name)(frame)
|
received_data = getattr(self, action_name)(frame)
|
||||||
|
if received_data:
|
||||||
|
self.arq_received_data_dispatcher.dispatch(received_data)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
self.log(f"Ignoring unknow transition from state {self.state.name} with frame {frame['frame_type']}")
|
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")
|
||||||
|
|
||||||
def is_session_outdated(self):
|
def is_session_outdated(self):
|
||||||
session_alivetime = time.time() - self.session_max_age
|
session_alivetime = time.time() - self.session_max_age
|
||||||
|
|
|
@ -5,7 +5,6 @@ from modem_frametypes import FRAME_TYPE
|
||||||
from codec2 import FREEDV_MODE
|
from codec2 import FREEDV_MODE
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import time
|
import time
|
||||||
from data_dispatcher import DataDispatcher
|
|
||||||
|
|
||||||
class IRS_State(Enum):
|
class IRS_State(Enum):
|
||||||
NEW = 0
|
NEW = 0
|
||||||
|
@ -193,7 +192,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.set_state(IRS_State.ENDED)
|
self.set_state(IRS_State.ENDED)
|
||||||
self.event_manager.send_arq_session_finished(
|
self.event_manager.send_arq_session_finished(
|
||||||
False, self.id, self.dxcall, True, self.state.name, data=self.received_data, statistics=self.calculate_session_statistics())
|
False, self.id, self.dxcall, True, self.state.name, data=self.received_data, statistics=self.calculate_session_statistics())
|
||||||
DataDispatcher().dispatch(self.received_data)
|
return self.received_data
|
||||||
else:
|
else:
|
||||||
|
|
||||||
ack = self.frame_factory.build_arq_burst_ack(self.id,
|
ack = self.frame_factory.build_arq_burst_ack(self.id,
|
||||||
|
@ -209,7 +208,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.set_state(IRS_State.FAILED)
|
self.set_state(IRS_State.FAILED)
|
||||||
self.event_manager.send_arq_session_finished(
|
self.event_manager.send_arq_session_finished(
|
||||||
False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
|
False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
|
||||||
|
return False
|
||||||
|
|
||||||
def calibrate_speed_settings(self):
|
def calibrate_speed_settings(self):
|
||||||
self.speed_level = 0 # for now stay at lowest speed level
|
self.speed_level = 0 # for now stay at lowest speed level
|
||||||
|
|
|
@ -1,51 +0,0 @@
|
||||||
import json
|
|
||||||
import structlog
|
|
||||||
class DataDispatcher:
|
|
||||||
def __init__(self):
|
|
||||||
self.logger = structlog.get_logger(type(self).__name__)
|
|
||||||
|
|
||||||
# endpoints
|
|
||||||
self.endpoints = {
|
|
||||||
"p2pmsg": self.handle_p2pmsg,
|
|
||||||
"test": self.handle_test,
|
|
||||||
}
|
|
||||||
|
|
||||||
def log(self, message, isWarning = False):
|
|
||||||
msg = f"[{type(self).__name__}]: {message}"
|
|
||||||
logger = self.logger.warn if isWarning else self.logger.info
|
|
||||||
logger(msg)
|
|
||||||
|
|
||||||
def encapsulate(self, data, type_key="p2pmsg"):
|
|
||||||
"""Encapsulate data into the specified format with the given type key."""
|
|
||||||
formatted_data = {type_key: data}
|
|
||||||
return json.dumps(formatted_data)
|
|
||||||
|
|
||||||
def decapsulate(self, byte_data):
|
|
||||||
"""Decapsulate data from the specified format, returning both the data and the type."""
|
|
||||||
try:
|
|
||||||
json_data = byte_data.decode('utf-8') # Decode byte array to string
|
|
||||||
parsed_data = json.loads(json_data)
|
|
||||||
if parsed_data and isinstance(parsed_data, dict):
|
|
||||||
for key, value in parsed_data.items():
|
|
||||||
return key, value # Return type and data
|
|
||||||
return "raw", byte_data # Treat as raw data if no matching type is found
|
|
||||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
|
||||||
return "raw", byte_data # Return original data as raw if there's an error
|
|
||||||
|
|
||||||
def dispatch(self, byte_data):
|
|
||||||
"""Decapsulate and dispatch data to the appropriate endpoint based on its type."""
|
|
||||||
type_key, data = self.decapsulate(byte_data)
|
|
||||||
if type_key in self.endpoints:
|
|
||||||
self.endpoints[type_key](data)
|
|
||||||
else:
|
|
||||||
# Use the default handler for unrecognized types
|
|
||||||
self.handle_raw(data)
|
|
||||||
|
|
||||||
def handle_p2pmsg(self, data):
|
|
||||||
self.log(f"Handling p2pmsg: {data}")
|
|
||||||
|
|
||||||
def handle_raw(self, data):
|
|
||||||
self.log(f"Handling raw data: {data}")
|
|
||||||
|
|
||||||
def handle_test(self, data):
|
|
||||||
self.log(f"Handling test data: {data}")
|
|
|
@ -2,21 +2,23 @@ import sys
|
||||||
sys.path.append('modem')
|
sys.path.append('modem')
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from data_dispatcher import DataDispatcher
|
from arq_data_formatter import ARQDataFormatter
|
||||||
|
from arq_received_data_dispatcher import ARQReceivedDataDispatcher
|
||||||
|
|
||||||
class TestDispatcher(unittest.TestCase):
|
class TestDispatcher(unittest.TestCase):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def setUpClass(cls):
|
def setUpClass(cls):
|
||||||
cls.data_dispatcher = DataDispatcher()
|
cls.data_dispatcher = ARQReceivedDataDispatcher()
|
||||||
|
cls.data_formatter = ARQDataFormatter()
|
||||||
|
|
||||||
|
|
||||||
def testEncapsulator(self):
|
def testEncapsulator(self):
|
||||||
message_type = "p2pmsg"
|
message_type = "p2pmsg"
|
||||||
message_data = {"message": "Hello, P2P World!"}
|
message_data = {"message": "Hello, P2P World!"}
|
||||||
|
|
||||||
encapsulated = self.data_dispatcher.encapsulate(message_data, message_type)
|
encapsulated = self.data_formatter.encapsulate(message_data, message_type)
|
||||||
type, decapsulated = self.data_dispatcher.decapsulate(encapsulated.encode('utf-8'))
|
type, decapsulated = self.data_formatter.decapsulate(encapsulated.encode('utf-8'))
|
||||||
self.assertEqual(type, message_type)
|
self.assertEqual(type, message_type)
|
||||||
self.assertEqual(decapsulated, message_data)
|
self.assertEqual(decapsulated, message_data)
|
||||||
|
|
||||||
|
@ -24,7 +26,7 @@ class TestDispatcher(unittest.TestCase):
|
||||||
message_type = "test"
|
message_type = "test"
|
||||||
message_data = {"message": "Hello, P2P World!"}
|
message_data = {"message": "Hello, P2P World!"}
|
||||||
|
|
||||||
encapsulated = self.data_dispatcher.encapsulate(message_data, message_type)
|
encapsulated = self.data_formatter.encapsulate(message_data, message_type)
|
||||||
self.data_dispatcher.dispatch(encapsulated.encode('utf-8'))
|
self.data_dispatcher.dispatch(encapsulated.encode('utf-8'))
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue