diff --git a/modem/arq_data_type_handler.py b/modem/arq_data_type_handler.py index 8745056a..573d4077 100644 --- a/modem/arq_data_type_handler.py +++ b/modem/arq_data_type_handler.py @@ -52,23 +52,23 @@ class ARQDataTypeHandler: return session_type return None - def dispatch(self, type_byte: int, data: bytearray): + def dispatch(self, type_byte: int, data: bytearray, statistics: dict): session_type = self.get_session_type_from_value(type_byte) self.state_manager.setARQ(False) if session_type and session_type in self.handlers and 'handle' in self.handlers[session_type]: - return self.handlers[session_type]['handle'](data) + return self.handlers[session_type]['handle'](data, statistics) else: self.log(f"Unknown handling endpoint for type: {type_byte}", isWarning=True) - def failed(self, type_byte: int, data: bytearray): + def failed(self, type_byte: int, data: bytearray, statistics: dict): session_type = self.get_session_type_from_value(type_byte) self.state_manager.setARQ(False) if session_type in self.handlers and 'failed' in self.handlers[session_type]: - return self.handlers[session_type]['failed'](data) + return self.handlers[session_type]['failed'](data, statistics) else: self.log(f"Unknown handling endpoint: {session_type}", isWarning=True) @@ -78,13 +78,13 @@ class ARQDataTypeHandler: else: self.log(f"Unknown preparation endpoint: {session_type}", isWarning=True) - def transmitted(self, type_byte: int, data: bytearray): + def transmitted(self, type_byte: int, data: bytearray, statistics: dict): session_type = self.get_session_type_from_value(type_byte) self.state_manager.setARQ(False) if session_type in self.handlers and 'transmitted' in self.handlers[session_type]: - return self.handlers[session_type]['transmitted'](data) + return self.handlers[session_type]['transmitted'](data, statistics) else: self.log(f"Unknown handling endpoint: {session_type}", isWarning=True) @@ -97,14 +97,14 @@ class ARQDataTypeHandler: self.log(f"Preparing uncompressed data: {len(data)} Bytes") return data - def handle_raw(self, data): + def handle_raw(self, data, statistics): self.log(f"Handling uncompressed data: {len(data)} Bytes") return data - def failed_raw(self, data): + def failed_raw(self, data, statistics): return - def transmitted_raw(self, data): + def transmitted_raw(self, data, statistics): return data def prepare_raw_lzma(self, data): @@ -112,15 +112,15 @@ class ARQDataTypeHandler: self.log(f"Preparing LZMA compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") return compressed_data - def handle_raw_lzma(self, data): + def handle_raw_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) self.log(f"Handling LZMA compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes") return decompressed_data - def failed_raw_lzma(self, data): + def failed_raw_lzma(self, data, statistics): return - def transmitted_raw_lzma(self, data): + def transmitted_raw_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) return decompressed_data @@ -129,15 +129,15 @@ class ARQDataTypeHandler: self.log(f"Preparing GZIP compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") return compressed_data - def handle_raw_gzip(self, data): + def handle_raw_gzip(self, data, statistics): decompressed_data = gzip.decompress(data) self.log(f"Handling GZIP compressed data: {len(decompressed_data)} Bytes from {len(data)} Bytes") return decompressed_data - def failed_raw_gzip(self, data): + def failed_raw_gzip(self, data, statistics): return - def transmitted_raw_gzip(self, data): + def transmitted_raw_gzip(self, data, statistics): decompressed_data = gzip.decompress(data) return decompressed_data @@ -146,19 +146,19 @@ class ARQDataTypeHandler: self.log(f"Preparing LZMA compressed P2PMSG data: {len(data)} Bytes >>> {len(compressed_data)} Bytes") return compressed_data - def handle_p2pmsg_lzma(self, data): + def handle_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) self.log(f"Handling LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes") - message_received(self.event_manager, self.state_manager, decompressed_data) + message_received(self.event_manager, self.state_manager, decompressed_data, statistics) return decompressed_data - def failed_p2pmsg_lzma(self, data): + def failed_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) self.log(f"Handling failed LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True) - message_failed(self.event_manager, self.state_manager, decompressed_data) + message_failed(self.event_manager, self.state_manager, decompressed_data, statistics) return decompressed_data - def transmitted_p2pmsg_lzma(self, data): + def transmitted_p2pmsg_lzma(self, data, statistics): decompressed_data = lzma.decompress(data) - message_transmitted(self.event_manager, self.state_manager, decompressed_data) + message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics) return decompressed_data \ No newline at end of file diff --git a/modem/arq_session.py b/modem/arq_session.py index 4b259cd6..1e750331 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -1,3 +1,4 @@ +import datetime import queue, threading import codec2 import data_frame_factory @@ -57,6 +58,11 @@ class ARQSession(): self.session_ended = 0 self.session_max_age = 500 + # histogram lists for storing statistics + self.snr_histogram = [] + self.bpm_histogram = [] + self.time_histogram = [] + def log(self, message, isWarning = False): msg = f"[{type(self).__name__}][id={self.id}][state={self.state}]: {message}" logger = self.logger.warn if isWarning else self.logger.info @@ -86,7 +92,7 @@ class ARQSession(): ) def set_details(self, snr, frequency_offset): - self.snr.append(snr) + self.snr = snr self.frequency_offset = frequency_offset def on_frame_received(self, frame): @@ -98,7 +104,7 @@ class ARQSession(): action_name = self.STATE_TRANSITION[self.state][frame_type] received_data, type_byte = getattr(self, action_name)(frame) if isinstance(received_data, bytearray) and isinstance(type_byte, int): - self.arq_data_type_handler.dispatch(type_byte, received_data) + self.arq_data_type_handler.dispatch(type_byte, received_data, self.calculate_session_statistics()) return self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}") @@ -110,6 +116,9 @@ class ARQSession(): return False def calculate_session_duration(self): + if self.session_ended == 0: + return time.time() - self.session_started + return self.session_ended - self.session_started def calculate_session_statistics(self): @@ -124,11 +133,25 @@ class ARQSession(): else: bytes_per_minute = 0 + # Convert histograms lists to dictionaries + time_histogram_dict = {i: timestamp for i, timestamp in enumerate(self.time_histogram)} + snr_histogram_dict = {i: snr for i, snr in enumerate(self.snr_histogram)} + bpm_histogram_dict = {i: bpm for i, bpm in enumerate(self.bpm_histogram)} + return { - 'total_bytes': total_bytes, - 'duration': duration, - 'bytes_per_minute': bytes_per_minute - } + 'total_bytes': total_bytes, + 'duration': duration, + 'bytes_per_minute': bytes_per_minute, + 'time_histogram': time_histogram_dict, + 'snr_histogram': snr_histogram_dict, + 'bpm_histogram': bpm_histogram_dict, + } + + def update_histograms(self): + stats = self.calculate_session_statistics() + self.snr_histogram.append(self.snr) + self.bpm_histogram.append(stats['bytes_per_minute']) + self.time_histogram.append(datetime.datetime.now().isoformat()) def get_appropriate_speed_level(self, snr): # Start with the lowest speed level as default diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 97c4a5a7..74ad0b4a 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -105,7 +105,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.id, self.dxcall, self.version, - self.snr[0], flag_abort=self.abort) + self.snr, flag_abort=self.abort) self.launch_transmit_and_wait(ack_frame, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling) if not self.abort: self.set_state(IRS_State.OPEN_ACK_SENT) @@ -123,7 +123,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.event_manager.send_arq_session_new(False, self.id, self.dxcall, self.total_length, self.state.name) info_ack = self.frame_factory.build_arq_session_info_ack( - self.id, self.total_crc, self.snr[0], + self.id, self.total_crc, self.snr, self.speed_level, self.frames_per_burst, flag_abort=self.abort) self.launch_transmit_and_wait(info_ack, self.TIMEOUT_CONNECT, mode=FREEDV_MODE.signalling) if not self.abort: @@ -150,13 +150,15 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_bytes += len(data_part) self.log(f"Received {self.received_bytes}/{self.total_length} bytes") self.event_manager.send_arq_session_progress( - False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name) + False, self.id, self.dxcall, self.received_bytes, self.total_length, self.state.name, self.calculate_session_statistics()) return True def receive_data(self, burst_frame): self.process_incoming_data(burst_frame) - + # update statistics + self.update_histograms() + if not self.all_data_received(): self.calibrate_speed_settings(burst_frame=burst_frame) ack = self.frame_factory.build_arq_burst_ack( @@ -164,7 +166,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_bytes, self.speed_level, self.frames_per_burst, - self.snr[0], + self.snr, flag_abort=self.abort ) @@ -178,7 +180,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_bytes, self.speed_level, self.frames_per_burst, - self.snr[0], + self.snr, flag_final=True, flag_checksum=True) self.transmit_frame(ack, mode=FREEDV_MODE.signalling) @@ -195,7 +197,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_bytes, self.speed_level, self.frames_per_burst, - self.snr[0], + self.snr, flag_final=True, flag_checksum=False) self.transmit_frame(ack, mode=FREEDV_MODE.signalling) @@ -208,7 +210,7 @@ class ARQSessionIRS(arq_session.ARQSession): else: received_speed_level = 0 - latest_snr = self.snr[-1] if self.snr else -10 + latest_snr = self.snr if self.snr else -10 appropriate_speed_level = self.get_appropriate_speed_level(latest_snr) modes_to_decode = {} diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index 61442ea2..eab75ae4 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -141,7 +141,7 @@ class ARQSessionISS(arq_session.ARQSession): info_frame = self.frame_factory.build_arq_session_info(self.id, self.total_length, helpers.get_crc_32(self.data), - self.snr[0], self.type_byte) + self.snr, self.type_byte) self.launch_twr(info_frame, self.TIMEOUT_CONNECT_ACK, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling) self.set_state(ISS_State.INFO_SENT) @@ -149,14 +149,15 @@ class ARQSessionISS(arq_session.ARQSession): return None, None def send_data(self, irs_frame): + # update statistics + self.update_histograms() self.update_speed_level(irs_frame) - if 'offset' in irs_frame: self.confirmed_bytes = irs_frame['offset'] self.log(f"IRS confirmed {self.confirmed_bytes}/{self.total_length} bytes") self.event_manager.send_arq_session_progress( - True, self.id, self.dxcall, self.confirmed_bytes, self.total_length, self.state.name) + True, self.id, self.dxcall, self.confirmed_bytes, self.total_length, self.state.name, statistics=self.calculate_session_statistics()) # check if we received an abort flag if irs_frame["flag"]["ABORT"]: @@ -190,9 +191,9 @@ class ARQSessionISS(arq_session.ARQSession): self.set_state(ISS_State.ENDED) self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}") self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics()) + self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics()) self.state_manager.remove_arq_iss_session(self.id) self.states.setARQ(False) - self.arq_data_type_handler.transmitted(self.type_byte, self.data) return None, None def transmission_failed(self, irs_frame=None): diff --git a/modem/command_message_send.py b/modem/command_message_send.py index ad6fb6cc..1923208d 100644 --- a/modem/command_message_send.py +++ b/modem/command_message_send.py @@ -15,7 +15,7 @@ class SendMessageCommand(TxCommand): def set_params_from_api(self, apiParams): origin = f"{self.config['STATION']['mycall']}-{self.config['STATION']['myssid']}" self.message = MessageP2P.from_api_params(origin, apiParams) - DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), direction='transmit', status='queued') + DatabaseManagerMessages(self.event_manager).add_message(self.message.to_dict(), statistics={}, direction='transmit', status='queued') def transmit(self, modem): diff --git a/modem/event_manager.py b/modem/event_manager.py index 21482ee3..ab19eff0 100644 --- a/modem/event_manager.py +++ b/modem/event_manager.py @@ -42,7 +42,10 @@ class EventManager: } self.broadcast(event) - def send_arq_session_progress(self, outbound: bool, session_id, dxcall, received_bytes, total_bytes, state): + def send_arq_session_progress(self, outbound: bool, session_id, dxcall, received_bytes, total_bytes, state, statistics=None): + if statistics is None: + statistics = {} + direction = 'outbound' if outbound else 'inbound' event = { "type": "arq", @@ -52,6 +55,7 @@ class EventManager: 'received_bytes': received_bytes, 'total_bytes': total_bytes, 'state': state, + 'statistics': statistics, } } self.broadcast(event) diff --git a/modem/message_p2p.py b/modem/message_p2p.py index fafd2c25..44fcde96 100644 --- a/modem/message_p2p.py +++ b/modem/message_p2p.py @@ -7,23 +7,23 @@ from message_system_db_messages import DatabaseManagerMessages #import command_message_send -def message_received(event_manager, state_manager, data): +def message_received(event_manager, state_manager, data, statistics): decompressed_json_string = data.decode('utf-8') received_message_obj = MessageP2P.from_payload(decompressed_json_string) received_message_dict = MessageP2P.to_dict(received_message_obj) - DatabaseManagerMessages(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False) + DatabaseManagerMessages(event_manager).add_message(received_message_dict, statistics, direction='receive', status='received', is_read=False) -def message_transmitted(event_manager, state_manager, data): +def message_transmitted(event_manager, state_manager, data, statistics): decompressed_json_string = data.decode('utf-8') payload_message_obj = MessageP2P.from_payload(decompressed_json_string) payload_message = MessageP2P.to_dict(payload_message_obj) - DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'transmitted'}) + DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'transmitted', 'statistics': statistics}) -def message_failed(event_manager, state_manager, data): +def message_failed(event_manager, state_manager, data, statistics): decompressed_json_string = data.decode('utf-8') payload_message_obj = MessageP2P.from_payload(decompressed_json_string) payload_message = MessageP2P.to_dict(payload_message_obj) - DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'}) + DatabaseManagerMessages(event_manager).update_message(payload_message["id"], statistics, update_data={'status': 'failed', 'statistics': statistics}) class MessageP2P: def __init__(self, id: str, origin: str, destination: str, body: str, attachments: list) -> None: diff --git a/modem/message_system_db_messages.py b/modem/message_system_db_messages.py index 639ac92f..6f6739e1 100644 --- a/modem/message_system_db_messages.py +++ b/modem/message_system_db_messages.py @@ -11,7 +11,7 @@ class DatabaseManagerMessages(DatabaseManager): super().__init__(uri) self.attachments_manager = DatabaseManagerAttachments(uri) - def add_message(self, message_data, direction='receive', status=None, is_read=True): + def add_message(self, message_data, statistics, direction='receive', status=None, is_read=True): session = self.get_thread_scoped_session() try: # Create and add the origin and destination Stations @@ -34,7 +34,8 @@ class DatabaseManagerMessages(DatabaseManager): direction=direction, status_id=status.id if status else None, is_read=is_read, - attempt=0 + attempt=0, + statistics=statistics ) session.add(new_message) @@ -131,6 +132,9 @@ class DatabaseManagerMessages(DatabaseManager): if 'status' in update_data: message.status = self.get_or_create_status(session, update_data['status']) + if 'statistics' in update_data: + message.statistics = update_data['statistics'] + session.commit() self.log(f"Updated: {message_id}") self.event_manager.freedata_message_db_change()