first attempt with ASYNCIO instead of THREADING

This commit is contained in:
DJ2LS 2021-03-04 14:28:01 +01:00 committed by GitHub
parent 0f66b9dad4
commit 821be594f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 34 deletions

View file

@ -11,6 +11,7 @@ import threading
import time
from random import randrange
import asyncio
import trio
import static
import modem
@ -24,6 +25,7 @@ import helpers
# ARQ DATA HANDLER
#############################################################################################################
def arq_data_received(data_in):
static.TNC_STATE = 'BUSY'
@ -87,6 +89,7 @@ def arq_data_received(data_in):
#TRANSMIT ACK FRAME FOR BURST-----------------------------------------------
modem.transmit_signalling(ack_frame)
static.CHANNEL_STATE = 'RECEIVING_DATA'
#TRANSMIT_ARQ_ACK_THREAD = threading.Thread(target=modem.transmit_arq_ack, args=[ack_frame], name="TRANSMIT_ARQ_BURST")
#TRANSMIT_ARQ_ACK_THREAD.start()
#while static.ARQ_STATE == 'SENDING_ACK':
@ -120,7 +123,7 @@ def arq_data_received(data_in):
#TRANSMIT RPT FRAME FOR BURST-----------------------------------------------
modem.transmit_signalling(rpt_frame)
static.CHANNEL_STATE = 'RECEIVING_DATA'
# ---------------------------- FRAME MACHINE
# --------------- IF LIST NOT CONTAINS "None" stick everything together
@ -178,7 +181,8 @@ def arq_data_received(data_in):
logging.info("ARQ | TX | ARQ DATA FRAME ACK [" + str(static.FRAME_CRC.hex()) +"] [BER."+str(static.BER)+"]")
modem.transmit_signalling(ack_frame)
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
# clearing buffers and resetting counters
static.ARQ_RX_BURST_BUFFER = []
static.ARQ_RX_FRAME_BUFFER = []
@ -187,12 +191,16 @@ def arq_data_received(data_in):
static.ARQ_N_ARQ_FRAMES_PER_DATA_FRAME = 0
static.TNC_STATE = 'IDLE'
static.ARQ_SEND_KEEP_ALIVE = True
static.ARQ_READY_FOR_DATA = False
logging.info("DATA ["+ str(static.MYCALLSIGN, 'utf-8') + "]<< >>["+ str(static.DXCALLSIGN, 'utf-8') + "] [BER."+str(static.BER)+"]")
else:
print("ARQ_FRAME_BOF_RECEIVED " + str(static.ARQ_FRAME_BOF_RECEIVED))
print("ARQ_FRAME_EOF_RECEIVED " + str(static.ARQ_FRAME_EOF_RECEIVED))
logging.error("ARQ | RX | DATA FRAME NOT SUCESSFULLY RECEIVED!")
static.ARQ_STATE = 'IDLE'
static.ARQ_SEND_KEEP_ALIVE = True
static.ARQ_READY_FOR_DATA = False
logging.info("DATA ["+ str(static.MYCALLSIGN, 'utf-8') + "]<< >>["+ str(static.DXCALLSIGN, 'utf-8') + "] [BER."+str(static.BER)+"]")
@ -264,9 +272,9 @@ async def arq_transmit(data_out):
#TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST")
#TRANSMIT_ARQ_BURST_THREAD.start()
#asyncio.run(modem.transmit_arq_burst())
modem.transmit_arq_burst()
await modem.transmit_arq_burst()
# lets wait during sending. After sending is finished we will continue
while static.CHANNEL_STATE == 'SENDING_DATA':
while static.CHANNEL_STATE == 'SENDING_DATA':
time.sleep(0.01)
# --------------------------- START TIMER FOR WAITING FOR ACK ---> IF TIMEOUT REACHED, ACK_TIMEOUT = 1
@ -276,7 +284,18 @@ async def arq_transmit(data_out):
helpers.arq_reset_timeout(False)
helpers.arq_reset_ack(False)
#print(static.ARQ_RX_ACK_TIMEOUT)
#print("timeout......?!?")
#asyncio.ensure_future(helpers.set_variable_after_timeout())
##################task = asyncio.create_task(helpers.set_after_timeout())
#async with trio.open_nursery() as nursery:
# nursery.start_soon(helpers.set_after_timeout())
#print("TIMEOUT glaube gestartet...")
#print(task)
#print(static.ARQ_RX_ACK_TIMEOUT)
acktimer = threading.Timer(static.ARQ_RX_ACK_TIMEOUT_SECONDS, helpers.arq_ack_timeout)
acktimer.start()
@ -301,12 +320,10 @@ async def arq_transmit(data_out):
#TRANSMIT_ARQ_BURST_THREAD = threading.Thread(target=modem.transmit_arq_burst, name="TRANSMIT_ARQ_BURST")
#TRANSMIT_ARQ_BURST_THREAD.start()
#asyncio.run(modem.transmit_arq_burst())
modem.transmit_arq_burst()
await modem.transmit_arq_burst()
# lets wait during sending. After sending is finished we will continue
while static.ARQ_STATE == 'SENDING_DATA':
time.sleep(0.01)
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
helpers.arq_reset_timeout(False)
@ -389,16 +406,19 @@ async def arq_transmit(data_out):
# ----------- if no ACK received and out of retries.....stop frame sending
if static.ARQ_ACK_RECEIVED == False and static.ARQ_FRAME_ACK_RECEIVED == False and static.ARQ_RX_ACK_TIMEOUT == True:
logging.error("ARQ | TX | NO ACK RECEIVED | DATA SHOULD BE RESEND!")
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
logging.error("------------------------------------------------------")
break
#-------------------------BREAK TX BUFFER LOOP IF ALL PACKETS HAVE BEEN SENT AND WE GOT A FRAME ACK
elif static.ARQ_N_SENT_FRAMES == static.TX_BUFFER_SIZE and static.ARQ_FRAME_ACK_RECEIVED == True:
logging.log(25,"ARQ | RX | FRAME ACK RECEIVED - DATA TRANSMITTED! :-)")
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
break
elif static.ARQ_FRAME_ACK_RECEIVED == False and static.ARQ_RX_FRAME_TIMEOUT == True:
logging.error("ARQ | TX | NO FRAME ACK RECEIVED")
static.CHANNEL_STATE = 'RECEIVING_DATA'
break
else:
@ -427,8 +447,8 @@ async def arq_transmit(data_out):
# BURST MACHINE TO DEFINE N BURSTS PER FRAME ---> LATER WE CAN USE CHANNEL MESSUREMENT TO SET FRAMES PER BURST
def get_n_frames_per_burst():
n_frames_per_burst = randrange(1,10)
#n_frames_per_burst = 1
#n_frames_per_burst = randrange(1,10)
n_frames_per_burst = 1
return n_frames_per_burst
@ -544,7 +564,7 @@ async def arq_open_data_channel():
connection_frame[1:2] = static.DXCALLSIGN_CRC8
connection_frame[2:3] = static.MYCALLSIGN_CRC8
connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE])
connection_frame[13:14] = bytes([225])
#connection_frame[13:14] = bytes([225])
while static.CHANNEL_STATE == 'SENDING_SIGNALLING':
time.sleep(0.01)
@ -563,7 +583,7 @@ def arq_received_data_channel_opener(data_in):
connection_frame[1:2] = static.DXCALLSIGN_CRC8
connection_frame[2:3] = static.MYCALLSIGN_CRC8
connection_frame[12:13] = bytes([static.ARQ_DATA_CHANNEL_MODE])
connection_frame[13:14] = bytes([226])
#connection_frame[13:14] = bytes([226])
while static.CHANNEL_STATE == 'SENDING_SIGNALLING':
time.sleep(0.01)

