improved rigctld network integration

This commit is contained in:
dj2ls 2022-03-06 17:23:04 +01:00
parent 1519c22358
commit 7134361267
3 changed files with 68 additions and 43 deletions

View file

@ -1103,7 +1103,8 @@ class DATA():
modem.MODEM_TRANSMIT_QUEUE.put([14,1,0,txbuffer]) modem.MODEM_TRANSMIT_QUEUE.put([14,1,0,txbuffer])
# wait while transmitting # wait while transmitting
while static.TRANSMITTING: while static.TRANSMITTING:
time.sleep(0.01) time.sleep(0.01)
@ -1119,12 +1120,9 @@ class DATA():
structlog.get_logger("structlog").debug("received session heartbeat") structlog.get_logger("structlog").debug("received session heartbeat")
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp
static.ARQ_SESSION = True static.ARQ_SESSION = True
#static.ARQ_STATE = True
static.TNC_STATE = 'BUSY' static.TNC_STATE = 'BUSY'
self.data_channel_last_received = int(time.time()) self.data_channel_last_received = int(time.time())
if static.ARQ_SESSION and not self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer: if static.ARQ_SESSION and not self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer:
@ -1316,7 +1314,7 @@ class DATA():
def arq_received_channel_is_open(self, data_in:bytes): def arq_received_channel_is_open(self, data_in:bytes):
""" """
Called if we received a data channel opener
Args: Args:
data_in:bytes: data_in:bytes:
@ -1360,7 +1358,7 @@ class DATA():
# ---------- PING # ---------- PING
def transmit_ping(self, dxcallsign:bytes): def transmit_ping(self, dxcallsign:bytes):
""" """
Funktion for controlling pings
Args: Args:
dxcallsign:bytes: dxcallsign:bytes:
@ -1390,6 +1388,7 @@ class DATA():
def received_ping(self, data_in:bytes, frequency_offset:str): def received_ping(self, data_in:bytes, frequency_offset:str):
""" """
Called if we received a ping
Args: Args:
data_in:bytes: data_in:bytes:
@ -1423,7 +1422,7 @@ class DATA():
def received_ping_ack(self, data_in:bytes): def received_ping_ack(self, data_in:bytes):
""" """
Called if a PING ack has been received
Args: Args:
data_in:bytes: data_in:bytes:
@ -1443,7 +1442,9 @@ class DATA():
def stop_transmission(self): def stop_transmission(self):
""" """ """
Force a stop of the running transmission
"""
structlog.get_logger("structlog").warning("[TNC] Stopping transmission!") structlog.get_logger("structlog").warning("[TNC] Stopping transmission!")
stop_frame = bytearray(14) stop_frame = bytearray(14)
stop_frame[:1] = bytes([249]) stop_frame[:1] = bytes([249])
@ -1462,7 +1463,9 @@ class DATA():
self.arq_cleanup() self.arq_cleanup()
def received_stop_transmission(self): def received_stop_transmission(self):
""" """ """
Received a transmission stop
"""
structlog.get_logger("structlog").warning("[TNC] Stopping transmission!") structlog.get_logger("structlog").warning("[TNC] Stopping transmission!")
static.TNC_STATE = 'IDLE' static.TNC_STATE = 'IDLE'
static.ARQ_STATE = False static.ARQ_STATE = False
@ -1473,7 +1476,7 @@ class DATA():
def run_beacon(self, interval:int): def run_beacon(self, interval:int):
""" """
Controlling funktion for running a beacon
Args: Args:
interval:int: interval:int:
@ -1508,7 +1511,7 @@ class DATA():
def received_beacon(self, data_in:bytes): def received_beacon(self, data_in:bytes):
""" """
Called if we received a beacon
Args: Args:
data_in:bytes: data_in:bytes:
@ -1525,7 +1528,9 @@ class DATA():
def transmit_cq(self): def transmit_cq(self):
""" """ """
Transmit a CQ
"""
logging.info("CQ CQ CQ") logging.info("CQ CQ CQ")
static.INFO.append("CQ;SENDING") static.INFO.append("CQ;SENDING")
@ -1546,7 +1551,7 @@ class DATA():
def received_cq(self, data_in:bytes): def received_cq(self, data_in:bytes):
""" """
Called if we received a CQ
Args: Args:
data_in:bytes: data_in:bytes:
@ -1563,7 +1568,7 @@ class DATA():
# ------------ CALUCLATE TRANSFER RATES # ------------ CALUCLATE TRANSFER RATES
def calculate_transfer_rate_rx(self, rx_start_of_transmission:float, receivedbytes:int) -> list: def calculate_transfer_rate_rx(self, rx_start_of_transmission:float, receivedbytes:int) -> list:
""" """
Calculate Transferrate for receiving data
Args: Args:
rx_start_of_transmission:float: rx_start_of_transmission:float:
receivedbytes:int: receivedbytes:int:
@ -1595,7 +1600,9 @@ class DATA():
def reset_statistics(self): def reset_statistics(self):
""" """ """
Reset statistics
"""
# reset ARQ statistics # reset ARQ statistics
static.ARQ_BYTES_PER_MINUTE_BURST = 0 static.ARQ_BYTES_PER_MINUTE_BURST = 0
static.ARQ_BYTES_PER_MINUTE = 0 static.ARQ_BYTES_PER_MINUTE = 0
@ -1606,7 +1613,7 @@ class DATA():
def calculate_transfer_rate_tx(self, tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list: def calculate_transfer_rate_tx(self, tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list:
""" """
Calcualte Transferrate for transmission
Args: Args:
tx_start_of_transmission:float: tx_start_of_transmission:float:
sentbytes:int: sentbytes:int:
@ -1642,7 +1649,9 @@ class DATA():
# ----------------------CLEANUP AND RESET FUNCTIONS # ----------------------CLEANUP AND RESET FUNCTIONS
def arq_cleanup(self): def arq_cleanup(self):
""" """ """
Cleanup funktion which clears all ARQ states
"""
structlog.get_logger("structlog").debug("cleanup") structlog.get_logger("structlog").debug("cleanup")
@ -1681,7 +1690,7 @@ class DATA():
static.ARQ_STATE = False static.ARQ_STATE = False
self.arq_file_transfer = False self.arq_file_transfer = False
self.IS_ARQ_SESSION_MASTER = False
@ -1690,7 +1699,7 @@ class DATA():
def arq_reset_ack(self,state:bool): def arq_reset_ack(self,state:bool):
""" """
Funktion for resetting acknowledge states
Args: Args:
state:bool: state:bool:
@ -1705,6 +1714,7 @@ class DATA():
def set_listening_modes(self, mode): def set_listening_modes(self, mode):
""" """
Function for setting the data modes we are listening to for saving cpu power
Args: Args:
mode: mode:
@ -1745,8 +1755,12 @@ class DATA():
self.burst_watchdog() self.burst_watchdog()
self.arq_session_keep_alive_watchdog() self.arq_session_keep_alive_watchdog()
def burst_watchdog(self): def burst_watchdog(self):
""" """ """
watchdog which checks if we are running into a connection timeout
DATA BURST
"""
# IRS SIDE # IRS SIDE
if static.ARQ_STATE and static.TNC_STATE == 'BUSY' and self.is_IRS: if static.ARQ_STATE and static.TNC_STATE == 'BUSY' and self.is_IRS:
@ -1796,7 +1810,10 @@ class DATA():
#print(self.n_retries_per_burst) #print(self.n_retries_per_burst)
def data_channel_keep_alive_watchdog(self): def data_channel_keep_alive_watchdog(self):
"""Author: DJ2LS""" """
watchdog which checks if we are running into a connection timeout
DATA CHANNEL
"""
# and not static.ARQ_SEND_KEEP_ALIVE: # and not static.ARQ_SEND_KEEP_ALIVE:
if static.ARQ_STATE and static.TNC_STATE == 'BUSY': if static.ARQ_STATE and static.TNC_STATE == 'BUSY':
@ -1813,7 +1830,10 @@ class DATA():
self.arq_cleanup() self.arq_cleanup()
def arq_session_keep_alive_watchdog(self): def arq_session_keep_alive_watchdog(self):
""" """ """
watchdog which checks if we are running into a connection timeout
ARQ SESSION
"""
if static.ARQ_SESSION and static.TNC_STATE == 'BUSY' and not self.arq_file_transfer: if static.ARQ_SESSION and static.TNC_STATE == 'BUSY' and not self.arq_file_transfer:
if self.arq_session_last_received + self.arq_session_timeout > time.time(): if self.arq_session_last_received + self.arq_session_timeout > time.time():
time.sleep(0.01) time.sleep(0.01)
@ -1825,7 +1845,9 @@ class DATA():
def heartbeat(self): def heartbeat(self):
""" """ """
heartbeat thread which auto resumes the heartbeat signal within a arq session
"""
while 1: while 1:
time.sleep(0.01) time.sleep(0.01)
if static.ARQ_SESSION and self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer: if static.ARQ_SESSION and self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer:

View file

@ -20,7 +20,8 @@ class radio():
def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5): def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5):
""" Open a connection to rotctld, and test it for validity """ """ Open a connection to rotctld, and test it for validity """
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(timeout) #self.sock.settimeout(timeout)
self.connected = False self.connected = False
self.hostname = hostname self.hostname = hostname
self.port = port self.port = port
@ -47,28 +48,27 @@ class radio():
self.hostname = rigctld_ip self.hostname = rigctld_ip
self.port = int(rigctld_port) self.port = int(rigctld_port)
if self.connect(): if self.connect():
logging.debug(f"Rigctl intialized") logging.debug(f"Rigctl intialized")
return True return True
else: else:
structlog.get_logger("structlog").error("[RIGCTLD] Can't connect to rigctld!", ip=self.hostname, port=self.port) structlog.get_logger("structlog").error("[RIGCTLD] Can't connect to rigctld!", ip=self.hostname, port=self.port)
return False return False
def connect(self): def connect(self):
"""Connect to rotctld instance""" """Connect to rigctld instance"""
for a in range(0,self.connection_attempts): if not self.connected:
try: try:
self.sock.connect((self.hostname,self.port)) self.connection = socket.create_connection((self.hostname,self.port))
self.connected = True self.connected = True
structlog.get_logger("structlog").info("[RIGCTLD] Connected to rigctld!", attempt=a+1, ip=self.hostname, port=self.port) structlog.get_logger("structlog").info("[RIGCTLD] Connected to rigctld!", ip=self.hostname, port=self.port)
return True return True
except: except Exception as e:
# ConnectionRefusedError: [Errno 111] Connection refused # ConnectionRefusedError: [Errno 111] Connection refused
self.connected = False self.close_rig()
structlog.get_logger("structlog").warning("[RIGCTLD] Re-Trying to establish a connection to rigctld!", attempt=a+1, ip=self.hostname, port=self.port) structlog.get_logger("structlog").warning("[RIGCTLD] Connection to rigctld refused! Reconnect...", ip=self.hostname, port=self.port, e=e)
time.sleep(0.5) return False
return False
def close_rig(self): def close_rig(self):
""" """ """ """
@ -88,20 +88,23 @@ class radio():
""" """
if self.connected: if self.connected:
try: try:
self.sock.sendall(command+b'\n') self.connection.sendall(command+b'\n')
except: except:
structlog.get_logger("structlog").warning("[RIGCTLD] Command not executed!", command=command, ip=self.hostname, port=self.port) structlog.get_logger("structlog").warning("[RIGCTLD] Command not executed!", command=command, ip=self.hostname, port=self.port)
self.connected = False self.connected = False
try: try:
return self.sock.recv(1024) return self.connection.recv(1024)
except: except:
structlog.get_logger("structlog").warning("[RIGCTLD] No command response!", command=command, ip=self.hostname, port=self.port) structlog.get_logger("structlog").warning("[RIGCTLD] No command response!", command=command, ip=self.hostname, port=self.port)
self.connected = False self.connected = False
else: else:
structlog.get_logger("structlog").error("[RIGCTLD] No connection to rigctl!", ip=self.hostname, port=self.port)
# reconnecting....
time.sleep(0.5)
self.connect() self.connect()
def get_mode(self): def get_mode(self):
""" """ """ """
try: try:

