small network improvements

tests with fifo queue where not that successfull. This is a part we need to look later on closes #107 and closes #105
This commit is contained in:
dj2ls 2022-01-06 22:15:14 +01:00
parent 99722089dd
commit 63d73b63f3
3 changed files with 38 additions and 79 deletions

View file

@ -12,6 +12,7 @@ import argparse
import threading import threading
import static import static
import subprocess import subprocess
import socketserver
import sys import sys
import helpers import helpers
@ -74,6 +75,15 @@ if __name__ == '__main__':
# --------------------------------------------START CMD SERVER # --------------------------------------------START CMD SERVER
CMD_SERVER_THREAD = threading.Thread(target=sock.start_cmd_socket, name="cmd server") try:
CMD_SERVER_THREAD.start() structlog.get_logger("structlog").info("[TNC] Starting TCP/IP socket", port=static.PORT)
# https://stackoverflow.com/a/16641793
socketserver.TCPServer.allow_reuse_address = True
cmdserver = sock.ThreadedTCPServer((static.HOST, static.PORT), sock.ThreadedTCPRequestHandler)
server_thread = threading.Thread(target=cmdserver.serve_forever)
server_thread.daemon = True
server_thread.start()
except Exception as e:
structlog.get_logger("structlog").error("[TNC] Starting TCP/IP socket failed", port=static.PORT, e=e)

View file

