FreeDATA/test/util_datac13.py

310 lines
11 KiB
Python
Raw Normal View History

2022-06-15 01:48:15 +00:00
# -*- coding: utf-8 -*-
"""
Send- and receive-side station emulator for control frame tests over a high quality
simulated audio channel.
2022-06-15 01:48:15 +00:00
2023-10-22 08:00:37 +00:00
Near end-to-end test for sending / receiving control frames through the modem of one
station and back through the modem of the other station. Data injection initiates from
the queue used by the daemon process into and out of the modem.
2023-04-21 13:04:09 +00:00
Invoked from test_datac13.py.
2022-06-19 14:04:46 +00:00
@author: N2KIQ
2022-06-15 01:48:15 +00:00
"""
import json
import time
from pprint import pformat
from typing import Callable, Tuple
import data_handler
import helpers
import modem
import sock
2023-10-22 09:55:22 +00:00
from global_instances import ARQ, AudioParam, Beacon, Channel, Daemon, HamlibParam, ModemParam, Station, Statistics, TCIParam, Modem
2023-04-28 10:40:33 +00:00
from static import FRAME_TYPE as FR_TYPE
2022-06-15 01:48:15 +00:00
import structlog
2023-04-27 19:43:56 +00:00
#from static import FRAME_TYPE as FR_TYPE
2022-06-15 01:48:15 +00:00
def t_setup(
    station: int,
    mycall: str,
    dxcall: str,
    rx_channel: str,
    tx_channel: str,
    lowbwmode: bool,
    t_transmit,
    t_process_data,
    tmp_path,
):
    """Prepare the module-level test state and build one emulated station.

    Switches ``data_handler``, ``sock`` and ``modem`` into test mode, points
    the modem's RX/TX channels at files below ``tmp_path``, fills in the
    station parameters and replaces ``DATA.process_data`` and ``RF.transmit``
    with the supplied wrappers.

    Args:
        station: Station number; only used to name the structlog loggers.
        mycall: Callsign of this station.
        dxcall: Callsign of the remote station.
        rx_channel: Name of the receive channel, joined onto ``tmp_path``.
        tx_channel: Name of the transmit channel, joined onto ``tmp_path``.
        lowbwmode: Value stored in ``ModemParam.low_bandwidth_mode``.
        t_transmit: Replacement installed as ``modem.RF.transmit``.
        t_process_data: Replacement installed as
            ``data_handler.DATA.process_data``.
        tmp_path: Base directory for the channels; must support the ``/``
            operator (presumably a ``pathlib.Path`` such as pytest's
            ``tmp_path`` - confirm against the caller).

    Returns:
        A tuple ``(modem_data_handler, orig_rx_func, orig_tx_func)``: the
        created ``DATA`` instance, the original ``DATA.process_data`` and the
        original ``RF.transmit``. The patches are applied on the classes and
        are not undone here; the originals are handed back so the wrappers
        can forward to them.
    """
    # Enable data_handler testmode - This is required to test a conversation.
    data_handler.TESTMODE = True

    # Enable socket testmode for overriding socket class
    sock.TESTMODE = True

    modem.RXCHANNEL = tmp_path / rx_channel
    modem.TESTMODE = True
    modem.TXCHANNEL = tmp_path / tx_channel

    HamlibParam.hamlib_radiocontrol = "disabled"
    # Honour the caller's choice. The former `lowbwmode or True` always
    # evaluated to True and silently ignored the parameter.
    ModemParam.low_bandwidth_mode = lowbwmode
    Station.mygrid = bytes("AA12aa", "utf-8")
    ModemParam.respond_to_cq = True
    Station.ssid_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    # Round-trip both callsigns through the helpers' byte encoding before
    # storing them and deriving their CRCs.
    mycallsign = helpers.callsign_to_bytes(mycall)
    mycallsign = helpers.bytes_to_callsign(mycallsign)
    Station.mycallsign = mycallsign
    Station.mycallsign_crc = helpers.get_crc_24(Station.mycallsign)

    dxcallsign = helpers.callsign_to_bytes(dxcall)
    dxcallsign = helpers.bytes_to_callsign(dxcallsign)
    Station.dxcallsign = dxcallsign
    Station.dxcallsign_crc = helpers.get_crc_24(Station.dxcallsign)

    # Create the data handler, then swap in the receive wrapper on the class.
    modem_data_handler = data_handler.DATA()
    orig_rx_func = data_handler.DATA.process_data
    data_handler.DATA.process_data = t_process_data
    modem_data_handler.log = structlog.get_logger(f"station{station}_DATA")

    # Limit the frame-ack timeout
    modem_data_handler.time_list_low_bw = [8, 8, 8]
    modem_data_handler.time_list_high_bw = [8, 8, 8]
    modem_data_handler.time_list = [8, 8, 8]

    # Limit number of retries
    modem_data_handler.rx_n_max_retries_per_burst = 4
    ModemParam.tx_delay = 50  # add additional delay time for passing test

    # Create the modem, then swap in the transmit wrapper on the class.
    # The instance is not returned; it is only given a station-specific logger.
    t_modem = modem.RF()
    orig_tx_func = modem.RF.transmit
    modem.RF.transmit = t_transmit
    t_modem.log = structlog.get_logger(f"station{station}_RF")

    return modem_data_handler, orig_rx_func, orig_tx_func
