""" Gather information about audio devices. """ import multiprocessing import crcengine import sounddevice as sd import structlog import numpy as np import queue log = structlog.get_logger("audio") # crc algorithm for unique audio device names crc_algorithm = crcengine.new("crc16-ccitt-false") # load crc16 library def get_audio_devices(): """ return list of input and output audio devices in own process to avoid crashes of portaudio on raspberry pi also uses a process data manager """ # we need to run this on Windows for multiprocessing support # multiprocessing.freeze_support() # multiprocessing.get_context("spawn") # we need to reset and initialize sounddevice before running the multiprocessing part. # If we are not doing this at this early point, not all devices will be displayed #sd._terminate() #sd._initialize() # log.debug("[AUD] get_audio_devices") with multiprocessing.Manager() as manager: proxy_input_devices = manager.list() proxy_output_devices = manager.list() # print(multiprocessing.get_start_method()) proc = multiprocessing.Process( target=fetch_audio_devices, args=(proxy_input_devices, proxy_output_devices) ) proc.start() proc.join() # additional logging for audio devices # log.debug("[AUD] get_audio_devices: input_devices:", list=f"{proxy_input_devices}") # log.debug("[AUD] get_audio_devices: output_devices:", list=f"{proxy_output_devices}") return list(proxy_input_devices), list(proxy_output_devices) def device_crc(device) -> str: crc_hwid = crc_algorithm(bytes(f"{device['name']}.{device['hostapi']}", encoding="utf-8")) crc_hwid = crc_hwid.to_bytes(2, byteorder="big") crc_hwid = crc_hwid.hex() return crc_hwid def fetch_audio_devices(input_devices, output_devices): """ get audio devices from portaudio Args: input_devices: proxy variable for input devices output_devices: proxy variable for output devices Returns: """ devices = sd.query_devices(device=None, kind=None) for index, device in enumerate(devices): # Use a try/except block because Windows doesn't have an audio device range try: name = device["name"] # Ignore some Flex Radio devices to make device selection simpler if name.startswith("DAX RESERVED") or name.startswith("DAX IQ"): continue max_output_channels = device["max_output_channels"] max_input_channels = device["max_input_channels"] except KeyError: continue except Exception as err: print(err) max_input_channels = 0 max_output_channels = 0 if max_input_channels > 0: hostapi_name = sd.query_hostapis(device['hostapi'])['name'] new_input_device = {"id": device_crc(device), "name": device['name'], "api": hostapi_name, "native_index":index} # check if device not in device list if new_input_device not in input_devices: input_devices.append(new_input_device) if max_output_channels > 0: hostapi_name = sd.query_hostapis(device['hostapi'])['name'] new_output_device = {"id": device_crc(device), "name": device['name'], "api": hostapi_name, "native_index":index} # check if device not in device list if new_output_device not in output_devices: output_devices.append(new_output_device) # FreeData uses the crc as id inside the configuration # SD lib uses a numerical id which is essentially an # index of the device within the list # returns (id, name) def get_device_index_from_crc(crc, isInput: bool): try: in_devices = [] out_devices = [] fetch_audio_devices(in_devices, out_devices) if isInput: detected_devices = in_devices else: detected_devices = out_devices for i, dev in enumerate(detected_devices): if dev['id'] == crc: return (dev['native_index'], dev['name']) except Exception as e: log.warning(f"Audio device {crc} not detected ", devices=detected_devices, isInput=isInput) return [None, None] def test_audio_devices(input_id: str, output_id: str) -> list: test_result = [False, False] try: result = get_device_index_from_crc(input_id, True) if result is None: # in_dev_index, in_dev_name = None, None raise ValueError(f"[Audio-Test] Invalid input device index {input_id}.") else: in_dev_index, in_dev_name = result sd.check_input_settings( device=in_dev_index, channels=1, dtype="int16", samplerate=48000, ) test_result[0] = True except (sd.PortAudioError, ValueError) as e: log.warning(f"[Audio-Test] Input device error ({input_id}):", e=e) test_result[0] = False try: result = get_device_index_from_crc(output_id, False) if result is None: # out_dev_index, out_dev_name = None, None raise ValueError(f"[Audio-Test] Invalid output device index {output_id}.") else: out_dev_index, out_dev_name = result sd.check_output_settings( device=out_dev_index, channels=1, dtype="int16", samplerate=48000, ) test_result[1] = True except (sd.PortAudioError, ValueError) as e: log.warning(f"[Audio-Test] Output device error ({output_id}):", e=e) test_result[1] = False sd._terminate() sd._initialize() return test_result def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray: """ Scale values for the provided audio samples by dB. :param datalist: Audio samples to scale :type datalist: np.ndarray :param dB: Decibels to scale samples, constrained to the range [-50, 50] :type dB: float :return: Scaled audio samples :rtype: np.ndarray """ try: dB = float(dB) except ValueError as e: print(f"[MDM] Changing audio volume failed with error: {e}") dB = 0.0 # 0 dB means no change # Clip dB value to the range [-50, 50] dB = np.clip(dB, -30, 20) # Ensure datalist is an np.ndarray if not isinstance(datalist, np.ndarray): print("[MDM] Invalid data type for datalist. Expected np.ndarray.") return datalist # Convert dB to linear scale scale_factor = 10 ** (dB / 20) # Scale samples scaled_data = datalist * scale_factor # Clip values to int16 range and convert data type return np.clip(scaled_data, -32768, 32767).astype(np.int16) RMS_COUNTER = 0 CHANNEL_BUSY_DELAY = 0 def calculate_fft(data, fft_queue, states) -> None: """ Calculate an average signal strength of the channel to assess whether the channel is "busy." """ # Initialize dbfs counter # rms_counter = 0 # https://gist.github.com/ZWMiller/53232427efc5088007cab6feee7c6e4c # Fast Fourier Transform, 10*log10(abs) is to scale it to dB # and make sure it's not imaginary global RMS_COUNTER, CHANNEL_BUSY_DELAY try: data = bytearray() fftarray = np.fft.rfft(data) # Set value 0 to 1 to avoid division by zero fftarray[fftarray == 0] = 1 dfft = 10.0 * np.log10(abs(fftarray)) # get average of dfft avg = np.mean(dfft) # Detect signals which are higher than the # average + 10 (+10 smoothes the output). # Data higher than the average must be a signal. # Therefore we are setting it to 100 so it will be highlighted # Have to do this when we are not transmitting so our # own sending data will not affect this too much if not states.isTransmitting(): dfft[dfft > avg + 15] = 100 # Calculate audio dbfs # https://stackoverflow.com/a/9763652 # calculate dbfs every 50 cycles for reducing CPU load RMS_COUNTER += 1 if RMS_COUNTER > 5: d = np.frombuffer(data, np.int16).astype(np.float32) # calculate RMS and then dBFS # https://dsp.stackexchange.com/questions/8785/how-to-compute-dbfs # try except for avoiding runtime errors by division/0 try: rms = int(np.sqrt(np.max(d ** 2))) if rms == 0: raise ZeroDivisionError audio_dbfs = 20 * np.log10(rms / 32768) states.set("audio_dbfs", audio_dbfs) except Exception as e: states.set("audio_dbfs", -100) RMS_COUNTER = 0 # Convert data to int to decrease size dfft = dfft.astype(int) # Create list of dfft dfftlist = dfft.tolist() # Reduce area where the busy detection is enabled # We want to have this in correlation with mode bandwidth # TODO This is not correctly and needs to be checked for correct maths # dfftlist[0:1] = 10,15Hz # Bandwidth[Hz] / 10,15 # narrowband = 563Hz = 56 # wideband = 1700Hz = 167 # 1500Hz = 148 # 2700Hz = 266 # 3200Hz = 315 # slot slot = 0 slot1 = [0, 65] slot2 = [65,120] slot3 = [120, 176] slot4 = [176, 231] slot5 = [231, len(dfftlist)] slotbusy = [False,False,False,False,False] # Set to true if we should increment delay count; else false to decrement addDelay=False for range in [slot1, slot2, slot3, slot4, slot5]: range_start = range[0] range_end = range[1] # define the area, we are detecting busy state slotdfft = dfft[range_start:range_end] # Check for signals higher than average by checking for "100" # If we have a signal, increment our channel_busy delay counter # so we have a smoother state toggle if np.sum(slotdfft[slotdfft > avg + 15]) >= 200 and not states.isTransmitting(): addDelay=True slotbusy[slot]=True #states.channel_busy_slot[slot] = True # increment slot slot += 1 states.set_channel_slot_busy(slotbusy) if addDelay: # Limit delay counter to a maximum of 200. The higher this value, # the longer we will wait until releasing state states.set_channel_busy_condition_traffic(True) CHANNEL_BUSY_DELAY = min(CHANNEL_BUSY_DELAY + 10, 200) else: # Decrement channel busy counter if no signal has been detected. CHANNEL_BUSY_DELAY = max(CHANNEL_BUSY_DELAY - 1, 0) # When our channel busy counter reaches 0, toggle state to False if CHANNEL_BUSY_DELAY == 0: states.set_channel_busy_condition_traffic(False) # erase queue if greater than 3 if fft_queue.qsize() >= 1: fft_queue = queue.Queue() #fft_queue.put(dfftlist[:315]) # 315 --> bandwidth 3200 fft_queue.put(dfftlist) # 315 --> bandwidth 3200 except Exception as err: print(f"[MDM] calculate_fft: Exception: {err}")