Eliminate modem TX queues and change transmit() to a blocking call

This commit is contained in:
Pedro 2023-12-16 00:51:57 +01:00
parent 1640f6c66e
commit 30996c03b6
15 changed files with 75 additions and 118 deletions

View file

@ -25,7 +25,7 @@ class ARQSession():
}
def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str):
def __init__(self, config: dict, modem, dxcall: str):
self.logger = structlog.get_logger(type(self).__name__)
self.config = config
@ -34,7 +34,7 @@ class ARQSession():
self.dxcall = dxcall
self.dx_snr = []
self.tx_frame_queue = tx_frame_queue
self.modem = modem
self.speed_level = 0
self.frames_per_burst = 1
@ -56,13 +56,7 @@ class ARQSession():
if mode in ['auto']:
mode = self.get_mode_by_speed_level(self.speed_level)
modem_queue_item = {
'mode': mode,
'repeat': 1,
'repeat_delay': 1,
'frame': frame,
}
self.tx_frame_queue.put(modem_queue_item)
self.modem.transmit(mode, 1, 1, frame)
def set_state(self, state):
self.log(f"{type(self).__name__} state change from {self.state} to {state}")

View file

@ -39,8 +39,8 @@ class ARQSessionIRS(arq_session.ARQSession):
},
}
def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int):
super().__init__(config, tx_frame_queue, dxcall)
def __init__(self, config: dict, modem, dxcall: str, session_id: int):
super().__init__(config, modem, dxcall)
self.id = session_id
self.dxcall = dxcall

View file

@ -35,8 +35,8 @@ class ARQSessionISS(arq_session.ARQSession):
},
}
def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, data: bytearray):
super().__init__(config, tx_frame_queue, dxcall)
def __init__(self, config: dict, modem, dxcall: str, data: bytearray):
super().__init__(config, modem, dxcall)
self.data = data
self.data_crc = ''

View file

@ -44,12 +44,11 @@ class TxCommand():
'frame': frame,
}
def transmit(self, tx_frame_queue):
def transmit(self, modem):
frame = self.build_frame()
tx_queue_item = self.make_modem_queue_item(self.get_tx_mode(), 1, 0, frame)
tx_frame_queue.put(tx_queue_item)
modem.transmit(self.get_tx_mode(), 1, 0, frame)
def run(self, event_queue: queue.Queue, tx_frame_queue: queue.Queue):
def run(self, event_queue: queue.Queue, modem):
self.emit_event(event_queue)
self.logger.info(self.log_message())
self.transmit(tx_frame_queue)
self.transmit(modem)

View file

@ -13,11 +13,11 @@ class ARQRawCommand(TxCommand):
self.data = base64.b64decode(apiParams['data'])
def run(self, event_queue: Queue, tx_frame_queue: Queue):
def run(self, event_queue: Queue, modem):
self.emit_event(event_queue)
self.logger.info(self.log_message())
iss = ARQSessionISS(self.config, tx_frame_queue, self.dxcall, self.data)
iss = ARQSessionISS(self.config, modem, self.dxcall, self.data)
self.state_manager.register_arq_iss_session(iss)
iss.start()
return iss

View file

@ -5,8 +5,8 @@ class BeaconCommand(TxCommand):
def build_frame(self):
return self.frame_factory.build_beacon()
def transmit(self, tx_frame_queue):
super().transmit(tx_frame_queue)
def transmit(self, modem):
super().transmit(modem)
if self.config['MODEM']['enable_morse_identifier']:
mycall = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}"
tx_frame_queue.put(["morse", 1, 0, mycall])
modem.transmit_morse("morse", 1, 0, mycall)

View file

