mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
5c6cee1c21
* Initial attempt to create unit tests for DATA class (tnc). * Completed initial set of tests. * Adding pytest to install packages. * Corrects issue #173 fix I didn't look carefully enough at `helpers.check_callsign` to see that it returns a list. The first element of the list is `True`/`False`. * Making check_callsign evaluation more consistent. * Update .gitignore this is more a test commit to see if GitHub Client for MacOS is working, * mkfifo test prototype First partially working prototype for testing the full tnc with mkfifo named pipes. * single tnc test file moved to a single file for running tnc tests * fixed typo * Added parameters to tests. Make other minor tweaks and documentation. * Clean up two existing tests. Adapted both tests to pytest and maintained compatibility with existing ctest method. Tweaked CMakeLists.txt . * Adding pure python highsnr_stdio_P_P_multi test. Intended to replace highsnr_stdio_P_P_multi which uses POSIX shell. * Adding pure python highsnr_stdio_P_P_datac0 test. Intended to replace highsnr_stdio_P_P_datac0 which uses POSIX shell. * Parameterize recent tests. Renamed datac0 to datacx after including all data codecs in test. * Parameterize mode as well. Add ability to run tests from main directory as well as within test/. * Add list of tests and brief descriptions. * Add more native python tests conversions. * Update README with new tests. * Tweak README again. * Rename test to be findable by pytest. * Rename test for ctest. * Update correct file this time. * Minor test tweaks. * Add modem test proof-of-concept. * Adjustment to ARQ short test. * Various refactorings. Type hints, trailing backslash, range usage, etc. * Ignore unknown arguments in argparse. * Minor cleanups. * Update test/README.md. * Update test_pa to quiet pylint. * Give up trying to suppress structlog output. * Correct module comments. * Remove excess trailing spaces. * Remove excess newlines. * Various refactorings. 
Type hints, trailing backslash, range usage, etc. * mkfifo test prototype First partially working prototype for testing the full tnc with mkfifo named pipes. * Update test_tnc and tweak IRS/ISS. * Correct test_modem to detect failures. * Trying to be less dependent on env variables. * Add IRS/ISS tests to ctests * Pin codec2 revision to v1.0.3. * Correcting git mistake. * Pin codec2 revision to master. This should be a specific release, that implements freedv_set_tuning_range. Co-authored-by: DJ2LS <75909252+DJ2LS@users.noreply.github.com>
160 lines
4.1 KiB
Python
160 lines
4.1 KiB
Python
"""
|
|
Tests for the FreeDATA modem.
|
|
"""
|
|
|
|
import multiprocessing
|
|
import sys
|
|
import time
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, "..")
|
|
sys.path.insert(0, "../tnc")
|
|
import helpers
|
|
import modem
|
|
import static
|
|
|
|
|
|
def print_frame(data: bytearray):
    """
    Pretty-print the provided frame.

    Each field is decoded and printed in turn, so a failure while decoding
    a later field still leaves the earlier fields on stdout.

    :param data: Frame to be output
    :type data: bytearray
    """
    # Byte 0 holds the numeric frame type.
    frame_kind = int(data[0])
    print(f"Type : {frame_kind}")

    # Bytes 1..3 and 4..6 are shown as raw bytes objects.
    dx_crc = bytes(data[1:4])
    print(f"DXCRC : {dx_crc}")

    call_crc = bytes(data[4:7])
    print(f"CallCRC: {call_crc}")

    # Bytes 7..12 are decoded back into a callsign by the helpers module.
    callsign = helpers.bytes_to_callsign(data[7:13])
    print(f"Call : {callsign}")
|
|
|
|
|
|
def t_create_frame(frame_type: int, mycall: str, dxcall: str) -> bytearray:
    """
    Generate the requested frame.

    Layout written into a zero-filled 14-byte buffer:

    * ``[0]``    frame type
    * ``[1:4]``  CRC of the far station's callsign
    * ``[4:7]``  CRC of the near station's callsign
    * ``[7:13]`` near station's callsign in its byte encoding

    NOTE(review): the fields are written with slice assignment, so the frame
    only stays 14 bytes long if ``helpers.get_crc_24`` yields 3 bytes and
    ``helpers.callsign_to_bytes`` yields 6 bytes - confirm against helpers.

    :param frame_type: The numerical type of the desired frame.
    :type frame_type: int
    :param mycall: Callsign of the near station
    :type mycall: str
    :param dxcall: Callsign of the far station
    :type dxcall: str
    :return: Bytearray of the requested frame
    :rtype: bytearray
    """
    # Near station: the encoded bytes go into the frame, and the CRC is taken
    # over the callsign obtained by decoding those bytes again.
    own_encoded = helpers.callsign_to_bytes(mycall)
    own_crc = helpers.get_crc_24(helpers.bytes_to_callsign(own_encoded))

    # Far station: only the CRC of the round-tripped callsign is needed.
    far_encoded = helpers.callsign_to_bytes(dxcall)
    far_crc = helpers.get_crc_24(helpers.bytes_to_callsign(far_encoded))

    packet = bytearray(14)
    packet[:1] = bytes([frame_type])
    packet[1:4] = far_crc
    packet[4:7] = own_crc
    packet[7:13] = own_encoded
    return packet
