mirror of
https://github.com/DJ2LS/FreeDATA
synced 2024-05-14 10:04:33 +02:00
audio buffer thread sfaety test
This commit is contained in:
parent
2cc319c857
commit
342dd29747
|
@ -22,6 +22,12 @@ set(FRAMESPERBURST 3)
|
|||
set(BURSTS 1)
|
||||
set(TESTFRAMES 3)
|
||||
|
||||
add_test(NAME audio_buffer
|
||||
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
|
||||
cd ${CMAKE_CURRENT_SOURCE_DIR}/test;
|
||||
python3 test_audiobuffer.py")
|
||||
set_tests_properties(audio_buffer PROPERTIES PASS_REGULAR_EXPRESSION "errors: 0")
|
||||
|
||||
add_test(NAME resampler
|
||||
COMMAND sh -c "export LD_LIBRARY_PATH=${CODEC2_BUILD_DIR}/src;
|
||||
cd ${CMAKE_CURRENT_SOURCE_DIR}/test;
|
||||
|
|
59
test/test_audiobuffer.py
Normal file
59
test/test_audiobuffer.py
Normal file
|
@ -0,0 +1,59 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# tests audio buffer thread safety
|
||||
|
||||
import sys
|
||||
sys.path.insert(0,'..')
|
||||
from tnc import codec2
|
||||
import threading
|
||||
import numpy as np
|
||||
from time import sleep
|
||||
|
||||
BUFFER_SZ = 1024
|
||||
N_MAX = 100 # write a repeating sequence of 0..N_MAX-1
|
||||
WRITE_SZ = 10 # different read and write sized buffers
|
||||
READ_SZ = 8
|
||||
NTESTS = 10000
|
||||
|
||||
running = True
|
||||
audio_buffer = codec2.audio_buffer(BUFFER_SZ)
|
||||
|
||||
n_write = int(0)
|
||||
n_read = int(0)
|
||||
|
||||
def writer():
|
||||
global n_write
|
||||
print("writer starting")
|
||||
n = int(0)
|
||||
buf = np.zeros(WRITE_SZ, dtype=np.int16)
|
||||
while running:
|
||||
nfree = audio_buffer.size - audio_buffer.nbuffer
|
||||
if nfree >= WRITE_SZ:
|
||||
for i in range(0,WRITE_SZ):
|
||||
buf[i] = n;
|
||||
n += 1
|
||||
if n == N_MAX:
|
||||
n = 0
|
||||
n_write += 1
|
||||
audio_buffer.push(buf)
|
||||
|
||||
x = threading.Thread(target=writer)
|
||||
x.start()
|
||||
|
||||
n_out = int(0)
|
||||
errors = int(0)
|
||||
for tests in range(1,NTESTS):
|
||||
while audio_buffer.nbuffer < READ_SZ:
|
||||
sleep(0.001)
|
||||
for i in range(0,READ_SZ):
|
||||
if audio_buffer.buffer[i] != n_out:
|
||||
errors += 1
|
||||
n_out += 1
|
||||
if n_out == N_MAX:
|
||||
n_out = 0
|
||||
n_read += 1
|
||||
audio_buffer.pop(READ_SZ)
|
||||
|
||||
running = False
|
||||
print("n_write: %d n_read: %d errors: %d " % (n_write, n_read, errors))
|
Loading…
Reference in a new issue