Source code for arve.data.get_aux_data

import importlib.resources
import os
import urllib.request

import numpy                 as     np
import pandas                as     pd
from   scipy.interpolate     import interp1d
from   scipy.ndimage.filters import convolve

from typing import Literal

[docs] class get_aux_data:
[docs] def get_aux_data( self, mask_path : str | None = None , mask_medium : Literal["vac", "air"] = "vac", tell_wave : np.ndarray | None = None , tell_flux : np.ndarray | None = None , tell_lim : float = 0.99 , exclude_regions : list[list[float]] | None = None , ) -> None: """Get auxiliary data. Parameters ---------- mask_path : str | None, optional path to self-provided line mask (must be a CSV file where the relevant columns are named the same as in the package-provided masks), by default None mask_medium : Literal["vac", "air"], optional medium of mask wavelengths, by default "vac" tell_wave : np.ndarray | None, optional wavelength values of normalized telluric spectrum (to replace the general model), by default None tell_flux : np.ndarray | None, optional flux values of normalized telluric spectrum (to replace the general model), by default None tell_lim : float, optional normalized flux limit on telluric features (to add a criterion column for stellar lines unaffected by tellurics), by default 0.99 exclude_regions : list[list[float]] | None, optional wavelength intervals to be excluded (to add a criterion column for stellar lines outside the excluded regions), by default None Returns ------- None None """ # read wavelengh values wave_val = self.spec["wave_val"] # read resolution and medium resolution = self.spec["resolution"] medium = self.spec["medium"] # read systematic velocity and maximum BERV vrad_sys = self.arve.star.stellar_parameters["vrad_sys"] berv_max = self.arve.star.stellar_parameters["berv_max"] # paths to auxiliary data resource_aux_data = importlib.resources.files("arve.aux_data") with importlib.resources.as_file(resource_aux_data) as path: path_aux_data = str(path) + "/" path_aux_mask = path_aux_data+"masks/" path_aux_spec = path_aux_data+"spectra/" path_aux_tell = path_aux_data+"tellurics/" path_aux_wave = path_aux_data+"wavelengths/" # search file names and spectral types among masks files = sorted(os.listdir(path_aux_mask)) files = [file for file in files if file.endswith(".csv.zip")] sptypes_data = [file.split(".")[0] for file in files] sptypes_data_num = np.array([self.arve.functions.sptype_to_num(sptype=sptype) for sptype in sptypes_data]) # spectral type as number sptype_star = self.arve.star.stellar_parameters["sptype"] sptype_star_num = self.arve.functions.sptype_to_num(sptype=sptype_star) # file of closest spectral type available in auxiliary data idx_file = np.argmin(np.abs(sptype_star_num-sptypes_data_num)) file = files[idx_file] # read closest mask if mask_path is None: mask = pd.read_csv(path_aux_mask+file) mask_name = file # read self-provided mask else: mask = pd.read_csv(mask_path) mask_name = mask_path.split("/")[-1] # convert wavelengths to correct medium if (medium=="air") & (mask_medium=="vac"): mask["wave" ] = self.arve.functions.convert_vac_to_air(mask["wave" ]) mask["wave_l"] = self.arve.functions.convert_vac_to_air(mask["wave_l"]) mask["wave_u"] = self.arve.functions.convert_vac_to_air(mask["wave_u"]) if (medium=="vac") & (mask_medium=="air"): mask["wave" ] = self.arve.functions.convert_air_to_vac(mask["wave" ]) mask["wave_l"] = self.arve.functions.convert_air_to_vac(mask["wave_l"]) mask["wave_u"] = self.arve.functions.convert_air_to_vac(mask["wave_u"]) # keep only mask lines within wavelength range mask_dict = {} for i in range(self.spec["N_ord"]): idx_wave_l = np.searchsorted(mask["wave_l"], wave_val[i][ 0]) idx_wave_u = np.searchsorted(mask["wave_u"], wave_val[i][-1]) idx_wave = np.arange(idx_wave_l, idx_wave_u) mask_dict[i] = mask.iloc[idx_wave].reset_index(drop=True) # download closest spectrum from GitHub if it does not already exist in the package directory if not os.path.exists(path_aux_spec+file): path_github = "https://raw.githubusercontent.com/almoulla/arve/main/arve/aux_data/spectra/" urllib.request.urlretrieve(path_github+file, path_aux_spec+file) # read closest spectrum wave = pd.read_csv(path_aux_wave+"WAVE.csv.zip")["wave"] spec = pd.read_csv(path_aux_spec+file) if medium == "air": wave = self.arve.functions.convert_vac_to_air(wave) spec.insert(0, "wave", wave) # convolve spectrum if resolution is not None: spec["flux"] = _convolve_gaussian(spec["wave"].values, spec["flux"].values, resolution) # interpolate spectrum spec_wave = wave_val spec_flux = np.array([interp1d(spec["wave"], spec["flux"], kind="cubic", bounds_error=False)(spec_wave[i]) for i in range(self.spec["N_ord"])]) spec_temp = np.array([interp1d(spec["wave"], spec["temp"], kind="cubic", bounds_error=False)(spec_wave[i]) for i in range(self.spec["N_ord"])]) spec_dict = { "wave": spec_wave, "flux": spec_flux, "temp": spec_temp } # find lower and upper index of lines for i in range(self.spec["N_ord"]): if "wave" in mask_dict[0].columns: col_ref = "wave" if "flux" in mask_dict[0].columns: col_ref = "flux" mask_dict[i].insert(mask_dict[i].columns.get_loc(col_ref+"_l")+2, "idx_l", np.searchsorted(spec_dict["wave"][i], mask_dict[i]["wave_l"]) ) mask_dict[i].insert(mask_dict[i].columns.get_loc(col_ref+"_u")+2, "idx_u", np.searchsorted(spec_dict["wave"][i], mask_dict[i]["wave_u"])-1) mask_dict[i]["wave_l"] = spec_dict["wave"][i][mask_dict[i]["idx_l"]] mask_dict[i]["wave_u"] = spec_dict["wave"][i][mask_dict[i]["idx_u"]] # construct telluric spectrum with a general model interpolated and convolved as the stellar spectrum if (tell_wave is None) & (tell_flux is None): # read telluric spectrum wave = pd.read_csv(path_aux_wave+"WAVE.csv.zip")["wave"] tell = pd.read_csv(path_aux_tell+"TELL.csv.zip") if medium == "air": wave = self.arve.functions.convert_vac_to_air(wave) wave = self.arve.functions.doppler_shift(wave=wave, v=-vrad_sys) tell.insert(0, "wave", wave) # convolve telluric spectrum if resolution is not None: tell["flux"] = _convolve_gaussian(tell["wave"].values, tell["flux"].values, resolution) # interpolate telluric spectrum tell_wave = wave_val tell_flux = np.array([interp1d(tell["wave"], tell["flux"], kind="cubic", bounds_error=False)(tell_wave[i]) for i in range(self.spec["N_ord"])]) tell_dict = { "wave": tell_wave, "flux": tell_flux } # or use provided telluric spectrum (assumed to have the same resolution as the stellar spectrum but not necessarily sampled on the same wavelength grid) else: # concatenate stellar spectrum tell_wave_1d = np.concatenate(tell_wave) tell_flux_1d = np.concatenate(tell_flux) idx_sort = np.argsort(tell_wave_1d) # create DataFrame with flattened telluric spectrum tell = pd.DataFrame() tell["wave"] = tell_wave_1d[idx_sort] tell["flux"] = tell_flux_1d[idx_sort] # create dictionary with interpolated telluric spectrum tell_wave = wave_val tell_flux = np.array([interp1d(tell_wave[i], tell_flux[i], kind="cubic", bounds_error=False)(tell_wave[i]) for i in range(self.spec["N_ord"])]) tell_dict = { "wave": tell_wave, "flux": tell_flux } # find lower and upper bounds of telluric bands tell_wave = np.array(tell["wave"]) tell_flux = np.array(tell["flux"]) idx_above = np.where(tell_flux>=tell_lim)[0] idx_below = np.where(tell_flux< tell_lim)[0] if len(idx_above) == 0: tell_wave_l = np.array([tell_wave[ 0]]) tell_wave_u = np.array([tell_wave[-1]]) elif len(idx_below) == 0: tell_wave_l = np.array([]) tell_wave_u = np.array([]) else: idx_l = idx_above[:-1][np.diff(idx_above)>1] idx_u = idx_below[:-1][np.diff(idx_below)>1] if tell_flux[ 0] < tell_lim: idx_l = np.append(0, idx_l) if tell_flux[-1] >= tell_lim: idx_u = np.append(idx_u, idx_below[-1]) if tell_flux[-1] < tell_lim: idx_l = np.append(idx_l, idx_above[-1]) idx_u = np.append(idx_u, len(tell_flux)-1) tell_wave_l = self.arve.functions.doppler_shift(wave=tell_wave[idx_l], v=-berv_max) tell_wave_u = self.arve.functions.doppler_shift(wave=tell_wave[idx_u], v= berv_max) # create new telluric DataFrame with lower and upper bounds of telluric bands band = pd.DataFrame() band["wave_l"] = tell_wave_l band["wave_u"] = tell_wave_u # keep telluric bands which overlap with data band_dict = {} for i in range(self.spec["N_ord"]): idx_wave = (band["wave_l"]>wave_val[i][-1]) | (band["wave_u"]<wave_val[i][0]) == False band_dict[i] = band[idx_wave].reset_index(drop=True) band_dict[i]["wave_l"][band_dict[i]["wave_l"]<wave_val[i][ 0]] = wave_val[i][ 0] band_dict[i]["wave_u"][band_dict[i]["wave_u"]>wave_val[i][-1]] = wave_val[i][-1] # create new telluric DataFrame with lower and upper bounds of excluded regions if exclude_regions is not None: exclude_regions = np.array(exclude_regions) excl = pd.DataFrame() excl["wave_l"] = exclude_regions[:,0] excl["wave_u"] = exclude_regions[:,1] # keep excluded regions which overlap with data excl_dict = {} for i in range(self.spec["N_ord"]): if exclude_regions is not None: idx_wave = (excl["wave_l"]>wave_val[i][-1]) | (excl["wave_u"]<wave_val[i][0]) == False excl_dict[i] = excl[idx_wave].reset_index(drop=True) excl_dict[i]["wave_l"][excl_dict[i]["wave_l"]<wave_val[i][ 0]] = wave_val[i][ 0] excl_dict[i]["wave_u"][excl_dict[i]["wave_u"]>wave_val[i][-1]] = wave_val[i][-1] else: excl_dict[i] = pd.DataFrame(columns=["wave_l", "wave_u"]) # criterion: lines not within telluric bands for i in range(self.spec["N_ord"]): band_wave_l = band_dict[i]["wave_l"].values band_wave_u = band_dict[i]["wave_u"].values mask_wave_l = mask_dict[i]["wave_l"].values mask_wave_u = mask_dict[i]["wave_u"].values mask_dict[i]["crit_tell"] = np.sum([(mask_wave_l>band_wave_u[j]) | (mask_wave_u<band_wave_l[j]) for j in range(len(band_dict[i]))], axis=0) == len(band_dict[i]) # criterion: lines not within excluded regions for i in range(self.spec["N_ord"]): excl_wave_l = excl_dict[i]["wave_l"].values excl_wave_u = excl_dict[i]["wave_u"].values mask_wave_l = mask_dict[i]["wave_l"].values mask_wave_u = mask_dict[i]["wave_u"].values mask_dict[i]["crit_excl"] = np.sum([(mask_wave_l>excl_wave_u[j]) | (mask_wave_u<excl_wave_l[j]) for j in range(len(excl_dict[i]))], axis=0) == len(excl_dict[i]) # save self.aux_data = {"name": mask_name, "mask": mask_dict, "spec": spec_dict, "tell": tell_dict, "band": band_dict, "excl": excl_dict } return None
def _convolve_gaussian( wave : np.ndarray, flux : np.ndarray, resolution : float ) -> np.ndarray: """Convolve spectrum with a Gaussian instrumental profile. Adapted from: https://pysme-astro.readthedocs.io/en/latest/_modules/pysme/broadening.html Parameters ---------- wave : np.ndarray wavelength values flux : np.ndarray flux values resolution : float instrumental resolution Returns ------- np.ndarray flux values convolved with a Gaussian instrumental profile """ # nr. of wavelength points Nwave = len(wave) # half-width at half-maximum hwhm = 0.5*wave[0]/resolution # uniform dispersion wave_uan = wave[-1]-wave[0] wave_del = wave_uan/(Nwave-1) # nr. of points in half Gaussian Nhalf = int(4/np.sqrt(2*np.log(2))*hwhm/wave_del) # nr. of points in fill Gaussian (odd) Nfull = 2*Nhalf+1 # wavelength grid of Gaussian wave_grid = wave_del*(np.arange(Nfull)-(Nfull-1)/2) # standard deviation sigma = hwhm/np.sqrt(2*np.log(2)) # normalized Gaussian gauss = np.exp(-(wave_grid/sigma)**2/2) gauss /= np.sum(gauss) # convolve spectrum flux = convolve(flux, gauss, mode="nearest") return flux