diff --git a/modem/data_handler.py b/modem/data_handler.py index 6ed9d3cb..43a59f42 100644 --- a/modem/data_handler.py +++ b/modem/data_handler.py @@ -110,7 +110,6 @@ class DATA: self.received_LOW_BANDWIDTH_MODE = False self.data_channel_max_retries = 15 - self.datachannel_timeout = False # -------------- AVAILABLE MODES START----------- # IMPORTANT: LISTS MUST BE OF EQUAL LENGTH @@ -182,6 +181,8 @@ class DATA: self.data_frame_ack_timeout_seconds = 4.5 # timeout for data frame acknowledges self.rpt_ack_timeout_seconds = 4.5 # timeout for rpt frame acknowledges self.transmission_timeout = 180 # transmission timeout in seconds + self.channel_busy_timeout = 2 # time how long we want to wait until channel busy state overrides + self.datachannel_opening_interval = self.duration_sig1_frame + self.channel_busy_timeout + 1 # time between attempts when opening data channel # Dictionary of functions and log messages used in process_data # instead of a long series of if-elif-else statements. @@ -1742,8 +1743,6 @@ class DATA: Station.dxcallsign = self.dxcallsign Station.dxcallsign_crc = helpers.get_crc_24(self.dxcallsign) - # TODO we need to check this, maybe placing it to class init - self.datachannel_timeout = False self.log.info( "[Modem] SESSION [" + str(self.mycallsign, "UTF-8") @@ -2162,114 +2161,19 @@ class DATA: if ARQ.arq_session: threading.Event().wait(2.5) - self.datachannel_timeout = False - - # we need to compress data for getting a compression factor. - # so we are compressing twice. This is not that nice and maybe there is another way - # for calculating transmission statistics - # ARQ.arq_compression_factor = len(data_out) / len(lzma.compress(data_out)) + # init arq state event + ARQ.arq_state_event = threading.Event() + # finally start the channel opening procedure self.arq_open_data_channel(mycallsign) - # wait until data channel is open - while not ARQ.arq_state and not self.datachannel_timeout and Modem.modem_state in ["BUSY"]: - threading.Event().wait(0.01) - - if ARQ.arq_state: + # if data channel is open, return true else false + if ARQ.arq_state_event.is_set(): + # start arq transmission self.arq_transmit(data_out, hmac_salt) return True - return False - - def arq_open_data_channel( - self, mycallsign - ) -> bool: - """ - Open an ARQ data channel. - - Args: - mycallsign:bytes: - - Returns: - True if the data channel was opened successfully - False if the data channel failed to open - """ - self.is_IRS = False - - # init a new random session id if we are not in an arq session - if not ARQ.arq_session: - self.session_id = np.random.bytes(1) - - # Update data_channel timestamp - self.data_channel_last_received = int(time.time()) - - if Modem.low_bandwidth_mode: - frametype = bytes([FR_TYPE.ARQ_DC_OPEN_N.value]) - self.log.debug("[Modem] Requesting low bandwidth mode") - else: - frametype = bytes([FR_TYPE.ARQ_DC_OPEN_W.value]) - self.log.debug("[Modem] Requesting high bandwidth mode") - - connection_frame = bytearray(self.length_sig0_frame) - connection_frame[:1] = frametype - connection_frame[1:4] = Station.dxcallsign_crc - connection_frame[4:7] = Station.mycallsign_crc - connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign) - connection_frame[13:14] = self.session_id - - while not ARQ.arq_state: - threading.Event().wait(0.01) - for attempt in range(self.data_channel_max_retries): - - self.send_data_to_socket_queue( - freedata="modem-message", - arq="transmission", - status="opening", - mycallsign=str(mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - irs=helpers.bool_to_string(self.is_IRS) - ) - - self.log.info( - "[Modem] ARQ | DATA | TX | [" - + str(mycallsign, "UTF-8") - + "]>> <<[" - + str(self.dxcallsign, "UTF-8") - + "]", - attempt=f"{str(attempt + 1)}/{str(self.data_channel_max_retries)}", - ) - - # Let's check if we have a busy channel and if we are not in a running arq session. - if ModemParam.channel_busy and not ARQ.arq_state: - self.log.warning("[Modem] Channel busy, waiting until free...") - self.send_data_to_socket_queue( - freedata="modem-message", - arq="transmission", - status="waiting", - mycallsign=str(self.mycallsign, 'UTF-8'), - dxcallsign=str(self.dxcallsign, 'UTF-8'), - irs=helpers.bool_to_string(self.is_IRS) - ) - - # wait while timeout not reached and our busy state is busy - channel_busy_timeout = time.time() + 5 - while ModemParam.channel_busy and time.time() < channel_busy_timeout and not self.check_if_mode_fits_to_busy_slot(): - threading.Event().wait(0.01) - - self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - - timeout = time.time() + self.duration_sig1_frame * 3 + (ModemParam.tx_delay / 1000 * 2) - while time.time() < timeout: - threading.Event().wait(0.01) - # Stop waiting if data channel is opened - if ARQ.arq_state: - return True - if Modem.modem_state in ["IDLE"]: - return False - - # `data_channel_max_retries` attempts have been sent. Aborting attempt & cleaning up - self.log.debug( "[Modem] arq_open_data_channel:", transmission_uuid=self.transmission_uuid ) @@ -2297,16 +2201,92 @@ class DATA: + str(self.dxcallsign, "UTF-8") + "]" ) - self.datachannel_timeout = True # Attempt to clean up the far-side, if it received the # open_session frame and can still hear us. self.close_session() + # otherwise return false return False - # Shouldn't get here... - return True + def arq_open_data_channel( + self, mycallsign + ) -> bool: + """ + Open an ARQ data channel. + + Args: + mycallsign:bytes: + + Returns: + True if the data channel was opened successfully + False if the data channel failed to open + """ + # set IRS indicator to false, because we are IRS + self.is_IRS = False + + # init a new random session id if we are not in an arq session + if not ARQ.arq_session: + self.session_id = np.random.bytes(1) + + # Update data_channel timestamp + self.data_channel_last_received = int(time.time()) + + # check if the Modem is running in low bandwidth mode + # then set the corresponding frametype and build frame + if Modem.low_bandwidth_mode: + frametype = bytes([FR_TYPE.ARQ_DC_OPEN_N.value]) + self.log.debug("[Modem] Requesting low bandwidth mode") + else: + frametype = bytes([FR_TYPE.ARQ_DC_OPEN_W.value]) + self.log.debug("[Modem] Requesting high bandwidth mode") + + connection_frame = bytearray(self.length_sig0_frame) + connection_frame[:1] = frametype + connection_frame[1:4] = Station.dxcallsign_crc + connection_frame[4:7] = Station.mycallsign_crc + connection_frame[7:13] = helpers.callsign_to_bytes(mycallsign) + connection_frame[13:14] = self.session_id + + for attempt in range(self.data_channel_max_retries): + + self.send_data_to_socket_queue( + freedata="modem-message", + arq="transmission", + status="opening", + mycallsign=str(mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + irs=helpers.bool_to_string(self.is_IRS) + ) + + self.log.info( + "[Modem] ARQ | DATA | TX | [" + + str(mycallsign, "UTF-8") + + "]>> <<[" + + str(self.dxcallsign, "UTF-8") + + "]", + attempt=f"{str(attempt + 1)}/{str(self.data_channel_max_retries)}", + ) + + # Let's check if we have a busy channel and if we are not in a running arq session. + if ModemParam.channel_busy and not ARQ.arq_state_event.is_set(): + self.channel_busy_handler() + + # if channel free, enqueue frame for tx + self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) + + # wait until timeout or event set + ARQ.arq_state_event.wait(timeout=self.datachannel_opening_interval) + + if ARQ.arq_state_event.is_set(): + return True + if Modem.modem_state in ["IDLE"]: + return False + + # `data_channel_max_retries` attempts have been sent. Aborting attempt & cleaning up + return False + + def arq_received_data_channel_opener(self, data_in: bytes): """ @@ -2564,6 +2544,9 @@ class DATA: # as soon as we set ARQ_STATE to DATA, transmission starts ARQ.arq_state = True + # also set the ARQ event + ARQ.arq_state_event.set() + # Update data_channel timestamp self.data_channel_last_received = int(time.time()) else: @@ -3073,6 +3056,29 @@ class DATA: + "] ", ) + + def channel_busy_handler(self): + """ + function for handling the channel busy situation + Args: + + Returns: + """ + self.log.warning("[Modem] Channel busy, waiting until free...") + self.send_data_to_socket_queue( + freedata="modem-message", + arq="transmission", + status="waiting", + mycallsign=str(self.mycallsign, 'UTF-8'), + dxcallsign=str(self.dxcallsign, 'UTF-8'), + irs=helpers.bool_to_string(self.is_IRS) + ) + + # wait while timeout not reached and our busy state is busy + channel_busy_timeout = time.time() + 5 + while ModemParam.channel_busy and time.time() < channel_busy_timeout and not self.check_if_mode_fits_to_busy_slot(): + threading.Event().wait(0.01) + # ------------ CALCULATE TRANSFER RATES def calculate_transfer_rate_rx( self, rx_start_of_transmission: float, receivedbytes: int @@ -3263,6 +3269,7 @@ class DATA: ARQ.arq_session_state = "disconnected" ARQ.speed_list = [] ARQ.arq_state = False + ARQ.arq_state_event = threading.Event() self.arq_file_transfer = False Beacon.beacon_pause = False diff --git a/modem/static.py b/modem/static.py index 7059b8dc..e63d2956 100644 --- a/modem/static.py +++ b/modem/static.py @@ -11,6 +11,7 @@ from dataclasses import dataclass, field from typing import List import subprocess from enum import Enum +import threading # CHANNEL_STATE = 'RECEIVING_SIGNALLING' @@ -33,6 +34,7 @@ class ARQ: arq_session_state: str = "disconnected" # can be: disconnected, disconnecting, connected, connecting, failed arq_session: bool = False arq_state: bool = False + arq_state_event: threading.Event = field(default_factory=threading.Event) # ARQ PROTOCOL VERSION # v.5 - signalling frame uses datac0 # v.6 - signalling frame uses datac13 @@ -134,7 +136,7 @@ class TCIParam: @dataclass class Modem: - version = "0.11.1-alpha.1" + version = "0.11.2-alpha.1" host: str = "0.0.0.0" port: int = 3000 SOCKET_TIMEOUT: int = 1 # seconds