Коммит
4a41380183
|
@ -1,2 +1,4 @@
|
|||
# Forest_IR_synthesis
|
||||
Forest impulse response synthesis model, implemented in Python.
|
||||
- /code: python code
|
||||
- /examples: example forest IRs
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
__author__ = "Shoken KANEKO"
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
|
||||
na = np.newaxis
|
||||
join = os.path.join
|
||||
|
||||
pi = np.pi
|
||||
soundVel = 343.0
|
||||
|
||||
if __name__ == "__main__":
|
||||
pass
|
|
@ -0,0 +1,470 @@
|
|||
__author__ = "Shoken KANEKO"
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
from scipy.fft import rfft, irfft
|
||||
from scipy.special import jn, yn
|
||||
import time
|
||||
|
||||
na = np.newaxis
|
||||
join = os.path.join
|
||||
norm = np.linalg.norm
|
||||
|
||||
from Constants import pi, soundVel
|
||||
import Wav
|
||||
import SignalProcessing as sigp
|
||||
from SignalProcessing import addSignal, getImpulse
|
||||
from SignalProcessing import getFreqsForGivenFilterLength
|
||||
from SignalProcessing import getIRFromSpectrum_irfft
|
||||
from SignalProcessing import dists, getDelayedSig, floatX
|
||||
|
||||
def computeAirAbsorptionCoef(
|
||||
freqs,
|
||||
temperature_in_celsius=20.0,
|
||||
pressure_in_kiloPascal=101.325,
|
||||
relativeHumidity_in_percent=50.0):
|
||||
"""
|
||||
freqs: array-like with size [F].
|
||||
frequencies to compute the air absorption coefficient alpha,
|
||||
where the sound pressure amplitude A(x) = A0 * exp(-alpha * x)
|
||||
implementation based on JavaScript code from here:
|
||||
http://resource.npl.co.uk/acoustics/techguides/absorption/ (accessed: 6/10/2020)
|
||||
"""
|
||||
temperature_in_kelvin = 273.15 + temperature_in_celsius
|
||||
temp_ref = 293.15
|
||||
temp_rel = temperature_in_kelvin/temp_ref
|
||||
T_01 = 273.15 + 0.01 # Triple point isotherm temperature in [K]
|
||||
P_ref = 101.325 # Reference atmospheric pressure in [kPa]
|
||||
P_rel = pressure_in_kiloPascal / P_ref
|
||||
P_sat_over_P_ref = 10**((-6.8346 * (T_01 / temperature_in_kelvin)**1.261) + 4.6151)
|
||||
H = relativeHumidity_in_percent * (P_sat_over_P_ref / P_rel)
|
||||
Fro = P_rel * (24 + 40400 * H * (0.02 + H) / (0.391 + H))
|
||||
Frn = P_rel / np.sqrt(temp_rel) * (9 + 280 * H * np.power(np.e, (-4.17 * (np.power(temp_rel, (-1 / 3)) - 1))))
|
||||
Xc = 0.0000000000184 / P_rel * np.sqrt(temp_rel)
|
||||
f2 = freqs**2
|
||||
Xo = 0.01275 * np.power(np.e, (-2239.1 / temperature_in_kelvin)) * np.power((Fro + (f2 / Fro)), -1)
|
||||
Xn = 0.1068 * np.power(np.e, (-3352 / temperature_in_kelvin)) * np.power((Frn + (f2 / Frn)), -1)
|
||||
alpha = 20 * np.log10(np.e) * f2 * (Xc + np.power(temp_rel, (-5 / 2)) * (Xo + Xn))
|
||||
return alpha # [F]
|
||||
|
||||
def getFilterBankForAirAbsorption(ntaps, fs, dists, fftLen):
|
||||
freqs, freqs_half = sigp.getFreqsForGivenFilterLength(ntaps, fs)
|
||||
alphas = computeAirAbsorptionCoef(freqs_half) # [hF]
|
||||
attenuations_dB = alphas[:, na] * dists[na, :] # [hF x M] ... in dB
|
||||
attenuations = sigp.getAmplitudeFromdB(-attenuations_dB)
|
||||
filters = irfft(attenuations, axis=0) # [F=ntaps x M]
|
||||
filters = np.roll(filters, shift=ntaps // 2, axis=0)
|
||||
specs = rfft(filters,n=fftLen,axis=0)
|
||||
return filters, specs # [ntaps x nDists]
|
||||
|
||||
def computeGammaTable(N,a,freqs):
|
||||
# a: tree radius
|
||||
# N: maximum order of Bessel functions
|
||||
# freqs: [F]
|
||||
F = len(freqs)
|
||||
ks = 2*pi*freqs/soundVel # [F]
|
||||
kas = ks*a # [F]
|
||||
ns = np.arange(N+2) # [N+2]
|
||||
jns = [jn(ns,ka) for ka in kas] # [F x N+2]
|
||||
yns = [yn(ns,ka) for ka in kas] # [F x N+2]
|
||||
jns = np.array(jns)
|
||||
yns = np.array(yns)
|
||||
tanGamma_n = np.zeros([F,N+1],dtype=floatX)
|
||||
for n in range(1,N+1):
|
||||
import warnings
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter('ignore')
|
||||
tanGamma_n[:,n] = (jns[:,n-1] - jns[:,n+1])/(yns[:,n-1] - yns[:,n+1])
|
||||
tanGamma_n[:,0] = jns[:,1]/yns[:,1]
|
||||
tanGamma_n[0, :] = 0
|
||||
gamma_n = np.arctan(tanGamma_n) # [F x N+1]
|
||||
sinGamma_n = np.sin(gamma_n)
|
||||
cosGamma_n = np.cos(gamma_n)
|
||||
expiGamma_n = cosGamma_n + 1j * sinGamma_n
|
||||
sinexpiGamma_n = sinGamma_n * expiGamma_n # [F x N+1]
|
||||
sinexpiGamma_n[:,1:] *= 2 # multiplying En
|
||||
sinexpiGamma_n[0,:] = 0
|
||||
return sinexpiGamma_n # [F x N+1], (freq)^-0.5 * En * sinGamma_n * expiGamma_n
|
||||
# the rest:
|
||||
# multiply cosnphi(freq, angle),
|
||||
# sum over all n,
|
||||
# mult (freqs**-0.5)
|
||||
# mult ((2/pi)**0.5 * np.exp(1j*pi*0.25)),
|
||||
# apply delay and multiply 1/dist**0.5
|
||||
|
||||
def computeAngleDependentCylinderScatteringFilter(N, a, freqs, angles_in_radian):
|
||||
# angles_in_radian: [A]
|
||||
gammaTable = computeGammaTable(N,a,freqs) # [F x N+1]
|
||||
ns = np.arange(N+1) # [N+1]
|
||||
cosnphis = np.cos(ns[na,:]*angles_in_radian[:,na]) # [A x N+1]
|
||||
sum = np.sum(gammaTable[:,na,:]*cosnphis[na,:,:], axis=-1) # [F x A]
|
||||
const = ((2/pi)**0.5 * np.exp(1j*pi*0.25))
|
||||
import warnings
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter('ignore')
|
||||
sum = sum * ((freqs**-0.5)*const)[:,na] # [F x A]
|
||||
sum[0,:] = 1e-8
|
||||
return sum
|
||||
# the rest:
|
||||
# apply delay and multiply 1/dist**0.5
|
||||
|
||||
def computeAngleDependentSphereRadiationFilter(maxOrder, a, freqs, angles_in_radian):
|
||||
# angles_in_radian: [A]
|
||||
import scipy.special as sp
|
||||
N = maxOrder
|
||||
ns = np.arange(maxOrder+1) # [N+1]
|
||||
kas = 2*np.pi*freqs/soundVel * a # [F]
|
||||
jnps = np.array([sp.spherical_jn(ns,ka) for ka in kas]) # [F x N+1]
|
||||
ynps = np.array([sp.spherical_yn(ns,ka) for ka in kas]) # [F x N+1]
|
||||
hnps = jnps + 1j * ynps # [F x N+1]
|
||||
coss = np.cos(angles_in_radian) # [A]
|
||||
Pncoss = np.array([sp.lpn(N, z=cos)[0] for cos in coss]) # [A x N+1]
|
||||
numerator = ((2*ns+1)*(-1j)**(ns-1)) * Pncoss # [A x N+1]
|
||||
numerator_LF = (((2*ns+1)*(-1j)**(ns-1)) * Pncoss)[na,:,:] * (kas[:,na]**ns[na,:])[:,na,:] # [F x A x N+1]
|
||||
denominator = (kas**2)[:,na] * hnps # [F x N+1]
|
||||
denominator_LF = 1j*(ns+1)*sp.factorial2(2*ns-1) #/(kas[:,na]**ns[na,:]) # [F x N+1]
|
||||
summand = numerator[na,:,:] / denominator[:,na,:] # [F x A x N+1]
|
||||
summand_LF = numerator_LF / denominator_LF # [F x A x N+1]
|
||||
for idx,ka in enumerate(kas):
|
||||
for n in ns:
|
||||
if ka<=0.01*n:
|
||||
summand[idx,:,n] = summand_LF[idx,:,n]
|
||||
sum = np.sum(summand, axis=-1) # [F x A]
|
||||
return sum # [F x A]
|
||||
|
||||
def getFilterBankForAngleDependentCylinderScattering(
|
||||
treeRad=0.25, fs=24000, steps=128, nAngleBins=180, N=50, fftLen=128*3):
|
||||
a = treeRad
|
||||
freqs, freqs_half = getFreqsForGivenFilterLength(steps, fs)
|
||||
angles_in_radian = np.arange(0,nAngleBins+1) * 180/nAngleBins * pi/180
|
||||
Ns = [N] # 50 is just enough
|
||||
F = len(freqs)
|
||||
A = len(angles_in_radian)
|
||||
filter_NxhFxA = np.zeros((len(Ns),F//2+1,A),dtype=np.complex128)
|
||||
for n,N in enumerate(Ns):
|
||||
filter_hFxA = computeAngleDependentCylinderScatteringFilter(N, a, freqs_half, angles_in_radian)
|
||||
filter_NxhFxA[n,:,:] = filter_hFxA
|
||||
|
||||
filter_FxA = np.zeros([F,A],dtype=np.complex128)
|
||||
filter_FxA[:F//2+1,:] = filter_hFxA[:,:]
|
||||
filter_FxA[F//2+1:,:] = filter_hFxA[1:F//2,:][::-1,:].conj()
|
||||
irs = getIRFromSpectrum_irfft(filter_hFxA) # [T x C]
|
||||
irs = np.roll(irs, shift=steps//2, axis=0)
|
||||
specs = rfft(irs,n=fftLen,axis=0) # [Fh(=2T//2+1=T+1) x nAngles]
|
||||
return irs, specs # [T x nAngles], [T+1 x nAngles]
|
||||
|
||||
def getFilterBankForSourceDirectivity(fs, ntaps, nAngleBins=180, birdHeadRad=0.025, maxOrder=80, fftLen=128, doPlot=0):
|
||||
a = birdHeadRad
|
||||
freqs, freqs_half = getFreqsForGivenFilterLength(ntaps, fs)
|
||||
angles_in_radian = np.arange(0, nAngleBins + 1) * 180 / nAngleBins * pi / 180
|
||||
Ns = [maxOrder]
|
||||
Fh = len(freqs_half)
|
||||
F = len(freqs)
|
||||
A = len(angles_in_radian)
|
||||
filter_NxFhxA = np.zeros((len(Ns), Fh, A), dtype=np.complex128)
|
||||
for n, N in enumerate(Ns):
|
||||
filter_FhxA = computeAngleDependentSphereRadiationFilter(N, a, freqs_half, angles_in_radian)
|
||||
filter_NxFhxA[n, :, :] = filter_FhxA
|
||||
filter_FxA = np.zeros([F, A], dtype=np.complex128)
|
||||
filter_FxA[:F // 2 + 1, :] = filter_FhxA[:, :]
|
||||
filter_FxA[F // 2 + 1:, :] = filter_FhxA[1:F // 2, :][::-1, :].conj()
|
||||
irs = getIRFromSpectrum_irfft(filter_FhxA) # [T x C]
|
||||
irs = np.roll(irs, shift=ntaps // 2, axis=0)
|
||||
rms_0 = sigp.getAweightedRMS(irs[:,0], fs, energy=1)
|
||||
irs = irs / rms_0**0.5
|
||||
specs = rfft(irs, n=fftLen, axis=0) # [Fh(=2T//2+1=T+1) x nAngles]
|
||||
return irs, specs # [T x nAngles], [T+1 x nAngles]
|
||||
|
||||
def getTreePoss(forestRange_x, forestRange_y, nTrees, algo="uniform", seed=1234):
|
||||
|
||||
if algo.lower()=="uniform":
|
||||
np.random.seed(seed)
|
||||
treePoss = np.random.uniform(size=[nTrees,2])
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
assert treePoss.shape[0]==nTrees
|
||||
treePoss[:, 0] *= (forestRange_x[1] - forestRange_x[0])
|
||||
treePoss[:, 1] *= (forestRange_y[1] - forestRange_y[0])
|
||||
treePoss += np.array([forestRange_x[0], forestRange_y[0]])
|
||||
treePoss = np.concatenate([treePoss, np.zeros([nTrees,1])], axis=-1) # [nTrees x 3]
|
||||
return treePoss # [nTrees x 3]
|
||||
|
||||
def getRotMat2D(theta):
|
||||
c = np.cos(theta)
|
||||
s = np.sin(theta)
|
||||
return np.array([[c,-s],[s,c]])
|
||||
|
||||
def getRotMat3D_hori(theta):
|
||||
c = np.cos(theta)
|
||||
s = np.sin(theta)
|
||||
return np.array([[c,-s,0],[s,c,0],[0,0,1]])
|
||||
|
||||
def getCosOf3Pnts(p1,p2,p3):
|
||||
v1 = (p2 - p1)[:2]
|
||||
v2 = (p3 - p2)[:2]
|
||||
cos = v1.dot(v2)/(norm(v1)*norm(v2))
|
||||
return cos
|
||||
|
||||
def getCosOf2Vecs(v1,v2):
|
||||
cos = v1.dot(v2)/(norm(v1)*norm(v2))
|
||||
return cos
|
||||
|
||||
def getCossOf3Pnts(p1,p2s,p3):
|
||||
# p2s: [N x 3]
|
||||
v1 = (p2s[:,:2] - p1[:2]) # [N x 2]
|
||||
v2 = (p3[:2] - p2s[:,:2])
|
||||
coss = np.sum(v1*v2,axis=-1)/(np.sum(v1**2,axis=-1)**0.5 * np.sum(v2**2,axis=-1)**0.5)
|
||||
return coss # [N]
|
||||
|
||||
def getCossOf2Vecs_batch(v1,v2s):
|
||||
# v2s: [N x 3]
|
||||
coss = np.sum(v1*v2s,axis=-1)/(np.sum(v1**2,axis=-1)**0.5 * np.sum(v2s**2,axis=-1)**0.5)
|
||||
return coss # [N]
|
||||
|
||||
def getAndStoreMultipleScatteringFilter(filterBank_treeScattering_TxA, angleBins, dicFilter):
|
||||
if len(angleBins)==1:
|
||||
if str(angleBins) in dicFilter:
|
||||
return dicFilter[str(angleBins)], dicFilter
|
||||
else:
|
||||
dicFilter[str(angleBins)] = filterBank_treeScattering_TxA[:,angleBins[0]]
|
||||
return dicFilter[str(angleBins)], dicFilter
|
||||
if str(angleBins[:-1]) in dicFilter:
|
||||
filter_new = np.convolve(filterBank_treeScattering_TxA[:,angleBins[-1]],dicFilter[str(angleBins[:-1])])
|
||||
dicFilter[str(angleBins)] = filter_new
|
||||
return filter_new, dicFilter
|
||||
else:
|
||||
filter_new, dic_Filter = getAndStoreMultipleScatteringFilter(filterBank_treeScattering_TxA, angleBins[:-1], dicFilter)
|
||||
filter_new = np.convolve(filterBank_treeScattering_TxA[:,angleBins[-1]],filter_new)
|
||||
dicFilter[str(angleBins)] = filter_new
|
||||
return filter_new, dicFilter
|
||||
|
||||
|
||||
def getAirFilterIdx(dist, D):
|
||||
return min(int(dist // 10), D)
|
||||
|
||||
def getAirFilterIdx_batch(dists, Dary):
|
||||
return np.minimum(dists // 10, Dary).astype(int)
|
||||
|
||||
def simulateForestIR(
|
||||
nTrees, posSrc, micPoss, fs, sigLen_in_samples=None, forestRange_x=None, forestRange_y=None, reflectionCoef=2.0,
|
||||
treeRadiationExponent=0.7, applyCylindricalScatteringFilter=1, applyAirAbsorption=1, maxDist=20000.0,
|
||||
maxReflOrder=1, seed=1234, floorReflectionCoef=0.8, ntaps_treeScattering=128, ntaps_airAbsorption=128,
|
||||
ntaps_delay=128, sourceDirectivity=0, ntaps_sourceDirectivity=128, sourceDirectivityVec=None,
|
||||
samplingAlgo="uniform"
|
||||
):
|
||||
# scalable forest reverb generator.
|
||||
assert maxReflOrder<=1,"Higher order scattering is not supported."
|
||||
assert applyAirAbsorption==1
|
||||
assert applyCylindricalScatteringFilter==1
|
||||
print("simulating forest IR...")
|
||||
if forestRange_x is None:
|
||||
forestRange_x = [0.0,1000.0]
|
||||
if forestRange_y is None:
|
||||
forestRange_y = [0.0,1000.0]
|
||||
if nTrees>0:
|
||||
treePoss = getTreePoss(forestRange_x, forestRange_y, nTrees, algo=samplingAlgo, seed=seed)
|
||||
else:
|
||||
treePoss = None
|
||||
if nTrees==0:
|
||||
ntaps_treeScattering = 0
|
||||
dists_src_to_mics_direct = dists(posSrc.reshape([1,3]), micPoss)[0] # [nMics]
|
||||
posSrc_floorMirror = posSrc.copy()
|
||||
posSrc_floorMirror[2] *= -1
|
||||
dists_src_to_mics_floor = dists(posSrc_floorMirror.reshape([1,3]), micPoss)[0] # [nMics]
|
||||
if nTrees>0:
|
||||
dists_src_to_trees = dists(posSrc.reshape([1,3])[:,:2], treePoss[:,:2])[0] # [nTrees]
|
||||
dists_trees_to_mic = dists(micPoss[:,:2], treePoss[:,:2]) # [nMics x nTrees]
|
||||
dists_src_to_mics_reflect = dists_src_to_trees[na,:] + dists_trees_to_mic # [nMics x nTrees]
|
||||
distDiffs_src_to_mics_reflect = dists_src_to_mics_reflect - dists_src_to_mics_direct[:,na] # [nMics x nTrees]
|
||||
distsMult_src_to_mics_reflect = dists_src_to_trees[na,:] * dists_trees_to_mic**treeRadiationExponent # [nMics x nTrees]
|
||||
dists_src_to_mics_reflect_ = dists_src_to_mics_reflect
|
||||
distsMult_src_to_mics_reflect_ = distsMult_src_to_mics_reflect
|
||||
|
||||
domain = "freq"
|
||||
if domain=="freq":
|
||||
fftLen = ntaps_treeScattering + ntaps_airAbsorption + ntaps_delay + sourceDirectivity * ntaps_sourceDirectivity
|
||||
else:
|
||||
fftLen = 128
|
||||
Bt = 100 # batch size in batched path processing
|
||||
assert nTrees%Bt==0, "The number of trees must be "+str(Bt)+"n (n: integer)."
|
||||
if applyCylindricalScatteringFilter==1 and nTrees>0:
|
||||
filterBank_treeScattering_TxA, filterBank_treeScattering_freqDomain_FhxA = \
|
||||
getFilterBankForAngleDependentCylinderScattering(fs=fs, steps=ntaps_treeScattering, fftLen=fftLen)
|
||||
filterBank_treeScattering_TxA *= reflectionCoef
|
||||
filterBank_treeScattering_freqDomain_FhxA *= reflectionCoef
|
||||
filterBank_treeScattering_freqDomain_FhxA_ = filterBank_treeScattering_freqDomain_FhxA
|
||||
|
||||
distBins = np.arange(0,maxDist+10,10)
|
||||
if applyAirAbsorption==1:
|
||||
filterBank_airAbsorption_TxD, filterBank_airAbsorption_freqDomain_FhxD = \
|
||||
getFilterBankForAirAbsorption(ntaps_airAbsorption, fs, dists=distBins, fftLen=fftLen)
|
||||
D = filterBank_airAbsorption_TxD.shape[1]
|
||||
Dary = np.ones([Bt]) * D
|
||||
filterBank_airAbsorption_freqDomain_FhxD_ = filterBank_airAbsorption_freqDomain_FhxD
|
||||
Dary_ = Dary
|
||||
|
||||
if sourceDirectivity==1:
|
||||
filterBank_directivity_TxA, filterBank_directivity_FhxA = getFilterBankForSourceDirectivity(
|
||||
fs, ntaps=ntaps_sourceDirectivity, fftLen=fftLen)
|
||||
|
||||
nMic = len(micPoss)
|
||||
if sigLen_in_samples is None:
|
||||
diagonalLen = ((forestRange_y[1]-forestRange_y[0])**2 + (forestRange_x[1]-forestRange_x[0])**2)**0.5
|
||||
sigLen_in_samples = int(diagonalLen / soundVel * fs)
|
||||
signal = np.zeros([sigLen_in_samples, nMic], dtype=floatX)
|
||||
impulse = getImpulse(1)
|
||||
# treeIndices = np.arange(nTrees, dtype=int)
|
||||
|
||||
T1 = len(signal)
|
||||
posSrc_ = posSrc
|
||||
treePoss_ = treePoss
|
||||
micPoss_ = micPoss
|
||||
|
||||
for m in range(nMic):
|
||||
print("computing ir from src to mic",m,"...")
|
||||
tic = time.time()
|
||||
if nTrees>0:
|
||||
distDiffs_src_to_mic_reflect = distDiffs_src_to_mics_reflect[m,:] # [nTrees]
|
||||
|
||||
# compute direct path
|
||||
direct = impulse
|
||||
# apply air absorption
|
||||
if applyAirAbsorption==1:
|
||||
airFilterIdx = getAirFilterIdx(dists_src_to_mics_direct[m],D)
|
||||
airFilter = filterBank_airAbsorption_TxD[:,airFilterIdx]
|
||||
direct = np.convolve(direct,airFilter)
|
||||
if sourceDirectivity==1:
|
||||
cos_dir = getCosOf2Vecs(sourceDirectivityVec,micPoss[m]-posSrc)
|
||||
cos_dir = min(1,cos_dir)
|
||||
cos_dir = max(-1,cos_dir)
|
||||
angle_dir = np.arccos(cos_dir) * 180/pi
|
||||
angleBin_dir = (angle_dir+0.5).astype(int)
|
||||
dirFilter = filterBank_directivity_TxA[:,angleBin_dir]
|
||||
direct = np.convolve(direct,dirFilter)
|
||||
|
||||
# apply delay and distance attenuation
|
||||
delayedSig_direct_m, delayToAdd = getDelayedSig(direct, fs, delayInSec=0.0, distance=dists_src_to_mics_direct[m])
|
||||
addSignal(signal, delayedSig_direct_m, delayToAdd=delayToAdd, channel=m)
|
||||
|
||||
# compute floor reflection
|
||||
floor = impulse
|
||||
# apply air absorption
|
||||
if applyAirAbsorption==1:
|
||||
airFilterIdx = getAirFilterIdx(dists_src_to_mics_direct[m],D)
|
||||
airFilter = filterBank_airAbsorption_TxD[:,airFilterIdx]
|
||||
floor = airFilter
|
||||
if sourceDirectivity==1:
|
||||
micPos_refl = micPoss[m].copy()
|
||||
micPos_refl[2] *= -1
|
||||
cos_dir = getCosOf2Vecs(sourceDirectivityVec,micPos_refl-posSrc)
|
||||
cos_dir = min(1,cos_dir)
|
||||
cos_dir = max(-1,cos_dir)
|
||||
angle_dir = np.arccos(cos_dir) * 180/pi
|
||||
angleBin_dir = (angle_dir+0.5).astype(int)
|
||||
dirFilter = filterBank_directivity_TxA[:,angleBin_dir]
|
||||
floor = np.convolve(floor, dirFilter)
|
||||
|
||||
# apply delay and distance attenuation
|
||||
delayedSig_floor_m, delayToAdd = getDelayedSig(floor, fs, delayInSec=0.0, distance=dists_src_to_mics_floor[m])
|
||||
addSignal(signal, delayedSig_floor_m * floorReflectionCoef, delayToAdd=delayToAdd, channel=m)
|
||||
|
||||
# compute 1st order scattering by trees
|
||||
if nTrees>0:
|
||||
coss = getCossOf3Pnts(posSrc_, treePoss_, micPoss_[m])
|
||||
coss[coss<-1] = -1
|
||||
coss[coss>1] = 1
|
||||
angles = np.arccos(coss) * 180.0/pi
|
||||
angleBins = (angles+0.5).astype(int)
|
||||
# if np.max()
|
||||
assert applyAirAbsorption==1
|
||||
assert applyCylindricalScatteringFilter==1
|
||||
|
||||
if sourceDirectivity==1:
|
||||
treePoss_[:,2] = micPoss[m,2]
|
||||
coss_dir = getCossOf2Vecs_batch(sourceDirectivityVec, treePoss_ - posSrc)
|
||||
coss_dir[coss_dir<-1] = -1
|
||||
coss_dir[coss_dir>1] = 1
|
||||
angles_dir = np.arccos(coss_dir) * 180 / pi
|
||||
angleBins_dir = (angles_dir + 0.5).astype(int)
|
||||
|
||||
if domain=="time":
|
||||
for t in range(nTrees):
|
||||
# apply tree scattering and air absorption
|
||||
scatteringFilter = filterBank_treeScattering_TxA[:,angleBins[t]] # [T1]
|
||||
airFilter = filterBank_airAbsorption_TxD[:, getAirFilterIdx(dists_src_to_mics_reflect[m,t],D)] # [T2]
|
||||
reflected_m_t = np.convolve(scatteringFilter, airFilter) # [T1+T2-1]
|
||||
# apply delay and distance attenuation
|
||||
delayedSig_reflect_m_t, delayToAdd = getDelayedSig(
|
||||
reflected_m_t, fs, delayInSec=dists_src_to_mics_reflect[m,t]/soundVel, distance=0.0,
|
||||
nIRwithoutDelay=ntaps_delay)
|
||||
delayedSig_reflect_m_t /= distsMult_src_to_mics_reflect[m,t]
|
||||
# add signal to result
|
||||
addSignal(signal, delayedSig_reflect_m_t, delayToAdd=delayToAdd, channel=m)
|
||||
elif domain=="freq":
|
||||
for tHead in range(0,nTrees,Bt):
|
||||
# apply tree scattering and air absorption
|
||||
scatteringFilter_freqDomain = filterBank_treeScattering_freqDomain_FhxA_[:,angleBins[tHead:tHead+Bt]] # [Fh x Bt]
|
||||
airFilter_freqDomain = filterBank_airAbsorption_freqDomain_FhxD_[:, getAirFilterIdx_batch(
|
||||
dists_src_to_mics_reflect_[m,tHead:tHead+Bt],Dary_)] # [Fh x Bt]
|
||||
reflected_m_Bt_freqDomain = scatteringFilter_freqDomain * airFilter_freqDomain # [Fh x Bt]
|
||||
if sourceDirectivity==1:
|
||||
reflected_m_Bt_freqDomain = reflected_m_Bt_freqDomain * filterBank_directivity_FhxA[:,angleBins_dir[tHead:tHead+Bt]]
|
||||
# apply delay
|
||||
delayedSigs_reflect_m_Bt, delaysToAdd = \
|
||||
sigp.getDelayedSig_batch_freqDomain(
|
||||
reflected_m_Bt_freqDomain,
|
||||
fs, dists_src_to_mics_reflect_[m,tHead:tHead+Bt]/soundVel,
|
||||
nIRwithoutDelay=ntaps_delay, fftLen=fftLen) # [T1+T2+T3 x Bt], [Bt]
|
||||
# apply amplitude attenuation
|
||||
delayedSigs_reflect_m_Bt /= distsMult_src_to_mics_reflect_[m, tHead:tHead+Bt]
|
||||
|
||||
T2 = fftLen
|
||||
tailsInSig1 = np.minimum(T1, delaysToAdd + T2)
|
||||
tailsInSig2 = np.minimum(T2, T1 - delaysToAdd)
|
||||
|
||||
t__ = np.where(delaysToAdd<T1)[0]
|
||||
for t in t__:
|
||||
signal[delaysToAdd[t]:tailsInSig1[t], m] += \
|
||||
delayedSigs_reflect_m_Bt[:tailsInSig2[t],t]
|
||||
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
toc = time.time()
|
||||
print(" time:",toc-tic)
|
||||
return signal # [T x mMic]
|
||||
|
||||
|
||||
def getMatDistance(mat1, mat2):
|
||||
diff = mat1 - mat2
|
||||
dist = np.sum(diff * diff.conj()).real ** 0.5
|
||||
return dist
|
||||
|
||||
def computeDistancesOfMatrices(dic1,dic2):
|
||||
# dic: sorted dictionary in which values are ndarrays of same shape
|
||||
len1 = len(dic1)
|
||||
len2 = len(dic2)
|
||||
resultMat = np.zeros([len1,len2],dtype=np.float64)
|
||||
i = 0
|
||||
for k, v in dic1.items():
|
||||
j = 0
|
||||
for l, w in dic2.items():
|
||||
resultMat[i,j] = getMatDistance(v,w)
|
||||
j += 1
|
||||
i += 1
|
||||
return resultMat
|
||||
|
||||
def generateSampleForestIR():
|
||||
fs = 24000
|
||||
ir = simulateForestIR(
|
||||
nTrees=100000,
|
||||
posSrc=np.array([500,500,1.5]),
|
||||
micPoss=np.array([510,500,1.5]).reshape([1,3]),
|
||||
fs=fs,
|
||||
sigLen_in_samples=fs*5)
|
||||
Wav.writeWav(ir, "sampleForestIR.wav", fs, 1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
generateSampleForestIR()
|
|
@ -0,0 +1,12 @@
|
|||
This is a python implementation of the fast forest reverberator described in:
|
||||
*Kaneko, S., Gamper, H., “A fast forest reverberator using single scattering cylinders”. in IEEE 23rd workshop on multimedia signal processing, 2021.*
|
||||
|
||||
Dependencies:
|
||||
- python (tested on Python ver. 3.7.6)
|
||||
- numpy (tested on ver. 1.19.1)
|
||||
- scipy (tested on ver. 1.5.0)
|
||||
- soundfile (tested on ver. 0.10.3.post1)
|
||||
|
||||
Usage:
|
||||
- Run ```ForestReverb.py```
|
||||
... this script will create a wav file containing an example forest IR with a source-receiver distance of 10m in a forest with 100k single scattering cylinders as "sampleForestIR.wav" next to the python script.
|
|
@ -0,0 +1,292 @@
|
|||
__author__ = "Shoken KANEKO"
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
from scipy.signal import stft as spstft
|
||||
from scipy.signal import istft as spistft
|
||||
from scipy.fft import irfft
|
||||
from Constants import soundVel
|
||||
|
||||
na = np.newaxis
|
||||
join = os.path.join
|
||||
floatX = np.float64
|
||||
pi = np.pi
|
||||
|
||||
def stft(sig_TxnCh, fs, winSize, stepSize=None):
|
||||
if stepSize is None:
|
||||
stepSize = winSize//2
|
||||
noverlap = winSize - stepSize
|
||||
assert winSize > stepSize
|
||||
freqs,times,spec_FxnumChxnumSteps = \
|
||||
spstft(sig_TxnCh, fs=fs, window='hann', nperseg=winSize, noverlap=noverlap,
|
||||
nfft=None, detrend=False, return_onesided=False, boundary='zeros', padded=True, axis=0)
|
||||
if sig_TxnCh.ndim==2:
|
||||
spec = np.array(spec_FxnumChxnumSteps).transpose([2,0,1]) # [numSteps x F x numCh] <- [F x numCh x numSteps]
|
||||
elif sig_TxnCh.ndim==1:
|
||||
spec = spec_FxnumChxnumSteps.T # [numSteps x F] <- [F x numSteps]
|
||||
else:
|
||||
raise NotImplementedError
|
||||
return freqs, times, spec
|
||||
|
||||
def istft(spec_TxFxnCh, fs, winSize, stepSize):
|
||||
if stepSize is None:
|
||||
stepSize = winSize//2
|
||||
noverlap = winSize - stepSize
|
||||
assert winSize > stepSize
|
||||
_, sig_TxnCh = spistft(spec_TxFxnCh, fs=fs, window='hann', nperseg=winSize, noverlap=noverlap,
|
||||
nfft=None, input_onesided=False, boundary=True, time_axis=0, freq_axis=1)
|
||||
return sig_TxnCh
|
||||
|
||||
def getZeroPadded(sig, retSigLen):
|
||||
if sig.ndim==1:
|
||||
T = len(sig)
|
||||
assert retSigLen>T
|
||||
ret = np.concatenate([sig, np.zeros([retSigLen - T],dtype=sig.dtype)])
|
||||
elif sig.ndim==2:
|
||||
T,C = sig.shape
|
||||
assert retSigLen>T
|
||||
ret = np.concatenate([sig, np.zeros([retSigLen - T,C],dtype=sig.dtype)],axis=0)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
return ret
|
||||
|
||||
def dist(p1,p2):
|
||||
# p1, p2: [3]
|
||||
return np.sum((p1-p2)**2,axis=-1)**0.5
|
||||
|
||||
def dist_hori(p1,p2):
|
||||
# p1, p2: [3]
|
||||
return np.sum((p1-p2)[:2]**2,axis=-1)**0.5
|
||||
|
||||
def dists(ps1,ps2):
|
||||
# ps1: [nPnts1 x 3]
|
||||
# ps2: [nPnts2 x 3]
|
||||
return np.sum((ps1[:,na,:]-ps2[na,:,:])**2,axis=-1)**0.5 # [nPnts1 x nPnts2]
|
||||
|
||||
def delayInSec(p1,p2):
|
||||
d = dist(p1, p2)
|
||||
return d / soundVel
|
||||
|
||||
def delaysInSec(ps1,ps2):
|
||||
d = dists(ps1, ps2)
|
||||
return d / soundVel # [nPnts1 x nPnts2]
|
||||
|
||||
def delayInSamples(p1,p2,fs):
|
||||
d = dist(p1,p2)
|
||||
return d / soundVel * fs
|
||||
|
||||
def convolve(sig1, sig2, domain="time"):
|
||||
# sig1: [T1] or [T1 x C1]
|
||||
# sig2: [T2] or [T2 x C2]
|
||||
if sig1.ndim == 1:
|
||||
sig1_ = sig1.reshape([len(sig1),1])
|
||||
else:
|
||||
sig1_ = sig1
|
||||
if sig2.ndim == 1:
|
||||
sig2_ = sig2.reshape([len(sig2),1])
|
||||
else:
|
||||
sig2_ = sig2
|
||||
|
||||
if domain=="time":
|
||||
ret = []
|
||||
for c1 in range(sig1_.shape[1]):
|
||||
ret.append([np.convolve(sig1_[:,c1],sig2_[:,c2]) for c2 in range(sig2_.shape[1])])
|
||||
# ret: [c1][c2][T3]
|
||||
return np.array(ret).transpose([2,0,1]) # [T3 x c1 x c2]
|
||||
elif domain=="freq":
|
||||
from scipy.fft import rfft
|
||||
n = len(sig1_) + len(sig2_) - 1
|
||||
sig1_f = rfft(sig1_,n=n,axis=0) # [Fh x c1]
|
||||
sig2_f = rfft(sig2_,n=n,axis=0) # [Fh x c2]
|
||||
conved_f = sig1_f[:,:,na] * sig2_f[:,na,:] # [Fh x c1 x c2]
|
||||
conved = irfft(conved_f,axis=0) # [T x c1 x c2]
|
||||
return conved.real
|
||||
else:
|
||||
raise NotImplementedError
|
||||
|
||||
def getDelayedSig(sig, fs, delayInSec=None, distance=None, nIRwithoutDelay = 128):
|
||||
# sig: [T]
|
||||
assert sig.ndim==1
|
||||
# set distance to None if only time delay but no amplitude reduction should be applied
|
||||
# set delayInSec to None if delay should be computed from distance
|
||||
if delayInSec is None or delayInSec==0.0:
|
||||
assert distance is not None
|
||||
delayInSec = distance / soundVel
|
||||
if distance is None or distance==0.0:
|
||||
distance = 1.0
|
||||
assert distance != 0
|
||||
delayInSamples = delayInSec * fs
|
||||
delayIRLen = int(delayInSamples + nIRwithoutDelay//2)
|
||||
if delayInSamples > nIRwithoutDelay//2:
|
||||
delayToAdd = int(delayInSamples - nIRwithoutDelay // 2)
|
||||
else:
|
||||
delayToAdd = 0
|
||||
ts = np.arange(start=delayToAdd,stop=delayIRLen)
|
||||
x = pi * (ts - delayInSamples)
|
||||
delayIR = np.sin(x)/(x * distance)
|
||||
if delayIRLen > delayInSamples and delayInSamples - int(delayInSamples) == 0:
|
||||
delayIR[int(delayInSamples) - delayToAdd] = 1.0 / distance
|
||||
delayed = np.convolve(sig, delayIR)
|
||||
return delayed, delayToAdd
|
||||
|
||||
def getDelayedSig_batch(sigs, fs, delaysInSec=None, nIRwithoutDelay = 128):
|
||||
# sigs: [T x C]
|
||||
# delaysInSec, distances: [C]
|
||||
assert sigs.ndim == 2
|
||||
T,C = sigs.shape
|
||||
delaysInSamples = delaysInSec * fs # [C] ... this is a floating point number
|
||||
delaysInSamples_int = delaysInSamples.astype(int)
|
||||
delaysToAdd = delaysInSamples_int - int(nIRwithoutDelay // 2) # [C]
|
||||
effectiveIRLen = nIRwithoutDelay
|
||||
delaysToAdd[delaysInSamples <= nIRwithoutDelay//2] = 0
|
||||
ts = np.arange(effectiveIRLen) # [T]
|
||||
ts = np.tile(ts.reshape([effectiveIRLen,1]),reps=(1,C)) # [T x C]
|
||||
ts = ts + delaysToAdd # [T x C]
|
||||
x = pi * (ts - delaysInSamples) # [T x C]
|
||||
import warnings
|
||||
with warnings.catch_warnings():
|
||||
warnings.simplefilter('ignore')
|
||||
delayIRs = np.sin(x)/x # [T x C]
|
||||
singulars = (x==0) * (delaysInSamples == delaysInSamples_int)
|
||||
delayIRs[singulars] = 1.0
|
||||
delayed = np.array([np.convolve(sigs[:,i], delayIRs[:,i]) for i in range(C)]).T # [T x C]
|
||||
return delayed, delaysToAdd # [T x C], [C]
|
||||
|
||||
def getDelayedSig_batch_freqDomain(sigs, fs, delaysInSec=None, nIRwithoutDelay=128, fftLen=128*3):
|
||||
# sigs: [Fh x C]
|
||||
# delaysInSec, distances: [C]
|
||||
assert sigs.ndim == 2
|
||||
_,C = sigs.shape
|
||||
delaysInSamples = delaysInSec * fs # [C] ... this is a floating point number
|
||||
delaysInSamples_int = delaysInSamples.astype(np.int32)
|
||||
delaysToAdd = delaysInSamples_int - int(nIRwithoutDelay // 2) # [C]
|
||||
effectiveIRLen = nIRwithoutDelay
|
||||
delaysToAdd[delaysInSamples <= nIRwithoutDelay//2] = 0
|
||||
ts = np.arange(effectiveIRLen) # [T]
|
||||
ts = np.tile(ts.reshape([effectiveIRLen,1]),reps=(1,C)) # [T x C]
|
||||
ts = ts + delaysToAdd # [T x C]
|
||||
x = pi * (ts - delaysInSamples) # [T x C]
|
||||
delayIRs = np.sin(x)/x # [T x C]
|
||||
singulars = (x==0) * (delaysInSamples == delaysInSamples_int)
|
||||
delayIRs[singulars] = 1.0
|
||||
from scipy.fft import rfft, irfft
|
||||
delayIRs_freqDomain = rfft(delayIRs,n=fftLen,axis=0) # [Fh x C]
|
||||
delayed_freqDomain = sigs * delayIRs_freqDomain
|
||||
delayed = irfft(delayed_freqDomain, axis=0) # [T x C]
|
||||
return delayed, delaysToAdd # [T x C], [C]
|
||||
|
||||
def getFreqsForGivenFilterLength(nTaps, fs):
|
||||
nFreqBins = nTaps
|
||||
df = fs / nFreqBins
|
||||
freqs = np.arange(nFreqBins) * df
|
||||
freqs_half = freqs[:nFreqBins // 2 + 1]
|
||||
return freqs, freqs_half
|
||||
|
||||
def getHPF(nTaps, fs, fcut):
|
||||
import scipy.signal as spsig
|
||||
assert nTaps%2==1
|
||||
fir = spsig.firwin(nTaps, cutoff=fcut, fs=fs, pass_zero=False)
|
||||
return fir
|
||||
|
||||
def getBPF(nTaps, fs, bandFreq_low, bandFreq_high):
|
||||
import scipy.signal as spsig
|
||||
assert nTaps%2==1
|
||||
fir = spsig.firwin(nTaps,cutoff=[bandFreq_low,bandFreq_high],fs=fs, pass_zero=False)
|
||||
return fir
|
||||
|
||||
def get_dB_from_amplitude(spec_TxF, eps=1e-8):
|
||||
return 20*np.log(np.abs(spec_TxF)+eps)/np.log(10)
|
||||
|
||||
def getAmplitudeFromdB(dB):
|
||||
amp = np.exp(dB*np.log(10)/20)
|
||||
return amp
|
||||
|
||||
def getIRFromSpectrum(spec_FxC):
|
||||
from scipy.fft import ifft
|
||||
ir = ifft(spec_FxC, axis=0).real
|
||||
return ir # [T x C]
|
||||
|
||||
def getIRFromSpectrum_irfft(spec_hFxC):
|
||||
ir = irfft(spec_hFxC, axis=0).real
|
||||
return ir # [T x C]
|
||||
|
||||
def getLongerSignalByRepeating(sig, desiredLen):
|
||||
# sig: [T] or [T x C]
|
||||
T = len(sig)
|
||||
nRep = int(desiredLen/T)+1
|
||||
if sig.ndim==2:
|
||||
rep = np.tile(sig, reps=[nRep, 1])
|
||||
else:
|
||||
rep = np.tile(sig, reps=nRep)
|
||||
assert len(rep)>=desiredLen
|
||||
return rep[:desiredLen]
|
||||
|
||||
def addSignal(sig1, sig2, delayToAdd, channel):
|
||||
# sig1: [T1 x C]
|
||||
# sig2: [T2]
|
||||
# delayToAdd: position in sig1 to start adding sig2
|
||||
assert int(delayToAdd)==delayToAdd
|
||||
T1 = len(sig1)
|
||||
T2 = len(sig2)
|
||||
tailInSig1 = min(T1, delayToAdd + T2)
|
||||
tailInSig2 = min(T2, T1 - delayToAdd)
|
||||
assert channel is not None
|
||||
if delayToAdd<T1:
|
||||
sig1[delayToAdd:tailInSig1, channel] += sig2[0:tailInSig2]
|
||||
|
||||
def getSoundPressure_in_Pascal_from_dBSPL(dBSPL):
|
||||
return getAmplitudeFromdB(dBSPL)*0.00002
|
||||
|
||||
def getSoundPressureLevel_in_dBSPL_from_Pascal(pa):
|
||||
return 20*np.log10(pa/0.00002)
|
||||
|
||||
def getSignal_scaled_to_desired_SPL_Aweighted(sig, fs, spl_in_dB):
|
||||
rms_A = getAweightedRMS(sig, fs)
|
||||
rms_A_desired = getSoundPressure_in_Pascal_from_dBSPL(spl_in_dB)
|
||||
return sig * rms_A_desired/rms_A
|
||||
|
||||
def getSNR(sig,noise,fs,doAweighting=0):
|
||||
if doAweighting==0:
|
||||
sigRMS = getRMS(sig)
|
||||
noiseRMS = getRMS(noise)
|
||||
else:
|
||||
sigRMS = getAweightedRMS(sig,fs)
|
||||
noiseRMS = getAweightedRMS(noise,fs)
|
||||
snr = 20*np.log10(sigRMS/noiseRMS)
|
||||
return snr
|
||||
|
||||
def getRMS(sig_t):
|
||||
return np.mean(sig_t**2)**0.5
|
||||
|
||||
def getEnergy(sig_t):
|
||||
return np.sum(sig_t**2)
|
||||
|
||||
def getLinearGainCoefsOfAweighting(freqs):
|
||||
import librosa
|
||||
weights = librosa.core.A_weighting(freqs) # in deciBell
|
||||
weights_lin = getAmplitudeFromdB(weights)
|
||||
return weights_lin
|
||||
|
||||
def getAweightedRMS(sig_t, fs, energy=0):
|
||||
assert sig_t.ndim==1
|
||||
import scipy as sp
|
||||
sig_f = sp.fft.rfft(sig_t)
|
||||
nfft = len(sig_t)
|
||||
freqs = sp.fft.rfftfreq(nfft, 1/fs)
|
||||
weights_lin = getLinearGainCoefsOfAweighting(freqs)
|
||||
assert len(freqs)==nfft//2+1
|
||||
sig_f_weighted = sig_f * weights_lin # [F]
|
||||
sig_t_weighted = sp.fft.irfft(sig_f_weighted)
|
||||
if energy==1:
|
||||
return getEnergy(sig_t_weighted)
|
||||
rms = getRMS(sig_t_weighted)
|
||||
return rms
|
||||
|
||||
def getImpulse(sigLen):
|
||||
# create src signal
|
||||
srcSig = np.zeros(sigLen, dtype=np.float64)
|
||||
srcSig[0] = 1
|
||||
return srcSig
|
||||
|
||||
if __name__=="__main__":
|
||||
pass
|
|
@ -0,0 +1,26 @@
|
|||
__author__ = "Shoken KANEKO"
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
import soundfile as sf
|
||||
|
||||
na = np.newaxis
|
||||
join = os.path.join
|
||||
|
||||
def writeWav(ary, path, fs, normalize=0):
|
||||
# ary: [T x nChannels] or [T]
|
||||
if normalize==1:
|
||||
maxAmp = np.max(np.abs(ary))
|
||||
ary_ = ary / maxAmp
|
||||
else:
|
||||
maxAmp = 1.0
|
||||
ary_ = ary
|
||||
if ary_.ndim==1:
|
||||
ary_ = ary_.reshape([len(ary_),1])
|
||||
_,ext = os.path.splitext(path)
|
||||
sf.write(path,ary_,fs,subtype="FLOAT",format=ext[1:].upper())
|
||||
gainFactor = 1.0/maxAmp
|
||||
return gainFactor
|
||||
|
||||
if __name__ == "__main__":
|
||||
pass
|
Загрузка…
Ссылка в новой задаче