mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
Improve tests slightly.
Change parameterization method, more retries for CQ. Fix pingtest criteria. Adjust CQ test criteria.
This commit is contained in:
parent
73688299fd
commit
fc1b838e8d
2 changed files with 37 additions and 18 deletions
|
@ -40,12 +40,13 @@ def parameters() -> dict:
|
|||
connect_data = {"type": "arq", "command": "connect", "dxcallsign": "ZZ9YY-0"}
|
||||
|
||||
beacon_timeout = 6
|
||||
cq_timeout = 10
|
||||
ping_timeout = 10
|
||||
cq_timeout = 8
|
||||
ping_timeout = 5
|
||||
connect_timeout = 10
|
||||
|
||||
beacon_tx_check = '"beacon":"transmitted"'
|
||||
cq_tx_check = '"cq":"transmitting"'
|
||||
beacon_tx_check = '"beacon":"transmitting"'
|
||||
# cq_tx_check = '"cq":"transmitting"'
|
||||
cq_tx_check = '"qrv":"received"'
|
||||
ping_tx_check = '"ping":"transmitting"'
|
||||
connect_tx_check = '"session":"connecting"'
|
||||
|
||||
|
@ -54,15 +55,19 @@ def parameters() -> dict:
|
|||
ping_rx_check = '"ping":"received"'
|
||||
connect_rx_check = '"connect":"received"'
|
||||
|
||||
beacon_final_tx_check = ['"beacon":"transmitting"']
|
||||
cq_final_tx_check = ['"cq":"transmitting"']
|
||||
ping_final_tx_check = ['"ping":"transmitting"', '"ping":"acknowledge"']
|
||||
beacon_final_tx_check = [beacon_tx_check]
|
||||
# cq_final_tx_check = [cq_tx_check, '"qrv":"received"']
|
||||
cq_final_tx_check = ['"cq":"transmitting"', cq_tx_check]
|
||||
# , '"qrv":"received"'] # <- Adding this to "cq" exacerbates the FIFO "delay"
|
||||
# because the TX side needs to receive something. With this, the
|
||||
# test fails more often than not. Oddly it works well for ping.
|
||||
ping_final_tx_check = [ping_tx_check, '"ping":"acknowledge"']
|
||||
connect_final_tx_check = ['"status":"connected"', '"connect":"acknowledge"']
|
||||
|
||||
beacon_final_rx_check = ['"beacon":"received"']
|
||||
cq_final_rx_check = ['"cq":"received"', '"qrv":"transmitting"']
|
||||
ping_final_rx_check = ['"ping":"received"']
|
||||
connect_final_rx_check = ['"connect":"received"']
|
||||
beacon_final_rx_check = [beacon_rx_check]
|
||||
cq_final_rx_check = [cq_rx_check, '"qrv":"transmitting"']
|
||||
ping_final_rx_check = [ping_rx_check]
|
||||
connect_final_rx_check = [connect_rx_check]
|
||||
|
||||
return {
|
||||
"beacon": (
|
||||
|
@ -162,9 +167,18 @@ def analyze_results(station1: list, station2: list, call_list: list):
|
|||
locate_data_with_crc(s2, text, data, frametype)
|
||||
|
||||
|
||||
# frame_type "connect" doesn't work 2022-Jun-16.
|
||||
@pytest.mark.parametrize("frame_type", ["beacon", "cq", "ping"]) # , "connect"])
|
||||
@pytest.mark.flaky(reruns=2)
|
||||
# frame_type "connect" doesn't work 2022-Jun-16. Missing / incomplete SOCKET_QUEUE data.
|
||||
# frame_type "cq" is overly flaky. Can't get the timing right / FIFO not delivering data.
|
||||
@pytest.mark.parametrize(
|
||||
"frame_type",
|
||||
[
|
||||
pytest.param("beacon", marks=pytest.mark.flaky(reruns=2)),
|
||||
pytest.param("ping", marks=pytest.mark.flaky(reruns=2)),
|
||||
pytest.param("cq", marks=pytest.mark.flaky(reruns=20)),
|
||||
# pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")),
|
||||
],
|
||||
)
|
||||
# @pytest.mark.flaky(reruns=2)
|
||||
def test_datac0(frame_type: str, tmp_path):
|
||||
log_handler.setup_logging(filename=tmp_path / "test_datac0", level="DEBUG")
|
||||
log = structlog.get_logger("test_datac0")
|
||||
|
|
|
@ -135,12 +135,13 @@ def t_datac0_1(
|
|||
log.info("t_datac0_1:", RXCHANNEL=modem.RXCHANNEL)
|
||||
log.info("t_datac0_1:", TXCHANNEL=modem.TXCHANNEL)
|
||||
|
||||
time.sleep(0.5)
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
time.sleep(0.5)
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
|
||||
# Assure the test completes.
|
||||
timeout = time.time() + 10 # 25
|
||||
timeout = time.time() + timeout_duration
|
||||
while tx_check not in str(sock.SOCKET_QUEUE.queue):
|
||||
if time.time() > timeout:
|
||||
log.warning(
|
||||
|
@ -156,13 +157,12 @@ def t_datac0_1(
|
|||
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
time.sleep(0.5)
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
|
||||
# Allow enough time for this side to process the disconnect frame.
|
||||
timeout = time.time() + timeout_duration
|
||||
while tnc.data_queue_transmit.queue:
|
||||
if time.time() > timeout:
|
||||
log.warning("station1", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
|
||||
log.warning("station1", TIMEOUT=True, dq_tx=tnc.data_queue_transmit.queue)
|
||||
break
|
||||
time.sleep(0.5)
|
||||
log.info("station1, final")
|
||||
|
@ -196,7 +196,7 @@ def t_datac0_2(
|
|||
log.debug("t_datac0_2:", TMP_PATH=tmp_path)
|
||||
|
||||
# Unpack tuple
|
||||
_, timeout_duration, _, rx_check, _, final_rx_check = config
|
||||
data, timeout_duration, _, rx_check, _, final_rx_check = config
|
||||
|
||||
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
|
||||
"""'Wrap' RF.transmit function to extract the arguments."""
|
||||
|
@ -246,6 +246,11 @@ def t_datac0_2(
|
|||
log.info("t_datac0_2:", RXCHANNEL=modem.RXCHANNEL)
|
||||
log.info("t_datac0_2:", TXCHANNEL=modem.TXCHANNEL)
|
||||
|
||||
if "cq" in data:
|
||||
t_data = {"type": "arq", "command": "stop_transmission"}
|
||||
sock.process_tnc_commands(json.dumps(t_data, indent=None))
|
||||
sock.process_tnc_commands(json.dumps(t_data, indent=None))
|
||||
|
||||
# Assure the test completes.
|
||||
timeout = time.time() + timeout_duration
|
||||
# Compare with the string conversion instead of repeatedly dumping
|
||||
|
|
Loading…
Reference in a new issue