View file

@ -11,6 +11,7 @@ import threading
import logging
import crcengine
import pyaudio
import asyncio
import data_handler
@ -31,6 +32,17 @@ def get_crc_16(data):
crc_data = crc_algorithm(data)
crc_data = crc_data.to_bytes(2, byteorder='big')
return crc_data
async def set_after_timeout():
while True:
logging.info("HALLO?!?")
time.sleep(1)
print("HALLOIOIOIOIOIOI")
static.ARQ_RX_ACK_TIMEOUT = True
await asyncio.sleep(1.1)
#await asyncio.sleep(timeout)
#vars()[variable] = value
def arq_disconnect_timeout():
@ -39,17 +51,17 @@ def arq_disconnect_timeout():
def arq_ack_timeout():
if static.ARQ_STATE == 'RECEIVING_SIGNALLING':
if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING':
static.ARQ_RX_ACK_TIMEOUT = True
logging.debug("ARQ_RX_ACK_TIMEOUT")
def arq_rpt_timeout():
if static.ARQ_STATE == 'RECEIVING_SIGNALLING':
if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING':
static.ARQ_RX_RPT_TIMEOUT = True
logging.debug("ARQ_RX_RPT_TIMEOUT")
def arq_frame_timeout():
if static.ARQ_STATE == 'RECEIVING_SIGNALLING':
if static.CHANNEL_STATE == 'RECEIVING_SIGNALLING':
static.ARQ_RX_FRAME_TIMEOUT = True
logging.debug("ARQ_RX_FRAME_TIMEOUT")
@ -71,6 +83,8 @@ def arq_reset_frame_machine():
static.ARQ_TX_N_FRAMES_PER_BURST = 0
static.TNC_STATE = b'IDLE'
static.ARQ_SEND_KEEP_ALIVE = True
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
static.ARQ_READY_FOR_DATA = False
#start sending keep alive after some seconds
#acktimer = threading.Timer(3.0, data_handler.arq_connect)
@ -109,3 +123,9 @@ def list_audio_devices():
for line in devices:
print(line)

View file

