#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 25 21:25:14 2020

@author: DJ2LS

TCP command socket used for both the TNC and the daemon.

Clients send newline-terminated JSON objects. The handler dispatches each
object to ``process_tnc_commands`` when the server listens on ``static.PORT``
and to ``process_daemon_commands`` otherwise.

# GET COMMANDS
# "command" : "..."

# SET COMMANDS
# "command" : "..."
# "parameter" : " ..."

# DATA COMMANDS
# "command" : "..."
# "type" : "..."
# "dxcallsign" : "..."
# "data" : "..."
"""

import socketserver
import threading
import ujson as json
import time
import static
import data_handler
import helpers
import sys
import os
import logging
import structlog
import log_handler
import queue
import psutil
import audio

# Outgoing messages (JSON strings); drained and broadcast by send_to_client().
SOCKET_QUEUE = queue.Queue()
# Commands for the daemon ('STARTTNC', 'TEST_HAMLIB'); consumed outside this file.
DAEMON_QUEUE = queue.Queue()

# Sockets of all currently connected clients.
CONNECTED_CLIENTS = set()

# Keys read from received_json["parameter"][0] for STARTTNC, in queue order.
_STARTTNC_KEYS = (
    "mycall",
    "mygrid",
    "rx_audio",
    "tx_audio",
    "devicename",
    "deviceport",
    "serialspeed",
    "pttprotocol",
    "pttport",
    "data_bits",
    "stop_bits",
    "handshake",
    "radiocontrol",
    "rigctld_ip",
    "rigctld_port",
)
# TEST_HAMLIB uses the same keys minus mycall / mygrid / rx_audio / tx_audio.
_TEST_HAMLIB_KEYS = _STARTTNC_KEYS[4:]


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles every client connection in its own thread."""
    pass


class ThreadedTCPRequestHandler(socketserver.StreamRequestHandler):
    """Per-connection handler running one sender and one receiver thread."""

    def send_to_client(self):
        """Periodically queue a state message and broadcast the queue.

        Runs until ``self.connection_alive`` becomes False. Every message
        taken from ``SOCKET_QUEUE`` is sent, newline-terminated, to every
        socket in ``CONNECTED_CLIENTS``.
        """
        while self.connection_alive:
            # send tnc state as network stream
            # check server port against daemon port and send corresponding data
            if self.server.server_address[1] == static.PORT and not static.TNCSTARTED:
                SOCKET_QUEUE.put(send_tnc_state())
            else:
                SOCKET_QUEUE.put(send_daemon_state())
                time.sleep(0.5)

            while True:
                # get_nowait() instead of empty()+get(): another handler thread
                # may drain the shared queue in between, and a blocking get()
                # would then stall this loop.
                try:
                    data = SOCKET_QUEUE.get_nowait()
                except queue.Empty:
                    break

                sock_data = bytes(data, 'utf-8') + b'\n'  # append line limiter

                # send data to all clients; iterate over a snapshot so that
                # dropping a dead client does not mutate the set being iterated
                for client in list(CONNECTED_CLIENTS):
                    try:
                        client.send(sock_data)
                    except OSError:
                        print("connection lost...")
                        # drop the client that actually failed
                        CONNECTED_CLIENTS.discard(client)
                        if client is self.request:
                            self.connection_alive = False

            # we want to transmit scatter data only once to reduce network traffic
            static.SCATTER = []
            # we want to display INFO messages only once
            static.INFO = []
            time.sleep(0.15)

    def receive_from_client(self):
        """Read newline-terminated JSON commands and dispatch them.

        Bytes are buffered until a newline arrives, so several commands in
        one read and commands split across reads are both handled. Lines
        that do not look like ``{"type" ... }`` are discarded.
        """
        buffer = bytes()
        while self.connection_alive:
            try:
                chunk = self.request.recv(1024)
            except OSError as e:
                # e.g. BrokenPipeError / ConnectionResetError
                structlog.get_logger("structlog").warning("[TNC] Connection lost", e=e)
                self.connection_alive = False
                break

            if chunk == b'':
                # connection broken. Closing...
                self.connection_alive = False
                break

            buffer += chunk
            # everything before the last newline is complete; keep the rest
            *lines, buffer = buffer.split(b'\n')

            for data in lines:
                if not (data.startswith(b'{"type"') and data.endswith(b'}')):
                    continue
                print(data)
                if self.server.server_address[1] == static.PORT:
                    process_tnc_commands(data)
                else:
                    process_daemon_commands(data, self.request)

    def handle(self):
        """Register the client, start the worker threads and wait for the end."""
        CONNECTED_CLIENTS.add(self.request)

        structlog.get_logger("structlog").debug("[TNC] Client connected", ip=self.client_address[0], port=self.client_address[1])
        self.connection_alive = True

        # keep the Thread objects (Thread.start() itself returns None)
        self.sendThread = threading.Thread(target=self.send_to_client, args=[])
        self.sendThread.start()
        self.receiveThread = threading.Thread(target=self.receive_from_client, args=[])
        self.receiveThread.start()

        # keep connection alive until we close it
        while self.connection_alive:
            time.sleep(1)

    def finish(self):
        """Log the disconnect and unregister the client socket."""
        structlog.get_logger("structlog").warning("[TNC] Closing client socket", ip=self.client_address[0], port=self.client_address[1])
        # discard(): the sender thread may already have dropped this socket
        CONNECTED_CLIENTS.discard(self.request)
        print(CONNECTED_CLIENTS)