@ -39,7 +39,7 @@ class DISPATCHER():
FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"},
}
def __init__(self, config, event_queue, states, data_q_rx, modem_tx_q):
def __init__(self, config, event_queue, states, modem):
self.log = structlog.get_logger("frame_dispatcher")
self.log.info("loading frame dispatcher.....\n")
@ -49,8 +49,8 @@ class DISPATCHER():
self._initialize_handlers(config, event_queue, states)
self.data_queue_received = data_q_rx
self.modem_transmit_queue = modem_tx_q
self.modem = modem
self.data_queue_received = modem.data_queue_received
self.arq_sessions = []
@ -92,7 +92,7 @@ class DISPATCHER():
self.config,
self.states,
self.event_manager,
self.modem_transmit_queue)
self.modem)
handler.handle(deconstructed_frame, snr, frequency_offset, freedv, bytes_per_frame)

View file

@ -9,13 +9,13 @@ from codec2 import FREEDV_MODE
class FrameHandler():
def __init__(self, name: str, config, states: StateManager, event_manager: EventManager,
tx_frame_queue: Queue) -> None:
modem) -> None:
self.name = name
self.config = config
self.states = states
self.event_manager = event_manager
self.tx_frame_queue = tx_frame_queue
self.modem = modem
self.logger = structlog.get_logger("Frame Handler")
self.details = {
@ -86,14 +86,6 @@ class FrameHandler():
event_data = self.make_event()
self.event_manager.broadcast(event_data)
def make_modem_queue_item(self, mode, repeat, repeat_delay, frame):
return {
'mode': self.get_tx_mode(),
'repeat': repeat,
'repeat_delay': repeat_delay,
'frame': frame,
}
def get_tx_mode(self):
return (
FREEDV_MODE.fsk_ldpc_0.value
@ -102,8 +94,7 @@ class FrameHandler():
)
def transmit(self, frame):
tx_queue_item = self.make_modem_queue_item(self.get_tx_mode(), 1, 0, frame)
self.tx_frame_queue.put(tx_queue_item)
self.modem.transmit(self.get_tx_mode(), 1, 0, frame)
def follow_protocol(self):
pass

View file

@ -22,8 +22,8 @@ class ARQFrameHandler(frame_handler.FrameHandler):
# Normal case when receiving a SESSION_OPEN for the first time
else:
session = ARQSessionIRS(self.config,
self.tx_frame_queue,
session = ARQSessionIRS(self.config,
self.modem,
frame['origin'],
session_id)
self.states.register_arq_irs_session(session)

View file

@ -87,7 +87,6 @@ class RF:
self.modem_received_queue = queue.Queue()
self.audio_received_queue = queue.Queue()
self.audio_transmit_queue = queue.Queue()
self.data_queue_received = queue.Queue()
@ -105,7 +104,6 @@ class RF:
self.beacon = beacon.Beacon(self.config, self.states, event_queue,
self.log, self.modem_transmit_queue)
# --------------------------------------------------------------------------------------------------------
def tci_tx_callback(self, audio_48k) -> None:
self.radio.set_ptt(True)
self.event_manager.send_ptt_change(True)
@ -212,7 +210,7 @@ class RF:
self.stream = Object()
# lets init TCI module
self.tci_module = tci.TCICtrl(self.audio_received_queue, self.audio_transmit_queue)
self.tci_module = tci.TCICtrl(self.audio_received_queue)
tci_rx_callback_thread = threading.Thread(
target=self.tci_rx_callback,
@ -230,7 +228,35 @@ class RF:
)
tci_tx_callback_thread.start()
# --------------------------------------------------------------------
def audio_auto_tune(self):
# enable / disable AUDIO TUNE Feature / ALC correction
if self.enable_audio_auto_tune:
if self.radio_alc == 0.0:
self.tx_audio_level = self.tx_audio_level + 20
elif 0.0 < self.radio_alc <= 0.1:
print("0.0 < self.radio_alc <= 0.1")
self.tx_audio_level = self.tx_audio_level + 2
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 0.1 < self.radio_alc < 0.2:
print("0.1 < self.radio_alc < 0.2")
self.tx_audio_level = self.tx_audio_level
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 0.2 < self.radio_alc < 0.99:
print("0.2 < self.radio_alc < 0.99")
self.tx_audio_level = self.tx_audio_level - 20
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 1.0 >= self.radio_alc:
print("1.0 >= self.radio_alc")
self.tx_audio_level = self.tx_audio_level - 40
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
else:
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
def transmit(
self, mode, repeats: int, repeat_delay: int, frames: bytearray
) -> bool:
@ -310,6 +336,7 @@ class RF:
"[MDM] TRANSMIT", mode=self.MODE, payload=payload_bytes_per_frame, delay=self.tx_delay
)
if not isinstance(frames, list): frames = [frames]
for _ in range(repeats):
# Create modulation for all frames in the list
@ -419,42 +446,12 @@ class RF:
# After processing, set the locking state back to true to be prepared for next transmission
self.mod_out_locked = True
self.modem_transmit_queue.task_done()
self.states.setTransmitting(False)
end_of_transmission = time.time()
transmission_time = end_of_transmission - start_of_transmission
self.log.debug("[MDM] ON AIR TIME", time=transmission_time)
def audio_auto_tune(self):
# enable / disable AUDIO TUNE Feature / ALC correction
if self.enable_audio_auto_tune:
if self.radio_alc == 0.0:
self.tx_audio_level = self.tx_audio_level + 20
elif 0.0 < self.radio_alc <= 0.1:
print("0.0 < self.radio_alc <= 0.1")
self.tx_audio_level = self.tx_audio_level + 2
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 0.1 < self.radio_alc < 0.2:
print("0.1 < self.radio_alc < 0.2")
self.tx_audio_level = self.tx_audio_level
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 0.2 < self.radio_alc < 0.99:
print("0.2 < self.radio_alc < 0.99")
self.tx_audio_level = self.tx_audio_level - 20
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
elif 1.0 >= self.radio_alc:
print("1.0 >= self.radio_alc")
self.tx_audio_level = self.tx_audio_level - 40
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
else:
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(self.tx_audio_level),
alc_level=str(self.radio_alc))
def transmit_morse(self, repeats, repeat_delay, frames):
self.states.waitForTransmission()
self.states.setTransmitting(True)
@ -528,11 +525,6 @@ class RF:
)
worker_received.start()
worker_transmit = threading.Thread(
target=self.worker_transmit, name="WORKER_THREAD", daemon=True
)
worker_transmit.start()
# Low level modem audio transmit
def transmit_audio(self, audio_48k) -> None:
self.radio.set_ptt(True)
@ -545,23 +537,6 @@ class RF:
sd.play(audio_48k, blocking=True)
return
def worker_transmit(self) -> None:
"""Worker for FIFO queue for processing frames to be transmitted"""
while True:
# print queue size for debugging purposes
# TODO Lets check why we have several frames in our transmit queue which causes sometimes a double transmission
# we could do a cleanup after a transmission so theres no reason sending twice
queuesize = self.modem_transmit_queue.qsize()
self.log.debug("[MDM] self.modem_transmit_queue", qsize=queuesize)
tx = self.modem_transmit_queue.get()
print(tx)
# TODO Why we is this taking an array instead of a single frame?
if tx['mode'] in ["morse"]:
self.transmit_morse(tx['repeat'], tx['repeat_delay'], [tx['frame']])
else:
self.transmit(tx['mode'], tx['repeat'], tx['repeat_delay'], [tx['frame']])
# self.modem_transmit_queue.task_done()
def init_rig_control(self):
# Check how we want to control the radio
if self.radiocontrol == "rigctld":

