From 305daca6263b32aefcac88adf76c4bc4f2cad357 Mon Sep 17 00:00:00 2001 From: Paul Kronenwetter Date: Fri, 24 Jun 2022 13:07:34 -0400 Subject: [PATCH] Test to validate processing of STOP frames. --- test/test_datac0.py | 15 +++++++++++++++ test/test_datac0_negative.py | 18 ++++++++++++++++-- test/util_datac0.py | 14 ++++++++++---- test/util_datac0_negative.py | 29 +++++++++++++++++++++++++---- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/test/test_datac0.py b/test/test_datac0.py index 3761b298..4721ed5c 100644 --- a/test/test_datac0.py +++ b/test/test_datac0.py @@ -46,31 +46,37 @@ def parameters() -> dict: # Construct message to start ping. ping_data = {"type": "ping", "command": "ping", "dxcallsign": "ZZ9YY-0"} connect_data = {"type": "arq", "command": "connect", "dxcallsign": "ZZ9YY-0"} + stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "ZZ9YY-0"} beacon_timeout = 6 cq_timeout = 8 ping_timeout = 5 connect_timeout = 10 + stop_timeout = 5 beacon_tx_check = '"beacon":"transmitting"' cq_tx_check = '"qrv":"received"' ping_tx_check = '"ping":"transmitting"' connect_tx_check = '"session":"connecting"' + stop_tx_check = '"status":"stopped"' beacon_rx_check = '"beacon":"received"' cq_rx_check = '"cq":"received"' ping_rx_check = '"ping":"received"' connect_rx_check = '"connect":"received"' + stop_rx_check = '"status":"stopped"' beacon_final_tx_check = [beacon_tx_check] cq_final_tx_check = ['"cq":"transmitting"', cq_tx_check] ping_final_tx_check = [ping_tx_check, '"ping":"acknowledge"'] connect_final_tx_check = ['"status":"connected"', '"connect":"acknowledge"'] + stop_final_tx_check = [stop_tx_check] beacon_final_rx_check = [beacon_rx_check] cq_final_rx_check = [cq_rx_check, '"qrv":"transmitting"'] ping_final_rx_check = [ping_rx_check] connect_final_rx_check = [connect_rx_check] + stop_final_rx_check = [stop_rx_check] return { "beacon": ( @@ -105,6 +111,14 @@ def parameters() -> dict: ping_final_tx_check, ping_final_rx_check, ), + "stop": ( + stop_data, + stop_timeout, + stop_tx_check, + stop_rx_check, + stop_final_tx_check, + stop_final_rx_check, + ), } @@ -179,6 +193,7 @@ def analyze_results(station1: list, station2: list, call_list: list): pytest.param("ping", marks=pytest.mark.flaky(reruns=2)), pytest.param("cq", marks=pytest.mark.flaky(reruns=20)), # pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")), + pytest.param("stop", marks=pytest.mark.flaky(reruns=0)), ], ) def test_datac0(frame_type: str, tmp_path): diff --git a/test/test_datac0_negative.py b/test/test_datac0_negative.py index e24d59a6..c23a0a82 100644 --- a/test/test_datac0_negative.py +++ b/test/test_datac0_negative.py @@ -36,26 +36,32 @@ def parameters() -> dict: # Construct message to start ping. ping_data = {"type": "ping", "command": "ping", "dxcallsign": ""} connect_data = {"type": "arq", "command": "connect", "dxcallsign": ""} + stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "DD5GG-3"} beacon_timeout = 1 ping_timeout = 1 connect_timeout = 1 + stop_timeout = 1 beacon_tx_check = '"status":"Failed"' ping_tx_check = '"ping","status":"Failed"' connect_tx_check = '"status":"Failed"' + stop_tx_check = '"status":"stopped"' beacon_rx_check = '"beacon":"received"' ping_rx_check = '"ping":"received"' connect_rx_check = '"connect":"received"' + stop_rx_check = '"status":"stopped"' beacon_final_tx_check = [beacon_tx_check] ping_final_tx_check = [ping_tx_check] connect_final_tx_check = [connect_tx_check] + stop_final_tx_check = [stop_tx_check] beacon_final_rx_check = [beacon_rx_check] ping_final_rx_check = [ping_rx_check] connect_final_rx_check = [connect_rx_check] + stop_final_rx_check = [stop_rx_check] return { "beacon": ( @@ -82,6 +88,14 @@ def parameters() -> dict: ping_final_tx_check, ping_final_rx_check, ), + "stop": ( + stop_data, + stop_timeout, + stop_tx_check, + stop_rx_check, + stop_final_tx_check, + stop_final_rx_check, + ), } @@ -148,8 +162,8 @@ def analyze_results(station1: list, station2: list, call_list: list): # @pytest.mark.parametrize("frame_type", ["beacon", "connect", "ping"]) -@pytest.mark.parametrize("frame_type", ["ping"]) -def test_datac0(frame_type: str, tmp_path): +@pytest.mark.parametrize("frame_type", ["ping", "stop"]) +def test_datac0_negative(frame_type: str, tmp_path): log_handler.setup_logging(filename=tmp_path / "test_datac0", level="DEBUG") log = structlog.get_logger("test_datac0") diff --git a/test/util_datac0.py b/test/util_datac0.py index 7b2d60a6..9d0e5504 100644 --- a/test/util_datac0.py +++ b/test/util_datac0.py @@ -23,6 +23,8 @@ import modem import sock import static import structlog +from codec2 import FREEDV_MODE +from static import FRAME_TYPE as FR_TYPE def t_setup( @@ -102,7 +104,7 @@ def t_datac0_1( # log.info("S1 TX: ", frames=t_frames) for item in t_frames: frametype = int.from_bytes(item[:1], "big") # type: ignore - log.info("S1 TX: ", TX=frametype) + log.info("S1 TX: ", TX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -120,7 +122,7 @@ def t_datac0_1( bytes_per_frame=bytes_per_frame, ) frametype = int.from_bytes(t_bytes_out[:1], "big") - log.info("S1 RX: ", RX=frametype) + log.info("S1 RX: ", RX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -142,6 +144,10 @@ def t_datac0_1( log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL) time.sleep(0.5) + if "stop" in data["command"]: + log.debug("t_datac0_1: STOP test, setting TNC state") + static.TNC_STATE = "BUSY" + static.ARQ_STATE = True sock.process_tnc_commands(json.dumps(data, indent=None)) time.sleep(0.5) sock.process_tnc_commands(json.dumps(data, indent=None)) @@ -213,7 +219,7 @@ def t_datac0_2( # log.info("S2 TX: ", frames=t_frames) for item in t_frames: frametype = int.from_bytes(item[:1], "big") # type: ignore - log.info("S2 TX: ", TX=frametype) + log.info("S2 TX: ", TX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -231,7 +237,7 @@ def t_datac0_2( bytes_per_frame=bytes_per_frame, ) frametype = int.from_bytes(t_bytes_out[:1], "big") - log.info("S2 RX: ", RX=frametype) + log.info("S2 RX: ", RX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. diff --git a/test/util_datac0_negative.py b/test/util_datac0_negative.py index fec2196d..0099bef0 100644 --- a/test/util_datac0_negative.py +++ b/test/util_datac0_negative.py @@ -17,6 +17,8 @@ import modem import sock import static import structlog +from codec2 import FREEDV_MODE +from static import FRAME_TYPE as FR_TYPE def t_setup( @@ -96,7 +98,7 @@ def t_datac0_1( # log.info("S1 TX: ", frames=t_frames) for item in t_frames: frametype = int.from_bytes(item[:1], "big") # type: ignore - log.info("S1 TX: ", TX=frametype) + log.info("S1 TX: ", TX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -114,7 +116,7 @@ def t_datac0_1( bytes_per_frame=bytes_per_frame, ) frametype = int.from_bytes(t_bytes_out[:1], "big") - log.info("S1 RX: ", RX=frametype) + log.info("S1 RX: ", RX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -135,6 +137,18 @@ def t_datac0_1( log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL) log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL) + orig_dxcall = static.DXCALLSIGN + if "stop" in data["command"]: + time.sleep(0.5) + log.debug( + "t_datac0_1: STOP test, setting TNC state", + mycall=static.MYCALLSIGN, + dxcall=static.DXCALLSIGN, + ) + static.DXCALLSIGN = helpers.callsign_to_bytes(data["dxcallsign"]) + static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) + static.TNC_STATE = "BUSY" + static.ARQ_STATE = True sock.process_tnc_commands(json.dumps(data, indent=None)) sock.process_tnc_commands(json.dumps(data, indent=None)) @@ -152,6 +166,12 @@ def t_datac0_1( time.sleep(0.1) log.info("station1, first") + if "stop" in data["command"]: + time.sleep(0.5) + log.debug("STOP test, resetting DX callsign") + static.DXCALLSIGN = orig_dxcall + static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) + data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall} sock.process_tnc_commands(json.dumps(data, indent=None)) time.sleep(0.5) @@ -204,7 +224,7 @@ def t_datac0_2( # log.info("S2 TX: ", frames=t_frames) for item in t_frames: frametype = int.from_bytes(item[:1], "big") # type: ignore - log.info("S2 TX: ", TX=frametype) + log.info("S2 TX: ", TX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -222,7 +242,7 @@ def t_datac0_2( bytes_per_frame=bytes_per_frame, ) frametype = int.from_bytes(t_bytes_out[:1], "big") - log.info("S2 RX: ", RX=frametype) + log.info("S2 RX: ", RX=FR_TYPE(frametype).name) # Apologies for the Python "magic." "orig_func" is a pointer to the # original function captured before this one was put in place. @@ -242,6 +262,7 @@ def t_datac0_2( log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL) log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL) + log.info("t_datac0_2:", mycall=static.MYCALLSIGN) if "cq" in data: t_data = {"type": "arq", "command": "stop_transmission"}