updated ctests

This commit is contained in:
DJ2LS 2023-04-28 12:40:33 +02:00
parent 69749d30ae
commit 09cf659154
6 changed files with 53 additions and 52 deletions

View file

@ -48,11 +48,11 @@ def parameters() -> dict:
connect_data = {"type": "arq", "command": "connect", "dxcallsign": "ZZ9YY-0"}
stop_data = {"type": "arq", "command": "stop_transmission", "dxcallsign": "ZZ9YY-0"}
beacon_timeout = 6
cq_timeout = 8
ping_timeout = 5
connect_timeout = 10
stop_timeout = 5
beacon_timeout = 1
ping_timeout = 1
cq_timeout = 2
connect_timeout = 1
stop_timeout = 1
beacon_tx_check = '"beacon":"transmitting"'
cq_tx_check = '"qrv":"received"'
@ -192,8 +192,8 @@ def analyze_results(station1: list, station2: list, call_list: list):
pytest.param("beacon", marks=pytest.mark.flaky(reruns=2)),
pytest.param("ping", marks=pytest.mark.flaky(reruns=2)),
pytest.param("cq", marks=pytest.mark.flaky(reruns=20)),
# pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")),
pytest.param("stop", marks=pytest.mark.flaky(reruns=0)),
#pytest.param("cq", marks=pytest.mark.xfail(reason="Too unstable for CI")),
pytest.param("stop", marks=pytest.mark.flaky(reruns=2)),
],
)
def test_datac13(frame_type: str, tmp_path):

View file

@ -40,8 +40,8 @@ def parameters() -> dict:
beacon_timeout = 1
ping_timeout = 1
connect_timeout = 1
stop_timeout = 1
connect_timeout = 2
stop_timeout = 2
beacon_tx_check = '"status":"Failed"'
ping_tx_check = '"ping","status":"Failed"'
@ -162,7 +162,10 @@ def analyze_results(station1: list, station2: list, call_list: list):
# @pytest.mark.parametrize("frame_type", ["beacon", "connect", "ping"])
@pytest.mark.parametrize("frame_type", ["ping", "stop"])
@pytest.mark.parametrize("frame_type", [
"ping",
pytest.param("stop", marks=pytest.mark.flaky(reruns=10))
])
def test_datac13_negative(frame_type: str, tmp_path):
log_handler.setup_logging(filename=tmp_path / "test_datac13", level="DEBUG")
log = structlog.get_logger("test_datac13")

View file

