FreeDATA/test/util_chat_text_1.py

220 lines
7.6 KiB
Python
Raw Normal View History

2022-05-22 13:41:21 +00:00
# -*- coding: utf-8 -*-
"""
Send-side station emulator for connect frame tests over a high quality simulated audio channel.
2022-05-22 13:41:21 +00:00
Near end-to-end test for sending / receiving connection control frames through the
TNC and modem and back through the other station. Data injection initiates from the
queue used by the daemon process into and out of the TNC.
Invoked from test_chat_text.py.
2022-06-19 14:04:46 +00:00
@author: N2KIQ
2022-05-22 13:41:21 +00:00
"""
import base64
import json
import time
from pprint import pformat
from typing import Callable
2022-05-22 13:41:21 +00:00
import codec2
import data_handler
import helpers
import modem
import sock
2023-04-27 19:43:56 +00:00
from static import ARQ, AudioParam, Beacon, Channel, Daemon, HamlibParam, ModemParam, Station, Statistics, TCIParam, TNC
2022-06-05 01:55:19 +00:00
import structlog
2022-05-22 13:41:21 +00:00
def t_setup(
    mycall: str,
    dxcall: str,
    lowbwmode: bool,
    t_transmit,
    t_process_data,
    tmp_path,
):
    """Configure shared state and build the station-1 TNC and modem.

    Sets the module/class level test configuration, instantiates
    ``data_handler.DATA`` and ``modem.RF``, and replaces
    ``DATA.process_data`` and ``RF.transmit`` with the supplied wrappers.

    :param mycall: callsign of this station
    :param dxcall: callsign of the remote station
    :param lowbwmode: value stored in ``TNC.low_bandwidth_mode``
    :param t_transmit: replacement installed as ``modem.RF.transmit``
    :param t_process_data: replacement installed as
        ``data_handler.DATA.process_data``
    :param tmp_path: directory under which the "hfchannel1" (RX) and
        "hfchannel2" (TX) channel paths are placed
    :return: ``(tnc, orig_rx_func, orig_tx_func)`` - the ``DATA`` instance and
        the two original functions captured before they were replaced
    """

    def _roundtrip(callsign):
        # Encode to bytes and back again, exactly as the callsign helpers do.
        return helpers.bytes_to_callsign(helpers.callsign_to_bytes(callsign))

    # data_handler's test mode has to be off in order to test a conversation.
    data_handler.TESTMODE = False

    # The modem runs in test mode over two channel paths inside tmp_path.
    modem.RXCHANNEL = tmp_path / "hfchannel1"
    modem.TESTMODE = True
    modem.TXCHANNEL = tmp_path / "hfchannel2"

    HamlibParam.hamlib_radiocontrol = "disabled"
    TNC.low_bandwidth_mode = lowbwmode
    Station.mygrid = "AA12aa".encode("utf-8")
    TNC.respond_to_cq = True
    Station.ssid_list = list(range(10))

    # Override the ARQ session state so a disconnect command is allowed.
    ARQ.arq_session_state = "connected"

    # Own callsign and its CRC.
    Station.mycallsign = _roundtrip(mycall)
    Station.mycallsign_crc = helpers.get_crc_24(Station.mycallsign)

    # Remote callsign and its CRC.
    Station.dxcallsign = _roundtrip(dxcall)
    Station.dxcallsign_crc = helpers.get_crc_24(Station.dxcallsign)

    # Build the TNC, then swap in the receive wrapper while remembering the
    # original so the wrapper can still delegate to it.
    station_tnc = data_handler.DATA()
    original_process_data = data_handler.DATA.process_data
    data_handler.DATA.process_data = t_process_data
    station_tnc.log = structlog.get_logger("station1_DATA")

    # Limit the frame-ack timeout (each attribute gets its own list object).
    station_tnc.time_list_low_bw = [3, 1, 1]
    station_tnc.time_list_high_bw = [3, 1, 1]
    station_tnc.time_list = [3, 1, 1]

    # Limit the number of retries.
    station_tnc.rx_n_max_retries_per_burst = 5

    # Build the modem, then swap in the transmit wrapper the same way.
    station_modem = modem.RF()
    original_transmit = modem.RF.transmit
    modem.RF.transmit = t_transmit
    station_modem.log = structlog.get_logger("station1_RF")

    return station_tnc, original_process_data, original_transmit
