diff --git a/modem/arq_session.py b/modem/arq_session.py index 0fbc3696..f26e642a 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -2,6 +2,7 @@ import queue, threading import codec2 import data_frame_factory import structlog +from event_manager import EventManager from modem_frametypes import FRAME_TYPE class ARQSession(): @@ -37,6 +38,8 @@ class ARQSession(): self.logger = structlog.get_logger(type(self).__name__) self.config = config + self.event_manager: EventManager = modem.event_manager + self.snr = [] self.dxcall = dxcall @@ -67,7 +70,10 @@ class ARQSession(): self.modem.transmit(mode, 1, 1, frame) def set_state(self, state): - self.log(f"{type(self).__name__} state change from {self.state} to {state}") + if self.state == state: + self.log(f"{type(self).__name__} state {self.state.name} unchanged.") + else: + self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}") self.state = state def get_data_payload_size(self): diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 9f4eed55..1bb512b8 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -3,17 +3,18 @@ import arq_session import helpers from modem_frametypes import FRAME_TYPE from codec2 import FREEDV_MODE +from enum import Enum +class IRS_State(Enum): + NEW = 0 + OPEN_ACK_SENT = 1 + INFO_ACK_SENT = 2 + BURST_REPLY_SENT = 3 + ENDED = 4 + FAILED = 5 class ARQSessionIRS(arq_session.ARQSession): - STATE_NEW = 0 - STATE_OPEN_ACK_SENT = 1 - STATE_INFO_ACK_SENT = 2 - STATE_BURST_REPLY_SENT = 3 - STATE_ENDED = 4 - STATE_FAILED = 5 - RETRIES_CONNECT = 3 RETRIES_TRANSFER = 3 # we need to increase this @@ -21,18 +22,18 @@ class ARQSessionIRS(arq_session.ARQSession): TIMEOUT_DATA = 12 STATE_TRANSITION = { - STATE_NEW: { + IRS_State.NEW: { FRAME_TYPE.ARQ_SESSION_OPEN.value : 'send_open_ack', }, - STATE_OPEN_ACK_SENT: { + IRS_State.OPEN_ACK_SENT: { FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack', FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack', }, - STATE_INFO_ACK_SENT: { + IRS_State.INFO_ACK_SENT: { FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack', FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', }, - STATE_BURST_REPLY_SENT: { + IRS_State.BURST_REPLY_SENT: { FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data', }, } @@ -44,7 +45,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.dxcall = dxcall self.version = 1 - self.state = self.STATE_NEW + self.state = IRS_State.NEW self.total_length = 0 self.total_crc = '' @@ -69,14 +70,14 @@ class ARQSessionIRS(arq_session.ARQSession): self.log(f"Waiting {timeout} seconds...") if not self.event_frame_received.wait(timeout): # use case: data burst got lost, we want to send a NACK with updated speed level - if self.state in [self.STATE_BURST_REPLY_SENT, self.STATE_INFO_ACK_SENT]: + if self.state in [IRS_State.BURST_REPLY_SENT, IRS_State.INFO_ACK_SENT]: self.transmitted_acks = 0 self.calibrate_speed_settings() self.send_burst_nack() return self.log("Timeout waiting for ISS. Session failed.") - self.set_state(self.STATE_FAILED) + self.set_state(IRS_State.FAILED) def launch_transmit_and_wait(self, frame, timeout, mode): thread_wait = threading.Thread(target = self.transmit_and_wait, @@ -90,7 +91,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.version, self.snr[0]) self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_OPEN_ACK_SENT) + self.set_state(IRS_State.OPEN_ACK_SENT) def send_info_ack(self, info_frame): # Get session info from ISS @@ -107,7 +108,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.id, self.total_crc, self.snr[0], self.speed_level, self.frames_per_burst) self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_INFO_ACK_SENT) + self.set_state(IRS_State.INFO_ACK_SENT) def send_burst_nack(self): self.calibrate_speed_settings() @@ -147,17 +148,17 @@ class ARQSessionIRS(arq_session.ARQSession): # increase ack counter self.transmitted_acks += 1 self.transmit_frame(ack, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_BURST_REPLY_SENT) + self.set_state(IRS_State.BURST_REPLY_SENT) return if self.final_crc_matches(): self.log("All data received successfully!") self.transmit_frame(ack, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_ENDED) + self.set_state(IRS_State.ENDED) else: self.log("CRC fail at the end of transmission!") - self.set_state(self.STATE_FAILED) + self.set_state(IRS_State.FAILED) def calibrate_speed_settings(self): self.speed_level = 0 # for now stay at lowest speed level diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 24dca43e..ba4d7cc6 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -6,29 +6,31 @@ from codec2 import FREEDV_MODE from modem_frametypes import FRAME_TYPE import arq_session import helpers +from enum import Enum + +class ISS_State(Enum): + NEW = 0 + OPEN_SENT = 1 + INFO_SENT = 2 + BURST_SENT = 3 + ENDED = 4 + FAILED = 5 class ARQSessionISS(arq_session.ARQSession): - STATE_NEW = 0 - STATE_OPEN_SENT = 1 - STATE_INFO_SENT = 2 - STATE_BURST_SENT = 3 - STATE_ENDED = 4 - STATE_FAILED = 5 - RETRIES_CONNECT = 10 TIMEOUT_CONNECT_ACK = 3 TIMEOUT_TRANSFER = 3 STATE_TRANSITION = { - STATE_OPEN_SENT: { + ISS_State.OPEN_SENT: { FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info', }, - STATE_INFO_SENT: { + ISS_State.INFO_SENT: { FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info', FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data', }, - STATE_BURST_SENT: { + ISS_State.BURST_SENT: { FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data', FRAME_TYPE.ARQ_BURST_ACK.value: 'send_data', FRAME_TYPE.ARQ_BURST_NACK.value: 'send_data', @@ -42,7 +44,7 @@ class ARQSessionISS(arq_session.ARQSession): self.confirmed_bytes = 0 - self.state = self.STATE_NEW + self.state = ISS_State.NEW self.id = self.generate_id() self.frame_factory = data_frame_factory.DataFrameFactory(self.config) @@ -62,7 +64,7 @@ class ARQSessionISS(arq_session.ARQSession): return self.log("Timeout!") retries = retries - 1 - self.set_state(self.STATE_FAILED) + self.set_state(ISS_State.FAILED) self.log("Session failed") def launch_twr(self, frame_or_burst, timeout, retries, mode): @@ -72,7 +74,7 @@ class ARQSessionISS(arq_session.ARQSession): def start(self): session_open_frame = self.frame_factory.build_arq_session_open(self.dxcall, self.id) self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_OPEN_SENT) + self.set_state(ISS_State.OPEN_SENT) def set_speed_and_frames_per_burst(self, frame): self.speed_level = frame['speed_level'] @@ -86,7 +88,7 @@ class ARQSessionISS(arq_session.ARQSession): self.snr[0]) self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) - self.set_state(self.STATE_INFO_SENT) + self.set_state(ISS_State.INFO_SENT) def send_data(self, irs_frame): self.set_speed_and_frames_per_burst(irs_frame) @@ -96,7 +98,7 @@ class ARQSessionISS(arq_session.ARQSession): self.log(f"IRS confirmed {self.confirmed_bytes}/{len(self.data)} bytes") if self.confirmed_bytes == len(self.data): - self.set_state(self.STATE_ENDED) + self.set_state(ISS_State.ENDED) self.log("All data transfered!") return payload_size = self.get_data_payload_size() @@ -109,4 +111,4 @@ class ARQSessionISS(arq_session.ARQSession): self.id, self.confirmed_bytes, payload) burst.append(data_frame) self.launch_twr(burst, self.TIMEOUT_TRANSFER, self.RETRIES_CONNECT, mode='auto') - self.set_state(self.STATE_BURST_SENT) + self.set_state(ISS_State.BURST_SENT) diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index 0340a60d..555229ce 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -14,11 +14,13 @@ from frame_dispatcher import DISPATCHER import random import structlog import numpy as np +from event_manager import EventManager class TestModem: def __init__(self): self.data_queue_received = queue.Queue() self.demodulator = unittest.mock.Mock() + self.event_manager = EventManager([queue.Queue()]) def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool: self.data_queue_received.put(frames)