Source code for arve.data.compute_spec_reference

import numpy             as     np
from   scipy.interpolate import interp1d
from   scipy.signal      import savgol_filter
from   tqdm              import tqdm
import warnings
warnings.filterwarnings("ignore")

[docs] class compute_spec_reference:
[docs] def compute_spec_reference( self, N_spec : int | None = None , oversamp : int = 10 , smooth : bool = False, ) -> None: """Compute reference spectrum. Parameters ---------- N_spec : int | None, optional number of spectra used to build the reference (if None, all input spectra will be considered), by default None oversamp : int, optional oversampling factor of the wavelength grid, by default 10 smooth : bool, optional smooth the individual spectra before building the reference, by default False Returns ------- None None """ # read data wave_val = self.spec["wave_val"] N_ord = self.spec["N_ord"] N_pix = self.spec["N_pix"] kind = self.spec["interpolation"] # nr. of spectra if N_spec is None: N_spec = self.spec["N_spec"] # read data from internally stored spectra if self.spec["files"] is None: # read data flux_val, flux_err = [self.spec[key] for key in ["flux_val", "flux_err"]] # reference spectrum ref_wave_val = wave_val ref_flux_val = np.array([np.average(flux_val[:N_spec,i], weights=1/flux_err[:N_spec,i]**2, axis=0) for i in range(N_ord)]) ref_flux_err = np.array([np.sqrt(1/np.sum(1/flux_err[:N_spec,i]**2, axis=0)) for i in range(N_ord)]) # read data from externally stored spectra else: # loop spectra print("Building reference spectrum.") print("~~~~ Processed spectra:") for i in tqdm(range(N_spec)): # read spectrum _, flux_val, flux_err = self.read_spec(i) # smooth spectrum if smooth: for j in range(N_ord): flux_val[j] = savgol_filter(flux_val[j], window_length=15, polyorder=2) # reference spectrum if i == 0: ref_wave_val = wave_val ref_flux_val = flux_val ref_flux_err = flux_err else: flux_val_arr = np.array([ref_flux_val,flux_val]) flux_err_arr = np.array([ref_flux_err,flux_err]) ref_flux_val = np.array([np.average(flux_val_arr[:,j], weights=1/flux_err_arr[:,j]**2, axis=0) for j in range(N_ord)]) ref_flux_err = np.array([np.sqrt(1/np.sum(1/flux_err_arr[:,j]**2, axis=0)) for j in range(N_ord)]) # oversample ref_wave_val_samp = np.zeros((N_ord,N_pix+(oversamp-1)*(N_pix-1)))*np.nan ref_flux_val_samp = np.zeros((N_ord,N_pix+(oversamp-1)*(N_pix-1)))*np.nan ref_flux_err_samp = np.zeros((N_ord,N_pix+(oversamp-1)*(N_pix-1)))*np.nan for i in range(N_ord): ref_wave_val_samp[i] = np.append(np.concatenate([np.linspace(ref_wave_val[i][j], ref_wave_val[i][j+1], oversamp+1)[:oversamp] for j in range(N_pix-1)]), ref_wave_val[i][-1]) idx_val_samp = ~np.isnan(ref_wave_val_samp[i]) idx_val = ~np.isnan(ref_wave_val[i]) & ~np.isnan(ref_flux_val[i]) & ~np.isnan(ref_flux_err[i]) if (np.sum(idx_val_samp) > 1) & (np.sum(idx_val) > 1): ref_flux_val_samp[i,idx_val_samp] = interp1d(ref_wave_val[i,idx_val], ref_flux_val[i,idx_val], kind=kind, assume_sorted=True, bounds_error=False)(ref_wave_val_samp[i,idx_val_samp]) ref_flux_err_samp[i,idx_val_samp] = interp1d(ref_wave_val[i,idx_val], ref_flux_err[i,idx_val], kind=kind, assume_sorted=True, bounds_error=False)(ref_wave_val_samp[i,idx_val_samp]) # save spectral data self.spec_reference = {"wave_val": ref_wave_val_samp, "flux_val": ref_flux_val_samp, "flux_err": ref_flux_err_samp} return None