using event manager instead event queue for most parts

This commit is contained in:
DJ2LS 2024-01-04 15:46:58 +01:00
parent 8967d2ac91
commit 85b81bb0dd
11 changed files with 160 additions and 70 deletions

View file

@ -28,8 +28,8 @@ export function stateDispatcher(data) {
stateStore.modem_connection = "connected";
if (
data["freedata-message"] == "state-change" ||
data["freedata-message"] == "state"
data["type"] == "state-change" ||
data["type"] == "state"
) {
stateStore.channel_busy = data["channel_busy"];
stateStore.is_codec2_traffic = data["is_codec2_traffic"];
@ -138,41 +138,7 @@ export function stateDispatcher(data) {
*/
}
}
function build_HSL() {
//Use data from activities to build HSL list
for (let i = 0; i < stateStore.activities.length; i++) {
if (
stateStore.activities[i][1].direction != "received" ||
stateStore.activities[i][1].origin == undefined
) {
//Ignore stations without origin and not received type
//console.warn("HSL: Ignoring " + stateStore.activities[i][0]);
continue;
}
let found = false;
for (let ii = 0; ii < stateStore.heard_stations.length; ii++) {
if (
stateStore.heard_stations[ii].origin ==
stateStore.activities[i][1].origin
) {
//Station already in HSL, check if newer than one in HSL
found = true;
if (
stateStore.heard_stations[ii].timestamp <
stateStore.activities[i][1].timestamp
) {
//Update existing entry in HSL
stateStore.heard_stations[ii] = stateStore.activities[i][1];
}
}
}
if (found == false) {
//Station not in HSL, let us add it
stateStore.heard_stations.push(stateStore.activities[i][1]);
}
}
stateStore.heard_stations.sort((a, b) => b.timestamp - a.timestamp); // b - a for reverse sort
}
export function eventDispatcher(data) {
data = JSON.parse(data);
@ -185,11 +151,11 @@ export function eventDispatcher(data) {
console.info(data);
if (data["scatter"] !== undefined) {
//console.warn("Got scatter data!!!!");
stateStore.scatter = JSON.parse(data["scatter"]);
return;
}
switch (data["ptt"]) {
case true:
case false:
@ -199,6 +165,13 @@ export function eventDispatcher(data) {
return;
}
switch (data["type"]) {
case "hello-client":
console.log("hello client received")
return
}
switch (data["freedata"]) {
case "modem-message":
switch (data["received"]) {
@ -277,6 +250,70 @@ export function eventDispatcher(data) {
}
}
if (data['arq-transfer-outbound']) {
switch (data["arq-transfer-outbound"].state) {
case "OPEN_SENT":
console.log("state OPEN_ACK_SENT needs to be implemented")
return
case "INFO_SENT":
console.log("state INFO_ACK_SENT needs to be implemented")
return
case "BURST_SENT":
console.log("state BURST_REPLY_SENT needs to be implemented")
return
case "ABORTING":
console.log("state ABORTING needs to be implemented")
return
case "ABORTED":
console.log("state ABORTED needs to be implemented")
return
case "FAILED":
let message = "Transmission failed";
displayToast("danger", "bi-x-octagon", message, 5000);
return
}
}
if (data['arq-transfer-inbound']) {
switch (data["arq-transfer-inbound"].state) {
case "NEW":
console.log("state NEW needs to be implemented")
return
case "OPEN_ACK_SENT":
console.log("state OPEN_ACK_SENT needs to be implemented")
return
case "INFO_ACK_SENT":
console.log("state INFO_ACK_SENT needs to be implemented")
return
case "BURST_REPLY_SENT":
console.log("state BURST_REPLY_SENT needs to be implemented")
return
case "ENDED":
console.log("state ENDED needs to be implemented")
return
case "ABORTED":
console.log("state ABORTED needs to be implemented")
return
case "FAILED":
let message = "Transmission failed";
displayToast("danger", "bi-x-octagon", message, 5000);
return
}
}
/*
var message = "";
@ -493,3 +530,40 @@ export function eventDispatcher(data) {
}
*/
}
function build_HSL() {
//Use data from activities to build HSL list
for (let i = 0; i < stateStore.activities.length; i++) {
if (
stateStore.activities[i][1].direction != "received" ||
stateStore.activities[i][1].origin == undefined
) {
//Ignore stations without origin and not received type
//console.warn("HSL: Ignoring " + stateStore.activities[i][0]);
continue;
}
let found = false;
for (let ii = 0; ii < stateStore.heard_stations.length; ii++) {
if (
stateStore.heard_stations[ii].origin ==
stateStore.activities[i][1].origin
) {
//Station already in HSL, check if newer than one in HSL
found = true;
if (
stateStore.heard_stations[ii].timestamp <
stateStore.activities[i][1].timestamp
) {
//Update existing entry in HSL
stateStore.heard_stations[ii] = stateStore.activities[i][1];
}
}
}
if (found == false) {
//Station not in HSL, let us add it
stateStore.heard_stations.push(stateStore.activities[i][1]);
}
}
stateStore.heard_stations.sort((a, b) => b.timestamp - a.timestamp); // b - a for reverse sort
}

View file

@ -6,11 +6,11 @@ class Beacon:
BEACON_LOOP_INTERVAL = 1
def __init__(self, config, states, event_queue, logger, modem):
def __init__(self, config, states, event_manager, logger, modem):
self.modem_config = config
self.states = states
self.event_queue = event_queue
self.event_manager = event_manager
self.log = logger
self.modem = modem
@ -39,8 +39,8 @@ class Beacon:
True):
#not self.states.channel_busy):
cmd = command_beacon.BeaconCommand(self.modem_config, self.states, self.event_queue)
cmd.run(self.event_queue, self.modem)
cmd = command_beacon.BeaconCommand(self.modem_config, self.states, self.event_manager)
cmd.run(self.event_manager, self.modem)
self.event.wait(self.modem_config['MODEM']['beacon_interval'])
self.event.wait(self.BEACON_LOOP_INTERVAL)

View file

@ -6,11 +6,11 @@ from state_manager import StateManager
class TxCommand():
def __init__(self, config: dict, state_manager: StateManager, modem_events: queue.Queue, apiParams:dict = {}):
def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}):
self.config = config
self.logger = structlog.get_logger("Command")
self.state_manager = state_manager
self.modem_events = modem_events
self.event_manager = event_manager
self.set_params_from_api(apiParams)
self.frame_factory = DataFrameFactory(config)

View file

@ -54,7 +54,7 @@ class EventManager:
}
self.broadcast(event)
def send_arq_session_finished(self, outbound: bool, session_id, dxcall, total_bytes, success: bool, state, data=False):
def send_arq_session_finished(self, outbound: bool, session_id, dxcall, total_bytes, success: bool, state: bool, data=False):
if data:
data = base64.b64encode(data).decode("UTF-8")
direction = 'outbound' if outbound else 'inbound'
@ -63,9 +63,25 @@ class EventManager:
'session_id': session_id,
'dxcall': dxcall,
'total_bytes': total_bytes,
'success': success,
'success': bool(success),
'state': state,
'data': data
'data': bool(data)
}
}
self.broadcast(event)
def modem_started(self):
event = {"modem": "started"}
self.broadcast(event)
def modem_restarted(self):
event = {"modem": "restarted"}
self.broadcast(event)
def modem_stopped(self):
event = {"modem": "stopped"}
self.broadcast(event)
def modem_failed(self):
event = {"modem": "failed"}
self.broadcast(event)

