FreeDATA/modem/helpers.py

760 lines
24 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
"""
Created on Fri Dec 25 21:25:14 2020
@author: DJ2LS
"""
import time
2023-01-05 15:28:41 +01:00
from datetime import datetime,timezone
2021-03-12 14:14:36 +01:00
import crcengine
import structlog
2022-11-11 11:18:07 +01:00
import numpy as np
import threading
import hashlib
import hmac
2023-08-18 08:10:17 +02:00
import os
import sys
2023-08-26 16:36:24 +02:00
from pathlib import Path
2024-02-10 21:28:07 +01:00
import platform
import subprocess
import psutil
import glob
2023-08-26 16:36:24 +02:00
2022-06-01 02:35:35 +02:00
log = structlog.get_logger("helpers")
def wait(seconds: float) -> bool:
    """Block the calling thread until ``seconds`` seconds have elapsed.

    The delay is assembled from 10 ms ``threading.Event().wait`` slices
    rather than a single long sleep.

    Args:
        seconds: How long to wait, in seconds.

    Returns:
        Always True, once the deadline has passed.
    """
    deadline = time.time() + seconds
    while True:
        if time.time() >= deadline:
            return True
        # A fresh, never-set Event simply times out after 10 ms.
        threading.Event().wait(0.01)
2022-05-09 02:41:49 +02:00
2023-11-28 20:00:39 +01:00
def get_crc_8(data: "str | bytes | bytearray") -> bytes:
    """Author: DJ2LS

    Compute the CRC-8 ("crc8-ccitt" in crcengine) of the given data.

    Args:
        data: Payload to checksum. ``str`` input is encoded as UTF-8;
            ``bytes`` is used as-is; ``bytearray``/``memoryview`` are
            converted to ``bytes``.

    Returns:
        The checksum as 1 byte.

    Raises:
        TypeError: If ``data`` is neither text nor bytes-like.
    """
    if isinstance(data, (bytearray, memoryview)):
        # The previous guard pushed bytearray through bytes(data, "utf-8"),
        # which raises TypeError; convert it directly instead.
        data = bytes(data)
    elif not isinstance(data, bytes):
        # str -> UTF-8; any other type raises TypeError here, as before.
        data = bytes(data, "utf-8")

    crc_algorithm = crcengine.new("crc8-ccitt")  # load crc8 library
    return crc_algorithm(data).to_bytes(1, byteorder="big")
2023-11-28 20:00:39 +01:00
def get_crc_16(data: "str | bytes | bytearray") -> bytes:
    """Author: DJ2LS

    Compute the CRC-16 ("crc16-ccitt-false" in crcengine) of the given data.

    Args:
        data: Payload to checksum. ``str`` input is encoded as UTF-8;
            ``bytes`` is used as-is; ``bytearray``/``memoryview`` are
            converted to ``bytes``.

    Returns:
        The checksum as 2 bytes, big-endian.

    Raises:
        TypeError: If ``data`` is neither text nor bytes-like.
    """
    if isinstance(data, (bytearray, memoryview)):
        # The previous guard pushed bytearray through bytes(data, "utf-8"),
        # which raises TypeError; convert it directly instead.
        data = bytes(data)
    elif not isinstance(data, bytes):
        # str -> UTF-8; any other type raises TypeError here, as before.
        data = bytes(data, "utf-8")

    crc_algorithm = crcengine.new("crc16-ccitt-false")  # load crc16 library
    return crc_algorithm(data).to_bytes(2, byteorder="big")
2021-03-12 14:14:36 +01:00
2023-11-28 20:00:39 +01:00
def get_crc_24(data: "str | bytes | bytearray") -> bytes:
    """Author: DJ2LS

    Compute the CRC24-OPENPGP of the given data.
    https://github.com/GardenTools/CrcEngine#examples

    Args:
        data: Payload to checksum. ``str`` input is encoded as UTF-8;
            ``bytes`` is used as-is; ``bytearray``/``memoryview`` are
            converted to ``bytes``.

    Returns:
        The checksum as 3 bytes, big-endian.

    Raises:
        TypeError: If ``data`` is neither text nor bytes-like.
    """
    if isinstance(data, (bytearray, memoryview)):
        # The previous guard pushed bytearray through bytes(data, 'utf-8'),
        # which raises TypeError; convert it directly instead.
        data = bytes(data)
    elif not isinstance(data, bytes):
        # str -> UTF-8; any other type raises TypeError here, as before.
        data = bytes(data, 'utf-8')

    # Polynomial 0x864cfb, width 24, seed 0xb704ce, no reflection, no final XOR.
    params = crcengine.CrcParams(0x864cfb, 24, 0xb704ce, reflect_in=False, reflect_out=False, xor_out=0)
    crc_algorithm = crcengine.create(params=params)
    return crc_algorithm(data).to_bytes(3, byteorder="big")