def _set_dx_callsign(dxcallsign):
    """Store the DX callsign and its CRC16 in ``static``."""
    static.DXCALLSIGN = bytes(dxcallsign, 'utf-8')
    static.DXCALLSIGN_CRC = helpers.get_crc_16(static.DXCALLSIGN)


def _rx_buffer_response(command, rx_buffer):
    """Return the JSON reply listing every entry of ``rx_buffer``.

    Each entry is indexed as [0]=dxcallsign (bytes), [1]=dxgrid (bytes),
    [2]=timestamp, [3]=JSON-encoded payload.
    """
    output = {
        "COMMAND": command,
        "DATA-ARRAY": [],
        "EOF": "EOF",
    }
    for entry in rx_buffer:
        rawdata = json.loads(entry[3])
        output["DATA-ARRAY"].append({
            "DXCALLSIGN": str(entry[0], 'utf-8'),
            "DXGRID": str(entry[1], 'utf-8'),
            "TIMESTAMP": entry[2],
            "RXDATA": [rawdata],
        })
    return json.dumps(output)


def process_tnc_commands(data):
    """Decode one JSON command for the TNC and act on it.

    Args:
        data: JSON document (bytes or str) with at least a "command" key.

    Any exception (undecodable JSON, missing key, bad value) is logged and
    swallowed so a bad request cannot kill the receiving thread.
    """
    log = structlog.get_logger("structlog")

    # we need to do some error handling in case of socket timeout or decoding issue
    try:
        # convert data to json object
        received_json = json.loads(data)
        command = received_json["command"]
        msg_type = received_json.get("type")

        # CQ CQ CQ -----------------------------------------------------
        if command == "CQCQCQ":
            data_handler.DATA_QUEUE_TRANSMIT.put(['CQ'])

        # START_BEACON -----------------------------------------------------
        if command == "START_BEACON":
            static.BEACON_STATE = True
            interval = int(received_json["parameter"])
            data_handler.DATA_QUEUE_TRANSMIT.put(['BEACON', interval, True])

        # STOP_BEACON -----------------------------------------------------
        if command == "STOP_BEACON":
            static.BEACON_STATE = False
            log.warning("[TNC] Stopping beacon!")
            data_handler.DATA_QUEUE_TRANSMIT.put(['BEACON', None, False])

        # PING ----------------------------------------------------------
        if msg_type == 'PING' and command == "PING":
            # send ping frame and wait for ACK
            dxcallsign = received_json["dxcallsign"]
            data_handler.DATA_QUEUE_TRANSMIT.put(['PING', dxcallsign])

        # TRANSMIT FILE ----------------------------------------------------------
        if msg_type == 'ARQ' and command == "sendFile":
            # read and validate all fields first, so that a malformed request
            # does not leave the TNC stuck in BUSY
            dxcallsign = received_json["dxcallsign"]
            mode = int(received_json["mode"])
            n_frames = int(received_json["n_frames"])
            filename = received_json["filename"]
            filetype = received_json["filetype"]
            payload = received_json["data"]
            checksum = received_json["checksum"]

            static.TNC_STATE = 'BUSY'
            # on a new transmission we reset the timer
            static.ARQ_START_OF_TRANSMISSION = int(time.time())
            _set_dx_callsign(dxcallsign)

            # dt = datatype
            # --> f = file
            # --> m = message
            # fn = filename
            # ft = filetype
            # d = data
            # crc = checksum
            rawdata = {"dt": "f", "fn": filename, "ft": filetype, "d": payload, "crc": checksum}
            data_out = bytes(json.dumps(rawdata), 'utf-8')
            print("kommen wir hier an?!?")
            data_handler.DATA_QUEUE_TRANSMIT.put(['ARQ_FILE', data_out, mode, n_frames])
            print(data_handler.DATA_QUEUE_TRANSMIT.qsize())

        # TRANSMIT MESSAGE ----------------------------------------------------------
        if msg_type == 'ARQ' and command == "sendMessage":
            print(received_json)
            # validate before touching the TNC state (see sendFile above)
            dxcallsign = received_json["dxcallsign"]
            mode = int(received_json["mode"])
            n_frames = int(received_json["n_frames"])
            payload = received_json["data"]  # d = data
            checksum = received_json["checksum"]  # crc = checksum

            static.TNC_STATE = 'BUSY'
            # on a new transmission we reset the timer
            static.ARQ_START_OF_TRANSMISSION = int(time.time())
            _set_dx_callsign(dxcallsign)

            rawdata = {"dt": "m", "d": payload, "crc": checksum}
            data_out = bytes(json.dumps(rawdata), 'utf-8')
            data_handler.DATA_QUEUE_TRANSMIT.put(['ARQ_MESSAGE', data_out, mode, n_frames])

        # STOP TRANSMISSION ----------------------------------------------------------
        if msg_type == 'ARQ' and command == "stopTransmission":
            data_handler.DATA_QUEUE_TRANSMIT.put(['STOP'])
            log.warning("[TNC] Stopping transmission!")
            static.TNC_STATE = 'IDLE'
            static.ARQ_STATE = False

        if msg_type == 'GET' and command == 'RX_BUFFER':
            SOCKET_QUEUE.put(_rx_buffer_response("RX_BUFFER", static.RX_BUFFER))

        if msg_type == 'GET' and command == 'RX_MSG_BUFFER':
            SOCKET_QUEUE.put(_rx_buffer_response("RX_MSG_BUFFER", static.RX_MSG_BUFFER))

        if msg_type == 'SET' and command == 'DEL_RX_BUFFER':
            static.RX_BUFFER = []

        if msg_type == 'SET' and command == 'DEL_RX_MSG_BUFFER':
            static.RX_MSG_BUFFER = []

    # exception, if JSON cant be decoded
    except Exception as e:
        log.error("[TNC] Network error", e=e)


