FreeDATA/tnc/rigctld.py
2023-04-27 21:43:56 +02:00

350 lines
No EOL
10 KiB
Python

#!/usr/bin/env python3
# class taken from darksidelemm
# rigctl - https://github.com/darksidelemm/rotctld-web-gui/blob/master/rotatorgui.py#L35
#
# modified and adjusted to FreeDATA needs by DJ2LS
import contextlib
import socket
import time
import structlog
import threading
import static
from static import ARQ, AudioParam, Beacon, Channel, Daemon, HamlibParam, ModemParam, Station, TCIParam
class radio:
"""rigctld (hamlib) communication class"""
log = structlog.get_logger("radio (rigctld)")
def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5):
"""Open a connection to rigctld, and test it for validity"""
self.ptt_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ptt_connected = False
self.data_connected = False
self.hostname = hostname
self.port = port
self.connection_attempts = 5
# class wide variable for some parameters
self.bandwidth = ''
self.frequency = ''
self.mode = ''
self.alc = ''
self.strength = ''
self.rf = ''
def open_rig(
self,
rigctld_ip,
rigctld_port
):
"""
Args:
rigctld_ip:
rigctld_port:
Returns:
"""
self.hostname = rigctld_ip
self.port = int(rigctld_port)
# _ptt_connect = self.ptt_connect()
# _data_connect = self.data_connect()
ptt_thread = threading.Thread(target=self.ptt_connect, args=[], daemon=True)
ptt_thread.start()
data_thread = threading.Thread(target=self.data_connect, args=[], daemon=True)
data_thread.start()
# wait some time
threading.Event().wait(0.5)
if self.ptt_connected and self.data_connected:
self.log.debug("Rigctl DATA/PTT initialized")
return True
self.log.error(
"[RIGCTLD] Can't connect!", ip=self.hostname, port=self.port
)
return False
def ptt_connect(self):
"""Connect to rigctld instance"""
while True:
if not self.ptt_connected:
try:
self.ptt_connection = socket.create_connection((self.hostname, self.port))
self.ptt_connected = True
self.log.info(
"[RIGCTLD] Connected PTT instance to rigctld!", ip=self.hostname, port=self.port
)
except Exception as err:
# ConnectionRefusedError: [Errno 111] Connection refused
self.close_rig()
self.log.warning(
"[RIGCTLD] PTT Reconnect...",
ip=self.hostname,
port=self.port,
e=err,
)
threading.Event().wait(0.5)
def data_connect(self):
"""Connect to rigctld instance"""
while True:
if not self.data_connected:
try:
self.data_connection = socket.create_connection((self.hostname, self.port))
self.data_connected = True
self.log.info(
"[RIGCTLD] Connected DATA instance to rigctld!", ip=self.hostname, port=self.port
)
except Exception as err:
# ConnectionRefusedError: [Errno 111] Connection refused
self.close_rig()
self.log.warning(
"[RIGCTLD] DATA Reconnect...",
ip=self.hostname,
port=self.port,
e=err,
)
threading.Event().wait(0.5)
def close_rig(self):
""" """
self.ptt_sock.close()
self.data_sock.close()
self.ptt_connected = False
self.data_connected = False
def send_ptt_command(self, command, expect_answer) -> bytes:
"""Send a command to the connected rotctld instance,
and return the return value.
Args:
command:
"""
if self.ptt_connected:
try:
self.ptt_connection.sendall(command + b"\n")
except Exception:
self.log.warning(
"[RIGCTLD] Command not executed!",
command=command,
ip=self.hostname,
port=self.port,
)
self.ptt_connected = False
return b""
def send_data_command(self, command, expect_answer) -> bytes:
"""Send a command to the connected rotctld instance,
and return the return value.
Args:
command:
"""
if self.data_connected:
self.data_connection.setblocking(False)
#Allow a little more time for a response from rigctld before generating a timeout, seems to have no ill effects on a well behaving setup and fixes Issue #373
self.data_connection.settimeout(0.30)
try:
self.data_connection.sendall(command + b"\n")
except Exception:
self.log.warning(
"[RIGCTLD] Command not executed!",
command=command,
ip=self.hostname,
port=self.port,
)
self.data_connected = False
try:
# recv seems to be blocking so in case of ptt we don't need the response
# maybe this speeds things up and avoids blocking states
recv = True
data = b''
while recv:
try:
data = self.data_connection.recv(4800)
except socket.timeout:
recv = False
return data
# return self.data_connection.recv(64) if expect_answer else True
except Exception:
self.log.warning(
"[RIGCTLD] No command response!",
command=command,
ip=self.hostname,
port=self.port,
)
self.data_connected = False
return b""
def get_status(self):
""" """
return "connected" if self.data_connected and self.ptt_connected else "unknown/disconnected"
def get_level(self):
try:
data = self.send_data_command(b"l RF", True)
data = data.split(b"\n")
rf = data[0].decode("utf-8")
if 'RPRT' not in rf:
try:
self.rf = str(rf)
except ValueError:
self.rf = str(rf)
return self.rf
except Exception:
return self.rf
def get_strength(self):
try:
data = self.send_data_command(b"l STRENGTH", True)
data = data.split(b"\n")
strength = data[0].decode("utf-8")
if 'RPRT' not in strength:
try:
self.strength = str(strength)
except ValueError:
self.strength = str(strength)
return self.strength
except Exception:
return self.strength
def get_alc(self):
try:
data = self.send_data_command(b"l ALC", True)
data = data.split(b"\n")
alc = data[0].decode("utf-8")
if 'RPRT' not in alc:
try:
alc = float(alc)
self.alc = alc if alc != 0.0 else HamlibParam.alc
except ValueError:
self.alc = 0.0
return self.alc
except Exception:
return self.alc
def get_mode(self):
""" """
try:
data = self.send_data_command(b"m", True)
data = data.split(b"\n")
data = data[0].decode("utf-8")
if 'RPRT' not in data:
try:
data = int(data)
except ValueError:
self.mode = str(data)
return self.mode
except Exception:
return self.mode
def get_bandwidth(self):
""" """
try:
data = self.send_data_command(b"m", True)
data = data.split(b"\n")
data = data[1].decode("utf-8")
if 'RPRT' not in data and data not in ['']:
with contextlib.suppress(ValueError):
self.bandwidth = int(data)
return self.bandwidth
except Exception:
return self.bandwidth
def get_frequency(self):
""" """
try:
data = self.send_data_command(b"f", True)
data = data.decode("utf-8")
if 'RPRT' not in data and data not in [0, '0', '']:
with contextlib.suppress(ValueError):
data = int(data)
# make sure we have a frequency and not bandwidth
if data >= 10000:
self.frequency = data
return self.frequency
except Exception:
return self.frequency
def get_ptt(self):
""" """
try:
return self.send_data_command(b"t", True)
except Exception:
return False
def set_ptt(self, state):
"""
Args:
state:
Returns:
"""
try:
if state:
self.send_ptt_command(b"T 1", False)
else:
self.send_ptt_command(b"T 0", False)
return state
except Exception:
return False
def set_frequency(self, frequency):
"""
Args:
frequency:
Returns:
"""
try:
command = bytes(f"F {frequency}", "utf-8")
self.send_data_command(command, False)
except Exception:
return False
def set_mode(self, mode):
"""
Args:
mode:
Returns:
"""
try:
command = bytes(f"M {mode} {self.bandwidth}", "utf-8")
self.send_data_command(command, False)
except Exception:
return False