Close ARQ test channels on end-to-end tests

This commit is contained in:
Pedro 2023-12-19 23:01:18 +01:00
parent 073966122f
commit 5d8554df08
2 changed files with 37 additions and 15 deletions

View file

@ -37,6 +37,9 @@ class ARQSessionIRS(arq_session.ARQSession):
IRS_State.BURST_REPLY_SENT: { IRS_State.BURST_REPLY_SENT: {
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
}, },
IRS_State.ENDED: {
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
},
} }
def __init__(self, config: dict, modem, dxcall: str, session_id: int): def __init__(self, config: dict, modem, dxcall: str, session_id: int):

View file

@ -17,10 +17,10 @@ import numpy as np
from event_manager import EventManager from event_manager import EventManager
class TestModem: class TestModem:
def __init__(self): def __init__(self, event_q):
self.data_queue_received = queue.Queue() self.data_queue_received = queue.Queue()
self.demodulator = unittest.mock.Mock() self.demodulator = unittest.mock.Mock()
self.event_manager = EventManager([queue.Queue()]) self.event_manager = EventManager([event_q])
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
self.data_queue_received.put(frames) self.data_queue_received.put(frames)
@ -31,22 +31,21 @@ class TestARQSession(unittest.TestCase):
def setUpClass(cls): def setUpClass(cls):
config_manager = CONFIG('modem/config.ini.example') config_manager = CONFIG('modem/config.ini.example')
cls.config = config_manager.read() cls.config = config_manager.read()
cls.logger = structlog.get_logger("TESTS") cls.logger = structlog.get_logger("TESTS")
# ISS # ISS
cls.iss_modem = TestModem()
cls.iss_state_manager = StateManager(queue.Queue()) cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_event_queue = queue.Queue() cls.iss_event_queue = queue.Queue()
cls.iss_modem = TestModem(cls.iss_event_queue)
cls.iss_frame_dispatcher = DISPATCHER(cls.config, cls.iss_frame_dispatcher = DISPATCHER(cls.config,
cls.iss_event_queue, cls.iss_event_queue,
cls.iss_state_manager, cls.iss_state_manager,
cls.iss_modem) cls.iss_modem)
# IRS # IRS
cls.irs_modem = TestModem()
cls.irs_state_manager = StateManager(queue.Queue()) cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_queue = queue.Queue() cls.irs_event_queue = queue.Queue()
cls.irs_modem = TestModem(cls.irs_event_queue)
cls.irs_frame_dispatcher = DISPATCHER(cls.config, cls.irs_frame_dispatcher = DISPATCHER(cls.config,
cls.irs_event_queue, cls.irs_event_queue,
cls.irs_state_manager, cls.irs_state_manager,
@ -55,18 +54,31 @@ class TestARQSession(unittest.TestCase):
# Frame loss probability in % # Frame loss probability in %
cls.loss_probability = 30 cls.loss_probability = 30
cls.channels_running = True
def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER): while self.channels_running:
while True: # Transfer data between both parties
frame_bytes = modem_transmit_queue.get() try:
if random.randint(0, 100) < self.loss_probability: frame_bytes = modem_transmit_queue.get(timeout=1)
self.logger.info(f"[{threading.current_thread().name}] Frame lost...") if random.randint(0, 100) < self.loss_probability:
self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
continue
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
except queue.Empty:
continue continue
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) self.logger.info(f"[{threading.current_thread().name}] Channel closed.")
def waitForSession(self, q, outbound = False):
key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound'
while True:
ev = q.get()
if key in ev and 'success' in ev[key]:
self.logger.info(f"[{threading.current_thread().name}] {key} session ended.")
break
def establishChannels(self): def establishChannels(self):
self.channels_running = True
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
args=[self.iss_modem.data_queue_received, args=[self.iss_modem.data_queue_received,
self.irs_frame_dispatcher], self.irs_frame_dispatcher],
@ -79,7 +91,12 @@ class TestARQSession(unittest.TestCase):
name = "IRS to ISS channel") name = "IRS to ISS channel")
self.irs_to_iss_channel.start() self.irs_to_iss_channel.start()
def xtestARQSessionSmallPayload(self): def waitAndCloseChannels(self):
self.waitForSession(self.iss_event_queue, True)
self.waitForSession(self.irs_event_queue, False)
self.channels_running = False
def testARQSessionSmallPayload(self):
# set Packet Error Rate (PER) / frame loss probability # set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 30 self.loss_probability = 30
@ -90,10 +107,11 @@ class TestARQSession(unittest.TestCase):
} }
cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
cmd.run(self.iss_event_queue, self.iss_modem) cmd.run(self.iss_event_queue, self.iss_modem)
self.waitAndCloseChannels()
def testARQSessionLargePayload(self): def testARQSessionLargePayload(self):
# set Packet Error Rate (PER) / frame loss probability # set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 50 self.loss_probability = 10
self.establishChannels() self.establishChannels()
params = { params = {
@ -103,6 +121,7 @@ class TestARQSession(unittest.TestCase):
cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params)
cmd.run(self.iss_event_queue, self.iss_modem) cmd.run(self.iss_event_queue, self.iss_modem)
self.waitAndCloseChannels()
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()