From 3fd2c402af414a304097e11851cab160b3273acf Mon Sep 17 00:00:00 2001 From: Pedro Date: Mon, 4 Dec 2023 11:05:45 +0100 Subject: [PATCH] Get rid of the DATA_QUEUE_TRANSMIT (there is still the MODEM_TRANSMIT_QUEUE) --- modem/frame_dispatcher.py | 16 +++------------- modem/modem.py | 6 +++--- modem/protocol_arq_session.py | 1 - modem/queues.py | 3 --- modem/server.py | 6 ++---- modem/service_manager.py | 6 ++++-- 6 files changed, 12 insertions(+), 26 deletions(-) diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index 0754c8d2..4078e47f 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -7,7 +7,6 @@ import threading import structlog from modem_frametypes import FRAME_TYPE as FR_TYPE import event_manager -from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, MODEM_TRANSMIT_QUEUE from data_frame_factory import DataFrameFactory #from deprecated_data_handler_broadcasts import BROADCAST @@ -49,7 +48,7 @@ class DISPATCHER(): FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"}, } - def __init__(self, config, event_queue, states, data_q_rx): + def __init__(self, config, event_queue, states, data_q_rx, modem_tx_q): self.log = structlog.get_logger("frame_dispatcher") self.log.info("loading frame dispatcher.....\n") @@ -60,8 +59,8 @@ class DISPATCHER(): self._initialize_handlers(config, event_queue, states) #self._initialize_dispatchers() - self.data_queue_transmit = DATA_QUEUE_TRANSMIT self.data_queue_received = data_q_rx + self.modem_transmit_queue = modem_tx_q self.arq_sessions = [] @@ -81,19 +80,10 @@ class DISPATCHER(): self.event_manager = event_manager.EventManager([event_queue]) - - def start(self): """Starts worker threads for transmit and receive operations.""" - threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start() threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start() - def worker_transmit(self) -> None: - """Dispatch incoming UI instructions for transmitting operations""" - while True: - command = self.data_queue_transmit.get() - command.run(self.event_queue, MODEM_TRANSMIT_QUEUE) - def worker_receive(self) -> None: """Queue received data for processing""" while True: @@ -122,7 +112,7 @@ class DISPATCHER(): self.config, self.states, self.event_manager, - MODEM_TRANSMIT_QUEUE, + self.modem_transmit_queue, self.arq_sessions) handler.handle(deconstructed_frame, snr, offset, freedv, bytes_per_frame) diff --git a/modem/modem.py b/modem/modem.py index 5bc4fadc..6d3f3867 100644 --- a/modem/modem.py +++ b/modem/modem.py @@ -22,7 +22,7 @@ import sounddevice as sd import structlog import tci import cw -from queues import MODEM_TRANSMIT_QUEUE, RIGCTLD_COMMAND_QUEUE +from queues import RIGCTLD_COMMAND_QUEUE import audio import event_manager from modem_frametypes import FRAME_TYPE @@ -89,7 +89,7 @@ class RF: # Make sure our resampler will work assert (self.AUDIO_SAMPLE_RATE_RX / self.MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48 # type: ignore - self.modem_transmit_queue = MODEM_TRANSMIT_QUEUE + self.modem_transmit_queue = queue.Queue() self.modem_received_queue = queue.Queue() self.audio_received_queue = queue.Queue() @@ -112,7 +112,7 @@ class RF: self.event_manager) self.beacon = beacon.Beacon(self.config, self.states, event_queue, - self.log, MODEM_TRANSMIT_QUEUE) + self.log, self.modem_transmit_queue) # -------------------------------------------------------------------------------------------------------- def tci_tx_callback(self) -> None: diff --git a/modem/protocol_arq_session.py b/modem/protocol_arq_session.py index d0d9dcda..a6f96b3b 100644 --- a/modem/protocol_arq_session.py +++ b/modem/protocol_arq_session.py @@ -2,7 +2,6 @@ import time import helpers from codec2 import FREEDV_MODE -from queues import MODEM_TRANSMIT_QUEUE from modem_frametypes import FRAME_TYPE as FR_TYPE from protocol_arq import ARQ diff --git a/modem/queues.py b/modem/queues.py index 7abc2f4e..933ce74c 100644 --- a/modem/queues.py +++ b/modem/queues.py @@ -6,9 +6,6 @@ import queue DATA_QUEUE_TRANSMIT = queue.Queue() DATA_QUEUE_RECEIVED = queue.Queue() -# Initialize FIFO queue to store received frames -MODEM_TRANSMIT_QUEUE = queue.Queue() - # Initialize FIFO queue to store received frames MESH_RECEIVED_QUEUE = queue.Queue() MESH_QUEUE_TRANSMIT = queue.Queue() diff --git a/modem/server.py b/modem/server.py index 52f32b2b..59f68802 100644 --- a/modem/server.py +++ b/modem/server.py @@ -18,8 +18,6 @@ import command_feq import command_test import command_arq_raw -from queues import DATA_QUEUE_TRANSMIT as tx_cmd_queue - app = Flask(__name__) CORS(app) CORS(app, resources={r"/*": {"origins": "*"}}) @@ -56,7 +54,7 @@ app.modem_service = queue.Queue() # start / stop modem service app.state_manager = state_manager.StateManager(app.state_queue) # start service manager -service_manager.SM(app) +app.service_manager = service_manager.SM(app) # start modem service app.modem_service.put("start") @@ -85,7 +83,7 @@ def validate(req, param, validator, isRequired = True): # Takes a transmit command and puts it in the transmit command queue def enqueue_tx_command(cmd_class, params = {}): command = cmd_class(app.config_manager.read(), app.logger, app.state_manager, app.modem_events, params) - tx_cmd_queue.put(command) + command.run(app.modem_events, app.service_manager.modem.modem_transmit_queue) app.logger.info(f"Command {command.get_name()} enqueued.") ## REST API diff --git a/modem/service_manager.py b/modem/service_manager.py index 72b4568f..bc534453 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -84,7 +84,8 @@ class SM: self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config, self.modem_events, self.states, - self.modem.data_queue_received) + self.modem.data_queue_received, + self.modem.modem_transmit_queue) self.frame_dispatcher.start() self.states.set("is_modem_running", True) @@ -106,4 +107,5 @@ class SM: self.config['AUDIO']['output_device']) self.log.info("tested audio devices", result=audio_test) - return audio_test \ No newline at end of file + return audio_test + \ No newline at end of file