FreeDATA/test/util_chat_text_2.py

162 lines
5.4 KiB
Python
Raw Normal View History

2022-05-22 13:41:21 +00:00
# -*- coding: utf-8 -*-
"""
Receive-side station emulator for connect frame tests over a high quality simulated audio channel.
2022-05-22 13:41:21 +00:00
Near end-to-end test for sending / receiving connection control frames through the
TNC and modem and back through on the other station. Data injection initiates from the
queue used by the daemon process into and out of the TNC.
Invoked from test_chat_text.py.
2022-06-19 14:04:46 +00:00
@author: N2KIQ
2022-05-22 13:41:21 +00:00
"""
import time
from pprint import pformat
from typing import Callable
2022-05-22 13:41:21 +00:00
import data_handler
import helpers
import modem
import sock
2022-05-22 13:41:21 +00:00
import static
2022-06-05 01:55:19 +00:00
import structlog
2022-05-22 13:41:21 +00:00
def t_setup(
    mycall: str,
    dxcall: str,
    lowbwmode: bool,
    t_transmit,
    t_process_data,
    tmp_path,
):
    """
    Configure station 2 and hook the test wrappers into the TNC and modem.

    Writes module-level settings in ``data_handler``, ``modem`` and ``static``,
    instantiates ``data_handler.DATA`` and ``modem.RF``, and replaces
    ``DATA.process_data`` and ``RF.transmit`` on their classes with the
    supplied wrappers.

    Args:
        mycall: Callsign of this station; normalised through
            ``helpers.callsign_to_bytes`` / ``helpers.bytes_to_callsign``.
        dxcall: Callsign of the remote station; normalised the same way.
        lowbwmode: Value stored in ``static.LOW_BANDWIDTH_MODE``.
        t_transmit: Replacement installed as ``modem.RF.transmit``.
        t_process_data: Replacement installed as
            ``data_handler.DATA.process_data``.
        tmp_path: Path-like object supporting the ``/`` operator; the two
            channel paths are created beneath it.

    Returns:
        A ``(tnc, orig_rx_func, orig_tx_func)`` tuple: the ``DATA`` instance,
        the original ``DATA.process_data`` and the original ``RF.transmit``.
    """
    # Disable data_handler testmode - This is required to test a conversation.
    data_handler.TESTMODE = False

    # This station listens on "hfchannel2" and transmits on "hfchannel1".
    modem.RXCHANNEL = tmp_path / "hfchannel2"
    modem.TESTMODE = True
    modem.TXCHANNEL = tmp_path / "hfchannel1"

    static.HAMLIB_RADIOCONTROL = "disabled"
    static.LOW_BANDWIDTH_MODE = lowbwmode
    static.MYGRID = "AA12aa".encode("utf-8")
    static.RESPOND_TO_CQ = True
    static.SSID_LIST = list(range(10))

    # Round-trip each callsign through the helpers before storing it, then
    # derive its CRC from the stored value.
    static.MYCALLSIGN = helpers.bytes_to_callsign(helpers.callsign_to_bytes(mycall))
    static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN)

    static.DXCALLSIGN = helpers.bytes_to_callsign(helpers.callsign_to_bytes(dxcall))
    static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)

    # Create the TNC, then swap in the receive wrapper. The original method
    # is captured first so the wrapper can delegate to it.
    station = data_handler.DATA()
    original_process_data = data_handler.DATA.process_data
    data_handler.DATA.process_data = t_process_data
    station.log = structlog.get_logger("station2_DATA")

    # Limit the frame-ack timeout; each attribute gets its own list object.
    for timeout_attr in ("time_list_low_bw", "time_list_high_bw", "time_list"):
        setattr(station, timeout_attr, [1, 1, 1])

    # Limit number of retries
    station.rx_n_max_retries_per_burst = 5

    # Create the modem, then swap in the transmit wrapper the same way.
    rf_modem = modem.RF()
    original_transmit = modem.RF.transmit
    modem.RF.transmit = t_transmit
    rf_modem.log = structlog.get_logger("station2_RF")

    return station, original_process_data, original_transmit
