splitting ARQ into subclasses

This commit is contained in:
DJ2LS 2023-11-19 22:12:30 +01:00
parent d94399e07c
commit b1d8810bcf
5 changed files with 2097 additions and 2001 deletions

View file

@ -14,12 +14,15 @@ import helpers
import structlog
from modem_frametypes import FRAME_TYPE as FR_TYPE
import data_handler_broadcasts
import data_handler_data_broadcasts
import data_handler_ping
import data_handler_arq
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, RX_BUFFER, MODEM_TRANSMIT_QUEUE
from data_handler_broadcasts import BROADCAST
from data_handler_data_broadcasts import DATABROADCAST
from data_handler_ping import PING
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, RX_BUFFER, MODEM_TRANSMIT_QUEUE
from data_handler_arq_iss import ISS
from data_handler_arq_irs import IRS
from data_handler_arq import ARQ
from data_handler_arq_session import SESSION
TESTMODE = False
@ -35,10 +38,18 @@ class DATA:
self.config = config
# init data handlers
self.broadcasts = data_handler_broadcasts.BROADCAST(config, event_queue, states)
self.data_broadcasts = data_handler_data_broadcasts.DATABROADCAST(config, event_queue, states)
self.ping = data_handler_ping.PING(config, event_queue, states)
self.arq = data_handler_arq.ARQ(config, event_queue, states)
self.broadcasts = BROADCAST(config, event_queue, states)
self.data_broadcasts = DATABROADCAST(config, event_queue, states)
self.ping = PING(config, event_queue, states)
# Initialize ARQ
self.arq = ARQ(config, event_queue, states)
self.arq_irs = IRS(config, event_queue, states)
self.arq_iss = ISS(config, event_queue, states)
self.arq_session = SESSION(config, event_queue, states)
self.data_queue_transmit = DATA_QUEUE_TRANSMIT
self.data_queue_received = DATA_QUEUE_RECEIVED
@ -47,41 +58,41 @@ class DATA:
# instead of a long series of if-elif-else statements.
self.rx_dispatcher = {
FR_TYPE.ARQ_DC_OPEN_ACK_N.value: (
self.arq.arq_received_channel_is_open,
self.arq_iss.arq_received_channel_is_open,
"ARQ OPEN ACK (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_ACK_W.value: (
self.arq.arq_received_channel_is_open,
self.arq_iss.arq_received_channel_is_open,
"ARQ OPEN ACK (Wide)",
),
FR_TYPE.ARQ_DC_OPEN_N.value: (
self.arq.arq_received_data_channel_opener,
self.arq_irs.arq_received_data_channel_opener,
"ARQ Data Channel Open (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_W.value: (
self.arq.arq_received_data_channel_opener,
self.arq_irs.arq_received_data_channel_opener,
"ARQ Data Channel Open (Wide)",
),
FR_TYPE.ARQ_SESSION_CLOSE.value: (
self.arq.received_session_close,
self.arq_session.received_session_close,
"ARQ CLOSE SESSION",
),
FR_TYPE.ARQ_SESSION_HB.value: (
self.arq.received_session_heartbeat,
self.arq_session.received_session_heartbeat,
"ARQ HEARTBEAT",
),
FR_TYPE.ARQ_SESSION_OPEN.value: (
self.arq.received_session_opener,
self.arq_session.received_session_opener,
"ARQ OPEN SESSION",
),
FR_TYPE.ARQ_STOP.value: (self.arq.received_stop_transmission, "ARQ STOP TX"),
FR_TYPE.BEACON.value: (self.broadcasts.received_beacon, "BEACON"),
FR_TYPE.BURST_ACK.value: (self.arq.burst_ack_nack_received, "BURST ACK"),
FR_TYPE.BURST_NACK.value: (self.arq.burst_ack_nack_received, "BURST NACK"),
FR_TYPE.BURST_ACK.value: (self.arq_iss.burst_ack_nack_received, "BURST ACK"),
FR_TYPE.BURST_NACK.value: (self.arq_iss.burst_ack_nack_received, "BURST NACK"),
FR_TYPE.CQ.value: (self.broadcasts.received_cq, "CQ"),
FR_TYPE.FR_ACK.value: (self.arq.frame_ack_received, "FRAME ACK"),
FR_TYPE.FR_NACK.value: (self.arq.frame_nack_received, "FRAME NACK"),
FR_TYPE.FR_REPEAT.value: (self.arq.burst_rpt_received, "REPEAT REQUEST"),
FR_TYPE.FR_ACK.value: (self.arq_iss.frame_ack_received, "FRAME ACK"),
FR_TYPE.FR_NACK.value: (self.arq_iss.frame_nack_received, "FRAME NACK"),
FR_TYPE.FR_REPEAT.value: (self.arq_iss.burst_rpt_received, "REPEAT REQUEST"),
FR_TYPE.PING_ACK.value: (self.ping.received_ping_ack, "PING ACK"),
FR_TYPE.PING.value: (self.ping.received_ping, "PING"),
FR_TYPE.QRV.value: (self.broadcasts.received_qrv, "QRV"),
@ -93,7 +104,7 @@ class DATA:
self.command_dispatcher = {
# "CONNECT": (self.arq_session_handler, "CONNECT"),
"CQ": (self.broadcasts.transmit_cq, "CQ"),
"DISCONNECT": (self.arq.close_session, "DISCONNECT"),
"DISCONNECT": (self.arq_session.close_session, "DISCONNECT"),
"SEND_TEST_FRAME": (self.broadcasts.send_test_frame, "TEST"),
"STOP": (self.arq.stop_transmission, "STOP"),
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,809 @@
import base64
import time
import uuid
import lzma
import helpers
import numpy as np
from codec2 import FREEDV_MODE
from queues import RX_BUFFER
from modem_frametypes import FRAME_TYPE as FR_TYPE
from data_handler_arq import ARQ
class IRS(ARQ):
def __init__(self, config, event_queue, states):
super().__init__(config, event_queue, states)
pass
def arq_process_received_data_frame(self, data_frame, snr, signed):
"""
"""
# transmittion duration
signed = "True" if signed else "False"
duration = time.time() - self.rx_start_of_transmission
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
)
self.log.info("[Modem] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter,
bytesperminute=self.states.arq_bytes_per_minute, total_bytes=self.states.arq_total_bytes,
duration=duration, hmac_signed=signed)
# Decompress the data frame
data_frame_decompressed = lzma.decompress(data_frame)
self.arq_compression_factor = len(data_frame_decompressed) / len(
data_frame
)
data_frame = data_frame_decompressed
self.transmission_uuid = str(uuid.uuid4())
timestamp = int(time.time())
# Re-code data_frame in base64, UTF-8 for JSON UI communication.
base64_data = base64.b64encode(data_frame).decode("UTF-8")
# check if RX_BUFFER isn't full
if not RX_BUFFER.full():
# make sure we have always the correct buffer size
RX_BUFFER.maxsize = int(self.arq_rx_buffer_size)
else:
# if full, free space by getting an item
self.log.info(
"[Modem] ARQ | RX | RX_BUFFER FULL - dropping old data",
buffer_size=RX_BUFFER.qsize(),
maxsize=int(self.arq_rx_buffer_size)
)
RX_BUFFER.get()
# add item to RX_BUFFER
self.log.info(
"[Modem] ARQ | RX | saving data to rx buffer",
buffer_size=RX_BUFFER.qsize() + 1,
maxsize=RX_BUFFER.maxsize
)
try:
# RX_BUFFER[0] = transmission uuid
# RX_BUFFER[1] = timestamp
# RX_BUFFER[2] = dxcallsign
# RX_BUFFER[3] = dxgrid
# RX_BUFFER[4] = data
# RX_BUFFER[5] = hmac signed
# RX_BUFFER[6] = compression factor
# RX_BUFFER[7] = bytes per minute
# RX_BUFFER[8] = duration
# RX_BUFFER[9] = self.frame_nack_counter
# RX_BUFFER[10] = speed list stats
RX_BUFFER.put(
[
self.transmission_uuid,
timestamp,
self.dxcallsign,
self.dxgrid,
base64_data,
signed,
self.arq_compression_factor,
self.states.arq_bytes_per_minute,
duration,
self.frame_nack_counter,
self.states.arq_speed_list
]
)
except Exception as e:
# File "/usr/lib/python3.7/queue.py", line 133, in put
# if self.maxsize > 0
# TypeError: '>' not supported between instances of 'str' and 'int'
#
# Occurs on Raspberry Pi and Python 3.7
self.log.error(
"[Modem] ARQ | RX | error occurred when saving data!",
e=e,
uuid=self.transmission_uuid,
timestamp=timestamp,
dxcall=self.dxcallsign,
dxgrid=self.dxgrid,
data=base64_data
)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="transmission",
status="received",
uuid=self.transmission_uuid,
percent=self.states.arq_transmission_percent,
bytesperminute=self.states.arq_bytes_per_minute,
compression=self.arq_compression_factor,
timestamp=timestamp,
finished=0,
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(self.dxcallsign, "UTF-8"),
dxgrid=str(self.dxgrid, "UTF-8"),
data=base64_data,
irs=helpers.bool_to_string(self.is_IRS),
hmac_signed=signed,
duration=duration,
nacks=self.frame_nack_counter,
speed_list=self.states.arq_speed_list
)
if self.enable_stats:
duration = time.time() - self.rx_start_of_transmission
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="received", duration=duration)
self.log.info(
"[Modem] ARQ | RX | SENDING DATA FRAME ACK")
self.send_data_ack_frame(snr)
# Update statistics AFTER the frame ACK is sent
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
)
self.log.info(
"[Modem] | RX | DATACHANNEL ["
+ str(self.mycallsign, "UTF-8")
+ "]<< >>["
+ str(self.dxcallsign, "UTF-8")
+ "]",
snr=snr,
)
def arq_received_data_channel_opener(self, data_in: bytes, snr):
"""
Received request to open data channel frame
Args:
data_in:bytes:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
# stop processing if we don't want to respond to a call when not in a arq session
if not self.respond_to_call and not self.states.is_arq_session:
return False
# stop processing if not in arq session, but modem state is busy and we have a different session id
# use-case we get a connection request while connecting to another station
if not self.states.is_arq_session and self.states.is_modem_busy and data_in[13:14] != self.session_id:
return False
self.arq_file_transfer = True
# check if callsign ssid override
_, self.mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4], self.ssid_list)
# ignore channel opener if already in ARQ STATE
# use case: Station A is connecting to Station B while
# Station B already tries connecting to Station A.
# For avoiding ignoring repeated connect request in case of packet loss
# we are only ignoring packets in case we are ISS
if self.arq_state_event.is_set() and not self.is_IRS:
return False
self.is_IRS = True
self.dxcallsign_crc = bytes(data_in[4:7])
self.dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13]))
self.states.set("dxcallsign", self.dxcallsign)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="transmission",
status="opening",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
irs=helpers.bool_to_string(self.is_IRS)
)
frametype = int.from_bytes(bytes(data_in[:1]), "big")
# check if we received low bandwidth mode
# possible channel constellations
# ISS(w) <-> IRS(w)
# ISS(w) <-> IRS(n)
# ISS(n) <-> IRS(w)
# ISS(n) <-> IRS(n)
if frametype == FR_TYPE.ARQ_DC_OPEN_W.value and not self.low_bandwidth_mode:
# ISS(w) <-> IRS(w)
constellation = "ISS(w) <-> IRS(w)"
self.received_LOW_BANDWIDTH_MODE = False
self.mode_list = self.mode_list_high_bw
self.time_list = self.time_list_high_bw
self.snr_list = self.snr_list_high_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_W.value:
# ISS(w) <-> IRS(n)
constellation = "ISS(w) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = False
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value and not self.low_bandwidth_mode:
# ISS(n) <-> IRS(w)
constellation = "ISS(n) <-> IRS(w)"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value:
# ISS(n) <-> IRS(n)
constellation = "ISS(n) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
else:
constellation = "not matched"
self.received_LOW_BANDWIDTH_MODE = True
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
# get mode which fits to given SNR
# initially set speed_level 0 in case of bad SNR and no matching mode
self.speed_level = 0
# calculate initial speed level in correlation to latest known SNR
for i in range(len(self.mode_list)):
if snr >= self.snr_list[i]:
self.speed_level = i
# check if speed level fits to busy condition
if not self.check_if_mode_fits_to_busy_slot():
self.speed_level = 0
# Update modes we are listening to
self.set_listening_modes(True, True, self.mode_list[self.speed_level])
self.dxgrid = b'------'
helpers.add_to_heard_stations(
self.dxcallsign,
self.dxgrid,
"DATA",
snr,
self.modem_frequency_offset,
self.states.radio_frequency,
self.states.heard_stations
)
self.session_id = data_in[13:14]
# check again if callsign ssid override
_, self.mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4], self.ssid_list)
self.log.info(
"[Modem] ARQ | DATA | RX | ["
+ str(self.mycallsign, "UTF-8")
+ "]>> <<["
+ str(self.dxcallsign, "UTF-8")
+ "]",
channel_constellation=constellation,
)
# Reset data_channel/burst timestamps
# TIMING TEST
self.data_channel_last_received = int(time.time())
self.burst_last_received = int(time.time() + 10) # we might need some more time so lets increase this
# Set ARQ State AFTER resetting timeouts
# this avoids timeouts starting too early
self.states.set("is_arq_state", True)
self.states.set("is_modem_busy", True)
self.reset_statistics()
# Select the frame type based on the current Modem mode
if self.low_bandwidth_mode or self.received_LOW_BANDWIDTH_MODE:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_ACK_N.value])
self.log.debug("[Modem] Responding with low bandwidth mode")
else:
frametype = bytes([FR_TYPE.ARQ_DC_OPEN_ACK_W.value])
self.log.debug("[Modem] Responding with high bandwidth mode")
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = frametype
connection_frame[1:2] = self.session_id
connection_frame[8:9] = bytes([self.speed_level])
connection_frame[13:14] = bytes([self.arq_protocol_version])
self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="transmission",
status="opened",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
irs=helpers.bool_to_string(self.is_IRS)
)
self.log.info(
"[Modem] ARQ | DATA | RX | ["
+ str(self.mycallsign, "UTF-8")
+ "]>>|<<["
+ str(self.dxcallsign, "UTF-8")
+ "]",
bandwidth="wide",
snr=snr,
)
# set start of transmission for our statistics
self.rx_start_of_transmission = time.time()
# TIMING TEST
# Reset data_channel/burst timestamps once again for avoiding running into timeout
# and therefore sending a NACK
self.data_channel_last_received = int(time.time())
self.burst_last_received = int(time.time() + 10) # we might need some more time so lets increase this
def calculate_transfer_rate_rx(
self, rx_start_of_transmission: float, receivedbytes: int, snr
) -> list:
"""
Calculate transfer rate for received data
Args:
rx_start_of_transmission:float:
receivedbytes:int:
Returns: List of:
bits_per_second: float,
bytes_per_minute: float,
transmission_percent: float
"""
try:
if self.states.arq_total_bytes == 0:
self.states.set("arq_total_bytes", 1)
arq_transmission_percent = min(
int(
(
receivedbytes
* self.arq_compression_factor
/ self.states.arq_total_bytes
)
* 100
),
100,
)
transmissiontime = time.time() - self.rx_start_of_transmission
if receivedbytes > 0:
arq_bits_per_second = int((receivedbytes * 8) / transmissiontime)
bytes_per_minute = int(
receivedbytes / (transmissiontime / 60)
)
arq_seconds_until_finish = int(((self.states.arq_total_bytes - receivedbytes) / (
bytes_per_minute * self.arq_compression_factor)) * 60) - 20 # offset because of frame ack/nack
speed_chart = {"snr": snr, "bpm": bytes_per_minute, "timestamp": int(time.time())}
# check if data already in list
if speed_chart not in self.states.arq_speed_list:
self.states.arq_speed_list.append(speed_chart)
else:
arq_bits_per_second = 0
bytes_per_minute = 0
arq_seconds_until_finish = 0
except Exception as err:
self.log.error(f"[Modem] calculate_transfer_rate_rx: Exception: {err}")
arq_transmission_percent = 0.0
arq_bits_per_second = 0
bytes_per_minute = 0
self.states.set("arq_bits_per_second", arq_bits_per_second)
self.states.set("bytes_per_minute", bytes_per_minute)
self.states.set("arq_transmission_percent", arq_transmission_percent)
self.states.set("arq_compression_factor", self.arq_compression_factor)
return [
arq_bits_per_second,
bytes_per_minute,
arq_transmission_percent,
]
def send_burst_nack_frame(self, snr: bytes) -> None:
"""Build and send NACK frame for received DATA frame"""
nack_frame = bytearray(self.length_sig1_frame)
nack_frame[:1] = bytes([FR_TYPE.FR_NACK.value])
nack_frame[1:2] = self.session_id
nack_frame[2:3] = helpers.snr_to_bytes(snr)
nack_frame[3:4] = bytes([int(self.speed_level)])
nack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
# TRANSMIT NACK FRAME FOR BURST
# TODO Do we have to send ident frame?
# self.enqueue_frame_for_tx([ack_frame, self.send_ident_frame(False)], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0)
# wait if we have a channel busy condition
if self.states.channel_busy:
self.channel_busy_handler()
self.enqueue_frame_for_tx([nack_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0)
# reset burst timeout in case we had to wait too long
self.burst_last_received = time.time()
def send_burst_nack_frame_watchdog(self, tx_n_frames_per_burst) -> None:
"""Build and send NACK frame for watchdog timeout"""
# increment nack counter for transmission stats
self.frame_nack_counter += 1
# we need to clear our rx burst buffer
self.arq_rx_burst_buffer = []
# Create and send ACK frame
self.log.info("[Modem] ARQ | RX | SENDING NACK")
nack_frame = bytearray(self.length_sig1_frame)
nack_frame[:1] = bytes([FR_TYPE.BURST_NACK.value])
nack_frame[1:2] = self.session_id
nack_frame[2:3] = helpers.snr_to_bytes(0)
nack_frame[3:4] = bytes([int(self.speed_level)])
nack_frame[4:5] = bytes([int(tx_n_frames_per_burst)])
nack_frame[5:9] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
# wait if we have a channel busy condition
if self.states.channel_busy:
self.channel_busy_handler()
# TRANSMIT NACK FRAME FOR BURST
self.enqueue_frame_for_tx([nack_frame], c2_mode=FREEDV_MODE.sig1.value, copies=1, repeat_delay=0)
# reset frame counter for not increasing speed level
self.frame_received_counter = 0
def arq_data_received(
self, data_in: bytes, bytes_per_frame: int, snr: float, freedv
) -> None:
"""
Args:
data_in:bytes:
bytes_per_frame:int:
snr:float:
freedv:
Returns:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
data_in = bytes(data_in)
# only process data if we are in ARQ and BUSY state else return to quit
if not self.states.is_arq_state and not self.states.is_modem_busy:
self.log.warning("[Modem] wrong modem state - dropping data", is_arq_state=self.states.is_arq_state,
modem_state=self.states.is_modem_busy)
return
self.arq_file_transfer = True
self.states.set("is_modem_busy", True)
self.states.set("is_arq_state", True)
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
self.burst_last_received = int(time.time())
# Extract some important data from the frame
# Get sequence number of burst frame
self.rx_n_frame_of_burst = int.from_bytes(bytes(data_in[:1]), "big") - 10
# Get number of bursts from received frame
self.rx_n_frames_per_burst = int.from_bytes(bytes(data_in[1:2]), "big")
# The RX burst buffer needs to have a fixed length filled with "None".
# We need this later for counting the "Nones" to detect missing data.
# Check if burst buffer has expected length else create it
if len(self.arq_rx_burst_buffer) != self.rx_n_frames_per_burst:
self.arq_rx_burst_buffer = [None] * self.rx_n_frames_per_burst
# Append data to rx burst buffer
self.arq_rx_burst_buffer[self.rx_n_frame_of_burst] = data_in[self.arq_burst_header_size:] # type: ignore
self.dxgrid = b'------'
helpers.add_to_heard_stations(
self.dxcallsign,
self.dxgrid,
"DATA",
snr,
self.modem_frequency_offset,
self.states.radio_frequency,
self.states.heard_stations
)
# Check if we received all frames in the burst by checking if burst buffer has no more "Nones"
# This is the ideal case because we received all data
if None not in self.arq_rx_burst_buffer:
# then iterate through burst buffer and stick the burst together
# the temp burst buffer is needed for checking, if we already received data
temp_burst_buffer = b""
for value in self.arq_rx_burst_buffer:
# self.arq_rx_frame_buffer += self.arq_rx_burst_buffer[i]
temp_burst_buffer += bytes(value) # type: ignore
# free up burst buffer
self.arq_rx_burst_buffer = []
# TODO Needs to be removed as soon as mode error is fixed
# catch possible modem error which leads into false byteorder
# modem possibly decodes too late - data then is pushed to buffer
# which leads into wrong byteorder
# Lets put this in try/except so we are not crashing modem as its highly experimental
# This might only work for datac1 and datac3
try:
# area_of_interest = (modem.get_bytes_per_frame(self.mode_list[speed_level] - 1) -3) * 2
if self.arq_rx_frame_buffer.endswith(temp_burst_buffer[:246]) and len(temp_burst_buffer) >= 246:
self.log.warning(
"[Modem] ARQ | RX | wrong byteorder received - dropping data"
)
# we need to run a return here, so we are not sending an ACK
# return
except Exception as e:
self.log.warning(
"[Modem] ARQ | RX | wrong byteorder check failed", e=e
)
self.log.debug("[Modem] temp_burst_buffer", buffer=temp_burst_buffer)
self.log.debug("[Modem] self.arq_rx_frame_buffer", buffer=self.arq_rx_frame_buffer)
# if frame buffer ends not with the current frame, we are going to append new data
# if data already exists, we received the frame correctly,
# but the ACK frame didn't receive its destination (ISS)
if self.arq_rx_frame_buffer.endswith(temp_burst_buffer):
self.log.info(
"[Modem] ARQ | RX | Frame already received - sending ACK again"
)
else:
# Here we are going to search for our data in the last received bytes.
# This reduces the chance we will lose the entire frame in the case of signalling frame loss
# self.arq_rx_frame_buffer --> existing data
# temp_burst_buffer --> new data
# search_area --> area where we want to search
search_area = self.arq_burst_last_payload * self.rx_n_frames_per_burst
search_position = len(self.arq_rx_frame_buffer) - search_area
# if search position < 0, then search position = 0
search_position = max(0, search_position)
# find position of data. returns -1 if nothing found in area else >= 0
# we are beginning from the end, so if data exists twice or more,
# only the last one should be replaced
# we are going to only check position against minimum data frame payload
# use case: receive data, which already contains received data
# while the payload of data received before is shorter than actual payload
get_position = self.arq_rx_frame_buffer[search_position:].rfind(
temp_burst_buffer[:self.arq_burst_minimum_payload]
)
# if we find data, replace it at this position with the new data and strip it
if get_position >= 0:
self.arq_rx_frame_buffer = self.arq_rx_frame_buffer[
: search_position + get_position
]
self.log.warning(
"[Modem] ARQ | RX | replacing existing buffer data",
area=search_area,
pos=get_position,
)
else:
self.log.debug("[Modem] ARQ | RX | appending data to buffer")
self.arq_rx_frame_buffer += temp_burst_buffer
self.arq_burst_last_payload = len(temp_burst_buffer)
# Check if we didn't receive a BOF and EOF yet to avoid sending
# ack frames if we already received all data
if (
not self.rx_frame_bof_received
and not self.rx_frame_eof_received
and data_in.find(self.data_frame_eof) < 0
):
self.arq_calculate_speed_level(snr)
# TIMING TEST
# self.data_channel_last_received = int(time.time()) + 6 + 6
# self.burst_last_received = int(time.time()) + 6 + 6
self.data_channel_last_received = int(time.time())
self.burst_last_received = int(time.time())
# Create and send ACK frame
self.log.info("[Modem] ARQ | RX | SENDING ACK", finished=self.states.arq_seconds_until_finish,
bytesperminute=self.states.arq_bytes_per_minute)
self.send_burst_ack_frame(snr)
# Reset n retries per burst counter
self.n_retries_per_burst = 0
# calculate statistics
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
)
# send a network message with information
self.send_data_to_socket_queue(
freedata="modem-message",
arq="transmission",
status="receiving",
uuid=self.transmission_uuid,
percent=self.states.arq_transmission_percent,
bytesperminute=self.states.arq_bytes_per_minute,
compression=self.arq_compression_factor,
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
finished=self.states.arq_seconds_until_finish,
irs=helpers.bool_to_string(self.is_IRS)
)
else:
self.log.warning(
"[Modem] data_handler: missing data in burst buffer...",
frame=self.rx_n_frame_of_burst + 1,
frames=self.rx_n_frames_per_burst
)
# We have a BOF and EOF flag in our data. If we received both we received our frame.
# In case of loosing data, but we received already a BOF and EOF we need to make sure, we
# received the complete last burst by checking it for Nones
bof_position = self.arq_rx_frame_buffer.find(self.data_frame_bof)
eof_position = self.arq_rx_frame_buffer.find(self.data_frame_eof)
# get total bytes per transmission information as soon we received a frame with a BOF
if bof_position >= 0:
self.arq_extract_statistics_from_data_frame(bof_position, eof_position, snr)
if (
bof_position >= 0
and eof_position > 0
and None not in self.arq_rx_burst_buffer
):
self.log.debug(
"[Modem] arq_data_received:",
bof_position=bof_position,
eof_position=eof_position,
)
self.rx_frame_bof_received = True
self.rx_frame_eof_received = True
# Extract raw data from buffer
payload = self.arq_rx_frame_buffer[
bof_position + len(self.data_frame_bof): eof_position
]
# Get the data frame crc
data_frame_crc = payload[:4] # 0:4 = 4 bytes
# Get the data frame length
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 = 4 bytes
self.states.set("arq_total_bytes", frame_length)
# 8:9 = compression factor
data_frame = payload[9:]
data_frame_crc_received = helpers.get_crc_32(data_frame)
# check if hmac signing enabled
if self.enable_hmac:
self.log.info(
"[Modem] [HMAC] Enabled",
)
# now check if we have valid hmac signature - returns salt or bool
salt_found = helpers.search_hmac_salt(self.dxcallsign, self.mycallsign, data_frame_crc, data_frame,
token_iters=100)
if salt_found:
# hmac digest received
self.arq_process_received_data_frame(data_frame, snr, signed=True)
else:
# hmac signature wrong
self.arq_process_received_data_frame(data_frame, snr, signed=False)
elif data_frame_crc == data_frame_crc_received:
self.log.warning(
"[Modem] [HMAC] Disabled, using CRC",
)
self.arq_process_received_data_frame(data_frame, snr, signed=False)
else:
self.send_data_to_socket_queue(
freedata="modem-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
irs=helpers.bool_to_string(self.is_IRS)
)
duration = time.time() - self.rx_start_of_transmission
self.log.warning(
"[Modem] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
e="wrong crc",
expected=data_frame_crc.hex(),
received=data_frame_crc_received.hex(),
nacks=self.frame_nack_counter,
duration=duration,
bytesperminute=self.states.arq_bytes_per_minute,
compression=self.arq_compression_factor,
data=data_frame,
)
if self.enable_stats:
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
self.log.info("[Modem] ARQ | RX | Sending NACK", finished=self.states.arq_seconds_until_finish,
bytesperminute=self.states.arq_bytes_per_minute)
self.send_burst_nack_frame(snr)
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
# Finally cleanup our buffers and states,
self.arq_cleanup()
def arq_extract_statistics_from_data_frame(self, bof_position, eof_position, snr):
payload = self.arq_rx_frame_buffer[
bof_position + len(self.data_frame_bof): eof_position
]
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes
self.states.set("arq_total_bytes", frame_length)
compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes
# limit to max value of 255
compression_factor = np.clip(compression_factor, 0, 255)
self.arq_compression_factor = compression_factor / 10
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(self.arq_rx_frame_buffer), snr
)
def send_burst_ack_frame(self, snr) -> None:
"""Build and send ACK frame for burst DATA frame"""
ack_frame = bytearray(self.length_sig1_frame)
ack_frame[:1] = bytes([FR_TYPE.BURST_ACK.value])
ack_frame[1:2] = self.session_id
ack_frame[2:3] = helpers.snr_to_bytes(snr)
ack_frame[3:4] = bytes([int(self.speed_level)])
ack_frame[4:8] = len(self.arq_rx_frame_buffer).to_bytes(4, byteorder="big")
# wait if we have a channel busy condition
if self.states.channel_busy:
self.channel_busy_handler()
# Transmit frame
self.enqueue_frame_for_tx([ack_frame], c2_mode=FREEDV_MODE.sig1.value)
def send_data_ack_frame(self, snr) -> None:
"""Build and send ACK frame for received DATA frame"""
ack_frame = bytearray(self.length_sig1_frame)
ack_frame[:1] = bytes([FR_TYPE.FR_ACK.value])
ack_frame[1:2] = self.session_id
ack_frame[2:3] = helpers.snr_to_bytes(snr)
# wait if we have a channel busy condition
if self.states.channel_busy:
self.channel_busy_handler()
# Transmit frame
self.enqueue_frame_for_tx([ack_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0)
def send_retransmit_request_frame(self) -> None:
# check where a None is in our burst buffer and do frame+1, because lists start at 0
# FIXME Check to see if there's a `frame - 1` in the receive portion. Remove both if there is.
missing_frames = [
frame + 1
for frame, element in enumerate(self.arq_rx_burst_buffer)
if element is None
]
rpt_frame = bytearray(self.length_sig1_frame)
rpt_frame[:1] = bytes([FR_TYPE.FR_REPEAT.value])
rpt_frame[1:2] = self.session_id
rpt_frame[2:2 + len(missing_frames)] = missing_frames
self.log.info("[Modem] ARQ | RX | Requesting", frames=missing_frames)
# Transmit frame
self.enqueue_frame_for_tx([rpt_frame], c2_mode=FREEDV_MODE.sig1.value, copies=1, repeat_delay=0)

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,234 @@
import time
import helpers
from codec2 import FREEDV_MODE
from queues import MODEM_TRANSMIT_QUEUE
from modem_frametypes import FRAME_TYPE as FR_TYPE
from data_handler_arq import ARQ
class SESSION(ARQ):
def __init__(self, config, event_queue, states):
super().__init__(config, event_queue, states)
pass
def received_session_close(self, data_in: bytes, snr):
"""
Closes the session when a close session frame is received and
the DXCALLSIGN_CRC matches the remote station participating in the session.
Args:
data_in:bytes:
"""
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
# Close the session if the CRC matches the remote station in static.
_valid_crc, mycallsign = helpers.check_callsign(self.mycallsign, bytes(data_in[2:5]), self.ssid_list)
_valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2]))
if (_valid_crc or _valid_session) and self.states.arq_session_state not in ["disconnected"]:
self.states.set("arq_session_state", "disconnected")
self.dxgrid = b'------'
helpers.add_to_heard_stations(
self.dxcallsign,
self.dxgrid,
"DATA",
snr,
self.modem_frequency_offset,
self.states.radio_frequency,
self.states.heard_stations
)
self.log.info(
"[Modem] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]<<X>>["
+ str(self.dxcallsign, "UTF-8")
+ "]",
self.states.arq_session_state,
)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="session",
status="close",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
)
self.IS_ARQ_SESSION_MASTER = False
self.states.is_arq_session = False
self.arq_cleanup()
def transmit_session_heartbeat(self) -> None:
"""Send ARQ sesion heartbeat while connected"""
# self.states.is_arq_session = True
# self.states.set("is_modem_busy", True)
# self.states.set("arq_session_state", "connected")
connection_frame = bytearray(self.length_sig0_frame)
connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value])
connection_frame[1:2] = self.session_id
self.send_data_to_socket_queue(
freedata="modem-message",
arq="session",
status="connected",
heartbeat="transmitting",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
)
self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0)
def received_session_heartbeat(self, data_in: bytes, snr) -> None:
"""
Received an ARQ session heartbeat, record and update state accordingly.
Args:
data_in:bytes:
"""
# Accept session data if the DXCALLSIGN_CRC matches the station in static or session id.
_valid_crc, _ = helpers.check_callsign(self.dxcallsign, bytes(data_in[4:7]), self.ssid_list)
_valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2]))
if _valid_crc or _valid_session and self.states.arq_session_state in ["connected", "connecting"]:
self.log.debug("[Modem] Received session heartbeat")
self.dxgrid = b'------'
helpers.add_to_heard_stations(
self.dxcallsign,
self.dxgrid,
"SESSION-HB",
snr,
self.modem_frequency_offset,
self.states.radio_frequency,
self.states.heard_stations
)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="session",
status="connected",
heartbeat="received",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
)
self.states.is_arq_session = True
self.states.set("arq_session_state", "connected")
self.states.set("is_modem_busy", True)
# Update the timeout timestamps
self.arq_session_last_received = int(time.time())
self.data_channel_last_received = int(time.time())
# transmit session heartbeat only
# -> if not session master
# --> this will be triggered by heartbeat watchdog
# -> if not during a file transfer
# -> if ARQ_SESSION_STATE != disconnecting, disconnected, failed
# --> to avoid heartbeat toggle loops while disconnecting
if (
not self.IS_ARQ_SESSION_MASTER
and not self.arq_file_transfer
and self.states.arq_session_state != 'disconnecting'
and self.states.arq_session_state != 'disconnected'
and self.states.arq_session_state != 'failed'
):
self.transmit_session_heartbeat()
def close_session(self) -> None:
"""Close the ARQ session"""
self.states.set("arq_session_state", "disconnecting")
self.log.info(
"[Modem] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]<<X>>["
+ str(self.dxcallsign, "UTF-8")
+ "]",
self.states.arq_session_state,
)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="session",
status="close",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
)
self.IS_ARQ_SESSION_MASTER = False
self.states.is_arq_session = False
# we need to send disconnect frame before doing arq cleanup
# we would lose our session id then
self.send_disconnect_frame()
# transmit morse identifier if configured
if self.enable_morse_identifier:
MODEM_TRANSMIT_QUEUE.put(["morse", 1, 0, self.mycallsign])
self.arq_cleanup()
def received_session_opener(self, data_in: bytes, snr) -> None:
"""
Received a session open request packet.
Args:
data_in:bytes:
"""
# if we don't want to respond to calls, return False
if not self.respond_to_call:
return False
# ignore channel opener if already in ARQ STATE
# use case: Station A is connecting to Station B while
# Station B already tries connecting to Station A.
# For avoiding ignoring repeated connect request in case of packet loss
# we are only ignoring packets in case we are ISS
if self.states.is_arq_session and self.IS_ARQ_SESSION_MASTER:
return False
self.IS_ARQ_SESSION_MASTER = False
self.states.set("arq_session_state", "connecting")
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
self.session_id = bytes(data_in[1:2])
self.dxcallsign_crc = bytes(data_in[5:8])
self.dxcallsign = helpers.bytes_to_callsign(bytes(data_in[8:14]))
self.states.set("dxcallsign", self.dxcallsign)
# check if callsign ssid override
valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[2:5], self.ssid_list)
self.mycallsign = mycallsign
self.dxgrid = b'------'
helpers.add_to_heard_stations(
self.dxcallsign,
self.dxgrid,
"DATA",
snr,
self.modem_frequency_offset,
self.states.radio_frequency,
self.states.heard_stations
)
self.log.info(
"[Modem] SESSION ["
+ str(self.mycallsign, "UTF-8")
+ "]>>|<<["
+ str(self.dxcallsign, "UTF-8")
+ "]",
self.states.arq_session_state,
)
self.states.is_arq_session = True
self.states.set("is_modem_busy", True)
self.send_data_to_socket_queue(
freedata="modem-message",
arq="session",
status="connected",
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
)
self.transmit_session_heartbeat()