diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 74b2d6a5..f26fbf12 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -37,6 +37,9 @@ class ARQSessionIRS(arq_session.ARQSession): IRS_State.BURST_REPLY_SENT: { FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', }, + IRS_State.ENDED: { + FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', + }, } def __init__(self, config: dict, modem, dxcall: str, session_id: int): diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index 8a9c0d72..03419901 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -17,10 +17,10 @@ import numpy as np from event_manager import EventManager class TestModem: - def __init__(self): + def __init__(self, event_q): self.data_queue_received = queue.Queue() self.demodulator = unittest.mock.Mock() - self.event_manager = EventManager([queue.Queue()]) + self.event_manager = EventManager([event_q]) def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: self.data_queue_received.put(frames) @@ -31,22 +31,21 @@ class TestARQSession(unittest.TestCase): def setUpClass(cls): config_manager = CONFIG('modem/config.ini.example') cls.config = config_manager.read() - cls.logger = structlog.get_logger("TESTS") # ISS - cls.iss_modem = TestModem() cls.iss_state_manager = StateManager(queue.Queue()) cls.iss_event_queue = queue.Queue() + cls.iss_modem = TestModem(cls.iss_event_queue) cls.iss_frame_dispatcher = DISPATCHER(cls.config, cls.iss_event_queue, cls.iss_state_manager, cls.iss_modem) # IRS - cls.irs_modem = TestModem() cls.irs_state_manager = StateManager(queue.Queue()) cls.irs_event_queue = queue.Queue() + cls.irs_modem = TestModem(cls.irs_event_queue) cls.irs_frame_dispatcher = DISPATCHER(cls.config, cls.irs_event_queue, cls.irs_state_manager, @@ -55,18 +54,31 @@ class TestARQSession(unittest.TestCase): # Frame loss probability in % cls.loss_probability = 30 + cls.channels_running = True - - def channelWorker(self, modem_transmit_queue: queue, frame_dispatcher: DISPATCHER): - while True: - frame_bytes = modem_transmit_queue.get() - if random.randint(0, 100) < self.loss_probability: - self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER): + while self.channels_running: + # Transfer data between both parties + try: + frame_bytes = modem_transmit_queue.get(timeout=1) + if random.randint(0, 100) < self.loss_probability: + self.logger.info(f"[{threading.current_thread().name}] Frame lost...") + continue + frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) + except queue.Empty: continue - frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0) - + self.logger.info(f"[{threading.current_thread().name}] Channel closed.") + def waitForSession(self, q, outbound = False): + key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound' + while True: + ev = q.get() + if key in ev and 'success' in ev[key]: + self.logger.info(f"[{threading.current_thread().name}] {key} session ended.") + break + def establishChannels(self): + self.channels_running = True self.iss_to_irs_channel = threading.Thread(target=self.channelWorker, args=[self.iss_modem.data_queue_received, self.irs_frame_dispatcher], @@ -79,7 +91,12 @@ class TestARQSession(unittest.TestCase): name = "IRS to ISS channel") self.irs_to_iss_channel.start() - def xtestARQSessionSmallPayload(self): + def waitAndCloseChannels(self): + self.waitForSession(self.iss_event_queue, True) + self.waitForSession(self.irs_event_queue, False) + self.channels_running = False + + def testARQSessionSmallPayload(self): # set Packet Error Rate (PER) / frame loss probability self.loss_probability = 30 @@ -90,10 +107,11 @@ class TestARQSession(unittest.TestCase): } cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) cmd.run(self.iss_event_queue, self.iss_modem) + self.waitAndCloseChannels() def testARQSessionLargePayload(self): # set Packet Error Rate (PER) / frame loss probability - self.loss_probability = 50 + self.loss_probability = 10 self.establishChannels() params = { @@ -103,6 +121,7 @@ class TestARQSession(unittest.TestCase): cmd = ARQRawCommand(self.config, self.iss_state_manager, self.iss_event_queue, params) cmd.run(self.iss_event_queue, self.iss_modem) + self.waitAndCloseChannels() if __name__ == '__main__': unittest.main()