skeleton cmake test framework, not doing any significant etsting yet

This commit is contained in:
drowe67 2021-12-12 08:09:31 +10:30 committed by David Rowe
parent 96a4fde978
commit 06d88a5ca1
9 changed files with 908 additions and 0 deletions

26
CMakeLists.txt Normal file
View file

@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.0)
project (FreeDATA)
include(CTest)
enable_testing()
# Find codec2
if(CODEC2_BUILD_DIR)
find_package(codec2 REQUIRED
PATHS ${CODEC2_BUILD_DIR}
NO_DEFAULT_PATH
CONFIGS codec2.cmake
)
if(codec2_FOUND)
message(STATUS "Codec2 library found in build tree.")
endif()
else()
find_package(codec2 REQUIRED)
endif()
add_test(NAME 000_audio_tests
COMMAND sh -c "cd ${CMAKE_CURRENT_SOURCE_DIR}/test/000_audio_tests;
python3 sinustest.py")
add_test(NAME 001_highsnr_stdio_audio
COMMAND sh -c "cd ${CMAKE_CURRENT_SOURCE_DIR}/test/001_highsnr_stdio_audio;
python3 test_tx.py --mode 12 --delay 500 --frames 2 --bursts 1 > t.s16")

View file

@ -0,0 +1,14 @@
# FreeDV-JATE
## Just Another TNC Experiment
Audio tests!
## Frame rate conversion
### 48000Hz down to 8000Hz with different frame rate converters
#### Install
pip3 install miniaudio
#### Run
python3 sinustest.py

View file

@ -0,0 +1,50 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import pyaudio
import numpy as np
import audioop
import miniaudio
volume = 0.5
fshigh = 48000
fslow = 8000
duration = 50.0
f = 440.0
samples_48000 = (np.sin(2*np.pi*np.arange(fshigh*duration)*f/fshigh)).astype(np.float32)
samples_8000 = (np.sin(2*np.pi*np.arange(fslow*duration)*f/fslow)).astype(np.float32)
samples_converted = audioop.ratecv(samples_48000,2,1,fshigh, fslow , None)
samples_converted = bytes(samples_converted[0])
converted_frames = miniaudio.convert_frames(miniaudio.SampleFormat.FLOAT32, 1, 48000, bytes(samples_48000), miniaudio.SampleFormat.FLOAT32, 1, 8000)
#converted_frames = bytes(converted_frames)
print(type(samples_8000))
print(type(samples_converted))
print(type(converted_frames))
# TODO - write ouputs to .int16 files so we can compare them
'''
p = pyaudio.PyAudio()
stream = p.open(format=pyaudio.paFloat32,
channels=2,
rate=fshigh,
output=True,
output_device_index=0 #static.AUDIO_OUTPUT_DEVICE
)
print("original 48000Hz sample")
stream.write(samples_48000)
print("original 8000Hz sample")
stream.write(samples_8000)
print("48000Hz to 8000Hz with audioop")
stream.write(samples_converted)
print("48000Hz to 8000Hz with miniaudio")
stream.write(converted_frames)
stream.stop_stream()
stream.close()
p.terminate()
'''

View file

@ -0,0 +1,90 @@
# FreeDV-JATE [Just Another TNC Experiment]
## 001_HIGHSNR_STDIO_AUDIO TEST SUITE
### INSTALL TEST SUITE
#### Install prerequierements
```
sudo apt update
sudo apt upgrade
sudo apt install git cmake build-essential python3-pip portaudio19-dev python3-pyaudio
pip3 install crcengine
pip3 install threading
```
Change into a directory of your choice
Run the following commands --> They will download and compile the latest codec2 ( dr-packet ) files and LPCNet as well into the directory of your choice
```
wget https://raw.githubusercontent.com/DJ2LS/FreeDV-JATE/001_HIGHSNR_STDIO_AUDIO/install_test_suite.sh
chmod +x install_test_suite.sh
./install_test_suite.sh
```
### PARAMETERS
| parameter | description | side
|--|--|--|
| - -mode 12 | set the mode for FreeDV ( 10,11,12 ) | TX & RX
| - -delay 1 | set the delay between burst | TX
| - -frames 1 | set the number of frames per burst | TX & RX
| - -bursts 1 | set the number of bursts | TX & RX
| - -input "audio" | if set, program switches to audio instead of stdin | RX
| - -audioinput 2 | set the audio device | RX
| - -output "audio" | if set, program switches to audio instead of stdout | TX
| - -audiooutput 1 | set the audio device | TX
| - -debug | if used, print additional debugging output | RX
### STDIO TESTS FOR TERMINAL USAGE ONLY
python3 test_tx.py --mode 12 --delay 500 --frames 2 --bursts 1 | python3 test_rx.py --mode 12 --frames 2 --bursts 1
### AUDIO TESTS VIA VIRTUAL AUDIO DEVICE
#### Create audio sinkhole and subdevices
Note: This command needs to be run again after every reboot
```
sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2
```
check if devices have been created
aplay -l
Output should be like this:
Karte 0: Intel [HDA Intel], Gerät 0: Generic Analog [Generic Analog]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
#### Run tests:
Its important, to run TEST_RX at first to reduce the chance that we get some system side audio errors. Tests are showing, that its important to start with audio device "2" at first and then go to the lower virtual devices "1".
Audio device "0" is the default sound card.
##### RX side
python3 test_rx.py --mode 12 --frames 2 --bursts 1 --input "audio" --audioinput 2 --debug
##### TX side
python3 test_tx.py --mode 12 --delay 500 --frames 2 --bursts 1 --output "audio" --audiooutput 1

View file

@ -0,0 +1,138 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import ctypes
from ctypes import *
import pathlib
import pyaudio
import audioop
import sys
import logging
import time
import threading
import sys
import argparse
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--mode', dest="FREEDV_MODE", default=0, type=int)
parser.add_argument('--input', dest="DATA_INPUT", type=str)
parser.add_argument('--audioinput', dest="AUDIO_INPUT", default=0, type=int)
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DATA_INPUT = args.DATA_INPUT
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT
FREEDV_MODE = args.FREEDV_MODE
DEBUGGING_MODE = args.DEBUGGING_MODE
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
#-------------------------------------------- LOAD FREEDV
libname = "libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#--------------------------------------------OPEN AUDIO CHANNEL RX
if DATA_INPUT == "audio":
p = pyaudio.PyAudio()
AUDIO_SAMPLE_RATE_RX = int(p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['defaultSampleRate'])
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE,
)
# GENERAL PARAMETERS
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
# DATA CHANNEL INITIALISATION
freedv = c_lib.freedv_open(FREEDV_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
n_max_modem_samples = c_lib.freedv_get_n_max_modem_samples(freedv)
bytes_out = (ctypes.c_ubyte * bytes_per_frame) #bytes_per_frame
bytes_out = bytes_out() #get pointer from bytes_out
c_lib.freedv_set_frames_per_burst(freedv,N_FRAMES_PER_BURST)
total_n_bytes = 0
rx_total_frames = 0
rx_frames = 0
rx_bursts = 0
receive = True
while receive == True:
time.sleep(0.01)
data_in = b''
if DATA_INPUT == "audio":
nin = c_lib.freedv_nin(freedv)
nin_converted = int(nin*(AUDIO_SAMPLE_RATE_RX/MODEM_SAMPLE_RATE))
if DEBUGGING_MODE == True:
print("-----------------------------")
print("NIN: " + str(nin) + " [ " + str(nin_converted) + " ]")
data_in = stream_rx.read(nin_converted, exception_on_overflow = False)
data_in = audioop.ratecv(data_in,2,1,AUDIO_SAMPLE_RATE_RX, MODEM_SAMPLE_RATE, None)
data_in = data_in[0].rstrip(b'\x00')
else:
nin = c_lib.freedv_nin(freedv)*2
data_in = sys.stdin.buffer.read(nin)
c_lib.freedv_rawdatarx.argtype = [ctypes.POINTER(ctypes.c_ubyte), bytes_out, data_in] # check if really neccessary
nbytes = c_lib.freedv_rawdatarx(freedv, bytes_out, data_in) # demodulate audio
total_n_bytes = total_n_bytes + nbytes
if DEBUGGING_MODE == True:
print("SYNC: " + str(c_lib.freedv_get_rx_status(freedv)))
if nbytes == bytes_per_frame:
rx_total_frames = rx_total_frames + 1
rx_frames = rx_frames + 1
if rx_frames == N_FRAMES_PER_BURST:
rx_frames = 0
rx_bursts = rx_bursts + 1
#c_lib.freedv_set_sync(freedv,0) #this should be automatically done by c_lib.freedv_set_frames_per_burst(freedv,N_FRAMES_PER_BURST)
if rx_bursts == N_BURSTS:
receive = False
print("------------------------------")
print("BURSTS: " + str(rx_bursts))
print("TOTAL RECEIVED BYTES: " + str(total_n_bytes))
print("RECEIVED FRAMES: " + str(rx_total_frames))

View file

@ -0,0 +1,129 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import pathlib
import pyaudio
import time
import threading
import audioop
import argparse
import sys
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--delay', dest="DELAY_BETWEEN_BURSTS", default=0, type=int)
parser.add_argument('--mode', dest="FREEDV_MODE", default=0, type=int)
parser.add_argument('--output', dest="DATA_OUTPUT", type=str)
parser.add_argument('--audiooutput', dest="AUDIO_OUTPUT", default=0, type=int)
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DELAY_BETWEEN_BURSTS = args.DELAY_BETWEEN_BURSTS/1000
DATA_OUTPUT = args.DATA_OUTPUT
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
mode = args.FREEDV_MODE
data_out = b'HELLO WORLD!'
print(N_BURSTS)
print(N_FRAMES_PER_BURST)
#-------------------------------------------- LOAD FREEDV
libname = "libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#--------------------------------------------OPEN AUDIO CHANNEL TX
if DATA_OUTPUT == "audio":
p = pyaudio.PyAudio()
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
AUDIO_SAMPLE_RATE_TX = int(p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['defaultSampleRate'])
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(mode)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
mod_out = ctypes.c_short * n_tx_modem_samples
mod_out = mod_out()
mod_out_preamble = ctypes.c_short * (1760*2) #1760 for mode 10,11,12 #4000 for mode 9
mod_out_preamble = mod_out_preamble()
buffer = bytearray(payload_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
crc = ctypes.c_ushort(c_lib.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
print("BURSTS: " + str(N_BURSTS) + " FRAMES: " + str(N_FRAMES_PER_BURST) )
for i in range(0,N_BURSTS):
c_lib.freedv_rawdatapreambletx(freedv, mod_out_preamble);
txbuffer = bytearray()
txbuffer += bytes(mod_out_preamble)
for n in range(0,N_FRAMES_PER_BURST):
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
c_lib.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and safe it into mod_out pointer
txbuffer += bytes(mod_out)
if DATA_OUTPUT == "audio":
audio = audioop.ratecv(txbuffer,2,1,MODEM_SAMPLE_RATE, AUDIO_SAMPLE_RATE_TX, None)
stream_tx.write(audio[0])
txbuffer = bytearray()
else:
sys.stdout.buffer.write(txbuffer) # print data to terminal for piping the output to other programs
sys.stdout.flush()
txbuffer = bytearray()
time.sleep(DELAY_BETWEEN_BURSTS)
if DATA_OUTPUT == "audio":
stream_tx.close()
p.terminate()

View file

@ -0,0 +1,96 @@
# FreeDV-JATE [Just Another TNC Experiment]
## 002_HIGHSNR_PING_PONG
### INSTALL TEST SUITE
#### Install prerequierements
```
sudo apt update
sudo apt upgrade
sudo apt install git cmake build-essential python3-pip portaudio19-dev python3-pyaudio
pip3 install crcengine
pip3 install threading
```
Go into a directory of your choice
Run the following commands --> They will download and compile the latest codec2 ( dr-packet ) files and LPCNet as well into the directory of your choice
```
wget https://raw.githubusercontent.com/DJ2LS/FreeDV-JATE/002_HIGHSNR_PING_PONG/install_test_suite.sh
chmod +x install_test_suite.sh
./install_test_suite.sh
```
### PARAMETERS
| parameter | description | side
|--|--|--|
| - -txmode 12 | set the mode for FreeDV ( 10,11,12,14 ) | Terminal 1 & Terminal 2
| - -rxmode 14 | set the mode for FreeDV ( 10,11,12,14 ) | Terminal 1 & Terminal 2
| - -frames 1 | set the number of frames per burst | Terminal 1
| - -bursts 1 | set the number of bursts | Terminal 1
| - -audioinput 2 | set the audio device | Terminal 1 & Terminal 2
| - -audiooutput 1 | set the audio device | Terminal 1 & Terminal 2
| - -debug | if used, print additional debugging output | Terminal 1 & Terminal 2
### AUDIO TESTS VIA VIRTUAL AUDIO DEVICE
#### Create audio sinkhole and subdevices
Note: This command needs to be run again after every reboot
```
sudo modprobe snd-aloop index=1,2 enable=1,1 pcm_substreams=1,1 id=CHAT1,CHAT2
```
check if devices have been created
aplay -l
Output should be like this:
```
Karte 0: Intel [HDA Intel], Gerät 0: Generic Analog [Generic Analog]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 1: CHAT1 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 0: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
Karte 2: CHAT2 [Loopback], Gerät 1: Loopback PCM [Loopback PCM]
Sub-Geräte: 1/1
Sub-Gerät #0: subdevice #0
```
### Run tests:
#### Terminal 1: Ping
```
python3 PING.py --txmode 12 --rxmode 14 --audioinput 2 --audiooutput 2 --frames 1 --bursts 2
```
Output
```
BURSTS: 2 FRAMES: 1
-----------------------------------------------------------------
TX | PING | BURST [1/2] FRAME [1/1]
RX | PONG | BURST [1/2] FRAME [1/1]
-----------------------------------------------------------------
TX | PING | BURST [2/2] FRAME [1/1]
RX | PONG | BURST [2/2] FRAME [1/1]
```
#### Terminal 2: Pong
```
python3 PONG.py --txmode 14 --rxmode 12 --audioinput 2 --audiooutput 2
```
Output
```
RX | BURST [1/2] FRAME [1/1] >>> SENDING PONG
RX | BURST [2/2] FRAME [1/1] >>> SENDING PONG
```

View file

@ -0,0 +1,199 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import ctypes
from ctypes import *
import pathlib
import pyaudio
import time
import threading
import argparse
import sys
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--delay', dest="DELAY_BETWEEN_BURSTS", default=0, type=int)
parser.add_argument('--txmode', dest="FREEDV_TX_MODE", default=0, type=int)
parser.add_argument('--rxmode', dest="FREEDV_RX_MODE", default=0, type=int)
parser.add_argument('--audiooutput', dest="AUDIO_OUTPUT", default=0, type=int)
parser.add_argument('--audioinput', dest="AUDIO_INPUT", default=0, type=int)
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
DELAY_BETWEEN_BURSTS = args.DELAY_BETWEEN_BURSTS/1000
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
FREEDV_TX_MODE = args.FREEDV_TX_MODE
FREEDV_RX_MODE = args.FREEDV_RX_MODE
DEBUGGING_MODE = args.DEBUGGING_MODE
#-------------------------------------------- LOAD FREEDV
libname = pathlib.Path().absolute() / "codec2/build_linux/src/libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
p = pyaudio.PyAudio()
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#AUDIO_SAMPLE_RATE_TX = int(p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['defaultSampleRate'])
#AUDIO_SAMPLE_RATE_RX = int(p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['defaultSampleRate'])
AUDIO_SAMPLE_RATE_TX = 8000
AUDIO_SAMPLE_RATE_RX = 8000
#--------------------------------------------OPEN AUDIO CHANNEL TX
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE,
)
def receive():
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_RX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
bytes_out = (ctypes.c_ubyte * bytes_per_frame) #bytes_per_frame
bytes_out = bytes_out() #get pointer from bytes_out
total_n_bytes = 0
rx_total_frames = 0
rx_frames = 0
rx_bursts = 0
receive = True
while receive == True:
time.sleep(0.01)
nin = c_lib.freedv_nin(freedv)
nin_converted = int(nin*(AUDIO_SAMPLE_RATE_RX/MODEM_SAMPLE_RATE))
if DEBUGGING_MODE == True:
print("-----------------------------")
print("NIN: " + str(nin) + " [ " + str(nin_converted) + " ]")
data_in = stream_rx.read(nin_converted, exception_on_overflow = False)
data_in = data_in.rstrip(b'\x00')
c_lib.freedv_rawdatarx.argtype = [ctypes.POINTER(ctypes.c_ubyte), bytes_out, data_in] # check if really neccessary
nbytes = c_lib.freedv_rawdatarx(freedv, bytes_out, data_in) # demodulate audio
total_n_bytes = total_n_bytes + nbytes
if DEBUGGING_MODE == True:
print("SYNC: " + str(c_lib.freedv_get_rx_status(freedv)))
if nbytes == bytes_per_frame:
rx_total_frames = rx_total_frames + 1
rx_frames = rx_frames + 1
if rx_frames == N_FRAMES_PER_BURST:
rx_frames = 0
rx_bursts = rx_bursts + 1
c_lib.freedv_set_sync(freedv,0)
burst = bytes_out[0]
n_total_burst = bytes_out[1]
frame = bytes_out[2]
n_total_frame = bytes_out[3]
print("RX | PONG | BURST [" + str(burst) + "/" + str(n_total_burst) + "] FRAME [" + str(frame) + "/" + str(n_total_frame) + "]")
print("-----------------------------------------------------------------")
c_lib.freedv_set_sync(freedv,0)
if rx_bursts == N_BURSTS:
receive = False
RECEIVE = threading.Thread(target=receive, name="RECEIVE THREAD")
RECEIVE.start()
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_TX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
mod_out = ctypes.c_short * n_tx_modem_samples
mod_out = mod_out()
mod_out_preamble = ctypes.c_short * (1760*2) #1760 for mode 10,11,12 #4000 for mode 9
mod_out_preamble = mod_out_preamble()
print("BURSTS: " + str(N_BURSTS) + " FRAMES: " + str(N_FRAMES_PER_BURST) )
print("-----------------------------------------------------------------")
for i in range(0,N_BURSTS):
c_lib.freedv_rawdatapreambletx(freedv, mod_out_preamble);
txbuffer = bytearray()
txbuffer += bytes(mod_out_preamble)
for n in range(0,N_FRAMES_PER_BURST):
data_out = bytearray()
data_out += bytes([i+1])
data_out += bytes([N_BURSTS])
data_out += bytes([n+1])
data_out += bytes([N_FRAMES_PER_BURST])
buffer = bytearray(payload_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
crc = ctypes.c_ushort(c_lib.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
c_lib.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and safe it into mod_out pointer
txbuffer += bytes(mod_out)
print("TX | PING | BURST [" + str(i+1) + "/" + str(N_BURSTS) + "] FRAME [" + str(n+1) + "/" + str(N_FRAMES_PER_BURST) + "]")
stream_tx.write(bytes(txbuffer))
ACK_TIMEOUT = time.time() + 3
txbuffer = bytearray()
#time.sleep(DELAY_BETWEEN_BURSTS)
# WAIT UNTIL WE RECEIVD AN ACK/DATAC0 FRAME
while ACK_TIMEOUT >= time.time():
time.sleep(0.01)
time.sleep(1)
stream_tx.close()
p.terminate()

View file

@ -0,0 +1,166 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 23 07:04:24 2020
@author: DJ2LS
"""
import ctypes
from ctypes import *
import pathlib
import pyaudio
import sys
import logging
import time
import threading
import sys
import argparse
#--------------------------------------------GET PARAMETER INPUTS
parser = argparse.ArgumentParser(description='Simons TEST TNC')
parser.add_argument('--bursts', dest="N_BURSTS", default=0, type=int)
parser.add_argument('--frames', dest="N_FRAMES_PER_BURST", default=0, type=int)
parser.add_argument('--txmode', dest="FREEDV_TX_MODE", default=0, type=int)
parser.add_argument('--rxmode', dest="FREEDV_RX_MODE", default=0, type=int)
parser.add_argument('--audioinput', dest="AUDIO_INPUT", default=0, type=int)
parser.add_argument('--audiooutput', dest="AUDIO_OUTPUT", default=0, type=int)
parser.add_argument('--debug', dest="DEBUGGING_MODE", action="store_true")
args = parser.parse_args()
N_BURSTS = args.N_BURSTS
N_FRAMES_PER_BURST = args.N_FRAMES_PER_BURST
AUDIO_OUTPUT_DEVICE = args.AUDIO_OUTPUT
AUDIO_INPUT_DEVICE = args.AUDIO_INPUT
FREEDV_TX_MODE = args.FREEDV_TX_MODE
FREEDV_RX_MODE = args.FREEDV_RX_MODE
DEBUGGING_MODE = args.DEBUGGING_MODE
# 1024 good for mode 6
AUDIO_FRAMES_PER_BUFFER = 2048
MODEM_SAMPLE_RATE = 8000
#-------------------------------------------- LOAD FREEDV
libname = pathlib.Path().absolute() / "codec2/build_linux/src/libcodec2.so"
c_lib = ctypes.CDLL(libname)
#--------------------------------------------CREATE PYAUDIO INSTANCE
p = pyaudio.PyAudio()
#--------------------------------------------GET SUPPORTED SAMPLE RATES FROM SOUND DEVICE
#AUDIO_SAMPLE_RATE_TX = int(p.get_device_info_by_index(AUDIO_OUTPUT_DEVICE)['defaultSampleRate'])
#AUDIO_SAMPLE_RATE_RX = int(p.get_device_info_by_index(AUDIO_INPUT_DEVICE)['defaultSampleRate'])
AUDIO_SAMPLE_RATE_TX = 8000
AUDIO_SAMPLE_RATE_RX = 8000
#--------------------------------------------OPEN AUDIO CHANNEL RX
stream_tx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_TX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER, #n_nom_modem_samples
output=True,
output_device_index=AUDIO_OUTPUT_DEVICE,
)
stream_rx = p.open(format=pyaudio.paInt16,
channels=1,
rate=AUDIO_SAMPLE_RATE_RX,
frames_per_buffer=AUDIO_FRAMES_PER_BUFFER,
input=True,
input_device_index=AUDIO_INPUT_DEVICE,
)
# GENERAL PARAMETERS
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
def send_pong(burst,n_total_burst,frame,n_total_frame):
data_out = bytearray()
data_out[0:1] = bytes([burst])
data_out[1:2] = bytes([n_total_burst])
data_out[2:3] = bytes([frame])
data_out[4:5] = bytes([n_total_frame])
c_lib.freedv_open.restype = ctypes.POINTER(ctypes.c_ubyte)
freedv = c_lib.freedv_open(FREEDV_TX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
payload_per_frame = bytes_per_frame -2
n_nom_modem_samples = c_lib.freedv_get_n_nom_modem_samples(freedv)
n_tx_modem_samples = c_lib.freedv_get_n_tx_modem_samples(freedv) #get n_tx_modem_samples which defines the size of the modulation object # --> *2
mod_out = ctypes.c_short * n_tx_modem_samples
mod_out = mod_out()
mod_out_preamble = ctypes.c_short * (1760*2) #1760 for mode 10,11,12 #4000 for mode 9
mod_out_preamble = mod_out_preamble()
buffer = bytearray(payload_per_frame) # use this if CRC16 checksum is required ( DATA1-3)
buffer[:len(data_out)] = data_out # set buffersize to length of data which will be send
crc = ctypes.c_ushort(c_lib.freedv_gen_crc16(bytes(buffer), payload_per_frame)) # generate CRC16
crc = crc.value.to_bytes(2, byteorder='big') # convert crc to 2 byte hex string
buffer += crc # append crc16 to buffer
c_lib.freedv_rawdatapreambletx(freedv, mod_out_preamble);
txbuffer = bytearray()
txbuffer += bytes(mod_out_preamble)
data = (ctypes.c_ubyte * bytes_per_frame).from_buffer_copy(buffer)
c_lib.freedv_rawdatatx(freedv,mod_out,data) # modulate DATA and safe it into mod_out pointer
txbuffer += bytes(mod_out)
stream_tx.write(bytes(txbuffer))
txbuffer = bytearray()
# DATA CHANNEL INITIALISATION
freedv = c_lib.freedv_open(FREEDV_RX_MODE)
bytes_per_frame = int(c_lib.freedv_get_bits_per_modem_frame(freedv)/8)
n_max_modem_samples = c_lib.freedv_get_n_max_modem_samples(freedv)
bytes_out = (ctypes.c_ubyte * bytes_per_frame) #bytes_per_frame
bytes_out = bytes_out() #get pointer from bytes_out
receive = True
while receive == True:
time.sleep(0.01)
data_in = b''
nin = c_lib.freedv_nin(freedv)
nin_converted = int(nin*(AUDIO_SAMPLE_RATE_RX/MODEM_SAMPLE_RATE))
if DEBUGGING_MODE == True:
print("-----------------------------")
print("NIN: " + str(nin) + " [ " + str(nin_converted) + " ]")
data_in = stream_rx.read(nin_converted, exception_on_overflow = False)
data_in = data_in.rstrip(b'\x00')
c_lib.freedv_rawdatarx.argtype = [ctypes.POINTER(ctypes.c_ubyte), bytes_out, data_in] # check if really neccessary
nbytes = c_lib.freedv_rawdatarx(freedv, bytes_out, data_in) # demodulate audio
if DEBUGGING_MODE == True:
print("SYNC: " + str(c_lib.freedv_get_rx_status(freedv)))
if nbytes == bytes_per_frame:
burst = bytes_out[0]
n_total_burst = bytes_out[1]
frame = bytes_out[2]
n_total_frame = bytes_out[3]
print("RX | BURST [" + str(burst) + "/" + str(n_total_burst) + "] FRAME [" + str(frame) + "/" + str(n_total_frame) + "] >>> SENDING PONG")
TRANSMIT_PONG = threading.Thread(target=send_pong, args=[burst,n_total_burst,frame,n_total_frame], name="SEND PONG")
TRANSMIT_PONG.start()
c_lib.freedv_set_sync(freedv,0)