Merge pull request #355 from DJ2LS/ls-arq

code refactoring
This commit is contained in:
DJ2LS 2023-02-11 14:14:27 +01:00 committed by GitHub
commit dd2658819c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 1189 additions and 1115 deletions

View file

@ -21,7 +21,7 @@ jobs:
- python-version: "3.9"
- python-version: "3.10"
- python-version: "3.11"
- python-version: "3.12-dev"
#- python-version: "3.12-dev"
steps:
- uses: actions/checkout@v3

View file

@ -39,6 +39,10 @@ def t_setup(
):
# Disable data_handler testmode - This is required to test a conversation.
data_handler.TESTMODE = False
# Enable socket testmode for overriding socket class
sock.TESTMODE = True
modem.RXCHANNEL = tmp_path / rx_channel
modem.TESTMODE = True
modem.TXCHANNEL = tmp_path / tx_channel
@ -147,9 +151,9 @@ def t_datac0_1(
log.debug("t_datac0_1: STOP test, setting TNC state")
static.TNC_STATE = "BUSY"
static.ARQ_STATE = True
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(0.5)
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration
@ -167,7 +171,7 @@ def t_datac0_1(
# override ARQ SESSION STATE for allowing disconnect command
static.ARQ_SESSION_STATE = "connected"
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(0.5)
# Allow enough time for this side to process the disconnect frame.
@ -260,8 +264,8 @@ def t_datac0_2(
if "cq" in data:
t_data = {"type": "arq", "command": "stop_transmission"}
sock.process_tnc_commands(json.dumps(t_data, indent=None))
sock.process_tnc_commands(json.dumps(t_data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(t_data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(t_data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration

View file

@ -33,6 +33,10 @@ def t_setup(
):
# Disable data_handler testmode - This is required to test a conversation.
data_handler.TESTMODE = False
# Enable socket testmode for overriding socket class
sock.TESTMODE = True
modem.RXCHANNEL = tmp_path / rx_channel
modem.TESTMODE = True
modem.TXCHANNEL = tmp_path / tx_channel
@ -148,8 +152,8 @@ def t_datac0_1(
static.DXCALLSIGN_CRC = helpers.get_crc_24(static.DXCALLSIGN)
static.TNC_STATE = "BUSY"
static.ARQ_STATE = True
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration
@ -173,7 +177,7 @@ def t_datac0_1(
# override ARQ SESSION STATE for allowing disconnect command
static.ARQ_SESSION_STATE = "connected"
data = {"type": "arq", "command": "disconnect", "dxcallsign": dxcall}
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(0.5)
# Allow enough time for this side to process the disconnect frame.
@ -266,8 +270,8 @@ def t_datac0_2(
if "cq" in data:
t_data = {"type": "arq", "command": "stop_transmission"}
sock.process_tnc_commands(json.dumps(t_data, indent=None))
sock.process_tnc_commands(json.dumps(t_data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(t_data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(t_data, indent=None))
# Assure the test completes.
timeout = time.time() + timeout_duration

View file

@ -124,17 +124,17 @@ def t_arq_iss(*args):
time.sleep(0.5)
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(1.5)
data = {"type": "arq", "command": "stop_transmission"}
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
time.sleep(0.5)
sock.process_tnc_commands(json.dumps(data, indent=None))
sock.ThreadedTCPRequestHandler.process_tnc_commands(None,json.dumps(data, indent=None))
# Set timeout
timeout = time.time() + 15

View file

@ -135,34 +135,75 @@ class DAEMON:
# data[2] mygrid
# data[3] rx_audio
# data[4] tx_audio
# REMOVED - data[5] devicename
# REMOVED - data[6] deviceport
# REMOVED - data[7] serialspeed
# REMOVED - data[8] pttprotocol
# REMOVED - data[9] pttport
# REMOVED - data[10] data_bits
# REMOVED - data[11] stop_bits
# REMOVED - data[12] handshake
# data[13] radiocontrol
# data[14] rigctld_ip
# data[15] rigctld_port
# data[16] send_scatter
# data[17] send_fft
# data[18] low_bandwidth_mode
# data[19] tuning_range_fmin
# data[20] tuning_range_fmax
# data[21] enable FSK
# data[22] tx-audio-level
# data[23] respond_to_cq
# data[24] rx_buffer_size
# data[25] explorer
# data[26] ssid_list
# data[27] auto_tune
# data[28] stats
# TODO: We need to remove 5-12 and adjust the list number for other paramters
# This is only a dirty fix
# data[5] radiocontrol
# data[6] rigctld_ip
# data[7] rigctld_port
# data[8] send_scatter
# data[9] send_fft
# data[10] low_bandwidth_mode
# data[11] tuning_range_fmin
# data[12] tuning_range_fmax
# data[13] enable FSK
# data[14] tx-audio-level
# data[15] respond_to_cq
# data[16] rx_buffer_size
# data[17] explorer
# data[18] ssid_list
# data[19] auto_tune
# data[20] stats
if data[0] == "STARTTNC":
self.start_tnc(data)
if data[0] == "TEST_HAMLIB":
# data[9] radiocontrol
# data[10] rigctld_ip
# data[11] rigctld_port
self.test_hamlib_ptt(data)
except Exception as err1:
self.log.error("[DMN] worker: Exception: ", e=err1)
def test_hamlib_ptt(self, data):
radiocontrol = data[1]
rigctld_ip = data[2]
rigctld_port = data[3]
# check how we want to control the radio
if radiocontrol == "direct":
print("direct hamlib support deprecated - not usable anymore")
sys.exit(1)
elif radiocontrol == "rigctl":
print("rigctl support deprecated - not usable anymore")
sys.exit(1)
elif radiocontrol == "rigctld":
import rigctld as rig
else:
import rigdummy as rig
hamlib = rig.radio()
hamlib.open_rig(
rigctld_ip=rigctld_ip,
rigctld_port=rigctld_port,
)
# hamlib_version = rig.hamlib_version
hamlib.set_ptt(True)
if hamlib.get_ptt():
self.log.info("[DMN] Hamlib PTT", status="SUCCESS")
response = {"command": "test_hamlib", "result": "SUCCESS"}
else:
self.log.warning("[DMN] Hamlib PTT", status="NO SUCCESS")
response = {"command": "test_hamlib", "result": "NOSUCCESS"}
hamlib.set_ptt(False)
hamlib.close_rig()
jsondata = json.dumps(response)
sock.SOCKET_QUEUE.put(jsondata)
def start_tnc(self, data):
self.log.warning("[DMN] Starting TNC", rig=data[5], port=data[6])
# list of parameters, necessary for running subprocess command as a list
@ -225,9 +266,6 @@ class DAEMON:
# safe data to config file
config.write_entire_config(data)
# Try running tnc from binary, else run from source
# This helps running the tnc in a developer environment
try:
@ -270,62 +308,6 @@ class DAEMON:
static.TNCPROCESS = proc
static.TNCSTARTED = True
"""
# WE HAVE THIS PART in SOCKET
if data[0] == "STOPTNC":
static.TNCPROCESS.kill()
self.log.warning("[DMN] Stopping TNC")
#os.kill(static.TNCPROCESS, signal.SIGKILL)
static.TNCSTARTED = False
"""
# data[9] radiocontrol
# data[10] rigctld_ip
# data[11] rigctld_port
if data[0] == "TEST_HAMLIB":
radiocontrol = data[1]
rigctld_ip = data[2]
rigctld_port = data[3]
# check how we want to control the radio
if radiocontrol == "direct":
print("direct hamlib support deprecated - not usable anymore")
sys.exit(1)
elif radiocontrol == "rigctl":
print("rigctl support deprecated - not usable anymore")
sys.exit(1)
elif radiocontrol == "rigctld":
import rigctld as rig
else:
import rigdummy as rig
hamlib = rig.radio()
hamlib.open_rig(
rigctld_ip=rigctld_ip,
rigctld_port=rigctld_port,
)
# hamlib_version = rig.hamlib_version
hamlib.set_ptt(True)
if pttstate := hamlib.get_ptt():
self.log.info("[DMN] Hamlib PTT", status="SUCCESS")
response = {"command": "test_hamlib", "result": "SUCCESS"}
else:
self.log.warning("[DMN] Hamlib PTT", status="NO SUCCESS")
response = {"command": "test_hamlib", "result": "NOSUCCESS"}
hamlib.set_ptt(False)
hamlib.close_rig()
jsondata = json.dumps(response)
sock.SOCKET_QUEUE.put(jsondata)
except Exception as err1:
self.log.error("[DMN] worker: Exception: ", e=err1)
if __name__ == "__main__":
mainlog = structlog.get_logger(__file__)
# we need to run this on Windows for multiprocessing support

View file

@ -638,15 +638,6 @@ class DATA:
# is intended for this station.
data_in = bytes(data_in)
# TODO: this seems not to work anymore
# get received crc for different mycall ssids
# check if callsign ssid override
# _, mycallsign = helpers.check_callsign(
# self.mycallsign, data_in[2:5]
# )
# attempt fixing this
mycallsign = self.mycallsign
# only process data if we are in ARQ and BUSY state else return to quit
if not static.ARQ_STATE and static.TNC_STATE not in ["BUSY"]:
self.log.warning("[TNC] wrong tnc state - dropping data", arq_state=static.ARQ_STATE, tnc_state=static.TNC_STATE)
@ -704,7 +695,7 @@ class DATA:
# catch possible modem error which leads into false byteorder
# modem possibly decodes too late - data then is pushed to buffer
# which leads into wrong byteorder
# Lets put this in try/except so we are not crashing tnc as its hihgly experimental
# Lets put this in try/except so we are not crashing tnc as its highly experimental
# This might only work for datac1 and datac3
try:
# area_of_interest = (modem.get_bytes_per_frame(self.mode_list[speed_level] - 1) -3) * 2
@ -719,7 +710,6 @@ class DATA:
"[TNC] ARQ | RX | wrong byteorder check failed", e=e
)
# if frame buffer ends not with the current frame, we are going to append new data
# if data already exists, we received the frame correctly,
# but the ACK frame didn't receive its destination (ISS)
@ -737,7 +727,6 @@ class DATA:
# temp_burst_buffer --> new data
# search_area --> area where we want to search
# data_mode = self.mode_list[self.speed_level]
# payload_per_frame = modem.get_bytes_per_frame(data_mode) - 2
# search_area = payload_per_frame - 3 # (3 bytes arq frame header)
@ -774,28 +763,11 @@ class DATA:
and data_in.find(self.data_frame_eof) < 0
):
self.frame_received_counter += 1
# try increasing speed level only if we had two successful decodes
if self.frame_received_counter >= 2:
self.frame_received_counter = 0
# make sure new speed level isn't higher than available modes
new_speed_level = min(self.speed_level + 1, len(self.mode_list) - 1)
# check if actual snr is higher than minimum snr for next mode
if static.SNR >= self.snr_list[new_speed_level]:
self.speed_level = new_speed_level
else:
self.log.info("[TNC] ARQ | increasing speed level not possible because of SNR limit",
given_snr=static.SNR,
needed_snr=self.snr_list[new_speed_level]
)
static.ARQ_SPEED_LEVEL = self.speed_level
# Update modes we are listening to
self.set_listening_modes(False, True, self.mode_list[self.speed_level])
self.arq_calculate_speed_level(snr)
# Create and send ACK frame
self.log.info("[TNC] ARQ | RX | SENDING ACK", finished=static.ARQ_SECONDS_UNTIL_FINISH, bytesperminute=static.ARQ_BYTES_PER_MINUTE)
self.log.info("[TNC] ARQ | RX | SENDING ACK", finished=static.ARQ_SECONDS_UNTIL_FINISH,
bytesperminute=static.ARQ_BYTES_PER_MINUTE)
self.send_burst_ack_frame(snr)
# Reset n retries per burst counter
@ -837,7 +809,6 @@ class DATA:
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
# Should never reach this point
else:
self.log.error(
"[TNC] data_handler: Should not reach this point...",
@ -854,19 +825,7 @@ class DATA:
# get total bytes per transmission information as soon we received a frame with a BOF
if bof_position >= 0:
payload = static.RX_FRAME_BUFFER[
bof_position + len(self.data_frame_bof): eof_position
]
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes
static.TOTAL_BYTES = frame_length
compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes
# limit to max value of 255
compression_factor = np.clip(compression_factor, 0, 255)
static.ARQ_COMPRESSION_FACTOR = compression_factor / 10
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
self.arq_extract_statistics_from_data_frame(bof_position, eof_position)
if (
bof_position >= 0
and eof_position > 0
@ -896,13 +855,91 @@ class DATA:
# Check if data_frame_crc is equal with received crc
if data_frame_crc == data_frame_crc_received:
self.arq_process_received_data_frame(data_frame, snr)
else:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
irs=helpers.bool_to_string(self.is_IRS)
)
duration = time.time() - self.rx_start_of_transmission
self.log.warning(
"[TNC] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
e="wrong crc",
expected=data_frame_crc.hex(),
received=data_frame_crc_received.hex(),
overflows=static.BUFFER_OVERFLOW_COUNTER,
nacks=self.frame_nack_counter,
duration=duration,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
compression=static.ARQ_COMPRESSION_FACTOR,
data=data_frame,
)
if static.ENABLE_STATS:
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
self.log.info("[TNC] ARQ | RX | Sending NACK", finished=static.ARQ_SECONDS_UNTIL_FINISH, bytesperminute=static.ARQ_BYTES_PER_MINUTE)
self.send_burst_nack_frame(snr)
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
# Finally cleanup our buffers and states,
self.arq_cleanup()
def arq_extract_statistics_from_data_frame(self, bof_position, eof_position):
payload = static.RX_FRAME_BUFFER[
bof_position + len(self.data_frame_bof): eof_position
]
frame_length = int.from_bytes(payload[4:8], "big") # 4:8 4bytes
static.TOTAL_BYTES = frame_length
compression_factor = int.from_bytes(payload[8:9], "big") # 4:8 4bytes
# limit to max value of 255
compression_factor = np.clip(compression_factor, 0, 255)
static.ARQ_COMPRESSION_FACTOR = compression_factor / 10
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
def arq_calculate_speed_level(self, snr):
self.frame_received_counter += 1
# try increasing speed level only if we had two successful decodes
if self.frame_received_counter >= 2:
self.frame_received_counter = 0
# make sure new speed level isn't higher than available modes
new_speed_level = min(self.speed_level + 1, len(self.mode_list) - 1)
# check if actual snr is higher than minimum snr for next mode
if static.SNR >= self.snr_list[new_speed_level]:
self.speed_level = new_speed_level
else:
self.log.info("[TNC] ARQ | increasing speed level not possible because of SNR limit",
given_snr=static.SNR,
needed_snr=self.snr_list[new_speed_level]
)
static.ARQ_SPEED_LEVEL = self.speed_level
# Update modes we are listening to
self.set_listening_modes(False, True, self.mode_list[self.speed_level])
def arq_process_received_data_frame(self, data_frame, snr):
"""
"""
# transmittion duration
duration = time.time() - self.rx_start_of_transmission
self.calculate_transfer_rate_rx(
self.rx_start_of_transmission, len(static.RX_FRAME_BUFFER)
)
self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter,bytesperminute=static.ARQ_BYTES_PER_MINUTE, total_bytes=static.TOTAL_BYTES, duration=duration)
self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter,
bytesperminute=static.ARQ_BYTES_PER_MINUTE, total_bytes=static.TOTAL_BYTES, duration=duration)
# Decompress the data frame
data_frame_decompressed = lzma.decompress(data_frame)
@ -967,7 +1004,7 @@ class DATA:
self.save_data_to_folder(
self.transmission_uuid,
timestamp,
mycallsign,
self.mycallsign,
static.DXCALLSIGN,
static.DXGRID,
data_frame
@ -989,7 +1026,7 @@ class DATA:
status="received",
uuid=self.transmission_uuid,
timestamp=timestamp,
mycallsign=str(mycallsign, "UTF-8"),
mycallsign=str(self.mycallsign, "UTF-8"),
dxcallsign=str(static.DXCALLSIGN, "UTF-8"),
dxgrid=str(static.DXGRID, "UTF-8"),
data=base64_data,
@ -1001,10 +1038,7 @@ class DATA:
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="received", duration=duration)
self.log.info(
"[TNC] ARQ | RX | SENDING DATA FRAME ACK",
snr=snr,
crc=data_frame_crc.hex(),
)
"[TNC] ARQ | RX | SENDING DATA FRAME ACK")
self.send_data_ack_frame(snr)
# Update statistics AFTER the frame ACK is sent
@ -1021,43 +1055,6 @@ class DATA:
snr=snr,
)
else:
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
status="failed",
uuid=self.transmission_uuid,
mycallsign=str(self.mycallsign, 'UTF-8'),
dxcallsign=str(self.dxcallsign, 'UTF-8'),
irs=helpers.bool_to_string(self.is_IRS)
)
duration = time.time() - self.rx_start_of_transmission
self.log.warning(
"[TNC] ARQ | RX | DATA FRAME NOT SUCCESSFULLY RECEIVED!",
e="wrong crc",
expected=data_frame_crc.hex(),
received=data_frame_crc_received.hex(),
overflows=static.BUFFER_OVERFLOW_COUNTER,
nacks=self.frame_nack_counter,
duration=duration,
bytesperminute=static.ARQ_BYTES_PER_MINUTE,
compression=static.ARQ_COMPRESSION_FACTOR,
data=data_frame,
)
if static.ENABLE_STATS:
self.stats.push(frame_nack_counter=self.frame_nack_counter, status="wrong_crc", duration=duration)
self.log.info("[TNC] ARQ | RX | Sending NACK", finished=static.ARQ_SECONDS_UNTIL_FINISH, bytesperminute=static.ARQ_BYTES_PER_MINUTE)
self.send_burst_nack_frame(snr)
# Update arq_session timestamp
self.arq_session_last_received = int(time.time())
# Finally cleanup our buffers and states,
self.arq_cleanup()
def arq_transmit(self, data_out: bytes, mode: int, n_frames_per_burst: int):
"""
Transmit ARQ frame
@ -1068,8 +1065,6 @@ class DATA:
n_frames_per_burst:int:
"""
self.arq_file_transfer = True
# set signalling modes we want to listen to
# we are in an ongoing arq transmission, so we don't need sig0 actually
modem.RECEIVE_SIG0 = False
@ -1078,9 +1073,9 @@ class DATA:
self.tx_n_retry_of_burst = 0 # retries we already sent data
# Maximum number of retries to send before declaring a frame is lost
# save len of data_out to TOTAL_BYTES for our statistics --> kBytes
# static.TOTAL_BYTES = round(len(data_out) / 1024, 2)
# save len of data_out to TOTAL_BYTES for our statistics
static.TOTAL_BYTES = len(data_out)
self.arq_file_transfer = True
frame_total_size = len(data_out).to_bytes(4, byteorder="big")
# Compress data frame
@ -1135,21 +1130,6 @@ class DATA:
while not self.data_frame_ack_received and static.ARQ_STATE:
# we have self.tx_n_max_retries_per_burst attempts for sending a burst
for self.tx_n_retry_of_burst in range(self.tx_n_max_retries_per_burst):
# data_mode = mode
# self.log.debug("[TNC] FIXED MODE:", mode=FREEDV_MODE(data_mode).name)
# we are doing a modulo check of transmission retries of the actual burst
# every 2nd retry which fails, decreases speedlevel by 1.
# as soon as we received an ACK for the current burst, speed_level will increase
# by 1.
# The intent is to optimize speed by adapting to the current RF conditions.
# if not self.tx_n_retry_of_burst % 2 and self.tx_n_retry_of_burst > 0:
# self.speed_level = max(self.speed_level - 1, 0)
# if self.tx_n_retry_of_burst <= 1:
# self.speed_level += 1
# self.speed_level = max(self.speed_level + 1, len(self.mode_list) - 1)
# Bound speed level to:
# - minimum of either the speed or the length of mode list - 1
# - maximum of either the speed or zero
@ -1169,9 +1149,6 @@ class DATA:
# Payload information
payload_per_frame = modem.get_bytes_per_frame(data_mode) - 2
# Tempbuffer list for storing our data frames
tempbuffer = []
# Append data frames with n_frames_per_burst to tempbuffer
# TODO: this part needs a complete rewrite!
# n_frames_per_burst = 1 is working
@ -1197,9 +1174,7 @@ class DATA:
)
frame = arqheader + extended_data_out
# Append frame to tempbuffer for transmission
tempbuffer.append(frame)
tempbuffer = [frame]
self.log.debug("[TNC] tempbuffer:", tempbuffer=tempbuffer)
self.log.info(
"[TNC] ARQ | TX | FRAMES",
@ -1212,18 +1187,12 @@ class DATA:
self.enqueue_frame_for_tx([t_buf_item], c2_mode=data_mode)
# After transmission finished, wait for an ACK or RPT frame
# burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100
# while (not self.burst_ack and not self.burst_nack and
# not self.rpt_request_received and not self.data_frame_ack_received and
# time.time() < burstacktimeout and static.ARQ_STATE):
# threading.Event().wait(0.01)
# burstacktimeout = time.time() + self.burst_ack_timeout_seconds + 100
while static.ARQ_STATE and not (
self.burst_ack
or self.burst_nack
or self.rpt_request_received
or self.data_frame_ack_received
while (
static.ARQ_STATE
and not self.burst_ack
and not self.burst_nack
and not self.rpt_request_received
and not self.data_frame_ack_received
):
threading.Event().wait(0.01)
@ -1239,20 +1208,15 @@ class DATA:
if self.burst_nack:
self.burst_nack = False # reset nack state
# not yet implemented
if self.rpt_request_received:
pass
if self.data_frame_ack_received:
self.log.debug(
"[TNC] arq_transmit: Received FRAME ACK. Sending next chunk."
"[TNC] arq_transmit: Received FRAME ACK. Braking retry loop."
)
break # break retry loop
# We need this part for leaving the repeat loop
# static.ARQ_STATE == "DATA" --> when stopping transmission manually
if not static.ARQ_STATE:
# print("not ready for data...leaving loop....")
self.log.debug(
"[TNC] arq_transmit: ARQ State changed to FALSE. Breaking retry loop."
)
@ -1268,7 +1232,6 @@ class DATA:
maxretries=self.tx_n_max_retries_per_burst,
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
# End of FOR loop
# update buffer position
bufferposition = bufferposition_end
@ -1302,6 +1265,20 @@ class DATA:
# GOING TO NEXT ITERATION
if self.data_frame_ack_received:
self.arq_transmit_success()
else:
self.arq_transmit_failed()
if TESTMODE:
# Quit after transmission
self.log.debug("[TNC] TESTMODE: arq_transmit exiting.")
sys.exit(0)
def arq_transmit_success(self):
"""
will be called if we successfully transmitted all of queued data
"""
# we need to wait until sending "transmitted" state
# gui database is too slow for handling this within 0.001 seconds
# so let's sleep a little
@ -1332,7 +1309,10 @@ class DATA:
# finally do an arq cleanup
self.arq_cleanup()
else:
def arq_transmit_failed(self):
"""
will be called if we not successfully transmitted all of queued data
"""
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",
@ -1351,14 +1331,8 @@ class DATA:
overflows=static.BUFFER_OVERFLOW_COUNTER,
)
self.stop_transmission()
if TESTMODE:
# Quit after transmission
self.log.debug("[TNC] TESTMODE: arq_transmit exiting.")
sys.exit(0)
def burst_ack_nack_received(self, data_in: bytes) -> None:
"""
Received an ACK/NACK for a transmitted frame, keep track and
@ -1393,6 +1367,8 @@ class DATA:
self.burst_nack_counter = 0
# Reset n retries per burst counter
self.n_retries_per_burst = 0
self.burst_ack_snr = helpers.snr_from_bytes(data_in[2:3])
else:
# Decrease speed level if we received a burst nack
# self.speed_level = max(self.speed_level - 1, 0)
@ -1400,7 +1376,7 @@ class DATA:
self.burst_nack = True
# Increment burst nack counter
self.burst_nack_counter += 1
desc = "nack"
self.burst_ack_snr = 'NaN'
# Update data_channel timestamp
self.data_channel_last_received = int(time.time())
@ -1411,12 +1387,6 @@ class DATA:
self.speed_level = int.from_bytes(bytes(data_in[3:4]), "big")
static.ARQ_SPEED_LEVEL = self.speed_level
#self.log.debug(
# f"[TNC] burst_{desc}_received:",
# speed_level=self.speed_level,
# c2_mode=FREEDV_MODE(self.mode_list[self.speed_level]).name,
#)
def frame_ack_received(
self, data_in: bytes # pylint: disable=unused-argument
) -> None:
@ -1494,7 +1464,8 @@ class DATA:
"""
# Only process data if we are in ARQ and BUSY state
if static.ARQ_STATE and static.TNC_STATE == "BUSY":
if not static.ARQ_STATE or static.TNC_STATE != "BUSY":
return
static.DXGRID = b'------'
helpers.add_to_heard_stations(
static.DXCALLSIGN,
@ -1514,8 +1485,7 @@ class DATA:
for i in range(0, 6, 2):
if not missing_area[i: i + 2].endswith(b"\x00\x00"):
missing = missing_area[i: i + 2]
self.rpt_request_buffer.insert(0, missing)
self.rpt_request_buffer.insert(0, missing_area[i: i + 2])
############################################################################################################
# ARQ SESSION HANDLER
@ -1803,7 +1773,6 @@ class DATA:
data_in:bytes:
"""
print(static.ARQ_SESSION_STATE)
# We've arrived here from process_data which already checked that the frame
# is intended for this station.
# Close the session if the CRC matches the remote station in static.
@ -2169,7 +2138,7 @@ class DATA:
self.mode_list = self.mode_list_high_bw
self.time_list = self.time_list_high_bw
self.snr_list = self.snr_list_high_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_W.value and static.LOW_BANDWIDTH_MODE:
elif frametype == FR_TYPE.ARQ_DC_OPEN_W.value:
# ISS(w) <-> IRS(n)
constellation = "ISS(w) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = False
@ -2183,7 +2152,7 @@ class DATA:
self.mode_list = self.mode_list_low_bw
self.time_list = self.time_list_low_bw
self.snr_list = self.snr_list_low_bw
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value and static.LOW_BANDWIDTH_MODE:
elif frametype == FR_TYPE.ARQ_DC_OPEN_N.value:
# ISS(n) <-> IRS(n)
constellation = "ISS(n) <-> IRS(n)"
self.received_LOW_BANDWIDTH_MODE = True
@ -2198,7 +2167,7 @@ class DATA:
self.snr_list = self.snr_list_low_bw
# get mode which fits to given SNR
# initially set speed_level 0 in case of really bad SNR and no matching mode
# initially set speed_level 0 in case of bad SNR and no matching mode
self.speed_level = 0
for i in range(len(self.mode_list)):
if static.SNR >= self.snr_list[i]:
@ -2260,8 +2229,6 @@ class DATA:
connection_frame[:1] = frametype
connection_frame[1:2] = self.session_id
connection_frame[8:9] = bytes([self.speed_level])
# For checking protocol version on the receiving side
connection_frame[13:14] = bytes([static.ARQ_PROTOCOL_VERSION])
self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
@ -2405,8 +2372,8 @@ class DATA:
ping_frame[4:7] = helpers.get_crc_24(mycallsign)
ping_frame[7:13] = helpers.callsign_to_bytes(mycallsign)
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.enqueue_frame_for_tx([ping_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx([ping_frame], c2_mode=FREEDV_MODE.datac0.value)
@ -2461,6 +2428,14 @@ class DATA:
snr=str(static.SNR),
)
if static.RESPOND_TO_CALL:
self.transmit_ping_ack()
def transmit_ping_ack(self):
"""
transmit a ping ack frame
called by def received_ping
"""
ping_frame = bytearray(self.length_sig0_frame)
ping_frame[:1] = bytes([FR_TYPE.PING_ACK.value])
ping_frame[1:4] = static.DXCALLSIGN_CRC
@ -2614,9 +2589,9 @@ class DATA:
beacon_frame[:1] = bytes([FR_TYPE.BEACON.value])
beacon_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign)
beacon_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("UTF-8"))
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
if static.ENABLE_FSK:
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.enqueue_frame_for_tx(
[beacon_frame],
c2_mode=FREEDV_MODE.fsk_ldpc_0.value,
@ -2694,10 +2669,10 @@ class DATA:
cq_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign)
cq_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("UTF-8"))
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.log.debug("[TNC] CQ Frame:", data=[cq_frame])
if static.ENABLE_FSK:
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
else:
self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.datac0.value, copies=1, repeat_delay=0)
@ -2769,7 +2744,6 @@ class DATA:
qrv_frame[7:11] = helpers.encode_grid(static.MYGRID.decode("UTF-8"))
qrv_frame[11:12] = helpers.snr_to_bytes(static.SNR)
if static.ENABLE_FSK:
self.log.info("[TNC] ENABLE FSK", state=static.ENABLE_FSK)
self.enqueue_frame_for_tx([qrv_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
@ -3015,6 +2989,7 @@ class DATA:
self.data_frame_ack_received = state
def set_listening_modes(self, enable_sig0: bool, enable_sig1: bool, mode: int) -> None:
# sourcery skip: extract-duplicate-method
"""
Function for setting the data modes we are listening to for saving cpu power
@ -3090,19 +3065,13 @@ class DATA:
print(time.time() - (self.burst_last_received + self.time_list[self.speed_level]))
print("-----------------------")
if modem_error_state:
self.log.warning(
"[TNC] Decoding Error",
attempt=self.n_retries_per_burst,
max_attempts=self.rx_n_max_retries_per_burst,
speed_level=self.speed_level,
)
else:
self.log.warning(
"[TNC] Burst timeout",
"[TNC] Burst decoding error or timeout",
attempt=self.n_retries_per_burst,
max_attempts=self.rx_n_max_retries_per_burst,
speed_level=self.speed_level,
modem_error_state=modem_error_state
)
# reset self.burst_last_received

View file

@ -537,17 +537,22 @@ class RF:
x = np.frombuffer(txbuffer, dtype=np.int16)
if static.AUDIO_AUTO_TUNE:
if static.HAMLIB_ALC == 0.0:
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL + 30
elif 0.0 < static.HAMLIB_ALC <= 0.8:
print("0.001 > static.HAMLIB_ALC <= 0.8")
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL + 20
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))
elif 0.8 < static.HAMLIB_ALC < 0.99:
print("0.8 > static.HAMLIB_ALC <= 0.99")
elif 0.0 < static.HAMLIB_ALC <= 0.1:
print("0.0 < static.HAMLIB_ALC <= 0.1")
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL + 2
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))
elif 1.0 <= static.HAMLIB_ALC:
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL - 2
elif 0.1 < static.HAMLIB_ALC < 0.2:
print("0.1 < static.HAMLIB_ALC < 0.2")
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))
elif 0.2 < static.HAMLIB_ALC < 0.99:
print("0.2 < static.HAMLIB_ALC < 0.99")
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL - 20
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))
elif 1.0 >=static.HAMLIB_ALC:
print("1.0 >= static.HAMLIB_ALC")
static.TX_AUDIO_LEVEL = static.TX_AUDIO_LEVEL - 40
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))
else:
self.log.debug("[MDM] AUDIO TUNE", audio_level=str(static.TX_AUDIO_LEVEL), alc_level=str(static.HAMLIB_ALC))

