Merge pull request #89 from DJ2LS/dr-test

Test framework
This commit is contained in:
DJ2LS 2021-12-20 16:54:50 +01:00 committed by GitHub
commit e86d327a88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 2220 additions and 34 deletions

36
.github/workflows/ctest.yml vendored Normal file
View file

@ -0,0 +1,36 @@
name: CTest
on: [push]
jobs:
build:
# The CMake configure and build commands are platform agnostic and should work equally
# well on Windows or Mac. You can convert this to a matrix build if you need
# cross-platform coverage.
# See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install packages
shell: bash
run: |
sudo apt-get update
sudo apt-get install octave octave-common octave-signal sox python3 python3-pip portaudio19-dev python3-pyaudio
pip3 install psutil crcengine ujson pyserial numpy structlog miniaudio
- name: Build codec2
shell: bash
run: |
git clone https://github.com/drowe67/codec2.git
cd codec2 && git checkout dr-tnc && git pull
mkdir -p build_linux && cd build_linux && cmake .. && make
- name: run ctests
shell: bash
working-directory: ${{github.workspace}}
run: |
mkdir build && cd build
cmake -DCODEC2_BUILD_DIR=$GITHUB_WORKSPACE/codec2/build_linux ..
ctest --output-on-failure

6
.gitignore vendored Normal file
View file

@ -0,0 +1,6 @@
# possible installation of codec2 within tnc
tnc/codec2
# temporary test artifacts
**/build
**/Testing

103
CMakeLists.txt Normal file
View file

