From b29c539d5f87b5b60ab7d5764c26271dc4ce8748 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sun, 3 Dec 2023 14:11:43 +0100 Subject: [PATCH] saving thoughts regarding to processing arq commands... --- gui/src/js/eventHandler.js | 4 +- modem/command.py | 4 +- modem/command_arq_raw.py | 36 +++ .../deprecated_data_handler.py} | 0 .../deprecated_data_handler_broadcasts.py} | 2 +- ...eprecated_data_handler_data_broadcasts.py} | 2 +- .../deprecated_data_handler_ping.py} | 2 +- modem/frame_dispatcher.py | 278 ++++++++++-------- modem/protocol_arq.py | 1 - modem/server.py | 5 +- 10 files changed, 209 insertions(+), 125 deletions(-) create mode 100644 modem/command_arq_raw.py rename modem/{data_handler.py => data cemetery/deprecated_data_handler.py} (100%) rename modem/{data_handler_broadcasts.py => data cemetery/deprecated_data_handler_broadcasts.py} (99%) rename modem/{data_handler_data_broadcasts.py => data cemetery/deprecated_data_handler_data_broadcasts.py} (99%) rename modem/{data_handler_ping.py => data cemetery/deprecated_data_handler_ping.py} (99%) diff --git a/gui/src/js/eventHandler.js b/gui/src/js/eventHandler.js index cd8f1148..7cc15dfd 100644 --- a/gui/src/js/eventHandler.js +++ b/gui/src/js/eventHandler.js @@ -23,7 +23,7 @@ export function connectionFailed(endpoint, event) { } export function stateDispatcher(data) { data = JSON.parse(data); - //console.log(data); + console.log(data); stateStore.modem_connection = "connected"; @@ -43,6 +43,8 @@ export function stateDispatcher(data) { stateStore.radio_status = data["radio_status"]; stateStore.frequency = data["radio_frequency"]; stateStore.mode = data["radio_mode"]; + + stateStore.heard_stations = data["heard_stations"]; /* self.is_arq_state = False self.is_arq_session = False diff --git a/modem/command.py b/modem/command.py index 603bb02e..87c9be12 100644 --- a/modem/command.py +++ b/modem/command.py @@ -4,9 +4,11 @@ from codec2 import FREEDV_MODE class TxCommand(): - def __init__(self, config, logger, apiParams = {}): + def __init__(self, config, logger, state_manager, modem_events, apiParams = {}): self.config = config self.logger = logger + self.state_manager = state_manager + self.modem_events = modem_events self.set_params_from_api(apiParams) self.frame_factory = DataFrameFactory(config) diff --git a/modem/command_arq_raw.py b/modem/command_arq_raw.py new file mode 100644 index 00000000..2b22bd8f --- /dev/null +++ b/modem/command_arq_raw.py @@ -0,0 +1,36 @@ +from command import TxCommand +import api_validations +from protocol_arq_iss import ISS +from protocol_arq import ARQ + +class ARQRawCommand(TxCommand): + + def __int__(self, state_manager): + # open a new arq instance here + self.initialize_arq_instance() + + def set_params_from_api(self, apiParams): + self.dxcall = apiParams['dxcall'] + if not api_validations.validate_freedata_callsign(self.dxcall): + self.dxcall = f"{self.dxcall}-0" + return super().set_params_from_api(apiParams) + + def initialize_arq_transmission_iss(self, data): + if id := self.get_id_from_frame(data): + instance = self.initialize_arq_instance() + self.states.register_arq_instance_by_id(id, instance) + instance['arq_irs'].arq_received_data_channel_opener() + + + def initialize_arq_instance(self): + self.arq = ARQ(self.config, self.event_queue, self.state_manager) + self.arq_iss = ISS(self.config, self.event_queue, self.state_manager) + + return { + 'arq': self.arq, + 'arq_irs': self.arq_irs, + 'arq_iss': self.arq_iss, + 'arq_session': self.arq_session + } + def build_frame(self): + return self.frame_factory.build_arq_connect(destination=self.dxcall, session_id=b'', isWideband=True) diff --git a/modem/data_handler.py b/modem/data cemetery/deprecated_data_handler.py similarity index 100% rename from modem/data_handler.py rename to modem/data cemetery/deprecated_data_handler.py diff --git a/modem/data_handler_broadcasts.py b/modem/data cemetery/deprecated_data_handler_broadcasts.py similarity index 99% rename from modem/data_handler_broadcasts.py rename to modem/data cemetery/deprecated_data_handler_broadcasts.py index cf28d559..d17f73e1 100644 --- a/modem/data_handler_broadcasts.py +++ b/modem/data cemetery/deprecated_data_handler_broadcasts.py @@ -9,7 +9,7 @@ import structlog import event_manager import command_qrv -from data_handler import DATA +from deprecated_data_handler import DATA TESTMODE = False diff --git a/modem/data_handler_data_broadcasts.py b/modem/data cemetery/deprecated_data_handler_data_broadcasts.py similarity index 99% rename from modem/data_handler_data_broadcasts.py rename to modem/data cemetery/deprecated_data_handler_data_broadcasts.py index 8f039323..dc00c260 100644 --- a/modem/data_handler_data_broadcasts.py +++ b/modem/data cemetery/deprecated_data_handler_data_broadcasts.py @@ -5,7 +5,7 @@ import time import modem import base64 import ujson as json -from data_handler import DATA +from deprecated_data_handler import DATA class DATABROADCAST(DATA): """Terminal Node Controller for FreeDATA""" diff --git a/modem/data_handler_ping.py b/modem/data cemetery/deprecated_data_handler_ping.py similarity index 99% rename from modem/data_handler_ping.py rename to modem/data cemetery/deprecated_data_handler_ping.py index 36756a28..af908b91 100644 --- a/modem/data_handler_ping.py +++ b/modem/data cemetery/deprecated_data_handler_ping.py @@ -4,7 +4,7 @@ from codec2 import FREEDV_MODE import helpers import uuid import structlog -from data_handler import DATA +from deprecated_data_handler import DATA class PING(DATA): def __init__(self, config, event_queue, states): super().__init__(config, event_queue, states) diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index bd3395e8..0754c8d2 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -10,9 +10,9 @@ import event_manager from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, MODEM_TRANSMIT_QUEUE from data_frame_factory import DataFrameFactory -from data_handler_broadcasts import BROADCAST -from data_handler_data_broadcasts import DATABROADCAST -from data_handler_ping import PING +#from deprecated_data_handler_broadcasts import BROADCAST +#from deprecated_data_handler_data_broadcasts import DATABROADCAST +#from deprecated_data_handler_ping import PING from protocol_arq_iss import ISS from protocol_arq_irs import IRS @@ -58,7 +58,7 @@ class DISPATCHER(): self.states = states self._initialize_handlers(config, event_queue, states) - self._initialize_dispatchers() + #self._initialize_dispatchers() self.data_queue_transmit = DATA_QUEUE_TRANSMIT self.data_queue_received = data_q_rx @@ -70,9 +70,9 @@ class DISPATCHER(): self.frame_factory = DataFrameFactory(config) - self.broadcasts = BROADCAST(config, event_queue, states) - self.data_broadcasts = DATABROADCAST(config, event_queue, states) - self.ping = PING(config, event_queue, states) + #self.broadcasts = BROADCAST(config, event_queue, states) + #self.data_broadcasts = DATABROADCAST(config, event_queue, states) + #self.ping = PING(config, event_queue, states) self.arq = ARQ(config, event_queue, states) self.arq_irs = IRS(config, event_queue, states) @@ -81,6 +81,159 @@ class DISPATCHER(): self.event_manager = event_manager.EventManager([event_queue]) + + + def start(self): + """Starts worker threads for transmit and receive operations.""" + threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start() + threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start() + + def worker_transmit(self) -> None: + """Dispatch incoming UI instructions for transmitting operations""" + while True: + command = self.data_queue_transmit.get() + command.run(self.event_queue, MODEM_TRANSMIT_QUEUE) + + def worker_receive(self) -> None: + """Queue received data for processing""" + while True: + data = self.data_queue_received.get() + self.new_process_data( + data['payload'], + data['freedv'], + data['bytes_per_frame'], + data['snr'], + data['frequency_offset'], + ) + + def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr, offset) -> None: + # get frame as dictionary + deconstructed_frame = self.frame_factory.deconstruct(bytes_out) + frametype = deconstructed_frame["frame_type_int"] + + if frametype not in self.FRAME_HANDLER: + self.log.warning( + "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + return + + # instantiate handler + handler_class = self.FRAME_HANDLER[frametype]['class'] + handler = handler_class(self.FRAME_HANDLER[frametype]['name'], + self.config, + self.states, + self.event_manager, + MODEM_TRANSMIT_QUEUE, + self.arq_sessions) + + handler.handle(deconstructed_frame, snr, offset, freedv, bytes_per_frame) + + + + def get_id_from_frame(self, data): + if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]: + return data[13:14] + return None + + def initialize_arq_instance(self): + self.arq = ARQ(self.config, self.event_queue, self.states) + self.arq_irs = IRS(self.config, self.event_queue, self.states) + self.arq_iss = ISS(self.config, self.event_queue, self.states) + self.arq_session = SESSION(self.config, self.event_queue, self.states) + + return { + 'arq': self.arq, + 'arq_irs': self.arq_irs, + 'arq_iss': self.arq_iss, + 'arq_session': self.arq_session + } + + def initialize_arq_transmission_irs(self, data): + if id := self.get_id_from_frame(data): + instance = self.initialize_arq_instance() + self.states.register_arq_instance_by_id(id, instance) + instance['arq_irs'].arq_received_data_channel_opener() + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None: + """ + Process incoming data and decide what to do with the frame. + + Args: + bytes_out: + freedv: + bytes_per_frame: + snr: + + Returns: + + """ + # get frame as dictionary + deconstructed_frame = self.frame_factory.deconstruct(bytes_out) + frametype = deconstructed_frame["frame_type_int"] + + if frametype not in self.rx_dispatcher: + self.log.warning( + "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + return + + # Process frametypes requiring a different set of arguments. + if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: + self.arq_irs.arq_data_received( + deconstructed_frame, bytes_per_frame, snr, freedv + ) + + # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync + # if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: + # self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") + # self.c_lib.freedv_set_sync(freedv, 0) + return + + # TESTFRAMES + if frametype == FR_TYPE.TEST_FRAME.value: + self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame) + return + + # Process frames "known" by rx_dispatcher + # self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....") + self.rx_dispatcher[frametype][0](deconstructed_frame, snr) + def _initialize_dispatchers(self): """Initializes dispatchers for handling different frame types.""" # Dictionary of functions and log messages used in process_data @@ -129,114 +282,3 @@ class DISPATCHER(): FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"), FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"), } - - def start(self): - """Starts worker threads for transmit and receive operations.""" - threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start() - threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start() - - def worker_transmit(self) -> None: - """Dispatch incoming UI instructions for transmitting operations""" - while True: - command = self.data_queue_transmit.get() - command.run(self.event_queue, MODEM_TRANSMIT_QUEUE) - - def worker_receive(self) -> None: - """Queue received data for processing""" - while True: - data = self.data_queue_received.get() - self.new_process_data( - data['payload'], - data['freedv'], - data['bytes_per_frame'], - data['snr'], - data['frequency_offset'], - ) - - def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr, offset) -> None: - # get frame as dictionary - deconstructed_frame = self.frame_factory.deconstruct(bytes_out) - frametype = deconstructed_frame["frame_type_int"] - - if frametype not in self.FRAME_HANDLER: - self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) - return - - # instantiate handler - handler_class = self.FRAME_HANDLER[frametype]['class'] - handler = handler_class(self.FRAME_HANDLER[frametype]['name'], - self.config, - self.states, - self.event_manager, - MODEM_TRANSMIT_QUEUE, - self.arq_sessions) - - handler.handle(deconstructed_frame, snr, offset, freedv, bytes_per_frame) - - def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None: - """ - Process incoming data and decide what to do with the frame. - - Args: - bytes_out: - freedv: - bytes_per_frame: - snr: - - Returns: - - """ - # get frame as dictionary - deconstructed_frame = self.frame_factory.deconstruct(bytes_out) - frametype = deconstructed_frame["frame_type_int"] - - if frametype not in self.rx_dispatcher: - self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) - return - - # Process frametypes requiring a different set of arguments. - if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: - self.arq_irs.arq_data_received( - deconstructed_frame, bytes_per_frame, snr, freedv - ) - - # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync - # if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: - # self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") - # self.c_lib.freedv_set_sync(freedv, 0) - return - - # TESTFRAMES - if frametype == FR_TYPE.TEST_FRAME.value: - self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame) - return - - # Process frames "known" by rx_dispatcher - # self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....") - self.rx_dispatcher[frametype][0](deconstructed_frame, snr) - - def get_id_from_frame(self, data): - if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]: - return data[13:14] - return None - - def initialize_arq_instance(self): - self.arq = ARQ(self.config, self.event_queue, self.states) - self.arq_irs = IRS(self.config, self.event_queue, self.states) - self.arq_iss = ISS(self.config, self.event_queue, self.states) - self.arq_session = SESSION(self.config, self.event_queue, self.states) - - return { - 'arq': self.arq, - 'arq_irs': self.arq_irs, - 'arq_iss': self.arq_iss, - 'arq_session': self.arq_session - } - - def initialize_arq_transmission_irs(self, data): - if id := self.get_id_from_frame(data): - instance = self.initialize_arq_instance() - self.states.register_arq_instance_by_id(id, instance) - instance['arq_irs'].arq_received_data_channel_opener() diff --git a/modem/protocol_arq.py b/modem/protocol_arq.py index 855fba9a..e0dcc916 100644 --- a/modem/protocol_arq.py +++ b/modem/protocol_arq.py @@ -11,7 +11,6 @@ from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS from modem_frametypes import FRAME_TYPE as FR_TYPE import event_manager -from data_handler import DATA TESTMODE = False class ARQ: def __init__(self, config, event_queue, states): diff --git a/modem/server.py b/modem/server.py index be0da2ec..52f32b2b 100644 --- a/modem/server.py +++ b/modem/server.py @@ -16,6 +16,8 @@ import command_cq import command_ping import command_feq import command_test +import command_arq_raw + from queues import DATA_QUEUE_TRANSMIT as tx_cmd_queue app = Flask(__name__) @@ -82,7 +84,7 @@ def validate(req, param, validator, isRequired = True): # Takes a transmit command and puts it in the transmit command queue def enqueue_tx_command(cmd_class, params = {}): - command = cmd_class(app.config_manager.read(), app.logger, params) + command = cmd_class(app.config_manager.read(), app.logger, app.state_manager, app.modem_events, params) tx_cmd_queue.put(command) app.logger.info(f"Command {command.get_name()} enqueued.") @@ -210,6 +212,7 @@ def post_modem_send_raw(): return api_response({"info": "endpoint for SENDING RAW DATA via POST"}) if not app.state_manager.is_modem_running: api_abort('Modem not running', 503) + enqueue_tx_command(command_arq_raw.ARQRawCommand, request.json) # server_commands.modem_arq_send_raw(request.json) return "Not implemented yet"