added data and command queues

This commit is contained in:
DJ2LS 2024-03-19 11:19:13 +01:00
parent adb0c82332
commit 5ed11d771f
4 changed files with 99 additions and 43 deletions

View file

@ -6,6 +6,7 @@ import data_frame_factory
import structlog
import random
from queue import Queue
import time
class States(Enum):
NEW = 0
@ -37,7 +38,7 @@ class P2PConnection:
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
},
States.PAYLOAD_SENT: {
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'process_data_queue',
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
},
States.DISCONNECTING: {
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
@ -63,26 +64,13 @@ class P2PConnection:
self.states = state_manager
self.modem = modem
self.p2p_rx_queue = Queue()
self.p2p_tx_queue = Queue()
self.p2p_data_rx_queue = Queue()
self.p2p_data_tx_queue = Queue()
self.state = States.NEW
self.session_id = self.generate_id()
def generate_random_string(min_length, max_length):
import string
length = random.randint(min_length, max_length)
return ''.join(random.choices(string.ascii_letters, k=length))
# Generate and add 5 random entries to the queue
for _ in range(1):
random_entry = generate_random_string(2, 11)
self.p2p_tx_queue.put(random_entry)
self.event_frame_received = threading.Event()
self.RETRIES_CONNECT = 1
@ -93,6 +81,30 @@ class P2PConnection:
self.is_ISS = False # Indicator, if we are ISS or IRS
self.last_data_timestamp= time.time()
self.start_data_processing_worker()
def start_data_processing_worker(self):
"""Starts a worker thread to monitor the transmit data queue and process data."""
def data_processing_worker():
while True:
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT:
self.disconnect()
return
if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED:
self.process_data_queue()
threading.Event().wait(0.1)
# Create and start the worker thread
worker_thread = threading.Thread(target=data_processing_worker, daemon=True)
worker_thread.start()
def generate_id(self):
while True:
random_int = random.randint(1,255)
@ -102,7 +114,6 @@ class P2PConnection:
if len(self.states.p2p_connection_sessions) >= 255:
return False
def set_details(self, snr, frequency_offset):
self.snr = snr
self.frequency_offset = frequency_offset
@ -120,6 +131,7 @@ class P2PConnection:
self.state = state
def on_frame_received(self, frame):
self.last_data_timestamp = time.time()
self.event_frame_received.set()
self.log(f"Received {frame['frame_type']}")
frame_type = frame['frame_type_int']
@ -153,6 +165,7 @@ class P2PConnection:
self.log("Timeout!")
retries = retries - 1
#self.connected_iss() # override connection state for simulation purposes
self.session_failed()
def launch_twr(self, frame_or_burst, timeout, retries, mode):
@ -179,35 +192,38 @@ class P2PConnection:
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
return
def connected_iss(self, frame):
def connected_iss(self):
self.log("CONNECTED ISS...........................")
self.set_state(States.CONNECTED)
self.is_ISS = True
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
self.process_data_queue()
if self.socket_command_handler:
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
def connected_irs(self, frame):
self.log("CONNECTED IRS...........................")
self.states.register_p2p_connection_session(self)
self.set_state(States.CONNECTED)
self.is_ISS = False
self.orign = frame["origin"]
self.destination = frame["destination"]
self.destination = frame["destination_crc"]
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
if self.socket_command_handler:
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id)
self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
def session_failed(self):
self.set_state(States.FAILED)
self.socket_command_handler.socket_respond_disconnected()
if self.socket_command_handler:
self.socket_command_handler.socket_respond_disconnected()
def process_data_queue(self, frame=None):
if not self.p2p_tx_queue.empty():
if not self.p2p_data_tx_queue.empty():
print("processing data....")
self.set_state(States.PAYLOAD_SENT)
data = self.p2p_tx_queue.get()
data = self.p2p_data_tx_queue.get()
sequence_id = random.randint(0,255)
data = data.encode('utf-8')
@ -218,30 +234,35 @@ class P2PConnection:
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,
mode=mode)
return
print("ALL DATA SENT!!!!!")
self.disconnect()
def prepare_data_chunk(self, data, mode):
return data
def received_data(self, frame):
print(frame)
self.p2p_data_rx_queue.put(frame['data'])
ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0)
self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
def transmit_data_ack(self, frame):
print(frame)
def transmitted_data(self, frame):
print("transmitted data...")
self.set_state(States.CONNECTED)
def disconnect(self):
self.set_state(States.DISCONNECTING)
disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id)
self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
if self.state not in [States.DISCONNECTING, States.DISCONNECTED]:
self.set_state(States.DISCONNECTING)
disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id)
self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
return
def received_disconnect(self, frame):
self.log("DISCONNECTED...............")
self.set_state(States.DISCONNECTED)
self.socket_command_handler.socket_respond_disconnected()
if self.socket_command_handler:
self.socket_command_handler.socket_respond_disconnected()
self.is_ISS = False
disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id)
self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
@ -249,7 +270,8 @@ class P2PConnection:
def received_disconnect_ack(self, frame):
self.log("DISCONNECTED...............")
self.set_state(States.DISCONNECTED)
self.socket_command_handler.socket_respond_disconnected()
if self.socket_command_handler:
self.socket_command_handler.socket_respond_disconnected()
def transmit_arq(self):
@ -259,4 +281,5 @@ class P2PConnection:
#if command.run(app.modem_events, app.service_manager.modem):
def received_arq(self):
pass
pass

