From bcdc7193a56a662056284d0a5d99de36bcd6677e Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sun, 22 May 2022 15:07:52 -0400 Subject: [PATCH] Collapse repeated code to a loop. Add function return types. Start using FREEDV_MODE enum in places where a raw number or string were used. --- tnc/modem.py | 102 ++++++++++++++++++++++++--------------------------- 1 file changed, 47 insertions(+), 55 deletions(-) diff --git a/tnc/modem.py b/tnc/modem.py index 3211f605..a572799e 100644 --- a/tnc/modem.py +++ b/tnc/modem.py @@ -45,7 +45,7 @@ RECEIVE_FSK_LDPC_1 = False class RF: """Class to encapsulate interactions between the audio device and codec2""" - def __init__(self): + def __init__(self) -> None: self.sampler_avg = 0 self.buffer_avg = 0 @@ -332,7 +332,7 @@ class RF: worker_transmit.start() # -------------------------------------------------------------------------------------------------------- - def mkfifo_read_callback(self): + def mkfifo_read_callback(self) -> None: """ Support testing by reading the audio data from a pipe and depositing the data into the codec data buffers. @@ -355,6 +355,9 @@ class RF: (self.datac0_buffer, True), (self.datac1_buffer, RECEIVE_DATAC1), (self.datac3_buffer, RECEIVE_DATAC3), + # Not enabled yet. + # (self.fsk_ldpc_buffer_0, static.ENABLE_FSK), + # (self.fsk_ldpc_buffer_1, static.ENABLE_FSK), ]: if ( not data_buffer.nbuffer + length_x > data_buffer.size @@ -362,7 +365,7 @@ class RF: ): data_buffer.push(x) - def mkfifo_write_callback(self): + def mkfifo_write_callback(self) -> None: """Support testing by writing the audio data to a pipe.""" while 1: time.sleep(0.01) @@ -382,7 +385,7 @@ class RF: fifo_write.flush() # -------------------------------------------------------------------- - def callback(self, data_in48k, outdata, frames, time, status): + def callback(self, data_in48k, outdata, frames, time, status) -> None: """ Receive data into appropriate queue. @@ -401,39 +404,20 @@ class RF: # Avoid decoding when transmitting to reduce CPU if not static.TRANSMITTING: length_x = len(x) - # Avoid buffer overflow by filling only if buffer not full - if not self.datac0_buffer.nbuffer + length_x > self.datac0_buffer.size: - self.datac0_buffer.push(x) - else: - static.BUFFER_OVERFLOW_COUNTER[0] += 1 # Avoid buffer overflow by filling only if buffer for # selected datachannel mode is not full - if self.datac1_buffer.nbuffer + length_x > self.datac1_buffer.size: - static.BUFFER_OVERFLOW_COUNTER[1] += 1 - elif RECEIVE_DATAC1: - self.datac1_buffer.push(x) - - # Avoid buffer overflow by filling only if buffer for - # selected datachannel mode is not full - if self.datac3_buffer.nbuffer + length_x > self.datac3_buffer.size: - static.BUFFER_OVERFLOW_COUNTER[2] += 1 - elif RECEIVE_DATAC3: - self.datac3_buffer.push(x) - - # Avoid buffer overflow by filling only if buffer for - # selected datachannel mode is not full - if self.fsk_ldpc_buffer_0.nbuffer + length_x > self.fsk_ldpc_buffer_0.size: - static.BUFFER_OVERFLOW_COUNTER[3] += 1 - elif static.ENABLE_FSK: - self.fsk_ldpc_buffer_0.push(x) - - # Avoid buffer overflow by filling only if buffer for - # selected datachannel mode is not full - if self.fsk_ldpc_buffer_1.nbuffer + length_x > self.fsk_ldpc_buffer_1.size: - static.BUFFER_OVERFLOW_COUNTER[4] += 1 - elif RECEIVE_FSK_LDPC_1 and static.ENABLE_FSK: - self.fsk_ldpc_buffer_1.push(x) + for audiobuffer, receive, index in [ + (self.datac0_buffer, True, 0), + (self.datac1_buffer, RECEIVE_DATAC1, 1), + (self.datac3_buffer, RECEIVE_DATAC3, 2), + (self.fsk_ldpc_buffer_0, static.ENABLE_FSK, 3), + (self.fsk_ldpc_buffer_1, static.ENABLE_FSK, 4), + ]: + if audiobuffer.nbuffer + length_x > audiobuffer.size: + static.BUFFER_OVERFLOW_COUNTER[index] += 1 + elif receive: + audiobuffer.push(x) if len(self.modoutqueue) <= 0 or self.mod_out_locked: # if not self.modoutqueue or self.mod_out_locked: @@ -451,7 +435,9 @@ class RF: # return (data_out48k, audio.pyaudio.paContinue) # -------------------------------------------------------------------- - def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray): + def transmit( + self, mode, repeats: int, repeat_delay: int, frames: bytearray + ) -> None: """ Args: @@ -508,7 +494,10 @@ class RF: for _ in range(repeats): # codec2 fsk preamble may be broken - # at least it sounds like that, so we are disabling it for testing - if self.MODE not in ["FSK_LDPC_0", "FSK_LDPC_1", 200, 201]: + if self.MODE not in [ + codec2.FREEDV_MODE.fsk_ldpc_0.value, + codec2.FREEDV_MODE.fsk_ldpc_1.value, + ]: # Write preamble to txbuffer codec2.api.freedv_rawdatapreambletx(freedv, mod_out_preamble) txbuffer += bytes(mod_out_preamble) @@ -540,7 +529,10 @@ class RF: # codec2 fsk postamble may be broken - # at least it sounds like that, so we are disabling it for testing - if self.MODE not in ["FSK_LDPC_0", "FSK_LDPC_1", 200, 201]: + if self.MODE not in [ + codec2.FREEDV_MODE.fsk_ldpc_0.value, + codec2.FREEDV_MODE.fsk_ldpc_1.value, + ]: # Write postamble to txbuffer codec2.api.freedv_rawdatapostambletx(freedv, mod_out_postamble) # Append postamble to txbuffer @@ -604,7 +596,7 @@ class RF: freedv: ctypes.c_void_p, bytes_out, bytes_per_frame, - ): + ) -> int: """ De-modulate supplied audio stream with supplied codec2 instance. Decoded audio is placed into `bytes_out`. @@ -641,7 +633,7 @@ class RF: self.calculate_snr(freedv) return nin - def audio_datac0(self): + def audio_datac0(self) -> None: """Receive data encoded with datac0""" self.datac0_nin = self.demodulate_audio( self.datac0_buffer, @@ -651,7 +643,7 @@ class RF: self.datac0_bytes_per_frame, ) - def audio_datac1(self): + def audio_datac1(self) -> None: """Receive data encoded with datac1""" self.datac1_nin = self.demodulate_audio( self.datac1_buffer, @@ -661,7 +653,7 @@ class RF: self.datac1_bytes_per_frame, ) - def audio_datac3(self): + def audio_datac3(self) -> None: """Receive data encoded with datac3""" self.datac3_nin = self.demodulate_audio( self.datac3_buffer, @@ -671,7 +663,7 @@ class RF: self.datac3_bytes_per_frame, ) - def audio_fsk_ldpc_0(self): + def audio_fsk_ldpc_0(self) -> None: """Receive data encoded with FSK + LDPC0""" self.fsk_ldpc_nin_0 = self.demodulate_audio( self.fsk_ldpc_buffer_0, @@ -681,7 +673,7 @@ class RF: self.fsk_ldpc_bytes_per_frame_0, ) - def audio_fsk_ldpc_1(self): + def audio_fsk_ldpc_1(self) -> None: """Receive data encoded with FSK + LDPC1""" self.fsk_ldpc_nin_1 = self.demodulate_audio( self.fsk_ldpc_buffer_1, @@ -691,7 +683,7 @@ class RF: self.fsk_ldpc_bytes_per_frame_1, ) - def worker_transmit(self): + def worker_transmit(self) -> None: """Worker for FIFO queue for processing frames to be transmitted""" while True: data = self.modem_transmit_queue.get() @@ -704,7 +696,7 @@ class RF: ) # self.modem_transmit_queue.task_done() - def worker_received(self): + def worker_received(self) -> None: """Worker for FIFO queue for processing received frames""" while True: data = self.modem_received_queue.get() @@ -734,7 +726,7 @@ class RF: static.FREQ_OFFSET = offset return offset - def get_scatter(self, freedv: ctypes.c_void_p): + def get_scatter(self, freedv: ctypes.c_void_p) -> None: """ Ask codec2 for data about the received signal and calculate the scatter plot. Side-effect: sets static.SCATTER @@ -804,7 +796,7 @@ class RF: static.SNR = 0 return static.SNR - def update_rig_data(self): + def update_rig_data(self) -> None: """ Request information about the current state of the radio via hamlib Side-effect: sets @@ -818,7 +810,7 @@ class RF: static.HAMLIB_MODE = self.hamlib.get_mode() static.HAMLIB_BANDWIDTH = self.hamlib.get_bandwith() - def calculate_fft(self): + def calculate_fft(self) -> None: """ Calculate an average signal strength of the channel to assess whether the channel is 'busy.' @@ -886,7 +878,7 @@ class RF: # else 0 static.FFT = [0] - def set_frames_per_burst(self, frames_per_burst: int): + def set_frames_per_burst(self, frames_per_burst: int) -> None: """ Configure codec2 to send the configured number of frames per burst. @@ -902,7 +894,7 @@ class RF: codec2.api.freedv_set_frames_per_burst(self.fsk_ldpc_freedv_0, frames_per_burst) -def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p: +def open_codec2_instance(mode: int) -> ctypes.c_void_p: """ Return a codec2 instance of the type `mode` @@ -911,7 +903,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p: :return: C-function of the requested codec2 instance :rtype: ctypes.c_void_p """ - if mode in ["FSK_LDPC_0", 200]: + if mode in [codec2.FREEDV_MODE.fsk_ldpc_0.value]: return ctypes.cast( codec2.api.freedv_open_advanced( codec2.api.FREEDV_MODE_FSK_LDPC, @@ -920,7 +912,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p: ctypes.c_void_p, ) - if mode in ["FSK_LDPC_1", 201]: + if mode in [codec2.FREEDV_MODE.fsk_ldpc_1.value]: return ctypes.cast( codec2.api.freedv_open_advanced( codec2.api.FREEDV_MODE_FSK_LDPC, @@ -932,7 +924,7 @@ def open_codec2_instance(mode: Union[int, str]) -> ctypes.c_void_p: return ctypes.cast(codec2.api.freedv_open(mode), ctypes.c_void_p) -def get_bytes_per_frame(mode: Union[int, str]) -> int: +def get_bytes_per_frame(mode: int) -> int: """ Provide bytes per frame information for accessing from data handler @@ -947,13 +939,13 @@ def get_bytes_per_frame(mode: Union[int, str]) -> int: return int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8) -def set_audio_volume(datalist: np.int16, volume: float) -> np.int16: +def set_audio_volume(datalist, volume: float) -> np.int16: """ Scale values for the provided audio samples by volume, `volume` is clipped to the range of 0-100 :param datalist: Audio samples to scale - :type datalist: np.int16 + :type datalist: NDArray[np.int16] :param volume: Percentage (0-100) to scale samples :type volume: float :return: Scaled audio samples