def send_tnc_state():
    """Return the current TNC state from ``static`` as a JSON string."""
    encoding = 'utf-8'

    output = {
        "COMMAND": "TNC_STATE",
        "PTT_STATE": str(static.PTT_STATE),
        "TNC_STATE": str(static.TNC_STATE),
        "ARQ_STATE": str(static.ARQ_STATE),
        "AUDIO_RMS": str(static.AUDIO_RMS),
        "SNR": str(static.SNR),
        "FREQUENCY": str(static.HAMLIB_FREQUENCY),
        "MODE": str(static.HAMLIB_MODE),
        "BANDWITH": str(static.HAMLIB_BANDWITH),
        "FFT": str(static.FFT),
        "SCATTER": static.SCATTER,
        "RX_BUFFER_LENGTH": str(len(static.RX_BUFFER)),
        "RX_MSG_BUFFER_LENGTH": str(len(static.RX_MSG_BUFFER)),
        "ARQ_BYTES_PER_MINUTE": str(static.ARQ_BYTES_PER_MINUTE),
        "ARQ_BYTES_PER_MINUTE_BURST": str(static.ARQ_BYTES_PER_MINUTE_BURST),
        "ARQ_COMPRESSION_FACTOR": str(static.ARQ_COMPRESSION_FACTOR),
        "ARQ_TRANSMISSION_PERCENT": str(static.ARQ_TRANSMISSION_PERCENT),
        "TOTAL_BYTES": str(static.TOTAL_BYTES),
        "INFO" : static.INFO,
        "BEACON_STATE" : str(static.BEACON_STATE),
        "STATIONS": [],
        "MY_CALLSIGN": str(static.MYCALLSIGN, encoding),
        "DX_CALLSIGN": str(static.DXCALLSIGN, encoding),
        "DX_GRID": str(static.DXGRID, encoding),
        "EOF": "EOF",
    }

    # add heard stations to heard stations object
    for station in static.HEARD_STATIONS:
        output["STATIONS"].append({
            "DXCALLSIGN": str(station[0], 'utf-8'),
            "DXGRID": str(station[1], 'utf-8'),
            "TIMESTAMP": station[2],
            "DATATYPE": station[3],
            "SNR": station[4],
            "OFFSET": station[5],
            "FREQUENCY": station[6],
        })

    return json.dumps(output)


