2023-11-19 13:56:29 +00:00
|
|
|
import time
|
|
|
|
from modem_frametypes import FRAME_TYPE as FR_TYPE
|
|
|
|
from codec2 import FREEDV_MODE
|
|
|
|
import helpers
|
|
|
|
import uuid
|
2023-11-19 16:09:47 +00:00
|
|
|
import structlog
|
2023-11-23 09:14:11 +00:00
|
|
|
from data_handler import DATA
|
|
|
|
class PING(DATA):
|
2023-11-19 16:09:47 +00:00
|
|
|
def __init__(self, config, event_queue, states):
|
2023-11-23 09:14:11 +00:00
|
|
|
super().__init__(config, event_queue, states)
|
|
|
|
|
2023-11-19 16:09:47 +00:00
|
|
|
self.log = structlog.get_logger("DHPING")
|
|
|
|
self.states = states
|
|
|
|
self.event_queue = event_queue
|
|
|
|
self.config = config
|
2023-11-19 13:56:29 +00:00
|
|
|
|
2023-11-26 10:41:37 +00:00
|
|
|
def received_ping(self, deconstructed_frame: list, snr) -> None:
|
2023-11-19 13:56:29 +00:00
|
|
|
"""
|
|
|
|
Called if we received a ping
|
|
|
|
|
|
|
|
Args:
|
|
|
|
data_in:bytes:
|
|
|
|
|
|
|
|
"""
|
2023-11-26 10:41:37 +00:00
|
|
|
# use --> deconstructed_frame
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#dxcallsign_crc = bytes(data_in[4:7])
|
|
|
|
mycallsign_crc = deconstructed_frame["mycallsign_crc"]
|
|
|
|
dxcallsign_crc = deconstructed_frame["dxcallsign_crc"]
|
|
|
|
dxcallsign = deconstructed_frame["dxcallsign"]
|
|
|
|
#dxcallsign = helpers.bytes_to_callsign(bytes(data_in[7:13]))
|
2023-11-19 13:56:29 +00:00
|
|
|
|
|
|
|
# check if callsign ssid override
|
2023-11-26 10:41:37 +00:00
|
|
|
valid, mycallsign = helpers.check_callsign(self.mycallsign, mycallsign_crc, self.ssid_list)
|
2023-11-19 13:56:29 +00:00
|
|
|
if not valid:
|
|
|
|
# PING packet not for me.
|
|
|
|
self.log.debug("[Modem] received_ping: ping not for this station.")
|
|
|
|
return
|
|
|
|
|
|
|
|
self.dxcallsign_crc = dxcallsign_crc
|
|
|
|
self.dxcallsign = dxcallsign
|
|
|
|
self.log.info(
|
|
|
|
"[Modem] PING REQ ["
|
|
|
|
+ str(mycallsign, "UTF-8")
|
|
|
|
+ "] <<< ["
|
|
|
|
+ str(dxcallsign, "UTF-8")
|
|
|
|
+ "]",
|
|
|
|
snr=snr,
|
|
|
|
)
|
|
|
|
|
|
|
|
self.dxgrid = b'------'
|
|
|
|
helpers.add_to_heard_stations(
|
|
|
|
dxcallsign,
|
|
|
|
self.dxgrid,
|
|
|
|
"PING",
|
|
|
|
snr,
|
|
|
|
self.modem_frequency_offset,
|
|
|
|
self.states.radio_frequency,
|
|
|
|
self.states.heard_stations
|
|
|
|
)
|
|
|
|
|
|
|
|
self.send_data_to_socket_queue(
|
|
|
|
freedata="modem-message",
|
|
|
|
ping="received",
|
|
|
|
uuid=str(uuid.uuid4()),
|
|
|
|
timestamp=int(time.time()),
|
|
|
|
dxgrid=str(self.dxgrid, "UTF-8"),
|
|
|
|
dxcallsign=str(dxcallsign, "UTF-8"),
|
|
|
|
mycallsign=str(mycallsign, "UTF-8"),
|
|
|
|
snr=str(snr),
|
|
|
|
)
|
|
|
|
if self.respond_to_call:
|
|
|
|
self.transmit_ping_ack(snr)
|
|
|
|
|
|
|
|
def transmit_ping_ack(self, snr):
|
|
|
|
"""
|
|
|
|
|
|
|
|
transmit a ping ack frame
|
|
|
|
called by def received_ping
|
|
|
|
"""
|
|
|
|
ping_frame = bytearray(self.length_sig0_frame)
|
|
|
|
ping_frame[:1] = bytes([FR_TYPE.PING_ACK.value])
|
|
|
|
ping_frame[1:4] = self.dxcallsign_crc
|
|
|
|
ping_frame[4:7] = self.mycallsign_crc
|
|
|
|
ping_frame[7:11] = helpers.encode_grid(self.mygrid)
|
|
|
|
ping_frame[13:14] = helpers.snr_to_bytes(snr)
|
|
|
|
|
|
|
|
if self.enable_fsk:
|
|
|
|
self.enqueue_frame_for_tx([ping_frame], c2_mode=FREEDV_MODE.fsk_ldpc_0.value)
|
|
|
|
else:
|
|
|
|
self.enqueue_frame_for_tx([ping_frame], c2_mode=FREEDV_MODE.sig0.value)
|
|
|
|
|
|
|
|
def received_ping_ack(self, data_in: bytes, snr) -> None:
|
|
|
|
"""
|
|
|
|
Called if a PING ack has been received
|
|
|
|
Args:
|
|
|
|
data_in:bytes:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
# check if we received correct ping
|
|
|
|
# check if callsign ssid override
|
|
|
|
_valid, mycallsign = helpers.check_callsign(self.mycallsign, data_in[1:4], self.ssid_list)
|
|
|
|
if _valid:
|
|
|
|
|
|
|
|
self.dxgrid = bytes(helpers.decode_grid(data_in[7:11]), "UTF-8")
|
|
|
|
dxsnr = helpers.snr_from_bytes(data_in[13:14])
|
|
|
|
self.send_data_to_socket_queue(
|
|
|
|
freedata="modem-message",
|
|
|
|
ping="acknowledge",
|
|
|
|
uuid=str(uuid.uuid4()),
|
|
|
|
timestamp=int(time.time()),
|
|
|
|
dxgrid=str(self.dxgrid, "UTF-8"),
|
|
|
|
dxcallsign=str(self.dxcallsign, "UTF-8"),
|
|
|
|
mycallsign=str(mycallsign, "UTF-8"),
|
|
|
|
snr=str(snr),
|
|
|
|
dxsnr=str(dxsnr)
|
|
|
|
)
|
|
|
|
# combined_snr = own rx snr / snr on dx side
|
|
|
|
combined_snr = f"{snr}/{dxsnr}"
|
|
|
|
helpers.add_to_heard_stations(
|
|
|
|
self.dxcallsign,
|
|
|
|
self.dxgrid,
|
|
|
|
"PING-ACK",
|
|
|
|
combined_snr,
|
|
|
|
self.modem_frequency_offset,
|
|
|
|
self.states.radio_frequency,
|
|
|
|
self.states.heard_stations
|
|
|
|
)
|
|
|
|
|
|
|
|
self.log.info(
|
|
|
|
"[Modem] PING ACK ["
|
|
|
|
+ str(mycallsign, "UTF-8")
|
|
|
|
+ "] >|< ["
|
|
|
|
+ str(self.dxcallsign, "UTF-8")
|
|
|
|
+ "]",
|
|
|
|
snr=snr,
|
|
|
|
dxsnr=dxsnr,
|
|
|
|
)
|
|
|
|
self.states.set("is_modem_busy", False)
|
|
|
|
else:
|
|
|
|
self.log.info(
|
|
|
|
"[Modem] FOREIGN PING ACK ["
|
|
|
|
+ str(self.mycallsign, "UTF-8")
|
|
|
|
+ "] ??? ["
|
|
|
|
+ str(bytes(data_in[4:7]), "UTF-8")
|
|
|
|
+ "]",
|
|
|
|
snr=snr,
|
|
|
|
)
|