@ -5,7 +5,7 @@ simulated audio channel.
Near end-to-end test for sending / receiving control frames through the TNC and modem
and back through on the other station. Data injection initiates from the queue used
by the daemon process into and out of the TNC.
by the daemon process into and out of the TNCParam.
Invoked from test_datac13.py.
@ -21,7 +21,8 @@ import data_handler
import helpers
import modem
import sock
from static import ARQ, AudioParam, Beacon, Channel, Daemon, HamlibParam, ModemParam, Station, Statistics, TCIParam, TNC, FRAME_TYPE as FR_TYPE
from static import ARQ, HamlibParam, ModemParam, Station, TNC as TNCParam
from static import FRAME_TYPE as FR_TYPE
import structlog
#from static import FRAME_TYPE as FR_TYPE
@ -47,9 +48,9 @@ def t_setup(
modem.TESTMODE = True
modem.TXCHANNEL = tmp_path / tx_channel
HamlibParam.hamlib_radiocontrol = "disabled"
TNC.low_bandwidth_mode = lowbwmode or True
TNCParam.low_bandwidth_mode = lowbwmode or True
Station.mygrid = bytes("AA12aa", "utf-8")
Station.respond_to_cq = True
TNCParam.respond_to_cq = True
Station.ssid_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
mycallsign = helpers.callsign_to_bytes(mycall)
@ -63,16 +64,18 @@ def t_setup(
Station.dxcallsign_crc = helpers.get_crc_24(Station.dxcallsign)
# Create the TNC
tnc = data_handler.DATA()
tnc_data_handler = data_handler.DATA()
orig_rx_func = data_handler.DATA.process_data
data_handler.DATA.process_data = t_process_data
tnc.log = structlog.get_logger(f"station{station}_DATA")
tnc_data_handler.log = structlog.get_logger(f"station{station}_DATA")
# Limit the frame-ack timeout
tnc.time_list_low_bw = [3, 1, 1]
tnc.time_list_high_bw = [3, 1, 1]
tnc.time_list = [3, 1, 1]
tnc_data_handler.time_list_low_bw = [3, 1, 1]
tnc_data_handler.time_list_high_bw = [3, 1, 1]
tnc_data_handler.time_list = [3, 1, 1]
# Limit number of retries
tnc.rx_n_max_retries_per_burst = 4
tnc_data_handler.rx_n_max_retries_per_burst = 4
#ModemParam.tx_delay = 500 # add additional delay time for passing test
# Create the modem
t_modem = modem.RF()
@ -80,7 +83,7 @@ def t_setup(
modem.RF.transmit = t_transmit
t_modem.log = structlog.get_logger(f"station{station}_RF")
return tnc, orig_rx_func, orig_tx_func
return tnc_data_handler, orig_rx_func, orig_tx_func
def t_datac13_1(
@ -131,7 +134,7 @@ def t_datac13_1(
# original function captured before this one was put in place.
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
tnc, orig_rx_func, orig_tx_func = t_setup(
tnc_data_handler, orig_rx_func, orig_tx_func = t_setup(
1,
mycall,
dxcall,
@ -149,14 +152,14 @@ def t_datac13_1(
time.sleep(0.5)
if "stop" in data["command"]:
log.debug("t_datac13_1: STOP test, setting TNC state")
TNC.tnc_state = "BUSY"
TNCParam.tnc_state = "BUSY"
ARQ.arq_state = True
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(0.5)
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration
timeout = time.time() + timeout_duration
while tx_check not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning(
@ -166,7 +169,7 @@ def t_datac13_1(
tx_check=tx_check,
)
break
time.sleep(0.1)
time.sleep(0.5)
log.info("station1, first")
# override ARQ SESSION STATE for allowing disconnect command
ARQ.arq_session_state = "connected"
@ -175,16 +178,16 @@ def t_datac13_1(
time.sleep(0.5)
# Allow enough time for this side to process the disconnect frame.
timeout = time.time() + timeout_duration
while tnc.data_queue_transmit.queue:
timeout = time.time() + timeout_duration
while tnc_data_handler.data_queue_transmit.queue:
if time.time() > timeout:
log.warning("station1", TIMEOUT=True, dq_tx=tnc.data_queue_transmit.queue)
log.warning("station1", TIMEOUT=True, dq_tx=tnc_data_handler.data_queue_transmit.queue)
break
time.sleep(0.5)
log.info("station1, final")
# log.info("S1 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
# log.info("S1 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
# log.info("S1 DQT: ", DQ_Tx=pformat(TNCParam.data_queue_transmit.queue))
# log.info("S1 DQR: ", DQ_Rx=pformat(TNCParam.data_queue_received.queue))
log.debug("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
for item in final_tx_check:
@ -268,7 +271,7 @@ def t_datac13_2(
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(t_data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration
timeout = time.time() + timeout_duration
# Compare with the string conversion instead of repeatedly dumping
# the queue to an object for comparisons.
while rx_check not in str(sock.SOCKET_QUEUE.queue):
@ -284,7 +287,7 @@ def t_datac13_2(
log.info("station2, first")
# Allow enough time for this side to receive the disconnect frame.
timeout = time.time() + timeout_duration
timeout = time.time() + timeout_duration
while '"arq":"session","status":"close"' not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning("station2", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
@ -292,8 +295,8 @@ def t_datac13_2(
time.sleep(0.5)
log.info("station2, final")
# log.info("S2 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue))
# log.info("S2 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue))
# log.info("S2 DQT: ", DQ_Tx=pformat(TNCParam.data_queue_transmit.queue))
# log.info("S2 DQR: ", DQ_Rx=pformat(TNCParam.data_queue_received.queue))
log.debug("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue))
for item in final_rx_check:

View file

@ -43,7 +43,7 @@ def t_setup(
HamlibParam.hamlib_radiocontrol = "disabled"
TNC.low_bandwidth_mode = lowbwmode or True
Station.mygrid = bytes("AA12aa", "utf-8")
Station.respond_to_cq = True
TNC.respond_to_cq = True
Station.ssid_list = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
mycallsign_bytes = helpers.callsign_to_bytes(mycall)
@ -57,24 +57,24 @@ def t_setup(
Station.dxcallsign_crc = helpers.get_crc_24(dxcallsign)
# Create the TNC
tnc = data_handler.DATA()
tnc_data_handler = data_handler.DATA()
orig_rx_func = data_handler.DATA.process_data
data_handler.DATA.process_data = t_process_data
tnc.log = structlog.get_logger(f"station{station}_DATA")
tnc_data_handler.log = structlog.get_logger(f"station{station}_DATA")
# Limit the frame-ack timeout
tnc.time_list_low_bw = [3, 1, 1]
tnc.time_list_high_bw = [3, 1, 1]
tnc.time_list = [3, 1, 1]
tnc_data_handler.time_list_low_bw = [3, 1, 1]
tnc_data_handler.time_list_high_bw = [3, 1, 1]
tnc_data_handler.time_list = [3, 1, 1]
# Limit number of retries
tnc.rx_n_max_retries_per_burst = 4
tnc_data_handler.rx_n_max_retries_per_burst = 4
ModemParam.tx_delay = 500 # add additional delay time for passing test
# Create the modem
t_modem = modem.RF()
orig_tx_func = modem.RF.transmit
modem.RF.transmit = t_transmit
t_modem.log = structlog.get_logger(f"station{station}_RF")
return tnc, orig_rx_func, orig_tx_func
return tnc_data_handler, orig_rx_func, orig_tx_func
def t_datac13_1(
@ -125,7 +125,7 @@ def t_datac13_1(
# original function captured before this one was put in place.
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
tnc, orig_rx_func, orig_tx_func = t_setup(
tnc_data_handler, orig_rx_func, orig_tx_func = t_setup(
1,
mycall,
dxcall,
@ -182,9 +182,9 @@ def t_datac13_1(
# Allow enough time for this side to process the disconnect frame.
timeout = time.time() + timeout_duration
while tnc.data_queue_transmit.queue:
while tnc_data_handler.data_queue_transmit.queue:
if time.time() > timeout:
log.warning("station1", TIMEOUT=True, dq_tx=tnc.data_queue_transmit.queue)
log.warning("station1", TIMEOUT=True, dq_tx=tnc_data_handler.data_queue_transmit.queue)
break
time.sleep(0.5)
log.info("station1, final")
@ -292,9 +292,6 @@ def t_datac13_2(
# Allow enough time for this side to receive the disconnect frame.
timeout = time.time() + timeout_duration
while '"arq":"session", "status":"close"' not in str(sock.SOCKET_QUEUE.queue):
if time.time() > timeout:
log.warning("station2", TIMEOUT=True, queue=str(sock.SOCKET_QUEUE.queue))
break

View file

@ -100,8 +100,6 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
# we want to transmit scatter data only once to reduce network traffic
ModemParam.scatter = []
# we want to display INFO messages only once
#static.INFO = []
# self.request.sendall(sock_data)
threading.Event().wait(0.15)

View file

@ -113,7 +113,7 @@ class Station:
dxcallsign_crc: bytes = b"A"
mygrid: bytes = b""
dxgrid: bytes = b""
ssid_list = [] # ssid list we are responding to
ssid_list = [] # ssid list we are responding to
@dataclass