mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
added session statistics
This commit is contained in:
parent
956cede593
commit
dd4ca1903b
|
@ -52,23 +52,23 @@ class ARQDataTypeHandler:
|
|||
return session_type
|
||||
return None
|
||||
|
||||
def dispatch(self, type_byte: int, data: bytearray):
|
||||
def dispatch(self, type_byte: int, data: bytearray, statistics: dict):
|
||||
session_type = self.get_session_type_from_value(type_byte)
|
||||
|
||||
self.state_manager.setARQ(False)
|
||||
|
||||
if session_type and session_type in self.handlers and 'handle' in self.handlers[session_type]:
|
||||
return self.handlers[session_type]['handle'](data)
|
||||
return self.handlers[session_type]['handle'](data, statistics)
|
||||
else:
|
||||
self.log(f"Unknown handling endpoint for type: {type_byte}", isWarning=True)
|
||||
|
||||
def failed(self, type_byte: int, data: bytearray):
|
||||
def failed(self, type_byte: int, data: bytearray, statistics: dict):
|
||||
session_type = self.get_session_type_from_value(type_byte)
|
||||
|
||||
self.state_manager.setARQ(False)
|
||||
|
||||
if session_type in self.handlers and 'failed' in self.handlers[session_type]:
|
||||
return self.handlers[session_type]['failed'](data)
|
||||
return self.handlers[session_type]['failed'](data, statistics)
|
||||
else:
|
||||
self.log(f"Unknown handling endpoint: {session_type}", isWarning=True)
|
||||
|
||||
|
@ -78,13 +78,13 @@ class ARQDataTypeHandler:
|
|||
else:
|
||||
self.log(f"Unknown preparation endpoint: {session_type}", isWarning=True)
|
||||
|
||||
def transmitted(self, type_byte: int, data: bytearray):
|
||||
def transmitted(self, type_byte: int, data: bytearray, statistics: dict):
|
||||
session_type = self.get_session_type_from_value(type_byte)
|
||||
|
||||
self.state_manager.setARQ(False)
|
||||
|
||||
if session_type in self.handlers and 'transmitted' in self.handlers[session_type]:
|
||||
return self.handlers[session_type]['transmitted'](data)
|
||||
return self.handlers[session_type]['transmitted'](data, statistics)
|
||||
else:
|
||||
self.log(f"Unknown handling endpoint: {session_type}", isWarning=True)
|
||||
|
||||
|
@ -97,14 +97,14 @@ class ARQDataTypeHandler:
|
|||
self.log(f"Preparing uncompressed data: {len(data)} Bytes")
|
||||
return data
|
||||
|
||||
def handle_raw(self, data):
|
||||
def handle_raw(self, data, statistics):
|
||||
self.log(f"Handling uncompressed data: {len(data)} Bytes")
|
||||
return data
|
||||
|
||||
def failed_raw(self, data):
|
||||
def failed_raw(self, data, statistics):
|
||||
return
|
||||
|
||||
def transmitted_raw(self, data):
|
||||
def transmitted_raw(self, data, statistics):
|
||||
return data
|
||||
|
||||
def prepare_raw_lzma(self, data):
|
||||
|
@ -112,15 +112,15 @@ class ARQDataTypeHandler:
|
|||
self.log(f"Preparing LZMA compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
|
||||
return compressed_data
|
||||
|
||||
def handle_raw_lzma(self, data):
|
||||
def handle_raw_lzma(self, data, statistics):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
self.log(f"Handling LZMA compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
|
||||
return decompressed_data
|
||||
|
||||
def failed_raw_lzma(self, data):
|
||||
def failed_raw_lzma(self, data, statistics):
|
||||
return
|
||||
|
||||
def transmitted_raw_lzma(self, data):
|
||||
def transmitted_raw_lzma(self, data, statistics):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
return decompressed_data
|
||||
|
||||
|
@ -129,15 +129,15 @@ class ARQDataTypeHandler:
|
|||
self.log(f"Preparing GZIP compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
|
||||
return compressed_data
|
||||
|
||||
def handle_raw_gzip(self, data):
|
||||
def handle_raw_gzip(self, data, statistics):
|
||||
decompressed_data = gzip.decompress(data)
|
||||
self.log(f"Handling GZIP compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
|
||||
return decompressed_data
|
||||
|
||||
def failed_raw_gzip(self, data):
|
||||
def failed_raw_gzip(self, data, statistics):
|
||||
return
|
||||
|
||||
def transmitted_raw_gzip(self, data):
|
||||
def transmitted_raw_gzip(self, data, statistics):
|
||||
decompressed_data = gzip.decompress(data)
|
||||
return decompressed_data
|
||||
|
||||
|
@ -146,19 +146,19 @@ class ARQDataTypeHandler:
|
|||
self.log(f"Preparing LZMA compressed P2PMSG data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
|
||||
return compressed_data
|
||||
|
||||
def handle_p2pmsg_lzma(self, data):
|
||||
def handle_p2pmsg_lzma(self, data, statistics):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
|
||||
message_received(self.event_manager, self.state_manager, decompressed_data)
|
||||
message_received(self.event_manager, self.state_manager, decompressed_data, statistics)
|
||||
return decompressed_data
|
||||
|
||||
def failed_p2pmsg_lzma(self, data):
|
||||
def failed_p2pmsg_lzma(self, data, statistics):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
self.log(f"Handling failed LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
|
||||
message_failed(self.event_manager, self.state_manager, decompressed_data)
|
||||
message_failed(self.event_manager, self.state_manager, decompressed_data, statistics)
|
||||
return decompressed_data
|
||||
|
||||
def transmitted_p2pmsg_lzma(self, data):
|
||||
def transmitted_p2pmsg_lzma(self, data, statistics):
|
||||
decompressed_data = lzma.decompress(data)
|
||||
message_transmitted(self.event_manager, self.state_manager, decompressed_data)
|
||||
message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics)
|
||||
return decompressed_data
|
|
@ -1,3 +1,4 @@
|
|||
import datetime
|
||||
import queue, threading
|
||||
import codec2
|
||||
import data_frame_factory
|
||||
|
@ -57,6 +58,11 @@ class ARQSession():
|
|||
self.session_ended = 0
|
||||
self.session_max_age = 500
|
||||
|
||||
# histogram lists for storing statistics
|
||||
self.snr_histogram = []
|
||||
self.bpm_histogram = []
|
||||
self.time_histogram = []
|
||||
|
||||
def log(self, message, isWarning = False):
|
||||
msg = f"[{type(self).__name__}][id={self.id}][state={self.state}]: {message}"
|
||||
logger = self.logger.warn if isWarning else self.logger.info
|
||||
|
@ -86,7 +92,7 @@ class ARQSession():
|
|||
)
|
||||
|
||||
def set_details(self, snr, frequency_offset):
|
||||
self.snr.append(snr)
|
||||
self.snr = snr
|
||||
self.frequency_offset = frequency_offset
|
||||
|
||||
def on_frame_received(self, frame):
|
||||
|
@ -98,7 +104,7 @@ class ARQSession():
|
|||
action_name = self.STATE_TRANSITION[self.state][frame_type]
|
||||
received_data, type_byte = getattr(self, action_name)(frame)
|
||||
if isinstance(received_data, bytearray) and isinstance(type_byte, int):
|
||||
self.arq_data_type_handler.dispatch(type_byte, received_data)
|
||||
self.arq_data_type_handler.dispatch(type_byte, received_data, self.calculate_session_statistics())
|
||||
return
|
||||
|
||||
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")
|
||||
|
@ -110,6 +116,9 @@ class ARQSession():
|
|||
return False
|
||||
|
||||
def calculate_session_duration(self):
|
||||
if self.session_ended == 0:
|
||||
return time.time() - self.session_started
|
||||
|
||||
return self.session_ended - self.session_started
|
||||
|
||||
def calculate_session_statistics(self):
|
||||
|
@ -124,11 +133,25 @@ class ARQSession():
|
|||
else:
|
||||
bytes_per_minute = 0
|
||||
|
||||
# Convert histograms lists to dictionaries
|
||||
time_histogram_dict = {i: timestamp for i, timestamp in enumerate(self.time_histogram)}
|
||||
snr_histogram_dict = {i: snr for i, snr in enumerate(self.snr_histogram)}
|
||||
bpm_histogram_dict = {i: bpm for i, bpm in enumerate(self.bpm_histogram)}
|
||||
|
||||
return {
|
||||
'total_bytes': total_bytes,
|
||||
'duration': duration,
|
||||
'bytes_per_minute': bytes_per_minute
|
||||
}
|
||||
'total_bytes': total_bytes,
|
||||
'duration': duration,
|
||||
'bytes_per_minute': bytes_per_minute,
|
||||
'time_histogram': time_histogram_dict,
|
||||
'snr_histogram': snr_histogram_dict,
|
||||
'bpm_histogram': bpm_histogram_dict,
|
||||
}
|
||||
|
||||
def update_histograms(self):
|
||||
stats = self.calculate_session_statistics()
|
||||
self.snr_histogram.append(self.snr)
|
||||
self.bpm_histogram.append(stats['bytes_per_minute'])
|
||||
self.time_histogram.append(datetime.datetime.now().isoformat())
|
||||
|
||||
def get_appropriate_speed_level(self, snr):
|
||||
# Start with the lowest speed level as default
|
||||
|
|
|
@ -105,7 +105,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.id,
|
||||
self.dxcall,
|
||||
self.version,
|
||||
self.snr[0], flag_abort=self.abort)
|
||||
self.snr, flag_abort=self.abort)
|
||||
self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
||||
if not self.abort:
|
||||
self.set_state(IRS_State.OPEN_ACK_SENT)
|
||||
|
@ -123,7 +123,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.event_manager.send_arq_session_new(False, self.id, self.dxcall, self.total_length, self.state.name)
|
||||
|
||||
info_ack = self.frame_factory.build_arq_session_info_ack(
|
||||
self.id, self.total_crc, self.snr[0],
|
||||
self.id, self.total_crc, self.snr,
|
||||
self.speed_level, self.frames_per_burst, flag_abort=self.abort)
|
||||
self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling)
|
||||
if not self.abort:
|
||||
|
@ -150,13 +150,15 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.received_bytes += len(data_part)
|
||||
self.log(f"Received {self.received_bytes}/{self.total_length} bytes")
|
||||
self.event_manager.send_arq_session_progress(
|
||||
False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name)
|
||||
False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name, self.calculate_session_statistics())
|
||||
|
||||
return True
|
||||
|
||||
def receive_data(self, burst_frame):
|
||||
self.process_incoming_data(burst_frame)
|
||||
|
||||
# update statistics
|
||||
self.update_histograms()
|
||||
|
||||
if not self.all_data_received():
|
||||
self.calibrate_speed_settings(burst_frame=burst_frame)
|
||||
ack = self.frame_factory.build_arq_burst_ack(
|
||||
|
@ -164,7 +166,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.received_bytes,
|
||||
self.speed_level,
|
||||
self.frames_per_burst,
|
||||
self.snr[0],
|
||||
self.snr,
|
||||
flag_abort=self.abort
|
||||
)
|
||||
|
||||
|
@ -178,7 +180,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.received_bytes,
|
||||
self.speed_level,
|
||||
self.frames_per_burst,
|
||||
self.snr[0],
|
||||
self.snr,
|
||||
flag_final=True,
|
||||
flag_checksum=True)
|
||||
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
||||
|
@ -195,7 +197,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
self.received_bytes,
|
||||
self.speed_level,
|
||||
self.frames_per_burst,
|
||||
self.snr[0],
|
||||
self.snr,
|
||||
flag_final=True,
|
||||
flag_checksum=False)
|
||||
self.transmit_frame(ack, mode=FREEDV_MODE.signalling)
|
||||
|
@ -208,7 +210,7 @@ class ARQSessionIRS(arq_session.ARQSession):
|
|||
else:
|
||||
received_speed_level = 0
|
||||
|
||||
latest_snr = self.snr[-1] if self.snr else -10
|
||||
latest_snr = self.snr if self.snr else -10
|
||||
appropriate_speed_level = self.get_appropriate_speed_level(latest_snr)
|
||||
modes_to_decode = {}
|
||||
|
||||
|
|
|
@ -141,7 +141,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
|||
|
||||
info_frame = self.frame_factory.build_arq_session_info(self.id, self.total_length,
|
||||
helpers.get_crc_32(self.data),
|
||||
self.snr[0], self.type_byte)
|
||||
self.snr, self.type_byte)
|
||||
|
||||
self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||
self.set_state(ISS_State.INFO_SENT)
|
||||
|
@ -149,14 +149,15 @@ class ARQSessionISS(arq_session.ARQSession):
|
|||
return None, None
|
||||
|
||||
def send_data(self, irs_frame):
|
||||
# update statistics
|
||||
self.update_histograms()
|
||||
|
||||
self.update_speed_level(irs_frame)
|
||||
|
||||
if 'offset' in irs_frame:
|
||||
self.confirmed_bytes = irs_frame['offset']
|
||||
self.log(f"IRS confirmed {self.confirmed_bytes}/{self.total_length} bytes")
|
||||
self.event_manager.send_arq_session_progress(
|
||||
True, self.id, self.dxcall, self.confirmed_bytes, self.total_length, self.state.name)
|
||||
True, self.id, self.dxcall, self.confirmed_bytes, self.total_length, self.state.name, statistics=self.calculate_session_statistics())
|
||||
|
||||
# check if we received an abort flag
|
||||
if irs_frame["flag"]["ABORT"]:
|
||||
|
@ -190,9 +191,9 @@ class ARQSessionISS(arq_session.ARQSession):
|
|||
self.set_state(ISS_State.ENDED)
|
||||
self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
|
||||
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics())
|
||||
self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics())
|
||||
self.state_manager.remove_arq_iss_session(self.id)
|
||||
self.states.setARQ(False)
|
||||
self.arq_data_type_handler.transmitted(self.type_byte, self.data)
|
||||
return None, None
|
||||
|
||||
def transmission_failed(self, irs_frame=None):
|
||||
|
|
|
@ -15,7 +15,7 @@ class SendMessageCommand(TxCommand):
|
|||
def set_params_from_api(self, apiParams):
|
||||
origin = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}"
|
||||
self.message = MessageP2P.from_api_params(origin, apiParams)
|
||||
DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued')
|
||||
DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), statistics={}, direction='transmit', status='queued')
|
||||
|
||||
def transmit(self, modem):
|
||||
|
||||
|
|
|
@ -42,7 +42,10 @@ class EventManager:
|
|||
}
|
||||
self.broadcast(event)
|
||||
|
||||
def send_arq_session_progress(self, outbound: bool, session_id, dxcall, received_bytes, total_bytes, state):
|
||||
def send_arq_session_progress(self, outbound: bool, session_id, dxcall, received_bytes, total_bytes, state, statistics=None):
|
||||
if statistics is None:
|
||||
statistics = {}
|
||||
|
||||
direction = 'outbound' if outbound else 'inbound'
|
||||
event = {
|
||||
"type": "arq",
|
||||
|
@ -52,6 +55,7 @@ class EventManager:
|
|||
'received_bytes': received_bytes,
|
||||
'total_bytes': total_bytes,
|
||||
'state': state,
|
||||
'statistics': statistics,
|
||||
}
|
||||
}
|
||||
self.broadcast(event)
|
||||
|
|
|
@ -7,23 +7,23 @@ from message_system_db_messages import DatabaseManagerMessages
|
|||
#import command_message_send
|
||||
|
||||
|
||||
def message_received(event_manager, state_manager, data):
|
||||
def message_received(event_manager, state_manager, data, statistics):
|
||||
decompressed_json_string = data.decode('utf-8')
|
||||
received_message_obj = MessageP2P.from_payload(decompressed_json_string)
|
||||
received_message_dict = MessageP2P.to_dict(received_message_obj)
|
||||
DatabaseManagerMessages(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False)
|
||||
DatabaseManagerMessages(event_manager).add_message(received_message_dict, statistics, direction='receive', status='received', is_read=False)
|
||||
|
||||
def message_transmitted(event_manager, state_manager, data):
|
||||
def message_transmitted(event_manager, state_manager, data, statistics):
|
||||
decompressed_json_string = data.decode('utf-8')
|
||||
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
|
||||
payload_message = MessageP2P.to_dict(payload_message_obj)
|
||||
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'transmitted'})
|
||||
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'transmitted', 'statistics': statistics})
|
||||
|
||||
def message_failed(event_manager, state_manager, data):
|
||||
def message_failed(event_manager, state_manager, data, statistics):
|
||||
decompressed_json_string = data.decode('utf-8')
|
||||
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
|
||||
payload_message = MessageP2P.to_dict(payload_message_obj)
|
||||
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'})
|
||||
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], statistics, update_data={'status': 'failed', 'statistics': statistics})
|
||||
|
||||
class MessageP2P:
|
||||
def __init__(self, id: str, origin: str, destination: str, body: str, attachments: list) -> None:
|
||||
|
|
|
@ -11,7 +11,7 @@ class DatabaseManagerMessages(DatabaseManager):
|
|||
super().__init__(uri)
|
||||
self.attachments_manager = DatabaseManagerAttachments(uri)
|
||||
|
||||
def add_message(self, message_data, direction='receive', status=None, is_read=True):
|
||||
def add_message(self, message_data, statistics, direction='receive', status=None, is_read=True):
|
||||
session = self.get_thread_scoped_session()
|
||||
try:
|
||||
# Create and add the origin and destination Stations
|
||||
|
@ -34,7 +34,8 @@ class DatabaseManagerMessages(DatabaseManager):
|
|||
direction=direction,
|
||||
status_id=status.id if status else None,
|
||||
is_read=is_read,
|
||||
attempt=0
|
||||
attempt=0,
|
||||
statistics=statistics
|
||||
)
|
||||
|
||||
session.add(new_message)
|
||||
|
@ -131,6 +132,9 @@ class DatabaseManagerMessages(DatabaseManager):
|
|||
if 'status' in update_data:
|
||||
message.status = self.get_or_create_status(session, update_data['status'])
|
||||
|
||||
if 'statistics' in update_data:
|
||||
message.statistics = update_data['statistics']
|
||||
|
||||
session.commit()
|
||||
self.log(f"Updated: {message_id}")
|
||||
self.event_manager.freedata_message_db_change()
|
||||
|
|
Loading…
Reference in a new issue