mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
315 lines
12 KiB
Python
315 lines
12 KiB
Python
import threading
|
|
from enum import Enum
|
|
from modem_frametypes import FRAME_TYPE
|
|
from codec2 import FREEDV_MODE
|
|
import data_frame_factory
|
|
import structlog
|
|
import random
|
|
from queue import Queue
|
|
import time
|
|
from command_arq_raw import ARQRawCommand
|
|
import numpy as np
|
|
import base64
|
|
from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES
|
|
from arq_session_iss import ARQSessionISS
|
|
|
|
class States(Enum):
|
|
NEW = 0
|
|
CONNECTING = 1
|
|
CONNECT_SENT = 2
|
|
CONNECT_ACK_SENT = 3
|
|
CONNECTED = 4
|
|
#HEARTBEAT_SENT = 5
|
|
#HEARTBEAT_ACK_SENT = 6
|
|
PAYLOAD_SENT = 7
|
|
ARQ_SESSION = 8
|
|
DISCONNECTING = 9
|
|
DISCONNECTED = 10
|
|
FAILED = 11
|
|
|
|
|
|
|
|
class P2PConnection:
|
|
STATE_TRANSITION = {
|
|
States.NEW: {
|
|
FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs',
|
|
},
|
|
States.CONNECTING: {
|
|
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
|
|
},
|
|
States.CONNECTED: {
|
|
FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs',
|
|
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
|
|
FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data',
|
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
|
|
},
|
|
States.PAYLOAD_SENT: {
|
|
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
|
|
},
|
|
States.DISCONNECTING: {
|
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
|
|
},
|
|
States.DISCONNECTED: {
|
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
|
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
|
|
|
|
},
|
|
}
|
|
|
|
def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None):
|
|
self.logger = structlog.get_logger(type(self).__name__)
|
|
self.config = config
|
|
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
|
|
|
self.socket_command_handler = socket_command_handler
|
|
|
|
self.destination = destination
|
|
self.origin = origin
|
|
self.bandwidth = 0
|
|
|
|
self.state_manager = state_manager
|
|
self.event_manager = event_manager
|
|
self.modem = modem
|
|
self.modem.demodulator.set_decode_mode([])
|
|
|
|
self.p2p_data_rx_queue = Queue()
|
|
self.p2p_data_tx_queue = Queue()
|
|
|
|
self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager)
|
|
|
|
|
|
self.state = States.NEW
|
|
self.session_id = self.generate_id()
|
|
|
|
self.event_frame_received = threading.Event()
|
|
|
|
self.RETRIES_CONNECT = 1
|
|
self.TIMEOUT_CONNECT = 10
|
|
self.TIMEOUT_DATA = 5
|
|
self.RETRIES_DATA = 5
|
|
self.ENTIRE_CONNECTION_TIMEOUT = 100
|
|
|
|
self.is_ISS = False # Indicator, if we are ISS or IRS
|
|
|
|
self.last_data_timestamp= time.time()
|
|
|
|
self.start_data_processing_worker()
|
|
|
|
|
|
def start_data_processing_worker(self):
|
|
"""Starts a worker thread to monitor the transmit data queue and process data."""
|
|
|
|
def data_processing_worker():
|
|
while True:
|
|
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION:
|
|
self.disconnect()
|
|
return
|
|
|
|
if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED:
|
|
self.process_data_queue()
|
|
threading.Event().wait(0.1)
|
|
|
|
|
|
|
|
|
|
# Create and start the worker thread
|
|
worker_thread = threading.Thread(target=data_processing_worker, daemon=True)
|
|
worker_thread.start()
|
|
|
|
def generate_id(self):
|
|
while True:
|
|
random_int = random.randint(1,255)
|
|
if random_int not in self.state_manager.p2p_connection_sessions:
|
|
return random_int
|
|
|
|
if len(self.state_manager.p2p_connection_sessions) >= 255:
|
|
return False
|
|
|
|
def set_details(self, snr, frequency_offset):
|
|
self.snr = snr
|
|
self.frequency_offset = frequency_offset
|
|
|
|
def log(self, message, isWarning = False):
|
|
msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}"
|
|
logger = self.logger.warn if isWarning else self.logger.info
|
|
logger(msg)
|
|
|
|
def set_state(self, state):
|
|
if self.state == state:
|
|
self.log(f"{type(self).__name__} state {self.state.name} unchanged.")
|
|
else:
|
|
self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}")
|
|
self.state = state
|
|
|
|
def on_frame_received(self, frame):
|
|
self.last_data_timestamp = time.time()
|
|
self.event_frame_received.set()
|
|
self.log(f"Received {frame['frame_type']}")
|
|
frame_type = frame['frame_type_int']
|
|
if self.state in self.STATE_TRANSITION:
|
|
if frame_type in self.STATE_TRANSITION[self.state]:
|
|
action_name = self.STATE_TRANSITION[self.state][frame_type]
|
|
response = getattr(self, action_name)(frame)
|
|
|
|
return
|
|
|
|
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")
|
|
|
|
def transmit_frame(self, frame: bytearray, mode='auto'):
|
|
self.log("Transmitting frame")
|
|
if mode in ['auto']:
|
|
mode = self.get_mode_by_speed_level(self.speed_level)
|
|
|
|
self.modem.transmit(mode, 1, 1, frame)
|
|
|
|
def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode):
|
|
while retries > 0:
|
|
self.event_frame_received = threading.Event()
|
|
if isinstance(frame_or_burst, list): burst = frame_or_burst
|
|
else: burst = [frame_or_burst]
|
|
for f in burst:
|
|
self.transmit_frame(f, mode)
|
|
self.event_frame_received.clear()
|
|
self.log(f"Waiting {timeout} seconds...")
|
|
if self.event_frame_received.wait(timeout):
|
|
return
|
|
self.log("Timeout!")
|
|
retries = retries - 1
|
|
|
|
#self.connected_iss() # override connection state for simulation purposes
|
|
self.session_failed()
|
|
|
|
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
|
twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True)
|
|
twr.start()
|
|
|
|
def transmit_and_wait_irs(self, frame, timeout, mode):
|
|
self.event_frame_received.clear()
|
|
self.transmit_frame(frame, mode)
|
|
self.log(f"Waiting {timeout} seconds...")
|
|
#if not self.event_frame_received.wait(timeout):
|
|
# self.log("Timeout waiting for ISS. Session failed.")
|
|
# self.transmission_failed()
|
|
|
|
def launch_twr_irs(self, frame, timeout, mode):
|
|
thread_wait = threading.Thread(target = self.transmit_and_wait_irs,
|
|
args = [frame, timeout, mode], daemon=True)
|
|
thread_wait.start()
|
|
|
|
def connect(self):
|
|
self.set_state(States.CONNECTING)
|
|
self.is_ISS = True
|
|
session_open_frame = self.frame_factory.build_p2p_connection_connect(self.origin, self.destination, self.session_id)
|
|
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
|
return
|
|
|
|
def connected_iss(self, frame=None):
|
|
self.log("CONNECTED ISS...........................")
|
|
self.set_state(States.CONNECTED)
|
|
self.is_ISS = True
|
|
if self.socket_command_handler:
|
|
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
|
|
|
|
def connected_irs(self, frame):
|
|
self.log("CONNECTED IRS...........................")
|
|
self.state_manager.register_p2p_connection_session(self)
|
|
self.set_state(States.CONNECTED)
|
|
self.is_ISS = False
|
|
self.orign = frame["origin"]
|
|
self.destination = frame["destination_crc"]
|
|
|
|
if self.socket_command_handler:
|
|
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
|
|
|
|
session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id)
|
|
self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
|
|
|
def session_failed(self):
|
|
self.set_state(States.FAILED)
|
|
if self.socket_command_handler:
|
|
self.socket_command_handler.socket_respond_disconnected()
|
|
|
|
def process_data_queue(self, frame=None):
|
|
if not self.p2p_data_tx_queue.empty():
|
|
print("processing data....")
|
|
|
|
self.set_state(States.PAYLOAD_SENT)
|
|
data = self.p2p_data_tx_queue.get()
|
|
sequence_id = random.randint(0,255)
|
|
data = data.encode('utf-8')
|
|
|
|
if len(data) <= 11:
|
|
mode = FREEDV_MODE.signalling
|
|
elif 11 < len(data) < 32:
|
|
mode = FREEDV_MODE.datac4
|
|
else:
|
|
self.transmit_arq(data)
|
|
return
|
|
|
|
payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data)
|
|
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode)
|
|
return
|
|
|
|
def prepare_data_chunk(self, data, mode):
|
|
return data
|
|
|
|
def received_data(self, frame):
|
|
print(frame)
|
|
self.p2p_data_rx_queue.put(frame['data'])
|
|
|
|
ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0)
|
|
self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
|
|
|
def transmit_data_ack(self, frame):
|
|
print(frame)
|
|
|
|
def transmitted_data(self, frame):
|
|
print("transmitted data...")
|
|
self.set_state(States.CONNECTED)
|
|
|
|
def disconnect(self):
|
|
if self.state not in [States.DISCONNECTING, States.DISCONNECTED]:
|
|
self.set_state(States.DISCONNECTING)
|
|
disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id)
|
|
self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
|
return
|
|
|
|
def received_disconnect(self, frame):
|
|
self.log("DISCONNECTED...............")
|
|
self.set_state(States.DISCONNECTED)
|
|
if self.socket_command_handler:
|
|
self.socket_command_handler.socket_respond_disconnected()
|
|
self.is_ISS = False
|
|
disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id)
|
|
self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
|
|
|
def received_disconnect_ack(self, frame):
|
|
self.log("DISCONNECTED...............")
|
|
self.set_state(States.DISCONNECTED)
|
|
if self.socket_command_handler:
|
|
self.socket_command_handler.socket_respond_disconnected()
|
|
|
|
def transmit_arq(self, data):
|
|
self.set_state(States.ARQ_SESSION)
|
|
|
|
print("----------------------------------------------------------------")
|
|
print(self.destination)
|
|
print(self.state_manager.p2p_connection_sessions)
|
|
|
|
prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection)
|
|
iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte)
|
|
iss.id = self.session_id
|
|
if iss.id:
|
|
self.state_manager.register_arq_iss_session(iss)
|
|
iss.start()
|
|
return iss
|
|
|
|
def transmitted_arq(self):
|
|
self.last_data_timestamp = time.time()
|
|
self.set_state(States.CONNECTED)
|
|
|
|
def received_arq(self, data):
|
|
self.last_data_timestamp = time.time()
|
|
self.set_state(States.CONNECTED)
|
|
self.p2p_data_rx_queue.put(data)
|
|
|