From 17504e194cb7b58b934fcac787c512939bbfb2f3 Mon Sep 17 00:00:00 2001
From: DJ2LS
Date: Sun, 10 Dec 2023 09:59:02 +0100
Subject: [PATCH] WIP adding arq data frame

---
 modem/arq_session.py        |   7 +
 modem/arq_session_irs.py    | 295 +++++++++++++++++++++++++++++++++++-
 modem/data_frame_factory.py |   2 +-
 3 files changed, 299 insertions(+), 5 deletions(-)

diff --git a/modem/arq_session.py b/modem/arq_session.py
index 298b8ada..fe6c9996 100644
--- a/modem/arq_session.py
+++ b/modem/arq_session.py
@@ -24,6 +24,13 @@ class ARQSession():
 
         self.id = None
 
+        # 3 bytes for the BOF (Beginning Of File) indicator in a data frame
+        self.data_frame_bof = b"BOF"
+        # 3 bytes for the EOF (End Of File) indicator in a data frame
+        self.data_frame_eof = b"EOF"
+
+
+
     def get_mode_by_speed_level(self, speed_level):
         return self.MODE_BY_SPEED[speed_level]
 
diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py
index 99ff6dcd..846f7eff 100644
--- a/modem/arq_session_irs.py
+++ b/modem/arq_session_irs.py
@@ -20,14 +20,23 @@ class ARQSessionIRS(arq_session.ARQSession):
 
         self.id = session_id
-        self.received_data = b''
-
         self.state = self.STATE_CONN_REQ_RECEIVED
 
         self.event_data_received = threading.Event()
 
         self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
 
+        # naming:
+        # frame = a single frame
+        # burst = one or more frames. A burst is acknowledged with an ACK
+
+        # buffer which temporarily holds the received data of the current burst
+        self.arq_rx_burst_buffer = []
+        # buffer which holds the assembled data of all bursts received so far
+        self.arq_rx_frame_buffer = b""
+        # size of the data we received with the last burst
+        self.arq_burst_last_data_size = 0
+
     def generate_id(self):
         pass
 
@@ -68,7 +77,7 @@ class ARQSessionIRS(arq_session.ARQSession):
 
         if self.state != self.STATE_WAITING_DATA:
             raise RuntimeError(f"ARQ Session: Received data while in state {self.state}, expected {self.STATE_WAITING_DATA}")
 
-        self.received_data = data_frame["data"]
+        self.rx_data_chain(data_frame)
 
         self.event_data_received.set()
 
@@ -88,4 +97,282 @@ class ARQSessionIRS(arq_session.ARQSession):
         self.event_connection_ack_received.clear()
         self.event_transfer_feedback.set()
         self.event_transfer_feedback.clear()
-        self.received_data = b''
+
+    def rx_data_chain(self, data_frame):
+        """
+        Process received frames in the correct order.
+
+        Args:
+            data_frame: e.g. {'frame_type': 'BURST_01', 'frame_type_int': 1, 'n_frames_per_burst': 1, 'session_id': 118, 'data': b'Hello world!'}
+
+        Returns:
+            None
+        """
+
+        # unpack some parameters from the frame
+        data = data_frame["data"]
+        # frame_type_int is 1-based (BURST_01 == 1), make it zero-based for buffer indexing
+        rx_n_frame_of_burst = data_frame["frame_type_int"] - 1
+        rx_n_frames_per_burst = data_frame["n_frames_per_burst"]
+        session_id = data_frame["session_id"]
+        # TODO WIP: the SNR of the received frame still needs to be passed into this chain
+        snr = 0
+
+        self.init_rx_buffer(rx_n_frames_per_burst)
+        self.append_data_to_burst_buffer(data, rx_n_frame_of_burst)
+
+        # Check if we received all frames of the burst by checking if the burst buffer still contains "None"
+        if None not in self.arq_rx_burst_buffer:
+            # Stick the burst together in case we received multiple frames per burst
+            burst_data = self.stick_burst_together()
+
+            # check if we already received this burst in an earlier transmission
+            # use case: the ACK packet from IRS to ISS got lost
+            if self.arq_rx_frame_buffer.endswith(burst_data):
+                self.log.info("[Modem] ARQ | RX | Frame already received - sending ACK again")
+            else:
+                # add the burst to our data buffer
+                self.process_burst(burst_data, rx_n_frames_per_burst)
+
+                # Only send an ACK for the burst if we haven't received a BOF and EOF yet,
+                # so we don't acknowledge bursts after all data already arrived
+                if not self.check_if_last_data_received(data):
+                    self.acknowledge_burst(snr)
+                    return
+
+        else:
+            # burst is missing some data... can happen for N > 1 frames per burst in case of packet loss
+            self.log.warning("[Modem] data_handler: missing data in burst buffer!", frame=rx_n_frame_of_burst + 1, frames=rx_n_frames_per_burst)
+
+        # check if we have a BOF (Beginning Of File) or EOF (End Of File) flag in our data
+        bof_position, eof_position = self.search_for_bof_eof_flag()
+        if bof_position >= 0:
+            self.arq_extract_statistics_from_data_frame(bof_position, eof_position, snr)
+
+            # now check if we received the entire data by looking at the BOF and EOF positions
+            if self.check_if_entire_data_received(bof_position, eof_position):
+                # meanwhile we have our data plus header information, which we want to separate now
+                data, checksum, size, compression_factor = self.deconstruct_arq_frame(bof_position, eof_position)
+                self.states.set("arq_total_bytes", size)
+
+                # do some HMAC and checksum related stuff
+                # TODO We need to split up calculate_checksums so the entire logic is visible here
+                self.calculate_checksums(data, checksum, snr)
+
+                # Finally, clean up our buffers and states
+                self.arq_cleanup()
+
+            else:
+                self.log.warning("[Modem] ARQ | RX | found BOF, but the data frame is not complete yet")
+
+    # --------------------------------------------------------------------
+    # THIS AREA HOLDS RX CHAIN FUNCTIONS IN CHRONOLOGICAL PROCESSING ORDER
+
+    def init_rx_buffer(self, rx_n_frames_per_burst):
+        # The RX burst buffer needs to have a fixed length filled with "None".
+        # We need this later for counting the "Nones" to detect missing data.
+        # Check if the burst buffer has the expected length, else (re)create it
+        if len(self.arq_rx_burst_buffer) != rx_n_frames_per_burst:
+            self.arq_rx_burst_buffer = [None] * rx_n_frames_per_burst
+
+    def append_data_to_burst_buffer(self, data, rx_n_frame_of_burst):
+        # Append the data to the rx burst buffer at the position of the frame within the burst.
+        # The frame factory already stripped the frame header, so we can store the payload as-is.
+        self.arq_rx_burst_buffer[rx_n_frame_of_burst] = data
+
+    def stick_burst_together(self):
+        # iterate through the burst buffer and stick the burst together
+        burst_data = b""
+        for frame in self.arq_rx_burst_buffer:
+            burst_data += bytes(frame)  # type: ignore
+
+        # free up burst buffer
+        self.arq_rx_burst_buffer = []
+        return burst_data
+
+    def process_burst(self, burst_data, rx_n_frames_per_burst):
+        # Here we are going to search for our data in the last received bytes.
+        # This reduces the chance we will lose the entire frame in the case of signalling frame loss.
+
+        # self.arq_rx_frame_buffer --> existing data
+        # burst_data --> new data
+        # search_area --> area in which we want to search
+
+        search_area = self.arq_burst_last_data_size * rx_n_frames_per_burst
+
+        search_position = len(self.arq_rx_frame_buffer) - search_area
+        # if search position < 0, then search position = 0
+        search_position = max(0, search_position)
+
+        # find the position of the data. rfind returns -1 if nothing is found in the search area, else >= 0
+        # we are beginning from the end, so if the data exists twice or more,
+        # only the last occurrence should be replaced
+        # we are only going to check the position against the minimum data frame payload
+        # use case: we receive data which already contains previously received data,
+        # while the payload received before is shorter than the actual payload
+        get_position = self.arq_rx_frame_buffer[search_position:].rfind(
+            burst_data[:self.arq_burst_minimum_payload]
+        )
+        # if we find the data, cut the buffer at this position so the new data replaces it
+        if get_position >= 0:
+            self.arq_rx_frame_buffer = self.arq_rx_frame_buffer[
+                : search_position + get_position
+            ]
+            self.log.warning(
+                "[Modem] ARQ | RX | replacing existing buffer data",
+                area=search_area,
+                pos=get_position,
+            )
+        else:
+            self.log.debug("[Modem] ARQ | RX | appending data to buffer")
+
+        # append data to our data store
+        self.arq_rx_frame_buffer += burst_data
+
+        # finally update the data size, so we can use it for the next burst
+        self.arq_burst_last_data_size = len(burst_data)
+
+    def check_if_last_data_received(self, frame):
+        # Return True if we already received a BOF/EOF or if this frame contains the EOF marker.
+        # In that case all data has arrived and we don't need to acknowledge the burst anymore.
+        return bool(
+            self.rx_frame_bof_received
+            or self.rx_frame_eof_received
+            or frame.find(self.data_frame_eof) >= 0
+        )
+
+    def acknowledge_burst(self, snr):
+        # TODO WIP
+        self.arq_calculate_speed_level(snr)
+        self.send_burst_ack_frame(snr)
+        # Reset the retries-per-burst counter
+        self.n_retries_per_burst = 0
+
+        # calculate statistics
+        self.calculate_transfer_rate_rx(
+            self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
+        )
+
+        # send a network message with transfer information
+        self.event_manager.send_custom_event(
+            freedata="modem-message",
+            arq="transmission",
+            status="receiving",
+            uuid=self.transmission_uuid,
+            percent=self.states.arq_transmission_percent,
+            bytesperminute=self.states.arq_bytes_per_minute,
+            compression=self.arq_compression_factor,
+            mycallsign=str(self.mycallsign, 'UTF-8'),
+            dxcallsign=str(self.dxcallsign, 'UTF-8'),
+            finished=self.states.arq_seconds_until_finish,
+            irs=helpers.bool_to_string(self.is_IRS)
+        )
+
+    def search_for_bof_eof_flag(self):
+        # We have a BOF and an EOF flag in our data. If we received both, we received our frame.
+        # In case of losing data, even though we already received a BOF and EOF, we need to make sure
+        # we received the complete last burst by checking it for Nones
+        bof_position = self.arq_rx_frame_buffer.find(self.data_frame_bof)
+        eof_position = self.arq_rx_frame_buffer.find(self.data_frame_eof)
+        return bof_position, eof_position
+
+    def arq_extract_statistics_from_data_frame(self, bof_position, eof_position, snr):
+        payload = self.arq_rx_frame_buffer[
+            bof_position + len(self.data_frame_bof): eof_position
+        ]
+        frame_length = int.from_bytes(payload[4:8], "big")  # 4:8 = 4 bytes
+        self.states.set("arq_total_bytes", frame_length)
+        compression_factor = int.from_bytes(payload[8:9], "big")  # 8:9 = 1 byte
+        # limit to a max value of 255
+        compression_factor = np.clip(compression_factor, 0, 255)
+        self.arq_compression_factor = compression_factor / 10
+        self.calculate_transfer_rate_rx(
+            self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
+        )
+
+    def check_if_entire_data_received(self, bof_position, eof_position):
+        return (
+            bof_position >= 0
+            and eof_position > 0
+            and None not in self.arq_rx_burst_buffer
+        )
+
+    def deconstruct_arq_frame(self, bof_position, eof_position):
+        # Extract the raw payload from the buffer
+        payload = self.arq_rx_frame_buffer[bof_position + len(self.data_frame_bof): eof_position]
+        # Get the data frame CRC
+        checksum = payload[:4]  # 0:4 = 4 bytes
+        # Get the data frame length
+        size = int.from_bytes(payload[4:8], "big")  # 4:8 = 4 bytes
+        compression_factor = int.from_bytes(payload[8:9], "big")  # 8:9 = 1 byte
+        data = payload[9:]  # this is our data
+
+        return data, checksum, size, compression_factor
+
+    def calculate_checksums(self, data, checksum_expected, snr):
+        # TODO WIP, we need to fix this
+        # let's do a CRC calculation for our received data
+        checksum_received = helpers.get_crc_32(data)
+
+        # check if HMAC signing is enabled
+        if self.enable_hmac:
+            self.log.info(
+                "[Modem] [HMAC] Enabled",
+            )
+            if salt_found := helpers.search_hmac_salt(
+                    self.dxcallsign,
+                    self.mycallsign,
+                    checksum_expected,
+                    data,
+                    token_iters=100,
+            ):
+                # hmac digest matches
+                self.arq_process_received_data_frame(data, snr, signed=True)
+            else:
+                # hmac signature wrong
+                self.arq_process_received_data_frame(data, snr, signed=False)
+        elif checksum_expected == checksum_received:
+            self.log.warning(
+                "[Modem] [HMAC] Disabled, using CRC",
+            )
+            self.arq_process_received_data_frame(data, snr, signed=False)
+        else:
+            self.event_manager.send_custom_event(
+                freedata="modem-message",
+                arq="transmission",
+                status="failed",
+                uuid=self.transmission_uuid,
+                mycallsign=str(self.mycallsign, 'UTF-8'),
+                dxcallsign=str(self.dxcallsign, 'UTF-8'),
+                irs=helpers.bool_to_string(self.is_IRS)
+            )
+
+            duration = time.time() - self.rx_start_of_transmission
+            self.log.warning(
+                "[Modem] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
+                e="wrong crc",
+                expected=checksum_expected.hex(),
+                received=checksum_received.hex(),
+                nacks=self.frame_nack_counter,
+                duration=duration,
+                bytesperminute=self.states.arq_bytes_per_minute,
+                compression=self.arq_compression_factor,
+                data=data,
+            )
+            if self.enable_stats:
+                self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
+
+            self.log.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish,
+                          bytesperminute=self.states.arq_bytes_per_minute)
+            self.send_burst_nack_frame(snr)
+
+    def arq_process_received_data_frame(self, data, snr, signed=False):
+        # TODO We need to port the arq_process_received_data_frame function from
+        # deprecated_protocol_arq_session_irs to this place
+        # A smiley for the brave ones reading until this area
+        # :-) :-) :-) :-)
+        pass
\ No newline at end of file
diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py
index 50e9e5f0..c1041b38 100644
--- a/modem/data_frame_factory.py
+++ b/modem/data_frame_factory.py
@@ -229,7 +229,7 @@ class DataFrameFactory:
             elif key == "gridsquare":
                 extracted_data[key] = helpers.decode_grid(data)
 
-            elif key in ["session_id", "speed_level", "n_frames_per_burst"]:
+            elif key in ["session_id", "speed_level", "n_frames_per_burst", "arq_protocol_version"]:
                 extracted_data[key] = int.from_bytes(data, 'big')
 
             else:
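
Note on the frame layout used above: deconstruct_arq_frame() expects the payload between the BOF and EOF markers to start with a 4-byte CRC, a 4-byte length field and a 1-byte compression factor, followed by the actual data. The following standalone sketch (not part of the patch) illustrates that layout; build_arq_frame and parse_arq_frame are hypothetical helper names, and zlib.crc32 is used here only as a stand-in for helpers.get_crc_32.

    import zlib

    DATA_FRAME_BOF = b"BOF"
    DATA_FRAME_EOF = b"EOF"

    def build_arq_frame(data: bytes, compression_factor: int = 10) -> bytes:
        # hypothetical helper: frame a payload the way the IRS expects to deconstruct it
        checksum = zlib.crc32(data).to_bytes(4, "big")   # 4 bytes CRC
        size = len(data).to_bytes(4, "big")              # 4 bytes length
        factor = compression_factor.to_bytes(1, "big")   # 1 byte compression factor
        return DATA_FRAME_BOF + checksum + size + factor + data + DATA_FRAME_EOF

    def parse_arq_frame(buffer: bytes):
        # hypothetical helper: mirrors search_for_bof_eof_flag() + deconstruct_arq_frame()
        bof_position = buffer.find(DATA_FRAME_BOF)
        eof_position = buffer.find(DATA_FRAME_EOF)
        if bof_position < 0 or eof_position <= 0:
            return None  # frame is not complete yet
        payload = buffer[bof_position + len(DATA_FRAME_BOF):eof_position]
        checksum = payload[:4]                                    # 0:4 = 4 bytes
        size = int.from_bytes(payload[4:8], "big")                # 4:8 = 4 bytes
        compression_factor = int.from_bytes(payload[8:9], "big")  # 8:9 = 1 byte
        data = payload[9:]
        return data, checksum, size, compression_factor

    if __name__ == "__main__":
        frame = build_arq_frame(b"Hello world!")
        data, checksum, size, compression_factor = parse_arq_frame(frame)
        assert data == b"Hello world!" and size == len(data)
        assert checksum == zlib.crc32(data).to_bytes(4, "big")

In the patch itself, the stored compression factor is additionally divided by 10 in arq_extract_statistics_from_data_frame(), and the CRC is verified in calculate_checksums() before the data is handed to arq_process_received_data_frame().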