@ -23,20 +23,13 @@ Created on Fri Dec 25 21:25:14 2020
import socketserver import socketserver
import threading import threading
import logging
import ujson as json import ujson as json
#import json
import asyncio
import time import time
import static import static
import data_handler import data_handler
import helpers import helpers
import sys import sys
import os import os
import logging, structlog, log_handler import logging, structlog, log_handler
@ -48,56 +41,49 @@ class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
def handle(self): def handle(self):
structlog.get_logger("structlog").debug("[TNC] Client connected", ip=self.client_address[0]) structlog.get_logger("structlog").debug("[TNC] Client connected", ip=self.client_address[0], port=self.client_address[1])
# set encoding
encoding = 'utf-8'
# loop through socket buffer until timeout is reached. then close buffer # loop through socket buffer until timeout is reached. then close buffer
socketTimeout = time.time() + 3 socketTimeout = time.time() + static.SOCKET_TIMEOUT
while socketTimeout > time.time(): while socketTimeout > time.time():
time.sleep(0.01) time.sleep(0.01)
encoding = 'utf-8'
#data = str(self.request.recv(1024), 'utf-8')
data = bytes() data = bytes()
# we need to loop through buffer until end of chunk is reached or timeout occured # we need to loop through buffer until end of chunk is reached or timeout occured
while True and socketTimeout > time.time(): while socketTimeout > time.time():
chunk = self.request.recv(71) # we keep amount of bytes short chunk = self.request.recv(71) # we keep amount of bytes short
data += chunk data += chunk
# or chunk.endswith(b'\n'):
if chunk.endswith(b'}\n') or chunk.endswith(b'}'): if chunk.startswith(b'{"type"') and chunk.endswith(b'}\n'):
break break
data = data[:-1] # remove b'\n' data = data[:-1] # remove b'\n'
data = str(data, 'utf-8') data = str(data, encoding)
if len(data) > 0: if len(data) > 0:
# reset socket timeout
socketTimeout = time.time() + static.SOCKET_TIMEOUT socketTimeout = time.time() + static.SOCKET_TIMEOUT
# convert data to json object
# we need to do some error handling in case of socket timeout or decoding issue
try:
# only read first line of string. multiple lines will cause an json error # only read first line of string. multiple lines will cause an json error
# this occurs possibly, if we are getting data too fast # this occurs possibly, if we are getting data too fast
# data = data.splitlines()[0] # data = data.splitlines()[0]
# IndexError: list index out of range
data = data.splitlines()[0] data = data.splitlines()[0]
received_json = json.loads(data)
# except ValueError as e:
# print("++++++++++++ START OF JSON ERROR +++++++++++++++++++++++") # we need to do some error handling in case of socket timeout or decoding issue
# print(e) try:
# print("-----------------------------------")
# print(data) # convert data to json object
# print("++++++++++++ END OF JSON ERROR +++++++++++++++++++++++++") received_json = json.loads(data)
# received_json = {}
# break
# try:
# CQ CQ CQ ----------------------------------------------------- # CQ CQ CQ -----------------------------------------------------
if received_json["command"] == "CQCQCQ": if received_json["command"] == "CQCQCQ":
#socketTimeout = 0
# asyncio.run(data_handler.transmit_cq())
CQ_THREAD = threading.Thread(target=data_handler.transmit_cq, args=[], name="CQ") CQ_THREAD = threading.Thread(target=data_handler.transmit_cq, args=[], name="CQ")
CQ_THREAD.start() CQ_THREAD.start()
@ -118,7 +104,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
if received_json["type"] == 'PING' and received_json["command"] == "PING": if received_json["type"] == 'PING' and received_json["command"] == "PING":
# send ping frame and wait for ACK # send ping frame and wait for ACK
dxcallsign = received_json["dxcallsign"] dxcallsign = received_json["dxcallsign"]
# asyncio.run(data_handler.transmit_ping(dxcallsign))
PING_THREAD = threading.Thread(target=data_handler.transmit_ping, args=[dxcallsign], name="PING") PING_THREAD = threading.Thread(target=data_handler.transmit_ping, args=[dxcallsign], name="PING")
PING_THREAD.start() PING_THREAD.start()
@ -139,8 +125,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
static.DXCALLSIGN = bytes(dxcallsign, 'utf-8') static.DXCALLSIGN = bytes(dxcallsign, 'utf-8')
static.DXCALLSIGN_CRC8 = helpers.get_crc_8( static.DXCALLSIGN_CRC8 = helpers.get_crc_8(static.DXCALLSIGN)
static.DXCALLSIGN)
# dt = datatype # dt = datatype
# --> f = file # --> f = file
@ -155,7 +140,6 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
ARQ_DATA_THREAD = threading.Thread(target=data_handler.open_dc_and_transmit, args=[data_out, mode, n_frames], name="ARQ_DATA") ARQ_DATA_THREAD = threading.Thread(target=data_handler.open_dc_and_transmit, args=[data_out, mode, n_frames], name="ARQ_DATA")
ARQ_DATA_THREAD.start() ARQ_DATA_THREAD.start()
# asyncio.run(data_handler.arq_transmit(data_out))
# send message # send message
if received_json["type"] == 'ARQ' and received_json["command"] == "sendMessage": if received_json["type"] == 'ARQ' and received_json["command"] == "sendMessage":
static.TNC_STATE = 'BUSY' static.TNC_STATE = 'BUSY'
@ -194,9 +178,7 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
static.TNC_STATE = 'IDLE' static.TNC_STATE = 'IDLE'
static.ARQ_STATE = False static.ARQ_STATE = False
# SETTINGS AND STATUS ---------------------------------------------
if received_json["type"] == 'GET' and received_json["command"] == 'STATION_INFO': if received_json["type"] == 'GET' and received_json["command"] == 'STATION_INFO':
output = { output = {
@ -295,41 +277,8 @@ class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
static.RX_MSG_BUFFER = [] static.RX_MSG_BUFFER = []
# exception, if JSON cant be decoded # exception, if JSON cant be decoded
# except Exception as e: except Exception as e:
except:
print("############ START OF ERROR #####################")
print("SOCKET COMMAND ERROR: " + data)
e = sys.exc_info()[0]
print(e)
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)
print("############ END OF ERROR #######################")
structlog.get_logger("structlog").warning("[TNC] reset of tcp/ip connection...")
#socketTimeout = 0 #socketTimeout = 0
structlog.get_logger("structlog").error("[TNC] Network error", e=e)
structlog.get_logger("structlog").error("[TNC] Network error", e = sys.exc_info()[0]) structlog.get_logger("structlog").warning("[TNC] Closing client socket", ip=self.client_address[0], port=self.client_address[1])
structlog.get_logger("structlog").warning("[TNC] Closing client socket")
def start_cmd_socket():
try:
structlog.get_logger("structlog").info("[TNC] Starting TCP/IP socket", port=static.PORT)
# https://stackoverflow.com/a/16641793
socketserver.TCPServer.allow_reuse_address = True
cmdserver = ThreadedTCPServer((static.HOST, static.PORT), ThreadedTCPRequestHandler)
server_thread = threading.Thread(target=cmdserver.serve_forever)
server_thread.daemon = True
server_thread.start()
except:
structlog.get_logger("structlog").error("[TNC] Starting TCP/IP socket failed", port=static.PORT)
e = sys.exc_info()[0]
print(e)
exc_type, exc_obj, exc_tb = sys.exc_info()
fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
print(exc_type, fname, exc_tb.tb_lineno)

View file

@ -29,7 +29,7 @@ DXGRID = b''
# Server Defaults # Server Defaults
HOST = "0.0.0.0" HOST = "0.0.0.0"
PORT = 3000 PORT = 3000
SOCKET_TIMEOUT = 3 # seconds SOCKET_TIMEOUT = 1 # seconds
# --------------------------------- # ---------------------------------