Source code for Starfish.grid_tools.utils

import os
import logging
import tqdm
from urllib.request import urlretrieve, URLError
import multiprocessing as mp
import itertools

import numpy as np


log = logging.getLogger(__name__)


[docs]def download_PHOENIX_models(path, ranges=None, parameters=None): """ Download the PHOENIX grid models from the Goettingen servers. This will skip over any ill-defined files or any files that already exist on disk in the given folder. Parameters ---------- path : str or path-like The base directory to save the files in. ranges : iterable of (min, max), optional Each entry in ranges should be (min, max) for the associated parameter, in the order [Teff, logg, Z, (Alpha)]. Cannot be used with :attr:`parameters`. Default is None parameters : iterable of iterables of length 3 or length 4, optional The parameters to download. Should be a list of parameters where parameters can either be [Teff, logg, Z] or [Teff, logg, Z, Alpha]. All values should be floats or integers and not string. If no value provided, will download all models. Default is None Raises ------ ValueError If both ``parameters`` and ``ranges`` are specified Warning ------- This will create any directories if they do not exist Warning ------- Please use this responsibly to avoid over-saturating the connection to the Gottingen servers. Examples -------- .. code-block:: python from Starfish.grid_tools import download_PHOENIX_models ranges = [ [5000, 5200] # T [4.0, 5.0] # logg [0, 0] # Z ] download_PHOENIX_models(path='models', ranges=ranges) or equivalently using ``parameters`` syntax .. code-block:: python from itertools import product from Starfish.grid_tools import download_PHOENIX_models T = [6000, 6100, 6200] logg = [4.0, 4.5, 5.0] Z = [0] params = product(T, logg, Z) download_PHOENIX_models(path='models', parameters=params) """ if parameters is not None and ranges is not None: raise ValueError("Cannot specify both 'parameters' and 'ranges'") wave_url = "http://phoenix.astro.physik.uni-goettingen.de/data/HiResFITS/WAVE_PHOENIX-ACES-AGSS-COND-2011.fits" wave_file = os.path.join(path, "WAVE_PHOENIX-ACES-AGSS-COND-2011.fits") flux_file_formatter = ( "http://phoenix.astro.physik.uni-goettingen.de/data/HiResFITS/PHOENIX-ACES-AGSS-COND-2011" "/Z{2:s}{3:s}/lte{0:05.0f}-{1:03.2f}{2:s}{3:s}.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits" ) output_formatter = "Z{2:s}{3:s}/lte{0:05.0f}-{1:03.2f}{2:s}{3:s}.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits" os.makedirs(path, exist_ok=True) # Download step log.info("Starting Download of PHOENIX ACES models to {}".format(path)) if not os.path.exists(wave_file): log.info("Downloading wavelength file") urlretrieve(wave_url, wave_file) # Have to wait until wavelength file is downloaded before importing from .interfaces import PHOENIXGridInterface, PHOENIXGridInterfaceNoAlpha # Kind of messy, sorry if parameters is None: if ranges is not None: if len(ranges) == 3: grid = PHOENIXGridInterfaceNoAlpha(path) elif len(ranges) == 4: grid = PHOENIXGridInterface(path) parameters = list(itertools.product(*grid.points)) elif len(parameters[0]) == 3: grid = PHOENIXGridInterfaceNoAlpha(path) elif len(parameters[0]) == 4: grid = PHOENIXGridInterface(path) if ranges is not None: _ranges = np.asarray(ranges) min_params = _ranges.T[0] max_params = _ranges.T[1] else: min_params = np.tile(-np.inf, len(parameters[0])) max_params = np.tile(np.inf, len(parameters[0])) # I hate to iterate here, but this way the progress bar doesn't show something like # 7000 parameters and skips thousands at a time params = [] for p in parameters: if np.all(p >= min_params) and np.all(p <= max_params): params.append(p) pbar = tqdm.tqdm(params) for p in pbar: # Skip irregularities from grid try: grid.check_params(p) except ValueError: continue tmp_p = [p[0], p[1]] # Create the Z string. Have to do this because PHOENIX models use - sign for 0.0 Zstr = "-0.0" if p[2] == 0 else "{:+.1f}".format(p[2]) tmp_p.append(Zstr) # Create the Alpha string, which is nothing if alpha is 0 or unspecified if len(p) == 4: Astr = "" if p[3] == 0 else ".Alpha={:+.2f}".format(p[3]) else: Astr = "" tmp_p.append(Astr) url = flux_file_formatter.format(*tmp_p) pbar.set_description(url.split("/")[-1]) output_file = os.path.join(path, output_formatter.format(*tmp_p)) if not os.path.exists(output_file): os.makedirs(os.path.dirname(output_file), exist_ok=True) try: urlretrieve(url, output_file) except URLError: log.warning( f"Parameters {p} not found. Double check they are on PHOENIX grid" )
[docs]def chunk_list(mylist, n=mp.cpu_count()): """ Divide a lengthy parameter list into chunks for parallel processing and backfill if necessary. :param mylist: a lengthy list of parameter combinations :type mylist: 1-D list :param n: number of chunks to divide list into. Default is ``mp.cpu_count()`` :type n: integer :returns: **chunks** (*2-D list* of shape (n, -1)) a list of chunked parameter lists. """ if isinstance(mylist, np.ndarray): mylist = list(mylist) length = len(mylist) size = int(length / n) # fill with evenly divisible chunks = [mylist[0 + size * i : size * (i + 1)] for i in range(n)] leftover = length - size * n edge = size * n for i in range(leftover): # backfill each with the last item chunks[i % n].append(mylist[edge + i]) return chunks
[docs]def determine_chunk_log(wl, wl_min, wl_max): """ Take in a wavelength array and then, given two minimum bounds, determine the boolean indices that will allow us to truncate this grid to near the requested bounds while forcing the wl length to be a power of 2. :param wl: wavelength array :type wl: np.ndarray :param wl_min: minimum required wavelength :type wl_min: float :param wl_max: maximum required wavelength :type wl_max: float :returns: numpy.ndarray boolean array """ # wl_min and wl_max must of course be within the bounds of wl assert wl_min >= np.min(wl) and wl_max <= np.max( wl ), "determine_chunk_log: wl_min {:.2f} and wl_max {:.2f} are not within the bounds of the grid {:.2f} to {:.2f}.".format( wl_min, wl_max, np.min(wl), np.max(wl) ) # Find the smallest length synthetic spectrum that is a power of 2 in length # and longer than the number of points contained between wl_min and wl_max len_wl = len(wl) npoints = np.sum((wl >= wl_min) & (wl <= wl_max)) chunk = len_wl inds = (0, chunk) # This loop will exit with chunk being the smallest power of 2 that is # larger than npoints while chunk > npoints: if chunk / 2 > npoints: chunk = chunk // 2 else: break assert type(chunk) == np.int, "Chunk is not an integer!. Chunk is {}".format(chunk) if chunk < len_wl: # Now that we have determined the length of the chunk of the synthetic # spectrum, determine indices that straddle the data spectrum. # Find the index that corresponds to the wl at the center of the data spectrum center_wl = (wl_min + wl_max) / 2.0 center_ind = (np.abs(wl - center_wl)).argmin() # Take a chunk that straddles either side. inds = (center_ind - chunk // 2, center_ind + chunk // 2) ind = (np.arange(len_wl) >= inds[0]) & (np.arange(len_wl) < inds[1]) else: print("keeping grid as is") ind = np.ones_like(wl, dtype="bool") assert (min(wl[ind]) <= wl_min) and (max(wl[ind]) >= wl_max), ( "Model" "Interpolator chunking ({:.2f}, {:.2f}) didn't encapsulate full" " wl range ({:.2f}, {:.2f}).".format(min(wl[ind]), max(wl[ind]), wl_min, wl_max) ) return ind
[docs]def vacuum_to_air(wl): """ Converts vacuum wavelengths to air wavelengths using the Ciddor 1996 formula. :param wl: input vacuum wavelengths :type wl: numpy.ndarray :returns: numpy.ndarray .. note:: CA Prieto recommends this as more accurate than the IAU standard. """ wl = np.asarray(wl) sigma = (1e4 / wl) ** 2 f = 1.0 + 0.05792105 / (238.0185 - sigma) + 0.00167917 / (57.362 - sigma) return wl / f
[docs]def calculate_n(wl): """ Calculate *n*, the refractive index of light at a given wavelength. :param wl: input wavelength (in vacuum) :type wl: np.array :return: numpy.ndarray """ wl = np.asarray(wl) sigma = (1e4 / wl) ** 2 f = 1.0 + 0.05792105 / (238.0185 - sigma) + 0.00167917 / (57.362 - sigma) new_wl = wl / f n = wl / new_wl print(n)
def vacuum_to_air_SLOAN(wl): """ Converts vacuum wavelengths to air wavelengths using the outdated SLOAN definition. From the SLOAN website: AIR = VAC / (1.0 + 2.735182E-4 + 131.4182 / VAC^2 + 2.76249E8 / VAC^4) :param wl: The input wavelengths to convert """ wl = np.asarray(wl) air = wl / (1.0 + 2.735182e-4 + 131.4182 / wl**2 + 2.76249e8 / wl**4) return air
[docs]def air_to_vacuum(wl): """ Convert air wavelengths to vacuum wavelengths. :param wl: input air wavelegths :type wl: np.array :return: numpy.ndarray .. warning:: It is generally not recommended to do this, as the function is imprecise. """ wl = np.asarray(wl) sigma = 1e4 / wl vac = wl + wl * ( 6.4328e-5 + 2.94981e-2 / (146 - sigma**2) + 2.5540e-4 / (41 - sigma**2) ) return vac
@np.vectorize def idl_float(idl_num: str) -> float: """ Convert an IDL string number in scientific notation to a float Parameters ---------- idl_num : str Input str Returns ------- float Output float Examples -------- ```python >>> idl_float("1.6D4") 1.6e4 ``` """ idl_str = idl_num.lower() return np.float(idl_str.replace("d", "e"))