2022-05-22 13:41:21 +00:00
def t_highsnr_arq_short_station2(
    parent_pipe,
    freedv_mode: str,
    n_frames_per_burst: int,
    mycall: str,
    dxcall: str,
    message: str,
    lowbwmode: bool,
    tmp_path,
):
    """
    Run the receive-side station and assert it saw a transmission and a close.

    Installs wrappers around ``modem.RF.transmit`` and
    ``data_handler.DATA.process_data`` (via ``t_setup``) that forward every
    transmitted / received frame to ``parent_pipe`` before delegating to the
    original implementation. It then polls ``sock.SOCKET_QUEUE`` every 0.5 s
    for the "transmission received" message (up to 25 s) and the
    "session close" message (up to 20 s), and finally asserts both are present.

    Args:
        parent_pipe: Object exposing ``send()``; receives the frames passed to
            the transmit wrapper and the bytes passed to the receive wrapper.
        freedv_mode: Not used in this function.
        n_frames_per_burst: Not used in this function.
        mycall: Callsign of this station, passed to ``t_setup``.
        dxcall: Callsign of the remote station, passed to ``t_setup``.
        message: Not used in this function.
        lowbwmode: Low-bandwidth flag, passed to ``t_setup``.
        tmp_path: Base path for the simulated channels, passed to ``t_setup``.

    Raises:
        AssertionError: If either expected message is missing from the socket
            queue once both waits have finished.
    """
    logger = structlog.get_logger("station2")
    # Declared here, bound further down once t_setup() has captured them.
    original_transmit: Callable
    original_process_data: Callable
    logger.info("t_highsnr_arq_short_station2:", TMP_PATH=tmp_path)

    def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
        """'Wrap' RF.transmit function to extract the arguments."""
        parent_pipe.send(frames)
        for frame in frames:
            # The frame type is read from the first byte of each frame.
            logger.info("S2 TX: ", TX=int.from_bytes(frame[:1], "big"))  # type: ignore

        # Delegate to the real RF.transmit captured by t_setup().
        original_transmit(self, mode, repeats, repeat_delay, frames)  # type: ignore

    def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
        """'Wrap' DATA.process_data function to extract the arguments."""
        payload = bytes(bytes_out)
        parent_pipe.send(payload)
        logger.debug(
            "S2 RX: ",
            bytes_out=payload,
            bytes_per_frame=bytes_per_frame,
        )
        logger.info("S2 RX: ", RX=int.from_bytes(payload[:1], "big"))

        # Delegate to the real DATA.process_data captured by t_setup().
        original_process_data(self, bytes_out, freedv, bytes_per_frame)  # type: ignore

    # The TNC instance stays bound to a local for the duration of the test.
    tnc, original_process_data, original_transmit = t_setup(
        mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path
    )

    logger.info("t_highsnr_arq_short_station2:", RXCHANNEL=modem.RXCHANNEL)
    logger.info("t_highsnr_arq_short_station2:", TXCHANNEL=modem.TXCHANNEL)

    # Compare with the string conversion instead of repeatedly dumping
    # the queue to an object for comparisons.
    received_marker = '"arq":"transmission","status":"received"'
    closed_marker = '"arq":"session","status":"close"'

    # Assure the test completes: wait until the transmission was received
    # and the ARQ state has dropped, or give up once the deadline passes.
    deadline = time.time() + 25
    while True:
        if received_marker in str(sock.SOCKET_QUEUE.queue) and not static.ARQ_STATE:
            break
        if time.time() > deadline:
            logger.warning("station2 TIMEOUT", first=True)
            break
        time.sleep(0.5)
    logger.info("station2, first", arq_state=pformat(static.ARQ_STATE))

    # Allow enough time for this side to receive the disconnect frame.
    deadline = time.time() + 20
    while True:
        if closed_marker in str(sock.SOCKET_QUEUE.queue):
            break
        if time.time() > deadline:
            logger.warning("station2", TIMEOUT=True)
            break
        time.sleep(0.5)
    logger.info("station2", arq_state=pformat(static.ARQ_STATE))

    logger.info("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))

    # The queue is re-read for each check rather than cached.
    assert received_marker in str(sock.SOCKET_QUEUE.queue)
    assert closed_marker in str(sock.SOCKET_QUEUE.queue)

    logger.warning("station2: Exiting!")