From 20b1fe7e2d82d2d69d5f11f189eebe33f4d22885 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 9 Mar 2024 10:47:27 +0100 Subject: [PATCH 01/18] WIP p2p --- modem/command_p2p_connection.py | 37 +++++ modem/data_frame_factory.py | 99 +++++++++++- modem/frame_dispatcher.py | 17 ++- modem/frame_handler.py | 16 +- modem/frame_handler_p2p_connection.py | 52 +++++++ modem/modem_frametypes.py | 9 +- modem/p2p_connection.py | 211 ++++++++++++++++++++++++++ modem/server.py | 1 + modem/state_manager.py | 13 ++ modem/vara_interface.py | 116 ++++++++++++++ tests/test_p2p_connection.py | 160 +++++++++++++++++++ 11 files changed, 721 insertions(+), 10 deletions(-) create mode 100644 modem/command_p2p_connection.py create mode 100644 modem/frame_handler_p2p_connection.py create mode 100644 modem/p2p_connection.py create mode 100644 modem/vara_interface.py create mode 100644 tests/test_p2p_connection.py diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py new file mode 100644 index 00000000..74fb6a17 --- /dev/null +++ b/modem/command_p2p_connection.py @@ -0,0 +1,37 @@ +import queue +from command import TxCommand +import api_validations +import base64 +from queue import Queue +from p2p_connection import P2PConnection + +class P2PConnectionCommand(TxCommand): + + def set_params_from_api(self, apiParams): + self.origin = apiParams['origin'] + if not api_validations.validate_freedata_callsign(self.origin): + self.origin = f"{self.origin}-0" + + self.destination = apiParams['destination'] + if not api_validations.validate_freedata_callsign(self.destination): + self.destination = f"{self.destination}-0" + + + def connect(self, event_queue: Queue, modem): + pass + + def run(self, event_queue: Queue, modem): + try: + self.emit_event(event_queue) + self.logger.info(self.log_message()) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager) + if session.session_id: + self.state_manager.register_p2p_connection_session(session) + session.connect() + return session + return False + + except Exception as e: + self.log(f"Error starting P2P Connection session: {e}", isWarning=True) + + return False \ No newline at end of file diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 2acc837c..560bdb35 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -28,6 +28,7 @@ class DataFrameFactory: self._load_ping_templates() self._load_fec_templates() self._load_arq_templates() + self._load_p2p_connection_templates() def _load_broadcast_templates(self): # cq frame @@ -159,6 +160,52 @@ class DataFrameFactory: "snr": 1, "flag": 1, } + + def _load_p2p_connection_templates(self): + # p2p connect request + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # connect ACK + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # heartbeat for "is alive" + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # ack heartbeat + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # p2p payload frames + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD.value] = { + "frame_length": None, + "session_id": 1, + "sequence_id": 1, + "data": "dynamic", + } + + # p2p payload frame ack + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + "sequence_id": 1, + } + + def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): @@ -402,8 +449,9 @@ class DataFrameFactory: "offset": offset.to_bytes(4, 'big'), "data": data, } - frame = self.construct(FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode)) - return frame + return self.construct( + FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode) + ) def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int, frames_per_burst: int, snr: int, flag_final=False, flag_checksum=False, flag_abort=False): @@ -426,3 +474,50 @@ class DataFrameFactory: "flag": flag.to_bytes(1, 'big'), } return self.construct(FR_TYPE.ARQ_BURST_ACK, payload) + + def build_p2p_connection_connect(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT, payload) + + def build_p2p_connection_connect_ack(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) + + def build_p2p_connection_heartbeat(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) + + def build_p2p_connection_heartbeat_ack(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) + + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + "data": data, + } + return self.construct( + FR_TYPE.P2P_CONNECTION_PAYLOAD, + payload, + self.get_bytes_per_frame(freedv_mode), + ) + + def build_p2p_connection_payload_ack(self, session_id, sequence_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload) \ No newline at end of file diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index c7f860bb..88ddaf94 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -13,8 +13,11 @@ from frame_handler import FrameHandler from frame_handler_ping import PingFrameHandler from frame_handler_cq import CQFrameHandler from frame_handler_arq_session import ARQFrameHandler +from frame_handler_p2p_connection import P2PConnectionFrameHandler from frame_handler_beacon import BeaconFrameHandler + + class DISPATCHER(): FRAME_HANDLER = { @@ -22,9 +25,15 @@ class DISPATCHER(): FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"}, FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"}, FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"}, - FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"}, - FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, - FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.P2P_CONNECTION_CONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT"}, + FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT ACK"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD ACK"}, + + #FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, + #FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"}, FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"}, FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"}, @@ -82,7 +91,7 @@ class DISPATCHER(): if frametype not in self.FRAME_HANDLER: self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + "[DISPATCHER] ARQ - other frame type", frametype=FR_TYPE(frametype).name) return # instantiate handler diff --git a/modem/frame_handler.py b/modem/frame_handler.py index 46347d1e..89493bbe 100644 --- a/modem/frame_handler.py +++ b/modem/frame_handler.py @@ -34,7 +34,7 @@ class FrameHandler(): ft = self.details['frame']['frame_type'] valid = False # Check for callsign checksum - if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK']: + if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK', 'P2P_CONNECTION_CONNECT']: valid, mycallsign = helpers.check_callsign( call_with_ssid, self.details["frame"]["destination_crc"], @@ -51,6 +51,20 @@ class FrameHandler(): session_id = self.details['frame']['session_id'] if session_id in self.states.arq_iss_sessions: valid = True + + # check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT']: + valid, mycallsign = helpers.check_callsign( + call_with_ssid, + self.details["frame"]["destination_crc"], + self.config['STATION']['ssid_list']) + + #check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK']: + session_id = self.details['frame']['session_id'] + if session_id in self.states.p2p_connection_sessions: + valid = True + else: valid = False diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py new file mode 100644 index 00000000..13315487 --- /dev/null +++ b/modem/frame_handler_p2p_connection.py @@ -0,0 +1,52 @@ +from queue import Queue +import frame_handler +from event_manager import EventManager +from state_manager import StateManager +from modem_frametypes import FRAME_TYPE as FR +from p2p_connection import P2PConnection + +class P2PConnectionFrameHandler(frame_handler.FrameHandler): + + def follow_protocol(self): + + if not self.should_respond(): + return + + frame = self.details['frame'] + session_id = frame['session_id'] + snr = self.details["snr"] + frequency_offset = self.details["frequency_offset"] + + if frame['frame_type_int'] == FR.P2P_CONNECTION_CONNECT.value: + + # Lost OPEN_ACK case .. ISS will retry opening a session + if session_id in self.states.arq_irs_sessions: + session = self.states.p2p_connection_sessions[session_id] + + # Normal case when receiving a SESSION_OPEN for the first time + else: + # if self.states.check_if_running_arq_session(): + # self.logger.warning("DISCARDING SESSION OPEN because of ongoing ARQ session ", frame=frame) + # return + print(frame) + session = P2PConnection(self.config, + self.modem, + frame['origin'], + frame['destination_crc'], + self.states) + session.session_id = session_id + self.states.register_p2p_connection_session(session) + + elif frame['frame_type_int'] in [ + FR.P2P_CONNECTION_CONNECT_ACK.value, + FR.P2P_CONNECTION_PAYLOAD.value, + FR.P2P_CONNECTION_PAYLOAD_ACK.value, + ]: + session = self.states.get_p2p_connection_session(session_id) + + else: + self.logger.warning("DISCARDING FRAME", frame=frame) + return + + session.set_details(snr, frequency_offset) + session.on_frame_received(frame) diff --git a/modem/modem_frametypes.py b/modem/modem_frametypes.py index 582a7907..da46a033 100644 --- a/modem/modem_frametypes.py +++ b/modem/modem_frametypes.py @@ -6,9 +6,6 @@ from enum import Enum class FRAME_TYPE(Enum): """Lookup for frame types""" - ARQ_CONNECTION_OPEN = 1 - ARQ_CONNECTION_HB = 2 - ARQ_CONNECTION_CLOSE = 3 ARQ_STOP = 10 ARQ_STOP_ACK = 11 ARQ_SESSION_OPEN = 12 @@ -17,6 +14,12 @@ class FRAME_TYPE(Enum): ARQ_SESSION_INFO_ACK = 15 ARQ_BURST_FRAME = 20 ARQ_BURST_ACK = 21 + P2P_CONNECTION_CONNECT = 30 + P2P_CONNECTION_CONNECT_ACK = 31 + P2P_CONNECTION_HEARTBEAT = 32 + P2P_CONNECTION_HEARTBEAT_ACK = 33 + P2P_CONNECTION_PAYLOAD = 34 + P2P_CONNECTION_PAYLOAD_ACK = 35 MESH_BROADCAST = 100 MESH_SIGNALLING_PING = 101 MESH_SIGNALLING_PING_ACK = 102 diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py new file mode 100644 index 00000000..facdf296 --- /dev/null +++ b/modem/p2p_connection.py @@ -0,0 +1,211 @@ +import threading +from enum import Enum +from modem_frametypes import FRAME_TYPE +from codec2 import FREEDV_MODE +import data_frame_factory +import structlog +import random +from queue import Queue + +class States(Enum): + NEW = 0 + CONNECTING = 1 + CONNECT_SENT = 2 + CONNECT_ACK_SENT = 3 + CONNECTED = 4 + HEARTBEAT_SENT = 5 + HEARTBEAT_ACK_SENT = 6 + PAYLOAD_SENT = 7 + DISCONNECTING = 8 + DISCONNECTED = 9 + FAILED = 10 + + + +class P2PConnection: + STATE_TRANSITION = { + States.NEW: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + }, + States.CONNECTING: { + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + }, + States.CONNECTED: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue', + + } + } + + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager): + self.logger = structlog.get_logger(type(self).__name__) + self.config = config + self.frame_factory = data_frame_factory.DataFrameFactory(self.config) + + self.destination = destination + self.origin = origin + self.states = state_manager + self.modem = modem + + self.p2p_rx_queue = Queue() + self.p2p_tx_queue = Queue() + + + self.state = States.NEW + self.session_id = self.generate_id() + + def generate_random_string(min_length, max_length): + import string + length = random.randint(min_length, max_length) + return ''.join(random.choices(string.ascii_letters, k=length)) + + # Generate and add 5 random entries to the queue + for _ in range(1): + random_entry = generate_random_string(2, 11) + self.p2p_tx_queue.put(random_entry) + + + + + self.event_frame_received = threading.Event() + + self.RETRIES_CONNECT = 1 + self.TIMEOUT_CONNECT = 10 + self.TIMEOUT_DATA = 5 + self.RETRIES_DATA = 5 + self.ENTIRE_CONNECTION_TIMEOUT = 100 + + self.is_ISS = False # Indicator, if we are ISS or IRS + + def generate_id(self): + while True: + random_int = random.randint(1,255) + if random_int not in self.states.p2p_connection_sessions: + return random_int + + if len(self.states.p2p_connection_sessions) >= 255: + return False + + + def set_details(self, snr, frequency_offset): + self.snr = snr + self.frequency_offset = frequency_offset + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def set_state(self, state): + if self.state == state: + self.log(f"{type(self).__name__} state {self.state.name} unchanged.") + else: + self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}") + self.state = state + + def on_frame_received(self, frame): + self.event_frame_received.set() + self.log(f"Received {frame['frame_type']}") + frame_type = frame['frame_type_int'] + if self.state in self.STATE_TRANSITION: + if frame_type in self.STATE_TRANSITION[self.state]: + action_name = self.STATE_TRANSITION[self.state][frame_type] + response = getattr(self, action_name)(frame) + + return + + self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") + + def transmit_frame(self, frame: bytearray, mode='auto'): + self.log("Transmitting frame") + if mode in ['auto']: + mode = self.get_mode_by_speed_level(self.speed_level) + + self.modem.transmit(mode, 1, 1, frame) + + def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode): + while retries > 0: + self.event_frame_received = threading.Event() + if isinstance(frame_or_burst, list): burst = frame_or_burst + else: burst = [frame_or_burst] + for f in burst: + self.transmit_frame(f, mode) + self.event_frame_received.clear() + self.log(f"Waiting {timeout} seconds...") + if self.event_frame_received.wait(timeout): + return + self.log("Timeout!") + retries = retries - 1 + + self.session_failed() + + def launch_twr(self, frame_or_burst, timeout, retries, mode): + twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True) + twr.start() + + def transmit_and_wait_irs(self, frame, timeout, mode): + self.event_frame_received.clear() + self.transmit_frame(frame, mode) + self.log(f"Waiting {timeout} seconds...") + #if not self.event_frame_received.wait(timeout): + # self.log("Timeout waiting for ISS. Session failed.") + # self.transmission_failed() + + def launch_twr_irs(self, frame, timeout, mode): + thread_wait = threading.Thread(target = self.transmit_and_wait_irs, + args = [frame, timeout, mode], daemon=True) + thread_wait.start() + + def connect(self): + self.set_state(States.CONNECTING) + self.is_ISS = True + session_open_frame = self.frame_factory.build_p2p_connection_connect(self.origin, self.destination, self.session_id) + self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + return + + def connected_iss(self, frame): + self.log("CONNECTED ISS...........................") + self.set_state(States.CONNECTED) + self.is_ISS = True + self.process_data_queue() + + def connected_irs(self, frame): + self.log("CONNECTED IRS...........................") + self.set_state(States.CONNECTED) + self.is_ISS = False + session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) + self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def session_failed(self): + self.log("FAILED...........................") + self.set_state(States.FAILED) + + def process_data_queue(self, frame=None): + print("processing data....") + print(self.p2p_tx_queue.empty()) + if not self.p2p_tx_queue.empty(): + data = self.p2p_tx_queue.get() + sequence_id = random.randint(0,255) + data = data.encode('utf-8') + + if len(data) <= 11: + mode = FREEDV_MODE.signalling + + payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, + mode=mode) + return + print("ALL DATA SENT!!!!!") + + def prepare_data_chunk(self, data, mode): + return data + + def received_data(self, frame): + print(frame) + ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) + self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def transmit_data_ack(self, frame): + print(frame) diff --git a/modem/server.py b/modem/server.py index f6a80421..37617706 100644 --- a/modem/server.py +++ b/modem/server.py @@ -329,6 +329,7 @@ if __name__ == "__main__": app.config_manager = CONFIG(config_file) # start modem + app.p2p_data_queue = queue.Queue() # queue which holds processing data of p2p connections app.state_queue = queue.Queue() # queue which holds latest states app.modem_events = queue.Queue() # queue which holds latest events app.modem_fft = queue.Queue() # queue which holds latest fft data diff --git a/modem/state_manager.py b/modem/state_manager.py index 69b6b44a..91bec80a 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -38,6 +38,8 @@ class StateManager: self.arq_iss_sessions = {} self.arq_irs_sessions = {} + self.p2p_connection_sessions = {} + #self.mesh_routing_table = [] self.radio_frequency = 0 @@ -214,3 +216,14 @@ class StateManager: "radio_rf_level": self.radio_rf_level, "s_meter_strength": self.s_meter_strength, } + + def register_p2p_connection_session(self, session): + if session.session_id in self.p2p_connection_sessions: + return False + self.p2p_connection_sessions[session.session_id] = session + return True + + def get_p2p_connection_session(self, id): + if id not in self.p2p_connection_sessions: + pass + return self.p2p_connection_sessions[id] \ No newline at end of file diff --git a/modem/vara_interface.py b/modem/vara_interface.py new file mode 100644 index 00000000..d1143e45 --- /dev/null +++ b/modem/vara_interface.py @@ -0,0 +1,116 @@ +import socketserver +import threading +import logging +import signal +import sys +import select +from queue import Queue + +from command_p2p_connection import P2PConnectionCommand + +# Shared queue for command and data handlers +data_queue = Queue() +# Initialize logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + + +class VARACommandHandler(socketserver.BaseRequestHandler): + mycall = None # Class attribute to store mycall + dxcall = None + bandwidth = None # Class attribute to store bandwidth + + + def handle(self): + logging.info(f"Command connection established with {self.client_address}") + try: + while True: + self.data = self.request.recv(1024).strip() + if not self.data: + break + logging.info(f"Command received from {self.client_address}: {self.data}") + + if self.data.startswith(b'MYCALL '): + VARACommandHandler.mycall = self.data.split(b' ')[1].strip() + self.request.sendall(b"OK\r\n") + elif self.data.startswith(b'BW'): + VARACommandHandler.bandwidth = self.data[2:].strip() + self.request.sendall(b"OK\r\n") + elif self.data.startswith(b'CONNECT '): + + P2PConnectionCommand.connect('MYCALL', 'DXCALL', 'BANDWIDTH') + + self.request.sendall(b"OK\r\n") + parts = self.data.split() + if len(parts) >= 3 and VARACommandHandler.mycall and VARACommandHandler.bandwidth: + VARACommandHandler.dxcall = parts[2] + # Using the stored mycall and bandwidth for the response + bytestring = b'CONNECTED ' + VARACommandHandler.mycall + b' ' + VARACommandHandler.dxcall + b' ' + VARACommandHandler.bandwidth + b'\r\n' + self.request.sendall(bytestring) + + else: + self.request.sendall(b"ERROR: MYCALL or Bandwidth not set.\r\n") + elif self.data.startswith(b'ABORT'): + bytestring = b'DISCONNECTED\r\n' + elif self.data.startswith(b'DISCONNECT'): + bytestring = b'DISCONNECTED\r\n' + self.request.sendall(bytestring) + else: + self.request.sendall(b"OK\r\n") + + finally: + logging.info(f"Command connection closed with {self.client_address}") + + +class VARADataHandler(socketserver.BaseRequestHandler): + def handle(self): + logging.info(f"Data connection established with {self.client_address}") + + try: + while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout + if ready_to_read: + self.data = self.request.recv(1024).strip() + if not self.data: + break + try: + logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") + except: + logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + + + + # Check if there's something to send from the queue, without blocking + if not data_queue.empty(): + data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + logging.info(f"Sent data to {self.client_address}") + + finally: + logging.info(f"Data connection closed with {self.client_address}") +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + +def run_server(port, handler): + with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + logging.info(f"Server running on port {port}") + server.serve_forever() + + +def signal_handler(sig, frame): + sys.exit(0) + +if __name__ == '__main__': + # Setup signal handler for graceful shutdown + signal.signal(signal.SIGINT, signal_handler) + + # Create server threads for command and data ports + command_server_thread = threading.Thread(target=run_server, args=(8300, VARACommandHandler)) + data_server_thread = threading.Thread(target=run_server, args=(8301, VARADataHandler)) + + # Start the server threads + command_server_thread.start() + data_server_thread.start() + + # Wait for both server threads to finish + command_server_thread.join() + data_server_thread.join() diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py new file mode 100644 index 00000000..9c0cf73b --- /dev/null +++ b/tests/test_p2p_connection.py @@ -0,0 +1,160 @@ +import sys +import time + +sys.path.append('modem') + +import unittest +import unittest.mock +from config import CONFIG +import helpers +import queue +import threading +import base64 +from command_p2p_connection import P2PConnectionCommand +from state_manager import StateManager +from frame_dispatcher import DISPATCHER +import random +import structlog +import numpy as np +from event_manager import EventManager +from state_manager import StateManager +from data_frame_factory import DataFrameFactory +import codec2 +import p2p_connection + + +class TestModem: + def __init__(self, event_q, state_q): + self.data_queue_received = queue.Queue() + self.demodulator = unittest.mock.Mock() + self.event_manager = EventManager([event_q]) + self.logger = structlog.get_logger('Modem') + self.states = StateManager(state_q) + + def getFrameTransmissionTime(self, mode): + samples = 0 + c2instance = codec2.open_instance(mode.value) + samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance) + time = samples / 8000 + return time + + def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: + # Simulate transmission time + tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT + self.logger.info(f"TX {tx_time} seconds...") + threading.Event().wait(tx_time) + + transmission = { + 'mode': mode, + 'bytes': frames, + } + self.data_queue_received.put(transmission) + + +class TestP2PConnectionSession(unittest.TestCase): + + @classmethod + def setUpClass(cls): + config_manager = CONFIG('modem/config.ini.example') + cls.config = config_manager.read() + cls.logger = structlog.get_logger("TESTS") + cls.frame_factory = DataFrameFactory(cls.config) + + # ISS + cls.iss_state_manager = StateManager(queue.Queue()) + cls.iss_event_manager = EventManager([queue.Queue()]) + cls.iss_event_queue = queue.Queue() + cls.iss_state_queue = queue.Queue() + cls.iss_p2p_data_queue = queue.Queue() + + + cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue) + cls.iss_frame_dispatcher = DISPATCHER(cls.config, + cls.iss_event_manager, + cls.iss_state_manager, + cls.iss_modem) + + # IRS + cls.irs_state_manager = StateManager(queue.Queue()) + cls.irs_event_manager = EventManager([queue.Queue()]) + cls.irs_event_queue = queue.Queue() + cls.irs_state_queue = queue.Queue() + cls.irs_p2p_data_queue = queue.Queue() + cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue) + cls.irs_frame_dispatcher = DISPATCHER(cls.config, + cls.irs_event_manager, + cls.irs_state_manager, + cls.irs_modem) + + # Frame loss probability in % + cls.loss_probability = 30 + + cls.channels_running = True + + def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER): + while self.channels_running: + # Transfer data between both parties + try: + transmission = modem_transmit_queue.get(timeout=1) + if random.randint(0, 100) < self.loss_probability: + self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + continue + + frame_bytes = transmission['bytes'] + frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) + except queue.Empty: + continue + self.logger.info(f"[{threading.current_thread().name}] Channel closed.") + + def waitForSession(self, q, outbound=False): + key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' + while True and self.channels_running: + ev = q.get() + if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]): + self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + break + + def establishChannels(self): + self.channels_running = True + self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, + args=[self.iss_modem.data_queue_received, + self.irs_frame_dispatcher], + name="ISS to IRS channel") + self.iss_to_irs_channel.start() + + self.irs_to_iss_channel = threading.Thread(target=self.channelWorker, + args=[self.irs_modem.data_queue_received, + self.iss_frame_dispatcher], + name="IRS to ISS channel") + self.irs_to_iss_channel.start() + + def waitAndCloseChannels(self): + self.waitForSession(self.iss_event_queue, True) + self.channels_running = False + self.waitForSession(self.irs_event_queue, False) + self.channels_running = False + + def testARQSessionSmallPayload(self): + # set Packet Error Rate (PER) / frame loss probability + self.loss_probability = 0 + + self.establishChannels() + params = { + 'destination': "BB2BBB-2", + 'origin': "AA1AAA-1", + } + cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) + session = cmd.run(self.iss_event_queue, self.iss_modem) + if session.session_id: + self.iss_state_manager.register_p2p_connection_session(session) + session.connect() + + + + self.waitAndCloseChannels() + del cmd + +if __name__ == '__main__': + unittest.main() From 216799fe2b597332ad3fc304a924bdc8b5bf57d8 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Fri, 15 Mar 2024 14:25:46 +0100 Subject: [PATCH 02/18] added disconnect --- modem/data_frame_factory.py | 27 +++++++++++++++++++-- modem/frame_dispatcher.py | 3 +++ modem/frame_handler.py | 2 +- modem/frame_handler_p2p_connection.py | 2 ++ modem/modem_frametypes.py | 2 ++ modem/p2p_connection.py | 35 ++++++++++++++++++++++++--- 6 files changed, 65 insertions(+), 6 deletions(-) diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 560bdb35..4577a0ea 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -205,7 +205,18 @@ class DataFrameFactory: "sequence_id": 1, } - + # heartbeat for "is alive" + self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # ack heartbeat + self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): @@ -520,4 +531,16 @@ class DataFrameFactory: "session_id": session_id.to_bytes(1, 'big'), "sequence_id": sequence_id.to_bytes(1, 'big'), } - return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload) \ No newline at end of file + return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload) + + def build_p2p_connection_disconnect(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT, payload) + + def build_p2p_connection_disconnect_ack(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK, payload) diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index 88ddaf94..e2a37ab2 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -27,6 +27,9 @@ class DISPATCHER(): FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"}, FR_TYPE.P2P_CONNECTION_CONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT"}, FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT ACK"}, + FR_TYPE.P2P_CONNECTION_DISCONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection DISCONNECT"}, + FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection DISCONNECT ACK"}, FR_TYPE.P2P_CONNECTION_PAYLOAD.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection PAYLOAD"}, FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler, diff --git a/modem/frame_handler.py b/modem/frame_handler.py index 89493bbe..7375b6c2 100644 --- a/modem/frame_handler.py +++ b/modem/frame_handler.py @@ -60,7 +60,7 @@ class FrameHandler(): self.config['STATION']['ssid_list']) #check for p2p connection - elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK']: + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: session_id = self.details['frame']['session_id'] if session_id in self.states.p2p_connection_sessions: valid = True diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py index 13315487..7dc4931d 100644 --- a/modem/frame_handler_p2p_connection.py +++ b/modem/frame_handler_p2p_connection.py @@ -39,6 +39,8 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler): elif frame['frame_type_int'] in [ FR.P2P_CONNECTION_CONNECT_ACK.value, + FR.P2P_CONNECTION_DISCONNECT.value, + FR.P2P_CONNECTION_DISCONNECT_ACK.value, FR.P2P_CONNECTION_PAYLOAD.value, FR.P2P_CONNECTION_PAYLOAD_ACK.value, ]: diff --git a/modem/modem_frametypes.py b/modem/modem_frametypes.py index da46a033..fbe33f77 100644 --- a/modem/modem_frametypes.py +++ b/modem/modem_frametypes.py @@ -20,6 +20,8 @@ class FRAME_TYPE(Enum): P2P_CONNECTION_HEARTBEAT_ACK = 33 P2P_CONNECTION_PAYLOAD = 34 P2P_CONNECTION_PAYLOAD_ACK = 35 + P2P_CONNECTION_DISCONNECT = 36 + P2P_CONNECTION_DISCONNECT_ACK = 37 MESH_BROADCAST = 100 MESH_SIGNALLING_PING = 101 MESH_SIGNALLING_PING_ACK = 102 diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index facdf296..a50b62f3 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -34,9 +34,19 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + }, + States.PAYLOAD_SENT: { FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue', + }, + States.DISCONNECTING: { + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', + }, + States.DISCONNECTED: { + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', - } + }, } def __init__(self, config: dict, modem, origin: str, destination: str, state_manager): @@ -183,9 +193,10 @@ class P2PConnection: self.set_state(States.FAILED) def process_data_queue(self, frame=None): - print("processing data....") - print(self.p2p_tx_queue.empty()) if not self.p2p_tx_queue.empty(): + print("processing data....") + + self.set_state(States.PAYLOAD_SENT) data = self.p2p_tx_queue.get() sequence_id = random.randint(0,255) data = data.encode('utf-8') @@ -198,6 +209,7 @@ class P2PConnection: mode=mode) return print("ALL DATA SENT!!!!!") + self.disconnect() def prepare_data_chunk(self, data, mode): return data @@ -209,3 +221,20 @@ class P2PConnection: def transmit_data_ack(self, frame): print(frame) + + def disconnect(self): + self.set_state(States.DISCONNECTING) + disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) + self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + return + + def received_disconnect(self, frame): + self.log("DISCONNECTED...............") + self.set_state(States.DISCONNECTED) + self.is_ISS = False + disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id) + self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def received_disconnect_ack(self, frame): + self.log("DISCONNECTED...............") + self.set_state(States.DISCONNECTED) From cbc928f1175ee9a6bfc0a6c4a1e4c98ab8394c95 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Fri, 15 Mar 2024 14:33:06 +0100 Subject: [PATCH 03/18] added dummy functions for continuing work on... --- modem/p2p_connection.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index a50b62f3..2e7219cf 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -238,3 +238,13 @@ class P2PConnection: def received_disconnect_ack(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) + + + def transmit_arq(self): + pass + #command = cmd_class(self.config, self.states, self.eve, params) + #app.logger.info(f"Command {command.get_name()} running...") + #if command.run(app.modem_events, app.service_manager.modem): + + def received_arq(self): + pass \ No newline at end of file From 7714c7aeb633a02fd86d9489ea1af6010b711b62 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 10:29:13 +0100 Subject: [PATCH 04/18] added socket interface to modem --- modem/config.ini.example | 6 ++ modem/config.py | 7 ++ modem/server.py | 2 +- modem/service_manager.py | 8 +- modem/socket_interface.py | 193 +++++++++++++++++++++++++++++++++++ modem/vara_interface.py | 116 --------------------- tests/test_p2p_connection.py | 2 - 7 files changed, 213 insertions(+), 121 deletions(-) create mode 100644 modem/socket_interface.py delete mode 100644 modem/vara_interface.py diff --git a/modem/config.ini.example b/modem/config.ini.example index c8c7dae1..1061ca76 100644 --- a/modem/config.ini.example +++ b/modem/config.ini.example @@ -50,6 +50,12 @@ enable_morse_identifier = False respond_to_cq = True tx_delay = 200 +[SOCKET_INTERFACE] +enable = False +host = 127.0.0.1 +cmd_port = 8000 +data_port = 8001 + [MESSAGES] enable_auto_repeat = False diff --git a/modem/config.py b/modem/config.py index fa9926df..a12572fd 100644 --- a/modem/config.py +++ b/modem/config.py @@ -61,6 +61,13 @@ class CONFIG: 'respond_to_cq': bool, 'tx_delay': int }, + 'SOCKET_INTERFACE': { + 'enable' : bool, + 'host' : str, + 'cmd_port' : int, + 'data_port' : int, + + }, 'MESSAGES': { 'enable_auto_repeat': bool, } diff --git a/modem/server.py b/modem/server.py index c822a093..d335f1f4 100644 --- a/modem/server.py +++ b/modem/server.py @@ -327,7 +327,7 @@ def sock_states(sock): @atexit.register def stop_server(): try: - app.service_manager.stop_modem() + app.service_manager.modem_service.put("stop") if app.service_manager.modem: app.service_manager.modem.sd_input_stream.stop audio.sd._terminate() diff --git a/modem/service_manager.py b/modem/service_manager.py index 2f36a5bb..270f3c60 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -5,7 +5,7 @@ import structlog import audio import radio_manager - +from socket_interface import SocketInterfaceHandler class SM: def __init__(self, app): @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - + self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager) runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True @@ -34,9 +34,13 @@ class SM: self.start_radio_manager() self.start_modem() + self.socket_interface_manager.start_servers() + elif cmd in ['stop'] and self.modem: self.stop_modem() self.stop_radio_manager() + + self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) diff --git a/modem/socket_interface.py b/modem/socket_interface.py new file mode 100644 index 00000000..9fdb8de7 --- /dev/null +++ b/modem/socket_interface.py @@ -0,0 +1,193 @@ +import socketserver +import threading +import logging +import structlog +import select +from queue import Queue + +from command_p2p_connection import P2PConnectionCommand + +# Shared queue for command and data handlers +data_queue = Queue() + +class CommandHandler(socketserver.BaseRequestHandler): + def __init__(self, request, client_address, server): + self.logger = structlog.get_logger(type(self).__name__) + + self.handlers = { + 'CONNECT': self.handle_connect, + 'DISCONNECT': self.handle_disconnect, + 'MYCALL': self.handle_mycall, + 'BW': self.handle_bw, + 'ABORT': self.handle_abort, + 'PUBLIC': self.handle_public, + 'CWID': self.handle_cwid, + 'LISTEN': self.handle_listen, + 'COMPRESSION': self.handle_compression, + 'WINLINK SESSION': self.handle_winlink_session, + } + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def handle(self): + self.log(f"Client connected: {self.client_address}") + try: + while True: + data = self.request.recv(1024).strip() + if not data: + break + decoded_data = data.decode() + self.log(f"Command received from {self.client_address}: {decoded_data}") + self.parse_command(decoded_data) + finally: + self.log(f"Command connection closed with {self.client_address}") + + def parse_command(self, data): + for command in self.handlers: + if data.startswith(command): + # Extract command arguments after the command itself + args = data[len(command):].strip().split() + self.dispatch_command(command, args) + return + self.send_response("ERROR: Unknown command\r\n") + + def dispatch_command(self, command, data): + if command in self.handlers: + handler = self.handlers[command] + handler(data) + else: + self.send_response(f"Unknown command: {command}") + + def send_response(self, message): + self.request.sendall(message.encode()) + + # Command handlers + def handle_connect(self, data): + # Your existing connect logic + self.send_response("OK\r\n") + + def handle_disconnect(self, data): + # Your existing disconnect logic + self.send_response("OK\r\n") + + def handle_mycall(self, data): + # Logic for handling MYCALL command + self.send_response("OK\r\n") + + def handle_bw(self, data): + # Logic for handling BW command + self.send_response("OK\r\n") + + def handle_abort(self, data): + # Logic for handling ABORT command + self.send_response("OK\r\n") + + def handle_public(self, data): + # Logic for handling PUBLIC command + self.send_response("OK\r\n") + + def handle_cwid(self, data): + # Logic for handling CWID command + self.send_response("OK\r\n") + + def handle_listen(self, data): + # Logic for handling LISTEN command + self.send_response("OK\r\n") + + def handle_compression(self, data): + # Logic for handling COMPRESSION command + self.send_response("OK\r\n") + + def handle_winlink_session(self, data): + # Logic for handling WINLINK SESSION command + self.send_response("OK\r\n") + +class DataHandler(socketserver.BaseRequestHandler): + def __init__(self, request, client_address, server): + self.logger = structlog.get_logger(type(self).__name__) + + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + def handle(self): + self.log(f"Data connection established with {self.client_address}") + + try: + while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout + if ready_to_read: + self.data = self.request.recv(1024).strip() + if not self.data: + break + try: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") + except: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + + # Check if there's something to send from the queue, without blocking + if not data_queue.empty(): + data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + self.log(f"Sent data to {self.client_address}") + + finally: + self.log(f"Data connection closed with {self.client_address}") +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + + +class SocketInterfaceHandler: + def __init__(self, config, state_manager): + self.config = config + self.state_manager = state_manager + self.logger = structlog.get_logger(type(self).__name__) + self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"] + self.data_port = self.config["SOCKET_INTERFACE"]["data_port"] + self.command_server = None + self.data_server = None + self.command_server_thread = None + self.data_server_thread = None + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def start_servers(self): + # Method to start both command and data server threads + self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandHandler)) + self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataHandler)) + + self.command_server_thread.start() + self.data_server_thread.start() + + self.log(f"Interfaces started") + + def run_server(self, port, handler): + with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + self.log(f"Server started on port {port}") + if port == self.command_port: + self.command_server = server + else: + self.data_server = server + server.serve_forever() + + def stop_servers(self): + # Gracefully shutdown the server + if self.command_server: + self.command_server.shutdown() + if self.data_server: + self.data_server.shutdown() + self.log(f"Interfaces stopped") + + def wait_for_server_threads(self): + # Wait for both server threads to finish + self.command_server_thread.join() + self.data_server_thread.join() diff --git a/modem/vara_interface.py b/modem/vara_interface.py deleted file mode 100644 index d1143e45..00000000 --- a/modem/vara_interface.py +++ /dev/null @@ -1,116 +0,0 @@ -import socketserver -import threading -import logging -import signal -import sys -import select -from queue import Queue - -from command_p2p_connection import P2PConnectionCommand - -# Shared queue for command and data handlers -data_queue = Queue() -# Initialize logging -logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') - - -class VARACommandHandler(socketserver.BaseRequestHandler): - mycall = None # Class attribute to store mycall - dxcall = None - bandwidth = None # Class attribute to store bandwidth - - - def handle(self): - logging.info(f"Command connection established with {self.client_address}") - try: - while True: - self.data = self.request.recv(1024).strip() - if not self.data: - break - logging.info(f"Command received from {self.client_address}: {self.data}") - - if self.data.startswith(b'MYCALL '): - VARACommandHandler.mycall = self.data.split(b' ')[1].strip() - self.request.sendall(b"OK\r\n") - elif self.data.startswith(b'BW'): - VARACommandHandler.bandwidth = self.data[2:].strip() - self.request.sendall(b"OK\r\n") - elif self.data.startswith(b'CONNECT '): - - P2PConnectionCommand.connect('MYCALL', 'DXCALL', 'BANDWIDTH') - - self.request.sendall(b"OK\r\n") - parts = self.data.split() - if len(parts) >= 3 and VARACommandHandler.mycall and VARACommandHandler.bandwidth: - VARACommandHandler.dxcall = parts[2] - # Using the stored mycall and bandwidth for the response - bytestring = b'CONNECTED ' + VARACommandHandler.mycall + b' ' + VARACommandHandler.dxcall + b' ' + VARACommandHandler.bandwidth + b'\r\n' - self.request.sendall(bytestring) - - else: - self.request.sendall(b"ERROR: MYCALL or Bandwidth not set.\r\n") - elif self.data.startswith(b'ABORT'): - bytestring = b'DISCONNECTED\r\n' - elif self.data.startswith(b'DISCONNECT'): - bytestring = b'DISCONNECTED\r\n' - self.request.sendall(bytestring) - else: - self.request.sendall(b"OK\r\n") - - finally: - logging.info(f"Command connection closed with {self.client_address}") - - -class VARADataHandler(socketserver.BaseRequestHandler): - def handle(self): - logging.info(f"Data connection established with {self.client_address}") - - try: - while True: - ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout - if ready_to_read: - self.data = self.request.recv(1024).strip() - if not self.data: - break - try: - logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") - except: - logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") - - - - # Check if there's something to send from the queue, without blocking - if not data_queue.empty(): - data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking - self.request.sendall(data_to_send) - logging.info(f"Sent data to {self.client_address}") - - finally: - logging.info(f"Data connection closed with {self.client_address}") -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): - allow_reuse_address = True - -def run_server(port, handler): - with ThreadedTCPServer(('127.0.0.1', port), handler) as server: - logging.info(f"Server running on port {port}") - server.serve_forever() - - -def signal_handler(sig, frame): - sys.exit(0) - -if __name__ == '__main__': - # Setup signal handler for graceful shutdown - signal.signal(signal.SIGINT, signal_handler) - - # Create server threads for command and data ports - command_server_thread = threading.Thread(target=run_server, args=(8300, VARACommandHandler)) - data_server_thread = threading.Thread(target=run_server, args=(8301, VARADataHandler)) - - # Start the server threads - command_server_thread.start() - data_server_thread.start() - - # Wait for both server threads to finish - command_server_thread.join() - data_server_thread.join() diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index 9c0cf73b..b8cf2da4 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -151,8 +151,6 @@ class TestP2PConnectionSession(unittest.TestCase): self.iss_state_manager.register_p2p_connection_session(session) session.connect() - - self.waitAndCloseChannels() del cmd From 24f41edb63f15d1eb66ab9ae726704b4a3fe6398 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 11:14:48 +0100 Subject: [PATCH 05/18] add state and event manager instaces to command and data handler --- modem/service_manager.py | 2 +- modem/socket_interface.py | 103 +++++++++++------------------ modem/socket_interface_commands.py | 68 +++++++++++++++++++ 3 files changed, 109 insertions(+), 64 deletions(-) create mode 100644 modem/socket_interface_commands.py diff --git a/modem/service_manager.py b/modem/service_manager.py index 270f3c60..10a943f0 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager) + self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager, self.event_manager) runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 9fdb8de7..d2362b69 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -1,30 +1,33 @@ import socketserver import threading -import logging import structlog import select from queue import Queue - -from command_p2p_connection import P2PConnectionCommand +from socket_interface_commands import SocketCommandHandler # Shared queue for command and data handlers data_queue = Queue() -class CommandHandler(socketserver.BaseRequestHandler): - def __init__(self, request, client_address, server): +class CommandSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + self.state_manager = state_manager + self.logger = structlog.get_logger(type(self).__name__) + self.command_handler = SocketCommandHandler(request, self.state_manager) + self.handlers = { - 'CONNECT': self.handle_connect, - 'DISCONNECT': self.handle_disconnect, - 'MYCALL': self.handle_mycall, - 'BW': self.handle_bw, - 'ABORT': self.handle_abort, - 'PUBLIC': self.handle_public, - 'CWID': self.handle_cwid, - 'LISTEN': self.handle_listen, - 'COMPRESSION': self.handle_compression, - 'WINLINK SESSION': self.handle_winlink_session, + 'CONNECT': self.command_handler.handle_connect, + 'DISCONNECT': self.command_handler.handle_disconnect, + 'MYCALL': self.command_handler.handle_mycall, + 'BW': self.command_handler.handle_bw, + 'ABORT': self.command_handler.handle_abort, + 'PUBLIC': self.command_handler.handle_public, + 'CWID': self.command_handler.handle_cwid, + 'LISTEN': self.command_handler.handle_listen, + 'COMPRESSION': self.command_handler.handle_compression, + 'WINLINK SESSION': self.command_handler.handle_winlink_session, } super().__init__(request, client_address, server) @@ -62,52 +65,13 @@ class CommandHandler(socketserver.BaseRequestHandler): else: self.send_response(f"Unknown command: {command}") - def send_response(self, message): - self.request.sendall(message.encode()) - # Command handlers - def handle_connect(self, data): - # Your existing connect logic - self.send_response("OK\r\n") - def handle_disconnect(self, data): - # Your existing disconnect logic - self.send_response("OK\r\n") +class DataSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + self.state_manager = state_manager - def handle_mycall(self, data): - # Logic for handling MYCALL command - self.send_response("OK\r\n") - - def handle_bw(self, data): - # Logic for handling BW command - self.send_response("OK\r\n") - - def handle_abort(self, data): - # Logic for handling ABORT command - self.send_response("OK\r\n") - - def handle_public(self, data): - # Logic for handling PUBLIC command - self.send_response("OK\r\n") - - def handle_cwid(self, data): - # Logic for handling CWID command - self.send_response("OK\r\n") - - def handle_listen(self, data): - # Logic for handling LISTEN command - self.send_response("OK\r\n") - - def handle_compression(self, data): - # Logic for handling COMPRESSION command - self.send_response("OK\r\n") - - def handle_winlink_session(self, data): - # Logic for handling WINLINK SESSION command - self.send_response("OK\r\n") - -class DataHandler(socketserver.BaseRequestHandler): - def __init__(self, request, client_address, server): self.logger = structlog.get_logger(type(self).__name__) super().__init__(request, client_address, server) @@ -139,14 +103,27 @@ class DataHandler(socketserver.BaseRequestHandler): finally: self.log(f"Data connection closed with {self.client_address}") -class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + + +#class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): +# allow_reuse_address = True + + +class CustomThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): allow_reuse_address = True + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs): + self.extra_args = kwargs + super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) + + def finish_request(self, request, client_address): + self.RequestHandlerClass(request, client_address, self, **self.extra_args) class SocketInterfaceHandler: - def __init__(self, config, state_manager): + def __init__(self, config, state_manager, event_manager): self.config = config self.state_manager = state_manager + self.event_manager = event_manager self.logger = structlog.get_logger(type(self).__name__) self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"] self.data_port = self.config["SOCKET_INTERFACE"]["data_port"] @@ -162,8 +139,8 @@ class SocketInterfaceHandler: def start_servers(self): # Method to start both command and data server threads - self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandHandler)) - self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataHandler)) + self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandSocket)) + self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataSocket)) self.command_server_thread.start() self.data_server_thread.start() @@ -171,7 +148,7 @@ class SocketInterfaceHandler: self.log(f"Interfaces started") def run_server(self, port, handler): - with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + with CustomThreadedTCPServer(('127.0.0.1', port), handler, state_manager=self.state_manager, event_manager=self.event_manager) as server: self.log(f"Server started on port {port}") if port == self.command_port: self.command_server = server diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py new file mode 100644 index 00000000..2d252b59 --- /dev/null +++ b/modem/socket_interface_commands.py @@ -0,0 +1,68 @@ +from command_p2p_connection import P2PConnectionCommand + +class SocketCommandHandler: + + def __init__(self, request, state_manager): + self.request = request + self.state_manager = state_manager + + print(self.state_manager) + + + def send_response(self, message): + self.request.sendall(message.encode()) + + + + def handle_connect(self, data): + # Your existing connect logic + self.send_response("OK\r\n") + + params = { + 'destination': "BB2BBB-2", + 'origin': "AA1AAA-1", + } + cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) + session = cmd.run(self.iss_event_queue, self.iss_modem) + if session.session_id: + self.iss_state_manager.register_p2p_connection_session(session) + session.connect() + + + + + def handle_disconnect(self, data): + # Your existing disconnect logic + self.send_response("OK\r\n") + + def handle_mycall(self, data): + # Logic for handling MYCALL command + self.send_response("OK\r\n") + + def handle_bw(self, data): + # Logic for handling BW command + self.send_response("OK\r\n") + + def handle_abort(self, data): + # Logic for handling ABORT command + self.send_response("OK\r\n") + + def handle_public(self, data): + # Logic for handling PUBLIC command + self.send_response("OK\r\n") + + def handle_cwid(self, data): + # Logic for handling CWID command + self.send_response("OK\r\n") + + def handle_listen(self, data): + # Logic for handling LISTEN command + self.send_response("OK\r\n") + + def handle_compression(self, data): + # Logic for handling COMPRESSION command + self.send_response("OK\r\n") + + def handle_winlink_session(self, data): + # Logic for handling WINLINK SESSION command + self.send_response("OK\r\n") \ No newline at end of file From 4d3a0832d523a3ebd93f980f807fd945b60614d9 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 13:44:58 +0100 Subject: [PATCH 06/18] bind p2p connection to interface --- modem/data_frame_factory.py | 5 +++++ modem/server.py | 11 +++++++++++ modem/service_manager.py | 5 ++--- modem/socket_interface.py | 22 +++++++++++++++------- modem/socket_interface_commands.py | 23 ++++++++++++++--------- 5 files changed, 47 insertions(+), 19 deletions(-) diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 4577a0ea..8becb67e 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -18,6 +18,11 @@ class DataFrameFactory: } def __init__(self, config): + print(config) + print(config['STATION']['mycall']) + print(config['STATION']['myssid']) + print(config['STATION']['mygrid']) + self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}" self.mygrid = config['STATION']['mygrid'] diff --git a/modem/server.py b/modem/server.py index d335f1f4..4c4b3329 100644 --- a/modem/server.py +++ b/modem/server.py @@ -23,6 +23,7 @@ import command_arq_raw import command_message_send import event_manager import atexit +from socket_interface import SocketInterfaceHandler from message_system_db_manager import DatabaseManager from message_system_db_messages import DatabaseManagerMessages @@ -328,6 +329,8 @@ def sock_states(sock): def stop_server(): try: app.service_manager.modem_service.put("stop") + app.socket_interface_manager.stop_servers() + if app.service_manager.modem: app.service_manager.modem.sd_input_stream.stop audio.sd._terminate() @@ -358,6 +361,11 @@ if __name__ == "__main__": app.schedule_manager = ScheduleManager(app.MODEM_VERSION, app.config_manager, app.state_manager, app.event_manager) # start service manager app.service_manager = service_manager.SM(app) + + + #app.socket_interface_manager = SocketInterfaceHandler(app.service_manager.modem, app.config_manager, app.state_manager, app.event_manager) + + # start modem service app.modem_service.put("start") # initialize database default values @@ -373,4 +381,7 @@ if __name__ == "__main__": if not modemport: modemport = 5000 + #app.socket_interface_manager.start_servers() + app.run(modemaddress, modemport) + diff --git a/modem/service_manager.py b/modem/service_manager.py index 10a943f0..80a930ab 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager, self.event_manager) + self.socket_interface_manager = None runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True @@ -33,8 +33,7 @@ class SM: self.config = self.app.config_manager.read() self.start_radio_manager() self.start_modem() - - self.socket_interface_manager.start_servers() + self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers() elif cmd in ['stop'] and self.modem: self.stop_modem() diff --git a/modem/socket_interface.py b/modem/socket_interface.py index d2362b69..4c18c6d2 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -10,12 +10,15 @@ data_queue = Queue() class CommandSocket(socketserver.BaseRequestHandler): #def __init__(self, request, client_address, server): - def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None): self.state_manager = state_manager - + self.event_manager = event_manager + self.config_manager = config_manager + self.modem = modem + print(self.config_manager) self.logger = structlog.get_logger(type(self).__name__) - self.command_handler = SocketCommandHandler(request, self.state_manager) + self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager) self.handlers = { 'CONNECT': self.command_handler.handle_connect, @@ -69,8 +72,11 @@ class CommandSocket(socketserver.BaseRequestHandler): class DataSocket(socketserver.BaseRequestHandler): #def __init__(self, request, client_address, server): - def __init__(self, request, client_address, server, state_manager=None, event_manager=None): + def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None): self.state_manager = state_manager + self.event_manager = event_manager + self.config_manager = config_manager + self.modem = modem self.logger = structlog.get_logger(type(self).__name__) @@ -120,8 +126,10 @@ class CustomThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServe self.RequestHandlerClass(request, client_address, self, **self.extra_args) class SocketInterfaceHandler: - def __init__(self, config, state_manager, event_manager): - self.config = config + def __init__(self, modem, config_manager, state_manager, event_manager): + self.modem = modem + self.config_manager = config_manager + self.config = self.config_manager.read() self.state_manager = state_manager self.event_manager = event_manager self.logger = structlog.get_logger(type(self).__name__) @@ -148,7 +156,7 @@ class SocketInterfaceHandler: self.log(f"Interfaces started") def run_server(self, port, handler): - with CustomThreadedTCPServer(('127.0.0.1', port), handler, state_manager=self.state_manager, event_manager=self.event_manager) as server: + with CustomThreadedTCPServer(('127.0.0.1', port), handler, modem=self.modem, state_manager=self.state_manager, event_manager=self.event_manager, config_manager=self.config_manager) as server: self.log(f"Server started on port {port}") if port == self.command_port: self.command_server = server diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 2d252b59..9ed5ac84 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -2,30 +2,35 @@ from command_p2p_connection import P2PConnectionCommand class SocketCommandHandler: - def __init__(self, request, state_manager): + def __init__(self, request, modem, config_manager, state_manager, event_manager): self.request = request + self.modem = modem + self.config_manager = config_manager self.state_manager = state_manager - - print(self.state_manager) - + self.event_manager = event_manager def send_response(self, message): self.request.sendall(message.encode()) - - def handle_connect(self, data): # Your existing connect logic self.send_response("OK\r\n") + + + print(self.modem) + print(self.config_manager) + print(self.state_manager) + print(self.event_manager) + params = { 'destination': "BB2BBB-2", 'origin': "AA1AAA-1", } - cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) - session = cmd.run(self.iss_event_queue, self.iss_modem) + cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params) + session = cmd.run(self.event_manager.queues, self.modem) if session.session_id: - self.iss_state_manager.register_p2p_connection_session(session) + self.state_manager.register_p2p_connection_session(session) session.connect() From 0f3611fb157684b2f8cbde38afd48434c0281045 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 13:57:48 +0100 Subject: [PATCH 07/18] testing first response --- modem/command.py | 3 ++- modem/command_p2p_connection.py | 2 +- modem/p2p_connection.py | 6 +++++- modem/socket_interface_commands.py | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/modem/command.py b/modem/command.py index 65b9dc4f..6b4c0529 100644 --- a/modem/command.py +++ b/modem/command.py @@ -8,7 +8,7 @@ from arq_data_type_handler import ARQDataTypeHandler class TxCommand(): - def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}): + def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}, request=None): self.config = config self.logger = structlog.get_logger(type(self).__name__) self.state_manager = state_manager @@ -16,6 +16,7 @@ class TxCommand(): self.set_params_from_api(apiParams) self.frame_factory = DataFrameFactory(config) self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager) + self.request = request def log(self, message, isWarning = False): msg = f"[{type(self).__name__}]: {message}" diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py index 74fb6a17..db293074 100644 --- a/modem/command_p2p_connection.py +++ b/modem/command_p2p_connection.py @@ -24,7 +24,7 @@ class P2PConnectionCommand(TxCommand): try: self.emit_event(event_queue) self.logger.info(self.log_message()) - session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.request) if session.session_id: self.state_manager.register_p2p_connection_session(session) session.connect() diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 2e7219cf..a5909ecb 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -49,11 +49,13 @@ class P2PConnection: }, } - def __init__(self, config: dict, modem, origin: str, destination: str, state_manager): + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, request=None): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.frame_factory = data_frame_factory.DataFrameFactory(self.config) + self.request = request + self.destination = destination self.origin = origin self.states = state_manager @@ -191,6 +193,8 @@ class P2PConnection: def session_failed(self): self.log("FAILED...........................") self.set_state(States.FAILED) + message = "DISCONNECTED\r\n" + self.request.sendall(message.encode()) def process_data_queue(self, frame=None): if not self.p2p_tx_queue.empty(): diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 9ed5ac84..d1fba490 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -27,7 +27,7 @@ class SocketCommandHandler: 'destination': "BB2BBB-2", 'origin': "AA1AAA-1", } - cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params) + cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self.request) session = cmd.run(self.event_manager.queues, self.modem) if session.session_id: self.state_manager.register_p2p_connection_session(session) From d2b3f3a36ea3c8846ba7c5093d726affbea56f6e Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 15:52:14 +0100 Subject: [PATCH 08/18] adjusted response part for commands --- modem/command.py | 4 +-- modem/command_p2p_connection.py | 2 +- modem/p2p_connection.py | 9 +++--- modem/server.py | 7 ----- modem/service_manager.py | 3 +- modem/socket_interface_commands.py | 49 ++++++++++++++---------------- 6 files changed, 32 insertions(+), 42 deletions(-) diff --git a/modem/command.py b/modem/command.py index 6b4c0529..e65b0d5d 100644 --- a/modem/command.py +++ b/modem/command.py @@ -8,7 +8,7 @@ from arq_data_type_handler import ARQDataTypeHandler class TxCommand(): - def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}, request=None): + def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}, socket_command_handler=None): self.config = config self.logger = structlog.get_logger(type(self).__name__) self.state_manager = state_manager @@ -16,7 +16,7 @@ class TxCommand(): self.set_params_from_api(apiParams) self.frame_factory = DataFrameFactory(config) self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager) - self.request = request + self.socket_command_handler = socket_command_handler def log(self, message, isWarning = False): msg = f"[{type(self).__name__}]: {message}" diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py index db293074..6d3d1035 100644 --- a/modem/command_p2p_connection.py +++ b/modem/command_p2p_connection.py @@ -24,7 +24,7 @@ class P2PConnectionCommand(TxCommand): try: self.emit_event(event_queue) self.logger.info(self.log_message()) - session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.request) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.socket_command_handler) if session.session_id: self.state_manager.register_p2p_connection_session(session) session.connect() diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index a5909ecb..815dcae2 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -49,12 +49,12 @@ class P2PConnection: }, } - def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, request=None): + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, socket_command_handler=None): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.frame_factory = data_frame_factory.DataFrameFactory(self.config) - self.request = request + self.socket_command_handler = socket_command_handler self.destination = destination self.origin = origin @@ -193,8 +193,7 @@ class P2PConnection: def session_failed(self): self.log("FAILED...........................") self.set_state(States.FAILED) - message = "DISCONNECTED\r\n" - self.request.sendall(message.encode()) + self.socket_command_handler.socket_respond_disconnected() def process_data_queue(self, frame=None): if not self.p2p_tx_queue.empty(): @@ -235,6 +234,7 @@ class P2PConnection: def received_disconnect(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) + self.socket_command_handler.socket_respond_disconnected() self.is_ISS = False disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id) self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) @@ -242,6 +242,7 @@ class P2PConnection: def received_disconnect_ack(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) + self.socket_command_handler.socket_respond_disconnected() def transmit_arq(self): diff --git a/modem/server.py b/modem/server.py index 4c4b3329..ee220096 100644 --- a/modem/server.py +++ b/modem/server.py @@ -23,7 +23,6 @@ import command_arq_raw import command_message_send import event_manager import atexit -from socket_interface import SocketInterfaceHandler from message_system_db_manager import DatabaseManager from message_system_db_messages import DatabaseManagerMessages @@ -362,10 +361,6 @@ if __name__ == "__main__": # start service manager app.service_manager = service_manager.SM(app) - - #app.socket_interface_manager = SocketInterfaceHandler(app.service_manager.modem, app.config_manager, app.state_manager, app.event_manager) - - # start modem service app.modem_service.put("start") # initialize database default values @@ -381,7 +376,5 @@ if __name__ == "__main__": if not modemport: modemport = 5000 - #app.socket_interface_manager.start_servers() - app.run(modemaddress, modemport) diff --git a/modem/service_manager.py b/modem/service_manager.py index 80a930ab..f19ecbab 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -38,7 +38,6 @@ class SM: elif cmd in ['stop'] and self.modem: self.stop_modem() self.stop_radio_manager() - self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) @@ -46,6 +45,8 @@ class SM: elif cmd in ['restart']: self.stop_modem() self.stop_radio_manager() + self.socket_interface_manager.stop_servers() + # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index d1fba490..33708b51 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -2,72 +2,67 @@ from command_p2p_connection import P2PConnectionCommand class SocketCommandHandler: - def __init__(self, request, modem, config_manager, state_manager, event_manager): - self.request = request + def __init__(self, cmd_request, modem, config_manager, state_manager, event_manager): + self.cmd_request = cmd_request self.modem = modem self.config_manager = config_manager self.state_manager = state_manager self.event_manager = event_manager def send_response(self, message): - self.request.sendall(message.encode()) + full_message = f"{message}\r\n" + self.cmd_request.sendall(full_message.encode()) def handle_connect(self, data): # Your existing connect logic - self.send_response("OK\r\n") - - - - print(self.modem) - print(self.config_manager) - print(self.state_manager) - print(self.event_manager) + self.send_response("OK") params = { - 'destination': "BB2BBB-2", - 'origin': "AA1AAA-1", + 'origin': data[0], + 'destination': data[1], } - cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self.request) + cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self) session = cmd.run(self.event_manager.queues, self.modem) if session.session_id: self.state_manager.register_p2p_connection_session(session) session.connect() - - - def handle_disconnect(self, data): - # Your existing disconnect logic - self.send_response("OK\r\n") + # Your existing connect logic + self.send_response("OK") def handle_mycall(self, data): # Logic for handling MYCALL command - self.send_response("OK\r\n") + self.send_response("OK") def handle_bw(self, data): # Logic for handling BW command - self.send_response("OK\r\n") + self.send_response("OK") def handle_abort(self, data): # Logic for handling ABORT command - self.send_response("OK\r\n") + self.send_response("OK") def handle_public(self, data): # Logic for handling PUBLIC command - self.send_response("OK\r\n") + self.send_response("OK") def handle_cwid(self, data): # Logic for handling CWID command - self.send_response("OK\r\n") + self.send_response("OK") def handle_listen(self, data): # Logic for handling LISTEN command - self.send_response("OK\r\n") + self.send_response("OK") def handle_compression(self, data): # Logic for handling COMPRESSION command - self.send_response("OK\r\n") + self.send_response("OK") def handle_winlink_session(self, data): # Logic for handling WINLINK SESSION command - self.send_response("OK\r\n") \ No newline at end of file + self.send_response("OK") + + def socket_respond_disconnected(self): + self.send_response("DISCONNECTED") + From 6562a441751c3bbea038e5b76570983624d2cb8b Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 16:11:38 +0100 Subject: [PATCH 09/18] adjusted response part for commands --- modem/p2p_connection.py | 9 ++++++++- modem/socket_interface_commands.py | 3 +++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 815dcae2..fd90ed5b 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -58,6 +58,8 @@ class P2PConnection: self.destination = destination self.origin = origin + self.bandwidth = 0 + self.states = state_manager self.modem = modem @@ -181,17 +183,22 @@ class P2PConnection: self.log("CONNECTED ISS...........................") self.set_state(States.CONNECTED) self.is_ISS = True + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) self.process_data_queue() def connected_irs(self, frame): self.log("CONNECTED IRS...........................") self.set_state(States.CONNECTED) self.is_ISS = False + self.orign = frame["origin"] + self.destination = frame["destination"] + + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) + session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def session_failed(self): - self.log("FAILED...........................") self.set_state(States.FAILED) self.socket_command_handler.socket_respond_disconnected() diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 33708b51..08885451 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -66,3 +66,6 @@ class SocketCommandHandler: def socket_respond_disconnected(self): self.send_response("DISCONNECTED") + def socket_respond_connected(self, mycall, dxcall, bandwidth): + message = f"CONNECTED {mycall} {dxcall} {bandwidth}" + self.send_response(message) From 0beb4aea237050e4c83868f755ced4927d8d5f57 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 16 Mar 2024 16:23:41 +0100 Subject: [PATCH 10/18] small cleanup --- modem/data_frame_factory.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 8becb67e..6fa82d29 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -18,10 +18,6 @@ class DataFrameFactory: } def __init__(self, config): - print(config) - print(config['STATION']['mycall']) - print(config['STATION']['myssid']) - print(config['STATION']['mygrid']) self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}" self.mygrid = config['STATION']['mygrid'] From 5ed11d771fbcf506c8a257c6514457d9ab0e93cf Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 11:19:13 +0100 Subject: [PATCH 11/18] added data and command queues --- modem/p2p_connection.py | 89 +++++++++++++++++++----------- modem/socket_interface.py | 20 +++++-- modem/socket_interface_commands.py | 11 ++-- tests/test_p2p_connection.py | 22 ++++++++ 4 files changed, 99 insertions(+), 43 deletions(-) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index fd90ed5b..6ce25047 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -6,6 +6,7 @@ import data_frame_factory import structlog import random from queue import Queue +import time class States(Enum): NEW = 0 @@ -37,7 +38,7 @@ class P2PConnection: FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', }, States.PAYLOAD_SENT: { - FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', }, States.DISCONNECTING: { FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', @@ -63,26 +64,13 @@ class P2PConnection: self.states = state_manager self.modem = modem - self.p2p_rx_queue = Queue() - self.p2p_tx_queue = Queue() + self.p2p_data_rx_queue = Queue() + self.p2p_data_tx_queue = Queue() self.state = States.NEW self.session_id = self.generate_id() - def generate_random_string(min_length, max_length): - import string - length = random.randint(min_length, max_length) - return ''.join(random.choices(string.ascii_letters, k=length)) - - # Generate and add 5 random entries to the queue - for _ in range(1): - random_entry = generate_random_string(2, 11) - self.p2p_tx_queue.put(random_entry) - - - - self.event_frame_received = threading.Event() self.RETRIES_CONNECT = 1 @@ -93,6 +81,30 @@ class P2PConnection: self.is_ISS = False # Indicator, if we are ISS or IRS + self.last_data_timestamp= time.time() + + self.start_data_processing_worker() + + def start_data_processing_worker(self): + """Starts a worker thread to monitor the transmit data queue and process data.""" + + def data_processing_worker(): + while True: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT: + self.disconnect() + return + + if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED: + self.process_data_queue() + threading.Event().wait(0.1) + + + + + # Create and start the worker thread + worker_thread = threading.Thread(target=data_processing_worker, daemon=True) + worker_thread.start() + def generate_id(self): while True: random_int = random.randint(1,255) @@ -102,7 +114,6 @@ class P2PConnection: if len(self.states.p2p_connection_sessions) >= 255: return False - def set_details(self, snr, frequency_offset): self.snr = snr self.frequency_offset = frequency_offset @@ -120,6 +131,7 @@ class P2PConnection: self.state = state def on_frame_received(self, frame): + self.last_data_timestamp = time.time() self.event_frame_received.set() self.log(f"Received {frame['frame_type']}") frame_type = frame['frame_type_int'] @@ -153,6 +165,7 @@ class P2PConnection: self.log("Timeout!") retries = retries - 1 + #self.connected_iss() # override connection state for simulation purposes self.session_failed() def launch_twr(self, frame_or_burst, timeout, retries, mode): @@ -179,35 +192,38 @@ class P2PConnection: self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return - def connected_iss(self, frame): + def connected_iss(self): self.log("CONNECTED ISS...........................") self.set_state(States.CONNECTED) self.is_ISS = True - self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) - self.process_data_queue() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) def connected_irs(self, frame): self.log("CONNECTED IRS...........................") + self.states.register_p2p_connection_session(self) self.set_state(States.CONNECTED) self.is_ISS = False self.orign = frame["origin"] - self.destination = frame["destination"] + self.destination = frame["destination_crc"] - self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def session_failed(self): self.set_state(States.FAILED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() def process_data_queue(self, frame=None): - if not self.p2p_tx_queue.empty(): + if not self.p2p_data_tx_queue.empty(): print("processing data....") self.set_state(States.PAYLOAD_SENT) - data = self.p2p_tx_queue.get() + data = self.p2p_data_tx_queue.get() sequence_id = random.randint(0,255) data = data.encode('utf-8') @@ -218,30 +234,35 @@ class P2PConnection: self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, mode=mode) return - print("ALL DATA SENT!!!!!") - self.disconnect() def prepare_data_chunk(self, data, mode): return data def received_data(self, frame): print(frame) + self.p2p_data_rx_queue.put(frame['data']) ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) def transmit_data_ack(self, frame): print(frame) + def transmitted_data(self, frame): + print("transmitted data...") + self.set_state(States.CONNECTED) + def disconnect(self): - self.set_state(States.DISCONNECTING) - disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) - self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + if self.state not in [States.DISCONNECTING, States.DISCONNECTED]: + self.set_state(States.DISCONNECTING) + disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) + self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return def received_disconnect(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() self.is_ISS = False disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id) self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) @@ -249,7 +270,8 @@ class P2PConnection: def received_disconnect_ack(self, frame): self.log("DISCONNECTED...............") self.set_state(States.DISCONNECTED) - self.socket_command_handler.socket_respond_disconnected() + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() def transmit_arq(self): @@ -259,4 +281,5 @@ class P2PConnection: #if command.run(app.modem_events, app.service_manager.modem): def received_arq(self): - pass \ No newline at end of file + pass + diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 4c18c6d2..7e6079b9 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -15,7 +15,6 @@ class CommandSocket(socketserver.BaseRequestHandler): self.event_manager = event_manager self.config_manager = config_manager self.modem = modem - print(self.config_manager) self.logger = structlog.get_logger(type(self).__name__) self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager) @@ -86,11 +85,13 @@ class DataSocket(socketserver.BaseRequestHandler): msg = f"[{type(self).__name__}]: {message}" logger = self.logger.warn if isWarning else self.logger.info logger(msg) + def handle(self): self.log(f"Data connection established with {self.client_address}") try: while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout if ready_to_read: self.data = self.request.recv(1024).strip() @@ -98,14 +99,21 @@ class DataSocket(socketserver.BaseRequestHandler): break try: self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") - except: + except Exception: self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + for session in self.state_manager.p2p_connection_sessions: + print(f"sessions: {session}") + session.p2p_data_tx_queue.put(self.data) + # Check if there's something to send from the queue, without blocking - if not data_queue.empty(): - data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking - self.request.sendall(data_to_send) - self.log(f"Sent data to {self.client_address}") + + for session_id in self.state_manager.p2p_connection_sessions: + session = self.state_manager.get_p2p_connection_session(session_id) + if not session.p2p_data_tx_queue.empty(): + data_to_send = session.p2p_data_tx_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + self.log(f"Sent data to {self.client_address}") finally: self.log(f"Data connection closed with {self.client_address}") diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 08885451..2db96532 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -9,6 +9,8 @@ class SocketCommandHandler: self.state_manager = state_manager self.event_manager = event_manager + self.session = None + def send_response(self, message): full_message = f"{message}\r\n" self.cmd_request.sendall(full_message.encode()) @@ -22,10 +24,11 @@ class SocketCommandHandler: 'destination': data[1], } cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self) - session = cmd.run(self.event_manager.queues, self.modem) - if session.session_id: - self.state_manager.register_p2p_connection_session(session) - session.connect() + self.session = cmd.run(self.event_manager.queues, self.modem) + if self.session.session_id: + self.state_manager.register_p2p_connection_session(self.session) + self.session.connect() + def handle_disconnect(self, data): # Your existing connect logic diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index b8cf2da4..c47cdf20 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -136,6 +136,11 @@ class TestP2PConnectionSession(unittest.TestCase): self.waitForSession(self.irs_event_queue, False) self.channels_running = False + def generate_random_string(self, min_length, max_length): + import string + length = random.randint(min_length, max_length) + return ''.join(random.choices(string.ascii_letters, k=length))# + def testARQSessionSmallPayload(self): # set Packet Error Rate (PER) / frame loss probability self.loss_probability = 0 @@ -151,8 +156,25 @@ class TestP2PConnectionSession(unittest.TestCase): self.iss_state_manager.register_p2p_connection_session(session) session.connect() + # Generate and add 5 random entries to the queue + for _ in range(5): + random_entry = self.generate_random_string(2, 11) + session.p2p_data_tx_queue.put(random_entry) + self.waitAndCloseChannels() del cmd + +class TestSocket: + def __init__(self, isCmd=True): + self.isCmd = isCmd + self.sent_data = [] # To capture data sent through this socket + self.received_data = b"" # To simulate data received by this socket + + def sendall(self, data): + print(f"Mock sendall called with data: {data}") + self.sent_data.append(data) + + if __name__ == '__main__': unittest.main() From 9da3fb80f0ca0c534501a4606871392e28e75e2d Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 13:01:25 +0100 Subject: [PATCH 12/18] adjusted tests --- modem/p2p_connection.py | 2 +- modem/socket_interface.py | 2 -- tests/test_p2p_connection.py | 46 ++++++++++++++++++++++-------------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 6ce25047..f4f8d992 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -192,7 +192,7 @@ class P2PConnection: self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) return - def connected_iss(self): + def connected_iss(self, frame=None): self.log("CONNECTED ISS...........................") self.set_state(States.CONNECTED) self.is_ISS = True diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 7e6079b9..62a2ae30 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -5,8 +5,6 @@ import select from queue import Queue from socket_interface_commands import SocketCommandHandler -# Shared queue for command and data handlers -data_queue = Queue() class CommandSocket(socketserver.BaseRequestHandler): #def __init__(self, request, client_address, server): diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index c47cdf20..9f19d171 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -22,6 +22,7 @@ from data_frame_factory import DataFrameFactory import codec2 import p2p_connection +from socket_interface_commands import SocketCommandHandler class TestModem: def __init__(self, event_q, state_q): @@ -63,6 +64,7 @@ class TestP2PConnectionSession(unittest.TestCase): cls.frame_factory = DataFrameFactory(cls.config) # ISS + cls.iss_config_manager = config_manager cls.iss_state_manager = StateManager(queue.Queue()) cls.iss_event_manager = EventManager([queue.Queue()]) cls.iss_event_queue = queue.Queue() @@ -76,6 +78,9 @@ class TestP2PConnectionSession(unittest.TestCase): cls.iss_state_manager, cls.iss_modem) + #cls.iss_socket_interface_handler = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager) + #cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234) + # IRS cls.irs_state_manager = StateManager(queue.Queue()) cls.irs_event_manager = EventManager([queue.Queue()]) @@ -146,34 +151,39 @@ class TestP2PConnectionSession(unittest.TestCase): self.loss_probability = 0 self.establishChannels() - params = { - 'destination': "BB2BBB-2", - 'origin': "AA1AAA-1", - } - cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) - session = cmd.run(self.iss_event_queue, self.iss_modem) - if session.session_id: - self.iss_state_manager.register_p2p_connection_session(session) - session.connect() - # Generate and add 5 random entries to the queue - for _ in range(5): - random_entry = self.generate_random_string(2, 11) - session.p2p_data_tx_queue.put(random_entry) + handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager) + handler.handle_connect(["AA1AAA-1", "BB2BBB-2"]) + + self.connected_event = threading.Event() + self.connected_event.wait() + + for session_id in self.iss_state_manager.p2p_connection_sessions: + session = self.iss_state_manager.get_p2p_connection_session(session_id) + + # Generate and add 5 random entries to the queue + for _ in range(5): + random_entry = self.generate_random_string(2, 11) + session.p2p_data_tx_queue.put(random_entry) + + + self.waitAndCloseChannels() - del cmd class TestSocket: - def __init__(self, isCmd=True): - self.isCmd = isCmd + def __init__(self, test_class): self.sent_data = [] # To capture data sent through this socket - self.received_data = b"" # To simulate data received by this socket - + self.test_class = test_class def sendall(self, data): print(f"Mock sendall called with data: {data}") self.sent_data.append(data) + self.event_handler(data) + + def event_handler(self, data): + if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data: + self.test_class.connected_event.set() if __name__ == '__main__': From aaa2084bfdfcbf6312890fa4eef725d0b0c68c2c Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 13:19:13 +0100 Subject: [PATCH 13/18] adjusted tests --- modem/p2p_connection.py | 1 + modem/state_manager.py | 1 + tests/test_p2p_connection.py | 8 ++++---- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index f4f8d992..9d4a626a 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -241,6 +241,7 @@ class P2PConnection: def received_data(self, frame): print(frame) self.p2p_data_rx_queue.put(frame['data']) + ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) diff --git a/modem/state_manager.py b/modem/state_manager.py index 91bec80a..ca65e4fb 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -219,6 +219,7 @@ class StateManager: def register_p2p_connection_session(self, session): if session.session_id in self.p2p_connection_sessions: + print("session already registered...") return False self.p2p_connection_sessions[session.session_id] = session return True diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index 9f19d171..fdcfc3ee 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -160,15 +160,12 @@ class TestP2PConnectionSession(unittest.TestCase): for session_id in self.iss_state_manager.p2p_connection_sessions: session = self.iss_state_manager.get_p2p_connection_session(session_id) - + session.ENTIRE_CONNECTION_TIMEOUT = 15 # Generate and add 5 random entries to the queue for _ in range(5): random_entry = self.generate_random_string(2, 11) session.p2p_data_tx_queue.put(random_entry) - - - self.waitAndCloseChannels() @@ -185,6 +182,9 @@ class TestSocket: if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data: self.test_class.connected_event.set() + if b'DISCONNECTED\r\n' in self.sent_data: + self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n') + if __name__ == '__main__': unittest.main() From bb8da6f5d81886970c7ee00ebfd1c713441ce55a Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 22:10:58 +0100 Subject: [PATCH 14/18] first attempt with arq session inside connection --- modem/arq_data_type_handler.py | 37 ++++++++++++++- modem/arq_session.py | 6 +-- modem/arq_session_irs.py | 4 +- modem/arq_session_iss.py | 6 ++- modem/command_p2p_connection.py | 2 +- modem/frame_handler_arq_session.py | 3 +- modem/frame_handler_p2p_connection.py | 2 +- modem/p2p_connection.py | 65 +++++++++++++++++++-------- tests/test_p2p_connection.py | 18 +++++--- 9 files changed, 108 insertions(+), 35 deletions(-) diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index 573d4077..d84eb634 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -11,6 +11,7 @@ class ARQ_SESSION_TYPES(Enum): raw_lzma = 10 raw_gzip = 11 p2pmsg_lzma = 20 + p2p_connection = 30 class ARQDataTypeHandler: def __init__(self, event_manager, state_manager): @@ -43,6 +44,12 @@ class ARQDataTypeHandler: 'failed' : self.failed_p2pmsg_lzma, 'transmitted': self.transmitted_p2pmsg_lzma, }, + ARQ_SESSION_TYPES.p2p_connection: { + 'prepare': self.prepare_p2p_connection, + 'handle': self.handle_p2p_connection, + 'failed': self.failed_p2p_connection, + 'transmitted': self.transmitted_p2p_connection, + }, } @staticmethod @@ -161,4 +168,32 @@ class ARQDataTypeHandler: def transmitted_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics) - return decompressed_data \ No newline at end of file + return decompressed_data + + + def prepare_p2p_connection(self, data): + compressed_data = gzip.compress(data) + self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + return compressed_data + + def handle_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + return decompressed_data + + def failed_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True) + print(self.state_manager.p2p_connection_sessions) + return decompressed_data + + def transmitted_p2p_connection(self, data, statistics): + + decompressed_data = gzip.decompress(data) + print(decompressed_data) + print(self.state_manager.p2p_connection_sessions) + for session_id in self.state_manager.p2p_connection_sessions: + print(session_id) + self.state_manager.p2p_connection_sessions[session_id].transmitted_arq() \ No newline at end of file diff --git a/modem/arq_session.py b/modem/arq_session.py index 0db45356..51672757 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -29,13 +29,13 @@ class ARQSession(): }, } - def __init__(self, config: dict, modem, dxcall: str): + def __init__(self, config: dict, modem, dxcall: str, state_manager): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.event_manager: EventManager = modem.event_manager - self.states = modem.states - + #self.states = modem.states + self.states = state_manager self.states.setARQ(True) self.snr = [] diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index fa6409fb..a0d1f2bd 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -59,8 +59,8 @@ class ARQSessionIRS(arq_session.ARQSession): }, } - def __init__(self, config: dict, modem, dxcall: str, session_id: int): - super().__init__(config, modem, dxcall) + def __init__(self, config: dict, modem, dxcall: str, session_id: int, state_manager): + super().__init__(config, modem, dxcall, state_manager) self.id = session_id self.dxcall = dxcall diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 611f591e..1c989c0c 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -54,7 +54,7 @@ class ARQSessionISS(arq_session.ARQSession): } def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes): - super().__init__(config, modem, dxcall) + super().__init__(config, modem, dxcall, state_manager) self.state_manager = state_manager self.data = data self.total_length = len(data) @@ -191,6 +191,10 @@ class ARQSessionISS(arq_session.ARQSession): self.set_state(ISS_State.ENDED) self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) + + print(self.state_manager.p2p_connection_sessions) + print(self.arq_data_type_handler.state_manager.p2p_connection_sessions) + self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) self.state_manager.remove_arq_iss_session(self.id) self.states.setARQ(False) diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py index 6d3d1035..2ec44960 100644 --- a/modem/command_p2p_connection.py +++ b/modem/command_p2p_connection.py @@ -24,7 +24,7 @@ class P2PConnectionCommand(TxCommand): try: self.emit_event(event_queue) self.logger.info(self.log_message()) - session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.socket_command_handler) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.event_manager, self.socket_command_handler) if session.session_id: self.state_manager.register_p2p_connection_session(session) session.connect() diff --git a/modem/frame_handler_arq_session.py b/modem/frame_handler_arq_session.py index 8ea805cd..9709fb02 100644 --- a/modem/frame_handler_arq_session.py +++ b/modem/frame_handler_arq_session.py @@ -32,7 +32,8 @@ class ARQFrameHandler(frame_handler.FrameHandler): session = ARQSessionIRS(self.config, self.modem, frame['origin'], - session_id) + session_id, + self.states) self.states.register_arq_irs_session(session) elif frame['frame_type_int'] in [ diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py index 7dc4931d..dc52a131 100644 --- a/modem/frame_handler_p2p_connection.py +++ b/modem/frame_handler_p2p_connection.py @@ -33,7 +33,7 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler): self.modem, frame['origin'], frame['destination_crc'], - self.states) + self.states, self.event_manager) session.session_id = session_id self.states.register_p2p_connection_session(session) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 9d4a626a..131762e7 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -7,6 +7,11 @@ import structlog import random from queue import Queue import time +from command_arq_raw import ARQRawCommand +import numpy as np +import base64 +from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES +from arq_session_iss import ARQSessionISS class States(Enum): NEW = 0 @@ -14,12 +19,13 @@ class States(Enum): CONNECT_SENT = 2 CONNECT_ACK_SENT = 3 CONNECTED = 4 - HEARTBEAT_SENT = 5 - HEARTBEAT_ACK_SENT = 6 + #HEARTBEAT_SENT = 5 + #HEARTBEAT_ACK_SENT = 6 PAYLOAD_SENT = 7 - DISCONNECTING = 8 - DISCONNECTED = 9 - FAILED = 10 + ARQ_SESSION = 8 + DISCONNECTING = 9 + DISCONNECTED = 10 + FAILED = 11 @@ -50,7 +56,7 @@ class P2PConnection: }, } - def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, socket_command_handler=None): + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.frame_factory = data_frame_factory.DataFrameFactory(self.config) @@ -61,12 +67,16 @@ class P2PConnection: self.origin = origin self.bandwidth = 0 - self.states = state_manager + self.state_manager = state_manager + self.event_manager = event_manager self.modem = modem + self.modem.demodulator.set_decode_mode([]) self.p2p_data_rx_queue = Queue() self.p2p_data_tx_queue = Queue() + self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager) + self.state = States.NEW self.session_id = self.generate_id() @@ -85,12 +95,13 @@ class P2PConnection: self.start_data_processing_worker() + def start_data_processing_worker(self): """Starts a worker thread to monitor the transmit data queue and process data.""" def data_processing_worker(): while True: - if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION: self.disconnect() return @@ -108,10 +119,10 @@ class P2PConnection: def generate_id(self): while True: random_int = random.randint(1,255) - if random_int not in self.states.p2p_connection_sessions: + if random_int not in self.state_manager.p2p_connection_sessions: return random_int - if len(self.states.p2p_connection_sessions) >= 255: + if len(self.state_manager.p2p_connection_sessions) >= 255: return False def set_details(self, snr, frequency_offset): @@ -201,7 +212,7 @@ class P2PConnection: def connected_irs(self, frame): self.log("CONNECTED IRS...........................") - self.states.register_p2p_connection_session(self) + self.state_manager.register_p2p_connection_session(self) self.set_state(States.CONNECTED) self.is_ISS = False self.orign = frame["origin"] @@ -229,10 +240,14 @@ class P2PConnection: if len(data) <= 11: mode = FREEDV_MODE.signalling + elif 11 < len(data) < 32: + mode = FREEDV_MODE.datac4 + else: + self.transmit_arq(data) + return payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) - self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, - mode=mode) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode) return def prepare_data_chunk(self, data, mode): @@ -274,12 +289,26 @@ class P2PConnection: if self.socket_command_handler: self.socket_command_handler.socket_respond_disconnected() + def transmit_arq(self, data): + self.set_state(States.ARQ_SESSION) + + print("----------------------------------------------------------------") + print(self.destination) + print(self.state_manager.p2p_connection_sessions) + + prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) + iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte) + iss.id = self.session_id + if iss.id: + self.state_manager.register_arq_iss_session(iss) + iss.start() + return iss + + def transmitted_arq(self): + self.last_data_timestamp = time.time() + self.set_state(States.CONNECTED) + - def transmit_arq(self): - pass - #command = cmd_class(self.config, self.states, self.eve, params) - #app.logger.info(f"Command {command.get_name()} running...") - #if command.run(app.modem_events, app.service_manager.modem): def received_arq(self): pass diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index fdcfc3ee..c67ed2f2 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -117,9 +117,9 @@ class TestP2PConnectionSession(unittest.TestCase): key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' while True and self.channels_running: ev = q.get() - if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]): - self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") - break + #if key in ev and ('success' in ev[key] or 'P2P_CONNECTION_DISCONNECT_ACK' in ev[key]): + # self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + # break def establishChannels(self): self.channels_running = True @@ -162,10 +162,15 @@ class TestP2PConnectionSession(unittest.TestCase): session = self.iss_state_manager.get_p2p_connection_session(session_id) session.ENTIRE_CONNECTION_TIMEOUT = 15 # Generate and add 5 random entries to the queue - for _ in range(5): - random_entry = self.generate_random_string(2, 11) - session.p2p_data_tx_queue.put(random_entry) + for _ in range(3): + min_length = (30 * _ ) + 1 + max_length = (30 * _ ) + 1 + print(min_length) + print(max_length) + random_entry = self.generate_random_string(min_length, max_length) + session.p2p_data_tx_queue.put(random_entry) + session.p2p_data_tx_queue.put('12345') self.waitAndCloseChannels() @@ -185,6 +190,5 @@ class TestSocket: if b'DISCONNECTED\r\n' in self.sent_data: self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n') - if __name__ == '__main__': unittest.main() From f308134276e143a9e14488e7e6ce3f4f29b9c1d4 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Tue, 19 Mar 2024 22:16:27 +0100 Subject: [PATCH 15/18] added rx part for larger data --- modem/arq_data_type_handler.py | 6 +++++- modem/p2p_connection.py | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index d84eb634..a81c63d0 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -181,7 +181,11 @@ class ARQDataTypeHandler: decompressed_data = gzip.decompress(data) self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes") print(self.state_manager.p2p_connection_sessions) - return decompressed_data + print(decompressed_data) + print(self.state_manager.p2p_connection_sessions) + for session_id in self.state_manager.p2p_connection_sessions: + print(session_id) + self.state_manager.p2p_connection_sessions[session_id].received_arq(decompressed_data) def failed_p2p_connection(self, data, statistics): decompressed_data = gzip.decompress(data) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 131762e7..17d8fe65 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -308,8 +308,8 @@ class P2PConnection: self.last_data_timestamp = time.time() self.set_state(States.CONNECTED) - - - def received_arq(self): - pass + def received_arq(self, data): + self.last_data_timestamp = time.time() + self.set_state(States.CONNECTED) + self.p2p_data_rx_queue.put(data) From e57d3cf6659229c6938a6cb1f5defb5bf9519ebd Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sun, 24 Mar 2024 19:16:19 +0100 Subject: [PATCH 16/18] saved latest dev state.. --- tests/test_p2p_connection.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index c67ed2f2..c8004646 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -98,6 +98,8 @@ class TestP2PConnectionSession(unittest.TestCase): cls.channels_running = True + cls.disconnect_received = False + def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER): while self.channels_running: # Transfer data between both parties @@ -114,12 +116,12 @@ class TestP2PConnectionSession(unittest.TestCase): self.logger.info(f"[{threading.current_thread().name}] Channel closed.") def waitForSession(self, q, outbound=False): - key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' while True and self.channels_running: ev = q.get() - #if key in ev and ('success' in ev[key] or 'P2P_CONNECTION_DISCONNECT_ACK' in ev[key]): - # self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") - # break + print(ev) + if 'P2P_CONNECTION_DISCONNECT_ACK' in ev or self.disconnect_received: + self.logger.info(f"[{threading.current_thread().name}] session ended.") + break def establishChannels(self): self.channels_running = True @@ -188,6 +190,7 @@ class TestSocket: self.test_class.connected_event.set() if b'DISCONNECTED\r\n' in self.sent_data: + self.disconnect_received = True self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n') if __name__ == '__main__': From bdd8888f1b87dee4ab2870e5ba270ba3b34cfdb7 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Mon, 25 Mar 2024 19:29:30 +0100 Subject: [PATCH 17/18] enable/disable socket interface --- modem/config.ini.example | 1 + modem/config.py | 3 ++- modem/service_manager.py | 10 +++++++--- modem/socket_interface.py | 2 ++ modem/socket_interface_commands.py | 1 + tests/test_p2p_connection.py | 2 +- 6 files changed, 14 insertions(+), 5 deletions(-) diff --git a/modem/config.ini.example b/modem/config.ini.example index c0a2b5bf..1f077baa 100644 --- a/modem/config.ini.example +++ b/modem/config.ini.example @@ -49,6 +49,7 @@ enable_morse_identifier = False respond_to_cq = True tx_delay = 50 maximum_bandwidth = 1700 +enable_socket_interface = False [SOCKET_INTERFACE] enable = False diff --git a/modem/config.py b/modem/config.py index dcd234c1..0b7debca 100644 --- a/modem/config.py +++ b/modem/config.py @@ -59,7 +59,8 @@ class CONFIG: 'enable_morse_identifier': bool, 'maximum_bandwidth': int, 'respond_to_cq': bool, - 'tx_delay': int + 'tx_delay': int, + 'enable_socket_interface': bool, }, 'SOCKET_INTERFACE': { 'enable' : bool, diff --git a/modem/service_manager.py b/modem/service_manager.py index f19ecbab..a2f9a0c1 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -33,19 +33,23 @@ class SM: self.config = self.app.config_manager.read() self.start_radio_manager() self.start_modem() - self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers() + + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers() elif cmd in ['stop'] and self.modem: self.stop_modem() self.stop_radio_manager() - self.socket_interface_manager.stop_servers() + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) elif cmd in ['restart']: self.stop_modem() self.stop_radio_manager() - self.socket_interface_manager.stop_servers() + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) diff --git a/modem/socket_interface.py b/modem/socket_interface.py index 62a2ae30..0b41b4fc 100644 --- a/modem/socket_interface.py +++ b/modem/socket_interface.py @@ -1,3 +1,5 @@ +""" WORK IN PROGRESS by DJ2LS""" + import socketserver import threading import structlog diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py index 2db96532..fe4c18f4 100644 --- a/modem/socket_interface_commands.py +++ b/modem/socket_interface_commands.py @@ -1,3 +1,4 @@ +""" WORK IN PROGRESS by DJ2LS""" from command_p2p_connection import P2PConnectionCommand class SocketCommandHandler: diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index c8004646..44ceb5ae 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -148,7 +148,7 @@ class TestP2PConnectionSession(unittest.TestCase): length = random.randint(min_length, max_length) return ''.join(random.choices(string.ascii_letters, k=length))# - def testARQSessionSmallPayload(self): + def DisabledtestARQSessionSmallPayload(self): # set Packet Error Rate (PER) / frame loss probability self.loss_probability = 0 From f5d0b90eda8107b6c4607b1aa8c4effb9aaa4156 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Wed, 27 Mar 2024 09:20:24 +0100 Subject: [PATCH 18/18] fixing a test --- tests/test_arq_session.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index 8309565d..b9221475 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -221,7 +221,9 @@ class TestARQSession(unittest.TestCase): session = arq_session_irs.ARQSessionIRS(self.config, self.irs_modem, 'AA1AAA-1', - random.randint(0, 255)) + random.randint(0, 255), + self.irs_state_manager + ) self.irs_state_manager.register_arq_irs_session(session) for session_id in self.irs_state_manager.arq_irs_sessions: session = self.irs_state_manager.arq_irs_sessions[session_id]