more clean server shutdown

This commit is contained in:
DJ2LS 2024-04-21 13:24:04 +02:00
parent 3a079073e3
commit acb5b03d51
5 changed files with 37 additions and 33 deletions

View file

@ -42,6 +42,9 @@ class Demodulator():
self.fft_queue = fft_queue self.fft_queue = fft_queue
# Audio Stream object
self.stream = None
# init codec2 resampler # init codec2 resampler
self.resampler = codec2.resampler() self.resampler = codec2.resampler()
@ -50,9 +53,6 @@ class Demodulator():
# enable decoding of signalling modes # enable decoding of signalling modes
self.MODE_DICT[codec2.FREEDV_MODE.signalling.value]["decode"] = True self.MODE_DICT[codec2.FREEDV_MODE.signalling.value]["decode"] = True
self.MODE_DICT[codec2.FREEDV_MODE.signalling_ack.value]["decode"] = True self.MODE_DICT[codec2.FREEDV_MODE.signalling_ack.value]["decode"] = True
self.MODE_DICT[codec2.FREEDV_MODE.data_ofdm_2438.value]["decode"] = True
#self.MODE_DICT[codec2.FREEDV_MODE.qam16c2.value]["decode"] = True
tci_rx_callback_thread = threading.Thread( tci_rx_callback_thread = threading.Thread(
target=self.tci_rx_callback, target=self.tci_rx_callback,
@ -114,7 +114,6 @@ class Demodulator():
self.MODE_DICT[mode]["nin"] = nin self.MODE_DICT[mode]["nin"] = nin
def start(self, stream): def start(self, stream):
self.stream = stream self.stream = stream
for mode in self.MODE_DICT: for mode in self.MODE_DICT:
@ -153,7 +152,7 @@ class Demodulator():
state_buffer = self.MODE_DICT[mode]["state_buffer"] state_buffer = self.MODE_DICT[mode]["state_buffer"]
mode_name = self.MODE_DICT[mode]["name"] mode_name = self.MODE_DICT[mode]["name"]
try: try:
while self.stream.active: while self.stream and self.stream.active:
threading.Event().wait(0.01) threading.Event().wait(0.01)
while audiobuffer.nbuffer >= nin: while audiobuffer.nbuffer >= nin:
# demodulate audio # demodulate audio
@ -209,11 +208,13 @@ class Demodulator():
except Exception as e: except Exception as e:
error_message = str(e) error_message = str(e)
# we expect this error when shutdown # we expect this error when shutdown
if "PortAudio not initialized" in error_message: if error_message in ["PortAudio not initialized [PaErrorCode -10000]", "Invalid stream pointer [PaErrorCode -9988]"]:
e = None return
self.log.debug( else:
self.log.warning(
"[MDM] [demod_audio] demod loop ended", mode=mode_name, e=e "[MDM] [demod_audio] demod loop ended", mode=mode_name, e=e
) )
audio.sd._terminate()
def tci_rx_callback(self) -> None: def tci_rx_callback(self) -> None:
""" """

View file

@ -115,9 +115,10 @@ class RF:
# self.stream = lambda: None # self.stream = lambda: None
# self.stream.active = False # self.stream.active = False
# self.stream.stop # self.stream.stop
self.sd_input_stream.close()
except Exception: self.sd_output_stream.close()
self.log.error("[MDM] Error stopping freedata_server") except Exception as e:
self.log.error("[MDM] Error stopping freedata_server", e=e)
def init_audio(self): def init_audio(self):
self.log.info(f"[MDM] init: get audio devices", input_device=self.audio_input_device, self.log.info(f"[MDM] init: get audio devices", input_device=self.audio_input_device,

View file

@ -339,10 +339,7 @@ def stop_server():
# if app.service_manager.modem: # if app.service_manager.modem:
# app.service_manager.modem.sd_input_stream.stop # app.service_manager.modem.sd_input_stream.stop
audio.sd._terminate()
#time.sleep(1) #time.sleep(1)
print('Server shutdown...')
print("------------------------------------------")
def main(): def main():
app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 10} app.config['SOCK_SERVER_OPTIONS'] = {'ping_interval': 10}

View file

@ -36,6 +36,8 @@ class SM:
if self.config['SOCKET_INTERFACE']['enable']: if self.config['SOCKET_INTERFACE']['enable']:
self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers() self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers()
else:
self.socket_interface_manager = None
elif cmd in ['stop'] and self.modem: elif cmd in ['stop'] and self.modem:
self.stop_modem() self.stop_modem()
@ -49,9 +51,9 @@ class SM:
self.stop_modem() self.stop_modem()
self.stop_radio_manager() self.stop_radio_manager()
if self.config['SOCKET_INTERFACE']['enable'] and self.socket_interface_manager: if self.config['SOCKET_INTERFACE']['enable'] and self.socket_interface_manager:
self.socket_interface_manager.stop_servers() self.socket_interface_manager.stop_servers()
del self.socket_interface_manager
self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers()
# we need to wait a bit for avoiding a portaudio crash # we need to wait a bit for avoiding a portaudio crash
threading.Event().wait(0.5) threading.Event().wait(0.5)
@ -64,7 +66,6 @@ class SM:
else: else:
self.log.warning("[SVC] freedata_server command processing failed", cmd=cmd, state=self.state_manager.is_modem_running) self.log.warning("[SVC] freedata_server command processing failed", cmd=cmd, state=self.state_manager.is_modem_running)
def start_modem(self): def start_modem(self):
if self.config['STATION']['mycall'] in ['XX1XXX']: if self.config['STATION']['mycall'] in ['XX1XXX']:
@ -103,6 +104,7 @@ class SM:
def stop_modem(self): def stop_modem(self):
self.log.info("stopping freedata_server....") self.log.info("stopping freedata_server....")
self.modem.stop_modem()
del self.modem del self.modem
self.modem = False self.modem = False
self.state_manager.set("is_modem_running", False) self.state_manager.set("is_modem_running", False)

View file

@ -7,7 +7,6 @@ import select
from queue import Queue from queue import Queue
from socket_interface_commands import SocketCommandHandler from socket_interface_commands import SocketCommandHandler
class CommandSocket(socketserver.BaseRequestHandler): class CommandSocket(socketserver.BaseRequestHandler):
#def __init__(self, request, client_address, server): #def __init__(self, request, client_address, server):
def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None): def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None):
@ -88,7 +87,6 @@ class DataSocket(socketserver.BaseRequestHandler):
def handle(self): def handle(self):
self.log(f"Data connection established with {self.client_address}") self.log(f"Data connection established with {self.client_address}")
try: try:
while True: while True:
@ -171,23 +169,28 @@ class SocketInterfaceHandler:
self.log(f"Interfaces started") self.log(f"Interfaces started")
def run_server(self, port, handler): def run_server(self, port, handler):
try:
with CustomThreadedTCPServer(('127.0.0.1', port), handler, modem=self.modem, state_manager=self.state_manager, event_manager=self.event_manager, config_manager=self.config_manager) as server: with CustomThreadedTCPServer(('127.0.0.1', port), handler, modem=self.modem, state_manager=self.state_manager, event_manager=self.event_manager, config_manager=self.config_manager) as server:
self.log(f"Server started on port {port}") self.log(f"Server starting on port {port}")
if port == self.command_port: if port == self.command_port:
self.command_server = server self.command_server = server
else: else:
self.data_server = server self.data_server = server
server.serve_forever() server.serve_forever()
self.log(f"Server started on port {port}")
except Exception as e:
self.log(f"Error starting server on port {port} | {e}", isWarning=True)
def stop_servers(self): def stop_servers(self):
# Gracefully shutdown the server # Gracefully shutdown the server
if self.command_server: if self.command_server:
self.command_server.shutdown() self.command_server.shutdown()
self.command_server_thread.join()
del self.command_server
if self.data_server: if self.data_server:
self.data_server.shutdown() self.data_server.shutdown()
self.log(f"Interfaces stopped")
def wait_for_server_threads(self):
# Wait for both server threads to finish
self.command_server_thread.join()
self.data_server_thread.join() self.data_server_thread.join()
del self.data_server
self.log(f"Interfaces stopped")