mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
267 lines
10 KiB
Python
267 lines
10 KiB
Python
"""
|
|
FRAME DISPATCHER - We are dispatching the received frames to the needed functions
|
|
|
|
|
|
"""
|
|
import threading
|
|
import structlog
|
|
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
|
import event_manager
|
|
from data_frame_factory import DataFrameFactory
|
|
|
|
#from deprecated_data_handler_broadcasts import BROADCAST
|
|
#from deprecated_data_handler_data_broadcasts import DATABROADCAST
|
|
#from deprecated_data_handler_ping import PING
|
|
|
|
from deprecated_protocol_arq_session_iss import ISS
|
|
from deprecated_protocol_arq_session_irs import IRS
|
|
from deprecated_protocol_arq_session import ARQ
|
|
from deprecated_protocol_arq_connection import SESSION
|
|
|
|
from frame_handler import FrameHandler
|
|
from frame_handler_ping import PingFrameHandler
|
|
from frame_handler_cq import CQFrameHandler
|
|
from frame_handler_arq_session import ARQFrameHandler
|
|
|
|
class DISPATCHER():
|
|
|
|
FRAME_HANDLER = {
|
|
FR_TYPE.ARQ_SESSION_OPEN_ACK.value: {"class": ARQFrameHandler, "name": "ARQ OPEN ACK"},
|
|
FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"},
|
|
FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"},
|
|
FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"},
|
|
FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"},
|
|
FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"},
|
|
FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"},
|
|
FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP TX"},
|
|
FR_TYPE.BEACON.value: {"class": FrameHandler, "name": "BEACON"},
|
|
FR_TYPE.BURST_FRAME.value:{"class": ARQFrameHandler, "name": "BURST FRAME"},
|
|
FR_TYPE.BURST_ACK.value: {"class": ARQFrameHandler, "name": "BURST ACK"},
|
|
FR_TYPE.BURST_NACK.value: {"class": ARQFrameHandler, "name": "BURST NACK"},
|
|
FR_TYPE.CQ.value: {"class": CQFrameHandler, "name": "CQ"},
|
|
FR_TYPE.FR_ACK.value: {"class": FrameHandler, "name": "FRAME ACK"},
|
|
FR_TYPE.FR_NACK.value: {"class": FrameHandler, "name": "FRAME NACK"},
|
|
FR_TYPE.FR_REPEAT.value: {"class": FrameHandler, "name": "REPEAT REQUEST"},
|
|
FR_TYPE.PING_ACK.value: {"class": FrameHandler, "name": "PING ACK"},
|
|
FR_TYPE.PING.value: {"class": PingFrameHandler, "name": "PING"},
|
|
FR_TYPE.QRV.value: {"class": FrameHandler, "name": "QRV"},
|
|
FR_TYPE.IS_WRITING.value: {"class": FrameHandler, "name": "IS_WRITING"},
|
|
FR_TYPE.FEC.value: {"class": FrameHandler, "name": "FEC"},
|
|
FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"},
|
|
}
|
|
|
|
def __init__(self, config, event_queue, states, data_q_rx, modem_tx_q):
|
|
self.log = structlog.get_logger("frame_dispatcher")
|
|
|
|
self.log.info("loading frame dispatcher.....\n")
|
|
self.config = config
|
|
self.event_queue = event_queue
|
|
self.states = states
|
|
|
|
self._initialize_handlers(config, event_queue, states)
|
|
#self._initialize_dispatchers()
|
|
|
|
self.data_queue_received = data_q_rx
|
|
self.modem_transmit_queue = modem_tx_q
|
|
|
|
self.arq_sessions = []
|
|
|
|
def _initialize_handlers(self, config, event_queue, states):
|
|
"""Initializes various data handlers."""
|
|
|
|
self.frame_factory = DataFrameFactory(config)
|
|
|
|
#self.broadcasts = BROADCAST(config, event_queue, states)
|
|
#self.data_broadcasts = DATABROADCAST(config, event_queue, states)
|
|
#self.ping = PING(config, event_queue, states)
|
|
|
|
self.arq = ARQ(config, event_queue, states)
|
|
self.arq_irs = IRS(config, event_queue, states)
|
|
self.arq_iss = ISS(config, event_queue, states)
|
|
self.arq_session = SESSION(config, event_queue, states)
|
|
|
|
self.event_manager = event_manager.EventManager([event_queue])
|
|
|
|
def start(self):
|
|
"""Starts worker threads for transmit and receive operations."""
|
|
threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start()
|
|
|
|
def worker_receive(self) -> None:
|
|
"""Queue received data for processing"""
|
|
while True:
|
|
data = self.data_queue_received.get()
|
|
self.new_process_data(
|
|
data['payload'],
|
|
data['freedv'],
|
|
data['bytes_per_frame'],
|
|
data['snr'],
|
|
data['frequency_offset'],
|
|
)
|
|
|
|
def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr, frequency_offset) -> None:
|
|
# get frame as dictionary
|
|
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
|
|
frametype = deconstructed_frame["frame_type_int"]
|
|
|
|
if frametype not in self.FRAME_HANDLER:
|
|
self.log.warning(
|
|
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
|
|
return
|
|
|
|
# instantiate handler
|
|
handler_class = self.FRAME_HANDLER[frametype]['class']
|
|
handler = handler_class(self.FRAME_HANDLER[frametype]['name'],
|
|
self.config,
|
|
self.states,
|
|
self.event_manager,
|
|
self.modem_transmit_queue)
|
|
|
|
handler.handle(deconstructed_frame, snr, frequency_offset, freedv, bytes_per_frame)
|
|
|
|
|
|
|
|
def get_id_from_frame(self, data):
|
|
if data[:1] == FR_TYPE.ARQ_SESSION_OPEN:
|
|
return data[13:14]
|
|
return None
|
|
|
|
def initialize_arq_instance(self):
|
|
self.arq = ARQ(self.config, self.event_queue, self.states)
|
|
self.arq_irs = IRS(self.config, self.event_queue, self.states)
|
|
self.arq_iss = ISS(self.config, self.event_queue, self.states)
|
|
self.arq_session = SESSION(self.config, self.event_queue, self.states)
|
|
|
|
return {
|
|
'arq': self.arq,
|
|
'arq_irs': self.arq_irs,
|
|
'arq_iss': self.arq_iss,
|
|
'arq_session': self.arq_session
|
|
}
|
|
|
|
def initialize_arq_transmission_irs(self, data):
|
|
if id := self.get_id_from_frame(data):
|
|
instance = self.initialize_arq_instance()
|
|
self.states.register_arq_instance_by_id(id, instance)
|
|
instance['arq_irs'].arq_received_data_channel_opener()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None:
|
|
"""
|
|
Process incoming data and decide what to do with the frame.
|
|
|
|
Args:
|
|
bytes_out:
|
|
freedv:
|
|
bytes_per_frame:
|
|
snr:
|
|
|
|
Returns:
|
|
|
|
"""
|
|
# get frame as dictionary
|
|
deconstructed_frame = self.frame_factory.deconstruct(bytes_out)
|
|
frametype = deconstructed_frame["frame_type_int"]
|
|
|
|
if frametype not in self.rx_dispatcher:
|
|
self.log.warning(
|
|
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
|
|
return
|
|
|
|
# Process frametypes requiring a different set of arguments.
|
|
if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
|
|
self.arq_irs.arq_data_received(
|
|
deconstructed_frame, bytes_per_frame, snr, freedv
|
|
)
|
|
|
|
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
|
|
# if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst:
|
|
# self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
|
|
# self.c_lib.freedv_set_sync(freedv, 0)
|
|
return
|
|
|
|
# TESTFRAMES
|
|
if frametype == FR_TYPE.TEST_FRAME.value:
|
|
self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame)
|
|
return
|
|
|
|
# Process frames "known" by rx_dispatcher
|
|
# self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....")
|
|
self.rx_dispatcher[frametype][0](deconstructed_frame, snr)
|
|
|
|
def _initialize_dispatchers(self):
|
|
"""Initializes dispatchers for handling different frame types."""
|
|
# Dictionary of functions and log messages used in process_data
|
|
# instead of a long series of if-elif-else statements.
|
|
self.rx_dispatcher = {
|
|
FR_TYPE.ARQ_SESSION_OPEN_ACK.value: (
|
|
self.arq_iss.arq_received_channel_is_open,
|
|
"ARQ OPEN ACK",
|
|
),
|
|
FR_TYPE.ARQ_SESSION_OPEN.value: (
|
|
self.initialize_arq_transmission_irs,
|
|
"ARQ Data Channel Open",
|
|
),
|
|
FR_TYPE.ARQ_CONNECTION_CLOSE.value: (
|
|
self.arq_session.received_session_close,
|
|
"ARQ CLOSE SESSION",
|
|
),
|
|
FR_TYPE.ARQ_CONNECTION_HB.value: (
|
|
self.arq_session.received_session_heartbeat,
|
|
"ARQ HEARTBEAT",
|
|
),
|
|
FR_TYPE.ARQ_CONNECTION_OPEN.value: (
|
|
self.arq_session.received_session_opener,
|
|
"ARQ OPEN SESSION",
|
|
),
|
|
FR_TYPE.ARQ_STOP.value: (self.arq.received_stop_transmission, "ARQ STOP TX"),
|
|
FR_TYPE.BEACON.value: (self.broadcasts.received_beacon, "BEACON"),
|
|
FR_TYPE.BURST_ACK.value: (self.arq_iss.burst_ack_nack_received, "BURST ACK"),
|
|
FR_TYPE.BURST_NACK.value: (self.arq_iss.burst_ack_nack_received, "BURST NACK"),
|
|
FR_TYPE.CQ.value: (self.broadcasts.received_cq, "CQ"),
|
|
FR_TYPE.FR_ACK.value: (self.arq_iss.frame_ack_received, "FRAME ACK"),
|
|
FR_TYPE.FR_NACK.value: (self.arq_iss.frame_nack_received, "FRAME NACK"),
|
|
FR_TYPE.FR_REPEAT.value: (self.arq_iss.burst_rpt_received, "REPEAT REQUEST"),
|
|
FR_TYPE.PING_ACK.value: (self.ping.received_ping_ack, "PING ACK"),
|
|
FR_TYPE.PING.value: (self.ping.received_ping, "PING"),
|
|
FR_TYPE.QRV.value: (self.broadcasts.received_qrv, "QRV"),
|
|
FR_TYPE.IS_WRITING.value: (self.broadcasts.received_is_writing, "IS_WRITING"),
|
|
FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"),
|
|
FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"),
|
|
}
|