diff --git a/modem/data_handler.py b/modem/data_handler.py index f3eb3f6c..165f1030 100644 --- a/modem/data_handler.py +++ b/modem/data_handler.py @@ -285,8 +285,10 @@ class DATA: def worker_transmit(self) -> None: """Dispatch incoming UI instructions for transmitting operations""" while True: - data = self.data_queue_transmit.get() + print("ja?") + data = self.data_queue_transmit.get() + print(data) # if we are already in ARQ_STATE, or we're receiving codec2 traffic # let's wait with processing data # this should avoid weird toggle states where both stations @@ -363,7 +365,7 @@ class DATA: self.log.error( "[Modem] worker_transmit: received invalid command:", data=data ) - + print("jaaaa") def worker_receive(self) -> None: """Queue received data for processing""" while True: @@ -2856,7 +2858,6 @@ class DATA: else: self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - def received_cq(self, data_in: bytes, snr) -> None: """ Called when we receive a CQ frame diff --git a/modem/server.py b/modem/server.py index b2a7ad91..b73001a3 100644 --- a/modem/server.py +++ b/modem/server.py @@ -9,15 +9,17 @@ import queue import server_commands import service_manager import state_manager -import explorer from static import Modem as modeminfo - +import threading +import ujson as json app = Flask(__name__) CORS(app) CORS(app, resources={r"/*": {"origins": "*"}}) sock = Sock(app) - +app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 5} +print(app.config) +print(app.config['SOCK_SERVER_OPTIONS']) # set config file to use def set_config(): if 'FREEDATA_CONFIG' in os.environ: @@ -26,9 +28,9 @@ def set_config(): config_file = 'config.ini' if os.path.exists(config_file): - print("Using config from %s" % config_file) + print(f"Using config from {config_file}") else: - print("Config file '%s' not found. Exiting." % config_file) + print(f"Config file '{config_file}' not found. Exiting.") exit(1) app.config_manager = CONFIG(config_file) @@ -36,7 +38,7 @@ def set_config(): set_config() # start modem -app.state_queue = queue.Queue() # queue which holds latest events +app.state_queue = queue.Queue() # queue which holds latest states app.modem_events = queue.Queue() # queue which holds latest events app.modem_fft = queue.Queue() # queue which holds lates fft data app.modem_service = queue.Queue() # start / stop modem service @@ -94,42 +96,48 @@ def get_serial_devices(): def post_cqcqcq(): if request.method not in ['POST']: return api_response({"info": "endpoint for triggering a CQ via POST"}) - server_commands.cqcqcq() + if app.states.is_modem_running: + server_commands.cqcqcq() return api_response({"cmd": "cqcqcq"}) @app.route('/modem/beacon', methods=['POST']) def post_beacon(): if request.method not in ['POST']: return api_response({"info": "endpoint for controlling BEACON STATE via POST"}) - server_commands.beacon(request.json) + if app.states.is_modem_running: + server_commands.beacon(request.json) return api_response(request.json) @app.route('/modem/ping_ping', methods=['POST']) def post_ping(): if request.method not in ['POST']: return api_response({"info": "endpoint for controlling PING via POST"}) - server_commands.ping_ping(request.json) + if app.states.is_modem_running: + server_commands.ping_ping(request.json) return api_response(request.json) @app.route('/modem/send_test_frame', methods=['POST']) def post_send_test_frame(): if request.method not in ['POST']: return api_response({"info": "endpoint for triggering a TEST_FRAME via POST"}) - server_commands.modem_send_test_frame() + if app.states.is_modem_running: + server_commands.modem_send_test_frame() return api_response({"cmd": "test_frame"}) @app.route('/modem/fec_transmit', methods=['POST']) def post_send_fec_frame(): if request.method not in ['POST']: return api_response({"info": "endpoint for triggering a FEC frame via POST"}) - server_commands.modem_fec_transmit(request.json) + if app.states.is_modem_running: + server_commands.modem_fec_transmit(request.json) return api_response(request.json) @app.route('/modem/fec_is_writing', methods=['POST']) def post_send_fec_is_writing(): if request.method not in ['POST']: return api_response({"info": "endpoint for triggering a IS WRITING frame via POST"}) - server_commands.modem_fec_is_writing(request.json) + if app.states.is_modem_running: + server_commands.modem_fec_is_writing(request.json) return api_response(request.json) @app.route('/modem/start', methods=['POST']) @@ -158,66 +166,74 @@ def get_modem_version(): # @app.route('/modem/arq_disconnect', methods=['POST']) # @app.route('/modem/send_raw', methods=['POST']) # @app.route('/modem/stop_transmission', methods=['POST']) - - - -# our client set which contains all connected websocket clients -client_list = set() -# our transmit function which also handles client management -def transmit_sock_data(data): - try: - for client in client_list: - try: - client.send(data) - except Exception: - # print("client not connected anymore") - client_list.remove(client) - except RuntimeError: - # print("set changed during iteration") - pass - -# Event websocket -@sock.route('/events') -def sock_events(sock): - # it seems we have to keep the logics inside a loop, otherwise connection will be terminated - client_list.add(sock) - while True: - ev = app.modem_events.get() - transmit_sock_data(ev) - -# FFT Websocket -@sock.route('/fft') -def sock_fft(sock): - # it seems we have to keep the logics inside a loop, otherwise connection will be terminated - client_list.add(sock) - while True: - fft = app.modem_fft.get() - transmit_sock_data(fft) - -@sock.route('/states') -def sock_states(sock): - # it seems we have to keep the logics inside a loop, otherwise connection will be terminated - client_list.add(sock) - while True: - state = app.state_queue.get() - transmit_sock_data(state) - - # @app.route('/modem/listen', methods=['POST']) # not needed if we are restarting modem on changing settings # @app.route('/modem/record_audio', methods=['POST']) # @app.route('/modem/responde_to_call', methods=['POST']) # not needed if we are restarting modem on changing settings # @app.route('/modem/responde_to_cq', methods=['POST']) # not needed if we are restarting modem on changing settings # @app.route('/modem/audio_levels', methods=['POST']) # tx and rx # not needed if we are restarting modem on changing settings - - - # @app.route('/modem/mesh_ping', methods=['POST']) - # @app.route('/mesh/routing_table', methods=['GET']) # @app.route('/modem/get_rx_buffer', methods=['GET']) # @app.route('/modem/del_rx_buffer', methods=['POST']) - # @app.route('/rig/status', methods=['GET']) # @app.route('/rig/mode', methods=['POST']) # @app.route('/rig/frequency', methods=['POST']) -# @app.route('/rig/test_hamlib', methods=['POST']) \ No newline at end of file +# @app.route('/rig/test_hamlib', methods=['POST']) + + + + + +def transmit_sock_data_worker(client_list, event_queue): + while True: + event = event_queue.get() + clients = client_list.copy() + for client in clients: + try: + client.send(event) + except Exception: + client_list.remove(client) + + + +def sock_watchdog(sock, client_list, event_queue): + event_queue.put(json.dumps({"freedata-message": "hello-client"})) + + client_list.add(sock) + while True: + try: + sock.receive(timeout=1) + except Exception as e: + print(e) + client_list.remove(sock) + break + return + +# Event websocket +@sock.route('/events') +def sock_events(sock): + sock_watchdog(sock, events_client_list, app.modem_events) + +@sock.route('/fft') +def sock_fft(sock): + sock_watchdog(sock, fft_client_list, app.modem_fft) + +@sock.route('/states') +def sock_states(sock): + sock_watchdog(sock, states_client_list, app.state_queue) + +# websocket multi client support for using with queued information. +# our client set which contains all connected websocket clients +events_client_list = set() +fft_client_list = set() +states_client_list = set() + +# start a worker thread for every socket endpoint +events_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(events_client_list, app.modem_events)) +events_thread.start() + +states_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(states_client_list, app.state_queue)) +states_thread.start() + +fft_thread = threading.Thread(target=transmit_sock_data_worker, daemon=True, args=(fft_client_list, app.modem_fft)) +fft_thread.start() diff --git a/modem/server_commands.py b/modem/server_commands.py index 11a5ca5e..36423e62 100644 --- a/modem/server_commands.py +++ b/modem/server_commands.py @@ -7,7 +7,7 @@ log = structlog.get_logger("COMMANDS") def cqcqcq(): try: DATA_QUEUE_TRANSMIT.put(["CQ"]) - + return except Exception as err: log.warning("[CMD] error while transmiting CQ", e=err)