"""
|
|
Gather information about audio devices.
|
|
"""
|
|
import atexit
|
|
import multiprocessing
|
|
import crcengine
|
|
import sounddevice as sd
|
|
import structlog
|
|
import numpy as np
|
|
import queue
|
|
|
|
atexit.register(sd._terminate)
|
|
|
|
log = structlog.get_logger("audio")
|
|
|
|
# crc algorithm for unique audio device names
|
|
crc_algorithm = crcengine.new("crc16-ccitt-false") # load crc16 library
|
|
|
|
|
|


def get_audio_devices():
    """
    Return the lists of input and output audio devices.

    The query runs in its own process, using a multiprocessing data manager,
    to avoid PortAudio crashes, e.g. on the Raspberry Pi.
    """
    # On Windows, the following would be needed for multiprocessing support:
    # multiprocessing.freeze_support()
    # multiprocessing.get_context("spawn")

    # We need to reset and initialize sounddevice before running the multiprocessing part;
    # if this is not done at this early point, not all devices will be displayed.
    # sd._terminate()
    # sd._initialize()

    # log.debug("[AUD] get_audio_devices")
    with multiprocessing.Manager() as manager:
        proxy_input_devices = manager.list()
        proxy_output_devices = manager.list()
        # print(multiprocessing.get_start_method())
        proc = multiprocessing.Process(
            target=fetch_audio_devices, args=(proxy_input_devices, proxy_output_devices)
        )
        proc.start()
        proc.join()

        # Additional logging for audio devices:
        # log.debug("[AUD] get_audio_devices: input_devices:", list=f"{proxy_input_devices}")
        # log.debug("[AUD] get_audio_devices: output_devices:", list=f"{proxy_output_devices}")
        return list(proxy_input_devices), list(proxy_output_devices)


def device_crc(device) -> str:
    """Return a stable hex id for a device, derived from its name and host API via CRC16."""
    crc_hwid = crc_algorithm(bytes(f"{device['name']}.{device['hostapi']}", encoding="utf-8"))
    crc_hwid = crc_hwid.to_bytes(2, byteorder="big")
    crc_hwid = crc_hwid.hex()
    return crc_hwid
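
# Illustrative example (hypothetical values, for documentation only):
#   device_crc({"name": "USB Audio CODEC", "hostapi": 0})
# hashes the string "USB Audio CODEC.0" with CRC16-CCITT(false) and returns a
# four-hex-digit id such as "3f9c"; the id is deterministic for a given
# name/host-API pair.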


def fetch_audio_devices(input_devices, output_devices):
    """
    Get the audio devices known to PortAudio.

    Args:
        input_devices: proxy list that receives the input devices
        output_devices: proxy list that receives the output devices

    Returns:
        None; the results are appended to the given proxy lists.
    """
    devices = sd.query_devices(device=None, kind=None)

    for index, device in enumerate(devices):
        # Use a try/except block because Windows doesn't have an audio device range
        try:
            name = device["name"]
            # Ignore some Flex Radio devices to make device selection simpler
            if name.startswith("DAX RESERVED") or name.startswith("DAX IQ"):
                continue

            max_output_channels = device["max_output_channels"]
            max_input_channels = device["max_input_channels"]

        except KeyError:
            continue
        except Exception as err:
            print(err)
            max_input_channels = 0
            max_output_channels = 0

        if max_input_channels > 0:
            hostapi_name = sd.query_hostapis(device['hostapi'])['name']

            new_input_device = {"id": device_crc(device),
                                "name": device['name'],
                                "api": hostapi_name,
                                "native_index": index}
            # Only add the device if it is not already in the list
            if new_input_device not in input_devices:
                input_devices.append(new_input_device)

        if max_output_channels > 0:
            hostapi_name = sd.query_hostapis(device['hostapi'])['name']
            new_output_device = {"id": device_crc(device),
                                 "name": device['name'],
                                 "api": hostapi_name,
                                 "native_index": index}
            # Only add the device if it is not already in the list
            if new_output_device not in output_devices:
                output_devices.append(new_output_device)
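
# For reference, each entry appended above has this shape (values shown are
# hypothetical, not taken from a real machine):
#   {"id": "1a2b",                      # CRC16 of "<name>.<hostapi>"
#    "name": "Microphone (USB Audio)",  # PortAudio device name
#    "api": "Windows WASAPI",           # host API name
#    "native_index": 3}                 # index within sd.query_devices()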


# FreeDATA uses the CRC as the device id inside its configuration, while the
# sounddevice library uses a numerical id, which is essentially the index of
# the device within its device list.
# Returns (native_index, name), or [None, None] if the device was not found.
def get_device_index_from_crc(crc, isInput: bool):
    detected_devices = []
    try:
        in_devices = []
        out_devices = []

        fetch_audio_devices(in_devices, out_devices)

        detected_devices = in_devices if isInput else out_devices

        for dev in detected_devices:
            if dev['id'] == crc:
                return (dev['native_index'], dev['name'])

    except Exception:
        log.warning(f"Audio device {crc} not detected", devices=detected_devices, isInput=isInput)
    return [None, None]


def test_audio_devices(input_id: str, output_id: str) -> list:
    """Check whether the configured devices accept mono int16 audio at 48 kHz."""
    test_result = [False, False]
    try:
        result = get_device_index_from_crc(input_id, True)
        if result is None or result[0] is None:
            # in_dev_index, in_dev_name = None, None
            raise ValueError(f"[Audio-Test] Invalid input device id {input_id}.")
        in_dev_index, in_dev_name = result

        sd.check_input_settings(
            device=in_dev_index,
            channels=1,
            dtype="int16",
            samplerate=48000,
        )
        test_result[0] = True
    except (sd.PortAudioError, ValueError) as e:
        log.warning(f"[Audio-Test] Input device error ({input_id}):", e=e)
        test_result[0] = False

    try:
        result = get_device_index_from_crc(output_id, False)
        if result is None or result[0] is None:
            # out_dev_index, out_dev_name = None, None
            raise ValueError(f"[Audio-Test] Invalid output device id {output_id}.")
        out_dev_index, out_dev_name = result

        sd.check_output_settings(
            device=out_dev_index,
            channels=1,
            dtype="int16",
            samplerate=48000,
        )
        test_result[1] = True

    except (sd.PortAudioError, ValueError) as e:
        log.warning(f"[Audio-Test] Output device error ({output_id}):", e=e)
        test_result[1] = False

    # Reset PortAudio so subsequent streams see a clean state
    sd._terminate()
    sd._initialize()
    return test_result


def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray:
    """
    Scale the provided audio samples by dB.

    :param datalist: Audio samples to scale
    :type datalist: np.ndarray
    :param dB: Decibels to scale samples, constrained to the range [-30, 20]
    :type dB: float
    :return: Scaled audio samples
    :rtype: np.ndarray
    """
    try:
        dB = float(dB)
    except ValueError as e:
        print(f"[MDM] Changing audio volume failed with error: {e}")
        dB = 0.0  # 0 dB means no change

    # Clip dB value to the range [-30, 20]
    dB = np.clip(dB, -30, 20)

    # Ensure datalist is an np.ndarray
    if not isinstance(datalist, np.ndarray):
        print("[MDM] Invalid data type for datalist. Expected np.ndarray.")
        return datalist

    # Convert dB to a linear amplitude factor: e.g. +6 dB ~ x2, -6 dB ~ x0.5
    scale_factor = 10 ** (dB / 20)

    # Scale samples
    scaled_data = datalist * scale_factor

    # Clip values to the int16 range and convert the data type
    return np.clip(scaled_data, -32768, 32767).astype(np.int16)


# Counts calculate_fft() calls since the last dBFS update
RMS_COUNTER = 0
# Hold-off counter used to smooth the channel_busy state transitions
CHANNEL_BUSY_DELAY = 0


def calculate_fft(data, fft_queue, states) -> None:
    """
    Calculate the average signal strength on the channel to assess
    whether the channel is "busy".
    """
    # Initialize dbfs counter
    # rms_counter = 0

    # https://gist.github.com/ZWMiller/53232427efc5088007cab6feee7c6e4c
    # Fast Fourier Transform; 10 * log10(abs) scales it to dB
    # and makes sure it is not imaginary

    global RMS_COUNTER, CHANNEL_BUSY_DELAY

    try:
        fftarray = np.fft.rfft(data)

        # Set zero values to 1 to avoid taking log10(0)
        fftarray[fftarray == 0] = 1
        dfft = 10.0 * np.log10(abs(fftarray))

        # Get the average of dfft
        avg = np.mean(dfft)

        # Detect signals which are higher than the average + 15
        # (the offset smooths the output). Data higher than the average
        # must be a signal, so it is set to 100 to highlight it.
        # Only do this while not transmitting, so our own transmit audio
        # does not affect the busy detection too much.
        if not states.isTransmitting():
            dfft[dfft > avg + 15] = 100

        # Calculate audio dBFS
        # https://stackoverflow.com/a/9763652
        # Only recalculate dBFS every few cycles (see RMS_COUNTER) to reduce CPU load
        RMS_COUNTER += 1
        if RMS_COUNTER > 5:
            d = np.frombuffer(data, np.int16).astype(np.float32)
            # Take the peak sample magnitude (used here in place of a true RMS)
            # and convert it to dBFS
            # https://dsp.stackexchange.com/questions/8785/how-to-compute-dbfs
            # try/except avoids a runtime error from log10(0) on a silent buffer
            try:
                rms = int(np.sqrt(np.max(d ** 2)))
                if rms == 0:
                    raise ZeroDivisionError
                audio_dbfs = 20 * np.log10(rms / 32768)
                states.set("audio_dbfs", audio_dbfs)
            except Exception:
                states.set("audio_dbfs", -100)

            RMS_COUNTER = 0

        # Convert data to int to decrease size
        dfft = dfft.astype(int)

        # Create a list from dfft
        dfftlist = dfft.tolist()

        # Reduce the area where busy detection is enabled.
        # We want this to correlate with the mode bandwidth.
        # TODO: This is not correct yet and the maths need to be checked.
        # Each dfft bin covers roughly 10.15 Hz, so slots are bandwidth [Hz] / 10.15:
        # narrowband = 563 Hz -> 56 bins
        # wideband = 1700 Hz -> 167 bins
        # 1500 Hz -> 148 bins
        # 2700 Hz -> 266 bins
        # 3200 Hz -> 315 bins

        # Slot boundaries (bin ranges) for the per-slot busy detection
        slot = 0
        slot1 = [0, 65]
        slot2 = [65, 120]
        slot3 = [120, 176]
        slot4 = [176, 231]
        slot5 = [231, len(dfftlist)]
        slotbusy = [False, False, False, False, False]

        # Set to True if we should increment the delay count; else decrement
        addDelay = False
        for slot_range in [slot1, slot2, slot3, slot4, slot5]:
            range_start = slot_range[0]
            range_end = slot_range[1]
            # Define the area in which we detect the busy state
            slotdfft = dfft[range_start:range_end]
            # Check for signals higher than the average by looking for the "100" marker.
            # If we have a signal, increment the channel_busy delay counter
            # so we get a smoother state toggle.
            if np.sum(slotdfft[slotdfft > avg + 15]) >= 200 and not states.isTransmitting():
                addDelay = True
                slotbusy[slot] = True
                # states.channel_busy_slot[slot] = True
            # Increment slot
            slot += 1
        states.set_channel_slot_busy(slotbusy)

        if addDelay:
            # Limit the delay counter to a maximum of 200. The higher this value,
            # the longer we wait before releasing the busy state.
            states.set("channel_busy", True)
            CHANNEL_BUSY_DELAY = min(CHANNEL_BUSY_DELAY + 10, 200)
        else:
            # Decrement the channel busy counter if no signal has been detected.
            CHANNEL_BUSY_DELAY = max(CHANNEL_BUSY_DELAY - 1, 0)
            # When the channel busy counter reaches 0, toggle the state to False
            if CHANNEL_BUSY_DELAY == 0:
                states.set("channel_busy", False)

        # Erase stale entries if the consumer is lagging behind (queue holds 10 or more)
        if fft_queue.qsize() >= 10:
            try:
                while True:
                    fft_queue.get_nowait()
            except queue.Empty:
                pass
        fft_queue.put(dfftlist[:315])  # 315 bins --> bandwidth 3200 Hz
    except Exception as err:
        print(f"[MDM] calculate_fft: Exception: {err}")
        print("[MDM] Setting fft=0")
        fft_queue.put([0])
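

if __name__ == "__main__":
    # Minimal manual test sketch (not part of the FreeDATA application flow):
    # list the detected devices and, purely as an illustration, probe the
    # first input/output device found for 48 kHz / mono / int16 support.
    # The ids used below come straight from get_audio_devices(); nothing
    # here is hard-coded to a particular sound card.
    inputs, outputs = get_audio_devices()

    print("Input devices:")
    for dev in inputs:
        print(f"  {dev['id']}  {dev['name']} ({dev['api']})")

    print("Output devices:")
    for dev in outputs:
        print(f"  {dev['id']}  {dev['name']} ({dev['api']})")

    if inputs and outputs:
        result = test_audio_devices(inputs[0]["id"], outputs[0]["id"])
        print(f"48 kHz mono int16 supported (input, output): {result}")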