diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index 573d4077..d84eb634 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -11,6 +11,7 @@ class ARQ_SESSION_TYPES(Enum): raw_lzma = 10 raw_gzip = 11 p2pmsg_lzma = 20 + p2p_connection = 30 class ARQDataTypeHandler: def __init__(self, event_manager, state_manager): @@ -43,6 +44,12 @@ class ARQDataTypeHandler: 'failed' : self.failed_p2pmsg_lzma, 'transmitted': self.transmitted_p2pmsg_lzma, }, + ARQ_SESSION_TYPES.p2p_connection: { + 'prepare': self.prepare_p2p_connection, + 'handle': self.handle_p2p_connection, + 'failed': self.failed_p2p_connection, + 'transmitted': self.transmitted_p2p_connection, + }, } @staticmethod @@ -161,4 +168,32 @@ class ARQDataTypeHandler: def transmitted_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics) - return decompressed_data \ No newline at end of file + return decompressed_data + + + def prepare_p2p_connection(self, data): + compressed_data = gzip.compress(data) + self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + return compressed_data + + def handle_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes") + print(self.state_manager.p2p_connection_sessions) + return decompressed_data + + def failed_p2p_connection(self, data, statistics): + decompressed_data = gzip.decompress(data) + self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True) + print(self.state_manager.p2p_connection_sessions) + return decompressed_data + + def transmitted_p2p_connection(self, data, statistics): + + decompressed_data = gzip.decompress(data) + print(decompressed_data) + print(self.state_manager.p2p_connection_sessions) + for session_id in self.state_manager.p2p_connection_sessions: + print(session_id) + self.state_manager.p2p_connection_sessions[session_id].transmitted_arq() \ No newline at end of file diff --git a/modem/arq_session.py b/modem/arq_session.py index 0db45356..51672757 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -29,13 +29,13 @@ class ARQSession(): }, } - def __init__(self, config: dict, modem, dxcall: str): + def __init__(self, config: dict, modem, dxcall: str, state_manager): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.event_manager: EventManager = modem.event_manager - self.states = modem.states - + #self.states = modem.states + self.states = state_manager self.states.setARQ(True) self.snr = [] diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index fa6409fb..a0d1f2bd 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -59,8 +59,8 @@ class ARQSessionIRS(arq_session.ARQSession): }, } - def __init__(self, config: dict, modem, dxcall: str, session_id: int): - super().__init__(config, modem, dxcall) + def __init__(self, config: dict, modem, dxcall: str, session_id: int, state_manager): + super().__init__(config, modem, dxcall, state_manager) self.id = session_id self.dxcall = dxcall diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 611f591e..1c989c0c 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -54,7 +54,7 @@ class ARQSessionISS(arq_session.ARQSession): } def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes): - super().__init__(config, modem, dxcall) + super().__init__(config, modem, dxcall, state_manager) self.state_manager = state_manager self.data = data self.total_length = len(data) @@ -191,6 +191,10 @@ class ARQSessionISS(arq_session.ARQSession): self.set_state(ISS_State.ENDED) self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) + + print(self.state_manager.p2p_connection_sessions) + print(self.arq_data_type_handler.state_manager.p2p_connection_sessions) + self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length)) self.state_manager.remove_arq_iss_session(self.id) self.states.setARQ(False) diff --git a/modem/command_p2p_connection.py b/modem/command_p2p_connection.py index 6d3d1035..2ec44960 100644 --- a/modem/command_p2p_connection.py +++ b/modem/command_p2p_connection.py @@ -24,7 +24,7 @@ class P2PConnectionCommand(TxCommand): try: self.emit_event(event_queue) self.logger.info(self.log_message()) - session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.socket_command_handler) + session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.event_manager, self.socket_command_handler) if session.session_id: self.state_manager.register_p2p_connection_session(session) session.connect() diff --git a/modem/frame_handler_arq_session.py b/modem/frame_handler_arq_session.py index 8ea805cd..9709fb02 100644 --- a/modem/frame_handler_arq_session.py +++ b/modem/frame_handler_arq_session.py @@ -32,7 +32,8 @@ class ARQFrameHandler(frame_handler.FrameHandler): session = ARQSessionIRS(self.config, self.modem, frame['origin'], - session_id) + session_id, + self.states) self.states.register_arq_irs_session(session) elif frame['frame_type_int'] in [ diff --git a/modem/frame_handler_p2p_connection.py b/modem/frame_handler_p2p_connection.py index 7dc4931d..dc52a131 100644 --- a/modem/frame_handler_p2p_connection.py +++ b/modem/frame_handler_p2p_connection.py @@ -33,7 +33,7 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler): self.modem, frame['origin'], frame['destination_crc'], - self.states) + self.states, self.event_manager) session.session_id = session_id self.states.register_p2p_connection_session(session) diff --git a/modem/p2p_connection.py b/modem/p2p_connection.py index 9d4a626a..131762e7 100644 --- a/modem/p2p_connection.py +++ b/modem/p2p_connection.py @@ -7,6 +7,11 @@ import structlog import random from queue import Queue import time +from command_arq_raw import ARQRawCommand +import numpy as np +import base64 +from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES +from arq_session_iss import ARQSessionISS class States(Enum): NEW = 0 @@ -14,12 +19,13 @@ class States(Enum): CONNECT_SENT = 2 CONNECT_ACK_SENT = 3 CONNECTED = 4 - HEARTBEAT_SENT = 5 - HEARTBEAT_ACK_SENT = 6 + #HEARTBEAT_SENT = 5 + #HEARTBEAT_ACK_SENT = 6 PAYLOAD_SENT = 7 - DISCONNECTING = 8 - DISCONNECTED = 9 - FAILED = 10 + ARQ_SESSION = 8 + DISCONNECTING = 9 + DISCONNECTED = 10 + FAILED = 11 @@ -50,7 +56,7 @@ class P2PConnection: }, } - def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, socket_command_handler=None): + def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None): self.logger = structlog.get_logger(type(self).__name__) self.config = config self.frame_factory = data_frame_factory.DataFrameFactory(self.config) @@ -61,12 +67,16 @@ class P2PConnection: self.origin = origin self.bandwidth = 0 - self.states = state_manager + self.state_manager = state_manager + self.event_manager = event_manager self.modem = modem + self.modem.demodulator.set_decode_mode([]) self.p2p_data_rx_queue = Queue() self.p2p_data_tx_queue = Queue() + self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager) + self.state = States.NEW self.session_id = self.generate_id() @@ -85,12 +95,13 @@ class P2PConnection: self.start_data_processing_worker() + def start_data_processing_worker(self): """Starts a worker thread to monitor the transmit data queue and process data.""" def data_processing_worker(): while True: - if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT: + if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION: self.disconnect() return @@ -108,10 +119,10 @@ class P2PConnection: def generate_id(self): while True: random_int = random.randint(1,255) - if random_int not in self.states.p2p_connection_sessions: + if random_int not in self.state_manager.p2p_connection_sessions: return random_int - if len(self.states.p2p_connection_sessions) >= 255: + if len(self.state_manager.p2p_connection_sessions) >= 255: return False def set_details(self, snr, frequency_offset): @@ -201,7 +212,7 @@ class P2PConnection: def connected_irs(self, frame): self.log("CONNECTED IRS...........................") - self.states.register_p2p_connection_session(self) + self.state_manager.register_p2p_connection_session(self) self.set_state(States.CONNECTED) self.is_ISS = False self.orign = frame["origin"] @@ -229,10 +240,14 @@ class P2PConnection: if len(data) <= 11: mode = FREEDV_MODE.signalling + elif 11 < len(data) < 32: + mode = FREEDV_MODE.datac4 + else: + self.transmit_arq(data) + return payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data) - self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA, - mode=mode) + self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode) return def prepare_data_chunk(self, data, mode): @@ -274,12 +289,26 @@ class P2PConnection: if self.socket_command_handler: self.socket_command_handler.socket_respond_disconnected() + def transmit_arq(self, data): + self.set_state(States.ARQ_SESSION) + + print("----------------------------------------------------------------") + print(self.destination) + print(self.state_manager.p2p_connection_sessions) + + prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection) + iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte) + iss.id = self.session_id + if iss.id: + self.state_manager.register_arq_iss_session(iss) + iss.start() + return iss + + def transmitted_arq(self): + self.last_data_timestamp = time.time() + self.set_state(States.CONNECTED) + - def transmit_arq(self): - pass - #command = cmd_class(self.config, self.states, self.eve, params) - #app.logger.info(f"Command {command.get_name()} running...") - #if command.run(app.modem_events, app.service_manager.modem): def received_arq(self): pass diff --git a/tests/test_p2p_connection.py b/tests/test_p2p_connection.py index fdcfc3ee..c67ed2f2 100644 --- a/tests/test_p2p_connection.py +++ b/tests/test_p2p_connection.py @@ -117,9 +117,9 @@ class TestP2PConnectionSession(unittest.TestCase): key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' while True and self.channels_running: ev = q.get() - if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]): - self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") - break + #if key in ev and ('success' in ev[key] or 'P2P_CONNECTION_DISCONNECT_ACK' in ev[key]): + # self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + # break def establishChannels(self): self.channels_running = True @@ -162,10 +162,15 @@ class TestP2PConnectionSession(unittest.TestCase): session = self.iss_state_manager.get_p2p_connection_session(session_id) session.ENTIRE_CONNECTION_TIMEOUT = 15 # Generate and add 5 random entries to the queue - for _ in range(5): - random_entry = self.generate_random_string(2, 11) - session.p2p_data_tx_queue.put(random_entry) + for _ in range(3): + min_length = (30 * _ ) + 1 + max_length = (30 * _ ) + 1 + print(min_length) + print(max_length) + random_entry = self.generate_random_string(min_length, max_length) + session.p2p_data_tx_queue.put(random_entry) + session.p2p_data_tx_queue.put('12345') self.waitAndCloseChannels() @@ -185,6 +190,5 @@ class TestSocket: if b'DISCONNECTED\r\n' in self.sent_data: self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n') - if __name__ == '__main__': unittest.main()