FreeDATA/modem/data_handler_ping.py

142 lines
4.5 KiB
Python
Raw Normal View History

2023-11-19 13:56:29 +00:00
import time
from modem_frametypes import FRAME_TYPE as FR_TYPE
from codec2 import FREEDV_MODE
import helpers
import uuid
2023-11-19 16:09:47 +00:00
import structlog
2023-11-23 09:14:11 +00:00
from data_handler import DATA
class PING(DATA):
    """Handle PING requests and PING ACK frames on top of the DATA handler.

    Several attributes are read here but never assigned in this class
    (``mycallsign``, ``mycallsign_crc``, ``mygrid``, ``ssid_list``,
    ``respond_to_call``, ``enable_fsk``, ``length_sig0_frame``,
    ``modem_frequency_offset``), as are the methods
    ``send_data_to_socket_queue`` and ``enqueue_frame_for_tx``.
    NOTE(review): presumably all of them are provided by ``DATA`` -- confirm.
    """

    def __init__(self, config, event_queue, states):
        """Initialise the base handler and keep local references.

        Args:
            config: configuration mapping; ``received_ping`` reads
                ``config['STATION']['mycall']`` and
                ``config['STATION']['ssid_list']`` from it.
            event_queue: forwarded to ``DATA.__init__`` and stored.
            states: state object; this class reads ``radio_frequency`` and
                ``heard_stations`` from it and calls ``states.set``.
        """
        super().__init__(config, event_queue, states)

        self.log = structlog.get_logger("DHPING")
        self.states = states
        self.event_queue = event_queue
        self.config = config

    def received_ping(self, deconstructed_frame: dict, snr) -> None:
        """Handle a received PING request.

        Ignores the frame if it is not addressed to this station. Otherwise
        remembers the sender, records it in the heard-stations list, emits a
        "modem-message" socket event and, if ``respond_to_call`` is truthy,
        answers with a PING ACK.

        Args:
            deconstructed_frame: mapping providing the keys
                ``"destination_crc"``, ``"origin_crc"`` and ``"origin"``.
            snr: SNR value supplied by the caller; it is logged, stored with
                the heard station and sent back in the PING ACK.
        """
        destination_crc = deconstructed_frame["destination_crc"]
        origin_crc = deconstructed_frame["origin_crc"]
        origin = deconstructed_frame["origin"]

        # check if callsign ssid override
        valid, mycallsign = helpers.check_callsign(
            self.config['STATION']['mycall'],
            destination_crc,
            self.config['STATION']['ssid_list'],
        )

        if not valid:
            # PING packet not for me.
            self.log.debug("[Modem] received_ping: ping not for this station.")
            return

        # Remember the sender; transmit_ping_ack reads dxcallsign_crc.
        self.dxcallsign_crc = origin_crc
        self.dxcallsign = origin

        self.log.info(
            f"[Modem] PING REQ from [{origin}] to [{mycallsign}]",
            snr=snr,
        )

        # A PING request carries no grid locator, so a placeholder is used.
        self.dxgrid = b'------'
        helpers.add_to_heard_stations(
            origin,
            self.dxgrid,
            "PING",
            snr,
            self.modem_frequency_offset,
            self.states.radio_frequency,
            self.states.heard_stations
        )

        self.send_data_to_socket_queue(
            freedata="modem-message",
            ping="received",
            uuid=str(uuid.uuid4()),
            timestamp=int(time.time()),
            # Decode to text exactly as received_ping_ack does, so the socket
            # message carries a str instead of raw bytes.
            dxgrid=str(self.dxgrid, "UTF-8"),
            dxcallsign=origin,
            mycallsign=mycallsign,
            snr=str(snr),
        )

        if self.respond_to_call:
            self.transmit_ping_ack(snr)

    def transmit_ping_ack(self, snr):
        """Build and enqueue a PING ACK frame.

        Called by ``received_ping``; relies on ``self.dxcallsign_crc`` having
        been set there.

        Frame layout as written below:
            [0:1]   frame type (``FR_TYPE.PING_ACK``)
            [1:4]   CRC of the station being answered
            [4:7]   own callsign CRC
            [7:11]  own grid, encoded by ``helpers.encode_grid``
            [13:14] SNR, encoded by ``helpers.snr_to_bytes``

        Args:
            snr: SNR value measured for the PING being answered.
        """
        ping_frame = bytearray(self.length_sig0_frame)
        ping_frame[:1] = bytes([FR_TYPE.PING_ACK.value])
        ping_frame[1:4] = self.dxcallsign_crc
        ping_frame[4:7] = self.mycallsign_crc
        ping_frame[7:11] = helpers.encode_grid(self.mygrid)
        ping_frame[13:14] = helpers.snr_to_bytes(snr)

        # FSK mode is used when enabled, otherwise the sig0 mode.
        if self.enable_fsk:
            c2_mode = FREEDV_MODE.fsk_ldpc_0.value
        else:
            c2_mode = FREEDV_MODE.sig0.value
        self.enqueue_frame_for_tx([ping_frame], c2_mode=c2_mode)

    def received_ping_ack(self, data_in: bytes, snr) -> None:
        """Handle a received PING ACK frame.

        If the destination CRC in the frame matches this station, the remote
        grid and SNR are decoded, a "modem-message" socket event is emitted,
        the station is added to the heard-stations list and ``is_modem_busy``
        is cleared. Otherwise the frame is only logged as a foreign PING ACK.

        Args:
            data_in: raw frame bytes, laid out as in ``transmit_ping_ack``.
            snr: SNR value supplied by the caller for this frame.
        """
        # check if we received correct ping
        # check if callsign ssid override
        _valid, mycallsign = helpers.check_callsign(
            self.mycallsign, data_in[1:4], self.ssid_list
        )

        if _valid:
            self.dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8")
            dxsnr = helpers.snr_from_bytes(data_in[13:14])

            mycallsign_text = str(mycallsign, "UTF-8")
            dxcallsign_text = str(self.dxcallsign, "UTF-8")

            self.send_data_to_socket_queue(
                freedata="modem-message",
                ping="acknowledge",
                uuid=str(uuid.uuid4()),
                timestamp=int(time.time()),
                dxgrid=str(self.dxgrid, "UTF-8"),
                dxcallsign=dxcallsign_text,
                mycallsign=mycallsign_text,
                snr=str(snr),
                dxsnr=str(dxsnr)
            )

            # combined_snr = own rx snr / snr on dx side
            combined_snr = f"{snr}/{dxsnr}"
            helpers.add_to_heard_stations(
                self.dxcallsign,
                self.dxgrid,
                "PING-ACK",
                combined_snr,
                self.modem_frequency_offset,
                self.states.radio_frequency,
                self.states.heard_stations
            )

            self.log.info(
                f"[Modem] PING ACK [{mycallsign_text}] >|< [{dxcallsign_text}]",
                snr=snr,
                dxsnr=dxsnr,
            )
            self.states.set("is_modem_busy", False)
        else:
            # Bytes 4:7 hold the sender's raw 3-byte CRC (see
            # transmit_ping_ack), not text. Decoding them as UTF-8 can raise
            # UnicodeDecodeError, so the CRC is logged as hex instead.
            foreign_crc = bytes(data_in[4:7]).hex()
            own_callsign = str(self.mycallsign, "UTF-8")
            self.log.info(
                f"[Modem] FOREIGN PING ACK [{own_callsign}] ??? [{foreign_crc}]",
                snr=snr,
            )