diff --git a/modem/arq_session.py b/modem/arq_session.py index fe6c9996..2e199a2c 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -29,13 +29,16 @@ class ARQSession(): # 3 bytes for the EOF End of File indicator in a data frame self.data_frame_eof = b"EOF" - + def log(self, message, isWarning = False): + msg = f"[{type(self).__name__}]: {message}" + logger = self.logger.warn if isWarning else self.logger.info + logger(msg) def get_mode_by_speed_level(self, speed_level): return self.MODE_BY_SPEED[speed_level] def transmit_frame(self, frame: bytearray): - self.logger.info("Transmitting frame") + self.log("Transmitting frame") modem_queue_item = { 'mode': self.get_mode_by_speed_level(self.speed_level), 'repeat': 1, @@ -46,7 +49,7 @@ class ARQSession(): def setState(self, state): self.state = state - self.logger.info(f"state changed to {state}") + self.log(f"state changed to {state}") def get_payload_size(self, speed_level): mode = self.MODE_BY_SPEED[speed_level] diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index f35020f5..0cbed199 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -15,10 +15,14 @@ class ARQSessionIRS(arq_session.ARQSession): TIMEOUT_DATA = 6 - def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int): + def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int, is_wide_band: bool): super().__init__(config, tx_frame_queue, dxcall) self.id = session_id + self.is_wide_band = is_wide_band + self.speed = 0 + self.version = 1 + self.snr = 0 self.state = self.STATE_CONN_REQ_RECEIVED @@ -40,9 +44,6 @@ class ARQSessionIRS(arq_session.ARQSession): def generate_id(self): pass - def log(self, message): - pass - def set_state(self, state): self.log(f"ARQ Session IRS {self.id} state {self.state}") self.state = state @@ -51,34 +52,46 @@ class ARQSessionIRS(arq_session.ARQSession): pass def runner(self): - isWideband = True - speed = 1 - version = 1 - - ack_frame = self.frame_factory.build_arq_session_connect_ack(isWideband, self.id, speed, version) - self.transmit_frame(ack_frame) - self.set_modem_decode_modes(None) - self.state = self.STATE_WAITING_DATA - while self.state == self.STATE_WAITING_DATA: - if not self.event_data_received.wait(self.TIMEOUT_DATA): - self.log("Timeout waiting for data") - self.state = self.STATE_FAILED - return + retries = self.RETRIES_TRANSFER + while retries > 0: + if self.event_data_received.wait(self.TIMEOUT_DATA): + retries = self.RETRIES_TRANSFER + self.append_data_to_burst_buffer() + + self.send_data_nack() + + self.state = self.STATE_FAILED + return self.log("Finished ARQ IRS session") def run(self): + self.send_session_ack() + self.state = self.STATE_WAITING_DATA self.thread = threading.Thread(target=self.runner, name=f"ARQ IRS Session {self.id}", daemon=True) self.thread.start() + def send_session_ack(self): + ack_frame = self.frame_factory.build_arq_session_connect_ack( + self.is_wide_band, + self.id, + self.speed, + self.version) + self.transmit_frame(ack_frame) - def on_data_received(self): + def send_data_nack(self): + nack = self.frame_factory.build_arq_burst_nack(self.session_id, self.snr, self.speed_level, + 10, # WTF? + 1) + self.transmit_frame(nack) + + def on_data_received(self, frame): if self.state != self.STATE_WAITING_DATA: raise RuntimeError(f"ARQ Session: Received data while in state {self.state}, expected {self.STATE_WAITING_DATA}") + self.rx_data_chain(frame) self.event_data_received.set() - def on_transfer_ack_received(self, ack): self.event_transfer_ack_received.set() self.speed_level = ack['speed_level'] @@ -314,7 +327,7 @@ class ARQSessionIRS(arq_session.ARQSession): # check if hmac signing enabled if self.enable_hmac: - self.log.info( + self.logger.info( "[Modem] [HMAC] Enabled", ) if salt_found := helpers.search_hmac_salt( @@ -331,7 +344,7 @@ class ARQSessionIRS(arq_session.ARQSession): # hmac signature wrong self.arq_process_received_data_frame(data, snr, signed=False) elif checksum_expected == checksum_received: - self.log.warning( + self.logger.warning( "[Modem] [HMAC] Disabled, using CRC", ) self.arq_process_received_data_frame(data, snr, signed=False) @@ -347,7 +360,7 @@ class ARQSessionIRS(arq_session.ARQSession): ) duration = time.time() - self.rx_start_of_transmission - self.log.warning( + self.logger.warning( "[Modem] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!", e="wrong crc", expected=checksum_expected.hex(), @@ -362,7 +375,7 @@ class ARQSessionIRS(arq_session.ARQSession): if self.enable_stats: self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration) - self.log.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish, + self.logger.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish, bytesperminute=self.states.arq_bytes_per_minute) self.send_burst_nack_frame(snr) diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 39a6cd73..07dfe205 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -112,11 +112,12 @@ class ARQSessionISS(arq_session.ARQSession): return False def on_transfer_ack_received(self, ack): - self.event_transfer_ack_received.set() self.speed_level = ack['speed_level'] + self.event_transfer_ack_received.set() def on_transfer_nack_received(self, nack): self.speed_level = nack['speed_level'] + self.event_transfer_ack_received.set() def on_disconnect_received(self): self.abort() diff --git a/modem/frame_handler_arq_session.py b/modem/frame_handler_arq_session.py index bbf4a46c..2de7d19b 100644 --- a/modem/frame_handler_arq_session.py +++ b/modem/frame_handler_arq_session.py @@ -15,18 +15,19 @@ class ARQFrameHandler(frame_handler.FrameHandler): if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_N.value, FR.ARQ_SESSION_OPEN_W.value]: session = ARQSessionIRS(self.config, self.tx_frame_queue, - frame['origin'], frame['session_id']) - self.states.register_arq_irs_session(session) + frame['origin'], + frame['session_id'], + frame['frame_type_int'] == FR.ARQ_SESSION_OPEN_W.value) + self.states.register_arq_irs_session(session, frame['frame_type_int'] == FR.ARQ_SESSION_OPEN_W.value) session.run() # ARQ session open ack received - if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_ACK_N.value, FR.ARQ_SESSION_OPEN_ACK_W.value]: + elif frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_ACK_N.value, FR.ARQ_SESSION_OPEN_ACK_W.value]: iss_session:ARQSessionISS = self.states.get_arq_iss_session(frame['session_id']) iss_session.on_connection_ack_received(frame) # ARQ session data frame received - if frame['frame_type_int'] in [FR.BURST_01.value, FR.BURST_02.value, FR.BURST_03.value, FR.BURST_04.value, FR.BURST_05.value]: + elif frame['frame_type_int'] in [FR.BURST_01.value, FR.BURST_02.value, FR.BURST_03.value, FR.BURST_04.value, FR.BURST_05.value]: print("received data frame....") irs_session:ARQSessionIRS = self.states.get_arq_irs_session(frame['session_id']) - irs_session.on_data_received() - irs_session.rx_data_chain(frame) \ No newline at end of file + irs_session.on_data_received(frame) diff --git a/modem/state_manager.py b/modem/state_manager.py index f59cab10..588a0e6d 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -119,7 +119,7 @@ class StateManager: raise RuntimeError(f"ARQ ISS Session '{session.id}' already exists!") self.arq_iss_sessions[session.id] = session - def register_arq_irs_session(self, session): + def register_arq_irs_session(self, session, is_wide_band): if session.id in self.arq_irs_sessions: raise RuntimeError(f"ARQ IRS Session '{session.id}' already exists!") self.arq_irs_sessions[session.id] = session