# File: FreeDATA/freedata_server/arq_session_iss.py
# Web-view metadata: 272 lines, 12 KiB, Python

2023-12-05 14:40:04 +00:00
import threading
import data_frame_factory
import random
2023-12-05 17:50:39 +00:00
from codec2 import FREEDV_MODE
2023-12-14 16:29:04 +00:00
from modem_frametypes import FRAME_TYPE
2023-12-05 17:50:39 +00:00
import arq_session
2023-12-12 21:05:32 +00:00
import helpers
2023-12-19 14:01:08 +00:00
from enum import Enum
import time
2023-12-05 14:40:04 +00:00
2023-12-19 14:01:08 +00:00
class ISS_State(Enum):
    """States of the sending side (ISS) of an ARQ session.

    The numeric values are kept stable; ``ARQSessionISS`` exposes this enum
    to outside code through its ``state_enum`` attribute.
    """

    # Session object created, nothing transmitted yet.
    NEW = 0
    # Session-open frame handed to the transmitter (set by ``start``).
    OPEN_SENT = 1
    # Session-info frame handed to the transmitter (set by ``send_info``).
    INFO_SENT = 2
    # A data burst has been handed to the transmitter (set by ``send_data``).
    BURST_SENT = 3
    # Transfer finished successfully (set by ``transmission_ended``).
    ENDED = 4
    # Transfer gave up (set by ``transmission_failed`` / retry exhaustion).
    FAILED = 5
    # State while running the abort sequence and waiting for the stop ack.
    ABORTING = 6
    # Stop ack received.
    ABORTED = 7