2022-05-09 02:41:49 +02:00
2023-11-28 20:00:39 +01:00
def get_crc_32(data: "str | bytes | bytearray") -> bytes:
    """Author: DJ2LS

    Compute the CRC-32 ("crc32" in crcengine) of the given data.

    Args:
        data: Payload to checksum. ``str`` input is encoded as UTF-8;
            ``bytes`` is used as-is; ``bytearray``/``memoryview`` are
            converted to ``bytes``.

    Returns:
        The checksum as 4 bytes, big-endian.

    Raises:
        TypeError: If ``data`` is neither text nor bytes-like.
    """
    if isinstance(data, (bytearray, memoryview)):
        # The previous guard pushed bytearray through bytes(data, "utf-8"),
        # which raises TypeError; convert it directly instead.
        data = bytes(data)
    elif not isinstance(data, bytes):
        # str -> UTF-8; any other type raises TypeError here, as before.
        data = bytes(data, "utf-8")

    crc_algorithm = crcengine.new("crc32")  # load crc32 library
    return crc_algorithm(data).to_bytes(4, byteorder="big")
from datetime import datetime, timezone
import time
def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list, distance_km=None,
                          distance_miles=None):
    """Insert or refresh a station record in ``heard_stations_list``.

    Each record is a list of the form
    ``[dxcallsign, dxgrid, timestamp, datatype, snr, offset, frequency,
    distance_km, distance_miles]`` where ``timestamp`` is the current UTC
    time as integer seconds since the epoch.

    Args:
        dxcallsign: Callsign of the heard station; used as the lookup key.
        dxgrid: Grid locator of the heard station.
        datatype: Kind of data that was received.
        snr: Signal-to-noise ratio of the received signal.
        offset: Frequency offset.
        frequency: Frequency of the received signal.
        heard_stations_list (list): List of station records, updated in place.
        distance_km: Optional distance to the station in kilometers.
        distance_miles: Optional distance to the station in miles.

    Returns:
        None. The first record with a matching callsign is replaced;
        if there is none, a new record is appended.
    """
    heard_at = int(datetime.now(timezone.utc).timestamp())
    record = [dxcallsign, dxgrid, heard_at, datatype, snr, offset, frequency, distance_km, distance_miles]

    for index, station in enumerate(heard_stations_list):
        if station[0] == dxcallsign:
            heard_stations_list[index] = record
            break
    else:
        # No existing record for this callsign.
        heard_stations_list.append(record)
2021-09-25 15:24:25 +02:00
2023-11-19 08:30:47 +01:00
def callsign_to_bytes(callsign: str) -> bytes:
    """Encode a callsign (optionally with "-SSID" suffix) for transmission.

    The part before the first dash is the base callsign; the part after it
    is parsed as the integer SSID (0 when missing or not a number). Both are
    handed to ``encode_call`` as ``<base callsign><chr(ssid)>``.

    Args:
        callsign: Callsign as ``str`` or ``bytes``, e.g. ``"DJ2LS-3"``.

    Returns:
        The value returned by ``encode_call`` (6 bytes).
    """
    # http://www.aprs.org/aprs11/SSIDs.txt
    # -0 Your primary station usually fixed and message capable
    # -1 generic additional station, digi, mobile, wx, etc
    # -2 generic additional station, digi, mobile, wx, etc
    # -3 generic additional station, digi, mobile, wx, etc
    # -4 generic additional station, digi, mobile, wx, etc
    # -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc)
    # -6 Special activity, Satellite ops, camping or 6 meters, etc
    # -7 walkie talkies, HT's or other human portable
    # -8 boats, sailboats, RV's or second main mobile
    # -9 Primary Mobile (usually message capable)
    # -10 internet, Igates, echolink, winlink, AVRS, APRN, etc
    # -11 balloons, aircraft, spacecraft, etc
    # -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc
    # -13 Weather stations
    # -14 Truckers or generally full time drivers
    # -15 generic additional station, digi, mobile, wx, etc

    # Only text needs encoding. Bytes input is expected and used as-is; the
    # former try/except caught TypeError, but bytes.encode raises
    # AttributeError, so every bytes callsign was logged as an error.
    if isinstance(callsign, str):
        callsign = callsign.encode("utf-8")

    # Strip the "-" out of the callsign to reduce the needed payload.
    parts = callsign.split(b"-")
    ssid = 0
    if len(parts) > 1:
        try:
            ssid = int(parts[1])
        except ValueError as err:
            # Non-numeric SSID: keep the default of 0, as before.
            log.debug("[HLP] callsign_to_bytes: Error splitting callsign/ssid:", e=err)

    # ---- callsign with encoding always 6 bytes long
    base_call = parts[0].decode("utf-8")
    # The SSID travels as a single raw character appended to the callsign.
    ssid_char = bytes([ssid]).decode("utf-8")
    return encode_call(base_call + ssid_char)
