diff --git a/modem/service_manager.py b/modem/service_manager.py index 270f3c60..10a943f0 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager) + self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager, self.event_manager) runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 9fdb8de7..d2362b69 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -1,30 +1,33 @@ import socketserver import threading -import logging import structlog import select from queue import Queue - -from command_p2p_connection import P2PConnectionCommand +from socket_interface_commands import SocketCommandHandler # Shared queue for command and data handlers data_queue = Queue() -class CommandHandler(socketserver.BaseRequestHandler): - def __init__(self, request, client_address, server): +class CommandSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + self.state_manager = state_manager + self.logger = structlog.get_logger(type(self).__name__) + self.command_handler = SocketCommandHandler(request, self.state_manager) + self.handlers = { - 'CONNECT': self.handle_connect, - 'DISCONNECT': self.handle_disconnect, - 'MYCALL': self.handle_mycall, - 'BW': self.handle_bw, - 'ABORT': self.handle_abort, - 'PUBLIC': self.handle_public, - 'CWID': self.handle_cwid, - 'LISTEN': self.handle_listen, - 'COMPRESSION': self.handle_compression, - 'WINLINK SESSION': self.handle_winlink_session, + 'CONNECT': self.command_handler.handle_connect, + 'DISCONNECT': self.command_handler.handle_disconnect, + 'MYCALL': self.command_handler.handle_mycall, + 'BW': self.command_handler.handle_bw, + 'ABORT': self.command_handler.handle_abort, + 'PUBLIC': self.command_handler.handle_public, + 'CWID': self.command_handler.handle_cwid, + 'LISTEN': self.command_handler.handle_listen, + 'COMPRESSION': self.command_handler.handle_compression, + 'WINLINK SESSION': self.command_handler.handle_winlink_session, } super().__init__(request, client_address, server) @@ -62,52 +65,13 @@ class CommandHandler(socketserver.BaseRequestHandler): else: self.send_response(f"Unknown command: {command}") - def send_response(self, message): - self.request.sendall(message.encode()) - # Command handlers - def handle_connect(self, data): - # Your existing connect logic - self.send_response("OK\r\n") - def handle_disconnect(self, data): - # Your existing disconnect logic - self.send_response("OK\r\n") +class DataSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + self.state_manager = state_manager - def handle_mycall(self, data): - # Logic for handling MYCALL command - self.send_response("OK\r\n") - - def handle_bw(self, data): - # Logic for handling BW command - self.send_response("OK\r\n") - - def handle_abort(self, data): - # Logic for handling ABORT command - self.send_response("OK\r\n") - - def handle_public(self, data): - # Logic for handling PUBLIC command - self.send_response("OK\r\n") - - def handle_cwid(self, data): - # Logic for handling CWID command - self.send_response("OK\r\n") - - def handle_listen(self, data): - # Logic for handling LISTEN command - self.send_response("OK\r\n") - - def handle_compression(self, data): - # Logic for handling COMPRESSION command - self.send_response("OK\r\n") - - def handle_winlink_session(self, data): - # Logic for handling WINLINK SESSION command - self.send_response("OK\r\n") - -class DataHandler(socketserver.BaseRequestHandler): - def __init__(self, request, client_address, server): self.logger = structlog.get_logger(type(self).__name__) super().__init__(request, client_address, server) @@ -139,14 +103,27 @@ class DataHandler(socketserver.BaseRequestHandler): finally: self.log(f"Data connection closed with {self.client_address}") -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + + +#class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): +# allow_reuse_address = True + + +class CustomThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs): + self.extra_args = kwargs + super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) + + def finish_request(self, request, client_address): + self.RequestHandlerClass(request, client_address, self, **self.extra_args) class SocketInterfaceHandler: - def __init__(self, config, state_manager): + def __init__(self, config, state_manager, event_manager): self.config = config self.state_manager = state_manager + self.event_manager = event_manager self.logger = structlog.get_logger(type(self).__name__) self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"] self.data_port = self.config["SOCKET_INTERFACE"]["data_port"] @@ -162,8 +139,8 @@ class SocketInterfaceHandler: def start_servers(self): # Method to start both command and data server threads - self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandHandler)) - self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataHandler)) + self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandSocket)) + self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataSocket)) self.command_server_thread.start() self.data_server_thread.start() @@ -171,7 +148,7 @@ class SocketInterfaceHandler: self.log(f"Interfaces started") def run_server(self, port, handler): - with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + with CustomThreadedTCPServer(('127.0.0.1', port), handler, state_manager=self.state_manager, event_manager=self.event_manager) as server: self.log(f"Server started on port {port}") if port == self.command_port: self.command_server = server diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py new file mode 100644 index 00000000..2d252b59 --- /dev/null +++ b/modem/socket_interface_commands.py @@ -0,0 +1,68 @@ +from command_p2p_connection import P2PConnectionCommand + +class SocketCommandHandler: + + def __init__(self, request, state_manager): + self.request = request + self.state_manager = state_manager + + print(self.state_manager) + + + def send_response(self, message): + self.request.sendall(message.encode()) + + + + def handle_connect(self, data): + # Your existing connect logic + self.send_response("OK\r\n") + + params = { + 'destination': "BB2BBB-2", + 'origin': "AA1AAA-1", + } + cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) + session = cmd.run(self.iss_event_queue, self.iss_modem) + if session.session_id: + self.iss_state_manager.register_p2p_connection_session(session) + session.connect() + + + + + def handle_disconnect(self, data): + # Your existing disconnect logic + self.send_response("OK\r\n") + + def handle_mycall(self, data): + # Logic for handling MYCALL command + self.send_response("OK\r\n") + + def handle_bw(self, data): + # Logic for handling BW command + self.send_response("OK\r\n") + + def handle_abort(self, data): + # Logic for handling ABORT command + self.send_response("OK\r\n") + + def handle_public(self, data): + # Logic for handling PUBLIC command + self.send_response("OK\r\n") + + def handle_cwid(self, data): + # Logic for handling CWID command + self.send_response("OK\r\n") + + def handle_listen(self, data): + # Logic for handling LISTEN command + self.send_response("OK\r\n") + + def handle_compression(self, data): + # Logic for handling COMPRESSION command + self.send_response("OK\r\n") + + def handle_winlink_session(self, data): + # Logic for handling WINLINK SESSION command + self.send_response("OK\r\n") \ No newline at end of file