saving thoughts regarding to processing arq commands...

This commit is contained in:
DJ2LS 2023-12-03 14:11:43 +01:00
parent e7f8dc73e0
commit b29c539d5f
10 changed files with 209 additions and 125 deletions

View file

@ -23,7 +23,7 @@ export function connectionFailed(endpoint, event) {
}
export function stateDispatcher(data) {
data = JSON.parse(data);
//console.log(data);
console.log(data);
stateStore.modem_connection = "connected";
@ -43,6 +43,8 @@ export function stateDispatcher(data) {
stateStore.radio_status = data["radio_status"];
stateStore.frequency = data["radio_frequency"];
stateStore.mode = data["radio_mode"];
stateStore.heard_stations = data["heard_stations"];
/*
self.is_arq_state = False
self.is_arq_session = False

View file

@ -4,9 +4,11 @@ from codec2 import FREEDV_MODE
class TxCommand():
def __init__(self, config, logger, apiParams = {}):
def __init__(self, config, logger, state_manager, modem_events, apiParams = {}):
self.config = config
self.logger = logger
self.state_manager = state_manager
self.modem_events = modem_events
self.set_params_from_api(apiParams)
self.frame_factory = DataFrameFactory(config)

36
modem/command_arq_raw.py Normal file
View file

@ -0,0 +1,36 @@
from command import TxCommand
import api_validations
from protocol_arq_iss import ISS
from protocol_arq import ARQ
class ARQRawCommand(TxCommand):
def __int__(self, state_manager):
# open a new arq instance here
self.initialize_arq_instance()
def set_params_from_api(self, apiParams):
self.dxcall = apiParams['dxcall']
if not api_validations.validate_freedata_callsign(self.dxcall):
self.dxcall = f"{self.dxcall}-0"
return super().set_params_from_api(apiParams)
def initialize_arq_transmission_iss(self, data):
if id := self.get_id_from_frame(data):
instance = self.initialize_arq_instance()
self.states.register_arq_instance_by_id(id, instance)
instance['arq_irs'].arq_received_data_channel_opener()
def initialize_arq_instance(self):
self.arq = ARQ(self.config, self.event_queue, self.state_manager)
self.arq_iss = ISS(self.config, self.event_queue, self.state_manager)
return {
'arq': self.arq,
'arq_irs': self.arq_irs,
'arq_iss': self.arq_iss,
'arq_session': self.arq_session
}
def build_frame(self):
return self.frame_factory.build_arq_connect(destination=self.dxcall, session_id=b'', isWideband=True)

View file

@ -9,7 +9,7 @@ import structlog
import event_manager
import command_qrv
from data_handler import DATA
from deprecated_data_handler import DATA
TESTMODE = False

View file

@ -5,7 +5,7 @@ import time
import modem
import base64
import ujson as json
from data_handler import DATA
from deprecated_data_handler import DATA
class DATABROADCAST(DATA):
"""Terminal Node Controller for FreeDATA"""

View file

@ -4,7 +4,7 @@ from codec2 import FREEDV_MODE
import helpers
import uuid
import structlog
from data_handler import DATA
from deprecated_data_handler import DATA
class PING(DATA):
def __init__(self, config, event_queue, states):
super().__init__(config, event_queue, states)

View file

@ -10,9 +10,9 @@ import event_manager
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT, MODEM_TRANSMIT_QUEUE
from data_frame_factory import DataFrameFactory
from data_handler_broadcasts import BROADCAST
from data_handler_data_broadcasts import DATABROADCAST
from data_handler_ping import PING
#from deprecated_data_handler_broadcasts import BROADCAST
#from deprecated_data_handler_data_broadcasts import DATABROADCAST
#from deprecated_data_handler_ping import PING
from protocol_arq_iss import ISS
from protocol_arq_irs import IRS
@ -58,7 +58,7 @@ class DISPATCHER():
self.states = states
self._initialize_handlers(config, event_queue, states)
self._initialize_dispatchers()
#self._initialize_dispatchers()
self.data_queue_transmit = DATA_QUEUE_TRANSMIT
self.data_queue_received = data_q_rx
@ -70,9 +70,9 @@ class DISPATCHER():
self.frame_factory = DataFrameFactory(config)
self.broadcasts = BROADCAST(config, event_queue, states)
self.data_broadcasts = DATABROADCAST(config, event_queue, states)
self.ping = PING(config, event_queue, states)
#self.broadcasts = BROADCAST(config, event_queue, states)
#self.data_broadcasts = DATABROADCAST(config, event_queue, states)
#self.ping = PING(config, event_queue, states)
self.arq = ARQ(config, event_queue, states)
self.arq_irs = IRS(config, event_queue, states)
@ -81,6 +81,159 @@ class DISPATCHER():
self.event_manager = event_manager.EventManager([event_queue])
def start(self):
"""Starts worker threads for transmit and receive operations."""
threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start()
threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start()
def worker_transmit(self) -> None:
"""Dispatch incoming UI instructions for transmitting operations"""
while True:
command = self.data_queue_transmit.get()
command.run(self.event_queue, MODEM_TRANSMIT_QUEUE)
def worker_receive(self) -> None:
"""Queue received data for processing"""
while True:
data = self.data_queue_received.get()
self.new_process_data(
data['payload'],
data['freedv'],
data['bytes_per_frame'],
data['snr'],
data['frequency_offset'],
)
def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr, offset) -> None:
# get frame as dictionary
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
frametype = deconstructed_frame["frame_type_int"]
if frametype not in self.FRAME_HANDLER:
self.log.warning(
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
return
# instantiate handler
handler_class = self.FRAME_HANDLER[frametype]['class']
handler = handler_class(self.FRAME_HANDLER[frametype]['name'],
self.config,
self.states,
self.event_manager,
MODEM_TRANSMIT_QUEUE,
self.arq_sessions)
handler.handle(deconstructed_frame, snr, offset, freedv, bytes_per_frame)
def get_id_from_frame(self, data):
if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]:
return data[13:14]
return None
def initialize_arq_instance(self):
self.arq = ARQ(self.config, self.event_queue, self.states)
self.arq_irs = IRS(self.config, self.event_queue, self.states)
self.arq_iss = ISS(self.config, self.event_queue, self.states)
self.arq_session = SESSION(self.config, self.event_queue, self.states)
return {
'arq': self.arq,
'arq_irs': self.arq_irs,
'arq_iss': self.arq_iss,
'arq_session': self.arq_session
}
def initialize_arq_transmission_irs(self, data):
if id := self.get_id_from_frame(data):
instance = self.initialize_arq_instance()
self.states.register_arq_instance_by_id(id, instance)
instance['arq_irs'].arq_received_data_channel_opener()
def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None:
"""
Process incoming data and decide what to do with the frame.
Args:
bytes_out:
freedv:
bytes_per_frame:
snr:
Returns:
"""
# get frame as dictionary
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
frametype = deconstructed_frame["frame_type_int"]
if frametype not in self.rx_dispatcher:
self.log.warning(
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
return
# Process frametypes requiring a different set of arguments.
if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
self.arq_irs.arq_data_received(
deconstructed_frame, bytes_per_frame, snr, freedv
)
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
# if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst:
# self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
# self.c_lib.freedv_set_sync(freedv, 0)
return
# TESTFRAMES
if frametype == FR_TYPE.TEST_FRAME.value:
self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame)
return
# Process frames "known" by rx_dispatcher
# self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....")
self.rx_dispatcher[frametype][0](deconstructed_frame, snr)
def _initialize_dispatchers(self):
"""Initializes dispatchers for handling different frame types."""
# Dictionary of functions and log messages used in process_data
@ -129,114 +282,3 @@ class DISPATCHER():
FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"),
FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"),
}
def start(self):
"""Starts worker threads for transmit and receive operations."""
threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start()
threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start()
def worker_transmit(self) -> None:
"""Dispatch incoming UI instructions for transmitting operations"""
while True:
command = self.data_queue_transmit.get()
command.run(self.event_queue, MODEM_TRANSMIT_QUEUE)
def worker_receive(self) -> None:
"""Queue received data for processing"""
while True:
data = self.data_queue_received.get()
self.new_process_data(
data['payload'],
data['freedv'],
data['bytes_per_frame'],
data['snr'],
data['frequency_offset'],
)
def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr, offset) -> None:
# get frame as dictionary
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
frametype = deconstructed_frame["frame_type_int"]
if frametype not in self.FRAME_HANDLER:
self.log.warning(
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
return
# instantiate handler
handler_class = self.FRAME_HANDLER[frametype]['class']
handler = handler_class(self.FRAME_HANDLER[frametype]['name'],
self.config,
self.states,
self.event_manager,
MODEM_TRANSMIT_QUEUE,
self.arq_sessions)
handler.handle(deconstructed_frame, snr, offset, freedv, bytes_per_frame)
def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None:
"""
Process incoming data and decide what to do with the frame.
Args:
bytes_out:
freedv:
bytes_per_frame:
snr:
Returns:
"""
# get frame as dictionary
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
frametype = deconstructed_frame["frame_type_int"]
if frametype not in self.rx_dispatcher:
self.log.warning(
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
return
# Process frametypes requiring a different set of arguments.
if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
self.arq_irs.arq_data_received(
deconstructed_frame, bytes_per_frame, snr, freedv
)
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
# if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst:
# self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
# self.c_lib.freedv_set_sync(freedv, 0)
return
# TESTFRAMES
if frametype == FR_TYPE.TEST_FRAME.value:
self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame)
return
# Process frames "known" by rx_dispatcher
# self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....")
self.rx_dispatcher[frametype][0](deconstructed_frame, snr)
def get_id_from_frame(self, data):
if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]:
return data[13:14]
return None
def initialize_arq_instance(self):
self.arq = ARQ(self.config, self.event_queue, self.states)
self.arq_irs = IRS(self.config, self.event_queue, self.states)
self.arq_iss = ISS(self.config, self.event_queue, self.states)
self.arq_session = SESSION(self.config, self.event_queue, self.states)
return {
'arq': self.arq,
'arq_irs': self.arq_irs,
'arq_iss': self.arq_iss,
'arq_session': self.arq_session
}
def initialize_arq_transmission_irs(self, data):
if id := self.get_id_from_frame(data):
instance = self.initialize_arq_instance()
self.states.register_arq_instance_by_id(id, instance)
instance['arq_irs'].arq_received_data_channel_opener()

View file

@ -11,7 +11,6 @@ from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS
from modem_frametypes import FRAME_TYPE as FR_TYPE
import event_manager
from data_handler import DATA
TESTMODE = False
class ARQ:
def __init__(self, config, event_queue, states):

View file

@ -16,6 +16,8 @@ import command_cq
import command_ping
import command_feq
import command_test
import command_arq_raw
from queues import DATA_QUEUE_TRANSMIT as tx_cmd_queue
app = Flask(__name__)
@ -82,7 +84,7 @@ def validate(req, param, validator, isRequired = True):
# Takes a transmit command and puts it in the transmit command queue
def enqueue_tx_command(cmd_class, params = {}):
command = cmd_class(app.config_manager.read(), app.logger, params)
command = cmd_class(app.config_manager.read(), app.logger, app.state_manager, app.modem_events, params)
tx_cmd_queue.put(command)
app.logger.info(f"Command {command.get_name()} enqueued.")
@ -210,6 +212,7 @@ def post_modem_send_raw():
return api_response({"info": "endpoint for SENDING RAW DATA via POST"})
if not app.state_manager.is_modem_running:
api_abort('Modem not running', 503)
enqueue_tx_command(command_arq_raw.ARQRawCommand, request.json)
# server_commands.modem_arq_send_raw(request.json)
return "Not implemented yet"