Accepted some Sourcery solutions

This commit is contained in:
DJ2LS 2022-05-23 14:02:22 +02:00
parent 1bab085ca8
commit 7a530175f8
5 changed files with 47 additions and 53 deletions

View file

@ -300,33 +300,29 @@ class RF:
static.BUFFER_OVERFLOW_COUNTER[0] += 1
# Avoid buffer overflow by filling only if buffer not full and selected datachannel mode
if not self.datac1_buffer.nbuffer + length_x > self.datac1_buffer.size:
if RECEIVE_DATAC1:
self.datac1_buffer.push(x)
else:
if self.datac1_buffer.nbuffer + length_x > self.datac1_buffer.size:
static.BUFFER_OVERFLOW_COUNTER[1] += 1
elif RECEIVE_DATAC1:
self.datac1_buffer.push(x)
# Avoid buffer overflow by filling only if buffer not full and selected datachannel mode
if not self.datac3_buffer.nbuffer + length_x > self.datac3_buffer.size:
if RECEIVE_DATAC3:
self.datac3_buffer.push(x)
else:
if self.datac3_buffer.nbuffer + length_x > self.datac3_buffer.size:
static.BUFFER_OVERFLOW_COUNTER[2] += 1
elif RECEIVE_DATAC3:
self.datac3_buffer.push(x)
# Avoid buffer overflow by filling only if buffer not full and selected datachannel mode
if not self.fsk_ldpc_buffer_0.nbuffer + length_x > self.fsk_ldpc_buffer_0.size:
if static.ENABLE_FSK:
self.fsk_ldpc_buffer_0.push(x)
else:
if self.fsk_ldpc_buffer_0.nbuffer + length_x > self.fsk_ldpc_buffer_0.size:
static.BUFFER_OVERFLOW_COUNTER[3] += 1
elif static.ENABLE_FSK:
self.fsk_ldpc_buffer_0.push(x)
# Avoid buffer overflow by filling only if buffer not full and selected datachannel mode
if not self.fsk_ldpc_buffer_1.nbuffer + length_x > self.fsk_ldpc_buffer_1.size:
if RECEIVE_FSK_LDPC_1 and static.ENABLE_FSK:
self.fsk_ldpc_buffer_1.push(x)
else:
if self.fsk_ldpc_buffer_1.nbuffer + length_x > self.fsk_ldpc_buffer_1.size:
static.BUFFER_OVERFLOW_COUNTER[4] += 1
elif RECEIVE_FSK_LDPC_1 and static.ENABLE_FSK:
self.fsk_ldpc_buffer_1.push(x)
if len(self.modoutqueue) <= 0 or self.mod_out_locked:
data_out48k = np.zeros(frames, dtype=np.int16)

View file