View file

@ -117,7 +117,7 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
if chunk == b'': if chunk == b'':
#print("connection broken. Closing...") #print("connection broken. Closing...")
self.connection_alive = False self.connection_alive = False
print(chunk)
if data.startswith(b'{') and data.endswith(b'}\n'): if data.startswith(b'{') and data.endswith(b'}\n'):
# split data by \n if we have multiple commands in socket buffer # split data by \n if we have multiple commands in socket buffer
data = data.split(b'\n') data = data.split(b'\n')
@ -270,7 +270,7 @@ def process_tnc_commands(data):
static.DXCALLSIGN = dxcallsign static.DXCALLSIGN = dxcallsign
static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN) static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN)
else: else:
dxcallsign = static.DXCALLSIGN = dxcallsign dxcallsign = static.DXCALLSIGN
static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN) static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN)
mode = int(received_json["parameter"][0]["mode"]) mode = int(received_json["parameter"][0]["mode"])
@ -309,7 +309,7 @@ def process_tnc_commands(data):
} }
for i in range(0, len(static.RX_BUFFER)): for i in range(0, len(static.RX_BUFFER)):
print(static.RX_BUFFER[i][4]) #print(static.RX_BUFFER[i][4])
#rawdata = json.loads(static.RX_BUFFER[i][4]) #rawdata = json.loads(static.RX_BUFFER[i][4])
base64_data = static.RX_BUFFER[i][4] base64_data = static.RX_BUFFER[i][4]
output["data-array"].append({"uuid": static.RX_BUFFER[i][0],"timestamp": static.RX_BUFFER[i][1], "dxcallsign": str(static.RX_BUFFER[i][2], 'utf-8'), "dxgrid": str(static.RX_BUFFER[i][3], 'utf-8'), "data": base64_data}) output["data-array"].append({"uuid": static.RX_BUFFER[i][0],"timestamp": static.RX_BUFFER[i][1], "dxcallsign": str(static.RX_BUFFER[i][2], 'utf-8'), "dxgrid": str(static.RX_BUFFER[i][3], 'utf-8'), "data": base64_data})
@ -393,7 +393,7 @@ def process_daemon_commands(data):
if received_json["type"] == 'set' and received_json["command"] == 'mycallsign': if received_json["type"] == 'set' and received_json["command"] == 'mycallsign':
try: try:
callsign = received_json["parameter"] callsign = received_json["parameter"]
print(received_json)
if bytes(callsign, 'utf-8') == b'': if bytes(callsign, 'utf-8') == b'':
self.request.sendall(b'INVALID CALLSIGN') self.request.sendall(b'INVALID CALLSIGN')
structlog.get_logger("structlog").warning("[DMN] SET MYCALL FAILED", call=static.MYCALLSIGN, crc=static.MYCALLSIGN_CRC) structlog.get_logger("structlog").warning("[DMN] SET MYCALL FAILED", call=static.MYCALLSIGN, crc=static.MYCALLSIGN_CRC)