From ee82cfe96751c3c6ebbac39b08d5f6447742f48e Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sat, 2 Jul 2022 16:14:05 -0400 Subject: [PATCH 1/9] Use dispatcher for command and received frame handling. --- tnc/data_handler.py | 241 +++++++++++++++++--------------------------- 1 file changed, 90 insertions(+), 151 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index b07081b1..66acb58f 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -126,6 +126,57 @@ class DATA: self.transmission_timeout = 360 # transmission timeout in seconds + # Dictionary of functions and log messages used in process_data + # instead of a long series of if-elif-else statements. + self.rx_dispatcher = { + FR_TYPE.ARQ_DC_OPEN_ACK_N.value: ( + self.arq_received_channel_is_open, + "ARQ OPEN ACK (Narrow)", + ), + FR_TYPE.ARQ_DC_OPEN_ACK_W.value: ( + self.arq_received_channel_is_open, + "ARQ OPEN ACK (Wide)", + ), + FR_TYPE.ARQ_DC_OPEN_N.value: ( + self.arq_received_data_channel_opener, + "ARQ Data Channel Open (Narrow)", + ), + FR_TYPE.ARQ_DC_OPEN_W.value: ( + self.arq_received_data_channel_opener, + "ARQ Data Channel Open (Wide)", + ), + FR_TYPE.ARQ_SESSION_CLOSE.value: ( + self.received_session_close, + "ARQ CLOSE SESSION", + ), + FR_TYPE.ARQ_SESSION_HB.value: ( + self.received_session_heartbeat, + "ARQ HEARTBEAT", + ), + FR_TYPE.ARQ_SESSION_OPEN.value: ( + self.received_session_opener, + "ARQ OPEN SESSION", + ), + FR_TYPE.ARQ_STOP.value: (self.received_stop_transmission, "ARQ STOP TX"), + FR_TYPE.BEACON.value: (self.received_beacon, "BEACON"), + FR_TYPE.BURST_ACK.value: (self.burst_ack_nack_received, "BURST ACK"), + FR_TYPE.BURST_NACK.value: (self.burst_ack_nack_received, "BURST NACK"), + FR_TYPE.CQ.value: (self.received_cq, "CQ"), + FR_TYPE.FR_ACK.value: (self.frame_ack_received, "FRAME ACK"), + FR_TYPE.FR_NACK.value: (self.frame_nack_received, "FRAME NACK"), + FR_TYPE.FR_REPEAT.value: (self.burst_rpt_received, "REPEAT REQUEST"), + FR_TYPE.PING_ACK.value: (self.received_ping_ack, "PING ACK"), + FR_TYPE.PING.value: (self.received_ping, "PING"), + FR_TYPE.QRV.value: (self.received_qrv, "QRV"), + } + self.command_dispatcher = { + "CONNECT": (self.arq_session_handler, "CONNECT"), + "CQ": (self.transmit_cq, "CQ"), + "DISCONNECT": (self.close_session, "DISCONNECT"), + "SEND_TEST_FRAME": (self.send_test_frame, "TEST"), + "STOP": (self.stop_transmission, "STOP"), + } + # Start worker and watchdog threads worker_thread_transmit = threading.Thread( target=self.worker_transmit, name="worker thread transmit", daemon=True @@ -159,13 +210,12 @@ class DATA: while True: data = self.data_queue_transmit.get() - # [0] == Command - if data[0] == "CQ": - self.transmit_cq() - - elif data[0] == "STOP": - self.stop_transmission() + # Dispatch commands known to command_dispatcher + if data[0] in self.command_dispatcher: + self.log.debug(f"[TNC] TX {self.command_dispatcher[data[0]][1]}...") + self.command_dispatcher[data[0]][0]() + # Dispatch commands that need more arguments. elif data[0] == "PING": # [1] dxcallsign self.transmit_ping(data[1]) @@ -187,18 +237,6 @@ class DATA: # [5] mycallsign with ssid self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5]) - elif data[0] == "CONNECT": - # [1] DX CALLSIGN - # self.arq_session_handler(data[1]) - self.arq_session_handler() - - elif data[0] == "DISCONNECT": - # [1] DX CALLSIGN - self.close_session() - - elif data[0] == "SEND_TEST_FRAME": - # [1] DX CALLSIGN - self.send_test_frame() else: self.log.error( "[TNC] worker_transmit: received invalid command:", data=data @@ -254,7 +292,14 @@ class DATA: frame = frametype - 10 n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big") - if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: + # Dispatch activity based on received frametype + if frametype in self.rx_dispatcher: + # Process frames "known" by rx_dispatcher + self.log.debug(f"[TNC] {self.rx_dispatcher[frametype][1]} RECEIVED....") + self.rx_dispatcher[frametype][0](bytes_out[:-2]) + + # Process frametypes requiring a different set of arguments. + elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: # get snr of received data # FIXME: find a fix for this - after moving to classes, this no longer works # snr = self.calculate_snr(freedv) @@ -270,92 +315,6 @@ class DATA: # self.log.debug(f"[TNC] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") # self.c_lib.freedv_set_sync(freedv, 0) - # BURST ACK - elif frametype == FR_TYPE.BURST_ACK.value: - self.log.debug("[TNC] BURST ACK RECEIVED....") - self.burst_ack_received(bytes_out[:-2]) - - # FRAME ACK - elif frametype == FR_TYPE.FR_ACK.value: - self.log.debug("[TNC] FRAME ACK RECEIVED....") - self.frame_ack_received() - - # FRAME RPT - elif frametype == FR_TYPE.FR_REPEAT.value: - self.log.debug("[TNC] REPEAT REQUEST RECEIVED....") - self.burst_rpt_received(bytes_out[:-2]) - - # FRAME NACK - elif frametype == FR_TYPE.FR_NACK.value: - self.log.debug("[TNC] FRAME NACK RECEIVED....") - self.frame_nack_received(bytes_out[:-2]) - - # BURST NACK - elif frametype == FR_TYPE.BURST_NACK.value: - self.log.debug("[TNC] BURST NACK RECEIVED....") - self.burst_nack_received(bytes_out[:-2]) - - # CQ FRAME - elif frametype == FR_TYPE.CQ.value: - self.log.debug("[TNC] CQ RECEIVED....") - self.received_cq(bytes_out[:-2]) - - # QRV FRAME - elif frametype == FR_TYPE.QRV.value: - self.log.debug("[TNC] QRV RECEIVED....") - self.received_qrv(bytes_out[:-2]) - - # PING FRAME - elif frametype == FR_TYPE.PING.value: - self.log.debug("[TNC] PING RECEIVED....") - self.received_ping(bytes_out[:-2]) - - # PING ACK - elif frametype == FR_TYPE.PING_ACK.value: - self.log.debug("[TNC] PING ACK RECEIVED....") - self.received_ping_ack(bytes_out[:-2]) - - # SESSION OPENER - elif frametype == FR_TYPE.ARQ_SESSION_OPEN.value: - self.log.debug("[TNC] OPEN SESSION RECEIVED....") - self.received_session_opener(bytes_out[:-2]) - - # SESSION HEARTBEAT - elif frametype == FR_TYPE.ARQ_SESSION_HB.value: - self.log.debug("[TNC] SESSION HEARTBEAT RECEIVED....") - self.received_session_heartbeat(bytes_out[:-2]) - - # SESSION CLOSE - elif frametype == FR_TYPE.ARQ_SESSION_CLOSE.value: - self.log.debug("[TNC] CLOSE ARQ SESSION RECEIVED....") - self.received_session_close(bytes_out[:-2]) - - # ARQ FILE TRANSFER RECEIVED! - elif frametype in [ - FR_TYPE.ARQ_DC_OPEN_W.value, - FR_TYPE.ARQ_DC_OPEN_N.value, - ]: - self.log.debug("[TNC] ARQ arq_received_data_channel_opener") - self.arq_received_data_channel_opener(bytes_out[:-2]) - - # ARQ CHANNEL IS OPENED - elif frametype in [ - FR_TYPE.ARQ_DC_OPEN_ACK_W.value, - FR_TYPE.ARQ_DC_OPEN_ACK_N.value, - ]: - self.log.debug("[TNC] ARQ arq_received_channel_is_open") - self.arq_received_channel_is_open(bytes_out[:-2]) - - # ARQ STOP TRANSMISSION - elif frametype == FR_TYPE.ARQ_STOP.value: - self.log.debug("[TNC] ARQ received stop transmission") - self.received_stop_transmission() - - # this is outdated and we may remove it - elif frametype == FR_TYPE.BEACON.value: - self.log.debug("[TNC] BEACON RECEIVED") - self.received_beacon(bytes_out[:-2]) - # TESTFRAMES elif frametype == FR_TYPE.TEST_FRAME.value: self.log.debug("[TNC] TESTFRAME RECEIVED", frame=bytes_out[:]) @@ -1106,10 +1065,9 @@ class DATA: self.log.debug("[TNC] TESTMODE: arq_transmit exiting.") sys.exit(0) - # signalling frames received - def burst_ack_received(self, data_in: bytes): + def burst_ack_nack_received(self, data_in: bytes) -> None: """ - Received a ACK for a transmitted frame, keep track and + Received a ACK/NACK for a transmitted frame, keep track and make adjustments to speed level if needed. Args: @@ -1118,9 +1076,6 @@ class DATA: Returns: """ - # Increase speed level if we received a burst ack - # self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1) - # Process data only if we are in ARQ and BUSY state if static.ARQ_STATE: helpers.add_to_heard_stations( @@ -1131,62 +1086,42 @@ class DATA: static.FREQ_OFFSET, static.HAMLIB_FREQUENCY, ) + + frametype = int.from_bytes(bytes(data_in[:1]), "big") + desc = "ack" + if frametype == FR_TYPE.BURST_ACK.value: + # Increase speed level if we received a burst ack + # self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1) # Force data retry loops of TX TNC to stop and continue with next frame self.burst_ack = True - # Update data_channel timestamp - self.data_channel_last_received = int(time.time()) - self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big") - self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big") - static.ARQ_SPEED_LEVEL = self.speed_level - self.log.debug( - "[TNC] burst_ack_received:", - speed_level=self.speed_level, - c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name, - ) - # Reset burst nack counter self.burst_nack_counter = 0 # Reset n retries per burst counter self.n_retries_per_burst = 0 - - # signalling frames received - def burst_nack_received(self, data_in: bytes): - """ - Received a NACK for a transmitted frame, keep track and - make adjustments to speed level if needed. - - Args: - data_in:bytes: - - """ + else: # Decrease speed level if we received a burst nack # self.speed_level = max(self.speed_level - 1, 0) - - # only process data if we are in ARQ and BUSY state - if static.ARQ_STATE: - helpers.add_to_heard_stations( - static.DXCALLSIGN, - static.DXGRID, - "DATA-CHANNEL", - static.SNR, - static.FREQ_OFFSET, - static.HAMLIB_FREQUENCY, - ) - # Force data loops of TNC to stop and continue with next frame + # Set flag to retry frame again. self.burst_nack = True + # Increment burst nack counter + self.burst_nack_counter += 1 + desc = "nack" + # Update data_channel timestamp self.data_channel_last_received = int(time.time()) self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big") self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big") static.ARQ_SPEED_LEVEL = self.speed_level - self.burst_nack_counter += 1 + self.log.debug( - "[TNC] burst_nack_received:", + f"[TNC] burst_{desc}_received:", speed_level=self.speed_level, c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name, ) - def frame_ack_received(self): + def frame_ack_received( + self, data_in: bytes # pylint: disable=unused-argument + ) -> None: """Received an ACK for a transmitted frame""" # Process data only if we are in ARQ and BUSY state if static.ARQ_STATE: @@ -1204,9 +1139,11 @@ class DATA: self.data_channel_last_received = int(time.time()) self.arq_session_last_received = int(time.time()) - def frame_nack_received(self, data_in: bytes): # pylint: disable=unused-argument + def frame_nack_received( + self, data_in: bytes # pylint: disable=unused-argument + ) -> None: """ - Received a NACK for a transmitted framt + Received a NACK for a transmitted frame Args: data_in:bytes: @@ -1990,7 +1927,9 @@ class DATA: ) self.arq_cleanup() - def received_stop_transmission(self) -> None: + def received_stop_transmission( + self, data_in: bytes + ) -> None: # pylint: disable=unused-argument """ Received a transmission stop """ From 06c11e0537626d2bf082ac379afea15cb80e8f04 Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sat, 2 Jul 2022 16:19:33 -0400 Subject: [PATCH 2/9] Remove unnecessary callsign checks. Add notes. --- tnc/data_handler.py | 39 ++++++++++++++++++--------------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index 66acb58f..0abfb9b8 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -481,19 +481,17 @@ class DATA: Returns: """ + # We've arrived here from process_data which already checked that the frame + # is intended for this station. data_in = bytes(data_in) # get received crc for different mycall ssids self.received_mycall_crc = data_in[2:5] # check if callsign ssid override - _valid, mycallsign = helpers.check_callsign( + _, mycallsign = helpers.check_callsign( self.mycallsign, self.received_mycall_crc ) - if not _valid: - # ARQ data packet not for me. - self.arq_cleanup() - return # only process data if we are in ARQ and BUSY state else return to quit if not static.ARQ_STATE and static.TNC_STATE != "BUSY": @@ -703,14 +701,11 @@ class DATA: self.transmission_uuid = str(uuid.uuid4()) timestamp = int(time.time()) - # check if callsign ssid override - _valid, mycallsign = helpers.check_callsign( - self.mycallsign, self.received_mycall_crc - ) - if not _valid: - # ARQ data packet not for me. - self.arq_cleanup() - return + # This shouldn't be needed - it's checked at the start of arq_data_received. + # # check if callsign ssid override + # _, mycallsign = helpers.check_callsign( + # self.mycallsign, self.received_mycall_crc + # ) # Re-code data_frame in base64, UTF-8 for JSON UI communication. base64_data = base64.b64encode(data_frame).decode("UTF-8") @@ -756,7 +751,6 @@ class DATA: ) else: - self.send_data_to_socket_queue( freedata="tnc-message", arq="transmission", @@ -1381,7 +1375,9 @@ class DATA: data_in:bytes: """ - # Close the session if the DXCALLSIGN_CRC matches the station in static. + # We've arrived here from process_data which already checked that the frame + # is intended for this station. + # Close the session if the CRC matches the remote station in static. _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) if _valid_crc: static.ARQ_SESSION_STATE = "disconnected" @@ -1614,6 +1610,8 @@ class DATA: data_in:bytes: """ + # We've arrived here from process_data which already checked that the frame + # is intended for this station. self.arq_file_transfer = True self.is_IRS = True self.send_data_to_socket_queue( @@ -1621,10 +1619,13 @@ class DATA: arq="transmission", status="opening", ) + static.DXCALLSIGN_CRC = bytes(data_in[4:7]) static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) - n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big") + # n_frames_per_burst is currently unused + # n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big") + frametype = int.from_bytes(bytes(data_in[:1]), "big") # check if we received low bandwidth mode if frametype == FR_TYPE.ARQ_DC_OPEN_W.value: @@ -1650,11 +1651,7 @@ class DATA: ) # check if callsign ssid override - valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4]) - if not valid: - # ARQ connect packet not for me. - self.arq_cleanup() - return + _, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4]) self.log.info( "[TNC] ARQ | DATA | RX | [" From 953e981e186c98a751c425e1b56a433f71ec0cc5 Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sat, 2 Jul 2022 16:20:14 -0400 Subject: [PATCH 3/9] Avoid setting global / class state for unrelated frame. --- tnc/data_handler.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index 0abfb9b8..953aec1c 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -1813,10 +1813,10 @@ class DATA: data_in:bytes: """ - static.DXCALLSIGN_CRC = bytes(data_in[4:7]) - static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) + dxcallsign_CRC = bytes(data_in[4:7]) + dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13])) helpers.add_to_heard_stations( - static.DXCALLSIGN, + dxcallsign, static.DXGRID, "PING", static.SNR, @@ -1830,7 +1830,7 @@ class DATA: uuid=str(uuid.uuid4()), timestamp=int(time.time()), mycallsign=str(self.mycallsign, "UTF-8"), - dxcallsign=str(static.DXCALLSIGN, "UTF-8"), + dxcallsign=str(dxcallsign, "UTF-8"), dxgrid=str(static.DXGRID, "UTF-8"), snr=str(static.SNR), ) @@ -1841,6 +1841,8 @@ class DATA: self.log.debug("[TNC] received_ping: ping not for this station.") return + static.DXCALLSIGN_CRC = dxcallsign_CRC + static.DXCALLSIGN = dxcallsign self.log.info( "[TNC] PING REQ [" + str(mycallsign, "UTF-8") From 9b7056a774411e824fd552c19d4964ad42995374 Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sat, 2 Jul 2022 16:20:51 -0400 Subject: [PATCH 4/9] Misc additions. --- tnc/data_handler.py | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index 953aec1c..ac9c8daa 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -56,7 +56,7 @@ class DATA: self.transmission_uuid = "" # Received my callsign crc if we received a crc for another ssid - self.received_mycall_crc = b"" + self.received_mycall_crc = b"" # Does this need to be a class variable? self.data_channel_last_received = 0.0 # time of last "live sign" of a frame self.burst_ack_snr = 0 # SNR from received burst ack frames @@ -321,7 +321,9 @@ class DATA: # Unknown frame type else: - self.log.warning("[TNC] ARQ - other frame type", frametype=frametype) + self.log.warning( + "[TNC] ARQ - other frame type", frametype=FR_TYPE(frametype).name + ) else: # for debugging purposes to receive all data @@ -895,9 +897,8 @@ class DATA: # TX_N_FRAMES_PER_BURST = 1 is working arqheader = bytearray() - arqheader[:1] = bytes( - [FR_TYPE.BURST_01.value] - ) # bytes([FRAME_TYPE.BURST_01.value + i]) + # arqheader[:1] = bytes([FR_TYPE.BURST_01.value + i]) + arqheader[:1] = bytes([FR_TYPE.BURST_01.value]) arqheader[1:2] = bytes([TX_N_FRAMES_PER_BURST]) arqheader[2:5] = static.DXCALLSIGN_CRC arqheader[5:8] = static.MYCALLSIGN_CRC @@ -1086,17 +1087,17 @@ class DATA: if frametype == FR_TYPE.BURST_ACK.value: # Increase speed level if we received a burst ack # self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1) - # Force data retry loops of TX TNC to stop and continue with next frame - self.burst_ack = True - # Reset burst nack counter - self.burst_nack_counter = 0 - # Reset n retries per burst counter - self.n_retries_per_burst = 0 + # Force data retry loops of TX TNC to stop and continue with next frame + self.burst_ack = True + # Reset burst nack counter + self.burst_nack_counter = 0 + # Reset n retries per burst counter + self.n_retries_per_burst = 0 else: - # Decrease speed level if we received a burst nack - # self.speed_level = max(self.speed_level - 1, 0) + # Decrease speed level if we received a burst nack + # self.speed_level = max(self.speed_level - 1, 0) # Set flag to retry frame again. - self.burst_nack = True + self.burst_nack = True # Increment burst nack counter self.burst_nack_counter += 1 desc = "nack" From 7f649f98dfa6bab33a5d87259dbab733bb44c41a Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sun, 3 Jul 2022 13:41:06 -0400 Subject: [PATCH 5/9] Extract queues from data_handler and modem. Eliminates a cyclical import identified by pylint. --- tnc/data_handler.py | 5 +---- tnc/modem.py | 8 ++------ tnc/queues.py | 11 +++++++++++ tnc/sock.py | 20 ++++++++++---------- 4 files changed, 24 insertions(+), 20 deletions(-) create mode 100644 tnc/queues.py diff --git a/tnc/data_handler.py b/tnc/data_handler.py index ac9c8daa..c73d0c80 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -8,7 +8,6 @@ Created on Sun Dec 27 20:43:40 2020 # pylint: disable=import-outside-toplevel, attribute-defined-outside-init import base64 -import queue import sys import threading import time @@ -26,13 +25,11 @@ import structlog import ujson as json from codec2 import FREEDV_MODE from exceptions import NoCallsign +from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT from static import FRAME_TYPE as FR_TYPE TESTMODE = False -DATA_QUEUE_TRANSMIT = queue.Queue() -DATA_QUEUE_RECEIVED = queue.Queue() - class DATA: """Terminal Node Controller for FreeDATA""" diff --git a/tnc/modem.py b/tnc/modem.py index 551b0158..7edaffe6 100644 --- a/tnc/modem.py +++ b/tnc/modem.py @@ -11,28 +11,24 @@ Created on Wed Dec 23 07:04:24 2020 import atexit import ctypes import os -import queue import sys import threading import time from collections import deque import codec2 -import data_handler import numpy as np import sock import sounddevice as sd import static import structlog import ujson as json +from queues import DATA_QUEUE_RECEIVED, MODEM_RECEIVED_QUEUE, MODEM_TRANSMIT_QUEUE TESTMODE = False RXCHANNEL = "" TXCHANNEL = "" -# Initialize FIFO queue to store received frames -MODEM_RECEIVED_QUEUE = queue.Queue() -MODEM_TRANSMIT_QUEUE = queue.Queue() static.TRANSMITTING = False # Receive only specific modes to reduce CPU load @@ -690,7 +686,7 @@ class RF: # data[0] = bytes_out # data[1] = freedv session # data[2] = bytes_per_frame - data_handler.DATA_QUEUE_RECEIVED.put([data[0], data[1], data[2]]) + DATA_QUEUE_RECEIVED.put([data[0], data[1], data[2]]) self.modem_received_queue.task_done() def get_frequency_offset(self, freedv: ctypes.c_void_p) -> float: diff --git a/tnc/queues.py b/tnc/queues.py new file mode 100644 index 00000000..259218f6 --- /dev/null +++ b/tnc/queues.py @@ -0,0 +1,11 @@ +""" +Hold queues used by more than one module to eliminate cyclic imports. +""" +import queue + +DATA_QUEUE_TRANSMIT = queue.Queue() +DATA_QUEUE_RECEIVED = queue.Queue() + +# Initialize FIFO queue to store received frames +MODEM_RECEIVED_QUEUE = queue.Queue() +MODEM_TRANSMIT_QUEUE = queue.Queue() diff --git a/tnc/sock.py b/tnc/sock.py index 79cc2741..f62a321f 100644 --- a/tnc/sock.py +++ b/tnc/sock.py @@ -25,12 +25,12 @@ import sys import threading import time -import data_handler import helpers import static import structlog import ujson as json from exceptions import NoCallsign +from queues import DATA_QUEUE_TRANSMIT SOCKET_QUEUE = queue.Queue() DAEMON_QUEUE = queue.Queue() @@ -227,7 +227,7 @@ def process_tnc_commands(data): and received_json["command"] == "send_test_frame" ): try: - data_handler.DATA_QUEUE_TRANSMIT.put(["SEND_TEST_FRAME"]) + DATA_QUEUE_TRANSMIT.put(["SEND_TEST_FRAME"]) command_response("send_test_frame", True) except Exception as err: command_response("send_test_frame", False) @@ -240,7 +240,7 @@ def process_tnc_commands(data): # CQ CQ CQ ----------------------------------------------------- if received_json["command"] == "cqcqcq": try: - data_handler.DATA_QUEUE_TRANSMIT.put(["CQ"]) + DATA_QUEUE_TRANSMIT.put(["CQ"]) command_response("cqcqcq", True) except Exception as err: @@ -254,7 +254,7 @@ def process_tnc_commands(data): try: static.BEACON_STATE = True interval = int(received_json["parameter"]) - data_handler.DATA_QUEUE_TRANSMIT.put(["BEACON", interval, True]) + DATA_QUEUE_TRANSMIT.put(["BEACON", interval, True]) command_response("start_beacon", True) except Exception as err: command_response("start_beacon", False) @@ -269,7 +269,7 @@ def process_tnc_commands(data): try: log.warning("[SCK] Stopping beacon!") static.BEACON_STATE = False - data_handler.DATA_QUEUE_TRANSMIT.put(["BEACON", None, False]) + DATA_QUEUE_TRANSMIT.put(["BEACON", None, False]) command_response("stop_beacon", True) except Exception as err: command_response("stop_beacon", False) @@ -293,7 +293,7 @@ def process_tnc_commands(data): dxcallsign = helpers.callsign_to_bytes(dxcallsign) dxcallsign = helpers.bytes_to_callsign(dxcallsign) - data_handler.DATA_QUEUE_TRANSMIT.put(["PING", dxcallsign]) + DATA_QUEUE_TRANSMIT.put(["PING", dxcallsign]) command_response("ping", True) except NoCallsign: command_response("ping", False) @@ -320,7 +320,7 @@ def process_tnc_commands(data): static.DXCALLSIGN = dxcallsign static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) - data_handler.DATA_QUEUE_TRANSMIT.put(["CONNECT", dxcallsign]) + DATA_QUEUE_TRANSMIT.put(["CONNECT", dxcallsign]) command_response("connect", True) except Exception as err: command_response("connect", False) @@ -334,7 +334,7 @@ def process_tnc_commands(data): if received_json["type"] == "arq" and received_json["command"] == "disconnect": # send ping frame and wait for ACK try: - data_handler.DATA_QUEUE_TRANSMIT.put(["DISCONNECT"]) + DATA_QUEUE_TRANSMIT.put(["DISCONNECT"]) command_response("disconnect", True) except Exception as err: command_response("disconnect", False) @@ -383,7 +383,7 @@ def process_tnc_commands(data): binarydata = base64.b64decode(base64data) - data_handler.DATA_QUEUE_TRANSMIT.put( + DATA_QUEUE_TRANSMIT.put( ["ARQ_RAW", binarydata, mode, n_frames, arq_uuid, mycallsign] ) @@ -402,7 +402,7 @@ def process_tnc_commands(data): ): try: if static.TNC_STATE == "BUSY" or static.ARQ_STATE: - data_handler.DATA_QUEUE_TRANSMIT.put(["STOP"]) + DATA_QUEUE_TRANSMIT.put(["STOP"]) log.warning("[SCK] Stopping transmission!") static.TNC_STATE = "IDLE" static.ARQ_STATE = False From 361a67b67f7e68c19ee6ca95007b0189397996c2 Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sun, 3 Jul 2022 13:41:34 -0400 Subject: [PATCH 6/9] Remove unused imports --- test/util_datac0.py | 1 - test/util_datac0_negative.py | 1 - 2 files changed, 2 deletions(-) diff --git a/test/util_datac0.py b/test/util_datac0.py index 9d0e5504..9301759a 100644 --- a/test/util_datac0.py +++ b/test/util_datac0.py @@ -23,7 +23,6 @@ import modem import sock import static import structlog -from codec2 import FREEDV_MODE from static import FRAME_TYPE as FR_TYPE diff --git a/test/util_datac0_negative.py b/test/util_datac0_negative.py index 0099bef0..aaf74e0d 100644 --- a/test/util_datac0_negative.py +++ b/test/util_datac0_negative.py @@ -17,7 +17,6 @@ import modem import sock import static import structlog -from codec2 import FREEDV_MODE from static import FRAME_TYPE as FR_TYPE From 4a50c9c9d070ee926d30592b79a8685735264c5a Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Sun, 3 Jul 2022 14:06:01 -0400 Subject: [PATCH 7/9] Remove compression factor calculation. This is done in arq_transmit and should not be needed here. --- tnc/data_handler.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index c73d0c80..bef0d76d 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -1490,7 +1490,7 @@ class DATA: # we need to compress data for gettin a compression factor. # so we are compressing twice. This is not that nice and maybe there is another way # for calculating transmission statistics - static.ARQ_COMPRESSION_FACTOR = len(data_out) / len(zlib.compress(data_out)) + # static.ARQ_COMPRESSION_FACTOR = len(data_out) / len(zlib.compress(data_out)) self.arq_open_data_channel(mode, n_frames_per_burst, mycallsign) @@ -1777,6 +1777,7 @@ class DATA: """ if not str(dxcallsign).strip(): + # TODO: We should display a message to this effect on the UI. self.log.warning("[TNC] Missing required callsign", dxcallsign=dxcallsign) return static.DXCALLSIGN = dxcallsign From cd00eaa8519269d373b48d849502a89a4f2f5a8f Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Mon, 4 Jul 2022 17:09:08 -0400 Subject: [PATCH 8/9] Restore Python 3.6 compatibility --- test/test_chat_text.py | 4 +++- test/test_datac0.py | 4 +++- test/test_datac0_negative.py | 4 +++- test/test_highsnr_stdio_C_P_datacx.py | 6 ++++-- test/test_highsnr_stdio_P_C_datacx.py | 6 ++++-- test/test_highsnr_stdio_P_P_datacx.py | 6 ++++-- test/test_highsnr_stdio_P_P_multi.py | 6 ++++-- test/test_tnc_states.py | 4 ++-- tnc/data_handler.py | 2 +- 9 files changed, 28 insertions(+), 14 deletions(-) diff --git a/test/test_chat_text.py b/test/test_chat_text.py index c5db2c8d..ae798eb1 100644 --- a/test/test_chat_text.py +++ b/test/test_chat_text.py @@ -221,7 +221,9 @@ def test_chat_text( for p_item in proc: assert p_item.exitcode == 0 - p_item.close() + # p_item.close() # Python 3.7+ only + p_item.terminate() + p_item.join() analyze_results(s1_data, s2_data, STATIONS) diff --git a/test/test_datac0.py b/test/test_datac0.py index 4721ed5c..9ff94392 100644 --- a/test/test_datac0.py +++ b/test/test_datac0.py @@ -275,7 +275,9 @@ def test_datac0(frame_type: str, tmp_path): for p_item in proc: assert p_item.exitcode == 0 - p_item.close() + # p_item.close() # Python 3.7+ only + p_item.terminate() + p_item.join() analyze_results(s1_data, s2_data, STATIONS) diff --git a/test/test_datac0_negative.py b/test/test_datac0_negative.py index c23a0a82..d90fb6f0 100644 --- a/test/test_datac0_negative.py +++ b/test/test_datac0_negative.py @@ -242,7 +242,9 @@ def test_datac0_negative(frame_type: str, tmp_path): for p_item in proc: assert p_item.exitcode == 0 - p_item.close() + # p_item.close() # Python 3.7+ only + p_item.terminate() + p_item.join() analyze_results(s1_data, s2_data, STATIONS) diff --git a/test/test_highsnr_stdio_C_P_datacx.py b/test/test_highsnr_stdio_C_P_datacx.py index 058074a1..21c9c709 100644 --- a/test/test_highsnr_stdio_C_P_datacx.py +++ b/test/test_highsnr_stdio_C_P_datacx.py @@ -75,7 +75,7 @@ def t_HighSNR_C_P_DATACx( tx_side = path break - print(f"{tx_side=} / {rx_side=}") + print(f"tx_side={tx_side} / rx_side={rx_side}") with subprocess.Popen( args=[ @@ -178,7 +178,9 @@ def test_HighSNR_C_P_DATACx( proc.terminate() assert proc.exitcode == 0 - proc.close() + # proc.close() # Python 3.7+ only + proc.terminate() + proc.join() if __name__ == "__main__": diff --git a/test/test_highsnr_stdio_P_C_datacx.py b/test/test_highsnr_stdio_P_C_datacx.py index 3ba5f222..f213def4 100644 --- a/test/test_highsnr_stdio_P_C_datacx.py +++ b/test/test_highsnr_stdio_P_C_datacx.py @@ -73,7 +73,7 @@ def t_HighSNR_P_C_DATACx(bursts: int, frames_per_burst: int, mode: str): tx_side = os.path.join("test", tx_side) os.environ["PYTHONPATH"] += ":." - print(f"{tx_side=} / {rx_side=}") + print(f"tx_side={tx_side} / rx_side={rx_side}") with subprocess.Popen( args=[ @@ -178,7 +178,9 @@ def test_HighSNR_P_C_DATACx(bursts: int, frames_per_burst: int, mode: str): proc.terminate() assert proc.exitcode == 0 - proc.close() + # proc.close() # Python 3.7+ only + proc.terminate() + proc.join() if __name__ == "__main__": diff --git a/test/test_highsnr_stdio_P_P_datacx.py b/test/test_highsnr_stdio_P_P_datacx.py index c430e96f..78aaff32 100644 --- a/test/test_highsnr_stdio_P_P_datacx.py +++ b/test/test_highsnr_stdio_P_P_datacx.py @@ -61,7 +61,7 @@ def t_HighSNR_P_P_DATACx(bursts: int, frames_per_burst: int, mode: str): rx_side = os.path.join("test", rx_side) os.environ["PYTHONPATH"] += ":." - print(f"{tx_side=} / {rx_side=}") + print(f"tx_side={tx_side} / rx_side={rx_side}") with subprocess.Popen( args=[ @@ -139,7 +139,9 @@ def test_HighSNR_P_P_DATACx(bursts: int, frames_per_burst: int, mode: str): proc.terminate() assert proc.exitcode == 0 - proc.close() + # proc.close() # Python 3.7+ only + proc.terminate() + proc.join() if __name__ == "__main__": diff --git a/test/test_highsnr_stdio_P_P_multi.py b/test/test_highsnr_stdio_P_P_multi.py index 2c02802f..7ce5b1a5 100644 --- a/test/test_highsnr_stdio_P_P_multi.py +++ b/test/test_highsnr_stdio_P_P_multi.py @@ -61,7 +61,7 @@ def t_HighSNR_P_P_Multi(bursts: int, frames_per_burst: int): rx_side = os.path.join("test", rx_side) os.environ["PYTHONPATH"] += ":." - print(f"{tx_side=} / {rx_side=}") + print(f"tx_side={tx_side} / rx_side={rx_side}") with subprocess.Popen( args=[ @@ -134,7 +134,9 @@ def test_HighSNR_P_P_multi(bursts: int, frames_per_burst: int): proc.terminate() assert proc.exitcode == 0 - proc.close() + # proc.close() # Python 3.7+ only + proc.terminate() + proc.join() if __name__ == "__main__": diff --git a/test/test_tnc_states.py b/test/test_tnc_states.py index 3b5e6b47..7cc5575a 100644 --- a/test/test_tnc_states.py +++ b/test/test_tnc_states.py @@ -253,7 +253,7 @@ def test_foreign_disconnect(mycall: str, dxcall: str): proc.terminate() proc.join() - # print(f"\n{proc.exitcode=}") + # print(f"\nproc.exitcode={proc.exitcode}") assert proc.exitcode == 0 @@ -271,7 +271,7 @@ def test_valid_disconnect(mycall: str, dxcall: str): proc.terminate() proc.join() - # print(f"\n{proc.exitcode=}") + # print(f"\nproc.exitcode={proc.exitcode}") assert proc.exitcode == 0 diff --git a/tnc/data_handler.py b/tnc/data_handler.py index bef0d76d..c95efb3d 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -360,7 +360,7 @@ class DATA: while static.TRANSMITTING: time.sleep(0.01) - def send_data_to_socket_queue(self, /, **jsondata): + def send_data_to_socket_queue(self, **jsondata): """ Send information to the UI via JSON and the sock.SOCKET_QUEUE. From c84a4a29a125e554c751a4dcf26936785dc754fb Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Mon, 4 Jul 2022 17:27:35 -0400 Subject: [PATCH 9/9] Remove unused received_mycall_crc --- tnc/data_handler.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index c95efb3d..0e858fc9 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -52,9 +52,6 @@ class DATA: self.transmission_uuid = "" - # Received my callsign crc if we received a crc for another ssid - self.received_mycall_crc = b"" # Does this need to be a class variable? - self.data_channel_last_received = 0.0 # time of last "live sign" of a frame self.burst_ack_snr = 0 # SNR from received burst ack frames @@ -485,11 +482,9 @@ class DATA: data_in = bytes(data_in) # get received crc for different mycall ssids - self.received_mycall_crc = data_in[2:5] - # check if callsign ssid override _, mycallsign = helpers.check_callsign( - self.mycallsign, self.received_mycall_crc + self.mycallsign, data_in[2:5] ) # only process data if we are in ARQ and BUSY state else return to quit @@ -700,12 +695,6 @@ class DATA: self.transmission_uuid = str(uuid.uuid4()) timestamp = int(time.time()) - # This shouldn't be needed - it's checked at the start of arq_data_received. - # # check if callsign ssid override - # _, mycallsign = helpers.check_callsign( - # self.mycallsign, self.received_mycall_crc - # ) - # Re-code data_frame in base64, UTF-8 for JSON UI communication. base64_data = base64.b64encode(data_frame).decode("UTF-8") static.RX_BUFFER.append( @@ -2285,8 +2274,6 @@ class DATA: self.log.debug("[TNC] arq_cleanup") - self.received_mycall_crc = b"" - self.rx_frame_bof_received = False self.rx_frame_eof_received = False self.burst_ack = False