mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
328 lines
12 KiB
Python
328 lines
12 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Created on Wed Dec 23 07:04:24 2020
|
|
|
|
@author: DJ2LS
|
|
"""
|
|
|
|
# pylint: disable=invalid-name, line-too-long, c-extension-no-member
|
|
# pylint: disable=import-outside-toplevel
|
|
|
|
import queue
|
|
import time
|
|
import codec2
|
|
import numpy as np
|
|
import sounddevice as sd
|
|
import structlog
|
|
import tci
|
|
import cw
|
|
import audio
|
|
import demodulator
|
|
import modulator
|
|
|
|
TESTMODE = False
|
|
|
|
class RF:
|
|
"""Class to encapsulate interactions between the audio device and codec2"""
|
|
|
|
log = structlog.get_logger("RF")
|
|
|
|
def __init__(self, config, event_manager, fft_queue, service_queue, states, radio_manager) -> None:
|
|
self.config = config
|
|
self.service_queue = service_queue
|
|
self.states = states
|
|
self.event_manager = event_manager
|
|
self.radio = radio_manager
|
|
self.sampler_avg = 0
|
|
self.buffer_avg = 0
|
|
|
|
# these are crc ids now
|
|
self.audio_input_device = config['AUDIO']['input_device']
|
|
self.audio_output_device = config['AUDIO']['output_device']
|
|
|
|
|
|
|
|
self.radiocontrol = config['RADIO']['control']
|
|
self.rigctld_ip = config['RIGCTLD']['ip']
|
|
self.rigctld_port = config['RIGCTLD']['port']
|
|
|
|
self.tci_ip = config['TCI']['tci_ip']
|
|
self.tci_port = config['TCI']['tci_port']
|
|
|
|
self.tx_audio_level = config['AUDIO']['tx_audio_level']
|
|
self.rx_audio_level = config['AUDIO']['rx_audio_level']
|
|
|
|
|
|
self.ptt_state = False
|
|
self.enqueuing_audio = False # set to True, while we are processing audio
|
|
|
|
self.AUDIO_SAMPLE_RATE = 48000
|
|
self.modem_sample_rate = codec2.api.FREEDV_FS_8000
|
|
|
|
# 8192 Let's do some tests with very small chunks for TX
|
|
#self.AUDIO_FRAMES_PER_BUFFER_TX = 1200 if self.radiocontrol in ["tci"] else 2400 * 2
|
|
# 8 * (self.AUDIO_SAMPLE_RATE/self.modem_sample_rate) == 48
|
|
self.AUDIO_CHANNELS = 1
|
|
self.MODE = 0
|
|
self.rms_counter = 0
|
|
|
|
self.audio_out_queue = queue.Queue()
|
|
|
|
# Make sure our resampler will work
|
|
assert (self.AUDIO_SAMPLE_RATE / self.modem_sample_rate) == codec2.api.FDMDV_OS_48 # type: ignore
|
|
|
|
self.audio_received_queue = queue.Queue()
|
|
self.data_queue_received = queue.Queue()
|
|
self.fft_queue = fft_queue
|
|
|
|
self.demodulator = demodulator.Demodulator(self.config,
|
|
self.audio_received_queue,
|
|
self.data_queue_received,
|
|
self.states,
|
|
self.event_manager,
|
|
self.service_queue,
|
|
self.fft_queue
|
|
)
|
|
|
|
self.modulator = modulator.Modulator(self.config)
|
|
|
|
|
|
|
|
def tci_tx_callback(self, audio_48k) -> None:
|
|
self.radio.set_ptt(True)
|
|
self.event_manager.send_ptt_change(True)
|
|
self.tci_module.push_audio(audio_48k)
|
|
|
|
def start_modem(self):
|
|
if TESTMODE:
|
|
return True
|
|
elif self.radiocontrol.lower() == "tci":
|
|
if not self.init_tci():
|
|
return False
|
|
else:
|
|
if not self.init_audio():
|
|
raise RuntimeError("Unable to init audio devices")
|
|
self.demodulator.start(self.sd_input_stream)
|
|
|
|
return True
|
|
|
|
def stop_modem(self):
|
|
try:
|
|
# let's stop the modem service
|
|
self.service_queue.put("stop")
|
|
# simulate audio class active state for reducing cli output
|
|
# self.stream = lambda: None
|
|
# self.stream.active = False
|
|
# self.stream.stop
|
|
|
|
except Exception:
|
|
self.log.error("[MDM] Error stopping modem")
|
|
|
|
def init_audio(self):
|
|
self.log.info(f"[MDM] init: get audio devices", input_device=self.audio_input_device,
|
|
output_device=self.audio_output_device)
|
|
try:
|
|
result = audio.get_device_index_from_crc(self.audio_input_device, True)
|
|
if result is None:
|
|
raise ValueError("Invalid input device")
|
|
else:
|
|
in_dev_index, in_dev_name = result
|
|
|
|
result = audio.get_device_index_from_crc(self.audio_output_device, False)
|
|
if result is None:
|
|
raise ValueError("Invalid output device")
|
|
else:
|
|
out_dev_index, out_dev_name = result
|
|
|
|
self.log.info(f"[MDM] init: receiving audio from '{in_dev_name}'")
|
|
self.log.info(f"[MDM] init: transmiting audio on '{out_dev_name}'")
|
|
self.log.debug("[MDM] init: starting pyaudio callback and decoding threads")
|
|
|
|
sd.default.samplerate = self.AUDIO_SAMPLE_RATE
|
|
sd.default.device = (in_dev_index, out_dev_index)
|
|
|
|
# init codec2 resampler
|
|
self.resampler = codec2.resampler()
|
|
|
|
# SoundDevice audio input stream
|
|
self.sd_input_stream = sd.InputStream(
|
|
channels=1,
|
|
dtype="int16",
|
|
callback=self.sd_input_audio_callback,
|
|
device=in_dev_index,
|
|
samplerate=self.AUDIO_SAMPLE_RATE,
|
|
blocksize=4800,
|
|
)
|
|
self.sd_input_stream.start()
|
|
|
|
self.sd_output_stream = sd.OutputStream(
|
|
channels=1,
|
|
dtype="int16",
|
|
callback=self.sd_output_audio_callback,
|
|
device=out_dev_index,
|
|
samplerate=self.AUDIO_SAMPLE_RATE,
|
|
blocksize=4800,
|
|
)
|
|
self.sd_output_stream.start()
|
|
|
|
return True
|
|
|
|
except Exception as audioerr:
|
|
self.log.error("[MDM] init: starting pyaudio callback failed", e=audioerr)
|
|
self.stop_modem()
|
|
return False
|
|
|
|
def init_tci(self):
|
|
# placeholder area for processing audio via TCI
|
|
# https://github.com/maksimus1210/TCI
|
|
self.log.warning("[MDM] [TCI] Not yet fully implemented", ip=self.tci_ip, port=self.tci_port)
|
|
|
|
# we are trying this by simulating an audio stream Object like with mkfifo
|
|
class Object:
|
|
"""An object for simulating audio stream"""
|
|
active = True
|
|
|
|
self.stream = Object()
|
|
|
|
# lets init TCI module
|
|
self.tci_module = tci.TCICtrl(self.audio_received_queue)
|
|
|
|
return True
|
|
|
|
|
|
|
|
def transmit_morse(self, repeats, repeat_delay, frames):
|
|
self.states.waitForTransmission()
|
|
self.states.setTransmitting(True)
|
|
# if we're transmitting FreeDATA signals, reset channel busy state
|
|
self.log.debug(
|
|
"[MDM] TRANSMIT", mode="MORSE"
|
|
)
|
|
start_of_transmission = time.time()
|
|
|
|
txbuffer_out = cw.MorseCodePlayer().text_to_signal(self.config['STATION'].mycall)
|
|
|
|
# transmit audio
|
|
self.enqueue_audio_out(txbuffer_out)
|
|
|
|
end_of_transmission = time.time()
|
|
transmission_time = end_of_transmission - start_of_transmission
|
|
self.log.debug("[MDM] ON AIR TIME", time=transmission_time)
|
|
|
|
|
|
def transmit(
|
|
self, mode, repeats: int, repeat_delay: int, frames: bytearray
|
|
) -> bool:
|
|
|
|
self.demodulator.reset_data_sync()
|
|
|
|
# Wait for some other thread that might be transmitting
|
|
self.states.waitForTransmission()
|
|
self.states.setTransmitting(True)
|
|
# self.states.channel_busy_event.wait()
|
|
|
|
start_of_transmission = time.time()
|
|
txbuffer = self.modulator.create_burst(mode, repeats, repeat_delay, frames)
|
|
|
|
# Re-sample back up to 48k (resampler works on np.int16)
|
|
x = np.frombuffer(txbuffer, dtype=np.int16)
|
|
x = audio.set_audio_volume(x, self.tx_audio_level)
|
|
|
|
if self.radiocontrol not in ["tci"]:
|
|
txbuffer_out = self.resampler.resample8_to_48(x)
|
|
else:
|
|
txbuffer_out = x
|
|
|
|
# transmit audio
|
|
self.enqueue_audio_out(txbuffer_out)
|
|
|
|
end_of_transmission = time.time()
|
|
transmission_time = end_of_transmission - start_of_transmission
|
|
self.log.debug("[MDM] ON AIR TIME", time=transmission_time)
|
|
|
|
|
|
|
|
def enqueue_audio_out(self, audio_48k) -> None:
|
|
self.enqueuing_audio = True
|
|
|
|
if not self.states.isTransmitting():
|
|
self.states.setTransmitting(True)
|
|
|
|
self.radio.set_ptt(True)
|
|
self.event_manager.send_ptt_change(True)
|
|
|
|
if self.radiocontrol in ["tci"]:
|
|
self.tci_tx_callback(audio_48k)
|
|
# we need to wait manually for tci processing
|
|
self.tci_module.wait_until_transmitted(audio_48k)
|
|
else:
|
|
# slice audio data to needed blocklength
|
|
block_size = 4800
|
|
pad_length = -len(audio_48k) % block_size
|
|
padded_data = np.pad(audio_48k, (0, pad_length), mode='constant')
|
|
sliced_audio_data = padded_data.reshape(-1, block_size)
|
|
# add each block to audio out queue
|
|
for block in sliced_audio_data:
|
|
self.audio_out_queue.put(block)
|
|
|
|
self.enqueuing_audio = False
|
|
self.states.transmitting_event.wait()
|
|
|
|
self.radio.set_ptt(False)
|
|
self.event_manager.send_ptt_change(False)
|
|
|
|
return
|
|
|
|
def sd_output_audio_callback(self, outdata: np.ndarray, frames: int, time, status) -> None:
|
|
try:
|
|
if not self.audio_out_queue.empty():
|
|
chunk = self.audio_out_queue.get_nowait()
|
|
audio.calculate_fft(chunk, self.fft_queue, self.states)
|
|
outdata[:] = chunk.reshape(outdata.shape)
|
|
|
|
else:
|
|
# reset transmitting state only, if we are not actively processing audio
|
|
# for avoiding a ptt toggle state bug
|
|
if not self.enqueuing_audio:
|
|
self.states.setTransmitting(False)
|
|
# Fill with zeros if the queue is empty
|
|
outdata.fill(0)
|
|
except Exception as e:
|
|
self.log.warning("[AUDIO STATUS]", status=status, time=time, frames=frames, e=e)
|
|
outdata.fill(0)
|
|
|
|
def sd_input_audio_callback(self, indata: np.ndarray, frames: int, time, status) -> None:
|
|
if status:
|
|
self.log.warning("[AUDIO STATUS]", status=status, time=time, frames=frames)
|
|
# FIXME on windows input overflows crashing the rx audio stream. Lets restart the server then
|
|
#if status.input_overflow:
|
|
# self.service_queue.put("restart")
|
|
return
|
|
try:
|
|
audio_48k = np.frombuffer(indata, dtype=np.int16)
|
|
audio_8k = self.resampler.resample48_to_8(audio_48k)
|
|
|
|
audio_8k_level_adjusted = audio.set_audio_volume(audio_8k, self.rx_audio_level)
|
|
|
|
if not self.states.isTransmitting():
|
|
audio.calculate_fft(audio_8k_level_adjusted, self.fft_queue, self.states)
|
|
|
|
length_audio_8k_level_adjusted = len(audio_8k_level_adjusted)
|
|
# Avoid buffer overflow by filling only if buffer for
|
|
# selected datachannel mode is not full
|
|
index = 0
|
|
for mode in self.demodulator.MODE_DICT:
|
|
mode_data = self.demodulator.MODE_DICT[mode]
|
|
audiobuffer = mode_data['audio_buffer']
|
|
decode = mode_data['decode']
|
|
index += 1
|
|
if audiobuffer:
|
|
if (audiobuffer.nbuffer + length_audio_8k_level_adjusted) > audiobuffer.size:
|
|
self.demodulator.buffer_overflow_counter[index] += 1
|
|
self.event_manager.send_buffer_overflow(self.demodulator.buffer_overflow_counter)
|
|
elif decode:
|
|
audiobuffer.push(audio_8k_level_adjusted)
|
|
except Exception as e:
|
|
self.log.warning("[AUDIO EXCEPTION]", status=status, time=time, frames=frames, e=e)
|
|
|