mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
Improve ARQ session states using Enum
This commit is contained in:
parent
6a596f1087
commit
726385361e
4 changed files with 47 additions and 36 deletions
|
@ -2,6 +2,7 @@ import queue, threading
|
||||||
import codec2
|
import codec2
|
||||||
import data_frame_factory
|
import data_frame_factory
|
||||||
import structlog
|
import structlog
|
||||||
|
from event_manager import EventManager
|
||||||
from modem_frametypes import FRAME_TYPE
|
from modem_frametypes import FRAME_TYPE
|
||||||
|
|
||||||
class ARQSession():
|
class ARQSession():
|
||||||
|
@ -37,6 +38,8 @@ class ARQSession():
|
||||||
self.logger = structlog.get_logger(type(self).__name__)
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
|
self.event_manager: EventManager = modem.event_manager
|
||||||
|
|
||||||
self.snr = []
|
self.snr = []
|
||||||
|
|
||||||
self.dxcall = dxcall
|
self.dxcall = dxcall
|
||||||
|
@ -67,7 +70,10 @@ class ARQSession():
|
||||||
self.modem.transmit(mode, 1, 1, frame)
|
self.modem.transmit(mode, 1, 1, frame)
|
||||||
|
|
||||||
def set_state(self, state):
|
def set_state(self, state):
|
||||||
self.log(f"{type(self).__name__} state change from {self.state} to {state}")
|
if self.state == state:
|
||||||
|
self.log(f"{type(self).__name__} state {self.state.name} unchanged.")
|
||||||
|
else:
|
||||||
|
self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}")
|
||||||
self.state = state
|
self.state = state
|
||||||
|
|
||||||
def get_data_payload_size(self):
|
def get_data_payload_size(self):
|
||||||
|
|
|
@ -3,17 +3,18 @@ import arq_session
|
||||||
import helpers
|
import helpers
|
||||||
from modem_frametypes import FRAME_TYPE
|
from modem_frametypes import FRAME_TYPE
|
||||||
from codec2 import FREEDV_MODE
|
from codec2 import FREEDV_MODE
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
class IRS_State(Enum):
|
||||||
|
NEW = 0
|
||||||
|
OPEN_ACK_SENT = 1
|
||||||
|
INFO_ACK_SENT = 2
|
||||||
|
BURST_REPLY_SENT = 3
|
||||||
|
ENDED = 4
|
||||||
|
FAILED = 5
|
||||||
|
|
||||||
class ARQSessionIRS(arq_session.ARQSession):
|
class ARQSessionIRS(arq_session.ARQSession):
|
||||||
|
|
||||||
STATE_NEW = 0
|
|
||||||
STATE_OPEN_ACK_SENT = 1
|
|
||||||
STATE_INFO_ACK_SENT = 2
|
|
||||||
STATE_BURST_REPLY_SENT = 3
|
|
||||||
STATE_ENDED = 4
|
|
||||||
STATE_FAILED = 5
|
|
||||||
|
|
||||||
RETRIES_CONNECT = 3
|
RETRIES_CONNECT = 3
|
||||||
RETRIES_TRANSFER = 3 # we need to increase this
|
RETRIES_TRANSFER = 3 # we need to increase this
|
||||||
|
|
||||||
|
@ -21,18 +22,18 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
TIMEOUT_DATA = 12
|
TIMEOUT_DATA = 12
|
||||||
|
|
||||||
STATE_TRANSITION = {
|
STATE_TRANSITION = {
|
||||||
STATE_NEW: {
|
IRS_State.NEW: {
|
||||||
FRAME_TYPE.ARQ_SESSION_OPEN.value : 'send_open_ack',
|
FRAME_TYPE.ARQ_SESSION_OPEN.value : 'send_open_ack',
|
||||||
},
|
},
|
||||||
STATE_OPEN_ACK_SENT: {
|
IRS_State.OPEN_ACK_SENT: {
|
||||||
FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack',
|
FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack',
|
||||||
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
||||||
},
|
},
|
||||||
STATE_INFO_ACK_SENT: {
|
IRS_State.INFO_ACK_SENT: {
|
||||||
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
||||||
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
||||||
},
|
},
|
||||||
STATE_BURST_REPLY_SENT: {
|
IRS_State.BURST_REPLY_SENT: {
|
||||||
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -44,7 +45,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.dxcall = dxcall
|
self.dxcall = dxcall
|
||||||
self.version = 1
|
self.version = 1
|
||||||
|
|
||||||
self.state = self.STATE_NEW
|
self.state = IRS_State.NEW
|
||||||
|
|
||||||
self.total_length = 0
|
self.total_length = 0
|
||||||
self.total_crc = ''
|
self.total_crc = ''
|
||||||
|
@ -69,14 +70,14 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.log(f"Waiting {timeout} seconds...")
|
self.log(f"Waiting {timeout} seconds...")
|
||||||
if not self.event_frame_received.wait(timeout):
|
if not self.event_frame_received.wait(timeout):
|
||||||
# use case: data burst got lost, we want to send a NACK with updated speed level
|
# use case: data burst got lost, we want to send a NACK with updated speed level
|
||||||
if self.state in [self.STATE_BURST_REPLY_SENT, self.STATE_INFO_ACK_SENT]:
|
if self.state in [IRS_State.BURST_REPLY_SENT, IRS_State.INFO_ACK_SENT]:
|
||||||
self.transmitted_acks = 0
|
self.transmitted_acks = 0
|
||||||
self.calibrate_speed_settings()
|
self.calibrate_speed_settings()
|
||||||
self.send_burst_nack()
|
self.send_burst_nack()
|
||||||
return
|
return
|
||||||
|
|
||||||
self.log("Timeout waiting for ISS. Session failed.")
|
self.log("Timeout waiting for ISS. Session failed.")
|
||||||
self.set_state(self.STATE_FAILED)
|
self.set_state(IRS_State.FAILED)
|
||||||
|
|
||||||
def launch_transmit_and_wait(self, frame, timeout, mode):
|
def launch_transmit_and_wait(self, frame, timeout, mode):
|
||||||
thread_wait = threading.Thread(target = self.transmit_and_wait,
|
thread_wait = threading.Thread(target = self.transmit_and_wait,
|
||||||
|
@ -90,7 +91,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.version,
|
self.version,
|
||||||
self.snr[0])
|
self.snr[0])
|
||||||
self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_OPEN_ACK_SENT)
|
self.set_state(IRS_State.OPEN_ACK_SENT)
|
||||||
|
|
||||||
def send_info_ack(self, info_frame):
|
def send_info_ack(self, info_frame):
|
||||||
# Get session info from ISS
|
# Get session info from ISS
|
||||||
|
@ -107,7 +108,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
self.id, self.total_crc, self.snr[0],
|
self.id, self.total_crc, self.snr[0],
|
||||||
self.speed_level, self.frames_per_burst)
|
self.speed_level, self.frames_per_burst)
|
||||||
self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_INFO_ACK_SENT)
|
self.set_state(IRS_State.INFO_ACK_SENT)
|
||||||
|
|
||||||
def send_burst_nack(self):
|
def send_burst_nack(self):
|
||||||
self.calibrate_speed_settings()
|
self.calibrate_speed_settings()
|
||||||
|
@ -147,17 +148,17 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
# increase ack counter
|
# increase ack counter
|
||||||
self.transmitted_acks += 1
|
self.transmitted_acks += 1
|
||||||
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_BURST_REPLY_SENT)
|
self.set_state(IRS_State.BURST_REPLY_SENT)
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.final_crc_matches():
|
if self.final_crc_matches():
|
||||||
self.log("All data received successfully!")
|
self.log("All data received successfully!")
|
||||||
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_ENDED)
|
self.set_state(IRS_State.ENDED)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
self.log("CRC fail at the end of transmission!")
|
self.log("CRC fail at the end of transmission!")
|
||||||
self.set_state(self.STATE_FAILED)
|
self.set_state(IRS_State.FAILED)
|
||||||
|
|
||||||
def calibrate_speed_settings(self):
|
def calibrate_speed_settings(self):
|
||||||
self.speed_level = 0 # for now stay at lowest speed level
|
self.speed_level = 0 # for now stay at lowest speed level
|
||||||
|
|
|
@ -6,29 +6,31 @@ from codec2 import FREEDV_MODE
|
||||||
from modem_frametypes import FRAME_TYPE
|
from modem_frametypes import FRAME_TYPE
|
||||||
import arq_session
|
import arq_session
|
||||||
import helpers
|
import helpers
|
||||||
|
from enum import Enum
|
||||||
|
|
||||||
|
class ISS_State(Enum):
|
||||||
|
NEW = 0
|
||||||
|
OPEN_SENT = 1
|
||||||
|
INFO_SENT = 2
|
||||||
|
BURST_SENT = 3
|
||||||
|
ENDED = 4
|
||||||
|
FAILED = 5
|
||||||
|
|
||||||
class ARQSessionISS(arq_session.ARQSession):
|
class ARQSessionISS(arq_session.ARQSession):
|
||||||
|
|
||||||
STATE_NEW = 0
|
|
||||||
STATE_OPEN_SENT = 1
|
|
||||||
STATE_INFO_SENT = 2
|
|
||||||
STATE_BURST_SENT = 3
|
|
||||||
STATE_ENDED = 4
|
|
||||||
STATE_FAILED = 5
|
|
||||||
|
|
||||||
RETRIES_CONNECT = 10
|
RETRIES_CONNECT = 10
|
||||||
TIMEOUT_CONNECT_ACK = 3
|
TIMEOUT_CONNECT_ACK = 3
|
||||||
TIMEOUT_TRANSFER = 3
|
TIMEOUT_TRANSFER = 3
|
||||||
|
|
||||||
STATE_TRANSITION = {
|
STATE_TRANSITION = {
|
||||||
STATE_OPEN_SENT: {
|
ISS_State.OPEN_SENT: {
|
||||||
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
|
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
|
||||||
},
|
},
|
||||||
STATE_INFO_SENT: {
|
ISS_State.INFO_SENT: {
|
||||||
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
|
FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
|
||||||
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
|
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
|
||||||
},
|
},
|
||||||
STATE_BURST_SENT: {
|
ISS_State.BURST_SENT: {
|
||||||
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
|
FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
|
||||||
FRAME_TYPE.ARQ_BURST_ACK.value: 'send_data',
|
FRAME_TYPE.ARQ_BURST_ACK.value: 'send_data',
|
||||||
FRAME_TYPE.ARQ_BURST_NACK.value: 'send_data',
|
FRAME_TYPE.ARQ_BURST_NACK.value: 'send_data',
|
||||||
|
@ -42,7 +44,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
|
|
||||||
self.confirmed_bytes = 0
|
self.confirmed_bytes = 0
|
||||||
|
|
||||||
self.state = self.STATE_NEW
|
self.state = ISS_State.NEW
|
||||||
self.id = self.generate_id()
|
self.id = self.generate_id()
|
||||||
|
|
||||||
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
||||||
|
@ -62,7 +64,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
return
|
return
|
||||||
self.log("Timeout!")
|
self.log("Timeout!")
|
||||||
retries = retries - 1
|
retries = retries - 1
|
||||||
self.set_state(self.STATE_FAILED)
|
self.set_state(ISS_State.FAILED)
|
||||||
self.log("Session failed")
|
self.log("Session failed")
|
||||||
|
|
||||||
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
||||||
|
@ -72,7 +74,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
def start(self):
|
def start(self):
|
||||||
session_open_frame = self.frame_factory.build_arq_session_open(self.dxcall, self.id)
|
session_open_frame = self.frame_factory.build_arq_session_open(self.dxcall, self.id)
|
||||||
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_OPEN_SENT)
|
self.set_state(ISS_State.OPEN_SENT)
|
||||||
|
|
||||||
def set_speed_and_frames_per_burst(self, frame):
|
def set_speed_and_frames_per_burst(self, frame):
|
||||||
self.speed_level = frame['speed_level']
|
self.speed_level = frame['speed_level']
|
||||||
|
@ -86,7 +88,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.snr[0])
|
self.snr[0])
|
||||||
|
|
||||||
self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
self.set_state(self.STATE_INFO_SENT)
|
self.set_state(ISS_State.INFO_SENT)
|
||||||
|
|
||||||
def send_data(self, irs_frame):
|
def send_data(self, irs_frame):
|
||||||
self.set_speed_and_frames_per_burst(irs_frame)
|
self.set_speed_and_frames_per_burst(irs_frame)
|
||||||
|
@ -96,7 +98,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.log(f"IRS confirmed {self.confirmed_bytes}/{len(self.data)} bytes")
|
self.log(f"IRS confirmed {self.confirmed_bytes}/{len(self.data)} bytes")
|
||||||
|
|
||||||
if self.confirmed_bytes == len(self.data):
|
if self.confirmed_bytes == len(self.data):
|
||||||
self.set_state(self.STATE_ENDED)
|
self.set_state(ISS_State.ENDED)
|
||||||
self.log("All data transfered!")
|
self.log("All data transfered!")
|
||||||
return
|
return
|
||||||
payload_size = self.get_data_payload_size()
|
payload_size = self.get_data_payload_size()
|
||||||
|
@ -109,4 +111,4 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.id, self.confirmed_bytes, payload)
|
self.id, self.confirmed_bytes, payload)
|
||||||
burst.append(data_frame)
|
burst.append(data_frame)
|
||||||
self.launch_twr(burst, self.TIMEOUT_TRANSFER, self.RETRIES_CONNECT, mode='auto')
|
self.launch_twr(burst, self.TIMEOUT_TRANSFER, self.RETRIES_CONNECT, mode='auto')
|
||||||
self.set_state(self.STATE_BURST_SENT)
|
self.set_state(ISS_State.BURST_SENT)
|
||||||
|
|
|
@ -14,11 +14,13 @@ from frame_dispatcher import DISPATCHER
|
||||||
import random
|
import random
|
||||||
import structlog
|
import structlog
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
from event_manager import EventManager
|
||||||
|
|
||||||
class TestModem:
|
class TestModem:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.data_queue_received = queue.Queue()
|
self.data_queue_received = queue.Queue()
|
||||||
self.demodulator = unittest.mock.Mock()
|
self.demodulator = unittest.mock.Mock()
|
||||||
|
self.event_manager = EventManager([queue.Queue()])
|
||||||
|
|
||||||
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
|
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
|
||||||
self.data_queue_received.put(frames)
|
self.data_queue_received.put(frames)
|
||||||
|
|
Loading…
Reference in a new issue