From d5a1a74f1a81d43c89da8b23686a2b68bf4b228c Mon Sep 17 00:00:00 2001 From: Pedro Date: Wed, 29 Nov 2023 17:35:23 +0100 Subject: [PATCH] Separate demodulation code --- modem/audio.py | 35 ++ modem/frame_dispatcher.py | 15 +- modem/modem.py | 759 ++++---------------------------------- modem/protocol_arq.py | 14 +- modem/protocol_arq_iss.py | 4 +- modem/queues.py | 6 - modem/service_manager.py | 7 +- modem/tci.py | 7 +- 8 files changed, 124 insertions(+), 723 deletions(-) diff --git a/modem/audio.py b/modem/audio.py index a42d0ca2..bcf03b93 100644 --- a/modem/audio.py +++ b/modem/audio.py @@ -6,6 +6,7 @@ import multiprocessing import crcengine import sounddevice as sd import structlog +import numpy as np atexit.register(sd._terminate) @@ -172,3 +173,37 @@ def test_audio_devices(input_id: str, output_id: str) -> list: sd._terminate() sd._initialize() return test_result + +def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray: + """ + Scale values for the provided audio samples by dB. + + :param datalist: Audio samples to scale + :type datalist: np.ndarray + :param dB: Decibels to scale samples, constrained to the range [-50, 50] + :type dB: float + :return: Scaled audio samples + :rtype: np.ndarray + """ + try: + dB = float(dB) + except ValueError as e: + print(f"[MDM] Changing audio volume failed with error: {e}") + dB = 0.0 # 0 dB means no change + + # Clip dB value to the range [-50, 50] + dB = np.clip(dB, -30, 20) + + # Ensure datalist is an np.ndarray + if not isinstance(datalist, np.ndarray): + print("[MDM] Invalid data type for datalist. Expected np.ndarray.") + return datalist + + # Convert dB to linear scale + scale_factor = 10 ** (dB / 20) + + # Scale samples + scaled_data = datalist * scale_factor + + # Clip values to int16 range and convert data type + return np.clip(scaled_data, -32768, 32767).astype(np.int16) \ No newline at end of file diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index a735f623..79f908ab 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -23,7 +23,7 @@ from protocol_arq_session import SESSION class DISPATCHER(): - def __init__(self, config, event_queue, states): + def __init__(self, config, event_queue, states, data_q_rx): self.log = structlog.get_logger("frame_dispatcher") self.log.info("loading frame dispatcher.....\n") @@ -33,7 +33,9 @@ class DISPATCHER(): self._initialize_handlers(config, event_queue, states) self._initialize_dispatchers() - self._initialize_queues() + + self.data_queue_transmit = DATA_QUEUE_TRANSMIT + self.data_queue_received = data_q_rx def _initialize_handlers(self, config, event_queue, states): """Initializes various data handlers.""" @@ -102,11 +104,6 @@ class DISPATCHER(): } - def _initialize_queues(self): - """Initializes data queues.""" - self.data_queue_transmit = DATA_QUEUE_TRANSMIT - self.data_queue_received = DATA_QUEUE_RECEIVED - def start(self): """Starts worker threads for transmit and receive operations.""" threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start() @@ -194,8 +191,8 @@ class DISPATCHER(): # we could also create an own function, which returns True. - deconstructed_frame["destination_crc"] - deconstructed_frame["origin_crc"] + #deconstructed_frame["destination_crc"] + #deconstructed_frame["origin_crc"] # check for callsign CRC _valid1, _ = helpers.check_callsign(self.arq.mycallsign, deconstructed_frame["destination_crc"], self.arq.ssid_list) diff --git a/modem/modem.py b/modem/modem.py index ebeac886..f7e22ac6 100644 --- a/modem/modem.py +++ b/modem/modem.py @@ -17,43 +17,21 @@ import threading import time from collections import deque import codec2 -import itertools import numpy as np import sounddevice as sd import structlog import tci import cw -from queues import DATA_QUEUE_RECEIVED, MODEM_RECEIVED_QUEUE, MODEM_TRANSMIT_QUEUE, RIGCTLD_COMMAND_QUEUE, \ - AUDIO_RECEIVED_QUEUE, AUDIO_TRANSMIT_QUEUE, MESH_RECEIVED_QUEUE +from queues import MODEM_TRANSMIT_QUEUE, RIGCTLD_COMMAND_QUEUE import audio import event_manager from modem_frametypes import FRAME_TYPE import beacon +import demodulator TESTMODE = False -RXCHANNEL = "" TXCHANNEL = "" -# Receive only specific modes to reduce CPU load -RECEIVE_SIG0 = True -RECEIVE_SIG1 = False -RECEIVE_DATAC1 = False -RECEIVE_DATAC3 = False -RECEIVE_DATAC4 = False - - -# state buffer - -SIG0_DATAC13_STATE = [] -SIG1_DATAC13_STATE = [] -DAT0_DATAC1_STATE = [] -DAT0_DATAC3_STATE = [] -DAT0_DATAC4_STATE = [] - -FSK_LDPC0_STATE = [] -FSK_LDPC1_STATE = [] - - class RF: """Class to encapsulate interactions between the audio device and codec2""" @@ -72,15 +50,11 @@ class RF: self.audio_input_device = config['AUDIO']['input_device'] self.audio_output_device = config['AUDIO']['output_device'] - self.rx_audio_level = config['AUDIO']['rx_audio_level'] self.tx_audio_level = config['AUDIO']['tx_audio_level'] self.enable_audio_auto_tune = config['AUDIO']['enable_auto_tune'] - self.enable_fsk = config['MODEM']['enable_fsk'] #Dynamically enable FFT data stream when a client connects to FFT web socket self.enable_fft_stream = False self.tx_delay = config['MODEM']['tx_delay'] - self.tuning_range_fmin = config['MODEM']['tuning_range_fmin'] - self.tuning_range_fmax = config['MODEM']['tuning_range_fmax'] self.radiocontrol = config['RADIO']['control'] self.rigctld_ip = config['RIGCTLD']['ip'] @@ -94,23 +68,18 @@ class RF: self.tci_ip = config['TCI']['tci_ip'] self.tci_port = config['TCI']['tci_port'] - self.buffer_overflow_counter = [0, 0, 0, 0, 0, 0, 0, 0] - self.channel_busy_delay = 0 self.AUDIO_SAMPLE_RATE_RX = 48000 self.AUDIO_SAMPLE_RATE_TX = 48000 self.MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000 - self.AUDIO_FRAMES_PER_BUFFER_RX = 2400 * 2 # 8192 # 8192 Let's do some tests with very small chunks for TX self.AUDIO_FRAMES_PER_BUFFER_TX = 1200 if self.radiocontrol in ["tci"] else 2400 * 2 # 8 * (self.AUDIO_SAMPLE_RATE_RX/self.MODEM_SAMPLE_RATE) == 48 self.AUDIO_CHANNELS = 1 self.MODE = 0 - self.is_codec2_traffic_cooldown = 20 - self.is_codec2_traffic_counter = 0 # Locking state for mod out so buffer will be filled before we can use it # https://github.com/DJ2LS/FreeDATA/issues/127 # https://github.com/DJ2LS/FreeDATA/issues/99 @@ -120,12 +89,13 @@ class RF: # Make sure our resampler will work assert (self.AUDIO_SAMPLE_RATE_RX / self.MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48 # type: ignore - self.modem_transmit_queue = MODEM_TRANSMIT_QUEUE - self.modem_received_queue = MODEM_RECEIVED_QUEUE + self.modem_received_queue = queue.Queue() - self.audio_received_queue = AUDIO_RECEIVED_QUEUE - self.audio_transmit_queue = AUDIO_TRANSMIT_QUEUE + self.audio_received_queue = queue.Queue() + self.audio_transmit_queue = queue.Queue() + + self.data_queue_received = queue.Queue() # Init FIFO queue to store modulation out in self.modoutqueue = deque() @@ -134,12 +104,16 @@ class RF: self.fft_queue = fft_queue + self.demodulator = demodulator.Demodulator(self.config, + self.audio_received_queue, + self.modem_received_queue, + self.data_queue_received, + self.states, + self.event_manager) + self.beacon = beacon.Beacon(self.config, self.states, event_queue, self.log, MODEM_TRANSMIT_QUEUE) - self.start_modem() - - # -------------------------------------------------------------------------------------------------------- def tci_tx_callback(self) -> None: """ @@ -156,13 +130,19 @@ class RF: self.tci_module.push_audio(data_out) def start_modem(self): + result = False + if not TESTMODE and self.radiocontrol not in ["tci"]: result = self.init_audio() + if not result: + raise RuntimeError("Unable to init audio devices") + self.demodulator.start(self.stream) elif not TESTMODE: result = self.init_tci() else: result = self.init_mkfifo() + if result not in [False]: # init codec2 instances self.init_codec2() @@ -170,10 +150,7 @@ class RF: # init rig control self.init_rig_control() - # init decoders - self.init_decoders() - - # init decoding threads + # init data thread self.init_data_threads() atexit.register(self.stream.stop) @@ -238,7 +215,6 @@ class RF: self.stop_modem() return False - def init_tci(self): # placeholder area for processing audio via TCI # https://github.com/maksimus1210/TCI @@ -252,7 +228,7 @@ class RF: self.stream = Object() # lets init TCI module - self.tci_module = tci.TCICtrl() + self.tci_module = tci.TCICtrl(self.audio_received_queue, self.audio_transmit_queue) tci_rx_callback_thread = threading.Thread( target=self.tci_rx_callback, @@ -298,73 +274,6 @@ class RF: ) mkfifo_read_callback_thread.start() - - def tci_rx_callback(self) -> None: - """ - Callback for TCI RX - - data_in48k must be filled with 48000Hz audio raw data - - """ - - while True: - - x = self.audio_received_queue.get() - x = np.frombuffer(x, dtype=np.int16) - # x = self.resampler.resample48_to_8(x) - - self.calculate_fft(x) - - length_x = len(x) - for data_buffer, receive in [ - (self.sig0_datac13_buffer, RECEIVE_SIG0), - (self.sig1_datac13_buffer, RECEIVE_SIG1), - (self.dat0_datac1_buffer, RECEIVE_DATAC1), - (self.dat0_datac3_buffer, RECEIVE_DATAC3), - (self.dat0_datac4_buffer, RECEIVE_DATAC4), - (self.fsk_ldpc_buffer_0, self.enable_fsk), - (self.fsk_ldpc_buffer_1, self.enable_fsk), - ]: - if ( - not (data_buffer.nbuffer + length_x) > data_buffer.size - and receive - ): - data_buffer.push(x) - - def mkfifo_read_callback(self) -> None: - """ - Support testing by reading the audio data from a pipe and - depositing the data into the codec data buffers. - """ - while True: - threading.Event().wait(0.01) - # -----read - data_in48k = bytes() - with open(RXCHANNEL, "rb") as fifo: - for line in fifo: - data_in48k += line - - while len(data_in48k) >= 48: - x = np.frombuffer(data_in48k[:48], dtype=np.int16) - x = self.resampler.resample48_to_8(x) - data_in48k = data_in48k[48:] - - length_x = len(x) - for data_buffer, receive in [ - (self.sig0_datac13_buffer, RECEIVE_SIG0), - (self.sig1_datac13_buffer, RECEIVE_SIG1), - (self.dat0_datac1_buffer, RECEIVE_DATAC1), - (self.dat0_datac3_buffer, RECEIVE_DATAC3), - (self.dat0_datac4_buffer, RECEIVE_DATAC4), - (self.fsk_ldpc_buffer_0, self.enable_fsk), - (self.fsk_ldpc_buffer_1, self.enable_fsk), - ]: - if ( - not (data_buffer.nbuffer + length_x) > data_buffer.size - and receive - ): - data_buffer.push(x) - def mkfifo_write_callback(self) -> None: """Support testing by writing the audio data to a pipe.""" while True: @@ -380,73 +289,6 @@ class RF: fifo_write.flush() fifo_write.flush() - # Callback for the audio streaming devices - def callback(self, data_in48k, outdata, frames, time, status) -> None: - """ - Receive data into appropriate queue. - - Args: - data_in48k: Incoming data received - outdata: Container for the data returned - frames: Number of frames - time: - status: - - """ - # self.log.debug("[MDM] callback") - try: - x = np.frombuffer(data_in48k, dtype=np.int16) - x = self.resampler.resample48_to_8(x) - x = set_audio_volume(x, self.rx_audio_level) - - # audio recording for debugging purposes - # TODO Find a nice place for this - #if AudioParam.audio_record: - # AudioParam.audio_record_file.writeframes(x) - - # Avoid decoding when transmitting to reduce CPU - # TODO Overriding this for testing purposes - # if not self.states.is_transmitting: - length_x = len(x) - # Avoid buffer overflow by filling only if buffer for - # selected datachannel mode is not full - for audiobuffer, receive, index in [ - (self.sig0_datac13_buffer, RECEIVE_SIG0, 0), - (self.sig1_datac13_buffer, RECEIVE_SIG1, 1), - (self.dat0_datac1_buffer, RECEIVE_DATAC1, 2), - (self.dat0_datac3_buffer, RECEIVE_DATAC3, 3), - (self.dat0_datac4_buffer, RECEIVE_DATAC4, 4), - (self.fsk_ldpc_buffer_0, self.enable_fsk, 5), - (self.fsk_ldpc_buffer_1, self.enable_fsk, 6), - ]: - if (audiobuffer.nbuffer + length_x) > audiobuffer.size: - self.buffer_overflow_counter[index] += 1 - self.event_manager.send_buffer_overflow(self.buffer_overflow_counter) - elif receive: - audiobuffer.push(x) - # end of "not self.states.is_transmitting" if block - - if not self.modoutqueue or self.mod_out_locked: - data_out48k = np.zeros(frames, dtype=np.int16) - self.calculate_fft(x) - else: - # TODO Moved to this place for testing - # Maybe we can avoid moments of silence before transmitting - self.radio.set_ptt(True) - self.event_manager.send_ptt_change(True) - - data_out48k = self.modoutqueue.popleft() - self.calculate_fft(data_out48k) - except Exception as e: - self.log.warning(f"[MDM] audio callback not ready yet: {e}") - - try: - outdata[:] = data_out48k[:frames] - except IndexError as err: - self.log.debug(f"[MDM] callback writing error: IndexError: {err}") - - # return (data_out48k, audio.pyaudio.paContinue) - # -------------------------------------------------------------------- def transmit( self, mode, repeats: int, repeat_delay: int, frames: bytearray @@ -588,7 +430,7 @@ class RF: x = np.frombuffer(txbuffer, dtype=np.int16) self.audio_auto_tune() - x = set_audio_volume(x, self.tx_audio_level) + x = audio.set_audio_volume(x, self.tx_audio_level) if not self.radiocontrol in ["tci"]: txbuffer_out = self.resampler.resample8_to_48(x) @@ -747,228 +589,9 @@ class RF: # self.log.debug("[MDM] mod out shorter than audio buffer", delta=delta) self.modoutqueue.append(c) - def init_decoders(self): - - if self.enable_fsk: - audio_thread_fsk_ldpc0 = threading.Thread( - target=self.audio_fsk_ldpc_0, name="AUDIO_THREAD FSK LDPC0", daemon=True - ) - audio_thread_fsk_ldpc0.start() - - audio_thread_fsk_ldpc1 = threading.Thread( - target=self.audio_fsk_ldpc_1, name="AUDIO_THREAD FSK LDPC1", daemon=True - ) - audio_thread_fsk_ldpc1.start() - - else: - audio_thread_sig0_datac13 = threading.Thread( - target=self.audio_sig0_datac13, name="AUDIO_THREAD DATAC13 - 0", daemon=True - ) - audio_thread_sig0_datac13.start() - - audio_thread_sig1_datac13 = threading.Thread( - target=self.audio_sig1_datac13, name="AUDIO_THREAD DATAC13 - 1", daemon=True - ) - audio_thread_sig1_datac13.start() - - audio_thread_dat0_datac1 = threading.Thread( - target=self.audio_dat0_datac1, name="AUDIO_THREAD DATAC1", daemon=True - ) - audio_thread_dat0_datac1.start() - - audio_thread_dat0_datac3 = threading.Thread( - target=self.audio_dat0_datac3, name="AUDIO_THREAD DATAC3", daemon=True - ) - audio_thread_dat0_datac3.start() - - audio_thread_dat0_datac4 = threading.Thread( - target=self.audio_dat0_datac4, name="AUDIO_THREAD DATAC4", daemon=True - ) - audio_thread_dat0_datac4.start() - - def demodulate_audio( - self, - audiobuffer: codec2.audio_buffer, - nin: int, - freedv: ctypes.c_void_p, - bytes_out, - bytes_per_frame, - state_buffer, - mode_name, - ) -> int: - """ - De-modulate supplied audio stream with supplied codec2 instance. - Decoded audio is placed into `bytes_out`. - - :param audiobuffer: Incoming audio - :type audiobuffer: codec2.audio_buffer - :param nin: Number of frames codec2 is expecting - :type nin: int - :param freedv: codec2 instance - :type freedv: ctypes.c_void_p - :param bytes_out: Demodulated audio - :type bytes_out: _type_ - :param bytes_per_frame: Number of bytes per frame - :type bytes_per_frame: int - :param state_buffer: modem states - :type state_buffer: int - :param mode_name: mode name - :type mode_name: str - :return: NIN from freedv instance - :rtype: int - """ - - nbytes = 0 - try: - while self.stream.active: - threading.Event().wait(0.01) - while audiobuffer.nbuffer >= nin: - # demodulate audio - nbytes = codec2.api.freedv_rawdatarx( - freedv, bytes_out, audiobuffer.buffer.ctypes - ) - # get current modem states and write to list - # 1 trial - # 2 sync - # 3 trial sync - # 6 decoded - # 10 error decoding == NACK - rx_status = codec2.api.freedv_get_rx_status(freedv) - - if rx_status not in [0]: - # we need to disable this if in testmode as its causing problems with FIFO it seems - if not TESTMODE: - self.states.set("is_codec2_traffic", True) - self.is_codec2_traffic_counter = self.is_codec2_traffic_cooldown - if not self.states.channel_busy: - self.log.debug("[MDM] Setting channel_busy since codec2 data detected") - self.states.set("channel_busy", True) - self.channel_busy_delay += 10 - self.log.debug( - "[MDM] [demod_audio] modem state", mode=mode_name, rx_status=rx_status, - sync_flag=codec2.api.rx_sync_flags_to_text[rx_status] - ) - else: - self.states.set("is_codec2_traffic", False) - - # decrement codec traffic counter for making state smoother - if self.is_codec2_traffic_counter > 0: - self.is_codec2_traffic_counter -= 1 - self.states.set("is_codec2_traffic", True) - else: - self.states.set("is_codec2_traffic", False) - - if rx_status == 10: - state_buffer.append(rx_status) - - audiobuffer.pop(nin) - nin = codec2.api.freedv_nin(freedv) - if nbytes == bytes_per_frame: - print(bytes(bytes_out)) - - # ignore data channel opener frames for avoiding toggle states - # use case: opener already received, but ack got lost and we are receiving - # an opener again - if mode_name in ["sig1-datac13"] and int.from_bytes(bytes(bytes_out[:1]), "big") in [ - FRAME_TYPE.ARQ_SESSION_OPEN.value, - FRAME_TYPE.ARQ_DC_OPEN_W.value, - FRAME_TYPE.ARQ_DC_OPEN_ACK_W.value, - FRAME_TYPE.ARQ_DC_OPEN_N.value, - FRAME_TYPE.ARQ_DC_OPEN_ACK_N.value - ]: - print("dropp") - elif int.from_bytes(bytes(bytes_out[:1]), "big") in [ - FRAME_TYPE.MESH_BROADCAST.value, - FRAME_TYPE.MESH_SIGNALLING_PING.value, - FRAME_TYPE.MESH_SIGNALLING_PING_ACK.value, - ]: - self.log.debug( - "[MDM] [demod_audio] moving data to mesh dispatcher", nbytes=nbytes - ) - MESH_RECEIVED_QUEUE.put([bytes(bytes_out), snr]) - - else: - self.log.debug( - "[MDM] [demod_audio] Pushing received data to received_queue", nbytes=nbytes - ) - snr = self.calculate_snr(freedv) - self.modem_received_queue.put([bytes_out, freedv, bytes_per_frame, snr]) - self.get_scatter(freedv) - state_buffer = [] - - except Exception as e: - self.log.warning("[MDM] [demod_audio] Stream not active anymore", e=e) - - return nin - def init_codec2(self): # Open codec2 instances - # DATAC13 - # SIGNALLING MODE 0 - Used for Connecting - Payload 14 Bytes - self.sig0_datac13_freedv, \ - self.sig0_datac13_bytes_per_frame, \ - self.sig0_datac13_bytes_out, \ - self.sig0_datac13_buffer, \ - self.sig0_datac13_nin = \ - self.init_codec2_mode(codec2.FREEDV_MODE.datac13.value, None) - - # DATAC13 - # SIGNALLING MODE 1 - Used for ACK/NACK - Payload 5 Bytes - self.sig1_datac13_freedv, \ - self.sig1_datac13_bytes_per_frame, \ - self.sig1_datac13_bytes_out, \ - self.sig1_datac13_buffer, \ - self.sig1_datac13_nin = \ - self.init_codec2_mode(codec2.FREEDV_MODE.datac13.value, None) - - # DATAC1 - self.dat0_datac1_freedv, \ - self.dat0_datac1_bytes_per_frame, \ - self.dat0_datac1_bytes_out, \ - self.dat0_datac1_buffer, \ - self.dat0_datac1_nin = \ - self.init_codec2_mode(codec2.FREEDV_MODE.datac1.value, None) - - # DATAC3 - self.dat0_datac3_freedv, \ - self.dat0_datac3_bytes_per_frame, \ - self.dat0_datac3_bytes_out, \ - self.dat0_datac3_buffer, \ - self.dat0_datac3_nin = \ - self.init_codec2_mode(codec2.FREEDV_MODE.datac3.value, None) - - # DATAC4 - self.dat0_datac4_freedv, \ - self.dat0_datac4_bytes_per_frame, \ - self.dat0_datac4_bytes_out, \ - self.dat0_datac4_buffer, \ - self.dat0_datac4_nin = \ - self.init_codec2_mode(codec2.FREEDV_MODE.datac4.value, None) - - - # FSK LDPC - 0 - self.fsk_ldpc_freedv_0, \ - self.fsk_ldpc_bytes_per_frame_0, \ - self.fsk_ldpc_bytes_out_0, \ - self.fsk_ldpc_buffer_0, \ - self.fsk_ldpc_nin_0 = \ - self.init_codec2_mode( - codec2.FREEDV_MODE.fsk_ldpc.value, - codec2.api.FREEDV_MODE_FSK_LDPC_0_ADV - ) - - # FSK LDPC - 1 - self.fsk_ldpc_freedv_1, \ - self.fsk_ldpc_bytes_per_frame_1, \ - self.fsk_ldpc_bytes_out_1, \ - self.fsk_ldpc_buffer_1, \ - self.fsk_ldpc_nin_1 = \ - self.init_codec2_mode( - codec2.FREEDV_MODE.fsk_ldpc.value, - codec2.api.FREEDV_MODE_FSK_LDPC_1_ADV - ) - # INIT TX MODES - here we need all modes. self.freedv_datac0_tx = codec2.open_instance(codec2.FREEDV_MODE.datac0.value) self.freedv_datac1_tx = codec2.open_instance(codec2.FREEDV_MODE.datac1.value) @@ -978,164 +601,9 @@ class RF: self.freedv_ldpc0_tx = codec2.open_instance(codec2.FREEDV_MODE.fsk_ldpc_0.value) self.freedv_ldpc1_tx = codec2.open_instance(codec2.FREEDV_MODE.fsk_ldpc_1.value) - def init_codec2_mode(self, mode, adv): - """ - Init codec2 and return some important parameters - - Args: - self: - mode: - adv: - - Returns: - c2instance, bytes_per_frame, bytes_out, audio_buffer, nin - """ - if adv: - # FSK Long-distance Parity Code 1 - data frames - c2instance = ctypes.cast( - codec2.api.freedv_open_advanced( - codec2.FREEDV_MODE.fsk_ldpc.value, - ctypes.byref(adv), - ), - ctypes.c_void_p, - ) - else: - - # create codec2 instance - c2instance = ctypes.cast( - codec2.api.freedv_open(mode), ctypes.c_void_p - ) - - # set tuning range - codec2.api.freedv_set_tuning_range( - c2instance, - ctypes.c_float(float(self.tuning_range_fmin)), - ctypes.c_float(float(self.tuning_range_fmax)), - ) - - # get bytes per frame - bytes_per_frame = int( - codec2.api.freedv_get_bits_per_modem_frame(c2instance) / 8 - ) - - # create byte out buffer - bytes_out = ctypes.create_string_buffer(bytes_per_frame) - - # set initial frames per burst - codec2.api.freedv_set_frames_per_burst(c2instance, 1) - - # init audio buffer - audio_buffer = codec2.audio_buffer(2 * self.AUDIO_FRAMES_PER_BUFFER_RX) - - # get initial nin - nin = codec2.api.freedv_nin(c2instance) - - # Additional Datac0-specific information - these are not referenced anywhere else. - # self.sig0_datac0_payload_per_frame = self.sig0_datac0_bytes_per_frame - 2 - # self.sig0_datac0_n_nom_modem_samples = codec2.api.freedv_get_n_nom_modem_samples( - # self.sig0_datac0_freedv - # ) - # self.sig0_datac0_n_tx_modem_samples = codec2.api.freedv_get_n_tx_modem_samples( - # self.sig0_datac0_freedv - # ) - # self.sig0_datac0_n_tx_preamble_modem_samples = ( - # codec2.api.freedv_get_n_tx_preamble_modem_samples(self.sig0_datac0_freedv) - # ) - # self.sig0_datac0_n_tx_postamble_modem_samples = ( - # codec2.api.freedv_get_n_tx_postamble_modem_samples(self.sig0_datac0_freedv) - # ) - - # return values - return c2instance, bytes_per_frame, bytes_out, audio_buffer, nin - - def audio_sig0_datac13(self) -> None: - """Receive data encoded with datac13 - 0""" - self.sig0_datac13_nin = self.demodulate_audio( - self.sig0_datac13_buffer, - self.sig0_datac13_nin, - self.sig0_datac13_freedv, - self.sig0_datac13_bytes_out, - self.sig0_datac13_bytes_per_frame, - SIG0_DATAC13_STATE, - "sig0-datac13" - ) - - def audio_sig1_datac13(self) -> None: - """Receive data encoded with datac13 - 1""" - self.sig1_datac13_nin = self.demodulate_audio( - self.sig1_datac13_buffer, - self.sig1_datac13_nin, - self.sig1_datac13_freedv, - self.sig1_datac13_bytes_out, - self.sig1_datac13_bytes_per_frame, - SIG1_DATAC13_STATE, - "sig1-datac13" - ) - - def audio_dat0_datac4(self) -> None: - """Receive data encoded with datac4""" - self.dat0_datac4_nin = self.demodulate_audio( - self.dat0_datac4_buffer, - self.dat0_datac4_nin, - self.dat0_datac4_freedv, - self.dat0_datac4_bytes_out, - self.dat0_datac4_bytes_per_frame, - DAT0_DATAC4_STATE, - "dat0-datac4" - ) - - def audio_dat0_datac1(self) -> None: - """Receive data encoded with datac1""" - self.dat0_datac1_nin = self.demodulate_audio( - self.dat0_datac1_buffer, - self.dat0_datac1_nin, - self.dat0_datac1_freedv, - self.dat0_datac1_bytes_out, - self.dat0_datac1_bytes_per_frame, - DAT0_DATAC1_STATE, - "dat0-datac1" - ) - - def audio_dat0_datac3(self) -> None: - """Receive data encoded with datac3""" - self.dat0_datac3_nin = self.demodulate_audio( - self.dat0_datac3_buffer, - self.dat0_datac3_nin, - self.dat0_datac3_freedv, - self.dat0_datac3_bytes_out, - self.dat0_datac3_bytes_per_frame, - DAT0_DATAC3_STATE, - "dat0-datac3" - ) - - def audio_fsk_ldpc_0(self) -> None: - """Receive data encoded with FSK + LDPC0""" - self.fsk_ldpc_nin_0 = self.demodulate_audio( - self.fsk_ldpc_buffer_0, - self.fsk_ldpc_nin_0, - self.fsk_ldpc_freedv_0, - self.fsk_ldpc_bytes_out_0, - self.fsk_ldpc_bytes_per_frame_0, - FSK_LDPC0_STATE, - "fsk_ldpc0", - ) - - def audio_fsk_ldpc_1(self) -> None: - """Receive data encoded with FSK + LDPC1""" - self.fsk_ldpc_nin_1 = self.demodulate_audio( - self.fsk_ldpc_buffer_1, - self.fsk_ldpc_nin_1, - self.fsk_ldpc_freedv_1, - self.fsk_ldpc_bytes_out_1, - self.fsk_ldpc_bytes_per_frame_1, - FSK_LDPC1_STATE, - "fsk_ldpc1", - ) - def init_data_threads(self): - # self.log.debug("[MDM] Starting worker_receive") worker_received = threading.Thread( - target=self.worker_received, name="WORKER_THREAD", daemon=True + target=self.demodulator.worker_received, name="WORKER_THREAD", daemon=True ) worker_received.start() @@ -1144,6 +612,44 @@ class RF: ) worker_transmit.start() + # Callback for the audio streaming devices + def callback(self, data_in48k, outdata, frames, time, status) -> None: + """ + Receive data into appropriate queue. + + Args: + data_in48k: Incoming data received + outdata: Container for the data returned + frames: Number of frames + time: + status: + + """ + # self.log.debug("[MDM] callback") + try: + processed_audio_in = self.demodulator.on_audio_received(data_in48k) + + if not self.modoutqueue or self.mod_out_locked: + data_out48k = np.zeros(frames, dtype=np.int16) + self.calculate_fft(processed_audio_in) + else: + # TODO Moved to this place for testing + # Maybe we can avoid moments of silence before transmitting + self.radio.set_ptt(True) + self.event_manager.send_ptt_change(True) + + data_out48k = self.modoutqueue.popleft() + self.calculate_fft(data_out48k) + except Exception as e: + self.log.warning(f"[MDM] audio callback not ready yet: {e}") + + try: + outdata[:] = data_out48k[:frames] + except IndexError as err: + self.log.debug(f"[MDM] callback writing error: IndexError: {err}") + + # return (data_out48k, audio.pyaudio.paContinue) + def worker_transmit(self) -> None: """Worker for FIFO queue for processing frames to be transmitted""" while True: @@ -1161,18 +667,6 @@ class RF: self.transmit(tx['mode'], tx['repeat'], tx['repeat_delay'], [tx['frame']]) # self.modem_transmit_queue.task_done() - def worker_received(self) -> None: - """Worker for FIFO queue for processing received frames""" - while True: - data = self.modem_received_queue.get() - self.log.debug("[MDM] worker_received: received data!") - # data[0] = bytes_out - # data[1] = freedv session - # data[2] = bytes_per_frame - # data[3] = snr - DATA_QUEUE_RECEIVED.put([data[0], data[1], data[2], data[3]]) - self.modem_received_queue.task_done() - def get_frequency_offset(self, freedv: ctypes.c_void_p) -> float: """ Ask codec2 for the calculated (audio) frequency offset of the received signal. @@ -1187,75 +681,6 @@ class RF: offset = round(modemStats.foff) * (-1) return offset - def get_scatter(self, freedv: ctypes.c_void_p) -> None: - """ - Ask codec2 for data about the received signal and calculate the scatter plot. - - :param freedv: codec2 instance to query - :type freedv: ctypes.c_void_p - """ - - modemStats = codec2.MODEMSTATS() - ctypes.cast( - codec2.api.freedv_get_modem_extended_stats(freedv, ctypes.byref(modemStats)), - ctypes.c_void_p, - ) - - scatterdata = [] - # original function before itertool - # for i in range(codec2.MODEM_STATS_NC_MAX): - # for j in range(1, codec2.MODEM_STATS_NR_MAX, 2): - # # print(f"{modemStats.rx_symbols[i][j]} - {modemStats.rx_symbols[i][j]}") - # xsymbols = round(modemStats.rx_symbols[i][j - 1] // 1000) - # ysymbols = round(modemStats.rx_symbols[i][j] // 1000) - # if xsymbols != 0.0 and ysymbols != 0.0: - # scatterdata.append({"x": str(xsymbols), "y": str(ysymbols)}) - - for i, j in itertools.product(range(codec2.MODEM_STATS_NC_MAX), range(1, codec2.MODEM_STATS_NR_MAX, 2)): - # print(f"{modemStats.rx_symbols[i][j]} - {modemStats.rx_symbols[i][j]}") - xsymbols = round(modemStats.rx_symbols[i][j - 1] // 1000) - ysymbols = round(modemStats.rx_symbols[i][j] // 1000) - if xsymbols != 0.0 and ysymbols != 0.0: - scatterdata.append({"x": str(xsymbols), "y": str(ysymbols)}) - - # Send all the data if we have too-few samples, otherwise send a sampling - if 150 > len(scatterdata) > 0: - self.event_manager.send_scatter_change(scatterdata) - - else: - # only take every tenth data point - self.event_manager.send_scatter_change(scatterdata[::10]) - - def calculate_snr(self, freedv: ctypes.c_void_p) -> float: - """ - Ask codec2 for data about the received signal and calculate - the signal-to-noise ratio. - - :param freedv: codec2 instance to query - :type freedv: ctypes.c_void_p - :return: Signal-to-noise ratio of the decoded data - :rtype: float - """ - try: - modem_stats_snr = ctypes.c_float() - modem_stats_sync = ctypes.c_int() - - codec2.api.freedv_get_modem_stats( - freedv, ctypes.byref(modem_stats_sync), ctypes.byref(modem_stats_snr) - ) - modem_stats_snr = modem_stats_snr.value - modem_stats_sync = modem_stats_sync.value - - snr = round(modem_stats_snr, 1) - self.log.info("[MDM] calculate_snr: ", snr=snr) - # snr = np.clip( - # snr, -127, 127 - # ) # limit to max value of -128/128 as a possible fix of #188 - return snr - except Exception as err: - self.log.error(f"[MDM] calculate_snr: Exception: {err}") - return 0 - def init_rig_control(self): # Check how we want to control the radio if self.radiocontrol == "rigctld": @@ -1441,24 +866,6 @@ class RF: # else 0 self.fft_queue.put([0]) - def set_frames_per_burst(self, frames_per_burst: int) -> None: - """ - Configure codec2 to send the configured number of frames per burst. - - :param frames_per_burst: Number of frames per burst requested - :type frames_per_burst: int - """ - # Limit frames per burst to acceptable values - frames_per_burst = min(frames_per_burst, 1) - frames_per_burst = max(frames_per_burst, 5) - - frames_per_burst = 1 - - codec2.api.freedv_set_frames_per_burst(self.dat0_datac1_freedv, frames_per_burst) - codec2.api.freedv_set_frames_per_burst(self.dat0_datac3_freedv, frames_per_burst) - codec2.api.freedv_set_frames_per_burst(self.dat0_datac4_freedv, frames_per_burst) - codec2.api.freedv_set_frames_per_burst(self.fsk_ldpc_freedv_0, frames_per_burst) - def reset_data_sync(self) -> None: """ reset sync state for data modes @@ -1467,49 +874,15 @@ class RF: :type frames_per_burst: int """ - codec2.api.freedv_set_sync(self.dat0_datac1_freedv, 0) - codec2.api.freedv_set_sync(self.dat0_datac3_freedv, 0) - codec2.api.freedv_set_sync(self.dat0_datac4_freedv, 0) - codec2.api.freedv_set_sync(self.fsk_ldpc_freedv_0, 0) + #codec2.api.freedv_set_sync(self.dat0_datac1_freedv, 0) + #codec2.api.freedv_set_sync(self.dat0_datac3_freedv, 0) + #codec2.api.freedv_set_sync(self.dat0_datac4_freedv, 0) + #codec2.api.freedv_set_sync(self.fsk_ldpc_freedv_0, 0) def set_FFT_stream(self, enable: bool): # Set config boolean regarding wheter it should sent FFT data to queue self.enable_fft_stream = enable -def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray: - """ - Scale values for the provided audio samples by dB. - - :param datalist: Audio samples to scale - :type datalist: np.ndarray - :param dB: Decibels to scale samples, constrained to the range [-50, 50] - :type dB: float - :return: Scaled audio samples - :rtype: np.ndarray - """ - try: - dB = float(dB) - except ValueError as e: - print(f"[MDM] Changing audio volume failed with error: {e}") - dB = 0.0 # 0 dB means no change - - # Clip dB value to the range [-50, 50] - dB = np.clip(dB, -30, 20) - - # Ensure datalist is an np.ndarray - if not isinstance(datalist, np.ndarray): - print("[MDM] Invalid data type for datalist. Expected np.ndarray.") - return datalist - - # Convert dB to linear scale - scale_factor = 10 ** (dB / 20) - - # Scale samples - scaled_data = datalist * scale_factor - - # Clip values to int16 range and convert data type - return np.clip(scaled_data, -32768, 32767).astype(np.int16) - def get_modem_error_state(): """ get current state buffer and return True of contains 10 diff --git a/modem/protocol_arq.py b/modem/protocol_arq.py index c5c0c859..855fba9a 100644 --- a/modem/protocol_arq.py +++ b/modem/protocol_arq.py @@ -413,13 +413,13 @@ class ARQ: self.rx_n_frames_per_burst = 0 # reset modem receiving state to reduce cpu load - modem.RECEIVE_SIG0 = True - modem.RECEIVE_SIG1 = False - modem.RECEIVE_DATAC1 = False - modem.RECEIVE_DATAC3 = False - modem.RECEIVE_DATAC4 = False - # modem.RECEIVE_FSK_LDPC_0 = False - modem.RECEIVE_FSK_LDPC_1 = False + modem.demodulator.RECEIVE_SIG0 = True + modem.demodulator.RECEIVE_SIG1 = False + modem.demodulator.RECEIVE_DATAC1 = False + modem.demodulator.RECEIVE_DATAC3 = False + modem.demodulator.RECEIVE_DATAC4 = False + # modem.demodulator.RECEIVE_FSK_LDPC_0 = False + modem.demodulator.RECEIVE_FSK_LDPC_1 = False self.is_IRS = False self.burst_nack = False diff --git a/modem/protocol_arq_iss.py b/modem/protocol_arq_iss.py index b8be1053..7a1b7fe7 100644 --- a/modem/protocol_arq_iss.py +++ b/modem/protocol_arq_iss.py @@ -40,8 +40,8 @@ class ISS(ARQ): # set signalling modes we want to listen to # we are in an ongoing arq transmission, so we don't need sig0 actually - modem.RECEIVE_SIG0 = False - modem.RECEIVE_SIG1 = True + modem.demodulator.RECEIVE_SIG0 = False + modem.demodulator.RECEIVE_SIG1 = True self.tx_n_retry_of_burst = 0 # retries we already sent data # Maximum number of retries to send before declaring a frame is lost diff --git a/modem/queues.py b/modem/queues.py index 7237f525..7abc2f4e 100644 --- a/modem/queues.py +++ b/modem/queues.py @@ -7,7 +7,6 @@ DATA_QUEUE_TRANSMIT = queue.Queue() DATA_QUEUE_RECEIVED = queue.Queue() # Initialize FIFO queue to store received frames -MODEM_RECEIVED_QUEUE = queue.Queue() MODEM_TRANSMIT_QUEUE = queue.Queue() # Initialize FIFO queue to store received frames @@ -15,11 +14,6 @@ MESH_RECEIVED_QUEUE = queue.Queue() MESH_QUEUE_TRANSMIT = queue.Queue() MESH_SIGNALLING_TABLE = [] - -# Initialize FIFO queue to store audio frames -AUDIO_RECEIVED_QUEUE = queue.Queue() -AUDIO_TRANSMIT_QUEUE = queue.Queue() - # Initialize FIFO queue to finally store received data # TODO Fix rx_buffer_size RX_BUFFER = queue.Queue(maxsize=16) diff --git a/modem/service_manager.py b/modem/service_manager.py index fa00b30d..72b4568f 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -1,5 +1,4 @@ import threading -import data_handler import frame_dispatcher import modem import structlog @@ -82,11 +81,15 @@ class SM: self.log.info("starting modem....") self.modem = modem.RF(self.config, self.modem_events, self.modem_fft, self.modem_service, self.states) - self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config, self.modem_events, self.states) + self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config, + self.modem_events, + self.states, + self.modem.data_queue_received) self.frame_dispatcher.start() self.states.set("is_modem_running", True) self.modem.set_FFT_stream(self.enable_fft_stream) + self.modem.start_modem() return True diff --git a/modem/tci.py b/modem/tci.py index 45fd97e8..9809bff9 100644 --- a/modem/tci.py +++ b/modem/tci.py @@ -5,15 +5,14 @@ import structlog import threading import websocket import time -from queues import AUDIO_TRANSMIT_QUEUE, AUDIO_RECEIVED_QUEUE class TCICtrl: - def __init__(self, hostname='127.0.0.1', port=50001): + def __init__(self, audio_rx_q, audio_tx_q, hostname='127.0.0.1', port=50001): # websocket.enableTrace(True) self.log = structlog.get_logger("TCI") - self.audio_received_queue = AUDIO_RECEIVED_QUEUE - self.audio_transmit_queue = AUDIO_TRANSMIT_QUEUE + self.audio_received_queue = audio_rx_q + self.audio_transmit_queue = audio_tx_q self.hostname = str(hostname) self.port = str(port)