fixed message transmission status

This commit is contained in:
DJ2LS 2024-02-07 19:43:10 +01:00
parent 127f4db1a1
commit 0a9ce0d202
5 changed files with 197 additions and 5 deletions

View file

@@ -3,7 +3,7 @@
import structlog
import lzma
import gzip
from message_p2p import message_received, message_failed
from message_p2p import message_received, message_failed, message_transmitted
from enum import Enum
class ARQ_SESSION_TYPES(Enum):
@@ -23,21 +23,25 @@ class ARQDataTypeHandler:
'prepare': self.prepare_raw,
'handle': self.handle_raw,
'failed': self.failed_raw,
'transmitted': self.transmitted_raw,
},
ARQ_SESSION_TYPES.raw_lzma: {
'prepare': self.prepare_raw_lzma,
'handle': self.handle_raw_lzma,
'failed': self.failed_raw_lzma,
'transmitted': self.transmitted_raw_lzma,
},
ARQ_SESSION_TYPES.raw_gzip: {
'prepare': self.prepare_raw_gzip,
'handle': self.handle_raw_gzip,
'failed': self.failed_raw_gzip,
'transmitted': self.transmitted_raw_gzip,
},
ARQ_SESSION_TYPES.p2pmsg_lzma: {
'prepare': self.prepare_p2pmsg_lzma,
'handle': self.handle_p2pmsg_lzma,
'failed' : self.failed_p2pmsg_lzma,
'transmitted': self.transmitted_p2pmsg_lzma,
},
}
@@ -68,6 +72,13 @@ class ARQDataTypeHandler:
else:
self.log(f"Unknown preparation endpoint: {session_type}", isWarning=True)
def transmitted(self, type_byte: int, data: bytearray):
session_type = self.get_session_type_from_value(type_byte)
if session_type in self.handlers and 'transmitted' in self.handlers[session_type]:
return self.handlers[session_type]['transmitted'](data)
else:
self.log(f"Unknown handling endpoint: {session_type}", isWarning=True)
def log(self, message, isWarning=False):
msg = f"[{type(self).__name__}]: {message}"
logger = self.logger.warn if isWarning else self.logger.info
@@ -84,6 +95,10 @@ class ARQDataTypeHandler:
def failed_raw(self, data):
return
def transmitted_raw(self, data):
return data
def prepare_raw_lzma(self, data):
compressed_data = lzma.compress(data)
self.log(f"Preparing LZMA compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
@@ -97,6 +112,10 @@ class ARQDataTypeHandler:
def failed_raw_lzma(self, data):
return
def transmitted_raw_lzma(self, data):
decompressed_data = lzma.decompress(data)
return decompressed_data
def prepare_raw_gzip(self, data):
compressed_data = gzip.compress(data)
self.log(f"Preparing GZIP compressed data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
@@ -110,6 +129,10 @@ class ARQDataTypeHandler:
def failed_raw_gzip(self, data):
return
def transmitted_raw_gzip(self, data):
decompressed_data = gzip.decompress(data)
return decompressed_data
def prepare_p2pmsg_lzma(self, data):
compressed_data = lzma.compress(data)
self.log(f"Preparing LZMA compressed P2PMSG data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
@@ -125,4 +148,9 @@ class ARQDataTypeHandler:
decompressed_data = lzma.decompress(data)
self.log(f"Handling failed LZMA compressed P2PMSG data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
message_failed(self.event_manager, self.state_manager, decompressed_data)
return decompressed_data
def transmitted_p2pmsg_lzma(self, data):
decompressed_data = lzma.decompress(data)
message_transmitted(self.event_manager, self.state_manager, decompressed_data)
return decompressed_data
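A minimal sketch of the new call path, for illustration only (the handler instance and the compressed payload below are assumed, they are not part of this diff):

# hypothetical usage of the new 'transmitted' dispatcher
type_byte = ARQ_SESSION_TYPES.p2pmsg_lzma.value
payload = lzma.compress(message_json_bytes)  # what prepare_p2pmsg_lzma produces
handler.transmitted(type_byte, payload)  # decompresses the payload and calls message_transmitted(),
# which sets the message status to 'transmitted' in the database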

View file

@@ -170,6 +170,7 @@ class ARQSessionISS(arq_session.ARQSession):
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics())
self.state_manager.remove_arq_iss_session(self.id)
self.states.setARQ(False)
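# Notify the data type handler that the payload went out, e.g. so a P2P message can be marked as 'transmitted' in the database.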
self.arq_data_type_handler.transmitted(self.type_byte, self.data)
return None, None
def transmission_failed(self, irs_frame=None):

View file

@@ -13,13 +13,16 @@ def message_received(event_manager, state_manager, data):
received_message_dict = MessageP2P.to_dict(received_message_obj)
DatabaseManagerMessages(event_manager).add_message(received_message_dict, direction='receive', status='received', is_read=False)
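# Update the status of an outgoing message once the ARQ layer reports a successful transmission.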
def message_transmitted(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
payload_message = MessageP2P.to_dict(payload_message_obj)
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'transmitted'})
def message_failed(event_manager, state_manager, data):
decompressed_json_string = data.decode('utf-8')
payload_message_obj = MessageP2P.from_payload(decompressed_json_string)
payload_message = MessageP2P.to_dict(payload_message_obj)
DatabaseManagerMessages(event_manager).update_message(payload_message["id"], update_data={'status': 'failed'})
class MessageP2P:

View file

@@ -1,6 +1,7 @@
from message_system_db_manager import DatabaseManager
from message_system_db_attachments import DatabaseManagerAttachments
from message_system_db_model import Status, P2PMessage
from sqlalchemy.exc import IntegrityError
from datetime import datetime
import json
@@ -47,6 +48,12 @@ class DatabaseManagerMessages(DatabaseManager):
self.log(f"Added data to database: {new_message.id}")
self.event_manager.freedata_message_db_change()
return new_message.id
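# Inserting a message whose ID already exists violates the uniqueness constraint and raises IntegrityError; treat it as a duplicate rather than a failure.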
except IntegrityError as e:
session.rollback() # Roll back the session to a clean state
self.log(f"Message with ID {message_data['id']} already exists in the database.", isWarning=True)
return None  # alternatively, return the existing message's ID
except Exception as e:
session.rollback()
self.log(f"error adding new message to database with error: {e}", isWarning=True)

View file

@@ -0,0 +1,153 @@
import sys
import time
sys.path.append('modem')
import unittest
import unittest.mock
from config import CONFIG
import helpers
import queue
import threading
import base64
from command_arq_raw import ARQRawCommand
from state_manager import StateManager
from frame_dispatcher import DISPATCHER
import random
import structlog
import numpy as np
from event_manager import EventManager
from data_frame_factory import DataFrameFactory
import codec2
import arq_session_irs
from server import enqueue_tx_command
import command_message_send
class TestModem:
def __init__(self, event_q, state_q):
self.data_queue_received = queue.Queue()
self.demodulator = unittest.mock.Mock()
self.event_manager = EventManager([event_q])
self.logger = structlog.get_logger('Modem')
self.states = StateManager(state_q)
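# Estimate the on-air duration of one frame: preamble + payload + postamble samples at the 8 kHz modem sample rate.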
def getFrameTransmissionTime(self, mode):
samples = 0
c2instance = codec2.open_instance(mode.value)
samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance)
samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance)
samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance)
tx_time = samples / 8000
return tx_time
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
# Simulate transmission time
tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT
self.logger.info(f"TX {tx_time} seconds...")
threading.Event().wait(tx_time)
transmission = {
'mode': mode,
'bytes': frames,
}
self.data_queue_received.put(transmission)
class TestMessageProtocol(unittest.TestCase):
@classmethod
def setUpClass(cls):
config_manager = CONFIG('modem/config.ini.example')
cls.config = config_manager.read()
cls.logger = structlog.get_logger("TESTS")
cls.frame_factory = DataFrameFactory(cls.config)
# ISS
cls.iss_state_manager = StateManager(queue.Queue())
cls.iss_event_manager = EventManager([queue.Queue()])
cls.iss_event_queue = queue.Queue()
cls.iss_state_queue = queue.Queue()
cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue)
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
cls.iss_event_manager,
cls.iss_state_manager,
cls.iss_modem)
# IRS
cls.irs_state_manager = StateManager(queue.Queue())
cls.irs_event_manager = EventManager([queue.Queue()])
cls.irs_event_queue = queue.Queue()
cls.irs_state_queue = queue.Queue()
cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue)
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
cls.irs_event_manager,
cls.irs_state_manager,
cls.irs_modem)
# Frame loss probability in %
cls.loss_probability = 30
cls.channels_running = True
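# Simulated radio channel: forwards frames from one modem's TX queue to the other party's frame dispatcher, randomly dropping frames according to loss_probability.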
def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
while self.channels_running:
# Transfer data between both parties
try:
transmission = modem_transmit_queue.get(timeout=1)
if random.randint(0, 100) < self.loss_probability:
self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
continue
frame_bytes = transmission['bytes']
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
except queue.Empty:
continue
self.logger.info(f"[{threading.current_thread().name}] Channel closed.")
def waitForSession(self, q, outbound=False):
key = 'arq-transfer-outbound' if outbound else 'arq-transfer-inbound'
while True:
ev = q.get()
if key in ev and ('success' in ev[key] or 'ABORTED' in ev[key]):
self.logger.info(f"[{threading.current_thread().name}] {key} session ended.")
break
def establishChannels(self):
self.channels_running = True
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
args=[self.iss_modem.data_queue_received,
self.irs_frame_dispatcher],
name="ISS to IRS channel")
self.iss_to_irs_channel.start()
self.irs_to_iss_channel = threading.Thread(target=self.channelWorker,
args=[self.irs_modem.data_queue_received,
self.iss_frame_dispatcher],
name="IRS to ISS channel")
self.irs_to_iss_channel.start()
def waitAndCloseChannels(self):
self.waitForSession(self.iss_event_queue, True)
self.waitForSession(self.irs_event_queue, False)
self.channels_running = False
def testMessageViaSession(self):
# set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 0
self.establishChannels()
params = {
'destination': "XX1XXX-1",
'body': 'Hello World',
}
cmd_class = command_message_send.SendMessageCommand
command = cmd_class(self.config, self.iss_state_manager, self.iss_event_manager, params)
command.run(self.iss_event_manager, self.iss_modem)
self.waitAndCloseChannels()