2022-05-23 13:11:16 +02:00
def bytes_to_callsign(bytestring: bytes) -> bytes:
    """Convert a callsign received in a frame into human readable form.

    The frame bytes are decoded with ``decode_call``; the last character of
    the decoded string carries the SSID as its code point, everything before
    it is the base callsign.

    Args:
        bytestring: Encoded callsign as found in a frame.

    Returns:
        bytes: UTF-8 encoded ``<callsign>-<ssid>``.
    """
    # http://www.aprs.org/aprs11/SSIDs.txt
    # -0 Your primary station usually fixed and message capable
    # -1 generic additional station, digi, mobile, wx, etc
    # -2 generic additional station, digi, mobile, wx, etc
    # -3 generic additional station, digi, mobile, wx, etc
    # -4 generic additional station, digi, mobile, wx, etc
    # -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc)
    # -6 Special activity, Satellite ops, camping or 6 meters, etc
    # -7 walkie talkies, HT's or other human portable
    # -8 boats, sailboats, RV's or second main mobile
    # -9 Primary Mobile (usually message capable)
    # -10 internet, Igates, echolink, winlink, AVRS, APRN, etc
    # -11 balloons, aircraft, spacecraft, etc
    # -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc
    # -13 Weather stations
    # -14 Truckers or generally full time drivers
    # -15 generic additional station, digi, mobile, wx, etc
    decoded_call = decode_call(bytestring)
    base_call, ssid_char = decoded_call[:-1], decoded_call[-1]
    ssid_number = ord(ssid_char.encode("utf-8"))
    return f"{base_call}-{ssid_number}".encode("utf-8")
2022-03-19 12:42:10 +01:00
2023-11-19 08:30:47 +01:00
def check_callsign(callsign: str, crc_to_check: bytes, ssid_list):
    """Find the SSID for which ``<callsign>-<ssid>`` matches a given CRC.

    The CRC24 of the base callsign combined with every candidate SSID is
    computed until one equals ``crc_to_check``.

    Args:
        callsign: Callsign to check (``str`` or ``bytes``). A "-SSID" suffix
            is split off; that SSID ("0" if there is none) is always tried
            in addition to ``ssid_list``.
        crc_to_check: Checksum to match. It is compared against the hex
            string of the computed CRC24 (``get_crc_24(...).hex()``).
        ssid_list: Candidate SSIDs. The list is not modified.

    Returns:
        ``[True, "<callsign>-<ssid>"]`` on a match, otherwise ``[False, b'']``.
    """
    if not isinstance(callsign, bytes):
        callsign = bytes(callsign, 'utf-8')

    # We want the callsign without SSID.
    base_call, *suffix = callsign.split(b"-")
    own_ssid = suffix[0].decode() if suffix else "0"

    # Work on a normalised copy. The old code compared the int 0 against a
    # list of strings and appended "0" to the caller's list on every call.
    candidates = [str(entry) for entry in ssid_list]
    if own_ssid not in candidates:
        candidates.append(own_ssid)

    callsign_crc = None
    for ssid in candidates:
        call_with_ssid = base_call + b'-' + ssid.encode('utf-8')
        callsign_crc = get_crc_24(call_with_ssid).hex()

        if callsign_crc == crc_to_check:
            log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid, checksum=crc_to_check)
            return [True, call_with_ssid.decode()]

    log.debug("[HLP] check_callsign: Checking:", callsign=base_call, crc_to_check=crc_to_check, own_crc=callsign_crc)
    return [False, b'']
