mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
235 lines
9.4 KiB
Python
235 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Test small (single-frame) and large (multi-frame) messages over a high quality
|
|
simulated audio channel.
|
|
|
|
Near end-to-end test for sending / receiving data through the TNC and modem
|
|
and back through on the other station. Data injection initiates from the
|
|
queue used by the daemon process into and out of the TNC. Tests both low- and
|
|
high-bandwidth data frames (datac3 and datac1 respectively) from Codec2.
|
|
|
|
Can be invoked from CMake, pytest, coverage or directly.
|
|
|
|
Uses util_chat_test_[12].py in separate processes to perform the data transfer.
|
|
|
|
@author: N2KIQ
|
|
"""
|
|
|
|
import contextlib
|
|
import multiprocessing
|
|
import os
|
|
import sys
|
|
import threading
|
|
import time
|
|
import zlib
|
|
|
|
import helpers
|
|
import log_handler
|
|
import pytest
|
|
import structlog
|
|
|
|
try:
|
|
import test.util_chat_text_1 as util1
|
|
import test.util_chat_text_2 as util2
|
|
except ImportError:
|
|
import util_chat_text_1 as util1
|
|
import util_chat_text_2 as util2
|
|
|
|
|
|
STATIONS = ["AA2BB", "ZZ9YY"]
|
|
|
|
bytes_out = b'{"dt":"f","fn":"zeit.txt","ft":"text\\/plain","d":"data:text\\/plain;base64,MyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5CgMyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5Cg=MyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5CgMyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5CgMyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5Cg=","crc":"123123123"}'
|
|
|
|
messages = [
|
|
"This is a test chat...",
|
|
"This is a much longer message, hopefully longer than each of the datac1 and "
|
|
"datac3 frames available to use in this modem. This should be long enough, "
|
|
"but to err on the side of completeness this will string on for many more "
|
|
"words before coming to the long awaited conclusion. We are not at the "
|
|
"concluding point just yet because there is still more space to be taken up "
|
|
"in the datac3 frame. Perhaps now would be a good place to terminate this test "
|
|
"message, but perhaps not because we need a few more bytes. Here then we stop. "
|
|
"This compresses so well that I need more data, even more stuff than is already "
|
|
"here and included in the unreadable diatribe below, or is it a soliloquy? "
|
|
"MyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obm"
|
|
"UgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5"
|
|
"NjY5NzY1NTU4Nzc4MjA5CgMyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0"
|
|
"MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG"
|
|
"9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5Cg=MyBtb2Rlcywgb2huZSBjbGFzcwowL"
|
|
"jAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODk"
|
|
"xMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5CgMyBtb2R"
|
|
"lcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjIgbW9kZXMsIG9obmUgY2xhc"
|
|
"3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY2xhc3MKMC4wMDA5NjY5NzY"
|
|
"1NTU4Nzc4MjA5CgMyBtb2Rlcywgb2huZSBjbGFzcwowLjAwMDk2OTQ4MTE4MDk5MTg0MTcKCjI"
|
|
"gbW9kZXMsIG9obmUgY2xhc3MKMC4wMDA5NjY1NDUxODkxMjI1Mzk0CgoxIG1vZGUsIG9obmUgY"
|
|
"2xhc3MKMC4wMDA5NjY5NzY1NTU4Nzc4MjA5Cg=",
|
|
]
|
|
PIPE_THREAD_RUNNING = True
|
|
|
|
|
|
def locate_data_with_crc(source_list: list, text: str, data: bytes, frametype: str):
|
|
"""Try to locate data in source_list."""
|
|
log = structlog.get_logger("locate_data_with_crc")
|
|
|
|
if data in source_list:
|
|
with contextlib.suppress():
|
|
data = zlib.decompress(data[2:])
|
|
log.info(f"analyze_results: {text} no CRC", _frametype=frametype, data=data)
|
|
elif data + helpers.get_crc_8(data) in source_list:
|
|
with contextlib.suppress(zlib.error):
|
|
data = zlib.decompress(data[2:-1])
|
|
log.info(f"analyze_results: {text} CRC 8", _frametype=frametype, data=data)
|
|
elif data + helpers.get_crc_16(data) in source_list:
|
|
with contextlib.suppress(zlib.error):
|
|
data = zlib.decompress(data[2:-2])
|
|
log.info(f"analyze_results: {text} CRC16", _frametype=frametype, data=data)
|
|
elif data + helpers.get_crc_24(data) in source_list:
|
|
with contextlib.suppress(zlib.error):
|
|
data = zlib.decompress(data[2:-3])
|
|
log.info(f"analyze_results: {text} CRC24", _frametype=frametype, data=data)
|
|
elif data + helpers.get_crc_32(data) in source_list:
|
|
with contextlib.suppress(zlib.error):
|
|
data = zlib.decompress(data[2:-4])
|
|
log.info(f"analyze_results: {text} CRC32", _frametype=frametype, data=data)
|
|
else:
|
|
log.info(
|
|
f"analyze_results: {text} not received:",
|
|
_frame=frametype,
|
|
data=data,
|
|
)
|
|
|
|
|
|
def analyze_results(station1: list, station2: list, call_list: list):
|
|
"""Examine the information retrieved from the sub-processes."""
|
|
# Data in these lists is either a series of bytes of received data,
|
|
# or a bytearray of transmitted data from the station.
|
|
log = structlog.get_logger("analyze_results")
|
|
|
|
# Check that each station's transmitted data was received by the other.
|
|
for s1, s2, text in [
|
|
(station1, station2, call_list[0]),
|
|
(station2, station1, call_list[1]),
|
|
]:
|
|
for s1_item in s1:
|
|
if not isinstance(s1_item, list):
|
|
continue
|
|
data = bytes(s1_item[0])
|
|
frametypeno = int.from_bytes(data[:1], "big")
|
|
# frametype = static.FRAME_TYPE(frametypeno).name
|
|
frametype = str(frametypeno)
|
|
s1_crc = helpers.decode_call(helpers.bytes_to_callsign(data[1:4]))
|
|
s2_crc = helpers.decode_call(helpers.bytes_to_callsign(data[2:5]))
|
|
log.info(
|
|
"analyze_results: callsign CRCs:",
|
|
tx_station=text,
|
|
s1_crc=s1_crc,
|
|
s2_crc=s2_crc,
|
|
)
|
|
|
|
locate_data_with_crc(s2, text, data, frametype)
|
|
|
|
|
|
@pytest.mark.parametrize("freedv_mode", ["datac1", "datac3"])
|
|
@pytest.mark.parametrize("n_frames_per_burst", [1]) # Higher fpb is broken.
|
|
@pytest.mark.parametrize("message_no", range(len(messages)))
|
|
@pytest.mark.flaky(reruns=2)
|
|
def test_chat_text(
|
|
freedv_mode: str, n_frames_per_burst: int, message_no: int, tmp_path
|
|
):
|
|
log_handler.setup_logging(filename=tmp_path / "test_chat_text", level="INFO")
|
|
log = structlog.get_logger("test_chat_text")
|
|
|
|
s1_data = []
|
|
s2_data = []
|
|
|
|
def recv_data(buffer: list, pipe):
|
|
while PIPE_THREAD_RUNNING:
|
|
if pipe.poll(0.1):
|
|
buffer.append(pipe.recv())
|
|
else:
|
|
time.sleep(0.1)
|
|
|
|
def recv_from_pipes(s1_rx, s1_pipe, s2_rx, s2_pipe) -> list:
|
|
processes = [
|
|
threading.Thread(target=recv_data, args=(s1_rx, s1_pipe)),
|
|
threading.Thread(target=recv_data, args=(s2_rx, s2_pipe)),
|
|
]
|
|
for item in processes:
|
|
item.start()
|
|
|
|
return processes
|
|
|
|
# This sufficiently separates the two halves of the test. This is needed
|
|
# because both scripts change global state. They would conflict if running in
|
|
# the same process.
|
|
from_s1, s1_send = multiprocessing.Pipe()
|
|
from_s2, s2_send = multiprocessing.Pipe()
|
|
proc = [
|
|
multiprocessing.Process(
|
|
target=util1.t_highsnr_arq_short_station1,
|
|
args=(
|
|
s1_send,
|
|
freedv_mode,
|
|
n_frames_per_burst,
|
|
STATIONS[0],
|
|
STATIONS[1],
|
|
messages[message_no],
|
|
freedv_mode == "datac3", # == low bandwidth mode
|
|
tmp_path,
|
|
),
|
|
daemon=True,
|
|
),
|
|
multiprocessing.Process(
|
|
target=util2.t_highsnr_arq_short_station2,
|
|
args=(
|
|
s2_send,
|
|
freedv_mode,
|
|
n_frames_per_burst,
|
|
STATIONS[1],
|
|
STATIONS[0],
|
|
messages[message_no],
|
|
freedv_mode == "datac3", # == low bandwidth mode
|
|
tmp_path,
|
|
),
|
|
daemon=True,
|
|
),
|
|
]
|
|
|
|
pipe_receivers = recv_from_pipes(s1_data, from_s1, s2_data, from_s2)
|
|
# log.debug("Creating ")
|
|
# print("Starting threads.")
|
|
for p_item in proc:
|
|
p_item.start()
|
|
|
|
# This relies on each process exiting when its job is complete!
|
|
|
|
# print("Waiting for threads to exit.")
|
|
for p_item in proc:
|
|
p_item.join()
|
|
|
|
global PIPE_THREAD_RUNNING # pylint: disable=global-statement
|
|
PIPE_THREAD_RUNNING = False
|
|
for pipe_recv in pipe_receivers:
|
|
pipe_recv.join()
|
|
|
|
for idx in range(2):
|
|
try:
|
|
os.unlink(tmp_path / f"hfchannel{idx+1}")
|
|
except FileNotFoundError as fnfe:
|
|
log.debug(f"Unlinking pipe: {fnfe}")
|
|
|
|
for p_item in proc:
|
|
assert p_item.exitcode == 0
|
|
p_item.close()
|
|
|
|
analyze_results(s1_data, s2_data, STATIONS)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run pytest with the current script as the filename.
|
|
ecode = pytest.main(["-s", "-v", sys.argv[0]])
|
|
if ecode == 0:
|
|
print("errors: 0")
|
|
else:
|
|
print(ecode)
|