2022-06-15 01:48:15 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
2022-06-19 13:55:50 +00:00
|
|
|
Send- and receive-side station emulator for control frame tests over a high quality
|
|
|
|
simulated audio channel.
|
2022-06-15 01:48:15 +00:00
|
|
|
|
2022-06-19 13:55:50 +00:00
|
|
|
Near end-to-end test for sending / receiving control frames through the TNC and modem
|
|
|
|
and back through on the other station. Data injection initiates from the queue used
|
|
|
|
by the daemon process into and out of the TNC.
|
|
|
|
|
|
|
|
Invoked from test_datac0.py.
|
2022-06-19 14:04:46 +00:00
|
|
|
|
|
|
|
@author: N2KIQ
|
2022-06-15 01:48:15 +00:00
|
|
|
"""
|
|
|
|
|
|
|
|
import json
|
|
|
|
import time
|
|
|
|
from pprint import pformat
|
|
|
|
from typing import Callable, Tuple
|
|
|
|
|
|
|
|
import data_handler
|
|
|
|
import helpers
|
|
|
|
import modem
|
|
|
|
import sock
|
|
|
|
import static
|
|
|
|
import structlog
|
2022-06-24 17:07:34 +00:00
|
|
|
from static import FRAME_TYPE as FR_TYPE
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
|
|
|
|
def t_setup(
|
2022-06-17 11:41:52 +00:00
|
|
|
station: int,
|
2022-06-15 01:48:15 +00:00
|
|
|
mycall: str,
|
|
|
|
dxcall: str,
|
|
|
|
rx_channel: str,
|
|
|
|
tx_channel: str,
|
|
|
|
lowbwmode: bool,
|
|
|
|
t_transmit,
|
|
|
|
t_process_data,
|
|
|
|
tmp_path,
|
|
|
|
):
|
|
|
|
# Disable data_handler testmode - This is required to test a conversation.
|
|
|
|
data_handler.TESTMODE = False
|
|
|
|
modem.RXCHANNEL = tmp_path / rx_channel
|
|
|
|
modem.TESTMODE = True
|
|
|
|
modem.TXCHANNEL = tmp_path / tx_channel
|
|
|
|
static.HAMLIB_RADIOCONTROL = "disabled"
|
|
|
|
static.LOW_BANDWIDTH_MODE = lowbwmode or True
|
|
|
|
static.MYGRID = bytes("AA12aa", "utf-8")
|
|
|
|
static.RESPOND_TO_CQ = True
|
|
|
|
static.SSID_LIST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
|
|
|
|
|
|
|
|
mycallsign = helpers.callsign_to_bytes(mycall)
|
|
|
|
mycallsign = helpers.bytes_to_callsign(mycallsign)
|
|
|
|
static.MYCALLSIGN = mycallsign
|
|
|
|
static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN)
|
|
|
|
|
|
|
|
dxcallsign = helpers.callsign_to_bytes(dxcall)
|
|
|
|
dxcallsign = helpers.bytes_to_callsign(dxcallsign)
|
|
|
|
static.DXCALLSIGN = dxcallsign
|
|
|
|
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
|
|
|
|
|
|
|
|
# Create the TNC
|
|
|
|
tnc = data_handler.DATA()
|
|
|
|
orig_rx_func = data_handler.DATA.process_data
|
|
|
|
data_handler.DATA.process_data = t_process_data
|
2022-06-17 11:41:52 +00:00
|
|
|
tnc.log = structlog.get_logger(f"station{station}_DATA")
|
2022-06-15 01:48:15 +00:00
|
|
|
# Limit the frame-ack timeout
|
|
|
|
tnc.time_list_low_bw = [3, 1, 1]
|
|
|
|
tnc.time_list_high_bw = [3, 1, 1]
|
|
|
|
tnc.time_list = [3, 1, 1]
|
|
|
|
# Limit number of retries
|
|
|
|
tnc.rx_n_max_retries_per_burst = 4
|
|
|
|
|
|
|
|
# Create the modem
|
|
|
|
t_modem = modem.RF()
|
|
|
|
orig_tx_func = modem.RF.transmit
|
|
|
|
modem.RF.transmit = t_transmit
|
2022-06-17 11:41:52 +00:00
|
|
|
t_modem.log = structlog.get_logger(f"station{station}_RF")
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
return tnc, orig_rx_func, orig_tx_func
|
|
|
|
|
|
|
|
|
|
|
|
def t_datac0_1(
|
|
|
|
parent_pipe,
|
|
|
|
mycall: str,
|
|
|
|
dxcall: str,
|
|
|
|
config: Tuple,
|
|
|
|
tmp_path,
|
|
|
|
):
|
|
|
|
log = structlog.get_logger("station1")
|
|
|
|
orig_tx_func: Callable
|
|
|
|
orig_rx_func: Callable
|
|
|
|
log.debug("t_datac0_1:", TMP_PATH=tmp_path)
|
|
|
|
|
|
|
|
# Unpack tuple
|
|
|
|
data, timeout_duration, tx_check, _, final_tx_check, _ = config
|
|
|
|
|
|
|
|
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
|
|
|
|
"""'Wrap' RF.transmit function to extract the arguments."""
|
|
|
|
nonlocal orig_tx_func, parent_pipe
|
|
|
|
|
|
|
|
t_frames = frames
|
|
|
|
parent_pipe.send(t_frames)
|
|
|
|
# log.info("S1 TX: ", frames=t_frames)
|
|
|
|
for item in t_frames:
|
|
|
|
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
2022-06-24 17:07:34 +00:00
|
|
|
log.info("S1 TX: ", TX=FR_TYPE(frametype).name)
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
|
|
|
# original function captured before this one was put in place.
|
|
|
|
orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore
|
|
|
|
|
|
|
|
def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
|
|
|
|
"""'Wrap' DATA.process_data function to extract the arguments."""
|
|
|
|
nonlocal orig_rx_func, parent_pipe
|
|
|
|
|
|
|
|
t_bytes_out = bytes(bytes_out)
|
|
|
|
parent_pipe.send(t_bytes_out)
|
|
|
|
log.debug(
|
|
|
|
"S1 RX: ",
|
|
|
|
bytes_out=t_bytes_out,
|
|
|
|
bytes_per_frame=bytes_per_frame,
|
|
|
|
)
|
|
|
|
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
2022-06-24 17:07:34 +00:00
|
|
|
log.info("S1 RX: ", RX=FR_TYPE(frametype).name)
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
|
|
|
# original function captured before this one was put in place.
|
|
|
|
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
|
|
|
|
|
|
|
|
tnc, orig_rx_func, orig_tx_func = t_setup(
|
2022-06-17 11:41:52 +00:00
|
|
|
1,
|
2022-06-15 01:48:15 +00:00
|
|
|
mycall,
|
|
|
|
dxcall,
|
|
|
|
"hfchannel1",
|
|
|
|
"hfchannel2",
|
|
|
|
True,
|
|
|
|
t_transmit,
|
|
|
|
t_process_data,
|
|
|
|
tmp_path,
|
|
|
|
)
|
|
|
|
|
|
|
|
log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL)
|
|
|
|
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
|
|
|
|
|
2022-06-17 19:45:52 +00:00
|
|
|
time.sleep(0.5)
|
2022-06-24 17:07:34 +00:00
|
|
|
if "stop" in data["command"]:
|
|
|
|
log.debug("t_datac0_1: STOP test, setting TNC state")
|
|
|
|
static.TNC_STATE = "BUSY"
|
|
|
|
static.ARQ_STATE = True
|
2022-06-15 01:48:15 +00:00
|
|
|
sock.process_tnc_commands(json.dumps(data, indent=None))
|
|
|
|
time.sleep(0.5)
|
|
|
|
sock.process_tnc_commands(json.dumps(data, indent=None))
|
|
|
|
|
|
|
|
# Assure the test completes.
|
2022-06-17 19:45:52 +00:00
|
|
|
timeout = time.time() + timeout_duration
|
2022-06-15 01:48:15 +00:00
|
|
|
while tx_check not in str(sock.SOCKET_QUEUE.queue):
|
|
|
|
if time.time() > timeout:
|
|
|
|
log.warning(
|
|
|
|
"station1 TIMEOUT",
|
|
|
|
first=True,
|
|
|
|
queue=str(sock.SOCKET_QUEUE.queue),
|
|
|
|
tx_check=tx_check,
|
|
|
|
)
|
|
|
|
break
|
|
|
|
time.sleep(0.1)
|
|
|
|
log.info("station1, first")
|
2022-11-20 23:18:01 +00:00
|
|
|
# override ARQ SESSION STATE for allowing disconnect command
|
|
|
|
static.ARQ_SESSION_STATE = "connected"
|
2022-06-15 01:48:15 +00:00
|
|
|
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
|
|
|
|
sock.process_tnc_commands(json.dumps(data, indent=None))
|
|
|
|
time.sleep(0.5)
|
|
|
|
|
|
|
|
# Allow enough time for this side to process the disconnect frame.
|
|
|
|
timeout = time.time() + timeout_duration
|
|
|
|
while tnc.data_queue_transmit.queue:
|
|
|
|
if time.time() > timeout:
|
2022-06-17 19:45:52 +00:00
|
|
|
log.warning("station1", TIMEOUT=True, dq_tx=tnc.data_queue_transmit.queue)
|
2022-06-15 01:48:15 +00:00
|
|
|
break
|
|
|
|
time.sleep(0.5)
|
|
|
|
log.info("station1, final")
|
|
|
|
|
|
|
|
# log.info("S1 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
|
|
|
|
# log.info("S1 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
|
|
|
|
log.debug("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
|
|
|
|
|
|
|
|
for item in final_tx_check:
|
|
|
|
assert item in str(
|
|
|
|
sock.SOCKET_QUEUE.queue
|
|
|
|
), f"{item} not found in {str(sock.SOCKET_QUEUE.queue)}"
|
|
|
|
assert ':"failed"' not in str(sock.SOCKET_QUEUE.queue)
|
|
|
|
|
|
|
|
assert '"command_response":"disconnect","status":"OK"' in str(
|
|
|
|
sock.SOCKET_QUEUE.queue
|
|
|
|
)
|
|
|
|
log.warning("station1: Exiting!")
|
|
|
|
|
2022-06-17 11:41:52 +00:00
|
|
|
|
2022-06-15 01:48:15 +00:00
|
|
|
def t_datac0_2(
|
|
|
|
parent_pipe,
|
|
|
|
mycall: str,
|
|
|
|
dxcall: str,
|
|
|
|
config: Tuple,
|
|
|
|
tmp_path,
|
|
|
|
):
|
|
|
|
log = structlog.get_logger("station2")
|
|
|
|
orig_tx_func: Callable
|
|
|
|
orig_rx_func: Callable
|
|
|
|
log.debug("t_datac0_2:", TMP_PATH=tmp_path)
|
|
|
|
|
|
|
|
# Unpack tuple
|
2022-06-17 19:45:52 +00:00
|
|
|
data, timeout_duration, _, rx_check, _, final_rx_check = config
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
|
|
|
|
"""'Wrap' RF.transmit function to extract the arguments."""
|
|
|
|
nonlocal orig_tx_func, parent_pipe
|
|
|
|
|
|
|
|
t_frames = frames
|
|
|
|
parent_pipe.send(t_frames)
|
|
|
|
# log.info("S2 TX: ", frames=t_frames)
|
|
|
|
for item in t_frames:
|
|
|
|
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
2022-06-24 17:07:34 +00:00
|
|
|
log.info("S2 TX: ", TX=FR_TYPE(frametype).name)
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
|
|
|
# original function captured before this one was put in place.
|
|
|
|
orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore
|
|
|
|
|
|
|
|
def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
|
|
|
|
"""'Wrap' DATA.process_data function to extract the arguments."""
|
|
|
|
nonlocal orig_rx_func, parent_pipe
|
|
|
|
|
|
|
|
t_bytes_out = bytes(bytes_out)
|
|
|
|
parent_pipe.send(t_bytes_out)
|
|
|
|
log.debug(
|
|
|
|
"S2 RX: ",
|
|
|
|
bytes_out=t_bytes_out,
|
|
|
|
bytes_per_frame=bytes_per_frame,
|
|
|
|
)
|
|
|
|
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
2022-06-24 17:07:34 +00:00
|
|
|
log.info("S2 RX: ", RX=FR_TYPE(frametype).name)
|
2022-06-15 01:48:15 +00:00
|
|
|
|
|
|
|
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
|
|
|
# original function captured before this one was put in place.
|
|
|
|
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
|
|
|
|
|
|
|
|
_, orig_rx_func, orig_tx_func = t_setup(
|
2022-06-17 11:41:52 +00:00
|
|
|
2,
|
2022-06-15 01:48:15 +00:00
|
|
|
mycall,
|
|
|
|
dxcall,
|
|
|
|
"hfchannel2",
|
|
|
|
"hfchannel1",
|
|
|
|
True,
|
|
|
|
t_transmit,
|
|
|
|
t_process_data,
|
|
|
|
tmp_path,
|
|
|
|
)
|
|
|
|
|
|
|
|
log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL)
|
|
|
|
log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL)
|
|
|
|
|
2022-06-17 19:45:52 +00:00
|
|
|
if "cq" in data:
|
|
|
|
t_data = {"type": "arq", "command": "stop_transmission"}
|
|
|
|
sock.process_tnc_commands(json.dumps(t_data, indent=None))
|
|
|
|
sock.process_tnc_commands(json.dumps(t_data, indent=None))
|
|
|
|
|
2022-06-15 01:48:15 +00:00
|
|
|
# Assure the test completes.
|
|
|
|
timeout = time.time() + timeout_duration
|
|
|
|
# Compare with the string conversion instead of repeatedly dumping
|
|
|
|
# the queue to an object for comparisons.
|
|
|
|
while rx_check not in str(sock.SOCKET_QUEUE.queue):
|
|
|
|
if time.time() > timeout:
|
|
|
|
log.warning(
|
|
|
|
"station2 TIMEOUT",
|
|
|
|
first=True,
|
|
|
|
queue=str(sock.SOCKET_QUEUE.queue),
|
|
|
|
rx_check=rx_check,
|
|
|
|
)
|
|
|
|
break
|
|
|
|
time.sleep(0.5)
|
|
|
|
log.info("station2, first")
|
|
|
|
|
|
|
|
# Allow enough time for this side to receive the disconnect frame.
|
|
|
|
timeout = time.time() + timeout_duration
|
|
|
|
while '"arq":"session","status":"close"' not in str(sock.SOCKET_QUEUE.queue):
|
|
|
|
if time.time() > timeout:
|
|
|
|
log.warning("station2", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
|
|
|
|
break
|
|
|
|
time.sleep(0.5)
|
|
|
|
log.info("station2, final")
|
|
|
|
|
|
|
|
# log.info("S2 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
|
|
|
|
# log.info("S2 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
|
|
|
|
log.debug("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
|
|
|
|
|
|
|
|
for item in final_rx_check:
|
|
|
|
assert item in str(
|
|
|
|
sock.SOCKET_QUEUE.queue
|
|
|
|
), f"{item} not found in {str(sock.SOCKET_QUEUE.queue)}"
|
2022-12-24 22:13:27 +00:00
|
|
|
# TODO: Not sure why we need this for every test run
|
|
|
|
# assert '"arq":"session","status":"close"' in str(sock.SOCKET_QUEUE.queue)
|
2022-06-15 01:48:15 +00:00
|
|
|
log.warning("station2: Exiting!")
|