first attempt of fixing #189

This commit is contained in:
dj2ls 2022-05-19 22:15:24 +02:00
parent a1c09c7fd9
commit 1f66f54f5f

View file

@ -8,9 +8,7 @@ Created on Sun Dec 27 20:43:40 2020
# pylint: disable=invalid-name, line-too-long, c-extension-no-member
# pylint: disable=import-outside-toplevel
import asyncio
import base64
import logging
import queue
import sys
import threading
@ -25,7 +23,6 @@ import ujson as json
import codec2
import helpers
import log_handler
import modem
import sock
import static
@ -36,8 +33,9 @@ DATA_QUEUE_TRANSMIT = queue.Queue()
DATA_QUEUE_RECEIVED = queue.Queue()
class DATA():
class DATA:
""" Terminal Node Controller for FreeDATA """
def __init__(self):
self.mycallsign = static.MYCALLSIGN # initial call sign. Will be overwritten later
@ -76,17 +74,17 @@ class DATA():
self.mode_list_low_bw = [14, 12]
self.time_list_low_bw = [3, 7]
self.mode_list_high_bw = [14, 12, 10] #201 = FSK mode list of available modes, each mode will be used 2times per speed level
self.time_list_high_bw = [3, 7, 8, 30] # list for time to wait for correspinding mode in seconds
self.mode_list_high_bw = [14, 12, 10] # mode list of available modes,each mode will be used 2 times per level
self.time_list_high_bw = [3, 7, 8, 30] # list for time to wait for corresponding mode in seconds
# mode list for selecting between low bandwith ( 500Hz ) and normal modes with higher bandwith
# mode list for selecting between low bandwidth ( 500Hz ) and normal modes with higher bandwidth
if static.LOW_BANDWITH_MODE:
self.mode_list = self.mode_list_low_bw # mode list of available modes, each mode will be used 2times per speed level
self.time_list = self.time_list_low_bw # list for time to wait for correspinding mode in seconds
self.time_list = self.time_list_low_bw # list for time to wait for corresponding mode in seconds
else:
self.mode_list = self.mode_list_high_bw # mode list of available modes, each mode will be used 2times per speed level
self.time_list = self.time_list_high_bw # list for time to wait for correspinding mode in seconds
self.time_list = self.time_list_high_bw # list for time to wait for corresponding mode in seconds
self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode
static.ARQ_SPEED_LEVEL = self.speed_level
@ -101,7 +99,8 @@ class DATA():
self.transmission_timeout = 360 # transmission timeout in seconds
worker_thread_transmit = threading.Thread(target=self.worker_transmit, name="worker thread transmit",daemon=True)
worker_thread_transmit = threading.Thread(target=self.worker_transmit, name="worker thread transmit",
daemon=True)
worker_thread_transmit.start()
worker_thread_receive = threading.Thread(target=self.worker_receive, name="worker thread receive", daemon=True)
@ -446,8 +445,6 @@ class DATA():
self.arq_file_transfer = True
RX_PAYLOAD_PER_MODEM_FRAME = bytes_per_frame - 2 # payload per moden frame
static.TNC_STATE = 'BUSY'
static.ARQ_STATE = True
static.INFO.append("ARQ;RECEIVING")
@ -468,7 +465,8 @@ class DATA():
structlog.get_logger("structlog").debug("[TNC] static.RX_BURST_BUFFER", buffer=static.RX_BURST_BUFFER)
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', snr, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', snr, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
# Check if we received all frames in the burst by checking if burst buffer has no more "Nones"
# This is the ideal case because we received all data
@ -503,7 +501,8 @@ class DATA():
if get_position >= 0:
static.RX_FRAME_BUFFER = static.RX_FRAME_BUFFER[:search_position + get_position]
static.RX_FRAME_BUFFER += temp_burst_buffer
structlog.get_logger("structlog").warning("[TNC] ARQ | RX | replacing existing buffer data", area=search_area, pos=get_position)
structlog.get_logger("structlog").warning("[TNC] ARQ | RX | replacing existing buffer data",
area=search_area, pos=get_position)
# if we dont find data n this range, we really have new data and going to replace it
else:
static.RX_FRAME_BUFFER += temp_burst_buffer
@ -540,13 +539,15 @@ class DATA():
# Check if we received last frame of burst - this is an indicator for missed frames.
# With this way of doing this, we always MUST receive the last frame of a burst otherwise the entire
# burst is lost
structlog.get_logger("structlog").debug("[TNC] all frames in burst received:", frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST)
structlog.get_logger("structlog").debug("[TNC] all frames in burst received:", frame=RX_N_FRAME_OF_BURST,
frames=RX_N_FRAMES_PER_BURST)
self.send_retransmit_request_frame(freedv)
self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER))
# Should never reach this point
else:
structlog.get_logger("structlog").error("[TNC] data_handler: Should not reach this point...", frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST)
structlog.get_logger("structlog").error("[TNC] data_handler: Should not reach this point...",
frame=RX_N_FRAME_OF_BURST, frames=RX_N_FRAMES_PER_BURST)
# We have a BOF and EOF flag in our data. If we received both we received our frame.
# In case of loosing data but we received already a BOF and EOF we need to make sure, we
@ -556,7 +557,6 @@ class DATA():
# get total bytes per transmission information as soon we recevied a frame with a BOF
if bof_position >= 0:
payload = static.RX_FRAME_BUFFER[bof_position + len(self.data_frame_bof):eof_position]
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes
static.TOTAL_BYTES = frame_length
@ -566,7 +566,8 @@ class DATA():
self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER))
if bof_position >= 0 and eof_position > 0 and None not in static.RX_BURST_BUFFER:
structlog.get_logger("structlog").debug("[TNC] arq_data_received:", bof_position=bof_position, eof_position=eof_position)
structlog.get_logger("structlog").debug("[TNC] arq_data_received:", bof_position=bof_position,
eof_position=eof_position)
# print(f"bof_position {bof_position} / eof_position {eof_position}")
self.rx_frame_bof_received = True
self.rx_frame_eof_received = True
@ -606,24 +607,31 @@ class DATA():
# Re-code data_frame in base64, UTF-8 for JSON UI communication.
base64_data = base64.b64encode(data_frame).decode("utf-8")
static.RX_BUFFER.append([uniqueid, timestamp, static.DXCALLSIGN, static.DXGRID, base64_data])
jsondata = {"arq":"received", "uuid" : uniqueid, "timestamp": timestamp, "mycallsign" : str(mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), "dxgrid": str(static.DXGRID, 'utf-8'), "data": base64_data}
jsondata = {"arq": "received", "uuid": uniqueid, "timestamp": timestamp,
"mycallsign": str(mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'),
"dxgrid": str(static.DXGRID, 'utf-8'), "data": base64_data}
json_data_out = json.dumps(jsondata)
structlog.get_logger("structlog").debug("[TNC] arq_data_received:", jsondata=jsondata)
# print(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
static.INFO.append("ARQ;RECEIVING;SUCCESS")
structlog.get_logger("structlog").info("[TNC] ARQ | RX | SENDING DATA FRAME ACK", snr=snr, crc=data_frame_crc.hex())
structlog.get_logger("structlog").info("[TNC] ARQ | RX | SENDING DATA FRAME ACK", snr=snr,
crc=data_frame_crc.hex())
self.send_data_ack_frame(snr)
# update our statistics AFTER the frame ACK
self.calculate_transfer_rate_rx(self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER))
structlog.get_logger("structlog").info("[TNC] | RX | DATACHANNEL [" +
str(self.mycallsign, 'utf-8') + "]<< >>[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=snr)
str(self.mycallsign, 'utf-8') + "]<< >>[" + str(
static.DXCALLSIGN, 'utf-8') + "]", snr=snr)
else:
static.INFO.append("ARQ;RECEIVING;FAILED")
structlog.get_logger("structlog").warning("[TNC] ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!", e="wrong crc", expected=data_frame_crc, received=data_frame_crc_received, overflows=static.BUFFER_OVERFLOW_COUNTER)
structlog.get_logger("structlog").warning("[TNC] ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!",
e="wrong crc", expected=data_frame_crc,
received=data_frame_crc_received,
overflows=static.BUFFER_OVERFLOW_COUNTER)
structlog.get_logger("structlog").info("[TNC] ARQ | RX | Sending NACK")
self.send_burst_nack_frame(snr)
@ -669,7 +677,8 @@ class DATA():
frame_total_size = len(data_out).to_bytes(4, byteorder='big')
static.INFO.append("ARQ;TRANSMITTING")
jsondata = {"arq":"transmission", "status" :"transmitting", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "transmitting", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
@ -734,7 +743,8 @@ class DATA():
static.ARQ_SPEED_LEVEL = self.speed_level
data_mode = self.mode_list[self.speed_level]
structlog.get_logger("structlog").debug("[TNC] Speed-level:", level=self.speed_level, retry=self.tx_n_retry_of_burst, mode=data_mode)
structlog.get_logger("structlog").debug("[TNC] Speed-level:", level=self.speed_level,
retry=self.tx_n_retry_of_burst, mode=data_mode)
# payload information
payload_per_frame = modem.get_bytes_per_frame(data_mode) - 2
@ -773,7 +783,8 @@ class DATA():
tempbuffer.append(frame)
structlog.get_logger("structlog").debug("[TNC] tempbuffer:", tempbuffer=tempbuffer)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | FRAMES", mode=data_mode, fpb=TX_N_FRAMES_PER_BURST, retry=self.tx_n_retry_of_burst)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | FRAMES", mode=data_mode,
fpb=TX_N_FRAMES_PER_BURST, retry=self.tx_n_retry_of_burst)
# we need to set our TRANSMITTING flag before we are adding an object the transmit queue
# this is not that nice, we could improve this somehow
@ -820,7 +831,9 @@ class DATA():
self.calculate_transfer_rate_tx(tx_start_of_transmission, bufferposition_end, len(data_out))
# NEXT ATTEMPT
structlog.get_logger("structlog").debug("[TNC] ATTEMPT:", retry=self.tx_n_retry_of_burst, maxretries=TX_N_MAX_RETRIES_PER_BURST, overflows=static.BUFFER_OVERFLOW_COUNTER)
structlog.get_logger("structlog").debug("[TNC] ATTEMPT:", retry=self.tx_n_retry_of_burst,
maxretries=TX_N_MAX_RETRIES_PER_BURST,
overflows=static.BUFFER_OVERFLOW_COUNTER)
# update buffer position
bufferposition = bufferposition_end
@ -828,7 +841,8 @@ class DATA():
# update stats
self.calculate_transfer_rate_tx(tx_start_of_transmission, bufferposition_end, len(data_out))
jsondata = {"arq":"transmission", "status" :"transmitting", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "transmitting", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
@ -836,19 +850,25 @@ class DATA():
if self.data_frame_ack_received:
static.INFO.append("ARQ;TRANSMITTING;SUCCESS")
jsondata = {"arq":"transmission", "status" :"success", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "success", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | DATA TRANSMITTED!", BytesPerMinute=static.ARQ_BYTES_PER_MINUTE, BitsPerSecond=static.ARQ_BITS_PER_SECOND, overflows=static.BUFFER_OVERFLOW_COUNTER)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | DATA TRANSMITTED!",
BytesPerMinute=static.ARQ_BYTES_PER_MINUTE,
BitsPerSecond=static.ARQ_BITS_PER_SECOND,
overflows=static.BUFFER_OVERFLOW_COUNTER)
else:
static.INFO.append("ARQ;TRANSMITTING;FAILED")
jsondata = {"arq":"transmission", "status" :"failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | TRANSMISSION FAILED OR TIME OUT!", overflows=static.BUFFER_OVERFLOW_COUNTER)
structlog.get_logger("structlog").info("[TNC] ARQ | TX | TRANSMISSION FAILED OR TIME OUT!",
overflows=static.BUFFER_OVERFLOW_COUNTER)
self.stop_transmission()
# and last but not least doing a state cleanup
@ -876,7 +896,8 @@ class DATA():
# only process data if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.burst_ack = True # Force data loops of TNC to stop and continue with next frame
self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp
self.burst_ack_snr = int.from_bytes(bytes(data_in[5:6]), "big")
@ -906,11 +927,12 @@ class DATA():
# only process data if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.burst_nack = True # Force data loops of TNC to stop and continue with next frame
self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp
self.burst_ack_snr= int.from_bytes(bytes(data_in[5:6]), "big")
self.speed_level= int.from_bytes(bytes(data_in[6:7]), "big")
self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big")
self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big")
static.ARQ_SPEED_LEVEL = self.speed_level
self.burst_nack_counter += 1
structlog.get_logger("structlog").debug("[TNC] burst_nack_received:", speed_level=self.speed_level)
@ -920,7 +942,8 @@ class DATA():
""" """
# only process data if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.data_frame_ack_received = True # Force data loops of TNC to stop and continue with next frame
self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp
self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp
@ -934,9 +957,11 @@ class DATA():
Returns:
"""
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
static.INFO.append("ARQ;TRANSMITTING;FAILED")
jsondata = {"arq":"transmission", "status" : "failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT, "bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp
@ -955,7 +980,8 @@ class DATA():
"""
# only process data if we are in ARQ and BUSY state
if static.ARQ_STATE and static.TNC_STATE == 'BUSY':
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.rpt_request_received = True
self.data_channel_last_received = int(time.time()) # we need to update our timeout timestamp
@ -982,7 +1008,9 @@ class DATA():
"""
# das hier müssen wir checken. Sollte vielleicht in INIT!!!
self.datachannel_timeout = False
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE)
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]",
state=static.ARQ_SESSION_STATE)
self.open_session(callsign)
@ -1019,7 +1047,10 @@ class DATA():
while not static.ARQ_SESSION:
time.sleep(0.01)
for attempt in range(1, self.session_connect_max_retries + 1):
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>?<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", a=attempt, state=static.ARQ_SESSION_STATE)
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>?<<[" + str(static.DXCALLSIGN,
'utf-8') + "]", a=attempt,
state=static.ARQ_SESSION_STATE)
self.enqueue_frame_for_tx(connection_frame)
@ -1057,8 +1088,11 @@ class DATA():
static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13]))
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]",
state=static.ARQ_SESSION_STATE)
static.ARQ_SESSION = True
static.TNC_STATE = 'BUSY'
@ -1067,8 +1101,11 @@ class DATA():
def close_session(self):
""" Close the ARQ session """
static.ARQ_SESSION_STATE = 'disconnecting'
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<X>>[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<X>>[" + str(static.DXCALLSIGN, 'utf-8') + "]",
state=static.ARQ_SESSION_STATE)
static.INFO.append("ARQ;SESSION;CLOSE")
self.IS_ARQ_SESSION_MASTER = False
static.ARQ_SESSION = False
@ -1091,8 +1128,11 @@ class DATA():
_valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7]))
if _valid_crc:
static.ARQ_SESSION_STATE = 'disconnected'
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<X>>[" + str(static.DXCALLSIGN, 'utf-8') + "]", state=static.ARQ_SESSION_STATE)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<X>>[" + str(static.DXCALLSIGN, 'utf-8') + "]",
state=static.ARQ_SESSION_STATE)
static.INFO.append("ARQ;SESSION;CLOSE")
self.IS_ARQ_SESSION_MASTER = False
@ -1125,7 +1165,8 @@ class DATA():
_valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7]))
if _valid_crc:
structlog.get_logger("structlog").debug("[TNC] Received session heartbeat")
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'SESSION-HB', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
self.arq_session_last_received = int(time.time()) # we need to update our timeout timestamp
@ -1139,7 +1180,8 @@ class DATA():
# ############################################################################################################
# ARQ DATA CHANNEL HANDLER
# ############################################################################################################
def open_dc_and_transmit(self, data_out:bytes, mode:int, n_frames_per_burst:int, transmission_uuid:str, mycallsign):
def open_dc_and_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int, transmission_uuid: str,
mycallsign):
"""
Args:
@ -1216,7 +1258,10 @@ class DATA():
time.sleep(0.01)
for attempt in range(1, self.data_channel_max_retries + 1):
static.INFO.append("DATACHANNEL;OPENING")
structlog.get_logger("structlog").info("[TNC] ARQ | DATA | TX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", attempt=f"{str(attempt)}/{str(self.data_channel_max_retries)}")
structlog.get_logger("structlog").info(
"[TNC] ARQ | DATA | TX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN,
'utf-8') + "]",
attempt=f"{str(attempt)}/{str(self.data_channel_max_retries)}")
self.enqueue_frame_for_tx(connection_frame)
@ -1232,13 +1277,18 @@ class DATA():
if attempt == self.data_channel_max_retries:
static.INFO.append("DATACHANNEL;FAILED")
structlog.get_logger("structlog").debug("[TNC] arq_open_data_channel:", transmission_uuid=self.transmission_uuid)
structlog.get_logger("structlog").debug("[TNC] arq_open_data_channel:",
transmission_uuid=self.transmission_uuid)
# print(self.transmission_uuid)
jsondata = {"arq":"transmission", "status" :"failed", "uuid" : self.transmission_uuid, "percent" : static.ARQ_TRANSMISSION_PERCENT, "bytesperminute" : static.ARQ_BYTES_PER_MINUTE}
jsondata = {"arq": "transmission", "status": "failed", "uuid": self.transmission_uuid,
"percent": static.ARQ_TRANSMISSION_PERCENT,
"bytesperminute": static.ARQ_BYTES_PER_MINUTE}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
structlog.get_logger("structlog").warning("[TNC] ARQ | TX | DATA [" + str(mycallsign, 'utf-8') + "]>>X<<[" + str(static.DXCALLSIGN, 'utf-8') + "]")
structlog.get_logger("structlog").warning(
"[TNC] ARQ | TX | DATA [" + str(mycallsign, 'utf-8') + "]>>X<<[" + str(static.DXCALLSIGN,
'utf-8') + "]")
self.datachannel_timeout = True
if not TESTMODE:
self.arq_cleanup()
@ -1284,7 +1334,8 @@ class DATA():
# updated modes we are listening to
self.set_listening_modes(self.mode_list[self.speed_level])
helpers.add_to_heard_stations(static.DXCALLSIGN,static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
# check if callsign ssid override
valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4])
@ -1294,7 +1345,9 @@ class DATA():
self.arq_cleanup()
return
structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]", bandwith="wide")
structlog.get_logger("structlog").info(
"[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>> <<[" + str(static.DXCALLSIGN, 'utf-8') + "]",
bandwith="wide")
static.ARQ_STATE = True
static.TNC_STATE = 'BUSY'
@ -1318,11 +1371,16 @@ class DATA():
self.enqueue_frame_for_tx(connection_frame)
structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", bandwith="wide", snr=static.SNR)
structlog.get_logger("structlog").info(
"[TNC] ARQ | DATA | RX | [" + str(mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]",
bandwith="wide", snr=static.SNR)
# set start of transmission for our statistics
self.rx_start_of_transmission = time.time()
# reset our data channel watchdog
self.data_channel_last_received = int(time.time())
def arq_received_channel_is_open(self, data_in: bytes):
"""
Called if we received a data channel opener
@ -1350,9 +1408,13 @@ class DATA():
self.speed_level = len(self.mode_list) - 1
structlog.get_logger("structlog").debug("[TNC] high bandwidth mode", modes=self.mode_list)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'DATA-CHANNEL', static.SNR,
static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info("[TNC] ARQ | DATA | TX | [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR)
structlog.get_logger("structlog").info(
"[TNC] ARQ | DATA | TX | [" + str(self.mycallsign, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN,
'utf-8') + "]",
snr=static.SNR)
# as soon as we set ARQ_STATE to DATA, transmission starts
static.ARQ_STATE = True
@ -1361,7 +1423,8 @@ class DATA():
static.TNC_STATE = 'IDLE'
static.ARQ_STATE = False
static.INFO.append("PROTOCOL;VERSION_MISMATCH")
structlog.get_logger("structlog").warning("[TNC] protocol version mismatch:", received=protocol_version, own=static.ARQ_PROTOCOL_VERSION)
structlog.get_logger("structlog").warning("[TNC] protocol version mismatch:", received=protocol_version,
own=static.ARQ_PROTOCOL_VERSION)
self.arq_cleanup()
# ---------- PING
@ -1378,7 +1441,8 @@ class DATA():
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
static.INFO.append("PING;SENDING")
structlog.get_logger("structlog").info("[TNC] PING REQ [" + str(self.mycallsign, 'utf-8') + "] >>> [" + str(static.DXCALLSIGN, 'utf-8') + "]" )
structlog.get_logger("structlog").info(
"[TNC] PING REQ [" + str(self.mycallsign, 'utf-8') + "] >>> [" + str(static.DXCALLSIGN, 'utf-8') + "]")
ping_frame = bytearray(14)
ping_frame[:1] = bytes([210])
@ -1404,7 +1468,8 @@ class DATA():
"""
static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13]))
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
static.INFO.append("PING;RECEIVING")
@ -1416,7 +1481,9 @@ class DATA():
# print("ping not for me...")
return
structlog.get_logger("structlog").info("[TNC] PING REQ [" + str(mycallsign, 'utf-8') + "] <<< [" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR )
structlog.get_logger("structlog").info(
"[TNC] PING REQ [" + str(mycallsign, 'utf-8') + "] <<< [" + str(static.DXCALLSIGN, 'utf-8') + "]",
snr=static.SNR)
ping_frame = bytearray(14)
ping_frame[:1] = bytes([211])
@ -1442,15 +1509,20 @@ class DATA():
static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXGRID = bytes(data_in[7:13]).rstrip(b'\x00')
jsondata = {"type" : "ping", "status" : "ack", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'), "dxgrid": str(static.DXGRID, 'utf-8'), "snr": str(static.SNR)}
jsondata = {"type": "ping", "status": "ack", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()),
"mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(static.DXCALLSIGN, 'utf-8'),
"dxgrid": str(static.DXGRID, 'utf-8'), "snr": str(static.SNR)}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING-ACK', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
helpers.add_to_heard_stations(static.DXCALLSIGN, static.DXGRID, 'PING-ACK', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
static.INFO.append("PING;RECEIVEDACK")
structlog.get_logger("structlog").info("[TNC] PING ACK [" + str(self.mycallsign, 'utf-8') + "] >|< [" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR )
structlog.get_logger("structlog").info(
"[TNC] PING ACK [" + str(self.mycallsign, 'utf-8') + "] >|< [" + str(static.DXCALLSIGN, 'utf-8') + "]",
snr=static.SNR)
static.TNC_STATE = 'IDLE'
def stop_transmission(self):
@ -1484,9 +1556,9 @@ class DATA():
# ----------- BROADCASTS
def run_beacon(self):
"""
Controlling funktion for running a beacon
Controlling function for running a beacon
Args:
interval:int:
self: arq class
Returns:
@ -1531,19 +1603,23 @@ class DATA():
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
dxgrid = bytes(data_in[9:13]).rstrip(b'\x00')
jsondata = {"type" : "beacon", "status" : "received", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)}
jsondata = {"type": "beacon", "status": "received", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()),
"mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'),
"dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
static.INFO.append("BEACON;RECEIVING")
structlog.get_logger("structlog").info("[TNC] BEACON RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign,dxgrid, 'BEACON', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] BEACON RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign, dxgrid, 'BEACON', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
def transmit_cq(self):
"""
Transmit a CQ
Args:
Nothing
self
Returns:
Nothing
@ -1579,8 +1655,10 @@ class DATA():
# print(dxcallsign)
dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "utf-8")
static.INFO.append("CQ;RECEIVING")
structlog.get_logger("structlog").info("[TNC] CQ RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign, dxgrid, 'CQ CQ CQ', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] CQ RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign, dxgrid, 'CQ CQ CQ', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
if static.RESPOND_TO_CQ:
self.transmit_qrv()
@ -1589,7 +1667,7 @@ class DATA():
"""
Called when we send a QRV frame
Args:
data_in:bytes:
self
Returns:
Nothing
@ -1627,13 +1705,17 @@ class DATA():
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "utf-8")
jsondata = {"type" : "qrv", "status" : "received", "uuid" : str(uuid.uuid4()), "timestamp": int(time.time()), "mycallsign" : str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'), "dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)}
jsondata = {"type": "qrv", "status": "received", "uuid": str(uuid.uuid4()), "timestamp": int(time.time()),
"mycallsign": str(self.mycallsign, 'utf-8'), "dxcallsign": str(dxcallsign, 'utf-8'),
"dxgrid": str(dxgrid, 'utf-8'), "snr": str(static.SNR)}
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
static.INFO.append("QRV;RECEIVING")
structlog.get_logger("structlog").info("[TNC] QRV RCVD [" + str(dxcallsign, 'utf-8') + "]["+ str(dxgrid, 'utf-8') +"] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign,dxgrid, 'QRV', static.SNR, static.FREQ_OFFSET, static.HAMLIB_FREQUENCY)
structlog.get_logger("structlog").info(
"[TNC] QRV RCVD [" + str(dxcallsign, 'utf-8') + "][" + str(dxgrid, 'utf-8') + "] ", snr=static.SNR)
helpers.add_to_heard_stations(dxcallsign, dxgrid, 'QRV', static.SNR, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY)
# ------------ CALUCLATE TRANSFER RATES
def calculate_transfer_rate_rx(self, rx_start_of_transmission: float, receivedbytes: int) -> list:
@ -1651,7 +1733,8 @@ class DATA():
try:
if static.TOTAL_BYTES == 0:
static.TOTAL_BYTES = 1
static.ARQ_TRANSMISSION_PERCENT = min(int((receivedbytes*static.ARQ_COMPRESSION_FACTOR / (static.TOTAL_BYTES)) * 100), 100)
static.ARQ_TRANSMISSION_PERCENT = min(
int((receivedbytes * static.ARQ_COMPRESSION_FACTOR / (static.TOTAL_BYTES)) * 100), 100)
transmissiontime = time.time() - self.rx_start_of_transmission
@ -1684,7 +1767,8 @@ class DATA():
static.ARQ_TRANSMISSION_PERCENT = 0
static.TOTAL_BYTES = 0
def calculate_transfer_rate_tx(self, tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list:
def calculate_transfer_rate_tx(self, tx_start_of_transmission: float, sentbytes: int,
tx_buffer_length: int) -> list:
"""
Calcualte Transferrate for transmission
Args:
@ -1830,14 +1914,19 @@ class DATA():
DATA BURST
"""
# IRS SIDE
# TODO: We need to redesign this part for cleaner state handling
if not static.ARQ_STATE or static.ARQ_SESSION_STATE != 'connected' or static.TNC_STATE != 'BUSY' or not self.is_IRS:
# return only if not ARQ STATE and not ARQ SESSION STATE as they are different use cases
if not static.ARQ_STATE and static.ARQ_SESSION_STATE == 'disconnected':
return
# we want to reach this state only if connected ( == return above not called )
if self.data_channel_last_received + self.time_list[self.speed_level] > time.time():
# print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time())
pass
else:
structlog.get_logger("structlog").warning("[TNC] Frame timeout", attempt=self.n_retries_per_burst, max_attempts=self.rx_n_max_retries_per_burst, speed_level=self.speed_level)
structlog.get_logger("structlog").warning("[TNC] Frame timeout", attempt=self.n_retries_per_burst,
max_attempts=self.rx_n_max_retries_per_burst,
speed_level=self.speed_level)
self.frame_received_counter = 0
self.burst_nack_counter += 1
if self.burst_nack_counter >= 2:
@ -1862,7 +1951,6 @@ class DATA():
self.stop_transmission()
self.arq_cleanup()
def data_channel_keep_alive_watchdog(self):
"""
watchdog which checks if we are running into a connection timeout
@ -1877,7 +1965,8 @@ class DATA():
# pass
else:
self.data_channel_last_received = 0
structlog.get_logger("structlog").info("[TNC] DATA [" + str(self.mycallsign, 'utf-8') + "]<<T>>[" + str(static.DXCALLSIGN, 'utf-8') + "]")
structlog.get_logger("structlog").info(
"[TNC] DATA [" + str(self.mycallsign, 'utf-8') + "]<<T>>[" + str(static.DXCALLSIGN, 'utf-8') + "]")
static.INFO.append("ARQ;RECEIVING;FAILED")
if not TESTMODE:
self.arq_cleanup()
@ -1891,7 +1980,9 @@ class DATA():
if self.arq_session_last_received + self.arq_session_timeout > time.time():
time.sleep(0.01)
else:
structlog.get_logger("structlog").info("[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<T>>[" + str(static.DXCALLSIGN, 'utf-8') + "]")
structlog.get_logger("structlog").info(
"[TNC] SESSION [" + str(self.mycallsign, 'utf-8') + "]<<T>>[" + str(static.DXCALLSIGN,
'utf-8') + "]")
static.INFO.append("ARQ;SESSION;TIMEOUT")
self.close_session()