From 7714c7aeb633a02fd86d9489ea1af6010b711b62 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 10:29:13 +0100 Subject: [PATCH] added socket interface to modem --- modem/config.ini.example | 6 ++ modem/config.py | 7 ++ modem/server.py | 2 +- modem/service_manager.py | 8 +- modem/socket_interface.py | 193 +++++++++++++++++++++++++++++++++++ modem/vara_interface.py | 116 --------------------- tests/test_p2p_connection.py | 2 - 7 files changed, 213 insertions(+), 121 deletions(-) create mode 100644 modem/socket_interface.py delete mode 100644 modem/vara_interface.py diff --git a/modem/config.ini.example b/modem/config.ini.example index c8c7dae1..1061ca76 100644 --- a/modem/config.ini.example +++ b/modem/config.ini.example @@ -50,6 +50,12 @@ enable_morse_identifier = False respond_to_cq = True tx_delay = 200 +[SOCKET_INTERFACE] +enable = False +host = 127.0.0.1 +cmd_port = 8000 +data_port = 8001 + [MESSAGES] enable_auto_repeat = False diff --git a/modem/config.py b/modem/config.py index fa9926df..a12572fd 100644 --- a/modem/config.py +++ b/modem/config.py @@ -61,6 +61,13 @@ class CONFIG: 'respond_to_cq': bool, 'tx_delay': int }, + 'SOCKET_INTERFACE': { + 'enable' : bool, + 'host' : str, + 'cmd_port' : int, + 'data_port' : int, + + }, 'MESSAGES': { 'enable_auto_repeat': bool, } diff --git a/modem/server.py b/modem/server.py index c822a093..d335f1f4 100644 --- a/modem/server.py +++ b/modem/server.py @@ -327,7 +327,7 @@ def sock_states(sock): @atexit.register def stop_server(): try: - app.service_manager.stop_modem() + app.service_manager.modem_service.put("stop") if app.service_manager.modem: app.service_manager.modem.sd_input_stream.stop audio.sd._terminate() diff --git a/modem/service_manager.py b/modem/service_manager.py index 2f36a5bb..270f3c60 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -5,7 +5,7 @@ import structlog import audio import radio_manager - +from socket_interface import SocketInterfaceHandler class SM: def __init__(self, app): @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - + self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager) runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True @@ -34,9 +34,13 @@ class SM: self.start_radio_manager() self.start_modem() + self.socket_interface_manager.start_servers() + elif cmd in ['stop'] and self.modem: self.stop_modem() self.stop_radio_manager() + + self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) diff --git a/modem/socket_interface.py b/modem/socket_interface.py new file mode 100644 index 00000000..9fdb8de7 --- /dev/null +++ b/modem/socket_interface.py @@ -0,0 +1,193 @@ +import socketserver +import threading +import logging +import structlog +import select +from queue import Queue + +from command_p2p_connection import P2PConnectionCommand + +# Shared queue for command and data handlers +data_queue = Queue() + +class CommandHandler(socketserver.BaseRequestHandler): + def __init__(self, request, client_address, server): + self.logger = structlog.get_logger(type(self).__name__) + + self.handlers = { + 'CONNECT': self.handle_connect, + 'DISCONNECT': self.handle_disconnect, + 'MYCALL': self.handle_mycall, + 'BW': self.handle_bw, + 'ABORT': self.handle_abort, + 'PUBLIC': self.handle_public, + 'CWID': self.handle_cwid, + 'LISTEN': self.handle_listen, + 'COMPRESSION': self.handle_compression, + 'WINLINK SESSION': self.handle_winlink_session, + } + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def handle(self): + self.log(f"Client connected: {self.client_address}") + try: + while True: + data = self.request.recv(1024).strip() + if not data: + break + decoded_data = data.decode() + self.log(f"Command received from {self.client_address}: {decoded_data}") + self.parse_command(decoded_data) + finally: + self.log(f"Command connection closed with {self.client_address}") + + def parse_command(self, data): + for command in self.handlers: + if data.startswith(command): + # Extract command arguments after the command itself + args = data[len(command):].strip().split() + self.dispatch_command(command, args) + return + self.send_response("ERROR: Unknown command\r\n") + + def dispatch_command(self, command, data): + if command in self.handlers: + handler = self.handlers[command] + handler(data) + else: + self.send_response(f"Unknown command: {command}") + + def send_response(self, message): + self.request.sendall(message.encode()) + + # Command handlers + def handle_connect(self, data): + # Your existing connect logic + self.send_response("OK\r\n") + + def handle_disconnect(self, data): + # Your existing disconnect logic + self.send_response("OK\r\n") + + def handle_mycall(self, data): + # Logic for handling MYCALL command + self.send_response("OK\r\n") + + def handle_bw(self, data): + # Logic for handling BW command + self.send_response("OK\r\n") + + def handle_abort(self, data): + # Logic for handling ABORT command + self.send_response("OK\r\n") + + def handle_public(self, data): + # Logic for handling PUBLIC command + self.send_response("OK\r\n") + + def handle_cwid(self, data): + # Logic for handling CWID command + self.send_response("OK\r\n") + + def handle_listen(self, data): + # Logic for handling LISTEN command + self.send_response("OK\r\n") + + def handle_compression(self, data): + # Logic for handling COMPRESSION command + self.send_response("OK\r\n") + + def handle_winlink_session(self, data): + # Logic for handling WINLINK SESSION command + self.send_response("OK\r\n") + +class DataHandler(socketserver.BaseRequestHandler): + def __init__(self, request, client_address, server): + self.logger = structlog.get_logger(type(self).__name__) + + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + def handle(self): + self.log(f"Data connection established with {self.client_address}") + + try: + while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout + if ready_to_read: + self.data = self.request.recv(1024).strip() + if not self.data: + break + try: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") + except: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + + # Check if there's something to send from the queue, without blocking + if not data_queue.empty(): + data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + self.log(f"Sent data to {self.client_address}") + + finally: + self.log(f"Data connection closed with {self.client_address}") +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + + +class SocketInterfaceHandler: + def __init__(self, config, state_manager): + self.config = config + self.state_manager = state_manager + self.logger = structlog.get_logger(type(self).__name__) + self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"] + self.data_port = self.config["SOCKET_INTERFACE"]["data_port"] + self.command_server = None + self.data_server = None + self.command_server_thread = None + self.data_server_thread = None + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def start_servers(self): + # Method to start both command and data server threads + self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandHandler)) + self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataHandler)) + + self.command_server_thread.start() + self.data_server_thread.start() + + self.log(f"Interfaces started") + + def run_server(self, port, handler): + with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + self.log(f"Server started on port {port}") + if port == self.command_port: + self.command_server = server + else: + self.data_server = server + server.serve_forever() + + def stop_servers(self): + # Gracefully shutdown the server + if self.command_server: + self.command_server.shutdown() + if self.data_server: + self.data_server.shutdown() + self.log(f"Interfaces stopped") + + def wait_for_server_threads(self): + # Wait for both server threads to finish + self.command_server_thread.join() + self.data_server_thread.join() diff --git a/modem/vara_interface.py b/modem/vara_interface.py deleted file mode 100644 index d1143e45..00000000 --- a/modem/vara_interface.py +++ /dev/null @@ -1,116 +0,0 @@ -import socketserver -import threading -import logging -import signal -import sys -import select -from queue import Queue - -from command_p2p_connection import P2PConnectionCommand - -# Shared queue for command and data handlers -data_queue = Queue() -# Initialize logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - - -class VARACommandHandler(socketserver.BaseRequestHandler): - mycall = None # Class attribute to store mycall - dxcall = None - bandwidth = None # Class attribute to store bandwidth - - - def handle(self): - logging.info(f"Command connection established with {self.client_address}") - try: - while True: - self.data = self.request.recv(1024).strip() - if not self.data: - break - logging.info(f"Command received from {self.client_address}: {self.data}") - - if self.data.startswith(b'MYCALL '): - VARACommandHandler.mycall = self.data.split(b' ')[1].strip() - self.request.sendall(b"OK\r\n") - elif self.data.startswith(b'BW'): - VARACommandHandler.bandwidth = self.data[2:].strip() - self.request.sendall(b"OK\r\n") - elif self.data.startswith(b'CONNECT '): - - P2PConnectionCommand.connect('MYCALL', 'DXCALL', 'BANDWIDTH') - - self.request.sendall(b"OK\r\n") - parts = self.data.split() - if len(parts) >= 3 and VARACommandHandler.mycall and VARACommandHandler.bandwidth: - VARACommandHandler.dxcall = parts[2] - # Using the stored mycall and bandwidth for the response - bytestring = b'CONNECTED ' + VARACommandHandler.mycall + b' ' + VARACommandHandler.dxcall + b' ' + VARACommandHandler.bandwidth + b'\r\n' - self.request.sendall(bytestring) - - else: - self.request.sendall(b"ERROR: MYCALL or Bandwidth not set.\r\n") - elif self.data.startswith(b'ABORT'): - bytestring = b'DISCONNECTED\r\n' - elif self.data.startswith(b'DISCONNECT'): - bytestring = b'DISCONNECTED\r\n' - self.request.sendall(bytestring) - else: - self.request.sendall(b"OK\r\n") - - finally: - logging.info(f"Command connection closed with {self.client_address}") - - -class VARADataHandler(socketserver.BaseRequestHandler): - def handle(self): - logging.info(f"Data connection established with {self.client_address}") - - try: - while True: - ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout - if ready_to_read: - self.data = self.request.recv(1024).strip() - if not self.data: - break - try: - logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") - except: - logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") - - - - # Check if there's something to send from the queue, without blocking - if not data_queue.empty(): - data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking - self.request.sendall(data_to_send) - logging.info(f"Sent data to {self.client_address}") - - finally: - logging.info(f"Data connection closed with {self.client_address}") -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - allow_reuse_address = True - -def run_server(port, handler): - with ThreadedTCPServer(('127.0.0.1', port), handler) as server: - logging.info(f"Server running on port {port}") - server.serve_forever() - - -def signal_handler(sig, frame): - sys.exit(0) - -if __name__ == '__main__': - # Setup signal handler for graceful shutdown - signal.signal(signal.SIGINT, signal_handler) - - # Create server threads for command and data ports - command_server_thread = threading.Thread(target=run_server, args=(8300, VARACommandHandler)) - data_server_thread = threading.Thread(target=run_server, args=(8301, VARADataHandler)) - - # Start the server threads - command_server_thread.start() - data_server_thread.start() - - # Wait for both server threads to finish - command_server_thread.join() - data_server_thread.join() diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index 9c0cf73b..b8cf2da4 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -151,8 +151,6 @@ class TestP2PConnectionSession(unittest.TestCase): self.iss_state_manager.register_p2p_connection_session(session) session.connect() - - self.waitAndCloseChannels() del cmd