Source code for arve.data.compute_vrad_lbl

from   astropy.stats     import sigma_clip
import numpy             as     np
from   scipy.interpolate import interp1d
from   scipy.optimize    import curve_fit
from   tqdm              import tqdm
import warnings
warnings.filterwarnings("ignore")

[docs] class compute_vrad_lbl:
[docs] def compute_vrad_lbl( self, scale : bool = True, bins : list[list[float]] | None = None, N_iter : int = 1 , vrad_err_lim : float = 1e-3, criteria : list[str] | None = None, exclude_tellurics : bool = True, exclude_regions : bool = True, ) -> None: """Compute radial velocities (RVs) from spectral data using the line-by-line (LBL) method. Parameters ---------- scale : bool, optional scale reference spectrum when fitting, by default True bins : list[list[float]] | None, optional formation temperature bins with which to segment each line (if None, only 1 bin with the entire line-forming temperature range is considered), by default None N_iter : int, optional number of iterations per line segment (the inital RV guess is updated from the previous iteration), by default 1 vrad_err_lim : float, optional RV error lower limit (used to prevent diverging RVs due to null errors returned by the numerical solver for some lines), by default 1e-3 criteria : list[str] | None, optional criteria to apply (must be boolean columns available in the mask), by default None exclude_tellurics : bool, optional exclude telluric bands, by default True exclude_regions : bool, optional exclude wavelength intervals, by default True Returns ------- None None """ # read data wave_val = self.spec["wave_val"] ref_wave_val, ref_flux_val = [self.spec_reference[key] for key in ["wave_val", "flux_val"]] temp = self.aux_data["spec"]["temp"] N_spec = self.spec["N_spec"] N_ord = self.spec["N_ord"] kind = self.spec["interpolation"] # reference spectrum gradient ref_grad_val = np.zeros_like(ref_wave_val) for i in range(N_ord): ref_grad_val[i] = np.gradient(ref_flux_val[i])/np.gradient(ref_wave_val[i]) # reference flux and gradient functions ref_flux_val_func = np.zeros(N_ord, dtype=object) ref_grad_val_func = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx_val = ~np.isnan(ref_wave_val[i]) & ~np.isnan(ref_flux_val[i]) & ~np.isnan(ref_grad_val[i]) if np.sum(idx_val) > 0: ref_flux_val_func[i] = interp1d(ref_wave_val[i,idx_val], ref_flux_val[i,idx_val], kind=kind, assume_sorted=True, bounds_error=False) ref_grad_val_func[i] = interp1d(ref_wave_val[i,idx_val], ref_grad_val[i,idx_val], kind=kind, assume_sorted=True, bounds_error=False) else: ref_flux_val_func[i] = lambda x: np.zeros_like(x)*np.nan ref_grad_val_func[i] = lambda x: np.zeros_like(x)*np.nan # read constants c = self.arve.functions.constants["c"] # read mask mask = self.aux_data["mask"] # temperature bins if bins is None: bins = [[np.nanmin(temp),np.nanmax(temp)]] # nr. of lines and temperature bins N_line = np.zeros(N_ord, dtype=int) for i in range(N_ord): N_line[i] = len(mask[i]) N_bin = len(bins) # exclude tellurics if exclude_tellurics: if criteria is None: criteria = ["crit_tell"] else: criteria.append("crit_tell") # exclude regions if exclude_regions: if criteria is None: criteria = ["crit_excl"] else: criteria.append("crit_excl") # lines which satisfy criteria idx_crit = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx_crit[i] = np.ones(N_line[i], dtype=bool) if criteria is not None: for j in range(len(criteria)): crit = np.array(mask[i][criteria[j]]) idx_crit[i] *= crit # lower and upper index of lines idx_l = np.zeros(N_ord, dtype=object) idx_u = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx_l[i] = np.array(mask[i]["idx_l"]) idx_u[i] = np.array(mask[i]["idx_u"]) # index of lines idx_line = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx_line[i] = np.zeros(N_line[i], dtype=object) for j in range(N_line[i]): idx_line[i][j] = np.arange(idx_l[i][j], idx_u[i][j]+1) # index of temperature bins idx_temp = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx_temp[i] = np.zeros(N_bin, dtype=object) for j in range(N_bin): idx_temp[i][j] = np.where((temp[i]>=bins[j][0]) & (temp[i]<=bins[j][1]))[0] # index of temperature-segmented line parts idx = np.zeros(N_ord, dtype=object) for i in range(N_ord): idx[i] = np.zeros((N_line[i],N_bin), dtype=object) for j in range(N_line[i]): for k in range(N_bin): idx[i][j,k] = np.intersect1d(idx_line[i][j], idx_temp[i][k]) # empty arrays for RV values and errors vrad_val_lbl = np.zeros(N_ord, dtype=object) vrad_err_lbl = np.zeros(N_ord, dtype=object) for i in range(N_ord): vrad_val_lbl[i] = np.zeros((N_spec,N_line[i],N_bin))*np.nan vrad_err_lbl[i] = np.zeros((N_spec,N_line[i],N_bin))*np.nan # mask array for RV values and errors vrad_mask_lbl = np.zeros(N_ord, dtype=object) for i in range(N_ord): vrad_mask_lbl[i] = np.ones((N_spec,N_line[i],N_bin), dtype=bool) # loop spectra print("Extracting LBL RVs.") print("~~~~ Processed spectra:") for i in tqdm(range(N_spec)): # read spectrum _, flux_val, flux_err = self.read_spec(i) # loop orders for j in range(N_ord): # loop lines for k in range(N_line[j]): # check if line satisfies criteria if idx_crit[j][k]: # loop temperature bins for l in range(N_bin): # initial guess vrad_val = 0 # wavelength, flux and flux error for line part obs_wave_val = wave_val[j,idx[j][k,l]] obs_flux_val = flux_val[j,idx[j][k,l]] obs_flux_err = flux_err[j,idx[j][k,l]] # try to template match try: # loop iterations for _ in range(N_iter): # observed spectrum shifted obs_wave_val_shift = obs_wave_val*(1-vrad_val/c) obs_flux_val_shift = obs_flux_val obs_flux_err_shift = obs_flux_err # reference spectrum interpolated ref_wave_val_inter = obs_wave_val_shift ref_flux_val_inter = ref_flux_val_func[j](ref_wave_val_inter) ref_grad_val_inter = ref_grad_val_func[j](ref_wave_val_inter) # select function of shifted spectrum and initial guess on parameters if scale: func_spec_shift = _spec_shift_scale p0 = (0,1) else: func_spec_shift = _spec_shift p0 = (0,) # solve for parameters param, _ = curve_fit(func_spec_shift, (ref_wave_val_inter, ref_flux_val_inter, ref_grad_val_inter), obs_flux_val_shift, sigma=obs_flux_err_shift, p0=p0) # compute RV and RV error vrad_val += param[0]*c vrad_err = 1/np.sqrt(np.sum(1/(obs_flux_err_shift/(ref_grad_val_inter*ref_wave_val_inter/c))**2)) if scale: vrad_err /= np.sqrt(param[1]) # save vrad_val_lbl[j][i,k,l] = vrad_val vrad_err_lbl[j][i,k,l] = vrad_err # if unsuccessful, continue except: continue # replace infinite values with NaNs for i in range(N_ord): vrad_val_lbl[i][np.abs(vrad_val_lbl[i]) == np.inf] = np.nan vrad_err_lbl[i][np.abs(vrad_err_lbl[i]) == np.inf] = np.nan # update mask array for i in range(N_ord): vrad_mask_lbl[i][np.isnan(vrad_val_lbl[i]) | np.isnan(vrad_err_lbl[i])] = False # outlier identification for i in range(N_spec): *_, clip_val_min, clip_val_max = sigma_clip(np.vstack([vrad_val_lbl[j][i,:,:] for j in range(N_ord)]), maxiters=None, return_bounds=True) *_, clip_err_min, clip_err_max = sigma_clip(np.vstack([vrad_err_lbl[j][i,:,:] for j in range(N_ord)]), maxiters=None, return_bounds=True) for j in range(N_ord): for k in range(N_line[j]): for l in range(N_bin): if (vrad_val_lbl[j][i,k,l] < clip_val_min) | (vrad_val_lbl[j][i,k,l] > clip_val_max): vrad_mask_lbl[j][i,k,l] = False if (vrad_err_lbl[j][i,k,l] < clip_err_min) | (vrad_err_lbl[j][i,k,l] > clip_err_max): vrad_mask_lbl[j][i,k,l] = False # weighted average of all valid lines per order vrad_val_ord = np.zeros((N_spec,N_ord,N_bin))*np.nan vrad_err_ord = np.zeros((N_spec,N_ord,N_bin))*np.nan for i in range(N_spec): for j in range(N_ord): for k in range(N_bin): idx = vrad_mask_lbl[j][i,:,k] & (vrad_err_lbl[j][i,:,k]>vrad_err_lim) if np.sum(idx) > 0: vrad_val_ord[i,j,k] = np.average(vrad_val_lbl[j][i,idx,k], weights=1/vrad_err_lbl[j][i,idx,k]**2) vrad_err_ord[i,j,k] = np.sqrt(1/np.sum(1/vrad_err_lbl[j][i,idx,k]**2)) # weighted average of all orders per temperature bin vrad_val_bin = np.zeros((N_spec,N_bin))*np.nan vrad_err_bin = np.zeros((N_spec,N_bin))*np.nan for i in range(N_spec): for j in range(N_bin): idx = ~np.isnan(vrad_val_ord[i,:,j]) & (vrad_err_ord[i,:,j]>0) if np.sum(idx) > 0: vrad_val_bin[i,j] = np.average(vrad_val_ord[i,idx,j], weights=1/vrad_err_ord[i,idx,j]**2) vrad_err_bin[i,j] = np.sqrt(1/np.sum(1/vrad_err_ord[i,idx,j]**2)) # default RV time series (taken to be the first temperature bin) vrad_val = vrad_val_bin[:,0] vrad_err = vrad_err_bin[:,0] # save RV data self.vrad = { "vrad_val" : vrad_val , "vrad_err" : vrad_err , "vrad_val_bin" : vrad_val_bin , "vrad_err_bin" : vrad_err_bin , "vrad_val_ord" : vrad_val_ord , "vrad_err_ord" : vrad_err_ord , "vrad_val_lbl" : vrad_val_lbl , "vrad_err_lbl" : vrad_err_lbl , "vrad_mask_lbl": vrad_mask_lbl, "bins" : bins , "method" : "LBL" , } return None
def _spec_shift( X : tuple[np.ndarray], *param : tuple[float] ) -> np.ndarray: """Spectrum shifted. Parameters ---------- X : tuple[np.ndarray] wavelength, flux and gradient arrays *param : tuple[float] free parameter(s): z (RV divided by speed of light) Returns ------- np.ndarray shifted spectrum """ wave, flux, grad = X z = param S = flux - grad*wave*z return S def _spec_shift_scale( X : tuple[np.ndarray], *param : tuple[float] ) -> np.ndarray: """Spectrum shifted and scaled. Parameters ---------- X : tuple[np.ndarray] wavelength, flux and gradient arrays *param : tuple[float] free parameter(s): z (RV divided by speed of light) and A (scaling factor) Returns ------- np.ndarray shifted and scaled spectrum """ wave, flux, grad = X z, A = param S = (flux - grad*wave*z)*A return S