adjusted tests

This commit is contained in:
DJ2LS 2024-03-19 13:01:25 +01:00
parent 5ed11d771f
commit 9da3fb80f0
3 changed files with 29 additions and 21 deletions

View file

@ -192,7 +192,7 @@ class P2PConnection:
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
return
def connected_iss(self):
def connected_iss(self, frame=None):
self.log("CONNECTED ISS...........................")
self.set_state(States.CONNECTED)
self.is_ISS = True

View file

@ -5,8 +5,6 @@ import select
from queue import Queue
from socket_interface_commands import SocketCommandHandler
# Shared queue for command and data handlers
data_queue = Queue()
class CommandSocket(socketserver.BaseRequestHandler):
#def __init__(self, request, client_address, server):

View file

@ -22,6 +22,7 @@ from data_frame_factory import DataFrameFactory
import codec2
import p2p_connection
from socket_interface_commands import SocketCommandHandler
class TestModem:
def __init__(self, event_q, state_q):
@ -63,6 +64,7 @@ class TestP2PConnectionSession(unittest.TestCase):
cls.frame_factory = DataFrameFactory(cls.config)
# ISS
cls.iss_config_manager = config_manager
cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_event_manager = EventManager([queue.Queue()])
cls.iss_event_queue = queue.Queue()
@ -76,6 +78,9 @@ class TestP2PConnectionSession(unittest.TestCase):
cls.iss_state_manager,
cls.iss_modem)
#cls.iss_socket_interface_handler = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager)
#cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234)
# IRS
cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_manager = EventManager([queue.Queue()])
@ -146,34 +151,39 @@ class TestP2PConnectionSession(unittest.TestCase):
self.loss_probability = 0
self.establishChannels()
params = {
'destination': "BB2BBB-2",
'origin': "AA1AAA-1",
}
cmd = P2PConnectionCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
session = cmd.run(self.iss_event_queue, self.iss_modem)
if session.session_id:
self.iss_state_manager.register_p2p_connection_session(session)
session.connect()
# Generate and add 5 random entries to the queue
for _ in range(5):
random_entry = self.generate_random_string(2, 11)
session.p2p_data_tx_queue.put(random_entry)
handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager)
handler.handle_connect(["AA1AAA-1", "BB2BBB-2"])
self.connected_event = threading.Event()
self.connected_event.wait()
for session_id in self.iss_state_manager.p2p_connection_sessions:
session = self.iss_state_manager.get_p2p_connection_session(session_id)
# Generate and add 5 random entries to the queue
for _ in range(5):
random_entry = self.generate_random_string(2, 11)
session.p2p_data_tx_queue.put(random_entry)
self.waitAndCloseChannels()
del cmd
class TestSocket:
def __init__(self, isCmd=True):
self.isCmd = isCmd
def __init__(self, test_class):
self.sent_data = [] # To capture data sent through this socket
self.received_data = b"" # To simulate data received by this socket
self.test_class = test_class
def sendall(self, data):
print(f"Mock sendall called with data: {data}")
self.sent_data.append(data)
self.event_handler(data)
def event_handler(self, data):
if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data:
self.test_class.connected_event.set()
if __name__ == '__main__':