FreeDATA/modem/audio.py

209 lines
7 KiB
Python
Raw Normal View History

"""
Gather information about audio devices.
"""
import atexit
2022-02-17 13:25:22 +00:00
import multiprocessing
2022-09-19 21:17:07 +00:00
import crcengine
import sounddevice as sd
2022-06-30 12:05:57 +00:00
import structlog
2023-11-29 16:35:23 +00:00
import numpy as np
atexit.register(sd._terminate)
2022-09-18 15:40:11 +00:00
log = structlog.get_logger("audio")
2022-06-30 12:05:57 +00:00
2022-10-05 14:01:58 +00:00
# crc algorithm for unique audio device names
crc_algorithm = crcengine.new("crc16-ccitt-false") # load crc16 library
2022-04-11 09:03:54 +00:00
def get_audio_devices():
"""
return list of input and output audio devices in own process to avoid crashes of portaudio on raspberry pi
2022-05-09 00:41:49 +00:00
also uses a process data manager
"""
# we need to run this on Windows for multiprocessing support
2022-02-19 19:45:57 +00:00
# multiprocessing.freeze_support()
# multiprocessing.get_context("spawn")
2022-02-17 13:25:22 +00:00
# we need to reset and initialize sounddevice before running the multiprocessing part.
# If we are not doing this at this early point, not all devices will be displayed
#sd._terminate()
#sd._initialize()
2022-05-09 00:41:49 +00:00
2022-11-08 08:45:26 +00:00
# log.debug("[AUD] get_audio_devices")
2022-02-19 19:45:57 +00:00
with multiprocessing.Manager() as manager:
proxy_input_devices = manager.list()
proxy_output_devices = manager.list()
# print(multiprocessing.get_start_method())
proc = multiprocessing.Process(
target=fetch_audio_devices, args=(proxy_input_devices, proxy_output_devices)
)
proc.start()
proc.join()
2022-05-09 00:41:49 +00:00
2022-11-08 08:45:26 +00:00
# additional logging for audio devices
# log.debug("[AUD] get_audio_devices: input_devices:", list=f"{proxy_input_devices}")
# log.debug("[AUD] get_audio_devices: output_devices:", list=f"{proxy_output_devices}")
return list(proxy_input_devices), list(proxy_output_devices)
2022-09-20 22:24:22 +00:00
def device_crc(device) -> str:
2023-11-15 20:14:56 +00:00
crc_hwid = crc_algorithm(bytes(f"{device['name']}.{device['hostapi']}", encoding="utf-8"))
2022-09-19 21:17:07 +00:00
crc_hwid = crc_hwid.to_bytes(2, byteorder="big")
crc_hwid = crc_hwid.hex()
return crc_hwid
2022-09-19 21:17:07 +00:00
def fetch_audio_devices(input_devices, output_devices):
"""
get audio devices from portaudio
2022-05-09 00:41:49 +00:00
Args:
input_devices: proxy variable for input devices
output_devices: proxy variable for output devices
Returns:
"""
devices = sd.query_devices(device=None, kind=None)
2022-09-18 19:33:46 +00:00
2022-10-05 14:01:58 +00:00
for index, device in enumerate(devices):
# Use a try/except block because Windows doesn't have an audio device range
try:
name = device["name"]
2023-06-10 16:54:03 +00:00
# Ignore some Flex Radio devices to make device selection simpler
if name.startswith("DAX RESERVED") or name.startswith("DAX IQ"):
2023-06-11 06:29:31 +00:00
continue
2022-05-09 00:41:49 +00:00
max_output_channels = device["max_output_channels"]
max_input_channels = device["max_input_channels"]
except KeyError:
continue
except Exception as err:
print(err)
max_input_channels = 0
max_output_channels = 0
if max_input_channels > 0:
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
new_input_device = {"id": device_crc(device),
"name": device['name'],
"api": hostapi_name,
"native_index":index}
2022-10-05 14:01:58 +00:00
# check if device not in device list
if new_input_device not in input_devices:
input_devices.append(new_input_device)
2022-10-05 14:01:58 +00:00
if max_output_channels > 0:
hostapi_name = sd.query_hostapis(device['hostapi'])['name']
new_output_device = {"id": device_crc(device),
"name": device['name'],
"api": hostapi_name,
"native_index":index}
2022-10-05 14:01:58 +00:00
# check if device not in device list
if new_output_device not in output_devices:
output_devices.append(new_output_device)
# FreeData uses the crc as id inside the configuration
# SD lib uses a numerical id which is essentially an
# index of the device within the list
# returns (id, name)
def get_device_index_from_crc(crc, isInput: bool):
try:
in_devices = []
out_devices = []
fetch_audio_devices(in_devices, out_devices)
if isInput:
detected_devices = in_devices
else:
detected_devices = out_devices
for i, dev in enumerate(detected_devices):
if dev['id'] == crc:
2023-11-11 17:35:35 +00:00
return (dev['native_index'], dev['name'])
except Exception as e:
log.warning(f"Audio device {crc} not detected ", devices=detected_devices, isInput=isInput)
2023-11-11 09:41:16 +00:00
return [None, None]
def test_audio_devices(input_id: str, output_id: str) -> list:
2023-11-11 09:41:16 +00:00
test_result = [False, False]
try:
2023-11-11 17:35:35 +00:00
result = get_device_index_from_crc(input_id, True)
2023-11-11 09:41:16 +00:00
if result is None:
# in_dev_index, in_dev_name = None, None
2023-11-15 21:33:49 +00:00
raise ValueError(f"[Audio-Test] Invalid input device index {input_id}.")
2023-11-11 09:41:16 +00:00
else:
in_dev_index, in_dev_name = result
sd.check_input_settings(
device=in_dev_index,
channels=1,
dtype="int16",
samplerate=48000,
)
test_result[0] = True
except (sd.PortAudioError, ValueError) as e:
log.warning(f"[Audio-Test] Input device error ({input_id}):", e=e)
2023-11-11 09:41:16 +00:00
test_result[0] = False
try:
2023-11-11 17:35:35 +00:00
result = get_device_index_from_crc(output_id, False)
2023-11-11 09:41:16 +00:00
if result is None:
# out_dev_index, out_dev_name = None, None
2023-11-11 17:35:35 +00:00
raise ValueError(f"[Audio-Test] Invalid output device index {output_id}.")
2023-11-11 09:41:16 +00:00
else:
out_dev_index, out_dev_name = result
2023-11-11 17:35:35 +00:00
sd.check_output_settings(
2023-11-11 09:41:16 +00:00
device=out_dev_index,
channels=1,
dtype="int16",
samplerate=48000,
)
test_result[1] = True
except (sd.PortAudioError, ValueError) as e:
log.warning(f"[Audio-Test] Output device error ({output_id}):", e=e)
2023-11-11 09:41:16 +00:00
test_result[1] = False
2023-11-11 09:41:16 +00:00
sd._terminate()
sd._initialize()
return test_result
2023-11-29 16:35:23 +00:00
def set_audio_volume(datalist: np.ndarray, dB: float) -> np.ndarray:
"""
Scale values for the provided audio samples by dB.
:param datalist: Audio samples to scale
:type datalist: np.ndarray
:param dB: Decibels to scale samples, constrained to the range [-50, 50]
:type dB: float
:return: Scaled audio samples
:rtype: np.ndarray
"""
try:
dB = float(dB)
except ValueError as e:
print(f"[MDM] Changing audio volume failed with error: {e}")
dB = 0.0 # 0 dB means no change
# Clip dB value to the range [-50, 50]
dB = np.clip(dB, -30, 20)
# Ensure datalist is an np.ndarray
if not isinstance(datalist, np.ndarray):
print("[MDM] Invalid data type for datalist. Expected np.ndarray.")
return datalist
# Convert dB to linear scale
scale_factor = 10 ** (dB / 20)
# Scale samples
scaled_data = datalist * scale_factor
# Clip values to int16 range and convert data type
return np.clip(scaled_data, -32768, 32767).astype(np.int16)