2022-05-22 13:41:21 +00:00
def t_highsnr_arq_short_station1(
    parent_pipe,
    freedv_mode: str,
    n_frames_per_burst: int,
    mycall: str,
    dxcall: str,
    message: str,
    lowbwmode: bool,
    tmp_path,
):
    """Run station 1: send ``message`` via ARQ, disconnect, and verify.

    ``RF.transmit`` and ``DATA.process_data`` are wrapped so that every
    transmitted frame list and every received frame is also sent to
    ``parent_pipe`` before the original function is called.

    :param parent_pipe: object with a ``send`` method that receives the
        transmitted frames and the received bytes
    :param freedv_mode: key into ``codec2.FREEDV_MODE``
    :param n_frames_per_burst: value sent as ``n_frames`` in the command
    :param mycall: callsign of this station
    :param dxcall: callsign of the remote station
    :param message: text that is base64 encoded and sent with ``send_raw``
    :param lowbwmode: forwarded to ``t_setup`` (low bandwidth mode flag)
    :param tmp_path: directory that holds the simulated channel paths
    :raises AssertionError: if a DC open frame of the wrong bandwidth is
        transmitted, or if the expected status messages are missing from
        ``sock.SOCKET_QUEUE`` at the end of the run
    """
    # The module imports individual names from ``static`` but never binds the
    # module itself, so ``static.FRAME_TYPE`` raised NameError. Import the
    # enum locally to keep this block self-contained.
    from static import FRAME_TYPE

    log = structlog.get_logger("station1")
    orig_tx_func: Callable
    orig_rx_func: Callable
    log.info("t_highsnr_arq_short_station1:", TMP_PATH=tmp_path)

    def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
        """'Wrap' RF.transmit function to extract the arguments."""
        t_frames = frames
        parent_pipe.send(t_frames)
        # log.info("S1 TX: ", frames=t_frames)
        for item in t_frames:
            # The first byte of each frame is read as the frame type.
            frametype = int.from_bytes(item[:1], "big")  # type: ignore
            log.info("S1 TX: ", mode=FRAME_TYPE(frametype).name)

            if (
                TNC.low_bandwidth_mode
                and frametype == FRAME_TYPE.ARQ_DC_OPEN_W.value
            ):
                mesg = (
                    "enqueue_frame_for_tx: Low BW global "
                    "but DC Open narrow frame NOT chosen."
                )
                # Low bandwidth data type, wide bandwidth frame type
                log.error(mesg)
                # Raise explicitly: a bare ``assert`` is removed under -O.
                raise AssertionError(mesg)

            if (
                not TNC.low_bandwidth_mode
                and frametype == FRAME_TYPE.ARQ_DC_OPEN_N.value
            ):
                mesg = (
                    "enqueue_frame_for_tx: High BW global "
                    "but DC Open wide frame NOT chosen."
                )
                # High bandwidth data type, low bandwidth frame type
                log.error(mesg)
                raise AssertionError(mesg)

        # "orig_tx_func" is the original RF.transmit captured by t_setup
        # before this wrapper was put in place.
        orig_tx_func(self, mode, repeats, repeat_delay, frames)  # type: ignore

    def t_process_data(self, bytes_out, freedv, bytes_per_frame: int):
        """'Wrap' DATA.process_data function to extract the arguments."""
        t_bytes_out = bytes(bytes_out)
        parent_pipe.send(t_bytes_out)
        log.debug(
            "S1 RX: ",
            bytes_out=t_bytes_out,
            bytes_per_frame=bytes_per_frame,
        )
        frametype = int.from_bytes(t_bytes_out[:1], "big")
        log.info("S1 RX: ", RX=frametype)

        # "orig_rx_func" is the original DATA.process_data captured by
        # t_setup before this wrapper was put in place.
        orig_rx_func(self, bytes_out, freedv, bytes_per_frame)  # type: ignore

    tnc, orig_rx_func, orig_tx_func = t_setup(
        mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path
    )

    log.info("t_highsnr_arq_short_station1:", RXCHANNEL=modem.RXCHANNEL)
    log.info("t_highsnr_arq_short_station1:", TXCHANNEL=modem.TXCHANNEL)

    # Construct message to dxstation.
    b64_str = str(base64.b64encode(bytes(message, "UTF-8")), "UTF-8").strip()
    data = {
        "type": "arq",
        "command": "send_raw",
        "parameter": [
            {
                "data": b64_str,
                "dxcallsign": dxcall,
                "mode": codec2.FREEDV_MODE[freedv_mode].value,
                "n_frames": n_frames_per_burst,
            }
        ],
    }
    sock.process_tnc_commands(json.dumps(data, indent=None))

    # Assure the test completes.
    timeout = time.time() + 25
    # Compare with the string conversion instead of repeatedly dumping
    # the queue to an object for comparisons.
    while '"arq":"transmission","status":"transmitted"' not in str(
        sock.SOCKET_QUEUE.queue
    ):
        if time.time() > timeout:
            log.warning("station1 TIMEOUT", first=True)
            break
        time.sleep(0.1)
    log.info("station1, first", arq_state=pformat(ARQ.arq_state))

    data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
    sock.process_tnc_commands(json.dumps(data, indent=None))
    time.sleep(0.5)
    # override ARQ SESSION STATE for allowing disconnect command
    ARQ.arq_session_state = "connected"
    sock.process_tnc_commands(json.dumps(data, indent=None))

    # Allow enough time for this side to process the disconnect frame.
    timeout = time.time() + 20
    while ARQ.arq_state or tnc.data_queue_transmit.queue:
        if time.time() > timeout:
            log.error("station1", TIMEOUT=True)
            break
        time.sleep(0.5)
    log.info("station1", arq_state=pformat(ARQ.arq_state))

    # log.info("S1 DQT: ", DQ_Tx=pformat(modem.data_queue_transmit.queue))
    # log.info("S1 DQR: ", DQ_Rx=pformat(modem.data_queue_received.queue))
    log.info("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))

    # Check one snapshot of the socket queue so every assertion sees the
    # same contents.
    socket_messages = str(sock.SOCKET_QUEUE.queue)
    assert '"arq":"transmission","status":"transmitting"' in socket_messages
    assert '"arq":"transmission","status":"transmitted"' in socket_messages
    assert '"arq":"transmission","status":"failed"' not in socket_messages
    assert '"percent":100' in socket_messages
    assert '"command_response":"disconnect","status":"OK"' in socket_messages

    log.error("station1: Exiting!")