def _reply_to_client(request, message):
    """Send ``message`` (bytes) to ``request`` if a client socket was given."""
    if request is None:
        return
    try:
        request.sendall(message)
    except OSError as e:
        structlog.get_logger("structlog").warning("[DMN] Reply to client failed", e=e)


def _hardware_parameters(received_json, keys):
    """Return ``str(received_json["parameter"][0][key])`` for every key."""
    parameter = received_json["parameter"][0]
    return [str(parameter[key]) for key in keys]


def process_daemon_commands(data, request=None):
    """Decode one JSON command for the daemon and act on it.

    Args:
        data: JSON document (bytes or str) with "type" and "command" keys.
        request: optional client socket; when given, it receives the
            ``INVALID CALLSIGN`` / ``INVALID GRID`` replies. This function
            is module-level and has no ``self``, so the socket has to be
            passed in explicitly.

    Any exception is logged and swallowed, matching process_tnc_commands(),
    so a bad request cannot kill the receiving thread.
    """
    log = structlog.get_logger("structlog")

    try:
        # convert data to json object
        received_json = json.loads(data)
        msg_type = received_json["type"]
        command = received_json["command"]

        if msg_type == 'SET' and command == 'MYCALLSIGN':
            callsign = bytes(received_json["parameter"], 'utf-8')
            print(received_json)
            if callsign == b'':
                _reply_to_client(request, b'INVALID CALLSIGN')
                log.warning("[DMN] SET MYCALL FAILED", call=static.MYCALLSIGN, crc=static.MYCALLSIGN_CRC)
            else:
                static.MYCALLSIGN = callsign
                static.MYCALLSIGN_CRC = helpers.get_crc_16(static.MYCALLSIGN)
                log.info("[DMN] SET MYCALL", call=static.MYCALLSIGN, crc=static.MYCALLSIGN_CRC)

        if msg_type == 'SET' and command == 'MYGRID':
            mygrid = bytes(received_json["parameter"], 'utf-8')
            if mygrid == b'':
                _reply_to_client(request, b'INVALID GRID')
            else:
                static.MYGRID = mygrid
                log.info("[DMN] SET MYGRID", grid=static.MYGRID)

        if msg_type == 'SET' and command == 'STARTTNC' and not static.TNCSTARTED:
            DAEMON_QUEUE.put(['STARTTNC'] + _hardware_parameters(received_json, _STARTTNC_KEYS))

        if msg_type == 'GET' and command == 'TEST_HAMLIB':
            DAEMON_QUEUE.put(['TEST_HAMLIB'] + _hardware_parameters(received_json, _TEST_HAMLIB_KEYS))

        if msg_type == 'SET' and command == 'STOPTNC':
            static.TNCPROCESS.kill()
            log.warning("[DMN] Stopping TNC")
            static.TNCSTARTED = False

    except Exception as e:
        log.error("[DMN] Network error", e=e)


def send_daemon_state():
    """Return the current daemon state as a JSON string."""
    python_version = str(sys.version_info[0]) + "." + str(sys.version_info[1])
    status = "running" if static.TNCSTARTED else "stopped"

    output = {
        'COMMAND': 'DAEMON_STATE',
        'DAEMON_STATE': [{"STATUS": status}],
        'PYTHON_VERSION': str(python_version),
        'HAMLIB_VERSION': static.HAMLIB_VERSION,
        'INPUT_DEVICES': static.AUDIO_INPUT_DEVICES,
        'OUTPUT_DEVICES': static.AUDIO_OUTPUT_DEVICES,
        'SERIAL_DEVICES': static.SERIAL_DEVICES,
        'CPU': str(psutil.cpu_percent()),
        'RAM': str(psutil.virtual_memory().percent),
        'VERSION': '0.1'
    }

    return json.dumps(output)