diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 51ea7047..677fb79d 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -46,7 +46,7 @@ class ARQSessionIRS(arq_session.ARQSession): speed = 1 version = 1 - ack_frame = self.frame_factory.build_arq_connect_ack(isWideband, self.id, speed, version) + ack_frame = self.frame_factory.build_arq_session_connect_ack(isWideband, self.id, speed, version) self.transmit_frame(ack_frame) self.set_modem_decode_modes(None) diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 4a801e64..6620137e 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -79,13 +79,12 @@ class ARQSessionISS(arq_session.ARQSession): max_size = self.get_payload_size(self.speed_level) end_offset = min(len(self.data), max_size) frame_payload = self.data[offset:end_offset] - data_frame = self.frame_factory.build_arq_session_send(self.speed_level, - self.dxcall, - frame_payload) - self.set_state(self.STATE_SENDING) - if not self.send_arq(data_frame): - return False - offset = end_offset + 1 + # TODO build_arq_session_connect is wrong frame. It seems we need to create the correct function for this + #data_frame = self.frame_factory.build_arq_session_connect(self.speed_level, self.dxcall, frame_payload) + #self.set_state(self.STATE_SENDING) + #if not self.send_arq(data_frame): + # return False + #offset = end_offset + 1 # Send part of the payload using ARQ def send_arq(self, frame): diff --git a/modem/data_frame_factory.py b/modem/data_frame_factory.py index 6c328672..62349245 100644 --- a/modem/data_frame_factory.py +++ b/modem/data_frame_factory.py @@ -109,8 +109,8 @@ class DataFrameFactory: "session_id": 1, } # arq connect frames - self.template_list[FR_TYPE.ARQ_DC_OPEN_N.value] = arq_dc_open - self.template_list[FR_TYPE.ARQ_DC_OPEN_W.value] = arq_dc_open + self.template_list[FR_TYPE.ARQ_SESSION_OPEN_N.value] = arq_dc_open + self.template_list[FR_TYPE.ARQ_SESSION_OPEN_W.value] = arq_dc_open # same structure for narrow and wide types arq_dc_open_ack = { @@ -120,12 +120,12 @@ class DataFrameFactory: "arq_protocol_version": 1 } # arq connect ack frames - self.template_list[FR_TYPE.ARQ_DC_OPEN_ACK_N.value] = arq_dc_open_ack - self.template_list[FR_TYPE.ARQ_DC_OPEN_ACK_W.value] = arq_dc_open_ack + self.template_list[FR_TYPE.ARQ_SESSION_OPEN_ACK_N.value] = arq_dc_open_ack + self.template_list[FR_TYPE.ARQ_SESSION_OPEN_ACK_W.value] = arq_dc_open_ack - # arq burst ack + # arq burst ack self.template_list[FR_TYPE.BURST_ACK.value] = { "frame_length": self.LENGTH_SIG1_FRAME, "session_id": 1, @@ -140,7 +140,7 @@ class DataFrameFactory: "session_id": 1, "snr":1 } - + # arq burst nack self.template_list[FR_TYPE.BURST_NACK.value] = { "frame_length": self.LENGTH_SIG1_FRAME, @@ -281,7 +281,7 @@ class DataFrameFactory: } return self.construct(FR_TYPE.FEC_WAKEUP, payload) - + def build_fec(self, mode, payload): mode_int = codec2.freedv_get_mode_value_by_name(mode) payload_per_frame = codec2.get_bytes_per_frame(mode_int) - 2 @@ -296,19 +296,20 @@ class DataFrameFactory: test_frame[:1] = bytes([FR_TYPE.TEST_FRAME.value]) return test_frame - def build_arq_connect(self, isWideband, destination, session_id): - + def build_arq_session_connect(self, isWideband, destination, session_id): + print(isWideband) + print(destination) + print(session_id) payload = { "destination_crc": helpers.get_crc_24(destination), "origin_crc": helpers.get_crc_24(self.myfullcall), "origin": helpers.callsign_to_bytes(self.myfullcall), "session_id": session_id.to_bytes(1, 'big'), } - - channel_type = FR_TYPE.ARQ_DC_OPEN_W if isWideband else FR_TYPE.ARQ_DC_OPEN_N + channel_type = FR_TYPE.ARQ_SESSION_OPEN_W if isWideband else FR_TYPE.ARQ_SESSION_OPEN_N return self.construct(channel_type, payload) - def build_arq_connect_ack(self, isWideband, session_id, speed_level,arq_protocol_version): + def build_arq_session_connect_ack(self, isWideband, session_id, speed_level,arq_protocol_version): #connection_frame = bytearray(self.length_sig0_frame) #connection_frame[:1] = frametype @@ -322,18 +323,7 @@ class DataFrameFactory: "arq_protocol_version": bytes([arq_protocol_version]), } - channel_type = FR_TYPE.ARQ_DC_OPEN_ACK_W if isWideband else FR_TYPE.ARQ_DC_OPEN_ACK_N - return self.construct(channel_type, payload) - - def build_arq_session_connect(self, isWideband, destination, session_id): - - payload = { - "destination_crc": helpers.get_crc_24(destination), - "origin_crc": helpers.get_crc_24(self.myfullcall), - "origin": helpers.callsign_to_bytes(self.myfullcall), - "session_id": session_id.to_bytes(1, 'big'), - } - channel_type = FR_TYPE.ARQ_DC_OPEN_W if isWideband else FR_TYPE.ARQ_DC_OPEN_N + channel_type = FR_TYPE.ARQ_SESSION_OPEN_ACK_W if isWideband else FR_TYPE.ARQ_SESSION_OPEN_ACK_N return self.construct(channel_type, payload) def build_arq_burst_ack(self, session_id: bytes, snr: int, speed_level: int, len_arq_rx_frame_buffer: int): diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index 02803876..c2aeb7e1 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -13,26 +13,26 @@ from data_frame_factory import DataFrameFactory #from deprecated_data_handler_data_broadcasts import DATABROADCAST #from deprecated_data_handler_ping import PING -from protocol_arq_iss import ISS -from protocol_arq_irs import IRS -from protocol_arq import ARQ -from protocol_arq_session import SESSION +from protocol_arq_session_iss import ISS +from protocol_arq_session_irs import IRS +from protocol_arq_session import ARQ +from protocol_arq_connection import SESSION from frame_handler import FrameHandler from frame_handler_ping import PingFrameHandler from frame_handler_cq import CQFrameHandler -from frame_handler_arq import ARQFrameHandler +from frame_handler_arq_session import ARQFrameHandler class DISPATCHER(): FRAME_HANDLER = { - FR_TYPE.ARQ_DC_OPEN_ACK_N.value: {"class": ARQFrameHandler, "name": "ARQ OPEN ACK (Narrow)"}, - FR_TYPE.ARQ_DC_OPEN_ACK_W.value: {"class": ARQFrameHandler, "name": "ARQ OPEN ACK (Wide)"}, - FR_TYPE.ARQ_DC_OPEN_N.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open (Narrow)"}, - FR_TYPE.ARQ_DC_OPEN_W.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open (Wide)"}, - FR_TYPE.ARQ_SESSION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"}, - FR_TYPE.ARQ_SESSION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, - FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.ARQ_SESSION_OPEN_ACK_N.value: {"class": ARQFrameHandler, "name": "ARQ OPEN ACK (Narrow)"}, + FR_TYPE.ARQ_SESSION_OPEN_ACK_W.value: {"class": ARQFrameHandler, "name": "ARQ OPEN ACK (Wide)"}, + FR_TYPE.ARQ_SESSION_OPEN_N.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open (Narrow)"}, + FR_TYPE.ARQ_SESSION_OPEN_W.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open (Wide)"}, + FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"}, + FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"}, + FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"}, FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP TX"}, FR_TYPE.BEACON.value: {"class": FrameHandler, "name": "BEACON"}, FR_TYPE.BURST_ACK.value: {"class": FrameHandler, "name": "BURST ACK"}, @@ -120,7 +120,7 @@ class DISPATCHER(): def get_id_from_frame(self, data): - if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]: + if data[:1] in [FR_TYPE.ARQ_SESSION_OPEN_N, FR_TYPE.ARQ_SESSION_OPEN_W]: return data[13:14] return None @@ -229,31 +229,31 @@ class DISPATCHER(): # Dictionary of functions and log messages used in process_data # instead of a long series of if-elif-else statements. self.rx_dispatcher = { - FR_TYPE.ARQ_DC_OPEN_ACK_N.value: ( + FR_TYPE.ARQ_SESSION_OPEN_ACK_N.value: ( self.arq_iss.arq_received_channel_is_open, "ARQ OPEN ACK (Narrow)", ), - FR_TYPE.ARQ_DC_OPEN_ACK_W.value: ( + FR_TYPE.ARQ_SESSION_OPEN_ACK_W.value: ( self.arq_iss.arq_received_channel_is_open, "ARQ OPEN ACK (Wide)", ), - FR_TYPE.ARQ_DC_OPEN_N.value: ( + FR_TYPE.ARQ_SESSION_OPEN_N.value: ( self.initialize_arq_transmission_irs, "ARQ Data Channel Open (Narrow)", ), - FR_TYPE.ARQ_DC_OPEN_W.value: ( + FR_TYPE.ARQ_SESSION_OPEN_W.value: ( self.initialize_arq_transmission_irs, "ARQ Data Channel Open (Wide)", ), - FR_TYPE.ARQ_SESSION_CLOSE.value: ( + FR_TYPE.ARQ_CONNECTION_CLOSE.value: ( self.arq_session.received_session_close, "ARQ CLOSE SESSION", ), - FR_TYPE.ARQ_SESSION_HB.value: ( + FR_TYPE.ARQ_CONNECTION_HB.value: ( self.arq_session.received_session_heartbeat, "ARQ HEARTBEAT", ), - FR_TYPE.ARQ_SESSION_OPEN.value: ( + FR_TYPE.ARQ_CONNECTION_OPEN.value: ( self.arq_session.received_session_opener, "ARQ OPEN SESSION", ), diff --git a/modem/frame_handler_arq.py b/modem/frame_handler_arq_session.py similarity index 81% rename from modem/frame_handler_arq.py rename to modem/frame_handler_arq_session.py index c811e1a0..f929763a 100644 --- a/modem/frame_handler_arq.py +++ b/modem/frame_handler_arq_session.py @@ -12,7 +12,7 @@ class ARQFrameHandler(frame_handler.FrameHandler): frame = self.details['frame'] # ARQ session open received - if frame['frame_type_int'] in [FR.ARQ_DC_OPEN_N.value, FR.ARQ_DC_OPEN_W.value]: + if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_N.value, FR.ARQ_SESSION_OPEN_W.value]: session = ARQSessionIRS(self.config, self.tx_frame_queue, frame['origin'], frame['session_id']) @@ -20,6 +20,6 @@ class ARQFrameHandler(frame_handler.FrameHandler): session.run() # ARQ session open ack received - if frame['frame_type_int'] in [FR.ARQ_DC_OPEN_ACK_N.value, FR.ARQ_DC_OPEN_ACK_W.value]: + if frame['frame_type_int'] in [FR.ARQ_SESSION_OPEN_ACK_N.value, FR.ARQ_SESSION_OPEN_ACK_W.value]: iss_session:ARQSessionISS = self.states.get_arq_iss_session(frame['session_id']) iss_session.on_connection_ack_received(frame) diff --git a/modem/modem_frametypes.py b/modem/modem_frametypes.py index 6c1a9d6c..728190be 100644 --- a/modem/modem_frametypes.py +++ b/modem/modem_frametypes.py @@ -26,13 +26,20 @@ class FRAME_TYPE(Enum): PING = 210 PING_ACK = 211 IS_WRITING = 215 - ARQ_SESSION_OPEN = 221 - ARQ_SESSION_HB = 222 - ARQ_SESSION_CLOSE = 223 - ARQ_DC_OPEN_W = 225 - ARQ_DC_OPEN_ACK_W = 226 - ARQ_DC_OPEN_N = 227 - ARQ_DC_OPEN_ACK_N = 228 + # ARQ_SESSION_OPEN = 221 # TODO Rename to ARQ_CONNECTION + # ARQ_SESSION_HB = 222 # TODO Rename to ARQ_CONNECTION + # ARQ_SESSION_CLOSE = 223 # TODO Rename to ARQ_CONNECTION + # ARQ_DC_OPEN_W = 225 # TODO Rename to ARQ_SESSION + # ARQ_DC_OPEN_ACK_W = 226 # TODO Rename to ARQ_SESSION + # ARQ_DC_OPEN_N = 227 # TODO Rename to ARQ_SESSION + # ARQ_DC_OPEN_ACK_N = 228 # TODO Rename to ARQ_SESSION + ARQ_CONNECTION_OPEN = 221 + ARQ_CONNECTION_HB = 222 + ARQ_CONNECTION_CLOSE = 223 + ARQ_SESSION_OPEN_W = 225 + ARQ_SESSION_OPEN_ACK_W = 226 + ARQ_SESSION_OPEN_N = 227 + ARQ_SESSION_OPEN_ACK_N = 228 ARQ_STOP = 249 BEACON = 250 FEC = 251 diff --git a/modem/protocol_arq.py b/modem/protocol_arq.py deleted file mode 100644 index e0dcc916..00000000 --- a/modem/protocol_arq.py +++ /dev/null @@ -1,736 +0,0 @@ - -import threading -import time -import codec2 -import helpers -import modem -import stats -import structlog -from data_frame_factory import DataFrameFactory -from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS -from modem_frametypes import FRAME_TYPE as FR_TYPE -import event_manager - -TESTMODE = False -class ARQ: - def __init__(self, config, event_queue, states): - - self.log = structlog.get_logger("DHARQ") - self.event_queue = event_queue - self.states = states - self.event_manager = event_manager.EventManager([event_queue]) - - self.frame_factory = DataFrameFactory(config) - - # ARQ PROTOCOL VERSION - # v.5 - signalling frame uses datac0 - # v.6 - signalling frame uses datac13 - # v.7 - adjusting ARQ timeout - # v.8 - adjusting ARQ structure - self.arq_protocol_version = 8 - - self.stats = stats.stats(config, event_queue, states) - - # load config - self.mycallsign = config['STATION']['mycall'] - self.myssid = config['STATION']['myssid'] - self.mycallsign += "-" + str(self.myssid) - encoded_call = helpers.callsign_to_bytes(self.mycallsign) - self.mycallsign = helpers.bytes_to_callsign(encoded_call) - self.ssid_list = config['STATION']['ssid_list'] - self.mycallsign_crc = helpers.get_crc_24(self.mycallsign) - self.mygrid = config['STATION']['mygrid'] - self.enable_fsk = config['MODEM']['enable_fsk'] - self.respond_to_cq = config['MODEM']['respond_to_cq'] - self.enable_hmac = config['MODEM']['enable_hmac'] - self.enable_stats = config['STATION']['enable_stats'] - self.enable_morse_identifier = config['MODEM']['enable_morse_identifier'] - self.arq_rx_buffer_size = config['MODEM']['rx_buffer_size'] - self.enable_experimental_features = False - # flag to indicate if modem running in low bandwidth mode - self.low_bandwidth_mode = config["MODEM"]["enable_low_bandwidth_mode"] - - # Enable general responding to channel openers for example - # this can be combined with a callsign blacklist for example - self.respond_to_call = True - - - self.modem_frequency_offset = 0 - - self.dxcallsign = b"ZZ9YY-0" - self.dxcallsign_crc = b'' - self.dxgrid = b'' - - # length of signalling frame - self.length_sig0_frame = 14 - self.length_sig1_frame = 14 - - # duration of frames - self.duration_datac4 = 5.17 - self.duration_datac13 = 2.0 - self.duration_datac1 = 4.18 - self.duration_datac3 = 3.19 - self.duration_sig0_frame = self.duration_datac13 - self.duration_sig1_frame = self.duration_datac13 - self.longest_duration = self.duration_datac4 - - # hold session id - self.session_id = bytes(1) - - # ------- ARQ SESSION - self.arq_file_transfer = False - self.IS_ARQ_SESSION_MASTER = False - self.arq_session_last_received = 0 - self.arq_session_timeout = 30 - self.session_connect_max_retries = 10 - - self.arq_compression_factor = 0 - - self.transmission_uuid = "" - - self.burst_last_received = 0.0 # time of last "live sign" of a burst - self.data_channel_last_received = 0.0 # time of last "live sign" of a frame - - # Flag to indicate if we received an ACK frame for a burst - self.burst_ack = False - # Flag to indicate if we received an ACK frame for a data frame - self.data_frame_ack_received = False - # Flag to indicate if we received a request for repeater frames - self.rpt_request_received = False - self.rpt_request_buffer = [] # requested frames, saved in a list - self.burst_rpt_counter = 0 - - - # 3 bytes for the BOF Beginning of File indicator in a data frame - self.data_frame_bof = b"BOF" - # 3 bytes for the EOF End of File indicator in a data frame - self.data_frame_eof = b"EOF" - - - - - - self.n_retries_per_burst = 0 - self.max_n_frames_per_burst = 1 - - # Flag to indicate if we received a low bandwidth mode channel opener - self.received_LOW_BANDWIDTH_MODE = False - - - self.data_channel_max_retries = 15 - - # event for checking arq_state_event - self.arq_state_event = threading.Event() - # -------------- AVAILABLE MODES START----------- - # IMPORTANT: LISTS MUST BE OF EQUAL LENGTH - - # --------------------- LOW BANDWIDTH - - # List of codec2 modes to use in "low bandwidth" mode. - self.mode_list_low_bw = [ - FREEDV_MODE.datac4.value, - ] - # List for minimum SNR operating level for the corresponding mode in self.mode_list - self.snr_list_low_bw = [-100] - # List for time to wait for corresponding mode in seconds - self.time_list_low_bw = [self.duration_datac4] - - # --------------------- HIGH BANDWIDTH - - # List of codec2 modes to use in "high bandwidth" mode. - self.mode_list_high_bw = [ - FREEDV_MODE.datac4.value, - FREEDV_MODE.datac3.value, - FREEDV_MODE.datac1.value, - ] - # List for minimum SNR operating level for the corresponding mode in self.mode_list - self.snr_list_high_bw = [-100, 0, 3] - # List for time to wait for corresponding mode in seconds - # test with 6,7 --> caused sometimes a frame timeout if ack frame takes longer - # TODO Need to check why ACK frames needs more time - # TODO Adjust these times - self.time_list_high_bw = [self.duration_datac4, self.duration_datac3, self.duration_datac1] - # -------------- AVAILABLE MODES END----------- - - # Mode list for selecting between low bandwidth ( 500Hz ) and modes with higher bandwidth - # but ability to fall back to low bandwidth modes if needed. - if self.low_bandwidth_mode: - # List of codec2 modes to use in "low bandwidth" mode. - self.mode_list = self.mode_list_low_bw - # list of times to wait for corresponding mode in seconds - self.time_list = self.time_list_low_bw - - else: - # List of codec2 modes to use in "high bandwidth" mode. - self.mode_list = self.mode_list_high_bw - # list of times to wait for corresponding mode in seconds - self.time_list = self.time_list_high_bw - - self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode - self.states.set("arq_speed_level", self.speed_level) - - - self.is_IRS = False - self.burst_nack = False - self.burst_nack_counter = 0 - self.frame_nack_counter = 0 - self.frame_received_counter = 0 - - - # TIMEOUTS - self.transmission_timeout = 180 # transmission timeout in seconds - self.channel_busy_timeout = 3 # time how long we want to wait until channel busy state overrides - - - - - - - # START THE THREAD FOR THE TIMEOUT WATCHDOG - watchdog_thread = threading.Thread( - target=self.watchdog, name="watchdog", daemon=True - ) - watchdog_thread.start() - - arq_session_thread = threading.Thread( - target=self.heartbeat, name="watchdog", daemon=True - ) - arq_session_thread.start() - - def send_ident_frame(self, transmit) -> None: - """Build and send IDENT frame """ - ident_frame = bytearray(self.length_sig1_frame) - ident_frame[:1] = bytes([FR_TYPE.IDENT.value]) - ident_frame[1:self.length_sig1_frame] = self.mycallsign - - # Transmit frame - if transmit: - self.enqueue_frame_for_tx([ident_frame], c2_mode=FREEDV_MODE.sig0.value) - else: - return ident_frame - - def send_disconnect_frame(self) -> None: - """Build and send a disconnect frame""" - disconnection_frame = bytearray(self.length_sig1_frame) - disconnection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_CLOSE.value]) - disconnection_frame[1:2] = self.session_id - disconnection_frame[2:5] = self.dxcallsign_crc - - # wait if we have a channel busy condition - if self.states.channel_busy: - self.channel_busy_handler() - - self.enqueue_frame_for_tx([disconnection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=3, repeat_delay=0) - - - def check_if_mode_fits_to_busy_slot(self): - """ - Check if actual mode is fitting into given busy state - - Returns: - - """ - mode_name = FREEDV_MODE(self.mode_list[self.speed_level]).name - mode_slots = FREEDV_MODE_USED_SLOTS[mode_name].value - if mode_slots in [self.states.channel_busy_slot]: - self.log.warning( - "[Modem] busy slot detection", - slots=self.states.channel_busy_slot, - mode_slots=mode_slots, - ) - return False - return True - - def arq_calculate_speed_level(self, snr): - current_speed_level = self.speed_level - self.frame_received_counter += 1 - # try increasing speed level only if we had two successful decodes - if self.frame_received_counter >= 2: - self.frame_received_counter = 0 - - # make sure new speed level isn't higher than available modes - new_speed_level = min(self.speed_level + 1, len(self.mode_list) - 1) - # check if actual snr is higher than minimum snr for next mode - if snr >= self.snr_list[new_speed_level]: - self.speed_level = new_speed_level - - - else: - self.log.info("[Modem] ARQ | increasing speed level not possible because of SNR limit", - given_snr=snr, - needed_snr=self.snr_list[new_speed_level] - ) - - # calculate if speed level fits to busy condition - if not self.check_if_mode_fits_to_busy_slot(): - self.speed_level = current_speed_level - - self.states.set("arq_speed_level", self.speed_level) - - # Update modes we are listening to - self.set_listening_modes(False, True, self.mode_list[self.speed_level]) - - self.log.debug( - "[Modem] calculated speed level", - speed_level=self.speed_level, - given_snr=snr, - min_snr=self.snr_list[self.speed_level], - ) - - - - - - - # for i in range(0, 6, 2): - # if not missing_area[i: i + 2].endswith(b"\x00\x00"): - # self.rpt_request_buffer.insert(0, missing_area[i: i + 2]) - - - - - - ########################################################################################################## - # ARQ DATA CHANNEL HANDLER - ########################################################################################################## - - - - - - def stop_transmission(self) -> None: - """ - Force a stop of the running transmission - """ - self.log.warning("[Modem] Stopping transmission!") - - self.event_manager.send_custom_event( - freedata="modem-message", - arq="transmission", - status="stopped", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8') - ) - - stop_frame = bytearray(self.length_sig0_frame) - stop_frame[:1] = bytes([FR_TYPE.ARQ_STOP.value]) - stop_frame[1:4] = self.dxcallsign_crc - stop_frame[4:7] = self.mycallsign_crc - # TODO Not sure if we really need the session id when disconnecting - # stop_frame[1:2] = self.session_id - stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) - self.enqueue_frame_for_tx([stop_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) - - self.arq_cleanup() - - def received_stop_transmission( - self, deconstructed_frame: list - ) -> None: # pylint: disable=unused-argument - """ - Received a transmission stop - """ - self.log.warning("[Modem] Stopping transmission!") - self.states.set("is_modem_busy", False) - self.states.set("is_arq_state", False) - self.event_manager.send_custom_event( - freedata="modem-message", - arq="transmission", - status="stopped", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - uuid=self.transmission_uuid - ) - self.arq_cleanup() - - def channel_busy_handler(self): - """ - function for handling the channel busy situation - Args: - - Returns: - """ - self.log.warning("[Modem] Channel busy, waiting until free...") - self.event_manager.send_custom_event( - freedata="modem-message", - channel="busy", - status="waiting", - ) - - # wait while timeout not reached and our busy state is busy - channel_busy_timeout = time.time() + self.channel_busy_timeout - while self.states.channel_busy and time.time() < channel_busy_timeout and not self.check_if_mode_fits_to_busy_slot(): - threading.Event().wait(0.01) - - # ------------ CALCULATE TRANSFER RATES - - - def reset_statistics(self) -> None: - """ - Reset statistics - """ - # reset ARQ statistics - self.states.set("bytes_per_minute_burst", 0) - self.states.set("arq_total_bytes", 0) - self.states.set("self.states.arq_seconds_until_finish", 0) - self.states.set("arq_bits_per_second", 0) - self.states.set("bytes_per_minute", 0) - self.states.set("arq_transmission_percent", 0) - self.states.set("arq_compression_factor", 0) - - - # ----------------------CLEANUP AND RESET FUNCTIONS - def arq_cleanup(self) -> None: - """ - Cleanup function which clears all ARQ states - """ - - # TODO - # We need to check if we are in a ARQ session - # Then we cant delete the session_id for now - self.states.delete_arq_instance_by_id(self.session_id) - - - if TESTMODE: - self.log.debug("[Modem] TESTMODE: arq_cleanup: Not performing cleanup.") - return - - self.log.debug("[Modem] arq_cleanup") - # wait a second for smoother arq behaviour - helpers.wait(1.0) - - self.rx_frame_bof_received = False - self.rx_frame_eof_received = False - self.burst_ack = False - self.rpt_request_received = False - self.burst_rpt_counter = 0 - self.data_frame_ack_received = False - self.arq_rx_burst_buffer = [] - self.arq_rx_frame_buffer = b"" - self.burst_ack_snr = 0 - self.arq_burst_last_payload = 0 - self.rx_n_frame_of_burst = 0 - self.rx_n_frames_per_burst = 0 - - # reset modem receiving state to reduce cpu load - modem.demodulator.RECEIVE_SIG0 = True - modem.demodulator.RECEIVE_SIG1 = False - modem.demodulator.RECEIVE_DATAC1 = False - modem.demodulator.RECEIVE_DATAC3 = False - modem.demodulator.RECEIVE_DATAC4 = False - # modem.demodulator.RECEIVE_FSK_LDPC_0 = False - modem.demodulator.RECEIVE_FSK_LDPC_1 = False - - self.is_IRS = False - self.burst_nack = False - self.burst_nack_counter = 0 - self.frame_nack_counter = 0 - self.frame_received_counter = 0 - self.speed_level = len(self.mode_list) - 1 - self.states.set("arq_speed_level", self.speed_level) - - # low bandwidth mode indicator - self.received_LOW_BANDWIDTH_MODE = False - - # reset retry counter for rx channel / burst - self.n_retries_per_burst = 0 - - # reset max retries possibly overriden by api - self.session_connect_max_retries = 10 - self.data_channel_max_retries = 10 - - self.states.set("arq_session_state", "disconnected") - self.states.arq_speed_list = [] - self.states.set("is_arq_state", False) - self.arq_state_event = threading.Event() - self.arq_file_transfer = False - - self.beacon_paused = False - # reset beacon interval timer for not directly starting beacon after ARQ - self.beacon_interval_timer = time.time() + self.beacon_interval - - def arq_reset_ack(self, state: bool) -> None: - """ - Funktion for resetting acknowledge states - Args: - state:bool: - - """ - self.burst_ack = state - self.rpt_request_received = state - self.data_frame_ack_received = state - - def set_listening_modes(self, enable_sig0: bool, enable_sig1: bool, mode: int) -> None: - # sourcery skip: extract-duplicate-method - """ - Function for setting the data modes we are listening to for saving cpu power - - Args: - enable_sig0:int: Enable/Disable signalling mode 0 - enable_sig1:int: Enable/Disable signalling mode 1 - mode:int: Codec2 mode to listen for - - """ - # set modes we want to listen to - modem.RECEIVE_SIG0 = enable_sig0 - modem.RECEIVE_SIG1 = enable_sig1 - - if mode == codec2.FREEDV_MODE.datac1.value: - modem.RECEIVE_DATAC1 = True - modem.RECEIVE_DATAC3 = False - modem.RECEIVE_DATAC4 = False - modem.RECEIVE_FSK_LDPC_1 = False - self.log.debug("[Modem] Changing listening data mode", mode="datac1") - elif mode == codec2.FREEDV_MODE.datac3.value: - modem.RECEIVE_DATAC1 = False - modem.RECEIVE_DATAC3 = True - modem.RECEIVE_DATAC4 = False - modem.RECEIVE_FSK_LDPC_1 = False - self.log.debug("[Modem] Changing listening data mode", mode="datac3") - elif mode == codec2.FREEDV_MODE.datac4.value: - modem.RECEIVE_DATAC1 = False - modem.RECEIVE_DATAC3 = False - modem.RECEIVE_DATAC4 = True - modem.RECEIVE_FSK_LDPC_1 = False - self.log.debug("[Modem] Changing listening data mode", mode="datac4") - elif mode == codec2.FREEDV_MODE.fsk_ldpc_1.value: - modem.RECEIVE_DATAC1 = False - modem.RECEIVE_DATAC3 = False - modem.RECEIVE_DATAC4 = False - modem.RECEIVE_FSK_LDPC_1 = True - self.log.debug("[Modem] Changing listening data mode", mode="fsk_ldpc_1") - else: - modem.RECEIVE_DATAC1 = True - modem.RECEIVE_DATAC3 = True - modem.RECEIVE_DATAC4 = True - modem.RECEIVE_FSK_LDPC_1 = True - self.log.debug( - "[Modem] Changing listening data mode", mode="datac1/datac3/fsk_ldpc" - ) - - # ------------------------- WATCHDOG FUNCTIONS FOR TIMER - def watchdog(self) -> None: - """Author: DJ2LS - - Watchdog master function. From here, "pet" the watchdogs - - """ - while True: - threading.Event().wait(0.1) - self.data_channel_keep_alive_watchdog() - self.burst_watchdog() - self.arq_session_keep_alive_watchdog() - - def burst_watchdog(self) -> None: - """ - Watchdog which checks if we are running into a connection timeout - DATA BURST - """ - # IRS SIDE - # TODO We need to redesign this part for cleaner state handling - # Return if not ARQ STATE and not ARQ SESSION STATE as they are different use cases - if ( - not self.states.is_arq_state - and self.states.arq_session_state != "connected" - or not self.is_IRS - ): - return - - # get modem error state - modem_error_state = modem.get_modem_error_state() - - # We want to reach this state only if connected ( == return above not called ) - if self.rx_n_frames_per_burst > 1: - # uses case for IRS: reduce time for waiting by counting "None" in burst buffer - frames_left = self.arq_rx_burst_buffer.count(None) - elif self.rx_n_frame_of_burst == 0 and self.rx_n_frames_per_burst == 0: - # use case for IRS: We didn't receive a burst yet, because the first one got lost - # in this case we don't have any information about the expected burst length - # we must assume, we are getting a burst with max_n_frames_per_burst - frames_left = self.max_n_frames_per_burst - else: - frames_left = 1 - - # make sure we don't have a 0 here for avoiding too short timeouts - if frames_left == 0: - frames_left = 1 - - # timeout is reached, if we didnt receive data, while we waited - # for the corresponding data frame + the transmitted signalling frame of ack/nack - # + a small offset of about 1 second - timeout = \ - ( - self.burst_last_received - + (self.time_list[self.speed_level] * frames_left) - + self.duration_sig0_frame - + self.channel_busy_timeout - + 1 - ) - - # override calculation - # if we reached 2/3 of the waiting time and didnt received a signal - # then send NACK earlier - time_left = timeout - time.time() - waiting_time = (self.time_list[ - self.speed_level] * frames_left) + self.duration_sig0_frame + self.channel_busy_timeout + 1 - timeout_percent = 100 - (time_left / waiting_time * 100) - # timeout_percent = 0 - if timeout_percent >= 75 and not self.states.is_codec2_traffic and not self.states.isTransmitting(): - override = True - else: - override = False - - # TODO Enable this for development - print( - f"timeout expected in:{round(timeout - time.time())} | timeout percent: {timeout_percent} | frames left: {frames_left} of {self.rx_n_frames_per_burst} | speed level: {self.speed_level}") - # if timeout is expired, but we are receivingt codec2 data, - # better wait some more time because data might be important for us - # reason for this situation can be delays on IRS and ISS, maybe because both had a busy channel condition. - # Nevertheless, we need to keep timeouts short for efficiency - if timeout <= time.time() or modem_error_state and not self.states.is_codec2_traffic and not self.states.isTransmitting() or override: - self.log.warning( - "[Modem] Burst decoding error or timeout", - attempt=self.n_retries_per_burst, - max_attempts=self.rx_n_max_retries_per_burst, - speed_level=self.speed_level, - modem_error_state=modem_error_state - ) - - print( - f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {self.arq_rx_burst_buffer.count(None)}") - # check if we have N frames per burst > 1 - if self.rx_n_frames_per_burst > 1 and self.burst_rpt_counter < 3 and self.arq_rx_burst_buffer.count( - None) > 0: - # reset self.burst_last_received - self.burst_last_received = time.time() + self.time_list[self.speed_level] * frames_left - self.burst_rpt_counter += 1 - self.send_retransmit_request_frame() - - else: - - # reset self.burst_last_received counter - self.burst_last_received = time.time() - - # reduce speed level if nack counter increased - self.frame_received_counter = 0 - self.burst_nack_counter += 1 - if self.burst_nack_counter >= 2: - self.burst_nack_counter = 0 - self.speed_level = max(self.speed_level - 1, 0) - self.states.set("arq_speed_level", self.speed_level) - - # TODO Create better mechanisms for handling n frames per burst for bad channels - # reduce frames per burst - if self.burst_rpt_counter >= 2: - tx_n_frames_per_burst = max(self.rx_n_frames_per_burst - 1, 1) - else: - tx_n_frames_per_burst = self.rx_n_frames_per_burst - - # Update modes we are listening to - self.set_listening_modes(True, True, self.mode_list[self.speed_level]) - - # TODO Does SNR make sense for NACK if we dont have an actual SNR information? - self.send_burst_nack_frame_watchdog(tx_n_frames_per_burst) - - # Update data_channel timestamp - # TODO Disabled this one for testing. - # self.data_channel_last_received = time.time() - self.n_retries_per_burst += 1 - else: - # debugging output - # print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time()) - pass - - if self.n_retries_per_burst >= self.rx_n_max_retries_per_burst: - self.stop_transmission() - - def data_channel_keep_alive_watchdog(self) -> None: - """ - watchdog which checks if we are running into a connection timeout - DATA CHANNEL - """ - # and not static.ARQ_SEND_KEEP_ALIVE: - if self.states.is_arq_state and self.states.is_modem_busy: - threading.Event().wait(0.01) - if ( - self.data_channel_last_received + self.transmission_timeout - > time.time() - ): - - timeleft = int((self.data_channel_last_received + self.transmission_timeout) - time.time()) - self.states.set("arq_seconds_until_timeout", timeleft) - if timeleft % 10 == 0: - self.log.debug("Time left until channel timeout", seconds=timeleft) - - # threading.Event().wait(5) - # print(self.data_channel_last_received + self.transmission_timeout - time.time()) - # pass - else: - # Clear the timeout timestamp - self.data_channel_last_received = 0 - self.log.info( - "[Modem] DATA [" - + str(self.mycallsign, "UTF-8") - + "]<>[" - + str(self.dxcallsign, "UTF-8") - + "]" - ) - self.event_manager.send_custom_event( - freedata="modem-message", - arq="transmission", - status="failed", - uuid=self.transmission_uuid, - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - irs=helpers.bool_to_string(self.is_IRS) - ) - self.arq_cleanup() - - def arq_session_keep_alive_watchdog(self) -> None: - """ - watchdog which checks if we are running into a connection timeout - ARQ SESSION - """ - if ( - self.states.is_arq_session - and self.states.is_modem_busy - and not self.arq_file_transfer - ): - if self.arq_session_last_received + self.arq_session_timeout > time.time(): - threading.Event().wait(0.01) - else: - self.log.info( - "[Modem] SESSION [" - + str(self.mycallsign, "UTF-8") - + "]<>[" - + str(self.dxcallsign, "UTF-8") - + "]" - ) - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="failed", - reason="timeout", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) - self.close_session() - - def heartbeat(self) -> None: - """ - Heartbeat thread which auto pauses and resumes the heartbeat signal when in an arq session - """ - while True: - threading.Event().wait(0.01) - # additional check for smoother stopping if heartbeat transmission - while not self.arq_file_transfer: - threading.Event().wait(0.01) - if ( - self.states.is_arq_session - and self.IS_ARQ_SESSION_MASTER - and self.states.arq_session_state == "connected" - # and not self.arq_file_transfer - ): - threading.Event().wait(1) - self.transmit_session_heartbeat() - threading.Event().wait(2) - diff --git a/modem/protocol_arq_connection.py b/modem/protocol_arq_connection.py new file mode 100644 index 00000000..cf9bdc1e --- /dev/null +++ b/modem/protocol_arq_connection.py @@ -0,0 +1,309 @@ + +import time +import helpers +from codec2 import FREEDV_MODE +from modem_frametypes import FRAME_TYPE as FR_TYPE + +from protocol_arq_session import ARQ + + +class SESSION(ARQ): + + def __init__(self, config, event_queue, states): + super().__init__(config, event_queue, states) + + def received_session_close(self, data_in: bytes, snr): + """ + Closes the session when a close session frame is received and + the DXCALLSIGN_CRC matches the remote station participating in the session. + + Args: + data_in:bytes: + + """ + # We've arrived here from process_data which already checked that the frame + # is intended for this station. + # Close the session if the CRC matches the remote station in static. + _valid_crc, mycallsign = helpers.check_callsign(self.mycallsign, bytes(data_in[2:5]), self.ssid_list) + _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) + if (_valid_crc or _valid_session) and self.states.arq_session_state not in ["disconnected"]: + self.states.set("arq_session_state", "disconnected") + self.dxgrid = b'------' + helpers.add_to_heard_stations( + self.dxcallsign, + self.dxgrid, + "DATA", + snr, + self.modem_frequency_offset, + self.states.radio_frequency, + self.states.heard_stations + ) + self.log.info( + "[Modem] SESSION [" + + str(self.mycallsign, "UTF-8") + + "]<>[" + + str(self.dxcallsign, "UTF-8") + + "]", + self.states.arq_session_state, + ) + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="close", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.IS_ARQ_SESSION_MASTER = False + self.states.is_arq_session = False + self.arq_cleanup() + + def transmit_session_heartbeat(self) -> None: + """Send ARQ sesion heartbeat while connected""" + # self.states.is_arq_session = True + # self.states.set("is_modem_busy", True) + # self.states.set("arq_session_state", "connected") + + connection_frame = bytearray(self.length_sig0_frame) + connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value]) + connection_frame[1:2] = self.session_id + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="connected", + heartbeat="transmitting", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) + + def received_session_heartbeat(self, data_in: bytes, snr) -> None: + """ + Received an ARQ session heartbeat, record and update state accordingly. + Args: + data_in:bytes: + + """ + # Accept session data if the DXCALLSIGN_CRC matches the station in static or session id. + _valid_crc, _ = helpers.check_callsign(self.dxcallsign, bytes(data_in[4:7]), self.ssid_list) + _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) + if _valid_crc or _valid_session and self.states.arq_session_state in ["connected", "connecting"]: + self.log.debug("[Modem] Received session heartbeat") + self.dxgrid = b'------' + helpers.add_to_heard_stations( + self.dxcallsign, + self.dxgrid, + "SESSION-HB", + snr, + self.modem_frequency_offset, + self.states.radio_frequency, + self.states.heard_stations + ) + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="connected", + heartbeat="received", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.states.is_arq_session = True + self.states.set("arq_session_state", "connected") + self.states.set("is_modem_busy", True) + + # Update the timeout timestamps + self.arq_session_last_received = int(time.time()) + self.data_channel_last_received = int(time.time()) + # transmit session heartbeat only + # -> if not session master + # --> this will be triggered by heartbeat watchdog + # -> if not during a file transfer + # -> if ARQ_SESSION_STATE != disconnecting, disconnected, failed + # --> to avoid heartbeat toggle loops while disconnecting + if ( + not self.IS_ARQ_SESSION_MASTER + and not self.arq_file_transfer + and self.states.arq_session_state != 'disconnecting' + and self.states.arq_session_state != 'disconnected' + and self.states.arq_session_state != 'failed' + ): + self.transmit_session_heartbeat() + + + def close_session(self) -> None: + """Close the ARQ session""" + self.states.set("arq_session_state", "disconnecting") + + self.log.info( + "[Modem] SESSION [" + + str(self.mycallsign, "UTF-8") + + "]<>[" + + str(self.dxcallsign, "UTF-8") + + "]", + self.states.arq_session_state, + ) + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="close", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.IS_ARQ_SESSION_MASTER = False + self.states.is_arq_session = False + + # we need to send disconnect frame before doing arq cleanup + # we would lose our session id then + self.send_disconnect_frame() + + # transmit morse identifier if configured + if self.enable_morse_identifier: + MODEM_TRANSMIT_QUEUE.put(["morse", 1, 0, self.mycallsign]) + self.arq_cleanup() + + def open_session(self) -> bool: + """ + Create and send the frame to request a connection. + + Returns: + True if the session was opened successfully + False if the session open request failed + """ + self.IS_ARQ_SESSION_MASTER = True + self.states.set("arq_session_state", "connecting") + + # create a random session id + self.session_id = np.random.bytes(1) + + # build connection frame + connection_frame = self.frame_factory.build_arq_session_connect( + session_id=self.session_id, + destination_crc=self.dxcallsign_crc, + ) + while not self.states.is_arq_session: + threading.Event().wait(0.01) + for attempt in range(self.session_connect_max_retries): + self.log.info( + "[Modem] SESSION [" + + str(self.mycallsign, "UTF-8") + + "]>>?<<[" + + str(self.dxcallsign, "UTF-8") + + "]", + a=f"{str(attempt + 1)}/{str(self.session_connect_max_retries)}", + state=self.states.arq_session_state, + ) + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="connecting", + attempt=attempt + 1, + maxattempts=self.session_connect_max_retries, + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) + + # Wait for a time, looking to see if `self.states.is_arq_session` + # indicates we've received a positive response from the far station. + timeout = time.time() + 3 + while time.time() < timeout: + threading.Event().wait(0.01) + # Stop waiting if data channel is opened + if self.states.is_arq_session: + return True + + # Stop waiting and interrupt if data channel is getting closed while opening + if self.states.arq_session_state == "disconnecting": + # disabled this session close as its called twice + # self.close_session() + return False + + # Session connect timeout, send close_session frame to + # attempt to clean up the far-side, if it received the + # open_session frame and can still hear us. + if not self.states.is_arq_session: + self.close_session() + return False + + # Given the while condition, it will only exit when `self.states.is_arq_session` is True + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="connected", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + return True + + def received_session_opener(self, data_in: bytes, snr) -> None: + """ + Received a session open request packet. + + Args: + data_in:bytes: + """ + # if we don't want to respond to calls, return False + if not self.respond_to_call: + return False + + # ignore channel opener if already in ARQ STATE + # use case: Station A is connecting to Station B while + # Station B already tries connecting to Station A. + # For avoiding ignoring repeated connect request in case of packet loss + # we are only ignoring packets in case we are ISS + if self.states.is_arq_session and self.IS_ARQ_SESSION_MASTER: + return False + + self.IS_ARQ_SESSION_MASTER = False + self.states.set("arq_session_state", "connecting") + + # Update arq_session timestamp + self.arq_session_last_received = int(time.time()) + + self.session_id = bytes(data_in[1:2]) + self.dxcallsign_crc = bytes(data_in[5:8]) + self.dxcallsign = helpers.bytes_to_callsign(bytes(data_in[8:14])) + self.states.set("dxcallsign", self.dxcallsign) + + # check if callsign ssid override + valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[2:5], self.ssid_list) + self.mycallsign = mycallsign + self.dxgrid = b'------' + helpers.add_to_heard_stations( + self.dxcallsign, + self.dxgrid, + "DATA", + snr, + self.modem_frequency_offset, + self.states.radio_frequency, + self.states.heard_stations + ) + self.log.info( + "[Modem] SESSION [" + + str(self.mycallsign, "UTF-8") + + "]>>|<<[" + + str(self.dxcallsign, "UTF-8") + + "]", + self.states.arq_session_state, + ) + self.states.is_arq_session = True + self.states.set("is_modem_busy", True) + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="session", + status="connected", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + ) + + self.transmit_session_heartbeat() diff --git a/modem/protocol_arq_session.py b/modem/protocol_arq_session.py index a6f96b3b..e0dcc916 100644 --- a/modem/protocol_arq_session.py +++ b/modem/protocol_arq_session.py @@ -1,309 +1,736 @@ +import threading import time +import codec2 import helpers -from codec2 import FREEDV_MODE +import modem +import stats +import structlog +from data_frame_factory import DataFrameFactory +from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS from modem_frametypes import FRAME_TYPE as FR_TYPE +import event_manager -from protocol_arq import ARQ - - -class SESSION(ARQ): - +TESTMODE = False +class ARQ: def __init__(self, config, event_queue, states): - super().__init__(config, event_queue, states) - def received_session_close(self, data_in: bytes, snr): - """ - Closes the session when a close session frame is received and - the DXCALLSIGN_CRC matches the remote station participating in the session. + self.log = structlog.get_logger("DHARQ") + self.event_queue = event_queue + self.states = states + self.event_manager = event_manager.EventManager([event_queue]) - Args: - data_in:bytes: + self.frame_factory = DataFrameFactory(config) - """ - # We've arrived here from process_data which already checked that the frame - # is intended for this station. - # Close the session if the CRC matches the remote station in static. - _valid_crc, mycallsign = helpers.check_callsign(self.mycallsign, bytes(data_in[2:5]), self.ssid_list) - _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) - if (_valid_crc or _valid_session) and self.states.arq_session_state not in ["disconnected"]: - self.states.set("arq_session_state", "disconnected") - self.dxgrid = b'------' - helpers.add_to_heard_stations( - self.dxcallsign, - self.dxgrid, - "DATA", - snr, - self.modem_frequency_offset, - self.states.radio_frequency, - self.states.heard_stations - ) - self.log.info( - "[Modem] SESSION [" - + str(self.mycallsign, "UTF-8") - + "]<>[" - + str(self.dxcallsign, "UTF-8") - + "]", - self.states.arq_session_state, - ) + # ARQ PROTOCOL VERSION + # v.5 - signalling frame uses datac0 + # v.6 - signalling frame uses datac13 + # v.7 - adjusting ARQ timeout + # v.8 - adjusting ARQ structure + self.arq_protocol_version = 8 - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="close", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) + self.stats = stats.stats(config, event_queue, states) - self.IS_ARQ_SESSION_MASTER = False - self.states.is_arq_session = False - self.arq_cleanup() + # load config + self.mycallsign = config['STATION']['mycall'] + self.myssid = config['STATION']['myssid'] + self.mycallsign += "-" + str(self.myssid) + encoded_call = helpers.callsign_to_bytes(self.mycallsign) + self.mycallsign = helpers.bytes_to_callsign(encoded_call) + self.ssid_list = config['STATION']['ssid_list'] + self.mycallsign_crc = helpers.get_crc_24(self.mycallsign) + self.mygrid = config['STATION']['mygrid'] + self.enable_fsk = config['MODEM']['enable_fsk'] + self.respond_to_cq = config['MODEM']['respond_to_cq'] + self.enable_hmac = config['MODEM']['enable_hmac'] + self.enable_stats = config['STATION']['enable_stats'] + self.enable_morse_identifier = config['MODEM']['enable_morse_identifier'] + self.arq_rx_buffer_size = config['MODEM']['rx_buffer_size'] + self.enable_experimental_features = False + # flag to indicate if modem running in low bandwidth mode + self.low_bandwidth_mode = config["MODEM"]["enable_low_bandwidth_mode"] - def transmit_session_heartbeat(self) -> None: - """Send ARQ sesion heartbeat while connected""" - # self.states.is_arq_session = True - # self.states.set("is_modem_busy", True) - # self.states.set("arq_session_state", "connected") - - connection_frame = bytearray(self.length_sig0_frame) - connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value]) - connection_frame[1:2] = self.session_id - - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="connected", - heartbeat="transmitting", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) - - self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - - def received_session_heartbeat(self, data_in: bytes, snr) -> None: - """ - Received an ARQ session heartbeat, record and update state accordingly. - Args: - data_in:bytes: - - """ - # Accept session data if the DXCALLSIGN_CRC matches the station in static or session id. - _valid_crc, _ = helpers.check_callsign(self.dxcallsign, bytes(data_in[4:7]), self.ssid_list) - _valid_session = helpers.check_session_id(self.session_id, bytes(data_in[1:2])) - if _valid_crc or _valid_session and self.states.arq_session_state in ["connected", "connecting"]: - self.log.debug("[Modem] Received session heartbeat") - self.dxgrid = b'------' - helpers.add_to_heard_stations( - self.dxcallsign, - self.dxgrid, - "SESSION-HB", - snr, - self.modem_frequency_offset, - self.states.radio_frequency, - self.states.heard_stations - ) - - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="connected", - heartbeat="received", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) - - self.states.is_arq_session = True - self.states.set("arq_session_state", "connected") - self.states.set("is_modem_busy", True) - - # Update the timeout timestamps - self.arq_session_last_received = int(time.time()) - self.data_channel_last_received = int(time.time()) - # transmit session heartbeat only - # -> if not session master - # --> this will be triggered by heartbeat watchdog - # -> if not during a file transfer - # -> if ARQ_SESSION_STATE != disconnecting, disconnected, failed - # --> to avoid heartbeat toggle loops while disconnecting - if ( - not self.IS_ARQ_SESSION_MASTER - and not self.arq_file_transfer - and self.states.arq_session_state != 'disconnecting' - and self.states.arq_session_state != 'disconnected' - and self.states.arq_session_state != 'failed' - ): - self.transmit_session_heartbeat() + # Enable general responding to channel openers for example + # this can be combined with a callsign blacklist for example + self.respond_to_call = True - def close_session(self) -> None: - """Close the ARQ session""" - self.states.set("arq_session_state", "disconnecting") + self.modem_frequency_offset = 0 - self.log.info( - "[Modem] SESSION [" - + str(self.mycallsign, "UTF-8") - + "]<>[" - + str(self.dxcallsign, "UTF-8") - + "]", - self.states.arq_session_state, - ) + self.dxcallsign = b"ZZ9YY-0" + self.dxcallsign_crc = b'' + self.dxgrid = b'' - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="close", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) + # length of signalling frame + self.length_sig0_frame = 14 + self.length_sig1_frame = 14 + # duration of frames + self.duration_datac4 = 5.17 + self.duration_datac13 = 2.0 + self.duration_datac1 = 4.18 + self.duration_datac3 = 3.19 + self.duration_sig0_frame = self.duration_datac13 + self.duration_sig1_frame = self.duration_datac13 + self.longest_duration = self.duration_datac4 + + # hold session id + self.session_id = bytes(1) + + # ------- ARQ SESSION + self.arq_file_transfer = False self.IS_ARQ_SESSION_MASTER = False - self.states.is_arq_session = False + self.arq_session_last_received = 0 + self.arq_session_timeout = 30 + self.session_connect_max_retries = 10 - # we need to send disconnect frame before doing arq cleanup - # we would lose our session id then - self.send_disconnect_frame() + self.arq_compression_factor = 0 - # transmit morse identifier if configured - if self.enable_morse_identifier: - MODEM_TRANSMIT_QUEUE.put(["morse", 1, 0, self.mycallsign]) - self.arq_cleanup() + self.transmission_uuid = "" - def open_session(self) -> bool: + self.burst_last_received = 0.0 # time of last "live sign" of a burst + self.data_channel_last_received = 0.0 # time of last "live sign" of a frame + + # Flag to indicate if we received an ACK frame for a burst + self.burst_ack = False + # Flag to indicate if we received an ACK frame for a data frame + self.data_frame_ack_received = False + # Flag to indicate if we received a request for repeater frames + self.rpt_request_received = False + self.rpt_request_buffer = [] # requested frames, saved in a list + self.burst_rpt_counter = 0 + + + # 3 bytes for the BOF Beginning of File indicator in a data frame + self.data_frame_bof = b"BOF" + # 3 bytes for the EOF End of File indicator in a data frame + self.data_frame_eof = b"EOF" + + + + + + self.n_retries_per_burst = 0 + self.max_n_frames_per_burst = 1 + + # Flag to indicate if we received a low bandwidth mode channel opener + self.received_LOW_BANDWIDTH_MODE = False + + + self.data_channel_max_retries = 15 + + # event for checking arq_state_event + self.arq_state_event = threading.Event() + # -------------- AVAILABLE MODES START----------- + # IMPORTANT: LISTS MUST BE OF EQUAL LENGTH + + # --------------------- LOW BANDWIDTH + + # List of codec2 modes to use in "low bandwidth" mode. + self.mode_list_low_bw = [ + FREEDV_MODE.datac4.value, + ] + # List for minimum SNR operating level for the corresponding mode in self.mode_list + self.snr_list_low_bw = [-100] + # List for time to wait for corresponding mode in seconds + self.time_list_low_bw = [self.duration_datac4] + + # --------------------- HIGH BANDWIDTH + + # List of codec2 modes to use in "high bandwidth" mode. + self.mode_list_high_bw = [ + FREEDV_MODE.datac4.value, + FREEDV_MODE.datac3.value, + FREEDV_MODE.datac1.value, + ] + # List for minimum SNR operating level for the corresponding mode in self.mode_list + self.snr_list_high_bw = [-100, 0, 3] + # List for time to wait for corresponding mode in seconds + # test with 6,7 --> caused sometimes a frame timeout if ack frame takes longer + # TODO Need to check why ACK frames needs more time + # TODO Adjust these times + self.time_list_high_bw = [self.duration_datac4, self.duration_datac3, self.duration_datac1] + # -------------- AVAILABLE MODES END----------- + + # Mode list for selecting between low bandwidth ( 500Hz ) and modes with higher bandwidth + # but ability to fall back to low bandwidth modes if needed. + if self.low_bandwidth_mode: + # List of codec2 modes to use in "low bandwidth" mode. + self.mode_list = self.mode_list_low_bw + # list of times to wait for corresponding mode in seconds + self.time_list = self.time_list_low_bw + + else: + # List of codec2 modes to use in "high bandwidth" mode. + self.mode_list = self.mode_list_high_bw + # list of times to wait for corresponding mode in seconds + self.time_list = self.time_list_high_bw + + self.speed_level = len(self.mode_list) - 1 # speed level for selecting mode + self.states.set("arq_speed_level", self.speed_level) + + + self.is_IRS = False + self.burst_nack = False + self.burst_nack_counter = 0 + self.frame_nack_counter = 0 + self.frame_received_counter = 0 + + + # TIMEOUTS + self.transmission_timeout = 180 # transmission timeout in seconds + self.channel_busy_timeout = 3 # time how long we want to wait until channel busy state overrides + + + + + + + # START THE THREAD FOR THE TIMEOUT WATCHDOG + watchdog_thread = threading.Thread( + target=self.watchdog, name="watchdog", daemon=True + ) + watchdog_thread.start() + + arq_session_thread = threading.Thread( + target=self.heartbeat, name="watchdog", daemon=True + ) + arq_session_thread.start() + + def send_ident_frame(self, transmit) -> None: + """Build and send IDENT frame """ + ident_frame = bytearray(self.length_sig1_frame) + ident_frame[:1] = bytes([FR_TYPE.IDENT.value]) + ident_frame[1:self.length_sig1_frame] = self.mycallsign + + # Transmit frame + if transmit: + self.enqueue_frame_for_tx([ident_frame], c2_mode=FREEDV_MODE.sig0.value) + else: + return ident_frame + + def send_disconnect_frame(self) -> None: + """Build and send a disconnect frame""" + disconnection_frame = bytearray(self.length_sig1_frame) + disconnection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_CLOSE.value]) + disconnection_frame[1:2] = self.session_id + disconnection_frame[2:5] = self.dxcallsign_crc + + # wait if we have a channel busy condition + if self.states.channel_busy: + self.channel_busy_handler() + + self.enqueue_frame_for_tx([disconnection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=3, repeat_delay=0) + + + def check_if_mode_fits_to_busy_slot(self): """ - Create and send the frame to request a connection. + Check if actual mode is fitting into given busy state Returns: - True if the session was opened successfully - False if the session open request failed + """ - self.IS_ARQ_SESSION_MASTER = True - self.states.set("arq_session_state", "connecting") + mode_name = FREEDV_MODE(self.mode_list[self.speed_level]).name + mode_slots = FREEDV_MODE_USED_SLOTS[mode_name].value + if mode_slots in [self.states.channel_busy_slot]: + self.log.warning( + "[Modem] busy slot detection", + slots=self.states.channel_busy_slot, + mode_slots=mode_slots, + ) + return False + return True - # create a random session id - self.session_id = np.random.bytes(1) + def arq_calculate_speed_level(self, snr): + current_speed_level = self.speed_level + self.frame_received_counter += 1 + # try increasing speed level only if we had two successful decodes + if self.frame_received_counter >= 2: + self.frame_received_counter = 0 - # build connection frame - connection_frame = self.frame_factory.build_arq_session_connect( - session_id=self.session_id, - destination_crc=self.dxcallsign_crc, + # make sure new speed level isn't higher than available modes + new_speed_level = min(self.speed_level + 1, len(self.mode_list) - 1) + # check if actual snr is higher than minimum snr for next mode + if snr >= self.snr_list[new_speed_level]: + self.speed_level = new_speed_level + + + else: + self.log.info("[Modem] ARQ | increasing speed level not possible because of SNR limit", + given_snr=snr, + needed_snr=self.snr_list[new_speed_level] + ) + + # calculate if speed level fits to busy condition + if not self.check_if_mode_fits_to_busy_slot(): + self.speed_level = current_speed_level + + self.states.set("arq_speed_level", self.speed_level) + + # Update modes we are listening to + self.set_listening_modes(False, True, self.mode_list[self.speed_level]) + + self.log.debug( + "[Modem] calculated speed level", + speed_level=self.speed_level, + given_snr=snr, + min_snr=self.snr_list[self.speed_level], ) - while not self.states.is_arq_session: + + + + + + + # for i in range(0, 6, 2): + # if not missing_area[i: i + 2].endswith(b"\x00\x00"): + # self.rpt_request_buffer.insert(0, missing_area[i: i + 2]) + + + + + + ########################################################################################################## + # ARQ DATA CHANNEL HANDLER + ########################################################################################################## + + + + + + def stop_transmission(self) -> None: + """ + Force a stop of the running transmission + """ + self.log.warning("[Modem] Stopping transmission!") + + self.event_manager.send_custom_event( + freedata="modem-message", + arq="transmission", + status="stopped", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8') + ) + + stop_frame = bytearray(self.length_sig0_frame) + stop_frame[:1] = bytes([FR_TYPE.ARQ_STOP.value]) + stop_frame[1:4] = self.dxcallsign_crc + stop_frame[4:7] = self.mycallsign_crc + # TODO Not sure if we really need the session id when disconnecting + # stop_frame[1:2] = self.session_id + stop_frame[7:13] = helpers.callsign_to_bytes(self.mycallsign) + self.enqueue_frame_for_tx([stop_frame], c2_mode=FREEDV_MODE.sig1.value, copies=3, repeat_delay=0) + + self.arq_cleanup() + + def received_stop_transmission( + self, deconstructed_frame: list + ) -> None: # pylint: disable=unused-argument + """ + Received a transmission stop + """ + self.log.warning("[Modem] Stopping transmission!") + self.states.set("is_modem_busy", False) + self.states.set("is_arq_state", False) + self.event_manager.send_custom_event( + freedata="modem-message", + arq="transmission", + status="stopped", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + uuid=self.transmission_uuid + ) + self.arq_cleanup() + + def channel_busy_handler(self): + """ + function for handling the channel busy situation + Args: + + Returns: + """ + self.log.warning("[Modem] Channel busy, waiting until free...") + self.event_manager.send_custom_event( + freedata="modem-message", + channel="busy", + status="waiting", + ) + + # wait while timeout not reached and our busy state is busy + channel_busy_timeout = time.time() + self.channel_busy_timeout + while self.states.channel_busy and time.time() < channel_busy_timeout and not self.check_if_mode_fits_to_busy_slot(): threading.Event().wait(0.01) - for attempt in range(self.session_connect_max_retries): + + # ------------ CALCULATE TRANSFER RATES + + + def reset_statistics(self) -> None: + """ + Reset statistics + """ + # reset ARQ statistics + self.states.set("bytes_per_minute_burst", 0) + self.states.set("arq_total_bytes", 0) + self.states.set("self.states.arq_seconds_until_finish", 0) + self.states.set("arq_bits_per_second", 0) + self.states.set("bytes_per_minute", 0) + self.states.set("arq_transmission_percent", 0) + self.states.set("arq_compression_factor", 0) + + + # ----------------------CLEANUP AND RESET FUNCTIONS + def arq_cleanup(self) -> None: + """ + Cleanup function which clears all ARQ states + """ + + # TODO + # We need to check if we are in a ARQ session + # Then we cant delete the session_id for now + self.states.delete_arq_instance_by_id(self.session_id) + + + if TESTMODE: + self.log.debug("[Modem] TESTMODE: arq_cleanup: Not performing cleanup.") + return + + self.log.debug("[Modem] arq_cleanup") + # wait a second for smoother arq behaviour + helpers.wait(1.0) + + self.rx_frame_bof_received = False + self.rx_frame_eof_received = False + self.burst_ack = False + self.rpt_request_received = False + self.burst_rpt_counter = 0 + self.data_frame_ack_received = False + self.arq_rx_burst_buffer = [] + self.arq_rx_frame_buffer = b"" + self.burst_ack_snr = 0 + self.arq_burst_last_payload = 0 + self.rx_n_frame_of_burst = 0 + self.rx_n_frames_per_burst = 0 + + # reset modem receiving state to reduce cpu load + modem.demodulator.RECEIVE_SIG0 = True + modem.demodulator.RECEIVE_SIG1 = False + modem.demodulator.RECEIVE_DATAC1 = False + modem.demodulator.RECEIVE_DATAC3 = False + modem.demodulator.RECEIVE_DATAC4 = False + # modem.demodulator.RECEIVE_FSK_LDPC_0 = False + modem.demodulator.RECEIVE_FSK_LDPC_1 = False + + self.is_IRS = False + self.burst_nack = False + self.burst_nack_counter = 0 + self.frame_nack_counter = 0 + self.frame_received_counter = 0 + self.speed_level = len(self.mode_list) - 1 + self.states.set("arq_speed_level", self.speed_level) + + # low bandwidth mode indicator + self.received_LOW_BANDWIDTH_MODE = False + + # reset retry counter for rx channel / burst + self.n_retries_per_burst = 0 + + # reset max retries possibly overriden by api + self.session_connect_max_retries = 10 + self.data_channel_max_retries = 10 + + self.states.set("arq_session_state", "disconnected") + self.states.arq_speed_list = [] + self.states.set("is_arq_state", False) + self.arq_state_event = threading.Event() + self.arq_file_transfer = False + + self.beacon_paused = False + # reset beacon interval timer for not directly starting beacon after ARQ + self.beacon_interval_timer = time.time() + self.beacon_interval + + def arq_reset_ack(self, state: bool) -> None: + """ + Funktion for resetting acknowledge states + Args: + state:bool: + + """ + self.burst_ack = state + self.rpt_request_received = state + self.data_frame_ack_received = state + + def set_listening_modes(self, enable_sig0: bool, enable_sig1: bool, mode: int) -> None: + # sourcery skip: extract-duplicate-method + """ + Function for setting the data modes we are listening to for saving cpu power + + Args: + enable_sig0:int: Enable/Disable signalling mode 0 + enable_sig1:int: Enable/Disable signalling mode 1 + mode:int: Codec2 mode to listen for + + """ + # set modes we want to listen to + modem.RECEIVE_SIG0 = enable_sig0 + modem.RECEIVE_SIG1 = enable_sig1 + + if mode == codec2.FREEDV_MODE.datac1.value: + modem.RECEIVE_DATAC1 = True + modem.RECEIVE_DATAC3 = False + modem.RECEIVE_DATAC4 = False + modem.RECEIVE_FSK_LDPC_1 = False + self.log.debug("[Modem] Changing listening data mode", mode="datac1") + elif mode == codec2.FREEDV_MODE.datac3.value: + modem.RECEIVE_DATAC1 = False + modem.RECEIVE_DATAC3 = True + modem.RECEIVE_DATAC4 = False + modem.RECEIVE_FSK_LDPC_1 = False + self.log.debug("[Modem] Changing listening data mode", mode="datac3") + elif mode == codec2.FREEDV_MODE.datac4.value: + modem.RECEIVE_DATAC1 = False + modem.RECEIVE_DATAC3 = False + modem.RECEIVE_DATAC4 = True + modem.RECEIVE_FSK_LDPC_1 = False + self.log.debug("[Modem] Changing listening data mode", mode="datac4") + elif mode == codec2.FREEDV_MODE.fsk_ldpc_1.value: + modem.RECEIVE_DATAC1 = False + modem.RECEIVE_DATAC3 = False + modem.RECEIVE_DATAC4 = False + modem.RECEIVE_FSK_LDPC_1 = True + self.log.debug("[Modem] Changing listening data mode", mode="fsk_ldpc_1") + else: + modem.RECEIVE_DATAC1 = True + modem.RECEIVE_DATAC3 = True + modem.RECEIVE_DATAC4 = True + modem.RECEIVE_FSK_LDPC_1 = True + self.log.debug( + "[Modem] Changing listening data mode", mode="datac1/datac3/fsk_ldpc" + ) + + # ------------------------- WATCHDOG FUNCTIONS FOR TIMER + def watchdog(self) -> None: + """Author: DJ2LS + + Watchdog master function. From here, "pet" the watchdogs + + """ + while True: + threading.Event().wait(0.1) + self.data_channel_keep_alive_watchdog() + self.burst_watchdog() + self.arq_session_keep_alive_watchdog() + + def burst_watchdog(self) -> None: + """ + Watchdog which checks if we are running into a connection timeout + DATA BURST + """ + # IRS SIDE + # TODO We need to redesign this part for cleaner state handling + # Return if not ARQ STATE and not ARQ SESSION STATE as they are different use cases + if ( + not self.states.is_arq_state + and self.states.arq_session_state != "connected" + or not self.is_IRS + ): + return + + # get modem error state + modem_error_state = modem.get_modem_error_state() + + # We want to reach this state only if connected ( == return above not called ) + if self.rx_n_frames_per_burst > 1: + # uses case for IRS: reduce time for waiting by counting "None" in burst buffer + frames_left = self.arq_rx_burst_buffer.count(None) + elif self.rx_n_frame_of_burst == 0 and self.rx_n_frames_per_burst == 0: + # use case for IRS: We didn't receive a burst yet, because the first one got lost + # in this case we don't have any information about the expected burst length + # we must assume, we are getting a burst with max_n_frames_per_burst + frames_left = self.max_n_frames_per_burst + else: + frames_left = 1 + + # make sure we don't have a 0 here for avoiding too short timeouts + if frames_left == 0: + frames_left = 1 + + # timeout is reached, if we didnt receive data, while we waited + # for the corresponding data frame + the transmitted signalling frame of ack/nack + # + a small offset of about 1 second + timeout = \ + ( + self.burst_last_received + + (self.time_list[self.speed_level] * frames_left) + + self.duration_sig0_frame + + self.channel_busy_timeout + + 1 + ) + + # override calculation + # if we reached 2/3 of the waiting time and didnt received a signal + # then send NACK earlier + time_left = timeout - time.time() + waiting_time = (self.time_list[ + self.speed_level] * frames_left) + self.duration_sig0_frame + self.channel_busy_timeout + 1 + timeout_percent = 100 - (time_left / waiting_time * 100) + # timeout_percent = 0 + if timeout_percent >= 75 and not self.states.is_codec2_traffic and not self.states.isTransmitting(): + override = True + else: + override = False + + # TODO Enable this for development + print( + f"timeout expected in:{round(timeout - time.time())} | timeout percent: {timeout_percent} | frames left: {frames_left} of {self.rx_n_frames_per_burst} | speed level: {self.speed_level}") + # if timeout is expired, but we are receivingt codec2 data, + # better wait some more time because data might be important for us + # reason for this situation can be delays on IRS and ISS, maybe because both had a busy channel condition. + # Nevertheless, we need to keep timeouts short for efficiency + if timeout <= time.time() or modem_error_state and not self.states.is_codec2_traffic and not self.states.isTransmitting() or override: + self.log.warning( + "[Modem] Burst decoding error or timeout", + attempt=self.n_retries_per_burst, + max_attempts=self.rx_n_max_retries_per_burst, + speed_level=self.speed_level, + modem_error_state=modem_error_state + ) + + print( + f"frames_per_burst {self.rx_n_frame_of_burst} / {self.rx_n_frames_per_burst}, Repeats: {self.burst_rpt_counter} Nones: {self.arq_rx_burst_buffer.count(None)}") + # check if we have N frames per burst > 1 + if self.rx_n_frames_per_burst > 1 and self.burst_rpt_counter < 3 and self.arq_rx_burst_buffer.count( + None) > 0: + # reset self.burst_last_received + self.burst_last_received = time.time() + self.time_list[self.speed_level] * frames_left + self.burst_rpt_counter += 1 + self.send_retransmit_request_frame() + + else: + + # reset self.burst_last_received counter + self.burst_last_received = time.time() + + # reduce speed level if nack counter increased + self.frame_received_counter = 0 + self.burst_nack_counter += 1 + if self.burst_nack_counter >= 2: + self.burst_nack_counter = 0 + self.speed_level = max(self.speed_level - 1, 0) + self.states.set("arq_speed_level", self.speed_level) + + # TODO Create better mechanisms for handling n frames per burst for bad channels + # reduce frames per burst + if self.burst_rpt_counter >= 2: + tx_n_frames_per_burst = max(self.rx_n_frames_per_burst - 1, 1) + else: + tx_n_frames_per_burst = self.rx_n_frames_per_burst + + # Update modes we are listening to + self.set_listening_modes(True, True, self.mode_list[self.speed_level]) + + # TODO Does SNR make sense for NACK if we dont have an actual SNR information? + self.send_burst_nack_frame_watchdog(tx_n_frames_per_burst) + + # Update data_channel timestamp + # TODO Disabled this one for testing. + # self.data_channel_last_received = time.time() + self.n_retries_per_burst += 1 + else: + # debugging output + # print((self.data_channel_last_received + self.time_list[self.speed_level])-time.time()) + pass + + if self.n_retries_per_burst >= self.rx_n_max_retries_per_burst: + self.stop_transmission() + + def data_channel_keep_alive_watchdog(self) -> None: + """ + watchdog which checks if we are running into a connection timeout + DATA CHANNEL + """ + # and not static.ARQ_SEND_KEEP_ALIVE: + if self.states.is_arq_state and self.states.is_modem_busy: + threading.Event().wait(0.01) + if ( + self.data_channel_last_received + self.transmission_timeout + > time.time() + ): + + timeleft = int((self.data_channel_last_received + self.transmission_timeout) - time.time()) + self.states.set("arq_seconds_until_timeout", timeleft) + if timeleft % 10 == 0: + self.log.debug("Time left until channel timeout", seconds=timeleft) + + # threading.Event().wait(5) + # print(self.data_channel_last_received + self.transmission_timeout - time.time()) + # pass + else: + # Clear the timeout timestamp + self.data_channel_last_received = 0 + self.log.info( + "[Modem] DATA [" + + str(self.mycallsign, "UTF-8") + + "]<>[" + + str(self.dxcallsign, "UTF-8") + + "]" + ) + self.event_manager.send_custom_event( + freedata="modem-message", + arq="transmission", + status="failed", + uuid=self.transmission_uuid, + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + irs=helpers.bool_to_string(self.is_IRS) + ) + self.arq_cleanup() + + def arq_session_keep_alive_watchdog(self) -> None: + """ + watchdog which checks if we are running into a connection timeout + ARQ SESSION + """ + if ( + self.states.is_arq_session + and self.states.is_modem_busy + and not self.arq_file_transfer + ): + if self.arq_session_last_received + self.arq_session_timeout > time.time(): + threading.Event().wait(0.01) + else: self.log.info( "[Modem] SESSION [" + str(self.mycallsign, "UTF-8") - + "]>>?<<[" + + "]<>[" + str(self.dxcallsign, "UTF-8") - + "]", - a=f"{str(attempt + 1)}/{str(self.session_connect_max_retries)}", - state=self.states.arq_session_state, + + "]" ) - self.event_manager.send_custom_event( freedata="modem-message", arq="session", - status="connecting", - attempt=attempt + 1, - maxattempts=self.session_connect_max_retries, + status="failed", + reason="timeout", mycallsign=str(self.mycallsign, 'UTF-8'), dxcallsign=str(self.dxcallsign, 'UTF-8'), ) - - self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - - # Wait for a time, looking to see if `self.states.is_arq_session` - # indicates we've received a positive response from the far station. - timeout = time.time() + 3 - while time.time() < timeout: - threading.Event().wait(0.01) - # Stop waiting if data channel is opened - if self.states.is_arq_session: - return True - - # Stop waiting and interrupt if data channel is getting closed while opening - if self.states.arq_session_state == "disconnecting": - # disabled this session close as its called twice - # self.close_session() - return False - - # Session connect timeout, send close_session frame to - # attempt to clean up the far-side, if it received the - # open_session frame and can still hear us. - if not self.states.is_arq_session: self.close_session() - return False - # Given the while condition, it will only exit when `self.states.is_arq_session` is True - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="connected", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) - return True - - def received_session_opener(self, data_in: bytes, snr) -> None: + def heartbeat(self) -> None: """ - Received a session open request packet. - - Args: - data_in:bytes: + Heartbeat thread which auto pauses and resumes the heartbeat signal when in an arq session """ - # if we don't want to respond to calls, return False - if not self.respond_to_call: - return False + while True: + threading.Event().wait(0.01) + # additional check for smoother stopping if heartbeat transmission + while not self.arq_file_transfer: + threading.Event().wait(0.01) + if ( + self.states.is_arq_session + and self.IS_ARQ_SESSION_MASTER + and self.states.arq_session_state == "connected" + # and not self.arq_file_transfer + ): + threading.Event().wait(1) + self.transmit_session_heartbeat() + threading.Event().wait(2) - # ignore channel opener if already in ARQ STATE - # use case: Station A is connecting to Station B while - # Station B already tries connecting to Station A. - # For avoiding ignoring repeated connect request in case of packet loss - # we are only ignoring packets in case we are ISS - if self.states.is_arq_session and self.IS_ARQ_SESSION_MASTER: - return False - - self.IS_ARQ_SESSION_MASTER = False - self.states.set("arq_session_state", "connecting") - - # Update arq_session timestamp - self.arq_session_last_received = int(time.time()) - - self.session_id = bytes(data_in[1:2]) - self.dxcallsign_crc = bytes(data_in[5:8]) - self.dxcallsign = helpers.bytes_to_callsign(bytes(data_in[8:14])) - self.states.set("dxcallsign", self.dxcallsign) - - # check if callsign ssid override - valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[2:5], self.ssid_list) - self.mycallsign = mycallsign - self.dxgrid = b'------' - helpers.add_to_heard_stations( - self.dxcallsign, - self.dxgrid, - "DATA", - snr, - self.modem_frequency_offset, - self.states.radio_frequency, - self.states.heard_stations - ) - self.log.info( - "[Modem] SESSION [" - + str(self.mycallsign, "UTF-8") - + "]>>|<<[" - + str(self.dxcallsign, "UTF-8") - + "]", - self.states.arq_session_state, - ) - self.states.is_arq_session = True - self.states.set("is_modem_busy", True) - - self.event_manager.send_custom_event( - freedata="modem-message", - arq="session", - status="connected", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - ) - - self.transmit_session_heartbeat() diff --git a/modem/protocol_arq_irs.py b/modem/protocol_arq_session_irs.py similarity index 99% rename from modem/protocol_arq_irs.py rename to modem/protocol_arq_session_irs.py index 9f16e8bb..84c223c6 100644 --- a/modem/protocol_arq_irs.py +++ b/modem/protocol_arq_session_irs.py @@ -9,7 +9,7 @@ from codec2 import FREEDV_MODE from queues import RX_BUFFER from modem_frametypes import FRAME_TYPE as FR_TYPE -from protocol_arq import ARQ +from protocol_arq_session import ARQ class IRS(ARQ): diff --git a/modem/protocol_arq_iss.py b/modem/protocol_arq_session_iss.py similarity index 99% rename from modem/protocol_arq_iss.py rename to modem/protocol_arq_session_iss.py index 17377c8e..e2c1a3a0 100644 --- a/modem/protocol_arq_iss.py +++ b/modem/protocol_arq_session_iss.py @@ -13,7 +13,7 @@ from codec2 import FREEDV_MODE from modem_frametypes import FRAME_TYPE as FR_TYPE import event_manager -from protocol_arq import ARQ +from protocol_arq_session import ARQ class ISS(ARQ): def __init__(self, config, event_queue, states): diff --git a/tests/test_arq_session.py b/tests/test_arq_session.py index e3de109c..305c5acf 100644 --- a/tests/test_arq_session.py +++ b/tests/test_arq_session.py @@ -70,6 +70,10 @@ class TestARQSession(unittest.TestCase): self.irs_to_iss_channel.start() def testARQSession(self): + + # set Packet Error Rate (PER) / frame loss probability + self.loss_probability = 0 + self.establishChannels() params = { 'dxcall': "DJ2LS-3", diff --git a/tools/cleanup.sh b/tools/cleanup.sh index 1376b87d..07572dcb 100644 --- a/tools/cleanup.sh +++ b/tools/cleanup.sh @@ -1,3 +1,3 @@ -#autopep8 --in-place --select W291,W293,W391,E231 --ignore E501 ../modem.py ../data_handler.py ../deprecated_main.py ../deprecated_sock.py ../deprecated_static.py ../helpers.py +#autopep8 --in-place --select W291,W293,W391,E231 --ignore E501 ../modem.py ../deprecated_data_handler.py ../deprecated_main.py ../deprecated_sock.py ../deprecated_static.py ../helpers.py autopep8 --in-place --ignore E501 ../modem.py ../data_handler.py ../main.py ../sock.py ../static.py ../helpers.py