first attempt with arq session inside connection

This commit is contained in:
DJ2LS 2024-03-19 22:10:58 +01:00
parent aaa2084bfd
commit bb8da6f5d8
9 changed files with 108 additions and 35 deletions

View file

@ -11,6 +11,7 @@ class ARQ_SESSION_TYPES(Enum):
raw_lzma = 10
raw_gzip = 11
p2pmsg_lzma = 20
p2p_connection = 30
class ARQDataTypeHandler:
def __init__(self, event_manager, state_manager):
@ -43,6 +44,12 @@ class ARQDataTypeHandler:
'failed' : self.failed_p2pmsg_lzma,
'transmitted': self.transmitted_p2pmsg_lzma,
},
ARQ_SESSION_TYPES.p2p_connection: {
'prepare': self.prepare_p2p_connection,
'handle': self.handle_p2p_connection,
'failed': self.failed_p2p_connection,
'transmitted': self.transmitted_p2p_connection,
},
}
@staticmethod
@ -161,4 +168,32 @@ class ARQDataTypeHandler:
def transmitted_p2pmsg_lzma(self, data, statistics):
decompressed_data = lzma.decompress(data)
message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics)
return decompressed_data
return decompressed_data
def prepare_p2p_connection(self, data):
compressed_data = gzip.compress(data)
self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
print(self.state_manager.p2p_connection_sessions)
return compressed_data
def handle_p2p_connection(self, data, statistics):
decompressed_data = gzip.decompress(data)
self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
print(self.state_manager.p2p_connection_sessions)
return decompressed_data
def failed_p2p_connection(self, data, statistics):
decompressed_data = gzip.decompress(data)
self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
print(self.state_manager.p2p_connection_sessions)
return decompressed_data
def transmitted_p2p_connection(self, data, statistics):
decompressed_data = gzip.decompress(data)
print(decompressed_data)
print(self.state_manager.p2p_connection_sessions)
for session_id in self.state_manager.p2p_connection_sessions:
print(session_id)
self.state_manager.p2p_connection_sessions[session_id].transmitted_arq()

View file

@ -29,13 +29,13 @@ class ARQSession():
},
}
def __init__(self, config: dict, modem, dxcall: str):
def __init__(self, config: dict, modem, dxcall: str, state_manager):
self.logger = structlog.get_logger(type(self).__name__)
self.config = config
self.event_manager: EventManager = modem.event_manager
self.states = modem.states
#self.states = modem.states
self.states = state_manager
self.states.setARQ(True)
self.snr = []

View file

@ -59,8 +59,8 @@ class ARQSessionIRS(arq_session.ARQSession):
},
}
def __init__(self, config: dict, modem, dxcall: str, session_id: int):
super().__init__(config, modem, dxcall)
def __init__(self, config: dict, modem, dxcall: str, session_id: int, state_manager):
super().__init__(config, modem, dxcall, state_manager)
self.id = session_id
self.dxcall = dxcall

View file

@ -54,7 +54,7 @@ class ARQSessionISS(arq_session.ARQSession):
}
def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes):
super().__init__(config, modem, dxcall)
super().__init__(config, modem, dxcall, state_manager)
self.state_manager = state_manager
self.data = data
self.total_length = len(data)
@ -191,6 +191,10 @@ class ARQSessionISS(arq_session.ARQSession):
self.set_state(ISS_State.ENDED)
self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
print(self.state_manager.p2p_connection_sessions)
print(self.arq_data_type_handler.state_manager.p2p_connection_sessions)
self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
self.state_manager.remove_arq_iss_session(self.id)
self.states.setARQ(False)

View file

@ -24,7 +24,7 @@ class P2PConnectionCommand(TxCommand):
try:
self.emit_event(event_queue)
self.logger.info(self.log_message())
session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.socket_command_handler)
session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.event_manager, self.socket_command_handler)
if session.session_id:
self.state_manager.register_p2p_connection_session(session)
session.connect()

View file

@ -32,7 +32,8 @@ class ARQFrameHandler(frame_handler.FrameHandler):
session = ARQSessionIRS(self.config,
self.modem,
frame['origin'],
session_id)
session_id,
self.states)
self.states.register_arq_irs_session(session)
elif frame['frame_type_int'] in [

View file

@ -33,7 +33,7 @@ class P2PConnectionFrameHandler(frame_handler.FrameHandler):
self.modem,
frame['origin'],
frame['destination_crc'],
self.states)
self.states, self.event_manager)
session.session_id = session_id
self.states.register_p2p_connection_session(session)

View file