@ -20,7 +20,7 @@ else:
# try importing hamlib
try:
# get python version
python_version = str(sys.version_info[0]) + "." + str(sys.version_info[1])
python_version = f"{str(sys.version_info[0])}.{str(sys.version_info[1])}"
# installation path for Ubuntu 20.04 LTS python modules
# sys.path.append('/usr/local/lib/python'+ python_version +'/site-packages')
@ -29,7 +29,7 @@ try:
sys.path.append('/usr/local/lib/')
# installation path for Suse
sys.path.append('/usr/local/lib64/python' + python_version + '/site-packages')
sys.path.append(f'/usr/local/lib64/python{python_version}/site-packages')
# everything else... not nice, but an attempt to see how its running within app bundle
# this is not needed as python will be shipped with app bundle
@ -53,19 +53,21 @@ try:
structlog.get_logger("structlog").warning("[RIG] Hamlib outdated", found=hamlib_version, recommend=min_hamlib_version)
except Exception as e:
structlog.get_logger("structlog").warning("[RIG] Python Hamlib binding not found", error=e)
try:
structlog.get_logger("structlog").warning("[RIG] Trying to open rigctl")
rigctl = subprocess.Popen("rigctl -V", shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
hamlib_version = rigctl.stdout.readline()
hamlib_version = hamlib_version.split(' ')
if hamlib_version[1] != 'Hamlib':
raise Exception from e
structlog.get_logger("structlog").warning("[RIG] Rigctl found! Please try using this", version=hamlib_version[2])
if hamlib_version[1] == 'Hamlib':
structlog.get_logger("structlog").warning("[RIG] Rigctl found! Please try using this", version=hamlib_version[2])
sys.exit()
else:
raise Exception
sys.exit()
except Exception as e:
structlog.get_logger("structlog").critical("[RIG] HAMLIB NOT INSTALLED", error=e)
hamlib_version = 0
sys.exit()
@ -118,7 +120,7 @@ class radio:
# get devicenumber by looking for deviceobject in Hamlib module
try:
self.devicenumber = int(getattr(Hamlib, self.devicename))
except:
except Exception:
structlog.get_logger("structlog").error("[RIG] Hamlib: rig not supported...")
self.devicenumber = 0
@ -184,7 +186,7 @@ class radio:
structlog.get_logger("structlog").error("[RIG] Hamlib has no permissions", e = error)
help_url = 'https://github.com/DJ2LS/FreeDATA/wiki/UBUNTU-Manual-installation#1-permissions'
structlog.get_logger("structlog").error("[RIG] HELP:", check = help_url)
except:
except Exception:
structlog.get_logger("structlog").info("[RIG] Hamlib device opened", status='SUCCESS')
# set ptt to false if ptt is stuck for some reason

View file

@ -79,7 +79,7 @@ class radio:
structlog.get_logger("structlog").warning("[RIGCTL] Radio not found. Using DUMMY!", error=e)
# set deviceport to dummy port, if we selected dummy model
if self.devicenumber == 1 or self.devicenumber == 6:
if self.devicenumber in {1, 6}:
self.deviceport = '/dev/ttyUSB0'
print(self.devicenumber, self.deviceport, self.serialspeed)
@ -101,14 +101,14 @@ class radio:
def get_frequency(self):
""" """
cmd = self.cmd + ' f'
cmd = f'{self.cmd} f'
sw_proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
time.sleep(0.5)
freq = sw_proc.communicate()[0]
#print('get_frequency', freq, sw_proc.communicate())
try:
return int(freq)
except:
except Exception:
return False
def get_mode(self):
@ -117,7 +117,7 @@ class radio:
#return Hamlib.rig_strrmode(hamlib_mode)
try:
return 'PKTUSB'
except:
except Exception:
return False
def get_bandwith(self):
@ -127,7 +127,7 @@ class radio:
try:
return bandwith
except:
except Exception:
return False
def set_mode(self, mode):
@ -144,14 +144,14 @@ class radio:
def get_ptt(self):
""" """
cmd = self.cmd + ' t'
cmd = f'{self.cmd} t'
sw_proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
time.sleep(0.5)
status = sw_proc.communicate()[0]
try:
return status
except:
except Exception:
return False
def set_ptt(self, state):
@ -163,18 +163,15 @@ class radio:
Returns:
"""
cmd = self.cmd + ' T '
cmd = f'{self.cmd} T '
print('set_ptt', state)
if state:
cmd = cmd + '1'
else:
cmd = cmd + '0'
cmd = f'{cmd}1' if state else f'{cmd}0'
print('set_ptt', cmd)
sw_proc = subprocess.Popen(cmd, shell=True, text=True)
try:
return state
except:
except Exception:
return False
def close_rig(self):

View file

@ -91,13 +91,13 @@ class radio():
if self.connected:
try:
self.connection.sendall(command+b'\n')
except:
except Exception:
structlog.get_logger("structlog").warning("[RIGCTLD] Command not executed!", command=command, ip=self.hostname, port=self.port)
self.connected = False
try:
return self.connection.recv(1024)
except:
except Exception:
structlog.get_logger("structlog").warning("[RIGCTLD] No command response!", command=command, ip=self.hostname, port=self.port)
self.connected = False
else:
@ -113,7 +113,7 @@ class radio():
data = data.split(b'\n')
mode = data[0]
return mode.decode("utf-8")
except:
except Exception:
return 0
def get_bandwith(self):
@ -123,7 +123,7 @@ class radio():
data = data.split(b'\n')
bandwith = data[1]
return bandwith.decode("utf-8")
except:
except Exception:
return 0
def get_frequency(self):
@ -131,14 +131,14 @@ class radio():
try:
frequency = self.send_command(b"f")
return frequency.decode("utf-8")
except:
except Exception:
return 0
def get_ptt(self):
""" """
try:
return self.send_command(b"t")
except:
except Exception:
return False
def set_ptt(self, state):
@ -156,5 +156,5 @@ class radio():
else:
self.send_command(b"T 0")
return state
except:
except Exception:
return False

View file

@ -162,9 +162,9 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
port=self.client_address[1])
try:
CONNECTED_CLIENTS.remove(self.request)
except:
except Exception as e:
structlog.get_logger("structlog").warning("[SCK] client connection already removed from client list",
client=self.request)
client=self.request, e=e)
def process_tnc_commands(data):
@ -308,13 +308,13 @@ def process_tnc_commands(data):
# check if specific callsign is set with different SSID than the TNC is initialized
try:
mycallsign = received_json["parameter"][0]["mycallsign"]
except:
except Exception:
mycallsign = static.MYCALLSIGN
# check if transmission uuid provided else set no-uuid
try:
arq_uuid = received_json["uuid"]
except:
except Exception:
arq_uuid = 'no-uuid'
if not len(base64data) % 4:
@ -502,8 +502,8 @@ def process_daemon_commands(data):
# print some debugging parameters
for item in received_json["parameter"][0]:
structlog.get_logger("structlog").debug("[DMN] TNC Startup Config : " + item,
value=received_json["parameter"][0][item])
structlog.get_logger("structlog").debug(f"[DMN] TNC Startup Config : {item}", value=received_json["parameter"][0][item])
DAEMON_QUEUE.put(['STARTTNC',
mycall,
@ -607,9 +607,8 @@ def send_daemon_state():
else:
output["daemon_state"].append({"status": "stopped"})
jsondata = json.dumps(output)
return json.dumps(output)
return jsondata
except Exception as e:
structlog.get_logger("structlog").warning("[SCK] error", e=e)
return None