2022-10-05 19:24:50 +02:00
def check_session_id(id: bytes, id_to_check: bytes):
    """Check whether a received session id is our own session id.

    Args:
        id: Our own session id.
        id_to_check: The session id byte we want to check.

    Returns:
        False if ``id_to_check`` is ``b'\\x00'``; otherwise whether both
        ids are equal.
    """
    if id_to_check != b'\x00':
        log.debug("[HLP] check_sessionid: Checking:", ownid=id, check=id_to_check)
        return id == id_to_check
    # A zero session id never matches.
    return False
2022-10-05 19:24:50 +02:00
def encode_grid(grid):
    """Pack a 6-character maidenhead locator into 4 bytes.

    @author: DB1UJ

    Layout of the packed integer (most significant part first):
    field pair (first letter * 18 + second letter, 9 bits), two-digit
    square in a 9-bit slot (7 bits used), first subsquare letter in a
    7-bit slot (5 bits used), second subsquare letter (5 bits).

    Args:
        grid: Locator string [a-r][a-r][0-9][0-9][a-x][a-x]; case is ignored.

    Returns:
        4 bytes, big-endian, holding the packed locator.
    """
    locator = grid.upper()  # upper case to be safe
    letter_a = ord("A")  # letters are stored relative to "A"

    field = ((ord(locator[0]) - letter_a) * 18 + (ord(locator[1]) - letter_a)) & 0x1FF
    square = int(locator[2:4]) & 0x7F
    subsquare_first = (ord(locator[4]) - letter_a) & 0x1F
    subsquare_second = (ord(locator[5]) - letter_a) & 0x1F

    packed = (field << 21) | (square << 12) | (subsquare_first << 5) | subsquare_second
    return packed.to_bytes(length=4, byteorder="big")
def decode_grid(b_code_word: bytes):
    """Unpack a maidenhead locator from its packed byte form.

    @author: DB1UJ

    Reads, from the least significant bits upwards: second subsquare
    letter (5 bits), first subsquare letter (5 bits of a 7-bit slot),
    two-digit square (7 bits of a 9-bit slot) and the field pair (9 bits,
    first letter * 18 + second letter).

    Args:
        b_code_word: Packed locator, interpreted as unsigned big-endian.

    Returns:
        str: Upper case locator [A-R][A-R][0-9][0-9][A-X][A-X].
    """
    packed = int.from_bytes(b_code_word, byteorder="big", signed=False)

    subsquare_second = packed & 0x1F
    subsquare_first = (packed >> 5) & 0x1F
    square = (packed >> 12) & 0x7F
    field_first, field_second = divmod((packed >> 21) & 0x1FF, 18)

    return (
        f"{chr(field_first + 65)}{chr(field_second + 65)}"
        f"{square:02d}"  # squares below 10 keep their leading zero
        f"{chr(subsquare_first + 65)}{chr(subsquare_second + 65)}"
    )
def encode_call(call):
    """Pack a callsign plus trailing SSID character into 6 bytes.

    @author: DB1UJ

    Every character except the last is stored as a 6-bit symbol
    (``ord(char) - 48``, masked to 6 bits). The last character is the SSID
    and is stored unshifted as the lowest 6 bits of its code point.

    Args:
        call: Callsign [A-Z,0-9] whose last character is the SSID (0-63);
            converted to upper case before encoding.

    Returns:
        6 bytes, big-endian, holding the packed callsign and SSID.
    """
    call = call.upper()  # upper case to be safe
    packed = 0
    for symbol in call[:-1]:
        # Make room for one more 6-bit symbol, then add it.
        packed = (packed << 6) | ((ord(symbol) - 48) & 0x3F)

    # The SSID occupies the final 6 bits, without the -48 offset.
    packed = (packed << 6) | (ord(call[-1]) & 0x3F)
    return packed.to_bytes(length=6, byteorder="big")
