diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index 573d4077..a81c63d0 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -11,6 +11,7 @@ class ARQ_SESSION_TYPES(Enum): raw_lzma = 10 raw_gzip = 11 p2pmsg_lzma = 20 + p2p_connection = 30 class ARQDataTypeHandler: def __init__(self, event_manager, state_manager): @@ -43,6 +44,12 @@ class ARQDataTypeHandler: 'failed' : self.failed_p2pmsg_lzma, 'transmitted': self.transmitted_p2pmsg_lzma, }, + ARQ_SESSION_TYPES.p2p_connection: { + 'prepare': self.prepare_p2p_connection, + 'handle': self.handle_p2p_connection, + 'failed': self.failed_p2p_connection, + 'transmitted': self.transmitted_p2p_connection, + }, } @staticmethod @@ -161,4 +168,36 @@ class ARQDataTypeHandler: def transmitted_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics) - return decompressed_data \ No newline at end of file + return decompressed_data + + + def prepare_p2p_connection(self, data): + compressed_data = gzip.compress(data) + self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + return compressed_data + + def handle_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + print(decompressed_data) + print(self.state_manager.p2p_connection_sessions) + for session_id in self.state_manager.p2p_connection_sessions: + print(session_id) + self.state_manager.p2p_connection_sessions[session_id].received_arq(decompressed_data) + + def failed_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True) + print(self.state_manager.p2p_connection_sessions) + return decompressed_data + + def transmitted_p2p_connection(self, data, statistics): + + decompressed_data = gzip.decompress(data) + print(decompressed_data) + print(self.state_manager.p2p_connection_sessions) + for session_id in self.state_manager.p2p_connection_sessions: + print(session_id) + self.state_manager.p2p_connection_sessions[session_id].transmitted_arq() \ No newline at end of file diff --git a/modem/arq_session.py b/modem/arq_session.py index a6cfcf8b..5edc4e08 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -32,13 +32,13 @@ class ARQSession: }, } - def __init__(self, config: dict, modem, dxcall: str): + def __init__(self, config: dict, modem, dxcall: str, state_manager): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.event_manager: EventManager = modem.event_manager - self.states = modem.states - + #self.states = modem.states + self.states = state_manager self.states.setARQ(True) self.snr = [] diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 92bf0d83..52d94ca6 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -59,8 +59,8 @@ class ARQSessionIRS(arq_session.ARQSession): }, } - def __init__(self, config: dict, modem, dxcall: str, session_id: int): - super().__init__(config, modem, dxcall) + def __init__(self, config: dict, modem, dxcall: str, session_id: int, state_manager): + super().__init__(config, modem, dxcall, state_manager) self.id = session_id self.dxcall = dxcall diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 12f91d93..e0c74395 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -53,7 +53,7 @@ class ARQSessionISS(arq_session.ARQSession): } def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes): - super().__init__(config, modem, dxcall) + super().__init__(config, modem, dxcall, state_manager) self.state_manager = state_manager self.data = data self.total_length = len(data) @@ -191,6 +191,10 @@ class ARQSessionISS(arq_session.ARQSession): self.set_state(ISS_State.ENDED) self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) + + print(self.state_manager.p2p_connection_sessions) + print(self.arq_data_type_handler.state_manager.p2p_connection_sessions) + self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) self.state_manager.remove_arq_iss_session(self.id) self.states.setARQ(False) diff --git a/modem/command.py b/modem/command.py index 65b9dc4f..e65b0d5d 100644 --- a/modem/command.py +++ b/modem/command.py @@ -8,7 +8,7 @@ from arq_data_type_handler import ARQDataTypeHandler class TxCommand(): - def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}): + def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}, socket_command_handler=None): self.config = config self.logger = structlog.get_logger(type(self).__name__) self.state_manager = state_manager @@ -16,6 +16,7 @@ class TxCommand(): self.set_params_from_api(apiParams) self.frame_factory = DataFrameFactory(config) self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager) + self.socket_command_handler = socket_command_handler def log(self, message, isWarning = False): msg = f"[{type(self).__name__}]: {message}" diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py new file mode 100644 index 00000000..2ec44960 --- /dev/null +++ b/modem/command_p2p_connection.py @@ -0,0 +1,37 @@ +import queue +from command import TxCommand +import api_validations +import base64 +from queue import Queue +from p2p_connection import P2PConnection + +class P2PConnectionCommand(TxCommand): + + def set_params_from_api(self, apiParams): + self.origin = apiParams['origin'] + if not api_validations.validate_freedata_callsign(self.origin): + self.origin = f"{self.origin}-0" + + self.destination = apiParams['destination'] + if not api_validations.validate_freedata_callsign(self.destination): + self.destination = f"{self.destination}-0" + + + def connect(self, event_queue: Queue, modem): + pass + + def run(self, event_queue: Queue, modem): + try: + self.emit_event(event_queue) + self.logger.info(self.log_message()) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.event_manager, self.socket_command_handler) + if session.session_id: + self.state_manager.register_p2p_connection_session(session) + session.connect() + return session + return False + + except Exception as e: + self.log(f"Error starting P2P Connection session: {e}", isWarning=True) + + return False \ No newline at end of file diff --git a/modem/config.ini.example b/modem/config.ini.example index c5cd6c18..1f077baa 100644 --- a/modem/config.ini.example +++ b/modem/config.ini.example @@ -49,6 +49,13 @@ enable_morse_identifier = False respond_to_cq = True tx_delay = 50 maximum_bandwidth = 1700 +enable_socket_interface = False + +[SOCKET_INTERFACE] +enable = False +host = 127.0.0.1 +cmd_port = 8000 +data_port = 8001 [MESSAGES] enable_auto_repeat = False diff --git a/modem/config.py b/modem/config.py index c131626b..0b7debca 100644 --- a/modem/config.py +++ b/modem/config.py @@ -59,7 +59,15 @@ class CONFIG: 'enable_morse_identifier': bool, 'maximum_bandwidth': int, 'respond_to_cq': bool, - 'tx_delay': int + 'tx_delay': int, + 'enable_socket_interface': bool, + }, + 'SOCKET_INTERFACE': { + 'enable' : bool, + 'host' : str, + 'cmd_port' : int, + 'data_port' : int, + }, 'MESSAGES': { 'enable_auto_repeat': bool, diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index cdd69362..c025c2ab 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -18,6 +18,7 @@ class DataFrameFactory: } def __init__(self, config): + self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}" self.mygrid = config['STATION']['mygrid'] @@ -28,6 +29,7 @@ class DataFrameFactory: self._load_ping_templates() self._load_fec_templates() self._load_arq_templates() + self._load_p2p_connection_templates() def _load_broadcast_templates(self): # cq frame @@ -160,6 +162,63 @@ class DataFrameFactory: "snr": 1, "flag": 1, } + + def _load_p2p_connection_templates(self): + # p2p connect request + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # connect ACK + self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "destination_crc": 3, + "origin": 6, + "session_id": 1, + } + + # heartbeat for "is alive" + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # ack heartbeat + self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # p2p payload frames + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD.value] = { + "frame_length": None, + "session_id": 1, + "sequence_id": 1, + "data": "dynamic", + } + + # p2p payload frame ack + self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + "sequence_id": 1, + } + + # heartbeat for "is alive" + self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + + # ack heartbeat + self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value] = { + "frame_length": self.LENGTH_SIG1_FRAME, + "session_id": 1, + } + def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME): @@ -404,8 +463,9 @@ class DataFrameFactory: "offset": offset.to_bytes(4, 'big'), "data": data, } - frame = self.construct(FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode)) - return frame + return self.construct( + FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode) + ) def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int, frames_per_burst: int, snr: int, flag_final=False, flag_checksum=False, flag_abort=False): @@ -428,3 +488,62 @@ class DataFrameFactory: "flag": flag.to_bytes(1, 'big'), } return self.construct(FR_TYPE.ARQ_BURST_ACK, payload) + + def build_p2p_connection_connect(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT, payload) + + def build_p2p_connection_connect_ack(self, destination, origin, session_id): + payload = { + "destination_crc": helpers.get_crc_24(destination), + "origin": helpers.callsign_to_bytes(origin), + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload) + + def build_p2p_connection_heartbeat(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload) + + def build_p2p_connection_heartbeat_ack(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload) + + def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + "data": data, + } + return self.construct( + FR_TYPE.P2P_CONNECTION_PAYLOAD, + payload, + self.get_bytes_per_frame(freedv_mode), + ) + + def build_p2p_connection_payload_ack(self, session_id, sequence_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + "sequence_id": sequence_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload) + + def build_p2p_connection_disconnect(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT, payload) + + def build_p2p_connection_disconnect_ack(self, session_id): + payload = { + "session_id": session_id.to_bytes(1, 'big'), + } + return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK, payload) diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index c7f860bb..e2a37ab2 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -13,8 +13,11 @@ from frame_handler import FrameHandler from frame_handler_ping import PingFrameHandler from frame_handler_cq import CQFrameHandler from frame_handler_arq_session import ARQFrameHandler +from frame_handler_p2p_connection import P2PConnectionFrameHandler from frame_handler_beacon import BeaconFrameHandler + + class DISPATCHER(): FRAME_HANDLER = { @@ -22,9 +25,18 @@ class DISPATCHER(): FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"}, FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"}, FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"}, - FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"}, - FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, - FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.P2P_CONNECTION_CONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT"}, + FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT ACK"}, + FR_TYPE.P2P_CONNECTION_DISCONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection DISCONNECT"}, + FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection DISCONNECT ACK"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD"}, + FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler, + "name": "P2P Connection PAYLOAD ACK"}, + + #FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, + #FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"}, FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"}, FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"}, @@ -82,7 +94,7 @@ class DISPATCHER(): if frametype not in self.FRAME_HANDLER: self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + "[DISPATCHER] ARQ - other frame type", frametype=FR_TYPE(frametype).name) return # instantiate handler diff --git a/modem/frame_handler.py b/modem/frame_handler.py index 46347d1e..7375b6c2 100644 --- a/modem/frame_handler.py +++ b/modem/frame_handler.py @@ -34,7 +34,7 @@ class FrameHandler(): ft = self.details['frame']['frame_type'] valid = False # Check for callsign checksum - if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK']: + if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK', 'P2P_CONNECTION_CONNECT']: valid, mycallsign = helpers.check_callsign( call_with_ssid, self.details["frame"]["destination_crc"], @@ -51,6 +51,20 @@ class FrameHandler(): session_id = self.details['frame']['session_id'] if session_id in self.states.arq_iss_sessions: valid = True + + # check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT']: + valid, mycallsign = helpers.check_callsign( + call_with_ssid, + self.details["frame"]["destination_crc"], + self.config['STATION']['ssid_list']) + + #check for p2p connection + elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']: + session_id = self.details['frame']['session_id'] + if session_id in self.states.p2p_connection_sessions: + valid = True + else: valid = False diff --git a/modem/frame_handler_arq_session.py b/modem/frame_handler_arq_session.py index 8ea805cd..9709fb02 100644 --- a/modem/frame_handler_arq_session.py +++ b/modem/frame_handler_arq_session.py @@ -32,7 +32,8 @@ class ARQFrameHandler(frame_handler.FrameHandler): session = ARQSessionIRS(self.config, self.modem, frame['origin'], - session_id) + session_id, + self.states) self.states.register_arq_irs_session(session) elif frame['frame_type_int'] in [ diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py new file mode 100644 index 00000000..dc52a131 --- /dev/null +++ b/modem/frame_handler_p2p_connection.py @@ -0,0 +1,54 @@ +from queue import Queue +import frame_handler +from event_manager import EventManager +from state_manager import StateManager +from modem_frametypes import FRAME_TYPE as FR +from p2p_connection import P2PConnection + +class P2PConnectionFrameHandler(frame_handler.FrameHandler): + + def follow_protocol(self): + + if not self.should_respond(): + return + + frame = self.details['frame'] + session_id = frame['session_id'] + snr = self.details["snr"] + frequency_offset = self.details["frequency_offset"] + + if frame['frame_type_int'] == FR.P2P_CONNECTION_CONNECT.value: + + # Lost OPEN_ACK case .. ISS will retry opening a session + if session_id in self.states.arq_irs_sessions: + session = self.states.p2p_connection_sessions[session_id] + + # Normal case when receiving a SESSION_OPEN for the first time + else: + # if self.states.check_if_running_arq_session(): + # self.logger.warning("DISCARDING SESSION OPEN because of ongoing ARQ session ", frame=frame) + # return + print(frame) + session = P2PConnection(self.config, + self.modem, + frame['origin'], + frame['destination_crc'], + self.states, self.event_manager) + session.session_id = session_id + self.states.register_p2p_connection_session(session) + + elif frame['frame_type_int'] in [ + FR.P2P_CONNECTION_CONNECT_ACK.value, + FR.P2P_CONNECTION_DISCONNECT.value, + FR.P2P_CONNECTION_DISCONNECT_ACK.value, + FR.P2P_CONNECTION_PAYLOAD.value, + FR.P2P_CONNECTION_PAYLOAD_ACK.value, + ]: + session = self.states.get_p2p_connection_session(session_id) + + else: + self.logger.warning("DISCARDING FRAME", frame=frame) + return + + session.set_details(snr, frequency_offset) + session.on_frame_received(frame) diff --git a/modem/modem_frametypes.py b/modem/modem_frametypes.py index 582a7907..fbe33f77 100644 --- a/modem/modem_frametypes.py +++ b/modem/modem_frametypes.py @@ -6,9 +6,6 @@ from enum import Enum class FRAME_TYPE(Enum): """Lookup for frame types""" - ARQ_CONNECTION_OPEN = 1 - ARQ_CONNECTION_HB = 2 - ARQ_CONNECTION_CLOSE = 3 ARQ_STOP = 10 ARQ_STOP_ACK = 11 ARQ_SESSION_OPEN = 12 @@ -17,6 +14,14 @@ class FRAME_TYPE(Enum): ARQ_SESSION_INFO_ACK = 15 ARQ_BURST_FRAME = 20 ARQ_BURST_ACK = 21 + P2P_CONNECTION_CONNECT = 30 + P2P_CONNECTION_CONNECT_ACK = 31 + P2P_CONNECTION_HEARTBEAT = 32 + P2P_CONNECTION_HEARTBEAT_ACK = 33 + P2P_CONNECTION_PAYLOAD = 34 + P2P_CONNECTION_PAYLOAD_ACK = 35 + P2P_CONNECTION_DISCONNECT = 36 + P2P_CONNECTION_DISCONNECT_ACK = 37 MESH_BROADCAST = 100 MESH_SIGNALLING_PING = 101 MESH_SIGNALLING_PING_ACK = 102 diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py new file mode 100644 index 00000000..17d8fe65 --- /dev/null +++ b/modem/p2p_connection.py @@ -0,0 +1,315 @@ +import threading +from enum import Enum +from modem_frametypes import FRAME_TYPE +from codec2 import FREEDV_MODE +import data_frame_factory +import structlog +import random +from queue import Queue +import time +from command_arq_raw import ARQRawCommand +import numpy as np +import base64 +from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES +from arq_session_iss import ARQSessionISS + +class States(Enum): + NEW = 0 + CONNECTING = 1 + CONNECT_SENT = 2 + CONNECT_ACK_SENT = 3 + CONNECTED = 4 + #HEARTBEAT_SENT = 5 + #HEARTBEAT_ACK_SENT = 6 + PAYLOAD_SENT = 7 + ARQ_SESSION = 8 + DISCONNECTING = 9 + DISCONNECTED = 10 + FAILED = 11 + + + +class P2PConnection: + STATE_TRANSITION = { + States.NEW: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + }, + States.CONNECTING: { + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + }, + States.CONNECTED: { + FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs', + FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss', + FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + }, + States.PAYLOAD_SENT: { + FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data', + }, + States.DISCONNECTING: { + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', + }, + States.DISCONNECTED: { + FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect', + FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack', + + }, + } + + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None): + self.logger = structlog.get_logger(type(self).__name__) + self.config = config + self.frame_factory = data_frame_factory.DataFrameFactory(self.config) + + self.socket_command_handler = socket_command_handler + + self.destination = destination + self.origin = origin + self.bandwidth = 0 + + self.state_manager = state_manager + self.event_manager = event_manager + self.modem = modem + self.modem.demodulator.set_decode_mode([]) + + self.p2p_data_rx_queue = Queue() + self.p2p_data_tx_queue = Queue() + + self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager) + + + self.state = States.NEW + self.session_id = self.generate_id() + + self.event_frame_received = threading.Event() + + self.RETRIES_CONNECT = 1 + self.TIMEOUT_CONNECT = 10 + self.TIMEOUT_DATA = 5 + self.RETRIES_DATA = 5 + self.ENTIRE_CONNECTION_TIMEOUT = 100 + + self.is_ISS = False # Indicator, if we are ISS or IRS + + self.last_data_timestamp= time.time() + + self.start_data_processing_worker() + + + def start_data_processing_worker(self): + """Starts a worker thread to monitor the transmit data queue and process data.""" + + def data_processing_worker(): + while True: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION: + self.disconnect() + return + + if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED: + self.process_data_queue() + threading.Event().wait(0.1) + + + + + # Create and start the worker thread + worker_thread = threading.Thread(target=data_processing_worker, daemon=True) + worker_thread.start() + + def generate_id(self): + while True: + random_int = random.randint(1,255) + if random_int not in self.state_manager.p2p_connection_sessions: + return random_int + + if len(self.state_manager.p2p_connection_sessions) >= 255: + return False + + def set_details(self, snr, frequency_offset): + self.snr = snr + self.frequency_offset = frequency_offset + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def set_state(self, state): + if self.state == state: + self.log(f"{type(self).__name__} state {self.state.name} unchanged.") + else: + self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}") + self.state = state + + def on_frame_received(self, frame): + self.last_data_timestamp = time.time() + self.event_frame_received.set() + self.log(f"Received {frame['frame_type']}") + frame_type = frame['frame_type_int'] + if self.state in self.STATE_TRANSITION: + if frame_type in self.STATE_TRANSITION[self.state]: + action_name = self.STATE_TRANSITION[self.state][frame_type] + response = getattr(self, action_name)(frame) + + return + + self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") + + def transmit_frame(self, frame: bytearray, mode='auto'): + self.log("Transmitting frame") + if mode in ['auto']: + mode = self.get_mode_by_speed_level(self.speed_level) + + self.modem.transmit(mode, 1, 1, frame) + + def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode): + while retries > 0: + self.event_frame_received = threading.Event() + if isinstance(frame_or_burst, list): burst = frame_or_burst + else: burst = [frame_or_burst] + for f in burst: + self.transmit_frame(f, mode) + self.event_frame_received.clear() + self.log(f"Waiting {timeout} seconds...") + if self.event_frame_received.wait(timeout): + return + self.log("Timeout!") + retries = retries - 1 + + #self.connected_iss() # override connection state for simulation purposes + self.session_failed() + + def launch_twr(self, frame_or_burst, timeout, retries, mode): + twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True) + twr.start() + + def transmit_and_wait_irs(self, frame, timeout, mode): + self.event_frame_received.clear() + self.transmit_frame(frame, mode) + self.log(f"Waiting {timeout} seconds...") + #if not self.event_frame_received.wait(timeout): + # self.log("Timeout waiting for ISS. Session failed.") + # self.transmission_failed() + + def launch_twr_irs(self, frame, timeout, mode): + thread_wait = threading.Thread(target = self.transmit_and_wait_irs, + args = [frame, timeout, mode], daemon=True) + thread_wait.start() + + def connect(self): + self.set_state(States.CONNECTING) + self.is_ISS = True + session_open_frame = self.frame_factory.build_p2p_connection_connect(self.origin, self.destination, self.session_id) + self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + return + + def connected_iss(self, frame=None): + self.log("CONNECTED ISS...........................") + self.set_state(States.CONNECTED) + self.is_ISS = True + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) + + def connected_irs(self, frame): + self.log("CONNECTED IRS...........................") + self.state_manager.register_p2p_connection_session(self) + self.set_state(States.CONNECTED) + self.is_ISS = False + self.orign = frame["origin"] + self.destination = frame["destination_crc"] + + if self.socket_command_handler: + self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth) + + session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id) + self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def session_failed(self): + self.set_state(States.FAILED) + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() + + def process_data_queue(self, frame=None): + if not self.p2p_data_tx_queue.empty(): + print("processing data....") + + self.set_state(States.PAYLOAD_SENT) + data = self.p2p_data_tx_queue.get() + sequence_id = random.randint(0,255) + data = data.encode('utf-8') + + if len(data) <= 11: + mode = FREEDV_MODE.signalling + elif 11 < len(data) < 32: + mode = FREEDV_MODE.datac4 + else: + self.transmit_arq(data) + return + + payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode) + return + + def prepare_data_chunk(self, data, mode): + return data + + def received_data(self, frame): + print(frame) + self.p2p_data_rx_queue.put(frame['data']) + + ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0) + self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def transmit_data_ack(self, frame): + print(frame) + + def transmitted_data(self, frame): + print("transmitted data...") + self.set_state(States.CONNECTED) + + def disconnect(self): + if self.state not in [States.DISCONNECTING, States.DISCONNECTED]: + self.set_state(States.DISCONNECTING) + disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id) + self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) + return + + def received_disconnect(self, frame): + self.log("DISCONNECTED...............") + self.set_state(States.DISCONNECTED) + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() + self.is_ISS = False + disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id) + self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling) + + def received_disconnect_ack(self, frame): + self.log("DISCONNECTED...............") + self.set_state(States.DISCONNECTED) + if self.socket_command_handler: + self.socket_command_handler.socket_respond_disconnected() + + def transmit_arq(self, data): + self.set_state(States.ARQ_SESSION) + + print("----------------------------------------------------------------") + print(self.destination) + print(self.state_manager.p2p_connection_sessions) + + prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) + iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte) + iss.id = self.session_id + if iss.id: + self.state_manager.register_arq_iss_session(iss) + iss.start() + return iss + + def transmitted_arq(self): + self.last_data_timestamp = time.time() + self.set_state(States.CONNECTED) + + def received_arq(self, data): + self.last_data_timestamp = time.time() + self.set_state(States.CONNECTED) + self.p2p_data_rx_queue.put(data) + diff --git a/modem/server.py b/modem/server.py index dde5de07..24b6af35 100644 --- a/modem/server.py +++ b/modem/server.py @@ -327,7 +327,9 @@ def sock_states(sock): @atexit.register def stop_server(): try: - app.service_manager.stop_modem() + app.service_manager.modem_service.put("stop") + app.socket_interface_manager.stop_servers() + if app.service_manager.modem: app.service_manager.modem.sd_input_stream.stop audio.sd._terminate() @@ -346,6 +348,7 @@ if __name__ == "__main__": app.config_manager = CONFIG(config_file) # start modem + app.p2p_data_queue = queue.Queue() # queue which holds processing data of p2p connections app.state_queue = queue.Queue() # queue which holds latest states app.modem_events = queue.Queue() # queue which holds latest events app.modem_fft = queue.Queue() # queue which holds latest fft data @@ -357,6 +360,7 @@ if __name__ == "__main__": app.schedule_manager = ScheduleManager(app.MODEM_VERSION, app.config_manager, app.state_manager, app.event_manager) # start service manager app.service_manager = service_manager.SM(app) + # start modem service app.modem_service.put("start") # initialize database default values @@ -373,3 +377,4 @@ if __name__ == "__main__": modemport = 5000 app.run(modemaddress, modemport) + diff --git a/modem/service_manager.py b/modem/service_manager.py index 2f36a5bb..a2f9a0c1 100644 --- a/modem/service_manager.py +++ b/modem/service_manager.py @@ -5,7 +5,7 @@ import structlog import audio import radio_manager - +from socket_interface import SocketInterfaceHandler class SM: def __init__(self, app): @@ -19,7 +19,7 @@ class SM: self.state_manager = app.state_manager self.event_manager = app.event_manager self.schedule_manager = app.schedule_manager - + self.socket_interface_manager = None runner_thread = threading.Thread( target=self.runner, name="runner thread", daemon=True @@ -34,15 +34,23 @@ class SM: self.start_radio_manager() self.start_modem() + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers() + elif cmd in ['stop'] and self.modem: self.stop_modem() self.stop_radio_manager() + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager.stop_servers() # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) elif cmd in ['restart']: self.stop_modem() self.stop_radio_manager() + if self.config['MODEM']['enable_socket_interface']: + self.socket_interface_manager.stop_servers() + # we need to wait a bit for avoiding a portaudio crash threading.Event().wait(0.5) diff --git a/modem/socket_interface.py b/modem/socket_interface.py new file mode 100644 index 00000000..0b41b4fc --- /dev/null +++ b/modem/socket_interface.py @@ -0,0 +1,186 @@ +""" WORK IN PROGRESS by DJ2LS""" + +import socketserver +import threading +import structlog +import select +from queue import Queue +from socket_interface_commands import SocketCommandHandler + + +class CommandSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None): + self.state_manager = state_manager + self.event_manager = event_manager + self.config_manager = config_manager + self.modem = modem + self.logger = structlog.get_logger(type(self).__name__) + + self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager) + + self.handlers = { + 'CONNECT': self.command_handler.handle_connect, + 'DISCONNECT': self.command_handler.handle_disconnect, + 'MYCALL': self.command_handler.handle_mycall, + 'BW': self.command_handler.handle_bw, + 'ABORT': self.command_handler.handle_abort, + 'PUBLIC': self.command_handler.handle_public, + 'CWID': self.command_handler.handle_cwid, + 'LISTEN': self.command_handler.handle_listen, + 'COMPRESSION': self.command_handler.handle_compression, + 'WINLINK SESSION': self.command_handler.handle_winlink_session, + } + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def handle(self): + self.log(f"Client connected: {self.client_address}") + try: + while True: + data = self.request.recv(1024).strip() + if not data: + break + decoded_data = data.decode() + self.log(f"Command received from {self.client_address}: {decoded_data}") + self.parse_command(decoded_data) + finally: + self.log(f"Command connection closed with {self.client_address}") + + def parse_command(self, data): + for command in self.handlers: + if data.startswith(command): + # Extract command arguments after the command itself + args = data[len(command):].strip().split() + self.dispatch_command(command, args) + return + self.send_response("ERROR: Unknown command\r\n") + + def dispatch_command(self, command, data): + if command in self.handlers: + handler = self.handlers[command] + handler(data) + else: + self.send_response(f"Unknown command: {command}") + + + +class DataSocket(socketserver.BaseRequestHandler): + #def __init__(self, request, client_address, server): + def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None): + self.state_manager = state_manager + self.event_manager = event_manager + self.config_manager = config_manager + self.modem = modem + + self.logger = structlog.get_logger(type(self).__name__) + + super().__init__(request, client_address, server) + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def handle(self): + self.log(f"Data connection established with {self.client_address}") + + try: + while True: + + ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout + if ready_to_read: + self.data = self.request.recv(1024).strip() + if not self.data: + break + try: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}") + except Exception: + self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}") + + for session in self.state_manager.p2p_connection_sessions: + print(f"sessions: {session}") + session.p2p_data_tx_queue.put(self.data) + + # Check if there's something to send from the queue, without blocking + + for session_id in self.state_manager.p2p_connection_sessions: + session = self.state_manager.get_p2p_connection_session(session_id) + if not session.p2p_data_tx_queue.empty(): + data_to_send = session.p2p_data_tx_queue.get_nowait() # Use get_nowait to avoid blocking + self.request.sendall(data_to_send) + self.log(f"Sent data to {self.client_address}") + + finally: + self.log(f"Data connection closed with {self.client_address}") + + +#class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): +# allow_reuse_address = True + + +class CustomThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): + allow_reuse_address = True + + def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs): + self.extra_args = kwargs + super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate) + + def finish_request(self, request, client_address): + self.RequestHandlerClass(request, client_address, self, **self.extra_args) + +class SocketInterfaceHandler: + def __init__(self, modem, config_manager, state_manager, event_manager): + self.modem = modem + self.config_manager = config_manager + self.config = self.config_manager.read() + self.state_manager = state_manager + self.event_manager = event_manager + self.logger = structlog.get_logger(type(self).__name__) + self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"] + self.data_port = self.config["SOCKET_INTERFACE"]["data_port"] + self.command_server = None + self.data_server = None + self.command_server_thread = None + self.data_server_thread = None + + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) + + def start_servers(self): + # Method to start both command and data server threads + self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandSocket)) + self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataSocket)) + + self.command_server_thread.start() + self.data_server_thread.start() + + self.log(f"Interfaces started") + + def run_server(self, port, handler): + with CustomThreadedTCPServer(('127.0.0.1', port), handler, modem=self.modem, state_manager=self.state_manager, event_manager=self.event_manager, config_manager=self.config_manager) as server: + self.log(f"Server started on port {port}") + if port == self.command_port: + self.command_server = server + else: + self.data_server = server + server.serve_forever() + + def stop_servers(self): + # Gracefully shutdown the server + if self.command_server: + self.command_server.shutdown() + if self.data_server: + self.data_server.shutdown() + self.log(f"Interfaces stopped") + + def wait_for_server_threads(self): + # Wait for both server threads to finish + self.command_server_thread.join() + self.data_server_thread.join() diff --git a/modem/socket_interface_commands.py b/modem/socket_interface_commands.py new file mode 100644 index 00000000..fe4c18f4 --- /dev/null +++ b/modem/socket_interface_commands.py @@ -0,0 +1,75 @@ +""" WORK IN PROGRESS by DJ2LS""" +from command_p2p_connection import P2PConnectionCommand + +class SocketCommandHandler: + + def __init__(self, cmd_request, modem, config_manager, state_manager, event_manager): + self.cmd_request = cmd_request + self.modem = modem + self.config_manager = config_manager + self.state_manager = state_manager + self.event_manager = event_manager + + self.session = None + + def send_response(self, message): + full_message = f"{message}\r\n" + self.cmd_request.sendall(full_message.encode()) + + def handle_connect(self, data): + # Your existing connect logic + self.send_response("OK") + + params = { + 'origin': data[0], + 'destination': data[1], + } + cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self) + self.session = cmd.run(self.event_manager.queues, self.modem) + if self.session.session_id: + self.state_manager.register_p2p_connection_session(self.session) + self.session.connect() + + + def handle_disconnect(self, data): + # Your existing connect logic + self.send_response("OK") + + def handle_mycall(self, data): + # Logic for handling MYCALL command + self.send_response("OK") + + def handle_bw(self, data): + # Logic for handling BW command + self.send_response("OK") + + def handle_abort(self, data): + # Logic for handling ABORT command + self.send_response("OK") + + def handle_public(self, data): + # Logic for handling PUBLIC command + self.send_response("OK") + + def handle_cwid(self, data): + # Logic for handling CWID command + self.send_response("OK") + + def handle_listen(self, data): + # Logic for handling LISTEN command + self.send_response("OK") + + def handle_compression(self, data): + # Logic for handling COMPRESSION command + self.send_response("OK") + + def handle_winlink_session(self, data): + # Logic for handling WINLINK SESSION command + self.send_response("OK") + + def socket_respond_disconnected(self): + self.send_response("DISCONNECTED") + + def socket_respond_connected(self, mycall, dxcall, bandwidth): + message = f"CONNECTED {mycall} {dxcall} {bandwidth}" + self.send_response(message) diff --git a/modem/state_manager.py b/modem/state_manager.py index 69b6b44a..ca65e4fb 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -38,6 +38,8 @@ class StateManager: self.arq_iss_sessions = {} self.arq_irs_sessions = {} + self.p2p_connection_sessions = {} + #self.mesh_routing_table = [] self.radio_frequency = 0 @@ -214,3 +216,15 @@ class StateManager: "radio_rf_level": self.radio_rf_level, "s_meter_strength": self.s_meter_strength, } + + def register_p2p_connection_session(self, session): + if session.session_id in self.p2p_connection_sessions: + print("session already registered...") + return False + self.p2p_connection_sessions[session.session_id] = session + return True + + def get_p2p_connection_session(self, id): + if id not in self.p2p_connection_sessions: + pass + return self.p2p_connection_sessions[id] \ No newline at end of file diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index 8309565d..b9221475 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -221,7 +221,9 @@ class TestARQSession(unittest.TestCase): session = arq_session_irs.ARQSessionIRS(self.config, self.irs_modem, 'AA1AAA-1', - random.randint(0, 255)) + random.randint(0, 255), + self.irs_state_manager + ) self.irs_state_manager.register_arq_irs_session(session) for session_id in self.irs_state_manager.arq_irs_sessions: session = self.irs_state_manager.arq_irs_sessions[session_id] diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py new file mode 100644 index 00000000..44ceb5ae --- /dev/null +++ b/tests/test_p2p_connection.py @@ -0,0 +1,197 @@ +import sys +import time + +sys.path.append('modem') + +import unittest +import unittest.mock +from config import CONFIG +import helpers +import queue +import threading +import base64 +from command_p2p_connection import P2PConnectionCommand +from state_manager import StateManager +from frame_dispatcher import DISPATCHER +import random +import structlog +import numpy as np +from event_manager import EventManager +from state_manager import StateManager +from data_frame_factory import DataFrameFactory +import codec2 +import p2p_connection + +from socket_interface_commands import SocketCommandHandler + +class TestModem: + def __init__(self, event_q, state_q): + self.data_queue_received = queue.Queue() + self.demodulator = unittest.mock.Mock() + self.event_manager = EventManager([event_q]) + self.logger = structlog.get_logger('Modem') + self.states = StateManager(state_q) + + def getFrameTransmissionTime(self, mode): + samples = 0 + c2instance = codec2.open_instance(mode.value) + samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance) + samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance) + time = samples / 8000 + return time + + def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: + # Simulate transmission time + tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT + self.logger.info(f"TX {tx_time} seconds...") + threading.Event().wait(tx_time) + + transmission = { + 'mode': mode, + 'bytes': frames, + } + self.data_queue_received.put(transmission) + + +class TestP2PConnectionSession(unittest.TestCase): + + @classmethod + def setUpClass(cls): + config_manager = CONFIG('modem/config.ini.example') + cls.config = config_manager.read() + cls.logger = structlog.get_logger("TESTS") + cls.frame_factory = DataFrameFactory(cls.config) + + # ISS + cls.iss_config_manager = config_manager + cls.iss_state_manager = StateManager(queue.Queue()) + cls.iss_event_manager = EventManager([queue.Queue()]) + cls.iss_event_queue = queue.Queue() + cls.iss_state_queue = queue.Queue() + cls.iss_p2p_data_queue = queue.Queue() + + + cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue) + cls.iss_frame_dispatcher = DISPATCHER(cls.config, + cls.iss_event_manager, + cls.iss_state_manager, + cls.iss_modem) + + #cls.iss_socket_interface_handler = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager) + #cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234) + + # IRS + cls.irs_state_manager = StateManager(queue.Queue()) + cls.irs_event_manager = EventManager([queue.Queue()]) + cls.irs_event_queue = queue.Queue() + cls.irs_state_queue = queue.Queue() + cls.irs_p2p_data_queue = queue.Queue() + cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue) + cls.irs_frame_dispatcher = DISPATCHER(cls.config, + cls.irs_event_manager, + cls.irs_state_manager, + cls.irs_modem) + + # Frame loss probability in % + cls.loss_probability = 30 + + cls.channels_running = True + + cls.disconnect_received = False + + def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER): + while self.channels_running: + # Transfer data between both parties + try: + transmission = modem_transmit_queue.get(timeout=1) + if random.randint(0, 100) < self.loss_probability: + self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + continue + + frame_bytes = transmission['bytes'] + frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) + except queue.Empty: + continue + self.logger.info(f"[{threading.current_thread().name}] Channel closed.") + + def waitForSession(self, q, outbound=False): + while True and self.channels_running: + ev = q.get() + print(ev) + if 'P2P_CONNECTION_DISCONNECT_ACK' in ev or self.disconnect_received: + self.logger.info(f"[{threading.current_thread().name}] session ended.") + break + + def establishChannels(self): + self.channels_running = True + self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, + args=[self.iss_modem.data_queue_received, + self.irs_frame_dispatcher], + name="ISS to IRS channel") + self.iss_to_irs_channel.start() + + self.irs_to_iss_channel = threading.Thread(target=self.channelWorker, + args=[self.irs_modem.data_queue_received, + self.iss_frame_dispatcher], + name="IRS to ISS channel") + self.irs_to_iss_channel.start() + + def waitAndCloseChannels(self): + self.waitForSession(self.iss_event_queue, True) + self.channels_running = False + self.waitForSession(self.irs_event_queue, False) + self.channels_running = False + + def generate_random_string(self, min_length, max_length): + import string + length = random.randint(min_length, max_length) + return ''.join(random.choices(string.ascii_letters, k=length))# + + def DisabledtestARQSessionSmallPayload(self): + # set Packet Error Rate (PER) / frame loss probability + self.loss_probability = 0 + + self.establishChannels() + + handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager) + handler.handle_connect(["AA1AAA-1", "BB2BBB-2"]) + + self.connected_event = threading.Event() + self.connected_event.wait() + + for session_id in self.iss_state_manager.p2p_connection_sessions: + session = self.iss_state_manager.get_p2p_connection_session(session_id) + session.ENTIRE_CONNECTION_TIMEOUT = 15 + # Generate and add 5 random entries to the queue + for _ in range(3): + min_length = (30 * _ ) + 1 + max_length = (30 * _ ) + 1 + print(min_length) + print(max_length) + random_entry = self.generate_random_string(min_length, max_length) + session.p2p_data_tx_queue.put(random_entry) + + session.p2p_data_tx_queue.put('12345') + self.waitAndCloseChannels() + + +class TestSocket: + def __init__(self, test_class): + self.sent_data = [] # To capture data sent through this socket + self.test_class = test_class + def sendall(self, data): + print(f"Mock sendall called with data: {data}") + self.sent_data.append(data) + self.event_handler(data) + + def event_handler(self, data): + if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data: + self.test_class.connected_event.set() + + if b'DISCONNECTED\r\n' in self.sent_data: + self.disconnect_received = True + self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n') + +if __name__ == '__main__': + unittest.main()