resampler class

This commit is contained in:
drowe67 2021-12-16 11:56:39 +10:30 committed by David Rowe
parent b95562740a
commit 84f5cd315e
3 changed files with 57 additions and 31 deletions

View file

@ -29,8 +29,8 @@ FDMDV_OS_TAPS_48_8K = codec2.api.FDMDV_OS_TAPS_48_8K
N8 = int(180) # processing buffer size at 8 kHz
N48 = int(N8*FDMDV_OS_48) # processing buffer size at 48 kHz
MEM8 = int(FDMDV_OS_TAPS_48_8K) # 8kHz signal filter memory
MEM48 = int(FDMDV_OS_TAPS_48K) # 48kHz signal filter memory
MEM8 = FDMDV_OS_TAPS_48_8K # 8kHz signal filter memory
MEM48 = FDMDV_OS_TAPS_48K # 48kHz signal filter memory
FRAMES = int(50) # number of frames to test
FS8 = 8000
FS48 = 48000
@ -42,42 +42,33 @@ FINTER48 = 10000 # interferer frequency at FS=48kHz
# must be an integer multiple of oversampling ratio
assert N8 % FDMDV_OS_48 == 0
in8k = np.zeros(MEM8 + N8, dtype=np.int16)
out48k = np.zeros(N48, dtype=np.int16)
in48k = np.zeros(MEM48 + N48, dtype=np.int16)
out8k = np.zeros(N8, dtype=np.int16)
# time indexes, we advance every frame
t = 0
t1 = 0
# output files to listen to/evaluate result
fin8 = open("in8.raw", mode='wb')
f48 = open("out48.raw", mode='wb')
fout8 = open("out8.raw", mode='wb')
resampler = codec2.resampler(N48,N8)
for f in range(FRAMES):
# input sine wave
in8k[MEM8:] = AMP*np.cos(2*np.pi*np.arange(t,t+N8)*FTEST8/FS8)
sine_in8k = (AMP*np.cos(2*np.pi*np.arange(t,t+N8)*FTEST8/FS8)).astype(np.int16)
t += N8
in8k[MEM8:].tofile(fin8)
# upsample
pin8k,flag = in8k.__array_interface__['data']
pin8k += 2*MEM8
codec2.api.fdmdv_8_to_48_short(out48k.ctypes, pin8k, N8);
out48k.tofile(f48)
sine_in8k.tofile(fin8)
sine_out48k = resampler.resample8_to_48(sine_in8k)
sine_out48k.tofile(f48)
# add interfering sine wave (down sampling filter should remove)
in48k[MEM48:] = out48k + (AMP/2)*np.cos(2*np.pi*np.arange(t1,t1+N48)*FINTER48/FS48)
sine_in48k = (sine_out48k + (AMP/2)*np.cos(2*np.pi*np.arange(t1,t1+N48)*FINTER48/FS48)).astype(np.int16)
t1 += N48
# downsample
pin48k,flag = in48k.__array_interface__['data']
pin48k += 2*MEM48
codec2.api.fdmdv_48_to_8_short(out8k.ctypes, pin48k, N8);
out8k.tofile(fout8)
sine_out8k = resampler.resample48_to_8(sine_in48k)
sine_out8k.tofile(fout8)
fin8.close()
f48.close()
fout8.close()

View file

@ -49,8 +49,10 @@ TIMEOUT = args.TIMEOUT
# AUDIO PARAMETERS
AUDIO_FRAMES_PER_BUFFER = 2048 # seems to be best if >=1024
MODEM_SAMPLE_RATE = codec2.api.FREEDV_FS_8000
AUDIO_SAMPLE_RATE_RX = 8000
assert (AUDIO_SAMPLE_RATE_RX % MODEM_SAMPLE_RATE) == 0
AUDIO_SAMPLE_RATE_RX = 48000
# make sure our resampler will work
assert (AUDIO_SAMPLE_RATE_RX / MODEM_SAMPLE_RATE) == api.FDMDV_OS_48
# check if we want to use an audio device then do an pyaudio init
if AUDIO_INPUT_DEVICE != -1:
@ -109,12 +111,15 @@ nin = codec2.api.freedv_nin(freedv)
while receive and time.time() < timeout:
if AUDIO_INPUT_DEVICE != -1:
data_in = stream_rx.read(AUDIO_FRAMES_PER_BUFFER, exception_on_overflow = False)
data_in48k = stream_rx.read(AUDIO_FRAMES_PER_BUFFER, exception_on_overflow = False)
else:
data_in = sys.stdin.buffer.read(AUDIO_FRAMES_PER_BUFFER*2)
data_in48k = sys.stdin.buffer.read(AUDIO_FRAMES_PER_BUFFER*2)
# resample to 8 kHz
data_in8k = resampler.resample(data_in48k)
# insert samples in buffer
x = np.frombuffer(data_in, dtype=np.int16)
x = np.frombuffer(data_in8k, dtype=np.int16)
if len(x) == 0:
receive = False
audio_buffer.push(x)

View file

@ -123,9 +123,39 @@ api.rx_sync_flags_to_text = [
# resampler ---------------------------------------------------------
api.FDMDV_OS_48 = 6 # oversampling rate
api.FDMDV_OS_TAPS_48K = 48 # number of OS filter taps at 48kHz
api.FDMDV_OS_TAPS_48_8K = (api.FDMDV_OS_TAPS_48K/api.FDMDV_OS_48) # number of OS filter taps at 8kHz
api.FDMDV_OS_48 = int(6) # oversampling rate
api.FDMDV_OS_TAPS_48K = int(48) # number of OS filter taps at 48kHz
api.FDMDV_OS_TAPS_48_8K = int(api.FDMDV_OS_TAPS_48K/api.FDMDV_OS_48) # number of OS filter taps at 8kHz
api.fdmdv_8_to_48_short.argtype = [c_void_p, c_void_p, c_int]
api.fdmdv_48_to_8_short.argtype = [c_void_p, c_void_p, c_int]
class resampler:
# a buffer of int16 samples, using a fixed length numpy array self.buffer for storage
# self.nbuffer is the current number of samples in the buffer
MEM8 = api.FDMDV_OS_TAPS_48_8K
MEM48 = api.FDMDV_OS_TAPS_48K
def __init__(self, n48, n8):
print("create 48<->8 kHz resampler with buffers of %d at 48 kHz and %d at 8 kHz" % (n48, n8))
assert (n48 / n8) == api.FDMDV_OS_48
self.n8 = n8
self.n48 = n48
self.in8 = np.zeros(self.MEM8 + n8, dtype=np.int16)
self.out48 = np.zeros(n48, dtype=np.int16)
self.in48 = np.zeros(self.MEM48 + n48, dtype=np.int16)
self.out8 = np.zeros(n8, dtype=np.int16)
def resample48_to_8(self,in48):
assert in48.dtype == np.int16
assert len(in48) == self.n48
self.in48[self.MEM48:] = in48
pin48,flag = self.in48.__array_interface__['data']
pin48 += 2*self.MEM48
api.fdmdv_48_to_8_short(self.out8.ctypes, pin48, self.n8);
return self.out8
def resample8_to_48(self,in8):
assert in8.dtype == np.int16
assert len(in8) == self.n8
self.in8[self.MEM8:] = in8
pin8,flag = self.in8.__array_interface__['data']
pin8 += 2*self.MEM8
api.fdmdv_8_to_48_short(self.out48.ctypes, pin8, self.n8);
return self.out48