mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
197 lines
7.7 KiB
Python
197 lines
7.7 KiB
Python
import sys
|
|
import time
|
|
|
|
sys.path.append('freedata-server')
|
|
|
|
import unittest
|
|
import unittest.mock
|
|
from config import CONFIG
|
|
import helpers
|
|
import queue
|
|
import threading
|
|
import base64
|
|
from command_p2p_connection import P2PConnectionCommand
|
|
from state_manager import StateManager
|
|
from frame_dispatcher import DISPATCHER
|
|
import random
|
|
import structlog
|
|
import numpy as np
|
|
from event_manager import EventManager
|
|
from state_manager import StateManager
|
|
from data_frame_factory import DataFrameFactory
|
|
import codec2
|
|
import p2p_connection
|
|
|
|
from socket_interface_commands import SocketCommandHandler
|
|
|
|
class TestModem:
|
|
def __init__(self, event_q, state_q):
|
|
self.data_queue_received = queue.Queue()
|
|
self.demodulator = unittest.mock.Mock()
|
|
self.event_manager = EventManager([event_q])
|
|
self.logger = structlog.get_logger('Modem')
|
|
self.states = StateManager(state_q)
|
|
|
|
def getFrameTransmissionTime(self, mode):
|
|
samples = 0
|
|
c2instance = codec2.open_instance(mode.value)
|
|
samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance)
|
|
samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance)
|
|
samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance)
|
|
time = samples / 8000
|
|
return time
|
|
|
|
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
|
|
# Simulate transmission time
|
|
tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT
|
|
self.logger.info(f"TX {tx_time} seconds...")
|
|
threading.Event().wait(tx_time)
|
|
|
|
transmission = {
|
|
'mode': mode,
|
|
'bytes': frames,
|
|
}
|
|
self.data_queue_received.put(transmission)
|
|
|
|
|
|
class TestP2PConnectionSession(unittest.TestCase):
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
config_manager = CONFIG('freedata-server/config.ini.example')
|
|
cls.config = config_manager.read()
|
|
cls.logger = structlog.get_logger("TESTS")
|
|
cls.frame_factory = DataFrameFactory(cls.config)
|
|
|
|
# ISS
|
|
cls.iss_config_manager = config_manager
|
|
cls.iss_state_manager = StateManager(queue.Queue())
|
|
cls.iss_event_manager = EventManager([queue.Queue()])
|
|
cls.iss_event_queue = queue.Queue()
|
|
cls.iss_state_queue = queue.Queue()
|
|
cls.iss_p2p_data_queue = queue.Queue()
|
|
|
|
|
|
cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue)
|
|
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
|
|
cls.iss_event_manager,
|
|
cls.iss_state_manager,
|
|
cls.iss_modem)
|
|
|
|
#cls.iss_socket_interface_handler = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager)
|
|
#cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234)
|
|
|
|
# IRS
|
|
cls.irs_state_manager = StateManager(queue.Queue())
|
|
cls.irs_event_manager = EventManager([queue.Queue()])
|
|
cls.irs_event_queue = queue.Queue()
|
|
cls.irs_state_queue = queue.Queue()
|
|
cls.irs_p2p_data_queue = queue.Queue()
|
|
cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue)
|
|
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
|
|
cls.irs_event_manager,
|
|
cls.irs_state_manager,
|
|
cls.irs_modem)
|
|
|
|
# Frame loss probability in %
|
|
cls.loss_probability = 30
|
|
|
|
cls.channels_running = True
|
|
|
|
cls.disconnect_received = False
|
|
|
|
def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
|
|
while self.channels_running:
|
|
# Transfer data between both parties
|
|
try:
|
|
transmission = modem_transmit_queue.get(timeout=1)
|
|
if random.randint(0, 100) < self.loss_probability:
|
|
self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
|
|
continue
|
|
|
|
frame_bytes = transmission['bytes']
|
|
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
|
|
except queue.Empty:
|
|
continue
|
|
self.logger.info(f"[{threading.current_thread().name}] Channel closed.")
|
|
|
|
def waitForSession(self, q, outbound=False):
|
|
while True and self.channels_running:
|
|
ev = q.get()
|
|
print(ev)
|
|
if 'P2P_CONNECTION_DISCONNECT_ACK' in ev or self.disconnect_received:
|
|
self.logger.info(f"[{threading.current_thread().name}] session ended.")
|
|
break
|
|
|
|
def establishChannels(self):
|
|
self.channels_running = True
|
|
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
|
|
args=[self.iss_modem.data_queue_received,
|
|
self.irs_frame_dispatcher],
|
|
name="ISS to IRS channel")
|
|
self.iss_to_irs_channel.start()
|
|
|
|
self.irs_to_iss_channel = threading.Thread(target=self.channelWorker,
|
|
args=[self.irs_modem.data_queue_received,
|
|
self.iss_frame_dispatcher],
|
|
name="IRS to ISS channel")
|
|
self.irs_to_iss_channel.start()
|
|
|
|
def waitAndCloseChannels(self):
|
|
self.waitForSession(self.iss_event_queue, True)
|
|
self.channels_running = False
|
|
self.waitForSession(self.irs_event_queue, False)
|
|
self.channels_running = False
|
|
|
|
def generate_random_string(self, min_length, max_length):
|
|
import string
|
|
length = random.randint(min_length, max_length)
|
|
return ''.join(random.choices(string.ascii_letters, k=length))#
|
|
|
|
def DisabledtestARQSessionSmallPayload(self):
|
|
# set Packet Error Rate (PER) / frame loss probability
|
|
self.loss_probability = 0
|
|
|
|
self.establishChannels()
|
|
|
|
handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager)
|
|
handler.handle_connect(["AA1AAA-1", "BB2BBB-2"])
|
|
|
|
self.connected_event = threading.Event()
|
|
self.connected_event.wait()
|
|
|
|
for session_id in self.iss_state_manager.p2p_connection_sessions:
|
|
session = self.iss_state_manager.get_p2p_connection_session(session_id)
|
|
session.ENTIRE_CONNECTION_TIMEOUT = 15
|
|
# Generate and add 5 random entries to the queue
|
|
for _ in range(3):
|
|
min_length = (30 * _ ) + 1
|
|
max_length = (30 * _ ) + 1
|
|
print(min_length)
|
|
print(max_length)
|
|
random_entry = self.generate_random_string(min_length, max_length)
|
|
session.p2p_data_tx_queue.put(random_entry)
|
|
|
|
session.p2p_data_tx_queue.put('12345')
|
|
self.waitAndCloseChannels()
|
|
|
|
|
|
class TestSocket:
|
|
def __init__(self, test_class):
|
|
self.sent_data = [] # To capture data sent through this socket
|
|
self.test_class = test_class
|
|
def sendall(self, data):
|
|
print(f"Mock sendall called with data: {data}")
|
|
self.sent_data.append(data)
|
|
self.event_handler(data)
|
|
|
|
def event_handler(self, data):
|
|
if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data:
|
|
self.test_class.connected_event.set()
|
|
|
|
if b'DISCONNECTED\r\n' in self.sent_data:
|
|
self.disconnect_received = True
|
|
self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n')
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|