2021-12-20 21:50:02 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
#
|
|
|
|
# tests audio buffer thread safety
|
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
# pylint: disable=global-statement, invalid-name
|
|
|
|
|
2021-12-20 21:50:02 +00:00
|
|
|
import sys
|
|
|
|
import threading
|
|
|
|
from time import sleep
|
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
import codec2
|
|
|
|
import numpy as np
|
|
|
|
import pytest
|
|
|
|
|
2021-12-20 21:50:02 +00:00
|
|
|
BUFFER_SZ = 1024
|
2022-05-23 12:26:14 +00:00
|
|
|
N_MAX = 100 # write a repeating sequence of 0....N_MAX-1
|
2022-05-21 23:04:17 +00:00
|
|
|
WRITE_SZ = 10 # different read and write sized buffers
|
|
|
|
READ_SZ = 8
|
|
|
|
NTESTS = 10000
|
2021-12-20 21:50:02 +00:00
|
|
|
|
|
|
|
running = True
|
|
|
|
audio_buffer = codec2.audio_buffer(BUFFER_SZ)
|
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
n_write = 0
|
2021-12-20 21:50:02 +00:00
|
|
|
|
2022-05-21 23:04:17 +00:00
|
|
|
|
|
|
|
def t_writer():
|
|
|
|
"""
|
|
|
|
Subprocess to handle writes to the NumPY audio "device."
|
|
|
|
"""
|
2021-12-20 21:50:02 +00:00
|
|
|
global n_write
|
|
|
|
print("writer starting")
|
2022-05-21 23:04:17 +00:00
|
|
|
n = 0
|
2021-12-20 21:50:02 +00:00
|
|
|
buf = np.zeros(WRITE_SZ, dtype=np.int16)
|
|
|
|
while running:
|
|
|
|
nfree = audio_buffer.size - audio_buffer.nbuffer
|
|
|
|
if nfree >= WRITE_SZ:
|
2022-05-21 23:04:17 +00:00
|
|
|
for index in range(WRITE_SZ):
|
|
|
|
buf[index] = n
|
2021-12-20 21:50:02 +00:00
|
|
|
n += 1
|
|
|
|
if n == N_MAX:
|
|
|
|
n = 0
|
|
|
|
n_write += 1
|
|
|
|
audio_buffer.push(buf)
|
2022-05-21 23:04:17 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_audiobuffer():
|
|
|
|
"""
|
|
|
|
Test for the audiobuffer
|
|
|
|
"""
|
|
|
|
global running
|
|
|
|
|
|
|
|
# Start the writer in a new thread.
|
|
|
|
writer_thread = threading.Thread(target=t_writer)
|
|
|
|
writer_thread.start()
|
|
|
|
|
|
|
|
n_out = n_read = errors = 0
|
|
|
|
for _ in range(NTESTS):
|
|
|
|
while audio_buffer.nbuffer < READ_SZ:
|
|
|
|
sleep(0.001)
|
|
|
|
for i in range(READ_SZ):
|
|
|
|
if audio_buffer.buffer[i] != n_out:
|
|
|
|
errors += 1
|
|
|
|
n_out += 1
|
|
|
|
if n_out == N_MAX:
|
|
|
|
n_out = 0
|
|
|
|
n_read += 1
|
|
|
|
audio_buffer.pop(READ_SZ)
|
|
|
|
|
|
|
|
print(f"n_write: {n_write} n_read: {n_read} errors: {errors} ")
|
|
|
|
|
|
|
|
# Indirectly stop the thread
|
|
|
|
running = False
|
|
|
|
sleep(0.1)
|
|
|
|
|
|
|
|
assert not writer_thread.is_alive()
|
|
|
|
assert n_write - n_read < BUFFER_SZ
|
|
|
|
assert errors == 0
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
# Run pytest with the current script as the filename.
|
|
|
|
ecode = pytest.main(["-v", sys.argv[0]])
|
|
|
|
if ecode == 0:
|
|
|
|
print("errors: 0")
|
|
|
|
else:
|
|
|
|
print(ecode)
|