introduced schedule manager

This commit is contained in:
DJ2LS 2024-02-02 19:37:02 +01:00
parent 6a352e63b7
commit b70a209899
14 changed files with 221 additions and 147 deletions

View file

@ -157,9 +157,9 @@ export async function getFreedataMessages(){
processFreedataMessages(res)
}
export async function sendFreedataMessage(dxcall, body) {
export async function sendFreedataMessage(destination, body) {
return await apiPost("/freedata/messages", {
dxcall: dxcall,
destination: destination,
body: body,
});
}

View file

@ -13,9 +13,11 @@ class ARQ_SESSION_TYPES(Enum):
p2pmsg_lzma = 20
class ARQDataTypeHandler:
def __init__(self, event_manager):
def __init__(self, event_manager, state_manager):
self.logger = structlog.get_logger(type(self).__name__)
self.event_manager = event_manager
self.state_manager = state_manager
self.handlers = {
ARQ_SESSION_TYPES.raw: {
'prepare': self.prepare_raw,
@ -116,11 +118,11 @@ class ARQDataTypeHandler:
def handle_p2pmsg_lzma(self, data):
decompressed_data = lzma.decompress(data)
self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
message_received(self.event_manager, decompressed_data)
message_received(self.event_manager, self.state_manager, decompressed_data)
return decompressed_data
def failed_p2pmsg_lzma(self, data):
decompressed_data = lzma.decompress(data)
self.log(f"Handling failed LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
message_failed(self.event_manager, decompressed_data)
message_failed(self.event_manager, self.state_manager, decompressed_data)
return decompressed_data

View file

@ -35,7 +35,7 @@ class ARQSession():
self.event_manager: EventManager = modem.event_manager
self.states = modem.states
self.states.set("is_modem_busy", True)
self.states.setARQ(True)
self.snr = []
@ -49,7 +49,7 @@ class ARQSession():
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
self.event_frame_received = threading.Event()
self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager)
self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.states)
self.id = None
self.session_started = time.time()
self.session_ended = 0
@ -98,7 +98,7 @@ class ARQSession():
if isinstance(received_data, bytearray) and isinstance(type_byte, int):
self.arq_data_type_handler.dispatch(type_byte, received_data)
self.states.set("is_modem_busy", False)
self.states.setARQ(False)
return
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")

View file

@ -21,7 +21,7 @@ class ISS_State(Enum):
class ARQSessionISS(arq_session.ARQSession):
RETRIES_CONNECT = 10
RETRIES_CONNECT = 1
# DJ2LS: 3 seconds seems to be too small for radios with a too slow PTT toggle time
# DJ2LS: 3.5 seconds is working well WITHOUT a channel busy detection delay
@ -169,7 +169,7 @@ class ARQSessionISS(arq_session.ARQSession):
self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics())
self.state_manager.remove_arq_iss_session(self.id)
self.states.set("is_modem_busy", False)
self.states.setARQ(False)
return None, None
def transmission_failed(self, irs_frame=None):
@ -178,7 +178,7 @@ class ARQSessionISS(arq_session.ARQSession):
self.set_state(ISS_State.FAILED)
self.log(f"Transmission failed!")
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,False, self.state.name, statistics=self.calculate_session_statistics())
self.states.set("is_modem_busy", False)
self.states.setARQ(False)
self.arq_data_type_handler.failed(self.type_byte, self.data)
@ -212,6 +212,6 @@ class ARQSessionISS(arq_session.ARQSession):
self.event_manager.send_arq_session_finished(
True, self.id, self.dxcall, False, self.state.name, statistics=self.calculate_session_statistics())
self.state_manager.remove_arq_iss_session(self.id)
self.states.set("is_modem_busy", False)
self.states.setARQ(False)
return None, None

View file

@ -1,41 +0,0 @@
import command_beacon
import sched
import time
import threading
class Beacon:
def __init__(self, config, states, event_manager, logger, modem):
self.config = config
self.states = states
self.event_manager = event_manager
self.log = logger
self.modem = modem
self.scheduler = sched.scheduler(time.time, time.sleep)
self.beacon_interval = self.config['MODEM']['beacon_interval']
self.beacon_enabled = False
self.event = threading.Event()
def start(self):
self.beacon_enabled = True
self.schedule_beacon()
def stop(self):
self.beacon_enabled = False
def schedule_beacon(self):
if self.beacon_enabled:
self.scheduler.enter(self.beacon_interval, 1, self.run_beacon)
threading.Thread(target=self.scheduler.run, daemon=True).start()
def run_beacon(self):
if self.beacon_enabled:
# Your beacon logic here
cmd = command_beacon.BeaconCommand(self.config, self.states, self.event_manager)
cmd.run(self.event_manager, self.modem)
self.schedule_beacon() # Reschedule the next beacon
def refresh(self):
# Interrupt and reschedule the beacon
self.scheduler = sched.scheduler(time.time, time.sleep)
self.schedule_beacon()

