first tci rx audio stream

This commit is contained in:
DJ2LS 2023-02-21 11:57:14 +01:00
parent 4c676baab4
commit 755e5efec1
3 changed files with 98 additions and 206 deletions

View file

@ -26,7 +26,7 @@ import static
import structlog import structlog
import ujson as json import ujson as json
import tci import tci
from queues import DATA_QUEUE_RECEIVED, MODEM_RECEIVED_QUEUE, MODEM_TRANSMIT_QUEUE, RIGCTLD_COMMAND_QUEUE from queues import DATA_QUEUE_RECEIVED, MODEM_RECEIVED_QUEUE, MODEM_TRANSMIT_QUEUE, RIGCTLD_COMMAND_QUEUE, AUDIO_RECEIVED_QUEUE, AUDIO_TRANSMIT_QUEUE
TESTMODE = False TESTMODE = False
RXCHANNEL = "" RXCHANNEL = ""
@ -85,6 +85,10 @@ class RF:
self.modem_transmit_queue = MODEM_TRANSMIT_QUEUE self.modem_transmit_queue = MODEM_TRANSMIT_QUEUE
self.modem_received_queue = MODEM_RECEIVED_QUEUE self.modem_received_queue = MODEM_RECEIVED_QUEUE
self.audio_received_queue = AUDIO_RECEIVED_QUEUE
self.audio_transmit_queue = AUDIO_TRANSMIT_QUEUE
# Init FIFO queue to store modulation out in # Init FIFO queue to store modulation out in
self.modoutqueue = deque() self.modoutqueue = deque()
@ -190,14 +194,15 @@ class RF:
# lets init TCI module # lets init TCI module
self.tci_module = tci.TCI() self.tci_module = tci.TCI()
# lets open TCI radio # lets open TCI radio
self.tci_module.open_rig(static.TCI_IP, static.TCI_PORT) #self.tci_module.open_rig(static.TCI_IP, static.TCI_PORT)
# lets init TCI audio # lets init TCI audio
self.tci_module.init_audio() #self.tci_module.init_audio()
# let's start the audio rx callback # let's start the audio rx callback
self.log.debug("[MDM] Starting tci rx callback thread") #self.log.debug("[MDM] Starting tci rx callback thread")
tci_rx_callback_thread = threading.Thread( tci_rx_callback_thread = threading.Thread(
target=self.tci_rx_callback, target=self.tci_rx_callback,
name="TCI RX CALLBACK THREAD", name="TCI RX CALLBACK THREAD",
@ -337,7 +342,6 @@ class RF:
data_out48k = self.modoutqueue.popleft() data_out48k = self.modoutqueue.popleft()
self.tci_module.push_audio(data_out48k) self.tci_module.push_audio(data_out48k)
def tci_rx_callback(self) -> None: def tci_rx_callback(self) -> None:
""" """
Callback for TCI RX Callback for TCI RX
@ -350,10 +354,24 @@ class RF:
threading.Event().wait(0.01) threading.Event().wait(0.01)
#print(self.tci_module.get_audio()) #print(self.tci_module.get_audio())
data_in48k = self.tci_module.get_audio() #data_in48k = self.tci_module.get_audio()
print(data_in48k)
x = np.frombuffer(data_in48k, dtype=np.int16) #x = np.frombuffer(self.audio_received_queue.get(), dtype=np.int16)
# x = self.resampler.resample48_to_8(x) """
if not self.audio_received_queue.empty():
x = self.audio_received_queue.get()
x = np.frombuffer(x, dtype=np.int16)
print(x)
print(len(x))
else:
#x = bytes([0]) * 9600
x = np.random.uniform(-1, 1, 2400)
x = np.frombuffer(x, dtype=np.int16)
print("dummy data")
"""
x = self.audio_received_queue.get()
x = np.frombuffer(x, dtype=np.int16)
#x = self.resampler.resample48_to_8(x)
self.fft_data = x self.fft_data = x

View file

@ -11,6 +11,10 @@ DATA_QUEUE_RECEIVED = queue.Queue()
MODEM_RECEIVED_QUEUE = queue.Queue() MODEM_RECEIVED_QUEUE = queue.Queue()
MODEM_TRANSMIT_QUEUE = queue.Queue() MODEM_TRANSMIT_QUEUE = queue.Queue()
# Initialize FIFO queue to store audio frames
AUDIO_RECEIVED_QUEUE = queue.Queue()
AUDIO_TRANSMIT_QUEUE = queue.Queue()
# Initialize FIFO queue to finally store received data # Initialize FIFO queue to finally store received data
RX_BUFFER = queue.Queue(maxsize=static.RX_BUFFER_SIZE) RX_BUFFER = queue.Queue(maxsize=static.RX_BUFFER_SIZE)

View file

@ -9,218 +9,88 @@ import structlog
import threading import threading
import static import static
import numpy as np import numpy as np
import websocket
import _thread
import time
import rel
from queues import AUDIO_TRANSMIT_QUEUE, AUDIO_RECEIVED_QUEUE
class TCI: class TCI:
"""TCI (hamlib) communication class""" def __init__(self, hostname='127.0.0.1', port=50001):
# websocket.enableTrace(True)
self.log = structlog.get_logger("TCI")
log = structlog.get_logger("radio (TCI)") self.audio_received_queue = AUDIO_RECEIVED_QUEUE
self.audio_transmit_queue = AUDIO_TRANSMIT_QUEUE
def __init__(self, hostname="localhost", port=9000, poll_rate=5, timeout=5): self.hostname = '127.0.0.1'
"""Open a connection to TCI, and test it for validity""" self.port = 50001
self.ptt_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.data_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.ptt_connected = False self.ws = ''
self.data_connected = False
self.hostname = hostname
self.port = port
self.connection_attempts = 5
# class wide variable for some parameters tci_thread = threading.Thread(
self.bandwidth = '' target=self.connect,
self.frequency = '' name="TCI THREAD",
self.mode = '' daemon=True,
self.alc = ''
self.strength = ''
self.rf = ''
def open_rig(
self,
tci_ip,
tci_port
):
"""
Args:
tci_ip:
tci_port:
Returns:
"""
self.hostname = tci_ip
self.port = int(tci_port)
# _ptt_connect = self.ptt_connect()
# _data_connect = self.data_connect()
ptt_thread = threading.Thread(target=self.ptt_connect, args=[], daemon=True)
ptt_thread.start()
data_thread = threading.Thread(target=self.data_connect, args=[], daemon=True)
data_thread.start()
# wait some time
threading.Event().wait(0.5)
if self.ptt_connected and self.data_connected:
self.log.debug("Rigctl DATA/PTT initialized")
return True
self.log.error(
"[TCI] Can't connect!", ip=self.hostname, port=self.port
) )
return False tci_thread.start()
def ptt_connect(self):
"""Connect to TCI instance"""
while True:
if not self.ptt_connected:
try:
self.ptt_connection = socket.create_connection((self.hostname, self.port))
self.ptt_connected = True
self.log.info(
"[TCI] Connected PTT instance to TCI!", ip=self.hostname, port=self.port
)
except Exception as err:
# ConnectionRefusedError: [Errno 111] Connection refused
self.close_rig()
self.log.warning(
"[TCI] PTT Reconnect...",
ip=self.hostname,
port=self.port,
e=err,
)
threading.Event().wait(0.5)
def data_connect(self):
"""Connect to TCI instance"""
while True:
if not self.data_connected:
try:
self.data_connection = socket.create_connection((self.hostname, self.port))
self.data_connected = True
self.log.info(
"[TCI] Connected DATA instance to TCI!", ip=self.hostname, port=self.port
)
except Exception as err:
# ConnectionRefusedError: [Errno 111] Connection refused
self.close_rig()
self.log.warning(
"[TCI] DATA Reconnect...",
ip=self.hostname,
port=self.port,
e=err,
)
threading.Event().wait(0.5)
def close_rig(self):
""" """
self.ptt_sock.close()
self.data_sock.close()
self.ptt_connected = False
self.data_connected = False
def send_ptt_command(self, command, expect_answer) -> bytes:
"""Send a command to the connected rotctld instance,
and return the return value.
Args:
command:
"""
if self.ptt_connected:
try:
self.ptt_connection.sendall(command)
except Exception:
self.log.warning(
"[TCI] Command not executed!",
command=command,
ip=self.hostname,
port=self.port,
)
self.ptt_connected = False
return b""
def send_data_command(self, command, expect_answer) -> bytes:
"""Send a command to the connected tci instance,
and return the return value.
Args:
command:
"""
if self.data_connected:
self.data_connection.setblocking(False)
self.data_connection.settimeout(0.05)
try:
self.data_connection.sendall(command)
except Exception:
self.log.warning(
"[TCI] Command not executed!",
command=command,
ip=self.hostname,
port=self.port,
)
self.data_connected = False
try: def connect(self):
# recv seems to be blocking so in case of ptt we don't need the response print("starting..............")
# maybe this speeds things up and avoids blocking states self.ws = websocket.WebSocketApp("ws://127.0.0.1:50001",
recv = True on_open=self.on_open,
data = b'' on_message=self.on_message,
on_error=self.on_error,
on_close=self.on_close
)
while recv: self.ws.run_forever(reconnect=5) # Set dispatcher to automatic reconnection, 5 second reconnect delay if con>
try: #rel.signal(2, rel.abort) # Keyboard Interrupt
#rel.dispatch()
data = self.data_connection.recv(64) def on_message(self, ws, message):
if message == "ready;":
self.ws.send('audio_samplerate:8000;')
self.ws.send('audio_stream_channels:1;')
self.ws.send('AUDIO_STREAM_SAMPLE_TYPE:int16;')
self.ws.send('AUDIO_STREAM_SAMPLES:1200;')
self.ws.send('audio_start:0;')
except socket.timeout: if len(message) == 576 or len(message) == 2464 or len(message) == 4160:
recv = False # audio received
receiver = message[:4]
sample_rate = int.from_bytes(message[4:8], "little")
format = int.from_bytes(message[8:12], "little")
codec = message[12:16]
crc = message[16:20]
audio_length = int.from_bytes(message[20:24], "little")
type = int.from_bytes(message[24:28], "little")
channel = int.from_bytes(message[28:32], "little")
reserved = int.from_bytes(message[32:36], "little")
audio_data = message[36+28:]
print(len(audio_data))
self.audio_received_queue.put(audio_data)
return data def on_error(self, error):
self.log.error(
"[TCI] Error FreeDATA to TCI rig!", ip=self.hostname, port=self.port, e=error
)
# return self.data_connection.recv(64) if expect_answer else True def on_close(self, ws, close_status_code, close_msg):
except Exception: self.log.warning(
self.log.warning( "[TCI] Closed FreeDATA to TCI connection!", ip=self.hostname, port=self.port, statu=close_status_code, msg=close_msg
"[TCI] No command response!", )
command=command,
ip=self.hostname,
port=self.port,
)
self.data_connected = False
return b""
def init_audio(self): def on_open(self, ws):
try: self.log.info(
self.send_data_command(b"IQ_SAMPLERATE:48000;", False) "[TCI] Connected FreeDATA to TCI rig!", ip=self.hostname, port=self.port
self.send_data_command(b"audio_samplerate:8;", False) )
self.send_data_command(b"audio_start: 0;", False)
return True
except Exception:
return False
def get_audio(self):
""""""
# generate random audio data
if not self.data_connected:
return np.random.uniform(-1, 1, 48000)
try:
return self.data_connection.recv(4800)
except Exception:
return False
def push_audio(self): self.log.info(
""" """ "[TCI] Init...", ip=self.hostname, port=self.port
try: )
return self.send_data_command(b"PUSH AUDIO COMMAND ", True)
except Exception:
return False