mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 08:04:33 +00:00
Merge pull request #691 from DJ2LS/dev-interface-tests
Socket interface prototype
This commit is contained in:
commit
33f0d2a160
22 changed files with 1127 additions and 24 deletions
|
@ -11,6 +11,7 @@ class ARQ_SESSION_TYPES(Enum):
|
||||||
raw_lzma = 10
|
raw_lzma = 10
|
||||||
raw_gzip = 11
|
raw_gzip = 11
|
||||||
p2pmsg_lzma = 20
|
p2pmsg_lzma = 20
|
||||||
|
p2p_connection = 30
|
||||||
|
|
||||||
class ARQDataTypeHandler:
|
class ARQDataTypeHandler:
|
||||||
def __init__(self, event_manager, state_manager):
|
def __init__(self, event_manager, state_manager):
|
||||||
|
@ -43,6 +44,12 @@ class ARQDataTypeHandler:
|
||||||
'failed' : self.failed_p2pmsg_lzma,
|
'failed' : self.failed_p2pmsg_lzma,
|
||||||
'transmitted': self.transmitted_p2pmsg_lzma,
|
'transmitted': self.transmitted_p2pmsg_lzma,
|
||||||
},
|
},
|
||||||
|
ARQ_SESSION_TYPES.p2p_connection: {
|
||||||
|
'prepare': self.prepare_p2p_connection,
|
||||||
|
'handle': self.handle_p2p_connection,
|
||||||
|
'failed': self.failed_p2p_connection,
|
||||||
|
'transmitted': self.transmitted_p2p_connection,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
@ -162,3 +169,35 @@ class ARQDataTypeHandler:
|
||||||
decompressed_data = lzma.decompress(data)
|
decompressed_data = lzma.decompress(data)
|
||||||
message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics)
|
message_transmitted(self.event_manager, self.state_manager, decompressed_data, statistics)
|
||||||
return decompressed_data
|
return decompressed_data
|
||||||
|
|
||||||
|
|
||||||
|
def prepare_p2p_connection(self, data):
|
||||||
|
compressed_data = gzip.compress(data)
|
||||||
|
self.log(f"Preparing gzip compressed P2P_CONNECTION data: {len(data)} Bytes >>> {len(compressed_data)} Bytes")
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
return compressed_data
|
||||||
|
|
||||||
|
def handle_p2p_connection(self, data, statistics):
|
||||||
|
decompressed_data = gzip.decompress(data)
|
||||||
|
self.log(f"Handling gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes")
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
print(decompressed_data)
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
for session_id in self.state_manager.p2p_connection_sessions:
|
||||||
|
print(session_id)
|
||||||
|
self.state_manager.p2p_connection_sessions[session_id].received_arq(decompressed_data)
|
||||||
|
|
||||||
|
def failed_p2p_connection(self, data, statistics):
|
||||||
|
decompressed_data = gzip.decompress(data)
|
||||||
|
self.log(f"Handling failed gzip compressed P2P_CONNECTION data: {len(decompressed_data)} Bytes from {len(data)} Bytes", isWarning=True)
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
return decompressed_data
|
||||||
|
|
||||||
|
def transmitted_p2p_connection(self, data, statistics):
|
||||||
|
|
||||||
|
decompressed_data = gzip.decompress(data)
|
||||||
|
print(decompressed_data)
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
for session_id in self.state_manager.p2p_connection_sessions:
|
||||||
|
print(session_id)
|
||||||
|
self.state_manager.p2p_connection_sessions[session_id].transmitted_arq()
|
|
@ -32,13 +32,13 @@ class ARQSession:
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, config: dict, modem, dxcall: str):
|
def __init__(self, config: dict, modem, dxcall: str, state_manager):
|
||||||
self.logger = structlog.get_logger(type(self).__name__)
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
self.config = config
|
self.config = config
|
||||||
|
|
||||||
self.event_manager: EventManager = modem.event_manager
|
self.event_manager: EventManager = modem.event_manager
|
||||||
self.states = modem.states
|
#self.states = modem.states
|
||||||
|
self.states = state_manager
|
||||||
self.states.setARQ(True)
|
self.states.setARQ(True)
|
||||||
|
|
||||||
self.snr = []
|
self.snr = []
|
||||||
|
|
|
@ -59,8 +59,8 @@ class ARQSessionIRS(arq_session.ARQSession):
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, config: dict, modem, dxcall: str, session_id: int):
|
def __init__(self, config: dict, modem, dxcall: str, session_id: int, state_manager):
|
||||||
super().__init__(config, modem, dxcall)
|
super().__init__(config, modem, dxcall, state_manager)
|
||||||
|
|
||||||
self.id = session_id
|
self.id = session_id
|
||||||
self.dxcall = dxcall
|
self.dxcall = dxcall
|
||||||
|
|
|
@ -53,7 +53,7 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes):
|
def __init__(self, config: dict, modem, dxcall: str, state_manager, data: bytearray, type_byte: bytes):
|
||||||
super().__init__(config, modem, dxcall)
|
super().__init__(config, modem, dxcall, state_manager)
|
||||||
self.state_manager = state_manager
|
self.state_manager = state_manager
|
||||||
self.data = data
|
self.data = data
|
||||||
self.total_length = len(data)
|
self.total_length = len(data)
|
||||||
|
@ -191,6 +191,10 @@ class ARQSessionISS(arq_session.ARQSession):
|
||||||
self.set_state(ISS_State.ENDED)
|
self.set_state(ISS_State.ENDED)
|
||||||
self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
|
self.log(f"All data transfered! flag_final={irs_frame['flag']['FINAL']}, flag_checksum={irs_frame['flag']['CHECKSUM']}")
|
||||||
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
|
self.event_manager.send_arq_session_finished(True, self.id, self.dxcall,True, self.state.name, statistics=self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
|
||||||
|
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
print(self.arq_data_type_handler.state_manager.p2p_connection_sessions)
|
||||||
|
|
||||||
self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
|
self.arq_data_type_handler.transmitted(self.type_byte, self.data, self.calculate_session_statistics(self.confirmed_bytes, self.total_length))
|
||||||
self.state_manager.remove_arq_iss_session(self.id)
|
self.state_manager.remove_arq_iss_session(self.id)
|
||||||
self.states.setARQ(False)
|
self.states.setARQ(False)
|
||||||
|
|
|
@ -8,7 +8,7 @@ from arq_data_type_handler import ARQDataTypeHandler
|
||||||
|
|
||||||
class TxCommand():
|
class TxCommand():
|
||||||
|
|
||||||
def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}):
|
def __init__(self, config: dict, state_manager: StateManager, event_manager, apiParams:dict = {}, socket_command_handler=None):
|
||||||
self.config = config
|
self.config = config
|
||||||
self.logger = structlog.get_logger(type(self).__name__)
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
self.state_manager = state_manager
|
self.state_manager = state_manager
|
||||||
|
@ -16,6 +16,7 @@ class TxCommand():
|
||||||
self.set_params_from_api(apiParams)
|
self.set_params_from_api(apiParams)
|
||||||
self.frame_factory = DataFrameFactory(config)
|
self.frame_factory = DataFrameFactory(config)
|
||||||
self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager)
|
self.arq_data_type_handler = ARQDataTypeHandler(event_manager, state_manager)
|
||||||
|
self.socket_command_handler = socket_command_handler
|
||||||
|
|
||||||
def log(self, message, isWarning = False):
|
def log(self, message, isWarning = False):
|
||||||
msg = f"[{type(self).__name__}]: {message}"
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
|
37
modem/command_p2p_connection.py
Normal file
37
modem/command_p2p_connection.py
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
import queue
|
||||||
|
from command import TxCommand
|
||||||
|
import api_validations
|
||||||
|
import base64
|
||||||
|
from queue import Queue
|
||||||
|
from p2p_connection import P2PConnection
|
||||||
|
|
||||||
|
class P2PConnectionCommand(TxCommand):
|
||||||
|
|
||||||
|
def set_params_from_api(self, apiParams):
|
||||||
|
self.origin = apiParams['origin']
|
||||||
|
if not api_validations.validate_freedata_callsign(self.origin):
|
||||||
|
self.origin = f"{self.origin}-0"
|
||||||
|
|
||||||
|
self.destination = apiParams['destination']
|
||||||
|
if not api_validations.validate_freedata_callsign(self.destination):
|
||||||
|
self.destination = f"{self.destination}-0"
|
||||||
|
|
||||||
|
|
||||||
|
def connect(self, event_queue: Queue, modem):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def run(self, event_queue: Queue, modem):
|
||||||
|
try:
|
||||||
|
self.emit_event(event_queue)
|
||||||
|
self.logger.info(self.log_message())
|
||||||
|
session = P2PConnection(self.config, modem, self.origin, self.destination, self.state_manager, self.event_manager, self.socket_command_handler)
|
||||||
|
if session.session_id:
|
||||||
|
self.state_manager.register_p2p_connection_session(session)
|
||||||
|
session.connect()
|
||||||
|
return session
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log(f"Error starting P2P Connection session: {e}", isWarning=True)
|
||||||
|
|
||||||
|
return False
|
|
@ -49,6 +49,13 @@ enable_morse_identifier = False
|
||||||
respond_to_cq = True
|
respond_to_cq = True
|
||||||
tx_delay = 50
|
tx_delay = 50
|
||||||
maximum_bandwidth = 1700
|
maximum_bandwidth = 1700
|
||||||
|
enable_socket_interface = False
|
||||||
|
|
||||||
|
[SOCKET_INTERFACE]
|
||||||
|
enable = False
|
||||||
|
host = 127.0.0.1
|
||||||
|
cmd_port = 8000
|
||||||
|
data_port = 8001
|
||||||
|
|
||||||
[MESSAGES]
|
[MESSAGES]
|
||||||
enable_auto_repeat = False
|
enable_auto_repeat = False
|
||||||
|
|
|
@ -59,7 +59,15 @@ class CONFIG:
|
||||||
'enable_morse_identifier': bool,
|
'enable_morse_identifier': bool,
|
||||||
'maximum_bandwidth': int,
|
'maximum_bandwidth': int,
|
||||||
'respond_to_cq': bool,
|
'respond_to_cq': bool,
|
||||||
'tx_delay': int
|
'tx_delay': int,
|
||||||
|
'enable_socket_interface': bool,
|
||||||
|
},
|
||||||
|
'SOCKET_INTERFACE': {
|
||||||
|
'enable' : bool,
|
||||||
|
'host' : str,
|
||||||
|
'cmd_port' : int,
|
||||||
|
'data_port' : int,
|
||||||
|
|
||||||
},
|
},
|
||||||
'MESSAGES': {
|
'MESSAGES': {
|
||||||
'enable_auto_repeat': bool,
|
'enable_auto_repeat': bool,
|
||||||
|
|
|
@ -18,6 +18,7 @@ class DataFrameFactory:
|
||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, config):
|
def __init__(self, config):
|
||||||
|
|
||||||
self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}"
|
self.myfullcall = f"{config['STATION']['mycall']}-{config['STATION']['myssid']}"
|
||||||
self.mygrid = config['STATION']['mygrid']
|
self.mygrid = config['STATION']['mygrid']
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@ class DataFrameFactory:
|
||||||
self._load_ping_templates()
|
self._load_ping_templates()
|
||||||
self._load_fec_templates()
|
self._load_fec_templates()
|
||||||
self._load_arq_templates()
|
self._load_arq_templates()
|
||||||
|
self._load_p2p_connection_templates()
|
||||||
|
|
||||||
def _load_broadcast_templates(self):
|
def _load_broadcast_templates(self):
|
||||||
# cq frame
|
# cq frame
|
||||||
|
@ -161,6 +163,63 @@ class DataFrameFactory:
|
||||||
"flag": 1,
|
"flag": 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def _load_p2p_connection_templates(self):
|
||||||
|
# p2p connect request
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"destination_crc": 3,
|
||||||
|
"origin": 6,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# connect ACK
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"destination_crc": 3,
|
||||||
|
"origin": 6,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# heartbeat for "is alive"
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ack heartbeat
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# p2p payload frames
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD.value] = {
|
||||||
|
"frame_length": None,
|
||||||
|
"session_id": 1,
|
||||||
|
"sequence_id": 1,
|
||||||
|
"data": "dynamic",
|
||||||
|
}
|
||||||
|
|
||||||
|
# p2p payload frame ack
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"session_id": 1,
|
||||||
|
"sequence_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# heartbeat for "is alive"
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ack heartbeat
|
||||||
|
self.template_list[FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value] = {
|
||||||
|
"frame_length": self.LENGTH_SIG1_FRAME,
|
||||||
|
"session_id": 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME):
|
def construct(self, frametype, content, frame_length = LENGTH_SIG1_FRAME):
|
||||||
frame_template = self.template_list[frametype.value]
|
frame_template = self.template_list[frametype.value]
|
||||||
|
@ -404,8 +463,9 @@ class DataFrameFactory:
|
||||||
"offset": offset.to_bytes(4, 'big'),
|
"offset": offset.to_bytes(4, 'big'),
|
||||||
"data": data,
|
"data": data,
|
||||||
}
|
}
|
||||||
frame = self.construct(FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode))
|
return self.construct(
|
||||||
return frame
|
FR_TYPE.ARQ_BURST_FRAME, payload, self.get_bytes_per_frame(freedv_mode)
|
||||||
|
)
|
||||||
|
|
||||||
def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int,
|
def build_arq_burst_ack(self, session_id: bytes, offset, speed_level: int,
|
||||||
frames_per_burst: int, snr: int, flag_final=False, flag_checksum=False, flag_abort=False):
|
frames_per_burst: int, snr: int, flag_final=False, flag_checksum=False, flag_abort=False):
|
||||||
|
@ -428,3 +488,62 @@ class DataFrameFactory:
|
||||||
"flag": flag.to_bytes(1, 'big'),
|
"flag": flag.to_bytes(1, 'big'),
|
||||||
}
|
}
|
||||||
return self.construct(FR_TYPE.ARQ_BURST_ACK, payload)
|
return self.construct(FR_TYPE.ARQ_BURST_ACK, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_connect(self, destination, origin, session_id):
|
||||||
|
payload = {
|
||||||
|
"destination_crc": helpers.get_crc_24(destination),
|
||||||
|
"origin": helpers.callsign_to_bytes(origin),
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_connect_ack(self, destination, origin, session_id):
|
||||||
|
payload = {
|
||||||
|
"destination_crc": helpers.get_crc_24(destination),
|
||||||
|
"origin": helpers.callsign_to_bytes(origin),
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_CONNECT_ACK, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_heartbeat(self, session_id):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_heartbeat_ack(self, session_id):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_HEARTBEAT_ACK, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_payload(self, freedv_mode: codec2.FREEDV_MODE, session_id: int, sequence_id: int, data: bytes):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
"sequence_id": sequence_id.to_bytes(1, 'big'),
|
||||||
|
"data": data,
|
||||||
|
}
|
||||||
|
return self.construct(
|
||||||
|
FR_TYPE.P2P_CONNECTION_PAYLOAD,
|
||||||
|
payload,
|
||||||
|
self.get_bytes_per_frame(freedv_mode),
|
||||||
|
)
|
||||||
|
|
||||||
|
def build_p2p_connection_payload_ack(self, session_id, sequence_id):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
"sequence_id": sequence_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_disconnect(self, session_id):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT, payload)
|
||||||
|
|
||||||
|
def build_p2p_connection_disconnect_ack(self, session_id):
|
||||||
|
payload = {
|
||||||
|
"session_id": session_id.to_bytes(1, 'big'),
|
||||||
|
}
|
||||||
|
return self.construct(FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK, payload)
|
||||||
|
|
|
@ -13,8 +13,11 @@ from frame_handler import FrameHandler
|
||||||
from frame_handler_ping import PingFrameHandler
|
from frame_handler_ping import PingFrameHandler
|
||||||
from frame_handler_cq import CQFrameHandler
|
from frame_handler_cq import CQFrameHandler
|
||||||
from frame_handler_arq_session import ARQFrameHandler
|
from frame_handler_arq_session import ARQFrameHandler
|
||||||
|
from frame_handler_p2p_connection import P2PConnectionFrameHandler
|
||||||
from frame_handler_beacon import BeaconFrameHandler
|
from frame_handler_beacon import BeaconFrameHandler
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DISPATCHER():
|
class DISPATCHER():
|
||||||
|
|
||||||
FRAME_HANDLER = {
|
FRAME_HANDLER = {
|
||||||
|
@ -22,9 +25,18 @@ class DISPATCHER():
|
||||||
FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"},
|
FR_TYPE.ARQ_SESSION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Open"},
|
||||||
FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"},
|
FR_TYPE.ARQ_SESSION_INFO_ACK.value: {"class": ARQFrameHandler, "name": "ARQ INFO ACK"},
|
||||||
FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"},
|
FR_TYPE.ARQ_SESSION_INFO.value: {"class": ARQFrameHandler, "name": "ARQ Data Channel Info"},
|
||||||
FR_TYPE.ARQ_CONNECTION_CLOSE.value: {"class": ARQFrameHandler, "name": "ARQ CLOSE SESSION"},
|
FR_TYPE.P2P_CONNECTION_CONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT"},
|
||||||
FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"},
|
FR_TYPE.P2P_CONNECTION_CONNECT_ACK.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection CONNECT ACK"},
|
||||||
FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"},
|
FR_TYPE.P2P_CONNECTION_DISCONNECT.value: {"class": P2PConnectionFrameHandler, "name": "P2P Connection DISCONNECT"},
|
||||||
|
FR_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: {"class": P2PConnectionFrameHandler,
|
||||||
|
"name": "P2P Connection DISCONNECT ACK"},
|
||||||
|
FR_TYPE.P2P_CONNECTION_PAYLOAD.value: {"class": P2PConnectionFrameHandler,
|
||||||
|
"name": "P2P Connection PAYLOAD"},
|
||||||
|
FR_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: {"class": P2PConnectionFrameHandler,
|
||||||
|
"name": "P2P Connection PAYLOAD ACK"},
|
||||||
|
|
||||||
|
#FR_TYPE.ARQ_CONNECTION_HB.value: {"class": ARQFrameHandler, "name": "ARQ HEARTBEAT"},
|
||||||
|
#FR_TYPE.ARQ_CONNECTION_OPEN.value: {"class": ARQFrameHandler, "name": "ARQ OPEN SESSION"},
|
||||||
FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"},
|
FR_TYPE.ARQ_STOP.value: {"class": ARQFrameHandler, "name": "ARQ STOP"},
|
||||||
FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"},
|
FR_TYPE.ARQ_STOP_ACK.value: {"class": ARQFrameHandler, "name": "ARQ STOP ACK"},
|
||||||
FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"},
|
FR_TYPE.BEACON.value: {"class": BeaconFrameHandler, "name": "BEACON"},
|
||||||
|
@ -82,7 +94,7 @@ class DISPATCHER():
|
||||||
|
|
||||||
if frametype not in self.FRAME_HANDLER:
|
if frametype not in self.FRAME_HANDLER:
|
||||||
self.log.warning(
|
self.log.warning(
|
||||||
"[Modem] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
|
"[DISPATCHER] ARQ - other frame type", frametype=FR_TYPE(frametype).name)
|
||||||
return
|
return
|
||||||
|
|
||||||
# instantiate handler
|
# instantiate handler
|
||||||
|
|
|
@ -34,7 +34,7 @@ class FrameHandler():
|
||||||
ft = self.details['frame']['frame_type']
|
ft = self.details['frame']['frame_type']
|
||||||
valid = False
|
valid = False
|
||||||
# Check for callsign checksum
|
# Check for callsign checksum
|
||||||
if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK']:
|
if ft in ['ARQ_SESSION_OPEN', 'ARQ_SESSION_OPEN_ACK', 'PING', 'PING_ACK', 'P2P_CONNECTION_CONNECT']:
|
||||||
valid, mycallsign = helpers.check_callsign(
|
valid, mycallsign = helpers.check_callsign(
|
||||||
call_with_ssid,
|
call_with_ssid,
|
||||||
self.details["frame"]["destination_crc"],
|
self.details["frame"]["destination_crc"],
|
||||||
|
@ -51,6 +51,20 @@ class FrameHandler():
|
||||||
session_id = self.details['frame']['session_id']
|
session_id = self.details['frame']['session_id']
|
||||||
if session_id in self.states.arq_iss_sessions:
|
if session_id in self.states.arq_iss_sessions:
|
||||||
valid = True
|
valid = True
|
||||||
|
|
||||||
|
# check for p2p connection
|
||||||
|
elif ft in ['P2P_CONNECTION_CONNECT']:
|
||||||
|
valid, mycallsign = helpers.check_callsign(
|
||||||
|
call_with_ssid,
|
||||||
|
self.details["frame"]["destination_crc"],
|
||||||
|
self.config['STATION']['ssid_list'])
|
||||||
|
|
||||||
|
#check for p2p connection
|
||||||
|
elif ft in ['P2P_CONNECTION_CONNECT_ACK', 'P2P_CONNECTION_PAYLOAD', 'P2P_CONNECTION_PAYLOAD_ACK', 'P2P_CONNECTION_DISCONNECT', 'P2P_CONNECTION_DISCONNECT_ACK']:
|
||||||
|
session_id = self.details['frame']['session_id']
|
||||||
|
if session_id in self.states.p2p_connection_sessions:
|
||||||
|
valid = True
|
||||||
|
|
||||||
else:
|
else:
|
||||||
valid = False
|
valid = False
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,8 @@ class ARQFrameHandler(frame_handler.FrameHandler):
|
||||||
session = ARQSessionIRS(self.config,
|
session = ARQSessionIRS(self.config,
|
||||||
self.modem,
|
self.modem,
|
||||||
frame['origin'],
|
frame['origin'],
|
||||||
session_id)
|
session_id,
|
||||||
|
self.states)
|
||||||
self.states.register_arq_irs_session(session)
|
self.states.register_arq_irs_session(session)
|
||||||
|
|
||||||
elif frame['frame_type_int'] in [
|
elif frame['frame_type_int'] in [
|
||||||
|
|
54
modem/frame_handler_p2p_connection.py
Normal file
54
modem/frame_handler_p2p_connection.py
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
from queue import Queue
|
||||||
|
import frame_handler
|
||||||
|
from event_manager import EventManager
|
||||||
|
from state_manager import StateManager
|
||||||
|
from modem_frametypes import FRAME_TYPE as FR
|
||||||
|
from p2p_connection import P2PConnection
|
||||||
|
|
||||||
|
class P2PConnectionFrameHandler(frame_handler.FrameHandler):
|
||||||
|
|
||||||
|
def follow_protocol(self):
|
||||||
|
|
||||||
|
if not self.should_respond():
|
||||||
|
return
|
||||||
|
|
||||||
|
frame = self.details['frame']
|
||||||
|
session_id = frame['session_id']
|
||||||
|
snr = self.details["snr"]
|
||||||
|
frequency_offset = self.details["frequency_offset"]
|
||||||
|
|
||||||
|
if frame['frame_type_int'] == FR.P2P_CONNECTION_CONNECT.value:
|
||||||
|
|
||||||
|
# Lost OPEN_ACK case .. ISS will retry opening a session
|
||||||
|
if session_id in self.states.arq_irs_sessions:
|
||||||
|
session = self.states.p2p_connection_sessions[session_id]
|
||||||
|
|
||||||
|
# Normal case when receiving a SESSION_OPEN for the first time
|
||||||
|
else:
|
||||||
|
# if self.states.check_if_running_arq_session():
|
||||||
|
# self.logger.warning("DISCARDING SESSION OPEN because of ongoing ARQ session ", frame=frame)
|
||||||
|
# return
|
||||||
|
print(frame)
|
||||||
|
session = P2PConnection(self.config,
|
||||||
|
self.modem,
|
||||||
|
frame['origin'],
|
||||||
|
frame['destination_crc'],
|
||||||
|
self.states, self.event_manager)
|
||||||
|
session.session_id = session_id
|
||||||
|
self.states.register_p2p_connection_session(session)
|
||||||
|
|
||||||
|
elif frame['frame_type_int'] in [
|
||||||
|
FR.P2P_CONNECTION_CONNECT_ACK.value,
|
||||||
|
FR.P2P_CONNECTION_DISCONNECT.value,
|
||||||
|
FR.P2P_CONNECTION_DISCONNECT_ACK.value,
|
||||||
|
FR.P2P_CONNECTION_PAYLOAD.value,
|
||||||
|
FR.P2P_CONNECTION_PAYLOAD_ACK.value,
|
||||||
|
]:
|
||||||
|
session = self.states.get_p2p_connection_session(session_id)
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.logger.warning("DISCARDING FRAME", frame=frame)
|
||||||
|
return
|
||||||
|
|
||||||
|
session.set_details(snr, frequency_offset)
|
||||||
|
session.on_frame_received(frame)
|
|
@ -6,9 +6,6 @@ from enum import Enum
|
||||||
|
|
||||||
class FRAME_TYPE(Enum):
|
class FRAME_TYPE(Enum):
|
||||||
"""Lookup for frame types"""
|
"""Lookup for frame types"""
|
||||||
ARQ_CONNECTION_OPEN = 1
|
|
||||||
ARQ_CONNECTION_HB = 2
|
|
||||||
ARQ_CONNECTION_CLOSE = 3
|
|
||||||
ARQ_STOP = 10
|
ARQ_STOP = 10
|
||||||
ARQ_STOP_ACK = 11
|
ARQ_STOP_ACK = 11
|
||||||
ARQ_SESSION_OPEN = 12
|
ARQ_SESSION_OPEN = 12
|
||||||
|
@ -17,6 +14,14 @@ class FRAME_TYPE(Enum):
|
||||||
ARQ_SESSION_INFO_ACK = 15
|
ARQ_SESSION_INFO_ACK = 15
|
||||||
ARQ_BURST_FRAME = 20
|
ARQ_BURST_FRAME = 20
|
||||||
ARQ_BURST_ACK = 21
|
ARQ_BURST_ACK = 21
|
||||||
|
P2P_CONNECTION_CONNECT = 30
|
||||||
|
P2P_CONNECTION_CONNECT_ACK = 31
|
||||||
|
P2P_CONNECTION_HEARTBEAT = 32
|
||||||
|
P2P_CONNECTION_HEARTBEAT_ACK = 33
|
||||||
|
P2P_CONNECTION_PAYLOAD = 34
|
||||||
|
P2P_CONNECTION_PAYLOAD_ACK = 35
|
||||||
|
P2P_CONNECTION_DISCONNECT = 36
|
||||||
|
P2P_CONNECTION_DISCONNECT_ACK = 37
|
||||||
MESH_BROADCAST = 100
|
MESH_BROADCAST = 100
|
||||||
MESH_SIGNALLING_PING = 101
|
MESH_SIGNALLING_PING = 101
|
||||||
MESH_SIGNALLING_PING_ACK = 102
|
MESH_SIGNALLING_PING_ACK = 102
|
||||||
|
|
315
modem/p2p_connection.py
Normal file
315
modem/p2p_connection.py
Normal file
|
@ -0,0 +1,315 @@
|
||||||
|
import threading
|
||||||
|
from enum import Enum
|
||||||
|
from modem_frametypes import FRAME_TYPE
|
||||||
|
from codec2 import FREEDV_MODE
|
||||||
|
import data_frame_factory
|
||||||
|
import structlog
|
||||||
|
import random
|
||||||
|
from queue import Queue
|
||||||
|
import time
|
||||||
|
from command_arq_raw import ARQRawCommand
|
||||||
|
import numpy as np
|
||||||
|
import base64
|
||||||
|
from arq_data_type_handler import ARQDataTypeHandler, ARQ_SESSION_TYPES
|
||||||
|
from arq_session_iss import ARQSessionISS
|
||||||
|
|
||||||
|
class States(Enum):
|
||||||
|
NEW = 0
|
||||||
|
CONNECTING = 1
|
||||||
|
CONNECT_SENT = 2
|
||||||
|
CONNECT_ACK_SENT = 3
|
||||||
|
CONNECTED = 4
|
||||||
|
#HEARTBEAT_SENT = 5
|
||||||
|
#HEARTBEAT_ACK_SENT = 6
|
||||||
|
PAYLOAD_SENT = 7
|
||||||
|
ARQ_SESSION = 8
|
||||||
|
DISCONNECTING = 9
|
||||||
|
DISCONNECTED = 10
|
||||||
|
FAILED = 11
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class P2PConnection:
|
||||||
|
STATE_TRANSITION = {
|
||||||
|
States.NEW: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs',
|
||||||
|
},
|
||||||
|
States.CONNECTING: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
|
||||||
|
},
|
||||||
|
States.CONNECTED: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_CONNECT.value: 'connected_irs',
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_CONNECT_ACK.value: 'connected_iss',
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_PAYLOAD.value: 'received_data',
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
|
||||||
|
},
|
||||||
|
States.PAYLOAD_SENT: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_PAYLOAD_ACK.value: 'transmitted_data',
|
||||||
|
},
|
||||||
|
States.DISCONNECTING: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
|
||||||
|
},
|
||||||
|
States.DISCONNECTED: {
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT.value: 'received_disconnect',
|
||||||
|
FRAME_TYPE.P2P_CONNECTION_DISCONNECT_ACK.value: 'received_disconnect_ack',
|
||||||
|
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, config: dict, modem, origin: str, destination: str, state_manager, event_manager, socket_command_handler=None):
|
||||||
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
|
self.config = config
|
||||||
|
self.frame_factory = data_frame_factory.DataFrameFactory(self.config)
|
||||||
|
|
||||||
|
self.socket_command_handler = socket_command_handler
|
||||||
|
|
||||||
|
self.destination = destination
|
||||||
|
self.origin = origin
|
||||||
|
self.bandwidth = 0
|
||||||
|
|
||||||
|
self.state_manager = state_manager
|
||||||
|
self.event_manager = event_manager
|
||||||
|
self.modem = modem
|
||||||
|
self.modem.demodulator.set_decode_mode([])
|
||||||
|
|
||||||
|
self.p2p_data_rx_queue = Queue()
|
||||||
|
self.p2p_data_tx_queue = Queue()
|
||||||
|
|
||||||
|
self.arq_data_type_handler = ARQDataTypeHandler(self.event_manager, self.state_manager)
|
||||||
|
|
||||||
|
|
||||||
|
self.state = States.NEW
|
||||||
|
self.session_id = self.generate_id()
|
||||||
|
|
||||||
|
self.event_frame_received = threading.Event()
|
||||||
|
|
||||||
|
self.RETRIES_CONNECT = 1
|
||||||
|
self.TIMEOUT_CONNECT = 10
|
||||||
|
self.TIMEOUT_DATA = 5
|
||||||
|
self.RETRIES_DATA = 5
|
||||||
|
self.ENTIRE_CONNECTION_TIMEOUT = 100
|
||||||
|
|
||||||
|
self.is_ISS = False # Indicator, if we are ISS or IRS
|
||||||
|
|
||||||
|
self.last_data_timestamp= time.time()
|
||||||
|
|
||||||
|
self.start_data_processing_worker()
|
||||||
|
|
||||||
|
|
||||||
|
def start_data_processing_worker(self):
|
||||||
|
"""Starts a worker thread to monitor the transmit data queue and process data."""
|
||||||
|
|
||||||
|
def data_processing_worker():
|
||||||
|
while True:
|
||||||
|
if time.time() > self.last_data_timestamp + self.ENTIRE_CONNECTION_TIMEOUT and self.state is not States.ARQ_SESSION:
|
||||||
|
self.disconnect()
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self.p2p_data_tx_queue.empty() and self.state == States.CONNECTED:
|
||||||
|
self.process_data_queue()
|
||||||
|
threading.Event().wait(0.1)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Create and start the worker thread
|
||||||
|
worker_thread = threading.Thread(target=data_processing_worker, daemon=True)
|
||||||
|
worker_thread.start()
|
||||||
|
|
||||||
|
def generate_id(self):
|
||||||
|
while True:
|
||||||
|
random_int = random.randint(1,255)
|
||||||
|
if random_int not in self.state_manager.p2p_connection_sessions:
|
||||||
|
return random_int
|
||||||
|
|
||||||
|
if len(self.state_manager.p2p_connection_sessions) >= 255:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def set_details(self, snr, frequency_offset):
|
||||||
|
self.snr = snr
|
||||||
|
self.frequency_offset = frequency_offset
|
||||||
|
|
||||||
|
def log(self, message, isWarning = False):
|
||||||
|
msg = f"[{type(self).__name__}][id={self.session_id}][state={self.state}][ISS={bool(self.is_ISS)}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
|
def set_state(self, state):
|
||||||
|
if self.state == state:
|
||||||
|
self.log(f"{type(self).__name__} state {self.state.name} unchanged.")
|
||||||
|
else:
|
||||||
|
self.log(f"{type(self).__name__} state change from {self.state.name} to {state.name}")
|
||||||
|
self.state = state
|
||||||
|
|
||||||
|
def on_frame_received(self, frame):
|
||||||
|
self.last_data_timestamp = time.time()
|
||||||
|
self.event_frame_received.set()
|
||||||
|
self.log(f"Received {frame['frame_type']}")
|
||||||
|
frame_type = frame['frame_type_int']
|
||||||
|
if self.state in self.STATE_TRANSITION:
|
||||||
|
if frame_type in self.STATE_TRANSITION[self.state]:
|
||||||
|
action_name = self.STATE_TRANSITION[self.state][frame_type]
|
||||||
|
response = getattr(self, action_name)(frame)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
self.log(f"Ignoring unknown transition from state {self.state.name} with frame {frame['frame_type']}")
|
||||||
|
|
||||||
|
def transmit_frame(self, frame: bytearray, mode='auto'):
|
||||||
|
self.log("Transmitting frame")
|
||||||
|
if mode in ['auto']:
|
||||||
|
mode = self.get_mode_by_speed_level(self.speed_level)
|
||||||
|
|
||||||
|
self.modem.transmit(mode, 1, 1, frame)
|
||||||
|
|
||||||
|
def transmit_wait_and_retry(self, frame_or_burst, timeout, retries, mode):
|
||||||
|
while retries > 0:
|
||||||
|
self.event_frame_received = threading.Event()
|
||||||
|
if isinstance(frame_or_burst, list): burst = frame_or_burst
|
||||||
|
else: burst = [frame_or_burst]
|
||||||
|
for f in burst:
|
||||||
|
self.transmit_frame(f, mode)
|
||||||
|
self.event_frame_received.clear()
|
||||||
|
self.log(f"Waiting {timeout} seconds...")
|
||||||
|
if self.event_frame_received.wait(timeout):
|
||||||
|
return
|
||||||
|
self.log("Timeout!")
|
||||||
|
retries = retries - 1
|
||||||
|
|
||||||
|
#self.connected_iss() # override connection state for simulation purposes
|
||||||
|
self.session_failed()
|
||||||
|
|
||||||
|
def launch_twr(self, frame_or_burst, timeout, retries, mode):
|
||||||
|
twr = threading.Thread(target = self.transmit_wait_and_retry, args=[frame_or_burst, timeout, retries, mode], daemon=True)
|
||||||
|
twr.start()
|
||||||
|
|
||||||
|
def transmit_and_wait_irs(self, frame, timeout, mode):
|
||||||
|
self.event_frame_received.clear()
|
||||||
|
self.transmit_frame(frame, mode)
|
||||||
|
self.log(f"Waiting {timeout} seconds...")
|
||||||
|
#if not self.event_frame_received.wait(timeout):
|
||||||
|
# self.log("Timeout waiting for ISS. Session failed.")
|
||||||
|
# self.transmission_failed()
|
||||||
|
|
||||||
|
def launch_twr_irs(self, frame, timeout, mode):
|
||||||
|
thread_wait = threading.Thread(target = self.transmit_and_wait_irs,
|
||||||
|
args = [frame, timeout, mode], daemon=True)
|
||||||
|
thread_wait.start()
|
||||||
|
|
||||||
|
def connect(self):
|
||||||
|
self.set_state(States.CONNECTING)
|
||||||
|
self.is_ISS = True
|
||||||
|
session_open_frame = self.frame_factory.build_p2p_connection_connect(self.origin, self.destination, self.session_id)
|
||||||
|
self.launch_twr(session_open_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
|
return
|
||||||
|
|
||||||
|
def connected_iss(self, frame=None):
|
||||||
|
self.log("CONNECTED ISS...........................")
|
||||||
|
self.set_state(States.CONNECTED)
|
||||||
|
self.is_ISS = True
|
||||||
|
if self.socket_command_handler:
|
||||||
|
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
|
||||||
|
|
||||||
|
def connected_irs(self, frame):
|
||||||
|
self.log("CONNECTED IRS...........................")
|
||||||
|
self.state_manager.register_p2p_connection_session(self)
|
||||||
|
self.set_state(States.CONNECTED)
|
||||||
|
self.is_ISS = False
|
||||||
|
self.orign = frame["origin"]
|
||||||
|
self.destination = frame["destination_crc"]
|
||||||
|
|
||||||
|
if self.socket_command_handler:
|
||||||
|
self.socket_command_handler.socket_respond_connected(self.origin, self.destination, self.bandwidth)
|
||||||
|
|
||||||
|
session_open_frame = self.frame_factory.build_p2p_connection_connect_ack(self.destination, self.origin, self.session_id)
|
||||||
|
self.launch_twr_irs(session_open_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
||||||
|
|
||||||
|
def session_failed(self):
|
||||||
|
self.set_state(States.FAILED)
|
||||||
|
if self.socket_command_handler:
|
||||||
|
self.socket_command_handler.socket_respond_disconnected()
|
||||||
|
|
||||||
|
def process_data_queue(self, frame=None):
|
||||||
|
if not self.p2p_data_tx_queue.empty():
|
||||||
|
print("processing data....")
|
||||||
|
|
||||||
|
self.set_state(States.PAYLOAD_SENT)
|
||||||
|
data = self.p2p_data_tx_queue.get()
|
||||||
|
sequence_id = random.randint(0,255)
|
||||||
|
data = data.encode('utf-8')
|
||||||
|
|
||||||
|
if len(data) <= 11:
|
||||||
|
mode = FREEDV_MODE.signalling
|
||||||
|
elif 11 < len(data) < 32:
|
||||||
|
mode = FREEDV_MODE.datac4
|
||||||
|
else:
|
||||||
|
self.transmit_arq(data)
|
||||||
|
return
|
||||||
|
|
||||||
|
payload = self.frame_factory.build_p2p_connection_payload(mode, self.session_id, sequence_id, data)
|
||||||
|
self.launch_twr(payload, self.TIMEOUT_DATA, self.RETRIES_DATA,mode=mode)
|
||||||
|
return
|
||||||
|
|
||||||
|
def prepare_data_chunk(self, data, mode):
|
||||||
|
return data
|
||||||
|
|
||||||
|
def received_data(self, frame):
|
||||||
|
print(frame)
|
||||||
|
self.p2p_data_rx_queue.put(frame['data'])
|
||||||
|
|
||||||
|
ack_data = self.frame_factory.build_p2p_connection_payload_ack(self.session_id, 0)
|
||||||
|
self.launch_twr_irs(ack_data, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
||||||
|
|
||||||
|
def transmit_data_ack(self, frame):
|
||||||
|
print(frame)
|
||||||
|
|
||||||
|
def transmitted_data(self, frame):
|
||||||
|
print("transmitted data...")
|
||||||
|
self.set_state(States.CONNECTED)
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
if self.state not in [States.DISCONNECTING, States.DISCONNECTED]:
|
||||||
|
self.set_state(States.DISCONNECTING)
|
||||||
|
disconnect_frame = self.frame_factory.build_p2p_connection_disconnect(self.session_id)
|
||||||
|
self.launch_twr(disconnect_frame, self.TIMEOUT_CONNECT, self.RETRIES_CONNECT, mode=FREEDV_MODE.signalling)
|
||||||
|
return
|
||||||
|
|
||||||
|
def received_disconnect(self, frame):
|
||||||
|
self.log("DISCONNECTED...............")
|
||||||
|
self.set_state(States.DISCONNECTED)
|
||||||
|
if self.socket_command_handler:
|
||||||
|
self.socket_command_handler.socket_respond_disconnected()
|
||||||
|
self.is_ISS = False
|
||||||
|
disconnect_ack_frame = self.frame_factory.build_p2p_connection_disconnect_ack(self.session_id)
|
||||||
|
self.launch_twr_irs(disconnect_ack_frame, self.ENTIRE_CONNECTION_TIMEOUT, mode=FREEDV_MODE.signalling)
|
||||||
|
|
||||||
|
def received_disconnect_ack(self, frame):
|
||||||
|
self.log("DISCONNECTED...............")
|
||||||
|
self.set_state(States.DISCONNECTED)
|
||||||
|
if self.socket_command_handler:
|
||||||
|
self.socket_command_handler.socket_respond_disconnected()
|
||||||
|
|
||||||
|
def transmit_arq(self, data):
|
||||||
|
self.set_state(States.ARQ_SESSION)
|
||||||
|
|
||||||
|
print("----------------------------------------------------------------")
|
||||||
|
print(self.destination)
|
||||||
|
print(self.state_manager.p2p_connection_sessions)
|
||||||
|
|
||||||
|
prepared_data, type_byte = self.arq_data_type_handler.prepare(data, ARQ_SESSION_TYPES.p2p_connection)
|
||||||
|
iss = ARQSessionISS(self.config, self.modem, 'AA1AAA-1', self.state_manager, prepared_data, type_byte)
|
||||||
|
iss.id = self.session_id
|
||||||
|
if iss.id:
|
||||||
|
self.state_manager.register_arq_iss_session(iss)
|
||||||
|
iss.start()
|
||||||
|
return iss
|
||||||
|
|
||||||
|
def transmitted_arq(self):
|
||||||
|
self.last_data_timestamp = time.time()
|
||||||
|
self.set_state(States.CONNECTED)
|
||||||
|
|
||||||
|
def received_arq(self, data):
|
||||||
|
self.last_data_timestamp = time.time()
|
||||||
|
self.set_state(States.CONNECTED)
|
||||||
|
self.p2p_data_rx_queue.put(data)
|
||||||
|
|
|
@ -327,7 +327,9 @@ def sock_states(sock):
|
||||||
@atexit.register
|
@atexit.register
|
||||||
def stop_server():
|
def stop_server():
|
||||||
try:
|
try:
|
||||||
app.service_manager.stop_modem()
|
app.service_manager.modem_service.put("stop")
|
||||||
|
app.socket_interface_manager.stop_servers()
|
||||||
|
|
||||||
if app.service_manager.modem:
|
if app.service_manager.modem:
|
||||||
app.service_manager.modem.sd_input_stream.stop
|
app.service_manager.modem.sd_input_stream.stop
|
||||||
audio.sd._terminate()
|
audio.sd._terminate()
|
||||||
|
@ -346,6 +348,7 @@ if __name__ == "__main__":
|
||||||
app.config_manager = CONFIG(config_file)
|
app.config_manager = CONFIG(config_file)
|
||||||
|
|
||||||
# start modem
|
# start modem
|
||||||
|
app.p2p_data_queue = queue.Queue() # queue which holds processing data of p2p connections
|
||||||
app.state_queue = queue.Queue() # queue which holds latest states
|
app.state_queue = queue.Queue() # queue which holds latest states
|
||||||
app.modem_events = queue.Queue() # queue which holds latest events
|
app.modem_events = queue.Queue() # queue which holds latest events
|
||||||
app.modem_fft = queue.Queue() # queue which holds latest fft data
|
app.modem_fft = queue.Queue() # queue which holds latest fft data
|
||||||
|
@ -357,6 +360,7 @@ if __name__ == "__main__":
|
||||||
app.schedule_manager = ScheduleManager(app.MODEM_VERSION, app.config_manager, app.state_manager, app.event_manager)
|
app.schedule_manager = ScheduleManager(app.MODEM_VERSION, app.config_manager, app.state_manager, app.event_manager)
|
||||||
# start service manager
|
# start service manager
|
||||||
app.service_manager = service_manager.SM(app)
|
app.service_manager = service_manager.SM(app)
|
||||||
|
|
||||||
# start modem service
|
# start modem service
|
||||||
app.modem_service.put("start")
|
app.modem_service.put("start")
|
||||||
# initialize database default values
|
# initialize database default values
|
||||||
|
@ -373,3 +377,4 @@ if __name__ == "__main__":
|
||||||
modemport = 5000
|
modemport = 5000
|
||||||
|
|
||||||
app.run(modemaddress, modemport)
|
app.run(modemaddress, modemport)
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ import structlog
|
||||||
import audio
|
import audio
|
||||||
|
|
||||||
import radio_manager
|
import radio_manager
|
||||||
|
from socket_interface import SocketInterfaceHandler
|
||||||
|
|
||||||
class SM:
|
class SM:
|
||||||
def __init__(self, app):
|
def __init__(self, app):
|
||||||
|
@ -19,7 +19,7 @@ class SM:
|
||||||
self.state_manager = app.state_manager
|
self.state_manager = app.state_manager
|
||||||
self.event_manager = app.event_manager
|
self.event_manager = app.event_manager
|
||||||
self.schedule_manager = app.schedule_manager
|
self.schedule_manager = app.schedule_manager
|
||||||
|
self.socket_interface_manager = None
|
||||||
|
|
||||||
runner_thread = threading.Thread(
|
runner_thread = threading.Thread(
|
||||||
target=self.runner, name="runner thread", daemon=True
|
target=self.runner, name="runner thread", daemon=True
|
||||||
|
@ -34,15 +34,23 @@ class SM:
|
||||||
self.start_radio_manager()
|
self.start_radio_manager()
|
||||||
self.start_modem()
|
self.start_modem()
|
||||||
|
|
||||||
|
if self.config['MODEM']['enable_socket_interface']:
|
||||||
|
self.socket_interface_manager = SocketInterfaceHandler(self.modem, self.app.config_manager, self.state_manager, self.event_manager).start_servers()
|
||||||
|
|
||||||
elif cmd in ['stop'] and self.modem:
|
elif cmd in ['stop'] and self.modem:
|
||||||
self.stop_modem()
|
self.stop_modem()
|
||||||
self.stop_radio_manager()
|
self.stop_radio_manager()
|
||||||
|
if self.config['MODEM']['enable_socket_interface']:
|
||||||
|
self.socket_interface_manager.stop_servers()
|
||||||
# we need to wait a bit for avoiding a portaudio crash
|
# we need to wait a bit for avoiding a portaudio crash
|
||||||
threading.Event().wait(0.5)
|
threading.Event().wait(0.5)
|
||||||
|
|
||||||
elif cmd in ['restart']:
|
elif cmd in ['restart']:
|
||||||
self.stop_modem()
|
self.stop_modem()
|
||||||
self.stop_radio_manager()
|
self.stop_radio_manager()
|
||||||
|
if self.config['MODEM']['enable_socket_interface']:
|
||||||
|
self.socket_interface_manager.stop_servers()
|
||||||
|
|
||||||
# we need to wait a bit for avoiding a portaudio crash
|
# we need to wait a bit for avoiding a portaudio crash
|
||||||
threading.Event().wait(0.5)
|
threading.Event().wait(0.5)
|
||||||
|
|
||||||
|
|
186
modem/socket_interface.py
Normal file
186
modem/socket_interface.py
Normal file
|
@ -0,0 +1,186 @@
|
||||||
|
""" WORK IN PROGRESS by DJ2LS"""
|
||||||
|
|
||||||
|
import socketserver
|
||||||
|
import threading
|
||||||
|
import structlog
|
||||||
|
import select
|
||||||
|
from queue import Queue
|
||||||
|
from socket_interface_commands import SocketCommandHandler
|
||||||
|
|
||||||
|
|
||||||
|
class CommandSocket(socketserver.BaseRequestHandler):
|
||||||
|
#def __init__(self, request, client_address, server):
|
||||||
|
def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None):
|
||||||
|
self.state_manager = state_manager
|
||||||
|
self.event_manager = event_manager
|
||||||
|
self.config_manager = config_manager
|
||||||
|
self.modem = modem
|
||||||
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
|
|
||||||
|
self.command_handler = SocketCommandHandler(request, self.modem, self.config_manager, self.state_manager, self.event_manager)
|
||||||
|
|
||||||
|
self.handlers = {
|
||||||
|
'CONNECT': self.command_handler.handle_connect,
|
||||||
|
'DISCONNECT': self.command_handler.handle_disconnect,
|
||||||
|
'MYCALL': self.command_handler.handle_mycall,
|
||||||
|
'BW': self.command_handler.handle_bw,
|
||||||
|
'ABORT': self.command_handler.handle_abort,
|
||||||
|
'PUBLIC': self.command_handler.handle_public,
|
||||||
|
'CWID': self.command_handler.handle_cwid,
|
||||||
|
'LISTEN': self.command_handler.handle_listen,
|
||||||
|
'COMPRESSION': self.command_handler.handle_compression,
|
||||||
|
'WINLINK SESSION': self.command_handler.handle_winlink_session,
|
||||||
|
}
|
||||||
|
super().__init__(request, client_address, server)
|
||||||
|
|
||||||
|
def log(self, message, isWarning = False):
|
||||||
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
|
def handle(self):
|
||||||
|
self.log(f"Client connected: {self.client_address}")
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
data = self.request.recv(1024).strip()
|
||||||
|
if not data:
|
||||||
|
break
|
||||||
|
decoded_data = data.decode()
|
||||||
|
self.log(f"Command received from {self.client_address}: {decoded_data}")
|
||||||
|
self.parse_command(decoded_data)
|
||||||
|
finally:
|
||||||
|
self.log(f"Command connection closed with {self.client_address}")
|
||||||
|
|
||||||
|
def parse_command(self, data):
|
||||||
|
for command in self.handlers:
|
||||||
|
if data.startswith(command):
|
||||||
|
# Extract command arguments after the command itself
|
||||||
|
args = data[len(command):].strip().split()
|
||||||
|
self.dispatch_command(command, args)
|
||||||
|
return
|
||||||
|
self.send_response("ERROR: Unknown command\r\n")
|
||||||
|
|
||||||
|
def dispatch_command(self, command, data):
|
||||||
|
if command in self.handlers:
|
||||||
|
handler = self.handlers[command]
|
||||||
|
handler(data)
|
||||||
|
else:
|
||||||
|
self.send_response(f"Unknown command: {command}")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class DataSocket(socketserver.BaseRequestHandler):
|
||||||
|
#def __init__(self, request, client_address, server):
|
||||||
|
def __init__(self, request, client_address, server, modem=None, state_manager=None, event_manager=None, config_manager=None):
|
||||||
|
self.state_manager = state_manager
|
||||||
|
self.event_manager = event_manager
|
||||||
|
self.config_manager = config_manager
|
||||||
|
self.modem = modem
|
||||||
|
|
||||||
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
|
|
||||||
|
super().__init__(request, client_address, server)
|
||||||
|
|
||||||
|
def log(self, message, isWarning = False):
|
||||||
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
|
def handle(self):
|
||||||
|
self.log(f"Data connection established with {self.client_address}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
|
||||||
|
ready_to_read, _, _ = select.select([self.request], [], [], 1) # 1-second timeout
|
||||||
|
if ready_to_read:
|
||||||
|
self.data = self.request.recv(1024).strip()
|
||||||
|
if not self.data:
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data.decode()}")
|
||||||
|
except Exception:
|
||||||
|
self.log(f"Data received from {self.client_address}: [{len(self.data)}] - {self.data}")
|
||||||
|
|
||||||
|
for session in self.state_manager.p2p_connection_sessions:
|
||||||
|
print(f"sessions: {session}")
|
||||||
|
session.p2p_data_tx_queue.put(self.data)
|
||||||
|
|
||||||
|
# Check if there's something to send from the queue, without blocking
|
||||||
|
|
||||||
|
for session_id in self.state_manager.p2p_connection_sessions:
|
||||||
|
session = self.state_manager.get_p2p_connection_session(session_id)
|
||||||
|
if not session.p2p_data_tx_queue.empty():
|
||||||
|
data_to_send = session.p2p_data_tx_queue.get_nowait() # Use get_nowait to avoid blocking
|
||||||
|
self.request.sendall(data_to_send)
|
||||||
|
self.log(f"Sent data to {self.client_address}")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
self.log(f"Data connection closed with {self.client_address}")
|
||||||
|
|
||||||
|
|
||||||
|
#class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||||
|
# allow_reuse_address = True
|
||||||
|
|
||||||
|
|
||||||
|
class CustomThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||||
|
allow_reuse_address = True
|
||||||
|
|
||||||
|
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, **kwargs):
|
||||||
|
self.extra_args = kwargs
|
||||||
|
super().__init__(server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
|
||||||
|
|
||||||
|
def finish_request(self, request, client_address):
|
||||||
|
self.RequestHandlerClass(request, client_address, self, **self.extra_args)
|
||||||
|
|
||||||
|
class SocketInterfaceHandler:
|
||||||
|
def __init__(self, modem, config_manager, state_manager, event_manager):
|
||||||
|
self.modem = modem
|
||||||
|
self.config_manager = config_manager
|
||||||
|
self.config = self.config_manager.read()
|
||||||
|
self.state_manager = state_manager
|
||||||
|
self.event_manager = event_manager
|
||||||
|
self.logger = structlog.get_logger(type(self).__name__)
|
||||||
|
self.command_port = self.config["SOCKET_INTERFACE"]["cmd_port"]
|
||||||
|
self.data_port = self.config["SOCKET_INTERFACE"]["data_port"]
|
||||||
|
self.command_server = None
|
||||||
|
self.data_server = None
|
||||||
|
self.command_server_thread = None
|
||||||
|
self.data_server_thread = None
|
||||||
|
|
||||||
|
def log(self, message, isWarning = False):
|
||||||
|
msg = f"[{type(self).__name__}]: {message}"
|
||||||
|
logger = self.logger.warn if isWarning else self.logger.info
|
||||||
|
logger(msg)
|
||||||
|
|
||||||
|
def start_servers(self):
|
||||||
|
# Method to start both command and data server threads
|
||||||
|
self.command_server_thread = threading.Thread(target=self.run_server, args=(self.command_port, CommandSocket))
|
||||||
|
self.data_server_thread = threading.Thread(target=self.run_server, args=(self.data_port, DataSocket))
|
||||||
|
|
||||||
|
self.command_server_thread.start()
|
||||||
|
self.data_server_thread.start()
|
||||||
|
|
||||||
|
self.log(f"Interfaces started")
|
||||||
|
|
||||||
|
def run_server(self, port, handler):
|
||||||
|
with CustomThreadedTCPServer(('127.0.0.1', port), handler, modem=self.modem, state_manager=self.state_manager, event_manager=self.event_manager, config_manager=self.config_manager) as server:
|
||||||
|
self.log(f"Server started on port {port}")
|
||||||
|
if port == self.command_port:
|
||||||
|
self.command_server = server
|
||||||
|
else:
|
||||||
|
self.data_server = server
|
||||||
|
server.serve_forever()
|
||||||
|
|
||||||
|
def stop_servers(self):
|
||||||
|
# Gracefully shutdown the server
|
||||||
|
if self.command_server:
|
||||||
|
self.command_server.shutdown()
|
||||||
|
if self.data_server:
|
||||||
|
self.data_server.shutdown()
|
||||||
|
self.log(f"Interfaces stopped")
|
||||||
|
|
||||||
|
def wait_for_server_threads(self):
|
||||||
|
# Wait for both server threads to finish
|
||||||
|
self.command_server_thread.join()
|
||||||
|
self.data_server_thread.join()
|
75
modem/socket_interface_commands.py
Normal file
75
modem/socket_interface_commands.py
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
""" WORK IN PROGRESS by DJ2LS"""
|
||||||
|
from command_p2p_connection import P2PConnectionCommand
|
||||||
|
|
||||||
|
class SocketCommandHandler:
|
||||||
|
|
||||||
|
def __init__(self, cmd_request, modem, config_manager, state_manager, event_manager):
|
||||||
|
self.cmd_request = cmd_request
|
||||||
|
self.modem = modem
|
||||||
|
self.config_manager = config_manager
|
||||||
|
self.state_manager = state_manager
|
||||||
|
self.event_manager = event_manager
|
||||||
|
|
||||||
|
self.session = None
|
||||||
|
|
||||||
|
def send_response(self, message):
|
||||||
|
full_message = f"{message}\r\n"
|
||||||
|
self.cmd_request.sendall(full_message.encode())
|
||||||
|
|
||||||
|
def handle_connect(self, data):
|
||||||
|
# Your existing connect logic
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
params = {
|
||||||
|
'origin': data[0],
|
||||||
|
'destination': data[1],
|
||||||
|
}
|
||||||
|
cmd = P2PConnectionCommand(self.config_manager.read(), self.state_manager, self.event_manager, params, self)
|
||||||
|
self.session = cmd.run(self.event_manager.queues, self.modem)
|
||||||
|
if self.session.session_id:
|
||||||
|
self.state_manager.register_p2p_connection_session(self.session)
|
||||||
|
self.session.connect()
|
||||||
|
|
||||||
|
|
||||||
|
def handle_disconnect(self, data):
|
||||||
|
# Your existing connect logic
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_mycall(self, data):
|
||||||
|
# Logic for handling MYCALL command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_bw(self, data):
|
||||||
|
# Logic for handling BW command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_abort(self, data):
|
||||||
|
# Logic for handling ABORT command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_public(self, data):
|
||||||
|
# Logic for handling PUBLIC command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_cwid(self, data):
|
||||||
|
# Logic for handling CWID command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_listen(self, data):
|
||||||
|
# Logic for handling LISTEN command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_compression(self, data):
|
||||||
|
# Logic for handling COMPRESSION command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def handle_winlink_session(self, data):
|
||||||
|
# Logic for handling WINLINK SESSION command
|
||||||
|
self.send_response("OK")
|
||||||
|
|
||||||
|
def socket_respond_disconnected(self):
|
||||||
|
self.send_response("DISCONNECTED")
|
||||||
|
|
||||||
|
def socket_respond_connected(self, mycall, dxcall, bandwidth):
|
||||||
|
message = f"CONNECTED {mycall} {dxcall} {bandwidth}"
|
||||||
|
self.send_response(message)
|
|
@ -38,6 +38,8 @@ class StateManager:
|
||||||
self.arq_iss_sessions = {}
|
self.arq_iss_sessions = {}
|
||||||
self.arq_irs_sessions = {}
|
self.arq_irs_sessions = {}
|
||||||
|
|
||||||
|
self.p2p_connection_sessions = {}
|
||||||
|
|
||||||
#self.mesh_routing_table = []
|
#self.mesh_routing_table = []
|
||||||
|
|
||||||
self.radio_frequency = 0
|
self.radio_frequency = 0
|
||||||
|
@ -214,3 +216,15 @@ class StateManager:
|
||||||
"radio_rf_level": self.radio_rf_level,
|
"radio_rf_level": self.radio_rf_level,
|
||||||
"s_meter_strength": self.s_meter_strength,
|
"s_meter_strength": self.s_meter_strength,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def register_p2p_connection_session(self, session):
|
||||||
|
if session.session_id in self.p2p_connection_sessions:
|
||||||
|
print("session already registered...")
|
||||||
|
return False
|
||||||
|
self.p2p_connection_sessions[session.session_id] = session
|
||||||
|
return True
|
||||||
|
|
||||||
|
def get_p2p_connection_session(self, id):
|
||||||
|
if id not in self.p2p_connection_sessions:
|
||||||
|
pass
|
||||||
|
return self.p2p_connection_sessions[id]
|
|
@ -221,7 +221,9 @@ class TestARQSession(unittest.TestCase):
|
||||||
session = arq_session_irs.ARQSessionIRS(self.config,
|
session = arq_session_irs.ARQSessionIRS(self.config,
|
||||||
self.irs_modem,
|
self.irs_modem,
|
||||||
'AA1AAA-1',
|
'AA1AAA-1',
|
||||||
random.randint(0, 255))
|
random.randint(0, 255),
|
||||||
|
self.irs_state_manager
|
||||||
|
)
|
||||||
self.irs_state_manager.register_arq_irs_session(session)
|
self.irs_state_manager.register_arq_irs_session(session)
|
||||||
for session_id in self.irs_state_manager.arq_irs_sessions:
|
for session_id in self.irs_state_manager.arq_irs_sessions:
|
||||||
session = self.irs_state_manager.arq_irs_sessions[session_id]
|
session = self.irs_state_manager.arq_irs_sessions[session_id]
|
||||||
|
|
197
tests/test_p2p_connection.py
Normal file
197
tests/test_p2p_connection.py
Normal file
|
@ -0,0 +1,197 @@
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
sys.path.append('modem')
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import unittest.mock
|
||||||
|
from config import CONFIG
|
||||||
|
import helpers
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
import base64
|
||||||
|
from command_p2p_connection import P2PConnectionCommand
|
||||||
|
from state_manager import StateManager
|
||||||
|
from frame_dispatcher import DISPATCHER
|
||||||
|
import random
|
||||||
|
import structlog
|
||||||
|
import numpy as np
|
||||||
|
from event_manager import EventManager
|
||||||
|
from state_manager import StateManager
|
||||||
|
from data_frame_factory import DataFrameFactory
|
||||||
|
import codec2
|
||||||
|
import p2p_connection
|
||||||
|
|
||||||
|
from socket_interface_commands import SocketCommandHandler
|
||||||
|
|
||||||
|
class TestModem:
|
||||||
|
def __init__(self, event_q, state_q):
|
||||||
|
self.data_queue_received = queue.Queue()
|
||||||
|
self.demodulator = unittest.mock.Mock()
|
||||||
|
self.event_manager = EventManager([event_q])
|
||||||
|
self.logger = structlog.get_logger('Modem')
|
||||||
|
self.states = StateManager(state_q)
|
||||||
|
|
||||||
|
def getFrameTransmissionTime(self, mode):
|
||||||
|
samples = 0
|
||||||
|
c2instance = codec2.open_instance(mode.value)
|
||||||
|
samples += codec2.api.freedv_get_n_tx_preamble_modem_samples(c2instance)
|
||||||
|
samples += codec2.api.freedv_get_n_tx_modem_samples(c2instance)
|
||||||
|
samples += codec2.api.freedv_get_n_tx_postamble_modem_samples(c2instance)
|
||||||
|
time = samples / 8000
|
||||||
|
return time
|
||||||
|
|
||||||
|
def transmit(self, mode, repeats: int, repeat_delay: int, frames: bytearray) -> bool:
|
||||||
|
# Simulate transmission time
|
||||||
|
tx_time = self.getFrameTransmissionTime(mode) + 0.1 # PTT
|
||||||
|
self.logger.info(f"TX {tx_time} seconds...")
|
||||||
|
threading.Event().wait(tx_time)
|
||||||
|
|
||||||
|
transmission = {
|
||||||
|
'mode': mode,
|
||||||
|
'bytes': frames,
|
||||||
|
}
|
||||||
|
self.data_queue_received.put(transmission)
|
||||||
|
|
||||||
|
|
||||||
|
class TestP2PConnectionSession(unittest.TestCase):
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
config_manager = CONFIG('modem/config.ini.example')
|
||||||
|
cls.config = config_manager.read()
|
||||||
|
cls.logger = structlog.get_logger("TESTS")
|
||||||
|
cls.frame_factory = DataFrameFactory(cls.config)
|
||||||
|
|
||||||
|
# ISS
|
||||||
|
cls.iss_config_manager = config_manager
|
||||||
|
cls.iss_state_manager = StateManager(queue.Queue())
|
||||||
|
cls.iss_event_manager = EventManager([queue.Queue()])
|
||||||
|
cls.iss_event_queue = queue.Queue()
|
||||||
|
cls.iss_state_queue = queue.Queue()
|
||||||
|
cls.iss_p2p_data_queue = queue.Queue()
|
||||||
|
|
||||||
|
|
||||||
|
cls.iss_modem = TestModem(cls.iss_event_queue, cls.iss_state_queue)
|
||||||
|
cls.iss_frame_dispatcher = DISPATCHER(cls.config,
|
||||||
|
cls.iss_event_manager,
|
||||||
|
cls.iss_state_manager,
|
||||||
|
cls.iss_modem)
|
||||||
|
|
||||||
|
#cls.iss_socket_interface_handler = SocketInterfaceHandler(cls.iss_modem, cls.iss_config_manager, cls.iss_state_manager, cls.iss_event_manager)
|
||||||
|
#cls.iss_socket_command_handler = CommandSocket(TestSocket(), '127.0.0.1', 51234)
|
||||||
|
|
||||||
|
# IRS
|
||||||
|
cls.irs_state_manager = StateManager(queue.Queue())
|
||||||
|
cls.irs_event_manager = EventManager([queue.Queue()])
|
||||||
|
cls.irs_event_queue = queue.Queue()
|
||||||
|
cls.irs_state_queue = queue.Queue()
|
||||||
|
cls.irs_p2p_data_queue = queue.Queue()
|
||||||
|
cls.irs_modem = TestModem(cls.irs_event_queue, cls.irs_state_queue)
|
||||||
|
cls.irs_frame_dispatcher = DISPATCHER(cls.config,
|
||||||
|
cls.irs_event_manager,
|
||||||
|
cls.irs_state_manager,
|
||||||
|
cls.irs_modem)
|
||||||
|
|
||||||
|
# Frame loss probability in %
|
||||||
|
cls.loss_probability = 30
|
||||||
|
|
||||||
|
cls.channels_running = True
|
||||||
|
|
||||||
|
cls.disconnect_received = False
|
||||||
|
|
||||||
|
def channelWorker(self, modem_transmit_queue: queue.Queue, frame_dispatcher: DISPATCHER):
|
||||||
|
while self.channels_running:
|
||||||
|
# Transfer data between both parties
|
||||||
|
try:
|
||||||
|
transmission = modem_transmit_queue.get(timeout=1)
|
||||||
|
if random.randint(0, 100) < self.loss_probability:
|
||||||
|
self.logger.info(f"[{threading.current_thread().name}] Frame lost...")
|
||||||
|
continue
|
||||||
|
|
||||||
|
frame_bytes = transmission['bytes']
|
||||||
|
frame_dispatcher.new_process_data(frame_bytes, None, len(frame_bytes), 0, 0)
|
||||||
|
except queue.Empty:
|
||||||
|
continue
|
||||||
|
self.logger.info(f"[{threading.current_thread().name}] Channel closed.")
|
||||||
|
|
||||||
|
def waitForSession(self, q, outbound=False):
|
||||||
|
while True and self.channels_running:
|
||||||
|
ev = q.get()
|
||||||
|
print(ev)
|
||||||
|
if 'P2P_CONNECTION_DISCONNECT_ACK' in ev or self.disconnect_received:
|
||||||
|
self.logger.info(f"[{threading.current_thread().name}] session ended.")
|
||||||
|
break
|
||||||
|
|
||||||
|
def establishChannels(self):
|
||||||
|
self.channels_running = True
|
||||||
|
self.iss_to_irs_channel = threading.Thread(target=self.channelWorker,
|
||||||
|
args=[self.iss_modem.data_queue_received,
|
||||||
|
self.irs_frame_dispatcher],
|
||||||
|
name="ISS to IRS channel")
|
||||||
|
self.iss_to_irs_channel.start()
|
||||||
|
|
||||||
|
self.irs_to_iss_channel = threading.Thread(target=self.channelWorker,
|
||||||
|
args=[self.irs_modem.data_queue_received,
|
||||||
|
self.iss_frame_dispatcher],
|
||||||
|
name="IRS to ISS channel")
|
||||||
|
self.irs_to_iss_channel.start()
|
||||||
|
|
||||||
|
def waitAndCloseChannels(self):
|
||||||
|
self.waitForSession(self.iss_event_queue, True)
|
||||||
|
self.channels_running = False
|
||||||
|
self.waitForSession(self.irs_event_queue, False)
|
||||||
|
self.channels_running = False
|
||||||
|
|
||||||
|
def generate_random_string(self, min_length, max_length):
|
||||||
|
import string
|
||||||
|
length = random.randint(min_length, max_length)
|
||||||
|
return ''.join(random.choices(string.ascii_letters, k=length))#
|
||||||
|
|
||||||
|
def DisabledtestARQSessionSmallPayload(self):
|
||||||
|
# set Packet Error Rate (PER) / frame loss probability
|
||||||
|
self.loss_probability = 0
|
||||||
|
|
||||||
|
self.establishChannels()
|
||||||
|
|
||||||
|
handler = SocketCommandHandler(TestSocket(self), self.iss_modem, self.iss_config_manager, self.iss_state_manager, self.iss_event_manager)
|
||||||
|
handler.handle_connect(["AA1AAA-1", "BB2BBB-2"])
|
||||||
|
|
||||||
|
self.connected_event = threading.Event()
|
||||||
|
self.connected_event.wait()
|
||||||
|
|
||||||
|
for session_id in self.iss_state_manager.p2p_connection_sessions:
|
||||||
|
session = self.iss_state_manager.get_p2p_connection_session(session_id)
|
||||||
|
session.ENTIRE_CONNECTION_TIMEOUT = 15
|
||||||
|
# Generate and add 5 random entries to the queue
|
||||||
|
for _ in range(3):
|
||||||
|
min_length = (30 * _ ) + 1
|
||||||
|
max_length = (30 * _ ) + 1
|
||||||
|
print(min_length)
|
||||||
|
print(max_length)
|
||||||
|
random_entry = self.generate_random_string(min_length, max_length)
|
||||||
|
session.p2p_data_tx_queue.put(random_entry)
|
||||||
|
|
||||||
|
session.p2p_data_tx_queue.put('12345')
|
||||||
|
self.waitAndCloseChannels()
|
||||||
|
|
||||||
|
|
||||||
|
class TestSocket:
|
||||||
|
def __init__(self, test_class):
|
||||||
|
self.sent_data = [] # To capture data sent through this socket
|
||||||
|
self.test_class = test_class
|
||||||
|
def sendall(self, data):
|
||||||
|
print(f"Mock sendall called with data: {data}")
|
||||||
|
self.sent_data.append(data)
|
||||||
|
self.event_handler(data)
|
||||||
|
|
||||||
|
def event_handler(self, data):
|
||||||
|
if b'CONNECTED AA1AAA-1 BB2BBB-2 0\r\n' in self.sent_data:
|
||||||
|
self.test_class.connected_event.set()
|
||||||
|
|
||||||
|
if b'DISCONNECTED\r\n' in self.sent_data:
|
||||||
|
self.disconnect_received = True
|
||||||
|
self.test_class.assertEqual(b'DISCONNECTED\r\n', b'DISCONNECTED\r\n')
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
Loading…
Reference in a new issue