imptube.tube

In this module, three main classes are defined - Measurement, Tube and Sample.

  1'''In this module, three main classes are defined - Measurement, Tube and Sample.
  2'''
  3
  4import sys
  5import sounddevice as sd
  6import numpy as np
  7import pandas as pd
  8import soundfile as sf
  9import os
 10from scipy.io import wavfile
 11from scipy.signal import chirp
 12from scipy.signal.windows import hann
 13from time import sleep, strftime
 14from imptube.utils import make_foldertree
 15from imptube.processing import (
 16    calibration_from_files,
 17    transfer_function_from_path,
 18    alpha_from_path,
 19    harmonic_distortion_filter,
 20    calc_rms_pressure_level
 21)
 22from typing import Protocol
 23import logging
 24
 25
 26class Measurement:
 27    """Contains information about measurement from the perspective of
 28    signal and boundary conditions.
 29
 30    Attributes
 31    ----------
 32    fs : int
 33        measurement sampling frequency
 34    channels_in : list[int]
 35        list of input channel numbers
 36    channels_out : list[int]
 37        list of output channel numbers (usually one member list)
 38    device : str
 39        string specifying part of sound card name
 40        List of available devices can be obtained with
 41        `python3 -m sounddevice` command.
 42    samples : int
 43        number of samples in the generated log sweep
 44        typically 2**n
 45    window_len : int
 46        length of the Hann half-window applied to the ends of the sweep
 47    sub_measurements : int
 48        number of measurements taken for each specimen
 49        Normally, no differences between sweep measurements should occur,
 50        this attribute mainly compensates for potential playback artifacts.
 51    f_low : int
 52        lower frequency limit for the generated sweep
 53    f_high : int
 54        higher frequency limit for the generated sweep
 55    fs_to_spl : float
 56        conversion level from dBFS to dB SPL for microphone 1
 57    sweep_lvl : float
 58        level of the sweep in dBFS
 59    """
 60
 61    def __init__(
 62            self, 
 63            fs : int=48000, 
 64            channels_in : list[int]=[1,2], 
 65            channels_out : list[int]=[1], 
 66            device : str='Scarlett',
 67            samples : int=131072, 
 68            window_len : int=8192,
 69            sub_measurements : int=2,
 70            f_low : int=10,
 71            f_high : int=1000,
 72            fs_to_spl : float=130,
 73            sweep_lvl : float=-6  
 74        ):
 75        self.fs = fs
 76        self.channels_in = channels_in
 77        self.channels_out = channels_out
 78        self.device = device
 79        self.samples = samples
 80        self.window_len = window_len
 81        self.sub_measurements = sub_measurements
 82        self.f_limits = [f_low, f_high]
 83        self.fs_to_spl = fs_to_spl
 84        self.sweep_lvl = sweep_lvl
 85
 86        self.boundary_df = pd.DataFrame({"fs_to_spl": [fs_to_spl]})
 87        self.boundary_df.to_csv(
 88            strftime("%y-%m-%d_%H-%M") + "_lvl_calib.csv"
 89        )
 90
 91        self.make_sweep()
 92        sd.default.samplerate = fs
 93        sd.default.channels = len(channels_in), len(channels_out)
 94        sd.default.device = device
 95
 96
 97    def make_sweep(self, windows=True) -> np.ndarray:
 98        """Generates numpy array with log sweep.
 99
100        Parameters
101        ----------
102        fs : int 
103            measurement sampling frequency
104        samples : int
105            number of samples in the generated log sweep
106            typically 2**n
107        window_len : int
108            length of the Hann half-window applied to the ends
109            of the sweep
110        f_low : int
111            lower frequency limit for the generated sweep
112        f_high : int
113            higher frequency limit for the generated sweep
114
115        Returns
116        -------
117        log_sweep : np.ndarray
118            numpy array containing mono log sweep
119        """
120        t = np.linspace(0,self.samples/self.fs,self.samples, dtype=np.float32)
121        
122        half_win = int(self.window_len)
123        log_sweep = chirp(t, self.f_limits[0], t[-1], self.f_limits[1], method="log", phi=90)
124        
125        if windows:
126            window = hann(int(self.window_len*2))
127            log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
128            log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]
129        
130        lvl_to_factor = 10**(self.sweep_lvl/20)
131        log_sweep = log_sweep*lvl_to_factor
132
133        self.sweep = log_sweep
134        return log_sweep
135    
136    def regen_sweep(self):
137        """Regenerates the sweep."""
138        self.make_sweep()
139
140    def update_sweep_lvl(self):
141        """Updates the sweep level."""
142        self.sweep = self.sweep/np.max(np.abs(self.sweep))
143        self.sweep = self.sweep * 10**(self.sweep_lvl/20)
144
145    def filter_sweep(
146        self,
147        rfft_incident_pressure: np.ndarray,
148        f_limits=(10, 400),
149        ) -> np.ndarray:
150        """Filters the sweep with respect to incident pressure measured beforehand.
151
152        Parameters
153        ----------
154        rfft_incident_pressure : np.ndarray
155            rfft of incident pressure
156        f_lim : tuple[int, int]
157            frequency limits for the filtering
158        
159        Returns
160        -------
161        filtered_sweep : np.ndarray
162            filtered sweep
163        """
164        # generate sweep without windows
165        sweep_wo_win = self.make_sweep(windows=False)
166
167        # apply blackman window to the ends of the sweep
168        win = np.blackman(self.window_len//4)
169        sweep_wo_win[:len(win)//2] = sweep_wo_win[:len(win)//2] * win[:len(win)//2]
170        sweep_wo_win[-len(win)//2:] = sweep_wo_win[-len(win)//2:] * win[-len(win)//2:]
171
172        # calculate rfft of the sweep
173        rfft_sweep = np.fft.rfft(sweep_wo_win)
174        rfft_freqs = np.fft.rfftfreq(len(sweep_wo_win), d=1/self.fs)
175        
176        # find indices of frequency limits
177        f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
178        f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))
179
180        # calculate amplitude of the filtered sweep spectrum
181        amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
182        filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))
183
184        # construct and apply filter based on blackman windows
185        filt = np.blackman(50)
186        filter = np.ones_like(filtered_sweep_spectrum)
187        filter[:f_low_idx] = 0
188        filter[f_low_idx:f_low_idx+len(filt)//2] = filt[:len(filt)//2]
189        filter[-f_high_idx-len(filt)//2:-f_high_idx] = filt[len(filt)//2:]
190        filter[-f_high_idx:] = 0
191        filtered_sweep_spectrum *= filter
192
193        # calculate ifft of the filtered sweep spectrum
194        filtered_sweep = np.fft.irfft(filtered_sweep_spectrum)
195
196        # apply hanning window to the ends of the filtered sweep
197        window = np.hanning(self.window_len)
198        filtered_sweep[:len(window)//2] = filtered_sweep[:len(window)//2] * window[:len(window)//2]
199        filtered_sweep[-len(window)//2:] = filtered_sweep[-len(window)//2:] * window[len(window)//2:]
200
201        # normalize the filtered sweep
202        filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))
203
204        # apply level to the filtered sweep based on measurement level
205        lvl_to_factor = 10**(self.sweep_lvl/20)
206        filtered_sweep = filtered_sweep*lvl_to_factor
207
208        self.sweep = filtered_sweep
209        return filtered_sweep
210
211    def measure(self,
212            out_path : str='',
213            thd_filter : bool=True,
214            export : bool=True
215            ) -> tuple[np.ndarray, int]:
216        """Performs measurement and saves the recording. 
217        
218        Parameters
219        ----------
220        out_path : str
221            path where the recording should be saved, including filename
222        thd_filter : bool
223            enables harmonic distortion filtering
224            This affects the files saved.
225        export : bool
226            enables export to specified path
227
228        Returns
229        -------
230        data : np.ndarray
231            measured audio data
232        fs : int
233            sampling rate
234        """
235        data = sd.playrec(
236            self.sweep, 
237            input_mapping=self.channels_in, 
238            output_mapping=self.channels_out,
239            dtype=np.float32)
240        sd.wait()
241        data = np.asarray(data)
242
243        #filtration
244        if thd_filter:
245            data = data.T
246            data = harmonic_distortion_filter(
247                data, 
248                self.sweep, 
249                f_low=self.f_limits[0], 
250                f_high=self.f_limits[1]
251                )
252            data = data.T
253        
254        if export:    
255            sf.write(
256                file=out_path,
257                data=data,
258                samplerate=self.fs,
259                format='WAV',
260                subtype='FLOAT'
261                )
262        
263        return data, self.fs
264    
265    def calc_incident_pressure_filter(
266        self,
267        spectrum: np.ndarray,
268        r: np.ndarray,
269        f: np.ndarray,
270        distance: float,
271        speed_of_sound: float = 343,
272        f_limits=(10, 400)
273        ) -> np.ndarray:
274        """
275        Calculates incident pressure filter based on the measured 
276        spectrum and reflection factor.
277        Such filter can be used to filter the input sweep
278        to compensate for the loudspeaker frequency response.
279
280        Parameters
281        ----------
282        spectrum : np.ndarray
283            measured spectrum
284        r : np.ndarray
285            reflection factor
286        f : np.ndarray
287            frequency values
288        distance : float
289            distance between the sample and the microphone
290        speed_of_sound : float
291            speed of sound in air
292        f_limits : tuple[int, int]
293            frequency limits for the filtering
294
295        Returns
296        -------
297        incident_pressure : np.ndarray
298            incident pressure filter
299        """
300        def calculate_incident_pressure(
301            pressure: np.ndarray, 
302            reflection_factor: np.ndarray, 
303            distance: float,
304            wavenumber: np.ndarray
305        ):
306            """
307            Calculates incident pressure based on the measured 
308            spectrum and the reflection factor.
309
310            Parameters
311            ----------
312            pressure : np.ndarray
313                measured pressure
314            reflection_factor : np.ndarray
315                reflection factor
316            distance : float
317                distance between the sample and the microphone
318            wavenumber : np.ndarray
319                wavenumber
320
321            Returns
322            -------
323            incident_pressure : np.ndarray
324                incident pressure filter
325            """
326            return pressure / (
327                np.exp(-1j * wavenumber * distance) 
328                + reflection_factor * np.exp(1j * wavenumber * distance)
329            )
330        
331        sweep_spectrum = np.fft.rfft(self.sweep.copy())
332        # calculate incident pressure
333        f_low_idx = np.argmin(np.abs(f-f_limits[0]))
334        f_high_idx = np.argmin(np.abs(f-f_limits[1]))
335        incident_pressure = calculate_incident_pressure(
336            pressure=spectrum,
337            reflection_factor=r[f_low_idx:f_high_idx],
338            distance=distance,
339            wavenumber=2*np.pi*f[f_low_idx:f_high_idx]/speed_of_sound
340        )
341
342        #extend incident pressure to have the same length as sweep_spectrum
343        incident_pressure = np.concatenate([
344            np.ones(f_low_idx)*incident_pressure[0], 
345            incident_pressure
346            ])
347        incident_pressure = np.concatenate([
348            incident_pressure, 
349            np.ones(len(sweep_spectrum)-len(incident_pressure))*incident_pressure[-1]
350            ])
351
352        # smoothen incident pressure by applying a moving average convolution filter with a window of 20 samples
353        incident_pressure = np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")*np.exp(1j*np.angle(incident_pressure))
354        # normalize incident by the actual amplitude of the sweep used in the measurement
355        incident_pressure = incident_pressure / np.abs(sweep_spectrum)
356        return incident_pressure
357
class Tube:
    """Geometry of the impedance tube.

    Attributes
    ----------
    further_mic_dist : float
        distance from the sample to the microphone further away from it
    closer_mic_dist : float
        distance from the sample to the microphone closer to it
    mic_spacing : float
        difference of the two microphone distances
    freq_limit : int
        higher frequency limit for exports
    """
    def __init__(self,
            further_mic_dist : float=0.400115, #x_1
            closer_mic_dist : float=0.101755, #x_2
            freq_limit : int=2000,
            ):
        # x_1 - x_2
        spacing = further_mic_dist - closer_mic_dist

        self.further_mic_dist = further_mic_dist
        self.closer_mic_dist = closer_mic_dist
        self.mic_spacing = spacing
        self.freq_limit = freq_limit
379
class Sample:
    """A class representing sample and its boundary conditions.

    Creating an instance builds the folder tree for the sample via
    ``make_foldertree`` and writes the boundary conditions to a
    ``*_bound_cond.csv`` file inside it.

    Attributes
    ----------
    name : str
        name of the sample
    temperature : float
        ambient temperature in degC
    rel_humidity : float
        ambient relative humidity in %
    atm_pressure : float
        ambient atmospheric pressure, defaults to 101325
    tube : Tube
        impedance tube definition object
        When omitted, a new ``Tube()`` with default geometry is created.
    timestamp : str
        strftime timestamp in a format '%y-%m-%d_%H-%M'
        When omitted, the time of the instance creation is used.
    folder : str
        path to project data folder, defaults to "data"
    trees
        value returned by ``make_foldertree`` for this sample
    boundary_df : pd.DataFrame
        single-row table with the boundary conditions
    """
    def __init__(self,
            name : str,
            temperature : float,
            rel_humidity : float,
            atm_pressure : float = 101325,
            tube : "Tube | None" = None,
            timestamp : "str | None" = None,
            folder = "data",
            ):
        # Resolve the defaults per call. Default values in the signature
        # are evaluated only once (at import time), which would freeze the
        # timestamp and share a single Tube object between all samples.
        if tube is None:
            tube = Tube()
        if timestamp is None:
            timestamp = strftime("%y-%m-%d_%H-%M")

        self.name = name
        self.timestamp = timestamp
        self.temperature = temperature
        self.atm_pressure = atm_pressure
        self.rel_humidity = rel_humidity
        self.tube = tube
        self.folder = folder
        self.trees = make_foldertree(
            self.name,
            self.folder,
            self.timestamp
            )
        bound_dict = {
            'temp': [self.temperature],
            'RH': [self.rel_humidity],
            'atm_pressure': [self.atm_pressure],
            'x1': [self.tube.further_mic_dist],
            'x2': [self.tube.closer_mic_dist],
            'lim': [self.tube.freq_limit]
            }
        self.boundary_df = pd.DataFrame(bound_dict)
        self.boundary_df.to_csv(
            os.path.join(
                self.trees[2],self.trees[1]+"_bound_cond.csv"
            )
        )

    def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
        """Migrates calibration files from different measurement.

        Loads the ``*_freqs.npy`` and ``*_cal_f_12.npy`` arrays of the
        calibration sample and saves them under this sample's file prefix.

        Parameters
        ----------
        cal_name : str
            calibration sample name
        cal_stamp : str
            calibration sample timestamp in a '%y-%m-%d_%H-%M' format
        cal_parent : str
            parent data folder, defaults to 'data'
        """
        cal_trees = make_foldertree(
            variant=cal_name,
            time_stamp=cal_stamp,
            parent=cal_parent
            )
        cal_parent_folder = cal_trees[2]
        import_folder = cal_trees[3][1]
        freqs = np.load(
            os.path.join(cal_parent_folder, cal_trees[1]+"_freqs.npy")
            ) #freq import
        cf = np.load(
            os.path.join(import_folder, cal_trees[1]+"_cal_f_12.npy")
            ) #cf import

        parent_folder = self.trees[2]
        export_folder = self.trees[3][1]
        np.save(
            os.path.join(parent_folder, self.trees[1]+"_freqs.npy"),
            freqs
            ) #freq export
        np.save(
            os.path.join(export_folder, self.trees[1]+"_cal_f_12.npy"),
            cf
            ) #cf export
470
def calibration(
        sample : Sample,
        measurement : Measurement,
        thd_filter : bool=True,
        export : bool=True,
        noise_filter : bool=False,
        ) -> tuple[np.ndarray, np.ndarray]:
    """Performs CLI calibration measurement.

    The user is prompted for each of the two microphone configurations
    and ``measurement.sub_measurements`` recordings are stored per
    configuration. The whole procedure can be repeated on request.
    Afterwards the calibration is evaluated from the stored files.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the calibration recordings
    measurement : Measurement
        measurement used to take the recordings
    thd_filter : bool
        Enables harmonic distortion filtering
    export : bool
        passed to ``calibration_from_files``
    noise_filter : bool
        passed to ``calibration_from_files``

    Returns
    -------
    cal
        result of ``calibration_from_files`` for the sample folder
    """
    cal_folder = sample.trees[3][0]
    if not os.path.exists(cal_folder):
        os.makedirs(cal_folder)

    while True:
        for config in (1, 2):
            answer = input(f"Calibrate in configuration {config}? [Y/n]")
            if answer.lower() == "n":
                # skip the remaining configurations of this round
                break
            for sub in range(measurement.sub_measurements):
                wav_path = os.path.join(
                    cal_folder,
                    sample.trees[1]+f"_cal_wav_conf{config}_{sub}.wav"
                )
                print(wav_path)
                measurement.measure(wav_path, thd_filter=thd_filter)
                sleep(0.5)

        repeat = input("Repeat calibration process? [y/N]").lower() == "y"
        if repeat:
            continue
        # the reminder is shown only once the calibration is finished
        input("Move the microphones to original position before measurement!")
        break

    return calibration_from_files(parent_folder=sample.trees[2], export=export, noise_filter=noise_filter)
516
def single_measurement(
        sample : "Sample",
        measurement : "Measurement",
        depth : float,
        thd_filter : bool= True,
        calc_spl : bool = True
        ) -> tuple[list[np.ndarray], int]:
    """Performs measurement.

    Takes ``measurement.sub_measurements`` recordings, each stored as
    a wav file in the sample's folder tree.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the recordings
    measurement : Measurement
        measurement used to take the recordings
    depth : float
        current depth of the sample
    thd_filter : bool
        Enables harmonic distortion filtering
    calc_spl : bool
        when True, the RMS level of the first channel of the last
        recording is logged and stored as ``measurement.rms_spl``
        Skipped when no recording was taken.

    Returns
    -------
    sub_measurement_data : list[np.ndarray]
        list of audio recordings taken
    fs : float
        sampling rate of the recording
    """
    m = measurement
    # Measurement.measure returns the measurement's own sampling rate;
    # seeding fs with it keeps the return value defined when
    # sub_measurements is 0 and the loop body never runs.
    fs = m.fs
    sub_measurement_data = []
    for s in range(m.sub_measurements):
        f = os.path.join(sample.trees[4][0], sample.trees[1]+f"_wav_d{depth}_{s}.wav")
        data, fs = m.measure(f, thd_filter=thd_filter)
        sub_measurement_data.append(data)
        sleep(0.5)

    if calc_spl and sub_measurement_data:
        last_recording = sub_measurement_data[-1]
        rms_spl = calc_rms_pressure_level(last_recording.T[0], m.fs_to_spl)
        logging.info(f"RMS SPL: {rms_spl} dB")
        m.rms_spl = rms_spl
    return sub_measurement_data, fs
558
def calculate_alpha(
        sample : Sample,
        return_r : bool = False,
        return_z : bool = False,
        noise_filter : bool = False
        ) -> tuple[np.ndarray, np.ndarray]:
    """Runs the transfer function and alpha calculations on the audio
    data stored in the sample's folder structure.

    The transfer function results are stored on the sample as
    ``sample.unique_d`` and ``sample.tfs``.

    Parameters
    ----------
    sample : Sample
        sample whose parent folder (``sample.trees[2]``) is processed
    return_r : bool
        passed to ``alpha_from_path``
    return_z : bool
        passed to ``alpha_from_path``
    noise_filter : bool
        passed to ``transfer_function_from_path``

    Returns
    -------
    results
        whatever ``alpha_from_path`` returns when called with
        ``return_f=True``, i.e. presumably alpha and the frequency
        values, plus further arrays when ``return_r`` / ``return_z``
        are set (confirm against ``imptube.processing``)
    """
    parent_folder = sample.trees[2]

    unique_d, tfs = transfer_function_from_path(parent_folder, noise_filter=noise_filter)
    sample.unique_d = unique_d
    sample.tfs = tfs

    return alpha_from_path(
        parent_folder,
        return_f=True,
        return_r=return_r,
        return_z=return_z
        )
588
class Sensor(Protocol):
    """A protocol for Sensor class implementation.

    Any object providing the three reader methods below can be
    passed where a ``Sensor`` is expected.
    """
    def read_temperature(self) -> float:
        """Return the current temperature reading."""

    def read_humidity(self) -> float:
        """Return the current humidity reading."""

    def read_pressure(self) -> float:
        """Return the current pressure reading."""
599    
def read_env_bc(sensor : "Sensor") -> tuple[float, float, float]:
    """Reads the environmental boundary conditions from a sensor.

    Up to five attempts are made. A failed attempt is reported on
    stdout; after the fifth failure the program is terminated
    with ``sys.exit()``.

    Parameters
    ----------
    sensor : Sensor
        object providing ``read_temperature``, ``read_humidity``
        and ``read_pressure``

    Returns
    -------
    temperature : float
        value returned by ``sensor.read_temperature()``
    rel_humidity : float
        value returned by ``sensor.read_humidity()``
    atm_pressure : float
        value returned by ``sensor.read_pressure()``

    Raises
    ------
    SystemExit
        when all five attempts fail
    """
    for attempt in range(1, 6):
        try:
            temperature = sensor.read_temperature()
            rel_humidity = sensor.read_humidity()
            atm_pressure = sensor.read_pressure()
        except Exception:
            # Only ordinary errors count as a failed reading.
            # KeyboardInterrupt and SystemExit are not swallowed,
            # so the user can still abort during the retries.
            print(f"Reading {attempt} not succesful.")
        else:
            return temperature, rel_humidity, atm_pressure

    print("Unable to read data from sensor, try manually enter temperature and RH on initialization.")
    sys.exit()
613
def calculate_spectrum(
    sample: "Sample",
    substring: str,
    f_limits=(10, 400)
):
    """Calculates the averaged spectrum of selected recordings of a sample.

    All files in ``sample.trees[4][0]`` whose name contains ``substring``
    are read, the first channel of each is taken, the signals are
    averaged in the time domain and the rfft of the average is
    returned, restricted to ``f_limits``.

    Parameters
    ----------
    sample : Sample
        sample whose audio folder is searched
    substring : str
        text which has to be contained in the file name
    f_limits : tuple[int, int]
        frequency limits of the returned spectrum
        The bin closest to the upper limit is excluded.

    Returns
    -------
    audio_spectrum : np.ndarray
        rfft of the averaged signal within the frequency limits
    audio_freqs : np.ndarray
        frequency values for the spectrum

    Raises
    ------
    FileNotFoundError
        when no file name in the folder contains ``substring``
    """
    audio_folder = sample.trees[4][0]
    # sorted for a reproducible processing order
    filtered_files = sorted(f for f in os.listdir(audio_folder) if substring in f)
    if not filtered_files:
        raise FileNotFoundError(
            f"No audio file containing '{substring}' found in {audio_folder}"
        )

    audio_data = []
    for f in filtered_files:
        fs, data = wavfile.read(os.path.join(audio_folder, f))
        # mono files are read as 1-D arrays, use them as they are
        first_channel = data if data.ndim == 1 else data.T[0]
        audio_data.append(first_channel)
    audio_data = np.array(audio_data)
    audio_data = np.mean(audio_data, axis=0)

    audio_spectrum = np.fft.rfft(audio_data)
    # fs is the sampling rate of the last file read
    audio_freqs = np.fft.rfftfreq(len(audio_data), d=1/fs)
    flow_idx = np.argmin(np.abs(audio_freqs-f_limits[0]))
    fhigh_idx = np.argmin(np.abs(audio_freqs-f_limits[1]))
    audio_spectrum = audio_spectrum[flow_idx:fhigh_idx]
    audio_freqs = audio_freqs[flow_idx:fhigh_idx]
    return audio_spectrum, audio_freqs
635    #  TODO save bc as config file...
636    #  bound_dict = {
637    #     'temp': [self.temperature],
638    #     'RH': [self.RH],
639    #     'x1': [self.x_1],
640    #     'x2': [self.x_2],
641    #     'lim': [self.limit],
642    #     }
643    # self.boundary_df = pd.DataFrame(bound_dict)
644    # self.trees = make_foldertree(self.name, self.folder)
645    # self.boundary_df.to_csv(
646    #     os.path.join(
647    #         self.trees[2],self.trees[1]+"_bound_cond.csv"
648    #     )
649    # )
650    
class Measurement:
 27class Measurement:
 28    """Contains information about measurement from the perspective of
 29    signal and boundary conditions.
 30
 31    Attributes
 32    ----------
 33    fs : int
 34        measurement sampling frequency
 35    channels_in : list[int]
 36        list of input channel numbers
 37    channels_out : list[int]
 38        list of output channel numbers (usually one member list)
 39    device : str
 40        string specifying part of sound card name
 41        List of available devices can be obtained with
 42        `python3 -m sounddevice` command.
 43    samples : int
 44        number of samples in the generated log sweep
 45        typically 2**n
 46    window_len : int
 47        length of the Hann half-window applied to the ends of the sweep
 48    sub_measurements : int
 49        number of measurements taken for each specimen
 50        Normally, no differences between sweep measurements should occur,
 51        this attribute mainly compensates for potential playback artifacts.
 52    f_low : int
 53        lower frequency limit for the generated sweep
 54    f_high : int
 55        higher frequency limit for the generated sweep
 56    fs_to_spl : float
 57        conversion level from dBFS to dB SPL for microphone 1
 58    sweep_lvl : float
 59        level of the sweep in dBFS
 60    """
 61
 62    def __init__(
 63            self, 
 64            fs : int=48000, 
 65            channels_in : list[int]=[1,2], 
 66            channels_out : list[int]=[1], 
 67            device : str='Scarlett',
 68            samples : int=131072, 
 69            window_len : int=8192,
 70            sub_measurements : int=2,
 71            f_low : int=10,
 72            f_high : int=1000,
 73            fs_to_spl : float=130,
 74            sweep_lvl : float=-6  
 75        ):
 76        self.fs = fs
 77        self.channels_in = channels_in
 78        self.channels_out = channels_out
 79        self.device = device
 80        self.samples = samples
 81        self.window_len = window_len
 82        self.sub_measurements = sub_measurements
 83        self.f_limits = [f_low, f_high]
 84        self.fs_to_spl = fs_to_spl
 85        self.sweep_lvl = sweep_lvl
 86
 87        self.boundary_df = pd.DataFrame({"fs_to_spl": [fs_to_spl]})
 88        self.boundary_df.to_csv(
 89            strftime("%y-%m-%d_%H-%M") + "_lvl_calib.csv"
 90        )
 91
 92        self.make_sweep()
 93        sd.default.samplerate = fs
 94        sd.default.channels = len(channels_in), len(channels_out)
 95        sd.default.device = device
 96
 97
 98    def make_sweep(self, windows=True) -> np.ndarray:
 99        """Generates numpy array with log sweep.
100
101        Parameters
102        ----------
103        fs : int 
104            measurement sampling frequency
105        samples : int
106            number of samples in the generated log sweep
107            typically 2**n
108        window_len : int
109            length of the Hann half-window applied to the ends
110            of the sweep
111        f_low : int
112            lower frequency limit for the generated sweep
113        f_high : int
114            higher frequency limit for the generated sweep
115
116        Returns
117        -------
118        log_sweep : np.ndarray
119            numpy array containing mono log sweep
120        """
121        t = np.linspace(0,self.samples/self.fs,self.samples, dtype=np.float32)
122        
123        half_win = int(self.window_len)
124        log_sweep = chirp(t, self.f_limits[0], t[-1], self.f_limits[1], method="log", phi=90)
125        
126        if windows:
127            window = hann(int(self.window_len*2))
128            log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
129            log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]
130        
131        lvl_to_factor = 10**(self.sweep_lvl/20)
132        log_sweep = log_sweep*lvl_to_factor
133
134        self.sweep = log_sweep
135        return log_sweep
136    
137    def regen_sweep(self):
138        """Regenerates the sweep."""
139        self.make_sweep()
140
141    def update_sweep_lvl(self):
142        """Updates the sweep level."""
143        self.sweep = self.sweep/np.max(np.abs(self.sweep))
144        self.sweep = self.sweep * 10**(self.sweep_lvl/20)
145
146    def filter_sweep(
147        self,
148        rfft_incident_pressure: np.ndarray,
149        f_limits=(10, 400),
150        ) -> np.ndarray:
151        """Filters the sweep with respect to incident pressure measured beforehand.
152
153        Parameters
154        ----------
155        rfft_incident_pressure : np.ndarray
156            rfft of incident pressure
157        f_lim : tuple[int, int]
158            frequency limits for the filtering
159        
160        Returns
161        -------
162        filtered_sweep : np.ndarray
163            filtered sweep
164        """
165        # generate sweep without windows
166        sweep_wo_win = self.make_sweep(windows=False)
167
168        # apply blackman window to the ends of the sweep
169        win = np.blackman(self.window_len//4)
170        sweep_wo_win[:len(win)//2] = sweep_wo_win[:len(win)//2] * win[:len(win)//2]
171        sweep_wo_win[-len(win)//2:] = sweep_wo_win[-len(win)//2:] * win[-len(win)//2:]
172
173        # calculate rfft of the sweep
174        rfft_sweep = np.fft.rfft(sweep_wo_win)
175        rfft_freqs = np.fft.rfftfreq(len(sweep_wo_win), d=1/self.fs)
176        
177        # find indices of frequency limits
178        f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
179        f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))
180
181        # calculate amplitude of the filtered sweep spectrum
182        amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
183        filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))
184
185        # construct and apply filter based on blackman windows
186        filt = np.blackman(50)
187        filter = np.ones_like(filtered_sweep_spectrum)
188        filter[:f_low_idx] = 0
189        filter[f_low_idx:f_low_idx+len(filt)//2] = filt[:len(filt)//2]
190        filter[-f_high_idx-len(filt)//2:-f_high_idx] = filt[len(filt)//2:]
191        filter[-f_high_idx:] = 0
192        filtered_sweep_spectrum *= filter
193
194        # calculate ifft of the filtered sweep spectrum
195        filtered_sweep = np.fft.irfft(filtered_sweep_spectrum)
196
197        # apply hanning window to the ends of the filtered sweep
198        window = np.hanning(self.window_len)
199        filtered_sweep[:len(window)//2] = filtered_sweep[:len(window)//2] * window[:len(window)//2]
200        filtered_sweep[-len(window)//2:] = filtered_sweep[-len(window)//2:] * window[len(window)//2:]
201
202        # normalize the filtered sweep
203        filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))
204
205        # apply level to the filtered sweep based on measurement level
206        lvl_to_factor = 10**(self.sweep_lvl/20)
207        filtered_sweep = filtered_sweep*lvl_to_factor
208
209        self.sweep = filtered_sweep
210        return filtered_sweep
211
212    def measure(self,
213            out_path : str='',
214            thd_filter : bool=True,
215            export : bool=True
216            ) -> tuple[np.ndarray, int]:
217        """Performs measurement and saves the recording. 
218        
219        Parameters
220        ----------
221        out_path : str
222            path where the recording should be saved, including filename
223        thd_filter : bool
224            enables harmonic distortion filtering
225            This affects the files saved.
226        export : bool
227            enables export to specified path
228
229        Returns
230        -------
231        data : np.ndarray
232            measured audio data
233        fs : int
234            sampling rate
235        """
236        data = sd.playrec(
237            self.sweep, 
238            input_mapping=self.channels_in, 
239            output_mapping=self.channels_out,
240            dtype=np.float32)
241        sd.wait()
242        data = np.asarray(data)
243
244        #filtration
245        if thd_filter:
246            data = data.T
247            data = harmonic_distortion_filter(
248                data, 
249                self.sweep, 
250                f_low=self.f_limits[0], 
251                f_high=self.f_limits[1]
252                )
253            data = data.T
254        
255        if export:    
256            sf.write(
257                file=out_path,
258                data=data,
259                samplerate=self.fs,
260                format='WAV',
261                subtype='FLOAT'
262                )
263        
264        return data, self.fs
265    
266    def calc_incident_pressure_filter(
267        self,
268        spectrum: np.ndarray,
269        r: np.ndarray,
270        f: np.ndarray,
271        distance: float,
272        speed_of_sound: float = 343,
273        f_limits=(10, 400)
274        ) -> np.ndarray:
275        """
276        Calculates incident pressure filter based on the measured 
277        spectrum and reflection factor.
278        Such filter can be used to filter the input sweep
279        to compensate for the loudspeaker frequency response.
280
281        Parameters
282        ----------
283        spectrum : np.ndarray
284            measured spectrum
285        r : np.ndarray
286            reflection factor
287        f : np.ndarray
288            frequency values
289        distance : float
290            distance between the sample and the microphone
291        speed_of_sound : float
292            speed of sound in air
293        f_limits : tuple[int, int]
294            frequency limits for the filtering
295
296        Returns
297        -------
298        incident_pressure : np.ndarray
299            incident pressure filter
300        """
301        def calculate_incident_pressure(
302            pressure: np.ndarray, 
303            reflection_factor: np.ndarray, 
304            distance: float,
305            wavenumber: np.ndarray
306        ):
307            """
308            Calculates incident pressure based on the measured 
309            spectrum and the reflection factor.
310
311            Parameters
312            ----------
313            pressure : np.ndarray
314                measured pressure
315            reflection_factor : np.ndarray
316                reflection factor
317            distance : float
318                distance between the sample and the microphone
319            wavenumber : np.ndarray
320                wavenumber
321
322            Returns
323            -------
324            incident_pressure : np.ndarray
325                incident pressure filter
326            """
327            return pressure / (
328                np.exp(-1j * wavenumber * distance) 
329                + reflection_factor * np.exp(1j * wavenumber * distance)
330            )
331        
332        sweep_spectrum = np.fft.rfft(self.sweep.copy())
333        # calculate incident pressure
334        f_low_idx = np.argmin(np.abs(f-f_limits[0]))
335        f_high_idx = np.argmin(np.abs(f-f_limits[1]))
336        incident_pressure = calculate_incident_pressure(
337            pressure=spectrum,
338            reflection_factor=r[f_low_idx:f_high_idx],
339            distance=distance,
340            wavenumber=2*np.pi*f[f_low_idx:f_high_idx]/speed_of_sound
341        )
342
343        #extend incident pressure to have the same length as sweep_spectrum
344        incident_pressure = np.concatenate([
345            np.ones(f_low_idx)*incident_pressure[0], 
346            incident_pressure
347            ])
348        incident_pressure = np.concatenate([
349            incident_pressure, 
350            np.ones(len(sweep_spectrum)-len(incident_pressure))*incident_pressure[-1]
351            ])
352
353        # smoothen incident pressure by applying a moving average convolution filter with a window of 20 samples
354        incident_pressure = np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")*np.exp(1j*np.angle(incident_pressure))
355        # normalize incident by the actual amplitude of the sweep used in the measurement
356        incident_pressure = incident_pressure / np.abs(sweep_spectrum)
357        return incident_pressure

Contains information about measurement from the perspective of signal and boundary conditions.

Attributes
  • fs (int): measurement sampling frequency
  • channels_in (list[int]): list of input channel numbers
  • channels_out (list[int]): list of output channel numbers (usually one member list)
  • device (str): string specifying part of the sound card name. The list of available devices can be obtained with the `python3 -m sounddevice` command.
  • samples (int): number of samples in the generated log sweep typically 2**n
  • window_len (int): length of the Hann half-window applied to the ends of the sweep
  • sub_measurements (int): number of measurements taken for each specimen. Normally, no differences between sweep measurements should occur; this attribute mainly compensates for potential playback artifacts.
  • f_low (int): lower frequency limit for the generated sweep
  • f_high (int): higher frequency limit for the generated sweep
  • fs_to_spl (float): conversion level from dBFS to dB SPL for microphone 1
  • sweep_lvl (float): level of the sweep in dBFS
def make_sweep(self, windows=True) -> numpy.ndarray:
 98    def make_sweep(self, windows=True) -> np.ndarray:
 99        """Generates numpy array with log sweep.
100
101        Parameters
102        ----------
103        fs : int 
104            measurement sampling frequency
105        samples : int
106            number of samples in the generated log sweep
107            typically 2**n
108        window_len : int
109            length of the Hann half-window applied to the ends
110            of the sweep
111        f_low : int
112            lower frequency limit for the generated sweep
113        f_high : int
114            higher frequency limit for the generated sweep
115
116        Returns
117        -------
118        log_sweep : np.ndarray
119            numpy array containing mono log sweep
120        """
121        t = np.linspace(0,self.samples/self.fs,self.samples, dtype=np.float32)
122        
123        half_win = int(self.window_len)
124        log_sweep = chirp(t, self.f_limits[0], t[-1], self.f_limits[1], method="log", phi=90)
125        
126        if windows:
127            window = hann(int(self.window_len*2))
128            log_sweep[:half_win] = log_sweep[:half_win]*window[:half_win]
129            log_sweep[-half_win:] = log_sweep[-half_win:]*window[half_win:]
130        
131        lvl_to_factor = 10**(self.sweep_lvl/20)
132        log_sweep = log_sweep*lvl_to_factor
133
134        self.sweep = log_sweep
135        return log_sweep

Generates numpy array with log sweep.

Parameters
  • windows (bool): applies Hann half-windows to both ends of the sweep when enabled (default True)
Instance attributes used
  • fs (int): measurement sampling frequency
  • samples (int): number of samples in the generated log sweep, typically 2**n
  • window_len (int): length of the Hann half-window applied to the ends of the sweep
  • f_low (int): lower frequency limit for the generated sweep
  • f_high (int): higher frequency limit for the generated sweep
Returns
  • log_sweep (np.ndarray): numpy array containing mono log sweep
def regen_sweep(self):
137    def regen_sweep(self):
138        """Regenerates the sweep."""
139        self.make_sweep()

Regenerates the sweep.

def update_sweep_lvl(self):
141    def update_sweep_lvl(self):
142        """Updates the sweep level."""
143        self.sweep = self.sweep/np.max(np.abs(self.sweep))
144        self.sweep = self.sweep * 10**(self.sweep_lvl/20)

Updates the sweep level.

def filter_sweep( self, rfft_incident_pressure: numpy.ndarray, f_limits=(10, 400)) -> numpy.ndarray:
146    def filter_sweep(
147        self,
148        rfft_incident_pressure: np.ndarray,
149        f_limits=(10, 400),
150        ) -> np.ndarray:
151        """Filters the sweep with respect to incident pressure measured beforehand.
152
153        Parameters
154        ----------
155        rfft_incident_pressure : np.ndarray
156            rfft of incident pressure
157        f_lim : tuple[int, int]
158            frequency limits for the filtering
159        
160        Returns
161        -------
162        filtered_sweep : np.ndarray
163            filtered sweep
164        """
165        # generate sweep without windows
166        sweep_wo_win = self.make_sweep(windows=False)
167
168        # apply blackman window to the ends of the sweep
169        win = np.blackman(self.window_len//4)
170        sweep_wo_win[:len(win)//2] = sweep_wo_win[:len(win)//2] * win[:len(win)//2]
171        sweep_wo_win[-len(win)//2:] = sweep_wo_win[-len(win)//2:] * win[-len(win)//2:]
172
173        # calculate rfft of the sweep
174        rfft_sweep = np.fft.rfft(sweep_wo_win)
175        rfft_freqs = np.fft.rfftfreq(len(sweep_wo_win), d=1/self.fs)
176        
177        # find indices of frequency limits
178        f_low_idx = np.argmin(np.abs(rfft_freqs-f_limits[0]))
179        f_high_idx = np.argmin(np.abs(rfft_freqs-f_limits[1]))
180
181        # calculate amplitude of the filtered sweep spectrum
182        amplitude = np.abs(rfft_sweep) / np.abs(rfft_incident_pressure)
183        filtered_sweep_spectrum = amplitude * np.exp(1j*np.angle(rfft_sweep))
184
185        # construct and apply filter based on blackman windows
186        filt = np.blackman(50)
187        filter = np.ones_like(filtered_sweep_spectrum)
188        filter[:f_low_idx] = 0
189        filter[f_low_idx:f_low_idx+len(filt)//2] = filt[:len(filt)//2]
190        filter[-f_high_idx-len(filt)//2:-f_high_idx] = filt[len(filt)//2:]
191        filter[-f_high_idx:] = 0
192        filtered_sweep_spectrum *= filter
193
194        # calculate ifft of the filtered sweep spectrum
195        filtered_sweep = np.fft.irfft(filtered_sweep_spectrum)
196
197        # apply hanning window to the ends of the filtered sweep
198        window = np.hanning(self.window_len)
199        filtered_sweep[:len(window)//2] = filtered_sweep[:len(window)//2] * window[:len(window)//2]
200        filtered_sweep[-len(window)//2:] = filtered_sweep[-len(window)//2:] * window[len(window)//2:]
201
202        # normalize the filtered sweep
203        filtered_sweep = filtered_sweep/np.max(np.abs(filtered_sweep))
204
205        # apply level to the filtered sweep based on measurement level
206        lvl_to_factor = 10**(self.sweep_lvl/20)
207        filtered_sweep = filtered_sweep*lvl_to_factor
208
209        self.sweep = filtered_sweep
210        return filtered_sweep

Filters the sweep with respect to incident pressure measured beforehand.

Parameters
  • rfft_incident_pressure (np.ndarray): rfft of incident pressure
  • f_limits (tuple[int, int]): frequency limits for the filtering
Returns
  • filtered_sweep (np.ndarray): filtered sweep
def measure( self, out_path: str = '', thd_filter: bool = True, export: bool = True) -> tuple[numpy.ndarray, int]:
212    def measure(self,
213            out_path : str='',
214            thd_filter : bool=True,
215            export : bool=True
216            ) -> tuple[np.ndarray, int]:
217        """Performs measurement and saves the recording. 
218        
219        Parameters
220        ----------
221        out_path : str
222            path where the recording should be saved, including filename
223        thd_filter : bool
224            enables harmonic distortion filtering
225            This affects the files saved.
226        export : bool
227            enables export to specified path
228
229        Returns
230        -------
231        data : np.ndarray
232            measured audio data
233        fs : int
234            sampling rate
235        """
236        data = sd.playrec(
237            self.sweep, 
238            input_mapping=self.channels_in, 
239            output_mapping=self.channels_out,
240            dtype=np.float32)
241        sd.wait()
242        data = np.asarray(data)
243
244        #filtration
245        if thd_filter:
246            data = data.T
247            data = harmonic_distortion_filter(
248                data, 
249                self.sweep, 
250                f_low=self.f_limits[0], 
251                f_high=self.f_limits[1]
252                )
253            data = data.T
254        
255        if export:    
256            sf.write(
257                file=out_path,
258                data=data,
259                samplerate=self.fs,
260                format='WAV',
261                subtype='FLOAT'
262                )
263        
264        return data, self.fs

Performs measurement and saves the recording.

Parameters
  • out_path (str): path where the recording should be saved, including filename
  • thd_filter (bool): enables harmonic distortion filtering. This affects the files saved.
  • export (bool): enables export to specified path
Returns
  • data (np.ndarray): measured audio data
  • fs (int): sampling rate
def calc_incident_pressure_filter( self, spectrum: numpy.ndarray, r: numpy.ndarray, f: numpy.ndarray, distance: float, speed_of_sound: float = 343, f_limits=(10, 400)) -> numpy.ndarray:
266    def calc_incident_pressure_filter(
267        self,
268        spectrum: np.ndarray,
269        r: np.ndarray,
270        f: np.ndarray,
271        distance: float,
272        speed_of_sound: float = 343,
273        f_limits=(10, 400)
274        ) -> np.ndarray:
275        """
276        Calculates incident pressure filter based on the measured 
277        spectrum and reflection factor.
278        Such filter can be used to filter the input sweep
279        to compensate for the loudspeaker frequency response.
280
281        Parameters
282        ----------
283        spectrum : np.ndarray
284            measured spectrum
285        r : np.ndarray
286            reflection factor
287        f : np.ndarray
288            frequency values
289        distance : float
290            distance between the sample and the microphone
291        speed_of_sound : float
292            speed of sound in air
293        f_limits : tuple[int, int]
294            frequency limits for the filtering
295
296        Returns
297        -------
298        incident_pressure : np.ndarray
299            incident pressure filter
300        """
301        def calculate_incident_pressure(
302            pressure: np.ndarray, 
303            reflection_factor: np.ndarray, 
304            distance: float,
305            wavenumber: np.ndarray
306        ):
307            """
308            Calculates incident pressure based on the measured 
309            spectrum and the reflection factor.
310
311            Parameters
312            ----------
313            pressure : np.ndarray
314                measured pressure
315            reflection_factor : np.ndarray
316                reflection factor
317            distance : float
318                distance between the sample and the microphone
319            wavenumber : np.ndarray
320                wavenumber
321
322            Returns
323            -------
324            incident_pressure : np.ndarray
325                incident pressure filter
326            """
327            return pressure / (
328                np.exp(-1j * wavenumber * distance) 
329                + reflection_factor * np.exp(1j * wavenumber * distance)
330            )
331        
332        sweep_spectrum = np.fft.rfft(self.sweep.copy())
333        # calculate incident pressure
334        f_low_idx = np.argmin(np.abs(f-f_limits[0]))
335        f_high_idx = np.argmin(np.abs(f-f_limits[1]))
336        incident_pressure = calculate_incident_pressure(
337            pressure=spectrum,
338            reflection_factor=r[f_low_idx:f_high_idx],
339            distance=distance,
340            wavenumber=2*np.pi*f[f_low_idx:f_high_idx]/speed_of_sound
341        )
342
343        #extend incident pressure to have the same length as sweep_spectrum
344        incident_pressure = np.concatenate([
345            np.ones(f_low_idx)*incident_pressure[0], 
346            incident_pressure
347            ])
348        incident_pressure = np.concatenate([
349            incident_pressure, 
350            np.ones(len(sweep_spectrum)-len(incident_pressure))*incident_pressure[-1]
351            ])
352
353        # smoothen incident pressure by applying a moving average convolution filter with a window of 20 samples
354        incident_pressure = np.convolve(np.abs(incident_pressure), np.hanning(20), mode="same")*np.exp(1j*np.angle(incident_pressure))
355        # normalize incident by the actual amplitude of the sweep used in the measurement
356        incident_pressure = incident_pressure / np.abs(sweep_spectrum)
357        return incident_pressure

Calculates incident pressure filter based on the measured spectrum and reflection factor. Such filter can be used to filter the input sweep to compensate for the loudspeaker frequency response.

Parameters
  • spectrum (np.ndarray): measured spectrum
  • r (np.ndarray): reflection factor
  • f (np.ndarray): frequency values
  • distance (float): distance between the sample and the microphone
  • speed_of_sound (float): speed of sound in air
  • f_limits (tuple[int, int]): frequency limits for the filtering
Returns
  • incident_pressure (np.ndarray): incident pressure filter
class Tube:
class Tube:
    """Geometry of the impedance tube.

    Attributes
    ----------
    further_mic_dist : float
        distance of the further microphone from the sample
    closer_mic_dist : float
        distance of the closer microphone from the sample
    mic_spacing : float
        difference between the two microphone distances
    freq_limit : int
        higher frequency limit for exports
    """
    def __init__(self,
            further_mic_dist : float=0.400115, #x_1
            closer_mic_dist : float=0.101755, #x_2
            freq_limit : int=2000,
            ):
        x_1, x_2 = further_mic_dist, closer_mic_dist
        self.further_mic_dist = x_1
        self.closer_mic_dist = x_2
        # derived once at construction from the two distances
        self.mic_spacing = x_1 - x_2
        self.freq_limit = freq_limit

Class representing tube geometry.

Attributes
  • further_mic_dist (float): further microphone distance from sample
  • closer_mic_dist (float): closer mic distance from sample
  • freq_limit (int): higher frequency limit for exports
class Sample:
class Sample:
    """A class representing sample and its boundary conditions.

    Creating an instance builds the sample folder tree through
    `make_foldertree` and writes the boundary conditions to a
    `*_bound_cond.csv` file inside it.

    Attributes
    ----------
    name : str
        name of the sample
    temperature : float
        ambient temperature in degC
    rel_humidity : float
        ambient relative humidity in %
    atm_pressure : float
        ambient atmospheric pressure, defaults to 101325
        (presumably Pa - confirm)
    tube : Tube
        impedance tube definition object
        A new `Tube()` is created for each sample when omitted.
    timestamp : str
        strftime timestamp in a format '%y-%m-%d_%H-%M'
        Taken at construction time when omitted.
    folder : str
        path to project data folder, defaults to "data"
    trees
        folder tree description returned by `make_foldertree`
    boundary_df : pd.DataFrame
        one-row table with the boundary conditions written to disk
    """
    def __init__(self,
            name : str,
            temperature : float,
            rel_humidity : float,
            atm_pressure : float = 101325,
            tube : "Tube | None" = None,
            timestamp : "str | None" = None,
            folder = "data",
            ):
        # Defaults are resolved per call: a default written in the signature
        # is evaluated only once, when the function is defined, which froze
        # the timestamp at import time and shared one Tube between samples.
        if tube is None:
            tube = Tube()
        if timestamp is None:
            timestamp = strftime("%y-%m-%d_%H-%M")

        self.name = name
        self.timestamp = timestamp
        self.temperature = temperature
        self.atm_pressure = atm_pressure
        self.rel_humidity = rel_humidity
        self.tube = tube
        self.folder = folder
        self.trees = make_foldertree(
            self.name,
            self.folder,
            self.timestamp
            )
        bound_dict = {
            'temp': [self.temperature],
            'RH': [self.rel_humidity],
            'atm_pressure': [self.atm_pressure],
            'x1': [self.tube.further_mic_dist],
            'x2': [self.tube.closer_mic_dist],
            'lim': [self.tube.freq_limit]
            }
        self.boundary_df = pd.DataFrame(bound_dict)
        self.boundary_df.to_csv(
            os.path.join(
                self.trees[2],self.trees[1]+"_bound_cond.csv"
            )
        )

    def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
        """Migrates calibration files from different measurement.

        Loads the frequency array and the calibration factor of the given
        calibration measurement and saves them under this sample's
        folder tree and file prefix.

        Parameters
        ----------
        cal_name : str
            calibration sample name
        cal_stamp : str
            calibration sample timestamp in a '%y-%m-%d_%H-%M' format
        cal_parent : str
            parent data folder, defaults to 'data'
        """
        cal_trees = make_foldertree(
            variant=cal_name,
            time_stamp=cal_stamp,
            parent=cal_parent
            )
        cal_parent_folder = cal_trees[2]
        import_folder = cal_trees[3][1]
        freqs = np.load(
            os.path.join(cal_parent_folder, cal_trees[1]+"_freqs.npy")
            ) #freq import
        cf = np.load(
            os.path.join(import_folder, cal_trees[1]+"_cal_f_12.npy")
            ) #cf import

        parent_folder = self.trees[2]
        export_folder = self.trees[3][1]
        np.save(
            os.path.join(parent_folder, self.trees[1]+"_freqs.npy"),
            freqs
            ) #freq export
        np.save(
            os.path.join(export_folder, self.trees[1]+"_cal_f_12.npy"),
            cf
            ) #cf export

A class representing sample and its boundary conditions.

Attributes
  • name (str): name of the sample
  • temperature (float): ambient temperature in degC
  • rel_humidity (float): ambient relative humidity in %
  • tube (Tube): impedance tube definition object
  • timestamp (str): strftime timestamp in a format '%y-%m-%d_%H-%M'
  • folder (str): path to project data folder, defaults to "data"
def migrate_cal(self, cal_name, cal_stamp, cal_parent='data'):
435    def migrate_cal(self, cal_name, cal_stamp, cal_parent="data"):
436        """Migrates calibration files from different measurement.
437        
438        Parameters
439        ----------
440        cal_name : str
441            calibration sample name
442        cal_stamp : str
443            calibration sample timestamp i a '%y-%m-%d_%H-%M' format
444        cal_parent : str
445            parent data folder, defaults to 'data'
446        """
447        cal_trees = make_foldertree(
448            variant=cal_name,
449            time_stamp=cal_stamp,
450            parent=cal_parent
451            )
452        cal_parent_folder = cal_trees[2]
453        import_folder = cal_trees[3][1]
454        freqs = np.load(
455            os.path.join(cal_parent_folder, cal_trees[1]+"_freqs.npy")
456            ) #freq import
457        cf = np.load(
458            os.path.join(import_folder, cal_trees[1]+"_cal_f_12.npy")
459            ) #cf import
460
461        parent_folder = self.trees[2]
462        export_folder = self.trees[3][1]
463        np.save(
464            os.path.join(parent_folder, self.trees[1]+"_freqs.npy"),
465            freqs
466            ) #freq export
467        np.save(
468            os.path.join(export_folder, self.trees[1]+"_cal_f_12.npy"),
469            cf
470            ) #cf export

Migrates calibration files from different measurement.

Parameters
  • cal_name (str): calibration sample name
  • cal_stamp (str): calibration sample timestamp in a '%y-%m-%d_%H-%M' format
  • cal_parent (str): parent data folder, defaults to 'data'
def calibration( sample: Sample, measurement: Measurement, thd_filter: bool = True, export: bool = True, noise_filter: bool = False) -> tuple[numpy.ndarray, numpy.ndarray]:
def calibration(
        sample : Sample,
        measurement : Measurement,
        thd_filter : bool=True,
        export : bool=True,
        noise_filter : bool=False,
        ) -> tuple[np.ndarray, np.ndarray]:
    """Performs CLI calibration measurement.

    The user is prompted for each of the two microphone configurations;
    `measurement.sub_measurements` recordings are taken per configuration
    and written to the calibration folder of the sample. Answering "n"
    at a configuration prompt skips the remaining configurations of that
    pass. The whole pass can be repeated on request. Afterwards the
    calibration is computed by `calibration_from_files`.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the calibration recordings
    measurement : Measurement
        measurement object used to play and record the sweep
    thd_filter : bool
        Enables harmonic distortion filtering
    export : bool
        forwarded to `calibration_from_files`
        The recordings themselves are always written.
    noise_filter : bool
        forwarded to `calibration_from_files`

    Returns
    -------
    cal
        result of `calibration_from_files`
    """
    caltree = sample.trees[3][0]
    # exist_ok avoids the check-then-create race of a separate exists() test
    os.makedirs(caltree, exist_ok=True)

    m = measurement
    while True:
        for c in range(1, 3):
            ready = input(f"Calibrate in configuration {c}? [Y/n]")
            if ready.lower() == "n":
                break
            for s in range(m.sub_measurements):
                f = os.path.join(caltree, sample.trees[1]+f"_cal_wav_conf{c}_{s}.wav")
                print(f)
                m.measure(f, thd_filter=thd_filter)
                sleep(0.5)
        if input("Repeat calibration process? [y/N]").lower() != "y":
            break
    # shown once, after the last calibration pass
    input("Move the microphones to original position before measurement!")

    cal = calibration_from_files(parent_folder=sample.trees[2], export=export, noise_filter=noise_filter)

    return cal

Performs CLI calibration measurement.

Parameters
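  • sample (imptube.tube.Sample): sample whose folder tree receives the calibration recordings

  • measurement (Measurement): measurement object used to play and record the sweep

  • thd_filter (bool): Enables harmonic distortion filtering
  • export (bool): forwarded to calibration_from_files
  • noise_filter (bool): forwarded to calibration_from_files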

def single_measurement( sample: Sample, measurement: Measurement, depth: float, thd_filter: bool = True, calc_spl: bool = True) -> tuple[list[numpy.ndarray], int]:
def single_measurement(
        sample : Sample,
        measurement : Measurement,
        depth : float,
        thd_filter : bool= True,
        calc_spl : bool = True
        ) -> tuple[list[np.ndarray], int]:
    """Performs measurement.

    Takes `measurement.sub_measurements` recordings of the sample and
    saves each of them into the measurement folder of the sample.

    Parameters
    ----------
    sample : imptube.tube.Sample
        sample whose folder tree receives the recordings
    measurement : Measurement
        measurement object used to play and record the sweep
    depth : float
        current depth of the sample
    thd_filter : bool
        Enables harmonic distortion filtering
    calc_spl : bool
        when enabled, the RMS pressure level of the first channel of the
        last recording is logged and stored in `measurement.rms_spl`

    Returns
    -------
    sub_measurement_data : list[np.ndarray]
        list of audio recordings taken
    fs : float
        sampling rate of the recording
    """
    m = measurement
    sub_measurement_data = []
    # fall back to the configured rate so the return value is defined even
    # when no sub-measurement is taken
    fs = m.fs
    for s in range(m.sub_measurements):
        f = os.path.join(sample.trees[4][0], sample.trees[1]+f"_wav_d{depth}_{s}.wav")
        data, fs = m.measure(f, thd_filter=thd_filter)
        sub_measurement_data.append(data)
        sleep(0.5)

    # the level needs at least one recording
    if calc_spl and sub_measurement_data:
        rms_spl = calc_rms_pressure_level(sub_measurement_data[-1].T[0], m.fs_to_spl)
        logging.info(f"RMS SPL: {rms_spl} dB")
        m.rms_spl = rms_spl
    return sub_measurement_data, fs

Performs measurement.

Parameters
  • sample (imptube.tube.Sample):

  • measurement (Measurement):

  • depth (float): current depth of the sample

  • thd_filter (bool): Enables harmonic distortion filtering
Returns
  • sub_measurement_data (list[np.ndarray]): list of audio recordings taken
  • fs (float): sampling rate of the recording
def calculate_alpha( sample: Sample, return_r: bool = False, return_z: bool = False, noise_filter: bool = False) -> tuple[numpy.ndarray, numpy.ndarray]:
def calculate_alpha(
        sample : Sample,
        return_r : bool = False,
        return_z : bool = False,
        noise_filter : bool = False
        ) -> tuple[np.ndarray, np.ndarray]:
    """Runs the transfer function and alpha calculations on the audio data
    stored in the folder structure of the sample.

    The output of `transfer_function_from_path` is stored on the sample as
    `sample.unique_d` and `sample.tfs`.

    Parameters
    ----------
    sample : Sample
        sample whose parent folder holds the recordings
    return_r : bool
        forwarded to `alpha_from_path`
    return_z : bool
        forwarded to `alpha_from_path`
    noise_filter : bool
        forwarded to `transfer_function_from_path`

    Returns
    -------
    alpha : np.ndarray
        sound absorption coefficient for frequencies lower than
        limit specified in sample.tube.freq_limit
    freqs : np.ndarray
        frequency values for the alpha array

    NOTE(review): the value is returned exactly as `alpha_from_path`
    produces it; with `return_r` / `return_z` enabled it presumably holds
    more than these two arrays - confirm.
    """
    parent_folder = sample.trees[2]
    unique_d, tfs = transfer_function_from_path(parent_folder, noise_filter=noise_filter)
    sample.unique_d = unique_d
    sample.tfs = tfs
    return alpha_from_path(
        parent_folder,
        return_f=True,
        return_r=return_r,
        return_z=return_z
        )

Performs transfer function and alpha calculations from audio data found in a valid folder structure.

Parameters
  • sample (Sample):
Returns
  • alpha (np.ndarray): sound absorption coefficient for frequencies lower than limit specified in sample.tube.freq_limit
  • freqs (np.ndarray): frequency values for the alpha array
class Sensor(typing.Protocol):
class Sensor(Protocol):
    """A protocol for Sensor class implementation.

    Any object that provides the three reader methods below satisfies it.
    """
    def read_temperature(self) -> float:
        """Return the current temperature reading."""
        ...

    def read_humidity(self) -> float:
        """Return the current humidity reading."""
        ...

    def read_pressure(self) -> float:
        """Return the current pressure reading."""
        ...

A protocol for Sensor class implementation.