diff --git a/modem/arq_session.py b/modem/arq_session.py index 20e8d2f5..c5b2720b 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -25,7 +25,7 @@ class ARQSession(): } - def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str): + def __init__(self, config: dict, modem, dxcall: str): self.logger = structlog.get_logger(type(self).__name__) self.config = config @@ -34,7 +34,7 @@ class ARQSession(): self.dxcall = dxcall self.dx_snr = [] - self.tx_frame_queue = tx_frame_queue + self.modem = modem self.speed_level = 0 self.frames_per_burst = 1 @@ -56,13 +56,7 @@ class ARQSession(): if mode in ['auto']: mode = self.get_mode_by_speed_level(self.speed_level) - modem_queue_item = { - 'mode': mode, - 'repeat': 1, - 'repeat_delay': 1, - 'frame': frame, - } - self.tx_frame_queue.put(modem_queue_item) + self.modem.transmit(mode, 1, 1, frame) def set_state(self, state): self.log(f"{type(self).__name__} state change from {self.state} to {state}") diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 25870480..4d928126 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -39,8 +39,8 @@ class ARQSessionIRS(arq_session.ARQSession): }, } - def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int): - super().__init__(config, tx_frame_queue, dxcall) + def __init__(self, config: dict, modem, dxcall: str, session_id: int): + super().__init__(config, modem, dxcall) self.id = session_id self.dxcall = dxcall diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 7c0fb973..d8f0aae1 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -35,8 +35,8 @@ class ARQSessionISS(arq_session.ARQSession): }, } - def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, data: bytearray): - super().__init__(config, tx_frame_queue, dxcall) + def __init__(self, config: dict, modem, dxcall: str, data: bytearray): + super().__init__(config, modem, dxcall) self.data = data self.data_crc = '' diff --git a/modem/command.py b/modem/command.py index e846346c..337f107d 100644 --- a/modem/command.py +++ b/modem/command.py @@ -44,12 +44,11 @@ class TxCommand(): 'frame': frame, } - def transmit(self, tx_frame_queue): + def transmit(self, modem): frame = self.build_frame() - tx_queue_item = self.make_modem_queue_item(self.get_tx_mode(), 1, 0, frame) - tx_frame_queue.put(tx_queue_item) + modem.transmit(self.get_tx_mode(), 1, 0, frame) - def run(self, event_queue: queue.Queue, tx_frame_queue: queue.Queue): + def run(self, event_queue: queue.Queue, modem): self.emit_event(event_queue) self.logger.info(self.log_message()) - self.transmit(tx_frame_queue) + self.transmit(modem) diff --git a/modem/command_arq_raw.py b/modem/command_arq_raw.py index 09d14373..98550546 100644 --- a/modem/command_arq_raw.py +++ b/modem/command_arq_raw.py @@ -13,11 +13,11 @@ class ARQRawCommand(TxCommand): self.data = base64.b64decode(apiParams['data']) - def run(self, event_queue: Queue, tx_frame_queue: Queue): + def run(self, event_queue: Queue, modem): self.emit_event(event_queue) self.logger.info(self.log_message()) - iss = ARQSessionISS(self.config, tx_frame_queue, self.dxcall, self.data) + iss = ARQSessionISS(self.config, modem, self.dxcall, self.data) self.state_manager.register_arq_iss_session(iss) iss.start() return iss diff --git a/modem/command_beacon.py b/modem/command_beacon.py index 37eb638d..bb902c92 100644 --- a/modem/command_beacon.py +++ b/modem/command_beacon.py @@ -5,8 +5,8 @@ class BeaconCommand(TxCommand): def build_frame(self): return self.frame_factory.build_beacon() - def transmit(self, tx_frame_queue): - super().transmit(tx_frame_queue) + def transmit(self, modem): + super().transmit(modem) if self.config['MODEM']['enable_morse_identifier']: mycall = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}" - tx_frame_queue.put(["morse", 1, 0, mycall]) + modem.transmit_morse("morse", 1, 0, mycall) diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index 00712398..a93aa819 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -39,7 +39,7 @@ class DISPATCHER(): FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"}, } - def __init__(self, config, event_queue, states, data_q_rx, modem_tx_q): + def __init__(self, config, event_queue, states, modem): self.log = structlog.get_logger("frame_dispatcher") self.log.info("loading frame dispatcher.....\n") @@ -49,8 +49,8 @@ class DISPATCHER(): self._initialize_handlers(config, event_queue, states) - self.data_queue_received = data_q_rx - self.modem_transmit_queue = modem_tx_q + self.modem = modem + self.data_queue_received = modem.data_queue_received self.arq_sessions = [] @@ -92,7 +92,7 @@ class DISPATCHER(): self.config, self.states, self.event_manager, - self.modem_transmit_queue) + self.modem) handler.handle(deconstructed_frame, snr, frequency_offset, freedv, bytes_per_frame) diff --git a/modem/frame_handler.py b/modem/frame_handler.py index 077d8dfe..b8547b4c 100644 --- a/modem/frame_handler.py +++ b/modem/frame_handler.py @@ -9,13 +9,13 @@ from codec2 import FREEDV_MODE class FrameHandler(): def __init__(self, name: str, config, states: StateManager, event_manager: EventManager, - tx_frame_queue: Queue) -> None: + modem) -> None: self.name = name self.config = config self.states = states self.event_manager = event_manager - self.tx_frame_queue = tx_frame_queue + self.modem = modem self.logger = structlog.get_logger("Frame Handler") self.details = { @@ -86,14 +86,6 @@ class FrameHandler(): event_data = self.make_event() self.event_manager.broadcast(event_data) - def make_modem_queue_item(self, mode, repeat, repeat_delay, frame): - return { - 'mode': self.get_tx_mode(), - 'repeat': repeat, - 'repeat_delay': repeat_delay, - 'frame': frame, - } - def get_tx_mode(self): return ( FREEDV_MODE.fsk_ldpc_0.value @@ -102,8 +94,7 @@ class FrameHandler(): ) def transmit(self, frame): - tx_queue_item = self.make_modem_queue_item(self.get_tx_mode(), 1, 0, frame) - self.tx_frame_queue.put(tx_queue_item) + self.modem.transmit(self.get_tx_mode(), 1, 0, frame) def follow_protocol(self): pass diff --git a/modem/frame_handler_arq_session.py b/modem/frame_handler_arq_session.py index f257ad2b..51d9926f 100644 --- a/modem/frame_handler_arq_session.py +++ b/modem/frame_handler_arq_session.py @@ -22,8 +22,8 @@ class ARQFrameHandler(frame_handler.FrameHandler): # Normal case when receiving a SESSION_OPEN for the first time else: - session = ARQSessionIRS(self.config, - self.tx_frame_queue, + session = ARQSessionIRS(self.config, + self.modem, frame['origin'], session_id) self.states.register_arq_irs_session(session) diff --git a/modem/modem.py b/modem/modem.py index 9be64217..ffae9dbe 100644 --- a/modem/modem.py +++ b/modem/modem.py @@ -87,7 +87,6 @@ class RF: self.modem_received_queue = queue.Queue() self.audio_received_queue = queue.Queue() - self.audio_transmit_queue = queue.Queue() self.data_queue_received = queue.Queue() @@ -105,7 +104,6 @@ class RF: self.beacon = beacon.Beacon(self.config, self.states, event_queue, self.log, self.modem_transmit_queue) - # -------------------------------------------------------------------------------------------------------- def tci_tx_callback(self, audio_48k) -> None: self.radio.set_ptt(True) self.event_manager.send_ptt_change(True) @@ -212,7 +210,7 @@ class RF: self.stream = Object() # lets init TCI module - self.tci_module = tci.TCICtrl(self.audio_received_queue, self.audio_transmit_queue) + self.tci_module = tci.TCICtrl(self.audio_received_queue) tci_rx_callback_thread = threading.Thread( target=self.tci_rx_callback, @@ -230,7 +228,35 @@ class RF: ) tci_tx_callback_thread.start() - # -------------------------------------------------------------------- + def audio_auto_tune(self): + # enable / disable AUDIO TUNE Feature / ALC correction + if self.enable_audio_auto_tune: + if self.radio_alc == 0.0: + self.tx_audio_level = self.tx_audio_level + 20 + elif 0.0 < self.radio_alc <= 0.1: + print("0.0 < self.radio_alc <= 0.1") + self.tx_audio_level = self.tx_audio_level + 2 + self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), + alc_level=str(self.radio_alc)) + elif 0.1 < self.radio_alc < 0.2: + print("0.1 < self.radio_alc < 0.2") + self.tx_audio_level = self.tx_audio_level + self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), + alc_level=str(self.radio_alc)) + elif 0.2 < self.radio_alc < 0.99: + print("0.2 < self.radio_alc < 0.99") + self.tx_audio_level = self.tx_audio_level - 20 + self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), + alc_level=str(self.radio_alc)) + elif 1.0 >= self.radio_alc: + print("1.0 >= self.radio_alc") + self.tx_audio_level = self.tx_audio_level - 40 + self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), + alc_level=str(self.radio_alc)) + else: + self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), + alc_level=str(self.radio_alc)) + def transmit( self, mode, repeats: int, repeat_delay: int, frames: bytearray ) -> bool: @@ -310,6 +336,7 @@ class RF: "[MDM] TRANSMIT", mode=self.MODE, payload=payload_bytes_per_frame, delay=self.tx_delay ) + if not isinstance(frames, list): frames = [frames] for _ in range(repeats): # Create modulation for all frames in the list @@ -419,42 +446,12 @@ class RF: # After processing, set the locking state back to true to be prepared for next transmission self.mod_out_locked = True - self.modem_transmit_queue.task_done() self.states.setTransmitting(False) end_of_transmission = time.time() transmission_time = end_of_transmission - start_of_transmission self.log.debug("[MDM] ON AIR TIME", time=transmission_time) - def audio_auto_tune(self): - # enable / disable AUDIO TUNE Feature / ALC correction - if self.enable_audio_auto_tune: - if self.radio_alc == 0.0: - self.tx_audio_level = self.tx_audio_level + 20 - elif 0.0 < self.radio_alc <= 0.1: - print("0.0 < self.radio_alc <= 0.1") - self.tx_audio_level = self.tx_audio_level + 2 - self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), - alc_level=str(self.radio_alc)) - elif 0.1 < self.radio_alc < 0.2: - print("0.1 < self.radio_alc < 0.2") - self.tx_audio_level = self.tx_audio_level - self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), - alc_level=str(self.radio_alc)) - elif 0.2 < self.radio_alc < 0.99: - print("0.2 < self.radio_alc < 0.99") - self.tx_audio_level = self.tx_audio_level - 20 - self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), - alc_level=str(self.radio_alc)) - elif 1.0 >= self.radio_alc: - print("1.0 >= self.radio_alc") - self.tx_audio_level = self.tx_audio_level - 40 - self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), - alc_level=str(self.radio_alc)) - else: - self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level), - alc_level=str(self.radio_alc)) - def transmit_morse(self, repeats, repeat_delay, frames): self.states.waitForTransmission() self.states.setTransmitting(True) @@ -528,11 +525,6 @@ class RF: ) worker_received.start() - worker_transmit = threading.Thread( - target=self.worker_transmit, name="WORKER_THREAD", daemon=True - ) - worker_transmit.start() - # Low level modem audio transmit def transmit_audio(self, audio_48k) -> None: self.radio.set_ptt(True) @@ -545,23 +537,6 @@ class RF: sd.play(audio_48k, blocking=True) return - def worker_transmit(self) -> None: - """Worker for FIFO queue for processing frames to be transmitted""" - while True: - # print queue size for debugging purposes - # TODO Lets check why we have several frames in our transmit queue which causes sometimes a double transmission - # we could do a cleanup after a transmission so theres no reason sending twice - queuesize = self.modem_transmit_queue.qsize() - self.log.debug("[MDM] self.modem_transmit_queue", qsize=queuesize) - tx = self.modem_transmit_queue.get() - print(tx) - # TODO Why we is this taking an array instead of a single frame? - if tx['mode'] in ["morse"]: - self.transmit_morse(tx['repeat'], tx['repeat_delay'], [tx['frame']]) - else: - self.transmit(tx['mode'], tx['repeat'], tx['repeat_delay'], [tx['frame']]) - # self.modem_transmit_queue.task_done() - def init_rig_control(self): # Check how we want to control the radio if self.radiocontrol == "rigctld": diff --git a/modem/queues.py b/modem/queues.py index 0c8631d8..465fcc8e 100644 --- a/modem/queues.py +++ b/modem/queues.py @@ -8,9 +8,5 @@ MESH_RECEIVED_QUEUE = queue.Queue() MESH_QUEUE_TRANSMIT = queue.Queue() MESH_SIGNALLING_TABLE = [] -# Initialize FIFO queue to finally store received data -# TODO Fix rx_buffer_size -RX_BUFFER = queue.Queue(maxsize=16) - # Commands we want to send to rigctld RIGCTLD_COMMAND_QUEUE = queue.Queue() \ No newline at end of file diff --git a/modem/server.py b/modem/server.py index ca1111c9..0caf5a18 100644 --- a/modem/server.py +++ b/modem/server.py @@ -83,8 +83,8 @@ def validate(req, param, validator, isRequired = True): # Takes a transmit command and puts it in the transmit command queue def enqueue_tx_command(cmd_class, params = {}): command = cmd_class(app.config_manager.read(), app.state_manager, app.modem_events, params) - command.run(app.modem_events, app.service_manager.modem.modem_transmit_queue) - app.logger.info(f"Command {command.get_name()} enqueued.") + app.logger.info(f"Command {command.get_name()} running...") + command.run(app.modem_events, app.service_manager.modem) ## REST API @app.route('/', methods=['GET']) diff --git a/modem/service_manager.py b/modem/service_manager.py index bc534453..02485543 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -84,8 +84,7 @@ class SM: self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config, self.modem_events, self.states, - self.modem.data_queue_received, - self.modem.modem_transmit_queue) + self.modem) self.frame_dispatcher.start() self.states.set("is_modem_running", True) diff --git a/modem/tci.py b/modem/tci.py index 9809bff9..5e088feb 100644 --- a/modem/tci.py +++ b/modem/tci.py @@ -7,12 +7,11 @@ import websocket import time class TCICtrl: - def __init__(self, audio_rx_q, audio_tx_q, hostname='127.0.0.1', port=50001): + def __init__(self, audio_rx_q, hostname='127.0.0.1', port=50001): # websocket.enableTrace(True) self.log = structlog.get_logger("TCI") self.audio_received_queue = audio_rx_q - self.audio_transmit_queue = audio_tx_q self.hostname = str(hostname) self.port = str(port) diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index 027effce..8717b1d5 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -13,6 +13,13 @@ from frame_dispatcher import DISPATCHER import random import structlog +class TestModem: + def __init__(self): + self.data_queue_received = queue.Queue() + + def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: + self.data_queue_received.put(frames) + class TestARQSession(unittest.TestCase): @classmethod @@ -23,24 +30,22 @@ class TestARQSession(unittest.TestCase): cls.logger = structlog.get_logger("TESTS") # ISS - cls.iss_modem_transmit_queue = queue.Queue() + cls.iss_modem = TestModem() cls.iss_state_manager = StateManager(queue.Queue()) cls.iss_event_queue = queue.Queue() cls.iss_frame_dispatcher = DISPATCHER(cls.config, cls.iss_event_queue, cls.iss_state_manager, - queue.Queue(), - cls.iss_modem_transmit_queue) + cls.iss_modem) # IRS - cls.irs_modem_transmit_queue = queue.Queue() + cls.irs_modem = TestModem() cls.irs_state_manager = StateManager(queue.Queue()) cls.irs_event_queue = queue.Queue() cls.irs_frame_dispatcher = DISPATCHER(cls.config, cls.irs_event_queue, cls.irs_state_manager, - queue.Queue(), - cls.irs_modem_transmit_queue) + cls.irs_modem) # Frame loss probability in % cls.loss_probability = 50 @@ -48,8 +53,7 @@ class TestARQSession(unittest.TestCase): def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER): while True: - transmission_item = modem_transmit_queue.get() - frame_bytes = bytes(transmission_item['frame']) + frame_bytes = modem_transmit_queue.get() if random.randint(0, 100) < self.loss_probability: self.logger.info(f"[{threading.current_thread().name}] Frame lost...") continue @@ -58,13 +62,13 @@ class TestARQSession(unittest.TestCase): def establishChannels(self): self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, - args=[self.iss_modem_transmit_queue, + args=[self.iss_modem.data_queue_received, self.irs_frame_dispatcher], name = "ISS to IRS channel") self.iss_to_irs_channel.start() self.irs_to_iss_channel = threading.Thread(target=self.channelWorker, - args=[self.irs_modem_transmit_queue, + args=[self.irs_modem.data_queue_received, self.iss_frame_dispatcher], name = "IRS to ISS channel") self.irs_to_iss_channel.start() @@ -80,7 +84,7 @@ class TestARQSession(unittest.TestCase): 'data': base64.b64encode(bytes("Hello world!", encoding="utf-8")), } cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) - cmd.run(self.iss_event_queue, self.iss_modem_transmit_queue) + cmd.run(self.iss_event_queue, self.iss_modem) if __name__ == '__main__': unittest.main()