mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
WIP ARQ
This commit is contained in:
parent
ee41286ff7
commit
cefaea2369
5 changed files with 52 additions and 34 deletions
|
@ -29,13 +29,16 @@ class ARQSession():
|
||||||
# 3 bytes for the EOF End of File indicator in a data frame
|
# 3 bytes for the EOF End of File indicator in a data frame
|
||||||
self.data_frame_eof = b"EOF"
|
self.data_frame_eof = b"EOF"
|
||||||
|
|
||||||
|
def log(self, message, isWarning = False):
|
||||||
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
def get_mode_by_speed_level(self, speed_level):
|
def get_mode_by_speed_level(self, speed_level):
|
||||||
return self.MODE_BY_SPEED[speed_level]
|
return self.MODE_BY_SPEED[speed_level]
|
||||||
|
|
||||||
def transmit_frame(self, frame: bytearray):
|
def transmit_frame(self, frame: bytearray):
|
||||||
self.logger.info("Transmitting frame")
|
self.log("Transmitting frame")
|
||||||
modem_queue_item = {
|
modem_queue_item = {
|
||||||
'mode': self.get_mode_by_speed_level(self.speed_level),
|
'mode': self.get_mode_by_speed_level(self.speed_level),
|
||||||
'repeat': 1,
|
'repeat': 1,
|
||||||
|
@ -46,7 +49,7 @@ class ARQSession():
|
||||||
|
|
||||||
def setState(self, state):
|
def setState(self, state):
|
||||||
self.state = state
|
self.state = state
|
||||||
self.logger.info(f"state changed to {state}")
|
self.log(f"state changed to {state}")
|
||||||
|
|
||||||
def get_payload_size(self, speed_level):
|
def get_payload_size(self, speed_level):
|
||||||
mode = self.MODE_BY_SPEED[speed_level]
|
mode = self.MODE_BY_SPEED[speed_level]
|
||||||
|
|
|
@ -15,10 +15,14 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
|
|
||||||
TIMEOUT_DATA = 6
|
TIMEOUT_DATA = 6
|
||||||
|
|
||||||
def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int):
|
def __init__(self, config: dict, tx_frame_queue: queue.Queue, dxcall: str, session_id: int, is_wide_band: bool):
|
||||||
super().__init__(config, tx_frame_queue, dxcall)
|
super().__init__(config, tx_frame_queue, dxcall)
|
||||||
|
|
||||||
self.id = session_id
|
self.id = session_id
|
||||||
|
self.is_wide_band = is_wide_band
|
||||||
|
self.speed = 0
|
||||||
|
self.version = 1
|
||||||
|
self.snr = 0
|
||||||
|
|
||||||
self.state = self.STATE_CONN_REQ_RECEIVED
|
self.state = self.STATE_CONN_REQ_RECEIVED
|
||||||
|
|
||||||
|
@ -40,9 +44,6 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
def generate_id(self):
|
def generate_id(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def log(self, message):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def set_state(self, state):
|
def set_state(self, state):
|
||||||
self.log(f"ARQ Session IRS {self.id} state {self.state}")
|
self.log(f"ARQ Session IRS {self.id} state {self.state}")
|
||||||
self.state = state
|
self.state = state
|
||||||
|
@ -51,34 +52,46 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def runner(self):
|
def runner(self):
|
||||||
isWideband = True
|
|
||||||
speed = 1
|
|
||||||
version = 1
|
|
||||||
|
|
||||||
ack_frame = self.frame_factory.build_arq_session_connect_ack(isWideband, self.id, speed, version)
|
|
||||||
self.transmit_frame(ack_frame)
|
|
||||||
|
|
||||||
self.set_modem_decode_modes(None)
|
self.set_modem_decode_modes(None)
|
||||||
self.state = self.STATE_WAITING_DATA
|
retries = self.RETRIES_TRANSFER
|
||||||
while self.state == self.STATE_WAITING_DATA:
|
while retries > 0:
|
||||||
if not self.event_data_received.wait(self.TIMEOUT_DATA):
|
if self.event_data_received.wait(self.TIMEOUT_DATA):
|
||||||
self.log("Timeout waiting for data")
|
retries = self.RETRIES_TRANSFER
|
||||||
self.state = self.STATE_FAILED
|
self.append_data_to_burst_buffer()
|
||||||
return
|
|
||||||
|
self.send_data_nack()
|
||||||
|
|
||||||
|
self.state = self.STATE_FAILED
|
||||||
|
return
|
||||||
|
|
||||||
self.log("Finished ARQ IRS session")
|
self.log("Finished ARQ IRS session")
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
self.send_session_ack()
|
||||||
|
self.state = self.STATE_WAITING_DATA
|
||||||
self.thread = threading.Thread(target=self.runner, name=f"ARQ IRS Session {self.id}", daemon=True)
|
self.thread = threading.Thread(target=self.runner, name=f"ARQ IRS Session {self.id}", daemon=True)
|
||||||
self.thread.start()
|
self.thread.start()
|
||||||
|
|
||||||
|
def send_session_ack(self):
|
||||||
|
ack_frame = self.frame_factory.build_arq_session_connect_ack(
|
||||||
|
self.is_wide_band,
|
||||||
|
self.id,
|
||||||
|
self.speed,
|
||||||
|
self.version)
|
||||||
|
self.transmit_frame(ack_frame)
|
||||||
|
|
||||||
def on_data_received(self):
|
def send_data_nack(self):
|
||||||
|
nack = self.frame_factory.build_arq_burst_nack(self.session_id, self.snr, self.speed_level,
|
||||||
|
10, # WTF?
|
||||||
|
1)
|
||||||
|
self.transmit_frame(nack)
|
||||||
|
|
||||||
|
def on_data_received(self, frame):
|
||||||
if self.state != self.STATE_WAITING_DATA:
|
if self.state != self.STATE_WAITING_DATA:
|
||||||
raise RuntimeError(f"ARQ Session: Received data while in state {self.state}, expected {self.STATE_WAITING_DATA}")
|
raise RuntimeError(f"ARQ Session: Received data while in state {self.state}, expected {self.STATE_WAITING_DATA}")
|
||||||
|
self.rx_data_chain(frame)
|
||||||
self.event_data_received.set()
|
self.event_data_received.set()
|
||||||
|
|
||||||
|
|
||||||
def on_transfer_ack_received(self, ack):
|
def on_transfer_ack_received(self, ack):
|
||||||
self.event_transfer_ack_received.set()
|
self.event_transfer_ack_received.set()
|
||||||
self.speed_level = ack['speed_level']
|
self.speed_level = ack['speed_level']
|
||||||
|
@ -314,7 +327,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
|
|
||||||
# check if hmac signing enabled
|
# check if hmac signing enabled
|
||||||
if self.enable_hmac:
|
if self.enable_hmac:
|
||||||
self.log.info(
|
self.logger.info(
|
||||||
"[Modem] [HMAC] Enabled",
|
"[Modem] [HMAC] Enabled",
|
||||||
)
|
)
|
||||||
if salt_found := helpers.search_hmac_salt(
|
if salt_found := helpers.search_hmac_salt(
|
||||||
|
@ -331,7 +344,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
# hmac signature wrong
|
# hmac signature wrong
|
||||||
self.arq_process_received_data_frame(data, snr, signed=False)
|
self.arq_process_received_data_frame(data, snr, signed=False)
|
||||||
elif checksum_expected == checksum_received:
|
elif checksum_expected == checksum_received:
|
||||||
self.log.warning(
|
self.logger.warning(
|
||||||
"[Modem] [HMAC] Disabled, using CRC",
|
"[Modem] [HMAC] Disabled, using CRC",
|
||||||
)
|
)
|
||||||
self.arq_process_received_data_frame(data, snr, signed=False)
|
self.arq_process_received_data_frame(data, snr, signed=False)
|
||||||
|
@ -347,7 +360,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
)
|
)
|
||||||
|
|
||||||
duration = time.time() - self.rx_start_of_transmission
|
duration = time.time() - self.rx_start_of_transmission
|
||||||
self.log.warning(
|
self.logger.warning(
|
||||||
"[Modem] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
|
"[Modem] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
|
||||||
e="wrong crc",
|
e="wrong crc",
|
||||||
expected=checksum_expected.hex(),
|
expected=checksum_expected.hex(),
|
||||||
|
@ -362,7 +375,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
if self.enable_stats:
|
if self.enable_stats:
|
||||||
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
|
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
|
||||||
|
|
||||||
self.log.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish,
|
self.logger.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish,
|
||||||
bytesperminute=self.states.arq_bytes_per_minute)
|
bytesperminute=self.states.arq_bytes_per_minute)
|
||||||
self.send_burst_nack_frame(snr)
|
self.send_burst_nack_frame(snr)
|
||||||
|
|
||||||
|
|
|
@ -112,11 +112,12 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def on_transfer_ack_received(self, ack):
|
def on_transfer_ack_received(self, ack):
|
||||||
self.event_transfer_ack_received.set()
|
|
||||||
self.speed_level = ack['speed_level']
|
self.speed_level = ack['speed_level']
|
||||||
|
self.event_transfer_ack_received.set()
|
||||||
|
|
||||||
def on_transfer_nack_received(self, nack):
|
def on_transfer_nack_received(self, nack):
|
||||||
self.speed_level = nack['speed_level']
|
self.speed_level = nack['speed_level']
|
||||||
|
self.event_transfer_ack_received.set()
|
||||||
|
|
||||||
def on_disconnect_received(self):
|
def on_disconnect_received(self):
|
||||||
self.abort()
|
self.abort()
|
||||||
|
|
|
@ -15,18 +15,19 @@ class ARQFrameHandler(frame_handler.FrameHandler):
|
||||||
if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_N.value, FR.ARQ_SESSION_OPEN_W.value]:
|
if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_N.value, FR.ARQ_SESSION_OPEN_W.value]:
|
||||||
session = ARQSessionIRS(self.config,
|
session = ARQSessionIRS(self.config,
|
||||||
self.tx_frame_queue,
|
self.tx_frame_queue,
|
||||||
frame['origin'], frame['session_id'])
|
frame['origin'],
|
||||||
self.states.register_arq_irs_session(session)
|
frame['session_id'],
|
||||||
|
frame['frame_type_int'] == FR.ARQ_SESSION_OPEN_W.value)
|
||||||
|
self.states.register_arq_irs_session(session, frame['frame_type_int'] == FR.ARQ_SESSION_OPEN_W.value)
|
||||||
session.run()
|
session.run()
|
||||||
|
|
||||||
# ARQ session open ack received
|
# ARQ session open ack received
|
||||||
if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_ACK_N.value, FR.ARQ_SESSION_OPEN_ACK_W.value]:
|
elif frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_ACK_N.value, FR.ARQ_SESSION_OPEN_ACK_W.value]:
|
||||||
iss_session:ARQSessionISS = self.states.get_arq_iss_session(frame['session_id'])
|
iss_session:ARQSessionISS = self.states.get_arq_iss_session(frame['session_id'])
|
||||||
iss_session.on_connection_ack_received(frame)
|
iss_session.on_connection_ack_received(frame)
|
||||||
|
|
||||||
# ARQ session data frame received
|
# ARQ session data frame received
|
||||||
if frame['frame_type_int'] in [FR.BURST_01.value, FR.BURST_02.value, FR.BURST_03.value, FR.BURST_04.value, FR.BURST_05.value]:
|
elif frame['frame_type_int'] in [FR.BURST_01.value, FR.BURST_02.value, FR.BURST_03.value, FR.BURST_04.value, FR.BURST_05.value]:
|
||||||
print("received data frame....")
|
print("received data frame....")
|
||||||
irs_session:ARQSessionIRS = self.states.get_arq_irs_session(frame['session_id'])
|
irs_session:ARQSessionIRS = self.states.get_arq_irs_session(frame['session_id'])
|
||||||
irs_session.on_data_received()
|
irs_session.on_data_received(frame)
|
||||||
irs_session.rx_data_chain(frame)
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ class StateManager:
|
||||||
raise RuntimeError(f"ARQ ISS Session '{session.id}' already exists!")
|
raise RuntimeError(f"ARQ ISS Session '{session.id}' already exists!")
|
||||||
self.arq_iss_sessions[session.id] = session
|
self.arq_iss_sessions[session.id] = session
|
||||||
|
|
||||||
def register_arq_irs_session(self, session):
|
def register_arq_irs_session(self, session, is_wide_band):
|
||||||
if session.id in self.arq_irs_sessions:
|
if session.id in self.arq_irs_sessions:
|
||||||
raise RuntimeError(f"ARQ IRS Session '{session.id}' already exists!")
|
raise RuntimeError(f"ARQ IRS Session '{session.id}' already exists!")
|
||||||
self.arq_irs_sessions[session.id] = session
|
self.arq_irs_sessions[session.id] = session
|
||||||
|
|
Loading…
Reference in a new issue