View file

@ -8,9 +8,5 @@ MESH_RECEIVED_QUEUE = queue.Queue()
MESH_QUEUE_TRANSMIT = queue.Queue()
MESH_SIGNALLING_TABLE = []
# Initialize FIFO queue to finally store received data
# TODO Fix rx_buffer_size
RX_BUFFER = queue.Queue(maxsize=16)
# Commands we want to send to rigctld
RIGCTLD_COMMAND_QUEUE = queue.Queue()

View file

@ -83,8 +83,8 @@ def validate(req, param, validator, isRequired = True):
# Takes a transmit command and puts it in the transmit command queue
def enqueue_tx_command(cmd_class, params = {}):
command = cmd_class(app.config_manager.read(), app.state_manager, app.modem_events, params)
command.run(app.modem_events, app.service_manager.modem.modem_transmit_queue)
app.logger.info(f"Command {command.get_name()} enqueued.")
app.logger.info(f"Command {command.get_name()} running...")
command.run(app.modem_events, app.service_manager.modem)
## REST API
@app.route('/', methods=['GET'])

View file

@ -84,8 +84,7 @@ class SM:
self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config,
self.modem_events,
self.states,
self.modem.data_queue_received,
self.modem.modem_transmit_queue)
self.modem)
self.frame_dispatcher.start()
self.states.set("is_modem_running", True)

