Eliminate the modem modoutqueue

This commit is contained in:
Pedro 2023-12-15 18:25:01 +01:00
parent 8b7f7c47aa
commit 1640f6c66e
2 changed files with 40 additions and 107 deletions

View file

@ -322,8 +322,8 @@ class Demodulator():
"fsk_ldpc1",
)
def on_audio_received(self, audio_in_48k):
x = np.frombuffer(audio_in_48k, dtype=np.int16)
def sd_input_audio_callback(self, indata: np.ndarray, frames: int, time, status) -> None:
x = np.frombuffer(indata, dtype=np.int16)
x = self.resampler.resample48_to_8(x)
x = audio.set_audio_volume(x, self.rx_audio_level)

View file

@ -65,13 +65,12 @@ class RF:
self.channel_busy_delay = 0
self.AUDIO_SAMPLE_RATE_RX = 48000
self.AUDIO_SAMPLE_RATE_TX = 48000
self.AUDIO_SAMPLE_RATE = 48000
self.MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
# 8192 Let's do some tests with very small chunks for TX
self.AUDIO_FRAMES_PER_BUFFER_TX = 1200 if self.radiocontrol in ["tci"] else 2400 * 2
# 8 * (self.AUDIO_SAMPLE_RATE_RX/self.MODEM_SAMPLE_RATE) == 48
# 8 * (self.AUDIO_SAMPLE_RATE/self.MODEM_SAMPLE_RATE) == 48
self.AUDIO_CHANNELS = 1
self.MODE = 0
@ -82,7 +81,7 @@ class RF:
self.rms_counter = 0
# Make sure our resampler will work
assert (self.AUDIO_SAMPLE_RATE_RX / self.MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48 # type: ignore
assert (self.AUDIO_SAMPLE_RATE / self.MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48 # type: ignore
self.modem_transmit_queue = queue.Queue()
self.modem_received_queue = queue.Queue()
@ -92,9 +91,6 @@ class RF:
self.data_queue_received = queue.Queue()
# Init FIFO queue to store modulation out in
self.modoutqueue = deque()
self.event_manager = event_manager.EventManager([event_queue])
self.fft_queue = fft_queue
@ -110,19 +106,10 @@ class RF:
self.log, self.modem_transmit_queue)
# --------------------------------------------------------------------------------------------------------
def tci_tx_callback(self) -> None:
"""
Callback for TCI TX
"""
while True:
threading.Event().wait(0.01)
if len(self.modoutqueue) > 0 and not self.mod_out_locked:
self.radio.set_ptt(True)
self.event_manager.send_ptt_change(True)
data_out = self.modoutqueue.popleft()
self.tci_module.push_audio(data_out)
def tci_tx_callback(self, audio_48k) -> None:
self.radio.set_ptt(True)
self.event_manager.send_ptt_change(True)
self.tci_module.push_audio(audio_48k)
def start_modem(self):
result = False
@ -131,7 +118,7 @@ class RF:
result = self.init_audio()
if not result:
raise RuntimeError("Unable to init audio devices")
self.demodulator.start(self.stream)
self.demodulator.start(self.sd_input_stream)
else:
result = self.init_tci()
@ -145,7 +132,7 @@ class RF:
# init data thread
self.init_data_threads()
atexit.register(self.stream.stop)
atexit.register(self.sd_input_stream.stop)
# init beacon
self.beacon.start()
@ -186,19 +173,23 @@ class RF:
self.log.info(f"[MDM] init: transmiting audio on '{out_dev_name}'")
self.log.debug("[MDM] init: starting pyaudio callback and decoding threads")
sd.default.samplerate = self.AUDIO_SAMPLE_RATE
sd.default.device = (in_dev_index, out_dev_index)
# init codec2 resampler
self.resampler = codec2.resampler()
# init audio stream
self.stream = sd.RawStream(
# SoundDevice audio input stream
self.sd_input_stream = sd.InputStream(
channels=1,
dtype="int16",
callback=self.callback,
device=(in_dev_index, out_dev_index),
samplerate=self.AUDIO_SAMPLE_RATE_RX,
callback=self.demodulator.sd_input_audio_callback,
device=in_dev_index,
samplerate=self.AUDIO_SAMPLE_RATE,
blocksize=4800,
)
self.stream.start()
self.sd_input_stream.start()
return True
@ -239,21 +230,6 @@ class RF:
)
tci_tx_callback_thread.start()
def mkfifo_write_callback(self) -> None:
"""Support testing by writing the audio data to a pipe."""
while True:
threading.Event().wait(0.01)
# -----write
if len(self.modoutqueue) > 0 and not self.mod_out_locked:
data_out48k = self.modoutqueue.popleft()
# print(len(data_out48k))
with open(TXCHANNEL, "wb") as fifo_write:
fifo_write.write(data_out48k)
fifo_write.flush()
fifo_write.flush()
# --------------------------------------------------------------------
def transmit(
self, mode, repeats: int, repeat_delay: int, frames: bytearray
@ -270,11 +246,11 @@ class RF:
self.demodulator.reset_data_sync()
# get freedv instance by mode
mode_transition = {
codec2.FREEDV_MODE.datac0: self.freedv_datac0_tx,
codec2.FREEDV_MODE.datac1: self.freedv_datac1_tx,
codec2.FREEDV_MODE.datac3: self.freedv_datac3_tx,
codec2.FREEDV_MODE.datac4: self.freedv_datac4_tx,
codec2.FREEDV_MODE.datac13: self.freedv_datac13_tx,
codec2.FREEDV_MODE.datac0.value: self.freedv_datac0_tx,
codec2.FREEDV_MODE.datac1.value: self.freedv_datac1_tx,
codec2.FREEDV_MODE.datac3.value: self.freedv_datac3_tx,
codec2.FREEDV_MODE.datac4.value: self.freedv_datac4_tx,
codec2.FREEDV_MODE.datac13.value: self.freedv_datac13_tx,
}
if mode in mode_transition:
freedv = mode_transition[mode]
@ -407,7 +383,7 @@ class RF:
# -------------------------------
# add modulation to modout_queue
self.enqueue_modulation(txbuffer_out)
self.transmit_audio(txbuffer_out)
# Release our mod_out_lock, so we can use the queue
self.mod_out_locked = False
@ -425,7 +401,7 @@ class RF:
# set tci timeout reached to True for overriding if not used
tci_timeout_reached = True
while self.modoutqueue or not tci_timeout_reached:
while not tci_timeout_reached:
if self.radiocontrol in ["tci"]:
if time.time() < timestamp_to_sleep:
tci_timeout_reached = False
@ -492,7 +468,7 @@ class RF:
txbuffer_out = cw.MorseCodePlayer().text_to_signal("DJ2LS-1")
self.mod_out_locked = True
self.enqueue_modulation(txbuffer_out)
self.transmit_audio(txbuffer_out)
self.mod_out_locked = False
# we need to wait manually for tci processing
@ -508,7 +484,7 @@ class RF:
# set tci timeout reached to True for overriding if not used
tci_timeout_reached = True
while self.modoutqueue or not tci_timeout_reached:
while not tci_timeout_reached:
if self.radiocontrol in ["tci"]:
if time.time() < timestamp_to_sleep:
tci_timeout_reached = False
@ -534,23 +510,6 @@ class RF:
transmission_time = end_of_transmission - start_of_transmission
self.log.debug("[MDM] ON AIR TIME", time=transmission_time)
def enqueue_modulation(self, txbuffer_out):
chunk_length = self.AUDIO_FRAMES_PER_BUFFER_TX # 4800
chunk = [
txbuffer_out[i: i + chunk_length]
for i in range(0, len(txbuffer_out), chunk_length)
]
for c in chunk:
# Pad the chunk, if needed
if len(c) < chunk_length:
delta = chunk_length - len(c)
delta_zeros = np.zeros(delta, dtype=np.int16)
c = np.append(c, delta_zeros)
# self.log.debug("[MDM] mod out shorter than audio buffer", delta=delta)
self.modoutqueue.append(c)
def init_codec2(self):
# Open codec2 instances
@ -574,43 +533,17 @@ class RF:
)
worker_transmit.start()
# Callback for the audio streaming devices
def callback(self, data_in48k, outdata, frames, time, status) -> None:
"""
Receive data into appropriate queue.
# Low level modem audio transmit
def transmit_audio(self, audio_48k) -> None:
self.radio.set_ptt(True)
self.event_manager.send_ptt_change(True)
self.calculate_fft(audio_48k)
Args:
data_in48k: Incoming data received
outdata: Container for the data returned
frames: Number of frames
time:
status:
"""
# self.log.debug("[MDM] callback")
try:
processed_audio_in = self.demodulator.on_audio_received(data_in48k)
if not self.modoutqueue or self.mod_out_locked:
data_out48k = np.zeros(frames, dtype=np.int16)
self.calculate_fft(processed_audio_in)
else:
# TODO Moved to this place for testing
# Maybe we can avoid moments of silence before transmitting
self.radio.set_ptt(True)
self.event_manager.send_ptt_change(True)
data_out48k = self.modoutqueue.popleft()
self.calculate_fft(data_out48k)
except Exception as e:
self.log.warning(f"[MDM] audio callback not ready yet: {e}")
try:
outdata[:] = data_out48k[:frames]
except IndexError as err:
self.log.debug(f"[MDM] callback writing error: IndexError: {err}")
# return (data_out48k, audio.pyaudio.paContinue)
if self.radiocontrol in ["tci"]:
self.tci_tx_callback(audio_48k)
else:
sd.play(audio_48k, blocking=True)
return
def worker_transmit(self) -> None:
"""Worker for FIFO queue for processing frames to be transmitted"""