diff --git a/tnc/data_handler.py b/tnc/data_handler.py index ed2bc89f..ba9eccdc 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -26,7 +26,7 @@ import structlog import stats import ujson as json from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS -from exceptions import NoCallsign +# from exceptions import NoCallsign from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, RX_BUFFER from static import FRAME_TYPE as FR_TYPE @@ -119,7 +119,7 @@ class DATA: # List for minimum SNR operating level for the corresponding mode in self.mode_list self.snr_list_low_bw = [-100] # List for time to wait for corresponding mode in seconds - self.time_list_low_bw = [6+5] + self.time_list_low_bw = [6 + 5] # --------------------- HIGH BANDWIDTH @@ -135,7 +135,7 @@ class DATA: # test with 6,7 --> caused sometimes a frame timeout if ack frame takes longer # TODO: Need to check why ACK frames needs more time # TODO: Adjust these times - self.time_list_high_bw = [6+5, 7, 10] + self.time_list_high_bw = [6 + 5, 7, 10] # -------------- AVAILABLE MODES END----------- # Mode list for selecting between low bandwidth ( 500Hz ) and modes with higher bandwidth @@ -224,7 +224,7 @@ class DATA: } self.command_dispatcher = { - #"CONNECT": (self.arq_session_handler, "CONNECT"), + # "CONNECT": (self.arq_session_handler, "CONNECT"), "CQ": (self.transmit_cq, "CQ"), "DISCONNECT": (self.close_session, "DISCONNECT"), "SEND_TEST_FRAME": (self.send_test_frame, "TEST"), @@ -346,18 +346,14 @@ class DATA: # [0] bytes # [1] freedv instance # [2] bytes_per_frame - self.process_data( - bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2] - ) + self.process_data(bytes_out=data[0]) - def process_data(self, bytes_out, freedv, bytes_per_frame: int) -> None: + def process_data(self, bytes_out) -> None: """ Process incoming data and decide what to do with the frame. Args: bytes_out: - freedv: - bytes_per_frame: Returns: @@ -388,11 +384,11 @@ class DATA: or _valid4 or frametype in [ - FR_TYPE.CQ.value, - FR_TYPE.QRV.value, - FR_TYPE.PING.value, - FR_TYPE.BEACON.value, - FR_TYPE.IS_WRITING.value, + FR_TYPE.CQ.value, + FR_TYPE.QRV.value, + FR_TYPE.PING.value, + FR_TYPE.BEACON.value, + FR_TYPE.IS_WRITING.value, ] ): @@ -414,9 +410,7 @@ class DATA: snr = static.SNR self.log.debug("[TNC] RX SNR", snr=snr) # send payload data to arq checker without CRC16 - self.arq_data_received( - bytes(bytes_out[:-2]), bytes_per_frame, snr, freedv - ) + self.arq_data_received(bytes(bytes_out[:-2]), snr) # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync # if static.RX_BURST_BUFFER.count(None) <= 1 or (frame+1) == n_frames_per_burst: @@ -443,7 +437,7 @@ class DATA: def enqueue_frame_for_tx( self, - frame_to_tx, # : list[bytearray], # this causes a crash on python 3.7 + frame_to_tx, # : list[bytearray], # this causes a crash on python 3.7 c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0, @@ -463,7 +457,11 @@ class DATA: print(frame_to_tx[0]) print(frame_to_tx) frame_type = FR_TYPE(int.from_bytes(frame_to_tx[0][:1], byteorder="big")).name - self.log.debug("[TNC] enqueue_frame_for_tx", c2_mode=FREEDV_MODE(c2_mode).name, data=frame_to_tx, type=frame_type) + self.log.debug("[TNC] enqueue_frame_for_tx", + c2_mode=FREEDV_MODE(c2_mode).name, + data=frame_to_tx, + type=frame_type + ) # Set the TRANSMITTING flag before adding an object to the transmit queue # TODO: This is not that nice, we could improve this somehow @@ -511,7 +509,7 @@ class DATA: # finally push data to our network queue sock.SOCKET_QUEUE.put(json_data_out) - def send_ident_frame(self, transmit) -> None: + def send_ident_frame(self, transmit) -> bytearray: """Build and send IDENT frame """ ident_frame = bytearray(self.length_sig1_frame) ident_frame[:1] = bytes([FR_TYPE.IDENT.value]) @@ -541,7 +539,6 @@ class DATA: # Transmit frame self.enqueue_frame_for_tx([ack_frame], c2_mode=FREEDV_MODE.sig1.value) - def send_data_ack_frame(self, snr) -> None: """Build and send ACK frame for received DATA frame""" @@ -559,13 +556,19 @@ class DATA: self.burst_last_received = time.time() + channel_busy_timeout + 8 # Transmit frame # TODO: Do we have to send , self.send_ident_frame(False) ? - # self.enqueue_frame_for_tx([ack_frame, self.send_ident_frame(False)], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) - self.enqueue_frame_for_tx([ack_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) + # self.enqueue_frame_for_tx([ack_frame, self.send_ident_frame(False)], + # c2_mode=FREEDV_MODE.sig1.value, + # copies=3, + # repeat_delay=0) + self.enqueue_frame_for_tx([ack_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) def send_retransmit_request_frame(self) -> None: # check where a None is in our burst buffer and do frame+1, because lists start at 0 # FIXME: Check to see if there's a `frame - 1` in the receive portion. Remove both if there is. + + self.burst_rpt_counter += 1 + print(static.RX_BURST_BUFFER) missing_frames = [ frame + 1 @@ -576,13 +579,13 @@ class DATA: rpt_frame = bytearray(self.length_sig1_frame) rpt_frame[:1] = bytes([FR_TYPE.FR_REPEAT.value]) rpt_frame[1:2] = self.session_id - rpt_frame[2:2+len(missing_frames)] = missing_frames + rpt_frame[2:2 + len(missing_frames)] = missing_frames self.log.info("[TNC] ARQ | RX | Requesting", frames=missing_frames) # Transmit frame self.enqueue_frame_for_tx([rpt_frame], c2_mode=FREEDV_MODE.sig1.value, copies=1, repeat_delay=0) - def send_burst_nack_frame(self, snr: bytes) -> None: + def send_burst_nack_frame(self, snr: float) -> None: """Build and send NACK frame for received DATA frame""" nack_frame = bytearray(self.length_sig1_frame) @@ -592,11 +595,13 @@ class DATA: nack_frame[3:4] = bytes([int(self.speed_level)]) nack_frame[4:8] = len(static.RX_FRAME_BUFFER).to_bytes(4, byteorder="big") - - # TRANSMIT NACK FRAME FOR BURST # TODO: Do we have to send ident frame? - # self.enqueue_frame_for_tx([ack_frame, self.send_ident_frame(False)], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) + # self.enqueue_frame_for_tx([ack_frame, self.send_ident_frame(False)], + # c2_mode=FREEDV_MODE.sig1.value, + # copies=3, + # repeat_delay=0 + # ) # wait while timeout not reached and our busy state is busy channel_busy_timeout = time.time() + 5 @@ -607,7 +612,7 @@ class DATA: # reset burst timeout in case we had to wait too long self.burst_last_received = time.time() - def send_burst_nack_frame_watchdog(self, snr: bytes, tx_n_frames_per_burst) -> None: + def send_burst_nack_frame_watchdog(self, snr: float, tx_n_frames_per_burst) -> None: """Build and send NACK frame for watchdog timeout""" # increment nack counter for transmission stats @@ -643,7 +648,12 @@ class DATA: disconnection_frame[1:2] = self.session_id disconnection_frame[2:5] = static.DXCALLSIGN_CRC # TODO: Needed? disconnection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) - # self.enqueue_frame_for_tx([disconnection_frame, self.send_ident_frame(False)], c2_mode=FREEDV_MODE.sig0.value, copies=5, repeat_delay=0) + # self.enqueue_frame_for_tx([disconnection_frame, + # self.send_ident_frame(False)], + # c2_mode=FREEDV_MODE.sig0.value, + # copies=5, + # repeat_delay=0 + # ) # TODO: We need to add the ident frame feature with a seperate PR after publishing latest protocol # TODO: We need to wait some time between last arq related signalling frame and ident frame # TODO: Maybe about 500ms - 1500ms to avoid confusion and too much PTT toggles @@ -656,15 +666,12 @@ class DATA: self.enqueue_frame_for_tx([disconnection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=3, repeat_delay=0) def arq_data_received( - self, data_in: bytes, bytes_per_frame: int, snr: float, freedv + self, data_in: bytes, snr: float ) -> None: """ Args: data_in:bytes: - bytes_per_frame:int: snr:float: - freedv: - Returns: """ # We've arrived here from process_data which already checked that the frame @@ -673,7 +680,10 @@ class DATA: # only process data if we are in ARQ and BUSY state else return to quit if not static.ARQ_STATE and static.TNC_STATE not in ["BUSY"]: - self.log.warning("[TNC] wrong tnc state - dropping data", arq_state=static.ARQ_STATE, tnc_state=static.TNC_STATE) + self.log.warning("[TNC] wrong tnc state - dropping data", + arq_state=static.ARQ_STATE, + tnc_state=static.TNC_STATE + ) return self.arq_file_transfer = True @@ -690,7 +700,7 @@ class DATA: self.rx_n_frame_of_burst = int.from_bytes(bytes(data_in[:1]), "big") - 10 # Get number of bursts from received frame self.rx_n_frames_per_burst = int.from_bytes(bytes(data_in[1:2]), "big") - + # The RX burst buffer needs to have a fixed length filled with "None". # We need this later for counting the "Nones" to detect missing data. # Check if burst buffer has expected length else create it @@ -700,7 +710,6 @@ class DATA: # Append data to rx burst buffer static.RX_BURST_BUFFER[self.rx_n_frame_of_burst] = data_in[self.arq_burst_header_size:] # type: ignore - static.DXGRID = b'------' helpers.add_to_heard_stations( static.DXCALLSIGN, @@ -737,7 +746,7 @@ class DATA: "[TNC] ARQ | RX | wrong byteorder received - dropping data" ) # we need to run a return here, so we are not sending an ACK - #return + # return except Exception as e: self.log.warning( "[TNC] ARQ | RX | wrong byteorder check failed", e=e @@ -801,15 +810,14 @@ class DATA: and not self.rx_frame_eof_received and data_in.find(self.data_frame_eof) < 0 ): - - self.arq_calculate_speed_level(snr) + self.arq_calculate_speed_level() self.data_channel_last_received = int(time.time()) + 6 + 6 self.burst_last_received = int(time.time()) + 6 + 6 # Create and send ACK frame self.log.info("[TNC] ARQ | RX | SENDING ACK", finished=static.ARQ_SECONDS_UNTIL_FINISH, bytesperminute=static.ARQ_BYTES_PER_MINUTE) - + self.send_burst_ack_frame(snr) # Reset n retries per burst counter @@ -835,7 +843,7 @@ class DATA: irs=helpers.bool_to_string(self.is_IRS) ) - #elif self.rx_n_frame_of_burst == self.rx_n_frames_per_burst: + # elif self.rx_n_frame_of_burst == self.rx_n_frames_per_burst: # # We have "Nones" in our rx buffer, # # Check if we received last frame of burst - this is an indicator for missed frames. # # With this way of doing this, we always MUST receive the last @@ -851,19 +859,20 @@ class DATA: # self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) # ) - #elif self.rx_n_frame_of_burst not in [self.rx_n_frames_per_burst - 1]: + # elif self.rx_n_frame_of_burst not in [self.rx_n_frames_per_burst - 1]: # self.log.info( # "[TNC] data_handler: received burst", # frame=self.rx_n_frame_of_burst + 1, # frames=self.rx_n_frames_per_burst, # ) - #else: - # self.log.error( - # "[TNC] data_handler: Should not reach this point...", - # frame=self.rx_n_frame_of_burst + 1, - # frames=self.rx_n_frames_per_burst, - # ) + # send repeat without waiting for timeout + # --> this should speed up requesting instead of waiting + elif self.rx_n_frame_of_burst == self.rx_n_frames_per_burst and self.burst_rpt_counter == 0: + frames_left = static.RX_BURST_BUFFER.count(None) + self.burst_last_received = time.time() + self.time_list[self.speed_level] * frames_left + self.send_retransmit_request_frame() + else: self.log.warning( "[TNC] data_handler: missing data in burst buffer...", @@ -939,7 +948,10 @@ class DATA: if static.ENABLE_STATS: self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration) - self.log.info("[TNC] ARQ | RX | Sending NACK", finished=static.ARQ_SECONDS_UNTIL_FINISH, bytesperminute=static.ARQ_BYTES_PER_MINUTE) + self.log.info("[TNC] ARQ | RX | Sending NACK", + finished=static.ARQ_SECONDS_UNTIL_FINISH, + bytesperminute=static.ARQ_BYTES_PER_MINUTE + ) self.send_burst_nack_frame(snr) # Update arq_session timestamp @@ -982,8 +994,7 @@ class DATA: else: return True - - def arq_calculate_speed_level(self, snr): + def arq_calculate_speed_level(self): self.frame_received_counter += 1 # try increasing speed level only if we had two successful decodes if self.frame_received_counter >= 2: @@ -995,7 +1006,6 @@ class DATA: if static.SNR >= self.snr_list[new_speed_level]: self.speed_level = new_speed_level - else: self.log.info("[TNC] ARQ | increasing speed level not possible because of SNR limit", given_snr=static.SNR, @@ -1006,75 +1016,95 @@ class DATA: if not self.check_if_mode_fits_to_busy_slot(): self.speed_level = 0 - static.ARQ_SPEED_LEVEL = self.speed_level # Update modes we are listening to self.set_listening_modes(False, True, self.mode_list[self.speed_level]) def arq_process_received_data_frame(self, data_frame, snr): - """ + """ """ - # transmittion duration - duration = time.time() - self.rx_start_of_transmission - self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) - ) - self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter, - bytesperminute=static.ARQ_BYTES_PER_MINUTE, total_bytes=static.TOTAL_BYTES, duration=duration) + # transmittion duration + duration = time.time() - self.rx_start_of_transmission + self.calculate_transfer_rate_rx( + self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) + ) + self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter, + bytesperminute=static.ARQ_BYTES_PER_MINUTE, total_bytes=static.TOTAL_BYTES, duration=duration) - # Decompress the data frame - data_frame_decompressed = lzma.decompress(data_frame) - static.ARQ_COMPRESSION_FACTOR = len(data_frame_decompressed) / len( - data_frame - ) - data_frame = data_frame_decompressed + # Decompress the data frame + data_frame_decompressed = lzma.decompress(data_frame) + static.ARQ_COMPRESSION_FACTOR = len(data_frame_decompressed) / len( + data_frame + ) + data_frame = data_frame_decompressed - self.transmission_uuid = str(uuid.uuid4()) - timestamp = int(time.time()) + self.transmission_uuid = str(uuid.uuid4()) + timestamp = int(time.time()) - # Re-code data_frame in base64, UTF-8 for JSON UI communication. - base64_data = base64.b64encode(data_frame).decode("UTF-8") + # Re-code data_frame in base64, UTF-8 for JSON UI communication. + base64_data = base64.b64encode(data_frame).decode("UTF-8") - # check if RX_BUFFER isn't full - if not RX_BUFFER.full(): - # make sure we have always the correct buffer size - RX_BUFFER.maxsize = int(static.RX_BUFFER_SIZE) - else: - # if full, free space by getting an item - self.log.info( - "[TNC] ARQ | RX | RX_BUFFER FULL - dropping old data", - buffer_size=RX_BUFFER.qsize(), - maxsize=int(static.RX_BUFFER_SIZE) - ) - RX_BUFFER.get() - - # add item to RX_BUFFER + # check if RX_BUFFER isn't full + if not RX_BUFFER.full(): + # make sure we have always the correct buffer size + RX_BUFFER.maxsize = int(static.RX_BUFFER_SIZE) + else: + # if full, free space by getting an item self.log.info( - "[TNC] ARQ | RX | saving data to rx buffer", - buffer_size=RX_BUFFER.qsize() + 1, - maxsize=RX_BUFFER.maxsize + "[TNC] ARQ | RX | RX_BUFFER FULL - dropping old data", + buffer_size=RX_BUFFER.qsize(), + maxsize=int(static.RX_BUFFER_SIZE) ) + RX_BUFFER.get() + + # add item to RX_BUFFER + self.log.info( + "[TNC] ARQ | RX | saving data to rx buffer", + buffer_size=RX_BUFFER.qsize() + 1, + maxsize=RX_BUFFER.maxsize + ) + try: + RX_BUFFER.put( + [ + self.transmission_uuid, + timestamp, + static.DXCALLSIGN, + static.DXGRID, + base64_data, + ] + ) + except Exception as e: + # File "/usr/lib/python3.7/queue.py", line 133, in put + # if self.maxsize > 0 + # TypeError: '>' not supported between instances of 'str' and 'int' + # + # Occurs on Raspberry Pi and Python 3.7 + self.log.error( + "[TNC] ARQ | RX | error occurred when saving data!", + e=e, + uuid=self.transmission_uuid, + timestamp=timestamp, + dxcall=static.DXCALLSIGN, + dxgrid=static.DXGRID, + data=base64_data + ) + + if static.ARQ_SAVE_TO_FOLDER: try: - RX_BUFFER.put( - [ - self.transmission_uuid, - timestamp, - static.DXCALLSIGN, - static.DXGRID, - base64_data, - ] + self.save_data_to_folder( + self.transmission_uuid, + timestamp, + self.mycallsign, + static.DXCALLSIGN, + static.DXGRID, + data_frame ) except Exception as e: - # File "/usr/lib/python3.7/queue.py", line 133, in put - # if self.maxsize > 0 - # TypeError: '>' not supported between instances of 'str' and 'int' - # - # Occurs on Raspberry Pi and Python 3.7 self.log.error( - "[TNC] ARQ | RX | error occurred when saving data!", + "[TNC] ARQ | RX | can't save file to folder", e=e, uuid=self.transmission_uuid, timestamp=timestamp, @@ -1083,75 +1113,51 @@ class DATA: data=base64_data ) - if static.ARQ_SAVE_TO_FOLDER: - try: - self.save_data_to_folder( - self.transmission_uuid, - timestamp, - self.mycallsign, - static.DXCALLSIGN, - static.DXGRID, - data_frame - ) - except Exception as e: - self.log.error( - "[TNC] ARQ | RX | can't save file to folder", - e=e, - uuid=self.transmission_uuid, - timestamp=timestamp, - dxcall=static.DXCALLSIGN, - dxgrid=static.DXGRID, - data=base64_data - ) + self.send_data_to_socket_queue( + freedata="tnc-message", + arq="transmission", + status="received", + uuid=self.transmission_uuid, + percent=static.ARQ_TRANSMISSION_PERCENT, + bytesperminute=static.ARQ_BYTES_PER_MINUTE, + compression=static.ARQ_COMPRESSION_FACTOR, + timestamp=timestamp, + finished=0, + mycallsign=str(self.mycallsign, "UTF-8"), + dxcallsign=str(static.DXCALLSIGN, "UTF-8"), + dxgrid=str(static.DXGRID, "UTF-8"), + data=base64_data, + irs=helpers.bool_to_string(self.is_IRS) + ) - self.send_data_to_socket_queue( - freedata="tnc-message", - arq="transmission", - status="received", - uuid=self.transmission_uuid, - percent=static.ARQ_TRANSMISSION_PERCENT, - bytesperminute=static.ARQ_BYTES_PER_MINUTE, - compression=static.ARQ_COMPRESSION_FACTOR, - timestamp=timestamp, - finished=0, - mycallsign=str(self.mycallsign, "UTF-8"), - dxcallsign=str(static.DXCALLSIGN, "UTF-8"), - dxgrid=str(static.DXGRID, "UTF-8"), - data=base64_data, - irs=helpers.bool_to_string(self.is_IRS) - ) + if static.ENABLE_STATS: + duration = time.time() - self.rx_start_of_transmission + self.stats.push(frame_nack_counter=self.frame_nack_counter, status="received", duration=duration) - if static.ENABLE_STATS: - duration = time.time() - self.rx_start_of_transmission - self.stats.push(frame_nack_counter=self.frame_nack_counter, status="received", duration=duration) + self.log.info( + "[TNC] ARQ | RX | SENDING DATA FRAME ACK") - self.log.info( - "[TNC] ARQ | RX | SENDING DATA FRAME ACK") + self.send_data_ack_frame(snr) + # Update statistics AFTER the frame ACK is sent + self.calculate_transfer_rate_rx( + self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) + ) - self.send_data_ack_frame(snr) - # Update statistics AFTER the frame ACK is sent - self.calculate_transfer_rate_rx( - self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER) - ) + self.log.info( + "[TNC] | RX | DATACHANNEL [" + + str(self.mycallsign, "UTF-8") + + "]<< >>[" + + str(static.DXCALLSIGN, "UTF-8") + + "]", + snr=snr, + ) - self.log.info( - "[TNC] | RX | DATACHANNEL [" - + str(self.mycallsign, "UTF-8") - + "]<< >>[" - + str(static.DXCALLSIGN, "UTF-8") - + "]", - snr=snr, - ) - - def arq_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int): + def arq_transmit(self, data_out: bytes): """ Transmit ARQ frame Args: data_out:bytes: - mode:int: - n_frames_per_burst:int: - """ # set signalling modes we want to listen to @@ -1215,8 +1221,7 @@ class DATA: # Initial bufferposition is 0 bufferposition = 0 bufferposition_end = 0 - bufferposition_temp = 0 - + bufferposition_burst_start = 0 # Iterate through data_out buffer while not self.data_frame_ack_received and static.ARQ_STATE: @@ -1244,13 +1249,14 @@ class DATA: self.log.info("[TNC] early buffer info", bufferposition=bufferposition, bufferposition_end=bufferposition_end, - bufferposition_temp=bufferposition_temp + bufferposition_burst_start=bufferposition_burst_start ) # check for maximum frames per burst for remaining data n_frames_per_burst = 1 if self.max_n_frames_per_burst > 1: - while (payload_per_frame * n_frames_per_burst) % len(data_out[bufferposition_temp:]) == (payload_per_frame * n_frames_per_burst): + while (payload_per_frame * n_frames_per_burst) % len(data_out[bufferposition_burst_start:]) == ( + payload_per_frame * n_frames_per_burst): threading.Event().wait(0.01) print((payload_per_frame * n_frames_per_burst) % len(data_out)) n_frames_per_burst += 1 @@ -1263,17 +1269,23 @@ class DATA: tempbuffer = [] self.rpt_request_buffer = [] # Append data frames with n_frames_per_burst to tempbuffer - for n_frame in range(0,n_frames_per_burst): + for n_frame in range(0, n_frames_per_burst): + + if n_frame == 0: + bufferposition = bufferposition_burst_start + arqheader = bytearray() arqheader[:1] = bytes([FR_TYPE.BURST_01.value + n_frame]) - #####arqheader[:1] = bytes([FR_TYPE.BURST_01.value]) + # ####arqheader[:1] = bytes([FR_TYPE.BURST_01.value]) arqheader[1:2] = bytes([n_frames_per_burst]) arqheader[2:3] = self.session_id # only check for buffer position if at least one NACK received - self.log.info("[TNC] ----- data buffer position:", iss_buffer_pos=bufferposition, irs_bufferposition=self.irs_buffer_position) + self.log.info("[TNC] ----- data buffer position:", iss_buffer_pos=bufferposition, + irs_bufferposition=self.irs_buffer_position) if self.frame_nack_counter > 0 and self.irs_buffer_position != bufferposition: - self.log.error("[TNC] ----- data buffer offset:", iss_buffer_pos=bufferposition, irs_bufferposition=self.irs_buffer_position) + self.log.error("[TNC] ----- data buffer offset:", iss_buffer_pos=bufferposition, + irs_bufferposition=self.irs_buffer_position) # only adjust buffer position for experimental versions if 'exp' in static.VERSION: self.log.warning("[TNC] ----- data adjustment disabled!") @@ -1294,7 +1306,7 @@ class DATA: ) frame = arqheader + extended_data_out - ######tempbuffer = frame # [frame] + # #####tempbuffer = frame # [frame] tempbuffer.append(frame) # add data to our repeat request buffer for easy access if we received a request self.rpt_request_buffer.append(frame) @@ -1313,11 +1325,11 @@ class DATA: # After transmission finished, wait for an ACK or RPT frame while ( - static.ARQ_STATE - and not self.burst_ack - and not self.burst_nack - and not self.rpt_request_received - and not self.data_frame_ack_received + static.ARQ_STATE + and not self.burst_ack + and not self.burst_nack + and not self.rpt_request_received + and not self.data_frame_ack_received ): threading.Event().wait(0.01) @@ -1326,14 +1338,16 @@ class DATA: self.burst_ack = False # reset ack state self.tx_n_retry_of_burst = 0 # reset retries self.log.debug( - "[TNC] arq_transmit: Received BURST ACK. Sending next chunk." - , irs_snr=self.burst_ack_snr) + "[TNC] arq_transmit: Received BURST ACK. Sending next chunk.", + irs_snr=self.burst_ack_snr) # update temp bufferposition for n frames per burst early calculation - bufferposition_temp = bufferposition_end + bufferposition_burst_start = bufferposition_end # pylint: disable=unused-variable break # break retry loop if self.burst_nack: self.burst_nack = False # reset nack state + # fall back to starting bufferposition + bufferposition = bufferposition_burst_start if self.data_frame_ack_received: self.log.debug( @@ -1362,6 +1376,7 @@ class DATA: # update buffer position bufferposition = bufferposition_end + bufferposition_burst_start = bufferposition_end # update stats self.calculate_transfer_rate_tx( @@ -1613,11 +1628,10 @@ class DATA: self.log.info("[TNC] ARQ REPEAT RECEIVED") - - #self.rpt_request_received = True + # self.rpt_request_received = True # Update data_channel timestamp self.data_channel_last_received = int(time.time()) - #self.rpt_request_buffer = [] + # self.rpt_request_buffer = [] missing_area = bytes(data_in[2:12]) # 1:9 missing_area = missing_area.strip(b"\x00") @@ -1626,16 +1640,15 @@ class DATA: tempbuffer_rptframes = [] for i in range(0, len(missing_area)): - print(missing_area[i]) - missing_frames_buffer_position = missing_area[i] -1 + missing_frames_buffer_position = missing_area[i] - 1 tempbuffer_rptframes.append(self.rpt_request_buffer[missing_frames_buffer_position]) self.log.info("[TNC] SENDING REPEAT....") data_mode = self.mode_list[self.speed_level] self.enqueue_frame_for_tx(tempbuffer_rptframes, c2_mode=data_mode) - #for i in range(0, 6, 2): + # for i in range(0, 6, 2): # if not missing_area[i: i + 2].endswith(b"\x00\x00"): # self.rpt_request_buffer.insert(0, missing_area[i: i + 2]) @@ -1823,7 +1836,7 @@ class DATA: ) return True - def received_session_opener(self, data_in: bytes) -> None: + def received_session_opener(self, data_in: bytes) -> bool: """ Received a session open request packet. @@ -2095,7 +2108,7 @@ class DATA: threading.Event().wait(0.01) if static.ARQ_STATE: - self.arq_transmit(data_out, mode, n_frames_per_burst) + self.arq_transmit(data_out) return True return False @@ -2150,7 +2163,7 @@ class DATA: arq="transmission", status="opening", mycallsign=str(mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign,'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), irs=helpers.bool_to_string(self.is_IRS) ) @@ -2182,7 +2195,7 @@ class DATA: self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - timeout = time.time() + self.duration_sig1_frame * 3 + (static.TX_DELAY/1000 * 2) + timeout = time.time() + self.duration_sig1_frame * 3 + (static.TX_DELAY / 1000 * 2) while time.time() < timeout: threading.Event().wait(0.01) # Stop waiting if data channel is opened @@ -2327,7 +2340,6 @@ class DATA: # initially set speed_level 0 in case of bad SNR and no matching mode self.speed_level = 0 - # TODO: MOVE THIS TO arq_calculate_speed_level() # calculate speed level in correlation to latest known SNR for i in range(len(self.mode_list)): @@ -2595,7 +2607,7 @@ class DATA: uuid=str(uuid.uuid4()), timestamp=int(time.time()), dxgrid=str(static.DXGRID, "UTF-8"), - dxcallsign = str(dxcallsign, "UTF-8"), + dxcallsign=str(dxcallsign, "UTF-8"), mycallsign=str(mycallsign, "UTF-8"), snr=str(static.SNR), ) @@ -2641,7 +2653,7 @@ class DATA: uuid=str(uuid.uuid4()), timestamp=int(time.time()), dxgrid=str(static.DXGRID, "UTF-8"), - dxcallsign = str(static.DXCALLSIGN, "UTF-8"), + dxcallsign=str(static.DXCALLSIGN, "UTF-8"), mycallsign=str(mycallsign, "UTF-8"), snr=str(static.SNR), dxsnr=str(dxsnr) @@ -2904,7 +2916,7 @@ class DATA: # self.duration_sig1_frame * 4 == 4 slots # in self.duration_sig1_frame increments. self.log.info("[TNC] Waiting for QRV slot...") - helpers.wait(randrange(0, int(self.duration_sig1_frame*4), self.duration_sig1_frame*10 // 10.0)) + helpers.wait(randrange(0, int(self.duration_sig1_frame * 4), int(self.duration_sig1_frame))) self.send_data_to_socket_queue( freedata="tnc-message", qrv="transmitting", @@ -3024,7 +3036,8 @@ class DATA: static.ARQ_BYTES_PER_MINUTE = int( receivedbytes / (transmissiontime / 60) ) - static.ARQ_SECONDS_UNTIL_FINISH = int(((static.TOTAL_BYTES - receivedbytes) / (static.ARQ_BYTES_PER_MINUTE * static.ARQ_COMPRESSION_FACTOR)) * 60) -20 # offset because of frame ack/nack + static.ARQ_SECONDS_UNTIL_FINISH = int(((static.TOTAL_BYTES - receivedbytes) / ( + static.ARQ_BYTES_PER_MINUTE * static.ARQ_COMPRESSION_FACTOR)) * 60) - 20 # offset because of frame ack/nack speed_chart = {"snr": static.SNR, "bpm": static.ARQ_BYTES_PER_MINUTE, "timestamp": int(time.time())} # check if data already in list @@ -3084,10 +3097,11 @@ class DATA: if sentbytes > 0: static.ARQ_BITS_PER_SECOND = int((sentbytes * 8) / transmissiontime) static.ARQ_BYTES_PER_MINUTE = int(sentbytes / (transmissiontime / 60)) - static.ARQ_SECONDS_UNTIL_FINISH = int(((tx_buffer_length - sentbytes) / (static.ARQ_BYTES_PER_MINUTE* static.ARQ_COMPRESSION_FACTOR)) * 60 ) + static.ARQ_SECONDS_UNTIL_FINISH = int(((tx_buffer_length - sentbytes) / ( + static.ARQ_BYTES_PER_MINUTE * static.ARQ_COMPRESSION_FACTOR)) * 60) - - speed_chart = {"snr": self.burst_ack_snr, "bpm": static.ARQ_BYTES_PER_MINUTE, "timestamp": int(time.time())} + speed_chart = {"snr": self.burst_ack_snr, "bpm": static.ARQ_BYTES_PER_MINUTE, + "timestamp": int(time.time())} # check if data already in list if speed_chart not in static.SPEED_LIST: static.SPEED_LIST.append(speed_chart) @@ -3286,7 +3300,7 @@ class DATA: timeout = self.burst_last_received + (self.time_list[self.speed_level] * frames_left) # TODO: Enable this for development - #print(f"timeout expected in:{round(timeout - time.time())} | frames left: {frames_left} of {self.rx_n_frames_per_burst} | speed level: {self.speed_level}") + # print(f"timeout expected in:{round(timeout - time.time())} | frames left: {frames_left} of {self.rx_n_frames_per_burst} | speed level: {self.speed_level}") if timeout <= time.time() or modem_error_state: self.log.warning( "[TNC] Burst decoding error or timeout", @@ -3296,12 +3310,12 @@ class DATA: modem_error_state=modem_error_state ) - print(f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {static.RX_BURST_BUFFER.count(None)}") + print( + f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {static.RX_BURST_BUFFER.count(None)}") if self.rx_n_frames_per_burst > 1 and self.burst_rpt_counter < 3 and static.RX_BURST_BUFFER.count(None) > 0: # reset self.burst_last_received self.burst_last_received = time.time() + self.time_list[self.speed_level] * frames_left - self.burst_rpt_counter += 1 self.send_retransmit_request_frame() else: @@ -3327,13 +3341,12 @@ class DATA: # Update modes we are listening to self.set_listening_modes(True, True, self.mode_list[self.speed_level]) - # TODO: Does SNR make sense for NACK if we dont have an actual SNR information? self.send_burst_nack_frame_watchdog(0, tx_n_frames_per_burst) # Update data_channel timestamp # TODO: Disabled this one for testing. - #self.data_channel_last_received = time.time() + # self.data_channel_last_received = time.time() self.n_retries_per_burst += 1 else: # print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time()) @@ -3454,7 +3467,7 @@ class DATA: frame_to_tx=[fec_frame], c2_mode=codec2.FREEDV_MODE[mode].value ) - def send_fec_is_writing(self, mycallsign) -> None: + def send_fec_is_writing(self, mycallsign) -> bool: """Send an empty test frame""" fec_frame = bytearray(14) @@ -3522,6 +3535,8 @@ class DATA: else: message = b'' filename = b'' + data = b'' + checksum_delivered = b'' # save file to folder if filename not in [b'', b'undefined']: @@ -3546,4 +3561,4 @@ class DATA: file.write(message) except Exception as e: - self.log.error("[TNC] error saving data to folder", e=e) \ No newline at end of file + self.log.error("[TNC] error saving data to folder", e=e) diff --git a/tnc/static.py b/tnc/static.py index 30da410e..09d8c74a 100644 --- a/tnc/static.py +++ b/tnc/static.py @@ -12,7 +12,7 @@ import subprocess from enum import Enum CHANNEL_BUSY_SLOT = [False] * 5 -VERSION = "0.9.0-alpha-exp.4" +VERSION = "0.9.0-alpha-exp.5" ENABLE_EXPLORER = False ENABLE_STATS = False @@ -88,8 +88,6 @@ AUDIO_ENABLE_TCI: bool = False TCI_IP: str = '127.0.0.1' TCI_PORT: int = '9000' - - AUDIO_DBFS: int = 0 FFT: list = [0] ENABLE_FFT: bool = True @@ -141,7 +139,8 @@ INFO: list = [] # ------- CODEC2 SETTINGS TUNING_RANGE_FMIN: float = -50.0 TUNING_RANGE_FMAX: float = 50.0 -IS_CODEC2_TRAFFIC: bool = False # true if we have codec2 signalling mode traffic on channel +IS_CODEC2_TRAFFIC: bool = False # true if we have codec2 signalling mode traffic on channel + class FRAME_TYPE(Enum): """Lookup for frame types"""