Use pytest's tmp_dir fixture

Used for log file and FIFO files.
This commit is contained in:
Paul Kronenwetter 2022-06-12 14:20:49 -04:00
parent f2f9fce6a4
commit 7a3259eee2
6 changed files with 58 additions and 29 deletions

View file

@ -8,11 +8,11 @@ Created on Wed Dec 23 07:04:24 2020
import contextlib
import multiprocessing
import os
import sys
import threading
import time
import zlib
from pprint import pformat
import helpers
import log_handler
@ -26,7 +26,6 @@ except ImportError:
import util_chat_text_1 as util1
import util_chat_text_2 as util2
log_handler.setup_logging(filename="", level="INFO")
STATIONS = ["AA2BB", "ZZ9YY"]
@ -100,16 +99,15 @@ def analyze_results(station1: list, station2: list, call_list: list):
locate_data_with_crc(s2, text, data, frametype)
# log.info("Everything")
# log.info("S1:", s1=pformat(station1))
# log.info("S2:", s2=pformat(station2))
@pytest.mark.parametrize("freedv_mode", ["datac1", "datac3"])
@pytest.mark.parametrize("n_frames_per_burst", [1]) # Higher fpb is broken.
@pytest.mark.parametrize("message_no", range(len(messages)))
@pytest.mark.flaky(reruns=2)
def test_chat_text(freedv_mode: str, n_frames_per_burst: int, message_no: int):
def test_chat_text(
freedv_mode: str, n_frames_per_burst: int, message_no: int, tmp_path
):
log_handler.setup_logging(filename=tmp_path / "test_chat_text", level="INFO")
log = structlog.get_logger("test_chat_text")
s1_data = []
@ -148,6 +146,7 @@ def test_chat_text(freedv_mode: str, n_frames_per_burst: int, message_no: int):
STATIONS[1],
messages[message_no],
True, # low bandwidth mode
tmp_path,
),
daemon=True,
),
@ -161,6 +160,7 @@ def test_chat_text(freedv_mode: str, n_frames_per_burst: int, message_no: int):
STATIONS[0],
messages[message_no],
True, # low bandwidth mode
tmp_path,
),
daemon=True,
),
@ -183,6 +183,12 @@ def test_chat_text(freedv_mode: str, n_frames_per_burst: int, message_no: int):
for pipe_recv in pipe_receivers:
pipe_recv.join()
for idx in range(2):
try:
os.unlink(tmp_path / f"hfchannel{idx+1}")
except FileNotFoundError as fnfe:
log.debug(f"Unlinking pipe: {fnfe}")
for p_item in proc:
assert p_item.exitcode == 0
p_item.close()

View file

@ -6,7 +6,9 @@ import os
import sys
import time
import log_handler
import pytest
import structlog
try:
import test.util_tnc_IRS as irs
@ -21,17 +23,19 @@ except ImportError:
# This test is currently a little inconsistent.
@pytest.mark.flaky(reruns=2)
@pytest.mark.parametrize("command", ["CQ", "PING", "BEACON"])
def test_tnc(command):
def test_tnc(command, tmp_path):
log_handler.setup_logging(filename=tmp_path / "test_tnc", level="INFO")
log = structlog.get_logger("test_tnc")
iss_proc = multiprocessing.Process(target=iss.t_arq_iss, args=[command])
irs_proc = multiprocessing.Process(target=irs.t_arq_irs, args=[command])
# print("Starting threads.")
iss_proc = multiprocessing.Process(target=iss.t_arq_iss, args=[command, tmp_path])
irs_proc = multiprocessing.Process(target=irs.t_arq_irs, args=[command, tmp_path])
log.debug("Starting threads.")
iss_proc.start()
irs_proc.start()
time.sleep(12)
# print("Terminating threads.")
log.debug("Terminating threads.")
irs_proc.terminate()
iss_proc.terminate()
irs_proc.join()
@ -39,9 +43,9 @@ def test_tnc(command):
for idx in range(2):
try:
os.unlink(f"/tmp/hfchannel{idx+1}")
os.unlink(tmp_path / f"hfchannel{idx+1}")
except FileNotFoundError as fnfe:
print(f"Unlinking pipe: {fnfe}")
log.debug(f"Unlinking pipe: {fnfe}")
assert iss_proc.exitcode in [0, -15], f"Transmit side failed test. {iss_proc}"
assert irs_proc.exitcode in [0, -15], f"Receive side failed test. {irs_proc}"

View file

