FreeDATA/tests/test_arq_session.py

109 lines
3.9 KiB
Python
Raw Normal View History

2023-12-05 14:40:04 +00:00
import sys
sys.path.append('modem')
import unittest
import unittest.mock
2023-12-05 14:40:04 +00:00
from config import CONFIG
import helpers
import queue
import threading
import base64
from command_arq_raw import ARQRawCommand
from state_manager import StateManager
from frame_dispatcher import DISPATCHER
import random
import structlog
import numpy as np
2023-12-19 14:01:08 +00:00
from event_manager import EventManager
2023-12-05 14:40:04 +00:00
class TestModem:
    """Minimal in-memory stand-in for a modem, used by the ARQ session tests.

    Instead of transmitting anything, ``transmit`` places the frames on
    ``data_queue_received`` so that another thread can pick them up and
    feed them to a frame dispatcher.
    """

    # The class name matches pytest's default ``Test*`` collection pattern.
    # Mark it explicitly as "not a test" so pytest neither tries to collect
    # it nor warns about the ``__init__`` constructor.
    __test__ = False

    def __init__(self):
        # Frames handed to transmit() end up here, in call order.
        self.data_queue_received = queue.Queue()
        # A Mock accepts any attribute access / call made on the demodulator.
        self.demodulator = unittest.mock.Mock()
        self.event_manager = EventManager([queue.Queue()])

    def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
        """Queue ``frames`` on ``data_queue_received`` instead of sending them.

        Args:
            mode: Ignored by this test double.
            repeats: Ignored by this test double.
            repeat_delay: Ignored by this test double.
            frames: Frame payload to make available on the queue.

        Returns:
            Always ``True``. The original implementation fell through and
            returned ``None`` despite the declared ``bool`` return type.
        """
        self.data_queue_received.put(frames)
        return True
2023-12-05 14:40:04 +00:00
class TestARQSession(unittest.TestCase):
    """Exercise an ARQ raw transfer between two in-process stations.

    ``setUpClass`` builds two stations ("ISS" and "IRS"), each with its own
    ``TestModem``, ``StateManager``, event queue and ``DISPATCHER``. Channel
    worker threads move every frame one station's modem "transmits" to the
    other station's dispatcher, randomly dropping frames according to
    ``loss_probability``. The ISS side is the one that issues the
    ``ARQRawCommand`` in the tests.

    NOTE(review): the tests make no assertions; they only start a transfer.
    """

    @classmethod
    def setUpClass(cls):
        """Create the shared configuration, logger and both stations."""
        config_manager = CONFIG('modem/config.ini.example')
        cls.config = config_manager.read()
        cls.logger = structlog.get_logger("TESTS")

        # ISS
        cls.iss_modem = TestModem()
        cls.iss_state_manager = StateManager(queue.Queue())
        cls.iss_event_queue = queue.Queue()
        cls.iss_frame_dispatcher = DISPATCHER(cls.config,
                                              cls.iss_event_queue,
                                              cls.iss_state_manager,
                                              cls.iss_modem)

        # IRS
        cls.irs_modem = TestModem()
        cls.irs_state_manager = StateManager(queue.Queue())
        cls.irs_event_queue = queue.Queue()
        cls.irs_frame_dispatcher = DISPATCHER(cls.config,
                                              cls.irs_event_queue,
                                              cls.irs_state_manager,
                                              cls.irs_modem)

        # Frame loss probability in %
        cls.loss_probability = 50

    def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
        """Forward frames from a modem queue to a dispatcher, dropping some.

        Blocks on ``modem_transmit_queue`` and, for every frame received,
        either discards it (with probability ``loss_probability`` percent)
        or passes it to ``frame_dispatcher.new_process_data``.

        NOTE(review): this loop never terminates, and the threads running
        it are started without ``daemon=True``; nothing in this class stops
        them.
        """
        while True:
            frame_bytes = modem_transmit_queue.get()
            # randrange(100) yields 0..99 (100 outcomes), so the threshold is
            # a true percentage: 0 never drops, 100 always drops. The former
            # randint(0, 100) had 101 outcomes and was off by one.
            if random.randrange(100) < self.loss_probability:
                self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
                continue
            frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)

    def establishChannels(self):
        """Start one worker thread per direction (ISS->IRS and IRS->ISS)."""
        self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
                                                   args=[self.iss_modem.data_queue_received,
                                                         self.irs_frame_dispatcher],
                                                   name="ISS to IRS channel")
        self.iss_to_irs_channel.start()

        self.irs_to_iss_channel = threading.Thread(target=self.channelWorker,
                                                   args=[self.irs_modem.data_queue_received,
                                                         self.iss_frame_dispatcher],
                                                   name="IRS to ISS channel")
        self.irs_to_iss_channel.start()

    def xtestARQSessionSmallPayload(self):
        # The "x" prefix keeps unittest from discovering this method,
        # so it does not run as a test.
        # set Packet Error Rate (PER) / frame loss probability
        self.loss_probability = 30

        self.establishChannels()
        params = {
            'dxcall': "DJ2LS-3",
            'data': base64.b64encode(bytes("Hello world!", encoding="utf-8")),
        }
        cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
        cmd.run(self.iss_event_queue, self.iss_modem)

    def testARQSessionLargePayload(self):
        # set Packet Error Rate (PER) / frame loss probability
        self.loss_probability = 30

        self.establishChannels()
        params = {
            'dxcall': "DJ2LS-3",
            # 1000 random bytes, base64-encoded before being handed over.
            'data': base64.b64encode(np.random.bytes(1000)),
        }
        cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
        cmd.run(self.iss_event_queue, self.iss_modem)
2023-12-05 14:40:04 +00:00
# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()