diff --git a/tnc/data_handler.py b/tnc/data_handler.py index 2c2c1594..225fdc6e 100644 --- a/tnc/data_handler.py +++ b/tnc/data_handler.py @@ -8,9 +8,7 @@ Created on Sun Dec 27 20:43:40 2020 # pylint: disable=invalid-name, line-too-long, c-extension-no-member # pylint: disable=import-outside-toplevel -import asyncio import base64 -import logging import queue import sys import threading @@ -25,7 +23,6 @@ import ujson as json import codec2 import helpers -import log_handler import modem import sock import static @@ -36,10 +33,11 @@ DATA_QUEUE_TRANSMIT = queue.Queue() DATA_QUEUE_RECEIVED = queue.Queue() -class DATA(): +class DATA: """ Terminal Node Controller for FreeDATA """ + def __init__(self): - self.mycallsign = static.MYCALLSIGN # initial callsign. Will be overwritten later + self.mycallsign = static.MYCALLSIGN # initial call sign. Will be overwritten later self.data_queue_transmit = DATA_QUEUE_TRANSMIT self.data_queue_received = DATA_QUEUE_RECEIVED @@ -55,20 +53,20 @@ class DATA(): self.received_mycall_crc = b'' # Received my callsign crc if we received a crc for another ssid - self.data_channel_last_received = 0.0 # time of last "live sign" of a frame - self.burst_ack_snr = 0 # SNR from received ack frames - self.burst_ack = False # if we received an acknowledge frame for a burst - self.data_frame_ack_received = False # if we received an acknowledge frame for a data frame - self.rpt_request_received = False # if we received an request for repeater frames - self.rpt_request_buffer = [] # requested frames, saved in a list - self.rx_start_of_transmission = 0 # time of transmission start - self.data_frame_bof = b'BOF' # 2 bytes for the BOF End of File indicator in a data frame - self.data_frame_eof = b'EOF' # 2 bytes for the EOF End of File indicator in a data frame + self.data_channel_last_received = 0.0 # time of last "live sign" of a frame + self.burst_ack_snr = 0 # SNR from received ack frames + self.burst_ack = False # if we received an acknowledge frame for a burst + self.data_frame_ack_received = False # if we received an acknowledge frame for a data frame + self.rpt_request_received = False # if we received an request for repeater frames + self.rpt_request_buffer = [] # requested frames, saved in a list + self.rx_start_of_transmission = 0 # time of transmission start + self.data_frame_bof = b'BOF' # 2 bytes for the BOF End of File indicator in a data frame + self.data_frame_eof = b'EOF' # 2 bytes for the EOF End of File indicator in a data frame self.rx_n_max_retries_per_burst = 50 self.n_retries_per_burst = 0 - self.received_low_bandwith_mode = False # indicator if we recevied a low bandwith mode channel opener + self.received_low_bandwith_mode = False # indicator if we recevied a low bandwith mode channel opener self.data_channel_max_retries = 5 self.datachannel_timeout = False @@ -76,19 +74,19 @@ class DATA(): self.mode_list_low_bw = [14, 12] self.time_list_low_bw = [3, 7] - self.mode_list_high_bw = [14, 12, 10] #201 = FSK mode list of available modes, each mode will be used 2times per speed level - self.time_list_high_bw = [3, 7, 8, 30] # list for time to wait for correspinding mode in seconds + self.mode_list_high_bw = [14, 12, 10] # mode list of available modes,each mode will be used 2 times per level + self.time_list_high_bw = [3, 7, 8, 30] # list for time to wait for corresponding mode in seconds - # mode list for selecting between low bandwith ( 500Hz ) and normal modes with higher bandwith + # mode list for selecting between low bandwidth ( 500Hz ) and normal modes with higher bandwidth if static.LOW_BANDWITH_MODE: - self.mode_list = self.mode_list_low_bw # mode list of available modes, each mode will be used 2times per speed level - self.time_list = self.time_list_low_bw # list for time to wait for correspinding mode in seconds + self.mode_list = self.mode_list_low_bw # mode list of available modes, each mode will be used 2times per speed level + self.time_list = self.time_list_low_bw # list for time to wait for corresponding mode in seconds else: - self.mode_list = self.mode_list_high_bw # mode list of available modes, each mode will be used 2times per speed level - self.time_list = self.time_list_high_bw # list for time to wait for correspinding mode in seconds + self.mode_list = self.mode_list_high_bw # mode list of available modes, each mode will be used 2times per speed level + self.time_list = self.time_list_high_bw # list for time to wait for corresponding mode in seconds - self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode + self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode static.ARQ_SPEED_LEVEL = self.speed_level self.is_IRS = False @@ -101,21 +99,22 @@ class DATA(): self.transmission_timeout = 360 # transmission timeout in seconds - worker_thread_transmit = threading.Thread(target=self.worker_transmit, name="worker thread transmit",daemon=True) + worker_thread_transmit = threading.Thread(target=self.worker_transmit, name="worker thread transmit", + daemon=True) worker_thread_transmit.start() - worker_thread_receive = threading.Thread(target=self.worker_receive, name="worker thread receive",daemon=True) + worker_thread_receive = threading.Thread(target=self.worker_receive, name="worker thread receive", daemon=True) worker_thread_receive.start() # START THE THREAD FOR THE TIMEOUT WATCHDOG - watchdog_thread = threading.Thread(target=self.watchdog, name="watchdog",daemon=True) + watchdog_thread = threading.Thread(target=self.watchdog, name="watchdog", daemon=True) watchdog_thread.start() - arq_session_thread = threading.Thread(target=self.heartbeat, name="watchdog",daemon=True) + arq_session_thread = threading.Thread(target=self.heartbeat, name="watchdog", daemon=True) arq_session_thread.start() self.beacon_interval = 0 - self.beacon_thread = threading.Thread(target=self.run_beacon, name="watchdog",daemon=True) + self.beacon_thread = threading.Thread(target=self.run_beacon, name="watchdog", daemon=True) self.beacon_thread.start() def worker_transmit(self): @@ -204,7 +203,7 @@ class DATA(): if 50 >= frametype >= 10: # get snr of received data - #snr = self.calculate_snr(freedv) + # snr = self.calculate_snr(freedv) # we need to find a way of fixing this because after moving to class system this doesn't work anymore snr = static.SNR structlog.get_logger("structlog").debug("[TNC] RX SNR", snr=snr) @@ -212,7 +211,7 @@ class DATA(): self.arq_data_received(bytes(bytes_out[:-2]), bytes_per_frame, snr, freedv) # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync - #if static.RX_BURST_BUFFER.count(None) <= 1 or (frame+1) == n_frames_per_burst: + # if static.RX_BURST_BUFFER.count(None) <= 1 or (frame+1) == n_frames_per_burst: # structlog.get_logger("structlog").debug(f"[TNC] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") # self.c_lib.freedv_set_sync(freedv, 0) @@ -287,7 +286,7 @@ class DATA(): self.arq_received_channel_is_open(bytes_out[:-2]) # ARQ MANUAL MODE TRANSMISSION - elif 230 <= frametype <= 240 : + elif 230 <= frametype <= 240: structlog.get_logger("structlog").debug("[TNC] ARQ manual mode") self.arq_received_data_channel_opener(bytes_out[:-2]) @@ -339,8 +338,8 @@ class DATA(): def send_burst_ack_frame(self, snr): """ Build and send ACK frame for burst DATA frame """ - ack_frame = bytearray(14) - ack_frame[:1] = bytes([60]) + ack_frame = bytearray(14) + ack_frame[:1] = bytes([60]) ack_frame[1:4] = static.DXCALLSIGN_CRC ack_frame[4:7] = static.MYCALLSIGN_CRC ack_frame[7:8] = bytes([int(snr)]) @@ -351,8 +350,8 @@ class DATA(): def send_data_ack_frame(self, snr): """ Build and send ACK frame for received DATA frame """ - ack_frame = bytearray(14) - ack_frame[:1] = bytes([61]) + ack_frame = bytearray(14) + ack_frame[:1] = bytes([61]) ack_frame[1:4] = static.DXCALLSIGN_CRC ack_frame[4:7] = static.MYCALLSIGN_CRC ack_frame[7:8] = bytes([int(snr)]) @@ -368,15 +367,15 @@ class DATA(): # set n frames per burst to modem # this is an idea so its not getting lost.... # we need to work on this - codec2.api.freedv_set_frames_per_burst(freedv,len(missing_frames)) + codec2.api.freedv_set_frames_per_burst(freedv, len(missing_frames)) # TODO: Trim `missing_frames` bytesarray to [7:13] (6) frames, if it's larger. # then create a repeat frame - rpt_frame = bytearray(14) - rpt_frame[:1] = bytes([62]) - rpt_frame[1:4] = static.DXCALLSIGN_CRC - rpt_frame[4:7] = static.MYCALLSIGN_CRC + rpt_frame = bytearray(14) + rpt_frame[:1] = bytes([62]) + rpt_frame[1:4] = static.DXCALLSIGN_CRC + rpt_frame[4:7] = static.MYCALLSIGN_CRC rpt_frame[7:13] = missing_frames structlog.get_logger("structlog").info("[TNC] ARQ | RX | Requesting", frames=missing_frames) @@ -385,8 +384,8 @@ class DATA(): def send_burst_nack_frame(self, snr=0): """ Build and send NACK frame for received DATA frame """ - nack_frame = bytearray(14) - nack_frame[:1] = bytes([63]) + nack_frame = bytearray(14) + nack_frame[:1] = bytes([63]) nack_frame[1:4] = static.DXCALLSIGN_CRC nack_frame[4:7] = static.MYCALLSIGN_CRC nack_frame[7:8] = bytes([int(snr)]) @@ -397,8 +396,8 @@ class DATA(): def send_burst_nack_frame_watchdog(self, snr=0): """ Build and send NACK frame for watchdog timeout """ - nack_frame = bytearray(14) - nack_frame[:1] = bytes([64]) + nack_frame = bytearray(14) + nack_frame[:1] = bytes([64]) nack_frame[1:4] = static.DXCALLSIGN_CRC nack_frame[4:7] = static.MYCALLSIGN_CRC nack_frame[7:8] = bytes([int(snr)]) @@ -409,10 +408,10 @@ class DATA(): def send_disconnect_frame(self): """ Build and send a disconnect frame """ - disconnection_frame = bytearray(14) - disconnection_frame[:1] = bytes([223]) - disconnection_frame[1:4] = static.DXCALLSIGN_CRC - disconnection_frame[4:7] = static.MYCALLSIGN_CRC + disconnection_frame = bytearray(14) + disconnection_frame[:1] = bytes([223]) + disconnection_frame[1:4] = static.DXCALLSIGN_CRC + disconnection_frame[4:7] = static.MYCALLSIGN_CRC disconnection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) self.enqueue_frame_for_tx(disconnection_frame, copies=5, repeat_delay=250) @@ -446,15 +445,13 @@ class DATA(): self.arq_file_transfer = True - RX_PAYLOAD_PER_MODEM_FRAME = bytes_per_frame - 2 # payload per moden frame - static.TNC_STATE = 'BUSY' static.ARQ_STATE = True static.INFO.append("ARQ;RECEIVING") self.data_channel_last_received = int(time.time()) # get some important data from the frame - RX_N_FRAME_OF_BURST = int.from_bytes(bytes(data_in[:1]), "big") - 10 # get number of burst frame + RX_N_FRAME_OF_BURST = int.from_bytes(bytes(data_in[:1]), "big") - 10 # get number of burst frame RX_N_FRAMES_PER_BURST = int.from_bytes(bytes(data_in[1:2]), "big") # get number of bursts from received frame # The RX burst buffer needs to have a fixed length filled with "None". @@ -464,11 +461,12 @@ class DATA(): static.RX_BURST_BUFFER = [None] * RX_N_FRAMES_PER_BURST # Append data to rx burst buffer - static.RX_BURST_BUFFER[RX_N_FRAME_OF_BURST] = data_in[8:] # [frame_type][n_frames_per_burst][CRC24][CRC24] + static.RX_BURST_BUFFER[RX_N_FRAME_OF_BURST] = data_in[8:] # [frame_type][n_frames_per_burst][CRC24][CRC24] structlog.get_logger("structlog").debug("[TNC] static.RX_BURST_BUFFER", buffer=static.RX_BURST_BUFFER) - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', snr, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', snr, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) # Check if we received all frames in the burst by checking if burst buffer has no more "Nones" # This is the ideal case because we received all data @@ -477,7 +475,7 @@ class DATA(): # the temp burst buffer is needed for checking, if we already recevied data temp_burst_buffer = b'' for value in static.RX_BURST_BUFFER: - #static.RX_FRAME_BUFFER += static.RX_BURST_BUFFER[i] + # static.RX_FRAME_BUFFER += static.RX_BURST_BUFFER[i] temp_burst_buffer += bytes(value) # if frame buffer ends not with the current frame, we are going to append new data @@ -495,7 +493,7 @@ class DATA(): # search_area --> area where we want to search search_area = 510 - search_position = len(static.RX_FRAME_BUFFER)-search_area + search_position = len(static.RX_FRAME_BUFFER) - search_area # find position of data. returns -1 if nothing found in area else >= 0 # we are beginning from the end, so if data exists twice or more, only the last one should be replaced get_position = static.RX_FRAME_BUFFER[search_position:].rfind(temp_burst_buffer) @@ -503,7 +501,8 @@ class DATA(): if get_position >= 0: static.RX_FRAME_BUFFER = static.RX_FRAME_BUFFER[:search_position + get_position] static.RX_FRAME_BUFFER += temp_burst_buffer - structlog.get_logger("structlog").warning("[TNC] ARQ | RX | replacing existing buffer data", area=search_area, pos=get_position) + structlog.get_logger("structlog").warning("[TNC] ARQ | RX | replacing existing buffer data", + area=search_area, pos=get_position) # if we dont find data n this range, we really have new data and going to replace it else: static.RX_FRAME_BUFFER += temp_burst_buffer @@ -511,8 +510,8 @@ class DATA(): # lets check if we didnt receive a BOF and EOF yet to avoid sending ack frames if we already received all data if (not self.rx_frame_bof_received and - not self.rx_frame_eof_received and - data_in.find(self.data_frame_eof) < 0): + not self.rx_frame_eof_received and + data_in.find(self.data_frame_eof) < 0): self.frame_received_counter += 1 if self.frame_received_counter >= 2: @@ -535,18 +534,20 @@ class DATA(): # calculate statistics self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)) - elif RX_N_FRAME_OF_BURST == RX_N_FRAMES_PER_BURST -1: + elif RX_N_FRAME_OF_BURST == RX_N_FRAMES_PER_BURST - 1: # We have "Nones" in our rx buffer, # Check if we received last frame of burst - this is an indicator for missed frames. # With this way of doing this, we always MUST receive the last frame of a burst otherwise the entire # burst is lost - structlog.get_logger("structlog").debug("[TNC] all frames in burst received:", frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST) + structlog.get_logger("structlog").debug("[TNC] all frames in burst received:", frame=RX_N_FRAME_OF_BURST, + frames=RX_N_FRAMES_PER_BURST) self.send_retransmit_request_frame(freedv) self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)) # Should never reach this point else: - structlog.get_logger("structlog").error("[TNC] data_handler: Should not reach this point...", frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST) + structlog.get_logger("structlog").error("[TNC] data_handler: Should not reach this point...", + frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST) # We have a BOF and EOF flag in our data. If we received both we received our frame. # In case of loosing data but we received already a BOF and EOF we need to make sure, we @@ -555,28 +556,28 @@ class DATA(): eof_position = static.RX_FRAME_BUFFER.find(self.data_frame_eof) # get total bytes per transmission information as soon we recevied a frame with a BOF - if bof_position >=0: - - payload = static.RX_FRAME_BUFFER[bof_position+len(self.data_frame_bof):eof_position] - frame_length = int.from_bytes(payload[4:8], "big") #4:8 4bytes + if bof_position >= 0: + payload = static.RX_FRAME_BUFFER[bof_position + len(self.data_frame_bof):eof_position] + frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes static.TOTAL_BYTES = frame_length - compression_factor = int.from_bytes(payload[8:9], "big") #4:8 4bytes - compression_factor = np.clip(compression_factor, 0, 255) #limit to max value of 255 + compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes + compression_factor = np.clip(compression_factor, 0, 255) # limit to max value of 255 static.ARQ_COMPRESSION_FACTOR = compression_factor / 10 self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)) if bof_position >= 0 and eof_position > 0 and None not in static.RX_BURST_BUFFER: - structlog.get_logger("structlog").debug("[TNC] arq_data_received:", bof_position=bof_position, eof_position=eof_position) + structlog.get_logger("structlog").debug("[TNC] arq_data_received:", bof_position=bof_position, + eof_position=eof_position) # print(f"bof_position {bof_position} / eof_position {eof_position}") self.rx_frame_bof_received = True self.rx_frame_eof_received = True # Extract raw data from buffer - payload = static.RX_FRAME_BUFFER[bof_position+len(self.data_frame_bof):eof_position] + payload = static.RX_FRAME_BUFFER[bof_position + len(self.data_frame_bof):eof_position] # Get the data frame crc - data_frame_crc = payload[:4] #0:4 4bytes + data_frame_crc = payload[:4] # 0:4 4bytes # Get the data frame length - frame_length = int.from_bytes(payload[4:8], "big") #4:8 4bytes + frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes static.TOTAL_BYTES = frame_length # 8:9 = compression factor @@ -606,37 +607,44 @@ class DATA(): # Re-code data_frame in base64, UTF-8 for JSON UI communication. base64_data = base64.b64encode(data_frame).decode("utf-8") static.RX_BUFFER.append([uniqueid, timestamp, static.DXCALLSIGN, static.DXGRID, base64_data]) - jsondata = {"arq":"received", "uuid" : uniqueid, "timestamp": timestamp, "mycallsign" : str(mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), "dxgrid": str(static.DXGRID, 'utf-8'), "data": base64_data} + jsondata = {"arq": "received", "uuid": uniqueid, "timestamp": timestamp, + "mycallsign": str(mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), + "dxgrid": str(static.DXGRID, 'utf-8'), "data": base64_data} json_data_out = json.dumps(jsondata) structlog.get_logger("structlog").debug("[TNC] arq_data_received:", jsondata=jsondata) # print(jsondata) sock.SOCKET_QUEUE.put(json_data_out) static.INFO.append("ARQ;RECEIVING;SUCCESS") - structlog.get_logger("structlog").info("[TNC] ARQ | RX | SENDING DATA FRAME ACK", snr=snr, crc=data_frame_crc.hex()) + structlog.get_logger("structlog").info("[TNC] ARQ | RX | SENDING DATA FRAME ACK", snr=snr, + crc=data_frame_crc.hex()) self.send_data_ack_frame(snr) # update our statistics AFTER the frame ACK self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)) structlog.get_logger("structlog").info("[TNC] | RX | DATACHANNEL [" + - str(self.mycallsign, 'utf-8') + "]<< >>[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=snr) + str(self.mycallsign, 'utf-8') + "]<< >>[" + str( + static.DXCALLSIGN, 'utf-8') + "]", snr=snr) else: static.INFO.append("ARQ;RECEIVING;FAILED") - structlog.get_logger("structlog").warning("[TNC] ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!", e="wrong crc", expected=data_frame_crc, received=data_frame_crc_received, overflows=static.BUFFER_OVERFLOW_COUNTER) + structlog.get_logger("structlog").warning("[TNC] ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!", + e="wrong crc", expected=data_frame_crc, + received=data_frame_crc_received, + overflows=static.BUFFER_OVERFLOW_COUNTER) structlog.get_logger("structlog").info("[TNC] ARQ | RX | Sending NACK") self.send_burst_nack_frame(snr) # update session timeout - self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp + self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp # And finally we do a cleanup of our buffers and states # do cleanup only when not in testmode if not TESTMODE: self.arq_cleanup() - def arq_transmit(self, data_out:bytes, mode:int, n_frames_per_burst:int): + def arq_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int): """ Args: @@ -649,27 +657,28 @@ class DATA(): """ self.arq_file_transfer = True - self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode + self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode static.ARQ_SPEED_LEVEL = self.speed_level - TX_N_SENT_BYTES = 0 # already sent bytes per data frame - self.tx_n_retry_of_burst = 0 # retries we already sent data - TX_N_MAX_RETRIES_PER_BURST = 50 # max amount of retries we sent before frame is lost - TX_N_FRAMES_PER_BURST = n_frames_per_burst # amount of n frames per burst + TX_N_SENT_BYTES = 0 # already sent bytes per data frame + self.tx_n_retry_of_burst = 0 # retries we already sent data + TX_N_MAX_RETRIES_PER_BURST = 50 # max amount of retries we sent before frame is lost + TX_N_FRAMES_PER_BURST = n_frames_per_burst # amount of n frames per burst TX_BUFFER = [] # our buffer for appending new data # TIMEOUTS - BURST_ACK_TIMEOUT_SECONDS = 3.0 # timeout for burst acknowledges - DATA_FRAME_ACK_TIMEOUT_SECONDS = 3.0 # timeout for data frame acknowledges - RPT_ACK_TIMEOUT_SECONDS = 3.0 # timeout for rpt frame acknowledges + BURST_ACK_TIMEOUT_SECONDS = 3.0 # timeout for burst acknowledges + DATA_FRAME_ACK_TIMEOUT_SECONDS = 3.0 # timeout for data frame acknowledges + RPT_ACK_TIMEOUT_SECONDS = 3.0 # timeout for rpt frame acknowledges # save len of data_out to TOTAL_BYTES for our statistics --> kBytes - #static.TOTAL_BYTES = round(len(data_out) / 1024, 2) + # static.TOTAL_BYTES = round(len(data_out) / 1024, 2) static.TOTAL_BYTES = len(data_out) frame_total_size = len(data_out).to_bytes(4, byteorder='big') static.INFO.append("ARQ;TRANSMITTING") - jsondata = {"arq":"transmission", "status" :"transmitting", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "transmitting", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) @@ -694,7 +703,7 @@ class DATA(): # data_out = self.data_frame_bof + frame_payload_crc + data_out + self.data_frame_eof data_out = self.data_frame_bof + frame_payload_crc + frame_total_size + compression_factor + data_out + self.data_frame_eof - #initial bufferposition is 0 + # initial bufferposition is 0 bufferposition = bufferposition_end = 0 # iterate through data out buffer @@ -721,7 +730,7 @@ class DATA(): self.speed_level = 0 ''' - #if self.tx_n_retry_of_burst <= 1: + # if self.tx_n_retry_of_burst <= 1: # self.speed_level += 1 # if self.speed_level >= len(self.mode_list)-1: # self.speed_level = len(self.mode_list)-1 @@ -734,10 +743,11 @@ class DATA(): static.ARQ_SPEED_LEVEL = self.speed_level data_mode = self.mode_list[self.speed_level] - structlog.get_logger("structlog").debug("[TNC] Speed-level:", level=self.speed_level, retry=self.tx_n_retry_of_burst, mode=data_mode) + structlog.get_logger("structlog").debug("[TNC] Speed-level:", level=self.speed_level, + retry=self.tx_n_retry_of_burst, mode=data_mode) # payload information - payload_per_frame = modem.get_bytes_per_frame(data_mode) -2 + payload_per_frame = modem.get_bytes_per_frame(data_mode) - 2 # tempbuffer list for storing our data frames tempbuffer = [] @@ -746,8 +756,8 @@ class DATA(): # TODO: this part needs a complete rewrite! # TX_N_FRAMES_PER_BURST = 1 is working - arqheader = bytearray() - arqheader[:1] = bytes([10]) #bytes([10 + i]) + arqheader = bytearray() + arqheader[:1] = bytes([10]) # bytes([10 + i]) arqheader[1:2] = bytes([TX_N_FRAMES_PER_BURST]) arqheader[2:5] = static.DXCALLSIGN_CRC arqheader[5:8] = static.MYCALLSIGN_CRC @@ -756,8 +766,8 @@ class DATA(): # normal behavior if bufferposition_end <= len(data_out): - frame = data_out[bufferposition:bufferposition_end] - frame = arqheader + frame + frame = data_out[bufferposition:bufferposition_end] + frame = arqheader + frame # this point shouldnt reached that often elif bufferposition > len(data_out): @@ -766,19 +776,20 @@ class DATA(): # the last bytes of a frame else: extended_data_out = data_out[bufferposition:] - extended_data_out += bytes([0]) * (payload_per_frame-len(extended_data_out)-len(arqheader)) + extended_data_out += bytes([0]) * (payload_per_frame - len(extended_data_out) - len(arqheader)) frame = arqheader + extended_data_out # append frame to tempbuffer for transmission tempbuffer.append(frame) structlog.get_logger("structlog").debug("[TNC] tempbuffer:", tempbuffer=tempbuffer) - structlog.get_logger("structlog").info("[TNC] ARQ | TX | FRAMES", mode=data_mode, fpb=TX_N_FRAMES_PER_BURST, retry=self.tx_n_retry_of_burst) + structlog.get_logger("structlog").info("[TNC] ARQ | TX | FRAMES", mode=data_mode, + fpb=TX_N_FRAMES_PER_BURST, retry=self.tx_n_retry_of_burst) # we need to set our TRANSMITTING flag before we are adding an object the transmit queue # this is not that nice, we could improve this somehow static.TRANSMITTING = True - modem.MODEM_TRANSMIT_QUEUE.put([data_mode,1,0,tempbuffer]) + modem.MODEM_TRANSMIT_QUEUE.put([data_mode, 1, 0, tempbuffer]) # wait while transmitting while static.TRANSMITTING: @@ -790,37 +801,39 @@ class DATA(): while not self.burst_ack and not self.burst_nack and not self.rpt_request_received and not self.data_frame_ack_received and time.time() < burstacktimeout and static.ARQ_STATE: time.sleep(0.01) ''' - #burstacktimeout = time.time() + BURST_ACK_TIMEOUT_SECONDS + 100 + # burstacktimeout = time.time() + BURST_ACK_TIMEOUT_SECONDS + 100 while (static.ARQ_STATE and not - (self.burst_ack or self.burst_nack or - self.rpt_request_received or self.data_frame_ack_received)): + (self.burst_ack or self.burst_nack or + self.rpt_request_received or self.data_frame_ack_received)): time.sleep(0.01) # once we received a burst ack, reset its state and break the RETRIES loop if self.burst_ack: - self.burst_ack = False # reset ack state - self.tx_n_retry_of_burst = 0 # reset retries - break #break retry loop + self.burst_ack = False # reset ack state + self.tx_n_retry_of_burst = 0 # reset retries + break # break retry loop if self.burst_nack: - self.burst_nack = False #reset nack state + self.burst_nack = False # reset nack state # not yet implemented if self.rpt_request_received: pass if self.data_frame_ack_received: - break #break retry loop + break # break retry loop # we need this part for leaving the repeat loop # static.ARQ_STATE == 'DATA' --> when stopping transmission manually if not static.ARQ_STATE: - #print("not ready for data...leaving loop....") + # print("not ready for data...leaving loop....") break self.calculate_transfer_rate_tx(tx_start_of_transmission, bufferposition_end, len(data_out)) # NEXT ATTEMPT - structlog.get_logger("structlog").debug("[TNC] ATTEMPT:", retry=self.tx_n_retry_of_burst, maxretries=TX_N_MAX_RETRIES_PER_BURST, overflows=static.BUFFER_OVERFLOW_COUNTER) + structlog.get_logger("structlog").debug("[TNC] ATTEMPT:", retry=self.tx_n_retry_of_burst, + maxretries=TX_N_MAX_RETRIES_PER_BURST, + overflows=static.BUFFER_OVERFLOW_COUNTER) # update buffer position bufferposition = bufferposition_end @@ -828,27 +841,34 @@ class DATA(): # update stats self.calculate_transfer_rate_tx(tx_start_of_transmission, bufferposition_end, len(data_out)) - jsondata = {"arq":"transmission", "status" :"transmitting", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "transmitting", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - #GOING TO NEXT ITERATION + # GOING TO NEXT ITERATION if self.data_frame_ack_received: static.INFO.append("ARQ;TRANSMITTING;SUCCESS") - jsondata = {"arq":"transmission", "status" :"success", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "success", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - structlog.get_logger("structlog").info("[TNC] ARQ | TX | DATA TRANSMITTED!", BytesPerMinute=static.ARQ_BYTES_PER_MINUTE, BitsPerSecond=static.ARQ_BITS_PER_SECOND, overflows=static.BUFFER_OVERFLOW_COUNTER) + structlog.get_logger("structlog").info("[TNC] ARQ | TX | DATA TRANSMITTED!", + BytesPerMinute=static.ARQ_BYTES_PER_MINUTE, + BitsPerSecond=static.ARQ_BITS_PER_SECOND, + overflows=static.BUFFER_OVERFLOW_COUNTER) else: static.INFO.append("ARQ;TRANSMITTING;FAILED") - jsondata = {"arq":"transmission", "status" :"failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - structlog.get_logger("structlog").info("[TNC] ARQ | TX | TRANSMISSION FAILED OR TIME OUT!", overflows=static.BUFFER_OVERFLOW_COUNTER) + structlog.get_logger("structlog").info("[TNC] ARQ | TX | TRANSMISSION FAILED OR TIME OUT!", + overflows=static.BUFFER_OVERFLOW_COUNTER) self.stop_transmission() # and last but not least doing a state cleanup @@ -860,7 +880,7 @@ class DATA(): sys.exit(0) # signalling frames received - def burst_ack_received(self, data_in:bytes): + def burst_ack_received(self, data_in: bytes): """ Args: @@ -870,17 +890,18 @@ class DATA(): """ # increase speed level if we received a burst ack - #self.speed_level += 1 - #if self.speed_level >= len(self.mode_list)-1: + # self.speed_level += 1 + # if self.speed_level >= len(self.mode_list)-1: # self.speed_level = len(self.mode_list)-1 # only process data if we are in ARQ and BUSY state if static.ARQ_STATE: - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) self.burst_ack = True # Force data loops of TNC to stop and continue with next frame - self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp - self.burst_ack_snr= int.from_bytes(bytes(data_in[5:6]), "big") - self.speed_level= int.from_bytes(bytes(data_in[6:7]), "big") + self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp + self.burst_ack_snr = int.from_bytes(bytes(data_in[5:6]), "big") + self.speed_level = int.from_bytes(bytes(data_in[6:7]), "big") static.ARQ_SPEED_LEVEL = self.speed_level structlog.get_logger("structlog").debug("[TNC] burst_ack_received:", speed_level=self.speed_level) # print(self.speed_level) @@ -890,7 +911,7 @@ class DATA(): self.n_retries_per_burst = 0 # signalling frames received - def burst_nack_received(self, data_in:bytes): + def burst_nack_received(self, data_in: bytes): """ Args: @@ -900,17 +921,18 @@ class DATA(): """ # increase speed level if we received a burst ack - #self.speed_level += 1 - #if self.speed_level >= len(self.mode_list)-1: + # self.speed_level += 1 + # if self.speed_level >= len(self.mode_list)-1: # self.speed_level = len(self.mode_list)-1 # only process data if we are in ARQ and BUSY state if static.ARQ_STATE: - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) self.burst_nack = True # Force data loops of TNC to stop and continue with next frame - self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp - self.burst_ack_snr= int.from_bytes(bytes(data_in[5:6]), "big") - self.speed_level= int.from_bytes(bytes(data_in[6:7]), "big") + self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp + self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big") + self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big") static.ARQ_SPEED_LEVEL = self.speed_level self.burst_nack_counter += 1 structlog.get_logger("structlog").debug("[TNC] burst_nack_received:", speed_level=self.speed_level) @@ -920,12 +942,13 @@ class DATA(): """ """ # only process data if we are in ARQ and BUSY state if static.ARQ_STATE: - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) self.data_frame_ack_received = True # Force data loops of TNC to stop and continue with next frame - self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp - self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp + self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp + self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp - def frame_nack_received(self, data_in:bytes): # pylint: disable=unused-argument + def frame_nack_received(self, data_in: bytes): # pylint: disable=unused-argument """ Args: @@ -934,17 +957,19 @@ class DATA(): Returns: """ - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) static.INFO.append("ARQ;TRANSMITTING;FAILED") - jsondata = {"arq":"transmission", "status" : "failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp + self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp if not TESTMODE: self.arq_cleanup() - def burst_rpt_received(self, data_in:bytes): + def burst_rpt_received(self, data_in: bytes): """ Args: @@ -955,10 +980,11 @@ class DATA(): """ # only process data if we are in ARQ and BUSY state if static.ARQ_STATE and static.TNC_STATE == 'BUSY': - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) self.rpt_request_received = True - self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp + self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp self.rpt_request_buffer = [] missing_area = bytes(data_in[3:12]) # 1:9 @@ -982,7 +1008,9 @@ class DATA(): """ # das hier müssen wir checken. Sollte vielleicht in INIT!!! self.datachannel_timeout = False - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE) + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", + state=static.ARQ_SESSION_STATE) self.open_session(callsign) @@ -1010,16 +1038,19 @@ class DATA(): self.IS_ARQ_SESSION_MASTER = True static.ARQ_SESSION_STATE = 'connecting' - connection_frame = bytearray(14) - connection_frame[:1] = bytes([221]) - connection_frame[1:4] = static.DXCALLSIGN_CRC - connection_frame[4:7] = static.MYCALLSIGN_CRC + connection_frame = bytearray(14) + connection_frame[:1] = bytes([221]) + connection_frame[1:4] = static.DXCALLSIGN_CRC + connection_frame[4:7] = static.MYCALLSIGN_CRC connection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) while not static.ARQ_SESSION: time.sleep(0.01) - for attempt in range(1,self.session_connect_max_retries+1): - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>?<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", a=attempt, state=static.ARQ_SESSION_STATE) + for attempt in range(1, self.session_connect_max_retries + 1): + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>?<<[" + str(static.DXCALLSIGN, + 'utf-8') + "]", a=attempt, + state=static.ARQ_SESSION_STATE) self.enqueue_frame_for_tx(connection_frame) @@ -1040,7 +1071,7 @@ class DATA(): self.close_session() return False - def received_session_opener(self, data_in:bytes): + def received_session_opener(self, data_in: bytes): """ Args: @@ -1057,8 +1088,11 @@ class DATA(): static.DXCALLSIGN_CRC = bytes(data_in[4:7]) static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", + state=static.ARQ_SESSION_STATE) static.ARQ_SESSION = True static.TNC_STATE = 'BUSY' @@ -1067,8 +1101,11 @@ class DATA(): def close_session(self): """ Close the ARQ session """ static.ARQ_SESSION_STATE = 'disconnecting' - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]", + state=static.ARQ_SESSION_STATE) static.INFO.append("ARQ;SESSION;CLOSE") self.IS_ARQ_SESSION_MASTER = False static.ARQ_SESSION = False @@ -1077,7 +1114,7 @@ class DATA(): self.send_disconnect_frame() - def received_session_close(self, data_in:bytes): + def received_session_close(self, data_in: bytes): """ Closes the session when a close session frame is received and the DXCALLSIGN_CRC matches the remote station participating in the session. @@ -1091,8 +1128,11 @@ class DATA(): _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) if _valid_crc: static.ARQ_SESSION_STATE = 'disconnected' - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]", + state=static.ARQ_SESSION_STATE) static.INFO.append("ARQ;SESSION;CLOSE") self.IS_ARQ_SESSION_MASTER = False @@ -1105,14 +1145,14 @@ class DATA(): # static.TNC_STATE = 'BUSY' # static.ARQ_SESSION_STATE = 'connected' - connection_frame = bytearray(14) - connection_frame[:1] = bytes([222]) + connection_frame = bytearray(14) + connection_frame[:1] = bytes([222]) connection_frame[1:4] = static.DXCALLSIGN_CRC connection_frame[4:7] = static.MYCALLSIGN_CRC self.enqueue_frame_for_tx(connection_frame) - def received_session_heartbeat(self, data_in:bytes): + def received_session_heartbeat(self, data_in: bytes): """ Args: @@ -1125,9 +1165,10 @@ class DATA(): _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) if _valid_crc: structlog.get_logger("structlog").debug("[TNC] Received session heartbeat") - helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp + self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp static.ARQ_SESSION = True static.ARQ_SESSION_STATE = 'connected' @@ -1139,7 +1180,8 @@ class DATA(): # ############################################################################################################ # ARQ DATA CHANNEL HANDLER # ############################################################################################################ - def open_dc_and_transmit(self, data_out:bytes, mode:int, n_frames_per_burst:int, transmission_uuid:str, mycallsign): + def open_dc_and_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int, transmission_uuid: str, + mycallsign): """ Args: @@ -1178,9 +1220,9 @@ class DATA(): if static.ARQ_STATE: self.arq_transmit(data_out, mode, n_frames_per_burst) else: - return False + return False - def arq_open_data_channel(self, mode:int, n_frames_per_burst:int, mycallsign): + def arq_open_data_channel(self, mode: int, n_frames_per_burst: int, mycallsign): """ Args: @@ -1205,18 +1247,21 @@ class DATA(): structlog.get_logger("structlog").debug("[TNC] Requesting manual mode --> not yet implemented ") frametype = bytes([mode]) - connection_frame = bytearray(14) - connection_frame[:1] = frametype - connection_frame[1:4] = static.DXCALLSIGN_CRC - connection_frame[4:7] = static.MYCALLSIGN_CRC - connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign) + connection_frame = bytearray(14) + connection_frame[:1] = frametype + connection_frame[1:4] = static.DXCALLSIGN_CRC + connection_frame[4:7] = static.MYCALLSIGN_CRC + connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign) connection_frame[13:14] = bytes([n_frames_per_burst]) while not static.ARQ_STATE: time.sleep(0.01) - for attempt in range(1,self.data_channel_max_retries+1): + for attempt in range(1, self.data_channel_max_retries + 1): static.INFO.append("DATACHANNEL;OPENING") - structlog.get_logger("structlog").info("[TNC] ARQ | DATA | TX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", attempt=f"{str(attempt)}/{str(self.data_channel_max_retries)}") + structlog.get_logger("structlog").info( + "[TNC] ARQ | DATA | TX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, + 'utf-8') + "]", + attempt=f"{str(attempt)}/{str(self.data_channel_max_retries)}") self.enqueue_frame_for_tx(connection_frame) @@ -1232,13 +1277,18 @@ class DATA(): if attempt == self.data_channel_max_retries: static.INFO.append("DATACHANNEL;FAILED") - structlog.get_logger("structlog").debug("[TNC] arq_open_data_channel:", transmission_uuid=self.transmission_uuid) + structlog.get_logger("structlog").debug("[TNC] arq_open_data_channel:", + transmission_uuid=self.transmission_uuid) # print(self.transmission_uuid) - jsondata = {"arq":"transmission", "status" :"failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE} + jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid, + "percent": static.ARQ_TRANSMISSION_PERCENT, + "bytesperminute": static.ARQ_BYTES_PER_MINUTE} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - structlog.get_logger("structlog").warning("[TNC] ARQ | TX | DATA [" + str(mycallsign, 'utf-8') + "]>>X<<[" + str(static.DXCALLSIGN, 'utf-8') + "]") + structlog.get_logger("structlog").warning( + "[TNC] ARQ | TX | DATA [" + str(mycallsign, 'utf-8') + "]>>X<<[" + str(static.DXCALLSIGN, + 'utf-8') + "]") self.datachannel_timeout = True if not TESTMODE: self.arq_cleanup() @@ -1247,9 +1297,9 @@ class DATA(): # open_session frame and can still hear us. self.close_session() return False - #sys.exit() # close thread and so connection attempts + # sys.exit() # close thread and so connection attempts - def arq_received_data_channel_opener(self, data_in:bytes): + def arq_received_data_channel_opener(self, data_in: bytes): """ Args: @@ -1266,7 +1316,7 @@ class DATA(): n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big") frametype = int.from_bytes(bytes(data_in[:1]), "big") - #check if we received low bandwith mode + # check if we received low bandwith mode if frametype == 225: self.received_low_bandwith_mode = False self.mode_list = self.mode_list_high_bw @@ -1284,7 +1334,8 @@ class DATA(): # updated modes we are listening to self.set_listening_modes(self.mode_list[self.speed_level]) - helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) # check if callsign ssid override valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4]) @@ -1294,7 +1345,9 @@ class DATA(): self.arq_cleanup() return - structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", bandwith="wide") + structlog.get_logger("structlog").info( + "[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", + bandwith="wide") static.ARQ_STATE = True static.TNC_STATE = 'BUSY' @@ -1310,20 +1363,25 @@ class DATA(): frametype = bytes([226]) structlog.get_logger("structlog").debug("[TNC] Responding with high bandwidth mode") - connection_frame = bytearray(14) - connection_frame[:1] = frametype - connection_frame[1:4] = static.DXCALLSIGN_CRC - connection_frame[4:7] = static.MYCALLSIGN_CRC - connection_frame[13:14] = bytes([static.ARQ_PROTOCOL_VERSION]) #crc8 of version for checking protocol version + connection_frame = bytearray(14) + connection_frame[:1] = frametype + connection_frame[1:4] = static.DXCALLSIGN_CRC + connection_frame[4:7] = static.MYCALLSIGN_CRC + connection_frame[13:14] = bytes([static.ARQ_PROTOCOL_VERSION]) # crc8 of version for checking protocol version self.enqueue_frame_for_tx(connection_frame) - structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", bandwith="wide", snr=static.SNR) + structlog.get_logger("structlog").info( + "[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", + bandwith="wide", snr=static.SNR) # set start of transmission for our statistics self.rx_start_of_transmission = time.time() - def arq_received_channel_is_open(self, data_in:bytes): + # reset our data channel watchdog + self.data_channel_last_received = int(time.time()) + + def arq_received_channel_is_open(self, data_in: bytes): """ Called if we received a data channel opener Args: @@ -1350,9 +1408,13 @@ class DATA(): self.speed_level = len(self.mode_list) - 1 structlog.get_logger("structlog").debug("[TNC] high bandwidth mode", modes=self.mode_list) - helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, + static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) - structlog.get_logger("structlog").info("[TNC] ARQ | DATA | TX | [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR) + structlog.get_logger("structlog").info( + "[TNC] ARQ | DATA | TX | [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, + 'utf-8') + "]", + snr=static.SNR) # as soon as we set ARQ_STATE to DATA, transmission starts static.ARQ_STATE = True @@ -1361,11 +1423,12 @@ class DATA(): static.TNC_STATE = 'IDLE' static.ARQ_STATE = False static.INFO.append("PROTOCOL;VERSION_MISMATCH") - structlog.get_logger("structlog").warning("[TNC] protocol version mismatch:", received=protocol_version, own=static.ARQ_PROTOCOL_VERSION) + structlog.get_logger("structlog").warning("[TNC] protocol version mismatch:", received=protocol_version, + own=static.ARQ_PROTOCOL_VERSION) self.arq_cleanup() # ---------- PING - def transmit_ping(self, dxcallsign:bytes): + def transmit_ping(self, dxcallsign: bytes): """ Funktion for controlling pings Args: @@ -1378,12 +1441,13 @@ class DATA(): static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) static.INFO.append("PING;SENDING") - structlog.get_logger("structlog").info("[TNC] PING REQ [" + str(self.mycallsign, 'utf-8') + "] >>> [" + str(static.DXCALLSIGN, 'utf-8') + "]" ) + structlog.get_logger("structlog").info( + "[TNC] PING REQ [" + str(self.mycallsign, 'utf-8') + "] >>> [" + str(static.DXCALLSIGN, 'utf-8') + "]") - ping_frame = bytearray(14) - ping_frame[:1] = bytes([210]) - ping_frame[1:4] = static.DXCALLSIGN_CRC - ping_frame[4:7] = static.MYCALLSIGN_CRC + ping_frame = bytearray(14) + ping_frame[:1] = bytes([210]) + ping_frame[1:4] = static.DXCALLSIGN_CRC + ping_frame[4:7] = static.MYCALLSIGN_CRC ping_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) structlog.get_logger("structlog").info("[TNC] ENABLE FSK", state=static.ENABLE_FSK) @@ -1392,7 +1456,7 @@ class DATA(): else: self.enqueue_frame_for_tx(ping_frame) - def received_ping(self, data_in:bytes): + def received_ping(self, data_in: bytes): """ Called if we received a ping @@ -1404,7 +1468,8 @@ class DATA(): """ static.DXCALLSIGN_CRC = bytes(data_in[4:7]) static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) - helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) static.INFO.append("PING;RECEIVING") @@ -1416,10 +1481,12 @@ class DATA(): # print("ping not for me...") return - structlog.get_logger("structlog").info("[TNC] PING REQ [" + str(mycallsign, 'utf-8') + "] <<< [" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR ) + structlog.get_logger("structlog").info( + "[TNC] PING REQ [" + str(mycallsign, 'utf-8') + "] <<< [" + str(static.DXCALLSIGN, 'utf-8') + "]", + snr=static.SNR) - ping_frame = bytearray(14) - ping_frame[:1] = bytes([211]) + ping_frame = bytearray(14) + ping_frame[:1] = bytes([211]) ping_frame[1:4] = static.DXCALLSIGN_CRC ping_frame[4:7] = static.MYCALLSIGN_CRC ping_frame[7:13] = static.MYGRID @@ -1430,7 +1497,7 @@ class DATA(): else: self.enqueue_frame_for_tx(ping_frame) - def received_ping_ack(self, data_in:bytes): + def received_ping_ack(self, data_in: bytes): """ Called if a PING ack has been received Args: @@ -1442,15 +1509,20 @@ class DATA(): static.DXCALLSIGN_CRC = bytes(data_in[4:7]) static.DXGRID = bytes(data_in[7:13]).rstrip(b'\x00') - jsondata = {"type" : "ping", "status" : "ack", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), "dxgrid": str(static.DXGRID, 'utf-8'), "snr": str(static.SNR)} + jsondata = {"type": "ping", "status": "ack", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()), + "mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), + "dxgrid": str(static.DXGRID, 'utf-8'), "snr": str(static.SNR)} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) - helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING-ACK', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING-ACK', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) static.INFO.append("PING;RECEIVEDACK") - structlog.get_logger("structlog").info("[TNC] PING ACK [" + str(self.mycallsign, 'utf-8') + "] >|< [" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR ) + structlog.get_logger("structlog").info( + "[TNC] PING ACK [" + str(self.mycallsign, 'utf-8') + "] >|< [" + str(static.DXCALLSIGN, 'utf-8') + "]", + snr=static.SNR) static.TNC_STATE = 'IDLE' def stop_transmission(self): @@ -1458,11 +1530,11 @@ class DATA(): Force a stop of the running transmission """ structlog.get_logger("structlog").warning("[TNC] Stopping transmission!") - stop_frame = bytearray(14) - stop_frame[:1] = bytes([249]) + stop_frame = bytearray(14) + stop_frame[:1] = bytes([249]) stop_frame[1:4] = static.DXCALLSIGN_CRC stop_frame[4:7] = static.MYCALLSIGN_CRC - stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) + stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) self.enqueue_frame_for_tx(stop_frame, copies=2, repeat_delay=250) @@ -1484,9 +1556,9 @@ class DATA(): # ----------- BROADCASTS def run_beacon(self): """ - Controlling funktion for running a beacon + Controlling function for running a beacon Args: - interval:int: + self: arq class Returns: @@ -1500,8 +1572,8 @@ class DATA(): structlog.get_logger("structlog").info("[TNC] Sending beacon!", interval=self.beacon_interval) beacon_frame = bytearray(14) - beacon_frame[:1] = bytes([250]) - beacon_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) + beacon_frame[:1] = bytes([250]) + beacon_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) beacon_frame[9:13] = static.MYGRID[:4] structlog.get_logger("structlog").info("[TNC] ENABLE FSK", state=static.ENABLE_FSK) @@ -1518,7 +1590,7 @@ class DATA(): structlog.get_logger("structlog").debug("[TNC] run_beacon: ", exception=e) # print(e) - def received_beacon(self, data_in:bytes): + def received_beacon(self, data_in: bytes): """ Called if we received a beacon Args: @@ -1531,19 +1603,23 @@ class DATA(): dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7])) dxgrid = bytes(data_in[9:13]).rstrip(b'\x00') - jsondata = {"type" : "beacon", "status" : "received", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)} + jsondata = {"type": "beacon", "status": "received", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()), + "mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), + "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) static.INFO.append("BEACON;RECEIVING") - structlog.get_logger("structlog").info("[TNC] BEACON RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR) - helpers.add_to_heard_stations(dxcallsign,dxgrid, 'BEACON', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] BEACON RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR) + helpers.add_to_heard_stations(dxcallsign, dxgrid, 'BEACON', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) def transmit_cq(self): """ Transmit a CQ Args: - Nothing + self Returns: Nothing @@ -1551,9 +1627,9 @@ class DATA(): structlog.get_logger("structlog").info("[TNC] CQ CQ CQ") static.INFO.append("CQ;SENDING") - cq_frame = bytearray(14) - cq_frame[:1] = bytes([200]) - cq_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) + cq_frame = bytearray(14) + cq_frame[:1] = bytes([200]) + cq_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) cq_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("utf-8")) structlog.get_logger("structlog").info("[TNC] ENABLE FSK", state=static.ENABLE_FSK) @@ -1564,7 +1640,7 @@ class DATA(): else: self.enqueue_frame_for_tx(cq_frame) - def received_cq(self, data_in:bytes): + def received_cq(self, data_in: bytes): """ Called when we receive a CQ frame Args: @@ -1579,8 +1655,10 @@ class DATA(): # print(dxcallsign) dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "utf-8") static.INFO.append("CQ;RECEIVING") - structlog.get_logger("structlog").info("[TNC] CQ RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR) - helpers.add_to_heard_stations(dxcallsign, dxgrid, 'CQ CQ CQ', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] CQ RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR) + helpers.add_to_heard_stations(dxcallsign, dxgrid, 'CQ CQ CQ', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) if static.RESPOND_TO_CQ: self.transmit_qrv() @@ -1589,7 +1667,7 @@ class DATA(): """ Called when we send a QRV frame Args: - data_in:bytes: + self Returns: Nothing @@ -1602,9 +1680,9 @@ class DATA(): static.INFO.append("QRV;SENDING") structlog.get_logger("structlog").info("[TNC] Sending QRV!") - qrv_frame = bytearray(14) - qrv_frame[:1] = bytes([201]) - qrv_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) + qrv_frame = bytearray(14) + qrv_frame[:1] = bytes([201]) + qrv_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) qrv_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("utf-8")) structlog.get_logger("structlog").info("[TNC] ENABLE FSK", state=static.ENABLE_FSK) @@ -1614,7 +1692,7 @@ class DATA(): else: self.enqueue_frame_for_tx(qrv_frame) - def received_qrv(self, data_in:bytes): + def received_qrv(self, data_in: bytes): """ Called when we receive a QRV frame Args: @@ -1627,16 +1705,20 @@ class DATA(): dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7])) dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "utf-8") - jsondata = {"type" : "qrv", "status" : "received", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)} + jsondata = {"type": "qrv", "status": "received", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()), + "mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), + "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)} json_data_out = json.dumps(jsondata) sock.SOCKET_QUEUE.put(json_data_out) static.INFO.append("QRV;RECEIVING") - structlog.get_logger("structlog").info("[TNC] QRV RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR) - helpers.add_to_heard_stations(dxcallsign,dxgrid, 'QRV', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY) + structlog.get_logger("structlog").info( + "[TNC] QRV RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR) + helpers.add_to_heard_stations(dxcallsign, dxgrid, 'QRV', static.SNR, static.FREQ_OFFSET, + static.HAMLIB_FREQUENCY) # ------------ CALUCLATE TRANSFER RATES - def calculate_transfer_rate_rx(self, rx_start_of_transmission:float, receivedbytes:int) -> list: + def calculate_transfer_rate_rx(self, rx_start_of_transmission: float, receivedbytes: int) -> list: """ Calculate transfer rate for received data Args: @@ -1651,7 +1733,8 @@ class DATA(): try: if static.TOTAL_BYTES == 0: static.TOTAL_BYTES = 1 - static.ARQ_TRANSMISSION_PERCENT = min(int((receivedbytes*static.ARQ_COMPRESSION_FACTOR / (static.TOTAL_BYTES)) * 100), 100) + static.ARQ_TRANSMISSION_PERCENT = min( + int((receivedbytes * static.ARQ_COMPRESSION_FACTOR / (static.TOTAL_BYTES)) * 100), 100) transmissiontime = time.time() - self.rx_start_of_transmission @@ -1678,13 +1761,14 @@ class DATA(): """ # reset ARQ statistics static.ARQ_BYTES_PER_MINUTE_BURST = 0 - static.ARQ_BYTES_PER_MINUTE = 0 - static.ARQ_BITS_PER_SECOND_BURST = 0 - static.ARQ_BITS_PER_SECOND = 0 - static.ARQ_TRANSMISSION_PERCENT = 0 - static.TOTAL_BYTES = 0 + static.ARQ_BYTES_PER_MINUTE = 0 + static.ARQ_BITS_PER_SECOND_BURST = 0 + static.ARQ_BITS_PER_SECOND = 0 + static.ARQ_TRANSMISSION_PERCENT = 0 + static.TOTAL_BYTES = 0 - def calculate_transfer_rate_tx(self, tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list: + def calculate_transfer_rate_tx(self, tx_start_of_transmission: float, sentbytes: int, + tx_buffer_length: int) -> list: """ Calcualte Transferrate for transmission Args: @@ -1701,8 +1785,8 @@ class DATA(): transmissiontime = time.time() - tx_start_of_transmission if sentbytes > 0: - static.ARQ_BITS_PER_SECOND = int((sentbytes * 8) / transmissiontime) # Bits per Second - static.ARQ_BYTES_PER_MINUTE = int((sentbytes) / (transmissiontime / 60)) # Bytes per Minute + static.ARQ_BITS_PER_SECOND = int((sentbytes * 8) / transmissiontime) # Bits per Second + static.ARQ_BYTES_PER_MINUTE = int((sentbytes) / (transmissiontime / 60)) # Bytes per Minute else: static.ARQ_BITS_PER_SECOND = 0 @@ -1734,12 +1818,12 @@ class DATA(): self.data_frame_ack_received = False static.RX_BURST_BUFFER = [] static.RX_FRAME_BUFFER = b'' - self.burst_ack_snr= 255 + self.burst_ack_snr = 255 # reset modem receiving state to reduce cpu load modem.RECEIVE_DATAC1 = False modem.RECEIVE_DATAC3 = False - #modem.RECEIVE_FSK_LDPC_0 = False + # modem.RECEIVE_FSK_LDPC_0 = False modem.RECEIVE_FSK_LDPC_1 = False # reset buffer overflow counter @@ -1766,7 +1850,7 @@ class DATA(): static.BEACON_PAUSE = False - def arq_reset_ack(self,state:bool): + def arq_reset_ack(self, state: bool): """ Funktion for resetting acknowledge states Args: @@ -1830,20 +1914,25 @@ class DATA(): DATA BURST """ # IRS SIDE + # TODO: We need to redesign this part for cleaner state handling if not static.ARQ_STATE or static.ARQ_SESSION_STATE != 'connected' or static.TNC_STATE != 'BUSY' or not self.is_IRS: - return - + # return only if not ARQ STATE and not ARQ SESSION STATE as they are different use cases + if not static.ARQ_STATE and static.ARQ_SESSION_STATE == 'disconnected': + return + # we want to reach this state only if connected ( == return above not called ) if self.data_channel_last_received + self.time_list[self.speed_level] > time.time(): # print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time()) pass else: - structlog.get_logger("structlog").warning("[TNC] Frame timeout", attempt=self.n_retries_per_burst, max_attempts=self.rx_n_max_retries_per_burst, speed_level=self.speed_level) + structlog.get_logger("structlog").warning("[TNC] Frame timeout", attempt=self.n_retries_per_burst, + max_attempts=self.rx_n_max_retries_per_burst, + speed_level=self.speed_level) self.frame_received_counter = 0 self.burst_nack_counter += 1 if self.burst_nack_counter >= 2: self.speed_level -= 1 - #print(self.burst_nack_counter) - #print(self.speed_level) + # print(self.burst_nack_counter) + # print(self.speed_level) static.ARQ_SPEED_LEVEL = self.speed_level self.burst_nack_counter = 0 if self.speed_level <= 0: @@ -1862,7 +1951,6 @@ class DATA(): self.stop_transmission() self.arq_cleanup() - def data_channel_keep_alive_watchdog(self): """ watchdog which checks if we are running into a connection timeout @@ -1873,11 +1961,12 @@ class DATA(): time.sleep(0.01) if self.data_channel_last_received + self.transmission_timeout > time.time(): time.sleep(0.01) - #print(self.data_channel_last_received + self.transmission_timeout - time.time()) - #pass + # print(self.data_channel_last_received + self.transmission_timeout - time.time()) + # pass else: self.data_channel_last_received = 0 - structlog.get_logger("structlog").info("[TNC] DATA [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]") + structlog.get_logger("structlog").info( + "[TNC] DATA [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]") static.INFO.append("ARQ;RECEIVING;FAILED") if not TESTMODE: self.arq_cleanup() @@ -1891,7 +1980,9 @@ class DATA(): if self.arq_session_last_received + self.arq_session_timeout > time.time(): time.sleep(0.01) else: - structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, 'utf-8') + "]") + structlog.get_logger("structlog").info( + "[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<>[" + str(static.DXCALLSIGN, + 'utf-8') + "]") static.INFO.append("ARQ;SESSION;TIMEOUT") self.close_session() @@ -1907,4 +1998,4 @@ class DATA(): time.sleep(2) def send_test_frame(self): - modem.MODEM_TRANSMIT_QUEUE.put([12,1,0,[bytearray(126)]]) + modem.MODEM_TRANSMIT_QUEUE.put([12, 1, 0, [bytearray(126)]])