FreeDATA/tnc/rigctld.py

218 lines
5.8 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
2022-11-19 08:51:48 +00:00
# class taken from darksidelemm
# rigctl - https://github.com/darksidelemm/rotctld-web-gui/blob/master/rotatorgui.py#L35
#
# modified and adjusted to FreeDATA needs by DJ2LS
import contextlib
import socket
import time
import structlog
import threading
# set global hamlib version
hamlib_version = 0
2022-05-09 01:27:24 +00:00
class radio:
"""rigctld (hamlib) communication class"""
2022-06-01 00:35:35 +00:00
log = structlog.get_logger("radio (rigctld)")
def __init__(self, hostname="localhost", port=4532, poll_rate=5, timeout=5):
"""Open a connection to rigctld, and test it for validity"""
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
2022-03-06 16:23:04 +00:00
2022-01-23 07:38:02 +00:00
self.connected = False
self.hostname = hostname
self.port = port
self.connection_attempts = 5
2022-11-19 08:51:48 +00:00
# class wide variable for some parameters
2022-11-19 09:05:17 +00:00
self.bandwidth = ''
self.frequency = ''
2022-11-19 08:51:48 +00:00
self.mode = ''
def open_rig(
self,
devicename,
deviceport,
hamlib_ptt_type,
serialspeed,
pttport,
data_bits,
stop_bits,
handshake,
rigctld_ip,
rigctld_port,
):
"""
Args:
2022-05-09 00:41:49 +00:00
devicename:
deviceport:
hamlib_ptt_type:
serialspeed:
pttport:
data_bits:
stop_bits:
handshake:
rigctld_ip:
rigctld_port:
Returns:
"""
self.hostname = rigctld_ip
self.port = int(rigctld_port)
if self.connect():
self.log.debug("Rigctl initialized")
return True
self.log.error(
2022-12-01 07:56:21 +00:00
"[RIGCTLD] Can't connect!", ip=self.hostname, port=self.port
)
return False
2022-05-09 00:41:49 +00:00
def connect(self):
2022-03-06 16:23:04 +00:00
"""Connect to rigctld instance"""
if not self.connected:
try:
self.connection = socket.create_connection((self.hostname, self.port))
self.connected = True
self.log.info(
"[RIGCTLD] Connected to rigctld!", ip=self.hostname, port=self.port
)
return True
except Exception as err:
# ConnectionRefusedError: [Errno 111] Connection refused
2022-03-06 16:23:04 +00:00
self.close_rig()
self.log.warning(
2022-12-01 07:56:21 +00:00
"[RIGCTLD] Reconnect...",
ip=self.hostname,
port=self.port,
e=err,
)
2022-03-06 16:23:04 +00:00
return False
2022-05-09 00:41:49 +00:00
def close_rig(self):
""" """
self.sock.close()
self.connected = False
def send_command(self, command, expect_answer) -> bytes:
"""Send a command to the connected rotctld instance,
and return the return value.
Args:
2022-05-09 00:41:49 +00:00
command:
"""
2022-01-23 07:38:02 +00:00
if self.connected:
try:
self.connection.sendall(command + b"\n")
except Exception:
self.log.warning(
"[RIGCTLD] Command not executed!",
command=command,
ip=self.hostname,
port=self.port,
)
self.connected = False
try:
# recv seems to be blocking so in case of ptt we dont need the response
# maybe this speeds things up and avoids blocking states
return self.connection.recv(16) if expect_answer else True
except Exception:
self.log.warning(
"[RIGCTLD] No command response!",
command=command,
ip=self.hostname,
port=self.port,
)
self.connected = False
else:
2022-05-09 00:41:49 +00:00
2022-03-06 16:23:04 +00:00
# reconnecting....
threading.Event().wait(0.5)
self.connect()
2022-03-06 16:23:04 +00:00
return b""
def get_status(self):
""" """
return "connected" if self.connected else "unknown/disconnected"
2022-11-19 09:11:08 +00:00
def get_mode(self):
""" """
try:
data = self.send_command(b"m", True)
data = data.split(b"\n")
2022-11-19 09:14:59 +00:00
data = data[0].decode("utf-8")
if 'RPRT' not in data:
try:
data = int(data)
except ValueError:
self.mode = str(data)
2022-11-19 08:51:48 +00:00
2022-11-19 09:05:17 +00:00
return self.mode
except Exception:
2022-11-19 09:14:59 +00:00
return self.mode
2022-05-09 01:27:24 +00:00
def get_bandwidth(self):
""" """
try:
data = self.send_command(b"m", True)
data = data.split(b"\n")
2022-11-19 09:14:59 +00:00
data = data[1].decode("utf-8")
2022-11-19 08:51:48 +00:00
if 'RPRT' not in data and data not in ['']:
with contextlib.suppress(ValueError):
self.bandwidth = int(data)
2022-11-19 09:05:17 +00:00
return self.bandwidth
except Exception:
2022-11-19 09:14:59 +00:00
return self.bandwidth
2022-05-09 00:41:49 +00:00
def get_frequency(self):
""" """
try:
data = self.send_command(b"f", True)
2022-11-19 09:05:17 +00:00
data = data.decode("utf-8")
if 'RPRT' not in data and data not in [0, '0', '']:
with contextlib.suppress(ValueError):
data = int(data)
# make sure we have a frequency and not bandwidth
if data >= 10000:
self.frequency = data
2022-11-19 09:05:17 +00:00
return self.frequency
except Exception:
2022-11-19 09:14:59 +00:00
return self.frequency
2022-05-09 00:41:49 +00:00
def get_ptt(self):
""" """
try:
return self.send_command(b"t", True)
except Exception:
return False
2022-05-09 00:41:49 +00:00
def set_ptt(self, state):
"""
Args:
2022-05-09 00:41:49 +00:00
state:
Returns:
"""
try:
if state:
self.send_command(b"T 1", False)
else:
self.send_command(b"T 0", False)
2022-05-09 00:41:49 +00:00
return state
except Exception:
return False