def decode_call(b_code_word: bytes):
    """Unpack a callsign plus SSID character from its packed byte form.

    @author: DB1UJ

    The value is read as an unsigned big-endian integer and consumed in
    6-bit symbols from the least significant end until it reaches zero.
    The lowest symbol is the SSID and is returned as ``chr(ssid)``; all
    other symbols are mapped back with ``chr(symbol + 48)``.

    Args:
        b_code_word: Packed callsign, 6 bits per symbol.

    Returns:
        str: Callsign [A-Z,0-9] followed by one character carrying the SSID.
    """
    remaining = int.from_bytes(b_code_word, byteorder="big", signed=False)
    ssid_char = chr(remaining & 0x3F)  # keep the raw SSID

    symbols = []
    while remaining:
        symbols.append(chr((remaining & 0x3F) + 48))
        remaining >>= 6
    symbols.reverse()

    # The last decoded symbol is the SSID slot; swap it for the raw SSID.
    return "".join(symbols[:-1]) + ssid_char
2022-11-09 20:47:46 +01:00
2022-11-09 20:47:46 +01:00
def snr_to_bytes(snr):
    """Create a single signed byte from an SNR value.

    The value is stored in tenths, clipped to -127..127, i.e. an SNR range
    of -12.7 to 12.7. That is enough to tell a good channel from a bad one.
    """
    tenths = np.clip(snr * 10, -127, 127)
    return int(tenths).to_bytes(1, byteorder='big', signed=True)
2022-11-09 20:47:46 +01:00
def snr_from_bytes(snr):
    """Create an SNR value from an SNR byte.

    The byte(s) are read as a signed big-endian integer holding the SNR in
    tenths; the result is that integer divided by 10.
    """
    return int.from_bytes(snr, byteorder='big', signed=True) / 10
def safe_execute(default, exception, function, *args):
    """Call ``function(*args)`` and fall back to ``default`` on ``exception``.

    https://stackoverflow.com/a/36671208

    Example:
        from json import loads
        safe_execute("Oh no, explosions occurred!", TypeError, loads, None)

    Args:
        default: Value returned when ``exception`` is raised.
        exception: Exception class (or tuple of classes) to catch.
        function: Callable to execute.
        *args: Positional arguments passed to ``function``.

    Returns:
        The result of ``function(*args)``, or ``default``.
    """
    try:
        outcome = function(*args)
    except exception:
        outcome = default
    return outcome
def return_key_from_object(default, obj, key):
    """Return ``obj[key]``, or ``default`` when the lookup raises KeyError."""
    try:
        value = obj[key]
    except KeyError:
        return default
    else:
        return value
def bool_to_string(state):
    """Return "True" if ``state`` is truthy, otherwise "False"."""
    return str(bool(state))
def get_hmac_salt(dxcallsign: bytes, mycallsign: bytes):
    """Take the newest HMAC token for a station pair out of its token file.

    The token file is
    ``freedata_hmac_STATION_<mycallsign>_REMOTE_<dxcallsign>.txt`` inside the
    ``hmac`` folder. The last line of the file is returned and then removed
    from the file.

    Args:
        dxcallsign: Remote callsign, UTF-8 encoded.
        mycallsign: Own callsign, UTF-8 encoded.

    Returns:
        The last token of the file as bytes (without line break), or False
        if the file is missing/empty, cannot be read, or the token could not
        be removed from the file.
    """
    filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt"

    # On linux/darwin/windows a bundled build (sys._MEIPASS present) reads the
    # file from the bundle directory; everything else uses the relative
    # "hmac" folder. This replaces four identical per-platform branches, one
    # of which referenced `filepath` before assignment in its error handler.
    if sys.platform in ("linux", "darwin", "win32", "win64") and hasattr(sys, "_MEIPASS"):
        filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
    else:
        filepath = Path('hmac') / filename

    # check if file exists else return false
    if not check_if_file_exists(filepath):
        return False

    log.info("[SCK] [HMAC] File lookup", file=filepath)

    try:
        with open(filepath, "r") as file:
            lines = file.readlines()

        # The file is closed again before it gets rewritten below.
        hmac_salt = bytes(lines[-1], "utf-8").split(b'\n')[0]
        return hmac_salt if delete_last_line_from_hmac_list(filepath, -1) else False
    except Exception as e:
        log.warning("[SCK] [HMAC] File lookup failed", file=filepath, e=e)
        return False
