Initial cut of fix for issue #206.

Paul Kronenwetter 2022-06-17 19:48:47 -04:00
parent ca1b079e4a
commit 292754af2d
5 changed files with 545 additions and 4 deletions


@@ -0,0 +1,242 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Negative tests for datac0 frames.
@author: kronenpj
"""
import contextlib
import multiprocessing
import os
import sys
import threading
import time
import zlib
import helpers
import log_handler
import pytest
import structlog
try:
import test.util_datac0_negative as util
except ImportError:
import util_datac0_negative as util
STATIONS = ["AA2BB", "ZZ9YY"]
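# Cleared by test_datac0 once both station processes exit; polled by the pipe-receiver threads.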
PIPE_THREAD_RUNNING = True
def parameters() -> dict:
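"""Build per-command test parameters: the TNC command to send, its timeout, and the strings expected (or not) in the socket queue."""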
# Construct message to start beacon.
beacon_data = {"type": "command", "command": "start_beacon", "parameter": "-5"}
# Construct message to start ping.
ping_data = {"type": "ping", "command": "ping", "dxcallsign": ""}
connect_data = {"type": "arq", "command": "connect", "dxcallsign": ""}
beacon_timeout = 1
ping_timeout = 1
connect_timeout = 1
beacon_tx_check = '"status":"Failed"'
ping_tx_check = '"ping","status":"Failed"'
connect_tx_check = '"status":"Failed"'
beacon_rx_check = '"beacon":"received"'
ping_rx_check = '"ping":"received"'
connect_rx_check = '"connect":"received"'
beacon_final_tx_check = [beacon_tx_check]
ping_final_tx_check = [ping_tx_check]
connect_final_tx_check = [connect_tx_check]
beacon_final_rx_check = [beacon_rx_check]
ping_final_rx_check = [ping_rx_check]
connect_final_rx_check = [connect_rx_check]
return {
"beacon": (
beacon_data,
beacon_timeout,
beacon_tx_check,
beacon_rx_check,
beacon_final_tx_check,
beacon_final_rx_check,
),
"connect": (
connect_data,
connect_timeout,
connect_tx_check,
connect_rx_check,
connect_final_tx_check,
connect_final_rx_check,
),
"ping": (
ping_data,
ping_timeout,
ping_tx_check,
ping_rx_check,
ping_final_tx_check,
ping_final_rx_check,
),
}
def locate_data_with_crc(source_list: list, text: str, data: bytes, frametype: str):
"""Try to locate data in source_list."""
log = structlog.get_logger("locate_data_with_crc")
if data in source_list:
with contextlib.suppress(zlib.error):
data = zlib.decompress(data[2:])
log.info(f"analyze_results: {text} no CRC", _frametype=frametype, data=data)
elif data + helpers.get_crc_8(data) in source_list:
with contextlib.suppress(zlib.error):
data = zlib.decompress(data[2:-1])
log.info(f"analyze_results: {text} CRC 8", _frametype=frametype, data=data)
elif data + helpers.get_crc_16(data) in source_list:
with contextlib.suppress(zlib.error):
data = zlib.decompress(data[2:-2])
log.info(f"analyze_results: {text} CRC16", _frametype=frametype, data=data)
elif data + helpers.get_crc_24(data) in source_list:
with contextlib.suppress(zlib.error):
data = zlib.decompress(data[2:-3])
log.info(f"analyze_results: {text} CRC24", _frametype=frametype, data=data)
elif data + helpers.get_crc_32(data) in source_list:
with contextlib.suppress(zlib.error):
data = zlib.decompress(data[2:-4])
log.info(f"analyze_results: {text} CRC32", _frametype=frametype, data=data)
else:
log.info(
f"analyze_results: {text} not received:",
_frame=frametype,
data=data,
)
def analyze_results(station1: list, station2: list, call_list: list):
"""Examine the information retrieved from the sub-processes."""
# Data in these lists is either a series of bytes of received data,
# or a bytearray of transmitted data from the station.
log = structlog.get_logger("analyze_results")
# Check that each station's transmitted data was received by the other.
for s1, s2, text in [
(station1, station2, call_list[0]),
(station2, station1, call_list[1]),
]:
for s1_item in s1:
if not isinstance(s1_item, list):
continue
data = bytes(s1_item[0])
frametypeno = int.from_bytes(data[:1], "big")
# frametype = static.FRAME_TYPE(frametypeno).name
frametype = str(frametypeno)
# Frame layout: byte 0 is the frame type, bytes 1-3 and 4-6 carry the callsign CRCs.
s1_crc = helpers.decode_call(helpers.bytes_to_callsign(data[1:4]))
s2_crc = helpers.decode_call(helpers.bytes_to_callsign(data[4:7]))
log.info(
"analyze_results: callsign CRCs:",
tx_station=text,
s1_crc=s1_crc,
s2_crc=s2_crc,
)
locate_data_with_crc(s2, text, data, frametype)
# @pytest.mark.parametrize("frame_type", ["beacon", "connect", "ping"])
@pytest.mark.parametrize("frame_type", ["ping"])
def test_datac0(frame_type: str, tmp_path):
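"""Run one negative datac0 exchange: start both stations as separate processes, capture their pipe traffic, and require both to exit cleanly before analyzing the results."""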
log_handler.setup_logging(filename=tmp_path / "test_datac0", level="DEBUG")
log = structlog.get_logger("test_datac0")
s1_data = []
s2_data = []
def recv_data(buffer: list, pipe):
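"""Drain a multiprocessing Pipe into `buffer` until PIPE_THREAD_RUNNING is cleared."""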
while PIPE_THREAD_RUNNING:
if pipe.poll(0.1):
buffer.append(pipe.recv())
else:
time.sleep(0.1)
def recv_from_pipes(s1_rx, s1_pipe, s2_rx, s2_pipe) -> list:
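"""Start one receiver thread per station pipe and return the thread objects."""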
processes = [
threading.Thread(target=recv_data, args=(s1_rx, s1_pipe)),
threading.Thread(target=recv_data, args=(s2_rx, s2_pipe)),
]
for item in processes:
item.start()
return processes
# Run the two halves of the test in separate processes. This is needed because
# both scripts change global state and would conflict if run in the same process.
from_s1, s1_send = multiprocessing.Pipe()
from_s2, s2_send = multiprocessing.Pipe()
proc = [
multiprocessing.Process(
target=util.t_datac0_1,
args=(
s1_send,
STATIONS[0],
STATIONS[1],
parameters()[frame_type],
tmp_path,
),
daemon=True,
),
multiprocessing.Process(
target=util.t_datac0_2,
args=(
s2_send,
STATIONS[1],
STATIONS[0],
parameters()[frame_type],
tmp_path,
),
daemon=True,
),
]
pipe_receivers = recv_from_pipes(s1_data, from_s1, s2_data, from_s2)
# log.debug("Creating ")
# print("Starting threads.")
for p_item in proc:
p_item.start()
# This relies on each process exiting when its job is complete!
# print("Waiting for threads to exit.")
for p_item in proc:
p_item.join()
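# Both station processes have exited; signal the pipe-receiver threads to stop.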
global PIPE_THREAD_RUNNING # pylint: disable=global-statement
PIPE_THREAD_RUNNING = False
for pipe_recv in pipe_receivers:
pipe_recv.join()
for idx in range(2):
try:
os.unlink(tmp_path / f"hfchannel{idx+1}")
except FileNotFoundError as fnfe:
log.debug(f"Unlinking pipe: {fnfe}")
for p_item in proc:
assert p_item.exitcode == 0
p_item.close()
analyze_results(s1_data, s2_data, STATIONS)
if __name__ == "__main__":
# Run pytest with the current script as the filename.
ecode = pytest.main(["-s", "-v", sys.argv[0]])
if ecode == 0:
print("errors: 0")
else:
print(ecode)


@@ -0,0 +1,286 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Negative test utilities for datac0 frames.
@author: kronenpj
"""
import json
import time
from pprint import pformat
from typing import Callable, Tuple
import data_handler
import helpers
import modem
import sock
import static
import structlog
def t_setup(
station: int,
mycall: str,
dxcall: str,
rx_channel: str,
tx_channel: str,
lowbwmode: bool,
t_transmit,
t_process_data,
tmp_path,
):
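"""Configure static/modem state for one simulated station and monkey-patch RF.transmit and DATA.process_data with the supplied wrappers; returns the DATA instance and the original, unpatched functions."""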
# Disable data_handler testmode - This is required to test a conversation.
data_handler.TESTMODE = False
modem.RXCHANNEL = tmp_path / rx_channel
modem.TESTMODE = True
modem.TXCHANNEL = tmp_path / tx_channel
static.HAMLIB_RADIOCONTROL = "disabled"
static.LOW_BANDWIDTH_MODE = lowbwmode or True  # note: "or True" forces low-bandwidth mode on; the datac0 tests always run in low-bandwidth mode
static.MYGRID = bytes("AA12aa", "utf-8")
static.RESPOND_TO_CQ = True
static.SSID_LIST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
mycallsign = helpers.callsign_to_bytes(mycall)
mycallsign = helpers.bytes_to_callsign(mycallsign)
static.MYCALLSIGN = mycallsign
static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN)
dxcallsign = helpers.callsign_to_bytes(dxcall)
dxcallsign = helpers.bytes_to_callsign(dxcallsign)
static.DXCALLSIGN = dxcallsign
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
# Create the TNC
tnc = data_handler.DATA()
orig_rx_func = data_handler.DATA.process_data
data_handler.DATA.process_data = t_process_data
tnc.log = structlog.get_logger(f"station{station}_DATA")
# Limit the frame-ack timeout
tnc.time_list_low_bw = [3, 1, 1]
tnc.time_list_high_bw = [3, 1, 1]
tnc.time_list = [3, 1, 1]
# Limit number of retries
tnc.rx_n_max_retries_per_burst = 4
# Create the modem
t_modem = modem.RF()
orig_tx_func = modem.RF.transmit
modem.RF.transmit = t_transmit
t_modem.log = structlog.get_logger(f"station{station}_RF")
return tnc, orig_rx_func, orig_tx_func
def t_datac0_1(
parent_pipe,
mycall: str,
dxcall: str,
config: Tuple,
tmp_path,
):
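"""Station 1: send the command under test, wait for its 'Failed' status on the socket queue, then disconnect."""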
log = structlog.get_logger("station1")
orig_tx_func: Callable
orig_rx_func: Callable
log.debug("t_datac0_1:", TMP_PATH=tmp_path)
# Unpack tuple
data, timeout_duration, tx_check, _, final_tx_check, _ = config
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
"""'Wrap' RF.transmit function to extract the arguments."""
nonlocal orig_tx_func, parent_pipe
t_frames = frames
parent_pipe.send(t_frames)
# log.info("S1 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S1 TX: ", TX=frametype)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore
def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
"""'Wrap' DATA.process_data function to extract the arguments."""
nonlocal orig_rx_func, parent_pipe
t_bytes_out = bytes(bytes_out)
parent_pipe.send(t_bytes_out)
log.debug(
"S1 RX: ",
bytes_out=t_bytes_out,
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S1 RX: ", RX=frametype)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
tnc, orig_rx_func, orig_tx_func = t_setup(
1,
mycall,
dxcall,
"hfchannel1",
"hfchannel2",
True,
t_transmit,
t_process_data,
tmp_path,
)
log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.process_tnc_commands(json.dumps(data, indent=None))
# Ensure the test completes.
timeout = time.time() + timeout_duration
while tx_check not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning(
"station1 TIMEOUT",
first=True,
queue=str(sock.SOCKET_QUEUE.queue),
tx_check=tx_check,
)
break
time.sleep(0.1)
log.info("station1, first")
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
sock.process_tnc_commands(json.dumps(data, indent=None))
time.sleep(0.5)
# Allow enough time for this side to process the disconnect frame.
timeout = time.time() + timeout_duration
while tnc.data_queue_transmit.queue:
if time.time() > timeout:
log.warning("station1", TIMEOUT=True, dq_tx=tnc.data_queue_transmit.queue)
break
time.sleep(0.5)
log.info("station1, final")
# log.info("S1 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
# log.info("S1 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
log.debug("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
for item in final_tx_check:
assert item in str(
sock.SOCKET_QUEUE.queue
), f"{item} not found in {str(sock.SOCKET_QUEUE.queue)}"
assert '"command_response":"disconnect","status":"OK"' in str(
sock.SOCKET_QUEUE.queue
)
log.warning("station1: Exiting!")
def t_datac0_2(
parent_pipe,
mycall: str,
dxcall: str,
config: Tuple,
tmp_path,
):
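"""Station 2: wait for the frame under test (which should never arrive in these negative tests), then verify the reception markers are absent and the session closed."""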
log = structlog.get_logger("station2")
orig_tx_func: Callable
orig_rx_func: Callable
log.debug("t_datac0_2:", TMP_PATH=tmp_path)
# Unpack tuple
data, timeout_duration, _, rx_check, _, final_rx_check = config
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
"""'Wrap' RF.transmit function to extract the arguments."""
nonlocal orig_tx_func, parent_pipe
t_frames = frames
parent_pipe.send(t_frames)
# log.info("S2 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S2 TX: ", TX=frametype)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore
def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
"""'Wrap' DATA.process_data function to extract the arguments."""
nonlocal orig_rx_func, parent_pipe
t_bytes_out = bytes(bytes_out)
parent_pipe.send(t_bytes_out)
log.debug(
"S2 RX: ",
bytes_out=t_bytes_out,
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S2 RX: ", RX=frametype)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
_, orig_rx_func, orig_tx_func = t_setup(
2,
mycall,
dxcall,
"hfchannel2",
"hfchannel1",
True,
t_transmit,
t_process_data,
tmp_path,
)
log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL)
if "cq" in data:
t_data = {"type": "arq", "command": "stop_transmission"}
sock.process_tnc_commands(json.dumps(t_data, indent=None))
sock.process_tnc_commands(json.dumps(t_data, indent=None))
# Ensure the test completes.
timeout = time.time() + timeout_duration
# Compare with the string conversion instead of repeatedly dumping
# the queue to an object for comparisons.
while rx_check not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning(
"station2 TIMEOUT",
first=True,
queue=str(sock.SOCKET_QUEUE.queue),
rx_check=rx_check,
)
break
time.sleep(0.5)
log.info("station2, first")
# Allow enough time for this side to receive the disconnect frame.
timeout = time.time() + timeout_duration
while '"arq":"session","status":"close"' not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning("station2", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
break
time.sleep(0.5)
log.info("station2, final")
# log.info("S2 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
# log.info("S2 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
log.debug("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
for item in final_rx_check:
assert item not in str(
sock.SOCKET_QUEUE.queue
), f"{item} found in {str(sock.SOCKET_QUEUE.queue)}"
assert '"arq":"session","status":"close"' in str(sock.SOCKET_QUEUE.queue)
log.warning("station2: Exiting!")


@@ -24,6 +24,7 @@ import sock
import static
import structlog
import ujson as json
from exceptions import NoCallsign
TESTMODE = False
@@ -1820,13 +1821,13 @@ class DATA:
dxcallsign:bytes:
"""
if not str(dxcallsign).strip():
self.log.warning("[TNC] Missing required callsign", dxcallsign=dxcallsign)
return
static.DXCALLSIGN = dxcallsign
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
self.send_data_to_socket_queue(
freedata="tnc-message",
ping="transmitting",
)
self.send_data_to_socket_queue(freedata="tnc-message", ping="transmitting")
self.log.info(
"[TNC] PING REQ ["
+ str(self.mycallsign, "UTF-8")

tnc/exceptions.py Normal file

@@ -0,0 +1,6 @@
"""
Custom exceptions for FreeDATA Python code
"""
class NoCallsign(UserWarning):
"""Raised when a required callsign is not provided"""


@@ -30,6 +30,7 @@ import helpers
import static
import structlog
import ujson as json
from exceptions import NoCallsign
SOCKET_QUEUE = queue.Queue()
DAEMON_QUEUE = queue.Queue()
@@ -272,6 +273,8 @@ def process_tnc_commands(data):
# send ping frame and wait for ACK
try:
dxcallsign = received_json["dxcallsign"]
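# A blank callsign cannot be pinged; reject the request before it is queued for transmit.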
if not str(dxcallsign).strip():
raise NoCallsign
# additional step to make sure our callsign is set correctly
# in case we are not given a station ssid
@@ -281,6 +284,9 @@ def process_tnc_commands(data):
data_handler.DATA_QUEUE_TRANSMIT.put(["PING", dxcallsign])
command_response("ping", True)
except NoCallsign:
command_response("ping", False)
log.warning("[SCK] callsign required for ping", command=received_json)
except Exception as err:
command_response("ping", False)
log.warning(