imptube.tube
In this module, three main classes are defined - Measurement, Tube and Sample.
'''In this module, three main classes are defined - Measurement, Tube and Sample.
'''

import sys
import sounddevice as sd
import numpy as np
import pandas as pd
import soundfile as sf
import os
from scipy.io import wavfile
from scipy.signal import chirp
from scipy.signal.windows import hann
from time import sleep, strftime
from imptube.utils import make_foldertree
from imptube.processing import (
    calibration_from_files,
    transfer_function_from_path,
    alpha_from_path,
    harmonic_distortion_filter,
    calc_rms_pressure_level
)
from typing import Optional, Protocol
import logging


class Measurement:
    """Contains information about measurement from the perspective of
    signal and boundary conditions.

    Attributes
    ----------
    fs : int
        measurement sampling frequency
    channels_in : list[int]
        list of input channel numbers
    channels_out : list[int]
        list of output channel numbers (usually one member list)
    device : str
        string specifying part of sound card name
        List of available devices can be obtained with
        `python3 -m sounddevice` command.
    samples : int
        number of samples in the generated log sweep
        typically 2**n
    window_len : int
        length of the Hann half-window applied to the ends of the sweep
    sub_measurements : int
        number of measurements taken for each specimen
        Normally, no differences between sweep measurements should occur,
        this attribute mainly compensates for potential playback artifacts.
    f_limits : list[int]
        [f_low, f_high] frequency limits for the generated sweep
    fs_to_spl : float
        conversion level from dBFS to dB SPL for microphone 1
    sweep_lvl : float
        level of the sweep in dBFS
    sweep : np.ndarray
        the currently active excitation signal
    boundary_df : pd.DataFrame
        single-row frame holding ``fs_to_spl``; written to a CSV file in
        the current working directory on construction
    """

    def __init__(
            self,
            fs: int = 48000,
            channels_in: Optional[list[int]] = None,
            channels_out: Optional[list[int]] = None,
            device: str = 'Scarlett',
            samples: int = 131072,
            window_len: int = 8192,
            sub_measurements: int = 2,
            f_low: int = 10,
            f_high: int = 1000,
            fs_to_spl: float = 130,
            sweep_lvl: float = -6
            ):
        """Stores the settings, generates the sweep and configures sounddevice.

        ``channels_in`` defaults to ``[1, 2]`` and ``channels_out`` to
        ``[1]``; the lists are created per instance so they are never
        shared between Measurement objects.
        """
        self.fs = fs
        self.channels_in = [1, 2] if channels_in is None else channels_in
        self.channels_out = [1] if channels_out is None else channels_out
        self.device = device
        self.samples = samples
        self.window_len = window_len
        self.sub_measurements = sub_measurements
        self.f_limits = [f_low, f_high]
        self.fs_to_spl = fs_to_spl
        self.sweep_lvl = sweep_lvl

        # the level calibration is stored next to the working directory
        self.boundary_df = pd.DataFrame({"fs_to_spl": [fs_to_spl]})
        self.boundary_df.to_csv(
            strftime("%y-%m-%d_%H-%M") + "_lvl_calib.csv"
        )

        self.make_sweep()
        sd.default.samplerate = fs
        sd.default.channels = len(self.channels_in), len(self.channels_out)
        sd.default.device = device

    def make_sweep(self, windows=True) -> np.ndarray:
        """Generates numpy array with log sweep and stores it in ``self.sweep``.

        The sweep is built from the instance attributes ``fs``, ``samples``,
        ``f_limits``, ``window_len`` and ``sweep_lvl``.

        Parameters
        ----------
        windows : bool
            when True, Hann half-windows of ``window_len`` samples are
            applied to both ends of the sweep

        Returns
        -------
        log_sweep : np.ndarray
            numpy array containing mono log sweep
        """
        t = np.linspace(0, self.samples/self.fs, self.samples, dtype=np.float32)
        log_sweep = chirp(
            t, self.f_limits[0], t[-1], self.f_limits[1],
            method="log", phi=90
        )

        half_win = int(self.window_len)
        # a zero-length fade would turn `[-0:]` into the whole array
        if windows and half_win > 0:
            window = hann(2*half_win)
            log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
            log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]

        # dBFS level -> linear gain
        log_sweep = log_sweep * 10**(self.sweep_lvl/20)

        self.sweep = log_sweep
        return log_sweep

    def regen_sweep(self):
        """Regenerates the sweep."""
        self.make_sweep()

    def update_sweep_lvl(self):
        """Rescales the current sweep so that its peak equals ``sweep_lvl`` dBFS."""
        self.sweep = self.sweep/np.max(np.abs(self.sweep))
        self.sweep = self.sweep * 10**(self.sweep_lvl/20)

    def filter_sweep(
            self,
            rfft_incident_pressure: np.ndarray,
            f_limits=(10, 400),
            ) -> np.ndarray:
        """Filters the sweep with respect to incident pressure measured beforehand.

        The result replaces ``self.sweep``.

        Parameters
        ----------
        rfft_incident_pressure : np.ndarray
            rfft of incident pressure; must have as many bins as the rfft
            of the sweep
        f_limits : tuple[int, int]
            frequency limits for the filtering

        Returns
        -------
        filtered_sweep : np.ndarray
            filtered sweep
        """
        # generate sweep without windows
        sweep_wo_win = self.make_sweep(windows=False)
        n_samples = len(sweep_wo_win)

        # apply blackman window to the ends of the sweep
        win = np.blackman(self.window_len//4)
        half = len(win)//2
        sweep_wo_win[:half] = sweep_wo_win[:half] * win[:half]
        sweep_wo_win[-half:] = sweep_wo_win[-half:] * win[-half:]

        # calculate rfft of the sweep
        rfft_sweep = np.fft.rfft(sweep_wo_win)
        rfft_freqs = np.fft.rfftfreq(n_samples, d=1/self.fs)

        # find indices of frequency limits
        f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
        f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))

        # divide the sweep magnitude by the incident pressure, keep the phase
        amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
        filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))

        # construct and apply filter based on blackman windows
        taper = np.blackman(50)
        half_taper = len(taper)//2
        band_mask = np.ones_like(filtered_sweep_spectrum)
        band_mask[:f_low_idx] = 0
        band_mask[f_low_idx:f_low_idx+half_taper] = taper[:half_taper]
        # NOTE(review): the upper edge is indexed from the END of the
        # spectrum, so it does not sit at f_limits[1] - confirm whether a
        # band edge at f_high_idx was intended. Behaviour kept as is.
        band_mask[-f_high_idx-half_taper:-f_high_idx] = taper[half_taper:]
        band_mask[-f_high_idx:] = 0
        filtered_sweep_spectrum *= band_mask

        # passing n keeps the original length for odd sample counts as well
        filtered_sweep = np.fft.irfft(filtered_sweep_spectrum, n=n_samples)

        # apply hanning window to the ends of the filtered sweep
        window = np.hanning(self.window_len)
        half_window = len(window)//2
        filtered_sweep[:half_window] = filtered_sweep[:half_window] * window[:half_window]
        filtered_sweep[-half_window:] = filtered_sweep[-half_window:] * window[half_window:]

        # normalize the filtered sweep
        filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))

        # apply level to the filtered sweep based on measurement level
        filtered_sweep = filtered_sweep * 10**(self.sweep_lvl/20)

        self.sweep = filtered_sweep
        return filtered_sweep

    def measure(self,
            out_path: str = '',
            thd_filter: bool = True,
            export: bool = True
            ) -> tuple[np.ndarray, int]:
        """Performs measurement and saves the recording.

        Parameters
        ----------
        out_path : str
            path where the recording should be saved, including filename
        thd_filter : bool
            enables harmonic distortion filtering
            This affects the files saved.
        export : bool
            enables export to specified path

        Returns
        -------
        data : np.ndarray
            measured audio data
        fs : int
            sampling rate

        Raises
        ------
        ValueError
            if ``export`` is enabled but ``out_path`` is empty
        """
        # fail before playback instead of after a finished measurement
        if export and not out_path:
            raise ValueError("out_path must be given when export is enabled")

        data = sd.playrec(
            self.sweep,
            input_mapping=self.channels_in,
            output_mapping=self.channels_out,
            dtype=np.float32)
        sd.wait()
        data = np.asarray(data)

        # the filter receives the transposed recording; transpose back after
        if thd_filter:
            data = harmonic_distortion_filter(
                data.T,
                self.sweep,
                f_low=self.f_limits[0],
                f_high=self.f_limits[1]
            ).T

        if export:
            sf.write(
                file=out_path,
                data=data,
                samplerate=self.fs,
                format='WAV',
                subtype='FLOAT'
            )

        return data, self.fs

    def calc_incident_pressure_filter(
            self,
            spectrum: np.ndarray,
            r: np.ndarray,
            f: np.ndarray,
            distance: float,
            speed_of_sound: float = 343,
            f_limits=(10, 400)
            ) -> np.ndarray:
        """
        Calculates incident pressure filter based on the measured
        spectrum and reflection factor.
        Such filter can be used to filter the input sweep
        to compensate for the loudspeaker frequency response.

        Parameters
        ----------
        spectrum : np.ndarray
            measured spectrum; must already be limited to the bins
            selected by ``f_limits`` (it is combined element-wise with the
            sliced ``r`` and ``f``)
        r : np.ndarray
            reflection factor
        f : np.ndarray
            frequency values
        distance : float
            distance between the sample and the microphone
        speed_of_sound : float
            speed of sound in air
        f_limits : tuple[int, int]
            frequency limits for the filtering

        Returns
        -------
        incident_pressure : np.ndarray
            incident pressure filter with as many bins as the rfft of
            ``self.sweep``
        """
        sweep_spectrum = np.fft.rfft(self.sweep)

        # restrict reflection factor and frequencies to the analysed band
        f_low_idx = np.argmin(np.abs(f-f_limits[0]))
        f_high_idx = np.argmin(np.abs(f-f_limits[1]))
        band = slice(f_low_idx, f_high_idx)
        wavenumber = 2*np.pi*f[band]/speed_of_sound

        # measured pressure = incident * (exp(-jkx) + r * exp(jkx))
        incident_pressure = spectrum / (
            np.exp(-1j * wavenumber * distance)
            + r[band] * np.exp(1j * wavenumber * distance)
        )

        # extend incident pressure to have the same length as sweep_spectrum
        # by repeating its first and last value
        incident_pressure = np.concatenate([
            np.ones(f_low_idx)*incident_pressure[0],
            incident_pressure
        ])
        incident_pressure = np.concatenate([
            incident_pressure,
            np.ones(len(sweep_spectrum)-len(incident_pressure))*incident_pressure[-1]
        ])

        # smoothen the magnitude with a 20-sample Hann kernel, keep the phase
        incident_pressure = (
            np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")
            * np.exp(1j*np.angle(incident_pressure))
        )
        # normalize incident by the actual amplitude of the sweep used in the measurement
        incident_pressure = incident_pressure / np.abs(sweep_spectrum)
        return incident_pressure


class Tube:
    """Class representing tube geometry.

    Attributes
    ----------
    further_mic_dist : float
        further microphone distance from sample
    closer_mic_dist : float
        closer mic distance from sample
    mic_spacing : float
        difference of the two microphone distances
    freq_limit : int
        higher frequency limit for exports
    """
    def __init__(self,
            further_mic_dist: float = 0.400115,  # x_1
            closer_mic_dist: float = 0.101755,  # x_2
            freq_limit: int = 2000,
            ):
        self.further_mic_dist = further_mic_dist
        self.closer_mic_dist = closer_mic_dist
        self.mic_spacing = further_mic_dist - closer_mic_dist
        self.freq_limit = freq_limit


class Sample:
    """A class representing sample and its boundary conditions.

    Attributes
    ----------
    name : str
        name of the sample
    temperature : float
        ambient temperature in degC
    rel_humidity : float
        ambient relative humidity in %
    atm_pressure : float
        ambient atmospheric pressure
    tube : Tube
        impedance tube definition object
    timestamp : str
        strftime timestamp in a format '%y-%m-%d_%H-%M'
    folder : str
        path to project data folder, defaults to "data"
    trees : sequence
        folder structure returned by ``make_foldertree``
    boundary_df : pd.DataFrame
        single-row frame with the boundary conditions, exported to CSV
    """
    def __init__(self,
            name: str,
            temperature: float,
            rel_humidity: float,
            atm_pressure: float = 101325,
            tube: Optional[Tube] = None,
            timestamp: Optional[str] = None,
            folder="data",
            ):
        """Creates the folder tree and exports the boundary conditions.

        ``tube`` defaults to a fresh ``Tube()`` and ``timestamp`` to the
        current time; both are resolved per call, not once at import time.
        """
        self.name = name
        self.timestamp = (
            strftime("%y-%m-%d_%H-%M") if timestamp is None else timestamp
        )
        self.temperature = temperature
        self.atm_pressure = atm_pressure
        self.rel_humidity = rel_humidity
        self.tube = Tube() if tube is None else tube
        self.folder = folder
        self.trees = make_foldertree(
            self.name,
            self.folder,
            self.timestamp
        )
        bound_dict = {
            'temp': [self.temperature],
            'RH': [self.rel_humidity],
            'atm_pressure': [self.atm_pressure],
            'x1': [self.tube.further_mic_dist],
            'x2': [self.tube.closer_mic_dist],
            'lim': [self.tube.freq_limit]
        }
        self.boundary_df = pd.DataFrame(bound_dict)
        self.boundary_df.to_csv(
            os.path.join(
                self.trees[2], self.trees[1]+"_bound_cond.csv"
            )
        )

    def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
        """Migrates calibration files from different measurement.

        Parameters
        ----------
        cal_name : str
            calibration sample name
        cal_stamp : str
            calibration sample timestamp in a '%y-%m-%d_%H-%M' format
        cal_parent : str
            parent data folder, defaults to 'data'
        """
        cal_trees = make_foldertree(
            variant=cal_name,
            time_stamp=cal_stamp,
            parent=cal_parent
        )
        cal_parent_folder = cal_trees[2]
        import_folder = cal_trees[3][1]
        freqs = np.load(
            os.path.join(cal_parent_folder, cal_trees[1]+"_freqs.npy")
        )  # freq import
        cf = np.load(
            os.path.join(import_folder, cal_trees[1]+"_cal_f_12.npy")
        )  # cf import

        parent_folder = self.trees[2]
        export_folder = self.trees[3][1]
        np.save(
            os.path.join(parent_folder, self.trees[1]+"_freqs.npy"),
            freqs
        )  # freq export
        np.save(
            os.path.join(export_folder, self.trees[1]+"_cal_f_12.npy"),
            cf
        )  # cf export


def calibration(
        sample: Sample,
        measurement: Measurement,
        thd_filter: bool = True,
        export: bool = True,
        noise_filter: bool = False,
        ) -> tuple[np.ndarray, np.ndarray]:
    """Performs CLI calibration measurement.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the calibration recordings
    measurement : Measurement
        measurement used to record each configuration
    thd_filter : bool
        Enables harmonic distortion filtering
    export : bool
        forwarded to ``calibration_from_files``
    noise_filter : bool
        forwarded to ``calibration_from_files``

    Returns
    -------
    cal
        the value returned by ``calibration_from_files``
    """
    caltree = sample.trees[3][0]
    os.makedirs(caltree, exist_ok=True)

    m = measurement
    while True:
        for c in range(1, 3):
            ready = input(f"Calibrate in configuration {c}? [Y/n]")
            if ready.lower() == "n":
                # answering "n" skips the remaining configurations as well
                break
            for s in range(m.sub_measurements):
                f = os.path.join(caltree, sample.trees[1]+f"_cal_wav_conf{c}_{s}.wav")
                print(f)
                m.measure(f, thd_filter=thd_filter)
                sleep(0.5)
        if input("Repeat calibration process? [y/N]").lower() != "y":
            break
    input("Move the microphones to original position before measurement!")

    cal = calibration_from_files(
        parent_folder=sample.trees[2],
        export=export,
        noise_filter=noise_filter
    )

    return cal


def single_measurement(
        sample: Sample,
        measurement: Measurement,
        depth: float,
        thd_filter: bool = True,
        calc_spl: bool = True
        ) -> tuple[list[np.ndarray], int]:
    """Performs measurement.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the recordings
    measurement : Measurement
        measurement used for recording
    depth : float
        current depth of the sample
    thd_filter : bool
        Enables harmonic distortion filtering
    calc_spl : bool
        when True, the RMS level of the first channel of the last
        sub-measurement is logged and stored in ``measurement.rms_spl``

    Returns
    -------
    sub_measurement_data : list[np.ndarray]
        list of audio recordings taken
    fs : float
        sampling rate of the recording
    """
    m = measurement
    sub_measurement_data = []
    fs = m.fs  # measure() reports this value; keeps fs defined with no sub-measurements
    for s in range(m.sub_measurements):
        f = os.path.join(sample.trees[4][0], sample.trees[1]+f"_wav_d{depth}_{s}.wav")
        data, fs = m.measure(f, thd_filter=thd_filter)
        sub_measurement_data.append(data)
        sleep(0.5)

    if calc_spl and sub_measurement_data:
        rms_spl = calc_rms_pressure_level(sub_measurement_data[-1].T[0], m.fs_to_spl)
        logging.info(f"RMS SPL: {rms_spl} dB")
        m.rms_spl = rms_spl
    return sub_measurement_data, fs


def calculate_alpha(
        sample: Sample,
        return_r: bool = False,
        return_z: bool = False,
        noise_filter: bool = False
        ) -> tuple[np.ndarray, np.ndarray]:
    """Performs transfer function and alpha calculations from audio data
    found in a valid folder structure.

    The transfer functions are stored on the sample as ``sample.unique_d``
    and ``sample.tfs``.

    Parameters
    ----------
    sample : Sample
        sample whose folder tree is processed
    return_r : bool
        forwarded to ``alpha_from_path``
    return_z : bool
        forwarded to ``alpha_from_path``
    noise_filter : bool
        forwarded to ``transfer_function_from_path``

    Returns
    -------
    results
        the value returned by ``alpha_from_path`` called with
        ``return_f=True``
    """
    sample.unique_d, sample.tfs = transfer_function_from_path(
        sample.trees[2], noise_filter=noise_filter
    )
    results = alpha_from_path(
        sample.trees[2],
        return_f=True,
        return_r=return_r,
        return_z=return_z
    )
    return results


class Sensor(Protocol):
    """A protocol for Sensor class implementation."""
    def read_temperature(self) -> float:
        ...

    def read_humidity(self) -> float:
        ...

    def read_pressure(self) -> float:
        ...


def read_env_bc(sensor: Sensor) -> tuple[float, float, float]:
    """Reads temperature, relative humidity and pressure from a sensor.

    Up to five attempts are made; the process exits when all of them fail.

    Parameters
    ----------
    sensor : Sensor
        object implementing the Sensor protocol

    Returns
    -------
    temperature, rel_humidity, atm_pressure : float
        values returned by the sensor
    """
    attempts = 5
    for i in range(attempts):
        try:
            temperature = sensor.read_temperature()
            rel_humidity = sensor.read_humidity()
            atm_pressure = sensor.read_pressure()
            break
        # Exception (not a bare except) so Ctrl+C still interrupts
        except Exception:
            print(f"Reading {i+1} not succesful.")
    else:
        print("Unable to read data from sensor, try manually enter temperature and RH on initialization.")
        sys.exit()
    return temperature, rel_humidity, atm_pressure


def calculate_spectrum(
    sample: Sample,
    substring: str,
    f_limits=(10, 400)
):
    """Averages matching recordings and returns their band-limited spectrum.

    Parameters
    ----------
    sample : Sample
        sample whose measurement audio folder is searched
    substring : str
        only files whose name contains this substring are used
    f_limits : tuple[int, int]
        frequency limits of the returned spectrum

    Returns
    -------
    audio_spectrum : np.ndarray
        rfft of the averaged first channel, cut to ``f_limits``
    audio_freqs : np.ndarray
        frequency values for ``audio_spectrum``

    Raises
    ------
    FileNotFoundError
        if no file in the audio folder contains ``substring``
    """
    audio_folder = sample.trees[4][0]
    filtered_files = [f for f in os.listdir(audio_folder) if substring in f]
    if not filtered_files:
        raise FileNotFoundError(
            f"No audio file containing '{substring}' found in {audio_folder}"
        )

    audio_data = []
    for f in filtered_files:
        fs, data = wavfile.read(os.path.join(audio_folder, f))
        audio_data.append(data.T[0])  # first channel only
    audio_data = np.mean(np.array(audio_data), axis=0)

    audio_spectrum = np.fft.rfft(audio_data)
    audio_freqs = np.fft.rfftfreq(len(audio_data), d=1/fs)
    flow_idx = np.argmin(np.abs(audio_freqs-f_limits[0]))
    fhigh_idx = np.argmin(np.abs(audio_freqs-f_limits[1]))
    audio_spectrum = audio_spectrum[flow_idx:fhigh_idx]
    audio_freqs = audio_freqs[flow_idx:fhigh_idx]
    return audio_spectrum, audio_freqs

# TODO save bc as config file...
class Measurement:
    """Contains information about measurement from the perspective of
    signal and boundary conditions.

    Attributes
    ----------
    fs : int
        measurement sampling frequency
    channels_in : list[int]
        list of input channel numbers
    channels_out : list[int]
        list of output channel numbers (usually one member list)
    device : str
        string specifying part of sound card name
        List of available devices can be obtained with
        `python3 -m sounddevice` command.
    samples : int
        number of samples in the generated log sweep
        typically 2**n
    window_len : int
        length of the Hann half-window applied to the ends of the sweep
    sub_measurements : int
        number of measurements taken for each specimen
        Normally, no differences between sweep measurements should occur,
        this attribute mainly compensates for potential playback artifacts.
    f_limits : list[int]
        [f_low, f_high] frequency limits for the generated sweep
    fs_to_spl : float
        conversion level from dBFS to dB SPL for microphone 1
    sweep_lvl : float
        level of the sweep in dBFS
    sweep : np.ndarray
        the currently active excitation signal
    boundary_df : pd.DataFrame
        single-row frame holding ``fs_to_spl``; written to a CSV file in
        the current working directory on construction
    """

    def __init__(
            self,
            fs: int = 48000,
            channels_in: "list[int] | None" = None,
            channels_out: "list[int] | None" = None,
            device: str = 'Scarlett',
            samples: int = 131072,
            window_len: int = 8192,
            sub_measurements: int = 2,
            f_low: int = 10,
            f_high: int = 1000,
            fs_to_spl: float = 130,
            sweep_lvl: float = -6
            ):
        """Stores the settings, generates the sweep and configures sounddevice.

        ``channels_in`` defaults to ``[1, 2]`` and ``channels_out`` to
        ``[1]``; the lists are created per instance so they are never
        shared between Measurement objects.
        """
        self.fs = fs
        self.channels_in = [1, 2] if channels_in is None else channels_in
        self.channels_out = [1] if channels_out is None else channels_out
        self.device = device
        self.samples = samples
        self.window_len = window_len
        self.sub_measurements = sub_measurements
        self.f_limits = [f_low, f_high]
        self.fs_to_spl = fs_to_spl
        self.sweep_lvl = sweep_lvl

        # the level calibration is stored next to the working directory
        self.boundary_df = pd.DataFrame({"fs_to_spl": [fs_to_spl]})
        self.boundary_df.to_csv(
            strftime("%y-%m-%d_%H-%M") + "_lvl_calib.csv"
        )

        self.make_sweep()
        sd.default.samplerate = fs
        sd.default.channels = len(self.channels_in), len(self.channels_out)
        sd.default.device = device

    def make_sweep(self, windows=True) -> np.ndarray:
        """Generates numpy array with log sweep and stores it in ``self.sweep``.

        The sweep is built from the instance attributes ``fs``, ``samples``,
        ``f_limits``, ``window_len`` and ``sweep_lvl``.

        Parameters
        ----------
        windows : bool
            when True, Hann half-windows of ``window_len`` samples are
            applied to both ends of the sweep

        Returns
        -------
        log_sweep : np.ndarray
            numpy array containing mono log sweep
        """
        t = np.linspace(0, self.samples/self.fs, self.samples, dtype=np.float32)
        log_sweep = chirp(
            t, self.f_limits[0], t[-1], self.f_limits[1],
            method="log", phi=90
        )

        half_win = int(self.window_len)
        # a zero-length fade would turn `[-0:]` into the whole array
        if windows and half_win > 0:
            window = hann(2*half_win)
            log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
            log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]

        # dBFS level -> linear gain
        log_sweep = log_sweep * 10**(self.sweep_lvl/20)

        self.sweep = log_sweep
        return log_sweep

    def regen_sweep(self):
        """Regenerates the sweep."""
        self.make_sweep()

    def update_sweep_lvl(self):
        """Rescales the current sweep so that its peak equals ``sweep_lvl`` dBFS."""
        self.sweep = self.sweep/np.max(np.abs(self.sweep))
        self.sweep = self.sweep * 10**(self.sweep_lvl/20)

    def filter_sweep(
            self,
            rfft_incident_pressure: np.ndarray,
            f_limits=(10, 400),
            ) -> np.ndarray:
        """Filters the sweep with respect to incident pressure measured beforehand.

        The result replaces ``self.sweep``.

        Parameters
        ----------
        rfft_incident_pressure : np.ndarray
            rfft of incident pressure; must have as many bins as the rfft
            of the sweep
        f_limits : tuple[int, int]
            frequency limits for the filtering

        Returns
        -------
        filtered_sweep : np.ndarray
            filtered sweep
        """
        # generate sweep without windows
        sweep_wo_win = self.make_sweep(windows=False)
        n_samples = len(sweep_wo_win)

        # apply blackman window to the ends of the sweep
        win = np.blackman(self.window_len//4)
        half = len(win)//2
        sweep_wo_win[:half] = sweep_wo_win[:half] * win[:half]
        sweep_wo_win[-half:] = sweep_wo_win[-half:] * win[-half:]

        # calculate rfft of the sweep
        rfft_sweep = np.fft.rfft(sweep_wo_win)
        rfft_freqs = np.fft.rfftfreq(n_samples, d=1/self.fs)

        # find indices of frequency limits
        f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
        f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))

        # divide the sweep magnitude by the incident pressure, keep the phase
        amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
        filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))

        # construct and apply filter based on blackman windows
        taper = np.blackman(50)
        half_taper = len(taper)//2
        band_mask = np.ones_like(filtered_sweep_spectrum)
        band_mask[:f_low_idx] = 0
        band_mask[f_low_idx:f_low_idx+half_taper] = taper[:half_taper]
        # NOTE(review): the upper edge is indexed from the END of the
        # spectrum, so it does not sit at f_limits[1] - confirm whether a
        # band edge at f_high_idx was intended. Behaviour kept as is.
        band_mask[-f_high_idx-half_taper:-f_high_idx] = taper[half_taper:]
        band_mask[-f_high_idx:] = 0
        filtered_sweep_spectrum *= band_mask

        # passing n keeps the original length for odd sample counts as well
        filtered_sweep = np.fft.irfft(filtered_sweep_spectrum, n=n_samples)

        # apply hanning window to the ends of the filtered sweep
        window = np.hanning(self.window_len)
        half_window = len(window)//2
        filtered_sweep[:half_window] = filtered_sweep[:half_window] * window[:half_window]
        filtered_sweep[-half_window:] = filtered_sweep[-half_window:] * window[half_window:]

        # normalize the filtered sweep
        filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))

        # apply level to the filtered sweep based on measurement level
        filtered_sweep = filtered_sweep * 10**(self.sweep_lvl/20)

        self.sweep = filtered_sweep
        return filtered_sweep

    def measure(self,
            out_path: str = '',
            thd_filter: bool = True,
            export: bool = True
            ) -> tuple[np.ndarray, int]:
        """Performs measurement and saves the recording.

        Parameters
        ----------
        out_path : str
            path where the recording should be saved, including filename
        thd_filter : bool
            enables harmonic distortion filtering
            This affects the files saved.
        export : bool
            enables export to specified path

        Returns
        -------
        data : np.ndarray
            measured audio data
        fs : int
            sampling rate

        Raises
        ------
        ValueError
            if ``export`` is enabled but ``out_path`` is empty
        """
        # fail before playback instead of after a finished measurement
        if export and not out_path:
            raise ValueError("out_path must be given when export is enabled")

        data = sd.playrec(
            self.sweep,
            input_mapping=self.channels_in,
            output_mapping=self.channels_out,
            dtype=np.float32)
        sd.wait()
        data = np.asarray(data)

        # the filter receives the transposed recording; transpose back after
        if thd_filter:
            data = harmonic_distortion_filter(
                data.T,
                self.sweep,
                f_low=self.f_limits[0],
                f_high=self.f_limits[1]
            ).T

        if export:
            sf.write(
                file=out_path,
                data=data,
                samplerate=self.fs,
                format='WAV',
                subtype='FLOAT'
            )

        return data, self.fs

    def calc_incident_pressure_filter(
            self,
            spectrum: np.ndarray,
            r: np.ndarray,
            f: np.ndarray,
            distance: float,
            speed_of_sound: float = 343,
            f_limits=(10, 400)
            ) -> np.ndarray:
        """
        Calculates incident pressure filter based on the measured
        spectrum and reflection factor.
        Such filter can be used to filter the input sweep
        to compensate for the loudspeaker frequency response.

        Parameters
        ----------
        spectrum : np.ndarray
            measured spectrum; must already be limited to the bins
            selected by ``f_limits`` (it is combined element-wise with the
            sliced ``r`` and ``f``)
        r : np.ndarray
            reflection factor
        f : np.ndarray
            frequency values
        distance : float
            distance between the sample and the microphone
        speed_of_sound : float
            speed of sound in air
        f_limits : tuple[int, int]
            frequency limits for the filtering

        Returns
        -------
        incident_pressure : np.ndarray
            incident pressure filter with as many bins as the rfft of
            ``self.sweep``
        """
        sweep_spectrum = np.fft.rfft(self.sweep)

        # restrict reflection factor and frequencies to the analysed band
        f_low_idx = np.argmin(np.abs(f-f_limits[0]))
        f_high_idx = np.argmin(np.abs(f-f_limits[1]))
        band = slice(f_low_idx, f_high_idx)
        wavenumber = 2*np.pi*f[band]/speed_of_sound

        # measured pressure = incident * (exp(-jkx) + r * exp(jkx))
        incident_pressure = spectrum / (
            np.exp(-1j * wavenumber * distance)
            + r[band] * np.exp(1j * wavenumber * distance)
        )

        # extend incident pressure to have the same length as sweep_spectrum
        # by repeating its first and last value
        incident_pressure = np.concatenate([
            np.ones(f_low_idx)*incident_pressure[0],
            incident_pressure
        ])
        incident_pressure = np.concatenate([
            incident_pressure,
            np.ones(len(sweep_spectrum)-len(incident_pressure))*incident_pressure[-1]
        ])

        # smoothen the magnitude with a 20-sample Hann kernel, keep the phase
        incident_pressure = (
            np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")
            * np.exp(1j*np.angle(incident_pressure))
        )
        # normalize incident by the actual amplitude of the sweep used in the measurement
        incident_pressure = incident_pressure / np.abs(sweep_spectrum)
        return incident_pressure
Contains information about measurement from the perspective of signal and boundary conditions.
Attributes
- fs (int): measurement sampling frequency
- channels_in (list[int]): list of input channel numbers
- channels_out (list[int]): list of output channel numbers (usually one member list)
- device (str): string specifying part of sound card name. List of available devices can be obtained with the `python3 -m sounddevice` command.
- samples (int): number of samples in the generated log sweep, typically 2**n
- window_len (int): length of the Hann half-window applied to the ends of the sweep
- sub_measurements (int): number of measurements taken for each specimen. Normally, no differences between sweep measurements should occur; this attribute mainly compensates for potential playback artifacts.
- f_low (int): lower frequency limit for the generated sweep
- f_high (int): higher frequency limit for the generated sweep
- fs_to_spl (float): conversion level from dBFS to dB SPL for microphone 1
- sweep_lvl (float): level of the sweep in dBFS
def make_sweep(self, windows=True) -> np.ndarray:
    """Generates numpy array with log sweep and stores it in ``self.sweep``.

    The sweep is built from the instance attributes ``fs``, ``samples``,
    ``f_limits``, ``window_len`` and ``sweep_lvl``.

    Parameters
    ----------
    windows : bool
        when True, Hann half-windows of ``window_len`` samples are
        applied to both ends of the sweep

    Returns
    -------
    log_sweep : np.ndarray
        numpy array containing mono log sweep
    """
    t = np.linspace(0, self.samples/self.fs, self.samples, dtype=np.float32)
    log_sweep = chirp(
        t, self.f_limits[0], t[-1], self.f_limits[1],
        method="log", phi=90
    )

    half_win = int(self.window_len)
    # a zero-length fade would turn `[-0:]` into the whole array
    if windows and half_win > 0:
        # 2*half_win keeps both halves exactly half_win samples long
        window = hann(2*half_win)
        log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
        log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]

    # dBFS level -> linear gain
    log_sweep = log_sweep * 10**(self.sweep_lvl/20)

    self.sweep = log_sweep
    return log_sweep
Generates numpy array with log sweep.
Parameters
- fs (int): measurement sampling frequency
- samples (int): number of samples in the generated log sweep, typically 2**n
- window_len (int): length of the Hann half-window applied to the ends of the sweep
- f_low (int): lower frequency limit for the generated sweep
- f_high (int): higher frequency limit for the generated sweep
Returns
- log_sweep (np.ndarray): numpy array containing mono log sweep
def update_sweep_lvl(self):
    """Rescales the stored sweep so that its peak matches ``sweep_lvl`` dBFS."""
    # bring the peak to unity first ...
    peak = np.abs(self.sweep).max()
    self.sweep = self.sweep / peak
    # ... then apply the requested level as a linear gain
    gain = 10 ** (self.sweep_lvl / 20)
    self.sweep = self.sweep * gain
Updates the sweep level.
def filter_sweep(
        self,
        rfft_incident_pressure: np.ndarray,
        f_limits=(10, 400),
        ) -> np.ndarray:
    """Filters the sweep with respect to incident pressure measured beforehand.

    The result replaces ``self.sweep``.

    Parameters
    ----------
    rfft_incident_pressure : np.ndarray
        rfft of incident pressure; must have as many bins as the rfft
        of the sweep
    f_limits : tuple[int, int]
        frequency limits for the filtering

    Returns
    -------
    filtered_sweep : np.ndarray
        filtered sweep
    """
    # generate sweep without windows
    sweep_wo_win = self.make_sweep(windows=False)
    n_samples = len(sweep_wo_win)

    # apply blackman window to the ends of the sweep
    win = np.blackman(self.window_len//4)
    half = len(win)//2
    sweep_wo_win[:half] = sweep_wo_win[:half] * win[:half]
    sweep_wo_win[-half:] = sweep_wo_win[-half:] * win[-half:]

    # calculate rfft of the sweep
    rfft_sweep = np.fft.rfft(sweep_wo_win)
    rfft_freqs = np.fft.rfftfreq(n_samples, d=1/self.fs)

    # find indices of frequency limits
    f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
    f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))

    # divide the sweep magnitude by the incident pressure, keep the phase
    amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
    filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))

    # construct and apply filter based on blackman windows
    # (named band_mask so the builtin `filter` is not shadowed)
    taper = np.blackman(50)
    half_taper = len(taper)//2
    band_mask = np.ones_like(filtered_sweep_spectrum)
    band_mask[:f_low_idx] = 0
    band_mask[f_low_idx:f_low_idx+half_taper] = taper[:half_taper]
    # NOTE(review): the upper edge is indexed from the END of the
    # spectrum, so it does not sit at f_limits[1] - confirm whether a
    # band edge at f_high_idx was intended. Behaviour kept as is.
    band_mask[-f_high_idx-half_taper:-f_high_idx] = taper[half_taper:]
    band_mask[-f_high_idx:] = 0
    filtered_sweep_spectrum *= band_mask

    # passing n keeps the original length for odd sample counts as well
    filtered_sweep = np.fft.irfft(filtered_sweep_spectrum, n=n_samples)

    # apply hanning window to the ends of the filtered sweep
    window = np.hanning(self.window_len)
    half_window = len(window)//2
    filtered_sweep[:half_window] = filtered_sweep[:half_window] * window[:half_window]
    filtered_sweep[-half_window:] = filtered_sweep[-half_window:] * window[half_window:]

    # normalize the filtered sweep
    filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))

    # apply level to the filtered sweep based on measurement level
    filtered_sweep = filtered_sweep * 10**(self.sweep_lvl/20)

    self.sweep = filtered_sweep
    return filtered_sweep
Filters the sweep with respect to incident pressure measured beforehand.
Parameters
- rfft_incident_pressure (np.ndarray): rfft of incident pressure
- f_limits (tuple[int, int]): frequency limits for the filtering
Returns
- filtered_sweep (np.ndarray): filtered sweep
def measure(self,
        out_path : str='',
        thd_filter : bool=True,
        export : bool=True
        ) -> tuple[np.ndarray, int]:
    """Plays the current sweep, records the response and optionally saves it.

    Parameters
    ----------
    out_path : str
        path where the recording should be saved, including filename
        Only used when `export` is enabled.
    thd_filter : bool
        enables harmonic distortion filtering
        The filtered data is what gets returned and saved.
    export : bool
        enables export to specified path (32-bit float WAV)

    Returns
    -------
    data : np.ndarray
        measured audio data
    fs : int
        sampling rate
    """
    # simultaneous playback/recording; block until the stream is finished
    recording = sd.playrec(
        self.sweep,
        input_mapping=self.channels_in,
        output_mapping=self.channels_out,
        dtype=np.float32,
    )
    sd.wait()
    recording = np.asarray(recording)

    if thd_filter:
        # the filter helper gets the transposed recording;
        # the result is transposed back afterwards
        low, high = self.f_limits[0], self.f_limits[1]
        recording = harmonic_distortion_filter(
            recording.T,
            self.sweep,
            f_low=low,
            f_high=high,
        ).T

    if export:
        sf.write(
            file=out_path,
            data=recording,
            samplerate=self.fs,
            format='WAV',
            subtype='FLOAT',
        )

    return recording, self.fs
Performs measurement and saves the recording.
Parameters
- out_path (str): path where the recording should be saved, including filename
- thd_filter (bool): enables harmonic distortion filtering. This affects the files saved.
- export (bool): enables export to specified path
Returns
- data (np.ndarray): measured audio data
- fs (int): sampling rate
def calc_incident_pressure_filter(
        self,
        spectrum: np.ndarray,
        r: np.ndarray,
        f: np.ndarray,
        distance: float,
        speed_of_sound: float = 343,
        f_limits=(10, 400)
        ) -> np.ndarray:
    """
    Calculates incident pressure filter based on the measured
    spectrum and reflection factor.
    Such filter can be used to filter the input sweep
    to compensate for the loudspeaker frequency response.

    Parameters
    ----------
    spectrum : np.ndarray
        measured spectrum
        Either already restricted to the bins between the two
        frequency limits, or sampled on the full `f` grid, in which
        case it is restricted here the same way as `r` and `f`.
    r : np.ndarray
        reflection factor, sampled on the `f` grid
    f : np.ndarray
        frequency values
        NOTE(review): the index of the lower limit in `f` is reused as
        a bin count in the rfft of `self.sweep`, so `f` presumably has
        the same bin spacing as that rfft - confirm against the caller.
    distance : float
        distance between the sample and the microphone
    speed_of_sound : float
        speed of sound in air
    f_limits : tuple[int, int]
        frequency limits for the filtering

    Returns
    -------
    incident_pressure : np.ndarray
        incident pressure filter with one value per bin of the rfft
        of `self.sweep`
    """
    spectrum = np.asarray(spectrum)
    r = np.asarray(r)
    f = np.asarray(f)
    sweep_spectrum = np.fft.rfft(self.sweep)

    # bins of `f` closest to the requested limits (upper one exclusive)
    f_low_idx = int(np.argmin(np.abs(f - f_limits[0])))
    f_high_idx = int(np.argmin(np.abs(f - f_limits[1])))
    band = slice(f_low_idx, f_high_idx)

    # a spectrum given on the full frequency grid is cut to the band,
    # exactly like the reflection factor and the frequency axis
    if spectrum.ndim == 1 and len(spectrum) == len(f):
        spectrum = spectrum[band]

    # calculate incident pressure:
    # p_i = p / (exp(-j*k*x) + r * exp(j*k*x))
    wavenumber = 2*np.pi*f[band]/speed_of_sound
    incident_pressure = spectrum / (
        np.exp(-1j * wavenumber * distance)
        + r[band] * np.exp(1j * wavenumber * distance)
    )

    # extend incident pressure to have the same length as sweep_spectrum
    # by repeating its first value below and its last value above the band
    incident_pressure = np.concatenate([
        np.full(f_low_idx, incident_pressure[0]),
        incident_pressure,
    ])
    incident_pressure = np.concatenate([
        incident_pressure,
        np.full(
            len(sweep_spectrum) - len(incident_pressure),
            incident_pressure[-1],
        ),
    ])

    # smoothen the magnitude with a 20-sample Hann kernel, keep the phase
    # (the kernel is not normalized, so the magnitude is also scaled
    # by the sum of the kernel)
    incident_pressure = (
        np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")
        * np.exp(1j*np.angle(incident_pressure))
    )
    # normalize incident by the actual amplitude of the sweep used in the measurement
    incident_pressure = incident_pressure / np.abs(sweep_spectrum)
    return incident_pressure
Calculates incident pressure filter based on the measured spectrum and reflection factor. Such filter can be used to filter the input sweep to compensate for the loudspeaker frequency response.
Parameters
- spectrum (np.ndarray): measured spectrum
- r (np.ndarray): reflection factor
- f (np.ndarray): frequency values
- distance (float): distance between the sample and the microphone
- speed_of_sound (float): speed of sound in air
- f_limits (tuple[int, int]): frequency limits for the filtering
Returns
- incident_pressure (np.ndarray): incident pressure filter
class Tube:
    """Geometry of the impedance tube.

    Attributes
    ----------
    further_mic_dist : float
        further microphone distance from sample (x_1)
    closer_mic_dist : float
        closer mic distance from sample (x_2)
    mic_spacing : float
        difference of the two microphone distances
    freq_limit : int
        higher frequency limit for exports
    """
    def __init__(
            self,
            further_mic_dist: float = 0.400115,
            closer_mic_dist: float = 0.101755,
            freq_limit: int = 2000,
            ):
        self.further_mic_dist = further_mic_dist
        self.closer_mic_dist = closer_mic_dist
        # derived once here; it does not follow later attribute changes
        self.mic_spacing = self.further_mic_dist - self.closer_mic_dist
        self.freq_limit = freq_limit
Class representing tube geometry.
Attributes
- further_mic_dist (float): further microphone distance from sample
- closer_mic_dist (float): closer mic distance from sample
- freq_limit (int): higher frequency limit for exports
class Sample:
    """A class representing sample and its boundary conditions.

    Creating a sample builds its folder tree with `make_foldertree`
    and writes the boundary conditions to a
    ``<prefix>_bound_cond.csv`` file in the sample's parent folder.

    Attributes
    ----------
    name : str
        name of the sample
    temperature : float
        ambient temperature in degC
    rel_humidity : float
        ambient relative humidity in %
    atm_pressure : float
        ambient atmospheric pressure, defaults to 101325
        (presumably in Pa - confirm)
    tube : Tube
        impedance tube definition object
        A new default `Tube` is created when none is given.
    timestamp : str
        strftime timestamp in a format '%y-%m-%d_%H-%M'
        Defaults to the time at which the sample is created.
    folder : str
        path to project data folder, defaults to "data"
    trees : sequence
        folder tree description returned by `make_foldertree`
    boundary_df : pd.DataFrame
        single-row table of the boundary conditions written to csv
    """
    def __init__(self,
            name : str,
            temperature : float,
            rel_humidity : float,
            atm_pressure : float = 101325,
            tube : "Tube | None" = None,
            timestamp : "str | None" = None,
            folder = "data",
            ):
        self.name = name
        # resolved per call: a default evaluated in the signature would be
        # frozen at import time and shared by every sample
        self.timestamp = (
            strftime("%y-%m-%d_%H-%M") if timestamp is None else timestamp
        )
        self.temperature = temperature
        self.atm_pressure = atm_pressure
        self.rel_humidity = rel_humidity
        # likewise a fresh Tube per sample instead of one shared instance
        self.tube = Tube() if tube is None else tube
        self.folder = folder
        self.trees = make_foldertree(
            self.name,
            self.folder,
            self.timestamp
            )
        bound_dict = {
            'temp': [self.temperature],
            'RH': [self.rel_humidity],
            'atm_pressure': [self.atm_pressure],
            'x1': [self.tube.further_mic_dist],
            'x2': [self.tube.closer_mic_dist],
            'lim': [self.tube.freq_limit]
            }
        self.boundary_df = pd.DataFrame(bound_dict)
        self.boundary_df.to_csv(
            os.path.join(
                self.trees[2], self.trees[1]+"_bound_cond.csv"
                )
            )

    def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
        """Migrates calibration files from different measurement.

        Loads the frequency axis and the calibration factor of the
        given calibration sample and saves copies of them into this
        sample's folder tree under this sample's file prefix.

        Parameters
        ----------
        cal_name : str
            calibration sample name
        cal_stamp : str
            calibration sample timestamp in a '%y-%m-%d_%H-%M' format
        cal_parent : str
            parent data folder, defaults to 'data'
        """
        cal_trees = make_foldertree(
            variant=cal_name,
            time_stamp=cal_stamp,
            parent=cal_parent
            )
        cal_parent_folder = cal_trees[2]
        import_folder = cal_trees[3][1]
        freqs = np.load(
            os.path.join(cal_parent_folder, cal_trees[1]+"_freqs.npy")
            ) #freq import
        cf = np.load(
            os.path.join(import_folder, cal_trees[1]+"_cal_f_12.npy")
            ) #cf import

        parent_folder = self.trees[2]
        export_folder = self.trees[3][1]
        np.save(
            os.path.join(parent_folder, self.trees[1]+"_freqs.npy"),
            freqs
            ) #freq export
        np.save(
            os.path.join(export_folder, self.trees[1]+"_cal_f_12.npy"),
            cf
            ) #cf export
A class representing sample and its boundary conditions.
Attributes
- name (str): name of the sample
- temperature (float): ambient temperature in degC
- rel_humidity (float): ambient relative humidity in %
- tube (Tube): impedance tube definition object
- timestamp (str): strftime timestamp in a format '%y-%m-%d_%H-%M'
- folder (str): path to project data folder, defaults to "data"
def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
    """Copies calibration data of another measurement into this sample.

    The frequency axis and the calibration factor stored for the
    calibration sample are loaded and saved again inside this
    sample's folder tree, using this sample's file prefix.

    Parameters
    ----------
    cal_name : str
        calibration sample name
    cal_stamp : str
        calibration sample timestamp in a '%y-%m-%d_%H-%M' format
    cal_parent : str
        parent data folder, defaults to 'data'
    """
    source_tree = make_foldertree(
        variant=cal_name,
        time_stamp=cal_stamp,
        parent=cal_parent,
    )
    source_prefix = source_tree[1]
    target_prefix = self.trees[1]

    # import from the calibration sample
    freqs_in = os.path.join(source_tree[2], source_prefix+"_freqs.npy")
    cal_factor_in = os.path.join(
        source_tree[3][1], source_prefix+"_cal_f_12.npy"
    )
    freqs = np.load(freqs_in)
    cal_factor = np.load(cal_factor_in)

    # export into this sample's folders
    freqs_out = os.path.join(self.trees[2], target_prefix+"_freqs.npy")
    cal_factor_out = os.path.join(
        self.trees[3][1], target_prefix+"_cal_f_12.npy"
    )
    np.save(freqs_out, freqs)
    np.save(cal_factor_out, cal_factor)
Migrates calibration files from different measurement.
Parameters
- cal_name (str): calibration sample name
- cal_stamp (str): calibration sample timestamp in a '%y-%m-%d_%H-%M' format
- cal_parent (str): parent data folder, defaults to 'data'
def calibration(
        sample : Sample,
        measurement : Measurement,
        thd_filter : bool=True,
        export : bool=True,
        noise_filter : bool=False,
        ) -> tuple[np.ndarray, np.ndarray]:
    """Runs the interactive (CLI) calibration measurement.

    For configurations 1 and 2 the user is prompted, and
    `measurement.sub_measurements` recordings are taken and saved to
    the calibration folder of the sample. Answering "n" skips the
    remaining configurations of the current pass. The whole pass can be
    repeated on request. Afterwards the calibration is evaluated from
    the saved files.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the calibration recordings
    measurement : Measurement
        measurement object used to take the recordings
    thd_filter : bool
        Enables harmonic distortion filtering
    export : bool
        forwarded to `calibration_from_files`
    noise_filter : bool
        forwarded to `calibration_from_files`

    Returns
    -------
    cal
        result of `calibration_from_files` for the sample's parent folder
    """
    cal_folder = sample.trees[3][0]
    if not os.path.exists(cal_folder):
        os.makedirs(cal_folder)

    while True:
        for config in (1, 2):
            answer = input(f"Calibrate in configuration {config}? [Y/n]")
            if answer.lower() == "n":
                break
            for sub in range(measurement.sub_measurements):
                wav_path = os.path.join(
                    cal_folder,
                    sample.trees[1]+f"_cal_wav_conf{config}_{sub}.wav"
                )
                print(wav_path)
                measurement.measure(wav_path, thd_filter=thd_filter)
                sleep(0.5)
        if input("Repeat calibration process? [y/N]").lower() != "y":
            break

    input("Move the microphones to original position before measurement!")

    return calibration_from_files(
        parent_folder=sample.trees[2],
        export=export,
        noise_filter=noise_filter,
    )
Performs CLI calibration measurement.
Parameters
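- sample (imptube.tube.Sample): sample whose folder tree receives the calibration recordings
- measurement (Measurement): measurement object used to take the recordings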
thd_filter (bool): Enables harmonic distortion filtering
def single_measurement(
        sample : "Sample",
        measurement : "Measurement",
        depth : float,
        thd_filter : bool= True,
        calc_spl : bool = True
        ) -> tuple[list[np.ndarray], int]:
    """Performs measurement.

    Takes `measurement.sub_measurements` recordings, each saved to the
    measurement folder of the sample with the depth and the running
    index in its filename.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the recordings
    measurement : Measurement
        measurement object used to take the recordings
    depth : float
        current depth of the sample
    thd_filter : bool
        Enables harmonic distortion filtering
    calc_spl : bool
        Enables calculation of the RMS pressure level from the first
        channel of the last recording; the value is logged and stored
        in `measurement.rms_spl`. Skipped when nothing was recorded.

    Returns
    -------
    sub_measurement_data : list[np.ndarray]
        list of audio recordings taken
    fs : int
        sampling rate of the recording
    """
    m = measurement
    sub_measurement_data = []
    # defined up front so that zero sub-measurements still return cleanly
    fs = m.fs
    for s in range(m.sub_measurements):
        f = os.path.join(
            sample.trees[4][0],
            sample.trees[1]+f"_wav_d{depth}_{s}.wav"
        )
        data, fs = m.measure(f, thd_filter=thd_filter)
        sub_measurement_data.append(data)
        sleep(0.5)

    if calc_spl and sub_measurement_data:
        last_recording = sub_measurement_data[-1]
        rms_spl = calc_rms_pressure_level(last_recording.T[0], m.fs_to_spl)
        logging.info("RMS SPL: %s dB", rms_spl)
        m.rms_spl = rms_spl
    return sub_measurement_data, fs
Performs measurement.
Parameters
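- sample (imptube.tube.Sample): sample whose folder tree receives the recordings
- measurement (Measurement): measurement object used to take the recordings
- depth (float): current depth of the sample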
- thd_filter (bool): Enables harmonic distortion filtering
Returns
- sub_measurement_data (list[np.ndarray]): list of audio recordings taken
- fs (int): sampling rate of the recording
def calculate_alpha(
        sample : Sample,
        return_r : bool = False,
        return_z : bool = False,
        noise_filter : bool = False
        ) -> tuple[np.ndarray, np.ndarray]:
    """Calculates transfer functions and alpha from audio data
    found in a valid folder structure.

    The transfer function results are stored on the sample as
    `sample.unique_d` and `sample.tfs`.

    Parameters
    ----------
    sample : Sample
        sample whose parent folder holds the recordings
    return_r : bool
        forwarded to `alpha_from_path`
    return_z : bool
        forwarded to `alpha_from_path`
    noise_filter : bool
        forwarded to `transfer_function_from_path`

    Returns
    -------
    alpha : np.ndarray
        sound absorption coefficient for frequencies lower than
        limit specified in sample.tube.freq_limit
    freqs : np.ndarray
        frequency values for the alpha array

    NOTE(review): with `return_r` / `return_z` enabled the helper
    presumably returns additional arrays - confirm in
    `imptube.processing.alpha_from_path`.
    """
    data_root = sample.trees[2]
    transfer = transfer_function_from_path(
        data_root,
        noise_filter=noise_filter,
    )
    sample.unique_d, sample.tfs = transfer
    return alpha_from_path(
        sample.trees[2],
        return_f=True,
        return_r=return_r,
        return_z=return_z,
    )
Performs transfer function and alpha calculations from audio data found in a valid folder structure.
Parameters
- sample (Sample):
Returns
- alpha (np.ndarray): sound absorption coefficient for frequencies lower than limit specified in sample.tube.freq_limit
- freqs (np.ndarray): frequency values for the alpha array
class Sensor(Protocol):
    """A protocol for Sensor class implementation.

    Any object providing these three reader methods satisfies it.
    Units are not fixed here; presumably they match the ones expected
    by `Sample` - confirm with the concrete sensor implementation.
    """

    def read_temperature(self) -> float:
        """Returns the current temperature reading."""
        ...

    def read_humidity(self) -> float:
        """Returns the current humidity reading."""
        ...

    def read_pressure(self) -> float:
        """Returns the current pressure reading."""
        ...
A protocol for Sensor class implementation.