View file

@ -38,26 +38,25 @@ class DISPATCHER():
FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"},
}
def __init__(self, config, event_queue, states, modem):
def __init__(self, config, event_manager, states, modem):
self.log = structlog.get_logger("frame_dispatcher")
self.log.info("loading frame dispatcher.....\n")
self.config = config
self.event_queue = event_queue
self.states = states
self.event_manager = event_manager
self._initialize_handlers(config, event_queue, states)
self._initialize_handlers(config, states)
self.modem = modem
self.data_queue_received = modem.data_queue_received
self.arq_sessions = []
def _initialize_handlers(self, config, event_queue, states):
def _initialize_handlers(self, config, states):
"""Initializes various data handlers."""
self.frame_factory = DataFrameFactory(config)
self.event_manager = event_manager.EventManager([event_queue])
def start(self):
"""Starts worker threads for transmit and receive operations."""

View file

@ -22,7 +22,6 @@ import tci
import cw
from queues import RIGCTLD_COMMAND_QUEUE
import audio
import event_manager
import demodulator
TESTMODE = False
@ -32,12 +31,11 @@ class RF:
log = structlog.get_logger("RF")
def __init__(self, config, event_queue, fft_queue, service_queue, states) -> None:
def __init__(self, config, event_manager, fft_queue, service_queue, states) -> None:
self.config = config
print(config)
self.service_queue = service_queue
self.states = states
self.event_manager = event_manager
self.sampler_avg = 0
self.buffer_avg = 0
@ -78,7 +76,6 @@ class RF:
self.modem_received_queue = queue.Queue()
self.audio_received_queue = queue.Queue()
self.data_queue_received = queue.Queue()
self.event_manager = event_manager.EventManager([event_queue])
self.fft_queue = fft_queue
self.demodulator = demodulator.Demodulator(self.config,

View file

@ -17,6 +17,7 @@ import command_ping
import command_feq
import command_test
import command_arq_raw
import event_manager
app = Flask(__name__)
CORS(app)
@ -49,6 +50,7 @@ app.state_queue = queue.Queue() # queue which holds latest states
app.modem_events = queue.Queue() # queue which holds latest events
app.modem_fft = queue.Queue() # queue which holds latest fft data
app.modem_service = queue.Queue() # start / stop modem service
app.event_manager = event_manager.EventManager([queue.Queue()])
# init state manager
app.state_manager = state_manager.StateManager(app.state_queue)

View file

@ -14,13 +14,14 @@ class SM:
self.modem = False
self.beacon = False
self.data_handler = False
self.app = app
self.config = self.app.config_manager.read()
self.modem_events = app.modem_events
self.modem_fft = app.modem_fft
self.modem_service = app.modem_service
self.states = app.state_manager
self.event_manager = app.event_manager
runner_thread = threading.Thread(
target=self.runner, name="runner thread", daemon=True
@ -49,7 +50,7 @@ class SM:
# we need to wait a bit for avoiding a portaudio crash
threading.Event().wait(0.5)
if self.start_modem():
self.modem_events.put(json.dumps({"freedata": "modem-event", "event": "restart"}))
self.event_manager.modem_restarted()
elif cmd in ['start_beacon']:
self.start_beacon()
@ -76,18 +77,19 @@ class SM:
if False in audio_test or None in audio_test or self.states.is_modem_running:
self.log.warning("starting modem failed", input_test=audio_test[0], output_test=audio_test[1])
self.states.set("is_modem_running", False)
self.modem_events.put({"freedata": "modem-event", "event": "failed"})
self.event_manager.modem_failed()
return False
self.log.info("starting modem....")
self.modem = modem.RF(self.config, self.modem_events, self.modem_fft, self.modem_service, self.states)
self.modem = modem.RF(self.config, self.event_manager, self.modem_fft, self.modem_service, self.states)
self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config,
self.modem_events,
self.event_manager,
self.states,
self.modem)
self.frame_dispatcher.start()
self.event_manager.modem_started()
self.states.set("is_modem_running", True)
self.modem.start_modem()
@ -96,10 +98,9 @@ class SM:
def stop_modem(self):
self.log.info("stopping modem....")
del self.modem
#del self.data_handler
self.modem = False
#self.data_handler = False
self.states.set("is_modem_running", False)
self.event_manager.modem_stopped()
def test_audio(self):
audio_test = audio.test_audio_devices(self.config['AUDIO']['input_device'],
@ -110,7 +111,7 @@ class SM:
def start_beacon(self):
self.beacon = beacon.Beacon(self.config, self.states, self.modem_events, self.log, self.modem)
self.beacon = beacon.Beacon(self.config, self.states, self.event_manager, self.log, self.modem)
self.beacon.start()
def stop_beacon(self):

View file

@ -79,7 +79,7 @@ class StateManager:
msgtype = "state"
return {
"freedata-message": msgtype,
"type": msgtype,
"is_modem_running": self.is_modem_running,
"is_beacon_running": self.is_beacon_running,
"radio_status": self.radio_status,

View file

@ -8,7 +8,7 @@ fft_client_list = set()
states_client_list = set()
def handle_connection(sock, client_list, event_queue):
event_queue.put({"freedata-message": "hello-client"})
event_queue.put({"type": "hello-client"})
client_list.add(sock)
while True:
@ -32,7 +32,6 @@ def transmit_sock_data_worker(client_list, event_queue):
json_event = event
else:
json_event = json.dumps(event)
clients = client_list.copy()
for client in clients:
try:

View file

@ -58,19 +58,21 @@ class TestARQSession(unittest.TestCase):
# ISS
cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_event_manager = EventManager([queue.Queue()])
cls.iss_event_queue = queue.Queue()
cls.iss_modem = TestModem(cls.iss_event_queue)
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
cls.iss_event_queue,
cls.iss_event_manager,
cls.iss_state_manager,
cls.iss_modem)
# IRS
cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_manager = EventManager([queue.Queue()])
cls.irs_event_queue = queue.Queue()
cls.irs_modem = TestModem(cls.irs_event_queue)
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
cls.irs_event_queue,
cls.irs_event_manager,
cls.irs_state_manager,
cls.irs_modem)