2022-05-21 23:04:17 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
"""
|
2022-06-19 13:55:50 +00:00
|
|
|
Receive-side station emulator for connect frame tests over a high-quality simulated audio channel.
|
2022-05-21 23:04:17 +00:00
|
|
|
|
2022-06-19 13:55:50 +00:00
|
|
|
Near end-to-end test for sending / receiving connection control frames through the
|
|
|
|
TNC and modem and back through on the other station. Data injection initiates from the
|
|
|
|
queue used by the daemon process into and out of the TNC.
|
|
|
|
|
|
|
|
Invoked from test_tnc.py.
|
2022-06-19 14:04:46 +00:00
|
|
|
|
|
|
|
@author: N2KIQ
|
2022-05-21 23:04:17 +00:00
|
|
|
"""
|
|
|
|
|
2022-06-10 20:54:20 +00:00
|
|
|
import signal
|
2022-05-21 23:04:17 +00:00
|
|
|
import sys
|
|
|
|
import time
|
2022-06-10 20:54:20 +00:00
|
|
|
from typing import Callable
|
2022-05-21 23:04:17 +00:00
|
|
|
|
2022-06-07 00:26:41 +00:00
|
|
|
import structlog
|
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
sys.path.insert(0, "../tnc")
|
|
|
|
import data_handler
|
|
|
|
import helpers
|
|
|
|
import modem
|
2022-06-07 00:26:41 +00:00
|
|
|
import sock
|
2022-05-21 23:04:17 +00:00
|
|
|
import static
|
|
|
|
|
2022-06-10 20:54:20 +00:00
|
|
|
# Logger used by the helpers in this module.
log = structlog.get_logger("util_tnc_IRS")

# The ``arq_cleanup`` callable taken from the DATA instance; t_arq_irs() stores
# it here before installing irs_arq_cleanup() in its place.
IRS_original_arq_cleanup: Callable

# Name of the frame type under test; t_arq_irs() assigns it from its first
# positional argument and irs_arq_cleanup() lower-cases it when scanning the
# socket queue.
MESSAGE: str
|
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
|
|
|
|
def irs_arq_cleanup():
    """Stand-in for the TNC's ``arq_cleanup`` that decides when this process should exit.

    Logs the current socket queue on every call. Once the queue contains the
    "transmission stopped" entry, waits two seconds and then raises a signal
    against the current process: SIGKILL when neither a "receiving" nor a
    "received" entry for ``MESSAGE`` is present in the queue, SIGTERM
    otherwise. When the stop entry is absent (or if execution continues past
    the signal), the saved original cleanup is invoked.
    """
    log.info(
        "irs_arq_cleanup",
        socket_queue=sock.SOCKET_QUEUE.queue,
        message=MESSAGE.lower(),
    )

    stopped_marker = '"arq":"transmission","status":"stopped"'
    if stopped_marker not in str(sock.SOCKET_QUEUE.queue):
        # Transmission has not been reported as stopped; just delegate.
        IRS_original_arq_cleanup()
        return

    # Pause before re-reading the queue for the frame status entries.
    time.sleep(2)

    receiving_marker = f'"{MESSAGE.lower()}":"receiving"'
    seen = receiving_marker in str(sock.SOCKET_QUEUE.queue)
    if not seen:
        received_marker = f'"{MESSAGE.lower()}":"received"'
        seen = received_marker in str(sock.SOCKET_QUEUE.queue)

    if not seen:
        print(f"{MESSAGE} was not received.")
        log.info("irs_arq_cleanup", socket_queue=sock.SOCKET_QUEUE.queue)
        # sys.exit does not terminate threads, and os._exit doesn't allow
        # coverage collection, so the failure is reported via a signal.
        signal.raise_signal(signal.SIGKILL)

    signal.raise_signal(signal.SIGTERM)
    IRS_original_arq_cleanup()
|
|
|
|
|
|
|
|
|
|
|
|
def t_arq_irs(*args):
    """Run the receive-side (IRS) station for a connect-frame test.

    Configures the ``data_handler``, ``modem`` and ``static`` modules for test
    mode, starts a ``data_handler.DATA`` instance and a ``modem.RF`` instance,
    and swaps the DATA instance's ``arq_cleanup`` for ``irs_arq_cleanup`` so
    that the process can be terminated by signal once the exchange has been
    observed. The function then idles; if it is still running after 15
    seconds it reports a timeout.

    Args:
        *args: Positional arguments supplied by the caller:
            ``args[0]`` is the name of the frame type under test (stored in
            the module-level ``MESSAGE``);
            ``args[1]`` is the directory holding the channel files. It is
            combined with file names using ``/``, so it is presumably a
            ``pathlib.Path`` -- confirm against the caller.

    Raises:
        AssertionError: Always, if the 15 second deadline elapses without the
            process having been terminated.
    """
    # pylint: disable=global-statement
    global IRS_original_arq_cleanup, MESSAGE

    MESSAGE = args[0]
    tmp_path = args[1]

    sock.log = structlog.get_logger("util_tnc_IRS_sock")

    # enable testmode
    data_handler.TESTMODE = True
    modem.RXCHANNEL = tmp_path / "hfchannel2"
    modem.TESTMODE = True
    modem.TXCHANNEL = tmp_path / "hfchannel1"
    static.HAMLIB_RADIOCONTROL = "disabled"
    static.RESPOND_TO_CQ = True
    log.info("t_arq_irs:", RXCHANNEL=modem.RXCHANNEL)
    log.info("t_arq_irs:", TXCHANNEL=modem.TXCHANNEL)

    # Station identity used by this side of the test.
    mycallsign = bytes("DN2LS-2", "utf-8")
    mycallsign = helpers.callsign_to_bytes(mycallsign)
    static.MYCALLSIGN = helpers.bytes_to_callsign(mycallsign)
    static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN)
    static.MYGRID = bytes("AA12aa", "utf-8")
    static.SSID_LIST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

    # start data handler
    tnc = data_handler.DATA()
    tnc.log = structlog.get_logger("util_tnc_IRS_DATA")

    # Inject a way to exit the TNC infinite loop
    IRS_original_arq_cleanup = tnc.arq_cleanup
    tnc.arq_cleanup = irs_arq_cleanup

    # start modem
    t_modem = modem.RF()
    t_modem.log = structlog.get_logger("util_tnc_IRS_RF")

    # Idle until the deadline; irs_arq_cleanup is expected to end the process
    # by signal before then. A monotonic clock keeps the wait immune to
    # wall-clock adjustments.
    deadline = time.monotonic() + 15
    while time.monotonic() < deadline:
        time.sleep(0.1)

    log.warning("queue:", queue=sock.SOCKET_QUEUE.queue)

    # Raise explicitly rather than via ``assert`` so the timeout is still
    # reported when Python runs with assertions disabled (-O).
    raise AssertionError("TIMEOUT!")
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # This module is a helper invoked from test_tnc.py; refuse direct execution
    # and exit with a non-zero status.
    print("This cannot be run as an application.")
    raise SystemExit(1)
|