# -*- coding: utf-8 -*-
"""
Helper functions: CRC calculation, heard-station bookkeeping, callsign /
grid locator / SNR encoding and HMAC token file handling.

Created on Fri Dec 25 21:25:14 2020

@author: DJ2LS
"""
import time
from datetime import datetime, timezone
import crcengine
import structlog
import numpy as np
import threading
import hashlib
import hmac
import os
import sys
from pathlib import Path
from typing import Union

log = structlog.get_logger("helpers")

# Platforms on which a "_MEIPASS" attribute on `sys` is honoured when
# locating the HMAC token files.
_BUNDLE_AWARE_PLATFORMS = ("linux", "darwin", "win32", "win64")

# SSID reference, see http://www.aprs.org/aprs11/SSIDs.txt
# -0 Your primary station usually fixed and message capable
# -1 generic additional station, digi, mobile, wx, etc
# -2 generic additional station, digi, mobile, wx, etc
# -3 generic additional station, digi, mobile, wx, etc
# -4 generic additional station, digi, mobile, wx, etc
# -5 Other networks (Dstar, Iphones, Androids, Blackberry's etc)
# -6 Special activity, Satellite ops, camping or 6 meters, etc
# -7 walkie talkies, HT's or other human portable
# -8 boats, sailboats, RV's or second main mobile
# -9 Primary Mobile (usually message capable)
# -10 internet, Igates, echolink, winlink, AVRS, APRN, etc
# -11 balloons, aircraft, spacecraft, etc
# -12 APRStt, DTMF, RFID, devices, one-way trackers*, etc
# -13 Weather stations
# -14 Truckers or generally full time drivers
# -15 generic additional station, digi, mobile, wx, etc


def wait(seconds: float) -> bool:
    """Block the calling thread for roughly `seconds` seconds.

    The delay is served in 10 ms slices on an event that is never set.

    Args:
        seconds: Duration to wait.

    Returns:
        Always True.
    """
    # A monotonic clock keeps the delay correct if the wall clock is adjusted.
    deadline = time.monotonic() + seconds
    pause = threading.Event()
    while time.monotonic() < deadline:
        pause.wait(0.01)
    return True


def _as_bytes(data):
    """Return `data` UTF-8 encoded if it is a str, otherwise unchanged."""
    if isinstance(data, str):
        return data.encode("utf-8")
    return data


def get_crc_8(data: Union[bytes, str]) -> bytes:
    """Author: DJ2LS

    Get the CRC8 of a byte string.

    Args:
        data: Bytes to checksum; a str is UTF-8 encoded first.

    Returns:
        CRC-8 (CCITT) of the provided data as 1 byte.
    """
    crc_algorithm = crcengine.new("crc8-ccitt")  # load crc8 library
    return crc_algorithm(_as_bytes(data)).to_bytes(1, byteorder="big")


def get_crc_16(data: Union[bytes, str]) -> bytes:
    """Author: DJ2LS

    Get the CRC16 of a byte string.

    Args:
        data: Bytes to checksum; a str is UTF-8 encoded first.

    Returns:
        CRC-16 (CCITT) of the provided data as 2 bytes, big endian.
    """
    crc_algorithm = crcengine.new("crc16-ccitt-false")  # load crc16 library
    return crc_algorithm(_as_bytes(data)).to_bytes(2, byteorder="big")


def get_crc_24(data: Union[bytes, str]) -> bytes:
    """Author: DJ2LS

    Get the CRC24-OPENPGP of a byte string.
    https://github.com/GardenTools/CrcEngine#examples

    Args:
        data: Bytes to checksum; a str is UTF-8 encoded first.

    Returns:
        CRC-24 (OpenPGP) of the provided data as 3 bytes, big endian.
    """
    crc_algorithm = crcengine.create(
        0x864CFB,
        24,
        0xB704CE,
        ref_in=False,
        ref_out=False,
        xor_out=0,
        name="crc-24-openpgp",
    )
    return crc_algorithm(_as_bytes(data)).to_bytes(3, byteorder="big")


def get_crc_32(data: Union[bytes, str]) -> bytes:
    """Author: DJ2LS

    Get the CRC32 of a byte string.

    Args:
        data: Bytes to checksum; a str is UTF-8 encoded first.

    Returns:
        CRC-32 of the provided data as 4 bytes, big endian.
    """
    crc_algorithm = crcengine.new("crc32")  # load crc32 library
    return crc_algorithm(_as_bytes(data)).to_bytes(4, byteorder="big")


