first version with RX_BUFFER as queue #233

This commit is contained in:
dj2ls 2022-09-05 10:37:50 +02:00
parent c3f8e3d899
commit 830b62583b
4 changed files with 38 additions and 24 deletions

View file

@ -25,7 +25,7 @@ import structlog
import ujson as json
from codec2 import FREEDV_MODE
from exceptions import NoCallsign
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, RX_BUFFER
from static import FRAME_TYPE as FR_TYPE
TESTMODE = False
@ -697,7 +697,20 @@ class DATA:
# Re-code data_frame in base64, UTF-8 for JSON UI communication.
base64_data = base64.b64encode(data_frame).decode("UTF-8")
static.RX_BUFFER.append(
if not RX_BUFFER.full():
# check if RX_BUFFER isn't full
pass
else:
# if full, free space by getting an item
self.log.info(
"[TNC] ARQ | RX | RX_BUFFER FULL - dropping old data",
buffer_size=RX_BUFFER.qsize(),
)
RX_BUFFER.get()
# add item to RX_BUFFER
RX_BUFFER.put(
[
self.transmission_uuid,
timestamp,
@ -706,6 +719,7 @@ class DATA:
base64_data,
]
)
self.send_data_to_socket_queue(
freedata="tnc-message",
arq="transmission",

View file

@ -9,3 +9,6 @@ DATA_QUEUE_RECEIVED = queue.Queue()
# Initialize FIFO queue to store received frames
MODEM_RECEIVED_QUEUE = queue.Queue()
MODEM_TRANSMIT_QUEUE = queue.Queue()
# Initialize FIFO queue to finally store received data
RX_BUFFER = queue.Queue(maxsize=20)

View file

@ -30,7 +30,7 @@ import static
import structlog
import ujson as json
from exceptions import NoCallsign
from queues import DATA_QUEUE_TRANSMIT
from queues import DATA_QUEUE_TRANSMIT, RX_BUFFER
SOCKET_QUEUE = queue.Queue()
DAEMON_QUEUE = queue.Queue()
@ -420,24 +420,22 @@ def process_tnc_commands(data):
"data-array": [],
}
for i in range(len(static.RX_BUFFER)):
# print(static.RX_BUFFER[i][4])
# rawdata = json.loads(static.RX_BUFFER[i][4])
base64_data = static.RX_BUFFER[i][4]
output["data-array"].append(
{
"uuid": static.RX_BUFFER[i][0],
"timestamp": static.RX_BUFFER[i][1],
"dxcallsign": str(static.RX_BUFFER[i][2], "utf-8"),
"dxgrid": str(static.RX_BUFFER[i][3], "utf-8"),
"data": base64_data,
}
)
jsondata = json.dumps(output)
# self.request.sendall(bytes(jsondata, encoding))
SOCKET_QUEUE.put(jsondata)
command_response("rx_buffer", True)
if not RX_BUFFER.empty():
for _buffer_length in range(RX_BUFFER.qsize()):
base64_data = RX_BUFFER.queue[_buffer_length][4]
output["data-array"].append(
{
"uuid": RX_BUFFER.queue[_buffer_length][0],
"timestamp": RX_BUFFER.queue[_buffer_length][1],
"dxcallsign": str(RX_BUFFER.queue[_buffer_length][2], "utf-8"),
"dxgrid": str(RX_BUFFER.queue[_buffer_length][3], "utf-8"),
"data": base64_data,
}
)
jsondata = json.dumps(output)
# self.request.sendall(bytes(jsondata, encoding))
SOCKET_QUEUE.put(jsondata)
command_response("rx_buffer", True)
except Exception as err:
command_response("rx_buffer", False)
@ -452,7 +450,7 @@ def process_tnc_commands(data):
and received_json["command"] == "del_rx_buffer"
):
try:
static.RX_BUFFER = []
RX_BUFFER.queue.clear()
command_response("del_rx_buffer", True)
except Exception as err:
command_response("del_rx_buffer", False)
@ -489,7 +487,7 @@ def send_tnc_state():
"fft": str(static.FFT),
"channel_busy": str(static.CHANNEL_BUSY),
"scatter": static.SCATTER,
"rx_buffer_length": str(len(static.RX_BUFFER)),
"rx_buffer_length": str(RX_BUFFER.qsize()),
"rx_msg_buffer_length": str(len(static.RX_MSG_BUFFER)),
"arq_bytes_per_minute": str(static.ARQ_BYTES_PER_MINUTE),
"arq_bytes_per_minute_burst": str(static.ARQ_BYTES_PER_MINUTE_BURST),

View file

@ -109,7 +109,6 @@ BEACON_STATE: bool = False
BEACON_PAUSE: bool = False
# ------- RX BUFFER
RX_BUFFER: list = []
RX_MSG_BUFFER: list = []
RX_BURST_BUFFER: list = []
RX_FRAME_BUFFER: bytes = b""