View file

@ -7,12 +7,11 @@ import websocket
import time
class TCICtrl:
def __init__(self, audio_rx_q, audio_tx_q, hostname='127.0.0.1', port=50001):
def __init__(self, audio_rx_q, hostname='127.0.0.1', port=50001):
# websocket.enableTrace(True)
self.log = structlog.get_logger("TCI")
self.audio_received_queue = audio_rx_q
self.audio_transmit_queue = audio_tx_q
self.hostname = str(hostname)
self.port = str(port)

View file

@ -13,6 +13,13 @@ from frame_dispatcher import DISPATCHER
import random
import structlog
class TestModem:
def __init__(self):
self.data_queue_received = queue.Queue()
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
self.data_queue_received.put(frames)
class TestARQSession(unittest.TestCase):
@classmethod
@ -23,24 +30,22 @@ class TestARQSession(unittest.TestCase):
cls.logger = structlog.get_logger("TESTS")
# ISS
cls.iss_modem_transmit_queue = queue.Queue()
cls.iss_modem = TestModem()
cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_event_queue = queue.Queue()
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
cls.iss_event_queue,
cls.iss_state_manager,
queue.Queue(),
cls.iss_modem_transmit_queue)
cls.iss_modem)
# IRS
cls.irs_modem_transmit_queue = queue.Queue()
cls.irs_modem = TestModem()
cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_queue = queue.Queue()
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
cls.irs_event_queue,
cls.irs_state_manager,
queue.Queue(),
cls.irs_modem_transmit_queue)
cls.irs_modem)
# Frame loss probability in %
cls.loss_probability = 50
@ -48,8 +53,7 @@ class TestARQSession(unittest.TestCase):
def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER):
while True:
transmission_item = modem_transmit_queue.get()
frame_bytes = bytes(transmission_item['frame'])
frame_bytes = modem_transmit_queue.get()
if random.randint(0, 100) < self.loss_probability:
self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
continue
@ -58,13 +62,13 @@ class TestARQSession(unittest.TestCase):
def establishChannels(self):
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
args=[self.iss_modem_transmit_queue,
args=[self.iss_modem.data_queue_received,
self.irs_frame_dispatcher],
name = "ISS to IRS channel")
self.iss_to_irs_channel.start()
self.irs_to_iss_channel = threading.Thread(target=self.channelWorker,
args=[self.irs_modem_transmit_queue,
args=[self.irs_modem.data_queue_received,
self.iss_frame_dispatcher],
name = "IRS to ISS channel")
self.irs_to_iss_channel.start()
@ -80,7 +84,7 @@ class TestARQSession(unittest.TestCase):
'data': base64.b64encode(bytes("Hello world!", encoding="utf-8")),
}
cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
cmd.run(self.iss_event_queue, self.iss_modem_transmit_queue)
cmd.run(self.iss_event_queue, self.iss_modem)
if __name__ == '__main__':
unittest.main()