View file

@ -15,7 +15,6 @@ class CommandSocket(socketserver.BaseRequestHandler):
self.event_manager = event_manager
self.config_manager = config_manager
self.modem = modem
print(self.config_manager)
self.logger = structlog.get_logger(type(self).__name__)
self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager)
@ -86,11 +85,13 @@ class DataSocket(socketserver.BaseRequestHandler):
msg = f"[{type(self).__name__}]: {message}"
logger = self.logger.warn if isWarning else self.logger.info
logger(msg)
def handle(self):
self.log(f"Data connection established with {self.client_address}")
try:
while True:
ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout
if ready_to_read:
self.data = self.request.recv(1024).strip()
@ -98,14 +99,21 @@ class DataSocket(socketserver.BaseRequestHandler):
break
try:
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}")
except:
except Exception:
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}")
for session in self.state_manager.p2p_connection_sessions:
print(f"sessions: {session}")
session.p2p_data_tx_queue.put(self.data)
# Check if there's something to send from the queue, without blocking
if not data_queue.empty():
data_to_send = data_queue.get_nowait() # Use get_nowait to avoid blocking
self.request.sendall(data_to_send)
self.log(f"Sent data to {self.client_address}")
for session_id in self.state_manager.p2p_connection_sessions:
session = self.state_manager.get_p2p_connection_session(session_id)
if not session.p2p_data_tx_queue.empty():
data_to_send = session.p2p_data_tx_queue.get_nowait() # Use get_nowait to avoid blocking
self.request.sendall(data_to_send)
self.log(f"Sent data to {self.client_address}")
finally:
self.log(f"Data connection closed with {self.client_address}")

View file

@ -9,6 +9,8 @@ class SocketCommandHandler:
self.state_manager = state_manager
self.event_manager = event_manager
self.session = None
def send_response(self, message):
full_message = f"{message}\r\n"
self.cmd_request.sendall(full_message.encode())
@ -22,10 +24,11 @@ class SocketCommandHandler:
'destination': data[1],
}
cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self)
session = cmd.run(self.event_manager.queues, self.modem)
if session.session_id:
self.state_manager.register_p2p_connection_session(session)
session.connect()
self.session = cmd.run(self.event_manager.queues, self.modem)
if self.session.session_id:
self.state_manager.register_p2p_connection_session(self.session)
self.session.connect()
def handle_disconnect(self, data):
# Your existing connect logic

View file

@ -136,6 +136,11 @@ class TestP2PConnectionSession(unittest.TestCase):
self.waitForSession(self.irs_event_queue, False)
self.channels_running = False
def generate_random_string(self, min_length, max_length):
import string
length = random.randint(min_length, max_length)
return ''.join(random.choices(string.ascii_letters, k=length))#
def testARQSessionSmallPayload(self):
# set Packet Error Rate (PER) / frame loss probability
self.loss_probability = 0
@ -151,8 +156,25 @@ class TestP2PConnectionSession(unittest.TestCase):
self.iss_state_manager.register_p2p_connection_session(session)
session.connect()
# Generate and add 5 random entries to the queue
for _ in range(5):
random_entry = self.generate_random_string(2, 11)
session.p2p_data_tx_queue.put(random_entry)
self.waitAndCloseChannels()
del cmd
class TestSocket:
def __init__(self, isCmd=True):
self.isCmd = isCmd
self.sent_data = [] # To capture data sent through this socket
self.received_data = b"" # To simulate data received by this socket
def sendall(self, data):
print(f"Mock sendall called with data: {data}")
self.sent_data.append(data)
if __name__ == '__main__':
unittest.main()