def search_hmac_salt(dxcallsign: bytes, mycallsign: bytes, search_token, data_frame, token_iters):
    """Search the newest HMAC tokens of a station pair for a matching signature.

    Starting with the last line of the token file
    ``freedata_hmac_STATION_<mycallsign>_REMOTE_<dxcallsign>.txt``, up to
    ``token_iters`` tokens are tried as HMAC-SHA256 key over ``data_frame``.
    A token matches when the first 4 bytes of the digest equal
    ``search_token``. On a match the file is truncated at the matching line,
    i.e. that token and all tokens after it are removed.

    Args:
        dxcallsign: Remote callsign, UTF-8 encoded.
        mycallsign: Own callsign, UTF-8 encoded.
        search_token: Received signature (bytes-like) to look for.
        data_frame: Frame data (bytes-like) the signature was computed over.
        token_iters: Maximum number of tokens to try.

    Returns:
        True if a matching token was found, otherwise False (also when the
        file is missing/empty or the lookup fails).
    """
    filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt"

    # On linux/darwin/windows a bundled build (sys._MEIPASS present) reads the
    # file from the bundle directory; everything else uses the relative
    # "hmac" folder. This replaces four identical per-platform branches, one
    # of which referenced `filepath` before assignment in its error handler.
    if sys.platform in ("linux", "darwin", "win32", "win64") and hasattr(sys, "_MEIPASS"):
        filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
    else:
        filepath = Path('hmac') / filename

    # check if file exists else return false
    if not check_if_file_exists(filepath):
        log.warning(
            "[Modem] [HMAC] Token file not found", file=filepath,
        )
        return False

    try:
        with open(filepath, "r") as file:
            token_list = file.readlines()

        token_iters = min(token_iters, len(token_list))
        for steps_back in range(1, token_iters + 1):
            token_position = len(token_list) - steps_back
            # Strip only the line break. Slicing off the last character cut a
            # real key character when the final line had no trailing newline.
            key = bytes(token_list[token_position].rstrip("\n"), "utf-8")
            search_digest = hmac.new(key, data_frame, hashlib.sha256).digest()[:4]

            # Constant-time comparison of the received and computed signature.
            if hmac.compare_digest(bytes(search_token), search_digest):
                delete_last_line_from_hmac_list(filepath, token_position)
                log.info(
                    "[Modem] [HMAC] Signature found", expected=search_token.hex(),
                )
                return True

        log.warning(
            "[Modem] [HMAC] Signature not found", expected=search_token.hex(), filepath=filepath,
        )
        return False

    except Exception as e:
        log.warning(
            "[Modem] [HMAC] Lookup failed", e=e, expected=search_token,
        )
        return False
2023-08-18 08:10:17 +02:00
def delete_last_line_from_hmac_list(filepath, position):
    """Truncate a token file so only the lines before ``position`` remain.

    The file is read completely, sliced with ``[:position]`` and written
    back, so ``position=-1`` drops just the last line.

    Args:
        filepath: Path of the token file.
        position: Slice end index; this line and all following are removed.

    Returns:
        True on success; False if the file is missing/empty or reading or
        writing fails.
    """
    # check if file exists else return false
    if not check_if_file_exists(filepath):
        return False

    try:
        with open(filepath, "r") as reader:
            kept_lines = reader.readlines()[:position]

        with open(filepath, "w") as writer:
            writer.writelines(kept_lines)
    except Exception:
        return False
    return True
def check_if_file_exists(path):
    """Return True if ``path`` is a regular file with a size above zero.

    Any exception raised during the check is logged as a warning and
    reported as False.
    """
    try:
        # the file must be present and filesize > 0
        return os.path.isfile(path) and os.path.getsize(path) > 0
    except Exception as e:
        log.warning(
            "[Modem] [FILE] Lookup failed", e=e, path=path,
        )
        return False
def set_bit(byte, position, value):
    """Return ``byte`` with the bit at ``position`` set to ``value``.

    Args:
        byte: Integer whose bit is changed.
        position: Bit index, 0 (least significant) to 7.
        value: Truthy to set the bit, falsy to clear it.

    Raises:
        ValueError: If ``position`` is outside 0..7.
    """
    if not 0 <= position <= 7:
        raise ValueError("Position must be between 0 and 7")

    mask = 1 << position
    return (byte | mask) if value else (byte & ~mask)
