diff --git a/modem/config.py b/modem/config.py index 7a955821..aabd3c12 100644 --- a/modem/config.py +++ b/modem/config.py @@ -44,6 +44,8 @@ class CONFIG: 'tuning_range_fmax': int, 'tuning_range_fmin': int, 'enable_fsk': bool, + 'enable_hmac': bool, + 'enable_morse_identifier': bool, 'enable_low_bandwidth_mode': bool, 'respond_to_cq': bool, 'rx_buffer_size': int, diff --git a/modem/data_handler.py b/modem/data_handler.py index 6d6be88d..8ac6235c 100644 --- a/modem/data_handler.py +++ b/modem/data_handler.py @@ -9,7 +9,6 @@ Created on Sun Dec 27 20:43:40 2020 # pylint: disable=fixme -import os import base64 import sys import threading @@ -23,7 +22,6 @@ import codec2 import helpers import modem import numpy as np -from global_instances import ARQ, Modem import structlog import stats import ujson as json @@ -55,14 +53,17 @@ class DATA: self.enable_hmac = config['MODEM']['enable_hmac'] self.enable_stats = config['STATION']['enable_stats'] self.enable_morse_identifier = config['MODEM']['enable_morse_identifier'] - + self.arq_rx_buffer_size = config['MODEM']['rx_buffer_size'] + self.enable_experimental_features = False # Enable general responding to channel openers for example # this can be combined with a callsign blacklist for example self.respond_to_call = True - - - # TODO we need to pass this information from modem when receiving a burst + # ARQ PROTOCOL VERSION + # v.5 - signalling frame uses datac0 + # v.6 - signalling frame uses datac13 + # v.7 - adjusting ARQ timeouts, not done yet + self.arq_protocol_version = 8 self.modem_frequency_offset = 0 self.dxcallsign = b"ZZ9YY-0" @@ -98,6 +99,8 @@ class DATA: self.session_connect_max_retries = 15 self.irs_buffer_position = 0 + self.arq_compression_factor = 0 + # actual n retries of burst self.tx_n_retry_of_burst = 0 @@ -122,7 +125,8 @@ class DATA: self.data_frame_bof = b"BOF" # 3 bytes for the EOF End of File indicator in a data frame self.data_frame_eof = b"EOF" - + self.arq_rx_burst_buffer = [] + self.arq_rx_frame_buffer = b"" self.tx_n_max_retries_per_burst = 40 self.rx_n_max_retries_per_burst = 40 self.n_retries_per_burst = 0 @@ -188,7 +192,7 @@ class DATA: self.time_list = self.time_list_high_bw self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) # minimum payload for arq burst # import for avoiding byteorder bug and buffer search area @@ -466,7 +470,7 @@ class DATA: ) # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync - # if ARQ.rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: + # if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: # self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") # self.c_lib.freedv_set_sync(freedv, 0) @@ -575,7 +579,7 @@ class DATA: ack_frame[1:2] = self.session_id ack_frame[2:3] = helpers.snr_to_bytes(snr) ack_frame[3:4] = bytes([int(self.speed_level)]) - ack_frame[4:8] = len(ARQ.rx_frame_buffer).to_bytes(4, byteorder="big") + ack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big") # wait if we have a channel busy condition if self.states.channel_busy: @@ -602,10 +606,9 @@ class DATA: def send_retransmit_request_frame(self) -> None: # check where a None is in our burst buffer and do frame+1, because lists start at 0 # FIXME Check to see if there's a `frame - 1` in the receive portion. Remove both if there is. - #print(ARQ.rx_burst_buffer) missing_frames = [ frame + 1 - for frame, element in enumerate(ARQ.rx_burst_buffer) + for frame, element in enumerate(self.arq_rx_burst_buffer) if element is None ] @@ -626,7 +629,7 @@ class DATA: nack_frame[1:2] = self.session_id nack_frame[2:3] = helpers.snr_to_bytes(snr) nack_frame[3:4] = bytes([int(self.speed_level)]) - nack_frame[4:8] = len(ARQ.rx_frame_buffer).to_bytes(4, byteorder="big") + nack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big") # TRANSMIT NACK FRAME FOR BURST # TODO Do we have to send ident frame? @@ -647,7 +650,7 @@ class DATA: self.frame_nack_counter += 1 # we need to clear our rx burst buffer - ARQ.rx_burst_buffer = [] + self.arq_rx_burst_buffer = [] # Create and send ACK frame self.log.info("[Modem] ARQ | RX | SENDING NACK") @@ -657,7 +660,7 @@ class DATA: nack_frame[2:3] = helpers.snr_to_bytes(0) nack_frame[3:4] = bytes([int(self.speed_level)]) nack_frame[4:5] = bytes([int(tx_n_frames_per_burst)]) - nack_frame[5:9] = len(ARQ.rx_frame_buffer).to_bytes(4, byteorder="big") + nack_frame[5:9] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big") # wait if we have a channel busy condition if self.states.channel_busy: @@ -722,11 +725,11 @@ class DATA: # The RX burst buffer needs to have a fixed length filled with "None". # We need this later for counting the "Nones" to detect missing data. # Check if burst buffer has expected length else create it - if len(ARQ.rx_burst_buffer) != self.rx_n_frames_per_burst: - ARQ.rx_burst_buffer = [None] * self.rx_n_frames_per_burst + if len(self.arq_rx_burst_buffer) != self.rx_n_frames_per_burst: + self.arq_rx_burst_buffer = [None] * self.rx_n_frames_per_burst # Append data to rx burst buffer - ARQ.rx_burst_buffer[self.rx_n_frame_of_burst] = data_in[self.arq_burst_header_size:] # type: ignore + self.arq_rx_burst_buffer[self.rx_n_frame_of_burst] = data_in[self.arq_burst_header_size:] # type: ignore self.dxgrid = b'------' helpers.add_to_heard_stations( @@ -740,16 +743,16 @@ class DATA: # Check if we received all frames in the burst by checking if burst buffer has no more "Nones" # This is the ideal case because we received all data - if None not in ARQ.rx_burst_buffer: + if None not in self.arq_rx_burst_buffer: # then iterate through burst buffer and stick the burst together # the temp burst buffer is needed for checking, if we already received data temp_burst_buffer = b"" - for value in ARQ.rx_burst_buffer: - # ARQ.rx_frame_buffer += ARQ.rx_burst_buffer[i] + for value in self.arq_rx_burst_buffer: + # self.arq_rx_frame_buffer += self.arq_rx_burst_buffer[i] temp_burst_buffer += bytes(value) # type: ignore # free up burst buffer - ARQ.rx_burst_buffer = [] + self.arq_rx_burst_buffer = [] # TODO Needs to be removed as soon as mode error is fixed # catch possible modem error which leads into false byteorder @@ -759,7 +762,7 @@ class DATA: # This might only work for datac1 and datac3 try: # area_of_interest = (modem.get_bytes_per_frame(self.mode_list[speed_level] - 1) -3) * 2 - if ARQ.rx_frame_buffer.endswith(temp_burst_buffer[:246]) and len(temp_burst_buffer) >= 246: + if self.arq_rx_frame_buffer.endswith(temp_burst_buffer[:246]) and len(temp_burst_buffer) >= 246: self.log.warning( "[Modem] ARQ | RX | wrong byteorder received - dropping data" ) @@ -771,12 +774,12 @@ class DATA: ) self.log.debug("[Modem] temp_burst_buffer", buffer=temp_burst_buffer) - self.log.debug("[Modem] ARQ.rx_frame_buffer", buffer=ARQ.rx_frame_buffer) + self.log.debug("[Modem] self.arq_rx_frame_buffer", buffer=self.arq_rx_frame_buffer) # if frame buffer ends not with the current frame, we are going to append new data # if data already exists, we received the frame correctly, # but the ACK frame didn't receive its destination (ISS) - if ARQ.rx_frame_buffer.endswith(temp_burst_buffer): + if self.arq_rx_frame_buffer.endswith(temp_burst_buffer): self.log.info( "[Modem] ARQ | RX | Frame already received - sending ACK again" ) @@ -785,13 +788,13 @@ class DATA: # Here we are going to search for our data in the last received bytes. # This reduces the chance we will lose the entire frame in the case of signalling frame loss - # ARQ.rx_frame_buffer --> existing data + # self.arq_rx_frame_buffer --> existing data # temp_burst_buffer --> new data # search_area --> area where we want to search search_area = self.arq_burst_last_payload * self.rx_n_frames_per_burst - search_position = len(ARQ.rx_frame_buffer) - search_area + search_position = len(self.arq_rx_frame_buffer) - search_area # if search position < 0, then search position = 0 search_position = max(0, search_position) @@ -801,12 +804,12 @@ class DATA: # we are going to only check position against minimum data frame payload # use case: receive data, which already contains received data # while the payload of data received before is shorter than actual payload - get_position = ARQ.rx_frame_buffer[search_position:].rfind( + get_position = self.arq_rx_frame_buffer[search_position:].rfind( temp_burst_buffer[:self.arq_burst_minimum_payload] ) # if we find data, replace it at this position with the new data and strip it if get_position >= 0: - ARQ.rx_frame_buffer = ARQ.rx_frame_buffer[ + self.arq_rx_frame_buffer = self.arq_rx_frame_buffer[ : search_position + get_position ] self.log.warning( @@ -817,7 +820,7 @@ class DATA: else: self.log.debug("[Modem] ARQ | RX | appending data to buffer") - ARQ.rx_frame_buffer += temp_burst_buffer + self.arq_rx_frame_buffer += temp_burst_buffer self.arq_burst_last_payload = len(temp_burst_buffer) @@ -837,8 +840,8 @@ class DATA: self.burst_last_received = int(time.time()) # Create and send ACK frame - self.log.info("[Modem] ARQ | RX | SENDING ACK", finished=ARQ.arq_seconds_until_finish, - bytesperminute=ARQ.bytes_per_minute) + self.log.info("[Modem] ARQ | RX | SENDING ACK", finished=self.states.arq_seconds_until_finish, + bytesperminute=self.states.arq_bytes_per_minute) self.send_burst_ack_frame(snr) @@ -847,7 +850,7 @@ class DATA: # calculate statistics self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(ARQ.rx_frame_buffer), snr + self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr ) # send a network message with information @@ -856,12 +859,12 @@ class DATA: arq="transmission", status="receiving", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), - finished=ARQ.arq_seconds_until_finish, + finished=self.states.arq_seconds_until_finish, irs=helpers.bool_to_string(self.is_IRS) ) else: @@ -874,8 +877,8 @@ class DATA: # We have a BOF and EOF flag in our data. If we received both we received our frame. # In case of loosing data, but we received already a BOF and EOF we need to make sure, we # received the complete last burst by checking it for Nones - bof_position = ARQ.rx_frame_buffer.find(self.data_frame_bof) - eof_position = ARQ.rx_frame_buffer.find(self.data_frame_eof) + bof_position = self.arq_rx_frame_buffer.find(self.data_frame_bof) + eof_position = self.arq_rx_frame_buffer.find(self.data_frame_eof) # get total bytes per transmission information as soon we received a frame with a BOF @@ -884,7 +887,7 @@ class DATA: if ( bof_position >= 0 and eof_position > 0 - and None not in ARQ.rx_burst_buffer + and None not in self.arq_rx_burst_buffer ): self.log.debug( "[Modem] arq_data_received:", @@ -895,14 +898,14 @@ class DATA: self.rx_frame_eof_received = True # Extract raw data from buffer - payload = ARQ.rx_frame_buffer[ + payload = self.arq_rx_frame_buffer[ bof_position + len(self.data_frame_bof): eof_position ] # Get the data frame crc data_frame_crc = payload[:4] # 0:4 = 4 bytes # Get the data frame length frame_length = int.from_bytes(payload[4:8], "big") # 4:8 = 4 bytes - ARQ.total_bytes = frame_length + self.states.set("arq_total_bytes", frame_length) # 8:9 = compression factor data_frame = payload[9:] @@ -948,16 +951,16 @@ class DATA: received=data_frame_crc_received.hex(), nacks=self.frame_nack_counter, duration=duration, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, data=data_frame, ) if self.enable_stats: self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration) - self.log.info("[Modem] ARQ | RX | Sending NACK", finished=ARQ.arq_seconds_until_finish, - bytesperminute=ARQ.bytes_per_minute) + self.log.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish, + bytesperminute=self.states.arq_bytes_per_minute) self.send_burst_nack_frame(snr) # Update arq_session timestamp @@ -967,17 +970,17 @@ class DATA: self.arq_cleanup() def arq_extract_statistics_from_data_frame(self, bof_position, eof_position, snr): - payload = ARQ.rx_frame_buffer[ + payload = self.arq_rx_frame_buffer[ bof_position + len(self.data_frame_bof): eof_position ] frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes - ARQ.total_bytes = frame_length + self.states.set("arq_total_bytes", frame_length) compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes # limit to max value of 255 compression_factor = np.clip(compression_factor, 0, 255) - ARQ.arq_compression_factor = compression_factor / 10 + self.arq_compression_factor = compression_factor / 10 self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(ARQ.rx_frame_buffer), snr + self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr ) def check_if_mode_fits_to_busy_slot(self): @@ -1022,7 +1025,7 @@ class DATA: if not self.check_if_mode_fits_to_busy_slot(): self.speed_level = current_speed_level - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) # Update modes we are listening to self.set_listening_modes(False, True, self.mode_list[self.speed_level]) @@ -1045,14 +1048,14 @@ class DATA: duration = time.time() - self.rx_start_of_transmission self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(ARQ.rx_frame_buffer), snr + self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr ) self.log.info("[Modem] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter, - bytesperminute=ARQ.bytes_per_minute, total_bytes=ARQ.total_bytes, duration=duration, hmac_signed=signed) + bytesperminute=self.states.arq_bytes_per_minute, total_bytes=self.states.arq_total_bytes, duration=duration, hmac_signed=signed) # Decompress the data frame data_frame_decompressed = lzma.decompress(data_frame) - ARQ.arq_compression_factor = len(data_frame_decompressed) / len( + self.arq_compression_factor = len(data_frame_decompressed) / len( data_frame ) data_frame = data_frame_decompressed @@ -1066,13 +1069,13 @@ class DATA: # check if RX_BUFFER isn't full if not RX_BUFFER.full(): # make sure we have always the correct buffer size - RX_BUFFER.maxsize = int(ARQ.rx_buffer_size) + RX_BUFFER.maxsize = int(self.arq_rx_buffer_size) else: # if full, free space by getting an item self.log.info( "[Modem] ARQ | RX | RX_BUFFER FULL - dropping old data", buffer_size=RX_BUFFER.qsize(), - maxsize=int(ARQ.rx_buffer_size) + maxsize=int(self.arq_rx_buffer_size) ) RX_BUFFER.get() @@ -1104,11 +1107,11 @@ class DATA: self.dxgrid, base64_data, signed, - ARQ.arq_compression_factor, - ARQ.bytes_per_minute, + self.arq_compression_factor, + self.states.arq_bytes_per_minute, duration, self.frame_nack_counter, - ARQ.speed_list + self.states.arq_speed_list ] ) except Exception as e: @@ -1127,35 +1130,15 @@ class DATA: data=base64_data ) - if ARQ.arq_save_to_folder: - try: - self.save_data_to_folder( - self.transmission_uuid, - timestamp, - self.mycallsign, - self.dxcallsign, - self.dxgrid, - data_frame - ) - except Exception as e: - self.log.error( - "[Modem] ARQ | RX | can't save file to folder", - e=e, - uuid=self.transmission_uuid, - timestamp=timestamp, - dxcall=self.dxcallsign, - dxgrid=self.dxgrid, - data=base64_data - ) self.send_data_to_socket_queue( freedata="modem-message", arq="transmission", status="received", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, timestamp=timestamp, finished=0, mycallsign=str(self.mycallsign, "UTF-8"), @@ -1166,7 +1149,7 @@ class DATA: hmac_signed=signed, duration=duration, nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) if self.enable_stats: @@ -1179,7 +1162,7 @@ class DATA: self.send_data_ack_frame(snr) # Update statistics AFTER the frame ACK is sent self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(ARQ.rx_frame_buffer), snr + self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr ) self.log.info( @@ -1210,25 +1193,26 @@ class DATA: # Maximum number of retries to send before declaring a frame is lost # save len of data_out to TOTAL_BYTES for our statistics - ARQ.total_bytes = len(data_out) + self.states.set("arq_total_bytes", len(data_out)) + self.arq_file_transfer = True frame_total_size = len(data_out).to_bytes(4, byteorder="big") # Compress data frame data_frame_compressed = lzma.compress(data_out) compression_factor = len(data_out) / len(data_frame_compressed) - ARQ.arq_compression_factor = np.clip(compression_factor, 0, 255) - compression_factor = bytes([int(ARQ.arq_compression_factor * 10)]) + self.arq_compression_factor = np.clip(compression_factor, 0, 255) + compression_factor = bytes([int(self.arq_compression_factor * 10)]) self.send_data_to_socket_queue( freedata="modem-message", arq="transmission", status="transmitting", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, - finished=ARQ.arq_seconds_until_finish, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, + finished=self.states.arq_seconds_until_finish, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS) @@ -1236,7 +1220,7 @@ class DATA: self.log.info( "[Modem] | TX | DATACHANNEL", - Bytes=ARQ.total_bytes, + Bytes=self.states.arq_total_bytes, ) data_out = data_frame_compressed @@ -1284,7 +1268,7 @@ class DATA: self.speed_level = min(self.speed_level, len(self.mode_list) - 1) self.speed_level = max(self.speed_level, 0) - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) data_mode = self.mode_list[self.speed_level] self.log.debug( @@ -1334,9 +1318,9 @@ class DATA: self.log.error("[Modem] ----- data buffer offset:", iss_buffer_pos=bufferposition, irs_bufferposition=self.irs_buffer_position) # only adjust buffer position for experimental versions - if 'exp' in Modem.version: - self.log.warning("[Modem] ----- data adjustment disabled!") - # bufferposition = self.irs_buffer_position + if self.enable_experimental_features: + self.log.warning("[Modem] ----- data adjustment enabled!") + bufferposition = self.irs_buffer_position bufferposition_end = bufferposition + payload_per_frame - len(arqheader) @@ -1442,10 +1426,10 @@ class DATA: arq="transmission", status="transmitting", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, - finished=ARQ.arq_seconds_until_finish, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, + finished=self.states.arq_seconds_until_finish, irs_snr=self.burst_ack_snr, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), @@ -1484,22 +1468,22 @@ class DATA: arq="transmission", status="transmitted", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, - finished=ARQ.arq_seconds_until_finish, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, + finished=self.states.arq_seconds_until_finish, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS), nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) self.log.info( "[Modem] ARQ | TX | DATA TRANSMITTED!", - BytesPerMinute=ARQ.bytes_per_minute, - total_bytes=ARQ.total_bytes, - BitsPerSecond=ARQ.arq_bits_per_second, + BytesPerMinute=self.states.arq_bytes_per_minute, + total_bytes=self.states.arq_total_bytes, + BitsPerSecond=self.states.arq_bits_per_second, ) # finally do an arq cleanup @@ -1514,14 +1498,14 @@ class DATA: arq="transmission", status="failed", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS), nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) self.log.info( @@ -1589,7 +1573,7 @@ class DATA: # self.log.info("SNR ON IRS", snr=self.burst_ack_snr) self.speed_level = int.from_bytes(bytes(data_in[3:4]), "big") - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) def frame_ack_received( self, data_in: bytes, snr # pylint: disable=unused-argument, @@ -1626,13 +1610,13 @@ class DATA: arq="transmission", status="failed", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS), nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) self.dxgrid = b'------' @@ -1649,14 +1633,14 @@ class DATA: arq="transmission", status="failed", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS), nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) # Update data_channel timestamp self.arq_session_last_received = int(time.time()) @@ -1746,7 +1730,7 @@ class DATA: self.open_session() # wait until data channel is open - while not ARQ.arq_session and not self.arq_session_timeout: + while not self.states.is_arq_session and not self.arq_session_timeout: threading.Event().wait(0.01) self.states.set("arq_session_state", "connecting") self.send_data_to_socket_queue( @@ -1756,7 +1740,7 @@ class DATA: mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), ) - if ARQ.arq_session and self.states.arq_session_state == "connected": + if self.states.is_arq_session and self.states.arq_session_state == "connected": # self.states.set("arq_session_state", "connected") self.send_data_to_socket_queue( freedata="modem-message", @@ -1809,7 +1793,7 @@ class DATA: connection_frame[5:8] = self.mycallsign_crc connection_frame[8:14] = helpers.callsign_to_bytes(self.mycallsign) - while not ARQ.arq_session: + while not self.states.is_arq_session: threading.Event().wait(0.01) for attempt in range(self.session_connect_max_retries): self.log.info( @@ -1834,13 +1818,13 @@ class DATA: self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - # Wait for a time, looking to see if `ARQ.arq_session` + # Wait for a time, looking to see if `self.states.is_arq_session` # indicates we've received a positive response from the far station. timeout = time.time() + 3 while time.time() < timeout: threading.Event().wait(0.01) # Stop waiting if data channel is opened - if ARQ.arq_session: + if self.states.is_arq_session: return True # Stop waiting and interrupt if data channel is getting closed while opening @@ -1852,11 +1836,11 @@ class DATA: # Session connect timeout, send close_session frame to # attempt to clean up the far-side, if it received the # open_session frame and can still hear us. - if not ARQ.arq_session: + if not self.states.is_arq_session: self.close_session() return False - # Given the while condition, it will only exit when `ARQ.arq_session` is True + # Given the while condition, it will only exit when `self.states.is_arq_session` is True self.send_data_to_socket_queue( freedata="modem-message", arq="session", @@ -1882,7 +1866,7 @@ class DATA: # Station B already tries connecting to Station A. # For avoiding ignoring repeated connect request in case of packet loss # we are only ignoring packets in case we are ISS - if ARQ.arq_session and self.IS_ARQ_SESSION_MASTER: + if self.states.is_arq_session and self.IS_ARQ_SESSION_MASTER: return False self.IS_ARQ_SESSION_MASTER = False @@ -1916,7 +1900,7 @@ class DATA: + "]", self.states.arq_session_state, ) - ARQ.arq_session = True + self.states.is_arq_session = True self.states.set("is_modem_busy", True) self.send_data_to_socket_queue( @@ -1950,7 +1934,7 @@ class DATA: ) self.IS_ARQ_SESSION_MASTER = False - ARQ.arq_session = False + self.states.is_arq_session = False # we need to send disconnect frame before doing arq cleanup # we would lose our session id then @@ -2004,12 +1988,12 @@ class DATA: ) self.IS_ARQ_SESSION_MASTER = False - ARQ.arq_session = False + self.states.is_arq_session = False self.arq_cleanup() def transmit_session_heartbeat(self) -> None: """Send ARQ sesion heartbeat while connected""" - # ARQ.arq_session = True + # self.states.is_arq_session = True # self.states.set("is_modem_busy", True) # self.states.set("arq_session_state", "connected") @@ -2059,7 +2043,7 @@ class DATA: dxcallsign=str(self.dxcallsign, 'UTF-8'), ) - ARQ.arq_session = True + self.states.is_arq_session = True self.states.set("arq_session_state", "connected") self.states.set("is_modem_busy", True) @@ -2121,7 +2105,7 @@ class DATA: # wait a moment for the case, a heartbeat is already on the way back to us # this makes channel establishment more clean - if ARQ.arq_session: + if self.states.is_arq_session: threading.Event().wait(2.5) # init arq state event @@ -2147,14 +2131,14 @@ class DATA: status="failed", reason="unknown", uuid=self.transmission_uuid, - percent=ARQ.arq_transmission_percent, - bytesperminute=ARQ.bytes_per_minute, - compression=ARQ.arq_compression_factor, + percent=self.states.arq_transmission_percent, + bytesperminute=self.states.arq_bytes_per_minute, + compression=self.arq_compression_factor, mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS), nacks=self.frame_nack_counter, - speed_list=ARQ.speed_list + speed_list=self.states.arq_speed_list ) self.log.warning( @@ -2189,7 +2173,7 @@ class DATA: self.is_IRS = False # init a new random session id if we are not in an arq session - if not ARQ.arq_session: + if not self.states.is_arq_session: self.session_id = np.random.bytes(1) # Update data_channel timestamp @@ -2264,12 +2248,12 @@ class DATA: # is intended for this station. # stop processing if we don't want to respond to a call when not in a arq session - if not self.respond_to_call and not ARQ.arq_session: + if not self.respond_to_call and not self.states.is_arq_session: return False # stop processing if not in arq session, but modem state is busy and we have a different session id # use-case we get a connection request while connecting to another station - if not ARQ.arq_session and self.states.is_modem_busy and data_in[13:14] != self.session_id: + if not self.states.is_arq_session and self.states.is_modem_busy and data_in[13:14] != self.session_id: return False self.arq_file_transfer = True @@ -2406,7 +2390,7 @@ class DATA: connection_frame[:1] = frametype connection_frame[1:2] = self.session_id connection_frame[8:9] = bytes([self.speed_level]) - connection_frame[13:14] = bytes([ARQ.arq_protocol_version]) + connection_frame[13:14] = bytes([self.arq_protocol_version]) self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) @@ -2446,7 +2430,7 @@ class DATA: """ protocol_version = int.from_bytes(bytes(data_in[13:14]), "big") - if protocol_version == ARQ.arq_protocol_version: + if protocol_version == self.arq_protocol_version: self.send_data_to_socket_queue( freedata="modem-message", arq="transmission", @@ -2512,7 +2496,7 @@ class DATA: self.log.warning( "[Modem] protocol version mismatch:", received=protocol_version, - own=ARQ.arq_protocol_version, + own=self.arq_protocol_version, ) self.stop_transmission() @@ -2757,7 +2741,7 @@ class DATA: threading.Event().wait(0.5) while self.states.is_beacon_running: if ( - not ARQ.arq_session + not self.states.is_arq_session and not self.arq_file_transfer and not self.beacon_paused #and not self.states.channel_busy @@ -3047,14 +3031,14 @@ class DATA: transmission_percent: float """ try: - if ARQ.total_bytes == 0: - ARQ.total_bytes = 1 - ARQ.arq_transmission_percent = min( + if self.states.arq_total_bytes == 0: + self.states.set("arq_total_bytes", 1) + arq_transmission_percent = min( int( ( receivedbytes - * ARQ.arq_compression_factor - / ARQ.total_bytes + * self.arq_compression_factor + / self.states.arq_total_bytes ) * 100 ), @@ -3064,31 +3048,36 @@ class DATA: transmissiontime = time.time() - self.rx_start_of_transmission if receivedbytes > 0: - ARQ.arq_bits_per_second = int((receivedbytes * 8) / transmissiontime) - ARQ.bytes_per_minute = int( + arq_bits_per_second = int((receivedbytes * 8) / transmissiontime) + bytes_per_minute = int( receivedbytes / (transmissiontime / 60) ) - ARQ.arq_seconds_until_finish = int(((ARQ.total_bytes - receivedbytes) / ( - ARQ.bytes_per_minute * ARQ.arq_compression_factor)) * 60) - 20 # offset because of frame ack/nack + arq_seconds_until_finish = int(((self.states.arq_total_bytes - receivedbytes) / ( + bytes_per_minute * self.arq_compression_factor)) * 60) - 20 # offset because of frame ack/nack - speed_chart = {"snr": snr, "bpm": ARQ.bytes_per_minute, "timestamp": int(time.time())} + speed_chart = {"snr": snr, "bpm": bytes_per_minute, "timestamp": int(time.time())} # check if data already in list - if speed_chart not in ARQ.speed_list: - ARQ.speed_list.append(speed_chart) + if speed_chart not in self.states.arq_speed_list: + self.states.arq_speed_list.append(speed_chart) else: - ARQ.arq_bits_per_second = 0 - ARQ.bytes_per_minute = 0 - ARQ.arq_seconds_until_finish = 0 + arq_bits_per_second = 0 + bytes_per_minute = 0 + arq_seconds_until_finish = 0 except Exception as err: self.log.error(f"[Modem] calculate_transfer_rate_rx: Exception: {err}") - ARQ.arq_transmission_percent = 0.0 - ARQ.arq_bits_per_second = 0 - ARQ.bytes_per_minute = 0 + arq_transmission_percent = 0.0 + arq_bits_per_second = 0 + bytes_per_minute = 0 + + self.states.set("arq_bits_per_second", arq_bits_per_second) + self.states.set("bytes_per_minute", bytes_per_minute) + self.states.set("arq_transmission_percent", arq_transmission_percent) + self.states.set("arq_compression_factor", self.arq_compression_factor) return [ - ARQ.arq_bits_per_second, - ARQ.bytes_per_minute, - ARQ.arq_transmission_percent, + arq_bits_per_second, + bytes_per_minute, + arq_transmission_percent, ] def reset_statistics(self) -> None: @@ -3096,13 +3085,13 @@ class DATA: Reset statistics """ # reset ARQ statistics - ARQ.bytes_per_minute_burst = 0 - ARQ.bytes_per_minute = 0 - ARQ.arq_bits_per_second_burst = 0 - ARQ.arq_bits_per_second = 0 - ARQ.arq_transmission_percent = 0 - ARQ.total_bytes = 0 - ARQ.arq_seconds_until_finish = 0 + self.states.set("bytes_per_minute_burst", 0) + self.states.set("arq_total_bytes", 0) + self.states.set("self.states.arq_seconds_until_finish", 0) + self.states.set("arq_bits_per_second", 0) + self.states.set("bytes_per_minute", 0) + self.states.set("arq_transmission_percent", 0) + self.states.set("arq_compression_factor", 0) def calculate_transfer_rate_tx( self, tx_start_of_transmission: float, sentbytes: int, tx_buffer_length: int @@ -3120,40 +3109,39 @@ class DATA: transmission_percent: float """ try: - ARQ.arq_transmission_percent = min( + arq_transmission_percent = min( int((sentbytes / tx_buffer_length) * 100), 100 ) transmissiontime = time.time() - tx_start_of_transmission if sentbytes > 0: - ARQ.arq_bits_per_second = int((sentbytes * 8) / transmissiontime) - ARQ.bytes_per_minute = int(sentbytes / (transmissiontime / 60)) - ARQ.arq_seconds_until_finish = int(((tx_buffer_length - sentbytes) / ( - ARQ.bytes_per_minute * ARQ.arq_compression_factor)) * 60) + arq_bits_per_second = int((sentbytes * 8) / transmissiontime) + bytes_per_minute = int(sentbytes / (transmissiontime / 60)) + arq_seconds_until_finish = int(((tx_buffer_length - sentbytes) / ( + bytes_per_minute * self.arq_compression_factor)) * 60) - speed_chart = {"snr": self.burst_ack_snr, "bpm": ARQ.bytes_per_minute, + speed_chart = {"snr": self.burst_ack_snr, "bpm": bytes_per_minute, "timestamp": int(time.time())} # check if data already in list - if speed_chart not in ARQ.speed_list: - ARQ.speed_list.append(speed_chart) + if speed_chart not in self.states.arq_speed_list: + self.states.arq_speed_list.append(speed_chart) else: - ARQ.arq_bits_per_second = 0 - ARQ.bytes_per_minute = 0 - ARQ.arq_seconds_until_finish = 0 + arq_bits_per_second = 0 + bytes_per_minute = 0 + arq_seconds_until_finish = 0 except Exception as err: self.log.error(f"[Modem] calculate_transfer_rate_tx: Exception: {err}") - ARQ.arq_transmission_percent = 0.0 - ARQ.arq_bits_per_second = 0 - ARQ.bytes_per_minute = 0 + arq_transmission_percent = 0.0 + arq_bits_per_second = 0 + bytes_per_minute = 0 - return [ - ARQ.arq_bits_per_second, - ARQ.bytes_per_minute, - ARQ.arq_transmission_percent, - ] + self.states.set("arq_bits_per_second", arq_bits_per_second) + self.states.set("bytes_per_minute", bytes_per_minute) + self.states.set("arq_transmission_percent", arq_transmission_percent) + self.states.set("arq_compression_factor", self.arq_compression_factor) # ----------------------CLEANUP AND RESET FUNCTIONS def arq_cleanup(self) -> None: @@ -3174,8 +3162,8 @@ class DATA: self.rpt_request_received = False self.burst_rpt_counter = 0 self.data_frame_ack_received = False - ARQ.rx_burst_buffer = [] - ARQ.rx_frame_buffer = b"" + self.arq_rx_burst_buffer = [] + self.arq_rx_frame_buffer = b"" self.burst_ack_snr = 0 self.arq_burst_last_payload = 0 self.rx_n_frame_of_burst = 0 @@ -3196,7 +3184,7 @@ class DATA: self.frame_nack_counter = 0 self.frame_received_counter = 0 self.speed_level = len(self.mode_list) - 1 - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) # low bandwidth mode indicator self.received_LOW_BANDWIDTH_MODE = False @@ -3209,14 +3197,14 @@ class DATA: self.data_channel_max_retries = 10 # we need to keep these values if in ARQ_SESSION - if not ARQ.arq_session: + if not self.states.is_arq_session: self.states.set("is_modem_busy", False) self.dxcallsign = b"AA0AA-0" self.mycallsign = self.mycallsign self.session_id = bytes(1) self.states.set("arq_session_state", "disconnected") - ARQ.speed_list = [] + self.states.arq_speed_list = [] self.states.set("is_arq_state", False) self.arq_state_event = threading.Event() self.arq_file_transfer = False @@ -3318,7 +3306,7 @@ class DATA: # We want to reach this state only if connected ( == return above not called ) if self.rx_n_frames_per_burst > 1: # uses case for IRS: reduce time for waiting by counting "None" in burst buffer - frames_left = ARQ.rx_burst_buffer.count(None) + frames_left = self.arq_rx_burst_buffer.count(None) elif self.rx_n_frame_of_burst == 0 and self.rx_n_frames_per_burst == 0: # use case for IRS: We didn't receive a burst yet, because the first one got lost # in this case we don't have any information about the expected burst length @@ -3372,9 +3360,9 @@ class DATA: ) print( - f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {ARQ.rx_burst_buffer.count(None)}") + f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {self.arq_rx_burst_buffer.count(None)}") # check if we have N frames per burst > 1 - if self.rx_n_frames_per_burst > 1 and self.burst_rpt_counter < 3 and ARQ.rx_burst_buffer.count(None) > 0: + if self.rx_n_frames_per_burst > 1 and self.burst_rpt_counter < 3 and self.arq_rx_burst_buffer.count(None) > 0: # reset self.burst_last_received self.burst_last_received = time.time() + self.time_list[self.speed_level] * frames_left self.burst_rpt_counter += 1 @@ -3391,7 +3379,7 @@ class DATA: if self.burst_nack_counter >= 2: self.burst_nack_counter = 0 self.speed_level = max(self.speed_level - 1, 0) - ARQ.arq_speed_level = self.speed_level + self.states.set("arq_speed_level", self.speed_level) # TODO Create better mechanisms for handling n frames per burst for bad channels # reduce frames per burst @@ -3432,7 +3420,7 @@ class DATA: ): timeleft = int((self.data_channel_last_received + self.transmission_timeout) - time.time()) - ARQ.arq_seconds_until_timeout = timeleft + self.states.set("arq_seconds_until_timeout", timeleft) if timeleft % 10 == 0: self.log.debug("Time left until channel timeout", seconds=timeleft) @@ -3466,7 +3454,7 @@ class DATA: ARQ SESSION """ if ( - ARQ.arq_session + self.states.is_arq_session and self.states.is_modem_busy and not self.arq_file_transfer ): @@ -3500,7 +3488,7 @@ class DATA: while not self.arq_file_transfer: threading.Event().wait(0.01) if ( - ARQ.arq_session + self.states.is_arq_session and self.IS_ARQ_SESSION_MASTER and self.states.arq_session_state == "connected" # and not self.arq_file_transfer @@ -3566,82 +3554,3 @@ class DATA: ) else: return False - - - def save_data_to_folder(self, - transmission_uuid, - timestamp, - mycallsign, - dxcallsign, - dxgrid, - data_frame - ): - - """ - Save received data to folder - Also supports chat messages - """ - - try: - - self.log.info("[Modem] ARQ | RX | saving data to folder") - - mycallsign = str(mycallsign, "UTF-8") - dxcallsign = str(dxcallsign, "UTF-8") - - folder_path = "received" - if not os.path.exists(folder_path): - os.makedirs(folder_path) - - callsign_path = f"{mycallsign}_{dxcallsign}" - if not os.path.exists(f"{folder_path}/{callsign_path}"): - os.makedirs(f"{folder_path}/{callsign_path}") - - split_char = b"\0;\1;" - n_objects = 9 - decoded_data = data_frame.split(split_char) - # if we have a false positive in case our split_char is available in data - # lets stick the data together, so we are not loosing it - if len(decoded_data) > n_objects: - file_data = b''.join(decoded_data[n_objects:]) - - # slice is crashing nuitka - # decoded_data = [*decoded_data[:n_objects], file_data] - decoded_data = decoded_data[:n_objects] + [file_data] - - if decoded_data[0] in [b'm']: - checksum_delivered = str(decoded_data[2], "utf-8").lower() - # transmission_uuid = decoded_data[3] - message = decoded_data[5] - filename = decoded_data[6] - # filetype = decoded_data[7] - # timestamp = decoded_data[4] - data = decoded_data[8] - else: - message = b'' - filename = b'' - - # save file to folder - if filename not in [b'', b'undefined']: - # doing crc check - crc = helpers.get_crc_32(data).hex().lower() - validity = checksum_delivered == crc - self.log.info( - "[Modem] ARQ | RX | checking data crc", - crc_delivered=checksum_delivered, - crc_calculated=crc, - valid=validity, - ) - filename = str(filename, "UTF-8") - filename_complex = f"{timestamp}_{transmission_uuid}_{filename}" - with open(f"{folder_path}/{callsign_path}/{filename_complex}", "wb") as file: - file.write(data) - - if message not in [b'', b'undefined']: - # save message to folder - message_name = f"{timestamp}_{transmission_uuid}_msg.txt" - with open(f"{folder_path}/{callsign_path}/{message_name}", "wb") as file: - file.write(message) - - except Exception as e: - self.log.error("[Modem] error saving data to folder", e=e) \ No newline at end of file diff --git a/modem/state_manager.py b/modem/state_manager.py index ee7b6704..af6019c3 100644 --- a/modem/state_manager.py +++ b/modem/state_manager.py @@ -20,11 +20,20 @@ class STATES: self.is_arq_state = False self.is_arq_session = False self.is_transmitting = False - self.arq_session_state = 'disconnected' self.audio_dbfs = 0 self.dxcallsign: bytes = b"ZZ9YY-0" self.dxgrid: bytes = b"------" + self.arq_session_state = 'disconnected' + self.arq_speed_level = 0 + self.arq_total_bytes = 0 + self.arq_bits_per_second = 0 + self.arq_bytes_per_minute = 0 + self.arq_transmission_percent = 0 + self.arq_compression_factor = 0 + self.arq_speed_list = [] + self.arq_seconds_until_timeout = 0 + self.radio_frequency = 0 self.radio_mode = None self.radio_bandwidth = 0