preparing for small test with dummy data

This commit is contained in:
DJ2LS 2023-03-29 08:22:12 +02:00
parent 790068d2d1
commit d0f6038dbc
2 changed files with 51 additions and 4 deletions

View file

@ -334,8 +334,6 @@ class RF:
while True:
threading.Event().wait(0.01)
# -----write
if len(self.modoutqueue) > 0 and not self.mod_out_locked:
data_out = self.modoutqueue.popleft()
self.tci_module.push_audio(data_out)

View file

@ -4,6 +4,7 @@
import structlog
import threading
import websocket
import numpy as np
from queues import AUDIO_TRANSMIT_QUEUE, AUDIO_RECEIVED_QUEUE
"""
@ -35,6 +36,11 @@ class TCI:
# flag if we're receiving a tx_chrono
self.tx_chrono = False
# audio related parameters, will be updated by tx chrono
self.sample_rate = None
self.format = None
self.codec = None
self.audio_length = None
def connect(self):
@ -66,10 +72,30 @@ class TCI:
# tx chrono frame
if len(message) in {64}:
receiver = message[:4]
sample_rate = int.from_bytes(message[4:8], "little")
format = int.from_bytes(message[8:12], "little")
codec = int.from_bytes(message[12:16], "little")
crc = int.from_bytes(message[16:20], "little")
audio_length = int.from_bytes(message[20:24], "little")
type = int.from_bytes(message[24:28], "little")
channel = int.from_bytes(message[28:32], "little")
reserved1 = int.from_bytes(message[32:36], "little")
reserved2 = int.from_bytes(message[36:40], "little")
reserved3 = int.from_bytes(message[40:44], "little")
reserved4 = int.from_bytes(message[44:48], "little")
reserved5 = int.from_bytes(message[48:52], "little")
reserved6 = int.from_bytes(message[52:56], "little")
reserved7 = int.from_bytes(message[56:60], "little")
reserved8 = int.from_bytes(message[60:64], "little")
if type == 3:
self.tx_chrono = True
self.sample_rate = sample_rate
self.format = format
self.codec = codec
self.audio_length = audio_length
# audio frame
if len(message) in {576, 2464, 4160}:
# audio received
@ -114,8 +140,8 @@ class TCI:
)
def push_audio(self, data_out):
print(data_out)
audio = bytearray(4096 + 64)
"""
# audio[:4] = receiver.to_bytes(4,byteorder='little', signed=False)
audio[4:8] = sample_rate.to_bytes(4, byteorder='little', signed=False)
@ -134,7 +160,30 @@ class TCI:
audio[56:60] = reserved7.to_bytes(4, byteorder='little', signed=False)
audio[60:64] = reserved8.to_bytes(4, byteorder='little', signed=False)
"""
self.ws.send(audio, websocket.ABNF.OPCODE_BINARY)
print(self.audio_length)
print(self.tx_chrono)
print(self.format)
print(self.codec)
if self.tx_chrono:
# dummy for now ...
audio = bytearray(4096 + 64)
# generate sine wave
rate = 8000 # samples per second
T = 3 # sample duration (seconds)
# n = int(rate*T) # number of samples
n = 1200
t = np.arange(n) / rate # grid of time values
f = 440.0 # sound frequency (Hz)
x = np.sin(2 * np.pi * f * t)
# print(len(x))
audio[64:] = bytes(x)
audio[24:28] = int(2).to_bytes(4, byteorder='little', signed=True)
self.ws.send(audio, websocket.ABNF.OPCODE_BINARY)
def set_ptt(self, state):
if state: