From 20b1fe7e2d82d2d69d5f11f189eebe33f4d22885 Mon Sep 17 00:00:00 2001 From: DJ2LS Date: Sat, 9 Mar 2024 10:47:27 +0100 Subject: [PATCH] WIP p2p --- modem/command_p2p_connection.py | 37 +++++ modem/data_frame_factory.py | 99 +++++++++++- modem/frame_dispatcher.py | 17 ++- modem/frame_handler.py | 16 +- modem/frame_handler_p2p_connection.py | 52 +++++++ modem/modem_frametypes.py | 9 +- modem/p2p_connection.py | 211 ++++++++++++++++++++++++++ modem/server.py | 1 + modem/state_manager.py | 13 ++ modem/vara_interface.py | 116 ++++++++++++++ tests/test_p2p_connection.py | 160 +++++++++++++++++++ 11 files changed, 721 insertions(+), 10 deletions(-) create mode 100644 modem/command_p2p_connection.py create mode 100644 modem/frame_handler_p2p_connection.py create mode 100644 modem/p2p_connection.py create mode 100644 modem/vara_interface.py create mode 100644 tests/test_p2p_connection.py diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py new file mode 100644 index 00000000..74fb6a17 --- /dev/null +++ b/modem/command_p2p_connection.py @@ -0,0 +1,37 @@ +import queue +from command import TxCommand +import api_validations +import base64 +from queue import Queue +from p2p_connection import P2PConnection + +class P2PConnectionCommand(TxCommand): + + def set_params_from_api(self, apiParams): + self.origin = apiParams['origin'] + if not api_validations.validate_freedata_callsign(self.origin): + self.origin = f"{self.origin}-0" + + self.destination = apiParams['destination'] + if not api_validations.validate_freedata_callsign(self.destination): + self.destination = f"{self.destination}-0" + + + def connect(self, event_queue: Queue, modem): + pass + + def run(self, event_queue: Queue, modem): + try: + self.emit_event(event_queue) + self.logger.info(self.log_message()) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager) + if session.session_id: + self.state_manager.register_p2p_connection_session(session) + session.connect() + return session + return False + + except Exception as e: + self.log(f"Error starting P2P Connection session: {e}", isWarning=True) + + return False \ No newline at end of file diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 2acc837c..560bdb35 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -28,6 +28,7 @@ class DataFrameFactory: self._load_ping_templates() self._load_fec_templates() self._load_arq_templates() + self._load_p2p_connection_templates() def _load_broadcast_templates(self): # cq frame @@ -159,6 +160,52 @@ class DataFrameFactory: "snr": 1, "flag": 1, } + + def _load_p2p_connection_templates(self): + # p2p connect request + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # connect ACK + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # heartbeat for "is alive" + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # ack heartbeat + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # p2p payload frames + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD.value] = { + "frame_length": None, + "session_id": 1, + "sequence_id": 1, + "data": "dynamic", + } + + # p2p payload frame ack + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + "sequence_id": 1, + } + + def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): @@ -402,8 +449,9 @@ class DataFrameFactory: "offset": offset.to_bytes(4, 'big'), "data": data, } - frame = self.construct(FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode)) - return frame + return self.construct( + FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode) + ) def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int, frames_per_burst: int, snr: int, flag_final=False, flag_checksum=False, flag_abort=False): @@ -426,3 +474,50 @@ class DataFrameFactory: "flag": flag.to_bytes(1, 'big'), } return self.construct(FR_TYPE.ARQ_BURST_ACK, payload) + + def build_p2p_connection_connect(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT, payload) + + def build_p2p_connection_connect_ack(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) + + def build_p2p_connection_heartbeat(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) + + def build_p2p_connection_heartbeat_ack(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) + + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + "data": data, + } + return self.construct( + FR_TYPE.P2P_CONNECTION_PAYLOAD, + payload, + self.get_bytes_per_frame(freedv_mode), + ) + + def build_p2p_connection_payload_ack(self, session_id, sequence_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload) \ No newline at end of file diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index c7f860bb..88ddaf94 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -13,8 +13,11 @@ from frame_handler import FrameHandler from frame_handler_ping import PingFrameHandler from frame_handler_cq import CQFrameHandler from frame_handler_arq_session import ARQFrameHandler +from frame_handler_p2p_connection import P2PConnectionFrameHandler from frame_handler_beacon import BeaconFrameHandler + + class DISPATCHER(): FRAME_HANDLER = { @@ -22,9 +25,15 @@ class DISPATCHER(): FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"}, FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"}, FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"}, - FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"}, - FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, - FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.P2P_CONNECTION_CONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT"}, + FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT ACK"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD ACK"}, + + #FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, + #FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"}, FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"}, FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"}, @@ -82,7 +91,7 @@ class DISPATCHER(): if frametype not in self.FRAME_HANDLER: self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + "[DISPATCHER] ARQ - other frame type", frametype=FR_TYPE(frametype).name) return # instantiate handler diff --git a/modem/frame_handler.py b/modem/frame_handler.py index 46347d1e..89493bbe 100644 --- a/modem/frame_handler.py +++ b/modem/frame_handler.py @@ -34,7 +34,7 @@ class FrameHandler(): ft = self.details['frame']['frame_type'] valid = False # Check for callsign checksum - if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK']: + if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK', 'P2P_CONNECTION_CONNECT']: valid, mycallsign = helpers.check_callsign( call_with_ssid, self.details["frame"]["destination_crc"], @@ -51,6 +51,20 @@ class FrameHandler(): session_id = self.details['frame']['session_id'] if session_id in self.states.arq_iss_sessions: valid = True + + # check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT']: + valid, mycallsign = helpers.check_callsign( + call_with_ssid, + self.details["frame"]["destination_crc"], + self.config['STATION']['ssid_list']) + + #check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK']: + session_id = self.details['frame']['session_id'] + if session_id in self.states.p2p_connection_sessions: + valid = True + else: valid = False diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py new file mode 100644 index 00000000..13315487 --- /dev/null +++ b/modem/frame_handler_p2p_connection.py @@ -0,0 +1,52 @@ +from queue import Queue +import frame_handler +from event_manager import EventManager +from state_manager import StateManager +from modem_frametypes import FRAME_TYPE as FR +from p2p_connection import P2PConnection + +class P2PConnectionFrameHandler(frame_handler.FrameHandler): + + def follow_protocol(self): + + if not self.should_respond(): + return + + frame = self.details['frame'] + session_id = frame['session_id'] + snr = self.details["snr"] + frequency_offset = self.details["frequency_offset"] + + if frame['frame_type_int'] == FR.P2P_CONNECTION_CONNECT.value: + + # Lost OPEN_ACK case .. ISS will retry opening a session + if session_id in self.states.arq_irs_sessions: + session = self.states.p2p_connection_sessions[session_id] + + # Normal case when receiving a SESSION_OPEN for the first time + else: + # if self.states.check_if_running_arq_session(): + # self.logger.warning("DISCARDING SESSION OPEN because of ongoing ARQ session ", frame=frame) + # return + print(frame) + session = P2PConnection(self.config, + self.modem, + frame['origin'], + frame['destination_crc'], + self.states) + session.session_id = session_id + self.states.register_p2p_connection_session(session) + + elif frame['frame_type_int'] in [ + FR.P2P_CONNECTION_CONNECT_ACK.value, + FR.P2P_CONNECTION_PAYLOAD.value, + FR.P2P_CONNECTION_PAYLOAD_ACK.value, + ]: + session = self.states.get_p2p_connection_session(session_id) + + else: + self.logger.warning("DISCARDING FRAME", frame=frame) + return + + session.set_details(snr, frequency_offset) + session.on_frame_received(frame) diff --git a/modem/modem_frametypes.py b/modem/modem_frametypes.py index 582a7907..da46a033 100644 --- a/modem/modem_frametypes.py +++ b/modem/modem_frametypes.py @@ -6,9 +6,6 @@ from enum import Enum class FRAME_TYPE(Enum): """Lookup for frame types""" - ARQ_CONNECTION_OPEN = 1 - ARQ_CONNECTION_HB = 2 - ARQ_CONNECTION_CLOSE = 3 ARQ_STOP = 10 ARQ_STOP_ACK = 11 ARQ_SESSION_OPEN = 12 @@ -17,6 +14,12 @@ class FRAME_TYPE(Enum): ARQ_SESSION_INFO_ACK = 15 ARQ_BURST_FRAME = 20 ARQ_BURST_ACK = 21 + P2P_CONNECTION_CONNECT = 30 + P2P_CONNECTION_CONNECT_ACK = 31 + P2P_CONNECTION_HEARTBEAT = 32 + P2P_CONNECTION_HEARTBEAT_ACK = 33 + P2P_CONNECTION_PAYLOAD = 34 + P2P_CONNECTION_PAYLOAD_ACK = 35 MESH_BROADCAST = 100 MESH_SIGNALLING_PING = 101 MESH_SIGNALLING_PING_ACK = 102 diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py new file mode 100644 index 00000000..facdf296 --- /dev/null +++ b/modem/p2p_connection.py @@ -0,0 +1,211 @@ +import threading +from enum import Enum +from modem_frametypes import FRAME_TYPE +from codec2 import FREEDV_MODE +import data_frame_factory +import structlog +import random +from queue import Queue + +class States(Enum): + NEW = 0 + CONNECTING = 1 + CONNECT_SENT = 2 + CONNECT_ACK_SENT = 3 + CONNECTED = 4 + HEARTBEAT_SENT = 5 + HEARTBEAT_ACK_SENT = 6 + PAYLOAD_SENT = 7 + DISCONNECTING = 8 + DISCONNECTED = 9 + FAILED = 10 + + + +class P2PConnection: + STATE_TRANSITION = { + States.NEW: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + }, + States.CONNECTING: { + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + }, + States.CONNECTED: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue', + + } + } + + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager): + self.logger = structlog.get_logger(type(self).__name__) + self.config = config + self.frame_factory = data_frame_factory.DataFrameFactory(self.config) + + self.destination = destination + self.origin = origin + self.states = state_manager + self.modem = modem + + self.p2p_rx_queue = Queue() + self.p2p_tx_queue = Queue() + + + self.state = States.NEW + self.session_id = self.generate_id() + + def generate_random_string(min_length, max_length): + import string + length = random.randint(min_length, max_length) + return ''.join(random.choices(string.ascii_letters, k=length)) + + # Generate and add 5 random entries to the queue + for _ in range(1): + random_entry = generate_random_string(2, 11) + self.p2p_tx_queue.put(random_entry) + + + + + self.event_frame_received = threading.Event() + + self.RETRIES_CONNECT = 1 + self.TIMEOUT_CONNECT = 10 + self.TIMEOUT_DATA = 5 + self.RETRIES_DATA = 5 + self.ENTIRE_CONNECTION_TIMEOUT = 100 + + self.is_ISS = False # Indicator, if we are ISS or IRS + + def generate_id(self): + while True: + random_int = random.randint(1,255) + if random_int not in self.states.p2p_connection_sessions: + return random_int + + if len(self.states.p2p_connection_sessions) >= 255: + return False + + + def set_details(self, snr, frequency_offset): + self.snr = snr + self.frequency_offset = frequency_offset + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def set_state(self, state): + if self.state == state: + self.log(f"{type(self).__name__} state {self.state.name} unchanged.") + else: + self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}") + self.state = state + + def on_frame_received(self, frame): + self.event_frame_received.set() + self.log(f"Received {frame['frame_type']}") + frame_type = frame['frame_type_int'] + if self.state in self.STATE_TRANSITION: + if frame_type in self.STATE_TRANSITION[self.state]: + action_name = self.STATE_TRANSITION[self.state][frame_type] + response = getattr(self, action_name)(frame) + + return + + self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") + + def transmit_frame(self, frame: bytearray, mode='auto'): + self.log("Transmitting frame") + if mode in ['auto']: + mode = self.get_mode_by_speed_level(self.speed_level) + + self.modem.transmit(mode, 1, 1, frame) + + def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode): + while retries > 0: + self.event_frame_received = threading.Event() + if isinstance(frame_or_burst, list): burst = frame_or_burst + else: burst = [frame_or_burst] + for f in burst: + self.transmit_frame(f, mode) + self.event_frame_received.clear() + self.log(f"Waiting {timeout} seconds...") + if self.event_frame_received.wait(timeout): + return + self.log("Timeout!") + retries = retries - 1 + + self.session_failed() + + def launch_twr(self, frame_or_burst, timeout, retries, mode): + twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True) + twr.start() + + def transmit_and_wait_irs(self, frame, timeout, mode): + self.event_frame_received.clear() + self.transmit_frame(frame, mode) + self.log(f"Waiting {timeout} seconds...") + #if not self.event_frame_received.wait(timeout): + # self.log("Timeout waiting for ISS. Session failed.") + # self.transmission_failed() + + def launch_twr_irs(self, frame, timeout, mode): + thread_wait = threading.Thread(target = self.transmit_and_wait_irs, + args = [frame, timeout, mode], daemon=True) + thread_wait.start() + + def connect(self): + self.set_state(States.CONNECTING) + self.is_ISS = True + session_open_frame = self.frame_factory.build_p2p_connection_connect(self.origin, self.destination, self.session_id) + self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + return + + def connected_iss(self, frame): + self.log("CONNECTED ISS...........................") + self.set_state(States.CONNECTED) + self.is_ISS = True + self.process_data_queue() + + def connected_irs(self, frame): + self.log("CONNECTED IRS...........................") + self.set_state(States.CONNECTED) + self.is_ISS = False + session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) + self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def session_failed(self): + self.log("FAILED...........................") + self.set_state(States.FAILED) + + def process_data_queue(self, frame=None): + print("processing data....") + print(self.p2p_tx_queue.empty()) + if not self.p2p_tx_queue.empty(): + data = self.p2p_tx_queue.get() + sequence_id = random.randint(0,255) + data = data.encode('utf-8') + + if len(data) <= 11: + mode = FREEDV_MODE.signalling + + payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, + mode=mode) + return + print("ALL DATA SENT!!!!!") + + def prepare_data_chunk(self, data, mode): + return data + + def received_data(self, frame): + print(frame) + ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) + self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def transmit_data_ack(self, frame): + print(frame) diff --git a/modem/server.py b/modem/server.py index f6a80421..37617706 100644 --- a/modem/server.py +++ b/modem/server.py @@ -329,6 +329,7 @@ if __name__ == "__main__": app.config_manager = CONFIG(config_file) # start modem + app.p2p_data_queue = queue.Queue() # queue which holds processing data of p2p connections app.state_queue = queue.Queue() # queue which holds latest states app.modem_events = queue.Queue() # queue which holds latest events app.modem_fft = queue.Queue() # queue which holds latest fft data diff --git a/modem/state_manager.py b/modem/state_manager.py index 69b6b44a..91bec80a 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -38,6 +38,8 @@ class StateManager: self.arq_iss_sessions = {} self.arq_irs_sessions = {} + self.p2p_connection_sessions = {} + #self.mesh_routing_table = [] self.radio_frequency = 0 @@ -214,3 +216,14 @@ class StateManager: "radio_rf_level": self.radio_rf_level, "s_meter_strength": self.s_meter_strength, } + + def register_p2p_connection_session(self, session): + if session.session_id in self.p2p_connection_sessions: + return False + self.p2p_connection_sessions[session.session_id] = session + return True + + def get_p2p_connection_session(self, id): + if id not in self.p2p_connection_sessions: + pass + return self.p2p_connection_sessions[id] \ No newline at end of file diff --git a/modem/vara_interface.py b/modem/vara_interface.py new file mode 100644 index 00000000..d1143e45 --- /dev/null +++ b/modem/vara_interface.py @@ -0,0 +1,116 @@ +import socketserver +import threading +import logging +import signal +import sys +import select +from queue import Queue + +from command_p2p_connection import P2PConnectionCommand + +# Shared queue for command and data handlers +data_queue = Queue() +# Initialize logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + + +class VARACommandHandler(socketserver.BaseRequestHandler): + mycall = None # Class attribute to store mycall + dxcall = None + bandwidth = None # Class attribute to store bandwidth + + + def handle(self): + logging.info(f"Command connection established with {self.client_address}") + try: + while True: + self.data = self.request.recv(1024).strip() + if not self.data: + break + logging.info(f"Command received from {self.client_address}: {self.data}") + + if self.data.startswith(b'MYCALL '): + VARACommandHandler.mycall = self.data.split(b' ')[1].strip() + self.request.sendall(b"OK\r\n") + elif self.data.startswith(b'BW'): + VARACommandHandler.bandwidth = self.data[2:].strip() + self.request.sendall(b"OK\r\n") + elif self.data.startswith(b'CONNECT '): + + P2PConnectionCommand.connect('MYCALL', 'DXCALL', 'BANDWIDTH') + + self.request.sendall(b"OK\r\n") + parts = self.data.split() + if len(parts) >= 3 and VARACommandHandler.mycall and VARACommandHandler.bandwidth: + VARACommandHandler.dxcall = parts[2] + # Using the stored mycall and bandwidth for the response + bytestring = b'CONNECTED ' + VARACommandHandler.mycall + b' ' + VARACommandHandler.dxcall + b' ' + VARACommandHandler.bandwidth + b'\r\n' + self.request.sendall(bytestring) + + else: + self.request.sendall(b"ERROR: MYCALL or Bandwidth not set.\r\n") + elif self.data.startswith(b'ABORT'): + bytestring = b'DISCONNECTED\r\n' + elif self.data.startswith(b'DISCONNECT'): + bytestring = b'DISCONNECTED\r\n' + self.request.sendall(bytestring) + else: + self.request.sendall(b"OK\r\n") + + finally: + logging.info(f"Command connection closed with {self.client_address}") + + +class VARADataHandler(socketserver.BaseRequestHandler): + def handle(self): + logging.info(f"Data connection established with {self.client_address}") + + try: + while True: + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout + if ready_to_read: + self.data = self.request.recv(1024).strip() + if not self.data: + break + try: + logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") + except: + logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + + + + # Check if there's something to send from the queue, without blocking + if not data_queue.empty(): + data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + logging.info(f"Sent data to {self.client_address}") + + finally: + logging.info(f"Data connection closed with {self.client_address}") +class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + +def run_server(port, handler): + with ThreadedTCPServer(('127.0.0.1', port), handler) as server: + logging.info(f"Server running on port {port}") + server.serve_forever() + + +def signal_handler(sig, frame): + sys.exit(0) + +if __name__ == '__main__': + # Setup signal handler for graceful shutdown + signal.signal(signal.SIGINT, signal_handler) + + # Create server threads for command and data ports + command_server_thread = threading.Thread(target=run_server, args=(8300, VARACommandHandler)) + data_server_thread = threading.Thread(target=run_server, args=(8301, VARADataHandler)) + + # Start the server threads + command_server_thread.start() + data_server_thread.start() + + # Wait for both server threads to finish + command_server_thread.join() + data_server_thread.join() diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py new file mode 100644 index 00000000..9c0cf73b --- /dev/null +++ b/tests/test_p2p_connection.py @@ -0,0 +1,160 @@ +import sys +import time + +sys.path.append('modem') + +import unittest +import unittest.mock +from config import CONFIG +import helpers +import queue +import threading +import base64 +from command_p2p_connection import P2PConnectionCommand +from state_manager import StateManager +from frame_dispatcher import DISPATCHER +import random +import structlog +import numpy as np +from event_manager import EventManager +from state_manager import StateManager +from data_frame_factory import DataFrameFactory +import codec2 +import p2p_connection + + +class TestModem: + def __init__(self, event_q, state_q): + self.data_queue_received = queue.Queue() + self.demodulator = unittest.mock.Mock() + self.event_manager = EventManager([event_q]) + self.logger = structlog.get_logger('Modem') + self.states = StateManager(state_q) + + def getFrameTransmissionTime(self, mode): + samples = 0 + c2instance = codec2.open_instance(mode.value) + samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance) + time = samples / 8000 + return time + + def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: + # Simulate transmission time + tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT + self.logger.info(f"TX {tx_time} seconds...") + threading.Event().wait(tx_time) + + transmission = { + 'mode': mode, + 'bytes': frames, + } + self.data_queue_received.put(transmission) + + +class TestP2PConnectionSession(unittest.TestCase): + + @classmethod + def setUpClass(cls): + config_manager = CONFIG('modem/config.ini.example') + cls.config = config_manager.read() + cls.logger = structlog.get_logger("TESTS") + cls.frame_factory = DataFrameFactory(cls.config) + + # ISS + cls.iss_state_manager = StateManager(queue.Queue()) + cls.iss_event_manager = EventManager([queue.Queue()]) + cls.iss_event_queue = queue.Queue() + cls.iss_state_queue = queue.Queue() + cls.iss_p2p_data_queue = queue.Queue() + + + cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue) + cls.iss_frame_dispatcher = DISPATCHER(cls.config, + cls.iss_event_manager, + cls.iss_state_manager, + cls.iss_modem) + + # IRS + cls.irs_state_manager = StateManager(queue.Queue()) + cls.irs_event_manager = EventManager([queue.Queue()]) + cls.irs_event_queue = queue.Queue() + cls.irs_state_queue = queue.Queue() + cls.irs_p2p_data_queue = queue.Queue() + cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue) + cls.irs_frame_dispatcher = DISPATCHER(cls.config, + cls.irs_event_manager, + cls.irs_state_manager, + cls.irs_modem) + + # Frame loss probability in % + cls.loss_probability = 30 + + cls.channels_running = True + + def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER): + while self.channels_running: + # Transfer data between both parties + try: + transmission = modem_transmit_queue.get(timeout=1) + if random.randint(0, 100) < self.loss_probability: + self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + continue + + frame_bytes = transmission['bytes'] + frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) + except queue.Empty: + continue + self.logger.info(f"[{threading.current_thread().name}] Channel closed.") + + def waitForSession(self, q, outbound=False): + key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' + while True and self.channels_running: + ev = q.get() + if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]): + self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + break + + def establishChannels(self): + self.channels_running = True + self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, + args=[self.iss_modem.data_queue_received, + self.irs_frame_dispatcher], + name="ISS to IRS channel") + self.iss_to_irs_channel.start() + + self.irs_to_iss_channel = threading.Thread(target=self.channelWorker, + args=[self.irs_modem.data_queue_received, + self.iss_frame_dispatcher], + name="IRS to ISS channel") + self.irs_to_iss_channel.start() + + def waitAndCloseChannels(self): + self.waitForSession(self.iss_event_queue, True) + self.channels_running = False + self.waitForSession(self.irs_event_queue, False) + self.channels_running = False + + def testARQSessionSmallPayload(self): + # set Packet Error Rate (PER) / frame loss probability + self.loss_probability = 0 + + self.establishChannels() + params = { + 'destination': "BB2BBB-2", + 'origin': "AA1AAA-1", + } + cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) + session = cmd.run(self.iss_event_queue, self.iss_modem) + if session.session_id: + self.iss_state_manager.register_p2p_connection_session(session) + session.connect() + + + + self.waitAndCloseChannels() + del cmd + +if __name__ == '__main__': + unittest.main()