mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
added socket interface to modem
This commit is contained in:
parent
6b4bdb4d7d
commit
7714c7aeb6
7 changed files with 213 additions and 121 deletions
|
@ -50,6 +50,12 @@ enable_morse_identifier = False
|
|||
respond_to_cq = True
|
||||
tx_delay = 200
|
||||
|
||||
[SOCKET_INTERFACE]
|
||||
enable = False
|
||||
host = 127.0.0.1
|
||||
cmd_port = 8000
|
||||
data_port = 8001
|
||||
|
||||
[MESSAGES]
|
||||
enable_auto_repeat = False
|
||||
|
||||
|
|
|
@ -61,6 +61,13 @@ class CONFIG:
|
|||
'respond_to_cq': bool,
|
||||
'tx_delay': int
|
||||
},
|
||||
'SOCKET_INTERFACE': {
|
||||
'enable' : bool,
|
||||
'host' : str,
|
||||
'cmd_port' : int,
|
||||
'data_port' : int,
|
||||
|
||||
},
|
||||
'MESSAGES': {
|
||||
'enable_auto_repeat': bool,
|
||||
}
|
||||
|
|
|
@ -327,7 +327,7 @@ def sock_states(sock):
|
|||
@atexit.register
|
||||
def stop_server():
|
||||
try:
|
||||
app.service_manager.stop_modem()
|
||||
app.service_manager.modem_service.put("stop")
|
||||
if app.service_manager.modem:
|
||||
app.service_manager.modem.sd_input_stream.stop
|
||||
audio.sd._terminate()
|
||||
|
|
|
@ -5,7 +5,7 @@ import structlog
|
|||
import audio
|
||||
|
||||
import radio_manager
|
||||
|
||||
from socket_interface import SocketInterfaceHandler
|
||||
|
||||
class SM:
|
||||
def __init__(self, app):
|
||||
|
@ -19,7 +19,7 @@ class SM:
|
|||
self.state_manager = app.state_manager
|
||||
self.event_manager = app.event_manager
|
||||
self.schedule_manager = app.schedule_manager
|
||||
|
||||
self.socket_interface_manager = SocketInterfaceHandler(self.config, self.state_manager)
|
||||
|
||||
runner_thread = threading.Thread(
|
||||
target=self.runner, name="runner thread", daemon=True
|
||||
|
@ -34,9 +34,13 @@ class SM:
|
|||
self.start_radio_manager()
|
||||
self.start_modem()
|
||||
|
||||
self.socket_interface_manager.start_servers()
|
||||
|
||||
elif cmd in ['stop'] and self.modem:
|
||||
self.stop_modem()
|
||||
self.stop_radio_manager()
|
||||
|
||||
self.socket_interface_manager.stop_servers()
|
||||
# we need to wait a bit for avoiding a portaudio crash
|
||||
threading.Event().wait(0.5)
|
||||
|
||||
|
|
193
modem/socket_interface.py
Normal file
193
modem/socket_interface.py
Normal file
|
@ -0,0 +1,193 @@
|
|||
import socketserver
|
||||
import threading
|
||||
import logging
|
||||
import structlog
|
||||
import select
|
||||
from queue import Queue
|
||||
|
||||
from command_p2p_connection import P2PConnectionCommand
|
||||
|
||||
# Shared queue for command and data handlers
|
||||
data_queue = Queue()
|
||||
|
||||
class CommandHandler(socketserver.BaseRequestHandler):
|
||||
def __init__(self, request, client_address, server):
|
||||
self.logger = structlog.get_logger(type(self).__name__)
|
||||
|
||||
self.handlers = {
|
||||
'CONNECT': self.handle_connect,
|
||||
'DISCONNECT': self.handle_disconnect,
|
||||
'MYCALL': self.handle_mycall,
|
||||
'BW': self.handle_bw,
|
||||
'ABORT': self.handle_abort,
|
||||
'PUBLIC': self.handle_public,
|
||||
'CWID': self.handle_cwid,
|
||||
'LISTEN': self.handle_listen,
|
||||
'COMPRESSION': self.handle_compression,
|
||||
'WINLINK SESSION': self.handle_winlink_session,
|
||||
}
|
||||
super().__init__(request, client_address, server)
|
||||
|
||||
def log(self, message, isWarning = False):
|
||||
msg = f"[{type(self).__name__}]: {message}"
|
||||
logger = self.logger.warn if isWarning else self.logger.info
|
||||
logger(msg)
|
||||
|
||||
def handle(self):
|
||||
self.log(f"Client connected: {self.client_address}")
|
||||
try:
|
||||
while True:
|
||||
data = self.request.recv(1024).strip()
|
||||
if not data:
|
||||
break
|
||||
decoded_data = data.decode()
|
||||
self.log(f"Command received from {self.client_address}: {decoded_data}")
|
||||
self.parse_command(decoded_data)
|
||||
finally:
|
||||
self.log(f"Command connection closed with {self.client_address}")
|
||||
|
||||
def parse_command(self, data):
|
||||
for command in self.handlers:
|
||||
if data.startswith(command):
|
||||
# Extract command arguments after the command itself
|
||||
args = data[len(command):].strip().split()
|
||||
self.dispatch_command(command, args)
|
||||
return
|
||||
self.send_response("ERROR: Unknown command\r\n")
|
||||
|
||||
def dispatch_command(self, command, data):
|
||||
if command in self.handlers:
|
||||
handler = self.handlers[command]
|
||||
handler(data)
|
||||
else:
|
||||
self.send_response(f"Unknown command: {command}")
|
||||
|
||||
def send_response(self, message):
|
||||
self.request.sendall(message.encode())
|
||||
|
||||
# Command handlers
|
||||
def handle_connect(self, data):
|
||||
# Your existing connect logic
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_disconnect(self, data):
|
||||
# Your existing disconnect logic
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_mycall(self, data):
|
||||
# Logic for handling MYCALL command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_bw(self, data):
|
||||
# Logic for handling BW command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_abort(self, data):
|
||||
# Logic for handling ABORT command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_public(self, data):
|
||||
# Logic for handling PUBLIC command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_cwid(self, data):
|
||||
# Logic for handling CWID command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_listen(self, data):
|
||||
# Logic for handling LISTEN command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_compression(self, data):
|
||||
# Logic for handling COMPRESSION command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
def handle_winlink_session(self, data):
|
||||
# Logic for handling WINLINK SESSION command
|
||||
self.send_response("OK\r\n")
|
||||
|
||||
class DataHandler(socketserver.BaseRequestHandler):
|
||||
def __init__(self, request, client_address, server):
|
||||
self.logger = structlog.get_logger(type(self).__name__)
|
||||
|
||||
super().__init__(request, client_address, server)
|
||||
|
||||
def log(self, message, isWarning = False):
|
||||
msg = f"[{type(self).__name__}]: {message}"
|
||||
logger = self.logger.warn if isWarning else self.logger.info
|
||||
logger(msg)
|
||||
def handle(self):
|
||||
self.log(f"Data connection established with {self.client_address}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout
|
||||
if ready_to_read:
|
||||
self.data = self.request.recv(1024).strip()
|
||||
if not self.data:
|
||||
break
|
||||
try:
|
||||
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}")
|
||||
except:
|
||||
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}")
|
||||
|
||||
# Check if there's something to send from the queue, without blocking
|
||||
if not data_queue.empty():
|
||||
data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking
|
||||
self.request.sendall(data_to_send)
|
||||
self.log(f"Sent data to {self.client_address}")
|
||||
|
||||
finally:
|
||||
self.log(f"Data connection closed with {self.client_address}")
|
||||
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
class SocketInterfaceHandler:
|
||||
def __init__(self, config, state_manager):
|
||||
self.config = config
|
||||
self.state_manager = state_manager
|
||||
self.logger = structlog.get_logger(type(self).__name__)
|
||||
self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"]
|
||||
self.data_port = self.config["SOCKET_INTERFACE"]["data_port"]
|
||||
self.command_server = None
|
||||
self.data_server = None
|
||||
self.command_server_thread = None
|
||||
self.data_server_thread = None
|
||||
|
||||
def log(self, message, isWarning = False):
|
||||
msg = f"[{type(self).__name__}]: {message}"
|
||||
logger = self.logger.warn if isWarning else self.logger.info
|
||||
logger(msg)
|
||||
|
||||
def start_servers(self):
|
||||
# Method to start both command and data server threads
|
||||
self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandHandler))
|
||||
self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataHandler))
|
||||
|
||||
self.command_server_thread.start()
|
||||
self.data_server_thread.start()
|
||||
|
||||
self.log(f"Interfaces started")
|
||||
|
||||
def run_server(self, port, handler):
|
||||
with ThreadedTCPServer(('127.0.0.1', port), handler) as server:
|
||||
self.log(f"Server started on port {port}")
|
||||
if port == self.command_port:
|
||||
self.command_server = server
|
||||
else:
|
||||
self.data_server = server
|
||||
server.serve_forever()
|
||||
|
||||
def stop_servers(self):
|
||||
# Gracefully shutdown the server
|
||||
if self.command_server:
|
||||
self.command_server.shutdown()
|
||||
if self.data_server:
|
||||
self.data_server.shutdown()
|
||||
self.log(f"Interfaces stopped")
|
||||
|
||||
def wait_for_server_threads(self):
|
||||
# Wait for both server threads to finish
|
||||
self.command_server_thread.join()
|
||||
self.data_server_thread.join()
|
|
@ -1,116 +0,0 @@
|
|||
import socketserver
|
||||
import threading
|
||||
import logging
|
||||
import signal
|
||||
import sys
|
||||
import select
|
||||
from queue import Queue
|
||||
|
||||
from command_p2p_connection import P2PConnectionCommand
|
||||
|
||||
# Shared queue for command and data handlers
|
||||
data_queue = Queue()
|
||||
# Initialize logging
|
||||
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
||||
|
||||
|
||||
class VARACommandHandler(socketserver.BaseRequestHandler):
|
||||
mycall = None # Class attribute to store mycall
|
||||
dxcall = None
|
||||
bandwidth = None # Class attribute to store bandwidth
|
||||
|
||||
|
||||
def handle(self):
|
||||
logging.info(f"Command connection established with {self.client_address}")
|
||||
try:
|
||||
while True:
|
||||
self.data = self.request.recv(1024).strip()
|
||||
if not self.data:
|
||||
break
|
||||
logging.info(f"Command received from {self.client_address}: {self.data}")
|
||||
|
||||
if self.data.startswith(b'MYCALL '):
|
||||
VARACommandHandler.mycall = self.data.split(b' ')[1].strip()
|
||||
self.request.sendall(b"OK\r\n")
|
||||
elif self.data.startswith(b'BW'):
|
||||
VARACommandHandler.bandwidth = self.data[2:].strip()
|
||||
self.request.sendall(b"OK\r\n")
|
||||
elif self.data.startswith(b'CONNECT '):
|
||||
|
||||
P2PConnectionCommand.connect('MYCALL', 'DXCALL', 'BANDWIDTH')
|
||||
|
||||
self.request.sendall(b"OK\r\n")
|
||||
parts = self.data.split()
|
||||
if len(parts) >= 3 and VARACommandHandler.mycall and VARACommandHandler.bandwidth:
|
||||
VARACommandHandler.dxcall = parts[2]
|
||||
# Using the stored mycall and bandwidth for the response
|
||||
bytestring = b'CONNECTED ' + VARACommandHandler.mycall + b' ' + VARACommandHandler.dxcall + b' ' + VARACommandHandler.bandwidth + b'\r\n'
|
||||
self.request.sendall(bytestring)
|
||||
|
||||
else:
|
||||
self.request.sendall(b"ERROR: MYCALL or Bandwidth not set.\r\n")
|
||||
elif self.data.startswith(b'ABORT'):
|
||||
bytestring = b'DISCONNECTED\r\n'
|
||||
elif self.data.startswith(b'DISCONNECT'):
|
||||
bytestring = b'DISCONNECTED\r\n'
|
||||
self.request.sendall(bytestring)
|
||||
else:
|
||||
self.request.sendall(b"OK\r\n")
|
||||
|
||||
finally:
|
||||
logging.info(f"Command connection closed with {self.client_address}")
|
||||
|
||||
|
||||
class VARADataHandler(socketserver.BaseRequestHandler):
|
||||
def handle(self):
|
||||
logging.info(f"Data connection established with {self.client_address}")
|
||||
|
||||
try:
|
||||
while True:
|
||||
ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout
|
||||
if ready_to_read:
|
||||
self.data = self.request.recv(1024).strip()
|
||||
if not self.data:
|
||||
break
|
||||
try:
|
||||
logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}")
|
||||
except:
|
||||
logging.info(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}")
|
||||
|
||||
|
||||
|
||||
# Check if there's something to send from the queue, without blocking
|
||||
if not data_queue.empty():
|
||||
data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking
|
||||
self.request.sendall(data_to_send)
|
||||
logging.info(f"Sent data to {self.client_address}")
|
||||
|
||||
finally:
|
||||
logging.info(f"Data connection closed with {self.client_address}")
|
||||
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
def run_server(port, handler):
|
||||
with ThreadedTCPServer(('127.0.0.1', port), handler) as server:
|
||||
logging.info(f"Server running on port {port}")
|
||||
server.serve_forever()
|
||||
|
||||
|
||||
def signal_handler(sig, frame):
|
||||
sys.exit(0)
|
||||
|
||||
if __name__ == '__main__':
|
||||
# Setup signal handler for graceful shutdown
|
||||
signal.signal(signal.SIGINT, signal_handler)
|
||||
|
||||
# Create server threads for command and data ports
|
||||
command_server_thread = threading.Thread(target=run_server, args=(8300, VARACommandHandler))
|
||||
data_server_thread = threading.Thread(target=run_server, args=(8301, VARADataHandler))
|
||||
|
||||
# Start the server threads
|
||||
command_server_thread.start()
|
||||
data_server_thread.start()
|
||||
|
||||
# Wait for both server threads to finish
|
||||
command_server_thread.join()
|
||||
data_server_thread.join()
|
|
@ -151,8 +151,6 @@ class TestP2PConnectionSession(unittest.TestCase):
|
|||
self.iss_state_manager.register_p2p_connection_session(session)
|
||||
session.connect()
|
||||
|
||||
|
||||
|
||||
self.waitAndCloseChannels()
|
||||
del cmd
|
||||
|
||||
|
|
Loading…
Reference in a new issue