Collapse repeated code to a loop.

Add function return types. Start using FREEDV_MODE enum in places where
a raw number or string were used.
This commit is contained in:
Paul Kronenwetter 2022-05-22 15:07:52 -04:00
parent e0f96ffabe
commit bcdc7193a5

View file

@ -45,7 +45,7 @@ RECEIVE_FSK_LDPC_1 = False
class RF:
"""Class to encapsulate interactions between the audio device and codec2"""
def __init__(self):
def __init__(self) -> None:
self.sampler_avg = 0
self.buffer_avg = 0
@ -332,7 +332,7 @@ class RF:
worker_transmit.start()
# --------------------------------------------------------------------------------------------------------
def mkfifo_read_callback(self):
def mkfifo_read_callback(self) -> None:
"""
Support testing by reading the audio data from a pipe and
depositing the data into the codec data buffers.
@ -355,6 +355,9 @@ class RF:
(self.datac0_buffer, True),
(self.datac1_buffer, RECEIVE_DATAC1),
(self.datac3_buffer, RECEIVE_DATAC3),
# Not enabled yet.
# (self.fsk_ldpc_buffer_0, static.ENABLE_FSK),
# (self.fsk_ldpc_buffer_1, static.ENABLE_FSK),
]:
if (
not data_buffer.nbuffer + length_x > data_buffer.size
@ -362,7 +365,7 @@ class RF:
):
data_buffer.push(x)
def mkfifo_write_callback(self):
def mkfifo_write_callback(self) -> None:
"""Support testing by writing the audio data to a pipe."""
while 1:
time.sleep(0.01)
@ -382,7 +385,7 @@ class RF:
fifo_write.flush()
# --------------------------------------------------------------------
def callback(self, data_in48k, outdata, frames, time, status):
def callback(self, data_in48k, outdata, frames, time, status) -> None:
"""
Receive data into appropriate queue.
@ -401,39 +404,20 @@ class RF:
# Avoid decoding when transmitting to reduce CPU
if not static.TRANSMITTING:
length_x = len(x)
# Avoid buffer overflow by filling only if buffer not full
if not self.datac0_buffer.nbuffer + length_x > self.datac0_buffer.size:
self.datac0_buffer.push(x)
else:
static.BUFFER_OVERFLOW_COUNTER[0] += 1
# Avoid buffer overflow by filling only if buffer for
# selected datachannel mode is not full
if self.datac1_buffer.nbuffer + length_x > self.datac1_buffer.size:
static.BUFFER_OVERFLOW_COUNTER[1] += 1
elif RECEIVE_DATAC1:
self.datac1_buffer.push(x)
# Avoid buffer overflow by filling only if buffer for
# selected datachannel mode is not full
if self.datac3_buffer.nbuffer + length_x > self.datac3_buffer.size:
static.BUFFER_OVERFLOW_COUNTER[2] += 1
elif RECEIVE_DATAC3:
self.datac3_buffer.push(x)
# Avoid buffer overflow by filling only if buffer for
# selected datachannel mode is not full
if self.fsk_ldpc_buffer_0.nbuffer + length_x > self.fsk_ldpc_buffer_0.size:
static.BUFFER_OVERFLOW_COUNTER[3] += 1
elif static.ENABLE_FSK:
self.fsk_ldpc_buffer_0.push(x)
# Avoid buffer overflow by filling only if buffer for
# selected datachannel mode is not full
if self.fsk_ldpc_buffer_1.nbuffer + length_x > self.fsk_ldpc_buffer_1.size:
static.BUFFER_OVERFLOW_COUNTER[4] += 1
elif RECEIVE_FSK_LDPC_1 and static.ENABLE_FSK:
self.fsk_ldpc_buffer_1.push(x)
for audiobuffer, receive, index in [
(self.datac0_buffer, True, 0),
(self.datac1_buffer, RECEIVE_DATAC1, 1),
(self.datac3_buffer, RECEIVE_DATAC3, 2),
(self.fsk_ldpc_buffer_0, static.ENABLE_FSK, 3),
(self.fsk_ldpc_buffer_1, static.ENABLE_FSK, 4),
]:
if audiobuffer.nbuffer + length_x > audiobuffer.size:
static.BUFFER_OVERFLOW_COUNTER[index] += 1
elif receive:
audiobuffer.push(x)
if len(self.modoutqueue) <= 0 or self.mod_out_locked:
# if not self.modoutqueue or self.mod_out_locked:
@ -451,7 +435,9 @@ class RF:
# return (data_out48k, audio.pyaudio.paContinue)
# --------------------------------------------------------------------
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
def transmit(
self, mode, repeats: int, repeat_delay: int, frames: bytearray
) -> None:
"""
Args:
@ -508,7 +494,10 @@ class RF:
for _ in range(repeats):
# codec2 fsk preamble may be broken -
# at least it sounds like that, so we are disabling it for testing
if self.MODE not in ["FSK_LDPC_0", "FSK_LDPC_1", 200, 201]:
if self.MODE not in [
codec2.FREEDV_MODE.fsk_ldpc_0.value,
codec2.FREEDV_MODE.fsk_ldpc_1.value,
]:
# Write preamble to txbuffer
codec2.api.freedv_rawdatapreambletx(freedv, mod_out_preamble)
txbuffer += bytes(mod_out_preamble)
@ -540,7 +529,10 @@ class RF:
# codec2 fsk postamble may be broken -
# at least it sounds like that, so we are disabling it for testing
if self.MODE not in ["FSK_LDPC_0", "FSK_LDPC_1", 200, 201]:
if self.MODE not in [
codec2.FREEDV_MODE.fsk_ldpc_0.value,
codec2.FREEDV_MODE.fsk_ldpc_1.value,
]:
# Write postamble to txbuffer
codec2.api.freedv_rawdatapostambletx(freedv, mod_out_postamble)
# Append postamble to txbuffer
@ -604,7 +596,7 @@ class RF:
freedv: ctypes.c_void_p,
bytes_out,
bytes_per_frame,
):
) -> int:
"""
De-modulate supplied audio stream with supplied codec2 instance.
Decoded audio is placed into `bytes_out`.
@ -641,7 +633,7 @@ class RF:
self.calculate_snr(freedv)
return nin
def audio_datac0(self):
def audio_datac0(self) -> None:
"""Receive data encoded with datac0"""
self.datac0_nin = self.demodulate_audio(
self.datac0_buffer,
@ -651,7 +643,7 @@ class RF:
self.datac0_bytes_per_frame,
)
def audio_datac1(self):
def audio_datac1(self) -> None:
"""Receive data encoded with datac1"""
self.datac1_nin = self.demodulate_audio(
self.datac1_buffer,
@ -661,7 +653,7 @@ class RF:
self.datac1_bytes_per_frame,
)
def audio_datac3(self):
def audio_datac3(self) -> None:
"""Receive data encoded with datac3"""
self.datac3_nin = self.demodulate_audio(
self.datac3_buffer,
@ -671,7 +663,7 @@ class RF:
self.datac3_bytes_per_frame,
)
def audio_fsk_ldpc_0(self):
def audio_fsk_ldpc_0(self) -> None:
"""Receive data encoded with FSK + LDPC0"""
self.fsk_ldpc_nin_0 = self.demodulate_audio(
self.fsk_ldpc_buffer_0,
@ -681,7 +673,7 @@ class RF:
self.fsk_ldpc_bytes_per_frame_0,
)
def audio_fsk_ldpc_1(self):
def audio_fsk_ldpc_1(self) -> None:
"""Receive data encoded with FSK + LDPC1"""
self.fsk_ldpc_nin_1 = self.demodulate_audio(
self.fsk_ldpc_buffer_1,
@ -691,7 +683,7 @@ class RF:
self.fsk_ldpc_bytes_per_frame_1,
)
def worker_transmit(self):
def worker_transmit(self) -> None:
"""Worker for FIFO queue for processing frames to be transmitted"""
while True:
data = self.modem_transmit_queue.get()
@ -704,7 +696,7 @@ class RF:
)
# self.modem_transmit_queue.task_done()
def worker_received(self):
def worker_received(self) -> None:
"""Worker for FIFO queue for processing received frames"""
while True:
data = self.modem_received_queue.get()
@ -734,7 +726,7 @@ class RF:
static.FREQ_OFFSET = offset
return offset
def get_scatter(self, freedv: ctypes.c_void_p):
def get_scatter(self, freedv: ctypes.c_void_p) -> None:
"""
Ask codec2 for data about the received signal and calculate the scatter plot.
Side-effect: sets static.SCATTER
@ -804,7 +796,7 @@ class RF:
static.SNR = 0
return static.SNR
def update_rig_data(self):
def update_rig_data(self) -> None:
"""
Request information about the current state of the radio via hamlib
Side-effect: sets
@ -818,7 +810,7 @@ class RF:
static.HAMLIB_MODE = self.hamlib.get_mode()
static.HAMLIB_BANDWIDTH = self.hamlib.get_bandwith()
def calculate_fft(self):
def calculate_fft(self) -> None:
"""
Calculate an average signal strength of the channel to assess
whether the channel is 'busy.'
@ -886,7 +878,7 @@ class RF:
# else 0
static.FFT = [0]
def set_frames_per_burst(self, frames_per_burst: int):
def set_frames_per_burst(self, frames_per_burst: int) -> None:
"""
Configure codec2 to send the configured number of frames per burst.
@ -902,7 +894,7 @@ class RF:
codec2.api.freedv_set_frames_per_burst(self.fsk_ldpc_freedv_0, frames_per_burst)
def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p:
def open_codec2_instance(mode: int) -> ctypes.c_void_p:
"""
Return a codec2 instance of the type `mode`
@ -911,7 +903,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p:
:return: C-function of the requested codec2 instance
:rtype: ctypes.c_void_p
"""
if mode in ["FSK_LDPC_0", 200]:
if mode in [codec2.FREEDV_MODE.fsk_ldpc_0.value]:
return ctypes.cast(
codec2.api.freedv_open_advanced(
codec2.api.FREEDV_MODE_FSK_LDPC,
@ -920,7 +912,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p:
ctypes.c_void_p,
)
if mode in ["FSK_LDPC_1", 201]:
if mode in [codec2.FREEDV_MODE.fsk_ldpc_1.value]:
return ctypes.cast(
codec2.api.freedv_open_advanced(
codec2.api.FREEDV_MODE_FSK_LDPC,
@ -932,7 +924,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p:
return ctypes.cast(codec2.api.freedv_open(mode), ctypes.c_void_p)
def get_bytes_per_frame(mode: Union[int, str]) -> int:
def get_bytes_per_frame(mode: int) -> int:
"""
Provide bytes per frame information for accessing from data handler
@ -947,13 +939,13 @@ def get_bytes_per_frame(mode: Union[int, str]) -> int:
return int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
def set_audio_volume(datalist: np.int16, volume: float) -> np.int16:
def set_audio_volume(datalist, volume: float) -> np.int16:
"""
Scale values for the provided audio samples by volume,
`volume` is clipped to the range of 0-100
:param datalist: Audio samples to scale
:type datalist: np.int16
:type datalist: NDArray[np.int16]
:param volume: Percentage (0-100) to scale samples
:type volume: float
:return: Scaled audio samples