FreeDATA/modem/helpers.py
2024-02-11 08:48:49 +01:00

770 lines
No EOL
24 KiB
Python

# -*- coding: utf-8 -*-
"""
Created on Fri Dec 25 21:25:14 2020
@author: DJ2LS
"""
import time
from datetime import datetime,timezone
import crcengine
import structlog
import numpy as np
import threading
import hashlib
import hmac
import os
import sys
from pathlib import Path
import platform
import subprocess
import psutil
log = structlog.get_logger("helpers")
def wait(seconds: float) -> bool:
"""
Args:
seconds:
Returns:
"""
timeout = time.time() + seconds
while time.time() < timeout:
threading.Event().wait(0.01)
return True
def get_crc_8(data: str) -> bytes:
"""Author: DJ2LS
Get the CRC8 of a byte string
param: data = bytes()
Args:
data:
Returns:
CRC-8 (CCITT) of the provided data as bytes
"""
if not isinstance(data, (bytes)) or isinstance(data, (bytearray)):
data = bytes(data,"utf-8")
crc_algorithm = crcengine.new("crc8-ccitt") # load crc8 library
crc_data = crc_algorithm(data)
crc_data = crc_data.to_bytes(1, byteorder="big")
return crc_data
def get_crc_16(data: str) -> bytes:
"""Author: DJ2LS
Get the CRC16 of a byte string
param: data = bytes()
Args:
data:
Returns:
CRC-16 (CCITT) of the provided data as bytes
"""
if not isinstance(data, (bytes)) or isinstance(data, (bytearray)):
data = bytes(data,"utf-8")
crc_algorithm = crcengine.new("crc16-ccitt-false") # load crc16 library
return crc_algorithm(data).to_bytes(2, byteorder="big")
def get_crc_24(data: str) -> bytes:
"""Author: DJ2LS
Get the CRC24-OPENPGP of a byte string
https://github.com/GardenTools/CrcEngine#examples
param: data = bytes()
Args:
data:
Returns:
CRC-24 (OpenPGP) of the provided data as bytes
"""
if not isinstance(data, (bytes)) or isinstance(data, (bytearray)):
data = bytes(data,'utf-8')
params = crcengine.CrcParams(0x864cfb, 24, 0xb704ce, reflect_in=False, reflect_out=False, xor_out=0)
crc_algorithm = crcengine.create(params=params)
return crc_algorithm(data).to_bytes(3,byteorder="big")
def get_crc_32(data: str) -> bytes:
"""Author: DJ2LS
Get the CRC32 of a byte string
param: data = bytes()
Args:
data:
Returns:
CRC-32 of the provided data as bytes
"""
if not isinstance(data, (bytes)) or isinstance(data, (bytearray)):
data = bytes(data, "utf-8")
crc_algorithm = crcengine.new("crc32") # load crc32 library
return crc_algorithm(data).to_bytes(4, byteorder="big")
def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list):
"""
Args:
dxcallsign:
dxgrid:
datatype:
snr:
offset:
frequency:
Returns:
Nothing
"""
# check if buffer empty
if len(heard_stations_list) == 0:
heard_stations_list.append(
[dxcallsign, dxgrid, int(datetime.now(timezone.utc).timestamp()), datatype, snr, offset, frequency]
)
# if not, we search and update
else:
for i in range(len(heard_stations_list)):
# Update callsign with new timestamp
if heard_stations_list[i].count(dxcallsign) > 0:
heard_stations_list[i] = [
dxcallsign,
dxgrid,
int(time.time()),
datatype,
snr,
offset,
frequency,
]
break
# Insert if nothing found
if i == len(heard_stations_list) - 1:
heard_stations_list.append(
[
dxcallsign,
dxgrid,
int(time.time()),
datatype,
snr,
offset,
frequency,
]
)
break
# for idx, item in enumerate(heard_stations_list):
# if dxcallsign in item:
# item = [dxcallsign, int(time.time())]
# heard_stations_list[idx] = item
def callsign_to_bytes(callsign: str) -> bytes:
"""
Args:
callsign:
Returns:
"""
# http://www.aprs.org/aprs11/SSIDs.txt
# -0 Your primary station usually fixed and message capable
# -1 generic additional station, digi, mobile, wx, etc
# -2 generic additional station, digi, mobile, wx, etc
# -3 generic additional station, digi, mobile, wx, etc
# -4 generic additional station, digi, mobile, wx, etc
# -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc)
# -6 Special activity, Satellite ops, camping or 6 meters, etc
# -7 walkie talkies, HT's or other human portable
# -8 boats, sailboats, RV's or second main mobile
# -9 Primary Mobile (usually message capable)
# -10 internet, Igates, echolink, winlink, AVRS, APRN, etc
# -11 balloons, aircraft, spacecraft, etc
# -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc
# -13 Weather stations
# -14 Truckers or generally full time drivers
# -15 generic additional station, digi, mobile, wx, etc
# Try converting to bytestring if possible type string
try:
callsign = callsign.encode("utf-8")
except TypeError:
# This is expected depending on the type of the `callsign` argument.
# log.debug("[HLP] callsign_to_bytes: Error converting callsign to bytes:", e=err)
pass
except Exception as err:
log.debug("[HLP] callsign_to_bytes: Error converting callsign to bytes:", e=err, data=callsign)
# Need this step to reduce the needed payload by the callsign
# (stripping "-" out of the callsign)
callsign = callsign.split(b"-")
ssid = 0
try:
ssid = int(callsign[1])
except IndexError:
# This is expected when callsign doesn't have a dash.
# log.debug("[HLP] callsign_to_bytes: Error callsign SSID to integer:", e=err)
pass
except Exception as err:
log.debug("[HLP] callsign_to_bytes: Error splitting callsign/ssid:", e=err)
# callsign = callsign[0]
# bytestring = bytearray(8)
# bytestring[:len(callsign)] = callsign
# bytestring[7:8] = bytes([ssid])
# ---- callsign with encoding always 6 bytes long
callsign = callsign[0].decode("utf-8")
ssid = bytes([ssid]).decode("utf-8")
return encode_call(callsign + ssid)
# return bytes(bytestring)
def bytes_to_callsign(bytestring: bytes) -> bytes:
"""
Convert our callsign, received by a frame to a callsign in a human readable format
Args:
bytestring:
Returns:
bytes
"""
# http://www.aprs.org/aprs11/SSIDs.txt
# -0 Your primary station usually fixed and message capable
# -1 generic additional station, digi, mobile, wx, etc
# -2 generic additional station, digi, mobile, wx, etc
# -3 generic additional station, digi, mobile, wx, etc
# -4 generic additional station, digi, mobile, wx, etc
# -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc)
# -6 Special activity, Satellite ops, camping or 6 meters, etc
# -7 walkie talkies, HT's or other human portable
# -8 boats, sailboats, RV's or second main mobile
# -9 Primary Mobile (usually message capable)
# -10 internet, Igates, echolink, winlink, AVRS, APRN, etc
# -11 balloons, aircraft, spacecraft, etc
# -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc
# -13 Weather stations
# -14 Truckers or generally full time drivers
# -15 generic additional station, digi, mobile, wx, etc
# we need to do this step to reduce the needed paypload by the callsign ( stripping "-" out of the callsign )
"""
callsign = bytes(bytestring[:7])
callsign = callsign.rstrip(b"\x00")
ssid = int.from_bytes(bytes(bytestring[7:8]), "big")
callsign = callsign + b"-"
callsign = callsign.decode("utf-8")
callsign = callsign + str(ssid)
callsign = callsign.encode("utf-8")
return bytes(callsign)
"""
decoded = decode_call(bytestring)
callsign = decoded[:-1]
ssid = ord(bytes(decoded[-1], "utf-8"))
return bytes(f"{callsign}-{ssid}", "utf-8")
def check_callsign(callsign: str, crc_to_check: bytes, ssid_list):
"""
Function to check a crc against a callsign to calculate the
ssid by generating crc until we find the correct SSID
Args:
callsign: Callsign which we want to check
crc_to_check: The CRC which we want the callsign to check against
Returns:
[True, Callsign + SSID]
False
"""
print(callsign)
if not isinstance(callsign, (bytes)):
callsign = bytes(callsign,'utf-8')
try:
# We want the callsign without SSID
splitted_callsign = callsign.split(b"-")
callsign = splitted_callsign[0]
ssid = splitted_callsign[1].decode()
except IndexError:
# This is expected when `callsign` doesn't have a dash.
ssid = 0
except Exception as err:
log.debug("[HLP] check_callsign: Error converting to bytes:", e=err)
# ensure, we are always have the own ssid in ssid_list even if it is empty
if ssid not in ssid_list:
ssid_list.append(str(ssid))
for ssid in ssid_list:
call_with_ssid = callsign + b'-' + (str(ssid)).encode('utf-8')
callsign_crc = get_crc_24(call_with_ssid)
callsign_crc = callsign_crc.hex()
if callsign_crc == crc_to_check:
log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid, checksum=crc_to_check)
return [True, call_with_ssid.decode()]
log.debug("[HLP] check_callsign: Checking:", callsign=callsign, crc_to_check=crc_to_check, own_crc=callsign_crc)
return [False, b'']
def check_session_id(id: bytes, id_to_check: bytes):
"""
Funktion to check if we received the correct session id
Args:
id: our own session id
id_to_check: The session id byte we want to check
Returns:
True
False
"""
if id_to_check == b'\x00':
return False
log.debug("[HLP] check_sessionid: Checking:", ownid=id, check=id_to_check)
return id == id_to_check
def encode_grid(grid):
"""
@author: DB1UJ
Args:
grid:string: maidenhead QTH locater [a-r][a-r][0-9][0-9][a-x][a-x]
Returns:
4 bytes contains 26 bit valid data with encoded grid locator
"""
out_code_word = 0
grid = grid.upper() # upper case to be save
int_first = ord(grid[0]) - 65 # -65 offset for "A" become zero, utf8 table
int_sec = ord(grid[1]) - 65 # -65 offset for "A" become zero, utf8 table
int_val = (int_first * 18) + int_sec # encode for modulo devision, 2 numbers in 1
out_code_word = int_val & 0b111111111 # only 9 bit LSB A - R * A - R is needed
out_code_word <<= 9 # shift 9 bit left having space next bits, letter A-R * A-R
int_val = int(grid[2:4]) # number string to number int, highest value 99
out_code_word |= int_val & 0b1111111 # using bit OR to add new value
out_code_word <<= 7 # shift 7 bit left having space next bits, letter A-X
int_val = ord(grid[4]) - 65 # -65 offset for 'A' become zero, utf8 table
out_code_word |= int_val & 0b11111 # using bit OR to add new value
out_code_word <<= 5 # shift 5 bit left having space next bits, letter A-X
int_val = ord(grid[5]) - 65 # -65 offset for 'A' become zero, utf8 table
out_code_word |= int_val & 0b11111 # using bit OR to add new value
return out_code_word.to_bytes(length=4, byteorder="big")
def decode_grid(b_code_word: bytes):
"""
@author: DB1UJ
Args:
b_code_word:bytes: 4 bytes with 26 bit valid data LSB
Returns:
grid:str: upper case maidenhead QTH locater [A-R][A-R][0-9][0-9][A-X][A-X]
"""
code_word = int.from_bytes(b_code_word, byteorder="big", signed=False)
grid = chr((code_word & 0b11111) + 65)
code_word >>= 5
grid = chr((code_word & 0b11111) + 65) + grid
code_word >>= 7
grid = str(int(code_word & 0b1111111)) + grid
if (code_word & 0b1111111) < 10:
grid = f"0{grid}"
code_word >>= 9
int_val = int(code_word & 0b111111111)
int_first, int_sec = divmod(int_val, 18)
return chr(int(int_first) + 65) + chr(int(int_sec) + 65) + grid
def encode_call(call):
"""
@author: DB1UJ
Args:
call:string: ham radio call sign [A-Z,0-9], last char SSID 0-63
Returns:
6 bytes contains 6 bits/sign encoded 8 char call sign with binary SSID
(only upper letters + numbers, SSID)
"""
out_code_word = 0
call = call.upper() # upper case to be save
for char in call:
int_val = ord(char) - 48 # -48 reduce bits, begin with first number utf8 table
out_code_word <<= 6 # shift left 6 bit, making space for a new char
out_code_word |= (
int_val & 0b111111
) # bit OR adds the new char, masked with AND 0b111111
out_code_word >>= 6 # clean last char
out_code_word <<= 6 # make clean space
out_code_word |= ord(call[-1]) & 0b111111 # add the SSID uncoded only 0 - 63
return out_code_word.to_bytes(length=6, byteorder="big")
def decode_call(b_code_word: bytes):
"""
@author: DB1UJ
Args:
b_code_word:bytes: 6 bytes with 6 bits/sign valid data char signs LSB
Returns:
call:str: upper case ham radio call sign [A-Z,0-9] + binary SSID
"""
code_word = int.from_bytes(b_code_word, byteorder="big", signed=False)
ssid = chr(code_word & 0b111111) # save the uncoded binary SSID
call = str()
while code_word != 0:
call = chr((code_word & 0b111111) + 48) + call
code_word >>= 6
call = call[:-1] + ssid # remove the last char from call and replace with SSID
return call
def snr_to_bytes(snr):
"""create a byte from snr value """
# make sure we have onl 1 byte snr
# min max = -12.7 / 12.7
# enough for detecting if a channel is good or bad
snr = snr * 10
snr = np.clip(snr, -127, 127)
snr = int(snr).to_bytes(1, byteorder='big', signed=True)
return snr
def snr_from_bytes(snr):
"""create int from snr byte"""
snr = int.from_bytes(snr, byteorder='big', signed=True)
snr = snr / 10
return snr
def safe_execute(default, exception, function, *args):
"""
https://stackoverflow.com/a/36671208
from json import loads
safe_execute("Oh no, explosions occurred!", TypeError, loads, None)
"""
try:
return function(*args)
except exception:
return default
def return_key_from_object(default, obj, key):
try:
return obj[key]
except KeyError:
return default
def bool_to_string(state):
return "True" if state else "False"
def get_hmac_salt(dxcallsign: bytes, mycallsign: bytes):
filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt"
if sys.platform in ["linux"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
elif sys.platform in ["darwin"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
elif sys.platform in ["win32", "win64"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
else:
try:
subfolder = Path('hmac')
filepath = subfolder / filename
except Exception as e:
log.error(
"[Modem] [HMAC] File lookup error", file=filepath,
)
# check if file exists else return false
if not check_if_file_exists(filepath):
return False
log.info("[SCK] [HMAC] File lookup", file=filepath)
try:
with open(filepath, "r") as file:
line = file.readlines()
hmac_salt = bytes(line[-1], "utf-8").split(b'\n')
hmac_salt = hmac_salt[0]
return hmac_salt if delete_last_line_from_hmac_list(filepath, -1) else False
except Exception as e:
log.warning("[SCK] [HMAC] File lookup failed", file=filepath, e=e)
return False
def search_hmac_salt(dxcallsign: bytes, mycallsign: bytes, search_token, data_frame, token_iters):
filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt"
if sys.platform in ["linux"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
elif sys.platform in ["darwin"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
elif sys.platform in ["win32", "win64"]:
if hasattr(sys, "_MEIPASS"):
filepath = getattr(sys, "_MEIPASS") + '/hmac/' + filename
else:
subfolder = Path('hmac')
filepath = subfolder / filename
else:
try:
subfolder = Path('hmac')
filepath = subfolder / filename
except Exception as e:
log.error(
"[Modem] [HMAC] File lookup error", file=filepath,
)
# check if file exists else return false
if not check_if_file_exists(filepath):
log.warning(
"[Modem] [HMAC] Token file not found", file=filepath,
)
return False
try:
with open(filepath, "r") as file:
token_list = file.readlines()
token_iters = min(token_iters, len(token_list))
for _ in range(1, token_iters + 1):
key = token_list[len(token_list) - _][:-1]
key = bytes(key, "utf-8")
search_digest = hmac.new(key, data_frame, hashlib.sha256).digest()[:4]
# TODO Remove this debugging information if not needed anymore
# print("-----------------------------------------")
# print(_)
# print(f" key-------------{key}")
# print(f" key-------------{token_list[len(token_list) - _][:-1]}")
# print(f" key-------------{key.hex()}")
# print(f" search token----{search_token.hex()}")
# print(f" search digest---{search_digest.hex()}")
if search_token.hex() == search_digest.hex():
token_position = len(token_list) - _
delete_last_line_from_hmac_list(filepath, token_position)
log.info(
"[Modem] [HMAC] Signature found", expected=search_token.hex(),
)
return True
log.warning(
"[Modem] [HMAC] Signature not found", expected=search_token.hex(), filepath=filepath,
)
return False
except Exception as e:
log.warning(
"[Modem] [HMAC] Lookup failed", e=e, expected=search_token,
)
return False
def delete_last_line_from_hmac_list(filepath, position):
# check if file exists else return false
if not check_if_file_exists(filepath):
return False
try:
linearray = []
with open(filepath, "r") as file:
linearray = file.readlines()[:position]
#print(linearray)
with open(filepath, "w") as file:
#print(linearray)
for line in linearray:
file.write(line)
return True
except Exception:
return False
def check_if_file_exists(path):
try:
# check if file size is present and filesize > 0
if os.path.isfile(path):
filesize = os.path.getsize(path)
if filesize > 0:
return True
else:
return False
else:
return False
except Exception as e:
log.warning(
"[Modem] [FILE] Lookup failed", e=e, path=path,
)
return False
def set_bit(byte, position, value):
"""Set the bit at 'position' to 'value' in the given byte."""
if not 0 <= position <= 7:
raise ValueError("Position must be between 0 and 7")
if value:
return byte | (1 << position)
else:
return byte & ~(1 << position)
def get_bit(byte, position):
"""Get the boolean value of the bit at 'position' in the given byte."""
if not 0 <= position <= 7:
raise ValueError("Position must be between 0 and 7")
return (byte & (1 << position)) != 0
def set_flag(byte, flag_name, value, flag_dict):
"""Set the flag in the byte according to the flag dictionary.
# Define a dictionary mapping flag names to their bit positions
flag_dict = {
'FLAG1': 0, # Bit position for FLAG1
'FLAG2': 1, # Bit position for FLAG2, etc.
'FLAG3': 2
}
"""
if flag_name not in flag_dict:
raise ValueError(f"Unknown flag name: {flag_name}")
position = flag_dict[flag_name]
return set_bit(byte, position, value)
def get_flag(byte, flag_name, flag_dict):
"""Get the value of the flag from the byte according to the flag dictionary."""
if flag_name not in flag_dict:
raise ValueError(f"Unknown flag name: {flag_name}")
position = flag_dict[flag_name]
return get_bit(byte, position)
def find_binary_path(binary_name="rigctld", search_system_wide=False):
"""
Search for a binary within the current working directory and its subdirectories.
Optionally, check system-wide locations and PATH environment variable if not found.
:param binary_name: The base name of the binary to search for, without extension.
:param search_system_wide: Boolean flag to enable or disable system-wide search.
:return: The full path to the binary if found, otherwise None.
"""
# Adjust binary name for Windows
if platform.system() == 'Windows':
binary_name += ".exe"
# Search in the current working directory and subdirectories
root_path = os.getcwd()
for dirpath, dirnames, filenames in os.walk(root_path):
if binary_name in filenames:
return os.path.join(dirpath, binary_name)
# If system-wide search is enabled, look in system locations and PATH
if search_system_wide:
system_paths = os.environ.get('PATH', '').split(os.pathsep)
# Optionally add common binary locations for Unix-like and Windows systems
if platform.system() != 'Windows':
system_paths.extend(['/usr/bin', '/usr/local/bin', '/bin'])
else:
system_paths.extend(['C:\\Windows\\System32', 'C:\\Windows'])
for path in system_paths:
potential_path = os.path.join(path, binary_name)
if os.path.isfile(potential_path):
return potential_path
def kill_and_execute(binary_path, additional_args=None):
"""
Kills any running instances of the binary across Linux, macOS, and Windows, then starts a new one non-blocking.
:param binary_path: The full path to the binary to execute.
:param additional_args: A list of additional arguments to pass to the binary.
:return: subprocess.Popen object of the started process
"""
# Kill any existing instances of the binary
for proc in psutil.process_iter(attrs=['pid', 'name', 'cmdline']):
try:
cmdline = proc.info['cmdline']
# Ensure cmdline is iterable and not None
if cmdline and binary_path in ' '.join(cmdline):
proc.kill()
print(f"Killed running instance with PID: {proc.info['pid']}")
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass # Process no longer exists or no permission to kill
# Execute the binary with additional arguments non-blocking
command = [binary_path] + (additional_args if additional_args else [])
return subprocess.Popen(command)