FreeDATA/tnc/audio.py

97 lines
2.9 KiB
Python
Raw Normal View History

"""
Gather information about audio devices.
"""
import atexit
2022-02-17 13:25:22 +00:00
import multiprocessing
2022-09-19 21:17:07 +00:00
import crcengine
import sounddevice as sd
2022-06-30 12:05:57 +00:00
import structlog
atexit.register(sd._terminate)
2022-09-18 15:40:11 +00:00
log = structlog.get_logger("audio")
2022-06-30 12:05:57 +00:00
2022-04-11 09:03:54 +00:00
def get_audio_devices():
"""
return list of input and output audio devices in own process to avoid crashes of portaudio on raspberry pi
2022-05-09 00:41:49 +00:00
also uses a process data manager
"""
# we need to run this on Windows for multiprocessing support
2022-02-19 19:45:57 +00:00
# multiprocessing.freeze_support()
# multiprocessing.get_context("spawn")
2022-02-17 13:25:22 +00:00
# we need to reset and initialize sounddevice before running the multiprocessing part.
# If we are not doing this at this early point, not all devices will be displayed
sd._terminate()
sd._initialize()
2022-05-09 00:41:49 +00:00
2022-09-18 19:33:46 +00:00
log.error("[AUD] get_audio_devices")
2022-02-19 19:45:57 +00:00
with multiprocessing.Manager() as manager:
proxy_input_devices = manager.list()
proxy_output_devices = manager.list()
# print(multiprocessing.get_start_method())
proc = multiprocessing.Process(
target=fetch_audio_devices, args=(proxy_input_devices, proxy_output_devices)
)
proc.start()
proc.join()
2022-05-09 00:41:49 +00:00
2022-09-18 19:33:46 +00:00
log.error(f"[AUD] get_audio_devices: input_devices: {proxy_input_devices}")
log.error(f"[AUD] get_audio_devices: output_devices: {proxy_output_devices}")
return list(proxy_input_devices), list(proxy_output_devices)
2022-09-20 22:24:22 +00:00
def device_crc(device) -> str:
2022-09-19 21:17:07 +00:00
crc_algorithm = crcengine.new("crc16-ccitt-false") # load crc8 library
2022-09-20 22:24:22 +00:00
crc_hwid = crc_algorithm(bytes(f"{device}", encoding="utf-8"))
2022-09-19 21:17:07 +00:00
crc_hwid = crc_hwid.to_bytes(2, byteorder="big")
crc_hwid = crc_hwid.hex()
2022-09-20 22:24:22 +00:00
return f"{device['name']} [{crc_hwid}]"
2022-09-19 21:17:07 +00:00
def fetch_audio_devices(input_devices, output_devices):
"""
get audio devices from portaudio
2022-05-09 00:41:49 +00:00
Args:
input_devices: proxy variable for input devices
output_devices: proxy variable for output devices
Returns:
"""
devices = sd.query_devices(device=None, kind=None)
2022-09-18 19:33:46 +00:00
2022-06-25 20:56:03 +00:00
# The use of set forces the list to contain only unique entries.
input_devs = set()
output_devs = set()
for device in devices:
# Use a try/except block because Windows doesn't have an audio device range
try:
name = device["name"]
2022-05-09 00:41:49 +00:00
max_output_channels = device["max_output_channels"]
max_input_channels = device["max_input_channels"]
except KeyError:
continue
except Exception as err:
print(err)
max_input_channels = 0
max_output_channels = 0
name = ""
if max_input_channels > 0:
2022-09-20 22:24:22 +00:00
input_devs.add(device_crc(device))
if max_output_channels > 0:
2022-09-20 22:24:22 +00:00
output_devs.add(device_crc(device))
for index, item in enumerate(input_devs):
input_devices.append({"id": index, "name": item})
for index, item in enumerate(output_devs):
output_devices.append({"id": index, "name": item})