From 821be594f25de51ad3f9a85da66a0291a619c043 Mon Sep 17 00:00:00 2001 From: DJ2LS <75909252+DJ2LS@users.noreply.github.com> Date: Thu, 4 Mar 2021 14:28:01 +0100 Subject: [PATCH] first attempt with ASYNCIO instead of THREADING --- data_handler.py | 44 ++++++++++++++++++++++++++++++++------------ helpers.py | 26 +++++++++++++++++++++++--- modem.py | 32 +++++++++++++------------------- sock.py | 2 ++ 4 files changed, 70 insertions(+), 34 deletions(-) diff --git a/data_handler.py b/data_handler.py index 1efdc284..e8ee925b 100644 --- a/data_handler.py +++ b/data_handler.py @@ -11,6 +11,7 @@ import threading import time from random import randrange import asyncio +import trio import static import modem @@ -24,6 +25,7 @@ import helpers # ARQ DATA HANDLER ############################################################################################################# + def arq_data_received(data_in): static.TNC_STATE = 'BUSY' @@ -87,6 +89,7 @@ def arq_data_received(data_in): #TRANSMIT ACK FRAME FOR BURST----------------------------------------------- modem.transmit_signalling(ack_frame) + static.CHANNEL_STATE = 'RECEIVING_DATA' #TRANSMIT_ARQ_ACK_THREAD = threading.Thread(target=modem.transmit_arq_ack, args=[ack_frame], name="TRANSMIT_ARQ_BURST") #TRANSMIT_ARQ_ACK_THREAD.start() #while static.ARQ_STATE == 'SENDING_ACK': @@ -120,7 +123,7 @@ def arq_data_received(data_in): #TRANSMIT RPT FRAME FOR BURST----------------------------------------------- modem.transmit_signalling(rpt_frame) - + static.CHANNEL_STATE = 'RECEIVING_DATA' # ---------------------------- FRAME MACHINE # --------------- IF LIST NOT CONTAINS "None" stick everything together @@ -178,7 +181,8 @@ def arq_data_received(data_in): logging.info("ARQ | TX | ARQ DATA FRAME ACK [" + str(static.FRAME_CRC.hex()) +"] [BER."+str(static.BER)+"]") modem.transmit_signalling(ack_frame) - + + static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' # clearing buffers and resetting counters static.ARQ_RX_BURST_BUFFER = [] static.ARQ_RX_FRAME_BUFFER = [] @@ -187,12 +191,16 @@ def arq_data_received(data_in): static.ARQ_N_ARQ_FRAMES_PER_DATA_FRAME = 0 static.TNC_STATE = 'IDLE' static.ARQ_SEND_KEEP_ALIVE = True + static.ARQ_READY_FOR_DATA = False logging.info("DATA ["+ str(static.MYCALLSIGN, 'utf-8') + "]<< >>["+ str(static.DXCALLSIGN, 'utf-8') + "] [BER."+str(static.BER)+"]") else: + print("ARQ_FRAME_BOF_RECEIVED " + str(static.ARQ_FRAME_BOF_RECEIVED)) + print("ARQ_FRAME_EOF_RECEIVED " + str(static.ARQ_FRAME_EOF_RECEIVED)) logging.error("ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!") static.ARQ_STATE = 'IDLE' static.ARQ_SEND_KEEP_ALIVE = True + static.ARQ_READY_FOR_DATA = False logging.info("DATA ["+ str(static.MYCALLSIGN, 'utf-8') + "]<< >>["+ str(static.DXCALLSIGN, 'utf-8') + "] [BER."+str(static.BER)+"]") @@ -264,9 +272,9 @@ async def arq_transmit(data_out): #TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") #TRANSMIT_ARQ_BURST_THREAD.start() #asyncio.run(modem.transmit_arq_burst()) - modem.transmit_arq_burst() + await modem.transmit_arq_burst() # lets wait during sending. After sending is finished we will continue - while static.CHANNEL_STATE == 'SENDING_DATA': + while static.CHANNEL_STATE == 'SENDING_DATA': time.sleep(0.01) # --------------------------- START TIMER FOR WAITING FOR ACK ---> IF TIMEOUT REACHED, ACK_TIMEOUT = 1 @@ -276,7 +284,18 @@ async def arq_transmit(data_out): helpers.arq_reset_timeout(False) helpers.arq_reset_ack(False) - + #print(static.ARQ_RX_ACK_TIMEOUT) + #print("timeout......?!?") + #asyncio.ensure_future(helpers.set_variable_after_timeout()) + ##################task = asyncio.create_task(helpers.set_after_timeout()) + #async with trio.open_nursery() as nursery: + # nursery.start_soon(helpers.set_after_timeout()) + + + #print("TIMEOUT glaube gestartet...") + #print(task) + #print(static.ARQ_RX_ACK_TIMEOUT) + acktimer = threading.Timer(static.ARQ_RX_ACK_TIMEOUT_SECONDS, helpers.arq_ack_timeout) acktimer.start() @@ -301,12 +320,10 @@ async def arq_transmit(data_out): #TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST") #TRANSMIT_ARQ_BURST_THREAD.start() #asyncio.run(modem.transmit_arq_burst()) - modem.transmit_arq_burst() + await modem.transmit_arq_burst() # lets wait during sending. After sending is finished we will continue while static.ARQ_STATE == 'SENDING_DATA': time.sleep(0.01) - - static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' helpers.arq_reset_timeout(False) @@ -389,16 +406,19 @@ async def arq_transmit(data_out): # ----------- if no ACK received and out of retries.....stop frame sending if static.ARQ_ACK_RECEIVED == False and static.ARQ_FRAME_ACK_RECEIVED == False and static.ARQ_RX_ACK_TIMEOUT == True: logging.error("ARQ | TX | NO ACK RECEIVED | DATA SHOULD BE RESEND!") + static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' logging.error("------------------------------------------------------") break #-------------------------BREAK TX BUFFER LOOP IF ALL PACKETS HAVE BEEN SENT AND WE GOT A FRAME ACK elif static.ARQ_N_SENT_FRAMES == static.TX_BUFFER_SIZE and static.ARQ_FRAME_ACK_RECEIVED == True: logging.log(25,"ARQ | RX | FRAME ACK RECEIVED - DATA TRANSMITTED! :-)") + static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' break elif static.ARQ_FRAME_ACK_RECEIVED == False and static.ARQ_RX_FRAME_TIMEOUT == True: logging.error("ARQ | TX | NO FRAME ACK RECEIVED") + static.CHANNEL_STATE = 'RECEIVING_DATA' break else: @@ -427,8 +447,8 @@ async def arq_transmit(data_out): # BURST MACHINE TO DEFINE N BURSTS PER FRAME ---> LATER WE CAN USE CHANNEL MESSUREMENT TO SET FRAMES PER BURST def get_n_frames_per_burst(): - n_frames_per_burst = randrange(1,10) - #n_frames_per_burst = 1 + #n_frames_per_burst = randrange(1,10) + n_frames_per_burst = 1 return n_frames_per_burst @@ -544,7 +564,7 @@ async def arq_open_data_channel(): connection_frame[1:2] = static.DXCALLSIGN_CRC8 connection_frame[2:3] = static.MYCALLSIGN_CRC8 connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE]) - connection_frame[13:14] = bytes([225]) + #connection_frame[13:14] = bytes([225]) while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) @@ -563,7 +583,7 @@ def arq_received_data_channel_opener(data_in): connection_frame[1:2] = static.DXCALLSIGN_CRC8 connection_frame[2:3] = static.MYCALLSIGN_CRC8 connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE]) - connection_frame[13:14] = bytes([226]) + #connection_frame[13:14] = bytes([226]) while static.CHANNEL_STATE == 'SENDING_SIGNALLING': time.sleep(0.01) diff --git a/helpers.py b/helpers.py index 1404c869..4da88103 100644 --- a/helpers.py +++ b/helpers.py @@ -11,6 +11,7 @@ import threading import logging import crcengine import pyaudio +import asyncio import data_handler @@ -31,6 +32,17 @@ def get_crc_16(data): crc_data = crc_algorithm(data) crc_data = crc_data.to_bytes(2, byteorder='big') return crc_data + +async def set_after_timeout(): + while True: + logging.info("HALLO?!?") + time.sleep(1) + print("HALLOIOIOIOIOIOI") + static.ARQ_RX_ACK_TIMEOUT = True + await asyncio.sleep(1.1) + #await asyncio.sleep(timeout) + #vars()[variable] = value + def arq_disconnect_timeout(): @@ -39,17 +51,17 @@ def arq_disconnect_timeout(): def arq_ack_timeout(): - if static.ARQ_STATE == 'RECEIVING_SIGNALLING': + if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING': static.ARQ_RX_ACK_TIMEOUT = True logging.debug("ARQ_RX_ACK_TIMEOUT") def arq_rpt_timeout(): - if static.ARQ_STATE == 'RECEIVING_SIGNALLING': + if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING': static.ARQ_RX_RPT_TIMEOUT = True logging.debug("ARQ_RX_RPT_TIMEOUT") def arq_frame_timeout(): - if static.ARQ_STATE == 'RECEIVING_SIGNALLING': + if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING': static.ARQ_RX_FRAME_TIMEOUT = True logging.debug("ARQ_RX_FRAME_TIMEOUT") @@ -71,6 +83,8 @@ def arq_reset_frame_machine(): static.ARQ_TX_N_FRAMES_PER_BURST = 0 static.TNC_STATE = b'IDLE' static.ARQ_SEND_KEEP_ALIVE = True + static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' + static.ARQ_READY_FOR_DATA = False #start sending keep alive after some seconds #acktimer = threading.Timer(3.0, data_handler.arq_connect) @@ -109,3 +123,9 @@ def list_audio_devices(): for line in devices: print(line) + + + + + + diff --git a/modem.py b/modem.py index f6295c23..fe0ce378 100644 --- a/modem.py +++ b/modem.py @@ -100,7 +100,10 @@ class RF(): def transmit_signalling(self,ack_buffer): #print(ack_buffer) #static.ARQ_STATE = 'SENDING_ACK' + + state_before_transmit = static.CHANNEL_STATE static.CHANNEL_STATE = 'SENDING_SIGNALLING' + static.PTT_STATE = True self.my_rig.set_ptt(self.hamlib_ptt_type,1) @@ -140,14 +143,16 @@ class RF(): self.my_rig.set_ptt(self.hamlib_ptt_type,0) static.PTT_STATE = False - static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' + static.CHANNEL_STATE = state_before_transmit + #static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' #static.ARQ_STATE = 'RECEIVING_DATA' #-------------------------------------------------------------------------------------------------------- # GET ARQ BURST FRAME VOM BUFFER AND MODULATE IT - def transmit_arq_burst(self): + async def transmit_arq_burst(self): self.my_rig.set_ptt(self.hamlib_ptt_type,1) static.PTT_STATE = True + state_before_transmit = static.CHANNEL_STATE static.CHANNEL_STATE = 'SENDING_DATA' self.c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte) @@ -248,6 +253,7 @@ class RF(): # -------------- transmit audio self.stream_tx.write(bytes(txbuffer)) #static.ARQ_STATE = 'IDLE' + #static.CHANNEL_STATE = state_before_transmit static.CHANNEL_STATE = 'RECEIVING_SIGNALLING' static.PTT_STATE = False self.my_rig.set_ptt(self.hamlib_ptt_type,0) @@ -282,7 +288,7 @@ class RF(): bytes_out = bytes_out() #get pointer to bytes_out while static.FREEDV_RECEIVE == True: - time.sleep(0.01) + time.sleep(0.05) # stuck in sync counter stuck_in_sync_counter = 0 @@ -292,8 +298,6 @@ class RF(): # here we do a buffer cleanup before returning to demod loop dummy_mod = bytes(self.c_lib.freedv_nin(freedv)) self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod) - self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod) - self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod) #demod loop while (static.CHANNEL_STATE == 'RECEIVING_DATA' and static.ARQ_DATA_CHANNEL_MODE == mode) or (static.CHANNEL_STATE == 'RECEIVING_SIGNALLING' and static.FREEDV_SIGNALLING_MODE == mode): @@ -302,7 +306,7 @@ class RF(): static.FREEDV_DATA_BYTES_PER_FRAME = bytes_per_frame static.FREEDV_DATA_PAYLOAD_PER_FRAME = bytes_per_frame - 2 - #time.sleep(0.01) + time.sleep(0.01) nin = self.c_lib.freedv_nin(freedv) #nin = int(nin*(static.AUDIO_SAMPLE_RATE_RX/static.MODEM_SAMPLE_RATE)) data_in = self.stream_rx.read(nin, exception_on_overflow = False) @@ -330,12 +334,6 @@ class RF(): stuck_in_sync_10_counter = 0 #----------------------------------- - # get bit errors - #Tbits = self.c_lib.freedv_get_total_bits(freedv) - #Terrs = self.c_lib.freedv_get_total_bit_errors(freedv) - #if Tbits != 0: - # static.UNCODED_BER = Terrs/Tbits - if nbytes == bytes_per_frame:##########################################################FREEDV_DATA_BYTES_PER_FRAME self.calculate_ber(freedv) @@ -433,20 +431,16 @@ class RF(): print(frametype) # DO UNSYNC AFTER LAST BURST by checking the frame nums agains the total frames per burst if frame == n_frames_per_burst: - - - #reset bit error counters - #self.c_lib.freedv_set_total_bit_errors(freedv,0) - #self.c_lib.freedv_set_total_bits(freedv,0) logging.debug("LAST FRAME ---> UNSYNC") self.c_lib.freedv_set_sync(freedv, 0) #FORCE UNSYNC rxstatus = self.c_lib.freedv_get_rx_status(freedv) - #logging.info("DATA-" + str(rxstatus)) + #logging.info("DATA-" + str(mode) + " " +str(rxstatus)) if rxstatus == 10: self.c_lib.freedv_set_sync(freedv, 0) #FORCE UNSYNC - print("SIGNALLING -SYNC 10- Trigger") + print(" -SYNC 10- Trigger - M:" + str(mode)) + self.calculate_ber(freedv) def calculate_ber(self,freedv): diff --git a/sock.py b/sock.py index 26a0f1f8..47019b34 100644 --- a/sock.py +++ b/sock.py @@ -93,6 +93,7 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler): if data.startswith('ARQ:DATA') and static.ARQ_STATE == 'CONNECTED': static.ARQ_READY_FOR_DATA = False logging.info("CMD | NEW ARQ DATA") + self.request.sendall(b'SENDIN ARQ DATA') asyncio.run(data_handler.arq_open_data_channel()) #data_handler.arq_open_data_channel() @@ -108,6 +109,7 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler): data_out = bytes(arqdata[1], 'utf-8') asyncio.run(data_handler.arq_transmit(data_out)) + #data_handler.arq_transmit(data_out) #TRANSMIT_ARQ = threading.Thread(target=data_handler.transmit, args=[data_out], name="TRANSMIT_ARQ") #TRANSMIT_ARQ.start()