|
|
|
|
|
|
def t_create_session_close(mycall: str, dxcall: str) -> bytearray:
    """
    Generate the session_close frame (frame type 223).

    :param mycall: Callsign of the near station
    :type mycall: str
    :param dxcall: Callsign of the far station
    :type dxcall: str
    :return: Bytearray of the requested frame
    :rtype: bytearray
    """
    session_close_type = 223
    return t_create_frame(session_close_type, mycall, dxcall)
|
|
|
|
|
|
def t_create_start_session(mycall: str, dxcall: str) -> bytearray:
    """
    Generate the frame that opens a session (frame type 221).

    :param mycall: Callsign of the near station
    :type mycall: str
    :param dxcall: Callsign of the far station
    :type dxcall: str
    :return: Bytearray of the requested frame
    :rtype: bytearray
    """
    session_open_type = 221
    return t_create_frame(session_open_type, mycall, dxcall)
|
|
|
|
|
|
def t_tsh_dummy():
    """Replacement function for transmit; it only prints a marker line."""
    marker = "In t_tsh_dummy"
    print(marker)
|
|
|
|
|
|
def t_modem():
    """
    Queue one transmit request and check it reaches the transmit routine intact.

    The modem is put into test mode, its ``transmit`` attribute is replaced
    with a local stand-in that records its arguments, and one request is
    placed on ``modem.MODEM_TRANSMIT_QUEUE``. The function then polls
    ``static.TRANSMITTING`` until the stand-in clears it and asserts that the
    recorded arguments equal the queued ones.

    NOTE(review): presumably a worker started by ``modem.RF()`` consumes the
    queue and calls ``transmit`` - that code is not visible here. The polling
    loop has no timeout of its own, so the caller must bound the run time.
    """
    # Arguments observed by the stand-in; placeholders until it is called.
    captured = {"mode": 0, "repeats": 0, "repeat_delay": 0, "frames": []}

    # enable testmode
    modem.TESTMODE = True
    modem.RXCHANNEL = "/tmp/hfchannel1"
    modem.TXCHANNEL = "/tmp/hfchannel2"
    static.HAMLIB_RADIOCONTROL = "disabled"

    def t_tx_dummy(mode, repeats, repeat_delay, frames):
        """Replacement function for transmit"""
        print(f"t_tx_dummy: In transmit({mode}, {repeats}, {repeat_delay}, {frames})")
        captured["mode"] = mode
        captured["repeats"] = repeats
        captured["repeat_delay"] = repeat_delay
        # Keep a copy so later changes to the caller's buffer cannot affect the check.
        captured["frames"] = frames[:]
        # Clearing the flag releases the polling loop below.
        static.TRANSMITTING = False

    # Create the modem, then swap its transmit routine for the recording
    # stand-in, an effective No-Op.
    radio = modem.RF()
    radio.transmit = t_tx_dummy

    session_frame = t_create_start_session("AA9AA", "DC2EJ")
    request = [14, 5, 250, session_frame]

    # Start the transmission: raise the flag first, then hand over the request.
    static.TRANSMITTING = True
    modem.MODEM_TRANSMIT_QUEUE.put(request)
    while static.TRANSMITTING:
        time.sleep(0.1)

    # Check that the contents were transferred correctly.
    assert captured["mode"] == 14
    assert captured["repeats"] == 5
    assert captured["repeat_delay"] == 250
    assert captured["frames"] == session_frame
|
|
|
|
|
|
def test_modem_queue():
    """
    Run ``t_modem`` in a child process and require a clean exit.

    The child gets at most 0.5 seconds to finish. If it is still running
    after that it is terminated, which leaves a non-zero exit code and
    therefore fails the test. A failed assertion inside ``t_modem`` also
    surfaces here as a non-zero exit code.
    """
    worker = multiprocessing.Process(target=t_modem, args=())
    worker.start()

    # Wait for the child to finish on its own, but never longer than the
    # 0.5 s budget. Unlike a fixed sleep this returns as soon as the child
    # is done.
    worker.join(timeout=0.5)

    # Only signal the child when it is actually still running (for example
    # stuck in its polling loop); then reap it so exitcode is populated.
    if worker.is_alive():
        worker.terminate()
        worker.join()

    assert worker.exitcode == 0, f"t_modem exited with code {worker.exitcode}"
|
|
|
|
|
|
if __name__ == "__main__":
    # Run pytest with the current script as the filename, then report either
    # the literal "errors: 0" on success or the exit code pytest returned.
    ecode = pytest.main(["-v", "-s", sys.argv[0]])
    print("errors: 0" if ecode == 0 else ecode)
|