From 71343612673e3132ed7674fa49226613c59d1775 Mon Sep 17 00:00:00 2001 From: dj2ls Date: Sun, 6 Mar 2022 17:23:04 +0100 Subject: [PATCH] improved rigctld network integration --- tnc/data_handler.py | 68 ++++++++++++++++++++++++++++++--------------- tnc/rigctld.py | 35 ++++++++++++----------- tnc/sock.py | 8 +++--- 3 files changed, 68 insertions(+), 43 deletions(-) diff --git a/tnc/data_handler.py b/tnc/data_handler.py index d2204187..4260ef9f 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -1103,7 +1103,8 @@ class DATA(): modem.MODEM_TRANSMIT_QUEUE.put([14,1,0,txbuffer]) # wait while transmitting while static.TRANSMITTING: - time.sleep(0.01) + time.sleep(0.01) + @@ -1119,12 +1120,9 @@ class DATA(): structlog.get_logger("structlog").debug("received session heartbeat") helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - - self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp static.ARQ_SESSION = True - #static.ARQ_STATE = True static.TNC_STATE = 'BUSY' self.data_channel_last_received = int(time.time()) if static.ARQ_SESSION and not self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer: @@ -1316,7 +1314,7 @@ class DATA(): def arq_received_channel_is_open(self, data_in:bytes): """ - + Called if we received a data channel opener Args: data_in:bytes: @@ -1360,7 +1358,7 @@ class DATA(): # ---------- PING def transmit_ping(self, dxcallsign:bytes): """ - + Funktion for controlling pings Args: dxcallsign:bytes: @@ -1390,6 +1388,7 @@ class DATA(): def received_ping(self, data_in:bytes, frequency_offset:str): """ + Called if we received a ping Args: data_in:bytes: @@ -1423,7 +1422,7 @@ class DATA(): def received_ping_ack(self, data_in:bytes): """ - + Called if a PING ack has been received Args: data_in:bytes: @@ -1443,7 +1442,9 @@ class DATA(): def stop_transmission(self): - """ """ + """ + Force a stop of the running transmission + """ structlog.get_logger("structlog").warning("[TNC] Stopping transmission!") stop_frame = bytearray(14) stop_frame[:1] = bytes([249]) @@ -1462,7 +1463,9 @@ class DATA(): self.arq_cleanup() def received_stop_transmission(self): - """ """ + """ + Received a transmission stop + """ structlog.get_logger("structlog").warning("[TNC] Stopping transmission!") static.TNC_STATE = 'IDLE' static.ARQ_STATE = False @@ -1473,7 +1476,7 @@ class DATA(): def run_beacon(self, interval:int): """ - + Controlling funktion for running a beacon Args: interval:int: @@ -1508,7 +1511,7 @@ class DATA(): def received_beacon(self, data_in:bytes): """ - + Called if we received a beacon Args: data_in:bytes: @@ -1525,7 +1528,9 @@ class DATA(): def transmit_cq(self): - """ """ + """ + Transmit a CQ + """ logging.info("CQ CQ CQ") static.INFO.append("CQ;SENDING") @@ -1546,7 +1551,7 @@ class DATA(): def received_cq(self, data_in:bytes): """ - + Called if we received a CQ Args: data_in:bytes: @@ -1563,7 +1568,7 @@ class DATA(): # ------------ CALUCLATE TRANSFER RATES def calculate_transfer_rate_rx(self, rx_start_of_transmission:float, receivedbytes:int) -> list: """ - + Calculate Transferrate for receiving data Args: rx_start_of_transmission:float: receivedbytes:int: @@ -1595,7 +1600,9 @@ class DATA(): def reset_statistics(self): - """ """ + """ + Reset statistics + """ # reset ARQ statistics static.ARQ_BYTES_PER_MINUTE_BURST = 0 static.ARQ_BYTES_PER_MINUTE = 0 @@ -1606,7 +1613,7 @@ class DATA(): def calculate_transfer_rate_tx(self, tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list: """ - + Calcualte Transferrate for transmission Args: tx_start_of_transmission:float: sentbytes:int: @@ -1642,7 +1649,9 @@ class DATA(): # ----------------------CLEANUP AND RESET FUNCTIONS def arq_cleanup(self): - """ """ + """ + Cleanup funktion which clears all ARQ states + """ structlog.get_logger("structlog").debug("cleanup") @@ -1681,7 +1690,7 @@ class DATA(): static.ARQ_STATE = False self.arq_file_transfer = False - self.IS_ARQ_SESSION_MASTER = False + @@ -1690,7 +1699,7 @@ class DATA(): def arq_reset_ack(self,state:bool): """ - + Funktion for resetting acknowledge states Args: state:bool: @@ -1705,6 +1714,7 @@ class DATA(): def set_listening_modes(self, mode): """ + Function for setting the data modes we are listening to for saving cpu power Args: mode: @@ -1745,8 +1755,12 @@ class DATA(): self.burst_watchdog() self.arq_session_keep_alive_watchdog() + def burst_watchdog(self): - """ """ + """ + watchdog which checks if we are running into a connection timeout + DATA BURST + """ # IRS SIDE if static.ARQ_STATE and static.TNC_STATE == 'BUSY' and self.is_IRS: @@ -1796,7 +1810,10 @@ class DATA(): #print(self.n_retries_per_burst) def data_channel_keep_alive_watchdog(self): - """Author: DJ2LS""" + """ + watchdog which checks if we are running into a connection timeout + DATA CHANNEL + """ # and not static.ARQ_SEND_KEEP_ALIVE: if static.ARQ_STATE and static.TNC_STATE == 'BUSY': @@ -1813,7 +1830,10 @@ class DATA(): self.arq_cleanup() def arq_session_keep_alive_watchdog(self): - """ """ + """ + watchdog which checks if we are running into a connection timeout + ARQ SESSION + """ if static.ARQ_SESSION and static.TNC_STATE == 'BUSY' and not self.arq_file_transfer: if self.arq_session_last_received + self.arq_session_timeout > time.time(): time.sleep(0.01) @@ -1825,7 +1845,9 @@ class DATA(): def heartbeat(self): - """ """ + """ + heartbeat thread which auto resumes the heartbeat signal within a arq session + """ while 1: time.sleep(0.01) if static.ARQ_SESSION and self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer: diff --git a/tnc/rigctld.py b/tnc/rigctld.py index 17a0602e..6d3f5660 100644 --- a/tnc/rigctld.py +++ b/tnc/rigctld.py @@ -20,7 +20,8 @@ class radio(): def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5): """ Open a connection to rotctld, and test it for validity """ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(timeout) + #self.sock.settimeout(timeout) + self.connected = False self.hostname = hostname self.port = port @@ -47,28 +48,27 @@ class radio(): self.hostname = rigctld_ip self.port = int(rigctld_port) - + if self.connect(): logging.debug(f"Rigctl intialized") return True else: structlog.get_logger("structlog").error("[RIGCTLD] Can't connect to rigctld!", ip=self.hostname, port=self.port) return False - + def connect(self): - """Connect to rotctld instance""" - for a in range(0,self.connection_attempts): + """Connect to rigctld instance""" + if not self.connected: try: - self.sock.connect((self.hostname,self.port)) + self.connection = socket.create_connection((self.hostname,self.port)) self.connected = True - structlog.get_logger("structlog").info("[RIGCTLD] Connected to rigctld!", attempt=a+1, ip=self.hostname, port=self.port) + structlog.get_logger("structlog").info("[RIGCTLD] Connected to rigctld!", ip=self.hostname, port=self.port) return True - except: + except Exception as e: # ConnectionRefusedError: [Errno 111] Connection refused - self.connected = False - structlog.get_logger("structlog").warning("[RIGCTLD] Re-Trying to establish a connection to rigctld!", attempt=a+1, ip=self.hostname, port=self.port) - time.sleep(0.5) - return False + self.close_rig() + structlog.get_logger("structlog").warning("[RIGCTLD] Connection to rigctld refused! Reconnect...", ip=self.hostname, port=self.port, e=e) + return False def close_rig(self): """ """ @@ -88,20 +88,23 @@ class radio(): """ if self.connected: try: - self.sock.sendall(command+b'\n') + self.connection.sendall(command+b'\n') except: structlog.get_logger("structlog").warning("[RIGCTLD] Command not executed!", command=command, ip=self.hostname, port=self.port) self.connected = False try: - return self.sock.recv(1024) + return self.connection.recv(1024) except: structlog.get_logger("structlog").warning("[RIGCTLD] No command response!", command=command, ip=self.hostname, port=self.port) self.connected = False else: - structlog.get_logger("structlog").error("[RIGCTLD] No connection to rigctl!", ip=self.hostname, port=self.port) + + # reconnecting.... + time.sleep(0.5) self.connect() - + + def get_mode(self): """ """ try: diff --git a/tnc/sock.py b/tnc/sock.py index 04d4164f..fc6e13f9 100644 --- a/tnc/sock.py +++ b/tnc/sock.py @@ -117,7 +117,7 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler): if chunk == b'': #print("connection broken. Closing...") self.connection_alive = False - print(chunk) + if data.startswith(b'{') and data.endswith(b'}\n'): # split data by \n if we have multiple commands in socket buffer data = data.split(b'\n') @@ -270,7 +270,7 @@ def process_tnc_commands(data): static.DXCALLSIGN = dxcallsign static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN) else: - dxcallsign = static.DXCALLSIGN = dxcallsign + dxcallsign = static.DXCALLSIGN static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN) mode = int(received_json["parameter"][0]["mode"]) @@ -309,7 +309,7 @@ def process_tnc_commands(data): } for i in range(0, len(static.RX_BUFFER)): - print(static.RX_BUFFER[i][4]) + #print(static.RX_BUFFER[i][4]) #rawdata = json.loads(static.RX_BUFFER[i][4]) base64_data = static.RX_BUFFER[i][4] output["data-array"].append({"uuid": static.RX_BUFFER[i][0],"timestamp": static.RX_BUFFER[i][1], "dxcallsign": str(static.RX_BUFFER[i][2], 'utf-8'), "dxgrid": str(static.RX_BUFFER[i][3], 'utf-8'), "data": base64_data}) @@ -393,7 +393,7 @@ def process_daemon_commands(data): if received_json["type"] == 'set' and received_json["command"] == 'mycallsign': try: callsign = received_json["parameter"] - print(received_json) + if bytes(callsign, 'utf-8') == b'': self.request.sendall(b'INVALID CALLSIGN') structlog.get_logger("structlog").warning("[DMN] SET MYCALL FAILED", call=static.MYCALLSIGN, crc=static.MYCALLSIGN_CRC)