From a5ebf2d0fc605016c7b58b6fd824df4671eecaa7 Mon Sep 17 00:00:00 2001 From: Pedro Date: Wed, 22 Nov 2023 21:54:50 +0100 Subject: [PATCH] Improve threading.Event() usage on transmission --- modem/data_handler.py | 6 ------ modem/data_handler_arq.py | 4 ++-- modem/data_handler_broadcasts.py | 6 +++--- modem/mesh.py | 11 ----------- modem/modem.py | 20 +++++++++++--------- modem/state_manager.py | 25 +++++++++++++++++++++---- 6 files changed, 37 insertions(+), 35 deletions(-) diff --git a/modem/data_handler.py b/modem/data_handler.py index 96a8c31b..4e47eb75 100644 --- a/modem/data_handler.py +++ b/modem/data_handler.py @@ -293,9 +293,3 @@ class DATA: frame=bytes_out[:-2].hex(), frame_type=FR_TYPE(int.from_bytes(bytes_out[:1], byteorder="big")).name, ) - - - - - - diff --git a/modem/data_handler_arq.py b/modem/data_handler_arq.py index 5c7e4733..c5e8126c 100644 --- a/modem/data_handler_arq.py +++ b/modem/data_handler_arq.py @@ -573,7 +573,7 @@ class ARQ: self.speed_level] * frames_left) + self.duration_sig0_frame + self.channel_busy_timeout + 1 timeout_percent = 100 - (time_left / waiting_time * 100) # timeout_percent = 0 - if timeout_percent >= 75 and not self.states.is_codec2_traffic and not self.states.is_transmitting: + if timeout_percent >= 75 and not self.states.is_codec2_traffic and not self.states.isTransmitting(): override = True else: override = False @@ -585,7 +585,7 @@ class ARQ: # better wait some more time because data might be important for us # reason for this situation can be delays on IRS and ISS, maybe because both had a busy channel condition. # Nevertheless, we need to keep timeouts short for efficiency - if timeout <= time.time() or modem_error_state and not self.states.is_codec2_traffic and not self.states.is_transmitting or override: + if timeout <= time.time() or modem_error_state and not self.states.is_codec2_traffic and not self.states.isTransmitting() or override: self.log.warning( "[Modem] Burst decoding error or timeout", attempt=self.n_retries_per_burst, diff --git a/modem/data_handler_broadcasts.py b/modem/data_handler_broadcasts.py index 8cb13c79..1de28222 100644 --- a/modem/data_handler_broadcasts.py +++ b/modem/data_handler_broadcasts.py @@ -102,7 +102,7 @@ class BROADCAST: # send burst only if channel not busy - but without waiting # otherwise burst will be dropped - if not self.states.channel_busy and not self.states.is_transmitting: + if not self.states.channel_busy and not self.states.isTransmitting(): self.enqueue_frame_for_tx( frame_to_tx=[fec_frame], c2_mode=codec2.FREEDV_MODE["sig0"].value ) @@ -415,5 +415,5 @@ class BROADCAST: MODEM_TRANSMIT_QUEUE.put([c2_mode, copies, repeat_delay, frame_to_tx]) # Wait while transmitting - while self.states.is_transmitting: - threading.Event().wait(0.01) \ No newline at end of file + # I don't think this is necessary + self.states.waitForTransmission() diff --git a/modem/mesh.py b/modem/mesh.py index 078e5f30..b1c2e94d 100644 --- a/modem/mesh.py +++ b/modem/mesh.py @@ -228,14 +228,10 @@ class MeshRouter(): #print(len(_)) frame_list.append(mesh_broadcast_frame_header + _) - self.states.set("is_transmitting", True) c2_mode = FREEDV_MODE.datac4.value self.log.info("[MESH] broadcasting routing table", frame_list=frame_list, frames=len(split_result)) modem.MODEM_TRANSMIT_QUEUE.put([c2_mode, 1, 0, frame_list]) - # Wait while transmitting - while self.states.is_transmitting: - threading.Event().wait(0.01) except Exception as e: self.log.warning("[MESH] broadcasting routing table", e=e) @@ -548,15 +544,8 @@ class MeshRouter(): self.log.debug("[Modem] enqueue_frame_for_tx", c2_mode=FREEDV_MODE(c2_mode).name, data=frame_to_tx, type=frame_type) - # Set the TRANSMITTING flag before adding an object to the transmit queue - # TODO This is not that nice, we could improve this somehow - self.states.set("is_transmitting", True) modem.MODEM_TRANSMIT_QUEUE.put([c2_mode, copies, repeat_delay, frame_to_tx]) - # Wait while transmitting - while self.states.is_transmitting: - threading.Event().wait(0.01) - def transmit_mesh_signalling_ping(self, destination, origin): diff --git a/modem/modem.py b/modem/modem.py index ca6b836b..86241011 100644 --- a/modem/modem.py +++ b/modem/modem.py @@ -84,8 +84,8 @@ class RF: self.rigctld_ip = config['RIGCTLD']['ip'] self.rigctld_port = config['RIGCTLD']['port'] + self.states.setTransmitting(False) - self.states.set("is_transmitting", False) self.ptt_state = False self.radio_alc = 0.0 @@ -469,7 +469,9 @@ class RF: else: return False - self.states.set("is_transmitting", True) + # Wait for some other thread that might be transmitting + self.states.waitForTransmission() + self.states.setTransmitting(True) # if we're transmitting FreeDATA signals, reset channel busy state self.states.set("channel_busy", False) @@ -628,7 +630,7 @@ class RF: self.mod_out_locked = True self.modem_transmit_queue.task_done() - self.states.set("is_transmitting", False) + self.states.setTransmitting(False) end_of_transmission = time.time() transmission_time = end_of_transmission - start_of_transmission @@ -664,7 +666,8 @@ class RF: alc_level=str(self.radio_alc)) def transmit_morse(self, repeats, repeat_delay, frames): - self.states.set("is_transmitting", True) + self.states.waitForTransmission() + self.states.setTransmitting(True) # if we're transmitting FreeDATA signals, reset channel busy state self.states.set("channel_busy", False) self.log.debug( @@ -711,8 +714,7 @@ class RF: self.mod_out_locked = True self.modem_transmit_queue.task_done() - self.states.set("is_transmitting", False) - threading.Event().set() + self.states.setTransmitting(False) end_of_transmission = time.time() transmission_time = end_of_transmission - start_of_transmission @@ -1300,7 +1302,7 @@ class RF: threading.Event().wait(0.1) self.states.set("radio_bandwidth", self.radio.get_bandwidth()) threading.Event().wait(0.1) - if self.states.is_transmitting: + if self.states.isTransmitting(): self.radio_alc = self.radio.get_alc() threading.Event().wait(0.1) self.states.set("radio_rf_power", self.radio.get_level()) @@ -1341,7 +1343,7 @@ class RF: # Therefore we are setting it to 100 so it will be highlighted # Have to do this when we are not transmitting so our # own sending data will not affect this too much - if not self.states.is_transmitting: + if not self.states.isTransmitting(): dfft[dfft > avg + 15] = 100 # Calculate audio dbfs @@ -1401,7 +1403,7 @@ class RF: # Check for signals higher than average by checking for "100" # If we have a signal, increment our channel_busy delay counter # so we have a smoother state toggle - if np.sum(slotdfft[slotdfft > avg + 15]) >= 200 and not self.states.is_transmitting: + if np.sum(slotdfft[slotdfft > avg + 15]) >= 200 and not self.states.isTransmitting(): addDelay=True self.states.channel_busy_slot[slot] = True else: diff --git a/modem/state_manager.py b/modem/state_manager.py index c84b1ab0..da030494 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -1,5 +1,6 @@ import time import ujson as json +import threading class StateManager: def __init__(self, statequeue): @@ -19,7 +20,11 @@ class StateManager: self.is_beacon_running = False self.is_arq_state = False self.is_arq_session = False - self.is_transmitting = False + + # If true, any wait() call is blocking + self.transmitting_event = threading.Event() + self.setTransmitting(False) + self.audio_dbfs = 0 self.dxcallsign: bytes = b"ZZ9YY-0" self.dxgrid: bytes = b"------" @@ -52,7 +57,6 @@ class StateManager: def sendStateUpdate (self): self.statequeue.put(self.newstate) - def set(self, key, value): setattr(self, key, value) @@ -63,7 +67,6 @@ class StateManager: self.newstate = new_state self.sendStateUpdate() - def getAsJSON(self, isChangedState): msgtype = "state-change" if (not isChangedState): @@ -77,4 +80,18 @@ class StateManager: "is_beacon_running": self.is_beacon_running, "radio_status": self.radio_status, "radio_frequency": self.radio_frequency, - }) \ No newline at end of file + }) + + # .wait() blocks until the event is set + def isTransmitting(self): + return not self.transmitting_event.is_set() + + # .wait() blocks until the event is set + def setTransmitting(self, transmitting: bool): + if transmitting: + self.transmitting_event.clear() + else: + self.transmitting_event.set() + + def waitForTransmission(self): + self.transmitting_event.wait()