mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
Update legacy TNC test
This commit is contained in:
parent
34cb6e2cb6
commit
55b1f8c2cb
|
@ -18,10 +18,8 @@ except ImportError:
|
|||
import util_tnc_ISS as iss
|
||||
|
||||
|
||||
# These do not update sock.SOCKET_QUEUE:
|
||||
# "CONNECT"
|
||||
# This test is currently a little inconsistent.
|
||||
@pytest.mark.parametrize("command", ["CQ", "PING", "BEACON"])
|
||||
@pytest.mark.parametrize("command", ["CONNECT"])
|
||||
@pytest.mark.flaky(reruns=2)
|
||||
def test_tnc(command, tmp_path):
|
||||
log_handler.setup_logging(filename=tmp_path / "test_tnc", level="INFO")
|
||||
|
|
|
@ -13,7 +13,6 @@ from typing import Callable
|
|||
|
||||
import structlog
|
||||
|
||||
sys.path.insert(0, "..")
|
||||
sys.path.insert(0, "../tnc")
|
||||
import data_handler
|
||||
import helpers
|
||||
|
@ -54,6 +53,8 @@ def t_arq_irs(*args):
|
|||
MESSAGE = args[0]
|
||||
tmp_path = args[1]
|
||||
|
||||
sock.log = structlog.get_logger("util_tnc_IRS_sock")
|
||||
|
||||
# enable testmode
|
||||
data_handler.TESTMODE = True
|
||||
modem.RXCHANNEL = tmp_path / "hfchannel2"
|
||||
|
@ -73,6 +74,7 @@ def t_arq_irs(*args):
|
|||
|
||||
# start data handler
|
||||
tnc = data_handler.DATA()
|
||||
tnc.log = structlog.get_logger("util_tnc_IRS_DATA")
|
||||
|
||||
# Inject a way to exit the TNC infinite loop
|
||||
IRS_original_arq_cleanup = tnc.arq_cleanup
|
||||
|
@ -80,6 +82,7 @@ def t_arq_irs(*args):
|
|||
|
||||
# start modem
|
||||
t_modem = modem.RF()
|
||||
t_modem.log = structlog.get_logger("util_tnc_IRS_RF")
|
||||
|
||||
# Set timeout
|
||||
timeout = time.time() + 15
|
||||
|
@ -87,6 +90,8 @@ def t_arq_irs(*args):
|
|||
while time.time() < timeout:
|
||||
time.sleep(0.1)
|
||||
|
||||
log.warning("queue:", queue=sock.SOCKET_QUEUE.queue)
|
||||
|
||||
assert not "TIMEOUT!"
|
||||
|
||||
|
||||
|
|
|
@ -54,6 +54,8 @@ def t_arq_iss(*args):
|
|||
MESSAGE = args[0]
|
||||
tmp_path = args[1]
|
||||
|
||||
sock.log = structlog.get_logger("util_tnc_ISS_sock")
|
||||
|
||||
# enable testmode
|
||||
data_handler.TESTMODE = True
|
||||
modem.RXCHANNEL = tmp_path / "hfchannel1"
|
||||
|
@ -80,6 +82,7 @@ def t_arq_iss(*args):
|
|||
|
||||
# start data handler
|
||||
tnc = data_handler.DATA()
|
||||
tnc.log = structlog.get_logger("util_tnc_ISS_DATA")
|
||||
|
||||
# Inject a way to exit the TNC infinite loop
|
||||
ISS_original_arq_cleanup = tnc.arq_cleanup
|
||||
|
@ -87,6 +90,7 @@ def t_arq_iss(*args):
|
|||
|
||||
# start modem
|
||||
t_modem = modem.RF()
|
||||
t_modem.log = structlog.get_logger("util_tnc_ISS_RF")
|
||||
|
||||
# mode = codec2.freedv_get_mode_value_by_name(FREEDV_MODE)
|
||||
# n_frames_per_burst = N_FRAMES_PER_BURST
|
||||
|
@ -104,25 +108,17 @@ def t_arq_iss(*args):
|
|||
# data_handler.DATA_QUEUE_TRANSMIT.put(['ARQ_RAW', bytes_out, 255, n_frames_per_burst, '123', b'DJ2LS-0'])
|
||||
|
||||
data = {}
|
||||
if MESSAGE in ["BEACON"]:
|
||||
data = {"type": "command", "command": "start_beacon", "parameter": 5}
|
||||
elif MESSAGE in ["CQ"]:
|
||||
data = {"type": "command", "command": "cqcqcq"}
|
||||
elif MESSAGE in ["CONNECT"]:
|
||||
if MESSAGE in ["CONNECT"]:
|
||||
data = {
|
||||
"type": "arq",
|
||||
"command": "connect",
|
||||
"dxcallsign": str(dxcallsign, encoding="UTF-8"),
|
||||
}
|
||||
elif MESSAGE in ["PING"]:
|
||||
data = {
|
||||
"type": "ping",
|
||||
"command": "ping",
|
||||
"dxcallsign": str(dxcallsign, encoding="UTF-8"),
|
||||
}
|
||||
else:
|
||||
assert not MESSAGE, f"{MESSAGE} not known to test."
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
sock.process_tnc_commands(json.dumps(data, indent=None))
|
||||
|
|
Loading…
Reference in a new issue