first code for using hmac token based signing for data

This commit is contained in:
DJ2LS 2023-08-06 23:08:55 +02:00
parent 0f1ab97444
commit 338dbc1d92
6 changed files with 125 additions and 16 deletions

View file

@ -15,7 +15,8 @@ import time
import uuid import uuid
import lzma import lzma
from random import randrange from random import randrange
import hmac
import hashlib
import codec2 import codec2
import helpers import helpers
import modem import modem
@ -326,7 +327,8 @@ class DATA:
# [3] mycallsign with ssid # [3] mycallsign with ssid
# [4] dxcallsign with ssid # [4] dxcallsign with ssid
# [5] attempts # [5] attempts
self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5]) # [6] hmac salt hash
self.open_dc_and_transmit(data[1], data[2], data[3], data[4], data[5], data[6])
elif data[0] == "FEC_IS_WRITING": elif data[0] == "FEC_IS_WRITING":
@ -915,9 +917,18 @@ class DATA:
data_frame = payload[9:] data_frame = payload[9:]
data_frame_crc_received = helpers.get_crc_32(data_frame) data_frame_crc_received = helpers.get_crc_32(data_frame)
# Check if data_frame_crc is equal with received crc # check if hmac signing enabled
if data_frame_crc == data_frame_crc_received: if TNC.enable_hmac:
self.arq_process_received_data_frame(data_frame, snr) hmac_digest = hmac.new(TNC.hmac_salt, data_frame, hashlib.sha256).digest()[:4]
# now check if we have valid hmac signature
if hmac_digest == data_frame_crc_received:
# hmac digest received
self.arq_process_received_data_frame(data_frame, snr, signed=True)
else:
# hmac signature wrong
self.arq_process_received_data_frame(data_frame, snr, signed=False)
else: else:
self.send_data_to_socket_queue( self.send_data_to_socket_queue(
freedata="tnc-message", freedata="tnc-message",
@ -1018,7 +1029,7 @@ class DATA:
# Update modes we are listening to # Update modes we are listening to
self.set_listening_modes(False, True, self.mode_list[self.speed_level]) self.set_listening_modes(False, True, self.mode_list[self.speed_level])
def arq_process_received_data_frame(self, data_frame, snr): def arq_process_received_data_frame(self, data_frame, snr, signed):
""" """
@ -1029,7 +1040,7 @@ class DATA:
self.rx_start_of_transmission, len(ARQ.rx_frame_buffer) self.rx_start_of_transmission, len(ARQ.rx_frame_buffer)
) )
self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter, self.log.info("[TNC] ARQ | RX | DATA FRAME SUCCESSFULLY RECEIVED", nacks=self.frame_nack_counter,
bytesperminute=ARQ.bytes_per_minute, total_bytes=ARQ.total_bytes, duration=duration) bytesperminute=ARQ.bytes_per_minute, total_bytes=ARQ.total_bytes, duration=duration, signed=signed)
# Decompress the data frame # Decompress the data frame
data_frame_decompressed = lzma.decompress(data_frame) data_frame_decompressed = lzma.decompress(data_frame)
@ -1124,7 +1135,8 @@ class DATA:
dxcallsign=str(Station.dxcallsign, "UTF-8"), dxcallsign=str(Station.dxcallsign, "UTF-8"),
dxgrid=str(Station.dxgrid, "UTF-8"), dxgrid=str(Station.dxgrid, "UTF-8"),
data=base64_data, data=base64_data,
irs=helpers.bool_to_string(self.is_IRS) irs=helpers.bool_to_string(self.is_IRS),
signed=signed
) )
if TNC.enable_stats: if TNC.enable_stats:
@ -1149,7 +1161,7 @@ class DATA:
snr=snr, snr=snr,
) )
def arq_transmit(self, data_out: bytes): def arq_transmit(self, data_out: bytes, hmac_salt: str):
""" """
Transmit ARQ frame Transmit ARQ frame
@ -1203,9 +1215,18 @@ class DATA:
tx_start_of_transmission = time.time() tx_start_of_transmission = time.time()
self.calculate_transfer_rate_tx(tx_start_of_transmission, 0, len(data_out)) self.calculate_transfer_rate_tx(tx_start_of_transmission, 0, len(data_out))
# Append a crc at the beginning and end of file indicators # check if hmac signature is available
frame_payload_crc = helpers.get_crc_32(data_out) if hmac_salt not in ['', False]:
self.log.debug("[TNC] frame payload CRC:", crc=frame_payload_crc.hex()) # create hmac digest
hmac_digest = hmac.new(hmac_salt, data_out, hashlib.sha256).digest()
# truncate to 32bit
frame_payload_crc = hmac_digest[:4]
self.log.debug("[TNC] frame payload HMAC:", crc=frame_payload_crc.hex())
else:
# Append a crc at the beginning and end of file indicators
frame_payload_crc = helpers.get_crc_32(data_out)
self.log.debug("[TNC] frame payload CRC:", crc=frame_payload_crc.hex())
# Assemble the data frame # Assemble the data frame
data_out = ( data_out = (
@ -2060,6 +2081,7 @@ class DATA:
mycallsign, mycallsign,
dxcallsign, dxcallsign,
attempts: int, attempts: int,
hmac_salt: str
) -> bool: ) -> bool:
""" """
Open data channel and transmit data Open data channel and transmit data
@ -2108,11 +2130,11 @@ class DATA:
threading.Event().wait(0.01) threading.Event().wait(0.01)
if ARQ.arq_state: if ARQ.arq_state:
self.arq_transmit(data_out) self.arq_transmit(data_out, hmac_salt)
return True return True
return False return False
arq_transmit
def arq_open_data_channel( def arq_open_data_channel(
self, mycallsign self, mycallsign
) -> bool: ) -> bool:

