mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
Merge pull request #227 from DJ2LS/issue_194_test
Test to validate processing of STOP frames.
This commit is contained in:
commit
316cbbda02
|
@ -46,31 +46,37 @@ def parameters() -> dict:
|
|||
# Construct message to start ping.
|
||||
ping_data = {"type": "ping", "command": "ping", "dxcallsign": "ZZ9YY-0"}
|
||||
connect_data = {"type": "arq", "command": "connect", "dxcallsign": "ZZ9YY-0"}
|
||||
stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "ZZ9YY-0"}
|
||||
|
||||
beacon_timeout = 6
|
||||
cq_timeout = 8
|
||||
ping_timeout = 5
|
||||
connect_timeout = 10
|
||||
stop_timeout = 5
|
||||
|
||||
beacon_tx_check = '"beacon":"transmitting"'
|
||||
cq_tx_check = '"qrv":"received"'
|
||||
ping_tx_check = '"ping":"transmitting"'
|
||||
connect_tx_check = '"session":"connecting"'
|
||||
stop_tx_check = '"status":"stopped"'
|
||||
|
||||
beacon_rx_check = '"beacon":"received"'
|
||||
cq_rx_check = '"cq":"received"'
|
||||
ping_rx_check = '"ping":"received"'
|
||||
connect_rx_check = '"connect":"received"'
|
||||
stop_rx_check = '"status":"stopped"'
|
||||
|
||||
beacon_final_tx_check = [beacon_tx_check]
|
||||
cq_final_tx_check = ['"cq":"transmitting"', cq_tx_check]
|
||||
ping_final_tx_check = [ping_tx_check, '"ping":"acknowledge"']
|
||||
connect_final_tx_check = ['"status":"connected"', '"connect":"acknowledge"']
|
||||
stop_final_tx_check = [stop_tx_check]
|
||||
|
||||
beacon_final_rx_check = [beacon_rx_check]
|
||||
cq_final_rx_check = [cq_rx_check, '"qrv":"transmitting"']
|
||||
ping_final_rx_check = [ping_rx_check]
|
||||
connect_final_rx_check = [connect_rx_check]
|
||||
stop_final_rx_check = [stop_rx_check]
|
||||
|
||||
return {
|
||||
"beacon": (
|
||||
|
@ -105,6 +111,14 @@ def parameters() -> dict:
|
|||
ping_final_tx_check,
|
||||
ping_final_rx_check,
|
||||
),
|
||||
"stop": (
|
||||
stop_data,
|
||||
stop_timeout,
|
||||
stop_tx_check,
|
||||
stop_rx_check,
|
||||
stop_final_tx_check,
|
||||
stop_final_rx_check,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
@ -179,6 +193,7 @@ def analyze_results(station1: list, station2: list, call_list: list):
|
|||
pytest.param("ping", marks=pytest.mark.flaky(reruns=2)),
|
||||
pytest.param("cq", marks=pytest.mark.flaky(reruns=20)),
|
||||
# pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")),
|
||||
pytest.param("stop", marks=pytest.mark.flaky(reruns=0)),
|
||||
],
|
||||
)
|
||||
def test_datac0(frame_type: str, tmp_path):
|
||||
|
|
|
@ -36,26 +36,32 @@ def parameters() -> dict:
|
|||
# Construct message to start ping.
|
||||
ping_data = {"type": "ping", "command": "ping", "dxcallsign": ""}
|
||||
connect_data = {"type": "arq", "command": "connect", "dxcallsign": ""}
|
||||
stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "DD5GG-3"}
|
||||
|
||||
beacon_timeout = 1
|
||||
ping_timeout = 1
|
||||
connect_timeout = 1
|
||||
stop_timeout = 1
|
||||
|
||||
beacon_tx_check = '"status":"Failed"'
|
||||
ping_tx_check = '"ping","status":"Failed"'
|
||||
connect_tx_check = '"status":"Failed"'
|
||||
stop_tx_check = '"status":"stopped"'
|
||||
|
||||
beacon_rx_check = '"beacon":"received"'
|
||||
ping_rx_check = '"ping":"received"'
|
||||
connect_rx_check = '"connect":"received"'
|
||||
stop_rx_check = '"status":"stopped"'
|
||||
|
||||
beacon_final_tx_check = [beacon_tx_check]
|
||||
ping_final_tx_check = [ping_tx_check]
|
||||
connect_final_tx_check = [connect_tx_check]
|
||||
stop_final_tx_check = [stop_tx_check]
|
||||
|
||||
beacon_final_rx_check = [beacon_rx_check]
|
||||
ping_final_rx_check = [ping_rx_check]
|
||||
connect_final_rx_check = [connect_rx_check]
|
||||
stop_final_rx_check = [stop_rx_check]
|
||||
|
||||
return {
|
||||
"beacon": (
|
||||
|
@ -82,6 +88,14 @@ def parameters() -> dict:
|
|||
ping_final_tx_check,
|
||||
ping_final_rx_check,
|
||||
),
|
||||
"stop": (
|
||||
stop_data,
|
||||
stop_timeout,
|
||||
stop_tx_check,
|
||||
stop_rx_check,
|
||||
stop_final_tx_check,
|
||||
stop_final_rx_check,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
@ -148,8 +162,8 @@ def analyze_results(station1: list, station2: list, call_list: list):
|
|||
|
||||
|
||||
# @pytest.mark.parametrize("frame_type", ["beacon", "connect", "ping"])
|
||||
@pytest.mark.parametrize("frame_type", ["ping"])
|
||||
def test_datac0(frame_type: str, tmp_path):
|
||||
@pytest.mark.parametrize("frame_type", ["ping", "stop"])
|
||||
def test_datac0_negative(frame_type: str, tmp_path):
|
||||
log_handler.setup_logging(filename=tmp_path / "test_datac0", level="DEBUG")
|
||||
log = structlog.get_logger("test_datac0")
|
||||
|
||||
|
|
|
@ -23,6 +23,8 @@ import modem
|
|||
import sock
|
||||
import static
|
||||
import structlog
|
||||
from codec2 import FREEDV_MODE
|
||||
from static import FRAME_TYPE as FR_TYPE
|
||||
|
||||
|
||||
def t_setup(
|
||||
|
@ -102,7 +104,7 @@ def t_datac0_1(
|
|||
# log.info("S1 TX: ", frames=t_frames)
|
||||
for item in t_frames:
|
||||
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
||||
log.info("S1 TX: ", TX=frametype)
|
||||
log.info("S1 TX: ", TX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -120,7 +122,7 @@ def t_datac0_1(
|
|||
bytes_per_frame=bytes_per_frame,
|
||||
)
|
||||
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
||||
log.info("S1 RX: ", RX=frametype)
|
||||
log.info("S1 RX: ", RX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -142,6 +144,10 @@ def t_datac0_1(
|
|||
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
|
||||
|
||||
time.sleep(0.5)
|
||||
if "stop" in data["command"]:
|
||||
log.debug("t_datac0_1: STOP test, setting TNC state")
|
||||
static.TNC_STATE = "BUSY"
|
||||
static.ARQ_STATE = True
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
time.sleep(0.5)
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
|
@ -213,7 +219,7 @@ def t_datac0_2(
|
|||
# log.info("S2 TX: ", frames=t_frames)
|
||||
for item in t_frames:
|
||||
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
||||
log.info("S2 TX: ", TX=frametype)
|
||||
log.info("S2 TX: ", TX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -231,7 +237,7 @@ def t_datac0_2(
|
|||
bytes_per_frame=bytes_per_frame,
|
||||
)
|
||||
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
||||
log.info("S2 RX: ", RX=frametype)
|
||||
log.info("S2 RX: ", RX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
|
|
@ -17,6 +17,8 @@ import modem
|
|||
import sock
|
||||
import static
|
||||
import structlog
|
||||
from codec2 import FREEDV_MODE
|
||||
from static import FRAME_TYPE as FR_TYPE
|
||||
|
||||
|
||||
def t_setup(
|
||||
|
@ -96,7 +98,7 @@ def t_datac0_1(
|
|||
# log.info("S1 TX: ", frames=t_frames)
|
||||
for item in t_frames:
|
||||
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
||||
log.info("S1 TX: ", TX=frametype)
|
||||
log.info("S1 TX: ", TX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -114,7 +116,7 @@ def t_datac0_1(
|
|||
bytes_per_frame=bytes_per_frame,
|
||||
)
|
||||
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
||||
log.info("S1 RX: ", RX=frametype)
|
||||
log.info("S1 RX: ", RX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -135,6 +137,18 @@ def t_datac0_1(
|
|||
log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL)
|
||||
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
|
||||
|
||||
orig_dxcall = static.DXCALLSIGN
|
||||
if "stop" in data["command"]:
|
||||
time.sleep(0.5)
|
||||
log.debug(
|
||||
"t_datac0_1: STOP test, setting TNC state",
|
||||
mycall=static.MYCALLSIGN,
|
||||
dxcall=static.DXCALLSIGN,
|
||||
)
|
||||
static.DXCALLSIGN = helpers.callsign_to_bytes(data["dxcallsign"])
|
||||
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
|
||||
static.TNC_STATE = "BUSY"
|
||||
static.ARQ_STATE = True
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
|
||||
|
@ -152,6 +166,12 @@ def t_datac0_1(
|
|||
time.sleep(0.1)
|
||||
log.info("station1, first")
|
||||
|
||||
if "stop" in data["command"]:
|
||||
time.sleep(0.5)
|
||||
log.debug("STOP test, resetting DX callsign")
|
||||
static.DXCALLSIGN = orig_dxcall
|
||||
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
|
||||
|
||||
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
time.sleep(0.5)
|
||||
|
@ -204,7 +224,7 @@ def t_datac0_2(
|
|||
# log.info("S2 TX: ", frames=t_frames)
|
||||
for item in t_frames:
|
||||
frametype = int.from_bytes(item[:1], "big") # type: ignore
|
||||
log.info("S2 TX: ", TX=frametype)
|
||||
log.info("S2 TX: ", TX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -222,7 +242,7 @@ def t_datac0_2(
|
|||
bytes_per_frame=bytes_per_frame,
|
||||
)
|
||||
frametype = int.from_bytes(t_bytes_out[:1], "big")
|
||||
log.info("S2 RX: ", RX=frametype)
|
||||
log.info("S2 RX: ", RX=FR_TYPE(frametype).name)
|
||||
|
||||
# Apologies for the Python "magic." "orig_func" is a pointer to the
|
||||
# original function captured before this one was put in place.
|
||||
|
@ -242,6 +262,7 @@ def t_datac0_2(
|
|||
|
||||
log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL)
|
||||
log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL)
|
||||
log.info("t_datac0_2:", mycall=static.MYCALLSIGN)
|
||||
|
||||
if "cq" in data:
|
||||
t_data = {"type": "arq", "command": "stop_transmission"}
|
||||
|
|
Loading…
Reference in a new issue