Change timeouts and retries in ARQ sessions.

This commit is contained in:
Pedro 2023-12-18 13:56:49 +01:00
parent 5786f1f1dd
commit 8bc8cdffcc
7 changed files with 20 additions and 20 deletions

View file

@ -74,6 +74,7 @@ class ARQSession():
def on_frame_received(self, frame): def on_frame_received(self, frame):
self.event_frame_received.set() self.event_frame_received.set()
self.log(f"Received {frame['frame_type']}")
frame_type = frame['frame_type_int'] frame_type = frame['frame_type_int']
if self.state in self.STATE_TRANSITION: if self.state in self.STATE_TRANSITION:
if frame_type in self.STATE_TRANSITION[self.state]: if frame_type in self.STATE_TRANSITION[self.state]:

View file

@ -17,8 +17,8 @@ class ARQSessionIRS(arq_session.ARQSession):
RETRIES_CONNECT = 3 RETRIES_CONNECT = 3
RETRIES_TRANSFER = 3 # we need to increase this RETRIES_TRANSFER = 3 # we need to increase this
TIMEOUT_CONNECT = 6 TIMEOUT_CONNECT = 3
TIMEOUT_DATA = 6 TIMEOUT_DATA = 12
STATE_TRANSITION = { STATE_TRANSITION = {
STATE_NEW: { STATE_NEW: {
@ -60,11 +60,9 @@ class ARQSessionIRS(arq_session.ARQSession):
def all_data_received(self): def all_data_received(self):
return self.total_length == self.received_bytes return self.total_length == self.received_bytes
def final_crc_check(self): def final_crc_matches(self) -> bool:
print(self.received_data) match = self.total_crc == helpers.get_crc_32(bytes(self.received_data)).hex()
print(self.total_crc) return match
print(helpers.get_crc_32(bytes(self.received_data)).hex())
return self.total_crc == helpers.get_crc_32(bytes(self.received_data)).hex()
def transmit_and_wait(self, frame, timeout, mode): def transmit_and_wait(self, frame, timeout, mode):
self.transmit_frame(frame, mode) self.transmit_frame(frame, mode)
@ -101,6 +99,8 @@ class ARQSessionIRS(arq_session.ARQSession):
self.total_crc = info_frame['total_crc'] self.total_crc = info_frame['total_crc']
self.dx_snr.append(info_frame['snr']) self.dx_snr.append(info_frame['snr'])
self.log(f"New transfer of {self.total_length} bytes")
self.calibrate_speed_settings() self.calibrate_speed_settings()
self.set_decode_mode() self.set_decode_mode()
info_ack = self.frame_factory.build_arq_session_info_ack( info_ack = self.frame_factory.build_arq_session_info_ack(
@ -117,7 +117,7 @@ class ARQSessionIRS(arq_session.ARQSession):
def process_incoming_data(self, frame): def process_incoming_data(self, frame):
if frame['offset'] != self.received_bytes: if frame['offset'] != self.received_bytes:
self.logger.info(f"Discarding data frame due to wrong offset", frame=frame) self.log(f"Discarding data offset {frame['offset']}")
return False return False
remaining_data_length = self.total_length - self.received_bytes remaining_data_length = self.total_length - self.received_bytes
@ -132,6 +132,7 @@ class ARQSessionIRS(arq_session.ARQSession):
self.received_data[frame['offset']:] = data_part self.received_data[frame['offset']:] = data_part
self.received_bytes += len(data_part) self.received_bytes += len(data_part)
self.log(f"Received {self.received_bytes}/{self.total_length} bytes")
return True return True
@ -149,7 +150,7 @@ class ARQSessionIRS(arq_session.ARQSession):
self.set_state(self.STATE_BURST_REPLY_SENT) self.set_state(self.STATE_BURST_REPLY_SENT)
return return
if self.final_crc_check(): if self.final_crc_matches():
self.log("All data received successfully!") self.log("All data received successfully!")
self.transmit_frame(ack, mode=FREEDV_MODE.signalling) self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
self.set_state(self.STATE_ENDED) self.set_state(self.STATE_ENDED)

View file

@ -16,9 +16,9 @@ class ARQSessionISS(arq_session.ARQSession):
STATE_ENDED = 4 STATE_ENDED = 4
STATE_FAILED = 5 STATE_FAILED = 5
RETRIES_CONNECT = 3 RETRIES_CONNECT = 10
TIMEOUT_CONNECT_ACK = 7 TIMEOUT_CONNECT_ACK = 3
TIMEOUT_TRANSFER = 10 TIMEOUT_TRANSFER = 3
STATE_TRANSITION = { STATE_TRANSITION = {
STATE_OPEN_SENT: { STATE_OPEN_SENT: {
@ -93,13 +93,13 @@ class ARQSessionISS(arq_session.ARQSession):
if 'offset' in irs_frame: if 'offset' in irs_frame:
self.confirmed_bytes = irs_frame['offset'] self.confirmed_bytes = irs_frame['offset']
self.log(f"IRS confirmed {self.confirmed_bytes}/{len(self.data)} bytes")
if self.confirmed_bytes == len(self.data): if self.confirmed_bytes == len(self.data):
self.set_state(self.STATE_ENDED) self.set_state(self.STATE_ENDED)
self.log("All data transfered!") self.log("All data transfered!")
return return
payload_size = self.get_data_payload_size() payload_size = self.get_data_payload_size()
print(f"payload size: {payload_size}")
burst = [] burst = []
for f in range(0, self.frames_per_burst): for f in range(0, self.frames_per_burst):
offset = self.confirmed_bytes offset = self.confirmed_bytes

View file

@ -159,7 +159,6 @@ class DataFrameFactory:
frame_length = frame_template["frame_length"] frame_length = frame_template["frame_length"]
else: else:
frame_length -= 2 frame_length -= 2
print(frame_length)
frame = bytearray(frame_length) frame = bytearray(frame_length)
frame[:1] = bytes([frametype.value]) frame[:1] = bytes([frametype.value])
@ -232,7 +231,6 @@ class DataFrameFactory:
whole_frame_length = self.get_bytes_per_frame(mode) whole_frame_length = self.get_bytes_per_frame(mode)
available = whole_frame_length - 2 # 2Bytes CRC16 available = whole_frame_length - 2 # 2Bytes CRC16
available -= 1 # Frame Type available -= 1 # Frame Type
print(self.template_list[type.value].items())
for field, length in self.template_list[type.value].items(): for field, length in self.template_list[type.value].items():
if field != 'frame_length' and isinstance(length, int): if field != 'frame_length' and isinstance(length, int):
available -= length available -= length

View file

@ -9,7 +9,6 @@ class EventManager:
self.lastpttstate = False self.lastpttstate = False
def broadcast(self, data): def broadcast(self, data):
self.log.debug(f"Broadcasting event: {data}")
for q in self.queues: for q in self.queues:
q.put(data) q.put(data)

View file

@ -101,8 +101,8 @@ class FrameHandler():
pass pass
def log(self): def log(self):
self.logger.info(f"[Frame Handler] Handling frame {self.details['frame']}") return
pass self.logger.info(f"[Frame Handler] Handling frame {self.details['frame']['frame_type']}")
def handle(self, frame, snr, frequency_offset, freedv_inst, bytes_per_frame): def handle(self, frame, snr, frequency_offset, freedv_inst, bytes_per_frame):
self.details['frame'] = frame self.details['frame'] = frame

View file

@ -54,15 +54,16 @@ class TestARQSession(unittest.TestCase):
cls.loss_probability = 50 cls.loss_probability = 50
def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER): def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER):
while True: while True:
frame_bytes = modem_transmit_queue.get() frame_bytes = modem_transmit_queue.get()
if random.randint(0, 100) < self.loss_probability: if random.randint(0, 100) < self.loss_probability:
self.logger.info(f"[{threading.current_thread().name}] Frame lost...") self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
continue continue
self.logger.info(f"[{threading.current_thread().name}] Redirecting frame")
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
def establishChannels(self): def establishChannels(self):
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
args=[self.iss_modem.data_queue_received, args=[self.iss_modem.data_queue_received,
@ -76,7 +77,7 @@ class TestARQSession(unittest.TestCase):
name = "IRS to ISS channel") name = "IRS to ISS channel")
self.irs_to_iss_channel.start() self.irs_to_iss_channel.start()
def testARQSessionSmallPayload(self): def xtestARQSessionSmallPayload(self):
# set Packet Error Rate (PER) / frame loss probability # set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 30 self.loss_probability = 30