Source code for arve.data.compute_vrad_ccf

import numpy             as     np
from   scipy.interpolate import interp1d
from   scipy.optimize    import curve_fit
from   tqdm              import tqdm

[docs] class compute_vrad_ccf:
[docs] def compute_vrad_ccf( self, weight_name : str | None = None , vrad_grid : list[float] | None = None , ccf_err_scale : bool = False, criteria : list[str] | None = None , exclude_tellurics : bool = True , exclude_regions : bool = True , ) -> None: """Compute radial velocities (RVs) from spectral data using the cross-correlation function (CCF) method. Parameters ---------- weight_name : str | None, optional name of float column available in the mask to be used as weight (if None, all lines are weighted equally), by default None vrad_grid : list[float] | None, optional velocity grid, in the format [start,stop,step] and in units of km/s, on which to evaluate the CCF (if None, the grid is set to [-20,20,v_med] where v_med is the median wavelength step converted to a velocity), by default None ccf_err_scale : bool, optional scale the CCF errors if the velocity grid is under- or over-sampled, by default False criteria : list[str] | None, optional criteria to apply (must be boolean columns available in the mask), by default None exclude_tellurics : bool, optional exclude telluric bands, by default True exclude_regions : bool, optional exclude wavelength intervals, by default True Returns ------- None None """ # read data wave_val = self.spec["wave_val"] N_spec = self.spec["N_spec"] N_ord = self.spec["N_ord"] # read constants c = self.arve.functions.constants["c"] # read mask mask = self.aux_data["mask"] # nr. of lines N_line = np.zeros(N_ord, dtype=int) for i in range(N_ord): N_line[i] = len(mask[i]) # central wavelengths wave_c = np.zeros(N_ord, dtype=object) for i in range(N_ord): wave_c[i] = mask[i]["wave"].values # weights weight = np.zeros(N_ord, dtype=object) for i in range(N_ord): if weight_name is None: weight[i] = np.ones(N_line[i]) else: weight[i] = mask[i][weight_name].values # exclude tellurics if exclude_tellurics: if criteria is None: criteria = ["crit_tell"] else: criteria.append("crit_tell") # exclude regions if exclude_regions: if criteria is None: criteria = ["crit_excl"] else: criteria.append("crit_excl") # lines which satisfy criteria if criteria is not None: for i in range(N_ord): idx = np.ones(N_line[i], dtype=bool) for j in range(len(criteria)): crit = mask[i][criteria[j]].values idx *= crit wave_c[i] = wave_c[i][idx] weight[i] = weight[i][idx] # RV shifts wave_val_flat = np.concatenate(wave_val) vrad_step_med = np.nanmedian(np.diff(wave_val_flat)/wave_val_flat[:-1])*c if vrad_grid is None: ccf_vrad_start = -20 ccf_vrad_stop = 20 ccf_vrad_step = vrad_step_med ccf_vrad = np.arange(ccf_vrad_start, ccf_vrad_stop, ccf_vrad_step) ccf_err_corr = 1 else: ccf_vrad = np.arange(vrad_grid[0], vrad_grid[1]+vrad_grid[2]/2, vrad_grid[2]) ccf_err_corr = (vrad_step_med/vrad_grid[2])**(1/2) N_vrad = len(ccf_vrad) # keep mask lines within spectrum overlap for i in range(N_ord): idx = (self.arve.functions.doppler_shift(wave=wave_c[i], v=np.min(ccf_vrad)) > np.min(wave_val[i])) & \ (self.arve.functions.doppler_shift(wave=wave_c[i], v=np.max(ccf_vrad)) < np.max(wave_val[i])) wave_c[i] = wave_c[i][idx] weight[i] = weight[i][idx] # normalize weights for i in range(N_ord): weight[i] = weight[i]/np.sum(weight[i]) # empty arrays for indices i_l = np.zeros((N_ord,N_vrad), dtype=object) i_u = np.zeros((N_ord,N_vrad), dtype=object) # empty arrays for fractions r_l = np.zeros((N_ord,N_vrad), dtype=object) r_u = np.zeros((N_ord,N_vrad), dtype=object) # loop spectral orders for i in range(N_ord): # loop RV shifts for j in range(N_vrad): # shifted central wavelengths wc_shift = self.arve.functions.doppler_shift(wave=wave_c[i], v=ccf_vrad[j]) # upper and lower indices i_u[i,j] = np.searchsorted(wave_val[i], wc_shift) i_l[i,j] = i_u[i,j] - 1 # upper and lower fractions r_u[i,j] = (wc_shift-wave_val[i][i_l[i,j]])/(wave_val[i][i_u[i,j]]-wave_val[i][i_l[i,j]]) r_l[i,j] = 1 - r_u[i,j] # empty arrays for CCF values and errors ccf_val = np.zeros((N_spec,N_ord+1,N_vrad)) ccf_err = np.zeros((N_spec,N_ord+1,N_vrad)) # empty arrays for CCF moments vrad_val_arr = np.zeros((N_spec,N_ord+1))*np.nan vrad_err_arr = np.zeros((N_spec,N_ord+1))*np.nan cont_val_arr = np.zeros((N_spec,N_ord+1))*np.nan cont_err_arr = np.zeros((N_spec,N_ord+1))*np.nan ampl_val_arr = np.zeros((N_spec,N_ord+1))*np.nan ampl_err_arr = np.zeros((N_spec,N_ord+1))*np.nan fwhm_val_arr = np.zeros((N_spec,N_ord+1))*np.nan fwhm_err_arr = np.zeros((N_spec,N_ord+1))*np.nan # loop spectra print("Extracting CCF RVs.") print("~~~~ Processed spectra:") for i in tqdm(range(N_spec)): # read spectrum _, flux_val, flux_err = self.read_spec(i) # loop orders for j in range(N_ord+1): try: # single order if j < N_ord: # loop RV shifts for k in range(N_vrad): # CCF value and error ccf_val[i,j,k] = np.nansum(( flux_val[j][i_l[j,k]]*r_l[j,k] + flux_val[j][i_u[j,k]]*r_u[j,k] )*weight[j] ) ccf_err[i,j,k] = np.nansum(((flux_err[j][i_l[j,k]]*r_l[j,k])**2+(flux_err[j][i_u[j,k]]*r_u[j,k])**2)*weight[j]**2)**(1/2) # sum of all orders else: # order-summed CCF values and errors ccf_val[i,j] = np.nansum(ccf_val[i] , axis=0) ccf_err[i,j] = np.nansum(ccf_err[i]**2, axis=0)**(1/2) # scale CCF errors if ccf_err_scale: ccf_err[i,j] *= ccf_err_corr # initial guess on Gaussian parameters i_min = np.argmin(ccf_val[i,j]) i_max = np.argmax(ccf_val[i,j]) C0 = ccf_val[i,j,i_max] a0 = 1-ccf_val[i,j,i_min]/C0 b0 = ccf_vrad[i_min] c0 = (b0-interp1d(ccf_val[i,j,:i_min], ccf_vrad[:i_min])((ccf_val[i,j,i_min]+ccf_val[i,j,i_max])/2))*2 if np.abs(b0) < 1e-3: b0 = 1e-3 p0 = (C0, a0, b0, c0) # CCF parameters with fitted Gaussian param_val, param_cov = curve_fit(self.arve.functions.inverted_gaussian, ccf_vrad, ccf_val[i,j], sigma=ccf_err[i,j], p0=p0) param_err = np.sqrt(np.diag(param_cov)) # RV value and error vrad_val_arr[i,j] = param_val[2] vrad_err_arr[i,j] = 1/np.sqrt(np.sum(1/(ccf_err[i,j]*np.gradient(ccf_vrad)/np.gradient(ccf_val[i,j]))**2)) # continuum value and error cont_val_arr[i,j] = param_val[0] cont_err_arr[i,j] = param_err[0] # amplitude value and error ampl_val_arr[i,j] = param_val[1] ampl_err_arr[i,j] = param_err[1] # FWHM value and error fwhm_val_arr[i,j] = param_val[3] fwhm_err_arr[i,j] = param_err[3] except: continue # emtpty arrays for weighted average of spectral orders vrad_val = np.zeros(N_spec)*np.nan vrad_err = np.zeros(N_spec)*np.nan cont_val = np.zeros(N_spec)*np.nan cont_err = np.zeros(N_spec)*np.nan ampl_val = np.zeros(N_spec)*np.nan ampl_err = np.zeros(N_spec)*np.nan fwhm_val = np.zeros(N_spec)*np.nan fwhm_err = np.zeros(N_spec)*np.nan # loop spectra for i in range(N_spec): try: # weighted average of RV vrad_val[i], vrad_err[i] = _weighted_average(vrad_val_arr[i,:-1], vrad_err_arr[i,:-1]) # weighted average of continuum cont_val[i], cont_err[i] = _weighted_average(cont_val_arr[i,:-1], cont_err_arr[i,:-1]) # weighted average of amplitude ampl_val[i], ampl_err[i] = _weighted_average(ampl_val_arr[i,:-1], ampl_err_arr[i,:-1]) # weighted average of FWHM fwhm_val[i], fwhm_err[i] = _weighted_average(fwhm_val_arr[i,:-1], fwhm_err_arr[i,:-1]) except: continue # bisector normalized flux points bis_flux_val = np.arange(0.00, 1.00, 0.01) N_bis = len(bis_flux_val) # emtpty arrays for bisector and BIS value and error bis_vrad_val = np.zeros((N_spec,N_bis))*np.nan bis_vrad_err = np.zeros((N_spec,N_bis))*np.nan bis_val = np.zeros( N_spec )*np.nan bis_err = np.zeros( N_spec )*np.nan # loop spectra for i in range(N_spec): try: # CCF RV error ccf_err_vrad = (ccf_err[i,-1]*np.gradient(ccf_vrad)/np.gradient(ccf_val[i,-1]))**2 # bisector value bis_vrad_val_l = interp1d(ccf_val[i,-1][ccf_vrad<vrad_val[i]]/cont_val[i], ccf_vrad[ccf_vrad<vrad_val[i]], bounds_error=False)(bis_flux_val) bis_vrad_val_r = interp1d(ccf_val[i,-1][ccf_vrad>vrad_val[i]]/cont_val[i], ccf_vrad[ccf_vrad>vrad_val[i]], bounds_error=False)(bis_flux_val) bis_vrad_val[i] = (bis_vrad_val_l+bis_vrad_val_r)/2 # bisector error bis_vrad_err_l = interp1d(ccf_vrad[ccf_vrad<vrad_val[i]], ccf_err_vrad[ccf_vrad<vrad_val[i]], bounds_error=False)(bis_vrad_val_l) bis_vrad_err_r = interp1d(ccf_vrad[ccf_vrad>vrad_val[i]], ccf_err_vrad[ccf_vrad>vrad_val[i]], bounds_error=False)(bis_vrad_val_r) bis_vrad_err[i] = np.sqrt(bis_vrad_err_l**2 + bis_vrad_err_r**2)/2 # bisector normalized flux bis_flux_norm = 1-(1-bis_flux_val)/ampl_val[i] # bisector index for bottom and top idx_bis_bot = (bis_flux_norm>0.1) & (bis_flux_norm<0.4) idx_bis_top = (bis_flux_norm>0.6) & (bis_flux_norm<0.9) # BIS bottom and top value and error bis_bot_val = np.average(bis_vrad_val[i][idx_bis_bot]) bis_bot_err = np.average(bis_vrad_err[i][idx_bis_bot])/np.sqrt(np.sum(idx_bis_bot)) bis_top_val = np.average(bis_vrad_val[i][idx_bis_top]) bis_top_err = np.average(bis_vrad_err[i][idx_bis_top])/np.sqrt(np.sum(idx_bis_top)) # BIS value and error bis_val[i] = bis_top_val - bis_bot_val bis_err[i] = (bis_top_err**2 + bis_bot_err**2)**(1/2) except: continue # save data self.vrad = { "vrad_val" : vrad_val , "vrad_err" : vrad_err , "vrad_val_ord": vrad_val_arr, "vrad_err_ord": vrad_err_arr, "method" : "CCF" , } self.ccf = { "ccf_vrad" : ccf_vrad , "ccf_val" : ccf_val , "ccf_err" : ccf_err , "bis_vrad_val": bis_vrad_val, "bis_vrad_err": bis_vrad_err, "bis_flux_val": bis_flux_val, "cont_val" : cont_val , "cont_err" : cont_err , "ampl_val" : ampl_val , "ampl_err" : ampl_err , "fwhm_val" : fwhm_val , "fwhm_err" : fwhm_err , "bis_val" : bis_val , "bis_err" : bis_err , } return None
def _weighted_average( val_arr : np.ndarray, err_arr : np.ndarray ) -> tuple[float]: """Weighted average. Parameters ---------- val_arr : np.ndarray value array err_arr : np.ndarray error array Returns ------- tuple[float] weighted average value and error """ idx = ~np.isnan(val_arr) & ~np.isnan(err_arr) & (err_arr>0) if np.sum(idx) > 0: val_avg = np.average(val_arr[idx], weights=1/err_arr[idx]**2) err_avg = np.sqrt(1/np.sum(1/err_arr[idx]**2)) else: val_avg = np.nan err_avg = np.nan return val_avg, err_avg