from __future__ import annotations
import yaml
import shutil
import pickle
import json
from typing import Optional, Literal, Tuple
from pprint import pformat
from copy import deepcopy
from pathlib import Path
import h5py
import numpy as np
from ..sl_model import ParameterManager
__all__ = [
"create_config",
"load_preprocess_config",
"load_config",
"load_default_config",
"save_config",
"preprocess_config",
"append_exclude_info",
"Config",
]
def create_config(dir="./"):
template_dir = Path(__file__).resolve().parent/Path("templates")
target_dir = Path(dir)
target_dir.mkdir(parents=True, exist_ok=True)
lines = open(template_dir/"config.yml").readlines()
open(target_dir/"config.yml", "w").writelines(lines)
for _, fname in iter_config_names():
shutil.copy(template_dir/fname, target_dir)
def load_preprocess_config(dirname):
config = load_config(dirname)
preprocess_config(config)
return config
[docs]
def load_config(fname: str) -> Config:
"""Load the config.
Args:
fname: Path to the config file. It can be a pickle file stored using
``spectuner.save_config`` or a HDF file obtained from pixel-level
fitting. In addition, if this is a directory, it will load
the config defined by the YAML files.
Returns:
Loaded ``Config`` instance.
"""
fname = Path(fname)
if fname.is_file():
if h5py.is_hdf5(fname):
config = json.loads(h5py.File(fname).attrs["config"])
else:
config = pickle.load(open(fname, "rb"))
elif fname.is_dir():
config = yaml.safe_load(open(fname/"config.yml"))
for key, name in iter_config_names():
config[key] = yaml.safe_load(open(fname/name))
else:
raise ValueError(f"Unknown path: {fname}")
return Config(**config)
[docs]
def load_default_config() -> Config:
"""Load the default config.
Returns:
Default config.
"""
return load_config(Path(__file__).parent/"templates")
[docs]
def save_config(config: Config, fname: str):
"""Save the config to a pickle file.
Args:
config: Config to save.
fname: Saving name.
"""
if isinstance(config, Config):
pickle.dump(dict(config), open(fname, "wb"))
else:
raise ValueError(f"Unknown input: {config}")
def preprocess_config(config):
"""This function does the following (in-place):
1. Load ``spec`` in ``obs_info`` if applicable.
2. Load ``freqs_exclude`` in ``peak_manager`` if applicable.
"""
if config["obs_info"] is not None:
for item in config["obs_info"]:
if isinstance(item["spec"], str) or isinstance(item["spec"], Path):
item["spec"] = np.loadtxt(item["spec"])
freqs_exclude_in = config["peak_manager"]["freqs_exclude"]
if isinstance(freqs_exclude_in, str) or isinstance(freqs_exclude_in, Path):
config["peak_manager"]["freqs_exclude"] = np.loadtxt(freqs_exclude_in)
def iter_config_names():
keys = ["species", "modify"]
file_names = [
"species.yml",
"modify.yml"
]
return zip(keys, file_names)
def append_exclude_info(config, freqs_exclude, exclude_list):
config = deepcopy(config)
if config["peak_manager"]["freqs_exclude"] is None:
config["peak_manager"]["freqs_exclude"] = np.zeros(0)
config["peak_manager"]["freqs_exclude"] \
= np.append(config["peak_manager"]["freqs_exclude"], freqs_exclude)
if not config["prev"]["exclude_identified"]:
return config
if config["species"]["exclude_list"] is None:
config["species"]["exclude_list"] = []
config["species"]["exclude_list"].extend(exclude_list)
return config
[docs]
class Config(dict):
"""A subclass of dict with user-friendly methods to update the config."""
def __repr__(self) -> str:
return pformat(dict(self), sort_dicts=False)
[docs]
def append_spectral_window(self,
spec: np.ndarray,
beam_info: float | tuple,
noise: float,
T_bg: float=0.,
need_cmb: bool=True):
"""Add a spectral window to ``obs_info``.
Args:
spec: The observed spectrum given by a 2D array, with the first
column being the frequency in MHz and the second column
being the intensity in K.
beam_info: For single disk telescopes, this should
be a float indicating the telescope diameter in meter. For
interferometers, this should be (``BMAJ``, ``BMIN``) indicating
the beam size in degree.
noise: RMS noise in K.
T_bg: Background temperature in K.
need_cmb: If ``true``, additionally add 2.726 K to the background
temperature.
"""
if self["obs_info"] is None:
self["obs_info"] = []
item = {
"spec": spec,
"beam_info": beam_info,
"T_bg": T_bg,
"need_cmb": need_cmb,
"noise": noise,
}
if len(self["obs_info"]) == 0:
self["obs_info"].append(item)
return
spec_prev = self["obs_info"][-1]["spec"]
freq_max_prev = np.max(spec_prev[:, 0])
freq_min = np.min(item["spec"][:, 0])
if freq_max_prev >= freq_min:
raise ValueError("Spectral windows should be non-overlapping and in"
" asceding of frequency.")
self["obs_info"].append(item)
[docs]
def append_spectral_window_simple(self,
freq: np.ndarray,
beam_info: float | tuple,
T_bg: float=0.,
need_cmb: bool=True):
"""Add a spectral window to ``obs_info``.
This may be used for generating model spectra. For any fitting-related
tasks, please use ``append_spectral_window`` instead.
Args:
freq: Frequency grid in MHz.
beam_info: For single disk telescopes, this should
be a float indicating the telescope diameter in meter. For
interferometers, this should be (``BMAJ``, ``BMIN``) indicating
the beam size in degree.
T_bg: Background temperature in K.
need_cmb: If ``true``, additionally add 2.726 K to the background
temperature.
"""
spec = np.vstack([freq, np.zeros_like(freq)]).T
self.append_spectral_window(spec, beam_info, 0., T_bg, need_cmb)
[docs]
def set_fname_db(self, fname_db: str):
"""Set the path to the spectroscopic database.
Args:
fname_db: Path to the spectroscopic database.
"""
# Derive absolute path
fname_db_ = str(Path(fname_db).resolve())
self["sl_model"]["fname_db"] = fname_db_
[docs]
def set_n_process(self, n_process: int):
"""Set the number of processes for the multiprocessing pool.
Args:
n_process: Number of processes.
"""
self["n_process"] = n_process
def set_peak_manager(self,
noise_factor: float=4.,
rel_height: float=0.25,
freqs_exclude: Optional[np.ndarray]=None):
config_peak_mgr = self["peak_manager"]
config_peak_mgr["noise_factor"] = noise_factor
config_peak_mgr["rel_height"] = rel_height
config_peak_mgr["freqs_exclude"] = freqs_exclude
[docs]
def set_param_info(self,
param_name: Literal["theta", "T_ex", "N_tot", "delta_v", "v_offset"],
is_log: bool,
bound: Optional[Tuple[float, float]]=None,
is_shared: bool=False,
special: Optional[str]=None):
r"""Update the settings of a parameter.
Args:
param_name: Parameter name.
is_log: Whether to use log-scale during fitting for this parameter.
bound: Lower and upper limits used by optimizers for fitting. If
``is_log=True``, the limits should be in log-scale. For
example, if ``is_log=True``, ``bound=(12, 20)`` means that
the parameter is between :math:`10^{12}` and :math:`10^{20}`.
The unit of the limits follows:
- theta: arcsec
- T_ex: K
- N_tot: cm^-2
- delta_v: km/s
- v_offset: km/s
is_shared: Whether the parameter is shared in joint fitting of
different states and isotopologues.
special: Special parametrization. Now, this can only be used for
``theta``. If set ``special="eta"``, ``theta`` should be
treated as the filling factor :math:`\eta`, with
:math:`\eta = \theta^2/(\theta^2 + \theta_{\rm maj} \theta_{\rm min})`.
"""
if param_name not in ParameterManager.param_names:
raise ValueError("param_name should be one of {}.".format(
ParameterManager.param_names))
item = {
"is_shared": is_shared,
"is_log": is_log,
"bound": bound,
}
if special is not None:
item["special"] = special
self["param_info"][param_name] = item
[docs]
def set_optimizer(self, method: str, **kwargs):
"""Set the optimizer for the fitting.
Args:
method (str): Name of the optimizer.
- Use 'pso' (particle swarm optimization) for line
identification.
- Use 'slsqp' (sequential least squares programming implemented
in ``scipy.minimize``), for pixel-by-pixel fitting with a
nerual network.
**kwargs: Additional arguments for the optimizer.
- n_trial (int): Number of trials for 'pso'. The optimizer will
be run for n_trial times, and the best fit will be selected
among all trials. Defaults to 1.
- n_swarm (int): Number of particles for 'pso'. Defaults to 28.
- n_draw (int): Number of samples drawed by the neural network.
This only works for local optimizers such as 'slsqp'. The
code will compute the fitness for each sample and select the
best one as the initial guess. Defaults to 50.
"""
config_opt = self["optimizer"]
config_opt["method"] = method
config_opt.update(kwargs)
[docs]
def set_ident_species(self,
species: Optional[list],
collect_iso: bool=True,
combine_iso: bool=False,
combine_state: bool=False,
version_mode: Literal["default", "all"]="default",
include_hyper: bool=False,
exclude_list: Optional[list]=None,
rename_dict: Optional[dict]=None):
"""Set species for line identification.
Args:
species: List of species to include. If ``None``, inlcude all
possible species in the given frequency ranges.
collect_iso: If ``True``, collect isotopologues of molecules in
``species``.
combine_iso: If ``True``, combine isotopologues and fitting them
jointly.
combine_state: If ``True``, combine states and fitting them jointly.
version_mode: If set to ``'all'``, include all versions of the species
in the indiviudal fitting phase. Then, during the combining phase,
the best fit among the versions is used. Both ``combine_iso`` and
``combine_state`` must be ``False`` for this option to work.
Defaults to ``'default'``.
include_hyper: If ``True``, include hyperfine states.
exclude_list: List of species to exclude.
rename_dict: A dict to rename species.
"""
config_species = self["species"]
config_species["species"] = species
config_species["collect_iso"] = collect_iso
config_species["combine_iso"] = combine_iso
config_species["combine_state"] = combine_state
config_species["version_mode"] = version_mode
config_species["include_hyper"] = include_hyper
config_species["exclude_list"] = exclude_list
config_species["rename_dict"] = rename_dict
[docs]
def set_modificaiton_lists(self,
exclude_id_list: Optional[list]=None,
exclude_name_list: Optional[list]=None,
include_id_list: Optional[list]=None):
"""Set modification lists.
Args:
exclude_id_list: List of IDs to be removed from the combined result.
exclude_name_list: List of molecule names to be removed from the
combined result.
include_id_list: List of IDs to be added to the combined result.
"""
if exclude_id_list is None:
exclude_id_list = []
if exclude_name_list is None:
exclude_name_list = []
if include_id_list is None:
include_id_list = []
config_modify = self["modify"]
config_modify["exclude_id_list"] = exclude_id_list
config_modify["exclude_name_list"] = exclude_name_list
config_modify["include_id_list"] = include_id_list
[docs]
def set_previous_result(self, fname: str, exclude_identified: bool=True):
"""Set previous identification result.
Args:
fname: Path to the previous identification result.
exclude_identified: Whether to exclude the identified species in the
previous result.
"""
config_prev = self["prev"]
config_prev["fname"] = fname
config_prev["exclude_identified"] = exclude_identified
[docs]
def set_pixel_by_pixel_fitting(self,
species: list,
loss_fn: Literal["pm", "chi2"]="pm",
need_spectra: bool=True):
"""Set pixel-by-pixel fitting.
Args:
species: List of species to fit. The species will be fit jointly.
loss_fn: Loss function for fitting.
- ``"pm"``: Peak matching.
- ``"chi2"``: Chi-square.
need_spectra: Whether to save the best-fitting model spectrum.
"""
self["cube"]["species"] = species
self["cube"]["loss_fn"] = loss_fn
self["cube"]["need_spectra"] = need_spectra
[docs]
def set_inference_model(self,
ckpt: str,
device: str="cuda:0",
batch_size: int=64,
num_workers: int=2):
"""Set AI model related parameters.
Args:
ckpt: Path to the checkpoint file.
device: Device to use. Set ``device="cpu"`` if no GPU is available.
Defaults to ``"cuda:0"``.
batch_size: Batch size of inference. Reduce this number if GPU
memory is not enough. Defaults to 64.
num_workers: Number of workers for the data loader.
"""
config_inf = self["inference"]
config_inf["ckpt"] = ckpt
config_inf["device"] = device
config_inf["batch_size"] = batch_size
config_inf["num_workers"] = num_workers