Test to validate processing of STOP frames.

This commit is contained in:
Paul Kronenwetter 2022-06-24 13:07:34 -04:00
parent eeab780086
commit 305daca626
4 changed files with 66 additions and 10 deletions

View file

@ -46,31 +46,37 @@ def parameters() -> dict:
# Construct message to start ping.
ping_data = {"type": "ping", "command": "ping", "dxcallsign": "ZZ9YY-0"}
connect_data = {"type": "arq", "command": "connect", "dxcallsign": "ZZ9YY-0"}
stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "ZZ9YY-0"}
beacon_timeout = 6
cq_timeout = 8
ping_timeout = 5
connect_timeout = 10
stop_timeout = 5
beacon_tx_check = '"beacon":"transmitting"'
cq_tx_check = '"qrv":"received"'
ping_tx_check = '"ping":"transmitting"'
connect_tx_check = '"session":"connecting"'
stop_tx_check = '"status":"stopped"'
beacon_rx_check = '"beacon":"received"'
cq_rx_check = '"cq":"received"'
ping_rx_check = '"ping":"received"'
connect_rx_check = '"connect":"received"'
stop_rx_check = '"status":"stopped"'
beacon_final_tx_check = [beacon_tx_check]
cq_final_tx_check = ['"cq":"transmitting"', cq_tx_check]
ping_final_tx_check = [ping_tx_check, '"ping":"acknowledge"']
connect_final_tx_check = ['"status":"connected"', '"connect":"acknowledge"']
stop_final_tx_check = [stop_tx_check]
beacon_final_rx_check = [beacon_rx_check]
cq_final_rx_check = [cq_rx_check, '"qrv":"transmitting"']
ping_final_rx_check = [ping_rx_check]
connect_final_rx_check = [connect_rx_check]
stop_final_rx_check = [stop_rx_check]
return {
"beacon": (
@ -105,6 +111,14 @@ def parameters() -> dict:
ping_final_tx_check,
ping_final_rx_check,
),
"stop": (
stop_data,
stop_timeout,
stop_tx_check,
stop_rx_check,
stop_final_tx_check,
stop_final_rx_check,
),
}
@ -179,6 +193,7 @@ def analyze_results(station1: list, station2: list, call_list: list):
pytest.param("ping", marks=pytest.mark.flaky(reruns=2)),
pytest.param("cq", marks=pytest.mark.flaky(reruns=20)),
# pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")),
pytest.param("stop", marks=pytest.mark.flaky(reruns=0)),
],
)
def test_datac0(frame_type: str, tmp_path):

View file

@ -36,26 +36,32 @@ def parameters() -> dict:
# Construct message to start ping.
ping_data = {"type": "ping", "command": "ping", "dxcallsign": ""}
connect_data = {"type": "arq", "command": "connect", "dxcallsign": ""}
stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "DD5GG-3"}
beacon_timeout = 1
ping_timeout = 1
connect_timeout = 1
stop_timeout = 1
beacon_tx_check = '"status":"Failed"'
ping_tx_check = '"ping","status":"Failed"'
connect_tx_check = '"status":"Failed"'
stop_tx_check = '"status":"stopped"'
beacon_rx_check = '"beacon":"received"'
ping_rx_check = '"ping":"received"'
connect_rx_check = '"connect":"received"'
stop_rx_check = '"status":"stopped"'
beacon_final_tx_check = [beacon_tx_check]
ping_final_tx_check = [ping_tx_check]
connect_final_tx_check = [connect_tx_check]
stop_final_tx_check = [stop_tx_check]
beacon_final_rx_check = [beacon_rx_check]
ping_final_rx_check = [ping_rx_check]
connect_final_rx_check = [connect_rx_check]
stop_final_rx_check = [stop_rx_check]
return {
"beacon": (
@ -82,6 +88,14 @@ def parameters() -> dict:
ping_final_tx_check,
ping_final_rx_check,
),
"stop": (
stop_data,
stop_timeout,
stop_tx_check,
stop_rx_check,
stop_final_tx_check,
stop_final_rx_check,
),
}
@ -148,8 +162,8 @@ def analyze_results(station1: list, station2: list, call_list: list):
# @pytest.mark.parametrize("frame_type", ["beacon", "connect", "ping"])
@pytest.mark.parametrize("frame_type", ["ping"])
def test_datac0(frame_type: str, tmp_path):
@pytest.mark.parametrize("frame_type", ["ping", "stop"])
def test_datac0_negative(frame_type: str, tmp_path):
log_handler.setup_logging(filename=tmp_path / "test_datac0", level="DEBUG")
log = structlog.get_logger("test_datac0")

