mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
Adapt TNC States test to use the session ID
This commit is contained in:
parent
616fb214d2
commit
4b37ea4d67
|
@ -15,6 +15,7 @@ Uses util_datac0.py in separate process to perform the data transfer.
|
|||
"""
|
||||
|
||||
import multiprocessing
|
||||
from random import randbytes
|
||||
import sys
|
||||
import time
|
||||
|
||||
|
@ -62,16 +63,23 @@ def t_create_frame(frame_type: int, mycall: str, dxcall: str) -> bytearray:
|
|||
dxcallsign = helpers.bytes_to_callsign(dxcallsign_bytes)
|
||||
dxcallsign_crc = helpers.get_crc_24(dxcallsign)
|
||||
|
||||
# frame = bytearray(14)
|
||||
# frame[:1] = bytes([frame_type])
|
||||
# frame[1:4] = dxcallsign_crc
|
||||
# frame[4:7] = mycallsign_crc
|
||||
# frame[7:13] = mycallsign_bytes
|
||||
session_id = randbytes(1)
|
||||
frame = bytearray(14)
|
||||
frame[:1] = bytes([frame_type])
|
||||
frame[1:4] = dxcallsign_crc
|
||||
frame[4:7] = mycallsign_crc
|
||||
frame[7:13] = mycallsign_bytes
|
||||
frame[1:2] = session_id
|
||||
frame[2:5] = dxcallsign_crc
|
||||
frame[5:8] = mycallsign_crc
|
||||
frame[8:14] = mycallsign_bytes
|
||||
|
||||
return frame
|
||||
|
||||
|
||||
def t_create_session_close(mycall: str, dxcall: str) -> bytearray:
|
||||
def t_create_session_close_old(mycall: str, dxcall: str) -> bytearray:
|
||||
"""
|
||||
Generate the session_close frame.
|
||||
|
||||
|
@ -85,6 +93,23 @@ def t_create_session_close(mycall: str, dxcall: str) -> bytearray:
|
|||
return t_create_frame(223, mycall, dxcall)
|
||||
|
||||
|
||||
def t_create_session_close(session_id: bytes) -> bytearray:
|
||||
"""
|
||||
Generate the session_close frame.
|
||||
|
||||
:param session_id: Session to close
|
||||
:type mycall: int
|
||||
:return: Bytearray of the requested frame
|
||||
:rtype: bytearray
|
||||
"""
|
||||
# return t_create_frame(223, mycall, dxcall)
|
||||
frame = bytearray(14)
|
||||
frame[:1] = bytes([223])
|
||||
frame[1:2] = session_id
|
||||
|
||||
return frame
|
||||
|
||||
|
||||
def t_create_start_session(mycall: str, dxcall: str) -> bytearray:
|
||||
"""
|
||||
Generate the create_session frame.
|
||||
|
@ -150,18 +175,24 @@ def t_foreign_disconnect(mycall: str, dxcall: str):
|
|||
assert static.ARQ_SESSION_STATE == "connecting"
|
||||
|
||||
# Set up a frame from a non-associated station.
|
||||
foreigncall_bytes = helpers.callsign_to_bytes("ZZ0ZZ-0")
|
||||
foreigncall = helpers.bytes_to_callsign(foreigncall_bytes)
|
||||
# foreigncall_bytes = helpers.callsign_to_bytes("ZZ0ZZ-0")
|
||||
# foreigncall = helpers.bytes_to_callsign(foreigncall_bytes)
|
||||
|
||||
close_frame = t_create_session_close("ZZ0ZZ-0", "ZZ0ZZ-0")
|
||||
# close_frame = t_create_session_close_old("ZZ0ZZ-0", "ZZ0ZZ-0")
|
||||
open_session = create_frame[1:2]
|
||||
wrong_session = randbytes(1)
|
||||
while wrong_session == open_session:
|
||||
wrong_session = randbytes(1)
|
||||
close_frame = t_create_session_close(wrong_session)
|
||||
print_frame(close_frame)
|
||||
assert (
|
||||
helpers.check_callsign(static.DXCALLSIGN, bytes(close_frame[4:7]))[0] is False
|
||||
), f"{helpers.get_crc_24(static.DXCALLSIGN)} == {bytes(close_frame[4:7])} but should be not equal."
|
||||
|
||||
assert (
|
||||
helpers.check_callsign(foreigncall, bytes(close_frame[4:7]))[0] is True
|
||||
), f"{helpers.get_crc_24(foreigncall)} != {bytes(close_frame[4:7])} but should be equal."
|
||||
# assert (
|
||||
# helpers.check_callsign(static.DXCALLSIGN, bytes(close_frame[4:7]))[0] is False
|
||||
# ), f"{helpers.get_crc_24(static.DXCALLSIGN)} == {bytes(close_frame[4:7])} but should be not equal."
|
||||
|
||||
# assert (
|
||||
# helpers.check_callsign(foreigncall, bytes(close_frame[4:7]))[0] is True
|
||||
# ), f"{helpers.get_crc_24(foreigncall)} != {bytes(close_frame[4:7])} but should be equal."
|
||||
|
||||
# Send the non-associated session close frame to the TNC
|
||||
tnc.received_session_close(close_frame)
|
||||
|
@ -221,7 +252,9 @@ def t_valid_disconnect(mycall: str, dxcall: str):
|
|||
assert static.ARQ_SESSION_STATE == "connecting"
|
||||
|
||||
# Create packet to be 'received' by this station.
|
||||
close_frame = t_create_session_close(mycall=dxcall, dxcall=mycall)
|
||||
# close_frame = t_create_session_close_old(mycall=dxcall, dxcall=mycall)
|
||||
open_session = create_frame[1:2]
|
||||
close_frame = t_create_session_close(open_session)
|
||||
print_frame(close_frame)
|
||||
tnc.received_session_close(close_frame)
|
||||
|
||||
|
@ -241,7 +274,7 @@ def t_valid_disconnect(mycall: str, dxcall: str):
|
|||
|
||||
@pytest.mark.parametrize("mycall", ["AA1AA-2", "DE2DE-0", "E4AWQ-4"])
|
||||
@pytest.mark.parametrize("dxcall", ["AA9AA-1", "DE2ED-0", "F6QWE-3"])
|
||||
@pytest.mark.flaky(reruns=2)
|
||||
# @pytest.mark.flaky(reruns=2)
|
||||
def test_foreign_disconnect(mycall: str, dxcall: str):
|
||||
proc = multiprocessing.Process(target=t_foreign_disconnect, args=(mycall, dxcall))
|
||||
# print("Starting threads.")
|
||||
|
|
Loading…
Reference in a new issue