mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
268 lines
11 KiB
Python
268 lines
11 KiB
Python
import threading
|
|
import arq_session
|
|
import helpers
|
|
from modem_frametypes import FRAME_TYPE
|
|
from codec2 import FREEDV_MODE
|
|
from enum import Enum
|
|
import time
|
|
|
|
class IRS_State(Enum):
|
|
NEW = 0
|
|
OPEN_ACK_SENT = 1
|
|
INFO_ACK_SENT = 2
|
|
BURST_REPLY_SENT = 3
|
|
ENDED = 4
|
|
FAILED = 5
|
|
ABORTED = 6
|
|
|
|
class ARQSessionIRS(arq_session.ARQSession):
|
|
|
|
TIMEOUT_CONNECT = 55 #14.2
|
|
TIMEOUT_DATA = 120
|
|
|
|
STATE_TRANSITION = {
|
|
IRS_State.NEW: {
|
|
FRAME_TYPE.ARQ_SESSION_OPEN.value : 'send_open_ack',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
},
|
|
IRS_State.OPEN_ACK_SENT: {
|
|
FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack',
|
|
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
|
|
},
|
|
IRS_State.INFO_ACK_SENT: {
|
|
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
|
|
},
|
|
IRS_State.BURST_REPLY_SENT: {
|
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
|
|
},
|
|
IRS_State.ENDED: {
|
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
|
|
},
|
|
IRS_State.FAILED: {
|
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack'
|
|
},
|
|
IRS_State.ABORTED: {
|
|
FRAME_TYPE.ARQ_STOP.value: 'send_stop_ack',
|
|
FRAME_TYPE.ARQ_SESSION_OPEN.value: 'send_open_ack',
|
|
FRAME_TYPE.ARQ_SESSION_INFO.value: 'send_info_ack',
|
|
FRAME_TYPE.ARQ_BURST_FRAME.value: 'receive_data',
|
|
},
|
|
}
|
|
|
|
def __init__(self, config: dict, modem, dxcall: str, session_id: int):
|
|
super().__init__(config, modem, dxcall)
|
|
|
|
self.id = session_id
|
|
self.dxcall = dxcall
|
|
self.version = 1
|
|
|
|
self.state = IRS_State.NEW
|
|
self.state_enum = IRS_State # needed for access State enum from outside
|
|
|
|
self.type_byte = None
|
|
self.total_length = 0
|
|
self.total_crc = ''
|
|
self.received_data = None
|
|
self.received_bytes = 0
|
|
self.received_crc = None
|
|
|
|
self.abort = False
|
|
|
|
def all_data_received(self):
|
|
return self.total_length == self.received_bytes
|
|
|
|
def final_crc_matches(self) -> bool:
|
|
return self.total_crc == helpers.get_crc_32(bytes(self.received_data)).hex()
|
|
|
|
def transmit_and_wait(self, frame, timeout, mode):
|
|
self.event_frame_received.clear()
|
|
self.transmit_frame(frame, mode)
|
|
self.log(f"Waiting {timeout} seconds...")
|
|
if not self.event_frame_received.wait(timeout):
|
|
self.log("Timeout waiting for ISS. Session failed.")
|
|
self.transmission_failed()
|
|
|
|
def launch_transmit_and_wait(self, frame, timeout, mode):
|
|
thread_wait = threading.Thread(target = self.transmit_and_wait,
|
|
args = [frame, timeout, mode], daemon=True)
|
|
thread_wait.start()
|
|
|
|
def send_open_ack(self, open_frame):
|
|
self.event_manager.send_arq_session_new(
|
|
False, self.id, self.dxcall, 0, self.state.name)
|
|
ack_frame = self.frame_factory.build_arq_session_open_ack(
|
|
self.id,
|
|
self.dxcall,
|
|
self.version,
|
|
self.snr, flag_abort=self.abort)
|
|
self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
|
if not self.abort:
|
|
self.set_state(IRS_State.OPEN_ACK_SENT)
|
|
return None, None
|
|
|
|
def send_info_ack(self, info_frame):
|
|
# Get session info from ISS
|
|
self.received_data = bytearray(info_frame['total_length'])
|
|
self.total_length = info_frame['total_length']
|
|
self.total_crc = info_frame['total_crc']
|
|
self.dx_snr.append(info_frame['snr'])
|
|
self.type_byte = info_frame['type']
|
|
|
|
self.calibrate_speed_settings()
|
|
|
|
self.log(f"New transfer of {self.total_length} bytes")
|
|
self.event_manager.send_arq_session_new(False, self.id, self.dxcall, self.total_length, self.state.name)
|
|
|
|
info_ack = self.frame_factory.build_arq_session_info_ack(
|
|
self.id, self.total_crc, self.snr,
|
|
self.speed_level, self.frames_per_burst, flag_abort=self.abort)
|
|
self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
|
if not self.abort:
|
|
self.set_state(IRS_State.INFO_ACK_SENT)
|
|
return None, None
|
|
|
|
def process_incoming_data(self, frame):
|
|
if frame['offset'] != self.received_bytes:
|
|
self.log(f"Discarding data offset {frame['offset']}")
|
|
return False
|
|
|
|
remaining_data_length = self.total_length - self.received_bytes
|
|
|
|
# Is this the last data part?
|
|
if remaining_data_length <= len(frame['data']):
|
|
# we only want the remaining length, not the entire frame data
|
|
data_part = frame['data'][:remaining_data_length]
|
|
else:
|
|
# we want the entire frame data
|
|
data_part = frame['data']
|
|
|
|
self.received_data[frame['offset']:] = data_part
|
|
self.received_bytes += len(data_part)
|
|
self.log(f"Received {self.received_bytes}/{self.total_length} bytes")
|
|
self.event_manager.send_arq_session_progress(
|
|
False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name, self.calculate_session_statistics(self.received_bytes, self.total_length))
|
|
|
|
return True
|
|
|
|
def receive_data(self, burst_frame):
|
|
self.process_incoming_data(burst_frame)
|
|
# update statistics
|
|
self.update_histograms(self.received_bytes, self.total_length)
|
|
|
|
if not self.all_data_received():
|
|
self.calibrate_speed_settings(burst_frame=burst_frame)
|
|
ack = self.frame_factory.build_arq_burst_ack(
|
|
self.id,
|
|
self.received_bytes,
|
|
self.speed_level,
|
|
self.frames_per_burst,
|
|
self.snr,
|
|
flag_abort=self.abort
|
|
)
|
|
|
|
self.set_state(IRS_State.BURST_REPLY_SENT)
|
|
self.launch_transmit_and_wait(ack, self.TIMEOUT_DATA, mode=FREEDV_MODE.signalling)
|
|
return None, None
|
|
|
|
if self.final_crc_matches():
|
|
self.log("All data received successfully!")
|
|
ack = self.frame_factory.build_arq_burst_ack(self.id,
|
|
self.received_bytes,
|
|
self.speed_level,
|
|
self.frames_per_burst,
|
|
self.snr,
|
|
flag_final=True,
|
|
flag_checksum=True)
|
|
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
|
self.log("ACK sent")
|
|
self.session_ended = time.time()
|
|
self.set_state(IRS_State.ENDED)
|
|
self.event_manager.send_arq_session_finished(
|
|
False, self.id, self.dxcall, True, self.state.name, data=self.received_data, statistics=self.calculate_session_statistics(self.received_bytes, self.total_length))
|
|
|
|
return self.received_data, self.type_byte
|
|
else:
|
|
|
|
ack = self.frame_factory.build_arq_burst_ack(self.id,
|
|
self.received_bytes,
|
|
self.speed_level,
|
|
self.frames_per_burst,
|
|
self.snr,
|
|
flag_final=True,
|
|
flag_checksum=False)
|
|
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
|
self.log("CRC fail at the end of transmission!")
|
|
return self.transmission_failed()
|
|
|
|
def calibrate_speed_settings(self, burst_frame=None):
|
|
if burst_frame:
|
|
received_speed_level = burst_frame['speed_level']
|
|
else:
|
|
received_speed_level = 0
|
|
|
|
latest_snr = self.snr if self.snr else -10
|
|
appropriate_speed_level = self.get_appropriate_speed_level(latest_snr)
|
|
modes_to_decode = {}
|
|
|
|
# Log the latest SNR, current, appropriate speed levels, and the previous speed level
|
|
self.log(
|
|
f"Latest SNR: {latest_snr}, Current Speed Level: {self.speed_level}, Appropriate Speed Level: {appropriate_speed_level}, Previous Speed Level: {self.previous_speed_level}",
|
|
isWarning=True)
|
|
|
|
# Adjust the speed level by one step towards the appropriate level, if needed
|
|
if appropriate_speed_level > self.speed_level and self.speed_level < len(self.SPEED_LEVEL_DICT) - 1:
|
|
# we need to ensure, the received data is equal to our speed level before changing it
|
|
if received_speed_level == self.speed_level:
|
|
self.speed_level += 1
|
|
elif appropriate_speed_level < self.speed_level and self.speed_level > 0:
|
|
# we need to ensure, the received data is equal to our speed level before changing it
|
|
if received_speed_level == self.speed_level:
|
|
self.speed_level -= 1
|
|
|
|
# Always decode the current mode
|
|
current_mode = self.get_mode_by_speed_level(self.speed_level).value
|
|
modes_to_decode[current_mode] = True
|
|
|
|
# Decode the previous speed level mode
|
|
if self.previous_speed_level != self.speed_level:
|
|
previous_mode = self.get_mode_by_speed_level(self.previous_speed_level).value
|
|
modes_to_decode[previous_mode] = True
|
|
self.previous_speed_level = self.speed_level # Update the previous speed level
|
|
|
|
self.log(f"Modes to Decode: {list(modes_to_decode.keys())}", isWarning=True)
|
|
# Apply the new decode mode based on the updated and previous speed levels
|
|
self.modem.demodulator.set_decode_mode(modes_to_decode)
|
|
|
|
return self.speed_level
|
|
|
|
def abort_transmission(self):
|
|
self.log("Aborting transmission... setting abort flag")
|
|
self.abort = True
|
|
|
|
def send_stop_ack(self, stop_frame):
|
|
stop_ack = self.frame_factory.build_arq_stop_ack(self.id)
|
|
self.launch_transmit_and_wait(stop_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
|
self.set_state(IRS_State.ABORTED)
|
|
self.states.setARQ(False)
|
|
self.event_manager.send_arq_session_finished(
|
|
False, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics(self.received_bytes, self.total_length))
|
|
return None, None
|
|
|
|
def transmission_failed(self, irs_frame=None):
|
|
# final function for failed transmissions
|
|
self.session_ended = time.time()
|
|
self.set_state(IRS_State.FAILED)
|
|
self.log("Transmission failed!")
|
|
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,False, self.state.name, statistics=self.calculate_session_statistics(self.received_bytes, self.total_length))
|
|
self.states.setARQ(False)
|
|
return None, None
|