mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
test with frame dispatcher
This commit is contained in:
parent
2422f1dab5
commit
1cf6d5914a
8 changed files with 306 additions and 273 deletions
|
@ -101,6 +101,7 @@ class CONFIG:
|
||||||
# Validates config data
|
# Validates config data
|
||||||
def validate(self, data):
|
def validate(self, data):
|
||||||
for section in data:
|
for section in data:
|
||||||
|
print(section)
|
||||||
for setting in data[section]:
|
for setting in data[section]:
|
||||||
if not isinstance(data[section][setting], self.config_types[section][setting]):
|
if not isinstance(data[section][setting], self.config_types[section][setting]):
|
||||||
message = (f"{section}.{setting} must be {self.config_types[section][setting]}."
|
message = (f"{section}.{setting} must be {self.config_types[section][setting]}."
|
||||||
|
|
|
@ -15,15 +15,7 @@ import structlog
|
||||||
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
||||||
import event_manager
|
import event_manager
|
||||||
|
|
||||||
from data_handler_broadcasts import BROADCAST
|
|
||||||
from data_handler_data_broadcasts import DATABROADCAST
|
|
||||||
from data_handler_ping import PING
|
|
||||||
|
|
||||||
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT
|
|
||||||
from data_handler_arq_iss import ISS
|
|
||||||
from data_handler_arq_irs import IRS
|
|
||||||
from data_handler_arq import ARQ
|
|
||||||
from data_handler_arq_session import SESSION
|
|
||||||
|
|
||||||
TESTMODE = False
|
TESTMODE = False
|
||||||
|
|
||||||
|
@ -39,257 +31,5 @@ class DATA:
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
self.states = states
|
self.states = states
|
||||||
|
|
||||||
self._initialize_handlers(config, event_queue, states)
|
|
||||||
self._initialize_queues()
|
|
||||||
self._initialize_dispatchers()
|
|
||||||
self._start_worker_threads()
|
|
||||||
|
|
||||||
def _initialize_handlers(self, config, event_queue, states):
|
|
||||||
"""Initializes various data handlers."""
|
|
||||||
self.broadcasts = BROADCAST(config, event_queue, states)
|
|
||||||
self.data_broadcasts = DATABROADCAST(config, event_queue, states)
|
|
||||||
self.ping = PING(config, event_queue, states)
|
|
||||||
|
|
||||||
self.arq = ARQ(config, event_queue, states)
|
|
||||||
self.arq_irs = IRS(config, event_queue, states)
|
|
||||||
self.arq_iss = ISS(config, event_queue, states)
|
|
||||||
self.arq_session = SESSION(config, event_queue, states)
|
|
||||||
|
|
||||||
self.event_manager = event_manager.EventManager([event_queue])
|
|
||||||
|
|
||||||
def _initialize_queues(self):
|
|
||||||
"""Initializes data queues."""
|
|
||||||
self.data_queue_transmit = DATA_QUEUE_TRANSMIT
|
|
||||||
self.data_queue_received = DATA_QUEUE_RECEIVED
|
|
||||||
|
|
||||||
def _initialize_dispatchers(self):
|
|
||||||
"""Initializes dispatchers for handling different frame types."""
|
|
||||||
# Dictionary of functions and log messages used in process_data
|
|
||||||
# instead of a long series of if-elif-else statements.
|
|
||||||
self.rx_dispatcher = {
|
|
||||||
FR_TYPE.ARQ_DC_OPEN_ACK_N.value: (
|
|
||||||
self.arq_iss.arq_received_channel_is_open,
|
|
||||||
"ARQ OPEN ACK (Narrow)",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_DC_OPEN_ACK_W.value: (
|
|
||||||
self.arq_iss.arq_received_channel_is_open,
|
|
||||||
"ARQ OPEN ACK (Wide)",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_DC_OPEN_N.value: (
|
|
||||||
self.arq_irs.arq_received_data_channel_opener,
|
|
||||||
"ARQ Data Channel Open (Narrow)",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_DC_OPEN_W.value: (
|
|
||||||
self.arq_irs.arq_received_data_channel_opener,
|
|
||||||
"ARQ Data Channel Open (Wide)",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_SESSION_CLOSE.value: (
|
|
||||||
self.arq_session.received_session_close,
|
|
||||||
"ARQ CLOSE SESSION",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_SESSION_HB.value: (
|
|
||||||
self.arq_session.received_session_heartbeat,
|
|
||||||
"ARQ HEARTBEAT",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_SESSION_OPEN.value: (
|
|
||||||
self.arq_session.received_session_opener,
|
|
||||||
"ARQ OPEN SESSION",
|
|
||||||
),
|
|
||||||
FR_TYPE.ARQ_STOP.value: (self.arq.received_stop_transmission, "ARQ STOP TX"),
|
|
||||||
FR_TYPE.BEACON.value: (self.broadcasts.received_beacon, "BEACON"),
|
|
||||||
FR_TYPE.BURST_ACK.value: (self.arq_iss.burst_ack_nack_received, "BURST ACK"),
|
|
||||||
FR_TYPE.BURST_NACK.value: (self.arq_iss.burst_ack_nack_received, "BURST NACK"),
|
|
||||||
FR_TYPE.CQ.value: (self.broadcasts.received_cq, "CQ"),
|
|
||||||
FR_TYPE.FR_ACK.value: (self.arq_iss.frame_ack_received, "FRAME ACK"),
|
|
||||||
FR_TYPE.FR_NACK.value: (self.arq_iss.frame_nack_received, "FRAME NACK"),
|
|
||||||
FR_TYPE.FR_REPEAT.value: (self.arq_iss.burst_rpt_received, "REPEAT REQUEST"),
|
|
||||||
FR_TYPE.PING_ACK.value: (self.ping.received_ping_ack, "PING ACK"),
|
|
||||||
FR_TYPE.PING.value: (self.ping.received_ping, "PING"),
|
|
||||||
FR_TYPE.QRV.value: (self.broadcasts.received_qrv, "QRV"),
|
|
||||||
FR_TYPE.IS_WRITING.value: (self.broadcasts.received_is_writing, "IS_WRITING"),
|
|
||||||
FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"),
|
|
||||||
FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"),
|
|
||||||
|
|
||||||
}
|
|
||||||
self.command_dispatcher = {
|
|
||||||
# "CONNECT": (self.arq_session_handler, "CONNECT"),
|
|
||||||
"CQ": (self.broadcasts.transmit_cq, "CQ"),
|
|
||||||
"DISCONNECT": (self.arq_session.close_session, "DISCONNECT"),
|
|
||||||
"SEND_TEST_FRAME": (self.broadcasts.send_test_frame, "TEST"),
|
|
||||||
"STOP": (self.arq.stop_transmission, "STOP"),
|
|
||||||
}
|
|
||||||
|
|
||||||
def _start_worker_threads(self):
|
|
||||||
"""Starts worker threads for transmit and receive operations."""
|
|
||||||
threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start()
|
|
||||||
threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start()
|
|
||||||
|
|
||||||
def worker_transmit(self) -> None:
|
|
||||||
"""Dispatch incoming UI instructions for transmitting operations"""
|
|
||||||
while True:
|
|
||||||
data = self.data_queue_transmit.get()
|
|
||||||
# if we are already in ARQ_STATE, or we're receiving codec2 traffic
|
|
||||||
# let's wait with processing data
|
|
||||||
# this should avoid weird toggle states where both stations
|
|
||||||
# stuck in IRS
|
|
||||||
#
|
|
||||||
# send transmission queued information once
|
|
||||||
if self.states.is_arq_state or self.states.is_codec2_traffic:
|
|
||||||
self.log.debug(
|
|
||||||
"[Modem] TX DISPATCHER - waiting with processing command ",
|
|
||||||
is_arq_state=self.states.is_arq_state,
|
|
||||||
)
|
|
||||||
|
|
||||||
self.send_data_to_socket_queue(
|
|
||||||
freedata="modem-message",
|
|
||||||
command=data[0],
|
|
||||||
status="queued",
|
|
||||||
)
|
|
||||||
|
|
||||||
# now stay in while loop until state released
|
|
||||||
while self.states.is_arq_state or self.states.is_codec2_traffic:
|
|
||||||
threading.Event().wait(0.01)
|
|
||||||
|
|
||||||
# and finally sleep some time
|
|
||||||
threading.Event().wait(1.0)
|
|
||||||
|
|
||||||
# Dispatch commands known to command_dispatcher
|
|
||||||
if data[0] in self.command_dispatcher:
|
|
||||||
self.log.debug(f"[Modem] TX {self.command_dispatcher[data[0]][1]}...")
|
|
||||||
self.command_dispatcher[data[0]][0]()
|
|
||||||
|
|
||||||
# Dispatch commands that need more arguments.
|
|
||||||
elif data[0] == "CONNECT":
|
|
||||||
# [1] mycallsign
|
|
||||||
# [2] dxcallsign
|
|
||||||
self.arq.arq_session_handler(data[1], data[2])
|
|
||||||
|
|
||||||
elif data[0] == "PING":
|
|
||||||
# [1] mycallsign // this is being injected as None
|
|
||||||
# [2] dxcallsign
|
|
||||||
mycallsign = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}"
|
|
||||||
self.ping.transmit_ping(mycallsign, data[2])
|
|
||||||
|
|
||||||
elif data[0] == "ARQ_RAW":
|
|
||||||
# [1] DATA_OUT bytes
|
|
||||||
# [2] self.transmission_uuid str
|
|
||||||
# [3] mycallsign with ssid str
|
|
||||||
# [4] dxcallsign with ssid str
|
|
||||||
self.arq_iss.open_dc_and_transmit(data[1], data[2], data[3], data[4])
|
|
||||||
|
|
||||||
|
|
||||||
elif data[0] == "FEC_IS_WRITING":
|
|
||||||
# [1] DATA_OUT bytes
|
|
||||||
# [2] MODE str datac0/1/3...
|
|
||||||
self.broadcasts.send_fec_is_writing(data[1])
|
|
||||||
|
|
||||||
elif data[0] == "FEC":
|
|
||||||
# [1] WAKEUP bool
|
|
||||||
# [2] MODE str datac0/1/3...
|
|
||||||
# [3] PAYLOAD
|
|
||||||
# [4] MYCALLSIGN
|
|
||||||
self.broadcasts.send_fec(data[1], data[2], data[3], data[4])
|
|
||||||
else:
|
|
||||||
self.log.error(
|
|
||||||
"[Modem] worker_transmit: received invalid command:", data=data
|
|
||||||
)
|
|
||||||
|
|
||||||
def worker_receive(self) -> None:
|
|
||||||
"""Queue received data for processing"""
|
|
||||||
while True:
|
|
||||||
data = self.data_queue_received.get()
|
|
||||||
# [0] bytes
|
|
||||||
# [1] freedv instance
|
|
||||||
# [2] bytes_per_frame
|
|
||||||
# [3] snr
|
|
||||||
self.process_data(
|
|
||||||
bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2], snr=data[3]
|
|
||||||
)
|
|
||||||
|
|
||||||
def process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None:
|
|
||||||
"""
|
|
||||||
Process incoming data and decide what to do with the frame.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
bytes_out:
|
|
||||||
freedv:
|
|
||||||
bytes_per_frame:
|
|
||||||
snr:
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
# Process data only if broadcast or we are the receiver
|
|
||||||
# bytes_out[1:4] == callsign check for signalling frames,
|
|
||||||
# bytes_out[2:5] == transmission
|
|
||||||
# we could also create an own function, which returns True.
|
|
||||||
frametype = int.from_bytes(bytes(bytes_out[:1]), "big")
|
|
||||||
|
|
||||||
# check for callsign CRC
|
|
||||||
_valid1, _ = helpers.check_callsign(self.arq.mycallsign, bytes(bytes_out[1:4]), self.arq.ssid_list)
|
|
||||||
_valid2, _ = helpers.check_callsign(self.arq.mycallsign, bytes(bytes_out[2:5]), self.arq.ssid_list)
|
|
||||||
# check for session ID
|
|
||||||
_valid3 = helpers.check_session_id(self.arq.session_id, bytes(bytes_out[1:2])) # signalling frames
|
|
||||||
_valid4 = helpers.check_session_id(self.arq.session_id, bytes(bytes_out[2:3])) # arq data frames
|
|
||||||
if (
|
|
||||||
_valid1
|
|
||||||
or _valid2
|
|
||||||
or _valid3
|
|
||||||
or _valid4
|
|
||||||
or frametype
|
|
||||||
in [
|
|
||||||
FR_TYPE.CQ.value,
|
|
||||||
FR_TYPE.QRV.value,
|
|
||||||
FR_TYPE.PING.value,
|
|
||||||
FR_TYPE.BEACON.value,
|
|
||||||
FR_TYPE.IS_WRITING.value,
|
|
||||||
FR_TYPE.FEC.value,
|
|
||||||
FR_TYPE.FEC_WAKEUP.value,
|
|
||||||
]
|
|
||||||
):
|
|
||||||
|
|
||||||
# CHECK IF FRAMETYPE IS BETWEEN 10 and 50 ------------------------
|
|
||||||
# frame = frametype - 10
|
|
||||||
# n_frames_per_burst = int.from_bytes(bytes(bytes_out[1:2]), "big")
|
|
||||||
|
|
||||||
# Dispatch activity based on received frametype
|
|
||||||
if frametype in self.rx_dispatcher:
|
|
||||||
# Process frames "known" by rx_dispatcher
|
|
||||||
# self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....")
|
|
||||||
self.rx_dispatcher[frametype][0](bytes_out[:-2],snr)
|
|
||||||
|
|
||||||
# Process frametypes requiring a different set of arguments.
|
|
||||||
elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
|
|
||||||
# get snr of received data
|
|
||||||
# FIXME find a fix for this - after moving to classes, this no longer works
|
|
||||||
# snr = self.calculate_snr(freedv)
|
|
||||||
self.log.debug("[Modem] RX SNR", snr=snr)
|
|
||||||
# send payload data to arq checker without CRC16
|
|
||||||
self.arq_irs.arq_data_received(
|
|
||||||
bytes(bytes_out[:-2]), bytes_per_frame, snr, freedv
|
|
||||||
)
|
|
||||||
|
|
||||||
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
|
|
||||||
# if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst:
|
|
||||||
# self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
|
|
||||||
# self.c_lib.freedv_set_sync(freedv, 0)
|
|
||||||
|
|
||||||
# TESTFRAMES
|
|
||||||
elif frametype == FR_TYPE.TEST_FRAME.value:
|
|
||||||
self.log.debug("[Modem] TESTFRAME RECEIVED", frame=bytes_out[:])
|
|
||||||
|
|
||||||
# Unknown frame type
|
|
||||||
else:
|
|
||||||
self.log.warning(
|
|
||||||
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
# for debugging purposes to receive all data
|
|
||||||
self.log.debug(
|
|
||||||
"[Modem] Foreign frame received",
|
|
||||||
frame=bytes_out[:-2].hex(),
|
|
||||||
frame_type=FR_TYPE(int.from_bytes(bytes_out[:1], byteorder="big")).name,
|
|
||||||
)
|
|
||||||
|
|
|
@ -10,11 +10,14 @@ from codec2 import FREEDV_MODE, FREEDV_MODE_USED_SLOTS
|
||||||
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
||||||
import event_manager
|
import event_manager
|
||||||
|
|
||||||
|
from data_handler import DATA
|
||||||
TESTMODE = False
|
TESTMODE = False
|
||||||
class ARQ:
|
class ARQ(DATA):
|
||||||
def __init__(self, config, event_queue, states):
|
def __init__(self, config, event_queue, states):
|
||||||
self.log = structlog.get_logger("DHARQ")
|
super().__init__(config, event_queue, states)
|
||||||
|
|
||||||
|
self.log = structlog.get_logger("DHARQ")
|
||||||
|
print(self.test_variable)
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
self.states = states
|
self.states = states
|
||||||
self.event_manager = event_manager.EventManager([event_queue])
|
self.event_manager = event_manager.EventManager([event_queue])
|
||||||
|
|
|
@ -11,11 +11,15 @@ import uuid
|
||||||
import structlog
|
import structlog
|
||||||
import event_manager
|
import event_manager
|
||||||
|
|
||||||
|
from data_handler import DATA
|
||||||
|
|
||||||
TESTMODE = False
|
TESTMODE = False
|
||||||
|
|
||||||
class BROADCAST:
|
class BROADCAST(DATA):
|
||||||
|
|
||||||
def __init__(self, config, event_queue, states):
|
def __init__(self, config, event_queue, states):
|
||||||
|
super().__init__(config, event_queue, states)
|
||||||
|
|
||||||
self.log = structlog.get_logger("DHBC")
|
self.log = structlog.get_logger("DHBC")
|
||||||
self.states = states
|
self.states = states
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
|
@ -24,7 +28,6 @@ class BROADCAST:
|
||||||
self.event_manager = event_manager.EventManager([event_queue])
|
self.event_manager = event_manager.EventManager([event_queue])
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
self.beacon_interval = 0
|
self.beacon_interval = 0
|
||||||
self.beacon_interval_timer = 0
|
self.beacon_interval_timer = 0
|
||||||
self.beacon_paused = False
|
self.beacon_paused = False
|
||||||
|
|
|
@ -5,13 +5,15 @@ import time
|
||||||
import modem
|
import modem
|
||||||
import base64
|
import base64
|
||||||
import ujson as json
|
import ujson as json
|
||||||
|
from data_handler import DATA
|
||||||
class DATABROADCAST:
|
class DATABROADCAST(DATA):
|
||||||
"""Terminal Node Controller for FreeDATA"""
|
"""Terminal Node Controller for FreeDATA"""
|
||||||
|
|
||||||
log = structlog.get_logger("BROADCAST")
|
log = structlog.get_logger("BROADCAST")
|
||||||
|
|
||||||
def __init__(self, config, event_queue, states) -> None:
|
def __init__(self, config, event_queue, states) -> None:
|
||||||
|
super().__init__(config, event_queue, states)
|
||||||
|
|
||||||
self.log = structlog.get_logger("DHDBC")
|
self.log = structlog.get_logger("DHDBC")
|
||||||
self.states = states
|
self.states = states
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
|
|
|
@ -4,9 +4,11 @@ from codec2 import FREEDV_MODE
|
||||||
import helpers
|
import helpers
|
||||||
import uuid
|
import uuid
|
||||||
import structlog
|
import structlog
|
||||||
|
from data_handler import DATA
|
||||||
class PING:
|
class PING(DATA):
|
||||||
def __init__(self, config, event_queue, states):
|
def __init__(self, config, event_queue, states):
|
||||||
|
super().__init__(config, event_queue, states)
|
||||||
|
|
||||||
self.log = structlog.get_logger("DHPING")
|
self.log = structlog.get_logger("DHPING")
|
||||||
self.states = states
|
self.states = states
|
||||||
self.event_queue = event_queue
|
self.event_queue = event_queue
|
||||||
|
|
280
modem/frame_dispatcher.py
Normal file
280
modem/frame_dispatcher.py
Normal file
|
@ -0,0 +1,280 @@
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import helpers
|
||||||
|
import structlog
|
||||||
|
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
||||||
|
import event_manager
|
||||||
|
from queues import DATA_QUEUE_RECEIVED, DATA_QUEUE_TRANSMIT
|
||||||
|
|
||||||
|
from data_handler_broadcasts import BROADCAST
|
||||||
|
from data_handler_data_broadcasts import DATABROADCAST
|
||||||
|
from data_handler_ping import PING
|
||||||
|
|
||||||
|
from data_handler_arq_iss import ISS
|
||||||
|
from data_handler_arq_irs import IRS
|
||||||
|
from data_handler_arq import ARQ
|
||||||
|
from data_handler_arq_session import SESSION
|
||||||
|
|
||||||
|
|
||||||
|
class DISPATCHER():
|
||||||
|
|
||||||
|
def __init__(self, config, event_queue, states):
|
||||||
|
print("loading frame dispatcher.....")
|
||||||
|
self.config = config
|
||||||
|
self.event_queue = event_queue
|
||||||
|
self.states = states
|
||||||
|
|
||||||
|
self._initialize_handlers(config, event_queue, states)
|
||||||
|
self._initialize_dispatchers()
|
||||||
|
self._initialize_queues()
|
||||||
|
self._start_worker_threads()
|
||||||
|
|
||||||
|
def _initialize_handlers(self, config, event_queue, states):
|
||||||
|
"""Initializes various data handlers."""
|
||||||
|
self.broadcasts = BROADCAST(config, event_queue, states)
|
||||||
|
self.data_broadcasts = DATABROADCAST(config, event_queue, states)
|
||||||
|
self.ping = PING(config, event_queue, states)
|
||||||
|
|
||||||
|
self.arq = ARQ(config, event_queue, states)
|
||||||
|
self.arq_irs = IRS(config, event_queue, states)
|
||||||
|
self.arq_iss = ISS(config, event_queue, states)
|
||||||
|
self.arq_session = SESSION(config, event_queue, states)
|
||||||
|
|
||||||
|
self.event_manager = event_manager.EventManager([event_queue])
|
||||||
|
|
||||||
|
def _initialize_dispatchers(self):
|
||||||
|
"""Initializes dispatchers for handling different frame types."""
|
||||||
|
# Dictionary of functions and log messages used in process_data
|
||||||
|
# instead of a long series of if-elif-else statements.
|
||||||
|
self.rx_dispatcher = {
|
||||||
|
FR_TYPE.ARQ_DC_OPEN_ACK_N.value: (
|
||||||
|
self.arq_iss.arq_received_channel_is_open,
|
||||||
|
"ARQ OPEN ACK (Narrow)",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_DC_OPEN_ACK_W.value: (
|
||||||
|
self.arq_iss.arq_received_channel_is_open,
|
||||||
|
"ARQ OPEN ACK (Wide)",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_DC_OPEN_N.value: (
|
||||||
|
self.arq_irs.arq_received_data_channel_opener,
|
||||||
|
"ARQ Data Channel Open (Narrow)",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_DC_OPEN_W.value: (
|
||||||
|
self.arq_irs.arq_received_data_channel_opener,
|
||||||
|
"ARQ Data Channel Open (Wide)",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_SESSION_CLOSE.value: (
|
||||||
|
self.arq_session.received_session_close,
|
||||||
|
"ARQ CLOSE SESSION",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_SESSION_HB.value: (
|
||||||
|
self.arq_session.received_session_heartbeat,
|
||||||
|
"ARQ HEARTBEAT",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_SESSION_OPEN.value: (
|
||||||
|
self.arq_session.received_session_opener,
|
||||||
|
"ARQ OPEN SESSION",
|
||||||
|
),
|
||||||
|
FR_TYPE.ARQ_STOP.value: (self.arq.received_stop_transmission, "ARQ STOP TX"),
|
||||||
|
FR_TYPE.BEACON.value: (self.broadcasts.received_beacon, "BEACON"),
|
||||||
|
FR_TYPE.BURST_ACK.value: (self.arq_iss.burst_ack_nack_received, "BURST ACK"),
|
||||||
|
FR_TYPE.BURST_NACK.value: (self.arq_iss.burst_ack_nack_received, "BURST NACK"),
|
||||||
|
FR_TYPE.CQ.value: (self.broadcasts.received_cq, "CQ"),
|
||||||
|
FR_TYPE.FR_ACK.value: (self.arq_iss.frame_ack_received, "FRAME ACK"),
|
||||||
|
FR_TYPE.FR_NACK.value: (self.arq_iss.frame_nack_received, "FRAME NACK"),
|
||||||
|
FR_TYPE.FR_REPEAT.value: (self.arq_iss.burst_rpt_received, "REPEAT REQUEST"),
|
||||||
|
FR_TYPE.PING_ACK.value: (self.ping.received_ping_ack, "PING ACK"),
|
||||||
|
FR_TYPE.PING.value: (self.ping.received_ping, "PING"),
|
||||||
|
FR_TYPE.QRV.value: (self.broadcasts.received_qrv, "QRV"),
|
||||||
|
FR_TYPE.IS_WRITING.value: (self.broadcasts.received_is_writing, "IS_WRITING"),
|
||||||
|
FR_TYPE.FEC.value: (self.data_broadcasts.received_fec, "FEC"),
|
||||||
|
FR_TYPE.FEC_WAKEUP.value: (self.data_broadcasts.received_fec_wakeup, "FEC WAKEUP"),
|
||||||
|
|
||||||
|
}
|
||||||
|
self.command_dispatcher = {
|
||||||
|
# "CONNECT": (self.arq_session_handler, "CONNECT"),
|
||||||
|
"CQ": (self.broadcasts.transmit_cq, "CQ"),
|
||||||
|
"DISCONNECT": (self.arq_session.close_session, "DISCONNECT"),
|
||||||
|
"SEND_TEST_FRAME": (self.broadcasts.send_test_frame, "TEST"),
|
||||||
|
"STOP": (self.arq.stop_transmission, "STOP"),
|
||||||
|
}
|
||||||
|
|
||||||
|
def _initialize_queues(self):
|
||||||
|
"""Initializes data queues."""
|
||||||
|
self.data_queue_transmit = DATA_QUEUE_TRANSMIT
|
||||||
|
self.data_queue_received = DATA_QUEUE_RECEIVED
|
||||||
|
|
||||||
|
def _start_worker_threads(self):
|
||||||
|
"""Starts worker threads for transmit and receive operations."""
|
||||||
|
threading.Thread(target=self.worker_transmit, name="Transmit Worker", daemon=True).start()
|
||||||
|
threading.Thread(target=self.worker_receive, name="Receive Worker", daemon=True).start()
|
||||||
|
|
||||||
|
def worker_transmit(self) -> None:
|
||||||
|
"""Dispatch incoming UI instructions for transmitting operations"""
|
||||||
|
while True:
|
||||||
|
data = self.data_queue_transmit.get()
|
||||||
|
# if we are already in ARQ_STATE, or we're receiving codec2 traffic
|
||||||
|
# let's wait with processing data
|
||||||
|
# this should avoid weird toggle states where both stations
|
||||||
|
# stuck in IRS
|
||||||
|
#
|
||||||
|
# send transmission queued information once
|
||||||
|
if self.states.is_arq_state or self.states.is_codec2_traffic:
|
||||||
|
self.log.debug(
|
||||||
|
"[Modem] TX DISPATCHER - waiting with processing command ",
|
||||||
|
is_arq_state=self.states.is_arq_state,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.send_data_to_socket_queue(
|
||||||
|
freedata="modem-message",
|
||||||
|
command=data[0],
|
||||||
|
status="queued",
|
||||||
|
)
|
||||||
|
|
||||||
|
# now stay in while loop until state released
|
||||||
|
while self.states.is_arq_state or self.states.is_codec2_traffic:
|
||||||
|
threading.Event().wait(0.01)
|
||||||
|
|
||||||
|
# and finally sleep some time
|
||||||
|
threading.Event().wait(1.0)
|
||||||
|
|
||||||
|
# Dispatch commands known to command_dispatcher
|
||||||
|
if data[0] in self.command_dispatcher:
|
||||||
|
self.log.debug(f"[Modem] TX {self.command_dispatcher[data[0]][1]}...")
|
||||||
|
self.command_dispatcher[data[0]][0]()
|
||||||
|
|
||||||
|
# Dispatch commands that need more arguments.
|
||||||
|
elif data[0] == "CONNECT":
|
||||||
|
# [1] mycallsign
|
||||||
|
# [2] dxcallsign
|
||||||
|
self.arq.arq_session_handler(data[1], data[2])
|
||||||
|
|
||||||
|
elif data[0] == "PING":
|
||||||
|
# [1] mycallsign // this is being injected as None
|
||||||
|
# [2] dxcallsign
|
||||||
|
mycallsign = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}"
|
||||||
|
self.ping.transmit_ping(mycallsign, data[2])
|
||||||
|
|
||||||
|
elif data[0] == "ARQ_RAW":
|
||||||
|
# [1] DATA_OUT bytes
|
||||||
|
# [2] self.transmission_uuid str
|
||||||
|
# [3] mycallsign with ssid str
|
||||||
|
# [4] dxcallsign with ssid str
|
||||||
|
self.arq_iss.open_dc_and_transmit(data[1], data[2], data[3], data[4])
|
||||||
|
|
||||||
|
|
||||||
|
elif data[0] == "FEC_IS_WRITING":
|
||||||
|
# [1] DATA_OUT bytes
|
||||||
|
# [2] MODE str datac0/1/3...
|
||||||
|
self.broadcasts.send_fec_is_writing(data[1])
|
||||||
|
|
||||||
|
elif data[0] == "FEC":
|
||||||
|
# [1] WAKEUP bool
|
||||||
|
# [2] MODE str datac0/1/3...
|
||||||
|
# [3] PAYLOAD
|
||||||
|
# [4] MYCALLSIGN
|
||||||
|
self.broadcasts.send_fec(data[1], data[2], data[3], data[4])
|
||||||
|
else:
|
||||||
|
self.log.error(
|
||||||
|
"[Modem] worker_transmit: received invalid command:", data=data
|
||||||
|
)
|
||||||
|
|
||||||
|
def worker_receive(self) -> None:
|
||||||
|
"""Queue received data for processing"""
|
||||||
|
while True:
|
||||||
|
data = self.data_queue_received.get()
|
||||||
|
# [0] bytes
|
||||||
|
# [1] freedv instance
|
||||||
|
# [2] bytes_per_frame
|
||||||
|
# [3] snr
|
||||||
|
self.process_data(
|
||||||
|
bytes_out=data[0], freedv=data[1], bytes_per_frame=data[2], snr=data[3]
|
||||||
|
)
|
||||||
|
|
||||||
|
def process_data(self, bytes_out, freedv, bytes_per_frame: int, snr) -> None:
|
||||||
|
"""
|
||||||
|
Process incoming data and decide what to do with the frame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bytes_out:
|
||||||
|
freedv:
|
||||||
|
bytes_per_frame:
|
||||||
|
snr:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
if self.check_if_valid_frame(bytes_out):
|
||||||
|
frametype = int.from_bytes(bytes(bytes_out[:1]), "big")
|
||||||
|
# Dispatch activity based on received frametype
|
||||||
|
if frametype in self.rx_dispatcher:
|
||||||
|
# Process frames "known" by rx_dispatcher
|
||||||
|
# self.log.debug(f"[Modem] {self.rx_dispatcher[frametype][1]} RECEIVED....")
|
||||||
|
self.rx_dispatcher[frametype][0](bytes_out[:-2], snr)
|
||||||
|
|
||||||
|
# Process frametypes requiring a different set of arguments.
|
||||||
|
elif FR_TYPE.BURST_51.value >= frametype >= FR_TYPE.BURST_01.value:
|
||||||
|
# get snr of received data
|
||||||
|
# FIXME find a fix for this - after moving to classes, this no longer works
|
||||||
|
# snr = self.calculate_snr(freedv)
|
||||||
|
self.log.debug("[Modem] RX SNR", snr=snr)
|
||||||
|
# send payload data to arq checker without CRC16
|
||||||
|
self.arq_irs.arq_data_received(
|
||||||
|
bytes(bytes_out[:-2]), bytes_per_frame, snr, freedv
|
||||||
|
)
|
||||||
|
|
||||||
|
# if we received the last frame of a burst or the last remaining rpt frame, do a modem unsync
|
||||||
|
# if self.arq_rx_burst_buffer.count(None) <= 1 or (frame+1) == n_frames_per_burst:
|
||||||
|
# self.log.debug(f"[Modem] LAST FRAME OF BURST --> UNSYNC {frame+1}/{n_frames_per_burst}")
|
||||||
|
# self.c_lib.freedv_set_sync(freedv, 0)
|
||||||
|
|
||||||
|
# TESTFRAMES
|
||||||
|
elif frametype == FR_TYPE.TEST_FRAME.value:
|
||||||
|
self.log.debug("[Modem] TESTFRAME RECEIVED", frame=bytes_out[:])
|
||||||
|
|
||||||
|
# Unknown frame type
|
||||||
|
else:
|
||||||
|
self.log.warning(
|
||||||
|
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name
|
||||||
|
)
|
||||||
|
|
||||||
|
else:
|
||||||
|
# for debugging purposes to receive all data
|
||||||
|
self.log.debug(
|
||||||
|
"[Modem] Foreign frame received",
|
||||||
|
frame=bytes_out[:-2].hex(),
|
||||||
|
frame_type=FR_TYPE(int.from_bytes(bytes_out[:1], byteorder="big")).name,
|
||||||
|
)
|
||||||
|
|
||||||
|
def check_if_valid_frame(self, bytes_out):
|
||||||
|
# Process data only if broadcast or we are the receiver
|
||||||
|
# bytes_out[1:4] == callsign check for signalling frames,
|
||||||
|
# bytes_out[2:5] == transmission
|
||||||
|
# we could also create an own function, which returns True.
|
||||||
|
frametype = int.from_bytes(bytes(bytes_out[:1]), "big")
|
||||||
|
|
||||||
|
# check for callsign CRC
|
||||||
|
_valid1, _ = helpers.check_callsign(self.arq.mycallsign, bytes(bytes_out[1:4]), self.arq.ssid_list)
|
||||||
|
_valid2, _ = helpers.check_callsign(self.arq.mycallsign, bytes(bytes_out[2:5]), self.arq.ssid_list)
|
||||||
|
# check for session ID
|
||||||
|
_valid3 = helpers.check_session_id(self.arq.session_id, bytes(bytes_out[1:2])) # signalling frames
|
||||||
|
_valid4 = helpers.check_session_id(self.arq.session_id, bytes(bytes_out[2:3])) # arq data frames
|
||||||
|
if (
|
||||||
|
_valid1
|
||||||
|
or _valid2
|
||||||
|
or _valid3
|
||||||
|
or _valid4
|
||||||
|
or frametype
|
||||||
|
in [
|
||||||
|
FR_TYPE.CQ.value,
|
||||||
|
FR_TYPE.QRV.value,
|
||||||
|
FR_TYPE.PING.value,
|
||||||
|
FR_TYPE.BEACON.value,
|
||||||
|
FR_TYPE.IS_WRITING.value,
|
||||||
|
FR_TYPE.FEC.value,
|
||||||
|
FR_TYPE.FEC_WAKEUP.value,
|
||||||
|
]
|
||||||
|
):
|
||||||
|
return True
|
||||||
|
return False
|
|
@ -1,5 +1,6 @@
|
||||||
import threading
|
import threading
|
||||||
import data_handler
|
import data_handler
|
||||||
|
import frame_dispatcher
|
||||||
import modem
|
import modem
|
||||||
import structlog
|
import structlog
|
||||||
import audio
|
import audio
|
||||||
|
@ -50,10 +51,10 @@ class SM:
|
||||||
if self.start_modem():
|
if self.start_modem():
|
||||||
self.modem_events.put(json.dumps({"freedata": "modem-event", "event": "restart"}))
|
self.modem_events.put(json.dumps({"freedata": "modem-event", "event": "restart"}))
|
||||||
elif cmd in ['fft:true']:
|
elif cmd in ['fft:true']:
|
||||||
self.modem.set_FFT_stream(True)
|
#self.modem.set_FFT_stream(True)
|
||||||
self.enable_fft=True
|
self.enable_fft=True
|
||||||
elif cmd in ['fft:false']:
|
elif cmd in ['fft:false']:
|
||||||
self.modem.set_FFT_stream(False)
|
#self.modem.set_FFT_stream(False)
|
||||||
self.enable_fft=False
|
self.enable_fft=False
|
||||||
else:
|
else:
|
||||||
self.log.warning("[SVC] modem command processing failed", cmd=cmd, state=self.states.is_modem_running)
|
self.log.warning("[SVC] modem command processing failed", cmd=cmd, state=self.states.is_modem_running)
|
||||||
|
@ -69,7 +70,8 @@ class SM:
|
||||||
if False not in audio_test and None not in audio_test and not self.states.is_modem_running:
|
if False not in audio_test and None not in audio_test and not self.states.is_modem_running:
|
||||||
self.log.info("starting modem....")
|
self.log.info("starting modem....")
|
||||||
self.modem = modem.RF(self.config, self.modem_events, self.modem_fft, self.modem_service, self.states)
|
self.modem = modem.RF(self.config, self.modem_events, self.modem_fft, self.modem_service, self.states)
|
||||||
self.data_handler = data_handler.DATA(self.config, self.modem_events, self.states)
|
#self.data_handler = data_handler.DATA(self.config, self.modem_events, self.states)
|
||||||
|
self.frame_dispatcher = frame_dispatcher.DISPATCHER(self.config, self.modem_events, self.states)
|
||||||
self.states.set("is_modem_running", True)
|
self.states.set("is_modem_running", True)
|
||||||
self.modem.set_FFT_stream(self.enable_fft)
|
self.modem.set_FFT_stream(self.enable_fft)
|
||||||
return True
|
return True
|
||||||
|
@ -86,9 +88,9 @@ class SM:
|
||||||
def stop_modem(self):
|
def stop_modem(self):
|
||||||
self.log.info("stopping modem....")
|
self.log.info("stopping modem....")
|
||||||
del self.modem
|
del self.modem
|
||||||
del self.data_handler
|
#del self.data_handler
|
||||||
self.modem = False
|
self.modem = False
|
||||||
self.data_handler = False
|
#self.data_handler = False
|
||||||
self.states.set("is_modem_running", False)
|
self.states.set("is_modem_running", False)
|
||||||
|
|
||||||
def test_audio(self):
|
def test_audio(self):
|
||||||
|
|
Loading…
Reference in a new issue