From 5ed11d771fbcf506c8a257c6514457d9ab0e93cf Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 11:19:13 +0100 Subject: [PATCH] added data and command queues --- modem/p2p_connection.py | 89 +++++++++++++++++++----------- modem/socket_interface.py | 20 +++++-- modem/socket_interface_commands.py | 11 ++-- tests/test_p2p_connection.py | 22 ++++++++ 4 files changed, 99 insertions(+), 43 deletions(-) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index fd90ed5b..6ce25047 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -6,6 +6,7 @@ import data_frame_factory import structlog import random from queue import Queue +import time class States(Enum): NEW = 0 @@ -37,7 +38,7 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', }, States.PAYLOAD_SENT: { - FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', }, States.DISCONNECTING: { FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', @@ -63,26 +64,13 @@ class P2PConnection: self.states = state_manager self.modem = modem - self.p2p_rx_queue = Queue() - self.p2p_tx_queue = Queue() + self.p2p_data_rx_queue = Queue() + self.p2p_data_tx_queue = Queue() self.state = States.NEW self.session_id = self.generate_id() - def generate_random_string(min_length, max_length): - import string - length = random.randint(min_length, max_length) - return ''.join(random.choices(string.ascii_letters, k=length)) - - # Generate and add 5 random entries to the queue - for _ in range(1): - random_entry = generate_random_string(2, 11) - self.p2p_tx_queue.put(random_entry) - - - - self.event_frame_received = threading.Event() self.RETRIES_CONNECT = 1 @@ -93,6 +81,30 @@ class P2PConnection: self.is_ISS = False # Indicator, if we are ISS or IRS + self.last_data_timestamp= time.time() + + self.start_data_processing_worker() + + def start_data_processing_worker(self): + """Starts a worker thread to monitor the transmit data queue and process data.""" + + def data_processing_worker(): + while True: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT: + self.disconnect() + return + + if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED: + self.process_data_queue() + threading.Event().wait(0.1) + + + + + # Create and start the worker thread + worker_thread = threading.Thread(target=data_processing_worker, daemon=True) + worker_thread.start() + def generate_id(self): while True: random_int = random.randint(1,255) @@ -102,7 +114,6 @@ class P2PConnection: if len(self.states.p2p_connection_sessions) >= 255: return False - def set_details(self, snr, frequency_offset): self.snr = snr self.frequency_offset = frequency_offset @@ -120,6 +131,7 @@ class P2PConnection: self.state = state def on_frame_received(self, frame): + self.last_data_timestamp = time.time() self.event_frame_received.set() self.log(f"Received {frame['frame_type']}") frame_type = frame['frame_type_int'] @@ -153,6 +165,7 @@ class P2PConnection: self.log("Timeout!") retries = retries - 1 + #self.connected_iss() # override connection state for simulation purposes self.session_failed() def launch_twr(self, frame_or_burst, timeout, retries, mode): @@ -179,35 +192,38 @@ class P2PConnection: self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return - def connected_iss(self, frame): + def connected_iss(self): self.log("CONNECTED ISS...........................") self.set_state(States.CONNECTED) self.is_ISS = True - self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) - self.process_data_queue() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) def connected_irs(self, frame): self.log("CONNECTED IRS...........................") + self.states.register_p2p_connection_session(self) self.set_state(States.CONNECTED) self.is_ISS = False self.orign = frame["origin"] - self.destination = frame["destination"] + self.destination = frame["destination_crc"] - self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def session_failed(self): self.set_state(States.FAILED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() def process_data_queue(self, frame=None): - if not self.p2p_tx_queue.empty(): + if not self.p2p_data_tx_queue.empty(): print("processing data....") self.set_state(States.PAYLOAD_SENT) - data = self.p2p_tx_queue.get() + data = self.p2p_data_tx_queue.get() sequence_id = random.randint(0,255) data = data.encode('utf-8') @@ -218,30 +234,35 @@ class P2PConnection: self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=mode) return - print("ALL DATA SENT!!!!!") - self.disconnect() def prepare_data_chunk(self, data, mode): return data def received_data(self, frame): print(frame) + self.p2p_data_rx_queue.put(frame['data']) ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def transmit_data_ack(self, frame): print(frame) + def transmitted_data(self, frame): + print("transmitted data...") + self.set_state(States.CONNECTED) + def disconnect(self): - self.set_state(States.DISCONNECTING) - disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) - self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + if self.state not in [States.DISCONNECTING, States.DISCONNECTED]: + self.set_state(States.DISCONNECTING) + disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) + self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return def received_disconnect(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() self.is_ISS = False disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id) self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) @@ -249,7 +270,8 @@ class P2PConnection: def received_disconnect_ack(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() def transmit_arq(self): @@ -259,4 +281,5 @@ class P2PConnection: #if command.run(app.modem_events, app.service_manager.modem): def received_arq(self): - pass \ No newline at end of file + pass + diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 4c18c6d2..7e6079b9 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -15,7 +15,6 @@ class CommandSocket(socketserver.BaseRequestHandler): self.event_manager = event_manager self.config_manager = config_manager self.modem = modem - print(self.config_manager) self.logger = structlog.get_logger(type(self).__name__) self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager) @@ -86,11 +85,13 @@ class DataSocket(socketserver.BaseRequestHandler): msg = f"[{type(self).__name__}]: {message}" logger = self.logger.warn if isWarning else self.logger.info logger(msg) + def handle(self): self.log(f"Data connection established with {self.client_address}") try: while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout if ready_to_read: self.data = self.request.recv(1024).strip() @@ -98,14 +99,21 @@ class DataSocket(socketserver.BaseRequestHandler): break try: self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") - except: + except Exception: self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + for session in self.state_manager.p2p_connection_sessions: + print(f"sessions: {session}") + session.p2p_data_tx_queue.put(self.data) + # Check if there's something to send from the queue, without blocking - if not data_queue.empty(): - data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking - self.request.sendall(data_to_send) - self.log(f"Sent data to {self.client_address}") + + for session_id in self.state_manager.p2p_connection_sessions: + session = self.state_manager.get_p2p_connection_session(session_id) + if not session.p2p_data_tx_queue.empty(): + data_to_send = session.p2p_data_tx_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + self.log(f"Sent data to {self.client_address}") finally: self.log(f"Data connection closed with {self.client_address}") diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 08885451..2db96532 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -9,6 +9,8 @@ class SocketCommandHandler: self.state_manager = state_manager self.event_manager = event_manager + self.session = None + def send_response(self, message): full_message = f"{message}\r\n" self.cmd_request.sendall(full_message.encode()) @@ -22,10 +24,11 @@ class SocketCommandHandler: 'destination': data[1], } cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self) - session = cmd.run(self.event_manager.queues, self.modem) - if session.session_id: - self.state_manager.register_p2p_connection_session(session) - session.connect() + self.session = cmd.run(self.event_manager.queues, self.modem) + if self.session.session_id: + self.state_manager.register_p2p_connection_session(self.session) + self.session.connect() + def handle_disconnect(self, data): # Your existing connect logic diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index b8cf2da4..c47cdf20 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -136,6 +136,11 @@ class TestP2PConnectionSession(unittest.TestCase): self.waitForSession(self.irs_event_queue, False) self.channels_running = False + def generate_random_string(self, min_length, max_length): + import string + length = random.randint(min_length, max_length) + return ''.join(random.choices(string.ascii_letters, k=length))# + def testARQSessionSmallPayload(self): # set Packet Error Rate (PER) / frame loss probability self.loss_probability = 0 @@ -151,8 +156,25 @@ class TestP2PConnectionSession(unittest.TestCase): self.iss_state_manager.register_p2p_connection_session(session) session.connect() + # Generate and add 5 random entries to the queue + for _ in range(5): + random_entry = self.generate_random_string(2, 11) + session.p2p_data_tx_queue.put(random_entry) + self.waitAndCloseChannels() del cmd + +class TestSocket: + def __init__(self, isCmd=True): + self.isCmd = isCmd + self.sent_data = [] # To capture data sent through this socket + self.received_data = b"" # To simulate data received by this socket + + def sendall(self, data): + print(f"Mock sendall called with data: {data}") + self.sent_data.append(data) + + if __name__ == '__main__': unittest.main()