mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
ARQ WIP - session aborting
This commit is contained in:
parent
2db37adf21
commit
cc60391d26
5 changed files with 46 additions and 20 deletions
|
@ -45,10 +45,8 @@ class ARQSession():
|
||||||
|
|
||||||
self.id = None
|
self.id = None
|
||||||
|
|
||||||
self.final = False # class wide final state for stopping transmissions on command
|
|
||||||
|
|
||||||
def log(self, message, isWarning = False):
|
def log(self, message, isWarning = False):
|
||||||
msg = f"[{type(self).__name__}][{self.state}]: {message}"
|
msg = f"[{type(self).__name__}][state={self.state}]: {message}"
|
||||||
logger = self.logger.warn if isWarning else self.logger.info
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
logger(msg)
|
logger(msg)
|
||||||
|
|
||||||
|
|
|
@ -68,6 +68,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.received_crc = None
|
self.received_crc = None
|
||||||
|
|
||||||
self.transmitted_acks = 0
|
self.transmitted_acks = 0
|
||||||
|
self.final = False
|
||||||
|
|
||||||
def set_decode_mode(self):
|
def set_decode_mode(self):
|
||||||
self.modem.demodulator.set_decode_mode(self.get_mode_by_speed_level(self.speed_level))
|
self.modem.demodulator.set_decode_mode(self.get_mode_by_speed_level(self.speed_level))
|
||||||
|
@ -204,7 +205,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.speed_level = new_speed_level
|
self.speed_level = new_speed_level
|
||||||
|
|
||||||
def stop_transmission(self):
|
def stop_transmission(self):
|
||||||
self.log(f"Stopping transmission...., setting final flag")
|
self.log(f"Stopping transmission... setting final flag")
|
||||||
self.final = True
|
self.final = True
|
||||||
|
|
||||||
def send_stop_ack(self, stop_frame):
|
def send_stop_ack(self, stop_frame):
|
||||||
|
|
|
@ -15,7 +15,9 @@ class ISS_State(Enum):
|
||||||
BURST_SENT = 3
|
BURST_SENT = 3
|
||||||
ENDED = 4
|
ENDED = 4
|
||||||
FAILED = 5
|
FAILED = 5
|
||||||
ABORTED = 6
|
ABORT = 6
|
||||||
|
ABORTING = 7
|
||||||
|
ABORTED = 8
|
||||||
|
|
||||||
class ARQSessionISS(arq_session.ARQSession):
|
class ARQSessionISS(arq_session.ARQSession):
|
||||||
|
|
||||||
|
@ -41,6 +43,20 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
},
|
},
|
||||||
ISS_State.FAILED:{
|
ISS_State.FAILED:{
|
||||||
FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_failed'
|
FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_failed'
|
||||||
|
},
|
||||||
|
ISS_State.ABORT: {
|
||||||
|
FRAME_TYPE.ARQ_BURST_ACK.value: 'abort_transmission',
|
||||||
|
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'abort_transmission',
|
||||||
|
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'abort_transmission',
|
||||||
|
},
|
||||||
|
ISS_State.ABORTING: {
|
||||||
|
FRAME_TYPE.ARQ_STOP_ACK.value: 'aborted',
|
||||||
|
FRAME_TYPE.ARQ_BURST_ACK.value: 'abort_transmission',
|
||||||
|
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'abort_transmission',
|
||||||
|
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'abort_transmission',
|
||||||
|
},
|
||||||
|
ISS_State.ABORTED: {
|
||||||
|
FRAME_TYPE.ARQ_STOP_ACK.value: 'aborted',
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,13 +70,15 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.state = ISS_State.NEW
|
self.state = ISS_State.NEW
|
||||||
self.id = self.generate_id()
|
self.id = self.generate_id()
|
||||||
|
|
||||||
|
self.retry = True
|
||||||
|
|
||||||
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
||||||
|
|
||||||
def generate_id(self):
|
def generate_id(self):
|
||||||
return random.randint(1,255)
|
return random.randint(1,255)
|
||||||
|
|
||||||
def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode):
|
def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode):
|
||||||
while retries > 0 and not self.final:
|
while retries > 0 and self.state not in [ISS_State.ABORT, ISS_State.ABORTED]:
|
||||||
self.event_frame_received = threading.Event()
|
self.event_frame_received = threading.Event()
|
||||||
if isinstance(frame_or_burst, list): burst = frame_or_burst
|
if isinstance(frame_or_burst, list): burst = frame_or_burst
|
||||||
else: burst = [frame_or_burst]
|
else: burst = [frame_or_burst]
|
||||||
|
@ -72,10 +90,13 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
return
|
return
|
||||||
self.log("Timeout!")
|
self.log("Timeout!")
|
||||||
retries = retries - 1
|
retries = retries - 1
|
||||||
|
|
||||||
|
if self.state == ISS_State.ABORTED:
|
||||||
|
self.log("session aborted initiated...")
|
||||||
|
return
|
||||||
|
|
||||||
self.set_state(ISS_State.FAILED)
|
self.set_state(ISS_State.FAILED)
|
||||||
self.transmission_failed()
|
self.transmission_failed()
|
||||||
if self.final:
|
|
||||||
self.send_stop()
|
|
||||||
|
|
||||||
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
||||||
twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode])
|
twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode])
|
||||||
|
@ -101,6 +122,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.set_state(ISS_State.INFO_SENT)
|
self.set_state(ISS_State.INFO_SENT)
|
||||||
|
|
||||||
def send_data(self, irs_frame):
|
def send_data(self, irs_frame):
|
||||||
|
|
||||||
self.set_speed_and_frames_per_burst(irs_frame)
|
self.set_speed_and_frames_per_burst(irs_frame)
|
||||||
|
|
||||||
if 'offset' in irs_frame:
|
if 'offset' in irs_frame:
|
||||||
|
@ -114,7 +136,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.transmission_ended(irs_frame)
|
self.transmission_ended(irs_frame)
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
self.transmission_failed(irs_frame)
|
self.transmission_failed()
|
||||||
return
|
return
|
||||||
|
|
||||||
payload_size = self.get_data_payload_size()
|
payload_size = self.get_data_payload_size()
|
||||||
|
@ -136,16 +158,21 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
|
|
||||||
def transmission_failed(self, irs_frame):
|
def transmission_failed(self, irs_frame):
|
||||||
self.set_state(ISS_State.FAILED)
|
self.set_state(ISS_State.FAILED)
|
||||||
self.log(f"Transmission failed! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
|
self.log(f"Transmission failed!")
|
||||||
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data),False)
|
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data),False)
|
||||||
|
|
||||||
def stop_transmission(self):
|
def abort_transmission(self, irs_frame=None):
|
||||||
self.log(f"Stopping transmission...")
|
self.log(f"Stopping transmission...")
|
||||||
self.set_state(ISS_State.FAILED)
|
self.set_state(ISS_State.ABORTING)
|
||||||
self.final = True
|
# interrupt possible pending retries
|
||||||
|
self.retry = False
|
||||||
|
self.retry = True
|
||||||
|
self.send_stop()
|
||||||
|
|
||||||
def send_stop(self):
|
def send_stop(self):
|
||||||
self.final = False
|
|
||||||
stop_frame = self.frame_factory.build_arq_stop(self.id)
|
stop_frame = self.frame_factory.build_arq_stop(self.id)
|
||||||
self.launch_twr(stop_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
self.launch_twr(stop_frame, 15, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
|
|
||||||
|
def aborted(self, irs_frame):
|
||||||
|
self.log("session aborted")
|
||||||
|
self.set_state(ISS_State.ABORTED)
|
|
@ -31,6 +31,7 @@ class ARQFrameHandler(frame_handler.FrameHandler):
|
||||||
elif frame['frame_type_int'] in [
|
elif frame['frame_type_int'] in [
|
||||||
FR.ARQ_SESSION_INFO.value,
|
FR.ARQ_SESSION_INFO.value,
|
||||||
FR.ARQ_BURST_FRAME.value,
|
FR.ARQ_BURST_FRAME.value,
|
||||||
|
FR.ARQ_STOP.value,
|
||||||
]:
|
]:
|
||||||
session = self.states.get_arq_irs_session(session_id)
|
session = self.states.get_arq_irs_session(session_id)
|
||||||
|
|
||||||
|
@ -38,7 +39,6 @@ class ARQFrameHandler(frame_handler.FrameHandler):
|
||||||
FR.ARQ_SESSION_OPEN_ACK.value,
|
FR.ARQ_SESSION_OPEN_ACK.value,
|
||||||
FR.ARQ_SESSION_INFO_ACK.value,
|
FR.ARQ_SESSION_INFO_ACK.value,
|
||||||
FR.ARQ_BURST_ACK.value,
|
FR.ARQ_BURST_ACK.value,
|
||||||
FR.ARQ_STOP.value,
|
|
||||||
FR.ARQ_STOP_ACK.value
|
FR.ARQ_STOP_ACK.value
|
||||||
]:
|
]:
|
||||||
session = self.states.get_arq_iss_session(session_id)
|
session = self.states.get_arq_iss_session(session_id)
|
||||||
|
|
|
@ -146,9 +146,9 @@ class StateManager:
|
||||||
|
|
||||||
def set_final_to_arq_transmissions(self):
|
def set_final_to_arq_transmissions(self):
|
||||||
for id in self.arq_irs_sessions:
|
for id in self.arq_irs_sessions:
|
||||||
self.arq_irs_sessions[id].stop_transmission()
|
self.arq_irs_sessions[id].abort_transmission()
|
||||||
for id in self.arq_iss_sessions:
|
for id in self.arq_iss_sessions:
|
||||||
self.arq_iss_sessions[id].stop_transmission()
|
self.arq_iss_sessions[id].abort_transmission()
|
||||||
|
|
||||||
def add_activity(self, activity_data):
|
def add_activity(self, activity_data):
|
||||||
# Generate a random 8-byte string as hex
|
# Generate a random 8-byte string as hex
|
||||||
|
|
Loading…
Reference in a new issue