FreeDATA/modem/rigctld.py

206 lines
6.6 KiB
Python
Raw Normal View History

import socket
import structlog
import time
import threading
2022-05-09 01:27:24 +00:00
class radio:
"""rigctld (hamlib) communication class"""
2022-06-01 00:35:35 +00:00
log = structlog.get_logger("radio (rigctld)")
2024-01-12 14:51:23 +00:00
def __init__(self, states, hostname="localhost", port=4532, timeout=5):
self.hostname = hostname
self.port = port
2024-01-12 14:51:23 +00:00
self.timeout = timeout
self.states = states
self.connection = None
self.connected = False
self.await_response = threading.Event()
self.await_response.set()
2024-01-12 14:51:23 +00:00
self.parameters = {
'frequency': '---',
'mode': '---',
'alc': '---',
'strength': '---',
'bandwidth': '---',
'rf': '---',
'ptt': False # Initial PTT state is set to False
}
# connect to radio
self.connect()
def connect(self):
try:
self.connection = socket.create_connection((self.hostname, self.port), timeout=self.timeout)
self.connected = True
self.states.set("radio_status", True)
self.log.info(f"[RIGCTLD] Connected to rigctld at {self.hostname}:{self.port}")
except Exception as err:
self.log.warning(f"[RIGCTLD] Failed to connect to rigctld: {err}")
self.connected = False
self.states.set("radio_status", False)
def disconnect(self):
self.connected = False
self.connection.close()
del self.connection
self.connection = None
self.states.set("radio_status", False)
self.parameters = {
'frequency': '---',
'mode': '---',
'alc': '---',
'strength': '---',
'bandwidth': '---',
'rf': '---',
'ptt': False # Initial PTT state is set to False
}
def send_command(self, command) -> str:
if self.connected:
# wait if we have another command awaiting its response...
self.await_response.wait()
2024-01-12 14:51:23 +00:00
try:
self.await_response = threading.Event()
2024-01-12 14:51:23 +00:00
self.connection.sendall(command.encode('utf-8') + b"\n")
response = self.connection.recv(1024)
self.await_response.set()
2024-01-12 14:51:23 +00:00
return response.decode('utf-8').strip()
except Exception as err:
self.log.warning(f"[RIGCTLD] Error sending command [{command}] to rigctld: {err}")
2024-01-12 14:51:23 +00:00
self.connected = False
2024-01-12 14:51:23 +00:00
return ""
2022-11-19 08:51:48 +00:00
2024-01-12 14:51:23 +00:00
def set_ptt(self, state):
"""Set the PTT (Push-to-Talk) state.
Args:
2024-01-12 14:51:23 +00:00
state (bool): True to enable PTT, False to disable.
Returns:
2024-01-12 14:51:23 +00:00
bool: True if the PTT state was set successfully, False otherwise.
"""
2024-01-12 14:51:23 +00:00
if self.connected:
try:
if state:
self.send_command('T 1') # Enable PTT
else:
self.send_command('T 0') # Disable PTT
self.parameters['ptt'] = state # Update PTT state in parameters
return True
except Exception as err:
self.log.warning(f"[RIGCTLD] Error setting PTT state: {err}")
self.connected = False
return False
2022-05-09 00:41:49 +00:00
2024-01-12 14:51:23 +00:00
def set_mode(self, mode):
"""Set the mode.
2023-01-04 07:33:25 +00:00
Args:
2024-01-12 14:51:23 +00:00
mode (str): The mode to set.
2023-01-04 07:33:25 +00:00
2024-01-12 14:51:23 +00:00
Returns:
bool: True if the mode was set successfully, False otherwise.
2023-01-04 07:33:25 +00:00
"""
2024-01-12 14:51:23 +00:00
if self.connected:
try:
command = f"M {mode} 0"
2024-01-12 14:51:23 +00:00
self.send_command(command)
self.parameters['mode'] = mode
return True
except Exception as err:
self.log.warning(f"[RIGCTLD] Error setting mode: {err}")
self.connected = False
return False
2024-01-12 14:51:23 +00:00
def set_frequency(self, frequency):
"""Set the frequency.
Args:
2024-01-12 14:51:23 +00:00
frequency (str): The frequency to set.
2024-01-12 14:51:23 +00:00
Returns:
bool: True if the frequency was set successfully, False otherwise.
"""
2024-01-12 14:51:23 +00:00
if self.connected:
try:
2024-01-12 14:51:23 +00:00
command = f"F {frequency}"
self.send_command(command)
self.parameters['frequency'] = frequency
return True
except Exception as err:
self.log.warning(f"[RIGCTLD] Error setting frequency: {err}")
self.connected = False
return False
2022-05-09 00:41:49 +00:00
2024-01-12 14:51:23 +00:00
def set_bandwidth(self, bandwidth):
"""Set the bandwidth.
Args:
2024-01-12 14:51:23 +00:00
bandwidth (str): The bandwidth to set.
Returns:
2024-01-12 14:51:23 +00:00
bool: True if the bandwidth was set successfully, False otherwise.
"""
2024-01-12 14:51:23 +00:00
if self.connected:
try:
command = f"M {self.parameters['mode']} {bandwidth}"
self.send_command(command)
self.parameters['bandwidth'] = bandwidth
return True
except Exception as err:
self.log.warning(f"[RIGCTLD] Error setting bandwidth: {err}")
self.connected = False
return False
def set_rf_level(self, rf):
2024-01-12 14:51:23 +00:00
"""Set the RF.
Args:
2024-01-12 14:51:23 +00:00
rf (str): The RF to set.
Returns:
2024-01-12 14:51:23 +00:00
bool: True if the RF was set successfully, False otherwise.
"""
2024-01-12 14:51:23 +00:00
if self.connected:
try:
command = f"L RFPOWER {rf/100}" #RF RFPOWER --> RFPOWER == IC705
2024-01-12 14:51:23 +00:00
self.send_command(command)
self.parameters['rf'] = rf
return True
except Exception as err:
self.log.warning(f"[RIGCTLD] Error setting RF: {err}")
self.connected = False
return False
2023-01-04 19:12:03 +00:00
2024-01-12 14:51:23 +00:00
def get_parameters(self):
if not self.connected:
self.connect()
2023-01-04 19:12:03 +00:00
2024-01-12 14:51:23 +00:00
if self.connected:
self.parameters['frequency'] = self.send_command('f')
response = self.send_command(
'm').strip() # Get the mode/bandwidth response and remove leading/trailing spaces
try:
mode, bandwidth = response.split('\n', 1) # Split the response into mode and bandwidth
except ValueError:
print(response)
mode = 'err'
bandwidth = 'err'
2023-01-04 19:12:03 +00:00
2024-01-12 14:51:23 +00:00
self.parameters['mode'] = mode
self.parameters['bandwidth'] = bandwidth
2023-01-04 19:12:03 +00:00
2024-01-12 14:51:23 +00:00
self.parameters['alc'] = self.send_command('l ALC')
self.parameters['strength'] = self.send_command('l STRENGTH')
self.parameters['rf'] = self.send_command('l RFPOWER') # RF, RFPOWER
2024-01-12 14:51:23 +00:00
"""Return the latest fetched parameters."""
return self.parameters