FreeDATA/tnc/data_handler.py
2022-10-12 07:40:39 +02:00

2654 lines
97 KiB
Python

# -*- coding: UTF-8 -*-
"""
Created on Sun Dec 27 20:43:40 2020
@author: DJ2LS
"""
# pylint: disable=invalid-name, line-too-long, c-extension-no-member
# pylint: disable=import-outside-toplevel, attribute-defined-outside-init
import base64
import sys
import threading
import time
import uuid
import zlib
from random import randrange, randbytes
import codec2
import helpers
import modem
import numpy as np
import sock
import static
import structlog
import ujson as json
from codec2 import FREEDV_MODE
from exceptions import NoCallsign
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, RX_BUFFER
from static import FRAME_TYPE as FR_TYPE
TESTMODE = False
class DATA:
"""Terminal Node Controller for FreeDATA"""
log = structlog.get_logger("DATA")
def __init__(self) -> None:
# Initial call sign. Will be overwritten later
self.mycallsign = static.MYCALLSIGN
self.data_queue_transmit = DATA_QUEUE_TRANSMIT
self.data_queue_received = DATA_QUEUE_RECEIVED
# length of signalling frame
self.length_sig0_frame = 14
self.length_sig1_frame = 14
# hold session id
self.session_id = bytes(1)
# ------- ARQ SESSION
self.arq_file_transfer = False
self.IS_ARQ_SESSION_MASTER = False
self.arq_session_last_received = 0
self.arq_session_timeout = 30
self.session_connect_max_retries = 15
# actual n retries of burst
self.tx_n_retry_of_burst = 0
self.transmission_uuid = ""
self.data_channel_last_received = 0.0 # time of last "live sign" of a frame
self.burst_ack_snr = 0 # SNR from received burst ack frames
# Flag to indicate if we received an acknowledge frame for a burst
self.burst_ack = False
# Flag to indicate if we received an acknowledge frame for a data frame
self.data_frame_ack_received = False
# Flag to indicate if we received an request for repeater frames
self.rpt_request_received = False
self.rpt_request_buffer = [] # requested frames, saved in a list
self.rx_start_of_transmission = 0 # time of transmission start
# 3 bytes for the BOF Beginning of File indicator in a data frame
self.data_frame_bof = b"BOF"
# 3 bytes for the EOF End of File indicator in a data frame
self.data_frame_eof = b"EOF"
self.tx_n_max_retries_per_burst = 50
self.rx_n_max_retries_per_burst = 50
self.n_retries_per_burst = 0
# Flag to indicate if we recevied a low bandwidth mode channel opener
self.received_LOW_BANDWIDTH_MODE = False
self.data_channel_max_retries = 15
self.datachannel_timeout = False
# List of codec2 modes to use in "low bandwidth" mode.
self.mode_list_low_bw = [
FREEDV_MODE.datac3.value,
]
# List for minimum SNR operating level for the corresponding mode in self.mode_list
self.snr_list_low_bw = [0]
# List for time to wait for corresponding mode in seconds
self.time_list_low_bw = [8]
# List of codec2 modes to use in "high bandwidth" mode.
self.mode_list_high_bw = [
FREEDV_MODE.datac3.value,
FREEDV_MODE.datac1.value,
]
# List for minimum SNR operating level for the corresponding mode in self.mode_list
self.snr_list_high_bw = [0, 5]
# List for time to wait for corresponding mode in seconds
self.time_list_high_bw = [8, 9]
# Mode list for selecting between low bandwidth ( 500Hz ) and modes with higher bandwidth
# but ability to fall back to low bandwidth modes if needed.
if static.LOW_BANDWIDTH_MODE:
# List of codec2 modes to use in "low bandwidth" mode.
self.mode_list = self.mode_list_low_bw
# list of times to wait for corresponding mode in seconds
self.time_list = self.time_list_low_bw
else:
# List of codec2 modes to use in "high bandwidth" mode.
self.mode_list = self.mode_list_high_bw
# list of times to wait for corresponding mode in seconds
self.time_list = self.time_list_high_bw
self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode
static.ARQ_SPEED_LEVEL = self.speed_level
self.is_IRS = False
self.burst_nack = False
self.burst_nack_counter = 0
self.frame_received_counter = 0
self.rx_frame_bof_received = False
self.rx_frame_eof_received = False
# TIMEOUTS
self.burst_ack_timeout_seconds = 3.0 # timeout for burst acknowledges
self.data_frame_ack_timeout_seconds = 3.0 # timeout for data frame acknowledges
self.rpt_ack_timeout_seconds = 3.0 # timeout for rpt frame acknowledges
self.transmission_timeout = 500 # transmission timeout in seconds
# Dictionary of functions and log messages used in process_data
# instead of a long series of if-elif-else statements.
self.rx_dispatcher = {
FR_TYPE.ARQ_DC_OPEN_ACK_N.value: (
self.arq_received_channel_is_open,
"ARQ OPEN ACK (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_ACK_W.value: (
self.arq_received_channel_is_open,
"ARQ OPEN ACK (Wide)",
),
FR_TYPE.ARQ_DC_OPEN_N.value: (
self.arq_received_data_channel_opener,
"ARQ Data Channel Open (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_W.value: (
self.arq_received_data_channel_opener,
"ARQ Data Channel Open (Wide)",
),
FR_TYPE.ARQ_SESSION_CLOSE.value: (
self.received_session_close,
"ARQ CLOSE SESSION",
),
FR_TYPE.ARQ_SESSION_HB.value: (
self.received_session_heartbeat,
"ARQ HEARTBEAT",
),
FR_TYPE.ARQ_SESSION_OPEN.value: (
self.received_session_opener,
"ARQ OPEN SESSION",
),
FR_TYPE.ARQ_STOP.value: (self.received_stop_transmission, "ARQ STOP TX"),
FR_TYPE.BEACON.value: (self.received_beacon, "BEACON"),
FR_TYPE.BURST_ACK.value: (self.burst_ack_nack_received, "BURST ACK"),
FR_TYPE.BURST_NACK.value: (self.burst_ack_nack_received, "BURST NACK"),
FR_TYPE.CQ.value: (self.received_cq, "CQ"),
FR_TYPE.FR_ACK.value: (self.frame_ack_received, "FRAME ACK"),
FR_TYPE.FR_NACK.value: (self.frame_nack_received, "FRAME NACK"),
FR_TYPE.FR_REPEAT.value: (self.burst_rpt_received, "REPEAT REQUEST"),
FR_TYPE.PING_ACK.value: (self.received_ping_ack, "PING ACK"),
FR_TYPE.PING.value: (self.received_ping, "PING"),
FR_TYPE.QRV.value: (self.received_qrv, "QRV"),
}
self.command_dispatcher = {
"CONNECT": (self.arq_session_handler, "CONNECT"),
"CQ": (self.transmit_cq, "CQ"),
"DISCONNECT": (self.close_session, "DISCONNECT"),
"SEND_TEST_FRAME": (self.send_test_frame, "TEST"),
"STOP": (self.stop_transmission, "STOP"),
}
# Start worker and watchdog threads
worker_thread_transmit = threading.Thread(
target=self.worker_transmit, name="worker thread transmit", daemon=True
)
worker_thread_transmit.start()
worker_thread_receive = threading.Thread(
target=self.worker_receive, name="worker thread receive", daemon=True
)
worker_thread_receive.start()
# START THE THREAD FOR THE TIMEOUT WATCHDOG
watchdog_thread = threading.Thread(
target=self.watchdog, name="watchdog", daemon=True
)
watchdog_thread.start()
arq_session_thread = threading.Thread(
target=self.heartbeat, name="watchdog", daemon=True
)
arq_session_thread.start()
self.beacon_interval = 0
self.beacon_thread = threading.Thread(
target=self.run_beacon, name="watchdog", daemon=True
)
self.beacon_thread.start()
def worker_transmit(self) -> None:
"""Dispatch incoming UI instructions for transmitting operations"""
while True:
data = self.data_queue_transmit.get()
# Dispatch commands known to command_dispatcher
if data[0] in self.command_dispatcher:
self.log.debug(f"[TNC] TX {self.command_dispatcher[data[0]][1]}...")
self.command_dispatcher[data[0]][0]()
# Dispatch commands that need more arguments.
elif data[0] == "PING":
# [1] dxcallsign
self.transmit_ping(data[1])
elif data[0] == "BEACON":
# [1] INTERVAL int
# [2] STATE bool
if data[2]:
self.beacon_interval = data[1]
static.BEACON_STATE = True
else:
static.BEACON_STATE = False
elif data[0] == "ARQ_RAW":
# [1] DATA_OUT bytes
# [2] MODE int
# [3] N_FRAMES_PER_BURST int
# [4] self.transmission_uuid str
# [5] mycallsign with ssid
self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5])
else:
self.log.error(
"[TNC] worker_transmit: received invalid command:", data=data
)
def worker_receive(self) -> None:
"""Queue received data for processing"""
while True:
data = self.data_queue_received.get()
# [0] bytes
# [1] freedv instance
# [2] bytes_per_frame
self.process_data(
bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2]
)
def process_data(self, bytes_out, freedv, bytes_per_frame: int) -> None:
"""
Process incoming data and decide what to do with the frame.
Args:
bytes_out:
freedv:
bytes_per_frame:
Returns:
"""
self.log.debug(
"[TNC] process_data:", n_retries_per_burst=self.n_retries_per_burst
)
# Process data only if broadcast or we are the receiver
# bytes_out[1:4] == callsign check for signalling frames,
# bytes_out[2:5] == transmission
# we could also create an own function, which returns True.
frametype = int.from_bytes(bytes(bytes_out[:1]), "big")
# check for callsign CRC
_valid1, _ = helpers.check_callsign(self.mycallsign, bytes(bytes_out[1:4]))
_valid2, _ = helpers.check_callsign(self.mycallsign, bytes(bytes_out[2:5]))
# check for session ID
# signalling frames
_valid3 = helpers.check_session_id(self.session_id, bytes(bytes_out[1:2]))
# arq data frames
_valid4 = helpers.check_session_id(self.session_id, bytes(bytes_out[2:3]))
if (
_valid1
or _valid2
or _valid3
or _valid4
or frametype
in [
FR_TYPE.CQ.value,
FR_TYPE.QRV.value,
FR_TYPE.PING.value,
FR_TYPE.BEACON.value,
]
):
# CHECK IF FRAMETYPE IS BETWEEN 10 and 50 ------------------------
# frame = frametype - 10
# n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big")
# Dispatch activity based on received frametype
if frametype in self.rx_dispatcher:
# Process frames "known" by rx_dispatcher
self.log.debug(f"[TNC] {self.rx_dispatcher[frametype][1]} RECEIVED....")
self.rx_dispatcher[frametype][0](bytes_out[:-2])
# Process frametypes requiring a different set of arguments.
elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
# get snr of received data
# FIXME: find a fix for this - after moving to classes, this no longer works
# snr = self.calculate_snr(freedv)
snr = static.SNR
self.log.debug("[TNC] RX SNR", snr=snr)
# send payload data to arq checker without CRC16
self.arq_data_received(
bytes(bytes_out[:-2]), bytes_per_frame, snr, freedv
)
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
# if static.RX_BURST_BUFFER.count(None) <= 1 or (frame+1) == n_frames_per_burst:
# self.log.debug(f"[TNC] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
# self.c_lib.freedv_set_sync(freedv, 0)
# TESTFRAMES
elif frametype == FR_TYPE.TEST_FRAME.value:
self.log.debug("[TNC] TESTFRAME RECEIVED", frame=bytes_out[:])
# Unknown frame type
else:
self.log.warning(
"[TNC] ARQ - other frame type", frametype=FR_TYPE(frametype).name
)
else:
# for debugging purposes to receive all data
self.log.debug(
"[TNC] Foreign frame received",
frame=bytes_out[:-2].hex(),
frame_type=FR_TYPE(int.from_bytes(bytes_out[:1], byteorder="big")).name,
)
def enqueue_frame_for_tx(
self,
frame_to_tx: bytearray,
c2_mode=FREEDV_MODE.datac0.value,
copies=1,
repeat_delay=0,
) -> None:
"""
Send (transmit) supplied frame to TNC
:param frame_to_tx: Frame data to send
:type frame_to_tx: bytearray
:param c2_mode: Codec2 mode to use, defaults to 14 (datac0)
:type c2_mode: int, optional
:param copies: Number of frame copies to send, defaults to 1
:type copies: int, optional
:param repeat_delay: Delay time before sending repeat frame, defaults to 0
:type repeat_delay: int, optional
"""
self.log.debug("[TNC] enqueue_frame_for_tx", c2_mode=FREEDV_MODE(c2_mode).name)
# Set the TRANSMITTING flag before adding an object to the transmit queue
# TODO: This is not that nice, we could improve this somehow
static.TRANSMITTING = True
modem.MODEM_TRANSMIT_QUEUE.put([c2_mode, copies, repeat_delay, [frame_to_tx]])
# Wait while transmitting
while static.TRANSMITTING:
time.sleep(0.01)
def send_data_to_socket_queue(self, **jsondata):
"""
Send information to the UI via JSON and the sock.SOCKET_QUEUE.
Args:
Dictionary containing the data to be sent, in the format:
key=value, for each item. E.g.:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="received",
status="success",
uuid=self.transmission_uuid,
timestamp=timestamp,
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
data=base64_data,
)
"""
self.log.debug("[TNC] send_data_to_socket_queue:", jsondata=jsondata)
json_data_out = json.dumps(jsondata)
sock.SOCKET_QUEUE.put(json_data_out)
def send_burst_ack_frame(self, snr) -> None:
"""Build and send ACK frame for burst DATA frame"""
ack_frame = bytearray(self.length_sig1_frame)
ack_frame[:1] = bytes([FR_TYPE.BURST_ACK.value])
# ack_frame[1:4] = static.DXCALLSIGN_CRC
# ack_frame[4:7] = static.MYCALLSIGN_CRC
ack_frame[1:2] = self.session_id
ack_frame[2:3] = bytes([int(snr)])
ack_frame[3:4] = bytes([int(self.speed_level)])
# Transmit frame
self.enqueue_frame_for_tx(ack_frame, c2_mode=FREEDV_MODE.datac0.value)
def send_data_ack_frame(self, snr) -> None:
"""Build and send ACK frame for received DATA frame"""
ack_frame = bytearray(self.length_sig1_frame)
ack_frame[:1] = bytes([FR_TYPE.FR_ACK.value])
ack_frame[1:2] = self.session_id
# ack_frame[1:4] = static.DXCALLSIGN_CRC
# ack_frame[4:7] = static.MYCALLSIGN_CRC
# ack_frame[7:8] = bytes([int(snr)])
# ack_frame[8:9] = bytes([int(self.speed_level)])
# Transmit frame
self.enqueue_frame_for_tx(ack_frame, c2_mode=FREEDV_MODE.datac0.value, copies=3, repeat_delay=0)
def send_retransmit_request_frame(self, freedv) -> None:
# check where a None is in our burst buffer and do frame+1, beacuse lists start at 0
# FIXME: Check to see if there's a `frame - 1` in the receive portion. Remove both if there is.
missing_frames = [
frame + 1
for frame, element in enumerate(static.RX_BURST_BUFFER)
if element is None
]
# set n frames per burst to modem
# this is an idea, so it's not getting lost....
# we need to work on this
codec2.api.freedv_set_frames_per_burst(freedv, len(missing_frames))
# TODO: Trim `missing_frames` bytesarray to [7:13] (6) frames, if it's larger.
# TODO: Instead of using int we could use a binary flag
# then create a repeat frame
rpt_frame = bytearray(self.length_sig1_frame)
rpt_frame[:1] = bytes([FR_TYPE.FR_REPEAT.value])
rpt_frame[1:2] = self.session_id
# rpt_frame[1:4] = static.DXCALLSIGN_CRC
# rpt_frame[4:7] = static.MYCALLSIGN_CRC
# rpt_frame[7:13] = missing_frames
self.log.info("[TNC] ARQ | RX | Requesting", frames=missing_frames)
# Transmit frame
self.enqueue_frame_for_tx(rpt_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def send_burst_nack_frame(self, snr: float = 0) -> None:
"""Build and send NACK frame for received DATA frame"""
nack_frame = bytearray(self.length_sig1_frame)
nack_frame[:1] = bytes([FR_TYPE.FR_NACK.value])
nack_frame[1:2] = self.session_id
# nack_frame[1:4] = static.DXCALLSIGN_CRC
# nack_frame[4:7] = static.MYCALLSIGN_CRC
nack_frame[2:3] = bytes([int(snr)])
nack_frame[3:4] = bytes([int(self.speed_level)])
# TRANSMIT NACK FRAME FOR BURST
self.enqueue_frame_for_tx(nack_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def send_burst_nack_frame_watchdog(self, snr: float = 0) -> None:
"""Build and send NACK frame for watchdog timeout"""
nack_frame = bytearray(self.length_sig1_frame)
nack_frame[:1] = bytes([FR_TYPE.BURST_NACK.value])
nack_frame[1:2] = self.session_id
# nack_frame[1:4] = static.DXCALLSIGN_CRC
# nack_frame[4:7] = static.MYCALLSIGN_CRC
nack_frame[2:3] = bytes([int(snr)])
nack_frame[3:4] = bytes([int(self.speed_level)])
# TRANSMIT NACK FRAME FOR BURST
self.enqueue_frame_for_tx(nack_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def send_disconnect_frame(self) -> None:
"""Build and send a disconnect frame"""
disconnection_frame = bytearray(self.length_sig1_frame)
disconnection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_CLOSE.value])
disconnection_frame[1:2] = self.session_id
# disconnection_frame[1:4] = static.DXCALLSIGN_CRC
# disconnection_frame[4:7] = static.MYCALLSIGN_CRC
# TODO: Needed? disconnection_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign)
self.enqueue_frame_for_tx(disconnection_frame, c2_mode=FREEDV_MODE.datac0.value, copies=5, repeat_delay=0)
def arq_data_received(
self, data_in: bytes, bytes_per_frame: int, snr: float, freedv
) -> None:
"""
Args:
data_in:bytes:
bytes_per_frame:int:
snr:float:
freedv:
Returns:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
data_in = bytes(data_in)
# TODO: this seems not to work anymore
# get received crc for different mycall ssids
# check if callsign ssid override
#_, mycallsign = helpers.check_callsign(
# self.mycallsign, data_in[2:5]
#)
# attempt fixing this
mycallsign = self.mycallsign
# only process data if we are in ARQ and BUSY state else return to quit
if not static.ARQ_STATE and static.TNC_STATE != "BUSY":
return
self.arq_file_transfer = True
static.TNC_STATE = "BUSY"
static.ARQ_STATE = True
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
# Extract some important data from the frame
# Get sequence number of burst frame
rx_n_frame_of_burst = int.from_bytes(bytes(data_in[:1]), "big") - 10
# Get number of bursts from received frame
rx_n_frames_per_burst = int.from_bytes(bytes(data_in[1:2]), "big")
# The RX burst buffer needs to have a fixed length filled with "None".
# We need this later for counting the "Nones" to detect missing data.
# Check if burst buffer has expected length else create it
if len(static.RX_BURST_BUFFER) != rx_n_frames_per_burst:
static.RX_BURST_BUFFER = [None] * rx_n_frames_per_burst
# Append data to rx burst buffer
# [frame_type][n_frames_per_burst][CRC24][CRC24]
# static.RX_BURST_BUFFER[rx_n_frame_of_burst] = data_in[8:] # type: ignore
static.RX_BURST_BUFFER[rx_n_frame_of_burst] = data_in[3:] # type: ignore
self.log.debug("[TNC] static.RX_BURST_BUFFER", buffer=static.RX_BURST_BUFFER)
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
snr,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
# Check if we received all frames in the burst by checking if burst buffer has no more "Nones"
# This is the ideal case because we received all data
if None not in static.RX_BURST_BUFFER:
# then iterate through burst buffer and stick the burst together
# the temp burst buffer is needed for checking, if we already recevied data
temp_burst_buffer = b""
for value in static.RX_BURST_BUFFER:
# static.RX_FRAME_BUFFER += static.RX_BURST_BUFFER[i]
temp_burst_buffer += bytes(value) # type: ignore
# if frame buffer ends not with the current frame, we are going to append new data
# if data already exists, we received the frame correctly,
# but the ACK frame didn't receive its destination (ISS)
if static.RX_FRAME_BUFFER.endswith(temp_burst_buffer):
self.log.info(
"[TNC] ARQ | RX | Frame already received - sending ACK again"
)
static.RX_BURST_BUFFER = []
else:
# Here we are going to search for our data in the last received bytes.
# This reduces the chance we will lose the entire frame in the case of signalling frame loss
# static.RX_FRAME_BUFFER --> existing data
# temp_burst_buffer --> new data
# search_area --> area where we want to search
search_area = 510
search_position = len(static.RX_FRAME_BUFFER) - search_area
# find position of data. returns -1 if nothing found in area else >= 0
# we are beginning from the end, so if data exists twice or more,
# only the last one should be replaced
get_position = static.RX_FRAME_BUFFER[search_position:].rfind(
temp_burst_buffer
)
# if we find data, replace it at this position with the new data and strip it
if get_position >= 0:
static.RX_FRAME_BUFFER = static.RX_FRAME_BUFFER[
: search_position + get_position
]
static.RX_FRAME_BUFFER += temp_burst_buffer
self.log.warning(
"[TNC] ARQ | RX | replacing existing buffer data",
area=search_area,
pos=get_position,
)
# If we don't find data in this range, we really have new data and going to replace it
else:
static.RX_FRAME_BUFFER += temp_burst_buffer
self.log.debug("[TNC] ARQ | RX | appending data to buffer")
# Check if we didn't receive a BOF and EOF yet to avoid sending
# ack frames if we already received all data
if (
not self.rx_frame_bof_received
and not self.rx_frame_eof_received
and data_in.find(self.data_frame_eof) < 0
):
self.frame_received_counter += 1
if self.frame_received_counter >= 2:
self.frame_received_counter = 0
self.speed_level = min(
self.speed_level + 1, len(self.mode_list) - 1
)
static.ARQ_SPEED_LEVEL = self.speed_level
# Update modes we are listening to
self.set_listening_modes(False, True, self.mode_list[self.speed_level])
# Create and send ACK frame
self.log.info("[TNC] ARQ | RX | SENDING ACK")
self.send_burst_ack_frame(snr)
# Reset n retries per burst counter
self.n_retries_per_burst = 0
# calculate statistics
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
elif rx_n_frame_of_burst == rx_n_frames_per_burst - 1:
# We have "Nones" in our rx buffer,
# Check if we received last frame of burst - this is an indicator for missed frames.
# With this way of doing this, we always MUST receive the last
# frame of a burst otherwise the entire burst is lost
# TODO: See if a timeout on the send side with re-transmit last burst would help.
self.log.debug(
"[TNC] all frames in burst received:",
frame=rx_n_frame_of_burst,
frames=rx_n_frames_per_burst,
)
self.send_retransmit_request_frame(freedv)
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
# Should never reach this point
else:
self.log.error(
"[TNC] data_handler: Should not reach this point...",
frame=rx_n_frame_of_burst,
frames=rx_n_frames_per_burst,
)
# We have a BOF and EOF flag in our data. If we received both we received our frame.
# In case of loosing data, but we received already a BOF and EOF we need to make sure, we
# received the complete last burst by checking it for Nones
bof_position = static.RX_FRAME_BUFFER.find(self.data_frame_bof)
eof_position = static.RX_FRAME_BUFFER.find(self.data_frame_eof)
# get total bytes per transmission information as soon we recevied a frame with a BOF
if bof_position >= 0:
payload = static.RX_FRAME_BUFFER[
bof_position + len(self.data_frame_bof): eof_position
]
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes
static.TOTAL_BYTES = frame_length
compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes
# limit to max value of 255
compression_factor = np.clip(compression_factor, 0, 255)
static.ARQ_COMPRESSION_FACTOR = compression_factor / 10
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
if (
bof_position >= 0
and eof_position > 0
and None not in static.RX_BURST_BUFFER
):
self.log.debug(
"[TNC] arq_data_received:",
bof_position=bof_position,
eof_position=eof_position,
)
self.rx_frame_bof_received = True
self.rx_frame_eof_received = True
# Extract raw data from buffer
payload = static.RX_FRAME_BUFFER[
bof_position + len(self.data_frame_bof): eof_position
]
# Get the data frame crc
data_frame_crc = payload[:4] # 0:4 = 4 bytes
# Get the data frame length
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 = 4 bytes
static.TOTAL_BYTES = frame_length
# 8:9 = compression factor
data_frame = payload[9:]
data_frame_crc_received = helpers.get_crc_32(data_frame)
# Check if data_frame_crc is equal with received crc
if data_frame_crc == data_frame_crc_received:
self.log.info("[TNC] ARQ | RX | DATA FRAME SUCESSFULLY RECEIVED")
# Decompress the data frame
data_frame_decompressed = zlib.decompress(data_frame)
static.ARQ_COMPRESSION_FACTOR = len(data_frame_decompressed) / len(
data_frame
)
data_frame = data_frame_decompressed
self.transmission_uuid = str(uuid.uuid4())
timestamp = int(time.time())
# Re-code data_frame in base64, UTF-8 for JSON UI communication.
base64_data = base64.b64encode(data_frame).decode("UTF-8")
# check if RX_BUFFER isn't full
if not RX_BUFFER.full():
# make sure we have always the correct buffer size
RX_BUFFER.maxsize = static.RX_BUFFER_SIZE
else:
# if full, free space by getting an item
self.log.info(
"[TNC] ARQ | RX | RX_BUFFER FULL - dropping old data",
buffer_size=RX_BUFFER.qsize(),
maxsize=static.RX_BUFFER_SIZE
)
RX_BUFFER.get()
# add item to RX_BUFFER
self.log.info(
"[TNC] ARQ | RX | saving data to rx buffer",
buffer_size=RX_BUFFER.qsize() + 1,
maxsize=RX_BUFFER.maxsize
)
RX_BUFFER.put(
[
self.transmission_uuid,
timestamp,
static.DXCALLSIGN,
static.DXGRID,
base64_data,
]
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="received",
uuid=self.transmission_uuid,
timestamp=timestamp,
mycallsign=str(mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
data=base64_data,
)
self.log.info(
"[TNC] ARQ | RX | SENDING DATA FRAME ACK",
snr=snr,
crc=data_frame_crc.hex(),
)
self.send_data_ack_frame(snr)
# Update statistics AFTER the frame ACK is sent
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
self.log.info(
"[TNC] | RX | DATACHANNEL ["
+ str(self.mycallsign, "UTF-8")
+ "]<< >>["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
snr=snr,
)
else:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
)
self.log.warning(
"[TNC] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
e="wrong crc",
expected=data_frame_crc.hex(),
received=data_frame_crc_received.hex(),
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
self.log.info("[TNC] ARQ | RX | Sending NACK")
self.send_burst_nack_frame(snr)
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
# Finally cleanup our buffers and states,
self.arq_cleanup()
def arq_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int):
"""
Transmit ARQ frame
Args:
data_out:bytes:
mode:int:
n_frames_per_burst:int:
"""
self.arq_file_transfer = True
# set signalling modes we want to listen to
# we are in an ongoing arq transmission, so we don't need sig0 actually
modem.RECEIVE_SIG0 = False
modem.RECEIVE_SIG1 = True
self.tx_n_retry_of_burst = 0 # retries we already sent data
# Maximum number of retries to send before declaring a frame is lost
# save len of data_out to TOTAL_BYTES for our statistics --> kBytes
# static.TOTAL_BYTES = round(len(data_out) / 1024, 2)
static.TOTAL_BYTES = len(data_out)
frame_total_size = len(data_out).to_bytes(4, byteorder="big")
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="transmitting",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
self.log.info(
"[TNC] | TX | DATACHANNEL",
Bytes=static.TOTAL_BYTES,
)
# Compress data frame
data_frame_compressed = zlib.compress(data_out)
compression_factor = len(data_out) / len(data_frame_compressed)
static.ARQ_COMPRESSION_FACTOR = np.clip(compression_factor, 0, 255)
compression_factor = bytes([int(static.ARQ_COMPRESSION_FACTOR * 10)])
data_out = data_frame_compressed
# Reset data transfer statistics
tx_start_of_transmission = time.time()
self.calculate_transfer_rate_tx(tx_start_of_transmission, 0, len(data_out))
# Append a crc at the beginning and end of file indicators
frame_payload_crc = helpers.get_crc_32(data_out)
self.log.debug("[TNC] frame payload CRC:", crc=frame_payload_crc.hex())
# Assemble the data frame
data_out = (
self.data_frame_bof
+ frame_payload_crc
+ frame_total_size
+ compression_factor
+ data_out
+ self.data_frame_eof
)
# Initial bufferposition is 0
bufferposition = bufferposition_end = 0
# Iterate through data_out buffer
while not self.data_frame_ack_received and static.ARQ_STATE:
# we have self.tx_n_max_retries_per_burst attempts for sending a burst
for self.tx_n_retry_of_burst in range(self.tx_n_max_retries_per_burst):
# data_mode = mode
# self.log.debug("[TNC] FIXED MODE:", mode=FREEDV_MODE(data_mode).name)
# we are doing a modulo check of transmission retries of the actual burst
# every 2nd retry which fails, decreases speedlevel by 1.
# as soon as we received an ACK for the current burst, speed_level will increase
# by 1.
# The intent is to optimize speed by adapting to the current RF conditions.
# if not self.tx_n_retry_of_burst % 2 and self.tx_n_retry_of_burst > 0:
# self.speed_level = max(self.speed_level - 1, 0)
# if self.tx_n_retry_of_burst <= 1:
# self.speed_level += 1
# self.speed_level = max(self.speed_level + 1, len(self.mode_list) - 1)
# Bound speed level to:
# - minimum of either the speed or the length of mode list - 1
# - maximum of either the speed or zero
self.speed_level = min(self.speed_level, len(self.mode_list) - 1)
self.speed_level = max(self.speed_level, 0)
static.ARQ_SPEED_LEVEL = self.speed_level
data_mode = self.mode_list[self.speed_level]
self.log.debug(
"[TNC] Speed-level:",
level=self.speed_level,
retry=self.tx_n_retry_of_burst,
mode=FREEDV_MODE(data_mode).name,
)
# Payload information
payload_per_frame = modem.get_bytes_per_frame(data_mode) - 2
# Tempbuffer list for storing our data frames
tempbuffer = []
# Append data frames with n_frames_per_burst to tempbuffer
# TODO: this part needs a complete rewrite!
# n_frames_per_burst = 1 is working
arqheader = bytearray()
# arqheader[:1] = bytes([FR_TYPE.BURST_01.value + i])
arqheader[:1] = bytes([FR_TYPE.BURST_01.value])
arqheader[1:2] = bytes([n_frames_per_burst])
arqheader[2:3] = self.session_id
# arqheader[2:5] = static.DXCALLSIGN_CRC
# arqheader[5:8] = static.MYCALLSIGN_CRC
bufferposition_end = bufferposition + payload_per_frame - len(arqheader)
# Normal condition
if bufferposition_end <= len(data_out):
frame = data_out[bufferposition:bufferposition_end]
frame = arqheader + frame
# Pad the last bytes of a frame
else:
extended_data_out = data_out[bufferposition:]
extended_data_out += bytes([0]) * (
payload_per_frame - len(extended_data_out) - len(arqheader)
)
frame = arqheader + extended_data_out
# Append frame to tempbuffer for transmission
tempbuffer.append(frame)
self.log.debug("[TNC] tempbuffer:", tempbuffer=tempbuffer)
self.log.info(
"[TNC] ARQ | TX | FRAMES",
mode=FREEDV_MODE(data_mode).name,
fpb=n_frames_per_burst,
retry=self.tx_n_retry_of_burst,
)
for t_buf_item in tempbuffer:
self.enqueue_frame_for_tx(t_buf_item, c2_mode=data_mode)
# After transmission finished, wait for an ACK or RPT frame
# burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100
# while (not self.burst_ack and not self.burst_nack and
# not self.rpt_request_received and not self.data_frame_ack_received and
# time.time() < burstacktimeout and static.ARQ_STATE):
# time.sleep(0.01)
# burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100
while static.ARQ_STATE and not (
self.burst_ack
or self.burst_nack
or self.rpt_request_received
or self.data_frame_ack_received
):
time.sleep(0.01)
# Once we receive a burst ack, reset its state and break the RETRIES loop
if self.burst_ack:
self.burst_ack = False # reset ack state
self.tx_n_retry_of_burst = 0 # reset retries
self.log.debug(
"[TNC] arq_transmit: Received BURST ACK. Sending next chunk."
)
break # break retry loop
if self.burst_nack:
self.burst_nack = False # reset nack state
# not yet implemented
if self.rpt_request_received:
pass
if self.data_frame_ack_received:
self.log.debug(
"[TNC] arq_transmit: Received FRAME ACK. Sending next chunk."
)
break # break retry loop
# We need this part for leaving the repeat loop
# static.ARQ_STATE == "DATA" --> when stopping transmission manually
if not static.ARQ_STATE:
# print("not ready for data...leaving loop....")
self.log.debug(
"[TNC] arq_transmit: ARQ State changed to FALSE. Breaking retry loop."
)
break
self.calculate_transfer_rate_tx(
tx_start_of_transmission, bufferposition_end, len(data_out)
)
# NEXT ATTEMPT
self.log.debug(
"[TNC] ATTEMPT:",
retry=self.tx_n_retry_of_burst,
maxretries=self.tx_n_max_retries_per_burst,
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
# End of FOR loop
# update buffer position
bufferposition = bufferposition_end
# update stats
self.calculate_transfer_rate_tx(
tx_start_of_transmission, bufferposition_end, len(data_out)
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="transmitting",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
# Stay in the while loop until we receive a data_frame_ack. Otherwise
# the loop exits after sending the last frame only once and doesn't
# wait for an acknowledgement.
if self.data_frame_ack_received and bufferposition > len(data_out):
self.log.debug("[TNC] arq_tx: Last fragment sent and acknowledged.")
break
# GOING TO NEXT ITERATION
if self.data_frame_ack_received:
# we need to wait until sending "transmitted" state
# gui database is too slow for handling this within 0.001 seconds
# so let's sleep a little
time.sleep(0.2)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="transmitted",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
self.log.info(
"[TNC] ARQ | TX | DATA TRANSMITTED!",
BytesPerMinute=static.ARQ_BYTES_PER_MINUTE,
BitsPerSecond=static.ARQ_BITS_PER_SECOND,
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
else:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
self.log.info(
"[TNC] ARQ | TX | TRANSMISSION FAILED OR TIME OUT!",
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
self.stop_transmission()
# Last but not least do a state cleanup
self.arq_cleanup()
if TESTMODE:
# Quit after transmission
self.log.debug("[TNC] TESTMODE: arq_transmit exiting.")
sys.exit(0)
def burst_ack_nack_received(self, data_in: bytes) -> None:
"""
Received a ACK/NACK for a transmitted frame, keep track and
make adjustments to speed level if needed.
Args:
data_in:bytes:
Returns:
"""
# Process data only if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
frametype = int.from_bytes(bytes(data_in[:1]), "big")
desc = "ack"
if frametype == FR_TYPE.BURST_ACK.value:
# Increase speed level if we received a burst ack
# self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1)
# Force data retry loops of TX TNC to stop and continue with next frame
self.burst_ack = True
# Reset burst nack counter
self.burst_nack_counter = 0
# Reset n retries per burst counter
self.n_retries_per_burst = 0
else:
# Decrease speed level if we received a burst nack
# self.speed_level = max(self.speed_level - 1, 0)
# Set flag to retry frame again.
self.burst_nack = True
# Increment burst nack counter
self.burst_nack_counter += 1
desc = "nack"
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
self.burst_ack_snr = int.from_bytes(bytes(data_in[2:3]), "big")
self.speed_level = int.from_bytes(bytes(data_in[3:4]), "big")
static.ARQ_SPEED_LEVEL = self.speed_level
self.log.debug(
f"[TNC] burst_{desc}_received:",
speed_level=self.speed_level,
c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name,
)
def frame_ack_received(
self, data_in: bytes # pylint: disable=unused-argument
) -> None:
"""Received an ACK for a transmitted frame"""
# Process data only if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
# Force data loops of TNC to stop and continue with next frame
self.data_frame_ack_received = True
# Update arq_session and data_channel timestamp
self.data_channel_last_received = int(time.time())
self.arq_session_last_received = int(time.time())
def frame_nack_received(
self, data_in: bytes # pylint: disable=unused-argument
) -> None:
"""
Received a NACK for a transmitted frame
Args:
data_in:bytes:
"""
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
# Update data_channel timestamp
self.arq_session_last_received = int(time.time())
self.arq_cleanup()
def burst_rpt_received(self, data_in: bytes):
"""
Repeat request frame received for transmitted frame
Args:
data_in:bytes:
"""
# Only process data if we are in ARQ and BUSY state
if static.ARQ_STATE and static.TNC_STATE == "BUSY":
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.rpt_request_received = True
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
self.rpt_request_buffer = []
missing_area = bytes(data_in[3:12]) # 1:9
for i in range(0, 6, 2):
if not missing_area[i: i + 2].endswith(b"\x00\x00"):
missing = missing_area[i: i + 2]
self.rpt_request_buffer.insert(0, missing)
############################################################################################################
# ARQ SESSION HANDLER
############################################################################################################
def arq_session_handler(self) -> bool:
"""
Create a session with `static.DXCALLSIGN` and wait until the session is open.
Returns:
True if the session was opened successfully
False if the session open request failed
"""
# TODO: we need to check this, maybe placing it to class init
self.datachannel_timeout = False
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]>> <<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
state=static.ARQ_SESSION_STATE,
)
self.open_session()
# wait until data channel is open
while not static.ARQ_SESSION and not self.arq_session_timeout:
time.sleep(0.01)
static.ARQ_SESSION_STATE = "connecting"
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="connecting",
)
if static.ARQ_SESSION and static.ARQ_SESSION_STATE == "connected":
# static.ARQ_SESSION_STATE = "connected"
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="connected",
)
return True
static.ARQ_SESSION_STATE = "failed"
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="failed",
reason="timeout",
)
return False
def open_session(self) -> bool:
"""
Create and send the frame to request a connection.
Returns:
True if the session was opened successfully
False if the session open request failed
"""
self.IS_ARQ_SESSION_MASTER = True
static.ARQ_SESSION_STATE = "connecting"
# create a random session id
self.session_id = randbytes(1)
print(self.session_id)
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_OPEN.value])
connection_frame[1:2] = self.session_id
connection_frame[2:5] = static.DXCALLSIGN_CRC
connection_frame[5:8] = static.MYCALLSIGN_CRC
connection_frame[8:14] = helpers.callsign_to_bytes(self.mycallsign)
while not static.ARQ_SESSION:
time.sleep(0.01)
for attempt in range(self.session_connect_max_retries):
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]>>?<<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
a=attempt + 1, # Adjust for 0-based for user display
state=static.ARQ_SESSION_STATE,
)
self.enqueue_frame_for_tx(connection_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
# Wait for a time, looking to see if `static.ARQ_SESSION`
# indicates we've received a positive response from the far station.
timeout = time.time() + 3
while time.time() < timeout:
time.sleep(0.01)
# Stop waiting if data channel is opened
if static.ARQ_SESSION:
return True
# Session connect timeout, send close_session frame to
# attempt to clean up the far-side, if it received the
# open_session frame and can still hear us.
if not static.ARQ_SESSION:
self.close_session()
return False
# Given the while condition, it will only exit when `static.ARQ_SESSION` is True
return True
def received_session_opener(self, data_in: bytes) -> None:
"""
Received a session open request packet.
Args:
data_in:bytes:
"""
self.IS_ARQ_SESSION_MASTER = False
static.ARQ_SESSION_STATE = "connecting"
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
self.session_id = bytes(data_in[1:2])
static.DXCALLSIGN_CRC = bytes(data_in[5:8])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[8:14]))
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]>>|<<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
state=static.ARQ_SESSION_STATE,
)
static.ARQ_SESSION = True
static.TNC_STATE = "BUSY"
self.transmit_session_heartbeat()
def close_session(self) -> None:
"""Close the ARQ session"""
static.ARQ_SESSION_STATE = "disconnecting"
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]<<X>>["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
state=static.ARQ_SESSION_STATE,
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="close",
)
self.IS_ARQ_SESSION_MASTER = False
static.ARQ_SESSION = False
# we need to send disconnect frame before doing arq cleanup
# we would loose our session id then
self.send_disconnect_frame()
self.arq_cleanup()
def received_session_close(self, data_in: bytes):
"""
Closes the session when a close session frame is received and
the DXCALLSIGN_CRC matches the remote station participating in the session.
Args:
data_in:bytes:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
# Close the session if the CRC matches the remote station in static.
_valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7]))
_valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2]))
if _valid_crc or _valid_session:
static.ARQ_SESSION_STATE = "disconnected"
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]<<X>>["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
state=static.ARQ_SESSION_STATE,
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="close",
)
self.IS_ARQ_SESSION_MASTER = False
static.ARQ_SESSION = False
self.arq_cleanup()
def transmit_session_heartbeat(self) -> None:
"""Send ARQ sesion heartbeat while connected"""
# static.ARQ_SESSION = True
# static.TNC_STATE = "BUSY"
# static.ARQ_SESSION_STATE = "connected"
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value])
connection_frame[1:2] = self.session_id
# connection_frame[1:4] = static.DXCALLSIGN_CRC
# connection_frame[4:7] = static.MYCALLSIGN_CRC
self.enqueue_frame_for_tx(connection_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def received_session_heartbeat(self, data_in: bytes) -> None:
"""
Received an ARQ session heartbeat, record and update state accordingly.
Args:
data_in:bytes:
"""
# Accept session data if the DXCALLSIGN_CRC matches the station in static or session id.
_valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7]))
_valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2]))
if _valid_crc or _valid_session:
self.log.debug("[TNC] Received session heartbeat")
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"SESSION-HB",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
static.ARQ_SESSION = True
static.ARQ_SESSION_STATE = "connected"
static.TNC_STATE = "BUSY"
# Update the timeout timestamps
self.arq_session_last_received = int(time.time())
self.data_channel_last_received = int(time.time())
if not self.IS_ARQ_SESSION_MASTER and not self.arq_file_transfer:
self.transmit_session_heartbeat()
##########################################################################################################
# ARQ DATA CHANNEL HANDLER
##########################################################################################################
def open_dc_and_transmit(
self,
data_out: bytes,
mode: int,
n_frames_per_burst: int,
transmission_uuid: str,
mycallsign,
) -> bool:
"""
Open data channel and transmit data
Args:
data_out:bytes:
mode:int:
n_frames_per_burst:int:
transmission_uuid:str:
mycallsign:bytes:
Returns:
True if the data session was opened and the data was sent
False if the data session was not opened
"""
# overwrite mycallsign in case of different SSID
self.mycallsign = mycallsign
static.TNC_STATE = "BUSY"
self.arq_file_transfer = True
self.transmission_uuid = transmission_uuid
# wait a moment for the case, a heartbeat is already on the way back to us
if static.ARQ_SESSION:
time.sleep(0.5)
self.datachannel_timeout = False
# we need to compress data for gettin a compression factor.
# so we are compressing twice. This is not that nice and maybe there is another way
# for calculating transmission statistics
# static.ARQ_COMPRESSION_FACTOR = len(data_out) / len(zlib.compress(data_out))
self.arq_open_data_channel(mode, n_frames_per_burst, mycallsign)
# wait until data channel is open
while not static.ARQ_STATE and not self.datachannel_timeout:
time.sleep(0.01)
if static.ARQ_STATE:
self.arq_transmit(data_out, mode, n_frames_per_burst)
return True
return False
def arq_open_data_channel(
self, mode: int, n_frames_per_burst: int, mycallsign
) -> bool:
"""
Open an ARQ data channel.
Args:
mode:int:
n_frames_per_burst:int:
mycallsign:bytes:
Returns:
True if the data channel was opened successfully
False if the data channel failed to open
"""
self.is_IRS = False
# init a new random session id if we are not in an arq session
if not static.ARQ_SESSION:
self.session_id = randbytes(1)
print(self.session_id)
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
if static.LOW_BANDWIDTH_MODE:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_N.value])
self.log.debug("[TNC] Requesting low bandwidth mode")
else:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_W.value])
self.log.debug("[TNC] Requesting high bandwidth mode")
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = frametype
connection_frame[1:4] = static.DXCALLSIGN_CRC
connection_frame[4:7] = static.MYCALLSIGN_CRC
connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign)
# connection_frame[13:14] = bytes([n_frames_per_burst])
connection_frame[13:14] = self.session_id
while not static.ARQ_STATE:
time.sleep(0.01)
for attempt in range(self.data_channel_max_retries):
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="opening",
)
self.log.info(
"[TNC] ARQ | DATA | TX | ["
+ str(mycallsign, "UTF-8")
+ "]>> <<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
attempt=f"{str(attempt + 1)}/{str(self.data_channel_max_retries)}",
)
self.enqueue_frame_for_tx(connection_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
timeout = time.time() + 4
while time.time() < timeout:
time.sleep(0.01)
# Stop waiting if data channel is opened
if static.ARQ_STATE:
return True
# `data_channel_max_retries` attempts have been sent. Aborting attempt & cleaning up
self.log.debug(
"[TNC] arq_open_data_channel:", transmission_uuid=self.transmission_uuid
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
reason="unknown",
uuid=self.transmission_uuid,
percent=static.ARQ_TRANSMISSION_PERCENT,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
)
self.log.warning(
"[TNC] ARQ | TX | DATA ["
+ str(mycallsign, "UTF-8")
+ "]>>X<<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]"
)
self.datachannel_timeout = True
# Attempt to cleanup the far-side, if it received the
# open_session frame and can still hear us.
self.close_session()
self.arq_cleanup()
return False
# Shouldn't get here..
return True
def arq_received_data_channel_opener(self, data_in: bytes):
"""
Received request to open data channel framt
Args:
data_in:bytes:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
self.arq_file_transfer = True
self.is_IRS = True
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="opening",
)
static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13]))
# n_frames_per_burst is currently unused
# n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big")
frametype = int.from_bytes(bytes(data_in[:1]), "big")
# check if we received low bandwidth mode
# possible channel constellations
# ISS(w) <-> IRS(w)
# ISS(w) <-> IRS(n)
# ISS(n) <-> IRS(w)
# ISS(n) <-> IRS(n)
if frametype == FR_TYPE.ARQ_DC_OPEN_W.value and not static.LOW_BANDWIDTH_MODE:
# ISS(w) <-> IRS(w)
constellation = "ISS(w) <-> IRS(w)"
self.received_LOW_BANDWIDTH_MODE = False
self.mode_list = self.mode_list_high_bw
self.time_list = self.time_list_high_bw
self.snr_list = self.snr_list_high_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_W.value and static.LOW_BANDWIDTH_MODE:
# ISS(w) <-> IRS(n)
constellation = "ISS(w) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = False
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value and not static.LOW_BANDWIDTH_MODE:
# ISS(n) <-> IRS(w)
constellation = "ISS(n) <-> IRS(w)"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value and static.LOW_BANDWIDTH_MODE:
# ISS(n) <-> IRS(n)
constellation = "ISS(n) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
else:
constellation = "not matched"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
# get mode which fits to given SNR
# initially set speed_level 0 in case of really bad SNR and no matching mode
self.speed_level = 0
for i in range(len(self.mode_list)):
if static.SNR >= self.snr_list[i]:
self.speed_level = i
self.log.debug(
"[TNC] calculated speed level",
speed_level=self.speed_level,
given_snr=static.SNR,
min_snr=self.snr_list[self.speed_level],
)
# Update modes we are listening to
self.set_listening_modes(True, True, self.mode_list[self.speed_level])
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.session_id = data_in[13:14]
print(self.session_id)
# check if callsign ssid override
_, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4])
self.log.info(
"[TNC] ARQ | DATA | RX | ["
+ str(mycallsign, "UTF-8")
+ "]>> <<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
channel_constellation=constellation,
)
static.ARQ_STATE = True
static.TNC_STATE = "BUSY"
self.reset_statistics()
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
# Select the frame type based on the current TNC mode
if static.LOW_BANDWIDTH_MODE or self.received_LOW_BANDWIDTH_MODE:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_ACK_N.value])
self.log.debug("[TNC] Responding with low bandwidth mode")
else:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_ACK_W.value])
self.log.debug("[TNC] Responding with high bandwidth mode")
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = frametype
connection_frame[1:2] = self.session_id
# connection_frame[1:4] = static.DXCALLSIGN_CRC
# connection_frame[4:7] = static.MYCALLSIGN_CRC
connection_frame[8:9] = bytes([self.speed_level])
# For checking protocol version on the receiving side
connection_frame[13:14] = bytes([static.ARQ_PROTOCOL_VERSION])
self.enqueue_frame_for_tx(connection_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
self.log.info(
"[TNC] ARQ | DATA | RX | ["
+ str(mycallsign, "UTF-8")
+ "]>>|<<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
bandwidth="wide",
snr=static.SNR,
)
# set start of transmission for our statistics
self.rx_start_of_transmission = time.time()
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
def arq_received_channel_is_open(self, data_in: bytes) -> None:
"""
Called if we received a data channel opener
Args:
data_in:bytes:
"""
protocol_version = int.from_bytes(bytes(data_in[13:14]), "big")
if protocol_version == static.ARQ_PROTOCOL_VERSION:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="opened",
)
frametype = int.from_bytes(bytes(data_in[:1]), "big")
if frametype == FR_TYPE.ARQ_DC_OPEN_ACK_N.value:
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.log.debug("[TNC] low bandwidth mode", modes=self.mode_list)
else:
self.received_LOW_BANDWIDTH_MODE = False
self.mode_list = self.mode_list_high_bw
self.time_list = self.time_list_high_bw
self.log.debug("[TNC] high bandwidth mode", modes=self.mode_list)
# set speed level from session opener frame which is selected by SNR measurement
self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big")
self.log.debug("[TNC] speed level selected for given SNR", speed_level=self.speed_level)
# self.speed_level = len(self.mode_list) - 1
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.log.info(
"[TNC] ARQ | DATA | TX | ["
+ str(self.mycallsign, "UTF-8")
+ "]>>|<<["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
snr=static.SNR,
)
# as soon as we set ARQ_STATE to DATA, transmission starts
static.ARQ_STATE = True
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
else:
static.TNC_STATE = "IDLE"
static.ARQ_STATE = False
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
reason="protocol version missmatch",
)
# TODO: We should display a message to this effect on the UI.
self.log.warning(
"[TNC] protocol version mismatch:",
received=protocol_version,
own=static.ARQ_PROTOCOL_VERSION,
)
self.arq_cleanup()
# ---------- PING
def transmit_ping(self, dxcallsign: bytes) -> None:
"""
Funktion for controlling pings
Args:
dxcallsign:bytes:
"""
if not str(dxcallsign).strip():
# TODO: We should display a message to this effect on the UI.
self.log.warning("[TNC] Missing required callsign", dxcallsign=dxcallsign)
return
static.DXCALLSIGN = dxcallsign
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
self.send_data_to_socket_queue(freedata="tnc-message", ping="transmitting")
self.log.info(
"[TNC] PING REQ ["
+ str(self.mycallsign, "UTF-8")
+ "] >>> ["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]"
)
ping_frame = bytearray(self.length_sig0_frame)
ping_frame[:1] = bytes([FR_TYPE.PING.value])
ping_frame[1:4] = static.DXCALLSIGN_CRC
ping_frame[4:7] = static.MYCALLSIGN_CRC
ping_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign)
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.enqueue_frame_for_tx(ping_frame, c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx(ping_frame, c2_mode=FREEDV_MODE.datac0.value)
def received_ping(self, data_in: bytes) -> None:
"""
Called if we received a ping
Args:
data_in:bytes:
"""
dxcallsign_CRC = bytes(data_in[4:7])
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13]))
helpers.add_to_heard_stations(
dxcallsign,
static.DXGRID,
"PING",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.send_data_to_socket_queue(
freedata="tnc-message",
ping="received",
uuid=str(uuid.uuid4()),
timestamp=int(time.time()),
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(dxcallsign, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
snr=str(static.SNR),
)
# check if callsign ssid override
valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4])
if not valid:
# PING packet not for me.
self.log.debug("[TNC] received_ping: ping not for this station.")
return
static.DXCALLSIGN_CRC = dxcallsign_CRC
static.DXCALLSIGN = dxcallsign
self.log.info(
"[TNC] PING REQ ["
+ str(mycallsign, "UTF-8")
+ "] <<< ["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
snr=static.SNR,
)
ping_frame = bytearray(self.length_sig0_frame)
ping_frame[:1] = bytes([FR_TYPE.PING_ACK.value])
ping_frame[1:4] = static.DXCALLSIGN_CRC
ping_frame[4:7] = static.MYCALLSIGN_CRC
ping_frame[7:13] = static.MYGRID
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.enqueue_frame_for_tx(ping_frame, c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx(ping_frame, c2_mode=FREEDV_MODE.datac0.value)
def received_ping_ack(self, data_in: bytes) -> None:
"""
Called if a PING ack has been received
Args:
data_in:bytes:
"""
static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXGRID = bytes(data_in[7:13]).rstrip(b"\x00")
self.send_data_to_socket_queue(
freedata="tnc-message",
ping="acknowledge",
uuid=str(uuid.uuid4()),
timestamp=int(time.time()),
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
snr=str(static.SNR),
)
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"PING-ACK",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
self.log.info(
"[TNC] PING ACK ["
+ str(self.mycallsign, "UTF-8")
+ "] >|< ["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]",
snr=static.SNR,
)
static.TNC_STATE = "IDLE"
def stop_transmission(self) -> None:
"""
Force a stop of the running transmission
"""
self.log.warning("[TNC] Stopping transmission!")
stop_frame = bytearray(self.length_sig0_frame)
stop_frame[:1] = bytes([FR_TYPE.ARQ_STOP.value])
stop_frame[1:4] = static.DXCALLSIGN_CRC
stop_frame[4:7] = static.MYCALLSIGN_CRC
# TODO: Not sure if we really need the session id when disconnecting
# stop_frame[1:2] = self.session_id
stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign)
self.enqueue_frame_for_tx(stop_frame, c2_mode=FREEDV_MODE.datac0.value, copies=6, repeat_delay=0)
static.TNC_STATE = "IDLE"
static.ARQ_STATE = False
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="stopped",
)
self.arq_cleanup()
def received_stop_transmission(
self, data_in: bytes
) -> None: # pylint: disable=unused-argument
"""
Received a transmission stop
"""
self.log.warning("[TNC] Stopping transmission!")
static.TNC_STATE = "IDLE"
static.ARQ_STATE = False
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="stopped",
uuid=self.transmission_uuid,
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
)
self.arq_cleanup()
# ----------- BROADCASTS
def run_beacon(self) -> None:
"""
Controlling function for running a beacon
Args:
self: arq class
Returns:
"""
try:
while True:
time.sleep(0.5)
while static.BEACON_STATE:
if (
not static.ARQ_SESSION
and not self.arq_file_transfer
and not static.BEACON_PAUSE
):
self.send_data_to_socket_queue(
freedata="tnc-message",
beacon="transmitting",
interval=self.beacon_interval,
)
self.log.info(
"[TNC] Sending beacon!", interval=self.beacon_interval
)
beacon_frame = bytearray(self.length_sig0_frame)
beacon_frame[:1] = bytes([FR_TYPE.BEACON.value])
beacon_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign)
beacon_frame[9:13] = static.MYGRID[:4]
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.enqueue_frame_for_tx(
beacon_frame,
c2_mode=FREEDV_MODE.fsk_ldpc_0.value,
)
else:
self.enqueue_frame_for_tx(beacon_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1,
repeat_delay=0)
interval_timer = time.time() + self.beacon_interval
while (
time.time() < interval_timer
and static.BEACON_STATE
and not static.BEACON_PAUSE
):
time.sleep(0.01)
except Exception as err:
self.log.debug("[TNC] run_beacon: ", exception=err)
def received_beacon(self, data_in: bytes) -> None:
"""
Called if we received a beacon
Args:
data_in:bytes:
"""
# here we add the received station to the heard stations buffer
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
dxgrid = bytes(data_in[9:13]).rstrip(b"\x00")
self.send_data_to_socket_queue(
freedata="tnc-message",
beacon="received",
uuid=str(uuid.uuid4()),
timestamp=int(time.time()),
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(dxcallsign, "UTF-8"),
dxgrid=str(dxgrid, "UTF-8"),
snr=str(static.SNR),
)
self.log.info(
"[TNC] BEACON RCVD ["
+ str(dxcallsign, "UTF-8")
+ "]["
+ str(dxgrid, "UTF-8")
+ "] ",
snr=static.SNR,
)
helpers.add_to_heard_stations(
dxcallsign,
dxgrid,
"BEACON",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
def transmit_cq(self) -> None:
"""
Transmit a CQ
Args:
self
Returns:
Nothing
"""
self.log.info("[TNC] CQ CQ CQ")
self.send_data_to_socket_queue(
freedata="tnc-message",
cq="transmitting",
)
cq_frame = bytearray(self.length_sig0_frame)
cq_frame[:1] = bytes([FR_TYPE.CQ.value])
cq_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign)
cq_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("UTF-8"))
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.log.debug("[TNC] CQ Frame:", data=[cq_frame])
if static.ENABLE_FSK:
self.enqueue_frame_for_tx(cq_frame, c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx(cq_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def received_cq(self, data_in: bytes) -> None:
"""
Called when we receive a CQ frame
Args:
data_in:bytes:
Returns:
Nothing
"""
# here we add the received station to the heard stations buffer
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
self.log.debug("[TNC] received_cq:", dxcallsign=dxcallsign)
dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8")
self.send_data_to_socket_queue(
freedata="tnc-message",
cq="received",
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
)
self.log.info(
"[TNC] CQ RCVD ["
+ str(dxcallsign, "UTF-8")
+ "]["
+ str(dxgrid, "UTF-8")
+ "] ",
snr=static.SNR,
)
helpers.add_to_heard_stations(
dxcallsign,
dxgrid,
"CQ CQ CQ",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
if static.RESPOND_TO_CQ:
self.transmit_qrv()
def transmit_qrv(self) -> None:
"""
Called when we send a QRV frame
Args:
self
"""
# Sleep a random amount of time before responding to make it more likely to be
# heard when many stations respond. Each DATAC0 frame is 0.44 sec (440ms) in
# duration, plus overhead. Set the wait interval to be random between 0 and 2s
# in 0.5s increments.
helpers.wait(randrange(0, 20, 5) / 10.0)
self.send_data_to_socket_queue(
freedata="tnc-message",
qrv="transmitting",
)
self.log.info("[TNC] Sending QRV!")
qrv_frame = bytearray(self.length_sig0_frame)
qrv_frame[:1] = bytes([FR_TYPE.QRV.value])
qrv_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign)
qrv_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("UTF-8"))
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.enqueue_frame_for_tx(qrv_frame, c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx(qrv_frame, c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
def received_qrv(self, data_in: bytes) -> None:
"""
Called when we receive a QRV frame
Args:
data_in:bytes:
"""
# here we add the received station to the heard stations buffer
dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7]))
dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8")
self.send_data_to_socket_queue(
freedata="tnc-message",
qrv="received",
)
self.log.info(
"[TNC] QRV RCVD ["
+ str(dxcallsign, "UTF-8")
+ "]["
+ str(dxgrid, "UTF-8")
+ "] ",
snr=static.SNR,
)
helpers.add_to_heard_stations(
dxcallsign,
dxgrid,
"QRV",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
# ------------ CALUCLATE TRANSFER RATES
def calculate_transfer_rate_rx(
self, rx_start_of_transmission: float, receivedbytes: int
) -> list:
"""
Calculate transfer rate for received data
Args:
rx_start_of_transmission:float:
receivedbytes:int:
Returns: List of:
bits_per_second: float,
bytes_per_minute: float,
transmission_percent: float
"""
try:
if static.TOTAL_BYTES == 0:
static.TOTAL_BYTES = 1
static.ARQ_TRANSMISSION_PERCENT = min(
int(
(
receivedbytes
* static.ARQ_COMPRESSION_FACTOR
/ static.TOTAL_BYTES
)
* 100
),
100,
)
transmissiontime = time.time() - self.rx_start_of_transmission
if receivedbytes > 0:
static.ARQ_BITS_PER_SECOND = int((receivedbytes * 8) / transmissiontime)
static.ARQ_BYTES_PER_MINUTE = int(
receivedbytes / (transmissiontime / 60)
)
else:
static.ARQ_BITS_PER_SECOND = 0
static.ARQ_BYTES_PER_MINUTE = 0
except Exception as err:
self.log.error(f"[TNC] calculate_transfer_rate_rx: Exception: {err}")
static.ARQ_TRANSMISSION_PERCENT = 0.0
static.ARQ_BITS_PER_SECOND = 0
static.ARQ_BYTES_PER_MINUTE = 0
return [
static.ARQ_BITS_PER_SECOND,
static.ARQ_BYTES_PER_MINUTE,
static.ARQ_TRANSMISSION_PERCENT,
]
def reset_statistics(self) -> None:
"""
Reset statistics
"""
# reset ARQ statistics
static.ARQ_BYTES_PER_MINUTE_BURST = 0
static.ARQ_BYTES_PER_MINUTE = 0
static.ARQ_BITS_PER_SECOND_BURST = 0
static.ARQ_BITS_PER_SECOND = 0
static.ARQ_TRANSMISSION_PERCENT = 0
static.TOTAL_BYTES = 0
def calculate_transfer_rate_tx(
self, tx_start_of_transmission: float, sentbytes: int, tx_buffer_length: int
) -> list:
"""
Calculate transfer rate for transmission
Args:
tx_start_of_transmission:float:
sentbytes:int:
tx_buffer_length:int:
Returns: List of:
bits_per_second: float,
bytes_per_minute: float,
transmission_percent: float
"""
try:
static.ARQ_TRANSMISSION_PERCENT = min(
int((sentbytes / tx_buffer_length) * 100), 100
)
transmissiontime = time.time() - tx_start_of_transmission
if sentbytes > 0:
static.ARQ_BITS_PER_SECOND = int((sentbytes * 8) / transmissiontime)
static.ARQ_BYTES_PER_MINUTE = int(sentbytes / (transmissiontime / 60))
else:
static.ARQ_BITS_PER_SECOND = 0
static.ARQ_BYTES_PER_MINUTE = 0
except Exception as err:
self.log.error(f"[TNC] calculate_transfer_rate_tx: Exception: {err}")
static.ARQ_TRANSMISSION_PERCENT = 0.0
static.ARQ_BITS_PER_SECOND = 0
static.ARQ_BYTES_PER_MINUTE = 0
return [
static.ARQ_BITS_PER_SECOND,
static.ARQ_BYTES_PER_MINUTE,
static.ARQ_TRANSMISSION_PERCENT,
]
# ----------------------CLEANUP AND RESET FUNCTIONS
def arq_cleanup(self) -> None:
"""
Cleanup function which clears all ARQ states
"""
if TESTMODE:
self.log.debug("[TNC] TESTMODE: arq_cleanup: Not performing cleanup.")
return
self.log.debug("[TNC] arq_cleanup")
self.session_id = bytes(1)
self.rx_frame_bof_received = False
self.rx_frame_eof_received = False
self.burst_ack = False
self.rpt_request_received = False
self.data_frame_ack_received = False
static.RX_BURST_BUFFER = []
static.RX_FRAME_BUFFER = b""
self.burst_ack_snr = 255
# reset modem receiving state to reduce cpu load
modem.RECEIVE_SIG0 = True
modem.RECEIVE_SIG1 = False
modem.RECEIVE_DATAC1 = False
modem.RECEIVE_DATAC3 = False
# modem.RECEIVE_FSK_LDPC_0 = False
modem.RECEIVE_FSK_LDPC_1 = False
# reset buffer overflow counter
static.BUFFER_OVERFLOW_COUNTER = [0, 0, 0, 0, 0]
self.is_IRS = False
self.burst_nack = False
self.burst_nack_counter = 0
self.frame_received_counter = 0
self.speed_level = len(self.mode_list) - 1
static.ARQ_SPEED_LEVEL = self.speed_level
# low bandwidth mode indicator
self.received_LOW_BANDWIDTH_MODE = False
# reset retry counter for rx channel / burst
self.n_retries_per_burst = 0
if not static.ARQ_SESSION:
static.TNC_STATE = "IDLE"
static.ARQ_STATE = False
self.arq_file_transfer = False
static.BEACON_PAUSE = False
def arq_reset_ack(self, state: bool) -> None:
"""
Funktion for resetting acknowledge states
Args:
state:bool:
"""
self.burst_ack = state
self.rpt_request_received = state
self.data_frame_ack_received = state
def set_listening_modes(self, enable_sig0: bool, enable_sig1: bool, mode: int) -> None:
"""
Function for setting the data modes we are listening to for saving cpu power
Args:
enable_sig0:int: Enable/Disable signalling mode 0
enable_sig1:int: Enable/Disable signalling mode 1
mode:int: Codec2 mode to listen for
"""
# set modes we want to listen to
modem.RECEIVE_SIG0 = enable_sig0
modem.RECEIVE_SIG1 = enable_sig1
if mode == FREEDV_MODE.datac1.value:
modem.RECEIVE_DATAC1 = True
self.log.debug("[TNC] Changing listening data mode", mode="datac1")
elif mode == FREEDV_MODE.datac3.value:
modem.RECEIVE_DATAC3 = True
self.log.debug("[TNC] Changing listening data mode", mode="datac3")
elif mode == FREEDV_MODE.fsk_ldpc_1.value:
modem.RECEIVE_FSK_LDPC_1 = True
self.log.debug("[TNC] Changing listening data mode", mode="fsk_ldpc_1")
else:
modem.RECEIVE_DATAC1 = True
modem.RECEIVE_DATAC3 = True
modem.RECEIVE_FSK_LDPC_1 = True
self.log.debug(
"[TNC] Changing listening data mode", mode="datac1/datac3/fsk_ldpc"
)
# ------------------------- WATCHDOG FUNCTIONS FOR TIMER
def watchdog(self) -> None:
"""Author: DJ2LS
Watchdog master function. From here, "pet" the watchdogs
"""
while True:
time.sleep(0.1)
self.data_channel_keep_alive_watchdog()
self.burst_watchdog()
self.arq_session_keep_alive_watchdog()
def burst_watchdog(self) -> None:
"""
Watchdog which checks if we are running into a connection timeout
DATA BURST
"""
# IRS SIDE
# TODO: We need to redesign this part for cleaner state handling
# Return if not ARQ STATE and not ARQ SESSION STATE as they are different use cases
if (
not static.ARQ_STATE
and static.ARQ_SESSION_STATE != "connected"
or not self.is_IRS
):
return
# We want to reach this state only if connected ( == return above not called )
if (
self.data_channel_last_received + self.time_list[self.speed_level]
<= time.time()
):
self.log.warning(
"[TNC] Frame timeout",
attempt=self.n_retries_per_burst,
max_attempts=self.rx_n_max_retries_per_burst,
speed_level=self.speed_level,
)
self.frame_received_counter = 0
self.burst_nack_counter += 1
if self.burst_nack_counter >= 2:
self.burst_nack_counter = 0
self.speed_level = max(self.speed_level - 1, 0)
static.ARQ_SPEED_LEVEL = self.speed_level
# Update modes we are listening to
self.set_listening_modes(True, True, self.mode_list[self.speed_level])
# Why not pass `snr` or `static.SNR`?
self.send_burst_nack_frame_watchdog(0)
# Update data_channel timestamp
self.data_channel_last_received = time.time()
self.n_retries_per_burst += 1
else:
# print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time())
pass
if self.n_retries_per_burst >= self.rx_n_max_retries_per_burst:
self.stop_transmission()
self.arq_cleanup()
def data_channel_keep_alive_watchdog(self) -> None:
"""
watchdog which checks if we are running into a connection timeout
DATA CHANNEL
"""
# and not static.ARQ_SEND_KEEP_ALIVE:
if static.ARQ_STATE and static.TNC_STATE == "BUSY":
time.sleep(0.01)
if (
self.data_channel_last_received + self.transmission_timeout
> time.time()
):
time.sleep(5)
timeleft = (self.data_channel_last_received + self.transmission_timeout) - time.time()
self.log.debug("Time left until timeout", seconds=timeleft)
# print(self.data_channel_last_received + self.transmission_timeout - time.time())
# pass
else:
# Clear the timeout timestamp
self.data_channel_last_received = 0
self.log.info(
"[TNC] DATA ["
+ str(self.mycallsign, "UTF-8")
+ "]<<T>>["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]"
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
)
self.arq_cleanup()
def arq_session_keep_alive_watchdog(self) -> None:
"""
watchdog which checks if we are running into a connection timeout
ARQ SESSION
"""
if (
static.ARQ_SESSION
and static.TNC_STATE == "BUSY"
and not self.arq_file_transfer
):
if self.arq_session_last_received + self.arq_session_timeout > time.time():
time.sleep(0.01)
else:
self.log.info(
"[TNC] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]<<T>>["
+ str(static.DXCALLSIGN, "UTF-8")
+ "]"
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="session",
status="failed",
reason="timeout",
)
self.close_session()
def heartbeat(self) -> None:
"""
Heartbeat thread which auto pauses and resumes the heartbeat signal when in an arq session
"""
while True:
time.sleep(0.01)
if (
static.ARQ_SESSION
and self.IS_ARQ_SESSION_MASTER
and static.ARQ_SESSION_STATE == "connected"
and not self.arq_file_transfer
):
time.sleep(1)
self.transmit_session_heartbeat()
time.sleep(2)
def send_test_frame(self) -> None:
"""Send an empty test frame"""
self.enqueue_frame_for_tx(
frame_to_tx=bytearray(126), c2_mode=FREEDV_MODE.datac3.value
)