2022-06-15 01:48:15 +00:00
2023-04-21 13:04:09 +00:00
def t_datac13_1(
    parent_pipe,
    mycall: str,
    dxcall: str,
    config: Tuple,
    tmp_path,
):
    """Run station 1: submit the test command, then disconnect and verify.

    Args:
        parent_pipe: Object with a ``send`` method; every transmitted frame
            list and every received frame is forwarded to it.
        mycall: Callsign of this station.
        dxcall: Callsign of the remote station.
        config: Six-element tuple; station 1 uses elements 0 (command dict),
            1 (timeout in seconds), 2 (string awaited in the socket queue)
            and 4 (strings that must be in the socket queue at the end).
        tmp_path: Base directory for the simulated channels.

    Raises:
        AssertionError: If an expected string is missing from the socket
            queue, a ``:"failed"`` entry is present, or the disconnect
            response is absent.
    """
    log = structlog.get_logger("station1")
    orig_tx_func: Callable
    orig_rx_func: Callable
    log.debug("t_datac13_1:", TMP_PATH=tmp_path)

    # Station 1 ignores the receive-side entries of the configuration.
    command, max_wait, expected_tx, _, final_expected, _ = config

    def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
        """'Wrap' RF.transmit function to extract the arguments."""
        parent_pipe.send(frames)
        # log.info("S1 TX: ", frames=frames)
        for frame in frames:
            frame_id = int.from_bytes(frame[:1], "big")  # type: ignore
            log.info("S1 TX: ", TX=FR_TYPE(frame_id).name)

        # Forward to the original RF.transmit, which t_setup captured before
        # this wrapper was installed in its place.
        orig_tx_func(self, mode, repeats, repeat_delay, frames)  # type: ignore

    def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
        """'Wrap' DATA.process_data function to extract the arguments."""
        raw = bytes(bytes_out)
        parent_pipe.send(raw)
        log.debug(
            "S1 RX: ",
            bytes_out=raw,
            bytes_per_frame=bytes_per_frame,
        )
        frame_id = int.from_bytes(raw[:1], "big")
        log.info("S1 RX: ", RX=FR_TYPE(frame_id).name)

        # Forward to the original DATA.process_data, which t_setup captured
        # before this wrapper was installed in its place.
        orig_rx_func(self, bytes_out, freedv, bytes_per_frame)  # type: ignore

    def submit(payload):
        """Hand a JSON-encoded command to the socket command processor."""
        sock.ThreadedTCPRequestHandler.process_modem_commands(
            None, json.dumps(payload, indent=None)
        )

    modem_data_handler, orig_rx_func, orig_tx_func = t_setup(
        1,
        mycall,
        dxcall,
        "hfchannel1",
        "hfchannel2",
        True,
        t_transmit,
        t_process_data,
        tmp_path,
    )

    log.info("t_datac13_1:", RXCHANNEL=modem.RXCHANNEL)
    log.info("t_datac13_1:", TXCHANNEL=modem.TXCHANNEL)

    time.sleep(0.5)
    if "stop" in command["command"]:
        log.debug("t_datac13_1: STOP test, setting Modem state")
        ModemParam.modem_state = "BUSY"
        ARQ.arq_state = True

    # The command is submitted twice, half a second apart.
    submit(command)
    time.sleep(0.5)
    submit(command)

    # Assure the test completes.
    deadline = time.time() + max_wait + 5
    while True:
        if expected_tx in str(sock.SOCKET_QUEUE.queue):
            break
        if time.time() > deadline:
            log.warning(
                "station1 TIMEOUT",
                first=True,
                queue=str(sock.SOCKET_QUEUE.queue),
                tx_check=expected_tx,
            )
            break
        time.sleep(0.5)
    log.info("station1, first")

    # override ARQ SESSION STATE for allowing disconnect command
    ARQ.arq_session_state = "connected"
    submit({"type": "arq", "command": "disconnect", "dxcallsign": dxcall})
    time.sleep(0.5)

    # Allow enough time for this side to process the disconnect frame.
    deadline = time.time() + max_wait
    while True:
        if not modem_data_handler.data_queue_transmit.queue:
            break
        if time.time() > deadline:
            log.warning(
                "station1",
                TIMEOUT=True,
                dq_tx=modem_data_handler.data_queue_transmit.queue,
            )
            break
        time.sleep(0.5)
    log.info("station1, final")

    log.debug("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))

    for expected in final_expected:
        assert expected in str(
            sock.SOCKET_QUEUE.queue
        ), f"{expected} not found in {sock.SOCKET_QUEUE.queue!s}"

    assert ':"failed"' not in str(sock.SOCKET_QUEUE.queue)
    assert '"command_response":"disconnect","status":"OK"' in str(
        sock.SOCKET_QUEUE.queue
    )
    log.warning("station1: Exiting!")
2023-04-21 13:04:09 +00:00
def t_datac13_2(
    parent_pipe,
    mycall: str,
    dxcall: str,
    config: Tuple,
    tmp_path,
):
    """Run station 2: wait for the expected traffic and verify the queue.

    Args:
        parent_pipe: Object with a ``send`` method; every transmitted frame
            list and every received frame is forwarded to it.
        mycall: Callsign of this station.
        dxcall: Callsign of the remote station.
        config: Six-element tuple; station 2 uses elements 1 (timeout in
            seconds), 3 (string awaited in the socket queue) and 5 (strings
            that must be in the socket queue at the end).
        tmp_path: Base directory for the simulated channels.

    Raises:
        AssertionError: If an expected string is missing from the socket
            queue once both waits have finished.
    """
    log = structlog.get_logger("station2")
    orig_tx_func: Callable
    orig_rx_func: Callable
    log.debug("t_datac13_2:", TMP_PATH=tmp_path)

    # Station 2 ignores the command and the transmit-side entries.
    _, max_wait, _, expected_rx, _, final_expected = config

    def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
        """'Wrap' RF.transmit function to extract the arguments."""
        parent_pipe.send(frames)
        # log.info("S2 TX: ", frames=frames)
        for frame in frames:
            frame_id = int.from_bytes(frame[:1], "big")  # type: ignore
            log.info("S2 TX: ", TX=FR_TYPE(frame_id).name)

        # Forward to the original RF.transmit, which t_setup captured before
        # this wrapper was installed in its place.
        orig_tx_func(self, mode, repeats, repeat_delay, frames)  # type: ignore

    def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
        """'Wrap' DATA.process_data function to extract the arguments."""
        raw = bytes(bytes_out)
        parent_pipe.send(raw)
        log.debug(
            "S2 RX: ",
            bytes_out=raw,
            bytes_per_frame=bytes_per_frame,
        )
        frame_id = int.from_bytes(raw[:1], "big")
        log.info("S2 RX: ", RX=FR_TYPE(frame_id).name)

        # Forward to the original DATA.process_data, which t_setup captured
        # before this wrapper was installed in its place.
        orig_rx_func(self, bytes_out, freedv, bytes_per_frame)  # type: ignore

    # The data handler instance returned by t_setup is not needed here.
    _, orig_rx_func, orig_tx_func = t_setup(
        2,
        mycall,
        dxcall,
        "hfchannel2",
        "hfchannel1",
        True,
        t_transmit,
        t_process_data,
        tmp_path,
    )

    log.info("t_datac13_2:", RXCHANNEL=modem.RXCHANNEL)
    log.info("t_datac13_2:", TXCHANNEL=modem.TXCHANNEL)

    # TODO Why do we need this when calling CQ?
    # if "cq" in data:
    #     t_data = {"type": "arq", "command": "stop_transmission"}
    #     sock.ThreadedTCPRequestHandler.process_modem_commands(None,json.dumps(t_data, indent=None))
    #     sock.ThreadedTCPRequestHandler.process_modem_commands(None,json.dumps(t_data, indent=None))

    # Assure the test completes.
    deadline = time.time() + max_wait
    # Compare with the string conversion instead of repeatedly dumping
    # the queue to an object for comparisons.
    while True:
        if expected_rx in str(sock.SOCKET_QUEUE.queue):
            break
        if time.time() > deadline:
            log.warning(
                "station2 TIMEOUT",
                first=True,
                queue=str(sock.SOCKET_QUEUE.queue),
                rx_check=expected_rx,
            )
            break
        time.sleep(0.5)
    log.info("station2, first")

    # Allow enough time for this side to receive the disconnect frame.
    deadline = time.time() + max_wait
    while True:
        if '"arq":"session","status":"close"' in str(sock.SOCKET_QUEUE.queue):
            break
        if time.time() > deadline:
            log.warning("station2", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
            break
        time.sleep(0.5)
    log.info("station2, final")

    log.debug("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))

    for expected in final_expected:
        assert expected in str(
            sock.SOCKET_QUEUE.queue
        ), f"{expected} not found in {sock.SOCKET_QUEUE.queue!s}"

    # TODO Not sure why we need this for every test run
    # assert '"arq":"session","status":"close"' in str(sock.SOCKET_QUEUE.queue)
    log.warning("station2: Exiting!")