2023-12-05 14:40:04 +00:00
2023-12-19 14:01:08 +00:00
class ARQSessionISS(arq_session.ARQSession):
    """Sending side of an ARQ session.

    The session transmits, in order, a session-open frame, a session-info
    frame (total length, CRC32, SNR, type byte) and then data bursts. Each
    transmission runs in its own daemon thread that repeats the frame(s)
    until ``event_frame_received`` is set or the retry budget is used up.

    Handler methods named in ``STATE_TRANSITION`` take the received frame
    (a dict with at least a ``"flag"`` mapping) and return ``(None, None)``.
    """

    # Transmit attempts per frame kind before the session is marked failed.
    RETRIES_CONNECT = 5
    RETRIES_INFO = 10
    RETRIES_DATA = 15
    RETRIES_STOP = 5

    # Timeouts are in seconds (they are passed to ``threading.Event.wait``).
    # DJ2LS: 3 seconds seems to be too small for radios with a too slow PTT toggle time
    # DJ2LS: 3.5 seconds is working well WITHOUT a channel busy detection delay
    TIMEOUT_CHANNEL_BUSY = 0
    TIMEOUT_CONNECT_ACK = 3.5 + TIMEOUT_CHANNEL_BUSY
    TIMEOUT_TRANSFER = 2.5 + TIMEOUT_CHANNEL_BUSY
    TIMEOUT_STOP_ACK = 3.5 + TIMEOUT_CHANNEL_BUSY

    # state -> {received frame type value -> name of the handler method}.
    # NOTE(review): presumably consumed by the base class when a frame
    # arrives; the dispatcher is not part of this file.
    STATE_TRANSITION = {
        ISS_State.OPEN_SENT: {
            FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
        },
        ISS_State.INFO_SENT: {
            FRAME_TYPE.ARQ_SESSION_OPEN_ACK.value: 'send_info',
            FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
        },
        ISS_State.BURST_SENT: {
            FRAME_TYPE.ARQ_SESSION_INFO_ACK.value: 'send_data',
            FRAME_TYPE.ARQ_BURST_ACK.value: 'send_data',
        },
        ISS_State.FAILED: {
            FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_aborted',
        },
        ISS_State.ABORTING: {
            FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_aborted',
        },
        ISS_State.ABORTED: {
            FRAME_TYPE.ARQ_STOP_ACK.value: 'transmission_aborted',
        },
    }

    def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes):
        """Create a new sending session for ``data`` addressed to ``dxcall``.

        Args:
            config: Configuration mapping; ``start`` reads
                ``config['MODEM']['maximum_bandwidth']`` from it.
            modem: Modem object, forwarded to the base class.
            dxcall: Callsign of the remote station.
            state_manager: Object holding ``arq_iss_sessions`` and
                ``remove_arq_iss_session``.
            data: Payload to transfer.
            type_byte: Type marker sent in the session-info frame and handed
                to the data type handler when the session ends.
        """
        super().__init__(config, modem, dxcall, state_manager)
        self.state_manager = state_manager
        self.data = data
        self.total_length = len(data)
        self.data_crc = ''
        self.type_byte = type_byte
        # Bytes considered acknowledged so far.
        self.confirmed_bytes = 0
        # Offset the next acknowledgement is expected to confirm.
        self.expected_byte_offset = 0

        self.state = ISS_State.NEW
        self.state_enum = ISS_State  # needed for access State enum from outside
        self.id = self.generate_id()

        self.frame_factory = data_frame_factory.DataFrameFactory(self.config)

    def generate_id(self):
        """Return a random unused session id in 1..255, or ``False`` if none is free.

        An id counts as used when it is contained in
        ``state_manager.arq_iss_sessions``. Choosing from the list of free
        ids keeps the work bounded, where repeated random draws could take
        arbitrarily long when nearly all ids are taken.
        """
        used_ids = self.state_manager.arq_iss_sessions
        free_ids = [candidate for candidate in range(1, 256) if candidate not in used_ids]
        if not free_ids:
            return False
        return random.choice(free_ids)

    def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode, isARQBurst=False, ):
        """Transmit frame(s) and wait for a response, repeating on timeout.

        Args:
            frame_or_burst: A single frame or a list of frames.
            timeout: Seconds to wait for ``event_frame_received`` per attempt.
            retries: Maximum number of attempts.
            mode: Mode passed through to ``transmit_frame``.
            isARQBurst: True when the frames are a data burst; enables the
                one-time fallback to speed level 0.

        Returns early as soon as the event is set. When all attempts time
        out, the session is moved to ``FAILED`` and ``transmission_failed``
        is called.
        """
        # A single frame is sent as a one-element burst; this does not change
        # between attempts, so it is computed once.
        burst = frame_or_burst if isinstance(frame_or_burst, list) else [frame_or_burst]

        while retries > 0:
            # Fresh event per attempt; it is cleared again after transmitting,
            # so only a signal raised during the wait below counts.
            self.event_frame_received = threading.Event()
            for frame in burst:
                self.transmit_frame(frame, mode)
            self.event_frame_received.clear()

            self.log(f"Waiting {timeout} seconds...")
            if self.event_frame_received.wait(timeout):
                return
            self.log("Timeout!")
            retries = retries - 1

            # TODO TEMPORARY TEST FOR SENDING IN LOWER SPEED LEVEL IF WE HAVE TWO FAILED TRANSMISSIONS!!!
            # The trigger is tied to RETRIES_DATA, i.e. it fires after the
            # second timeout of a burst that was launched with RETRIES_DATA.
            fallback_due = (
                retries == self.RETRIES_DATA - 2
                and isARQBurst
                and self.speed_level > 0
                and self.state not in [ISS_State.ABORTED, ISS_State.ABORTING]
            )
            if fallback_due:
                self.log("SENDING IN FALLBACK SPEED LEVEL", isWarning=True)
                self.speed_level = 0
                self.log(f"CONFIRMED BYTES: {self.confirmed_bytes}")
                # send_data launches its own retry thread, so this one ends here.
                self.send_data({'flag': {'ABORT': False, 'FINAL': False}, 'speed_level': self.speed_level}, fallback=True)
                return

        self.set_state(ISS_State.FAILED)
        self.transmission_failed()

    def launch_twr(self, frame_or_burst, timeout, retries, mode, isARQBurst=False):
        """Run ``transmit_wait_and_retry`` with the given arguments in a daemon thread."""
        twr = threading.Thread(
            target=self.transmit_wait_and_retry,
            args=[frame_or_burst, timeout, retries, mode, isARQBurst],
            daemon=True,
        )
        twr.start()

    def start(self):
        """Announce the new session and transmit the session-open frame."""
        maximum_bandwidth = self.config['MODEM']['maximum_bandwidth']
        self.log(f"maximum bandwidth: {maximum_bandwidth}")
        self.event_manager.send_arq_session_new(
            True, self.id, self.dxcall, self.total_length, self.state.name)
        session_open_frame = self.frame_factory.build_arq_session_open(
            self.dxcall, self.id, maximum_bandwidth, self.protocol_version)
        self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
        self.set_state(ISS_State.OPEN_SENT)

    def update_speed_level(self, frame):
        """Adopt the ``speed_level`` carried by ``frame`` if it is valid.

        The level is applied only when the key is present and the value
        indexes into ``SPEED_LEVEL_DICT``; every outcome is logged.
        """
        self.log("---------------------------------------------------------", isWarning=True)
        # Log the received frame for debugging
        self.log(f"Received frame: {frame}", isWarning=True)

        if 'speed_level' not in frame:
            self.log("No speed level specified in the received frame.", isWarning=True)
            return

        new_speed_level = frame['speed_level']
        # Ensure the new speed level is within the allowable range
        if not 0 <= new_speed_level < len(self.SPEED_LEVEL_DICT):
            self.log(f"Received speed level {new_speed_level} is out of allowable range.", isWarning=True)
            return

        if new_speed_level == self.speed_level:
            self.log("Received speed level is the same as the current speed level.", isWarning=True)
            return

        self.log(f"Changing speed level from {self.speed_level} to {new_speed_level}", isWarning=True)
        self.speed_level = new_speed_level

    def send_info(self, irs_frame):
        """Handle a session-open ack: transmit the session-info frame.

        If the received frame carries the ABORT flag the session is aborted
        instead and the result of ``transmission_aborted`` is returned.
        """
        # check if we received an abort flag
        if irs_frame["flag"]["ABORT"]:
            return self.transmission_aborted(irs_frame)

        info_frame = self.frame_factory.build_arq_session_info(
            self.id,
            self.total_length,
            helpers.get_crc_32(self.data),
            self.snr,
            self.type_byte,
        )

        self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_INFO, mode=FREEDV_MODE.signalling)
        self.set_state(ISS_State.INFO_SENT)

        return None, None

    def send_data(self, irs_frame, fallback=None):
        """Handle an info/burst ack: update progress and send the next burst.

        Args:
            irs_frame: Received frame; ``irs_frame["flag"]`` must contain
                ``ABORT`` and ``FINAL`` (and ``CHECKSUM`` when FINAL is set).
            fallback: Truthy when called for the speed-level fallback resend;
                in that case ``confirmed_bytes`` is not advanced.

        Returns:
            ``(None, None)`` on every path.
        """
        # interrupt transmission when aborting
        if self.state in [ISS_State.ABORTED, ISS_State.ABORTING]:
            # Same return shape as every other exit of this handler.
            return None, None

        # update statistics
        self.update_histograms(self.confirmed_bytes, self.total_length)
        self.update_speed_level(irs_frame)

        if self.expected_byte_offset > self.total_length:
            self.confirmed_bytes = self.total_length
        elif not fallback:
            # The previous burst counts as delivered up to the expected offset.
            self.confirmed_bytes = self.expected_byte_offset

        self.log(f"IRS confirmed {self.confirmed_bytes}/{self.total_length} bytes")
        self.event_manager.send_arq_session_progress(
            True, self.id, self.dxcall, self.confirmed_bytes, self.total_length,
            self.state.name, self.speed_level,
            statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))

        # check if we received an abort flag
        if irs_frame["flag"]["ABORT"]:
            self.transmission_aborted(irs_frame)
            return None, None

        if irs_frame["flag"]["FINAL"]:
            # Success requires everything confirmed and a matching checksum flag.
            if self.confirmed_bytes == self.total_length and irs_frame["flag"]["CHECKSUM"]:
                self.transmission_ended(irs_frame)
            else:
                self.transmission_failed()
            return None, None

        payload_size = self.get_data_payload_size()
        burst = []
        # NOTE(review): ``offset`` is not advanced inside the loop, so with
        # frames_per_burst > 1 every frame of the burst carries the same
        # slice - confirm this is intended.
        for _ in range(0, self.frames_per_burst):
            offset = self.confirmed_bytes
            payload = self.data[offset: offset + payload_size]
            # The last slice may be shorter than payload_size.
            self.expected_byte_offset = offset + len(payload)
            data_frame = self.frame_factory.build_arq_burst_frame(
                self.SPEED_LEVEL_DICT[self.speed_level]["mode"],
                self.id, offset, payload, self.speed_level)
            burst.append(data_frame)

        self.launch_twr(burst, self.TIMEOUT_TRANSFER, self.RETRIES_DATA, mode='auto', isARQBurst=True)
        self.set_state(ISS_State.BURST_SENT)
        return None, None

    def transmission_ended(self, irs_frame):
        """Finish a successful transfer: publish the result and drop the session."""
        # final function for successfully ended transmissions
        self.session_ended = time.time()
        self.set_state(ISS_State.ENDED)
        self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
        self.event_manager.send_arq_session_finished(
            True, self.id, self.dxcall, True, self.state.name,
            statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))

        self.arq_data_type_handler.transmitted(
            self.type_byte, self.data,
            self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
        self.state_manager.remove_arq_iss_session(self.id)
        self.states.setARQ(False)
        return None, None

    def transmission_failed(self, irs_frame=None):
        """Finish a failed transfer: publish the result and notify the type handler.

        ``irs_frame`` is accepted for signature compatibility and not used.
        Unlike the ended/aborted paths, the session is not removed from the
        state manager here.
        """
        # final function for failed transmissions
        self.session_ended = time.time()
        self.set_state(ISS_State.FAILED)
        self.log("Transmission failed!")
        self.event_manager.send_arq_session_finished(
            True, self.id, self.dxcall, False, self.state.name,
            statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
        self.states.setARQ(False)

        self.arq_data_type_handler.failed(
            self.type_byte, self.data,
            self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
        return None, None

    def abort_transmission(self, irs_frame=None):
        """Start the abort sequence: stop pending output and send a stop frame.

        ``irs_frame`` is accepted for signature compatibility and not used.
        The call blocks for roughly ``0.1 + TIMEOUT_STOP_ACK`` seconds before
        the stop frame is launched.
        """
        # function for starting the abort sequence
        self.log("aborting transmission...")
        self.set_state(ISS_State.ABORTING)

        self.event_manager.send_arq_session_finished(
            True, self.id, self.dxcall, False, self.state.name,
            statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))

        # clear audio out queue
        # NOTE(review): this empties the underlying container directly; if
        # audio_out_queue is a queue.Queue this bypasses its lock - confirm
        # no consumer is racing here.
        self.modem.audio_out_queue.queue.clear()

        # wait for transmit function to be ready before setting event
        threading.Event().wait(0.100)

        # break actual retries
        self.event_frame_received.set()

        # sleep some time for avoiding packet collision
        threading.Event().wait(self.TIMEOUT_STOP_ACK)
        self.send_stop()

    def send_stop(self):
        """Transmit the stop frame for this session with the stop retry budget."""
        stop_frame = self.frame_factory.build_arq_stop(self.id)
        self.launch_twr(stop_frame, self.TIMEOUT_STOP_ACK, self.RETRIES_STOP, mode=FREEDV_MODE.signalling)

    def transmission_aborted(self, irs_frame):
        """Finish an aborted transfer: release the retry loop and drop the session."""
        self.log("session aborted")
        self.session_ended = time.time()
        self.set_state(ISS_State.ABORTED)
        # break actual retries
        self.event_frame_received.set()

        self.event_manager.send_arq_session_finished(
            True, self.id, self.dxcall, False, self.state.name,
            statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
        self.state_manager.remove_arq_iss_session(self.id)
        self.states.setARQ(False)
        return None, None