import importlib.metadata
import packaging.version
import importlib.metadata
import packaging.version
from astropy.coordinates import SkyCoord, Distance
import astropy.units as u
from astropy.time import Time
from astroquery.simbad import SimbadClass
import astropy.config
from astropy.io import ascii
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
import numpy as np
import os
import shutil
from spectractor import parameters
from spectractor.config import set_logger
from spectractor.extractor.spectroscopy import (Lines, HGAR_LINES, HYDROGEN_LINES, ATMOSPHERIC_LINES,
ISM_LINES, STELLAR_LINES)
from getCalspec import getCalspec
# Astroquery versions change the Simbad API.
_astroquery_version = packaging.version.parse(importlib.metadata.version("astroquery"))
if _astroquery_version < packaging.version.parse("0.4.8"):
_USE_NEW_SIMBAD = False
_SIMBAD_VOTABLE_FIELDS = ('flux(U)', 'flux(B)', 'flux(V)', 'flux(R)', 'flux(I)', 'flux(J)', 'sptype',
'parallax', 'pm', 'z_value', "IDS", "ids")
else:
_USE_NEW_SIMBAD = True
_SIMBAD_VOTABLE_FIELDS = ('U', 'B', 'V', 'R', 'I', 'J', 'sp_type',
'parallax', 'propermotions', 'rvz_redshift', "IDS", "ids")
try:
from gaiaspec import getGaia
except ModuleNotFoundError:
getGaia = None
def _get_cache_dir():
cache = os.path.join(astropy.config.get_cache_dir(), "astroquery", "Simbad")
os.makedirs(cache, exist_ok=True)
return cache
def _get_cache_file(tag):
filename = tag.replace("*", "").replace(" ", "_").replace(".", "_")
return filename
def _clean_cache_dir():
cache = _get_cache_dir()
shutil.rmtree(cache)
[docs]
def load_target(label, verbose=False):
"""Load the target properties according to the type set by parameters.OBS_OBJECT_TYPE.
Currently, the type can be either "STAR", "HG-AR" or "MONOCHROMATOR". The label parameter gives the
name of the source and allows to load its specific properties.
Parameters
----------
label: str
The label of the target.
verbose: bool, optional
If True, more verbosity (default: False).
Examples
--------
>>> parameters.OBS_OBJECT_TYPE = "STAR"
>>> t = load_target("HD111980", verbose=False)
>>> print(t.label)
HD111980
>>> print(t.radec_position.dec) # doctest: +ELLIPSIS
-18d31m...s
>>> parameters.OBS_OBJECT_TYPE = "MONOCHROMATOR"
>>> t = load_target("XX", verbose=False)
>>> print(t.label)
XX
>>> parameters.OBS_OBJECT_TYPE = "HG-AR"
>>> t = load_target("XX", verbose=False)
>>> print([line.wavelength for line in t.lines.lines][:5])
[253.652, 296.728, 302.15, 313.155, 334.148]
"""
if parameters.OBS_OBJECT_TYPE == 'STAR':
return Star(label, verbose)
elif parameters.OBS_OBJECT_TYPE == 'HG-AR':
return ArcLamp(label, verbose)
elif parameters.OBS_OBJECT_TYPE == 'MONOCHROMATOR':
return Monochromator(label, verbose)
elif parameters.OBS_OBJECT_TYPE == "LED":
return Led(label, verbose)
else:
raise ValueError(f'Unknown parameters.OBS_OBJECT_TYPE: {parameters.OBS_OBJECT_TYPE}')
[docs]
class Target:
def __init__(self, label, verbose=False):
"""Initialize Target class.
Parameters
----------
label: str
String label to name the target
verbose: bool, optional
Set True to increase verbosity (default: False)
"""
self.my_logger = set_logger(self.__class__.__name__)
self.label = label
self.type = None
self.wavelengths = []
self.spectra = []
self.spectra_err = []
self.verbose = verbose
self.emission_spectrum = False
self.hydrogen_only = False
self.sed = None
self.sed_err = None
self.lines = None
self.radec_position = None
self.radec_position_after_pm = None
self.redshift = 0
self.image = None
self.image_x0 = None
self.image_y0 = None
self.starfield = None
[docs]
class ArcLamp(Target):
def __init__(self, label, verbose=False):
"""Initialize ArcLamp class.
Parameters
----------
label: str
String label to name the lamp.
verbose: bool, optional
Set True to increase verbosity (default: False)
Examples
--------
Mercury-Argon lamp:
>>> t = ArcLamp("HG-AR", verbose=False)
>>> print([line.wavelength for line in t.lines.lines][:5])
[253.652, 296.728, 302.15, 313.155, 334.148]
>>> print(t.emission_spectrum)
True
"""
Target.__init__(self, label, verbose=verbose)
self.my_logger = set_logger(self.__class__.__name__)
self.emission_spectrum = True
self.lines = Lines(HGAR_LINES, emission_spectrum=True, orders=[1, 2])
[docs]
def load(self): # pragma: no cover
pass
[docs]
class Monochromator(Target):
def __init__(self, label, verbose=False):
"""Initialize Monochromator class.
Parameters
----------
label: str
String label to name the monochromator.
verbose: bool, optional
Set True to increase verbosity (default: False)
Examples
--------
>>> t = Monochromator("XX", verbose=False)
>>> print(t.label)
XX
>>> print(t.emission_spectrum)
True
"""
Target.__init__(self, label, verbose=verbose)
self.my_logger = set_logger(self.__class__.__name__)
self.emission_spectrum = True
self.lines = Lines([], emission_spectrum=True, orders=[1, 2])
[docs]
def load(self): # pragma: no cover
pass
[docs]
class Led(Target):
def __init__(self, label, verbose=False):
"""Initialize Led class.
Parameters
----------
label: str
String label to name the led.
verbose: bool, optional
Set True to increase verbosity (default: False)
Examples
--------
>>> t = Led("XX", verbose=False)
>>> print(t.label)
XX
>>> print(t.emission_spectrum)
True
"""
Target.__init__(self, label, verbose=verbose)
self.my_logger = set_logger(self.__class__.__name__)
self.emission_spectrum = True
self.lines = Lines([], emission_spectrum=True, orders=[1, 2])
[docs]
def load(self): # pragma: no cover
pass
[docs]
def patchSimbadURL(simbad):
"""Monkeypatch the URL that Simbad is using to force it to use https.
This is necessary to make the tests on github actions work, because
for some reason it wasn't automatically upgrading to https otherwise.
"""
simbad.SIMBAD_URL = simbad.SIMBAD_URL.replace("http:", "https:")
[docs]
class Star(Target):
def __init__(self, label, verbose=False):
"""Initialize Star class.
Parameters
----------
label: str
String label to name the target
verbose: bool, optional
Set True to increase verbosity (default: False)
Examples
--------
Emission line object:
>>> s = Star('PNG321.0+3.9')
>>> print(s.label)
PNG321.0+3.9
>>> print(s.radec_position.dec) # doctest: +ELLIPSIS
-54d18m07.521...s
>>> print(s.emission_spectrum)
True
Standard star:
>>> s = Star('HD111980')
>>> print(s.label)
HD111980
>>> print(s.radec_position.dec) # doctest: +ELLIPSIS
-18d31m...s
>>> print(s.emission_spectrum)
False
"""
Target.__init__(self, label, verbose=verbose)
self.my_logger = set_logger(self.__class__.__name__)
self.load()
[docs]
def load(self):
"""Load the coordinates of the target.
Examples
--------
>>> parameters.VERBOSE = True
>>> s = Star('PNG321.0+3.9')
>>> print(s.radec_position.dec) # doctest: +ELLIPSIS
-54d18m07...s
>>> print(s.redshift) # doctest: +ELLIPSIS
-0.00021...
>>> s = Star('eta dor')
>>> print(s.radec_position.dec) # doctest: +ELLIPSIS
-66d02m22...s
>>> s = Star('mu.col')
>>> print(s.radec_position.dec) # doctest: +ELLIPSIS
-32d18m23...s
"""
date_reference="J2000"
if not getCalspec.is_calspec(self.label) and getCalspec.is_calspec(self.label.replace(".", " ")):
self.label = self.label.replace(".", " ")
astroquery_label = self.label
if getCalspec.is_calspec(self.label):
calspec = getCalspec.Calspec(self.label)
astroquery_label = calspec.Astroquery_Name
cache_location = _get_cache_dir()
cache_file = _get_cache_file(astroquery_label)
if os.path.exists(os.path.join(cache_location, f"{cache_file}.ecsv")):
self.my_logger.debug(f"\n\tLoad {self.label} coordinates from cached file {cache_file}.ecsv")
self.simbad_table = ascii.read(os.path.join(cache_location, f"{cache_file}.ecsv"))
else:
# explicitly make a class instance here because:
# when using ``from astroquery.simbad import Simbad`` and then using
# ``Simbad...`` methods secretly makes an instance, which stays around,
# has a connection go stale, and then raises an exception seemingly
# at some random time later
simbadQuerier = SimbadClass()
patchSimbadURL(simbadQuerier)
simbadQuerier.add_votable_fields(*_SIMBAD_VOTABLE_FIELDS)
self.my_logger.debug(f"\n\tDownload {self.label} coordinates from Simbad...")
self.simbad_table = simbadQuerier.query_object(astroquery_label)
self.simbad_table.write(os.path.join(cache_location,f"{cache_file}.ecsv"), overwrite=True)
if "ra" in self.simbad_table.keys():
ra_key = "ra"
dec_key = "dec"
redshift_key = "rvz_redshift"
else:
ra_key = "RA"
dec_key = "DEC"
redshift_key = "Z_VALUE"
if self.simbad_table is not None:
if self.verbose:
self.my_logger.info(f'\n\tSimbad:\n{self.simbad_table}')
if _USE_NEW_SIMBAD:
self.radec_position = SkyCoord(self.simbad_table[ra_key][0], self.simbad_table[dec_key][0], unit="deg")
else:
self.radec_position = SkyCoord(
self.simbad_table[ra_key][0] + ' ' + self.simbad_table[dec_key][0], unit=(u.hourangle, u.deg)
)
else:
raise RuntimeError(f"Target {self.label} not found in Simbad")
if not np.ma.is_masked(self.simbad_table[redshift_key][0]):
self.redshift = float(self.simbad_table[redshift_key][0])
else:
self.redshift = 0
self.get_radec_position_after_pm(self.simbad_table,
date_obs="J2000",
date_reference = date_reference)
self.load_spectra()
[docs]
def load_spectra(self):
"""Load reference spectra from getCalspec database or NED database.
If the object redshift is >0.2, the LAMBDA_MIN and LAMBDA_MAX parameters
are redshifted accordingly.
Examples
--------
>>> s = Star('HD111980')
>>> print(s.spectra[0][:4])
[2.2839e-13 2.0263e-13 2.0889e-13 2.3928e-13]
>>> print(f'{parameters.LAMBDA_MIN:.1f}, {parameters.LAMBDA_MAX:.1f}')
300.0, 1100.0
>>> print(s.spectra[0][:4])
[2.2839e-13 2.0263e-13 2.0889e-13 2.3928e-13]
"""
self.wavelengths = [] # in nm
self.spectra = []
self.spectra_err = []
# first try if it is a Calspec star
is_calspec = getCalspec.is_calspec(self.label)
if getGaia is None:
is_gaiaspec = False
is_gaia_full = False
else:
is_gaiaspec = getGaia.is_gaiaspec(self.label)
is_gaia_full = False
if is_gaiaspec == False:
is_gaia_full = getGaia.is_gaia_full(self.label)
if is_calspec:
self.load_calspec()
elif is_gaiaspec|is_gaia_full:
self.load_gaia()
# TODO DM-33731: the use of self.label in parameters.STAR_NAMES:
# below works for running but breaks a test so needs fixing for DM
elif 'HD' in self.label: # or self.label in parameters.STAR_NAMES: # it is a star
self.load_emission_spectrum(hydrogen_only_flag=False)
elif 'PNG' in self.label:
self.load_emission_spectrum(hydrogen_only_flag=True)
else: # maybe a quasar, try with NED query
self.load_ned()
self.build_sed()
self.my_logger.debug(f"\n\tTarget label: {self.label}"
f"\n\tCalspec? {is_calspec}"
f"\n\tNumber of spectra: {len(self.spectra)}"
f"\n\tRedshift: {self.redshift}"
f"\n\tEmission spectrum ? {self.emission_spectrum}")
if self.lines is not None and len(self.lines.lines) > 0:
self.my_logger.debug(f"\n\tLines: {[l.label for l in self.lines.lines]}")
[docs]
def load_calspec(self):
calspec = getCalspec.Calspec(self.label)
self.emission_spectrum = False
self.hydrogen_only = False
self.lines = Lines(
HYDROGEN_LINES + ATMOSPHERIC_LINES + STELLAR_LINES,
redshift=self.redshift,
emission_spectrum=self.emission_spectrum,
hydrogen_only=self.hydrogen_only,
)
spec_dict = calspec.get_spectrum_numpy()
# official units in spectractor are nanometers for wavelengths and erg/s/cm2/nm for fluxes
spec_dict["WAVELENGTH"] = spec_dict["WAVELENGTH"].to(u.nm)
for key in ["FLUX", "STATERROR", "SYSERROR"]:
spec_dict[key] = spec_dict[key].to(u.erg / u.second / u.cm**2 / u.nm)
self.wavelengths.append(spec_dict["WAVELENGTH"].value)
self.spectra.append(spec_dict["FLUX"].value)
self.spectra_err.append(np.sqrt(spec_dict["STATERROR"].value**2+spec_dict["SYSERROR"].value**2))
[docs]
def load_gaia(self):
"""
Load the spectrum from the Gaia database.
Examples
--------
>>> s = Star('HD111980')
>>> s.load_gaia()
>>> s.plot_spectra()
"""
gaia = getGaia.Gaia(self.label)
if "PNG" in self.label:
self.emission_spectrum = True
else:
self.emission_spectrum = False
self.hydrogen_only = False
self.lines = Lines(
HYDROGEN_LINES + ATMOSPHERIC_LINES + STELLAR_LINES,
redshift=self.redshift,
emission_spectrum=self.emission_spectrum,
hydrogen_only=self.hydrogen_only,
)
spec_dict = gaia.get_spectrum_numpy()
# official units in spectractor are nanometers for wavelengths and erg/s/cm2/nm for fluxes
spec_dict["WAVELENGTH"] = spec_dict["WAVELENGTH"].to(u.nm)
for key in ["FLUX", "STATERROR", "SYSERROR"]:
spec_dict[key] = spec_dict[key].to(u.erg / u.second / u.cm**2 / u.nm)
self.wavelengths.append(spec_dict["WAVELENGTH"].value)
self.spectra.append(spec_dict["FLUX"].value)
self.spectra_err.append(np.sqrt(spec_dict["STATERROR"].value**2+spec_dict["SYSERROR"].value**2))
[docs]
def load_emission_spectrum(self, hydrogen_only_flag):
if hydrogen_only_flag:
self.emission_spectrum = True
self.lines = Lines(
ATMOSPHERIC_LINES + ISM_LINES + HYDROGEN_LINES,
redshift=self.redshift,
emission_spectrum=self.emission_spectrum,
hydrogen_only=self.hydrogen_only,
)
else:
self.emission_spectrum = False
self.hydrogen_only = False
self.lines = Lines(
ATMOSPHERIC_LINES + HYDROGEN_LINES + STELLAR_LINES,
redshift=self.redshift,
emission_spectrum=self.emission_spectrum,
hydrogen_only=self.hydrogen_only,
)
[docs]
def load_ned(self):
"""
Load the spectrum from NED database.
Examples
--------
>>> s = Star('3C273')
>>> s.load_ned()
>>> s.plot_spectra()
"""
from astroquery.ned import Ned
try:
hdulists = Ned.get_spectra(self.label) # , show_progress=False)
except Exception as err:
raise err
if len(hdulists) > 0:
self.emission_spectrum = True
self.hydrogen_only = False
if self.redshift > 0.2:
self.hydrogen_only = True
parameters.LAMBDA_MIN *= 1 + self.redshift
parameters.LAMBDA_MAX *= 1 + self.redshift
self.lines = Lines(
ATMOSPHERIC_LINES + ISM_LINES + HYDROGEN_LINES,
redshift=self.redshift,
emission_spectrum=self.emission_spectrum,
hydrogen_only=self.hydrogen_only,
)
for k, h in enumerate(hdulists):
if h[0].data is None:
continue
if h[0].header["NAXIS"] == 1:
self.spectra.append(h[0].data)
else:
for d in h[1].data:
self.spectra.append(d)
wave_n = len(self.spectra[-1])
if h[0].header["NAXIS"] == 2:
wave_n = len(h[0].data.T)
wave_step = h[0].header["CDELT1"]
wave_start = (
h[0].header["CRVAL1"] - (h[0].header["CRPIX1"] - 1) * wave_step
)
wave_end = wave_start + wave_n * wave_step
waves = np.linspace(wave_start, wave_end, wave_n)
is_angstrom = False
for key in list(h[0].header.keys()):
if "angstrom" in str(h[0].header[key]).lower():
is_angstrom = True
if is_angstrom:
waves *= 0.1
if h[0].header["NAXIS"] > 1:
for i in range(h[0].header["NAXIS"] + 1):
self.wavelengths.append(waves)
else:
self.wavelengths.append(waves)
[docs]
def get_radec_position_after_pm(self, table_coordinates, date_obs="J2000", date_reference="J2000"):
if table_coordinates is not None:
if "pmra" in table_coordinates[0].keys():
pmra_key = 'pmra'
pmdec_key = 'pmdec'
plx_value_key = 'plx_value'
else:
pmra_key = 'PMRA'
pmdec_key = 'PMDEC'
plx_value_key = 'PLX_VALUE'
target_pmra = table_coordinates[0][pmra_key] * u.mas / u.yr
if np.isnan(target_pmra):
target_pmra = 0 * u.mas / u.yr
target_pmdec = table_coordinates[0][pmdec_key] * u.mas / u.yr
if np.isnan(target_pmdec):
target_pmdec = 0 * u.mas / u.yr
target_parallax = table_coordinates[0][plx_value_key] * u.mas
if target_parallax == 0 * u.mas:
target_parallax = 1e-4 * u.mas
target_coord = SkyCoord(ra=self.radec_position.ra, dec=self.radec_position.dec,
distance=Distance(parallax=target_parallax),
pm_ra_cosdec=target_pmra, pm_dec=target_pmdec, frame='icrs', equinox="J2000",
obstime=date_reference)
self.radec_position_after_pm = target_coord.apply_space_motion(new_obstime=Time(date_obs))
return self.radec_position_after_pm
else:
self.my_logger.warning("No Simbad table provided: can't apply proper motion correction. "
"Return original (RA,DEC) coordinates of the object.")
return self.radec_position
[docs]
def build_sed(self, index=0):
"""Interpolate the database reference spectra and return self.sed as a function of the wavelength.
Parameters
----------
index: int
Index of the spectrum stored in the self.spectra list
Examples
--------
>>> s = Star('HD111980')
>>> s.build_sed(index=0)
>>> s.sed(550)
array(1.67508011e-11)
"""
if len(self.spectra) == 0:
self.sed = interp1d(parameters.LAMBDAS, np.zeros_like(parameters.LAMBDAS), kind='linear', bounds_error=False,
fill_value=0.)
self.sed_err = interp1d(parameters.LAMBDAS, np.zeros_like(parameters.LAMBDAS), kind='linear', bounds_error=False,
fill_value=0.)
else:
self.sed = interp1d(self.wavelengths[index], self.spectra[index], kind='linear', bounds_error=False,
fill_value=0.)
if len(self.spectra_err) == 0:
self.sed_err = interp1d(self.wavelengths[index], np.zeros_like(self.spectra[index]), kind='linear', bounds_error=False,
fill_value=0.)
else:
self.sed_err = interp1d(self.wavelengths[index], self.spectra_err[index], kind='linear', bounds_error=False,
fill_value=10*np.max(self.spectra_err[index]))
[docs]
def plot_spectra(self):
""" Plot the spectra stored in the self.spectra list.
Examples
--------
>>> s = Star('HD111980')
>>> s.plot_spectra()
"""
# target.load_spectra() ## No global target object available here (SDC)
plt.figure() # necessary to create a new plot (SDC)
for isp, sp in enumerate(self.spectra):
plt.plot(self.wavelengths[isp], sp, label=f'Spectrum {isp}')
plt.xlim((300, 1100))
plt.xlabel(r'$\lambda$ [nm]')
plt.ylabel('Flux [erg/s/cm2/nm]')
plt.title(self.label)
plt.legend()
if parameters.DISPLAY: # pragma: no cover
plt.show()
if parameters.PdfPages:
parameters.PdfPages.savefig()
if __name__ == "__main__":
import doctest
doctest.testmod()