FreeDATA/tnc/sock.py

336 lines
15 KiB
Python
Raw Normal View History

2021-02-16 13:23:57 +00:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Fri Dec 25 21:25:14 2020

@author: DJ2LS

JSON commands accepted on the TNC command socket:

# GET COMMANDS
# "command" : "..."

# SET COMMANDS
# "command" : "..."
# "parameter" : " ..."

# DATA COMMANDS
# "command" : "..."
# "type" : "..."
# "dxcallsign" : "..."
# "data" : "..."
"""
import socketserver
import threading
import logging
2021-08-23 16:14:00 +00:00
import ujson as json
#import json
import asyncio
2021-03-17 10:22:06 +00:00
import time
2021-02-16 13:23:57 +00:00
import static
2021-02-24 13:22:28 +00:00
import data_handler
2021-12-29 19:54:22 +00:00
2021-02-16 19:49:02 +00:00
import helpers
2021-09-25 13:24:25 +00:00
import sys
import os
2021-11-18 18:40:22 +00:00
import logging, structlog, log_handler
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles every client connection in its own thread."""

    # Allow rebinding the listening port right after a restart. Declared on
    # the class itself so it does not depend on someone patching
    # socketserver.TCPServer globally before the server is created.
    allow_reuse_address = True
2021-09-25 13:24:25 +00:00
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Serve one command client of the TNC over a JSON based protocol.

    Every request is a single JSON object, normally terminated by a
    newline. The object carries a "command" key and, for most commands, a
    "type" key ('PING', 'ARQ', 'GET' or 'SET'). GET commands are answered
    with a JSON object on the same socket; the other commands change
    values in ``static`` and/or start a worker thread from
    ``data_handler``. Unknown commands are silently ignored.
    """

    def handle(self):
        """Read and execute commands until the client times out or leaves.

        A new client has 3 seconds to send its first command. Every
        received frame extends the deadline by ``static.SOCKET_TIMEOUT``
        seconds. A command that cannot be decoded or executed is reported
        on stdout and does not terminate the connection.
        """
        log = structlog.get_logger("structlog")
        log.debug("[TNC] Client connected", ip=self.client_address[0])

        encoding = 'utf-8'
        connection_open = True
        socketTimeout = time.time() + 3
        while connection_open and socketTimeout > time.time():
            time.sleep(0.01)
            data = bytes()

            # collect chunks until the end of a JSON object is seen, the
            # deadline passes or the connection goes away
            while socketTimeout > time.time():
                try:
                    # we keep amount of bytes short
                    chunk = self.request.recv(71)
                except OSError:
                    log.error("[TNC] Network error", e=sys.exc_info()[0])
                    connection_open = False
                    break
                if not chunk:
                    # recv() returning no bytes means the peer closed the
                    # connection; polling again would only busy-loop
                    connection_open = False
                    break
                data += chunk
                if chunk.endswith((b'}\n', b'}')):
                    break

            if not data.strip():
                # nothing usable arrived; the loop condition decides
                # whether the connection is finished
                continue
            socketTimeout = time.time() + static.SOCKET_TIMEOUT

            try:
                received_json = self._decode_command(data, encoding)
                self._dispatch(received_json, encoding)
            except Exception:
                # a broken command must not kill the connection
                exc_type, exc_obj, exc_tb = sys.exc_info()
                # walk to the innermost frame so the reported line is the
                # one that actually raised, not the helper call above
                while exc_tb.tb_next is not None:
                    exc_tb = exc_tb.tb_next
                fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
                print("############ START OF ERROR #####################")
                print("SOCKET COMMAND ERROR: " + str(data, encoding, 'replace'))
                print(exc_type)
                print(exc_type, fname, exc_tb.tb_lineno)
                print("############ END OF ERROR #######################")
                log.warning("[TNC] reset of tcp/ip connection...")

        log.warning("[TNC] Closing client socket")

    @staticmethod
    def _decode_command(raw, encoding='utf-8'):
        """Return the JSON object contained in the received frame ``raw``.

        Only the first non-blank line is parsed; further lines in the same
        frame are dropped. A trailing newline is optional. Raises
        ``ValueError`` (or a subclass) when that line is not valid JSON and
        ``UnicodeDecodeError`` when ``raw`` is not valid ``encoding``.
        """
        text = str(raw, encoding)
        first_line = next((line for line in text.splitlines() if line.strip()), "")
        return json.loads(first_line)

    def _dispatch(self, received_json, encoding):
        """Execute the single command described by ``received_json``."""
        command = received_json["command"]
        # CQCQCQ, START_BEACON and STOP_BEACON are matched on "command"
        # alone, so a missing "type" must not raise
        msg_type = received_json.get("type")

        # CQ CQ CQ -----------------------------------------------------
        if command == "CQCQCQ":
            cq_thread = threading.Thread(target=data_handler.transmit_cq, args=[], name="CQ")
            cq_thread.start()

        # START_BEACON -------------------------------------------------
        elif command == "START_BEACON":
            # parse first: a bad interval must not leave the beacon flag set
            interval = int(received_json["parameter"])
            static.BEACON_STATE = True
            beacon_thread = threading.Thread(target=data_handler.run_beacon, args=[interval], name="START BEACON")
            beacon_thread.start()

        # STOP_BEACON --------------------------------------------------
        elif command == "STOP_BEACON":
            static.BEACON_STATE = False
            structlog.get_logger("structlog").warning("[TNC] Stopping beacon!")

        # PING ---------------------------------------------------------
        elif msg_type == 'PING' and command == "PING":
            dxcallsign = received_json["dxcallsign"]
            ping_thread = threading.Thread(target=data_handler.transmit_ping, args=[dxcallsign], name="PING")
            ping_thread.start()

        # ARQ ----------------------------------------------------------
        elif msg_type == 'ARQ' and command == "sendFile":
            # dt = datatype (f = file), fn = filename, ft = filetype,
            # d = data, crc = checksum
            rawdata = {
                "dt": "f",
                "fn": received_json["filename"],
                "ft": received_json["filetype"],
                "d": received_json["data"],
                "crc": received_json["checksum"],
            }
            self._start_arq_transmission(received_json, rawdata)

        elif msg_type == 'ARQ' and command == "sendMessage":
            print(received_json)
            # dt = datatype (m = message), d = data, crc = checksum
            rawdata = {
                "dt": "m",
                "d": received_json["d"],
                "crc": received_json["crc"],
            }
            self._start_arq_transmission(received_json, rawdata)

        elif msg_type == 'ARQ' and command == "stopTransmission":
            print(" >>> STOPPING TRANSMISSION <<<")
            structlog.get_logger("structlog").warning("[TNC] Stopping transmission!")
            static.TNC_STATE = 'IDLE'
            static.ARQ_STATE = False

        # SETTINGS AND STATUS ------------------------------------------
        elif msg_type == 'GET' and command == 'STATION_INFO':
            output = {
                "COMMAND": "STATION_INFO",
                "TIMESTAMP": received_json["timestamp"],
                "MY_CALLSIGN": str(static.MYCALLSIGN, encoding),
                "DX_CALLSIGN": str(static.DXCALLSIGN, encoding),
                "DX_GRID": str(static.DXGRID, encoding),
                "EOF": "EOF",
            }
            self.request.sendall(bytes(json.dumps(output), encoding))

        elif msg_type == 'GET' and command == 'TNC_STATE':
            self._send_tnc_state(received_json, encoding)

        elif msg_type == 'GET' and command == 'RX_BUFFER':
            self._send_rx_buffer("RX_BUFFER", static.RX_BUFFER, encoding)

        elif msg_type == 'GET' and command == 'RX_MSG_BUFFER':
            self._send_rx_buffer("RX_MSG_BUFFER", static.RX_MSG_BUFFER, encoding)

        elif msg_type == 'SET' and command == 'DEL_RX_BUFFER':
            static.RX_BUFFER = []

        elif msg_type == 'SET' and command == 'DEL_RX_MSG_BUFFER':
            static.RX_MSG_BUFFER = []

    def _start_arq_transmission(self, received_json, rawdata):
        """Start an ARQ transmission of ``rawdata`` to the requested station.

        All request fields are read and converted before any state in
        ``static`` is touched, so an incomplete request raises without
        leaving the TNC marked as 'BUSY'.
        """
        dxcallsign = received_json["dxcallsign"]
        mode = int(received_json["mode"])
        n_frames = int(received_json["n_frames"])
        data_out = bytes(json.dumps(rawdata), 'utf-8')

        static.TNC_STATE = 'BUSY'
        # on a new transmission we reset the timer
        static.ARQ_START_OF_TRANSMISSION = int(time.time())
        static.DXCALLSIGN = bytes(dxcallsign, 'utf-8')
        static.DXCALLSIGN_CRC8 = helpers.get_crc_8(static.DXCALLSIGN)

        arq_thread = threading.Thread(target=data_handler.open_dc_and_transmit, args=[data_out, mode, n_frames], name="ARQ_DATA")
        arq_thread.start()

    def _send_tnc_state(self, received_json, encoding):
        """Answer a GET TNC_STATE request with the current TNC status."""
        log = structlog.get_logger("structlog")
        stations = [
            {
                "DXCALLSIGN": str(station[0], 'utf-8'),
                "DXGRID": str(station[1], 'utf-8'),
                "TIMESTAMP": station[2],
                "DATATYPE": station[3],
                "SNR": station[4],
                "OFFSET": station[5],
                "FREQUENCY": station[6],
            }
            for station in static.HEARD_STATIONS
        ]
        output = {
            "COMMAND": "TNC_STATE",
            "TIMESTAMP": received_json["timestamp"],
            "PTT_STATE": str(static.PTT_STATE),
            "TNC_STATE": str(static.TNC_STATE),
            "ARQ_STATE": str(static.ARQ_STATE),
            "AUDIO_RMS": str(static.AUDIO_RMS),
            "SNR": str(static.SNR),
            "FREQUENCY": str(static.HAMLIB_FREQUENCY),
            "MODE": str(static.HAMLIB_MODE),
            "BANDWITH": str(static.HAMLIB_BANDWITH),
            "FFT": str(static.FFT),
            "SCATTER": static.SCATTER,
            "RX_BUFFER_LENGTH": str(len(static.RX_BUFFER)),
            "RX_MSG_BUFFER_LENGTH": str(len(static.RX_MSG_BUFFER)),
            "ARQ_BYTES_PER_MINUTE": str(static.ARQ_BYTES_PER_MINUTE),
            "ARQ_BYTES_PER_MINUTE_BURST": str(static.ARQ_BYTES_PER_MINUTE_BURST),
            "ARQ_COMPRESSION_FACTOR": str(static.ARQ_COMPRESSION_FACTOR),
            "ARQ_TRANSMISSION_PERCENT": str(static.ARQ_TRANSMISSION_PERCENT),
            "TOTAL_BYTES": str(static.TOTAL_BYTES),
            "INFO": static.INFO,
            "BEACON_STATE": str(static.BEACON_STATE),
            "STATIONS": stations,
            "EOF": "EOF",
        }
        # we want to transmit scatter data only once to reduce network
        # traffic; rebinding keeps the list referenced by output intact
        static.SCATTER = []
        # we want to display INFO messages only once
        static.INFO = []

        try:
            jsondata = json.dumps(output)
        except (ValueError, TypeError, OverflowError) as e:
            # nothing serialisable to send, so report and give up on this
            # request instead of touching an unbound jsondata
            log.error(e, data=output)
            return

        try:
            self.request.sendall(bytes(jsondata, encoding))
        except Exception as e:
            log.error(e, data=jsondata)

    def _send_rx_buffer(self, command, rx_buffer, encoding):
        """Send every entry of ``rx_buffer`` as the answer to ``command``."""
        entries = [
            {
                "DXCALLSIGN": str(entry[0], 'utf-8'),
                "DXGRID": str(entry[1], 'utf-8'),
                "TIMESTAMP": entry[2],
                # entry[3] holds the received frame as a JSON string
                "RXDATA": [json.loads(entry[3])],
            }
            for entry in rx_buffer
        ]
        output = {
            "COMMAND": command,
            "DATA-ARRAY": entries,
            "EOF": "EOF",
        }
        self.request.sendall(bytes(json.dumps(output), encoding))
2021-09-25 13:24:25 +00:00
2021-11-18 18:40:22 +00:00
def start_cmd_socket():
    """Start the TNC command server in a background daemon thread.

    The server listens on ``static.HOST``/``static.PORT`` and hands every
    connection to ``ThreadedTCPRequestHandler``. A failure while creating
    or starting the server is logged and printed, not raised.
    """
    log = structlog.get_logger("structlog")
    try:
        log.info("[TNC] Starting TCP/IP socket", port=static.PORT)
        # https://stackoverflow.com/a/16641793
        socketserver.TCPServer.allow_reuse_address = True
        cmdserver = ThreadedTCPServer((static.HOST, static.PORT), ThreadedTCPRequestHandler)
        server_thread = threading.Thread(target=cmdserver.serve_forever)
        server_thread.daemon = True
        server_thread.start()
    except Exception:
        # Exception instead of a bare except, so KeyboardInterrupt and
        # SystemExit still stop the program
        exc_type, exc_obj, exc_tb = sys.exc_info()
        log.error("[TNC] Starting TCP/IP socket failed", port=static.PORT, e=exc_type)
        print(exc_type)
        fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
        print(exc_type, fname, exc_tb.tb_lineno)