add distance information to heard stations and events

This commit is contained in:
DJ2LS 2024-03-27 10:16:14 +01:00
parent b0e46fb998
commit 329ce9dc20
3 changed files with 141 additions and 52 deletions

View file

@ -6,9 +6,11 @@ import structlog
import time, uuid
from codec2 import FREEDV_MODE
from message_system_db_manager import DatabaseManager
import maidenhead
TESTMODE = False
class FrameHandler():
def __init__(self, name: str, config, states: StateManager, event_manager: EventManager,
@ -101,14 +103,21 @@ class FrameHandler():
self.states.add_activity(activity)
def add_to_heard_stations(self):
frame = self.details['frame']
if 'origin' not in frame:
return
dxgrid = frame['gridsquare'] if 'gridsquare' in frame else "------"
dxgrid = frame.get('gridsquare', "------")
# Initialize distance values
distance_km = None
distance_miles = None
if dxgrid != "------" and frame.get('gridsquare'):
distance_dict = maidenhead.distance_between_locators(self.config['STATION']['mygrid'], frame['gridsquare'])
distance_km = distance_dict['kilometers']
distance_miles = distance_dict['miles']
helpers.add_to_heard_stations(
frame['origin'],
dxgrid,
@ -117,8 +126,9 @@ class FrameHandler():
self.details['frequency_offset'],
self.states.radio_frequency,
self.states.heard_stations,
distance_km=distance_km, # Pass the kilometer distance
distance_miles=distance_miles # Pass the miles distance
)
def make_event(self):
event = {
@ -135,6 +145,9 @@ class FrameHandler():
if 'gridsquare' in self.details['frame']:
event['gridsquare'] = self.details['frame']['gridsquare']
distance = maidenhead.distance_between_locators(self.config['STATION']['mygrid'], self.details['frame']['gridsquare'])
event['distance_kilometers'] = distance['kilometers']
event['distance_miles'] = distance['miles']
return event
@ -164,8 +177,6 @@ class FrameHandler():
self.details['freedv_inst'] = freedv_inst
self.details['bytes_per_frame'] = bytes_per_frame
print(self.details)
# look in database for a full callsign if only crc is present
if 'origin' not in frame and 'origin_crc' in frame:
self.details['frame']['origin'] = DatabaseManager(self.event_manager).get_callsign_by_checksum(frame['origin_crc'])

View file

@ -121,60 +121,44 @@ def get_crc_32(data: str) -> bytes:
return crc_algorithm(data).to_bytes(4, byteorder="big")
def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list):
"""
from datetime import datetime, timezone
import time
def add_to_heard_stations(dxcallsign, dxgrid, datatype, snr, offset, frequency, heard_stations_list, distance_km=None,
distance_miles=None):
"""
Args:
dxcallsign:
dxgrid:
datatype:
snr:
offset:
frequency:
dxcallsign (str): The callsign of the DX station.
dxgrid (str): The Maidenhead grid square of the DX station.
datatype (str): The type of data received (e.g., FT8, CW).
snr (int): Signal-to-noise ratio of the received signal.
offset (float): Frequency offset.
frequency (float): Base frequency of the received signal.
heard_stations_list (list): List containing heard stations.
distance_km (float): Distance to the DX station in kilometers.
distance_miles (float): Distance to the DX station in miles.
Returns:
Nothing
Nothing. The function updates the heard_stations_list in-place.
"""
# check if buffer empty
if len(heard_stations_list) == 0:
heard_stations_list.append(
[dxcallsign, dxgrid, int(datetime.now(timezone.utc).timestamp()), datatype, snr, offset, frequency]
)
# if not, we search and update
# Convert current timestamp to an integer
current_timestamp = int(datetime.now(timezone.utc).timestamp())
# Initialize the new entry
new_entry = [
dxcallsign, dxgrid, current_timestamp, datatype, snr, offset, frequency, distance_km, distance_miles
]
# Check if the buffer is empty or if the callsign is not already in the list
if not any(dxcallsign == station[0] for station in heard_stations_list):
heard_stations_list.append(new_entry)
else:
for i in range(len(heard_stations_list)):
# Update callsign with new timestamp
if heard_stations_list[i].count(dxcallsign) > 0:
heard_stations_list[i] = [
dxcallsign,
dxgrid,
int(time.time()),
datatype,
snr,
offset,
frequency,
]
# Search for the existing entry and update
for i, entry in enumerate(heard_stations_list):
if entry[0] == dxcallsign:
heard_stations_list[i] = new_entry
break
# Insert if nothing found
if i == len(heard_stations_list) - 1:
heard_stations_list.append(
[
dxcallsign,
dxgrid,
int(time.time()),
datatype,
snr,
offset,
frequency,
]
)
break
# for idx, item in enumerate(heard_stations_list):
# if dxcallsign in item:
# item = [dxcallsign, int(time.time())]
# heard_stations_list[idx] = item
def callsign_to_bytes(callsign: str) -> bytes:

94
modem/maidenhead.py Normal file
View file

@ -0,0 +1,94 @@
import math
def haversine(lat1, lon1, lat2, lon2):
"""
Calculate the great circle distance in kilometers between two points
on the Earth (specified in decimal degrees).
Parameters:
lat1, lon1: Latitude and longitude of point 1.
lat2, lon2: Latitude and longitude of point 2.
Returns:
float: Distance between the two points in kilometers.
"""
# Radius of the Earth in kilometers. Use 3956 for miles
R = 6371.0
# Convert latitude and longitude from degrees to radians
lat1 = math.radians(lat1)
lon1 = math.radians(lon1)
lat2 = math.radians(lat2)
lon2 = math.radians(lon2)
# Difference in coordinates
dlon = lon2 - lon1
dlat = lat2 - lat1
# Haversine formula
a = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
distance = R * c
return distance
def maidenhead_to_latlon(grid_square):
"""
Convert a Maidenhead locator to latitude and longitude coordinates.
The output coordinates represent the southwestern corner of the grid square.
Parameters:
grid_square (str): The Maidenhead locator.
Returns:
tuple: A tuple containing the latitude and longitude (in that order) of the grid square's center.
"""
if len(grid_square) < 4 or len(grid_square) % 2 != 0:
raise ValueError("Grid square must be at least 4 characters long and an even length.")
grid_square = grid_square.upper()
lon = -180 + (ord(grid_square[0]) - ord('A')) * 20
lat = -90 + (ord(grid_square[1]) - ord('A')) * 10
lon += (int(grid_square[2]) * 2)
lat += int(grid_square[3])
if len(grid_square) >= 6:
lon += (ord(grid_square[4]) - ord('A')) * (5 / 60)
lat += (ord(grid_square[5]) - ord('A')) * (2.5 / 60)
if len(grid_square) == 8:
lon += int(grid_square[6]) * (5 / 600)
lat += int(grid_square[7]) * (2.5 / 600)
# Adjust to the center of the grid square
if len(grid_square) <= 4:
lon += 1
lat += 0.5
elif len(grid_square) == 6:
lon += 2.5 / 60
lat += 1.25 / 60
else:
lon += 2.5 / 600
lat += 1.25 / 600
return lat, lon
def distance_between_locators(locator1, locator2):
"""
Calculate the distance between two Maidenhead locators and return the result as a dictionary.
Parameters:
locator1 (str): The first Maidenhead locator.
locator2 (str): The second Maidenhead locator.
Returns:
dict: A dictionary containing the distances in kilometers and miles.
"""
lat1, lon1 = maidenhead_to_latlon(locator1)
lat2, lon2 = maidenhead_to_latlon(locator2)
km = haversine(lat1, lon1, lat2, lon2)
miles = km * 0.621371
return {'kilometers': km, 'miles': miles}