diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 91a0d45f..860401fb 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -68,7 +68,8 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_crc = None self.transmitted_acks = 0 - self.final = False + + self.abort = False def set_decode_mode(self): self.modem.demodulator.set_decode_mode(self.get_mode_by_speed_level(self.speed_level)) @@ -119,7 +120,7 @@ class ARQSessionIRS(arq_session.ARQSession): info_ack = self.frame_factory.build_arq_session_info_ack( self.id, self.total_crc, self.snr[0], - self.speed_level, self.frames_per_burst, flag_final=self.final) + self.speed_level, self.frames_per_burst, flag_abort=self.abort) self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling) self.set_state(IRS_State.INFO_ACK_SENT) @@ -154,10 +155,10 @@ class ARQSessionIRS(arq_session.ARQSession): if not self.all_data_received(): ack = self.frame_factory.build_arq_burst_ack( self.id, self.received_bytes, - self.speed_level, self.frames_per_burst, self.snr[0], flag_final=self.final) + self.speed_level, self.frames_per_burst, self.snr[0], flag_abort=self.abort) # increase ack counter - self.transmitted_acks += 1 + # self.transmitted_acks += 1 self.set_state(IRS_State.BURST_REPLY_SENT) self.launch_transmit_and_wait(ack, self.TIMEOUT_DATA, mode=FREEDV_MODE.signalling) return @@ -205,9 +206,9 @@ class ARQSessionIRS(arq_session.ARQSession): if self.snr[0] >= self.SPEED_LEVEL_DICT[new_speed_level]["min_snr"]: self.speed_level = new_speed_level - def stop_transmission(self): - self.log(f"Stopping transmission... setting final flag") - self.final = True + def abort_transmission(self): + self.log(f"Aborting transmission... setting abort flag") + self.abort = True def send_stop_ack(self, stop_frame): stop_ack = self.frame_factory.build_arq_stop_ack(self.id) diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 5f9dfa94..0991d030 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -15,9 +15,9 @@ class ISS_State(Enum): BURST_SENT = 3 ENDED = 4 FAILED = 5 - ABORT = 6 - ABORTING = 7 - ABORTED = 8 + BREAK = 6 # state for interrupting actual retries + ABORTING = 7 # state while running abort sequence and waiting for stop ack + ABORTED = 8 # stop ack received class ARQSessionISS(arq_session.ARQSession): @@ -44,7 +44,7 @@ class ARQSessionISS(arq_session.ARQSession): ISS_State.FAILED:{ FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_failed' }, - ISS_State.ABORT: { + ISS_State.BREAK: { FRAME_TYPE.ARQ_BURST_ACK.value: 'abort_transmission', FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'abort_transmission', FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'abort_transmission', @@ -69,16 +69,13 @@ class ARQSessionISS(arq_session.ARQSession): self.state = ISS_State.NEW self.id = self.generate_id() - - self.retry = True - self.frame_factory = data_frame_factory.DataFrameFactory(self.config) def generate_id(self): return random.randint(1,255) def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode): - while retries > 0 and self.state not in [ISS_State.ABORT, ISS_State.ABORTED]: + while retries > 0 and self.state not in [ISS_State.BREAK, ISS_State.ABORTED]: self.event_frame_received = threading.Event() if isinstance(frame_or_burst, list): burst = frame_or_burst else: burst = [frame_or_burst] @@ -131,6 +128,10 @@ class ARQSessionISS(arq_session.ARQSession): self.event_manager.send_arq_session_progress( True, self.id, self.dxcall, self.confirmed_bytes, len(self.data), self.state.name) + if irs_frame["flag"]["ABORT"]: + self.transmission_aborted() + return + if irs_frame["flag"]["FINAL"]: if self.confirmed_bytes == len(self.data) and irs_frame["flag"]["CHECKSUM"]: self.transmission_ended(irs_frame) @@ -152,31 +153,39 @@ class ARQSessionISS(arq_session.ARQSession): self.set_state(ISS_State.BURST_SENT) def transmission_ended(self, irs_frame): + # final function for sucessfully ended transmissions self.set_state(ISS_State.ENDED) self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data),True, self.state.name) def transmission_failed(self, irs_frame=None): + # final function for failed transmissions self.set_state(ISS_State.FAILED) self.log(f"Transmission failed!") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data),False, self.state.name) def abort_transmission(self, irs_frame=None): + # function for starting the abort sequence self.log(f"aborting transmission...") self.event_manager.send_arq_session_finished( - True, self.id, self.dxcall, self.total_length, False, self.state.name) + True, self.id, self.dxcall, len(self.data), False, self.state.name) + + # break actual retries + self.set_state(ISS_State.BREAK) + + # start with abort sequence + # TODO: We have to wait some time here for avoiding collisions with actual transmissions... + # This could be done by the channel busy detection, for example, if part of def transmit() in modem.py self.set_state(ISS_State.ABORTING) - # interrupt possible pending retries - self.retry = False - self.retry = True + self.send_stop() def send_stop(self): stop_frame = self.frame_factory.build_arq_stop(self.id) self.launch_twr(stop_frame, 15, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) - def aborted(self, irs_frame): + def transmission_aborted(self, irs_frame): self.log("session aborted") self.set_state(ISS_State.ABORTED) self.event_manager.send_arq_session_finished( - True, self.id, self.dxcall, self.total_length, False, self.state.name) \ No newline at end of file + True, self.id, self.dxcall, len(self.data), False, self.state.name) \ No newline at end of file diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 2afee208..cb077f05 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -13,8 +13,9 @@ class DataFrameFactory: """ ARQ_FLAGS = { 'FINAL': 0, # Bit-position for indicating the FINAL state - 'CHECKSUM': 1, # Bit-position for indicating the CHECKSUM is correct or not - 'ENABLE_COMPRESSION': 2 # Bit-position for indicating compression is enabled + 'ABORT': 1, # Bit-position for indicating the ABORT request + 'CHECKSUM': 2, # Bit-position for indicating the CHECKSUM is correct or not + 'ENABLE_COMPRESSION': 3 # Bit-position for indicating compression is enabled } def __init__(self, config): @@ -370,10 +371,12 @@ class DataFrameFactory: return self.construct(FR_TYPE.ARQ_STOP_ACK, payload) - def build_arq_session_info_ack(self, session_id, total_crc, snr, speed_level, frames_per_burst, flag_final=False): + def build_arq_session_info_ack(self, session_id, total_crc, snr, speed_level, frames_per_burst, flag_final=False, flag_abort=False): flag = 0b00000000 if flag_final: flag = helpers.set_flag(flag, 'FINAL', True, self.ARQ_FLAGS) + if flag_abort: + flag = helpers.set_flag(flag, 'ABORT', True, self.ARQ_FLAGS) payload = { "frame_length": self.LENGTH_SIG0_FRAME, diff --git a/modem/server.py b/modem/server.py index f774cf64..5579ab39 100644 --- a/modem/server.py +++ b/modem/server.py @@ -227,9 +227,12 @@ def post_modem_send_raw_stop(): return api_response({"info": "endpoint for SENDING a STOP command via POST"}) if not app.state_manager.is_modem_running: api_abort('Modem not running', 503) - print("stop") - app.state_manager.set_final_to_arq_transmissions() - # server_commands.modem_arq_send_raw(request.json) + + for id in app.state_manager.arq_irs_sessions: + app.state_manager.arq_irs_sessions[id].abort_transmission() + for id in app.state_manager.arq_iss_sessions: + app.state_manager.arq_iss_sessions[id].abort_transmission() + return api_response(request.json) diff --git a/modem/state_manager.py b/modem/state_manager.py index 5c40a511..0d199039 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -148,12 +148,6 @@ class StateManager: raise RuntimeError(f"ARQ ISS Session '{id}' not found!") del self.arq_irs_sessions[id] - def set_final_to_arq_transmissions(self): - for id in self.arq_irs_sessions: - self.arq_irs_sessions[id].abort_transmission() - for id in self.arq_iss_sessions: - self.arq_iss_sessions[id].abort_transmission() - def add_activity(self, activity_data): # Generate a random 8-byte string as hex activity_id = np.random.bytes(8).hex()