@ -7,6 +7,11 @@ import structlog
import random
from queue import Queue
import time
from command_arq_raw import ARQRawCommand
import numpy as np
import base64
from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES
from arq_session_iss import ARQSessionISS
class States(Enum):
NEW = 0
@ -14,12 +19,13 @@ class States(Enum):
CONNECT_SENT = 2
CONNECT_ACK_SENT = 3
CONNECTED = 4
HEARTBEAT_SENT = 5
HEARTBEAT_ACK_SENT = 6
#HEARTBEAT_SENT = 5
#HEARTBEAT_ACK_SENT = 6
PAYLOAD_SENT = 7
DISCONNECTING = 8
DISCONNECTED = 9
FAILED = 10
ARQ_SESSION = 8
DISCONNECTING = 9
DISCONNECTED = 10
FAILED = 11
@ -50,7 +56,7 @@ class P2PConnection:
},
}
def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, socket_command_handler=None):
def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None):
self.logger = structlog.get_logger(type(self).__name__)
self.config = config
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
@ -61,12 +67,16 @@ class P2PConnection:
self.origin = origin
self.bandwidth = 0
self.states = state_manager
self.state_manager = state_manager
self.event_manager = event_manager
self.modem = modem
self.modem.demodulator.set_decode_mode([])
self.p2p_data_rx_queue = Queue()
self.p2p_data_tx_queue = Queue()
self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager)
self.state = States.NEW
self.session_id = self.generate_id()
@ -85,12 +95,13 @@ class P2PConnection:
self.start_data_processing_worker()
def start_data_processing_worker(self):
"""Starts a worker thread to monitor the transmit data queue and process data."""
def data_processing_worker():
while True:
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT:
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION:
self.disconnect()
return
@ -108,10 +119,10 @@ class P2PConnection:
def generate_id(self):
while True:
random_int = random.randint(1,255)
if random_int not in self.states.p2p_connection_sessions:
if random_int not in self.state_manager.p2p_connection_sessions:
return random_int
if len(self.states.p2p_connection_sessions) >= 255:
if len(self.state_manager.p2p_connection_sessions) >= 255:
return False
def set_details(self, snr, frequency_offset):
@ -201,7 +212,7 @@ class P2PConnection:
def connected_irs(self, frame):
self.log("CONNECTED IRS...........................")
self.states.register_p2p_connection_session(self)
self.state_manager.register_p2p_connection_session(self)
self.set_state(States.CONNECTED)
self.is_ISS = False
self.orign = frame["origin"]
@ -229,10 +240,14 @@ class P2PConnection:
if len(data) <= 11:
mode = FREEDV_MODE.signalling
elif 11 < len(data) < 32:
mode = FREEDV_MODE.datac4
else:
self.transmit_arq(data)
return
payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data)
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,
mode=mode)
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode)
return
def prepare_data_chunk(self, data, mode):
@ -274,12 +289,26 @@ class P2PConnection:
if self.socket_command_handler:
self.socket_command_handler.socket_respond_disconnected()
def transmit_arq(self, data):
self.set_state(States.ARQ_SESSION)
print("----------------------------------------------------------------")
print(self.destination)
print(self.state_manager.p2p_connection_sessions)
prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection)
iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte)
iss.id = self.session_id
if iss.id:
self.state_manager.register_arq_iss_session(iss)
iss.start()
return iss
def transmitted_arq(self):
self.last_data_timestamp = time.time()
self.set_state(States.CONNECTED)
def transmit_arq(self):
pass
#command = cmd_class(self.config, self.states, self.eve, params)
#app.logger.info(f"Command {command.get_name()} running...")
#if command.run(app.modem_events, app.service_manager.modem):
def received_arq(self):
pass

View file

@ -117,9 +117,9 @@ class TestP2PConnectionSession(unittest.TestCase):
key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound'
while True and self.channels_running:
ev = q.get()
if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]):
self.logger.info(f"[{threading.current_thread().name}] {key} session ended.")
break
#if key in ev and ('success' in ev[key] or 'P2P_CONNECTION_DISCONNECT_ACK' in ev[key]):
# self.logger.info(f"[{threading.current_thread().name}] {key} session ended.")
# break
def establishChannels(self):
self.channels_running = True
@ -162,10 +162,15 @@ class TestP2PConnectionSession(unittest.TestCase):
session = self.iss_state_manager.get_p2p_connection_session(session_id)
session.ENTIRE_CONNECTION_TIMEOUT = 15
# Generate and add 5 random entries to the queue
for _ in range(5):
random_entry = self.generate_random_string(2, 11)
session.p2p_data_tx_queue.put(random_entry)
for _ in range(3):
min_length = (30 * _ ) + 1
max_length = (30 * _ ) + 1
print(min_length)
print(max_length)
random_entry = self.generate_random_string(min_length, max_length)
session.p2p_data_tx_queue.put(random_entry)
session.p2p_data_tx_queue.put('12345')
self.waitAndCloseChannels()
@ -185,6 +190,5 @@ class TestSocket:
if b'DISCONNECTED\r\n' in self.sent_data:
self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n')
if __name__ == '__main__':
unittest.main()