diff --git a/test/test_cq.py b/test/test_cq.py new file mode 100644 index 00000000..1572fd25 --- /dev/null +++ b/test/test_cq.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Wed Dec 23 07:04:24 2020 + +@author: DJ2LS +""" + +import contextlib +import multiprocessing +import os +import sys +import threading +import time +import zlib + +import helpers +import log_handler +import pytest +import structlog + +try: + import test.util_cq_1 as util1 + import test.util_cq_2 as util2 +except ImportError: + import util_cq_1 as util1 + import util_cq_2 as util2 + + +STATIONS = ["AA2BB", "ZZ9YY"] + +PIPE_THREAD_RUNNING = True + + +def locate_data_with_crc(source_list: list, text: str, data: bytes, frametype: str): + """Try to locate data in source_list.""" + log = structlog.get_logger("locate_data_with_crc") + + if data in source_list: + with contextlib.suppress(): + data = zlib.decompress(data[2:]) + log.info(f"analyze_results: {text} no CRC", _frametype=frametype, data=data) + elif data + helpers.get_crc_8(data) in source_list: + with contextlib.suppress(zlib.error): + data = zlib.decompress(data[2:-1]) + log.info(f"analyze_results: {text} CRC 8", _frametype=frametype, data=data) + elif data + helpers.get_crc_16(data) in source_list: + with contextlib.suppress(zlib.error): + data = zlib.decompress(data[2:-2]) + log.info(f"analyze_results: {text} CRC16", _frametype=frametype, data=data) + elif data + helpers.get_crc_24(data) in source_list: + with contextlib.suppress(zlib.error): + data = zlib.decompress(data[2:-3]) + log.info(f"analyze_results: {text} CRC24", _frametype=frametype, data=data) + elif data + helpers.get_crc_32(data) in source_list: + with contextlib.suppress(zlib.error): + data = zlib.decompress(data[2:-4]) + log.info(f"analyze_results: {text} CRC32", _frametype=frametype, data=data) + else: + log.info( + f"analyze_results: {text} not received:", + _frame=frametype, + data=data, + ) + + +def analyze_results(station1: list, station2: list, call_list: list): + """Examine the information retrieved from the sub-processes.""" + # Data in these lists is either a series of bytes of received data, + # or a bytearray of transmitted data from the station. + log = structlog.get_logger("analyze_results") + + # Check that each station's transmitted data was received by the other. + for s1, s2, text in [ + (station1, station2, call_list[0]), + (station2, station1, call_list[1]), + ]: + for s1_item in s1: + if not isinstance(s1_item, list): + continue + data = bytes(s1_item[0]) + frametypeno = int.from_bytes(data[:1], "big") + # frametype = static.FRAME_TYPE(frametypeno).name + frametype = str(frametypeno) + s1_crc = helpers.decode_call(helpers.bytes_to_callsign(data[1:4])) + s2_crc = helpers.decode_call(helpers.bytes_to_callsign(data[2:5])) + log.info( + "analyze_results: callsign CRCs:", + tx_station=text, + s1_crc=s1_crc, + s2_crc=s2_crc, + ) + + locate_data_with_crc(s2, text, data, frametype) + + +@pytest.mark.flaky(reruns=2) +def test_cq(tmp_path): + log_handler.setup_logging(filename=tmp_path / "test_cq", level="INFO") + log = structlog.get_logger("test_cq") + + s1_data = [] + s2_data = [] + + def recv_data(buffer: list, pipe): + while PIPE_THREAD_RUNNING: + if pipe.poll(0.1): + buffer.append(pipe.recv()) + else: + time.sleep(0.1) + + def recv_from_pipes(s1_rx, s1_pipe, s2_rx, s2_pipe) -> list: + processes = [ + threading.Thread(target=recv_data, args=(s1_rx, s1_pipe)), + threading.Thread(target=recv_data, args=(s2_rx, s2_pipe)), + ] + for item in processes: + item.start() + + return processes + + # This sufficiently separates the two halves of the test. This is needed + # because both scripts change global state. They would conflict if running in + # the same process. + from_s1, s1_send = multiprocessing.Pipe() + from_s2, s2_send = multiprocessing.Pipe() + proc = [ + multiprocessing.Process( + target=util1.t_cq1, + args=( + s1_send, + STATIONS[0], + STATIONS[1], + True, # low bandwidth mode + tmp_path, + ), + daemon=True, + ), + multiprocessing.Process( + target=util2.t_cq2, + args=( + s2_send, + STATIONS[1], + STATIONS[0], + True, # low bandwidth mode + tmp_path, + ), + daemon=True, + ), + ] + + pipe_receivers = recv_from_pipes(s1_data, from_s1, s2_data, from_s2) + # log.debug("Creating ") + # print("Starting threads.") + for p_item in proc: + p_item.start() + + # This relies on each process exiting when its job is complete! + + # print("Waiting for threads to exit.") + for p_item in proc: + p_item.join() + + global PIPE_THREAD_RUNNING # pylint: disable=global-statement + PIPE_THREAD_RUNNING = False + for pipe_recv in pipe_receivers: + pipe_recv.join() + + for idx in range(2): + try: + os.unlink(tmp_path / f"hfchannel{idx+1}") + except FileNotFoundError as fnfe: + log.debug(f"Unlinking pipe: {fnfe}") + + for p_item in proc: + assert p_item.exitcode == 0 + p_item.close() + + analyze_results(s1_data, s2_data, STATIONS) + + +if __name__ == "__main__": + # Run pytest with the current script as the filename. + ecode = pytest.main(["-s", "-v", sys.argv[0]]) + if ecode == 0: + print("errors: 0") + else: + print(ecode) diff --git a/test/util_cq_1.py b/test/util_cq_1.py new file mode 100644 index 00000000..a9a6adee --- /dev/null +++ b/test/util_cq_1.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Wed Dec 23 07:04:24 2020 + +@author: DJ2LS +""" + +import json +import time +from pprint import pformat +from typing import Callable + +import data_handler +import helpers +import modem +import sock +import static +import structlog + + +def t_setup( + mycall: str, + dxcall: str, + lowbwmode: bool, + t_transmit, + t_process_data, + tmp_path, +): + # Disable data_handler testmode - This is required to test a conversation. + data_handler.TESTMODE = False + modem.RXCHANNEL = tmp_path / "hfchannel1" + modem.TESTMODE = True + modem.TXCHANNEL = tmp_path / "hfchannel2" + static.HAMLIB_RADIOCONTROL = "disabled" + static.LOW_BANDWIDTH_MODE = lowbwmode + static.MYGRID = bytes("AA12aa", "utf-8") + static.RESPOND_TO_CQ = True + static.SSID_LIST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + mycallsign = helpers.callsign_to_bytes(mycall) + mycallsign = helpers.bytes_to_callsign(mycallsign) + static.MYCALLSIGN = mycallsign + static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN) + + dxcallsign = helpers.callsign_to_bytes(dxcall) + dxcallsign = helpers.bytes_to_callsign(dxcallsign) + static.DXCALLSIGN = dxcallsign + static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) + + # Create the TNC + tnc = data_handler.DATA() + orig_rx_func = data_handler.DATA.process_data + data_handler.DATA.process_data = t_process_data + tnc.log = structlog.get_logger("station1_DATA") + # Limit the frame-ack timeout + tnc.time_list_low_bw = [3, 1, 1] + tnc.time_list_high_bw = [3, 1, 1] + tnc.time_list = [3, 1, 1] + # Limit number of retries + tnc.rx_n_max_retries_per_burst = 4 + + # Create the modem + t_modem = modem.RF() + orig_tx_func = modem.RF.transmit + modem.RF.transmit = t_transmit + t_modem.log = structlog.get_logger("station1_RF") + + return tnc, orig_rx_func, orig_tx_func + + +def t_cq1( + parent_pipe, + mycall: str, + dxcall: str, + lowbwmode: bool, + tmp_path, +): + log = structlog.get_logger("station1") + orig_tx_func: Callable + orig_rx_func: Callable + log.info("t_cq1:", TMP_PATH=tmp_path) + + def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray): + """'Wrap' RF.transmit function to extract the arguments.""" + nonlocal orig_tx_func, parent_pipe + + t_frames = frames + parent_pipe.send(t_frames) + # log.info("S1 TX: ", frames=t_frames) + for item in t_frames: + frametype = int.from_bytes(item[:1], "big") # type: ignore + log.info("S1 TX: ", TX=frametype) + + # Apologies for the Python "magic." "orig_func" is a pointer to the + # original function captured before this one was put in place. + orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore + + def t_process_data(self, bytes_out, freedv, bytes_per_frame: int): + """'Wrap' DATA.process_data function to extract the arguments.""" + nonlocal orig_rx_func, parent_pipe + + t_bytes_out = bytes(bytes_out) + parent_pipe.send(t_bytes_out) + log.debug( + "S1 RX: ", + bytes_out=t_bytes_out, + bytes_per_frame=bytes_per_frame, + ) + frametype = int.from_bytes(t_bytes_out[:1], "big") + log.info("S1 RX: ", RX=frametype) + + # Apologies for the Python "magic." "orig_func" is a pointer to the + # original function captured before this one was put in place. + orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore + + tnc, orig_rx_func, orig_tx_func = t_setup( + mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path + ) + + log.info("t_cq1:", RXCHANNEL=modem.RXCHANNEL) + log.info("t_cq1:", TXCHANNEL=modem.TXCHANNEL) + + # Construct message to start cq. + data = {"type": "command", "command": "cqcqcq"} + + sock.process_tnc_commands(json.dumps(data, indent=None)) + time.sleep(0.5) + sock.process_tnc_commands(json.dumps(data, indent=None)) + + timeout = time.time() + 5 + # Compare with the string conversion instead of repeatedly dumping + # the queue to an object for comparisons. + while '"cq":"transmitting"' not in str(sock.SOCKET_QUEUE.queue): + if time.time() > timeout: + log.warning("station1 TIMEOUT", first=True) + break + time.sleep(0.1) + log.info("station1, first", arq_state=pformat(static.ARQ_STATE)) + + data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall} + sock.process_tnc_commands(json.dumps(data, indent=None)) + time.sleep(0.5) + sock.process_tnc_commands(json.dumps(data, indent=None)) + + # Allow enough time for this side to process the disconnect frame. + timeout = time.time() + 5 + while static.ARQ_STATE or tnc.data_queue_transmit.queue: + if time.time() > timeout: + log.error("station1", TIMEOUT=True) + break + time.sleep(0.5) + log.info("station1", arq_state=pformat(static.ARQ_STATE)) + + # log.info("S1 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue)) + # log.info("S1 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue)) + # log.info("S1 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue)) + assert '"cq":"transmitting"' in str(sock.SOCKET_QUEUE.queue) + assert '"cq":"failed"' not in str(sock.SOCKET_QUEUE.queue) + assert '"command_response":"disconnect","status":"OK"' in str( + sock.SOCKET_QUEUE.queue + ) + log.error("station1: Exiting!") diff --git a/test/util_cq_2.py b/test/util_cq_2.py new file mode 100644 index 00000000..e4a18ed0 --- /dev/null +++ b/test/util_cq_2.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Wed Dec 23 07:04:24 2020 + +@author: DJ2LS +""" + +import time +from pprint import pformat +from typing import Callable + +import data_handler +import helpers +import modem +import sock +import static +import structlog + + +def t_setup( + mycall: str, + dxcall: str, + lowbwmode: bool, + t_transmit, + t_process_data, + tmp_path, +): + # Disable data_handler testmode - This is required to test a conversation. + data_handler.TESTMODE = False + modem.RXCHANNEL = tmp_path / "hfchannel2" + modem.TESTMODE = True + modem.TXCHANNEL = tmp_path / "hfchannel1" + static.HAMLIB_RADIOCONTROL = "disabled" + static.LOW_BANDWIDTH_MODE = lowbwmode + static.MYGRID = bytes("AA12aa", "utf-8") + static.RESPOND_TO_CQ = True + static.SSID_LIST = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + + mycallsign = helpers.callsign_to_bytes(mycall) + mycallsign = helpers.bytes_to_callsign(mycallsign) + static.MYCALLSIGN = mycallsign + static.MYCALLSIGN_CRC = helpers.get_crc_24(static.MYCALLSIGN) + + dxcallsign = helpers.callsign_to_bytes(dxcall) + dxcallsign = helpers.bytes_to_callsign(dxcallsign) + static.DXCALLSIGN = dxcallsign + static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN) + + # Create the TNC + tnc = data_handler.DATA() + orig_rx_func = data_handler.DATA.process_data + data_handler.DATA.process_data = t_process_data + tnc.log = structlog.get_logger("station2_DATA") + # Limit the frame-ack timeout + tnc.time_list_low_bw = [1, 1, 1] + tnc.time_list_high_bw = [1, 1, 1] + tnc.time_list = [1, 1, 1] + # Limit number of retries + tnc.rx_n_max_retries_per_burst = 4 + + # Create the modem + t_modem = modem.RF() + orig_tx_func = modem.RF.transmit + modem.RF.transmit = t_transmit + t_modem.log = structlog.get_logger("station2_RF") + + return tnc, orig_rx_func, orig_tx_func + + +def t_cq2( + parent_pipe, + mycall: str, + dxcall: str, + lowbwmode: bool, + tmp_path, +): + log = structlog.get_logger("station2") + orig_tx_func: Callable + orig_rx_func: Callable + log.info("t_cq2:", TMP_PATH=tmp_path) + + def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray): + """'Wrap' RF.transmit function to extract the arguments.""" + nonlocal orig_tx_func, parent_pipe + + t_frames = frames + parent_pipe.send(t_frames) + # log.info("S2 TX: ", frames=t_frames) + for item in t_frames: + frametype = int.from_bytes(item[:1], "big") # type: ignore + log.info("S2 TX: ", TX=frametype) + + # Apologies for the Python "magic." "orig_func" is a pointer to the + # original function captured before this one was put in place. + orig_tx_func(self, mode, repeats, repeat_delay, frames) # type: ignore + + def t_process_data(self, bytes_out, freedv, bytes_per_frame: int): + """'Wrap' DATA.process_data function to extract the arguments.""" + nonlocal orig_rx_func, parent_pipe + + t_bytes_out = bytes(bytes_out) + parent_pipe.send(t_bytes_out) + log.debug( + "S2 RX: ", + bytes_out=t_bytes_out, + bytes_per_frame=bytes_per_frame, + ) + frametype = int.from_bytes(t_bytes_out[:1], "big") + log.info("S2 RX: ", RX=frametype) + + # Apologies for the Python "magic." "orig_func" is a pointer to the + # original function captured before this one was put in place. + orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore + + tnc, orig_rx_func, orig_tx_func = t_setup( + mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path + ) + + log.info("t_cq2:", RXCHANNEL=modem.RXCHANNEL) + log.info("t_cq2:", TXCHANNEL=modem.TXCHANNEL) + + timeout = time.time() + 5 + # Compare with the string conversion instead of repeatedly dumping + # the queue to an object for comparisons. + while '"cq":"received"' not in str(sock.SOCKET_QUEUE.queue): + if time.time() > timeout: + log.warning("station2 TIMEOUT", first=True) + break + time.sleep(0.5) + log.info("station2, first", arq_state=pformat(static.ARQ_STATE)) + + # Allow enough time for this side to receive the disconnect frame. + timeout = time.time() + 6 + while '"arq":"session","status":"close"' not in str(sock.SOCKET_QUEUE.queue): + if time.time() > timeout: + log.error("station2", TIMEOUT=True) + break + time.sleep(0.5) + log.info("station2", arq_state=pformat(static.ARQ_STATE)) + + # log.info("S2 DQT: ", DQ_Tx=pformat(tnc.data_queue_transmit.queue)) + # log.info("S2 DQR: ", DQ_Rx=pformat(tnc.data_queue_received.queue)) + # log.info("S2 Socket: ", socket_queue=pformat(sock.SOCKET_QUEUE.queue)) + assert '"cq":"received"' in str(sock.SOCKET_QUEUE.queue) + assert '"qrv":"transmitting"' in str(sock.SOCKET_QUEUE.queue) + assert '"arq":"session","status":"close"' in str(sock.SOCKET_QUEUE.queue) + log.error("station2: Exiting!")