def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list):
    """Insert or refresh a station in the heard-stations list (in place).

    Each entry is the list
    ``[dxcallsign, dxgrid, timestamp, datatype, snr, offset, frequency]``
    where ``timestamp`` is the current Unix time in whole seconds.

    Args:
        dxcallsign: Callsign of the heard station.
        dxgrid: Grid locator of the heard station.
        datatype: Stored as-is in the entry.
        snr: Stored as-is in the entry.
        offset: Stored as-is in the entry.
        frequency: Stored as-is in the entry.
        heard_stations_list: List that is updated in place.

    Returns:
        Nothing
    """
    entry = [
        dxcallsign,
        dxgrid,
        int(datetime.now(timezone.utc).timestamp()),
        datatype,
        snr,
        offset,
        frequency,
    ]

    for index, station in enumerate(heard_stations_list):
        # The first entry containing the callsign gets replaced.
        if dxcallsign in station:
            heard_stations_list[index] = entry
            break
    else:
        # Nothing found (or the list was empty): add a new entry.
        heard_stations_list.append(entry)


def callsign_to_bytes(callsign: Union[bytes, str]) -> bytes:
    """Encode a callsign with optional "-SSID" suffix into 6 bytes.

    The dash is stripped to reduce the needed payload; the SSID (0 when the
    callsign has none or it is not a number) is appended as a single raw
    character before the result is packed with `encode_call`.

    Args:
        callsign: Callsign as str or bytes, e.g. ``"DJ2LS-4"``.

    Returns:
        The 6 byte encoded callsign.
    """
    if isinstance(callsign, str):
        callsign = callsign.encode("utf-8")

    parts = callsign.split(b"-")

    ssid = 0
    if len(parts) > 1:
        try:
            ssid = int(parts[1])
        except ValueError as err:
            log.debug("[HLP] callsign_to_bytes: Error splitting callsign/ssid:", e=err)

    # ---- callsign with encoding always 6 bytes long
    base_call = parts[0].decode("utf-8")
    ssid_char = bytes([ssid]).decode("utf-8")
    return encode_call(base_call + ssid_char)


def bytes_to_callsign(bytestring: bytes) -> bytes:
    """
    Convert our callsign, received by a frame to a callsign in a human readable format

    Args:
        bytestring: 6 byte callsign as produced by `callsign_to_bytes`.

    Returns:
        bytes: The callsign as ``b"<CALL>-<SSID>"``.
    """
    decoded = decode_call(bytestring)
    callsign = decoded[:-1]
    # The last character carries the SSID as its raw code point.
    ssid = ord(decoded[-1])
    return bytes(f"{callsign}-{ssid}", "utf-8")


def check_callsign(callsign: Union[bytes, str], crc_to_check: bytes, ssid_list):
    """
    Function to check a crc against a callsign to calculate the
    ssid by generating crc until we find the correct SSID

    Args:
        callsign: Callsign which we want to check
        crc_to_check: The CRC which we want the callsign to check against
        ssid_list: SSIDs to try, in order

    Returns:
        [True, Callsign + SSID] for the first SSID whose CRC-24 matches,
        [False, b''] if none does
    """
    if not isinstance(callsign, bytes):
        callsign = bytes(callsign, "utf-8")

    log.debug("[HLP] check_callsign: Checking:", callsign=callsign)

    # We want the callsign without SSID
    base_call = callsign.split(b"-")[0]

    for ssid in ssid_list:
        call_with_ssid = base_call + b"-" + str(ssid).encode("utf-8")
        if get_crc_24(call_with_ssid) == crc_to_check:
            log.debug("[HLP] check_callsign matched:", call_with_ssid=call_with_ssid)
            return [True, call_with_ssid]

    return [False, b""]


def check_session_id(id: bytes, id_to_check: bytes):
    """
    Function to check if we received the correct session id

    Args:
        id: our own session id
        id_to_check: The session id byte we want to check

    Returns:
        True if both ids are equal, False otherwise. An `id_to_check` of
        b'\\x00' is always rejected.
    """
    if id_to_check == b"\x00":
        return False
    log.debug("[HLP] check_sessionid: Checking:", ownid=id, check=id_to_check)
    return id == id_to_check


