FreeDATA/test/test_modem.py
Paul Kronenwetter 5c6cee1c21
Enhance tests (#185)
* Initial attempt to create unit tests for DATA class (tnc).

* Completed initial set of tests.

* Adding pytest to install packages.

* Corrects issue #173 fix

I didn't look carefully enough at `helpers.check_callsign` to see that it returns a list. The first element of the list is `True`/`False`.

* Making check_callsign evaluation more consistent.

* Update .gitignore

this is more a test commit to see if GitHub Client for MacOS is working,

* mkfifo test prototype

First partially working prototype for testing the full tnc with mkfifo named pipes.

* single tnc test file

moved to a single file for running tnc tests

* fixed typo

* Added parameters to tests.

Make other minor tweaks and documentation.

* Clean up two existing tests.

Adapted both tests to pytest and maintained
compatibility with existing ctest method.
Tweaked CMakeLists.txt .

* Adding pure python  highsnr_stdio_P_P_multi test.

Intended to replace highsnr_stdio_P_P_multi which uses POSIX shell.

* Adding pure python  highsnr_stdio_P_P_datac0 test.

Intended to replace highsnr_stdio_P_P_datac0 which uses POSIX shell.

* Parameterize recent tests.

Renamed datac0 to datacx after including all data codecs in test.

* Parameterize mode as well.

Add ability to run tests from main directory as well as within test/.

* Add list of tests and brief descriptions.

* Add more native python tests conversions.

* Update README with new tests.

* Tweak README again.

* Rename test to be findable by pytest.

* Rename test for ctest.

* Update correct file this time.

* Minor test tweaks.

* Add modem test proof-of-concept.

* Adjustment to ARQ short test.

* Various refactorings.

Type hints, trailing backslash, range usage, etc.

* Ignore unknown arguments in argparse.

* Minor cleanups.

* Update test/README.md.

* Update test_pa to quiet pylint.

* Give up trying to suppress structlog output.

* Correct module comments.

* Remove excess trailing spaces.

* Remove excess newlines.

* Various refactorings.

Type hints, trailing backslash, range usage, etc.

* mkfifo test prototype

First partially working prototype for testing the full tnc with mkfifo named pipes.

* Update test_tnc and tweak IRS/ISS.

* Correct test_modem to detect failures.

* Trying to be less dependent on env variables.

* Add IRS/ISS tests to ctests

* Pin codec2 revision to v1.0.3.

* Correcting git mistake.

* Pin codec2 revision to master.

This should be a specific release, that implements freedv_set_tuning_range.

Co-authored-by: DJ2LS <75909252+DJ2LS@users.noreply.github.com>
2022-05-21 23:04:17 +00:00

161 lines
4.1 KiB
Python

"""
Tests for the FreeDATA modem.
"""
import multiprocessing
import sys
import time
import pytest
sys.path.insert(0, "..")
sys.path.insert(0, "../tnc")
import helpers
import modem
import static
def print_frame(data: bytearray):
"""
Pretty-print the provided frame.
:param data: Frame to be output
:type data: bytearray
"""
print(f"Type : {int(data[0])}")
print(f"DXCRC : {bytes(data[1:4])}")
print(f"CallCRC: {bytes(data[4:7])}")
print(f"Call : {helpers.bytes_to_callsign(data[7:13])}")
def t_create_frame(frame_type: int, mycall: str, dxcall: str) -> bytearray:
"""
Generate the requested frame.
:param frame_type: The numerical type of the desired frame.
:type frame_type: int
:param mycall: Callsign of the near station
:type mycall: str
:param dxcall: Callsign of the far station
:type dxcall: str
:return: Bytearray of the requested frame
:rtype: bytearray
"""
mycallsign_bytes = helpers.callsign_to_bytes(mycall)
mycallsign = helpers.bytes_to_callsign(mycallsign_bytes)
mycallsign_crc = helpers.get_crc_24(mycallsign)
dxcallsign_bytes = helpers.callsign_to_bytes(dxcall)
dxcallsign = helpers.bytes_to_callsign(dxcallsign_bytes)
dxcallsign_crc = helpers.get_crc_24(dxcallsign)
frame = bytearray(14)
frame[:1] = bytes([frame_type])
frame[1:4] = dxcallsign_crc
frame[4:7] = mycallsign_crc
frame[7:13] = mycallsign_bytes
return frame
def t_create_session_close(mycall: str, dxcall: str) -> bytearray:
"""
Generate the session_close frame.
:param mycall: Callsign of the near station
:type mycall: str
:param dxcall: Callsign of the far station
:type dxcall: str
:return: Bytearray of the requested frame
:rtype: bytearray
"""
return t_create_frame(223, mycall, dxcall)
def t_create_start_session(mycall: str, dxcall: str) -> bytearray:
"""
Generate the create_session frame.
:param mycall: Callsign of the near station
:type mycall: str
:param dxcall: Callsign of the far station
:type dxcall: str
:return: Bytearray of the requested frame
:rtype: bytearray
"""
return t_create_frame(221, mycall, dxcall)
def t_tsh_dummy():
"""Replacement function for transmit"""
print("In t_tsh_dummy")
def t_modem():
"""
Execute test to validate that receiving a session open frame sets the correct machine
state.
"""
t_mode = t_repeats = t_repeat_delay = 0
t_frames = []
# enable testmode
modem.TESTMODE = True
modem.RXCHANNEL = "/tmp/hfchannel1"
modem.TXCHANNEL = "/tmp/hfchannel2"
static.HAMLIB_RADIOCONTROL = "disabled"
def t_tx_dummy(mode, repeats, repeat_delay, frames):
"""Replacement function for transmit"""
print(f"t_tx_dummy: In transmit({mode}, {repeats}, {repeat_delay}, {frames})")
nonlocal t_mode, t_repeats, t_repeat_delay, t_frames
t_mode = mode
t_repeats = repeats
t_repeat_delay = repeat_delay
t_frames = frames[:]
static.TRANSMITTING = False
# Create the modem
local_modem = modem.RF()
# Replace transmit routine with our own, an effective No-Op.
local_modem.transmit = t_tx_dummy
txbuffer = t_create_start_session("AA9AA", "DC2EJ")
# Start the transmission
static.TRANSMITTING = True
modem.MODEM_TRANSMIT_QUEUE.put([14, 5, 250, txbuffer])
while static.TRANSMITTING:
time.sleep(0.1)
# Check that the contents were transferred correctly.
assert t_mode == 14
assert t_repeats == 5
assert t_repeat_delay == 250
assert t_frames == txbuffer
def test_modem_queue():
proc = multiprocessing.Process(target=t_modem, args=())
# print("Starting threads.")
proc.start()
time.sleep(0.5)
# print("Terminating threads.")
proc.terminate()
proc.join()
# print(f"\n{proc.exitcode=}")
assert proc.exitcode == 0
if __name__ == "__main__":
# Run pytest with the current script as the filename.
ecode = pytest.main(["-v", "-s", sys.argv[0]])
if ecode == 0:
print("errors: 0")
else:
print(ecode)