type definition for functions

This commit is contained in:
dj2ls 2022-01-02 09:51:37 +01:00
parent a6816f8597
commit 94d251c162

View file

@ -52,7 +52,7 @@ DATA_FRAME_BOF = b'\xAA\xAA' # 2 bytes for the BOF End of Fil
DATA_FRAME_EOF = b'\xFF\xFF' # 2 bytes for the EOF End of File indicator in a data frame DATA_FRAME_EOF = b'\xFF\xFF' # 2 bytes for the EOF End of File indicator in a data frame
def arq_data_received(data_in, bytes_per_frame): def arq_data_received(data_in:bytes, bytes_per_frame:int):
data_in = bytes(data_in) data_in = bytes(data_in)
# we neeed to declare our global variables, so the thread has access to them # we neeed to declare our global variables, so the thread has access to them
@ -226,7 +226,7 @@ def arq_data_received(data_in, bytes_per_frame):
def arq_transmit(data_out, mode, n_frames_per_burst): def arq_transmit(data_out:bytes, mode:int, n_frames_per_burst:int):
global RPT_REQUEST_BUFFER global RPT_REQUEST_BUFFER
global DATA_FRAME_ACK_RECEIVED global DATA_FRAME_ACK_RECEIVED
@ -408,7 +408,7 @@ def frame_ack_received():
DATA_CHANNEL_LAST_RECEIVED = int(time.time()) # we need to update our timeout timestamp DATA_CHANNEL_LAST_RECEIVED = int(time.time()) # we need to update our timeout timestamp
def burst_rpt_received(data_in): def burst_rpt_received(data_in:bytes):
global RPT_REQUEST_RECEIVED global RPT_REQUEST_RECEIVED
global RPT_REQUEST_BUFFER global RPT_REQUEST_BUFFER
global DATA_CHANNEL_LAST_RECEIVED global DATA_CHANNEL_LAST_RECEIVED
@ -433,7 +433,7 @@ def burst_rpt_received(data_in):
# ############################################################################################################ # ############################################################################################################
def open_dc_and_transmit(data_out, mode, n_frames_per_burst): def open_dc_and_transmit(data_out:bytes, mode:int, n_frames_per_burst:int):
global DATA_CHANNEL_READY_FOR_DATA global DATA_CHANNEL_READY_FOR_DATA
static.TNC_STATE = 'BUSY' static.TNC_STATE = 'BUSY'
@ -446,7 +446,7 @@ def open_dc_and_transmit(data_out, mode, n_frames_per_burst):
arq_transmit(data_out, mode, n_frames_per_burst) arq_transmit(data_out, mode, n_frames_per_burst)
async def arq_open_data_channel(mode): async def arq_open_data_channel(mode:int):
global DATA_CHANNEL_READY_FOR_DATA global DATA_CHANNEL_READY_FOR_DATA
global DATA_CHANNEL_LAST_RECEIVED global DATA_CHANNEL_LAST_RECEIVED
@ -493,7 +493,7 @@ async def arq_open_data_channel(mode):
sys.exit() # close thread and so connection attempts sys.exit() # close thread and so connection attempts
def arq_received_data_channel_opener(data_in): def arq_received_data_channel_opener(data_in:bytes):
#global DATA_CHANNEL_MODE #global DATA_CHANNEL_MODE
global DATA_CHANNEL_LAST_RECEIVED global DATA_CHANNEL_LAST_RECEIVED
@ -524,7 +524,7 @@ def arq_received_data_channel_opener(data_in):
structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(static.MYCALLSIGN, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR, mode=mode) structlog.get_logger("structlog").info("[TNC] ARQ | DATA | RX | [" + str(static.MYCALLSIGN, 'utf-8') + "]>>|<<[" + str(static.DXCALLSIGN, 'utf-8') + "]", snr=static.SNR, mode=mode)
def arq_received_channel_is_open(data_in): def arq_received_channel_is_open(data_in:bytes):
global DATA_CHANNEL_LAST_RECEIVED global DATA_CHANNEL_LAST_RECEIVED
global DATA_CHANNEL_READY_FOR_DATA global DATA_CHANNEL_READY_FOR_DATA
@ -557,7 +557,7 @@ def arq_received_channel_is_open(data_in):
# PING HANDLER # PING HANDLER
# ############################################################################################################ # ############################################################################################################
def transmit_ping(callsign): def transmit_ping(callsign:str):
static.DXCALLSIGN = bytes(callsign, 'utf-8').rstrip(b'\x00') static.DXCALLSIGN = bytes(callsign, 'utf-8').rstrip(b'\x00')
static.DXCALLSIGN_CRC8 = helpers.get_crc_8(static.DXCALLSIGN) static.DXCALLSIGN_CRC8 = helpers.get_crc_8(static.DXCALLSIGN)
@ -573,7 +573,7 @@ def transmit_ping(callsign):
txbuffer = [ping_frame] txbuffer = [ping_frame]
modem.transmit(mode=14, repeats=1, repeat_delay=0, frames=txbuffer) modem.transmit(mode=14, repeats=1, repeat_delay=0, frames=txbuffer)
def received_ping(data_in, frequency_offset): def received_ping(data_in:bytes, frequency_offset:str):
static.DXCALLSIGN_CRC8 = bytes(data_in[2:3]).rstrip(b'\x00') static.DXCALLSIGN_CRC8 = bytes(data_in[2:3]).rstrip(b'\x00')
static.DXCALLSIGN = bytes(data_in[3:9]).rstrip(b'\x00') static.DXCALLSIGN = bytes(data_in[3:9]).rstrip(b'\x00')
@ -593,7 +593,7 @@ def received_ping(data_in, frequency_offset):
txbuffer = [ping_frame] txbuffer = [ping_frame]
modem.transmit(mode=14, repeats=1, repeat_delay=0, frames=txbuffer) modem.transmit(mode=14, repeats=1, repeat_delay=0, frames=txbuffer)
def received_ping_ack(data_in): def received_ping_ack(data_in:bytes):
static.DXCALLSIGN_CRC8 = bytes(data_in[2:3]).rstrip(b'\x00') static.DXCALLSIGN_CRC8 = bytes(data_in[2:3]).rstrip(b'\x00')
static.DXGRID = bytes(data_in[3:9]).rstrip(b'\x00') static.DXGRID = bytes(data_in[3:9]).rstrip(b'\x00')
@ -609,7 +609,7 @@ def received_ping_ack(data_in):
# BROADCAST HANDLER # BROADCAST HANDLER
# ############################################################################################################ # ############################################################################################################
def run_beacon(interval): def run_beacon(interval:int):
try: try:
structlog.get_logger("structlog").warning("[TNC] Starting beacon!", interval=interval) structlog.get_logger("structlog").warning("[TNC] Starting beacon!", interval=interval)
@ -632,7 +632,7 @@ def run_beacon(interval):
except Exception as e: except Exception as e:
print(e) print(e)
def received_beacon(data_in): def received_beacon(data_in:bytes):
# here we add the received station to the heard stations buffer # here we add the received station to the heard stations buffer
dxcallsign = bytes(data_in[2:8]).rstrip(b'\x00') dxcallsign = bytes(data_in[2:8]).rstrip(b'\x00')
dxgrid = bytes(data_in[8:14]).rstrip(b'\x00') dxgrid = bytes(data_in[8:14]).rstrip(b'\x00')
@ -661,7 +661,7 @@ def transmit_cq():
# pass # pass
def received_cq(data_in): def received_cq(data_in:bytes):
# here we add the received station to the heard stations buffer # here we add the received station to the heard stations buffer
dxcallsign = bytes(data_in[2:8]).rstrip(b'\x00') dxcallsign = bytes(data_in[2:8]).rstrip(b'\x00')
dxgrid = bytes(data_in[8:14]).rstrip(b'\x00') dxgrid = bytes(data_in[8:14]).rstrip(b'\x00')
@ -672,7 +672,7 @@ def received_cq(data_in):
def arq_reset_ack(state): def arq_reset_ack(state:bool):
""" """
Author: DJ2LS Author: DJ2LS
""" """
@ -685,7 +685,7 @@ def arq_reset_ack(state):
DATA_FRAME_ACK_RECEIVED = state DATA_FRAME_ACK_RECEIVED = state
def calculate_transfer_rate_rx(tx_start_of_transmission, receivedbytes, rx_data_length): def calculate_transfer_rate_rx(tx_start_of_transmission:float, receivedbytes:int, rx_data_length:int) -> list:
try: try:
static.ARQ_TRANSMISSION_PERCENT = int((receivedbytes / rx_data_length) * 100) static.ARQ_TRANSMISSION_PERCENT = int((receivedbytes / rx_data_length) * 100)
@ -708,7 +708,7 @@ def calculate_transfer_rate_rx(tx_start_of_transmission, receivedbytes, rx_data_
def calculate_transfer_rate_tx(tx_start_of_transmission, sentbytes, tx_buffer_length): def calculate_transfer_rate_tx(tx_start_of_transmission:float, sentbytes:int, tx_buffer_length:int) -> list:
try: try:
static.ARQ_TRANSMISSION_PERCENT = int((sentbytes / tx_buffer_length) * 100) static.ARQ_TRANSMISSION_PERCENT = int((sentbytes / tx_buffer_length) * 100)