@ -0,0 +1,103 @@
cmake_minimum_required(VERSION 3.0)
project (FreeDATA)
include(CTest)
enable_testing()
# Find codec2
if(CODEC2_BUILD_DIR)
find_package(codec2 REQUIRED
PATHS ${CODEC2_BUILD_DIR}
NO_DEFAULT_PATH
CONFIGS codec2.cmake
)
if(codec2_FOUND)
message(STATUS "Codec2 library found in build tree.")
endif()
else()
find_package(codec2 REQUIRED)
endif()
# test variables
set(FRAMESPERBURST 3)
set(BURSTS 1)
set(TESTFRAMES 3)
add_test(NAME 000_resampler
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/000_resampler;
python3 t48_8_short.py")
set_tests_properties(000_resampler PROPERTIES PASS_REGULAR_EXPRESSION "PASS")
add_test(NAME 001_highsnr_stdio_P_C_SM
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
python3 test_tx.py --mode datac0 --delay 500 --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS} |
sox -t .s16 -r 48000 -c 1 - -t .s16 -r 8000 -c 1 - |
freedv_data_raw_rx datac0 - - --framesperburst ${FRAMESPERBURST} | hexdump -C")
set_tests_properties(001_highsnr_stdio_P_C_SM PROPERTIES PASS_REGULAR_EXPRESSION "HELLO WORLD")
add_test(NAME 001_highsnr_stdio_C_P_SM
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
freedv_data_raw_tx datac0 --testframes ${TESTFRAMES} --bursts ${BURSTS} --framesperburst ${FRAMESPERBURST} /dev/zero - |
sox -t .s16 -r 8000 -c 1 - -t .s16 -r 48000 -c 1 - |
python3 test_rx.py --mode datac0 --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS}")
set_tests_properties(001_highsnr_stdio_C_P_SM PROPERTIES PASS_REGULAR_EXPRESSION "RECEIVED BURSTS: ${BURSTS} RECEIVED FRAMES: ${FRAMESPERBURST}")
add_test(NAME 001_highsnr_stdio_P_P_SM
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
python3 test_tx.py --mode datac0 --delay 500 --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS} |
python3 test_rx.py --mode datac0 --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS}")
set_tests_properties(001_highsnr_stdio_P_P_SM PROPERTIES PASS_REGULAR_EXPRESSION "RECEIVED BURSTS: ${BURSTS} RECEIVED FRAMES: ${FRAMESPERBURST}")
add_test(NAME 001_highsnr_stdio_P_P_MM
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
python3 test_multimode_tx.py --delay 500 --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS} |
python3 test_multimode_rx.py --framesperburst ${FRAMESPERBURST} --bursts ${BURSTS} --timeout 20")
set_tests_properties(001_highsnr_stdio_P_P_MM PROPERTIES PASS_REGULAR_EXPRESSION "DATAC0: ${BURSTS}/${FRAMESPERBURST} DATAC1: ${BURSTS}/${FRAMESPERBURST} DATAC3: ${BURSTS}/${FRAMESPERBURST}")
# These tests can't run on GitHub actions
if(NOT DEFINED ENV{GITHUB_RUN_ID})
# uses aplay/arecord then pipe to Python
add_test(NAME 001_highsnr_virtual1_P_P
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
./test_virtual1.sh")
set_tests_properties(001_highsnr_virtual1_P_P PROPERTIES PASS_REGULAR_EXPRESSION "RECEIVED BURSTS: 5 RECEIVED FRAMES: 10 RX_ERRORS: 0")
# let Python do audio I/O
add_test(NAME 001_highsnr_virtual2_P_P
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
./test_virtual2.sh")
set_tests_properties(001_highsnr_virtual2_P_P PROPERTIES PASS_REGULAR_EXPRESSION "RECEIVED BURSTS: 3 RECEIVED FRAMES: 6 RX_ERRORS: 0")
# Multimode test with Python I/O
add_test(NAME 001_highsnr_virtual3_P_P_MM
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
./test_virtual_mm.sh")
set_tests_properties(001_highsnr_virtual3_P_P_MM PROPERTIES PASS_REGULAR_EXPRESSION "DATAC0: 2/4 DATAC1: 2/4 DATAC3: 2/4")
# let Python do audio I/O via pyaudio callback mode
add_test(NAME 001_highsnr_virtual4_P_P_SM_CB
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
PATH=$PATH:${CODEC2_BUILD_DIR}/src;
cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
./test_virtual3.sh")
set_tests_properties(001_highsnr_virtual4_P_P_SM_CB PROPERTIES PASS_REGULAR_EXPRESSION "RECEIVED BURSTS: 3 RECEIVED FRAMES: 6 RX_ERRORS: 0")
endif()

View file

@ -0,0 +1,100 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Unit test for FreeDV API resampler functions, from
# codec2/unittest/t48_8_short.c - generate a sine wave at 8 KHz,
# upsample to 48 kHz, add an interferer, then downsample back to 8 kHz
#
# You can listen to the output files with:
#
# aplay -f S16_LE in8.raw
# aplay -r 48000 -f S16_LE out48.raw
# aplay -f S16_LE out8.raw
#
# They should sound like clean sine waves
import ctypes
from ctypes import *
import pathlib
import argparse
import sys
sys.path.insert(0,'../..')
from tnc import codec2
import numpy as np
# dig some constants out
FDMDV_OS_48 = codec2.api.FDMDV_OS_48
FDMDV_OS_TAPS_48K = codec2.api.FDMDV_OS_TAPS_48K
FDMDV_OS_TAPS_48_8K = codec2.api.FDMDV_OS_TAPS_48_8K
N8 = int(180) # processing buffer size at 8 kHz
N48 = int(N8*FDMDV_OS_48) # processing buffer size at 48 kHz
MEM8 = FDMDV_OS_TAPS_48_8K # 8kHz signal filter memory
MEM48 = FDMDV_OS_TAPS_48K # 48kHz signal filter memory
FRAMES = int(50) # number of frames to test
FS8 = 8000
FS48 = 48000
AMP = 16000 # sine wave amplitude
FTEST8 = 800 # input test frequency at FS=8kHz
FINTER48 = 10000 # interferer frequency at FS=48kHz
# Due to the design of these resamplers, the processing buffer (at 8kHz)
# must be an integer multiple of oversampling ratio
assert N8 % FDMDV_OS_48 == 0
# time indexes, we advance every frame
t = 0
t1 = 0
# output files to listen to/evaluate result
fin8 = open("in8.raw", mode='wb')
f48 = open("out48.raw", mode='wb')
fout8 = open("out8.raw", mode='wb')
resampler = codec2.resampler()
for f in range(FRAMES):
sine_in8k = (AMP*np.cos(2*np.pi*np.arange(t,t+N8)*FTEST8/FS8)).astype(np.int16)
t += N8
sine_in8k.tofile(fin8)
sine_out48k = resampler.resample8_to_48(sine_in8k)
sine_out48k.tofile(f48)
# add interfering sine wave (down sampling filter should remove)
sine_in48k = (sine_out48k + (AMP/2)*np.cos(2*np.pi*np.arange(t1,t1+N48)*FINTER48/FS48)).astype(np.int16)
t1 += N48
sine_out8k = resampler.resample48_to_8(sine_in48k)
sine_out8k.tofile(fout8)
fin8.close()
f48.close()
fout8.close()
# Automated test evaluation --------------------------------------------
# The input and output signals will not be time aligned due to the filter
# delays, so compare the magnitude spectrum
in8k = np.fromfile("in8.raw", dtype=np.int16)
out8k = np.fromfile("out8.raw", dtype=np.int16)
assert len(in8k) == len(out8k)
n = len(in8k)
h = np.hanning(len(in8k))
S1 = np.abs(np.fft.fft(in8k * h))
S2 = np.abs(np.fft.fft(out8k * h))
error = S1-S2
error_energy = np.dot(error,error)
ratio = error_energy/np.dot(S1,S1)
ratio_dB = 10*np.log10(ratio);
print("ratio_dB: %4.2f" % (ratio_dB));
threshdB = -40
if ratio_dB < threshdB:
print("PASS")
else:
print("FAIL")

View file

@ -0,0 +1,70 @@
# 001_HIGHSNR_STDIO_AUDIO TEST SUITE
1. Install
```
sudo apt update
sudo apt upgrade
sudo apt install git cmake build-essential python3-pip portaudio19-dev python3-pyaudio
pip3 install crcengine
pip3 install threading
```
1. Install codec2, and set up the `libcodec2.so` shared library path, for example
```
export LD_LIBRARY_PATH=${HOME}/codec2/build_linux/src
```
## STDIO tests
Pipes are used to move audio samples from the Tx to Rx:
```
python3 test_tx.py --mode datac1 --delay 500 --frames 2 --bursts 1 | python3 test_rx.py --mode datac1 --frames 2 --bursts 1
```
## AUDIO test via virtual audio devices
1. Create virtual audio devices. Note: This command needs to be run again after every reboot
```
sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2
```
1. Check if devices have been created
```
aplay -l
Karte 0: Intel [HDA Intel], Gerät 0: Generic Analog [Generic Analog]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
```
1. Determine the audio device number you would like to use:
```
python3 test_rx.py --list
<snip>
audiodev: 0 HDA Intel PCH: ALC269VC Analog (hw:0,0)
audiodev: 1 HDA Intel PCH: HDMI 0 (hw:0,3)
audiodev: 2 HDA Intel PCH: HDMI 1 (hw:0,7)
audiodev: 3 HDA Intel PCH: HDMI 2 (hw:0,8)
audiodev: 4 Loopback: PCM (hw:1,0)
audiodev: 5 Loopback: PCM (hw:1,1)
audiodev: 6 Loopback: PCM (hw:2,0)
audiodev: 7 Loopback: PCM (hw:2,1)
```
In this case we choose audiodev 4 for the RX and 5 for the Tx.
1. Start the Rx first, then Tx in separate consoles:
```
python3 test_rx.py --mode datac0 --frames 2 --bursts 1 --audiodev 4 --debug
python3 test_tx.py --mode datac0 --frames 2 --bursts 1 --audiodev 5

View file

@ -0,0 +1,178 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import ctypes
from ctypes import *
import pathlib
import pyaudio
import sys
import logging
import time
import threading
import sys
import argparse
import numpy as np
sys.path.insert(0,'..')
import codec2
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='FreeDATA audio test')
parser.add_argument('--bursts', dest="N_BURSTS", default=1, type=int)
parser.add_argument('--framesperburst', dest="N_FRAMES_PER_BURST", default=1, type=int)
parser.add_argument('--mode', dest="FREEDV_MODE", type=str, choices=['datac0', 'datac1', 'datac3'])
parser.add_argument('--audiodev', dest="AUDIO_INPUT_DEVICE", default=-1, type=int,
help="audio device number to use, use -2 to automatically select a loopback device")
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
parser.add_argument('--timeout', dest="TIMEOUT", default=10, type=int, help="Timeout (seconds) before test ends")
parser.add_argument('--list', dest="LIST", action="store_true", help="list audio devices by number and exit")
args = parser.parse_args()
if args.LIST:
p = pyaudio.PyAudio()
for dev in range(0,p.get_device_count()):
print("audiodev: ", dev, p.get_device_info_by_index(dev)["name"])
quit()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT_DEVICE
MODE = codec2.FREEDV_MODE[args.FREEDV_MODE].value
DEBUGGING_MODE = args.DEBUGGING_MODE
TIMEOUT = args.TIMEOUT
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2400*2 # <- consider increasing if you get nread_exceptions > 0
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_RX = 48000
# make sure our resampler will work
assert (AUDIO_SAMPLE_RATE_RX / MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48
# ------------------------------------------------ PYAUDIO CALLBACK
def callback(data_in48k, frame_count, time_info, status):
x = np.frombuffer(data_in48k, dtype=np.int16)
x.tofile(frx)
x = resampler.resample48_to_8(x)
audio_buffer.push(x)
return (None, pyaudio.paContinue)
# check if we want to use an audio device then do an pyaudio init
if AUDIO_INPUT_DEVICE != -1:
p = pyaudio.PyAudio()
# auto search for loopback devices
if AUDIO_INPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,p.get_device_count()):
if 'Loopback: PCM' in p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_INPUT_DEVICE = loopback_list[0] #0 = RX 1 = TX
print(f"loopback_list rx: {loopback_list}", file=sys.stderr)
else:
quit()
print(f"AUDIO INPUT DEVICE: {AUDIO_INPUT_DEVICE} DEVICE: {p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['name']} \
AUDIO SAMPLE RATE: {AUDIO_SAMPLE_RATE_RX}", file=sys.stderr)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
output=False,
input_device_index=AUDIO_INPUT_DEVICE,
stream_callback=callback
)
try:
print(f"starting pyaudio callback", file=sys.stderr)
stream_rx.start_stream()
except Exception as e:
print(f"pyAudio error: {e}", file=sys.stderr)
# ----------------------------------------------------------------
# DATA CHANNEL INITIALISATION
# open codec2 instance
freedv = cast(codec2.api.freedv_open(MODE), c_void_p)
# get number of bytes per frame for mode
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv)/8)
payload_bytes_per_frame = bytes_per_frame -2
n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(freedv)
bytes_out = create_string_buffer(bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(freedv,N_FRAMES_PER_BURST)
total_n_bytes = 0
rx_total_frames = 0
rx_frames = 0
rx_bursts = 0
rx_errors = 0
nread_exceptions = 0
timeout = time.time() + TIMEOUT
receive = True
audio_buffer = codec2.audio_buffer(AUDIO_FRAMES_PER_BUFFER*2)
resampler = codec2.resampler()
# Copy received 48 kHz to a file. Listen to this file with:
# aplay -r 48000 -f S16_LE rx48_callback.raw
# Corruption of this file is a good way to detect audio card issues
frx = open("rx48_callback.raw", mode='wb')
# initial number of samples we need
nin = codec2.api.freedv_nin(freedv)
while receive and time.time() < timeout:
# when we have enough samples call FreeDV Rx
while audio_buffer.nbuffer >= nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(freedv, bytes_out, audio_buffer.buffer.ctypes)
audio_buffer.pop(nin)
# call me on every loop!
nin = codec2.api.freedv_nin(freedv)
rx_status = codec2.api.freedv_get_rx_status(freedv)
if rx_status & codec2.api.FREEDV_RX_BIT_ERRORS:
rx_errors = rx_errors + 1
if DEBUGGING_MODE:
rx_status = codec2.api.rx_sync_flags_to_text[rx_status]
print("nin: %5d rx_status: %4s naudio_buffer: %4d" % \
(nin,rx_status,audio_buffer.nbuffer), file=sys.stderr)
if nbytes:
total_n_bytes = total_n_bytes + nbytes
if nbytes == bytes_per_frame:
rx_total_frames = rx_total_frames + 1
rx_frames = rx_frames + 1
if rx_frames == N_FRAMES_PER_BURST:
rx_frames = 0
rx_bursts = rx_bursts + 1
if rx_bursts == N_BURSTS:
receive = False
if time.time() >= timeout:
print("TIMEOUT REACHED")
if nread_exceptions:
print("nread_exceptions %d - receive audio lost! Consider increasing Pyaudio frames_per_buffer..." % \
nread_exceptions, file=sys.stderr)
print(f"RECEIVED BURSTS: {rx_bursts} RECEIVED FRAMES: {rx_total_frames} RX_ERRORS: {rx_errors}", file=sys.stderr)
frx.close()
# cloese pyaudio instance
stream_rx.close()
p.terminate()

View file

@ -0,0 +1,216 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pyaudio
import audioop
import time
import argparse
import sys
import ctypes
from ctypes import *
import pathlib
sys.path.insert(0,'../..')
from tnc import codec2
import numpy as np
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=1, type=int)
parser.add_argument('--framesperburst', dest="N_FRAMES_PER_BURST", default=1, type=int)
parser.add_argument('--audiodev', dest="AUDIO_INPUT_DEVICE", default=-1, type=int, help="audio device number to use")
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
parser.add_argument('--list', dest="LIST", action="store_true", help="list audio devices by number and exit")
parser.add_argument('--timeout', dest="TIMEOUT", default=10, type=int, help="Timeout (seconds) before test ends")
args = parser.parse_args()
if args.LIST:
p = pyaudio.PyAudio()
for dev in range(0,p.get_device_count()):
print("audiodev: ", dev, p.get_device_info_by_index(dev)["name"])
quit()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT_DEVICE
DEBUGGING_MODE = args.DEBUGGING_MODE
TIMEOUT = args.TIMEOUT
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2400*2
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_RX = 48000
# make sure our resampler will work
assert (AUDIO_SAMPLE_RATE_RX / MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48
# SET COUNTERS
rx_total_frames_datac0 = 0
rx_frames_datac0 = 0
rx_bursts_datac0 = 0
rx_total_frames_datac1 = 0
rx_frames_datac1 = 0
rx_bursts_datac1 = 0
rx_total_frames_datac3 = 0
rx_frames_datac3 = 0
rx_bursts_datac3 = 0
# open codec2 instance
datac0_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC0), c_void_p)
datac0_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac0_freedv)/8)
datac0_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac0_freedv)
datac0_bytes_out = create_string_buffer(datac0_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac0_freedv,N_FRAMES_PER_BURST)
datac0_buffer = codec2.audio_buffer(2*AUDIO_FRAMES_PER_BUFFER)
datac1_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC1), c_void_p)
datac1_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac1_freedv)/8)
datac1_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac1_freedv)
datac1_bytes_out = create_string_buffer(datac1_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac1_freedv,N_FRAMES_PER_BURST)
datac1_buffer = codec2.audio_buffer(2*AUDIO_FRAMES_PER_BUFFER)
datac3_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC3), c_void_p)
datac3_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac3_freedv)/8)
datac3_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac3_freedv)
datac3_bytes_out = create_string_buffer(datac3_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac3_freedv,N_FRAMES_PER_BURST)
datac3_buffer = codec2.audio_buffer(2*AUDIO_FRAMES_PER_BUFFER)
resampler = codec2.resampler()
# check if we want to use an audio device then do an pyaudio init
if AUDIO_INPUT_DEVICE != -1:
p = pyaudio.PyAudio()
# auto search for loopback devices
if AUDIO_INPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,p.get_device_count()):
if 'Loopback: PCM' in p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_INPUT_DEVICE = loopback_list[0] #0 = RX 1 = TX
print(f"loopback_list rx: {loopback_list}", file=sys.stderr)
else:
quit()
print(f"AUDIO INPUT DEVICE: {AUDIO_INPUT_DEVICE} DEVICE: {p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['name']} AUDIO SAMPLE RATE: {AUDIO_SAMPLE_RATE_RX}", file=sys.stderr)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE
)
timeout = time.time() + TIMEOUT
print(time.time(),TIMEOUT, timeout)
receive = True
nread_exceptions = 0
# initial nin values
datac0_nin = codec2.api.freedv_nin(datac0_freedv)
datac1_nin = codec2.api.freedv_nin(datac1_freedv)
datac3_nin = codec2.api.freedv_nin(datac3_freedv)
def print_stats():
if DEBUGGING_MODE:
datac0_rxstatus = codec2.api.freedv_get_rx_status(datac0_freedv)
datac0_rxstatus = codec2.api.rx_sync_flags_to_text[datac0_rxstatus]
datac1_rxstatus = codec2.api.freedv_get_rx_status(datac1_freedv)
datac1_rxstatus = codec2.api.rx_sync_flags_to_text[datac1_rxstatus]
datac3_rxstatus = codec2.api.freedv_get_rx_status(datac3_freedv)
datac3_rxstatus = codec2.api.rx_sync_flags_to_text[datac3_rxstatus]
print("NIN0: %5d RX_STATUS0: %4s NIN1: %5d RX_STATUS1: %4s NIN3: %5d RX_STATUS3: %4s" % \
(datac0_nin, datac0_rxstatus, datac1_nin, datac1_rxstatus, datac3_nin, datac3_rxstatus),
file=sys.stderr)
while receive and time.time() < timeout:
if AUDIO_INPUT_DEVICE != -1:
try:
data_in48k = stream_rx.read(AUDIO_FRAMES_PER_BUFFER, exception_on_overflow = True)
except OSError as err:
print(err, file=sys.stderr)
if str(err).find("Input overflowed") != -1:
nread_exceptions += 1
if str(err).find("Stream closed") != -1:
print("Ending....")
receive = False
else:
data_in48k = sys.stdin.buffer.read(AUDIO_FRAMES_PER_BUFFER*2)
# insert samples in buffer
x = np.frombuffer(data_in48k, dtype=np.int16)
if len(x) != AUDIO_FRAMES_PER_BUFFER:
print("len(x)",len(x))
receive = False
x = resampler.resample48_to_8(x)
datac0_buffer.push(x)
datac1_buffer.push(x)
datac3_buffer.push(x)
print_something = False
while datac0_buffer.nbuffer >= datac0_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac0_freedv, datac0_bytes_out, datac0_buffer.buffer.ctypes)
datac0_buffer.pop(datac0_nin)
datac0_nin = codec2.api.freedv_nin(datac0_freedv)
if nbytes == datac0_bytes_per_frame:
rx_total_frames_datac0 = rx_total_frames_datac0 + 1
rx_frames_datac0 = rx_frames_datac0 + 1
if rx_frames_datac0 == N_FRAMES_PER_BURST:
rx_frames_datac0 = 0
rx_bursts_datac0 = rx_bursts_datac0 + 1
print_stats()
while datac1_buffer.nbuffer >= datac1_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac1_freedv, datac1_bytes_out, datac1_buffer.buffer.ctypes)
datac1_buffer.pop(datac1_nin)
datac1_nin = codec2.api.freedv_nin(datac1_freedv)
if nbytes == datac1_bytes_per_frame:
rx_total_frames_datac1 = rx_total_frames_datac1 + 1
rx_frames_datac1 = rx_frames_datac1 + 1
if rx_frames_datac1 == N_FRAMES_PER_BURST:
rx_frames_datac1 = 0
rx_bursts_datac1 = rx_bursts_datac1 + 1
print_stats()
while datac3_buffer.nbuffer >= datac3_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac3_freedv, datac3_bytes_out, datac3_buffer.buffer.ctypes)
datac3_buffer.pop(datac3_nin)
datac3_nin = codec2.api.freedv_nin(datac3_freedv)
if nbytes == datac3_bytes_per_frame:
rx_total_frames_datac3 = rx_total_frames_datac3 + 1
rx_frames_datac3 = rx_frames_datac3 + 1
if rx_frames_datac3 == N_FRAMES_PER_BURST:
rx_frames_datac3 = 0
rx_bursts_datac3 = rx_bursts_datac3 + 1
print_stats()
if rx_bursts_datac0 == N_BURSTS and rx_bursts_datac1 == N_BURSTS and rx_bursts_datac3 == N_BURSTS:
receive = False
if nread_exceptions:
print("nread_exceptions %d - receive audio lost! Consider increasing Pyaudio frames_per_buffer..." % \
nread_exceptions, file=sys.stderr)
# INFO IF WE REACHED TIMEOUT
if time.time() > timeout:
print(f"TIMEOUT REACHED", file=sys.stderr)
print(f"DATAC0: {rx_bursts_datac0}/{rx_total_frames_datac0} DATAC1: {rx_bursts_datac1}/{rx_total_frames_datac1} DATAC3: {rx_bursts_datac3}/{rx_total_frames_datac3}", file=sys.stderr)
if AUDIO_INPUT_DEVICE != -1:
stream_rx.close()
p.terminate()

View file

@ -0,0 +1,151 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import pathlib
import pyaudio
import time
import threading
import audioop
import argparse
import sys
sys.path.insert(0,'../..')
from tnc import codec2
import numpy as np
# GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='FreeDATA TEST')
parser.add_argument('--bursts', dest="N_BURSTS", default=1, type=int)
parser.add_argument('--framesperburst', dest="N_FRAMES_PER_BURST", default=1, type=int)
parser.add_argument('--delay', dest="DELAY_BETWEEN_BURSTS", default=500, type=int)
parser.add_argument('--audiodev', dest="AUDIO_OUTPUT_DEVICE", default=-1, type=int, help="audio output device number to use")
parser.add_argument('--list', dest="LIST", action="store_true", help="list audio devices by number and exit")
args = parser.parse_args()
if args.LIST:
p = pyaudio.PyAudio()
for dev in range(0,p.get_device_count()):
print("audiodev: ", dev, p.get_device_info_by_index(dev)["name"])
quit()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DELAY_BETWEEN_BURSTS = args.DELAY_BETWEEN_BURSTS/1000
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT_DEVICE
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2400
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_TX = 48000
assert (AUDIO_SAMPLE_RATE_TX % MODEM_SAMPLE_RATE) == 0
if AUDIO_OUTPUT_DEVICE != -1:
p = pyaudio.PyAudio()
# auto search for loopback devices
if AUDIO_OUTPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,p.get_device_count()):
if 'Loopback: PCM' in p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_OUTPUT_DEVICE = loopback_list[1] #0 = RX 1 = TX
print(f"loopback_list tx: {loopback_list}", file=sys.stderr)
else:
quit()
# pyaudio init
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
resampler = codec2.resampler()
modes = [codec2.api.FREEDV_MODE_DATAC0, codec2.api.FREEDV_MODE_DATAC1, codec2.api.FREEDV_MODE_DATAC3]
for m in modes:
freedv = cast(codec2.api.freedv_open(m), c_void_p)
n_tx_modem_samples = codec2.api.freedv_get_n_tx_modem_samples(freedv)
mod_out = create_string_buffer(2*n_tx_modem_samples)
n_tx_preamble_modem_samples = codec2.api.freedv_get_n_tx_preamble_modem_samples(freedv)
mod_out_preamble = create_string_buffer(2*n_tx_preamble_modem_samples)
n_tx_postamble_modem_samples = codec2.api.freedv_get_n_tx_postamble_modem_samples(freedv)
mod_out_postamble = create_string_buffer(2*n_tx_postamble_modem_samples)
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv) / 8)
payload_per_frame = bytes_per_frame - 2
# data binary string
data_out = b'HELLO WORLD!'
buffer = bytearray(payload_per_frame)
# set buffersize to length of data which will be send
buffer[:len(data_out)] = data_out
crc = ctypes.c_ushort(codec2.api.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
# convert crc to 2 byte hex string
crc = crc.value.to_bytes(2, byteorder='big')
buffer += crc # append crc16 to buffer
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
for i in range(1,N_BURSTS+1):
# write preamble to txbuffer
codec2.api.freedv_rawdatapreambletx(freedv, mod_out_preamble)
txbuffer = bytes(mod_out_preamble)
# create modulaton for N = FRAMESPERBURST and append it to txbuffer
for n in range(1,N_FRAMES_PER_BURST+1):
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
codec2.api.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and save it into mod_out pointer
txbuffer += bytes(mod_out)
print(f"TX BURST: {i}/{N_BURSTS} FRAME: {n}/{N_FRAMES_PER_BURST}", file=sys.stderr)
# append postamble to txbuffer
codec2.api.freedv_rawdatapostambletx(freedv, mod_out_postamble)
txbuffer += bytes(mod_out_postamble)
# append a delay between bursts as audio silence
samples_delay = int(MODEM_SAMPLE_RATE*DELAY_BETWEEN_BURSTS)
mod_out_silence = create_string_buffer(samples_delay*2)
txbuffer += bytes(mod_out_silence)
# resample up to 48k (resampler works on np.int16)
x = np.frombuffer(txbuffer, dtype=np.int16)
txbuffer_48k = resampler.resample8_to_48(x)
# check if we want to use an audio device or stdout
if AUDIO_OUTPUT_DEVICE != -1:
stream_tx.write(txbuffer_48k.tobytes())
else:
# this test needs a lot of time, so we are having a look at times...
starttime = time.time()
# print data to terminal for piping the output to other programs
sys.stdout.buffer.write(txbuffer_48k)
sys.stdout.flush()
# and at least print the needed time to see which time we needed
timeneeded = time.time()-starttime
#print(f"time: {timeneeded} buffer: {len(txbuffer)}", file=sys.stderr)
# and at last check if we had an openend pyaudio instance and close it
if AUDIO_OUTPUT_DEVICE != -1:
time.sleep(stream_tx.get_output_latency())
stream_tx.stop_stream()
stream_tx.close()
p.terminate()

View file

@ -0,0 +1,39 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Throw away test program to help understand the care and feeding of PyAudio
import pyaudio
import numpy as np
CHUNK = 1024
FS48 = 48000
FTEST = 800
AMP = 16000
# 1. play sine wave out of default sound device
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paInt16,
channels=1,
rate=FS48,
frames_per_buffer=CHUNK,
output=True
)
f48 = open("out48.raw", mode='wb')
t = 0;
for f in range(50):
sine_48k = (AMP*np.cos(2*np.pi*np.arange(t,t+CHUNK)*FTEST/FS48)).astype(np.int16)
t += CHUNK
sine_48k.tofile(f48)
stream.write(sine_48k.tobytes())
sil_48k = np.zeros(CHUNK, dtype=np.int16)
for f in range(50):
sil_48k.tofile(f48)
stream.write(sil_48k)
stream.stop_stream()
stream.close()
p.terminate()
f48.close()

View file

@ -0,0 +1,184 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import ctypes
from ctypes import *
import pathlib
import pyaudio
import sys
import logging
import time
import threading
import sys
import argparse
import numpy as np
sys.path.insert(0,'../..')
from tnc import codec2
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=1, type=int)
parser.add_argument('--framesperburst', dest="N_FRAMES_PER_BURST", default=1, type=int)
parser.add_argument('--mode', dest="FREEDV_MODE", type=str, choices=['datac0', 'datac1', 'datac3'])
parser.add_argument('--audiodev', dest="AUDIO_INPUT_DEVICE", default=-1, type=int,
help="audio device number to use, use -2 to automatically select a loopback device")
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
parser.add_argument('--timeout', dest="TIMEOUT", default=10, type=int, help="Timeout (seconds) before test ends")
parser.add_argument('--list', dest="LIST", action="store_true", help="list audio devices by number and exit")
args = parser.parse_args()
if args.LIST:
p = pyaudio.PyAudio()
for dev in range(0,p.get_device_count()):
print("audiodev: ", dev, p.get_device_info_by_index(dev)["name"])
quit()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT_DEVICE
MODE = codec2.FREEDV_MODE[args.FREEDV_MODE].value
DEBUGGING_MODE = args.DEBUGGING_MODE
TIMEOUT = args.TIMEOUT
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2400*2 # <- consider increasing if you get nread_exceptions > 0
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_RX = 48000
# make sure our resampler will work
assert (AUDIO_SAMPLE_RATE_RX / MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48
# check if we want to use an audio device then do an pyaudio init
if AUDIO_INPUT_DEVICE != -1:
p = pyaudio.PyAudio()
# auto search for loopback devices
if AUDIO_INPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,p.get_device_count()):
if 'Loopback: PCM' in p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_INPUT_DEVICE = loopback_list[0] #0 = RX 1 = TX
print(f"loopback_list rx: {loopback_list}", file=sys.stderr)
else:
quit()
print(f"AUDIO INPUT DEVICE: {AUDIO_INPUT_DEVICE} DEVICE: {p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['name']} \
AUDIO SAMPLE RATE: {AUDIO_SAMPLE_RATE_RX}", file=sys.stderr)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE
)
# ----------------------------------------------------------------
# DATA CHANNEL INITIALISATION
# open codec2 instance
freedv = cast(codec2.api.freedv_open(MODE), c_void_p)
# get number of bytes per frame for mode
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv)/8)
payload_bytes_per_frame = bytes_per_frame -2
n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(freedv)
bytes_out = create_string_buffer(bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(freedv,N_FRAMES_PER_BURST)
total_n_bytes = 0
rx_total_frames = 0
rx_frames = 0
rx_bursts = 0
rx_errors = 0
nread_exceptions = 0
timeout = time.time() + TIMEOUT
receive = True
audio_buffer = codec2.audio_buffer(AUDIO_FRAMES_PER_BUFFER*2)
resampler = codec2.resampler()
# Copy received 48 kHz to a file. Listen to this file with:
# aplay -r 48000 -f S16_LE rx48.raw
# Corruption of this file is a good way to detect audio card issues
frx = open("rx48.raw", mode='wb')
# initial number of samples we need
nin = codec2.api.freedv_nin(freedv)
while receive and time.time() < timeout:
if AUDIO_INPUT_DEVICE != -1:
try:
data_in48k = stream_rx.read(AUDIO_FRAMES_PER_BUFFER, exception_on_overflow = True)
except OSError as err:
print(err, file=sys.stderr)
if str(err).find("Input overflowed") != -1:
nread_exceptions += 1
if str(err).find("Stream closed") != -1:
print("Ending...")
receive = False
else:
data_in48k = sys.stdin.buffer.read(AUDIO_FRAMES_PER_BUFFER*2)
# insert samples in buffer
x = np.frombuffer(data_in48k, dtype=np.int16)
x.tofile(frx)
if len(x) != AUDIO_FRAMES_PER_BUFFER:
receive = False
x = resampler.resample48_to_8(x)
audio_buffer.push(x)
# when we have enough samples call FreeDV Rx
while audio_buffer.nbuffer >= nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(freedv, bytes_out, audio_buffer.buffer.ctypes)
audio_buffer.pop(nin)
# call me on every loop!
nin = codec2.api.freedv_nin(freedv)
rx_status = codec2.api.freedv_get_rx_status(freedv)
if rx_status & codec2.api.FREEDV_RX_BIT_ERRORS:
rx_errors = rx_errors + 1
if DEBUGGING_MODE:
rx_status = codec2.api.rx_sync_flags_to_text[rx_status]
print("nin: %5d rx_status: %4s naudio_buffer: %4d" % \
(nin,rx_status,audio_buffer.nbuffer), file=sys.stderr)
if nbytes:
total_n_bytes = total_n_bytes + nbytes
if nbytes == bytes_per_frame:
rx_total_frames = rx_total_frames + 1
rx_frames = rx_frames + 1
if rx_frames == N_FRAMES_PER_BURST:
rx_frames = 0
rx_bursts = rx_bursts + 1
if rx_bursts == N_BURSTS:
receive = False
if time.time() >= timeout:
print("TIMEOUT REACHED")
if nread_exceptions:
print("nread_exceptions %d - receive audio lost! Consider increasing Pyaudio frames_per_buffer..." % \
nread_exceptions, file=sys.stderr)
print(f"RECEIVED BURSTS: {rx_bursts} RECEIVED FRAMES: {rx_total_frames} RX_ERRORS: {rx_errors}", file=sys.stderr)
frx.close()
# and at last check if we had an openend pyaudio instance and close it
if AUDIO_INPUT_DEVICE != -1:
stream_rx.close()
p.terminate()

View file

@ -0,0 +1,162 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import pathlib
import pyaudio
import time
import argparse
import sys
sys.path.insert(0,'../..')
from tnc import codec2
import numpy as np
# GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=1, type=int)
parser.add_argument('--framesperburst', dest="N_FRAMES_PER_BURST", default=1, type=int)
parser.add_argument('--delay', dest="DELAY_BETWEEN_BURSTS", default=500, type=int,
help="delay between bursts in ms")
parser.add_argument('--mode', dest="FREEDV_MODE", type=str, choices=['datac0', 'datac1', 'datac3'])
parser.add_argument('--audiodev', dest="AUDIO_OUTPUT_DEVICE", default=-1, type=int,
help="audio output device number to use, use -2 to automatically select a loopback device")
parser.add_argument('--list', dest="LIST", action="store_true", help="list audio devices by number and exit")
args = parser.parse_args()
if args.LIST:
p = pyaudio.PyAudio()
for dev in range(0,p.get_device_count()):
print("audiodev: ", dev, p.get_device_info_by_index(dev)["name"])
quit()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DELAY_BETWEEN_BURSTS = args.DELAY_BETWEEN_BURSTS/1000
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT_DEVICE
MODE = codec2.FREEDV_MODE[args.FREEDV_MODE].value
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2400
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_TX = 48000
assert (AUDIO_SAMPLE_RATE_TX % MODEM_SAMPLE_RATE) == 0
# check if we want to use an audio device then do an pyaudio init
if AUDIO_OUTPUT_DEVICE != -1:
p = pyaudio.PyAudio()
# auto search for loopback devices
if AUDIO_OUTPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,p.get_device_count()):
if 'Loopback: PCM' in p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_OUTPUT_DEVICE = loopback_list[1] #0 = RX 1 = TX
print(f"loopback_list tx: {loopback_list}", file=sys.stderr)
else:
quit()
print(f"AUDIO OUTPUT DEVICE: {AUDIO_OUTPUT_DEVICE} DEVICE: {p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['name']} \
AUDIO SAMPLE RATE: {AUDIO_SAMPLE_RATE_TX}", file=sys.stderr)
# pyaudio init
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE
)
resampler = codec2.resampler()
# data binary string
data_out = b'HELLO WORLD!'
# ----------------------------------------------------------------
# open codec2 instance
freedv = cast(codec2.api.freedv_open(MODE), c_void_p)
# get number of bytes per frame for mode
bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(freedv)/8)
payload_bytes_per_frame = bytes_per_frame -2
# init buffer for data
n_tx_modem_samples = codec2.api.freedv_get_n_tx_modem_samples(freedv)
mod_out = create_string_buffer(n_tx_modem_samples * 2)
# init buffer for preample
n_tx_preamble_modem_samples = codec2.api.freedv_get_n_tx_preamble_modem_samples(freedv)
mod_out_preamble = create_string_buffer(n_tx_preamble_modem_samples * 2)
# init buffer for postamble
n_tx_postamble_modem_samples = codec2.api.freedv_get_n_tx_postamble_modem_samples(freedv)
mod_out_postamble = create_string_buffer(n_tx_postamble_modem_samples * 2)
# create buffer for data
buffer = bytearray(payload_bytes_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
# create crc for data frame - we are using the crc function shipped with codec2 to avoid
# crc algorithm incompatibilities
crc = ctypes.c_ushort(codec2.api.freedv_gen_crc16(bytes(buffer), payload_bytes_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
print(f"TOTAL BURSTS: {N_BURSTS} TOTAL FRAMES_PER_BURST: {N_FRAMES_PER_BURST}", file=sys.stderr)
for i in range(1,N_BURSTS+1):
# write preamble to txbuffer
codec2.api.freedv_rawdatapreambletx(freedv, mod_out_preamble)
txbuffer = bytes(mod_out_preamble)
# create modulaton for N = FRAMESPERBURST and append it to txbuffer
for n in range(1,N_FRAMES_PER_BURST+1):
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
codec2.api.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and save it into mod_out pointer
txbuffer += bytes(mod_out)
print(f"TX BURST: {i}/{N_BURSTS} FRAME: {n}/{N_FRAMES_PER_BURST}", file=sys.stderr)
# append postamble to txbuffer
codec2.api.freedv_rawdatapostambletx(freedv, mod_out_postamble)
txbuffer += bytes(mod_out_postamble)
# append a delay between bursts as audio silence
samples_delay = int(MODEM_SAMPLE_RATE*DELAY_BETWEEN_BURSTS)
mod_out_silence = create_string_buffer(samples_delay*2)
txbuffer += bytes(mod_out_silence)
#print(f"samples_delay: {samples_delay} DELAY_BETWEEN_BURSTS: {DELAY_BETWEEN_BURSTS}", file=sys.stderr)
# resample up to 48k (resampler works on np.int16)
x = np.frombuffer(txbuffer, dtype=np.int16)
txbuffer_48k = resampler.resample8_to_48(x)
# check if we want to use an audio device or stdout
if AUDIO_OUTPUT_DEVICE != -1:
# Gotcha: we have to convert from np.int16 to Python "bytes"
stream_tx.write(txbuffer_48k.tobytes())
else:
# print data to terminal for piping the output to other programs
sys.stdout.buffer.write(txbuffer_48k)
sys.stdout.flush()
# and at last check if we had an opened pyaudio instance and close it
if AUDIO_OUTPUT_DEVICE != -1:
time.sleep(stream_tx.get_output_latency())
stream_tx.stop_stream()
stream_tx.close()
p.terminate()

View file

@ -0,0 +1,27 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, sound I/O performed by aplay/arecord at
# Fs=8000 Hz, and we pipe to Python utilities
function check_alsa_loopback {
lsmod | grep snd_aloop >> /dev/null
if [ $? -eq 1 ]; then
echo "ALSA loopback device not present. Please install with:"
echo
echo " sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2"
exit 1
fi
}
check_alsa_loopback
RX_LOG=$(mktemp)
MAX_RUN_TIME=2600
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
arecord --device="plughw:CARD=CHAT2,DEV=0" -r 48000 -f S16_LE -d $MAX_RUN_TIME | python3 test_rx.py --mode datac0 --frames 2 --bursts 5 --debug &
rx_pid=$!
sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 5 --delay 500 | aplay --device="plughw:CARD=CHAT2,DEV=1" -r 48000 -f S16_LE
wait ${rx_pid}

View file

@ -0,0 +1,16 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, tx sound I/O performed by aplay
# and arecord at Fs=48000Hz, we pipe to Python utilities
MAX_RUN_TIME=2600
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
arecord -r 48000 --device="plughw:CARD=CHAT1,DEV=0" -f S16_LE -d $MAX_RUN_TIME | \
python3 test_rx.py --mode datac0 --frames 2 --bursts 5 --debug &
rx_pid=$!
sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 5 --delay 500 | \
aplay -r 48000 --device="plughw:CARD=CHAT1,DEV=1" -f S16_LE
wait ${rx_pid}

View file

@ -0,0 +1,15 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, tx sound I/O performed by Python,
# rx using arecord, Fs=48000Hz
MAX_RUN_TIME=2600
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
arecord -r 48000 --device="plughw:CARD=CHAT1,DEV=0" -f S16_LE -d $MAX_RUN_TIME | \
python3 test_rx.py --mode datac0 --frames 2 --bursts 5 --debug --timeout 20 &
rx_pid=$!
sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 5 --delay 2000 --audiodev -2
wait ${rx_pid}

View file

@ -0,0 +1,15 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, tx sound I/O performed by aplay,
# rx sound I/O by Python, Fs=48000Hz.
MAX_RUN_TIME=2600
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
python3 test_rx.py --mode datac0 --frames 2 --bursts 5 --debug --audiodev -2 &
rx_pid=$!
sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 5 | \
aplay -r 48000 --device="plughw:CARD=CHAT1,DEV=1" -f S16_LE
wait ${rx_pid}

View file

@ -0,0 +1,23 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, Python audio I/O
function check_alsa_loopback {
lsmod | grep snd_aloop >> /dev/null
if [ $? -eq 1 ]; then
echo "ALSA loopback device not present. Please install with:"
echo
echo " sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2"
exit 1
fi
}
check_alsa_loopback
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
python3 test_rx.py --mode datac0 --frames 2 --bursts 3 --audiodev -2 --debug &
rx_pid=$!
sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 3 --audiodev -2
wait ${rx_pid}

View file

@ -0,0 +1,23 @@
#!/bin/bash -x
# Run a test using the virtual sound cards, Python audio I/O
function check_alsa_loopback {
lsmod | grep snd_aloop >> /dev/null
if [ $? -eq 1 ]; then
echo "ALSA loopback device not present. Please install with:"
echo
echo " sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2"
exit 1
fi
}
check_alsa_loopback
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
python3 test_callback_rx.py --mode datac0 --frames 2 --bursts 3 --audiodev -2 --debug &
rx_pid=$!
#sleep 1
python3 test_tx.py --mode datac0 --frames 2 --bursts 3 --audiodev -2
wait ${rx_pid}

View file

@ -0,0 +1,32 @@
#!/bin/bash -x
# Run a test using the virtual sound cards
function check_alsa_loopback {
lsmod | grep snd_aloop >> /dev/null
if [ $? -eq 1 ]; then
echo "ALSA loopback device not present. Please install with:"
echo
echo " sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2"
exit 1
fi
}
myInterruptHandler()
{
exit 1
}
check_alsa_loopback
RX_LOG=$(mktemp)
trap myInterruptHandler SIGINT
# make sure all child processes are killed when we exit
trap 'jobs -p | xargs -r kill' EXIT
python3 test_multimode_rx.py --timeout 60 --framesperburst 2 --bursts 2 --audiodev -2 --debug &
rx_pid=$!
sleep 1
python3 test_multimode_tx.py --framesperburst 2 --bursts 2 --audiodev -2 --delay 500
wait ${rx_pid}

View file

@ -0,0 +1,96 @@
# FreeDV-JATE [Just Another TNC Experiment]
## 002_HIGHSNR_PING_PONG
### INSTALL TEST SUITE
#### Install prerequierements
```
sudo apt update
sudo apt upgrade
sudo apt install git cmake build-essential python3-pip portaudio19-dev python3-pyaudio
pip3 install crcengine
pip3 install threading
```
Go into a directory of your choice
Run the following commands --> They will download and compile the latest codec2 ( dr-packet ) files and LPCNet as well into the directory of your choice
```
wget https://raw.githubusercontent.com/DJ2LS/FreeDV-JATE/002_HIGHSNR_PING_PONG/install_test_suite.sh
chmod +x install_test_suite.sh
./install_test_suite.sh
```
### PARAMETERS
| parameter | description | side
|--|--|--|
| - -txmode 12 | set the mode for FreeDV ( 10,11,12,14 ) | Terminal 1 & Terminal 2
| - -rxmode 14 | set the mode for FreeDV ( 10,11,12,14 ) | Terminal 1 & Terminal 2
| - -frames 1 | set the number of frames per burst | Terminal 1
| - -bursts 1 | set the number of bursts | Terminal 1
| - -audioinput 2 | set the audio device | Terminal 1 & Terminal 2
| - -audiooutput 1 | set the audio device | Terminal 1 & Terminal 2
| - -debug | if used, print additional debugging output | Terminal 1 & Terminal 2
### AUDIO TESTS VIA VIRTUAL AUDIO DEVICE
#### Create audio sinkhole and subdevices
Note: This command needs to be run again after every reboot
```
sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2
```
check if devices have been created
aplay -l
Output should be like this:
```
Karte 0: Intel [HDA Intel], Gerät 0: Generic Analog [Generic Analog]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
```
### Run tests:
#### Terminal 1: Ping
```
python3 PING.py --txmode 12 --rxmode 14 --audioinput 2 --audiooutput 2 --frames 1 --bursts 2
```
Output
```
BURSTS: 2 FRAMES: 1
-----------------------------------------------------------------
TX | PING | BURST [1/2] FRAME [1/1]
RX | PONG | BURST [1/2] FRAME [1/1]
-----------------------------------------------------------------
TX | PING | BURST [2/2] FRAME [1/1]
RX | PONG | BURST [2/2] FRAME [1/1]
```
#### Terminal 2: Pong
```
python3 PONG.py --txmode 14 --rxmode 12 --audioinput 2 --audiooutput 2
```
Output
```
RX | BURST [1/2] FRAME [1/1] >>> SENDING PONG
RX | BURST [2/2] FRAME [1/1] >>> SENDING PONG
```

View file

@ -0,0 +1,199 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import pathlib
import pyaudio
import time
import threading
import argparse
import sys
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--delay', dest="DELAY_BETWEEN_BURSTS", default=0, type=int)
parser.add_argument('--txmode', dest="FREEDV_TX_MODE", default=0, type=int)
parser.add_argument('--rxmode', dest="FREEDV_RX_MODE", default=0, type=int)
parser.add_argument('--audiooutput', dest="AUDIO_OUTPUT", default=0, type=int)
parser.add_argument('--audioinput', dest="AUDIO_INPUT", default=0, type=int)
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DELAY_BETWEEN_BURSTS = args.DELAY_BETWEEN_BURSTS/1000
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
FREEDV_TX_MODE = args.FREEDV_TX_MODE
FREEDV_RX_MODE = args.FREEDV_RX_MODE
DEBUGGING_MODE = args.DEBUGGING_MODE
#-------------------------------------------- LOAD FREEDV
libname = pathlib.Path().absolute() / "codec2/build_linux/src/libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
p = pyaudio.PyAudio()
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#AUDIO_SAMPLE_RATE_TX = int(p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['defaultSampleRate'])
#AUDIO_SAMPLE_RATE_RX = int(p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['defaultSampleRate'])
AUDIO_SAMPLE_RATE_TX = 8000
AUDIO_SAMPLE_RATE_RX = 8000
#--------------------------------------------OPEN AUDIO CHANNEL TX
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE,
)
def receive():
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_RX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
bytes_out = (ctypes.c_ubyte * bytes_per_frame) #bytes_per_frame
bytes_out = bytes_out() #get pointer from bytes_out
total_n_bytes = 0
rx_total_frames = 0
rx_frames = 0
rx_bursts = 0
receive = True
while receive == True:
time.sleep(0.01)
nin = c_lib.freedv_nin(freedv)
nin_converted = int(nin*(AUDIO_SAMPLE_RATE_RX/MODEM_SAMPLE_RATE))
if DEBUGGING_MODE == True:
print("-----------------------------")
print("NIN: " + str(nin) + " [ " + str(nin_converted) + " ]")
data_in = stream_rx.read(nin_converted, exception_on_overflow = False)
data_in = data_in.rstrip(b'\x00')
c_lib.freedv_rawdatarx.argtype = [ctypes.POINTER(ctypes.c_ubyte), bytes_out, data_in] # check if really neccessary
nbytes = c_lib.freedv_rawdatarx(freedv, bytes_out, data_in) # demodulate audio
total_n_bytes = total_n_bytes + nbytes
if DEBUGGING_MODE == True:
print("SYNC: " + str(c_lib.freedv_get_rx_status(freedv)))
if nbytes == bytes_per_frame:
rx_total_frames = rx_total_frames + 1
rx_frames = rx_frames + 1
if rx_frames == N_FRAMES_PER_BURST:
rx_frames = 0
rx_bursts = rx_bursts + 1
c_lib.freedv_set_sync(freedv,0)
burst = bytes_out[0]
n_total_burst = bytes_out[1]
frame = bytes_out[2]
n_total_frame = bytes_out[3]
print("RX | PONG | BURST [" + str(burst) + "/" + str(n_total_burst) + "] FRAME [" + str(frame) + "/" + str(n_total_frame) + "]")
print("-----------------------------------------------------------------")
c_lib.freedv_set_sync(freedv,0)
if rx_bursts == N_BURSTS:
receive = False
RECEIVE = threading.Thread(target=receive, name="RECEIVE THREAD")
RECEIVE.start()
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_TX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
mod_out = ctypes.c_short * n_tx_modem_samples
mod_out = mod_out()
mod_out_preamble = ctypes.c_short * (1760*2) #1760 for mode 10,11,12 #4000 for mode 9
mod_out_preamble = mod_out_preamble()
print("BURSTS: " + str(N_BURSTS) + " FRAMES: " + str(N_FRAMES_PER_BURST) )
print("-----------------------------------------------------------------")
for i in range(0,N_BURSTS):
c_lib.freedv_rawdatapreambletx(freedv, mod_out_preamble);
txbuffer = bytearray()
txbuffer += bytes(mod_out_preamble)
for n in range(0,N_FRAMES_PER_BURST):
data_out = bytearray()
data_out += bytes([i+1])
data_out += bytes([N_BURSTS])
data_out += bytes([n+1])
data_out += bytes([N_FRAMES_PER_BURST])
buffer = bytearray(payload_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
crc = ctypes.c_ushort(c_lib.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
c_lib.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and safe it into mod_out pointer
txbuffer += bytes(mod_out)
print("TX | PING | BURST [" + str(i+1) + "/" + str(N_BURSTS) + "] FRAME [" + str(n+1) + "/" + str(N_FRAMES_PER_BURST) + "]")
stream_tx.write(bytes(txbuffer))
ACK_TIMEOUT = time.time() + 3
txbuffer = bytearray()
#time.sleep(DELAY_BETWEEN_BURSTS)
# WAIT UNTIL WE RECEIVD AN ACK/DATAC0 FRAME
while ACK_TIMEOUT >= time.time():
time.sleep(0.01)
time.sleep(1)
stream_tx.close()
p.terminate()

View file

@ -0,0 +1,166 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import ctypes
from ctypes import *
import pathlib
import pyaudio
import sys
import logging
import time
import threading
import sys
import argparse
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--txmode', dest="FREEDV_TX_MODE", default=0, type=int)
parser.add_argument('--rxmode', dest="FREEDV_RX_MODE", default=0, type=int)
parser.add_argument('--audioinput', dest="AUDIO_INPUT", default=0, type=int)
parser.add_argument('--audiooutput', dest="AUDIO_OUTPUT", default=0, type=int)
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT
FREEDV_TX_MODE = args.FREEDV_TX_MODE
FREEDV_RX_MODE = args.FREEDV_RX_MODE
DEBUGGING_MODE = args.DEBUGGING_MODE
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
#-------------------------------------------- LOAD FREEDV
libname = pathlib.Path().absolute() / "codec2/build_linux/src/libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
p = pyaudio.PyAudio()
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#AUDIO_SAMPLE_RATE_TX = int(p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['defaultSampleRate'])
#AUDIO_SAMPLE_RATE_RX = int(p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['defaultSampleRate'])
AUDIO_SAMPLE_RATE_TX = 8000
AUDIO_SAMPLE_RATE_RX = 8000
#--------------------------------------------OPEN AUDIO CHANNEL RX
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE,
)
# GENERAL PARAMETERS
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
def send_pong(burst,n_total_burst,frame,n_total_frame):
data_out = bytearray()
data_out[0:1] = bytes([burst])
data_out[1:2] = bytes([n_total_burst])
data_out[2:3] = bytes([frame])
data_out[4:5] = bytes([n_total_frame])
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_TX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
mod_out = ctypes.c_short * n_tx_modem_samples
mod_out = mod_out()
mod_out_preamble = ctypes.c_short * (1760*2) #1760 for mode 10,11,12 #4000 for mode 9
mod_out_preamble = mod_out_preamble()
buffer = bytearray(payload_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
crc = ctypes.c_ushort(c_lib.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
c_lib.freedv_rawdatapreambletx(freedv, mod_out_preamble);
txbuffer = bytearray()
txbuffer += bytes(mod_out_preamble)
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
c_lib.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and safe it into mod_out pointer
txbuffer += bytes(mod_out)
stream_tx.write(bytes(txbuffer))
txbuffer = bytearray()
# DATA CHANNEL INITIALISATION
freedv = c_lib.freedv_open(FREEDV_RX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
n_max_modem_samples = c_lib.freedv_get_n_max_modem_samples(freedv)
bytes_out = (ctypes.c_ubyte * bytes_per_frame) #bytes_per_frame
bytes_out = bytes_out() #get pointer from bytes_out
receive = True
while receive == True:
time.sleep(0.01)
data_in = b''
nin = c_lib.freedv_nin(freedv)
nin_converted = int(nin*(AUDIO_SAMPLE_RATE_RX/MODEM_SAMPLE_RATE))
if DEBUGGING_MODE == True:
print("-----------------------------")
print("NIN: " + str(nin) + " [ " + str(nin_converted) + " ]")
data_in = stream_rx.read(nin_converted, exception_on_overflow = False)
data_in = data_in.rstrip(b'\x00')
c_lib.freedv_rawdatarx.argtype = [ctypes.POINTER(ctypes.c_ubyte), bytes_out, data_in] # check if really neccessary
nbytes = c_lib.freedv_rawdatarx(freedv, bytes_out, data_in) # demodulate audio
if DEBUGGING_MODE == True:
print("SYNC: " + str(c_lib.freedv_get_rx_status(freedv)))
if nbytes == bytes_per_frame:
burst = bytes_out[0]
n_total_burst = bytes_out[1]
frame = bytes_out[2]
n_total_frame = bytes_out[3]
print("RX | BURST [" + str(burst) + "/" + str(n_total_burst) + "] FRAME [" + str(frame) + "/" + str(n_total_frame) + "] >>> SENDING PONG")
TRANSMIT_PONG = threading.Thread(target=send_pong, args=[burst,n_total_burst,frame,n_total_frame], name="SEND PONG")
TRANSMIT_PONG.start()
c_lib.freedv_set_sync(freedv,0)

View file

@ -0,0 +1,22 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import sys
sys.path.insert(0,'../..')
sys.path.insert(0,'../../tnc')
import data_handler
teststring = b'HELLO WORLD'
data_handler.arq_transmit(teststring, 10, 1)

207
tnc/codec2.py Normal file
View file

@ -0,0 +1,207 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import sys
import pathlib
from enum import Enum
import numpy as np
#print("loading codec2 module", file=sys.stderr)
# Enum for codec2 modes
class FREEDV_MODE(Enum):
datac0 = 14
datac1 = 10
datac3 = 12
def freedv_get_mode(mode):
return FREEDV_MODE[mode].value
# -------------------------------------------- LOAD FREEDV
# codec2 search pathes in descending order
# libcodec2.so ctests
# pathlib.Path("codec2/build_linux/src/libcodec2.so.1.0") manual build
# pathlib.Path("lib/codec2/linux/libcodec2.so.1.0") precompiled
# pathlib.Path("../../tnc/codec2/build_linux/src/libcodec2.so.1.0") external loading manual build
# pathlib.Path("../../tnc/lib/codec2/linux/libcodec2.so.1.0") external loading precompiled
libname = ["libcodec2.so", \
pathlib.Path("codec2/build_linux/src/libcodec2.so.1.0"), \
pathlib.Path("lib/codec2/linux/libcodec2.so.1.0"), \
pathlib.Path("../../tnc/codec2/build_linux/src/libcodec2.so.1.0"), \
pathlib.Path("../../tnc/lib/codec2/linux/libcodec2.so.1.0"), \
]
# iterate through codec2 search pathes
for i in libname:
try:
api = ctypes.CDLL(i)
print(f"[C2 ] Codec2 library found - {i}", file=sys.stderr)
break
except:
print(f"[C2 ] Codec2 library not found - {i}", file=sys.stderr)
pass
# quit module if codec2 cant be loaded
if not 'api' in locals():
print(f"[C2 ] Loading Codec2 library failed", file=sys.stderr)
quit()
# ctypes function init
api.freedv_open.argype = [c_int]
api.freedv_open.restype = c_void_p
api.freedv_get_bits_per_modem_frame.argtype = [c_void_p]
api.freedv_get_bits_per_modem_frame.restype = c_int
api.freedv_nin.argtype = [c_void_p]
api.freedv_nin.restype = c_int
api.freedv_rawdatarx.argtype = [c_void_p, c_char_p, c_char_p]
api.freedv_rawdatarx.restype = c_int
api.freedv_rawdatatx.argtype = [c_void_p, c_char_p, c_char_p]
api.freedv_rawdatatx.restype = c_int
api.freedv_rawdatapostambletx.argtype = [c_void_p, c_char_p, c_char_p]
api.freedv_rawdatapostambletx.restype = c_int
api.freedv_rawdatapreambletx.argtype = [c_void_p, c_char_p, c_char_p]
api.freedv_rawdatapreambletx.restype = c_int
api.freedv_get_n_max_modem_samples.argtype = [c_void_p]
api.freedv_get_n_max_modem_samples.restype = c_int
api.freedv_set_frames_per_burst.argtype = [c_void_p, c_int]
api.freedv_set_frames_per_burst.restype = c_void_p
api.freedv_get_rx_status.argtype = [c_void_p]
api.freedv_get_rx_status.restype = c_int
api.freedv_get_modem_stats.argtype = [c_void_p, c_void_p, c_void_p]
api.freedv_get_modem_stats.restype = c_int
api.freedv_get_n_tx_postamble_modem_samples.argtype = [c_void_p]
api.freedv_get_n_tx_postamble_modem_samples.restype = c_int
api.freedv_get_n_tx_preamble_modem_samples.argtype = [c_void_p]
api.freedv_get_n_tx_preamble_modem_samples.restype = c_int
api.freedv_get_n_tx_modem_samples.argtype = [c_void_p]
api.freedv_get_n_tx_modem_samples.restype = c_int
api.freedv_get_n_max_modem_samples.argtype = [c_void_p]
api.freedv_get_n_max_modem_samples.restype = c_int
api.FREEDV_FS_8000 = 8000
api.FREEDV_MODE_DATAC1 = 10
api.FREEDV_MODE_DATAC3 = 12
api.FREEDV_MODE_DATAC0 = 14
# Return code flags for freedv_get_rx_status() function
api.FREEDV_RX_TRIAL_SYNC = 0x1 # demodulator has trial sync
api.FREEDV_RX_SYNC = 0x2 # demodulator has sync
api.FREEDV_RX_BITS = 0x4 # data bits have been returned
api.FREEDV_RX_BIT_ERRORS = 0x8 # FEC may not have corrected all bit errors (not all parity checks OK)
api.rx_sync_flags_to_text = [
"----",
"---T",
"--S-",
"--ST",
"-B--",
"-B-T",
"-BS-",
"-BST",
"E---",
"E--T",
"E-S-",
"E-ST",
"EB--",
"EB-T",
"EBS-",
"EBST"]
# audio buffer ---------------------------------------------------------
class audio_buffer:
# a buffer of int16 samples, using a fixed length numpy array self.buffer for storage
# self.nbuffer is the current number of samples in the buffer
def __init__(self, size):
print("create audio_buffer: ", size)
self.size = size
self.buffer = np.zeros(size, dtype=np.int16)
self.nbuffer = 0
def push(self,samples):
# add samples at the end of the buffer
assert self.nbuffer+len(samples) <= self.size
self.buffer[self.nbuffer:self.nbuffer+len(samples)] = samples
self.nbuffer += len(samples)
def pop(self,size):
# remove samples from the start of the buffer
self.nbuffer -= size;
self.buffer[:self.nbuffer] = self.buffer[size:size+self.nbuffer]
assert self.nbuffer >= 0
# resampler ---------------------------------------------------------
api.FDMDV_OS_48 = int(6) # oversampling rate
api.FDMDV_OS_TAPS_48K = int(48) # number of OS filter taps at 48kHz
api.FDMDV_OS_TAPS_48_8K = int(api.FDMDV_OS_TAPS_48K/api.FDMDV_OS_48) # number of OS filter taps at 8kHz
api.fdmdv_8_to_48_short.argtype = [c_void_p, c_void_p, c_int]
api.fdmdv_48_to_8_short.argtype = [c_void_p, c_void_p, c_int]
class resampler:
# resample an array of variable length, we just store the filter memories here
MEM8 = api.FDMDV_OS_TAPS_48_8K
MEM48 = api.FDMDV_OS_TAPS_48K
def __init__(self):
print("create 48<->8 kHz resampler")
self.filter_mem8 = np.zeros(self.MEM8, dtype=np.int16)
self.filter_mem48 = np.zeros(self.MEM48)
def resample48_to_8(self,in48):
assert in48.dtype == np.int16
# length of input vector must be an integer multiple of api.FDMDV_OS_48
assert(len(in48) % api.FDMDV_OS_48 == 0)
# concat filter memory and input samples
in48_mem = np.zeros(self.MEM48+len(in48), dtype=np.int16)
in48_mem[:self.MEM48] = self.filter_mem48
in48_mem[self.MEM48:] = in48
# In C: pin48=&in48[MEM48]
pin48,flag = in48_mem.__array_interface__['data']
pin48 += 2*self.MEM48
n8 = int(len(in48) / api.FDMDV_OS_48)
out8 = np.zeros(n8, dtype=np.int16)
api.fdmdv_48_to_8_short(out8.ctypes, pin48, n8);
# store memory for next time
self.filter_mem48 = in48_mem[:self.MEM48]
return out8
def resample8_to_48(self,in8):
assert in8.dtype == np.int16
# concat filter memory and input samples
in8_mem = np.zeros(self.MEM8+len(in8), dtype=np.int16)
in8_mem[:self.MEM8] = self.filter_mem8
in8_mem[self.MEM8:] = in8
# In C: pin8=&in8[MEM8]
pin8,flag = in8_mem.__array_interface__['data']
pin8 += 2*self.MEM8
out48 = np.zeros(api.FDMDV_OS_48*len(in8), dtype=np.int16)
api.fdmdv_8_to_48_short(out48.ctypes, pin8, len(in8));
# store memory for next time
self.filter_mem8 = in8_mem[:self.MEM8]
return out48

View file

@ -5,14 +5,13 @@ Created on Sun Dec 27 20:43:40 2020
@author: DJ2LS @author: DJ2LS
""" """
import sys
import logging, structlog, log_handler import logging, structlog, log_handler
import threading import threading
import time import time
from random import randrange from random import randrange
import asyncio import asyncio
import sys
import ujson as json import ujson as json
import static import static

View file

@ -21,6 +21,8 @@ import static
import data_handler import data_handler
import re import re
import codec2
# option for testing miniaudio instead of audioop for sample rate conversion # option for testing miniaudio instead of audioop for sample rate conversion
#import miniaudio #import miniaudio
@ -103,13 +105,20 @@ class RF():
def __init__(self): def __init__(self):
self.AUDIO_SAMPLE_RATE_RX = 48000 self.AUDIO_SAMPLE_RATE_RX = 48000
self.AUDIO_SAMPLE_RATE_TX = 48000 self.AUDIO_SAMPLE_RATE_TX = 48000
self.MODEM_SAMPLE_RATE = 8000 self.MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
self.AUDIO_FRAMES_PER_BUFFER_RX = 8192 #8192 self.AUDIO_FRAMES_PER_BUFFER_RX = 2400*2 #8192
self.AUDIO_FRAMES_PER_BUFFER_TX = 8 #8192 Lets to some tests with very small chunks for TX self.AUDIO_FRAMES_PER_BUFFER_TX = 8 #8192 Lets to some tests with very small chunks for TX
self.AUDIO_CHUNKS = 48 #8 * (self.AUDIO_SAMPLE_RATE_RX/self.MODEM_SAMPLE_RATE) #48 self.AUDIO_CHUNKS = 48 #8 * (self.AUDIO_SAMPLE_RATE_RX/self.MODEM_SAMPLE_RATE) #48
self.AUDIO_CHANNELS = 1 self.AUDIO_CHANNELS = 1
# make sure our resampler will work
assert (self.AUDIO_SAMPLE_RATE_RX / self.MODEM_SAMPLE_RATE) == codec2.api.FDMDV_OS_48
# small hack for initializing codec2 via codec2.py module
# TODO: we need to change the entire modem module to integrate codec2 module
self.c_lib = codec2.api
self.resampler = codec2.resampler()
'''
# -------------------------------------------- LOAD FREEDV # -------------------------------------------- LOAD FREEDV
try: try:
# we check at first for libcodec2 compiled from source # we check at first for libcodec2 compiled from source
@ -132,11 +141,11 @@ class RF():
structlog.get_logger("structlog").info("[TNC] Codec2 found", path=libname, origin="precompiled") structlog.get_logger("structlog").info("[TNC] Codec2 found", path=libname, origin="precompiled")
else: else:
structlog.get_logger("structlog").critical("[TNC] Codec2 not found") structlog.get_logger("structlog").critical("[TNC] Codec2 not found")
'''
'''
# --------------------------------------------CTYPES FUNCTION INIT # --------------------------------------------CTYPES FUNCTION INIT
# TODO: WE STILL HAVE SOME MISSING FUNCTIONS! # TODO: WE STILL HAVE SOME MISSING FUNCTIONS!
self.c_lib.freedv_open.argype = [c_int] self.c_lib.freedv_open.argype = [c_int]
self.c_lib.freedv_open.restype = c_void_p self.c_lib.freedv_open.restype = c_void_p
@ -154,8 +163,7 @@ class RF():
self.c_lib.freedv_set_frames_per_burst.argtype = [c_void_p, c_int] self.c_lib.freedv_set_frames_per_burst.argtype = [c_void_p, c_int]
self.c_lib.freedv_set_frames_per_burst.restype = c_int self.c_lib.freedv_set_frames_per_burst.restype = c_int
'''
# --------------------------------------------CREATE PYAUDIO INSTANCE # --------------------------------------------CREATE PYAUDIO INSTANCE
try: try:
@ -168,6 +176,17 @@ class RF():
self.p = pyaudio.PyAudio() self.p = pyaudio.PyAudio()
atexit.register(self.p.terminate) atexit.register(self.p.terminate)
# --------------------------------------------OPEN AUDIO CHANNEL RX # --------------------------------------------OPEN AUDIO CHANNEL RX
# optional auto selection of loopback device if using in testmode
if static.AUDIO_INPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,self.p.get_device_count()):
if 'Loopback: PCM' in self.p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
AUDIO_INPUT_DEVICE = loopback_list[0] #0 = RX 1 = TX
print(f"loopback_list rx: {loopback_list}", file=sys.stderr)
self.stream_rx = self.p.open(format=pyaudio.paInt16, self.stream_rx = self.p.open(format=pyaudio.paInt16,
channels=self.AUDIO_CHANNELS, channels=self.AUDIO_CHANNELS,
rate=self.AUDIO_SAMPLE_RATE_RX, rate=self.AUDIO_SAMPLE_RATE_RX,
@ -176,6 +195,16 @@ class RF():
input_device_index=static.AUDIO_INPUT_DEVICE input_device_index=static.AUDIO_INPUT_DEVICE
) )
# --------------------------------------------OPEN AUDIO CHANNEL TX # --------------------------------------------OPEN AUDIO CHANNEL TX
# optional auto selection of loopback device if using in testmode
if static.AUDIO_OUTPUT_DEVICE == -2:
loopback_list = []
for dev in range(0,self.p.get_device_count()):
if 'Loopback: PCM' in self.p.get_device_info_by_index(dev)["name"]:
loopback_list.append(dev)
if len(loopback_list) >= 2:
static.AUDIO_OUTPUT_DEVICE = loopback_list[1] #0 = RX 1 = TX
print(f"loopback_list tx: {loopback_list}", file=sys.stderr)
self.stream_tx = self.p.open(format=pyaudio.paInt16, self.stream_tx = self.p.open(format=pyaudio.paInt16,
channels=self.AUDIO_CHANNELS, channels=self.AUDIO_CHANNELS,
rate=self.AUDIO_SAMPLE_RATE_TX, rate=self.AUDIO_SAMPLE_RATE_TX,
@ -446,11 +475,45 @@ class RF():
# -------------------------------------------------------------------------------------------------------- # --------------------------------------------------------------------------------------------------------
def receive(self): def receive(self):
'''
freedv_mode_datac0 = 14 freedv_mode_datac0 = 14
freedv_mode_datac1 = 10 freedv_mode_datac1 = 10
freedv_mode_datac3 = 12 freedv_mode_datac3 = 12
'''
# open codec2 instance
datac0_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC0), c_void_p)
datac0_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac0_freedv)/8)
datac0_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac0_freedv)
datac0_bytes_out = create_string_buffer(datac0_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac0_freedv,1)
datac0_buffer = codec2.audio_buffer(2*self.AUDIO_FRAMES_PER_BUFFER_RX)
datac0_modem_stats_snr = c_float()
datac0_modem_stats_sync = c_int()
static.FREEDV_SIGNALLING_BYTES_PER_FRAME = datac0_bytes_per_frame
static.FREEDV_SIGNALLING_PAYLOAD_PER_FRAME = datac0_bytes_per_frame - 2
datac1_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC1), c_void_p)
datac1_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac1_freedv)/8)
datac1_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac1_freedv)
datac1_bytes_out = create_string_buffer(datac1_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac1_freedv,1)
datac1_buffer = codec2.audio_buffer(2*self.AUDIO_FRAMES_PER_BUFFER_RX)
datac1_modem_stats_snr = c_float()
datac1_modem_stats_sync = c_int()
datac3_freedv = cast(codec2.api.freedv_open(codec2.api.FREEDV_MODE_DATAC3), c_void_p)
datac3_bytes_per_frame = int(codec2.api.freedv_get_bits_per_modem_frame(datac3_freedv)/8)
datac3_n_max_modem_samples = codec2.api.freedv_get_n_max_modem_samples(datac3_freedv)
datac3_bytes_out = create_string_buffer(datac3_bytes_per_frame * 2)
codec2.api.freedv_set_frames_per_burst(datac3_freedv,1)
datac3_buffer = codec2.audio_buffer(2*self.AUDIO_FRAMES_PER_BUFFER_RX)
datac3_modem_stats_snr = c_float()
datac3_modem_stats_sync = c_int()
'''
# DATAC0 # DATAC0
datac0_freedv = cast(self.c_lib.freedv_open(freedv_mode_datac0), c_void_p) datac0_freedv = cast(self.c_lib.freedv_open(freedv_mode_datac0), c_void_p)
@ -485,20 +548,66 @@ class RF():
datac3_modem_stats_snr = c_float() datac3_modem_stats_snr = c_float()
datac3_modem_stats_sync = c_int() datac3_modem_stats_sync = c_int()
datac3_buffer = bytes() datac3_buffer = bytes()
''' '''
if mode == static.ARQ_DATA_CHANNEL_MODE:
static.FREEDV_DATA_BYTES_PER_FRAME = bytes_per_frame
static.FREEDV_DATA_PAYLOAD_PER_FRAME = bytes_per_frame - 2
self.c_lib.freedv_set_frames_per_burst(freedv, 0)
else:
#pass
self.c_lib.freedv_set_frames_per_burst(freedv, 0)
'''
fft_buffer = bytes() fft_buffer = bytes()
while True: receive = True
while receive:
try:
data_in48k = self.stream_rx.read(self.AUDIO_FRAMES_PER_BUFFER_RX, exception_on_overflow = True)
except OSError as err:
print(err, file=sys.stderr)
if str(err).find("Input overflowed") != -1:
nread_exceptions += 1
if str(err).find("Stream closed") != -1:
print("Ending...")
receive = False
# insert samples in buffer
x = np.frombuffer(data_in48k, dtype=np.int16)
# x.tofile(frx)
if len(x) != self.AUDIO_FRAMES_PER_BUFFER_RX:
receive = False
x = self.resampler.resample48_to_8(x)
datac0_buffer.push(x)
datac1_buffer.push(x)
datac3_buffer.push(x)
# when we have enough samples call FreeDV Rx
while datac0_buffer.nbuffer >= datac0_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac0_freedv, datac0_bytes_out, datac0_buffer.buffer.ctypes)
datac0_buffer.pop(datac0_nin)
datac0_nin = codec2.api.freedv_nin(datac0_freedv)
if nbytes == datac0_bytes_per_frame:
datac0_task = threading.Thread(target=self.process_data, args=[datac0_bytes_out, datac0_freedv, datac0_bytes_per_frame])
datac0_task.start()
while datac1_buffer.nbuffer >= datac1_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac1_freedv, datac1_bytes_out, datac1_buffer.buffer.ctypes)
datac1_buffer.pop(datac1_nin)
datac1_nin = codec2.api.freedv_nin(datac1_freedv)
if nbytes == datac1_bytes_per_frame:
datac1_task = threading.Thread(target=self.process_data, args=[datac1_bytes_out, datac1_freedv, datac1_bytes_per_frame])
datac1_task.start()
while datac3_buffer.nbuffer >= datac3_nin:
# demodulate audio
nbytes = codec2.api.freedv_rawdatarx(datac3_freedv, datac3_bytes_out, datac3_buffer.buffer.ctypes)
datac3_buffer.pop(datac3_nin)
datac3_nin = codec2.api.freedv_nin(datac3_freedv)
if nbytes == datac3_bytes_per_frame:
datac3_task = threading.Thread(target=self.process_data, args=[datac3_bytes_out, datac1_freedv, datac1_bytes_per_frame])
datac3_task.start()
'''
data_in = bytes() data_in = bytes()
data_in = self.stream_rx.read(self.AUDIO_CHUNKS, exception_on_overflow=False) data_in = self.stream_rx.read(self.AUDIO_CHUNKS, exception_on_overflow=False)
data_in = audioop.ratecv(data_in, 2, 1, self.AUDIO_SAMPLE_RATE_RX, self.MODEM_SAMPLE_RATE, None) data_in = audioop.ratecv(data_in, 2, 1, self.AUDIO_SAMPLE_RATE_RX, self.MODEM_SAMPLE_RATE, None)
@ -509,15 +618,7 @@ class RF():
datac1_nin = self.c_lib.freedv_nin(datac1_freedv) * 2 datac1_nin = self.c_lib.freedv_nin(datac1_freedv) * 2
datac3_nin = self.c_lib.freedv_nin(datac3_freedv) * 2 datac3_nin = self.c_lib.freedv_nin(datac3_freedv) * 2
'''
# refill buffer only if every mode has worked with its data
if (len(datac0_buffer) < (datac0_nin)) and (len(datac1_buffer) < (datac1_nin)) and (len(datac3_buffer) < (datac3_nin)):
datac0_buffer += data_in
datac1_buffer += data_in
datac3_buffer += data_in
'''
datac0_buffer += data_in datac0_buffer += data_in
datac1_buffer += data_in datac1_buffer += data_in
datac3_buffer += data_in datac3_buffer += data_in
@ -582,7 +683,7 @@ class RF():
datac3_task = threading.Thread(target=self.process_data, args=[datac3_bytes_out, datac3_freedv, datac3_bytes_per_frame]) datac3_task = threading.Thread(target=self.process_data, args=[datac3_bytes_out, datac3_freedv, datac3_bytes_per_frame])
datac3_task.start() datac3_task.start()
'''
# forward data only if broadcast or we are the receiver # forward data only if broadcast or we are the receiver
# bytes_out[1:2] == callsign check for signalling frames, # bytes_out[1:2] == callsign check for signalling frames,
# bytes_out[6:7] == callsign check for data frames, # bytes_out[6:7] == callsign check for data frames,

View file

@ -36,7 +36,7 @@ SOCKET_TIMEOUT = 3 # seconds
HAMLIB_PTT_TYPE = 'RTS' HAMLIB_PTT_TYPE = 'RTS'
PTT_STATE = False PTT_STATE = False
HAMLIB_DEVICE_ID = 0 HAMLIB_DEVICE_ID = 'RIG_MODEL_DUMMY_NOVFO'
HAMLIB_DEVICE_PORT = '/dev/ttyUSB0' HAMLIB_DEVICE_PORT = '/dev/ttyUSB0'
HAMLIB_SERIAL_SPEED = '9600' HAMLIB_SERIAL_SPEED = '9600'
@ -54,8 +54,8 @@ SCATTER = []
# --------------------------------- # ---------------------------------
# Audio Defaults # Audio Defaults
AUDIO_INPUT_DEVICE = 1 AUDIO_INPUT_DEVICE = -2
AUDIO_OUTPUT_DEVICE = 1 AUDIO_OUTPUT_DEVICE = -2
AUDIO_RMS = 0 AUDIO_RMS = 0