View file

@ -489,3 +489,33 @@ def return_key_from_object(default, obj, key):
def bool_to_string(state): def bool_to_string(state):
return "True" if state else "False" return "True" if state else "False"
def get_hmac_salt(dxcallsign: bytes, mycallsign: bytes):
filename = f"freedata_hmac_tokens_{int(time.time())}_{dxcallsign}_{mycallsign}.txt"
try:
with open(filename, "w") as file:
line = file.readlines()
hmac_salt = line[-1]
return hmac_salt if delete_last_line_from_hmac_list(filename) else False
except Exception:
return False
def delete_last_line_from_hmac_list(filename):
try:
linearray = []
with open(filename, "r") as file:
linearray = file.readlines()[:-1]
print(linearray)
with open(filename, "w") as file:
print(linearray)
for line in linearray:
file.write(line)
return True
except Exception:
return False

View file

@ -253,6 +253,14 @@ if __name__ == "__main__":
help="Enable mesh protocol", help="Enable mesh protocol",
) )
PARSER.add_argument(
"--hmac",
dest="hmac_salt",
type=str,
default="False",
help="Enable and set hmac message salt",
)
ARGS = PARSER.parse_args() ARGS = PARSER.parse_args()
# set save to folder state for allowing downloading files to local file system # set save to folder state for allowing downloading files to local file system
@ -307,6 +315,12 @@ if __name__ == "__main__":
TCIParam.port = ARGS.tci_port TCIParam.port = ARGS.tci_port
ModemParam.tx_delay = ARGS.tx_delay ModemParam.tx_delay = ARGS.tx_delay
MeshParam.enable_protocol = ARGS.enable_mesh MeshParam.enable_protocol = ARGS.enable_mesh
if ARGS.hmac_salt not in ["False"]:
TNC.hmac_salt = ARGS.hmac_salt
TNC.enable_hmac = False
else:
TNC.hmac_salt = ''
TNC.enable_hmac = False
except Exception as e: except Exception as e:
log.error("[DMN] Error reading config file", exception=e) log.error("[DMN] Error reading config file", exception=e)

View file

@ -780,9 +780,13 @@ class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
raise TypeError raise TypeError
binarydata = base64.b64decode(base64data) binarydata = base64.b64decode(base64data)
# check if hmac hash is provided
try:
hmac_salt = helpers.get_hmac_salt(dxcallsign, mycallsign)
except Exception:
hmac_salt = ''
DATA_QUEUE_TRANSMIT.put( DATA_QUEUE_TRANSMIT.put(
["ARQ_RAW", binarydata, arq_uuid, mycallsign, dxcallsign, attempts] ["ARQ_RAW", binarydata, arq_uuid, mycallsign, dxcallsign, attempts, hmac_salt]
) )
except Exception as err: except Exception as err:

View file

@ -143,6 +143,8 @@ class TNC:
respond_to_call: bool = True # respond to cq, ping, connection request, file request if not in session respond_to_call: bool = True # respond to cq, ping, connection request, file request if not in session
heard_stations = [] heard_stations = []
listen: bool = True listen: bool = True
enable_hmac: bool = False
hmac_salt: str = ''
# ------------ # ------------

View file

@ -0,0 +1,37 @@
import argparse
import numpy as np
import time
def create_hmac_salts(dxcallsign: str, mycallsign: str, num_tokens: int = 10000):
"""
Creates a file with tokens for hmac signing
Args:
dxcallsign:
mycallsign:
int:
Returns:
bool
"""
try:
# Create and write random strings to a file
with open(f"freedata_hmac_tokens_{int(time.time())}_{dxcallsign}_{mycallsign}.txt", "w") as file:
for _ in range(num_tokens):
random_str = np.random.bytes(16).hex()
file.write(random_str + '\n')
except Exception:
print("error creating hmac file")
parser = argparse.ArgumentParser(description='FreeDATA token generator')
parser.add_argument('--dxcallsign', dest="dxcallsign", default='AA0AA', help="Select the destination callsign", type=str)
parser.add_argument('--mycallsign', dest="mycallsign", default='AA0AA', help="Select the own callsign", type=str)
parser.add_argument('--tokens', dest="tokens", default='10000', help="Amount of tokens to create", type=int)
args = parser.parse_args()
create_hmac_salts(args.dxcallsign, args.mycallsign, int(args.tokens))