diff --git a/tnc/data_handler.py b/tnc/data_handler.py index c56ff058..d9fe8db0 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -56,6 +56,9 @@ class DATA: self.arq_session_timeout = 30 self.session_connect_max_retries = 15 + # actual n retries of burst + self.tx_n_retry_of_burst = 0 + self.transmission_uuid = "" self.data_channel_last_received = 0.0 # time of last "live sign" of a frame @@ -75,6 +78,7 @@ class DATA: # 3 bytes for the EOF End of File indicator in a data frame self.data_frame_eof = b"EOF" + self.tx_n_max_retries_per_burst = 50 self.rx_n_max_retries_per_burst = 50 self.n_retries_per_burst = 0 @@ -131,6 +135,10 @@ class DATA: self.rx_frame_bof_received = False self.rx_frame_eof_received = False + # TIMEOUTS + self.burst_ack_timeout_seconds = 3.0 # timeout for burst acknowledges + self.data_frame_ack_timeout_seconds = 3.0 # timeout for data frame acknowledges + self.rpt_ack_timeout_seconds = 3.0 # timeout for rpt frame acknowledges self.transmission_timeout = 360 # transmission timeout in seconds # Dictionary of functions and log messages used in process_data @@ -294,16 +302,16 @@ class DATA: or _valid3 or frametype in [ - FR_TYPE.CQ.value, - FR_TYPE.QRV.value, - FR_TYPE.PING.value, - FR_TYPE.BEACON.value, - ] + FR_TYPE.CQ.value, + FR_TYPE.QRV.value, + FR_TYPE.PING.value, + FR_TYPE.BEACON.value, + ] ): # CHECK IF FRAMETYPE IS BETWEEN 10 and 50 ------------------------ - frame = frametype - 10 - n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big") + # frame = frametype - 10 + # n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big") # Dispatch activity based on received frametype if frametype in self.rx_dispatcher: @@ -403,8 +411,8 @@ class DATA: """Build and send ACK frame for burst DATA frame""" ack_frame = bytearray(self.length_sig_frame) ack_frame[:1] = bytes([FR_TYPE.BURST_ACK.value]) - #ack_frame[1:4] = static.DXCALLSIGN_CRC - #ack_frame[4:7] = static.MYCALLSIGN_CRC + # ack_frame[1:4] = static.DXCALLSIGN_CRC + # ack_frame[4:7] = static.MYCALLSIGN_CRC ack_frame[1:2] = self.session_id ack_frame[7:8] = bytes([int(snr)]) ack_frame[8:9] = bytes([int(self.speed_level)]) @@ -417,8 +425,8 @@ class DATA: ack_frame = bytearray(self.length_sig_frame) ack_frame[:1] = bytes([FR_TYPE.FR_ACK.value]) ack_frame[1:2] = self.session_id - #ack_frame[1:4] = static.DXCALLSIGN_CRC - #ack_frame[4:7] = static.MYCALLSIGN_CRC + # ack_frame[1:4] = static.DXCALLSIGN_CRC + # ack_frame[4:7] = static.MYCALLSIGN_CRC ack_frame[7:8] = bytes([int(snr)]) ack_frame[8:9] = bytes([int(self.speed_level)]) @@ -445,8 +453,8 @@ class DATA: rpt_frame = bytearray(self.length_sig_frame) rpt_frame[:1] = bytes([FR_TYPE.FR_REPEAT.value]) rpt_frame[1:2] = self.session_id - #rpt_frame[1:4] = static.DXCALLSIGN_CRC - #rpt_frame[4:7] = static.MYCALLSIGN_CRC + # rpt_frame[1:4] = static.DXCALLSIGN_CRC + # rpt_frame[4:7] = static.MYCALLSIGN_CRC rpt_frame[7:13] = missing_frames self.log.info("[TNC] ARQ | RX | Requesting", frames=missing_frames) @@ -458,8 +466,8 @@ class DATA: nack_frame = bytearray(self.length_sig_frame) nack_frame[:1] = bytes([FR_TYPE.FR_NACK.value]) nack_frame[1:2] = self.session_id - #nack_frame[1:4] = static.DXCALLSIGN_CRC - #nack_frame[4:7] = static.MYCALLSIGN_CRC + # nack_frame[1:4] = static.DXCALLSIGN_CRC + # nack_frame[4:7] = static.MYCALLSIGN_CRC nack_frame[7:8] = bytes([int(snr)]) nack_frame[8:9] = bytes([int(self.speed_level)]) @@ -471,8 +479,8 @@ class DATA: nack_frame = bytearray(self.length_sig_frame) nack_frame[:1] = bytes([FR_TYPE.BURST_NACK.value]) nack_frame[1:2] = self.session_id - #nack_frame[1:4] = static.DXCALLSIGN_CRC - #nack_frame[4:7] = static.MYCALLSIGN_CRC + # nack_frame[1:4] = static.DXCALLSIGN_CRC + # nack_frame[4:7] = static.MYCALLSIGN_CRC nack_frame[7:8] = bytes([int(snr)]) nack_frame[8:9] = bytes([int(self.speed_level)]) @@ -484,8 +492,8 @@ class DATA: disconnection_frame = bytearray(self.length_sig_frame) disconnection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_CLOSE.value]) disconnection_frame[1:2] = self.session_id - #disconnection_frame[1:4] = static.DXCALLSIGN_CRC - #disconnection_frame[4:7] = static.MYCALLSIGN_CRC + # disconnection_frame[1:4] = static.DXCALLSIGN_CRC + # disconnection_frame[4:7] = static.MYCALLSIGN_CRC disconnection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) self.enqueue_frame_for_tx(disconnection_frame, copies=5, repeat_delay=250) @@ -526,19 +534,19 @@ class DATA: # Extract some important data from the frame # Get sequence number of burst frame - RX_N_FRAME_OF_BURST = int.from_bytes(bytes(data_in[:1]), "big") - 10 + rx_n_frame_of_burst = int.from_bytes(bytes(data_in[:1]), "big") - 10 # Get number of bursts from received frame - RX_N_FRAMES_PER_BURST = int.from_bytes(bytes(data_in[1:2]), "big") + rx_n_frames_per_burst = int.from_bytes(bytes(data_in[1:2]), "big") # The RX burst buffer needs to have a fixed length filled with "None". # We need this later for counting the "Nones" to detect missing data. # Check if burst buffer has expected length else create it - if len(static.RX_BURST_BUFFER) != RX_N_FRAMES_PER_BURST: - static.RX_BURST_BUFFER = [None] * RX_N_FRAMES_PER_BURST + if len(static.RX_BURST_BUFFER) != rx_n_frames_per_burst: + static.RX_BURST_BUFFER = [None] * rx_n_frames_per_burst # Append data to rx burst buffer # [frame_type][n_frames_per_burst][CRC24][CRC24] - static.RX_BURST_BUFFER[RX_N_FRAME_OF_BURST] = data_in[8:] # type: ignore + static.RX_BURST_BUFFER[rx_n_frame_of_burst] = data_in[8:] # type: ignore self.log.debug("[TNC] static.RX_BURST_BUFFER", buffer=static.RX_BURST_BUFFER) @@ -618,7 +626,6 @@ class DATA: ) static.ARQ_SPEED_LEVEL = self.speed_level - # Update modes we are listening to self.set_listening_modes(self.mode_list[self.speed_level]) @@ -634,7 +641,7 @@ class DATA: self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) ) - elif RX_N_FRAME_OF_BURST == RX_N_FRAMES_PER_BURST - 1: + elif rx_n_frame_of_burst == rx_n_frames_per_burst - 1: # We have "Nones" in our rx buffer, # Check if we received last frame of burst - this is an indicator for missed frames. # With this way of doing this, we always MUST receive the last @@ -642,8 +649,8 @@ class DATA: # TODO: See if a timeout on the send side with re-transmit last burst would help. self.log.debug( "[TNC] all frames in burst received:", - frame=RX_N_FRAME_OF_BURST, - frames=RX_N_FRAMES_PER_BURST, + frame=rx_n_frame_of_burst, + frames=rx_n_frames_per_burst, ) self.send_retransmit_request_frame(freedv) self.calculate_transfer_rate_rx( @@ -654,8 +661,8 @@ class DATA: else: self.log.error( "[TNC] data_handler: Should not reach this point...", - frame=RX_N_FRAME_OF_BURST, - frames=RX_N_FRAMES_PER_BURST, + frame=rx_n_frame_of_burst, + frames=rx_n_frames_per_burst, ) # We have a BOF and EOF flag in our data. If we received both we received our frame. @@ -824,13 +831,6 @@ class DATA: self.tx_n_retry_of_burst = 0 # retries we already sent data # Maximum number of retries to send before declaring a frame is lost - TX_N_MAX_RETRIES_PER_BURST = 50 - TX_N_FRAMES_PER_BURST = n_frames_per_burst # amount of n frames per burst - - # TIMEOUTS - BURST_ACK_TIMEOUT_SECONDS = 3.0 # timeout for burst acknowledges - DATA_FRAME_ACK_TIMEOUT_SECONDS = 3.0 # timeout for data frame acknowledges - RPT_ACK_TIMEOUT_SECONDS = 3.0 # timeout for rpt frame acknowledges # save len of data_out to TOTAL_BYTES for our statistics --> kBytes # static.TOTAL_BYTES = round(len(data_out) / 1024, 2) @@ -882,9 +882,9 @@ class DATA: # Iterate through data_out buffer while not self.data_frame_ack_received and static.ARQ_STATE: - # we have TX_N_MAX_RETRIES_PER_BURST attempts for sending a burst - for self.tx_n_retry_of_burst in range(TX_N_MAX_RETRIES_PER_BURST): - data_mode = mode + # we have self.tx_n_max_retries_per_burst attempts for sending a burst + for self.tx_n_retry_of_burst in range(self.tx_n_max_retries_per_burst): + # data_mode = mode # self.log.debug("[TNC] FIXED MODE:", mode=FREEDV_MODE(data_mode).name) # we are doing a modulo check of transmission retries of the actual burst @@ -921,17 +921,17 @@ class DATA: # Tempbuffer list for storing our data frames tempbuffer = [] - # Append data frames with TX_N_FRAMES_PER_BURST to tempbuffer + # Append data frames with n_frames_per_burst to tempbuffer # TODO: this part needs a complete rewrite! - # TX_N_FRAMES_PER_BURST = 1 is working + # n_frames_per_burst = 1 is working arqheader = bytearray() # arqheader[:1] = bytes([FR_TYPE.BURST_01.value + i]) arqheader[:1] = bytes([FR_TYPE.BURST_01.value]) - arqheader[1:2] = bytes([TX_N_FRAMES_PER_BURST]) + arqheader[1:2] = bytes([n_frames_per_burst]) arqheader[2:3] = self.session_id - #arqheader[2:5] = static.DXCALLSIGN_CRC - #arqheader[5:8] = static.MYCALLSIGN_CRC + # arqheader[2:5] = static.DXCALLSIGN_CRC + # arqheader[5:8] = static.MYCALLSIGN_CRC bufferposition_end = bufferposition + payload_per_frame - len(arqheader) @@ -955,7 +955,7 @@ class DATA: self.log.info( "[TNC] ARQ | TX | FRAMES", mode=FREEDV_MODE(data_mode).name, - fpb=TX_N_FRAMES_PER_BURST, + fpb=n_frames_per_burst, retry=self.tx_n_retry_of_burst, ) @@ -963,13 +963,13 @@ class DATA: self.enqueue_frame_for_tx(t_buf_item, c2_mode=data_mode) # After transmission finished, wait for an ACK or RPT frame - # burstacktimeout = time.time() + BURST_ACK_TIMEOUT_SECONDS + 100 + # burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100 # while (not self.burst_ack and not self.burst_nack and # not self.rpt_request_received and not self.data_frame_ack_received and # time.time() < burstacktimeout and static.ARQ_STATE): # time.sleep(0.01) - # burstacktimeout = time.time() + BURST_ACK_TIMEOUT_SECONDS + 100 + # burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100 while static.ARQ_STATE and not ( self.burst_ack or self.burst_nack @@ -1016,7 +1016,7 @@ class DATA: self.log.debug( "[TNC] ATTEMPT:", retry=self.tx_n_retry_of_burst, - maxretries=TX_N_MAX_RETRIES_PER_BURST, + maxretries=self.tx_n_max_retries_per_burst, overflows=static.BUFFER_OVERFLOW_COUNTER, ) # End of FOR loop @@ -1294,8 +1294,8 @@ class DATA: connection_frame = bytearray(self.length_sig_frame) connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_OPEN.value]) connection_frame[1:2] = self.session_id - #connection_frame[1:4] = static.DXCALLSIGN_CRC - #connection_frame[4:7] = static.MYCALLSIGN_CRC + # connection_frame[1:4] = static.DXCALLSIGN_CRC + # connection_frame[4:7] = static.MYCALLSIGN_CRC connection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) while not static.ARQ_SESSION: @@ -1414,7 +1414,8 @@ class DATA: # is intended for this station. # Close the session if the CRC matches the remote station in static. _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) - if _valid_crc: + _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) + if _valid_crc or _valid_session: static.ARQ_SESSION_STATE = "disconnected" helpers.add_to_heard_stations( static.DXCALLSIGN, @@ -1452,9 +1453,8 @@ class DATA: connection_frame = bytearray(self.length_sig_frame) connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value]) connection_frame[1:2] = self.session_id - #connection_frame[1:4] = static.DXCALLSIGN_CRC - #connection_frame[4:7] = static.MYCALLSIGN_CRC - + # connection_frame[1:4] = static.DXCALLSIGN_CRC + # connection_frame[4:7] = static.MYCALLSIGN_CRC self.enqueue_frame_for_tx(connection_frame) @@ -1466,6 +1466,7 @@ class DATA: """ # Accept session data if the DXCALLSIGN_CRC matches the station in static or session id. + _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) if _valid_crc or _valid_session: self.log.debug("[TNC] Received session heartbeat") @@ -1507,6 +1508,8 @@ class DATA: data_out:bytes: mode:int: n_frames_per_burst:int: + transmission_uuid:str: + mycallsign:bytes: Returns: True if the data session was opened and the data was sent @@ -1552,6 +1555,7 @@ class DATA: Args: mode:int: n_frames_per_burst:int: + mycallsign:bytes: Returns: True if the data channel was opened successfully @@ -1559,9 +1563,10 @@ class DATA: """ self.is_IRS = False - # init a new random session id - self.session_id = randbytes(1) - print(session_id) + # init a new random session id if we are not in an arq session + if not static.ARQ_SESSION: + self.session_id = randbytes(1) + print(self.session_id) # Update data_channel timestamp self.data_channel_last_received = int(time.time()) @@ -1579,7 +1584,7 @@ class DATA: connection_frame[1:4] = static.DXCALLSIGN_CRC connection_frame[4:7] = static.MYCALLSIGN_CRC connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign) - #connection_frame[13:14] = bytes([n_frames_per_burst]) + # connection_frame[13:14] = bytes([n_frames_per_burst]) connection_frame[13:14] = self.session_id while not static.ARQ_STATE: @@ -1771,8 +1776,8 @@ class DATA: connection_frame = bytearray(self.length_sig_frame) connection_frame[:1] = frametype connection_frame[1:2] = self.session_id - #connection_frame[1:4] = static.DXCALLSIGN_CRC - #connection_frame[4:7] = static.MYCALLSIGN_CRC + # connection_frame[1:4] = static.DXCALLSIGN_CRC + # connection_frame[4:7] = static.MYCALLSIGN_CRC connection_frame[8:9] = bytes([self.speed_level]) # For checking protocol version on the receiving side @@ -2009,8 +2014,8 @@ class DATA: self.log.warning("[TNC] Stopping transmission!") stop_frame = bytearray(self.length_sig_frame) stop_frame[:1] = bytes([FR_TYPE.ARQ_STOP.value]) - #stop_frame[1:4] = static.DXCALLSIGN_CRC - #stop_frame[4:7] = static.MYCALLSIGN_CRC + # stop_frame[1:4] = static.DXCALLSIGN_CRC + # stop_frame[4:7] = static.MYCALLSIGN_CRC stop_frame[1:2] = self.session_id stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) @@ -2141,7 +2146,7 @@ class DATA: """ Transmit a CQ Args: - Nothing + self Returns: Nothing @@ -2290,7 +2295,7 @@ class DATA: ( receivedbytes * static.ARQ_COMPRESSION_FACTOR - / (static.TOTAL_BYTES) + / static.TOTAL_BYTES ) * 100 ), @@ -2302,7 +2307,7 @@ class DATA: if receivedbytes > 0: static.ARQ_BITS_PER_SECOND = int((receivedbytes * 8) / transmissiontime) static.ARQ_BYTES_PER_MINUTE = int( - (receivedbytes) / (transmissiontime / 60) + receivedbytes / (transmissiontime / 60) ) else: @@ -2356,7 +2361,7 @@ class DATA: if sentbytes > 0: static.ARQ_BITS_PER_SECOND = int((sentbytes * 8) / transmissiontime) - static.ARQ_BYTES_PER_MINUTE = int((sentbytes) / (transmissiontime / 60)) + static.ARQ_BYTES_PER_MINUTE = int(sentbytes / (transmissiontime / 60)) else: static.ARQ_BITS_PER_SECOND = 0