def encode_grid(grid):
    """
    @author: DB1UJ

    Pack a 6 character maidenhead locator into 4 bytes.

    Bit layout of the resulting integer, most significant field first:
    9 bits field pair (first * 18 + second), a 9 bit slot holding the two
    digits in its lower 7 bits, a 7 bit slot holding the first subsquare
    letter in its lower 5 bits, and 5 bits for the second subsquare letter.

    Args:
        grid:string: maidenhead QTH locater [a-r][a-r][0-9][0-9][a-x][a-x]
    Returns:
        4 bytes contains 26 bit valid data with encoded grid locator
    """
    grid = grid.upper()  # upper case to be safe

    # Field letters: -65 offset so "A" becomes zero, two letters in one number
    field_pair = (ord(grid[0]) - 65) * 18 + (ord(grid[1]) - 65)
    out_code_word = field_pair & 0b111111111  # only 9 bit LSB, A - R * A - R

    out_code_word <<= 9  # make room for the square digits
    out_code_word |= int(grid[2:4]) & 0b1111111  # highest value 99

    out_code_word <<= 7  # make room for the first subsquare letter A-X
    out_code_word |= (ord(grid[4]) - 65) & 0b11111

    out_code_word <<= 5  # make room for the second subsquare letter A-X
    out_code_word |= (ord(grid[5]) - 65) & 0b11111

    return out_code_word.to_bytes(length=4, byteorder="big")


def decode_grid(b_code_word: bytes):
    """
    @author: DB1UJ

    Inverse of `encode_grid`.

    Args:
        b_code_word:bytes: 4 bytes with 26 bit valid data LSB
    Returns:
        grid:str: upper case maidenhead QTH locater [A-R][A-R][0-9][0-9][A-X][A-X]
    """
    code_word = int.from_bytes(b_code_word, byteorder="big", signed=False)

    # Fields are read back from the least significant end; the shift widths
    # mirror the slots written by encode_grid.
    subsquare_second = chr((code_word & 0b11111) + 65)
    code_word >>= 5

    subsquare_first = chr((code_word & 0b11111) + 65)
    code_word >>= 7

    square = code_word & 0b1111111
    code_word >>= 9

    int_first, int_sec = divmod(code_word & 0b111111111, 18)

    # The square digits are zero padded to two characters.
    return (
        chr(int_first + 65)
        + chr(int_sec + 65)
        + f"{square:02d}"
        + subsquare_first
        + subsquare_second
    )


def encode_call(call):
    """
    @author: DB1UJ

    Pack a call sign into 6 bytes using 6 bits per character.

    Args:
        call:string: ham radio call sign [A-Z,0-9], last char SSID 0-63
    Returns:
        6 bytes contains 6 bits/sign encoded 8 char call sign with binary SSID
        (only upper letters + numbers, SSID)
    """
    out_code_word = 0
    call = call.upper()  # upper case to be safe

    for char in call:
        int_val = ord(char) - 48  # -48 reduce bits, begin with first number utf8 table
        out_code_word <<= 6  # shift left 6 bit, making space for a new char
        out_code_word |= int_val & 0b111111  # add the new char, masked to 6 bits

    # The last character is the SSID: drop its offset-encoded form and store
    # its raw value (0 - 63) instead.
    out_code_word >>= 6
    out_code_word <<= 6
    out_code_word |= ord(call[-1]) & 0b111111

    return out_code_word.to_bytes(length=6, byteorder="big")


def decode_call(b_code_word: bytes):
    """
    @author: DB1UJ

    Inverse of `encode_call`.

    Args:
        b_code_word:bytes: 6 bytes with 6 bits/sign valid data char signs LSB
    Returns:
        call:str: upper case ham radio call sign [A-Z,0-9] + binary SSID
    """
    code_word = int.from_bytes(b_code_word, byteorder="big", signed=False)
    ssid = chr(code_word & 0b111111)  # save the uncoded binary SSID

    call = ""
    while code_word != 0:
        call = chr((code_word & 0b111111) + 48) + call
        code_word >>= 6

    # remove the last char from call and replace with SSID
    return call[:-1] + ssid


def snr_to_bytes(snr):
    """Create a single signed byte from an SNR value.

    The value is multiplied by 10 and clipped to -127..127, so the
    representable range is -12.7 .. 12.7, which is enough for detecting
    whether a channel is good or bad.
    """
    scaled = np.clip(snr * 10, -127, 127)
    return int(scaled).to_bytes(1, byteorder="big", signed=True)


def snr_from_bytes(snr):
    """Create the SNR value (float, tenths resolution) from an SNR byte."""
    return int.from_bytes(snr, byteorder="big", signed=True) / 10


def safe_execute(default, exception, function, *args):
    """Call ``function(*args)`` and return `default` if `exception` is raised.

    https://stackoverflow.com/a/36671208
    from json import loads
    safe_execute("Oh no, explosions occurred!", TypeError, loads, None)
    """
    try:
        return function(*args)
    except exception:
        return default


def return_key_from_object(default, obj, key):
    """Return ``obj[key]``, or `default` if the lookup raises KeyError."""
    try:
        return obj[key]
    except KeyError:
        return default


def bool_to_string(state):
    """Return "True" for a truthy `state`, otherwise "False"."""
    return "True" if state else "False"


