diff --git a/modem/frame_dispatcher.py b/modem/frame_dispatcher.py index 85767d00..6fcd8001 100644 --- a/modem/frame_dispatcher.py +++ b/modem/frame_dispatcher.py @@ -20,9 +20,34 @@ from protocol_arq_irs import IRS from protocol_arq import ARQ from protocol_arq_session import SESSION +from frame_handler import FrameHandler class DISPATCHER(): + FRAME_HANDLER = { + FR_TYPE.ARQ_DC_OPEN_ACK_N.value: {"class": FrameHandler, "name": "ARQ OPEN ACK (Narrow)"}, + FR_TYPE.ARQ_DC_OPEN_ACK_W.value: {"class": FrameHandler, "name": "ARQ OPEN ACK (Wide)"}, + FR_TYPE.ARQ_DC_OPEN_N.value: {"class": FrameHandler, "name": "ARQ Data Channel Open (Narrow)"}, + FR_TYPE.ARQ_DC_OPEN_W.value: {"class": FrameHandler, "name": "ARQ Data Channel Open (Wide)"}, + FR_TYPE.ARQ_SESSION_CLOSE.value: {"class": FrameHandler, "name": "ARQ CLOSE SESSION"}, + FR_TYPE.ARQ_SESSION_HB.value: {"class": FrameHandler, "name": "ARQ HEARTBEAT"}, + FR_TYPE.ARQ_SESSION_OPEN.value: {"class": FrameHandler, "name": "ARQ OPEN SESSION"}, + FR_TYPE.ARQ_STOP.value: {"class": FrameHandler, "name": "ARQ STOP TX"}, + FR_TYPE.BEACON.value: {"class": FrameHandler, "name": "BEACON"}, + FR_TYPE.BURST_ACK.value: {"class": FrameHandler, "name": "BURST ACK"}, + FR_TYPE.BURST_NACK.value: {"class": FrameHandler, "name": "BURST NACK"}, + FR_TYPE.CQ.value: {"class": FrameHandler, "name": "CQ"}, + FR_TYPE.FR_ACK.value: {"class": FrameHandler, "name": "FRAME ACK"}, + FR_TYPE.FR_NACK.value: {"class": FrameHandler, "name": "FRAME NACK"}, + FR_TYPE.FR_REPEAT.value: {"class": FrameHandler, "name": "REPEAT REQUEST"}, + FR_TYPE.PING_ACK.value: {"class": FrameHandler, "name": "PING ACK"}, + FR_TYPE.PING.value: {"class": FrameHandler, "name": "PING"}, + FR_TYPE.QRV.value: {"class": FrameHandler, "name": "QRV"}, + FR_TYPE.IS_WRITING.value: {"class": FrameHandler, "name": "IS_WRITING"}, + FR_TYPE.FEC.value: {"class": FrameHandler, "name": "FEC"}, + FR_TYPE.FEC_WAKEUP.value: {"class": FrameHandler, "name": "FEC WAKEUP"}, + } + def __init__(self, config, event_queue, states, data_q_rx): self.log = structlog.get_logger("frame_dispatcher") @@ -37,6 +62,8 @@ class DISPATCHER(): self.data_queue_transmit = DATA_QUEUE_TRANSMIT self.data_queue_received = data_q_rx + self.arq_sessions = [] + def _initialize_handlers(self, config, event_queue, states): """Initializes various data handlers.""" @@ -101,7 +128,6 @@ class DISPATCHER(): FR_TYPE.IS_WRITING.value: (self.broadcasts.received_is_writing, "IS_WRITING"), FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"), FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"), - } def start(self): @@ -123,11 +149,34 @@ class DISPATCHER(): # [1] freedv instance # [2] bytes_per_frame # [3] snr - self.process_data( + self.old_process_data( + bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2], snr=data[3] + ) + self.new_process_data( bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2], snr=data[3] ) - def process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None: + def new_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None: + # get frame as dictionary + deconstructed_frame = self.frame_factory.deconstruct(bytes_out) + frametype = deconstructed_frame["frame_type_int"] + + if frametype not in self.FRAME_HANDLER: + self.log.warning( + "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + return + + # instantiate handler + handler_class = self.FRAME_HANDLER[frametype]['class'] + handler = handler_class(self.FRAME_HANDLER[frametype]['name'], + self.states, + self.event_manager, + MODEM_TRANSMIT_QUEUE, + self.arq_sessions) + + handler.handle(deconstructed_frame, snr, 0, freedv, bytes_per_frame) + + def old_process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None: """ Process incoming data and decide what to do with the frame. @@ -140,48 +189,35 @@ class DISPATCHER(): Returns: """ - try: - # get frame as dictionary - deconstructed_frame = self.frame_factory.deconstruct(bytes_out) + # get frame as dictionary + deconstructed_frame = self.frame_factory.deconstruct(bytes_out) + frametype = deconstructed_frame["frame_type_int"] - frametype = deconstructed_frame["frame_type_int"] - print(deconstructed_frame) - print(frametype) + if frametype not in self.rx_dispatcher: + self.log.warning( + "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name) + return + + # Process frametypes requiring a different set of arguments. + if FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: + self.arq_irs.arq_data_received( + deconstructed_frame, bytes_per_frame, snr, freedv + ) + # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync + # if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: + # self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") + # self.c_lib.freedv_set_sync(freedv, 0) + return - # Dispatch activity based on received frametype - if frametype in self.rx_dispatcher: - # Process frames "known" by rx_dispatcher - # self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....") - self.rx_dispatcher[frametype][0](deconstructed_frame, snr) + # TESTFRAMES + if frametype == FR_TYPE.TEST_FRAME.value: + self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame) + return - # Process frametypes requiring a different set of arguments. - elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value: - self.arq_irs.arq_data_received( - deconstructed_frame, bytes_per_frame, snr, freedv - ) - - # if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync - # if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst: - # self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}") - # self.c_lib.freedv_set_sync(freedv, 0) - - # TESTFRAMES - elif frametype == FR_TYPE.TEST_FRAME.value: - self.log.debug("[Modem] TESTFRAME RECEIVED", frame=deconstructed_frame) - - # Unknown frame type - else: - self.log.warning( - "[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name - ) - - except Exception as ex: - # for debugging purposes to receive all data - self.log.debug( - f"[Modem] Foreign frame received ({ex})", - frame = deconstructed_frame, - ) + # Process frames "known" by rx_dispatcher + # self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....") + self.rx_dispatcher[frametype][0](deconstructed_frame, snr) def get_id_from_frame(self, data): if data[:1] in [FR_TYPE.ARQ_DC_OPEN_N, FR_TYPE.ARQ_DC_OPEN_W]: diff --git a/modem/frame_handler.py b/modem/frame_handler.py new file mode 100644 index 00000000..c8588994 --- /dev/null +++ b/modem/frame_handler.py @@ -0,0 +1,56 @@ +import helpers +from event_manager import EventManager +from state_manager import StateManager +from queue import Queue +import structlog + +class FrameHandler(): + + def __init__(self, name: str, states: StateManager, event_manager: EventManager, + tx_frame_queue: Queue, + arq_sessions: list) -> None: + + self.name = name + self.states = states + self.event_manager = event_manager + self.tx_trame_queue = tx_frame_queue + self.arq_sessions = arq_sessions + self.logger = structlog.get_logger("Frame Handler") + + def add_to_heard_stations(self): + pass + + def make_event(self, frame): + return { + "freedata": "generic frame handler", + "frame": frame, + } + + def emit_event(self, frame): + event_data = self.make_event(frame) + self.event_manager.broadcast(event_data) + + def make_modem_queue_item(self, mode, repeat, repeat_delay, frame): + return { + 'mode': self.get_tx_mode(), + 'repeat': 1, + 'repeat_delay': 0, + 'frame': frame, + } + + def transmit(self, frame): + tx_queue_item = self.make_modem_queue_item(self.get_tx_mode(), 1, 0, frame) + self.tx_frame_queue.put(tx_queue_item) + + def follow_protocol(self): + pass + + def log(self, frame): + self.logger.info(f"[Frame Handler] Handling frame {frame}") + pass + + def handle(self, frame, snr, freq_offset, freedv_inst, bytes_per_frame): + self.log(frame) + self.add_to_heard_stations() + self.emit_event(frame) + self.follow_protocol()