@ -100,7 +100,10 @@ class RF():
def transmit_signalling(self,ack_buffer):
#print(ack_buffer)
#static.ARQ_STATE = 'SENDING_ACK'
state_before_transmit = static.CHANNEL_STATE
static.CHANNEL_STATE = 'SENDING_SIGNALLING'
static.PTT_STATE = True
self.my_rig.set_ptt(self.hamlib_ptt_type,1)
@ -140,14 +143,16 @@ class RF():
self.my_rig.set_ptt(self.hamlib_ptt_type,0)
static.PTT_STATE = False
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
static.CHANNEL_STATE = state_before_transmit
#static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
#static.ARQ_STATE = 'RECEIVING_DATA'
#--------------------------------------------------------------------------------------------------------
# GET ARQ BURST FRAME VOM BUFFER AND MODULATE IT
def transmit_arq_burst(self):
async def transmit_arq_burst(self):
self.my_rig.set_ptt(self.hamlib_ptt_type,1)
static.PTT_STATE = True
state_before_transmit = static.CHANNEL_STATE
static.CHANNEL_STATE = 'SENDING_DATA'
self.c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
@ -248,6 +253,7 @@ class RF():
# -------------- transmit audio
self.stream_tx.write(bytes(txbuffer))
#static.ARQ_STATE = 'IDLE'
#static.CHANNEL_STATE = state_before_transmit
static.CHANNEL_STATE = 'RECEIVING_SIGNALLING'
static.PTT_STATE = False
self.my_rig.set_ptt(self.hamlib_ptt_type,0)
@ -282,7 +288,7 @@ class RF():
bytes_out = bytes_out() #get pointer to bytes_out
while static.FREEDV_RECEIVE == True:
time.sleep(0.01)
time.sleep(0.05)
# stuck in sync counter
stuck_in_sync_counter = 0
@ -292,8 +298,6 @@ class RF():
# here we do a buffer cleanup before returning to demod loop
dummy_mod = bytes(self.c_lib.freedv_nin(freedv))
self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod)
self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod)
self.c_lib.freedv_rawdatarx(freedv, bytes_out, dummy_mod)
#demod loop
while (static.CHANNEL_STATE == 'RECEIVING_DATA' and static.ARQ_DATA_CHANNEL_MODE == mode) or (static.CHANNEL_STATE == 'RECEIVING_SIGNALLING' and static.FREEDV_SIGNALLING_MODE == mode):
@ -302,7 +306,7 @@ class RF():
static.FREEDV_DATA_BYTES_PER_FRAME = bytes_per_frame
static.FREEDV_DATA_PAYLOAD_PER_FRAME = bytes_per_frame - 2
#time.sleep(0.01)
time.sleep(0.01)
nin = self.c_lib.freedv_nin(freedv)
#nin = int(nin*(static.AUDIO_SAMPLE_RATE_RX/static.MODEM_SAMPLE_RATE))
data_in = self.stream_rx.read(nin, exception_on_overflow = False)
@ -330,12 +334,6 @@ class RF():
stuck_in_sync_10_counter = 0
#-----------------------------------
# get bit errors
#Tbits = self.c_lib.freedv_get_total_bits(freedv)
#Terrs = self.c_lib.freedv_get_total_bit_errors(freedv)
#if Tbits != 0:
# static.UNCODED_BER = Terrs/Tbits
if nbytes == bytes_per_frame:##########################################################FREEDV_DATA_BYTES_PER_FRAME
self.calculate_ber(freedv)
@ -433,20 +431,16 @@ class RF():
print(frametype)
# DO UNSYNC AFTER LAST BURST by checking the frame nums agains the total frames per burst
if frame == n_frames_per_burst:
#reset bit error counters
#self.c_lib.freedv_set_total_bit_errors(freedv,0)
#self.c_lib.freedv_set_total_bits(freedv,0)
logging.debug("LAST FRAME ---> UNSYNC")
self.c_lib.freedv_set_sync(freedv, 0) #FORCE UNSYNC
rxstatus = self.c_lib.freedv_get_rx_status(freedv)
#logging.info("DATA-" + str(rxstatus))
#logging.info("DATA-" + str(mode) + " " +str(rxstatus))
if rxstatus == 10:
self.c_lib.freedv_set_sync(freedv, 0) #FORCE UNSYNC
print("SIGNALLING -SYNC 10- Trigger")
print(" -SYNC 10- Trigger - M:" + str(mode))
self.calculate_ber(freedv)
def calculate_ber(self,freedv):

View file

@ -93,6 +93,7 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler):
if data.startswith('ARQ:DATA') and static.ARQ_STATE == 'CONNECTED':
static.ARQ_READY_FOR_DATA = False
logging.info("CMD | NEW ARQ DATA")
self.request.sendall(b'SENDIN ARQ DATA')
asyncio.run(data_handler.arq_open_data_channel())
#data_handler.arq_open_data_channel()
@ -108,6 +109,7 @@ class CMDTCPRequestHandler(socketserver.BaseRequestHandler):
data_out = bytes(arqdata[1], 'utf-8')
asyncio.run(data_handler.arq_transmit(data_out))
#data_handler.arq_transmit(data_out)
#TRANSMIT_ARQ = threading.Thread(target=data_handler.transmit, args=[data_out], name="TRANSMIT_ARQ")
#TRANSMIT_ARQ.start()