From 37af525cc5c47151d66110f3f900c9deadf3487b Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Sun, 28 Feb 2021 16:46:18 +0100 Subject: [PATCH] first attempt with ASYNCIO instead of THREADING --- data_handler.py | 116 ++++++++++++++++++++++++++++-------------------- helpers.py | 17 ++++++- modem.py | 5 ++- sock.py | 27 ++++++----- 4 files changed, 103 insertions(+), 62 deletions(-) diff --git a/data_handler.py b/data_handler.py index ccb62916..6071f717 100644 --- a/data_handler.py +++ b/data_handler.py @@ -23,7 +23,7 @@ import asyncio # ARQ DATA HANDLER ############################################################################################################# -def data_received(data_in): +def arq_data_received(data_in): static.TNC_STATE = 'BUSY' @@ -206,7 +206,7 @@ def data_received(data_in): -def transmit(data_out): +async def arq_transmit(data_out): if static.ARQ_DATA_CHANNEL_MODE == 10: payload_per_frame = 512-2 @@ -221,12 +221,8 @@ def transmit(data_out): static.ARQ_PAYLOAD_PER_FRAME = payload_per_frame - 8 - - print("static.ARQ_DATA_PAYLOAD_PER_FRAME " + str(static.FREEDV_DATA_PAYLOAD_PER_FRAME)) - print("static.ARQ_PAYLOAD_PER_FRAME " + str(static.ARQ_PAYLOAD_PER_FRAME)) - - - + #print("static.ARQ_DATA_PAYLOAD_PER_FRAME " + str(static.FREEDV_DATA_PAYLOAD_PER_FRAME)) + #print("static.ARQ_PAYLOAD_PER_FRAME " + str(static.ARQ_PAYLOAD_PER_FRAME)) frame_header_length = 6 #4 @@ -242,7 +238,7 @@ def transmit(data_out): static.TX_BUFFER = [data_out[i:i+static.ARQ_PAYLOAD_PER_FRAME] for i in range(0, len(data_out), static.ARQ_PAYLOAD_PER_FRAME)] static.TX_BUFFER_SIZE = len(static.TX_BUFFER) - logging.info("ARQ | TX | DATA FRAME --- BYTES: " + str(len(data_out)) + " ARQ FRAMES: " + str(static.TX_BUFFER_SIZE)) + logging.info("ARQ | TX | M:" + str(static.ARQ_DATA_CHANNEL_MODE) + " | DATA FRAME --- BYTES: " + str(len(data_out)) + " ARQ FRAMES: " + str(static.TX_BUFFER_SIZE)) # --------------------------------------------- THIS IS THE MAIN LOOP----------------------------------------------------------------- @@ -272,13 +268,14 @@ def transmit(data_out): for static.TX_N_RETRIES in range(static.TX_N_MAX_RETRIES): if static.ARQ_N_SENT_FRAMES + 1 <= static.TX_BUFFER_SIZE: - logging.log(24, "ARQ | TX | " + str(static.ARQ_DATA_CHANNEL_MODE) + " | F:[" + str(static.ARQ_N_SENT_FRAMES+1) + "-" + str(static.ARQ_N_SENT_FRAMES + static.ARQ_TX_N_FRAMES_PER_BURST) + "] | T:[" + str(static.ARQ_N_SENT_FRAMES) + "/" + str(static.TX_BUFFER_SIZE) + "] [" + str(int(static.ARQ_N_SENT_FRAMES/(static.TX_BUFFER_SIZE)*100)).zfill(3) + "%] | A:[" + str(static.TX_N_RETRIES+1) + "/" + str(static.TX_N_MAX_RETRIES) + "]") + logging.log(24, "ARQ | TX | M:" + str(static.ARQ_DATA_CHANNEL_MODE) + " | F:[" + str(static.ARQ_N_SENT_FRAMES+1) + "-" + str(static.ARQ_N_SENT_FRAMES + static.ARQ_TX_N_FRAMES_PER_BURST) + "] | T:[" + str(static.ARQ_N_SENT_FRAMES) + "/" + str(static.TX_BUFFER_SIZE) + "] [" + str(int(static.ARQ_N_SENT_FRAMES/(static.TX_BUFFER_SIZE)*100)).zfill(3) + "%] | A:[" + str(static.TX_N_RETRIES+1) + "/" + str(static.TX_N_MAX_RETRIES) + "]") # lets start a thread to transmit nonblocking - TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") - TRANSMIT_ARQ_BURST_THREAD.start() - + #TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") + #TRANSMIT_ARQ_BURST_THREAD.start() + #asyncio.run(modem.transmit_arq_burst()) + modem.transmit_arq_burst() # lets wait during sending. After sending is finished we will continue while static.CHANNEL_STATE == 'SENDING_DATA': time.sleep(0.01) @@ -312,9 +309,10 @@ def transmit(data_out): logging.warning("ARQ | RX | REQUEST FOR REPEATING FRAMES: " + str(static.ARQ_RPT_FRAMES)) logging.warning("ARQ | TX | SENDING REQUESTED FRAMES: " + str(static.ARQ_RPT_FRAMES)) - TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") - TRANSMIT_ARQ_BURST_THREAD.start() - + #TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") + #TRANSMIT_ARQ_BURST_THREAD.start() + #asyncio.run(modem.transmit_arq_burst()) + modem.transmit_arq_burst() # lets wait during sending. After sending is finished we will continue while static.ARQ_STATE == 'SENDING_DATA': time.sleep(0.01) @@ -408,7 +406,6 @@ def transmit(data_out): #-------------------------BREAK TX BUFFER LOOP IF ALL PACKETS HAVE BEEN SENT AND WE GOT A FRAME ACK elif static.ARQ_N_SENT_FRAMES == static.TX_BUFFER_SIZE and static.ARQ_FRAME_ACK_RECEIVED == True: logging.log(25,"ARQ | RX | FRAME ACK RECEIVED - DATA TRANSMITTED! :-)") - logging.log(25,"------------------------------------------------------") break elif static.ARQ_FRAME_ACK_RECEIVED == False and static.ARQ_RX_FRAME_TIMEOUT == True: @@ -433,6 +430,8 @@ def transmit(data_out): logging.info("ARQ | TX | BUFFER EMPTY") helpers.arq_reset_frame_machine() + await asyncio.sleep(2) + arq_transmit_keep_alive() @@ -469,7 +468,7 @@ def burst_rpt_received(data_in): # ARQ CONNECT HANDLER ############################################################################################################# -def arq_connect(): +async def arq_connect(): static.ARQ_STATE = 'CONNECTING' logging.info("CONN ["+ str(static.MYCALLSIGN, 'utf-8') + "]-> <-["+ str(static.DXCALLSIGN, 'utf-8') + "]") frame_type = bytes([220]) @@ -483,10 +482,11 @@ def arq_connect(): #connection_frame[13:14] = bytes([static.ARQ_READY_FOR_DATA]) #print(connection_frame) - TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") - TRANSMIT_CONNECT_THREAD.start() - - + #TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") + #TRANSMIT_CONNECT_THREAD.start() + #asyncio.run(modem.transmit_signalling(connection_frame)) + modem.transmit_signalling(connection_frame) + def arq_received_connect(data_in): static.ARQ_STATE = 'CONNECTING' @@ -504,11 +504,19 @@ def arq_received_connect(data_in): #connection_frame[12:13] = bytes([static.FREEDV_DATA_MODE]) #send ACK for connect - TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") - TRANSMIT_CONNECT_THREAD.start() - - + #TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") + #TRANSMIT_CONNECT_THREAD.start() + #asyncio.run(modem.transmit_signalling(connection_frame)) + modem.transmit_signalling(connection_frame) +def arq_transmit_keep_alive(): + frame_type = bytes([221]) + connection_frame = bytearray(14) + connection_frame[:1] = frame_type + connection_frame[1:2] = static.DXCALLSIGN_CRC8 + connection_frame[2:3] = static.MYCALLSIGN_CRC8 + modem.transmit_signalling(connection_frame) + def arq_received_connect_keep_alive(data_in): if static.ARQ_SEND_KEEP_ALIVE == True and (static.ARQ_STATE == 'CONNECTING' or static.ARQ_STATE == 'CONNECTED'): @@ -527,7 +535,8 @@ def arq_received_connect_keep_alive(data_in): acktimer = threading.Timer(1.0, modem.transmit_signalling, args=[connection_frame]) acktimer.start() else: - print("keep alive = False") + pass + #print("keep alive = False") ############################################################################################################# # ARQ DATA CHANNEL HANDLER ############################################################################################################# @@ -536,14 +545,14 @@ def arq_received_connect_keep_alive(data_in): async def arq_open_data_channel(): # we need to wait until the last keep alive has been sent. - logging.info("OPEN DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >> << ["+ str(static.DXCALLSIGN, 'utf-8') + "]") + logging.info("OPENING DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >> << ["+ str(static.DXCALLSIGN, 'utf-8') + "]") static.ARQ_SEND_KEEP_ALIVE = False static.ARQ_DATA_CHANNEL_MODE = 12 while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) - print("wir warten 10 sekunden...") - await asyncio.sleep(10) + print("wir warten 2 sekunden...") + await asyncio.sleep(2) connection_frame = bytearray(14) @@ -553,16 +562,17 @@ async def arq_open_data_channel(): connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE]) connection_frame[13:14] = bytes([225]) - TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") - TRANSMIT_CONNECT_THREAD.start() + #TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") + #TRANSMIT_CONNECT_THREAD.start() + #asyncio.run(modem.transmit_signalling(connection_frame)) while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) - + modem.transmit_signalling(connection_frame) def arq_received_data_channel_opener(data_in): - logging.info("OPEN DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >> << ["+ str(static.DXCALLSIGN, 'utf-8') + "]") + logging.info("OPENING DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >> << ["+ str(static.DXCALLSIGN, 'utf-8') + "]") static.ARQ_SEND_KEEP_ALIVE = False static.ARQ_DATA_CHANNEL_MODE = int.from_bytes(bytes(data_in[12:13]), "big") #static.ARQ_READY_FOR_DATA = int.from_bytes(bytes(data_in[13:14]), "big") @@ -574,10 +584,12 @@ def arq_received_data_channel_opener(data_in): connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE]) connection_frame[13:14] = bytes([226]) - TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") - TRANSMIT_CONNECT_THREAD.start() + #TRANSMIT_CONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[connection_frame], name="TRANSMIT_ARQ") + #TRANSMIT_CONNECT_THREAD.start() + #asyncio.run(modem.transmit_signalling(connection_frame)) while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) + modem.transmit_signalling(connection_frame) print("waiting for data....") static.CHANNEL_STATE = 'RECEIVING_DATA' # einen timeout benötigen wir auch noch.... @@ -587,7 +599,7 @@ def arq_received_channel_is_open(data_in): static.ARQ_SEND_KEEP_ALIVE == False if static.ARQ_DATA_CHANNEL_MODE == int.from_bytes(bytes(data_in[12:13]), "big"): - logging.info("OPEN DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >>|<< ["+ str(static.DXCALLSIGN, 'utf-8') + "]") + logging.info("OPENING DATA CHANNEL ["+ str(static.MYCALLSIGN, 'utf-8') + "] >>|<< ["+ str(static.DXCALLSIGN, 'utf-8') + "]") time.sleep(1) static.ARQ_READY_FOR_DATA = True #static.CHANNEL_STATE = 'RECEIVING_DATA': @@ -598,23 +610,27 @@ def arq_received_channel_is_open(data_in): -def arq_disconnect(): +async def arq_disconnect(): # we need to create a "force ignore all" so we don't receive frames any more... Then we don't need a timer static.ARQ_STATE = 'DISCONNECTING' - logging.info("DISC ["+ str(static.MYCALLSIGN, 'utf-8') + "] <--> ["+ str(static.DXCALLSIGN, 'utf-8') + "]") + logging.info("DISC ["+ str(static.MYCALLSIGN, 'utf-8') + "] <-> ["+ str(static.DXCALLSIGN, 'utf-8') + "]") frame_type = bytes([222]) disconnection_frame = frame_type + static.MYCALLSIGN - - + disconnect_timer = threading.Timer(5.0, helpers.arq_disconnect_timeout) disconnect_timer.start() - TRANSMIT_DISCONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[disconnection_frame], name="TRANSMIT_ARQ") - TRANSMIT_DISCONNECT_THREAD.start() + #TRANSMIT_DISCONNECT_THREAD = threading.Thread(target=modem.transmit_signalling, args=[disconnection_frame], name="TRANSMIT_ARQ") + #TRANSMIT_DISCONNECT_THREAD.start() + #asyncio.run(modem.transmit_signalling(disconnection_frame)) + #print("senden...") + #modem.transmit_signalling(disconnection_frame) + #print("könnte das auch ein await regeln?!") while static.CHANNEL_STATE == 'SENDING_SIGNALLING' or static.ARQ_WAIT_FOR_DISCONNECT == False: time.sleep(0.01) + modem.transmit_signalling(disconnection_frame) logging.info("DISC ["+ str(static.MYCALLSIGN, 'utf-8') + "]< X >["+ str(static.DXCALLSIGN, 'utf-8') + "]") static.ARQ_STATE = 'IDLE' @@ -623,7 +639,7 @@ def arq_disconnect(): def arq_disconnect_received(data_in): static.ARQ_STATE = 'DISCONNECTED' - logging.info("DISC ["+ str(static.MYCALLSIGN, 'utf-8') + "] < > ["+ str(static.DXCALLSIGN, 'utf-8') + "]") + logging.info("DISC ["+ str(static.MYCALLSIGN, 'utf-8') + "]< X >["+ str(static.DXCALLSIGN, 'utf-8') + "]") static.ARQ_STATE = 'DISCONNECTED' static.TNC_STATE = 'IDLE' static.DXCALLSIGN = b'' @@ -642,11 +658,13 @@ def transmit_ping(callsign): ping_payload = b'PING' ping_frame = frame_type + ping_payload - TRANSMIT_PING_THREAD = threading.Thread(target=modem.transmit_signalling, args=[ping_frame], name="TRANSMIT_ARQ") - TRANSMIT_PING_THREAD.start() + #TRANSMIT_PING_THREAD = threading.Thread(target=modem.transmit_signalling, args=[ping_frame], name="TRANSMIT_ARQ") + #TRANSMIT_PING_THREAD.start() + #asyncio.run(modem.transmit_signalling(ping_frame)) # wait while sending.... while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) + modem.transmit_signalling(ping_frame) def received_ping(data_in): @@ -655,11 +673,13 @@ def received_ping(data_in): ping_payload = b'PING_ACK' ping_frame = frame_type + static.MYCALLSIGN + ping_payload - TRANSMIT_PING_THREAD = threading.Thread(target=modem.transmit_signalling, args=[ping_frame], name="TRANSMIT_ARQ") - TRANSMIT_PING_THREAD.start() + #TRANSMIT_PING_THREAD = threading.Thread(target=modem.transmit_signalling, args=[ping_frame], name="TRANSMIT_ARQ") + #TRANSMIT_PING_THREAD.start() + #asyncio.run(modem.transmit_signalling(ping_frame)) # wait while sending.... while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) + modem.transmit_signalling(ping_frame) def received_ping_ack(data_in): diff --git a/helpers.py b/helpers.py index 8716acdd..1404c869 100644 --- a/helpers.py +++ b/helpers.py @@ -12,6 +12,8 @@ import logging import crcengine import pyaudio +import data_handler + import static @@ -29,6 +31,12 @@ def get_crc_16(data): crc_data = crc_algorithm(data) crc_data = crc_data.to_bytes(2, byteorder='big') return crc_data + + +def arq_disconnect_timeout(): + static.ARQ_WAIT_FOR_DISCONNECT = True + logging.debug("ARQ_WAIT_FOR_DISCONNECT") + def arq_ack_timeout(): if static.ARQ_STATE == 'RECEIVING_SIGNALLING': @@ -61,9 +69,14 @@ def arq_reset_frame_machine(): static.TX_N_RETRIES = 0 static.ARQ_N_SENT_FRAMES = 0 static.ARQ_TX_N_FRAMES_PER_BURST = 0 - static.TNC_STATE = b'IDLE' - + static.TNC_STATE = b'IDLE' + static.ARQ_SEND_KEEP_ALIVE = True + #start sending keep alive after some seconds + #acktimer = threading.Timer(3.0, data_handler.arq_connect) + #acktimer.start() + #await asyncio.sleep(2) + #modem.transmit_arq_connect() def setup_logging(): logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s:\t%(message)s', datefmt='%H:%M:%S', level=logging.INFO) diff --git a/modem.py b/modem.py index 75656bfe..d4a53e25 100644 --- a/modem.py +++ b/modem.py @@ -11,6 +11,7 @@ from ctypes import * import pathlib import pyaudio import audioop +import asyncio #import sys import logging import time @@ -329,7 +330,7 @@ class RF(): static.UNCODED_BER = Terrs/Tbits - if nbytes == bytes_per_frame:###################################################################################################FREEDV_DATA_BYTES_PER_FRAME + if nbytes == bytes_per_frame:##########################################################FREEDV_DATA_BYTES_PER_FRAME @@ -351,7 +352,7 @@ class RF(): if 50 >= frametype >= 10: if frame != 3 or force == True: - data_handler.data_received(bytes(bytes_out[:-2])) #send payload data to arq checker without CRC16 + data_handler.arq_data_received(bytes(bytes_out[:-2])) #send payload data to arq checker without CRC16 #print("static.ARQ_RX_BURST_BUFFER.count(None) " + str(static.ARQ_RX_BURST_BUFFER.count(None))) if static.ARQ_RX_BURST_BUFFER.count(None) <= 1: diff --git a/sock.py b/sock.py index e0f0d077..314f5d85 100644 --- a/sock.py +++ b/sock.py @@ -44,6 +44,7 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler): if data == 'CQCQCQ': for i in range(0,3): data_handler.transmit_cq() + self.request.sendall(bytes("CALLING CQ")) while static.ARQ_STATE == 'SENDING_SIGNALLING': time.sleep(0.1) pass @@ -71,25 +72,29 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler): if static.TNC_STATE == 'IDLE': # here we send an "CONNECT FRAME - ARQ_CONNECT_THREAD = threading.Thread(target=data_handler.arq_connect, name="ARQ_CONNECT") - ARQ_CONNECT_THREAD.start() - + #ARQ_CONNECT_THREAD = threading.Thread(target=data_handler.arq_connect, name="ARQ_CONNECT") + #ARQ_CONNECT_THREAD.start() + asyncio.run(data_handler.arq_connect()) + self.request.sendall(bytes("CONNECTING", encoding)) + #data_handler.arq_connect() # ARQ DISCONNECT FROM CALLSIGN ---------------------------------------- if data == 'ARQ:DISCONNECT': - ARQ_DISCONNECT_THREAD = threading.Thread(target=data_handler.arq_disconnect, name="ARQ_DISCONNECT") - ARQ_DISCONNECT_THREAD.start() - - + #ARQ_DISCONNECT_THREAD = threading.Thread(target=data_handler.arq_disconnect, name="ARQ_DISCONNECT") + #ARQ_DISCONNECT_THREAD.start() + asyncio.run(data_handler.arq_disconnect()) + self.request.sendall(bytes("DISCONNECTING", encoding)) + #data_handler.arq_disconnect() # TRANSMIT ARQ MESSAGE ------------------------------------------ # wen need to change the TNC_STATE to "CONNECTE" and need to make sure we have a valid callsign and callsign crc8 of the DX station - print(static.ARQ_STATE) + #print(static.ARQ_STATE) if data.startswith('ARQ:DATA') and static.ARQ_STATE == 'CONNECTED': static.ARQ_READY_FOR_DATA = False logging.info("CMD | NEW ARQ DATA") asyncio.run(data_handler.arq_open_data_channel()) + #data_handler.arq_open_data_channel() #wait until we set the data mode # here we need a timeout as well!!! @@ -102,8 +107,10 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler): arqdata = data.split('ARQ:') data_out = bytes(arqdata[1], 'utf-8') - TRANSMIT_ARQ = threading.Thread(target=data_handler.transmit, args=[data_out], name="TRANSMIT_ARQ") - TRANSMIT_ARQ.start() + asyncio.run(data_handler.arq_transmit(data_out)) + #data_handler.arq_transmit(data_out) + #TRANSMIT_ARQ = threading.Thread(target=data_handler.transmit, args=[data_out], name="TRANSMIT_ARQ") + #TRANSMIT_ARQ.start()