FreeDATA/modem/server.py

76 lines
2 KiB
Python
Raw Normal View History

2023-11-06 12:49:37 +00:00
from flask import Flask, request, jsonify
from flask_sock import Sock
2023-11-06 14:36:11 +00:00
import os
import serial_ports
2023-11-06 12:49:37 +00:00
from config import CONFIG
import audio
2023-11-06 12:49:37 +00:00
# Flask application object; the REST routes in this module are registered on it.
app = Flask(__name__)
# flask_sock extension bound to the app; supplies the websocket route decorator.
sock = Sock(app)
2023-11-06 14:36:11 +00:00
# set config file to use
def set_config():
    """Pick the config file and attach a CONFIG manager to the Flask app.

    The path is taken from the ``FREEDATA_CONFIG`` environment variable;
    when that variable is not set, ``config.ini`` (relative to the current
    working directory) is used.

    On success the manager is stored as ``app.config_manager``.

    Raises:
        SystemExit: with exit code 1 when the chosen file does not exist.
    """
    config_file = os.environ.get('FREEDATA_CONFIG', 'config.ini')

    if not os.path.exists(config_file):
        print("Config file '%s' not found. Exiting." % config_file)
        # Raise SystemExit directly instead of calling the `exit()` helper:
        # that helper is injected by the `site` module for interactive use
        # and is not guaranteed to exist (e.g. `python -S`, frozen builds).
        raise SystemExit(1)

    print("Using config from %s" % config_file)
    app.config_manager = CONFIG(config_file)
# returns a standard API response
2023-11-06 12:49:37 +00:00
def api_response(data, status='ok'):
    """Wrap *data* in the standard ``{'status': ..., 'data': ...}`` envelope.

    The envelope is serialized with ``jsonify`` and returned as the response.
    """
    return jsonify({'status': status, 'data': data})
2023-11-06 14:36:11 +00:00
# Runs at import time so app.config_manager is set before any route uses it.
set_config()
2023-11-06 12:49:37 +00:00
## REST API
@app.route('/', methods=['GET'])
def index():
    """Return static metadata describing this API."""
    api_info = dict(
        name='FreeDATA API',
        description='',
        api_version=1,
        license='GPL3.0',
        documentation='https://wiki.freedata.app',
    )
    return api_response(api_info)
# get and set config
@app.route('/config', methods=['GET', 'POST'])
def config():
    """Read or write the configuration through ``app.config_manager``.

    POST: passes the request's JSON body to ``app.config_manager.write`` and
    returns its result; a falsy result is reported with the status
    ``'error writing config'`` and ``None`` as data.

    GET (and the HEAD requests Flask routes to GET views automatically):
    returns ``app.config_manager.read()``.
    """
    if request.method == 'POST':
        # Deliberately not named `set_config`, so the module-level
        # set_config() function is not shadowed inside this view.
        written = app.config_manager.write(request.json)
        if not written:
            return api_response(None, 'error writing config')
        return api_response(written)

    # Anything else reaching this view is GET or the implicit HEAD. Falling
    # through unconditionally (rather than `elif ... == 'GET'`) guarantees
    # the view never returns None, which Flask would turn into a 500.
    return api_response(app.config_manager.read())
2023-11-06 12:49:37 +00:00
@app.route('/devices/audio', methods=['GET'])
def get_audio_devices():
    """Return the audio devices reported by ``audio.get_audio_devices()``.

    The pair it returns is exposed under the keys ``'in'`` and ``'out'``.
    """
    inputs, outputs = audio.get_audio_devices()
    return api_response({'in': inputs, 'out': outputs})
@app.route('/devices/serial', methods=['GET'])
def get_serial_devices():
    """Return whatever ``serial_ports.get_ports()`` reports."""
    return api_response(serial_ports.get_ports())
2023-11-06 12:49:37 +00:00
# Event websocket
@sock.route('/events')
def echo(sock):
    """Echo every message received on the websocket back to the sender.

    Note: the ``sock`` parameter (the connection object) shadows the
    module-level ``sock`` extension inside this function.
    """
    # No explicit exit condition: the loop ends only when receive() or
    # send() raises.
    while True:
        sock.send(sock.receive())