mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
websocket non blocking multi client support
This commit is contained in:
parent
d92c4a4a90
commit
9d8c736917
|
@ -285,8 +285,10 @@ class DATA:
|
||||||
def worker_transmit(self) -> None:
|
def worker_transmit(self) -> None:
|
||||||
"""Dispatch incoming UI instructions for transmitting operations"""
|
"""Dispatch incoming UI instructions for transmitting operations"""
|
||||||
while True:
|
while True:
|
||||||
data = self.data_queue_transmit.get()
|
print("ja?")
|
||||||
|
|
||||||
|
data = self.data_queue_transmit.get()
|
||||||
|
print(data)
|
||||||
# if we are already in ARQ_STATE, or we're receiving codec2 traffic
|
# if we are already in ARQ_STATE, or we're receiving codec2 traffic
|
||||||
# let's wait with processing data
|
# let's wait with processing data
|
||||||
# this should avoid weird toggle states where both stations
|
# this should avoid weird toggle states where both stations
|
||||||
|
@ -363,7 +365,7 @@ class DATA:
|
||||||
self.log.error(
|
self.log.error(
|
||||||
"[Modem] worker_transmit: received invalid command:", data=data
|
"[Modem] worker_transmit: received invalid command:", data=data
|
||||||
)
|
)
|
||||||
|
print("jaaaa")
|
||||||
def worker_receive(self) -> None:
|
def worker_receive(self) -> None:
|
||||||
"""Queue received data for processing"""
|
"""Queue received data for processing"""
|
||||||
while True:
|
while True:
|
||||||
|
@ -2856,7 +2858,6 @@ class DATA:
|
||||||
else:
|
else:
|
||||||
self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0)
|
self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0)
|
||||||
|
|
||||||
|
|
||||||
def received_cq(self, data_in: bytes, snr) -> None:
|
def received_cq(self, data_in: bytes, snr) -> None:
|
||||||
"""
|
"""
|
||||||
Called when we receive a CQ frame
|
Called when we receive a CQ frame
|
||||||
|
|
142
modem/server.py
142
modem/server.py
|
@ -9,15 +9,17 @@ import queue
|
||||||
import server_commands
|
import server_commands
|
||||||
import service_manager
|
import service_manager
|
||||||
import state_manager
|
import state_manager
|
||||||
import explorer
|
|
||||||
from static import Modem as modeminfo
|
from static import Modem as modeminfo
|
||||||
|
import threading
|
||||||
|
import ujson as json
|
||||||
|
|
||||||
app = Flask(__name__)
|
app = Flask(__name__)
|
||||||
CORS(app)
|
CORS(app)
|
||||||
CORS(app, resources={r"/*": {"origins": "*"}})
|
CORS(app, resources={r"/*": {"origins": "*"}})
|
||||||
sock = Sock(app)
|
sock = Sock(app)
|
||||||
|
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 5}
|
||||||
|
print(app.config)
|
||||||
|
print(app.config['SOCK_SERVER_OPTIONS'])
|
||||||
# set config file to use
|
# set config file to use
|
||||||
def set_config():
|
def set_config():
|
||||||
if 'FREEDATA_CONFIG' in os.environ:
|
if 'FREEDATA_CONFIG' in os.environ:
|
||||||
|
@ -26,9 +28,9 @@ def set_config():
|
||||||
config_file = 'config.ini'
|
config_file = 'config.ini'
|
||||||
|
|
||||||
if os.path.exists(config_file):
|
if os.path.exists(config_file):
|
||||||
print("Using config from %s" % config_file)
|
print(f"Using config from {config_file}")
|
||||||
else:
|
else:
|
||||||
print("Config file '%s' not found. Exiting." % config_file)
|
print(f"Config file '{config_file}' not found. Exiting.")
|
||||||
exit(1)
|
exit(1)
|
||||||
|
|
||||||
app.config_manager = CONFIG(config_file)
|
app.config_manager = CONFIG(config_file)
|
||||||
|
@ -36,7 +38,7 @@ def set_config():
|
||||||
set_config()
|
set_config()
|
||||||
|
|
||||||
# start modem
|
# start modem
|
||||||
app.state_queue = queue.Queue() # queue which holds latest events
|
app.state_queue = queue.Queue() # queue which holds latest states
|
||||||
app.modem_events = queue.Queue() # queue which holds latest events
|
app.modem_events = queue.Queue() # queue which holds latest events
|
||||||
app.modem_fft = queue.Queue() # queue which holds lates fft data
|
app.modem_fft = queue.Queue() # queue which holds lates fft data
|
||||||
app.modem_service = queue.Queue() # start / stop modem service
|
app.modem_service = queue.Queue() # start / stop modem service
|
||||||
|
@ -94,42 +96,48 @@ def get_serial_devices():
|
||||||
def post_cqcqcq():
|
def post_cqcqcq():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for triggering a CQ via POST"})
|
return api_response({"info": "endpoint for triggering a CQ via POST"})
|
||||||
server_commands.cqcqcq()
|
if app.states.is_modem_running:
|
||||||
|
server_commands.cqcqcq()
|
||||||
return api_response({"cmd": "cqcqcq"})
|
return api_response({"cmd": "cqcqcq"})
|
||||||
|
|
||||||
@app.route('/modem/beacon', methods=['POST'])
|
@app.route('/modem/beacon', methods=['POST'])
|
||||||
def post_beacon():
|
def post_beacon():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for controlling BEACON STATE via POST"})
|
return api_response({"info": "endpoint for controlling BEACON STATE via POST"})
|
||||||
server_commands.beacon(request.json)
|
if app.states.is_modem_running:
|
||||||
|
server_commands.beacon(request.json)
|
||||||
return api_response(request.json)
|
return api_response(request.json)
|
||||||
|
|
||||||
@app.route('/modem/ping_ping', methods=['POST'])
|
@app.route('/modem/ping_ping', methods=['POST'])
|
||||||
def post_ping():
|
def post_ping():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for controlling PING via POST"})
|
return api_response({"info": "endpoint for controlling PING via POST"})
|
||||||
server_commands.ping_ping(request.json)
|
if app.states.is_modem_running:
|
||||||
|
server_commands.ping_ping(request.json)
|
||||||
return api_response(request.json)
|
return api_response(request.json)
|
||||||
|
|
||||||
@app.route('/modem/send_test_frame', methods=['POST'])
|
@app.route('/modem/send_test_frame', methods=['POST'])
|
||||||
def post_send_test_frame():
|
def post_send_test_frame():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for triggering a TEST_FRAME via POST"})
|
return api_response({"info": "endpoint for triggering a TEST_FRAME via POST"})
|
||||||
server_commands.modem_send_test_frame()
|
if app.states.is_modem_running:
|
||||||
|
server_commands.modem_send_test_frame()
|
||||||
return api_response({"cmd": "test_frame"})
|
return api_response({"cmd": "test_frame"})
|
||||||
|
|
||||||
@app.route('/modem/fec_transmit', methods=['POST'])
|
@app.route('/modem/fec_transmit', methods=['POST'])
|
||||||
def post_send_fec_frame():
|
def post_send_fec_frame():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for triggering a FEC frame via POST"})
|
return api_response({"info": "endpoint for triggering a FEC frame via POST"})
|
||||||
server_commands.modem_fec_transmit(request.json)
|
if app.states.is_modem_running:
|
||||||
|
server_commands.modem_fec_transmit(request.json)
|
||||||
return api_response(request.json)
|
return api_response(request.json)
|
||||||
|
|
||||||
@app.route('/modem/fec_is_writing', methods=['POST'])
|
@app.route('/modem/fec_is_writing', methods=['POST'])
|
||||||
def post_send_fec_is_writing():
|
def post_send_fec_is_writing():
|
||||||
if request.method not in ['POST']:
|
if request.method not in ['POST']:
|
||||||
return api_response({"info": "endpoint for triggering a IS WRITING frame via POST"})
|
return api_response({"info": "endpoint for triggering a IS WRITING frame via POST"})
|
||||||
server_commands.modem_fec_is_writing(request.json)
|
if app.states.is_modem_running:
|
||||||
|
server_commands.modem_fec_is_writing(request.json)
|
||||||
return api_response(request.json)
|
return api_response(request.json)
|
||||||
|
|
||||||
@app.route('/modem/start', methods=['POST'])
|
@app.route('/modem/start', methods=['POST'])
|
||||||
|
@ -158,66 +166,74 @@ def get_modem_version():
|
||||||
# @app.route('/modem/arq_disconnect', methods=['POST'])
|
# @app.route('/modem/arq_disconnect', methods=['POST'])
|
||||||
# @app.route('/modem/send_raw', methods=['POST'])
|
# @app.route('/modem/send_raw', methods=['POST'])
|
||||||
# @app.route('/modem/stop_transmission', methods=['POST'])
|
# @app.route('/modem/stop_transmission', methods=['POST'])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# our client set which contains all connected websocket clients
|
|
||||||
client_list = set()
|
|
||||||
# our transmit function which also handles client management
|
|
||||||
def transmit_sock_data(data):
|
|
||||||
try:
|
|
||||||
for client in client_list:
|
|
||||||
try:
|
|
||||||
client.send(data)
|
|
||||||
except Exception:
|
|
||||||
# print("client not connected anymore")
|
|
||||||
client_list.remove(client)
|
|
||||||
except RuntimeError:
|
|
||||||
# print("set changed during iteration")
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Event websocket
|
|
||||||
@sock.route('/events')
|
|
||||||
def sock_events(sock):
|
|
||||||
# it seems we have to keep the logics inside a loop, otherwise connection will be terminated
|
|
||||||
client_list.add(sock)
|
|
||||||
while True:
|
|
||||||
ev = app.modem_events.get()
|
|
||||||
transmit_sock_data(ev)
|
|
||||||
|
|
||||||
# FFT Websocket
|
|
||||||
@sock.route('/fft')
|
|
||||||
def sock_fft(sock):
|
|
||||||
# it seems we have to keep the logics inside a loop, otherwise connection will be terminated
|
|
||||||
client_list.add(sock)
|
|
||||||
while True:
|
|
||||||
fft = app.modem_fft.get()
|
|
||||||
transmit_sock_data(fft)
|
|
||||||
|
|
||||||
@sock.route('/states')
|
|
||||||
def sock_states(sock):
|
|
||||||
# it seems we have to keep the logics inside a loop, otherwise connection will be terminated
|
|
||||||
client_list.add(sock)
|
|
||||||
while True:
|
|
||||||
state = app.state_queue.get()
|
|
||||||
transmit_sock_data(state)
|
|
||||||
|
|
||||||
|
|
||||||
# @app.route('/modem/listen', methods=['POST']) # not needed if we are restarting modem on changing settings
|
# @app.route('/modem/listen', methods=['POST']) # not needed if we are restarting modem on changing settings
|
||||||
# @app.route('/modem/record_audio', methods=['POST'])
|
# @app.route('/modem/record_audio', methods=['POST'])
|
||||||
# @app.route('/modem/responde_to_call', methods=['POST']) # not needed if we are restarting modem on changing settings
|
# @app.route('/modem/responde_to_call', methods=['POST']) # not needed if we are restarting modem on changing settings
|
||||||
# @app.route('/modem/responde_to_cq', methods=['POST']) # not needed if we are restarting modem on changing settings
|
# @app.route('/modem/responde_to_cq', methods=['POST']) # not needed if we are restarting modem on changing settings
|
||||||
# @app.route('/modem/audio_levels', methods=['POST']) # tx and rx # not needed if we are restarting modem on changing settings
|
# @app.route('/modem/audio_levels', methods=['POST']) # tx and rx # not needed if we are restarting modem on changing settings
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# @app.route('/modem/mesh_ping', methods=['POST'])
|
# @app.route('/modem/mesh_ping', methods=['POST'])
|
||||||
|
|
||||||
# @app.route('/mesh/routing_table', methods=['GET'])
|
# @app.route('/mesh/routing_table', methods=['GET'])
|
||||||
# @app.route('/modem/get_rx_buffer', methods=['GET'])
|
# @app.route('/modem/get_rx_buffer', methods=['GET'])
|
||||||
# @app.route('/modem/del_rx_buffer', methods=['POST'])
|
# @app.route('/modem/del_rx_buffer', methods=['POST'])
|
||||||
|
|
||||||
# @app.route('/rig/status', methods=['GET'])
|
# @app.route('/rig/status', methods=['GET'])
|
||||||
# @app.route('/rig/mode', methods=['POST'])
|
# @app.route('/rig/mode', methods=['POST'])
|
||||||
# @app.route('/rig/frequency', methods=['POST'])
|
# @app.route('/rig/frequency', methods=['POST'])
|
||||||
# @app.route('/rig/test_hamlib', methods=['POST'])
|
# @app.route('/rig/test_hamlib', methods=['POST'])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def transmit_sock_data_worker(client_list, event_queue):
|
||||||
|
while True:
|
||||||
|
event = event_queue.get()
|
||||||
|
clients = client_list.copy()
|
||||||
|
for client in clients:
|
||||||
|
try:
|
||||||
|
client.send(event)
|
||||||
|
except Exception:
|
||||||
|
client_list.remove(client)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def sock_watchdog(sock, client_list, event_queue):
|
||||||
|
event_queue.put(json.dumps({"freedata-message": "hello-client"}))
|
||||||
|
|
||||||
|
client_list.add(sock)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
sock.receive(timeout=1)
|
||||||
|
except Exception as e:
|
||||||
|
print(e)
|
||||||
|
client_list.remove(sock)
|
||||||
|
break
|
||||||
|
return
|
||||||
|
|
||||||
|
# Event websocket
|
||||||
|
@sock.route('/events')
|
||||||
|
def sock_events(sock):
|
||||||
|
sock_watchdog(sock, events_client_list, app.modem_events)
|
||||||
|
|
||||||
|
@sock.route('/fft')
|
||||||
|
def sock_fft(sock):
|
||||||
|
sock_watchdog(sock, fft_client_list, app.modem_fft)
|
||||||
|
|
||||||
|
@sock.route('/states')
|
||||||
|
def sock_states(sock):
|
||||||
|
sock_watchdog(sock, states_client_list, app.state_queue)
|
||||||
|
|
||||||
|
# websocket multi client support for using with queued information.
|
||||||
|
# our client set which contains all connected websocket clients
|
||||||
|
events_client_list = set()
|
||||||
|
fft_client_list = set()
|
||||||
|
states_client_list = set()
|
||||||
|
|
||||||
|
# start a worker thread for every socket endpoint
|
||||||
|
events_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(events_client_list, app.modem_events))
|
||||||
|
events_thread.start()
|
||||||
|
|
||||||
|
states_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(states_client_list, app.state_queue))
|
||||||
|
states_thread.start()
|
||||||
|
|
||||||
|
fft_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(fft_client_list, app.modem_fft))
|
||||||
|
fft_thread.start()
|
||||||
|
|
|
@ -7,7 +7,7 @@ log = structlog.get_logger("COMMANDS")
|
||||||
def cqcqcq():
|
def cqcqcq():
|
||||||
try:
|
try:
|
||||||
DATA_QUEUE_TRANSMIT.put(["CQ"])
|
DATA_QUEUE_TRANSMIT.put(["CQ"])
|
||||||
|
return
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
log.warning("[CMD] error while transmiting CQ", e=err)
|
log.warning("[CMD] error while transmiting CQ", e=err)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue