Merge pull request #234 from DJ2LS/refactor_N2KIQ_202207

Refactor N2KIQ 202207
This commit is contained in:
Paul Kronenwetter 2022-07-11 18:38:59 -04:00 committed by GitHub
commit a9d935ea63
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 178 additions and 235 deletions

View file

@ -221,7 +221,9 @@ def test_chat_text(
for p_item in proc: for p_item in proc:
assert p_item.exitcode == 0 assert p_item.exitcode == 0
p_item.close() # p_item.close() # Python 3.7+ only
p_item.terminate()
p_item.join()
analyze_results(s1_data, s2_data, STATIONS) analyze_results(s1_data, s2_data, STATIONS)

View file

@ -275,7 +275,9 @@ def test_datac0(frame_type: str, tmp_path):
for p_item in proc: for p_item in proc:
assert p_item.exitcode == 0 assert p_item.exitcode == 0
p_item.close() # p_item.close() # Python 3.7+ only
p_item.terminate()
p_item.join()
analyze_results(s1_data, s2_data, STATIONS) analyze_results(s1_data, s2_data, STATIONS)

View file

@ -242,7 +242,9 @@ def test_datac0_negative(frame_type: str, tmp_path):
for p_item in proc: for p_item in proc:
assert p_item.exitcode == 0 assert p_item.exitcode == 0
p_item.close() # p_item.close() # Python 3.7+ only
p_item.terminate()
p_item.join()
analyze_results(s1_data, s2_data, STATIONS) analyze_results(s1_data, s2_data, STATIONS)

View file

@ -75,7 +75,7 @@ def t_HighSNR_C_P_DATACx(
tx_side = path tx_side = path
break break
print(f"{tx_side=} / {rx_side=}") print(f"tx_side={tx_side} / rx_side={rx_side}")
with subprocess.Popen( with subprocess.Popen(
args=[ args=[
@ -178,7 +178,9 @@ def test_HighSNR_C_P_DATACx(
proc.terminate() proc.terminate()
assert proc.exitcode == 0 assert proc.exitcode == 0
proc.close() # proc.close() # Python 3.7+ only
proc.terminate()
proc.join()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -73,7 +73,7 @@ def t_HighSNR_P_C_DATACx(bursts: int, frames_per_burst: int, mode: str):
tx_side = os.path.join("test", tx_side) tx_side = os.path.join("test", tx_side)
os.environ["PYTHONPATH"] += ":." os.environ["PYTHONPATH"] += ":."
print(f"{tx_side=} / {rx_side=}") print(f"tx_side={tx_side} / rx_side={rx_side}")
with subprocess.Popen( with subprocess.Popen(
args=[ args=[
@ -178,7 +178,9 @@ def test_HighSNR_P_C_DATACx(bursts: int, frames_per_burst: int, mode: str):
proc.terminate() proc.terminate()
assert proc.exitcode == 0 assert proc.exitcode == 0
proc.close() # proc.close() # Python 3.7+ only
proc.terminate()
proc.join()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -61,7 +61,7 @@ def t_HighSNR_P_P_DATACx(bursts: int, frames_per_burst: int, mode: str):
rx_side = os.path.join("test", rx_side) rx_side = os.path.join("test", rx_side)
os.environ["PYTHONPATH"] += ":." os.environ["PYTHONPATH"] += ":."
print(f"{tx_side=} / {rx_side=}") print(f"tx_side={tx_side} / rx_side={rx_side}")
with subprocess.Popen( with subprocess.Popen(
args=[ args=[
@ -139,7 +139,9 @@ def test_HighSNR_P_P_DATACx(bursts: int, frames_per_burst: int, mode: str):
proc.terminate() proc.terminate()
assert proc.exitcode == 0 assert proc.exitcode == 0
proc.close() # proc.close() # Python 3.7+ only
proc.terminate()
proc.join()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -61,7 +61,7 @@ def t_HighSNR_P_P_Multi(bursts: int, frames_per_burst: int):
rx_side = os.path.join("test", rx_side) rx_side = os.path.join("test", rx_side)
os.environ["PYTHONPATH"] += ":." os.environ["PYTHONPATH"] += ":."
print(f"{tx_side=} / {rx_side=}") print(f"tx_side={tx_side} / rx_side={rx_side}")
with subprocess.Popen( with subprocess.Popen(
args=[ args=[
@ -134,7 +134,9 @@ def test_HighSNR_P_P_multi(bursts: int, frames_per_burst: int):
proc.terminate() proc.terminate()
assert proc.exitcode == 0 assert proc.exitcode == 0
proc.close() # proc.close() # Python 3.7+ only
proc.terminate()
proc.join()
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -253,7 +253,7 @@ def test_foreign_disconnect(mycall: str, dxcall: str):
proc.terminate() proc.terminate()
proc.join() proc.join()
# print(f"\n{proc.exitcode=}") # print(f"\nproc.exitcode={proc.exitcode}")
assert proc.exitcode == 0 assert proc.exitcode == 0
@ -271,7 +271,7 @@ def test_valid_disconnect(mycall: str, dxcall: str):
proc.terminate() proc.terminate()
proc.join() proc.join()
# print(f"\n{proc.exitcode=}") # print(f"\nproc.exitcode={proc.exitcode}")
assert proc.exitcode == 0 assert proc.exitcode == 0

View file

@ -23,7 +23,6 @@ import modem
import sock import sock
import static import static
import structlog import structlog
from codec2 import FREEDV_MODE
from static import FRAME_TYPE as FR_TYPE from static import FRAME_TYPE as FR_TYPE

View file

@ -17,7 +17,6 @@ import modem
import sock import sock
import static import static
import structlog import structlog
from codec2 import FREEDV_MODE
from static import FRAME_TYPE as FR_TYPE from static import FRAME_TYPE as FR_TYPE

View file

@ -8,7 +8,6 @@ Created on Sun Dec 27 20:43:40 2020
# pylint: disable=import-outside-toplevel, attribute-defined-outside-init # pylint: disable=import-outside-toplevel, attribute-defined-outside-init
import base64 import base64
import queue
import sys import sys
import threading import threading
import time import time
@ -26,13 +25,11 @@ import structlog
import ujson as json import ujson as json
from codec2 import FREEDV_MODE from codec2 import FREEDV_MODE
from exceptions import NoCallsign from exceptions import NoCallsign
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT
from static import FRAME_TYPE as FR_TYPE from static import FRAME_TYPE as FR_TYPE
TESTMODE = False TESTMODE = False
DATA_QUEUE_TRANSMIT = queue.Queue()
DATA_QUEUE_RECEIVED = queue.Queue()
class DATA: class DATA:
"""Terminal Node Controller for FreeDATA""" """Terminal Node Controller for FreeDATA"""
@ -55,9 +52,6 @@ class DATA:
self.transmission_uuid = "" self.transmission_uuid = ""
# Received my callsign crc if we received a crc for another ssid
self.received_mycall_crc = b""
self.data_channel_last_received = 0.0 # time of last "live sign" of a frame self.data_channel_last_received = 0.0 # time of last "live sign" of a frame
self.burst_ack_snr = 0 # SNR from received burst ack frames self.burst_ack_snr = 0 # SNR from received burst ack frames
@ -126,6 +120,57 @@ class DATA:
self.transmission_timeout = 360 # transmission timeout in seconds self.transmission_timeout = 360 # transmission timeout in seconds
# Dictionary of functions and log messages used in process_data
# instead of a long series of if-elif-else statements.
self.rx_dispatcher = {
FR_TYPE.ARQ_DC_OPEN_ACK_N.value: (
self.arq_received_channel_is_open,
"ARQ OPEN ACK (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_ACK_W.value: (
self.arq_received_channel_is_open,
"ARQ OPEN ACK (Wide)",
),
FR_TYPE.ARQ_DC_OPEN_N.value: (
self.arq_received_data_channel_opener,
"ARQ Data Channel Open (Narrow)",
),
FR_TYPE.ARQ_DC_OPEN_W.value: (
self.arq_received_data_channel_opener,
"ARQ Data Channel Open (Wide)",
),
FR_TYPE.ARQ_SESSION_CLOSE.value: (
self.received_session_close,
"ARQ CLOSE SESSION",
),
FR_TYPE.ARQ_SESSION_HB.value: (
self.received_session_heartbeat,
"ARQ HEARTBEAT",
),
FR_TYPE.ARQ_SESSION_OPEN.value: (
self.received_session_opener,
"ARQ OPEN SESSION",
),
FR_TYPE.ARQ_STOP.value: (self.received_stop_transmission, "ARQ STOP TX"),
FR_TYPE.BEACON.value: (self.received_beacon, "BEACON"),
FR_TYPE.BURST_ACK.value: (self.burst_ack_nack_received, "BURST ACK"),
FR_TYPE.BURST_NACK.value: (self.burst_ack_nack_received, "BURST NACK"),
FR_TYPE.CQ.value: (self.received_cq, "CQ"),
FR_TYPE.FR_ACK.value: (self.frame_ack_received, "FRAME ACK"),
FR_TYPE.FR_NACK.value: (self.frame_nack_received, "FRAME NACK"),
FR_TYPE.FR_REPEAT.value: (self.burst_rpt_received, "REPEAT REQUEST"),
FR_TYPE.PING_ACK.value: (self.received_ping_ack, "PING ACK"),
FR_TYPE.PING.value: (self.received_ping, "PING"),
FR_TYPE.QRV.value: (self.received_qrv, "QRV"),
}
self.command_dispatcher = {
"CONNECT": (self.arq_session_handler, "CONNECT"),
"CQ": (self.transmit_cq, "CQ"),
"DISCONNECT": (self.close_session, "DISCONNECT"),
"SEND_TEST_FRAME": (self.send_test_frame, "TEST"),
"STOP": (self.stop_transmission, "STOP"),
}
# Start worker and watchdog threads # Start worker and watchdog threads
worker_thread_transmit = threading.Thread( worker_thread_transmit = threading.Thread(
target=self.worker_transmit, name="worker thread transmit", daemon=True target=self.worker_transmit, name="worker thread transmit", daemon=True
@ -159,13 +204,12 @@ class DATA:
while True: while True:
data = self.data_queue_transmit.get() data = self.data_queue_transmit.get()
# [0] == Command # Dispatch commands known to command_dispatcher
if data[0] == "CQ": if data[0] in self.command_dispatcher:
self.transmit_cq() self.log.debug(f"[TNC] TX {self.command_dispatcher[data[0]][1]}...")
self.command_dispatcher[data[0]][0]()
elif data[0] == "STOP":
self.stop_transmission()
# Dispatch commands that need more arguments.
elif data[0] == "PING": elif data[0] == "PING":
# [1] dxcallsign # [1] dxcallsign
self.transmit_ping(data[1]) self.transmit_ping(data[1])
@ -187,18 +231,6 @@ class DATA:
# [5] mycallsign with ssid # [5] mycallsign with ssid
self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5]) self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5])
elif data[0] == "CONNECT":
# [1] DX CALLSIGN
# self.arq_session_handler(data[1])
self.arq_session_handler()
elif data[0] == "DISCONNECT":
# [1] DX CALLSIGN
self.close_session()
elif data[0] == "SEND_TEST_FRAME":
# [1] DX CALLSIGN
self.send_test_frame()
else: else:
self.log.error( self.log.error(
"[TNC] worker_transmit: received invalid command:", data=data "[TNC] worker_transmit: received invalid command:", data=data
@ -254,7 +286,14 @@ class DATA:
frame = frametype - 10 frame = frametype - 10
n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big") n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big")
if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: # Dispatch activity based on received frametype
if frametype in self.rx_dispatcher:
# Process frames "known" by rx_dispatcher
self.log.debug(f"[TNC] {self.rx_dispatcher[frametype][1]} RECEIVED....")
self.rx_dispatcher[frametype][0](bytes_out[:-2])
# Process frametypes requiring a different set of arguments.
elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
# get snr of received data # get snr of received data
# FIXME: find a fix for this - after moving to classes, this no longer works # FIXME: find a fix for this - after moving to classes, this no longer works
# snr = self.calculate_snr(freedv) # snr = self.calculate_snr(freedv)
@ -270,99 +309,15 @@ class DATA:
# self.log.debug(f"[TNC] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") # self.log.debug(f"[TNC] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
# self.c_lib.freedv_set_sync(freedv, 0) # self.c_lib.freedv_set_sync(freedv, 0)
# BURST ACK
elif frametype == FR_TYPE.BURST_ACK.value:
self.log.debug("[TNC] BURST ACK RECEIVED....")
self.burst_ack_received(bytes_out[:-2])
# FRAME ACK
elif frametype == FR_TYPE.FR_ACK.value:
self.log.debug("[TNC] FRAME ACK RECEIVED....")
self.frame_ack_received()
# FRAME RPT
elif frametype == FR_TYPE.FR_REPEAT.value:
self.log.debug("[TNC] REPEAT REQUEST RECEIVED....")
self.burst_rpt_received(bytes_out[:-2])
# FRAME NACK
elif frametype == FR_TYPE.FR_NACK.value:
self.log.debug("[TNC] FRAME NACK RECEIVED....")
self.frame_nack_received(bytes_out[:-2])
# BURST NACK
elif frametype == FR_TYPE.BURST_NACK.value:
self.log.debug("[TNC] BURST NACK RECEIVED....")
self.burst_nack_received(bytes_out[:-2])
# CQ FRAME
elif frametype == FR_TYPE.CQ.value:
self.log.debug("[TNC] CQ RECEIVED....")
self.received_cq(bytes_out[:-2])
# QRV FRAME
elif frametype == FR_TYPE.QRV.value:
self.log.debug("[TNC] QRV RECEIVED....")
self.received_qrv(bytes_out[:-2])
# PING FRAME
elif frametype == FR_TYPE.PING.value:
self.log.debug("[TNC] PING RECEIVED....")
self.received_ping(bytes_out[:-2])
# PING ACK
elif frametype == FR_TYPE.PING_ACK.value:
self.log.debug("[TNC] PING ACK RECEIVED....")
self.received_ping_ack(bytes_out[:-2])
# SESSION OPENER
elif frametype == FR_TYPE.ARQ_SESSION_OPEN.value:
self.log.debug("[TNC] OPEN SESSION RECEIVED....")
self.received_session_opener(bytes_out[:-2])
# SESSION HEARTBEAT
elif frametype == FR_TYPE.ARQ_SESSION_HB.value:
self.log.debug("[TNC] SESSION HEARTBEAT RECEIVED....")
self.received_session_heartbeat(bytes_out[:-2])
# SESSION CLOSE
elif frametype == FR_TYPE.ARQ_SESSION_CLOSE.value:
self.log.debug("[TNC] CLOSE ARQ SESSION RECEIVED....")
self.received_session_close(bytes_out[:-2])
# ARQ FILE TRANSFER RECEIVED!
elif frametype in [
FR_TYPE.ARQ_DC_OPEN_W.value,
FR_TYPE.ARQ_DC_OPEN_N.value,
]:
self.log.debug("[TNC] ARQ arq_received_data_channel_opener")
self.arq_received_data_channel_opener(bytes_out[:-2])
# ARQ CHANNEL IS OPENED
elif frametype in [
FR_TYPE.ARQ_DC_OPEN_ACK_W.value,
FR_TYPE.ARQ_DC_OPEN_ACK_N.value,
]:
self.log.debug("[TNC] ARQ arq_received_channel_is_open")
self.arq_received_channel_is_open(bytes_out[:-2])
# ARQ STOP TRANSMISSION
elif frametype == FR_TYPE.ARQ_STOP.value:
self.log.debug("[TNC] ARQ received stop transmission")
self.received_stop_transmission()
# this is outdated and we may remove it
elif frametype == FR_TYPE.BEACON.value:
self.log.debug("[TNC] BEACON RECEIVED")
self.received_beacon(bytes_out[:-2])
# TESTFRAMES # TESTFRAMES
elif frametype == FR_TYPE.TEST_FRAME.value: elif frametype == FR_TYPE.TEST_FRAME.value:
self.log.debug("[TNC] TESTFRAME RECEIVED", frame=bytes_out[:]) self.log.debug("[TNC] TESTFRAME RECEIVED", frame=bytes_out[:])
# Unknown frame type # Unknown frame type
else: else:
self.log.warning("[TNC] ARQ - other frame type", frametype=frametype) self.log.warning(
"[TNC] ARQ - other frame type", frametype=FR_TYPE(frametype).name
)
else: else:
# for debugging purposes to receive all data # for debugging purposes to receive all data
@ -402,7 +357,7 @@ class DATA:
while static.TRANSMITTING: while static.TRANSMITTING:
time.sleep(0.01) time.sleep(0.01)
def send_data_to_socket_queue(self, /, **jsondata): def send_data_to_socket_queue(self, **jsondata):
""" """
Send information to the UI via JSON and the sock.SOCKET_QUEUE. Send information to the UI via JSON and the sock.SOCKET_QUEUE.
@ -522,19 +477,15 @@ class DATA:
Returns: Returns:
""" """
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
data_in = bytes(data_in) data_in = bytes(data_in)
# get received crc for different mycall ssids # get received crc for different mycall ssids
self.received_mycall_crc = data_in[2:5]
# check if callsign ssid override # check if callsign ssid override
_valid, mycallsign = helpers.check_callsign( _, mycallsign = helpers.check_callsign(
self.mycallsign, self.received_mycall_crc self.mycallsign, data_in[2:5]
) )
if not _valid:
# ARQ data packet not for me.
self.arq_cleanup()
return
# only process data if we are in ARQ and BUSY state else return to quit # only process data if we are in ARQ and BUSY state else return to quit
if not static.ARQ_STATE and static.TNC_STATE != "BUSY": if not static.ARQ_STATE and static.TNC_STATE != "BUSY":
@ -744,15 +695,6 @@ class DATA:
self.transmission_uuid = str(uuid.uuid4()) self.transmission_uuid = str(uuid.uuid4())
timestamp = int(time.time()) timestamp = int(time.time())
# check if callsign ssid override
_valid, mycallsign = helpers.check_callsign(
self.mycallsign, self.received_mycall_crc
)
if not _valid:
# ARQ data packet not for me.
self.arq_cleanup()
return
# Re-code data_frame in base64, UTF-8 for JSON UI communication. # Re-code data_frame in base64, UTF-8 for JSON UI communication.
base64_data = base64.b64encode(data_frame).decode("UTF-8") base64_data = base64.b64encode(data_frame).decode("UTF-8")
static.RX_BUFFER.append( static.RX_BUFFER.append(
@ -797,7 +739,6 @@ class DATA:
) )
else: else:
self.send_data_to_socket_queue( self.send_data_to_socket_queue(
freedata="tnc-message", freedata="tnc-message",
arq="transmission", arq="transmission",
@ -942,9 +883,8 @@ class DATA:
# TX_N_FRAMES_PER_BURST = 1 is working # TX_N_FRAMES_PER_BURST = 1 is working
arqheader = bytearray() arqheader = bytearray()
arqheader[:1] = bytes( # arqheader[:1] = bytes([FR_TYPE.BURST_01.value + i])
[FR_TYPE.BURST_01.value] arqheader[:1] = bytes([FR_TYPE.BURST_01.value])
) # bytes([FRAME_TYPE.BURST_01.value + i])
arqheader[1:2] = bytes([TX_N_FRAMES_PER_BURST]) arqheader[1:2] = bytes([TX_N_FRAMES_PER_BURST])
arqheader[2:5] = static.DXCALLSIGN_CRC arqheader[2:5] = static.DXCALLSIGN_CRC
arqheader[5:8] = static.MYCALLSIGN_CRC arqheader[5:8] = static.MYCALLSIGN_CRC
@ -1106,10 +1046,9 @@ class DATA:
self.log.debug("[TNC] TESTMODE: arq_transmit exiting.") self.log.debug("[TNC] TESTMODE: arq_transmit exiting.")
sys.exit(0) sys.exit(0)
# signalling frames received def burst_ack_nack_received(self, data_in: bytes) -> None:
def burst_ack_received(self, data_in: bytes):
""" """
Received a ACK for a transmitted frame, keep track and Received a ACK/NACK for a transmitted frame, keep track and
make adjustments to speed level if needed. make adjustments to speed level if needed.
Args: Args:
@ -1118,9 +1057,6 @@ class DATA:
Returns: Returns:
""" """
# Increase speed level if we received a burst ack
# self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1)
# Process data only if we are in ARQ and BUSY state # Process data only if we are in ARQ and BUSY state
if static.ARQ_STATE: if static.ARQ_STATE:
helpers.add_to_heard_stations( helpers.add_to_heard_stations(
@ -1131,62 +1067,42 @@ class DATA:
static.FREQ_OFFSET, static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY, static.HAMLIB_FREQUENCY,
) )
# Force data retry loops of TX TNC to stop and continue with next frame
self.burst_ack = True frametype = int.from_bytes(bytes(data_in[:1]), "big")
desc = "ack"
if frametype == FR_TYPE.BURST_ACK.value:
# Increase speed level if we received a burst ack
# self.speed_level = min(self.speed_level + 1, len(self.mode_list) - 1)
# Force data retry loops of TX TNC to stop and continue with next frame
self.burst_ack = True
# Reset burst nack counter
self.burst_nack_counter = 0
# Reset n retries per burst counter
self.n_retries_per_burst = 0
else:
# Decrease speed level if we received a burst nack
# self.speed_level = max(self.speed_level - 1, 0)
# Set flag to retry frame again.
self.burst_nack = True
# Increment burst nack counter
self.burst_nack_counter += 1
desc = "nack"
# Update data_channel timestamp # Update data_channel timestamp
self.data_channel_last_received = int(time.time()) self.data_channel_last_received = int(time.time())
self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big") self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big")
self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big") self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big")
static.ARQ_SPEED_LEVEL = self.speed_level static.ARQ_SPEED_LEVEL = self.speed_level
self.log.debug( self.log.debug(
"[TNC] burst_ack_received:", f"[TNC] burst_{desc}_received:",
speed_level=self.speed_level, speed_level=self.speed_level,
c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name, c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name,
) )
# Reset burst nack counter def frame_ack_received(
self.burst_nack_counter = 0 self, data_in: bytes # pylint: disable=unused-argument
# Reset n retries per burst counter ) -> None:
self.n_retries_per_burst = 0
# signalling frames received
def burst_nack_received(self, data_in: bytes):
"""
Received a NACK for a transmitted frame, keep track and
make adjustments to speed level if needed.
Args:
data_in:bytes:
"""
# Decrease speed level if we received a burst nack
# self.speed_level = max(self.speed_level - 1, 0)
# only process data if we are in ARQ and BUSY state
if static.ARQ_STATE:
helpers.add_to_heard_stations(
static.DXCALLSIGN,
static.DXGRID,
"DATA-CHANNEL",
static.SNR,
static.FREQ_OFFSET,
static.HAMLIB_FREQUENCY,
)
# Force data loops of TNC to stop and continue with next frame
self.burst_nack = True
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
self.burst_ack_snr = int.from_bytes(bytes(data_in[7:8]), "big")
self.speed_level = int.from_bytes(bytes(data_in[8:9]), "big")
static.ARQ_SPEED_LEVEL = self.speed_level
self.burst_nack_counter += 1
self.log.debug(
"[TNC] burst_nack_received:",
speed_level=self.speed_level,
c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name,
)
def frame_ack_received(self):
"""Received an ACK for a transmitted frame""" """Received an ACK for a transmitted frame"""
# Process data only if we are in ARQ and BUSY state # Process data only if we are in ARQ and BUSY state
if static.ARQ_STATE: if static.ARQ_STATE:
@ -1204,9 +1120,11 @@ class DATA:
self.data_channel_last_received = int(time.time()) self.data_channel_last_received = int(time.time())
self.arq_session_last_received = int(time.time()) self.arq_session_last_received = int(time.time())
def frame_nack_received(self, data_in: bytes): # pylint: disable=unused-argument def frame_nack_received(
self, data_in: bytes # pylint: disable=unused-argument
) -> None:
""" """
Received a NACK for a transmitted framt Received a NACK for a transmitted frame
Args: Args:
data_in:bytes: data_in:bytes:
@ -1444,7 +1362,9 @@ class DATA:
data_in:bytes: data_in:bytes:
""" """
# Close the session if the DXCALLSIGN_CRC matches the station in static. # We've arrived here from process_data which already checked that the frame
# is intended for this station.
# Close the session if the CRC matches the remote station in static.
_valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7])) _valid_crc, _ = helpers.check_callsign(static.DXCALLSIGN, bytes(data_in[4:7]))
if _valid_crc: if _valid_crc:
static.ARQ_SESSION_STATE = "disconnected" static.ARQ_SESSION_STATE = "disconnected"
@ -1559,7 +1479,7 @@ class DATA:
# we need to compress data for gettin a compression factor. # we need to compress data for gettin a compression factor.
# so we are compressing twice. This is not that nice and maybe there is another way # so we are compressing twice. This is not that nice and maybe there is another way
# for calculating transmission statistics # for calculating transmission statistics
static.ARQ_COMPRESSION_FACTOR = len(data_out) / len(zlib.compress(data_out)) # static.ARQ_COMPRESSION_FACTOR = len(data_out) / len(zlib.compress(data_out))
self.arq_open_data_channel(mode, n_frames_per_burst, mycallsign) self.arq_open_data_channel(mode, n_frames_per_burst, mycallsign)
@ -1677,6 +1597,8 @@ class DATA:
data_in:bytes: data_in:bytes:
""" """
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
self.arq_file_transfer = True self.arq_file_transfer = True
self.is_IRS = True self.is_IRS = True
self.send_data_to_socket_queue( self.send_data_to_socket_queue(
@ -1684,10 +1606,13 @@ class DATA:
arq="transmission", arq="transmission",
status="opening", status="opening",
) )
static.DXCALLSIGN_CRC = bytes(data_in[4:7]) static.DXCALLSIGN_CRC = bytes(data_in[4:7])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13]))
n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big") # n_frames_per_burst is currently unused
# n_frames_per_burst = int.from_bytes(bytes(data_in[13:14]), "big")
frametype = int.from_bytes(bytes(data_in[:1]), "big") frametype = int.from_bytes(bytes(data_in[:1]), "big")
# check if we received low bandwidth mode # check if we received low bandwidth mode
if frametype == FR_TYPE.ARQ_DC_OPEN_W.value: if frametype == FR_TYPE.ARQ_DC_OPEN_W.value:
@ -1713,11 +1638,7 @@ class DATA:
) )
# check if callsign ssid override # check if callsign ssid override
valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4]) _, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4])
if not valid:
# ARQ connect packet not for me.
self.arq_cleanup()
return
self.log.info( self.log.info(
"[TNC] ARQ | DATA | RX | [" "[TNC] ARQ | DATA | RX | ["
@ -1845,6 +1766,7 @@ class DATA:
""" """
if not str(dxcallsign).strip(): if not str(dxcallsign).strip():
# TODO: We should display a message to this effect on the UI.
self.log.warning("[TNC] Missing required callsign", dxcallsign=dxcallsign) self.log.warning("[TNC] Missing required callsign", dxcallsign=dxcallsign)
return return
static.DXCALLSIGN = dxcallsign static.DXCALLSIGN = dxcallsign
@ -1879,10 +1801,10 @@ class DATA:
data_in:bytes: data_in:bytes:
""" """
static.DXCALLSIGN_CRC = bytes(data_in[4:7]) dxcallsign_CRC = bytes(data_in[4:7])
static.DXCALLSIGN = helpers.bytes_to_callsign(bytes(data_in[7:13])) dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13]))
helpers.add_to_heard_stations( helpers.add_to_heard_stations(
static.DXCALLSIGN, dxcallsign,
static.DXGRID, static.DXGRID,
"PING", "PING",
static.SNR, static.SNR,
@ -1896,7 +1818,7 @@ class DATA:
uuid=str(uuid.uuid4()), uuid=str(uuid.uuid4()),
timestamp=int(time.time()), timestamp=int(time.time()),
mycallsign=str(self.mycallsign, "UTF-8"), mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"), dxcallsign=str(dxcallsign, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"), dxgrid=str(static.DXGRID, "UTF-8"),
snr=str(static.SNR), snr=str(static.SNR),
) )
@ -1907,6 +1829,8 @@ class DATA:
self.log.debug("[TNC] received_ping: ping not for this station.") self.log.debug("[TNC] received_ping: ping not for this station.")
return return
static.DXCALLSIGN_CRC = dxcallsign_CRC
static.DXCALLSIGN = dxcallsign
self.log.info( self.log.info(
"[TNC] PING REQ [" "[TNC] PING REQ ["
+ str(mycallsign, "UTF-8") + str(mycallsign, "UTF-8")
@ -1990,7 +1914,9 @@ class DATA:
) )
self.arq_cleanup() self.arq_cleanup()
def received_stop_transmission(self) -> None: def received_stop_transmission(
self, data_in: bytes
) -> None: # pylint: disable=unused-argument
""" """
Received a transmission stop Received a transmission stop
""" """
@ -2348,8 +2274,6 @@ class DATA:
self.log.debug("[TNC] arq_cleanup") self.log.debug("[TNC] arq_cleanup")
self.received_mycall_crc = b""
self.rx_frame_bof_received = False self.rx_frame_bof_received = False
self.rx_frame_eof_received = False self.rx_frame_eof_received = False
self.burst_ack = False self.burst_ack = False

View file

@ -11,28 +11,24 @@ Created on Wed Dec 23 07:04:24 2020
import atexit import atexit
import ctypes import ctypes
import os import os
import queue
import sys import sys
import threading import threading
import time import time
from collections import deque from collections import deque
import codec2 import codec2
import data_handler
import numpy as np import numpy as np
import sock import sock
import sounddevice as sd import sounddevice as sd
import static import static
import structlog import structlog
import ujson as json import ujson as json
from queues import DATA_QUEUE_RECEIVED, MODEM_RECEIVED_QUEUE, MODEM_TRANSMIT_QUEUE
TESTMODE = False TESTMODE = False
RXCHANNEL = "" RXCHANNEL = ""
TXCHANNEL = "" TXCHANNEL = ""
# Initialize FIFO queue to store received frames
MODEM_RECEIVED_QUEUE = queue.Queue()
MODEM_TRANSMIT_QUEUE = queue.Queue()
static.TRANSMITTING = False static.TRANSMITTING = False
# Receive only specific modes to reduce CPU load # Receive only specific modes to reduce CPU load
@ -690,7 +686,7 @@ class RF:
# data[0] = bytes_out # data[0] = bytes_out
# data[1] = freedv session # data[1] = freedv session
# data[2] = bytes_per_frame # data[2] = bytes_per_frame
data_handler.DATA_QUEUE_RECEIVED.put([data[0], data[1], data[2]]) DATA_QUEUE_RECEIVED.put([data[0], data[1], data[2]])
self.modem_received_queue.task_done() self.modem_received_queue.task_done()
def get_frequency_offset(self, freedv: ctypes.c_void_p) -> float: def get_frequency_offset(self, freedv: ctypes.c_void_p) -> float:

11
tnc/queues.py Normal file
View file

@ -0,0 +1,11 @@
"""
Hold queues used by more than one module to eliminate cyclic imports.
"""
import queue
DATA_QUEUE_TRANSMIT = queue.Queue()
DATA_QUEUE_RECEIVED = queue.Queue()
# Initialize FIFO queue to store received frames
MODEM_RECEIVED_QUEUE = queue.Queue()
MODEM_TRANSMIT_QUEUE = queue.Queue()

View file

@ -25,12 +25,12 @@ import sys
import threading import threading
import time import time
import data_handler
import helpers import helpers
import static import static
import structlog import structlog
import ujson as json import ujson as json
from exceptions import NoCallsign from exceptions import NoCallsign
from queues import DATA_QUEUE_TRANSMIT
SOCKET_QUEUE = queue.Queue() SOCKET_QUEUE = queue.Queue()
DAEMON_QUEUE = queue.Queue() DAEMON_QUEUE = queue.Queue()
@ -227,7 +227,7 @@ def process_tnc_commands(data):
and received_json["command"] == "send_test_frame" and received_json["command"] == "send_test_frame"
): ):
try: try:
data_handler.DATA_QUEUE_TRANSMIT.put(["SEND_TEST_FRAME"]) DATA_QUEUE_TRANSMIT.put(["SEND_TEST_FRAME"])
command_response("send_test_frame", True) command_response("send_test_frame", True)
except Exception as err: except Exception as err:
command_response("send_test_frame", False) command_response("send_test_frame", False)
@ -240,7 +240,7 @@ def process_tnc_commands(data):
# CQ CQ CQ ----------------------------------------------------- # CQ CQ CQ -----------------------------------------------------
if received_json["command"] == "cqcqcq": if received_json["command"] == "cqcqcq":
try: try:
data_handler.DATA_QUEUE_TRANSMIT.put(["CQ"]) DATA_QUEUE_TRANSMIT.put(["CQ"])
command_response("cqcqcq", True) command_response("cqcqcq", True)
except Exception as err: except Exception as err:
@ -254,7 +254,7 @@ def process_tnc_commands(data):
try: try:
static.BEACON_STATE = True static.BEACON_STATE = True
interval = int(received_json["parameter"]) interval = int(received_json["parameter"])
data_handler.DATA_QUEUE_TRANSMIT.put(["BEACON", interval, True]) DATA_QUEUE_TRANSMIT.put(["BEACON", interval, True])
command_response("start_beacon", True) command_response("start_beacon", True)
except Exception as err: except Exception as err:
command_response("start_beacon", False) command_response("start_beacon", False)
@ -269,7 +269,7 @@ def process_tnc_commands(data):
try: try:
log.warning("[SCK] Stopping beacon!") log.warning("[SCK] Stopping beacon!")
static.BEACON_STATE = False static.BEACON_STATE = False
data_handler.DATA_QUEUE_TRANSMIT.put(["BEACON", None, False]) DATA_QUEUE_TRANSMIT.put(["BEACON", None, False])
command_response("stop_beacon", True) command_response("stop_beacon", True)
except Exception as err: except Exception as err:
command_response("stop_beacon", False) command_response("stop_beacon", False)
@ -293,7 +293,7 @@ def process_tnc_commands(data):
dxcallsign = helpers.callsign_to_bytes(dxcallsign) dxcallsign = helpers.callsign_to_bytes(dxcallsign)
dxcallsign = helpers.bytes_to_callsign(dxcallsign) dxcallsign = helpers.bytes_to_callsign(dxcallsign)
data_handler.DATA_QUEUE_TRANSMIT.put(["PING", dxcallsign]) DATA_QUEUE_TRANSMIT.put(["PING", dxcallsign])
command_response("ping", True) command_response("ping", True)
except NoCallsign: except NoCallsign:
command_response("ping", False) command_response("ping", False)
@ -320,7 +320,7 @@ def process_tnc_commands(data):
static.DXCALLSIGN = dxcallsign static.DXCALLSIGN = dxcallsign
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
data_handler.DATA_QUEUE_TRANSMIT.put(["CONNECT", dxcallsign]) DATA_QUEUE_TRANSMIT.put(["CONNECT", dxcallsign])
command_response("connect", True) command_response("connect", True)
except Exception as err: except Exception as err:
command_response("connect", False) command_response("connect", False)
@ -334,7 +334,7 @@ def process_tnc_commands(data):
if received_json["type"] == "arq" and received_json["command"] == "disconnect": if received_json["type"] == "arq" and received_json["command"] == "disconnect":
# send ping frame and wait for ACK # send ping frame and wait for ACK
try: try:
data_handler.DATA_QUEUE_TRANSMIT.put(["DISCONNECT"]) DATA_QUEUE_TRANSMIT.put(["DISCONNECT"])
command_response("disconnect", True) command_response("disconnect", True)
except Exception as err: except Exception as err:
command_response("disconnect", False) command_response("disconnect", False)
@ -383,7 +383,7 @@ def process_tnc_commands(data):
binarydata = base64.b64decode(base64data) binarydata = base64.b64decode(base64data)
data_handler.DATA_QUEUE_TRANSMIT.put( DATA_QUEUE_TRANSMIT.put(
["ARQ_RAW", binarydata, mode, n_frames, arq_uuid, mycallsign] ["ARQ_RAW", binarydata, mode, n_frames, arq_uuid, mycallsign]
) )
@ -402,7 +402,7 @@ def process_tnc_commands(data):
): ):
try: try:
if static.TNC_STATE == "BUSY" or static.ARQ_STATE: if static.TNC_STATE == "BUSY" or static.ARQ_STATE:
data_handler.DATA_QUEUE_TRANSMIT.put(["STOP"]) DATA_QUEUE_TRANSMIT.put(["STOP"])
log.warning("[SCK] Stopping transmission!") log.warning("[SCK] Stopping transmission!")
static.TNC_STATE = "IDLE" static.TNC_STATE = "IDLE"
static.ARQ_STATE = False static.ARQ_STATE = False