diff --git a/modem/arq_session.py b/modem/arq_session.py index f26e642a..618cdec9 100644 --- a/modem/arq_session.py +++ b/modem/arq_session.py @@ -97,5 +97,3 @@ class ARQSession(): return self.log(f"Ignoring unknow transition from state {self.state} with frame {frame['frame_type']}") - - diff --git a/modem/arq_session_irs.py b/modem/arq_session_irs.py index 1bb512b8..eb85f23d 100644 --- a/modem/arq_session_irs.py +++ b/modem/arq_session_irs.py @@ -78,6 +78,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.log("Timeout waiting for ISS. Session failed.") self.set_state(IRS_State.FAILED) + self.event_manager.send_arq_finished(False, self.id, self.dxcall, self.total_length, False) def launch_transmit_and_wait(self, frame, timeout, mode): thread_wait = threading.Thread(target = self.transmit_and_wait, @@ -101,6 +102,7 @@ class ARQSessionIRS(arq_session.ARQSession): self.dx_snr.append(info_frame['snr']) self.log(f"New transfer of {self.total_length} bytes") + self.event_manager.send_arq_session_new(False, self.id, self.dxcall, self.total_length) self.calibrate_speed_settings() self.set_decode_mode() @@ -134,6 +136,8 @@ class ARQSessionIRS(arq_session.ARQSession): self.received_data[frame['offset']:] = data_part self.received_bytes += len(data_part) self.log(f"Received {self.received_bytes}/{self.total_length} bytes") + self.event_manager.send_arq_session_progress( + False, self.id, self.dxcall, self.received_bytes, self.total_length) return True @@ -155,10 +159,15 @@ class ARQSessionIRS(arq_session.ARQSession): self.log("All data received successfully!") self.transmit_frame(ack, mode=FREEDV_MODE.signalling) self.set_state(IRS_State.ENDED) + self.event_manager.send_arq_session_finished( + False, self.id, self.dxcall, self.total_length, True) else: self.log("CRC fail at the end of transmission!") self.set_state(IRS_State.FAILED) + self.event_manager.send_arq_session_finished( + False, self.id, self.dxcall, self.total_length, False) + def calibrate_speed_settings(self): self.speed_level = 0 # for now stay at lowest speed level diff --git a/modem/arq_session_iss.py b/modem/arq_session_iss.py index ba4d7cc6..16123803 100644 --- a/modem/arq_session_iss.py +++ b/modem/arq_session_iss.py @@ -66,6 +66,7 @@ class ARQSessionISS(arq_session.ARQSession): retries = retries - 1 self.set_state(ISS_State.FAILED) self.log("Session failed") + self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data), False) def launch_twr(self, frame_or_burst, timeout, retries, mode): twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode]) @@ -96,10 +97,13 @@ class ARQSessionISS(arq_session.ARQSession): if 'offset' in irs_frame: self.confirmed_bytes = irs_frame['offset'] self.log(f"IRS confirmed {self.confirmed_bytes}/{len(self.data)} bytes") + self.event_manager.send_arq_session_progress( + True, self.id, self.dxcall, self.confirmed_bytes, len(self.data)) if self.confirmed_bytes == len(self.data): self.set_state(ISS_State.ENDED) self.log("All data transfered!") + self.event_manager.send_arq_session_finished(True, self.id, self.dxcall, len(self.data), True) return payload_size = self.get_data_payload_size() burst = [] diff --git a/modem/event_manager.py b/modem/event_manager.py index 73748c7f..156c22cf 100644 --- a/modem/event_manager.py +++ b/modem/event_manager.py @@ -5,11 +5,12 @@ class EventManager: def __init__(self, queues): self.queues = queues - self.log = structlog.get_logger('Event Manager') + self.logger = structlog.get_logger('Event Manager') self.lastpttstate = False def broadcast(self, data): for q in self.queues: + self.logger.debug(f"Event: ", ev=data) q.put(data) def send_ptt_change(self, on:bool = False): @@ -26,3 +27,38 @@ class EventManager: def send_custom_event(self, **event_data): self.broadcast(event_data) + + def send_arq_session_new(self, outbound: bool, session_id, dxcall, total_bytes): + direction = 'outbound' if outbound else 'inbound' + event = { + f"arq-transfer-{direction}": { + 'session_id': session_id, + 'dxcall': dxcall, + 'total_bytes': total_bytes, + } + } + self.broadcast(event) + + def send_arq_session_progress(self, outbound: bool, session_id, dxcall, received_bytes, total_bytes): + direction = 'outbound' if outbound else 'inbound' + event = { + f"arq-transfer-{direction}": { + 'session_id': session_id, + 'dxcall': dxcall, + 'received_bytes': received_bytes, + 'total_bytes': total_bytes, + } + } + self.broadcast(event) + + def send_arq_session_finished(self, outbound: bool, session_id, dxcall, total_bytes, success: bool): + direction = 'outbound' if outbound else 'inbound' + event = { + f"arq-transfer-{direction}": { + 'session_id': session_id, + 'dxcall': dxcall, + 'total_bytes': total_bytes, + 'success': success, + } + } + self.broadcast(event)