View file

@ -23,6 +23,8 @@ import modem
import sock
import static
import structlog
from codec2 import FREEDV_MODE
from static import FRAME_TYPE as FR_TYPE
def t_setup(
@ -102,7 +104,7 @@ def t_datac0_1(
# log.info("S1 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S1 TX: ", TX=frametype)
log.info("S1 TX: ", TX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -120,7 +122,7 @@ def t_datac0_1(
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S1 RX: ", RX=frametype)
log.info("S1 RX: ", RX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -142,6 +144,10 @@ def t_datac0_1(
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
time.sleep(0.5)
if "stop" in data["command"]:
log.debug("t_datac0_1: STOP test, setting TNC state")
static.TNC_STATE = "BUSY"
static.ARQ_STATE = True
sock.process_tnc_commands(json.dumps(data, indent=None))
time.sleep(0.5)
sock.process_tnc_commands(json.dumps(data, indent=None))
@ -213,7 +219,7 @@ def t_datac0_2(
# log.info("S2 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S2 TX: ", TX=frametype)
log.info("S2 TX: ", TX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -231,7 +237,7 @@ def t_datac0_2(
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S2 RX: ", RX=frametype)
log.info("S2 RX: ", RX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.

View file

@ -17,6 +17,8 @@ import modem
import sock
import static
import structlog
from codec2 import FREEDV_MODE
from static import FRAME_TYPE as FR_TYPE
def t_setup(
@ -96,7 +98,7 @@ def t_datac0_1(
# log.info("S1 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S1 TX: ", TX=frametype)
log.info("S1 TX: ", TX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -114,7 +116,7 @@ def t_datac0_1(
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S1 RX: ", RX=frametype)
log.info("S1 RX: ", RX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -135,6 +137,18 @@ def t_datac0_1(
log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
orig_dxcall = static.DXCALLSIGN
if "stop" in data["command"]:
time.sleep(0.5)
log.debug(
"t_datac0_1: STOP test, setting TNC state",
mycall=static.MYCALLSIGN,
dxcall=static.DXCALLSIGN,
)
static.DXCALLSIGN = helpers.callsign_to_bytes(data["dxcallsign"])
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
static.TNC_STATE = "BUSY"
static.ARQ_STATE = True
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.process_tnc_commands(json.dumps(data, indent=None))
@ -152,6 +166,12 @@ def t_datac0_1(
time.sleep(0.1)
log.info("station1, first")
if "stop" in data["command"]:
time.sleep(0.5)
log.debug("STOP test, resetting DX callsign")
static.DXCALLSIGN = orig_dxcall
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
sock.process_tnc_commands(json.dumps(data, indent=None))
time.sleep(0.5)
@ -204,7 +224,7 @@ def t_datac0_2(
# log.info("S2 TX: ", frames=t_frames)
for item in t_frames:
frametype = int.from_bytes(item[:1], "big") # type: ignore
log.info("S2 TX: ", TX=frametype)
log.info("S2 TX: ", TX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -222,7 +242,7 @@ def t_datac0_2(
bytes_per_frame=bytes_per_frame,
)
frametype = int.from_bytes(t_bytes_out[:1], "big")
log.info("S2 RX: ", RX=frametype)
log.info("S2 RX: ", RX=FR_TYPE(frametype).name)
# Apologies for the Python "magic." "orig_func" is a pointer to the
# original function captured before this one was put in place.
@ -242,6 +262,7 @@ def t_datac0_2(
log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL)
log.info("t_datac0_2:", mycall=static.MYCALLSIGN)
if "cq" in data:
t_data = {"type": "arq", "command": "stop_transmission"}