View file

@ -10,12 +10,17 @@ class TxCommand():
def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}):
self.config = config
self.logger = structlog.get_logger("Command")
self.logger = structlog.get_logger(type(self).__name__)
self.state_manager = state_manager
self.event_manager = event_manager
self.set_params_from_api(apiParams)
self.frame_factory = DataFrameFactory(config)
self.arq_data_type_handler = ARQDataTypeHandler(event_manager)
self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager)
def log(self, message, isWarning = False):
msg = f"[{type(self).__name__}]: {message}"
logger = self.logger.warn if isWarning else self.logger.info
logger(msg)
def set_params_from_api(self, apiParams):
pass

View file

@ -12,20 +12,39 @@ class SendMessageCommand(TxCommand):
"""
def set_params_from_api(self, apiParams):
print(apiParams)
origin = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}"
self.message = MessageP2P.from_api_params(origin, apiParams)
print(self.message.id)
print(self.message.to_dict())
print("--------------------------------------- set params from api")
DatabaseManager(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued')
def transmit(self, modem):
if self.state_manager.getARQ():
self.log("Modem busy, waiting until ready...")
return
first_queued_message = DatabaseManager(self.event_manager).get_first_queued_message()
if not first_queued_message:
self.log("No queued message in database.")
self.state_manager.set_state("pending_messages", False)
return
self.log(f"Queued message found: {first_queued_message['id']}")
DatabaseManager(self.event_manager).update_message(first_queued_message["id"], update_data={'status': 'transmitting'})
message_dict = DatabaseManager(self.event_manager).get_message_by_id(first_queued_message["id"])
print(message_dict["id"])
message = MessageP2P.from_api_params(message_dict['origin'], message_dict)
print(message.id)
print("--------------------------------------- transmit")
# Convert JSON string to bytes (using UTF-8 encoding)
DatabaseManager(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='transmitting')
payload = self.message.to_payload().encode('utf-8')
payload = message.to_payload().encode('utf-8')
json_bytearray = bytearray(payload)
data, data_type = self.arq_data_type_handler.prepare(json_bytearray, ARQ_SESSION_TYPES.p2pmsg_lzma)
iss = ARQSessionISS(self.config,
modem,
self.message.destination,

View file

@ -17,30 +17,23 @@ import time
log = structlog.get_logger("explorer")
class explorer():
def __init__(self, app, config, states):
self.config = config
self.app = app
def __init__(self, modem_version, config_manager, states):
self.modem_version = modem_version
self.config_manager = config_manager
self.config = self.config_manager.read()
self.states = states
self.explorer_url = "https://api.freedata.app/explorer.php"
self.publish_interval = 120
self.scheduler = sched.scheduler(time.time, time.sleep)
self.schedule_thread = threading.Thread(target=self.run_scheduler)
self.schedule_thread.start()
def run_scheduler(self):
# Schedule the first execution of push
self.scheduler.enter(self.publish_interval, 1, self.push)
# Run the scheduler in a loop
self.scheduler.run()
def push(self):
self.config = self.config_manager.read()
frequency = 0 if self.states.radio_frequency is None else self.states.radio_frequency
band = "USB"
callsign = str(self.config['STATION']['mycall']) + "-" + str(self.config["STATION"]['myssid'])
gridsquare = str(self.config['STATION']['mygrid'])
version = str(self.app.MODEM_VERSION)
version = str(self.modem_version)
bandwidth = str(self.config['MODEM']['enable_low_bandwidth_mode'])
beacon = str(self.states.is_beacon_running)
strength = str(self.states.s_meter_strength)
@ -76,11 +69,3 @@ class explorer():
except Exception as e:
log.warning("[EXPLORER] connection lost")
# Reschedule the push method
self.scheduler.enter(self.publish_interval, 1, self.push)
def shutdown(self):
# If there are other cleanup tasks, include them here
if self.schedule_thread:
self.schedule_thread.join()

View file

@ -3,19 +3,23 @@ import api_validations
import base64
import json
from message_system_db_manager import DatabaseManager
#import command_message_send
def message_received(event_manager, data):
def message_received(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
received_message_obj = MessageP2P.from_payload(decompressed_json_string)
received_message_dict = MessageP2P.to_dict(received_message_obj)
DatabaseManager(event_manager).add_message(received_message_dict, direction='receive', status='received')
def message_failed(event_manager, data):
def message_failed(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
payload_message = json.loads(decompressed_json_string)
DatabaseManager(event_manager).update_message(payload_message["id"], update_data={'status' : 'failed'})
#payload_message = json.loads(decompressed_json_string)
#print(payload_message)
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
payload_message = MessageP2P.to_dict(payload_message_obj)
print(payload_message)
DatabaseManager(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'})
class MessageP2P:
def __init__(self, id: str, origin: str, destination: str, body: str, attachments: list) -> None:
@ -29,12 +33,12 @@ class MessageP2P:
@classmethod
def from_api_params(cls, origin: str, params: dict):
dxcall = params['dxcall']
if not api_validations.validate_freedata_callsign(dxcall):
dxcall = f"{dxcall}-0"
destination = params['destination']
if not api_validations.validate_freedata_callsign(destination):
destination = f"{destination}-0"
if not api_validations.validate_freedata_callsign(dxcall):
raise ValueError(f"Invalid dxcall given ({params['dxcall']})")
if not api_validations.validate_freedata_callsign(destination):
raise ValueError(f"Invalid destination given ({params['destination']})")
body = params['body']
if len(body) < 1:
@ -47,9 +51,12 @@ class MessageP2P:
attachments.append(cls.__decode_attachment__(a))
timestamp = datetime.datetime.now().isoformat()
msg_id = f"{origin}_{dxcall}_{timestamp}"
if 'id' not in params:
msg_id = f"{origin}_{destination}_{timestamp}"
else:
msg_id = params["id"]
return cls(msg_id, origin, dxcall, body, attachments)
return cls(msg_id, origin, destination, body, attachments)
@classmethod
def from_payload(cls, payload):

View file

@ -30,7 +30,8 @@ class DatabaseManager:
"received",
"failed",
"failed_checksum",
"aborted"
"aborted",
"queued"
]
# Add default statuses if they don't exist
@ -259,4 +260,29 @@ class DatabaseManager:
def get_attachments_by_message_id_json(self, message_id):
attachments = self.get_attachments_by_message_id(message_id)
return json.dumps(attachments)
return json.dumps(attachments)
def get_first_queued_message(self):
session = self.get_thread_scoped_session()
try:
# Find the status object for "queued"
queued_status = session.query(Status).filter_by(name='queued').first()
if queued_status:
# Query for the first (oldest) message with status "queued"
message = session.query(P2PMessage)\
.filter_by(status=queued_status)\
.order_by(P2PMessage.timestamp)\
.first()
if message:
self.log(f"Found queued message with ID {message.id}")
return message.to_dict()
else:
return None
else:
self.log("Queued status not found", isWarning=True)
return None
except Exception as e:
self.log(f"Error fetching the first queued message: {e}", isWarning=True)
return None
finally:
session.remove()

83
modem/schedule_manager.py Normal file
View file

@ -0,0 +1,83 @@
import sched
import time
import threading
import command_message_send
from message_system_db_manager import DatabaseManager
import explorer
import command_beacon
class ScheduleManager:
def __init__(self, modem_version, config_manager, state_manger, event_manager):
self.modem_version = modem_version
self.config_manager = config_manager
self.state_manager = state_manger
self.event_manager = event_manager
self.config = self.config_manager.read()
self.beacon_interval = self.config['MODEM']['beacon_interval']
self.scheduler = sched.scheduler(time.time, time.sleep)
self.events = {
'check_for_queued_messages': {'function': self.check_for_queued_messages, 'interval': 10},
'explorer_publishing': {'function': self.push_to_explorer, 'interval': 120},
'transmitting_beacon': {'function': self.transmit_beacon, 'interval': self.beacon_interval},
}
self.running = False # Flag to control the running state
self.scheduler_thread = None # Reference to the scheduler thread
self.modem = None
def schedule_event(self, event_function, interval):
"""Schedule an event and automatically reschedule it after execution."""
event_function() # Execute the event function
if self.running: # Only reschedule if still running
self.scheduler.enter(interval, 1, self.schedule_event, (event_function, interval))
def start(self, modem):
"""Start scheduling events and run the scheduler in a separate thread."""
# wait some time
threading.Event().wait(timeout=10)
# get actual modem istamce
self.modem = modem
self.running = True # Set the running flag to True
for event_info in self.events.values():
# Schedule each event for the first time
self.scheduler.enter(0, 1, self.schedule_event, (event_info['function'], event_info['interval']))
# Run the scheduler in a separate thread
self.scheduler_thread = threading.Thread(target=self.scheduler.run, daemon=True)
self.scheduler_thread.start()
def stop(self):
"""Stop scheduling new events and terminate the scheduler thread."""
self.running = False # Clear the running flag to stop scheduling new events
# Clear scheduled events to break the scheduler out of its waiting state
for event in list(self.scheduler.queue):
self.scheduler.cancel(event)
# Wait for the scheduler thread to finish
self.scheduler_thread.join()
def transmit_beacon(self):
if not self.state_manager.getARQ() and self.state_manager.is_beacon_running:
cmd = command_beacon.BeaconCommand(self.config, self.state_manager, self.event_manager)
cmd.run(self.event_manager, self.modem)
def push_to_explorer(self):
self.config = self.config_manager.read()
if self.config['STATION']['enable_explorer']:
explorer.explorer(self.modem_version, self.config_manager, self.state_manager).push()
def check_for_queued_messages(self):
if not self.state_manager.getARQ():
if DatabaseManager(self.event_manager).get_first_queued_message():
params = DatabaseManager(self.event_manager).get_first_queued_message()
command = command_message_send.SendMessageCommand(self.config_manager.read(), self.state_manager, self.event_manager, params)
command.transmit(self.modem)
return

View file

@ -19,7 +19,7 @@ import command_arq_raw
import command_message_send
import event_manager
from message_system_db_manager import DatabaseManager
from schedule_manager import ScheduleManager
app = Flask(__name__)
CORS(app)
@ -73,6 +73,7 @@ def enqueue_tx_command(cmd_class, params = {}):
if command.run(app.modem_events, app.service_manager.modem): # TODO remove the app.modem_event custom queue
return True
return False
## REST API
@app.route('/', methods=['GET'])
def index():
@ -134,10 +135,8 @@ def post_beacon():
if not app.state_manager.is_beacon_running:
app.state_manager.set('is_beacon_running', request.json['enabled'])
app.modem_service.put("start_beacon")
else:
app.state_manager.set('is_beacon_running', request.json['enabled'])
app.modem_service.put("stop_beacon")
return api_response(request.json)
@ -263,7 +262,7 @@ def handle_freedata_message(message_id):
def get_message_attachments(message_id):
attachments = DatabaseManager(app.event_manager).get_attachments_by_message_id_json(message_id)
return api_response(attachments)
# @app.route('/modem/arq_connect', methods=['POST'])
# @app.route('/modem/arq_disconnect', methods=['POST'])
# @app.route('/modem/send_raw', methods=['POST'])
@ -309,11 +308,13 @@ if __name__ == "__main__":
app.event_manager = event_manager.EventManager([app.modem_events]) # TODO remove the app.modem_event custom queue
# init state manager
app.state_manager = state_manager.StateManager(app.state_queue)
# initialize message system schedule manager
app.schedule_manager = ScheduleManager(app.MODEM_VERSION, app.config_manager, app.state_manager, app.event_manager)
# start service manager
app.service_manager = service_manager.SM(app)
# start modem service
app.modem_service.put("start")
# initialize databse default values
# initialize database default values
DatabaseManager(app.event_manager).initialize_default_values()
wsm.startThreads(app)
app.run()

View file

@ -3,9 +3,7 @@ import frame_dispatcher
import modem
import structlog
import audio
import ujson as json
import explorer
import beacon
import radio_manager
@ -14,14 +12,13 @@ class SM:
self.log = structlog.get_logger("service")
self.app = app
self.modem = False
self.beacon = False
self.explorer = False
self.app.radio_manager = False
self.config = self.app.config_manager.read()
self.modem_fft = app.modem_fft
self.modem_service = app.modem_service
self.state_manager = app.state_manager
self.event_manager = app.event_manager
self.schedule_manager = app.schedule_manager
runner_thread = threading.Thread(
@ -33,23 +30,18 @@ class SM:
while True:
cmd = self.modem_service.get()
if cmd in ['start'] and not self.modem:
self.log.info("------------------ FreeDATA ------------------")
self.log.info("------------------ MODEM ------------------")
self.config = self.app.config_manager.read()
self.start_radio_manager()
self.start_modem()
self.start_explorer_publishing()
elif cmd in ['stop'] and self.modem:
self.stop_modem()
self.stop_explorer_publishing()
self.stop_radio_manager()
# we need to wait a bit for avoiding a portaudio crash
threading.Event().wait(0.5)
elif cmd in ['restart']:
self.stop_modem()
self.stop_explorer_publishing()
self.stop_radio_manager()
# we need to wait a bit for avoiding a portaudio crash
threading.Event().wait(0.5)
@ -59,24 +51,22 @@ class SM:
if self.start_modem():
self.event_manager.modem_restarted()
self.start_explorer_publishing()
elif cmd in ['start_beacon']:
self.start_beacon()
elif cmd in ['stop_beacon']:
self.stop_beacon()
else:
self.log.warning("[SVC] modem command processing failed", cmd=cmd, state=self.state_manager.is_modem_running)
def start_modem(self):
if self.config['STATION']['mycall'] in ['XX1XXX']:
self.log.warning("wrong callsign in config! interrupting startup")
return False
if self.state_manager.is_modem_running:
self.log.warning("modem already running")
return False
# test audio devices
audio_test = self.test_audio()
@ -98,6 +88,7 @@ class SM:
self.event_manager.modem_started()
self.state_manager.set("is_modem_running", True)
self.modem.start_modem()
self.schedule_manager.start(self.modem)
return True
@ -106,6 +97,7 @@ class SM:
del self.modem
self.modem = False
self.state_manager.set("is_modem_running", False)
self.schedule_manager.stop()
self.event_manager.modem_stopped()
def test_audio(self):
@ -119,28 +111,6 @@ class SM:
self.log.error("Error testing audio devices", e=e)
return [False, False]
def start_beacon(self):
self.beacon = beacon.Beacon(self.config, self.state_manager, self.event_manager, self.log, self.modem)
self.beacon.start()
def stop_beacon(self):
self.beacon.stop()
def start_explorer_publishing(self):
try:
# optionally start explorer module
if self.config['STATION']['enable_explorer']:
self.explorer = explorer.explorer(self.app, self.config, self.state_manager)
except Exception as e:
self.log.warning("[EXPLORER] Publishing not started because of error", e=e)
def stop_explorer_publishing(self):
if self.config['STATION']['enable_explorer']:
try:
del self.explorer
except Exception as e:
self.log.info("[EXPLORER] Error while stopping...", e=e)
def start_radio_manager(self):
self.app.radio_manager = radio_manager.RadioManager(self.config, self.state_manager, self.event_manager)

View file

@ -19,7 +19,10 @@ class StateManager:
self.channel_busy_condition_codec2 = threading.Event()
self.is_modem_running = False
self.is_modem_busy = False
self.is_modem_busy = threading.Event()
self.setARQ(False)
self.is_beacon_running = False
# If true, any wait() call is blocking
@ -30,13 +33,13 @@ class StateManager:
self.dxcallsign: bytes = b"ZZ9YY-0"
self.dxgrid: bytes = b"------"
self.heard_stations = [] # TODO remove it... heard stations list == deprecated
self.heard_stations = []
self.activities_list = {}
self.arq_iss_sessions = {}
self.arq_irs_sessions = {}
self.mesh_routing_table = []
#self.mesh_routing_table = []
self.radio_frequency = 0
self.radio_mode = None
@ -46,6 +49,11 @@ class StateManager:
# Set rig control status regardless or rig control method
self.radio_status = False
# message system related states
self.pending_messages = False
def sendState (self):
currentState = self.get_state_event(False)
self.statequeue.put(currentState)
@ -87,7 +95,7 @@ class StateManager:
"channel_busy_slot": self.channel_busy_slot,
"audio_dbfs": self.audio_dbfs,
"activities": self.activities_list,
"is_modem_busy" : self.is_modem_busy
"is_modem_busy" : self.getARQ()
}
# .wait() blocks until the event is set
@ -101,6 +109,15 @@ class StateManager:
else:
self.transmitting_event.set()
def setARQ(self, busy):
if busy:
self.is_modem_busy.clear()
else:
self.is_modem_busy.set()
def getARQ(self):
return not self.is_modem_busy.is_set()
def waitForTransmission(self):
self.transmitting_event.wait()
@ -202,4 +219,4 @@ class StateManager:
"radio_mode": self.radio_mode,
"radio_rf_level": self.radio_rf_level,
"s_meter_strength": self.s_meter_strength,
}
}