def get_bit(byte, position):
    """Return whether the bit at ``position`` (0..7) of ``byte`` is set.

    Raises:
        ValueError: If ``position`` is outside 0..7.
    """
    if not 0 <= position <= 7:
        raise ValueError("Position must be between 0 and 7")

    mask = 1 << position
    return (byte & mask) != 0
def set_flag(byte, flag_name, value, flag_dict):
    """Set a named flag in ``byte`` and return the resulting value.

    ``flag_dict`` maps flag names to their bit positions, e.g.::

        flag_dict = {
            'FLAG1': 0,  # Bit position for FLAG1
            'FLAG2': 1,  # Bit position for FLAG2, etc.
            'FLAG3': 2
        }

    Raises:
        ValueError: If ``flag_name`` is not a key of ``flag_dict``.
    """
    if flag_name in flag_dict:
        return set_bit(byte, flag_dict[flag_name], value)
    raise ValueError(f"Unknown flag name: {flag_name}")
2024-02-10 21:28:07 +01:00
def get_flag(byte, flag_name, flag_dict):
    """Return the value of a named flag in ``byte``.

    ``flag_dict`` maps flag names to their bit positions.

    Raises:
        ValueError: If ``flag_name`` is not a key of ``flag_dict``.
    """
    if flag_name in flag_dict:
        return get_bit(byte, flag_dict[flag_name])
    raise ValueError(f"Unknown flag name: {flag_name}")
2024-02-10 21:28:07 +01:00
2024-02-22 10:49:30 +01:00
def find_binary_paths(binary_name="rigctld", search_system_wide=False):
    """
    Search for a binary within the current working directory, its subdirectories, and optionally,
    system-wide locations and the PATH environment variable.

    :param binary_name: The base name of the binary to search for, without extension
                        (".exe" is appended on Windows).
    :param search_system_wide: Boolean flag to enable or disable system-wide search.
    :return: A list of full paths to the binary if found, otherwise an empty list.
             Each location is listed once, in the order it was discovered.
    """
    on_windows = platform.system() == 'Windows'

    # Adjust binary name for Windows
    if on_windows:
        binary_name += ".exe"

    binary_paths = []
    seen = set()

    def _remember(candidate):
        # PATH usually already contains /usr/bin etc., and may contain the
        # working directory, so the same file can be found more than once.
        key = os.path.normcase(os.path.abspath(candidate))
        if key not in seen:
            seen.add(key)
            binary_paths.append(candidate)

    # Search in the current working directory and subdirectories
    for dirpath, _dirnames, filenames in os.walk(os.getcwd()):
        if binary_name in filenames:
            _remember(os.path.join(dirpath, binary_name))

    # If system-wide search is enabled, look in system locations and PATH
    if search_system_wide:
        system_paths = os.environ.get('PATH', '').split(os.pathsep)

        # Add common binary locations for Unix-like and Windows systems
        if not on_windows:
            system_paths.extend(['/usr/bin', '/usr/local/bin', '/bin'])
        else:
            system_paths.extend(glob.glob("C:\\Program Files\\Hamlib*\\bin"))
            system_paths.extend(glob.glob("C:\\Program Files (x86)\\Hamlib*\\bin"))

        for path in system_paths:
            potential_path = os.path.join(path, binary_name)
            if os.path.isfile(potential_path):
                _remember(potential_path)

    return binary_paths
2024-02-10 21:28:07 +01:00
def kill_and_execute(binary_path, additional_args=None):
    """
    Kills any running instances of the binary across Linux, macOS, and Windows, then starts a new one non-blocking.

    A process counts as a running instance when ``binary_path`` occurs as a
    substring of its space-joined command line.
    NOTE(review): this can also match unrelated processes that merely mention
    the path in their arguments - confirm this is acceptable.

    :param binary_path: The full path to the binary to execute.
    :param additional_args: A list of additional arguments to pass to the binary.
    :return: subprocess.Popen object of the started process
    """
    # Kill any existing instances of the binary
    for process in psutil.process_iter(attrs=['pid', 'name', 'cmdline']):
        try:
            argv = process.info['cmdline']
            # Skip processes without a command line or with a different one
            if not argv or binary_path not in ' '.join(argv):
                continue
            process.kill()
            print(f"Killed running instance with PID: {process.info['pid']}")
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass  # Process no longer exists or no permission to kill

    # Execute the binary with additional arguments non-blocking
    extra_args = additional_args if additional_args else []
    return subprocess.Popen([binary_path] + extra_args)