@ -10,6 +10,7 @@ import base64
import json
import time
from pprint import pformat
from typing import Callable
import codec2
import data_handler
@ -26,12 +27,13 @@ def t_setup(
lowbwmode: bool,
t_transmit,
t_process_data,
tmp_path,
):
# Disable data_handler testmode - This is required to test a conversation.
data_handler.TESTMODE = False
modem.RXCHANNEL = "/tmp/hfchannel1"
modem.RXCHANNEL = tmp_path / "hfchannel1"
modem.TESTMODE = True
modem.TXCHANNEL = "/tmp/hfchannel2"
modem.TXCHANNEL = tmp_path / "hfchannel2"
static.HAMLIB_RADIOCONTROL = "disabled"
static.LOW_BANDWIDTH_MODE = lowbwmode
static.MYGRID = bytes("AA12aa", "utf-8")
@ -77,10 +79,12 @@ def t_highsnr_arq_short_station1(
dxcall: str,
message: str,
lowbwmode: bool,
tmp_path,
):
log = structlog.get_logger("station1")
orig_tx_func: object
orig_rx_func: object
orig_tx_func: Callable
orig_rx_func: Callable
log.info("t_highsnr_arq_short_station1:", TMP_PATH=tmp_path)
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
"""'Wrap' RF.transmit function to extract the arguments."""
@ -116,9 +120,12 @@ def t_highsnr_arq_short_station1(
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
tnc, orig_rx_func, orig_tx_func = t_setup(
mycall, dxcall, lowbwmode, t_transmit, t_process_data
mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path
)
log.info("t_highsnr_arq_short_station1:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_highsnr_arq_short_station1:", TXCHANNEL=modem.TXCHANNEL)
# Construct message to dxstation.
b64_str = str(base64.b64encode(bytes(message, "UTF-8")), "UTF-8").strip()
data = {

View file

@ -8,6 +8,7 @@ Created on Wed Dec 23 07:04:24 2020
import time
from pprint import pformat
from typing import Callable
import data_handler
import helpers
@ -23,12 +24,13 @@ def t_setup(
lowbwmode: bool,
t_transmit,
t_process_data,
tmp_path,
):
# Disable data_handler testmode - This is required to test a conversation.
data_handler.TESTMODE = False
modem.RXCHANNEL = "/tmp/hfchannel2"
modem.RXCHANNEL = tmp_path / "hfchannel2"
modem.TESTMODE = True
modem.TXCHANNEL = "/tmp/hfchannel1"
modem.TXCHANNEL = tmp_path / "hfchannel1"
static.HAMLIB_RADIOCONTROL = "disabled"
static.LOW_BANDWIDTH_MODE = lowbwmode
static.MYGRID = bytes("AA12aa", "utf-8")
@ -74,10 +76,12 @@ def t_highsnr_arq_short_station2(
dxcall: str,
message: str,
lowbwmode: bool,
tmp_path,
):
log = structlog.get_logger("station2")
orig_tx_func: object
orig_rx_func: object
orig_tx_func: Callable
orig_rx_func: Callable
log.info("t_highsnr_arq_short_station2:", TMP_PATH=tmp_path)
def t_transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray):
"""'Wrap' RF.transmit function to extract the arguments."""
@ -113,9 +117,12 @@ def t_highsnr_arq_short_station2(
orig_rx_func(self, bytes_out, freedv, bytes_per_frame) # type: ignore
tnc, orig_rx_func, orig_tx_func = t_setup(
mycall, dxcall, lowbwmode, t_transmit, t_process_data
mycall, dxcall, lowbwmode, t_transmit, t_process_data, tmp_path
)
log.info("t_highsnr_arq_short_station2:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_highsnr_arq_short_station2:", TXCHANNEL=modem.TXCHANNEL)
# # This transaction should take less than 14 sec.
# timeout = time.time() + 25
# # Compare with the string conversion instead of repeatedly dumping

View file

@ -6,7 +6,6 @@ Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import os
import signal
import sys
import time
@ -53,14 +52,17 @@ def t_arq_irs(*args):
global IRS_original_arq_cleanup, MESSAGE
MESSAGE = args[0]
tmp_path = args[1]
# enable testmode
data_handler.TESTMODE = True
modem.RXCHANNEL = tmp_path / "hfchannel2"
modem.TESTMODE = True
modem.RXCHANNEL = "/tmp/hfchannel2"
modem.TXCHANNEL = "/tmp/hfchannel1"
modem.TXCHANNEL = tmp_path / "hfchannel1"
static.HAMLIB_RADIOCONTROL = "disabled"
static.RESPOND_TO_CQ = True
log.info("t_arq_irs:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_arq_irs:", TXCHANNEL=modem.TXCHANNEL)
mycallsign = bytes("DN2LS-2", "utf-8")
mycallsign = helpers.callsign_to_bytes(mycallsign)

View file

@ -52,13 +52,16 @@ def t_arq_iss(*args):
global ISS_original_arq_cleanup, MESSAGE
MESSAGE = args[0]
tmp_path = args[1]
# enable testmode
data_handler.TESTMODE = True
modem.RXCHANNEL = tmp_path / "hfchannel1"
modem.TESTMODE = True
modem.RXCHANNEL = "/tmp/hfchannel1"
modem.TXCHANNEL = "/tmp/hfchannel2"
modem.TXCHANNEL = tmp_path / "hfchannel2"
static.HAMLIB_RADIOCONTROL = "disabled"
log.info("t_arq_iss:", RXCHANNEL=modem.RXCHANNEL)
log.info("t_arq_iss:", TXCHANNEL=modem.TXCHANNEL)
mycallsign = bytes("DJ2LS-2", "utf-8")
mycallsign = helpers.callsign_to_bytes(mycallsign)