def _hmac_filepath(dxcallsign: bytes, mycallsign: bytes):
    """Build the path of the HMAC token file for a station pair.

    Returns a str below ``sys._MEIPASS`` when that attribute exists on one
    of the known platforms, otherwise a relative `Path` below ``hmac/``.
    """
    filename = f"freedata_hmac_STATION_{mycallsign.decode('utf-8')}_REMOTE_{dxcallsign.decode('utf-8')}.txt"
    if sys.platform in _BUNDLE_AWARE_PLATFORMS and hasattr(sys, "_MEIPASS"):
        return getattr(sys, "_MEIPASS") + '/hmac/' + filename
    return Path('hmac') / filename


def get_hmac_salt(dxcallsign: bytes, mycallsign: bytes):
    """Take the newest token from the HMAC token file of a station pair.

    The last line of the file is read and then removed from the file.

    Args:
        dxcallsign: Callsign of the remote station.
        mycallsign: Callsign of the own station.

    Returns:
        The last line without its newline as bytes, or False if the file
        is missing or empty, or reading or rewriting it fails.
    """
    filepath = _hmac_filepath(dxcallsign, mycallsign)

    # check if file exists else return false
    if not check_if_file_exists(filepath):
        return False

    log.info("[SCK] [HMAC] File lookup", file=filepath)

    try:
        with open(filepath, "r") as file:
            lines = file.readlines()

        hmac_salt = bytes(lines[-1], "utf-8").split(b'\n')[0]
        return hmac_salt if delete_last_line_from_hmac_list(filepath, -1) else False
    except Exception as e:
        log.warning("[SCK] [HMAC] File lookup failed", file=filepath, e=e)
        return False


def search_hmac_salt(dxcallsign: bytes, mycallsign: bytes, search_token, data_frame, token_iters):
    """Search the token file for the key that produced `search_token`.

    Starting with the newest (last) line, up to `token_iters` tokens are
    used as HMAC-SHA256 keys over `data_frame`; the first 4 digest bytes are
    compared with `search_token`. On a match the file is truncated to the
    lines before the matching token.

    Args:
        dxcallsign: Callsign of the remote station.
        mycallsign: Callsign of the own station.
        search_token: The 4 byte signature we are looking for.
        data_frame: The data the signature was calculated over.
        token_iters: Maximum number of tokens to try.

    Returns:
        True if a matching token was found, otherwise False.
    """
    filepath = _hmac_filepath(dxcallsign, mycallsign)

    # check if file exists else return false
    if not check_if_file_exists(filepath):
        log.warning(
            "[Modem] [HMAC] Token file not found",
            file=filepath,
        )
        return False

    try:
        with open(filepath, "r") as file:
            token_list = file.readlines()

        token_iters = min(token_iters, len(token_list))
        for offset in range(1, token_iters + 1):
            token_position = len(token_list) - offset
            # Strip only the line break, so a final line without a trailing
            # newline keeps its full key.
            key = token_list[token_position].rstrip("\n").encode("utf-8")
            search_digest = hmac.new(key, data_frame, hashlib.sha256).digest()[:4]

            # Constant-time comparison of the received and computed signature.
            if hmac.compare_digest(search_token, search_digest):
                delete_last_line_from_hmac_list(filepath, token_position)
                log.info(
                    "[Modem] [HMAC] Signature found",
                    expected=search_token.hex(),
                )
                return True

        log.warning(
            "[Modem] [HMAC] Signature not found",
            expected=search_token.hex(),
            filepath=filepath,
        )
        return False

    except Exception as e:
        log.warning(
            "[Modem] [HMAC] Lookup failed",
            e=e,
            expected=search_token,
        )
        return False


def delete_last_line_from_hmac_list(filepath, position):
    """Truncate the token file to ``lines[:position]``.

    With ``position=-1`` only the last line is removed; with a non-negative
    index that line and everything after it are removed.

    Args:
        filepath: Path of the token file.
        position: Slice end applied to the list of lines.

    Returns:
        True if the file was rewritten, False if it is missing or empty, or
        reading or writing fails.
    """
    # check if file exists else return false
    if not check_if_file_exists(filepath):
        return False

    try:
        with open(filepath, "r") as file:
            remaining = file.readlines()[:position]

        with open(filepath, "w") as file:
            file.writelines(remaining)

        return True
    except Exception:
        return False


def check_if_file_exists(path):
    """Return True if `path` is a regular file with a size greater than 0.

    Any error during the lookup is logged and reported as False.
    """
    try:
        return os.path.isfile(path) and os.path.getsize(path) > 0
    except Exception as e:
        log.warning(
            "[Modem] [FILE] Lookup failed",
            e=e,
            path=path,
        )
        return False