View file

@ -39,6 +39,8 @@ DAEMON_QUEUE = queue.Queue()
CONNECTED_CLIENTS = set()
CLOSE_SIGNAL = False
TESTMODE = False
log = structlog.get_logger("sock")
@ -50,11 +52,9 @@ class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
pass
# noinspection PyTypeChecker
class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
""" """
connection_alive = False
connection_alive = False
log = structlog.get_logger("ThreadedTCPRequestHandler")
@ -91,7 +91,8 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
client.send(sock_data)
except Exception as err:
self.log.info("[SCK] Connection lost", e=err)
# TODO: Check if we really should set connection alive to false. This might disconnect all other clients as well...
# TODO: Check if we really should set connection alive to false.
# This might disconnect all other clients as well...
self.connection_alive = False
except Exception as err:
self.log.debug("[SCK] catch harmless RuntimeError: Set changed size during iteration", e=err)
@ -127,14 +128,14 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
# iterate thorugh data list
for commands in data:
if self.server.server_address[1] == static.PORT:
process_tnc_commands(commands)
self.process_tnc_commands(commands)
else:
process_daemon_commands(commands)
self.process_daemon_commands(commands)
# wait some time between processing multiple commands
# this is only a first test to avoid doubled transmission
# we might improve this by only processing one command or
# doing some kind of selection to determin which commands need to be dropped
# doing some kind of selection to determine which commands need to be dropped
# and which one can be processed during a running transmission
threading.Event().wait(0.5)
@ -184,14 +185,15 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
)
try:
CONNECTED_CLIENTS.remove(self.request)
except Exception:
except Exception as e:
self.log.warning(
"[SCK] client connection already removed from client list",
client=self.request,
e=e,
)
def process_tnc_commands(data):
# ------------------------ TNC COMMANDS
def process_tnc_commands(self, data):
"""
process tnc commands
@ -209,8 +211,134 @@ def process_tnc_commands(data):
received_json = json.loads(data)
log.debug("[SCK] CMD", command=received_json)
# ENABLE TNC LISTENING STATE -----------------------------------------------------
# ENABLE TNC LISTENING STATE
if received_json["type"] == "set" and received_json["command"] == "listen":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_listen(None, received_json)
else:
self.tnc_set_listen(received_json)
# START STOP AUDIO RECORDING
if received_json["type"] == "set" and received_json["command"] == "record_audio":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_record_audio(None, received_json)
else:
self.tnc_set_record_audio(received_json)
# SET ENABLE/DISABLE RESPOND TO CALL
if received_json["type"] == "set" and received_json["command"] == "respond_to_call":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_respond_to_call(None, received_json)
else:
self.tnc_set_respond_to_call(received_json)
# SET ENABLE RESPOND TO CQ
if received_json["type"] == "set" and received_json["command"] == "respond_to_cq":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_record_audio(None, received_json)
else:
self.tnc_set_record_audio(received_json)
# SET TX AUDIO LEVEL
if received_json["type"] == "set" and received_json["command"] == "tx_audio_level":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_tx_audio_level(None, received_json)
else:
self.tnc_set_tx_audio_level(received_json)
# TRANSMIT TEST FRAME
if received_json["type"] == "set" and received_json["command"] == "send_test_frame":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_send_test_frame(None, received_json)
else:
self.tnc_set_send_test_frame(received_json)
# CQ CQ CQ
if received_json["command"] == "cqcqcq":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_cqcqcq(None, received_json)
else:
self.tnc_cqcqcq(received_json)
# START_BEACON
if received_json["command"] == "start_beacon":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_start_beacon(None, received_json)
else:
self.tnc_start_beacon(received_json)
# STOP_BEACON
if received_json["command"] == "stop_beacon":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_stop_beacon(None, received_json)
else:
self.tnc_stop_beacon(received_json)
# PING
if received_json["type"] == "ping" and received_json["command"] == "ping":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_ping_ping(None, received_json)
else:
self.tnc_ping_ping(received_json)
# CONNECT
if received_json["type"] == "arq" and received_json["command"] == "connect":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_arq_connect(None, received_json)
else:
self.tnc_arq_connect(received_json)
# DISCONNECT
if received_json["type"] == "arq" and received_json["command"] == "disconnect":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_arq_disconnect(None, received_json)
else:
self.tnc_arq_disconnect(received_json)
# TRANSMIT RAW DATA
if received_json["type"] == "arq" and received_json["command"] == "send_raw":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_arq_send_raw(None, received_json)
else:
self.tnc_arq_send_raw(received_json)
# STOP TRANSMISSION
if received_json["type"] == "arq" and received_json["command"] == "stop_transmission":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_arq_stop_transmission(None, received_json)
else:
self.tnc_arq_stop_transmission(received_json)
# GET RX BUFFER
if received_json["type"] == "get" and received_json["command"] == "rx_buffer":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_get_rx_buffer(None, received_json)
else:
self.tnc_get_rx_buffer(received_json)
# DELETE RX BUFFER
if received_json["type"] == "set" and received_json["command"] == "del_rx_buffer":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_del_rx_buffer(None, received_json)
else:
self.tnc_set_del_rx_buffer(received_json)
# SET FREQUENCY
if received_json["type"] == "set" and received_json["command"] == "frequency":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_frequency(None, received_json)
else:
self.tnc_set_frequency(received_json)
# SET MODE
if received_json["type"] == "set" and received_json["command"] == "mode":
if TESTMODE:
ThreadedTCPRequestHandler.tnc_set_mode(None, received_json)
else:
self.tnc_set_mode(received_json)
except Exception as err:
log.error("[SCK] JSON decoding error", e=err)
def tnc_set_listen(self, received_json):
try:
static.LISTEN = received_json["state"] in ['true', 'True', True, "ON", "on"]
command_response("listen", True)
@ -228,8 +356,7 @@ def process_tnc_commands(data):
"[SCK] CQ command execution error", e=err, command=received_json
)
# START STOP AUDIO RECORDING -----------------------------------------------------
if received_json["type"] == "set" and received_json["command"] == "record_audio":
def tnc_set_record_audio(self, received_json):
try:
if not static.AUDIO_RECORD:
static.AUDIO_RECORD_FILE = wave.open(f"{int(time.time())}_audio_recording.wav", 'w')
@ -249,9 +376,7 @@ def process_tnc_commands(data):
"[SCK] CQ command execution error", e=err, command=received_json
)
# SET ENABLE/DISABLE RESPOND TO CALL -----------------------------------------------------
if received_json["type"] == "set" and received_json["command"] == "respond_to_call":
def tnc_set_respond_to_call(self, received_json):
try:
static.RESPOND_TO_CALL = received_json["state"] in ['true', 'True', True]
command_response("respond_to_call", True)
@ -262,8 +387,7 @@ def process_tnc_commands(data):
"[SCK] CQ command execution error", e=err, command=received_json
)
# SET ENABLE RESPOND TO CQ -----------------------------------------------------
if received_json["type"] == "set" and received_json["command"] == "respond_to_cq":
def tnc_set_respond_to_cq(self, received_json):
try:
static.RESPOND_TO_CQ = received_json["state"] in ['true', 'True', True]
command_response("respond_to_cq", True)
@ -274,11 +398,7 @@ def process_tnc_commands(data):
"[SCK] CQ command execution error", e=err, command=received_json
)
# SET TX AUDIO LEVEL -----------------------------------------------------
if (
received_json["type"] == "set"
and received_json["command"] == "tx_audio_level"
):
def tnc_set_tx_audio_level(self, received_json):
try:
static.TX_AUDIO_LEVEL = int(received_json["value"])
command_response("tx_audio_level", True)
@ -291,11 +411,7 @@ def process_tnc_commands(data):
command=received_json,
)
# TRANSMIT TEST FRAME ----------------------------------------------------
if (
received_json["type"] == "set"
and received_json["command"] == "send_test_frame"
):
def tnc_set_send_test_frame(self, received_json):
try:
DATA_QUEUE_TRANSMIT.put(["SEND_TEST_FRAME"])
command_response("send_test_frame", True)
@ -307,8 +423,7 @@ def process_tnc_commands(data):
command=received_json,
)
# CQ CQ CQ -----------------------------------------------------
if received_json["command"] == "cqcqcq":
def tnc_cqcqcq(self, received_json):
try:
DATA_QUEUE_TRANSMIT.put(["CQ"])
command_response("cqcqcq", True)
@ -319,8 +434,7 @@ def process_tnc_commands(data):
"[SCK] CQ command execution error", e=err, command=received_json
)
# START_BEACON -----------------------------------------------------
if received_json["command"] == "start_beacon":
def tnc_start_beacon(self, received_json):
try:
static.BEACON_STATE = True
interval = int(received_json["parameter"])
@ -334,8 +448,7 @@ def process_tnc_commands(data):
command=received_json,
)
# STOP_BEACON -----------------------------------------------------
if received_json["command"] == "stop_beacon":
def tnc_stop_beacon(self, received_json):
try:
log.warning("[SCK] Stopping beacon!")
static.BEACON_STATE = False
@ -349,8 +462,7 @@ def process_tnc_commands(data):
command=received_json,
)
# PING ----------------------------------------------------------
if received_json["type"] == "ping" and received_json["command"] == "ping":
def tnc_ping_ping(self, received_json):
# send ping frame and wait for ACK
try:
dxcallsign = received_json["dxcallsign"]
@ -383,8 +495,7 @@ def process_tnc_commands(data):
"[SCK] PING command execution error", e=err, command=received_json
)
# CONNECT ----------------------------------------------------------
if received_json["type"] == "arq" and received_json["command"] == "connect":
def tnc_arq_connect(self, received_json):
# pause our beacon first
static.BEACON_PAUSE = True
@ -442,8 +553,7 @@ def process_tnc_commands(data):
# allow beacon transmission again
static.BEACON_PAUSE = False
# DISCONNECT ----------------------------------------------------------
if received_json["type"] == "arq" and received_json["command"] == "disconnect":
def tnc_arq_disconnect(self, received_json):
try:
if static.ARQ_SESSION_STATE not in ["disconnecting", "disconnected", "failed"]:
DATA_QUEUE_TRANSMIT.put(["DISCONNECT"])
@ -466,8 +576,7 @@ def process_tnc_commands(data):
command=received_json,
)
# TRANSMIT RAW DATA -------------------------------------------
if received_json["type"] == "arq" and received_json["command"] == "send_raw":
def tnc_arq_send_raw(self, received_json):
static.BEACON_PAUSE = True
# wait some random time
@ -541,11 +650,7 @@ def process_tnc_commands(data):
command=received_json,
)
# STOP TRANSMISSION ----------------------------------------------------------
if (
received_json["type"] == "arq"
and received_json["command"] == "stop_transmission"
):
def tnc_arq_stop_transmission(self, received_json):
try:
if static.TNC_STATE == "BUSY" or static.ARQ_STATE:
DATA_QUEUE_TRANSMIT.put(["STOP"])
@ -559,7 +664,7 @@ def process_tnc_commands(data):
"[SCK] STOP command execution error", e=err, command=received_json
)
if received_json["type"] == "get" and received_json["command"] == "rx_buffer":
def tnc_get_rx_buffer(self, received_json):
try:
if not RX_BUFFER.empty():
output = {
@ -591,10 +696,7 @@ def process_tnc_commands(data):
command=received_json,
)
if (
received_json["type"] == "set"
and received_json["command"] == "del_rx_buffer"
):
def tnc_set_del_rx_buffer(self, received_json):
try:
RX_BUFFER.queue.clear()
command_response("del_rx_buffer", True)
@ -606,21 +708,7 @@ def process_tnc_commands(data):
command=received_json,
)
# SET FREQUENCY -----------------------------------------------------
if received_json["command"] == "frequency" and received_json["type"] == "set":
try:
RIGCTLD_COMMAND_QUEUE.put(["set_frequency", received_json["frequency"]])
command_response("set_frequency", True)
except Exception as err:
command_response("set_frequency", False)
log.warning(
"[SCK] Set frequency command execution error",
e=err,
command=received_json,
)
# SET MODE -----------------------------------------------------
if received_json["command"] == "mode" and received_json["type"] == "set":
def tnc_set_mode(self, received_json):
try:
RIGCTLD_COMMAND_QUEUE.put(["set_mode", received_json["mode"]])
command_response("set_mode", True)
@ -632,77 +720,20 @@ def process_tnc_commands(data):
command=received_json,
)
def tnc_set_frequency(self, received_json):
try:
RIGCTLD_COMMAND_QUEUE.put(["set_frequency", received_json["frequency"]])
command_response("set_frequency", True)
except Exception as err:
log.error("[SCK] JSON decoding error", e=err)
def send_tnc_state():
"""
send the tnc state to network
"""
encoding = "utf-8"
output = {
"command": "tnc_state",
"ptt_state": str(static.PTT_STATE),
"tnc_state": str(static.TNC_STATE),
"arq_state": str(static.ARQ_STATE),
"arq_session": str(static.ARQ_SESSION),
"arq_session_state": str(static.ARQ_SESSION_STATE),
"audio_dbfs": str(static.AUDIO_DBFS),
"snr": str(static.SNR),
"frequency": str(static.HAMLIB_FREQUENCY),
"rf_level": str(static.HAMLIB_RF),
"strength": str(static.HAMLIB_STRENGTH),
"alc": str(static.HAMLIB_ALC),
"audio_level": str(static.TX_AUDIO_LEVEL),
"audio_auto_tune": str(static.AUDIO_AUTO_TUNE),
"speed_level": str(static.ARQ_SPEED_LEVEL),
"mode": str(static.HAMLIB_MODE),
"bandwidth": str(static.HAMLIB_BANDWIDTH),
"fft": str(static.FFT),
"channel_busy": str(static.CHANNEL_BUSY),
"scatter": static.SCATTER,
"rx_buffer_length": str(RX_BUFFER.qsize()),
"rx_msg_buffer_length": str(len(static.RX_MSG_BUFFER)),
"arq_bytes_per_minute": str(static.ARQ_BYTES_PER_MINUTE),
"arq_bytes_per_minute_burst": str(static.ARQ_BYTES_PER_MINUTE_BURST),
"arq_seconds_until_finish": str(static.ARQ_SECONDS_UNTIL_FINISH),
"arq_compression_factor": str(static.ARQ_COMPRESSION_FACTOR),
"arq_transmission_percent": str(static.ARQ_TRANSMISSION_PERCENT),
"speed_list": static.SPEED_LIST,
"total_bytes": str(static.TOTAL_BYTES),
"beacon_state": str(static.BEACON_STATE),
"stations": [],
"mycallsign": str(static.MYCALLSIGN, encoding),
"mygrid": str(static.MYGRID, encoding),
"dxcallsign": str(static.DXCALLSIGN, encoding),
"dxgrid": str(static.DXGRID, encoding),
"hamlib_status": static.HAMLIB_STATUS,
"listen": str(static.LISTEN),
"audio_recording": str(static.AUDIO_RECORD),
}
# add heard stations to heard stations object
for heard in static.HEARD_STATIONS:
output["stations"].append(
{
"dxcallsign": str(heard[0], "utf-8"),
"dxgrid": str(heard[1], "utf-8"),
"timestamp": heard[2],
"datatype": heard[3],
"snr": heard[4],
"offset": heard[5],
"frequency": heard[6],
}
command_response("set_frequency", False)
log.warning(
"[SCK] Set frequency command execution error",
e=err,
command=received_json,
)
return json.dumps(output)
# This appears to have been taken out of a class, but is never called because
# the `self.request.sendall` call is a syntax error as `self` is undefined and
# we don't see errors in use.
def process_daemon_commands(data):
# ------------------------ DAEMON COMMANDS
def process_daemon_commands(self, data):
"""
process daemon commands
@ -717,7 +748,27 @@ def process_daemon_commands(data):
# convert data to json object
received_json = json.loads(data)
log.debug("[SCK] CMD", command=received_json)
if received_json["type"] == "set" and received_json["command"] == "mycallsign":
self.daemon_set_mycallsign(received_json)
if received_json["type"] == "set" and received_json["command"] == "mygrid":
self.daemon_set_mygrid(received_json)
if (
received_json["type"] == "set"
and received_json["command"] == "start_tnc"
and not static.TNCSTARTED
):
self.daemon_start_tnc(received_json)
if received_json["type"] == "get" and received_json["command"] == "test_hamlib":
self.daemon_test_hamlib(received_json)
if received_json["type"] == "set" and received_json["command"] == "stop_tnc":
self.daemon_stop_tnc(received_json)
def daemon_set_mycallsign(self, received_json):
try:
callsign = received_json["parameter"]
@ -742,7 +793,7 @@ def process_daemon_commands(data):
command_response("mycallsign", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
if received_json["type"] == "set" and received_json["command"] == "mygrid":
def daemon_set_mygrid(self, received_json):
try:
mygrid = received_json["parameter"]
@ -757,11 +808,7 @@ def process_daemon_commands(data):
command_response("mygrid", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
if (
received_json["type"] == "set"
and received_json["command"] == "start_tnc"
and not static.TNCSTARTED
):
def daemon_start_tnc(self, received_json):
try:
startparam = received_json["parameter"][0]
@ -832,7 +879,20 @@ def process_daemon_commands(data):
command_response("start_tnc", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
if received_json["type"] == "get" and received_json["command"] == "test_hamlib":
def daemon_stop_tnc(self, received_json):
try:
static.TNCPROCESS.kill()
# unregister process from atexit to avoid process zombies
atexit.unregister(static.TNCPROCESS.kill)
log.warning("[SCK] Stopping TNC")
static.TNCSTARTED = False
command_response("stop_tnc", True)
except Exception as err:
command_response("stop_tnc", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
def daemon_test_hamlib(self, received_json):
try:
radiocontrol = str(received_json["parameter"][0]["radiocontrol"])
rigctld_ip = str(received_json["parameter"][0]["rigctld_ip"])
@ -851,19 +911,6 @@ def process_daemon_commands(data):
command_response("test_hamlib", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
if received_json["type"] == "set" and received_json["command"] == "stop_tnc":
try:
static.TNCPROCESS.kill()
# unregister process from atexit to avoid process zombies
atexit.unregister(static.TNCPROCESS.kill)
log.warning("[SCK] Stopping TNC")
static.TNCSTARTED = False
command_response("stop_tnc", True)
except Exception as err:
command_response("stop_tnc", False)
log.warning("[SCK] command execution error", e=err, command=received_json)
def send_daemon_state():
"""
@ -897,6 +944,69 @@ def send_daemon_state():
return None
def send_tnc_state():
"""
send the tnc state to network
"""
encoding = "utf-8"
output = {
"command": "tnc_state",
"ptt_state": str(static.PTT_STATE),
"tnc_state": str(static.TNC_STATE),
"arq_state": str(static.ARQ_STATE),
"arq_session": str(static.ARQ_SESSION),
"arq_session_state": str(static.ARQ_SESSION_STATE),
"audio_dbfs": str(static.AUDIO_DBFS),
"snr": str(static.SNR),
"frequency": str(static.HAMLIB_FREQUENCY),
"rf_level": str(static.HAMLIB_RF),
"strength": str(static.HAMLIB_STRENGTH),
"alc": str(static.HAMLIB_ALC),
"audio_level": str(static.TX_AUDIO_LEVEL),
"audio_auto_tune": str(static.AUDIO_AUTO_TUNE),
"speed_level": str(static.ARQ_SPEED_LEVEL),
"mode": str(static.HAMLIB_MODE),
"bandwidth": str(static.HAMLIB_BANDWIDTH),
"fft": str(static.FFT),
"channel_busy": str(static.CHANNEL_BUSY),
"scatter": static.SCATTER,
"rx_buffer_length": str(RX_BUFFER.qsize()),
"rx_msg_buffer_length": str(len(static.RX_MSG_BUFFER)),
"arq_bytes_per_minute": str(static.ARQ_BYTES_PER_MINUTE),
"arq_bytes_per_minute_burst": str(static.ARQ_BYTES_PER_MINUTE_BURST),
"arq_seconds_until_finish": str(static.ARQ_SECONDS_UNTIL_FINISH),
"arq_compression_factor": str(static.ARQ_COMPRESSION_FACTOR),
"arq_transmission_percent": str(static.ARQ_TRANSMISSION_PERCENT),
"speed_list": static.SPEED_LIST,
"total_bytes": str(static.TOTAL_BYTES),
"beacon_state": str(static.BEACON_STATE),
"stations": [],
"mycallsign": str(static.MYCALLSIGN, encoding),
"mygrid": str(static.MYGRID, encoding),
"dxcallsign": str(static.DXCALLSIGN, encoding),
"dxgrid": str(static.DXGRID, encoding),
"hamlib_status": static.HAMLIB_STATUS,
"listen": str(static.LISTEN),
"audio_recording": str(static.AUDIO_RECORD),
}
# add heard stations to heard stations object
for heard in static.HEARD_STATIONS:
output["stations"].append(
{
"dxcallsign": str(heard[0], "utf-8"),
"dxgrid": str(heard[1], "utf-8"),
"timestamp": heard[2],
"datatype": heard[3],
"snr": heard[4],
"offset": heard[5],
"frequency": heard[6],
}
)
return json.dumps(output)
def command_response(command, status):
s_status = "OK" if status else "Failed"
jsondata = {"command_response": command, "status": s_status}

View file