diff --git a/modem/data_handler.py b/modem/data_handler.py index 1df9fb9f..ac8a6c5d 100644 --- a/modem/data_handler.py +++ b/modem/data_handler.py @@ -13,6 +13,7 @@ import threading import helpers import structlog from modem_frametypes import FRAME_TYPE as FR_TYPE +import event_manager from data_handler_broadcasts import BROADCAST from data_handler_data_broadcasts import DATABROADCAST @@ -54,6 +55,8 @@ class DATA: self.arq_iss = ISS(config, event_queue, states) self.arq_session = SESSION(config, event_queue, states) + self.event_manager = event_manager.EventManager([event_queue]) + def _initialize_queues(self): """Initializes data queues.""" self.data_queue_transmit = DATA_QUEUE_TRANSMIT diff --git a/modem/data_handler_arq.py b/modem/data_handler_arq.py index 716223ad..5c7e4733 100644 --- a/modem/data_handler_arq.py +++ b/modem/data_handler_arq.py @@ -8,7 +8,7 @@ import stats import structlog from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS from modem_frametypes import FRAME_TYPE as FR_TYPE - +import event_manager TESTMODE = False class ARQ: @@ -17,6 +17,8 @@ class ARQ: self.event_queue = event_queue self.states = states + self.event_manager = event_manager.EventManager([event_queue]) + # ARQ PROTOCOL VERSION # v.5 - signalling frame uses datac0 @@ -301,7 +303,7 @@ class ARQ: """ self.log.warning("[Modem] Stopping transmission!") - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="stopped", @@ -329,7 +331,7 @@ class ARQ: self.log.warning("[Modem] Stopping transmission!") self.states.set("is_modem_busy", False) self.states.set("is_arq_state", False) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="stopped", @@ -347,7 +349,7 @@ class ARQ: Returns: """ self.log.warning("[Modem] Channel busy, waiting until free...") - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", channel="busy", status="waiting", @@ -671,7 +673,7 @@ class ARQ: + str(self.dxcallsign, "UTF-8") + "]" ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", @@ -702,7 +704,7 @@ class ARQ: + str(self.dxcallsign, "UTF-8") + "]" ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="failed", diff --git a/modem/data_handler_arq_irs.py b/modem/data_handler_arq_irs.py index 420712b9..b6978bac 100644 --- a/modem/data_handler_arq_irs.py +++ b/modem/data_handler_arq_irs.py @@ -134,7 +134,7 @@ class IRS(ARQ): data=base64_data ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="received", @@ -218,7 +218,7 @@ class IRS(ARQ): self.dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13])) self.states.set("dxcallsign", self.dxcallsign) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="opening", @@ -338,7 +338,7 @@ class IRS(ARQ): self.enqueue_frame_for_tx([connection_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="opened", @@ -652,7 +652,7 @@ class IRS(ARQ): ) # send a network message with information - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="receiving", @@ -732,7 +732,7 @@ class IRS(ARQ): ) self.arq_process_received_data_frame(data_frame, snr, signed=False) else: - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", diff --git a/modem/data_handler_arq_iss.py b/modem/data_handler_arq_iss.py index 8fbf4916..13275dc1 100644 --- a/modem/data_handler_arq_iss.py +++ b/modem/data_handler_arq_iss.py @@ -11,6 +11,7 @@ import modem import numpy as np from codec2 import FREEDV_MODE from modem_frametypes import FRAME_TYPE as FR_TYPE +import event_manager from data_handler_arq import ARQ @@ -57,7 +58,7 @@ class ISS(ARQ): self.arq_compression_factor = np.clip(compression_factor, 0, 255) compression_factor = bytes([int(self.arq_compression_factor * 10)]) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="transmitting", @@ -272,7 +273,7 @@ class ISS(ARQ): tx_start_of_transmission, bufferposition_end, len(data_out) ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="transmitting", @@ -314,7 +315,7 @@ class ISS(ARQ): # gui database is too slow for handling this within 0.001 seconds # so let's sleep a little threading.Event().wait(0.2) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="transmitted", @@ -344,7 +345,7 @@ class ISS(ARQ): """ will be called if we not successfully transmitted all of queued data """ - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", @@ -482,7 +483,7 @@ class ISS(ARQ): self.states.radio_frequency, self.states.heard_stations ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", @@ -586,7 +587,7 @@ class ISS(ARQ): while not self.states.is_arq_session and not self.arq_session_timeout: threading.Event().wait(0.01) self.states.set("arq_session_state", "connecting") - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connecting", @@ -595,7 +596,7 @@ class ISS(ARQ): ) if self.states.is_arq_session and self.states.arq_session_state == "connected": # self.states.set("arq_session_state", "connected") - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connected", @@ -615,7 +616,7 @@ class ISS(ARQ): state=self.states.arq_session_state, ) self.states.set("arq_session_state", "failed") - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="failed", @@ -659,7 +660,7 @@ class ISS(ARQ): state=self.states.arq_session_state, ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connecting", @@ -694,7 +695,7 @@ class ISS(ARQ): return False # Given the while condition, it will only exit when `self.states.is_arq_session` is True - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connected", @@ -771,7 +772,7 @@ class ISS(ARQ): "[Modem] arq_open_data_channel:", transmission_uuid=self.transmission_uuid ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", @@ -846,7 +847,7 @@ class ISS(ARQ): for attempt in range(self.data_channel_max_retries): - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="opening", @@ -895,7 +896,7 @@ class ISS(ARQ): """ protocol_version = int.from_bytes(bytes(data_in[13:14]), "big") if protocol_version == self.arq_protocol_version: - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="opened", @@ -949,7 +950,7 @@ class ISS(ARQ): self.data_channel_last_received = int(time.time()) else: - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="transmission", status="failed", diff --git a/modem/data_handler_arq_session.py b/modem/data_handler_arq_session.py index 1510e506..d1bf7848 100644 --- a/modem/data_handler_arq_session.py +++ b/modem/data_handler_arq_session.py @@ -48,7 +48,7 @@ class SESSION(ARQ): self.states.arq_session_state, ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="close", @@ -70,7 +70,7 @@ class SESSION(ARQ): connection_frame[:1] = bytes([FR_TYPE.ARQ_SESSION_HB.value]) connection_frame[1:2] = self.session_id - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connected", @@ -104,7 +104,7 @@ class SESSION(ARQ): self.states.heard_stations ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connected", @@ -149,7 +149,7 @@ class SESSION(ARQ): self.states.arq_session_state, ) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="close", @@ -223,7 +223,7 @@ class SESSION(ARQ): self.states.is_arq_session = True self.states.set("is_modem_busy", True) - self.send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", arq="session", status="connected", diff --git a/modem/data_handler_broadcasts.py b/modem/data_handler_broadcasts.py index a52932d4..8cb13c79 100644 --- a/modem/data_handler_broadcasts.py +++ b/modem/data_handler_broadcasts.py @@ -9,7 +9,7 @@ import modem from random import randrange import uuid import structlog -from data_handler_helpers import enqueue_frame_for_tx, send_data_to_socket_queue +import event_manager TESTMODE = False @@ -20,6 +20,10 @@ class BROADCAST: self.states = states self.event_queue = event_queue self.config = config + + self.event_manager = event_manager.EventManager([event_queue]) + + self.beacon_interval = 0 self.beacon_interval_timer = 0 @@ -53,9 +57,7 @@ class BROADCAST: """Send an empty test frame""" test_frame = bytearray(126) test_frame[:1] = bytes([FR_TYPE.TEST_FRAME.value]) - enqueue_frame_for_tx( - frame_to_tx=[test_frame], c2_mode=FREEDV_MODE.datac13.value - ) + self.enqueue_frame_for_tx(frame_to_tx=[test_frame], c2_mode=FREEDV_MODE.datac13.value) def send_fec(self, mode, wakeup, payload, mycallsign): """Send an empty test frame""" @@ -80,14 +82,14 @@ class BROADCAST: fec_wakeup_frame[8:9] = bytes([1]) # n payload bursts print(mode_int_wakeup) - enqueue_frame_for_tx( + self.enqueue_frame_for_tx( frame_to_tx=[fec_wakeup_frame], c2_mode=codec2.FREEDV_MODE["sig1"].value ) time.sleep(1) fec_frame = bytearray(payload_per_frame) fec_frame[:1] = bytes([FR_TYPE.FEC.value]) fec_frame[1:payload_per_frame] = bytes(payload[:fec_payload_length]) - enqueue_frame_for_tx( + self.enqueue_frame_for_tx( frame_to_tx=[fec_frame], c2_mode=codec2.FREEDV_MODE[mode].value ) @@ -101,7 +103,7 @@ class BROADCAST: # send burst only if channel not busy - but without waiting # otherwise burst will be dropped if not self.states.channel_busy and not self.states.is_transmitting: - enqueue_frame_for_tx( + self.enqueue_frame_for_tx( frame_to_tx=[fec_frame], c2_mode=codec2.FREEDV_MODE["sig0"].value ) else: @@ -120,14 +122,14 @@ class BROADCAST: """ self.log.info("[Modem] CQ CQ CQ") - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", cq="transmitting", mycallsign=str(self.mycallsign, "UTF-8"), dxcallsign="None", ) - + cq_frame = bytearray(self.length_sig0_frame) cq_frame[:1] = bytes([FR_TYPE.CQ.value]) cq_frame[1:7] = helpers.callsign_to_bytes(self.mycallsign) @@ -137,9 +139,9 @@ class BROADCAST: if self.enable_fsk: self.log.info("[Modem] ENABLE FSK", state=self.enable_fsk) - enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value) + self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value) else: - enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) + self.enqueue_frame_for_tx([cq_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) def received_cq(self, data_in: bytes, snr) -> None: """ @@ -155,7 +157,7 @@ class BROADCAST: self.log.debug("[Modem] received_cq:", dxcallsign=dxcallsign) self.dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8") - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", cq="received", mycallsign=str(self.mycallsign, "UTF-8"), @@ -202,7 +204,7 @@ class BROADCAST: self.log.info("[Modem] Waiting for QRV slot...") helpers.wait(randrange(0, int(self.duration_sig1_frame * 4), self.duration_sig1_frame * 10 // 10.0)) - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", qrv="transmitting", dxcallsign=str(dxcallsign, "UTF-8"), @@ -218,9 +220,9 @@ class BROADCAST: if self.enable_fsk: self.log.info("[Modem] ENABLE FSK", state=self.enable_fsk) - enqueue_frame_for_tx([qrv_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value) + self.enqueue_frame_for_tx([qrv_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value) else: - enqueue_frame_for_tx([qrv_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) + self.enqueue_frame_for_tx([qrv_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) def received_qrv(self, data_in: bytes, snr) -> None: """ @@ -236,7 +238,7 @@ class BROADCAST: combined_snr = f"{snr}/{dxsnr}" - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", qrv="received", dxcallsign=str(dxcallsign, "UTF-8"), @@ -276,7 +278,7 @@ class BROADCAST: # here we add the received station to the heard stations buffer dxcallsign = helpers.bytes_to_callsign(bytes(data_in[1:7])) - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", fec="is_writing", dxcallsign=str(dxcallsign, "UTF-8") @@ -312,7 +314,7 @@ class BROADCAST: and not self.states.is_modem_busy and not self.states.is_arq_state ): - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", beacon="transmitting", dxcallsign="None", @@ -329,12 +331,12 @@ class BROADCAST: if self.enable_fsk: self.log.info("[Modem] ENABLE FSK", state=self.enable_fsk) - enqueue_frame_for_tx( + self.enqueue_frame_for_tx( [beacon_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value, ) else: - enqueue_frame_for_tx([beacon_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, + self.enqueue_frame_for_tx([beacon_frame], c2_mode=FREEDV_MODE.sig0.value, copies=1, repeat_delay=0) if self.enable_morse_identifier: MODEM_TRANSMIT_QUEUE.put(["morse", 1, 0, self.mycallsign]) @@ -360,7 +362,7 @@ class BROADCAST: # here we add the received station to the heard stations buffer beacon_callsign = helpers.bytes_to_callsign(bytes(data_in[1:7])) self.dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8") - send_data_to_socket_queue( + self.event_manager.send_custom_event( freedata="modem-message", beacon="received", uuid=str(uuid.uuid4()), @@ -386,4 +388,32 @@ class BROADCAST: self.modem_frequency_offset, self.states.radio_frequency, self.states.heard_stations - ) \ No newline at end of file + ) + + def enqueue_frame_for_tx( + self, + frame_to_tx, # : list[bytearray], # this causes a crash on python 3.7 + c2_mode=FREEDV_MODE.sig0.value, + copies=1, + repeat_delay=0, + ) -> None: + """ + Send (transmit) supplied frame to Modem + + :param frame_to_tx: Frame data to send + :type frame_to_tx: list of bytearrays + :param c2_mode: Codec2 mode to use, defaults to datac13 + :type c2_mode: int, optional + :param copies: Number of frame copies to send, defaults to 1 + :type copies: int, optional + :param repeat_delay: Delay time before sending repeat frame, defaults to 0 + :type repeat_delay: int, optional + """ + # frame_type = FR_TYPE(int.from_bytes(frame_to_tx[0][:1], byteorder="big")).name + # log.debug("[Modem] enqueue_frame_for_tx", c2_mode=FREEDV_MODE(c2_mode).name, data=frame_to_tx,type=frame_type) + + MODEM_TRANSMIT_QUEUE.put([c2_mode, copies, repeat_delay, frame_to_tx]) + + # Wait while transmitting + while self.states.is_transmitting: + threading.Event().wait(0.01) \ No newline at end of file diff --git a/modem/data_handler_helpers.py b/modem/data_handler_helpers.py deleted file mode 100644 index 8e210bb8..00000000 --- a/modem/data_handler_helpers.py +++ /dev/null @@ -1,74 +0,0 @@ -from modem_frametypes import FRAME_TYPE as FR_TYPE -import threading -from codec2 import FREEDV_MODE -from queues import MODEM_TRANSMIT_QUEUE -import ujson as json - - -def enqueue_frame_for_tx( - frame_to_tx, # : list[bytearray], # this causes a crash on python 3.7 - c2_mode=FREEDV_MODE.sig0.value, - copies=1, - repeat_delay=0, -) -> None: - """ - Send (transmit) supplied frame to Modem - - :param frame_to_tx: Frame data to send - :type frame_to_tx: list of bytearrays - :param c2_mode: Codec2 mode to use, defaults to datac13 - :type c2_mode: int, optional - :param copies: Number of frame copies to send, defaults to 1 - :type copies: int, optional - :param repeat_delay: Delay time before sending repeat frame, defaults to 0 - :type repeat_delay: int, optional - """ - frame_type = FR_TYPE(int.from_bytes(frame_to_tx[0][:1], byteorder="big")).name - #log.debug("[Modem] enqueue_frame_for_tx", c2_mode=FREEDV_MODE(c2_mode).name, data=frame_to_tx, - type=frame_type) - - MODEM_TRANSMIT_QUEUE.put([c2_mode, copies, repeat_delay, frame_to_tx]) - - # Wait while transmitting - while states.is_transmitting: - threading.Event().wait(0.01) - -def send_data_to_socket_queue(**jsondata): - """ - Send information to the UI via JSON and the sock.SOCKET_QUEUE. - - Args: - Dictionary containing the data to be sent, in the format: - key=value, for each item. E.g.: - send_data_to_socket_queue( - freedata="modem-message", - arq="received", - status="success", - uuid=transmission_uuid, - timestamp=timestamp, - mycallsign=str(mycallsign, "UTF-8"), - dxcallsign=str(dxcallsign, "UTF-8"), - dxgrid=str(dxgrid, "UTF-8"), - data=base64_data, - ) - """ - - # add mycallsign and dxcallsign to network message if they not exist - # and make sure we are not overwrite them if they exist - - """ - try: - if "mycallsign" not in jsondata: - jsondata["mycallsign"] = str(mycallsign, "UTF-8") - if "dxcallsign" not in jsondata: - jsondata["dxcallsign"] = str(dxcallsign, "UTF-8") - except Exception as e: - log.debug("[Modem] error adding callsigns to network message", e=e) - """ - # run json dumps - json_data_out = json.dumps(jsondata) - - log.debug("[Modem] send_data_to_socket_queue:", jsondata=json_data_out) - # finally push data to our network queue - # sock.SOCKET_QUEUE.put(json_data_out) - event_queue.put(json_data_out) \ No newline at end of file diff --git a/modem/event_manager.py b/modem/event_manager.py index 70fea020..906ad29e 100644 --- a/modem/event_manager.py +++ b/modem/event_manager.py @@ -22,4 +22,8 @@ class EventManager: def send_buffer_overflow(self, data): jsondata = {"buffer-overflow": str(data)} data_out = json.dumps(jsondata) - self.broadcast(data_out) \ No newline at end of file + self.broadcast(data_out) + + def send_custom_event(self, **jsondata): + data_out = json.dumps(jsondata) + self.broadcast(data_out)