Source code for astro_prost.helpers

import os
import time
import sys
if sys.version_info >= (3, 9):
    import importlib.resources as pkg_resources
else:
    import importlib_resources as pkg_resources
import pickle
import requests
import matplotlib.pyplot as plt
import numpy as np
#monkey-patch for python 3.8
if not hasattr(np, 'int'):
    np.int = int
from astropy import units as u
from astropy.coordinates import Angle, SkyCoord, match_coordinates_sky
from astropy.io import ascii
from dl import queryClient as qC
from scipy import stats as st
from scipy.integrate import quad
from scipy.stats import halfnorm, norm, gaussian_kde, rv_continuous
from io import StringIO
import pandas as pd
import logging
import re
from io import BytesIO
from colorama import Fore, Style, init, deinit
try:
    from lsst.rsp import get_tap_service
except:
    pass #if not in the RSP, ignore LSST

# Precision & default values
[docs] PROB_FLOOR = np.finfo(float).eps
[docs] DUMMY_FILL_VAL = -999
# Conversion constants
[docs] RAD_TO_ARCSEC = 206265
[docs] MAX_RAD_DEG = 0.14 # 500'' search radius max
# Redshift & galaxy properties
[docs] REDSHIFT_FLOOR = 0.001 # minimum redshift of z=0.001
[docs] SIZE_FLOOR = 0.25 # 1 pixel, for Pan-STARRS
[docs] ABSMAG_FLOOR = -10 # guess at a (generous) minimum absolute magnitude for a galaxy?
[docs] STARMAG_DIFF = 0.05 # psfmag - kronmag threshold for star/galaxy cut
OFFSET_FLOOR = SHAPE_FLOOR = 1.e-10 # Uncertainty floors & ceilings SIGMA_ABSMAG_FLOOR = SIGMA_SIZE_FLOOR = SIGMA_REDSHIFT_FLOOR = 0.05 # 5% minimum uncertainty SIGMA_ABSMAG_CEIL = SIGMA_SIZE_CEIL = SIGMA_REDSHIFT_CEIL = 0.5 # 50% maximum uncertainty # Default settings for catalogs -- these are meant to be conservative!
[docs] DEFAULT_LIMITING_MAG = {"panstarrs": 22, "decals": 26, "glade": 17, "skymapper": 22, "rubin": 27}
[docs] CATALOG_SHRED_SETTINGS = { "panstarrs": True, # Only enable for Pan-STARRS and skymapper by default "decals": False, "glade": False, "skymapper": True, }
# Default paths
[docs] DEFAULT_DUST_PATH = "."
# Data Structure Definitions
[docs] PROP_DTYPES = [ ("objID", "U30"), ("objID_info", "U20"), ("name", "U20"), ("ra", float), ("dec", float), ("redshift_samples", object), ("redshift_mean", float), ("redshift_std", float), ("redshift_posterior", float), ("redshift_info", "U10"), ("offset_samples", object), ("offset_mean", float), ("offset_std", float), ("offset_posterior", float), ("offset_info", str), ("frac_offset_mean", float), ("frac_offset_std", float), ("absmag_samples", object), ("absmag_mean", float), ("absmag_std", float), ("absmag_posterior", float), ("absmag_info", "U10"), ("dlr_mean", float), ("dlr_std", float), ("dlr_samples", object), ("size_mean", float), ("size_std", float), ("total_posterior", float), ]
# Define a TRACE level below DEBUG -- for lengthy printouts
[docs] TRACE_LEVEL = 5
logging.addLevelName(TRACE_LEVEL, "TRACE") # Boilerplate trace method for the logger class
[docs] def trace(self, message, *args, **kws): """Logs a message at the TRACE level, which is lower than DEBUG (verbose = 3). Parameters ---------- message : str The message to be logged. *args : tuple Additional positional arguments to be passed to the logging call. **kws : dict Additional keyword arguments to be passed to the logging call. Returns ------- None """ if self.isEnabledFor(TRACE_LEVEL): self._log(TRACE_LEVEL, message, args, **kws)
logging.Logger.trace = trace
[docs] def sanitize_input(cat_name): """Cleans up catalog names for use by Prost by removing spaces, underscores, and hyphens, and converting to lowercase. Parameters ---------- cat_name : str The catalog name to be sanitized. Supported catalog names include 'decals', 'glade', 'panstarrs', and 'skymapper'. Returns ------- str The sanitized catalog name. """ cat_name_clean = re.sub(r"[_\-\s]", "", cat_name) return cat_name_clean.lower()
[docs] class ColorlessFormatter(logging.Formatter): """Formatter that strips ANSI color codes from log messages."""
[docs] def format(self, record): message = super().format(record) ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') return ansi_escape.sub('', message)
[docs] def add_console_handler(logger, formatter): """Attach a console (stream) handler to the logger. Parameters ---------- logger : logging.Logger The logger to which the console handler will be added. formatter : logging.Formatter The formatter used to format log messages. Returns ------- None """ console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(console_handler)
[docs] def add_file_handler(logger, log_file, formatter): """Attach a file handler to the logger. Parameters ---------- logger : logging.Logger The logger to which the file handler will be added. log_file : str The file path where log messages will be written. formatter : logging.Formatter The formatter used to format log messages. Returns ------- None """ log_path = os.path.dirname(log_file) os.makedirs(log_path, exist_ok=True) file_handler = logging.FileHandler(log_file, mode="a") file_handler.setFormatter(formatter) logger.addHandler(file_handler)
[docs] def setup_logger(log_file=None, verbose=1, is_main=False): """ Sets up a logger that logs messages to both the console and a file (if specified). Parameters ---------- log_file : str, optional Path to the log file. Returns ------- logging.Logger Configured logger instance. """ logger = logging.getLogger("Prost_logger") # If logger already exists and has handlers, return it (prevents duplicates) if logger.hasHandlers(): if log_file is None: return logger elif any(isinstance(h, logging.FileHandler) and h.baseFilename == log_file for h in logger.handlers): return logger log_levels = {0: logging.WARNING, 1: logging.INFO, 2: logging.DEBUG, 3: TRACE_LEVEL} logger.setLevel(log_levels.get(verbose)) console_formatter = logging.Formatter(f"{Style.DIM}%(asctime)s{Style.RESET_ALL}- %(levelname)s - %(message)s") file_formatter = ColorlessFormatter("%(asctime)s - %(levelname)s - %(message)s") # Main process: Set up log file and store its path log_file = log_file if is_main else os.environ.get('LOG_PATH_ENV') if log_file: add_file_handler(logger, log_file, file_formatter) if is_main: # Store log path for workers os.environ['LOG_PATH_ENV'] = log_file else: add_console_handler(logger, console_formatter) return logger
[docs] def flux_to_mag(flux, zeropoint): """Convert flux to magnitude, handling nonpositive values.""" flux = np.maximum(flux, 1e-10) mag = -2.5 * np.log10(flux) + zeropoint return mag
[docs] def build_skymapper_url(ra, dec, search_radius, release, catalog): url = ( f"http://skymapper.anu.edu.au/sm-cone/public/query" f"?CATALOG={release}.{catalog}&RA={ra:.5f}&DEC={dec:.5f}&SR={search_radius:.5f}" "&RESPONSEFORMAT=CSV&VERB=3" ) return url
[docs] def fetch_skymapper_sources(search_pos, search_rad, cat_cols, calc_host_props, logger=None, release='dr4'): """Queries the skymapper catalogs (https://skymapper.anu.edu.au/about-skymapper/). Parameters ---------- search_pos : astropy SkyCoord object Search position (transient location in this code). search_rad : astropy Angle object Size of the search radius. cat_cols : boolean If True, concatenates all columns from the catalog to the final output. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). release : str Data release of the catalog; can be 'dr1', 'dr2', or 'dr4'. Returns ------- pandas.DataFrame Dataframe containing the retrieved sources. """ if release not in ["dr1", "dr2", "dr4"]: raise ValueError(f"Invalid Skymapper version '{release}'. Please choose dr1 - dr4.") if logger is None: logger = setup_logger() if search_rad is None: search_rad = Angle(60 * u.arcsec) rad_deg = search_rad.to(u.deg).value if rad_deg > MAX_RAD_DEG: logger.warning("Search radius at this distance >500''! Reducing to ensure a fast skymapper query.") rad_deg = MAX_RAD_DEG master_url = build_skymapper_url(search_pos.ra.deg, search_pos.dec.deg, rad_deg, release, "master") master_df = pd.read_csv(BytesIO(requests.get(master_url).content)) phot_url = build_skymapper_url(search_pos.ra.deg, search_pos.dec.deg, rad_deg, release, "photometry") phot_df = pd.read_csv(BytesIO(requests.get(phot_url).content)) phot_df = phot_df[phot_df['e_a'].notna()] # Find rows with smallest uncertainty for each object/filter combo idx_min = phot_df.groupby(['object_id', 'filter'])['e_a'].idxmin() phot_df_min = phot_df.loc[idx_min] # Pivot filter-specific columns with prefix pivot_cols = [col for col in phot_df_min.columns if col not in ['object_id', 'filter']] phot_df_pivot = phot_df_min.pivot(index='object_id', columns='filter', values=pivot_cols) # Flatten MultiIndex columns: ('colname', 'g') → 'g_colname' phot_df_pivot.columns = [f"{filt}_{col}" for col, filt in phot_df_pivot.columns] phot_df_pivot.reset_index(inplace=True) # Merge with master catalog phot_df_pivot['object_id'] = phot_df_pivot['object_id'].fillna(0).astype(np.int64) candidate_hosts = phot_df_pivot.merge(master_df, on='object_id', how='outer') candidate_hosts.rename(columns={'object_id':'objID', 'raj2000':'ra', 'dej2000':'dec'}, inplace=True) # very basic cut to get rid of bad candidates - require ngood >= 1 in g OR r band # Note: SkyMapper doesn't have ngood columns, so skip this filtering for SkyMapper if 'g_ngood' in candidate_hosts.columns and 'r_ngood' in candidate_hosts.columns: g_good = (candidate_hosts['g_ngood'].notna()) & (candidate_hosts['g_ngood'] >= 1) r_good = (candidate_hosts['r_ngood'].notna()) & (candidate_hosts['r_ngood'] >= 1) candidate_hosts = candidate_hosts[g_good | r_good] # quick cut to remove stars candidate_hosts = candidate_hosts[np.abs(candidate_hosts['i_psf'] - candidate_hosts['i_petro']) > STARMAG_DIFF] candidate_hosts.replace(DUMMY_FILL_VAL, np.nan, inplace=True) candidate_hosts.reset_index(inplace=True, drop=True) if len(candidate_hosts) < 1: return None return candidate_hosts
[docs] def fetch_rubin_sources(search_pos, search_rad, cat_cols, calc_host_props, logger=None, release="dp1"): """Queries the rubin obs. lsst (placeholder now using dp0.2 and dp1 data, from tutorial 02b on the rsp!). Parameters ---------- search_pos : astropy SkyCoord object Search position (transient location in this code). search_rad : astropy Angle object Size of the search radius. cat_cols : boolean If True, concatenates all columns from the catalog to the final output. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). release : str Catalog to query (for now only dp0.2 and dp1!) Returns ------- pandas.DataFrame Dataframe containing the retrieved sources (after some basic quality cuts). """ service = get_tap_service("tap") assert service is not None str_center_coords = str(search_pos.ra.deg) + ", " + str(search_pos.dec.deg) str_radius = str(search_rad.deg) if release == "dp0.2": catalog_name = "dp02_dc2_catalogs.Object" else: catalog_name = "dp1.Object" query = "SELECT * " +\ f"FROM {catalog_name} " +\ "WHERE CONTAINS(POINT('ICRS', coord_ra, coord_dec), "\ "CIRCLE('ICRS', " + str_center_coords + ", " + str_radius + ")) = 1" #"AND detect_isPrimary = 1" job = service.submit_job(query) job.run() job.wait(phases=['COMPLETED', 'ERROR']) candidate_hosts = job.fetch_result().to_table().to_pandas() job.delete() del query if len(candidate_hosts) < 1: return None candidate_hosts.replace(DUMMY_FILL_VAL, np.nan, inplace=True) candidate_hosts.dropna(subset=[f'{flt}_ixx' for flt in 'gri'], inplace=True) candidate_hosts.dropna(subset=[f'{flt}_kronRad' for flt in 'gri'], inplace=True) candidate_hosts.reset_index(inplace=True, drop=True) for prop in ['_ixx', '_iyy', '_ixy', '_kronRad']: prop_list = [f"{flt}{prop}" for flt in 'gri'] candidate_hosts[prop] = candidate_hosts[prop_list].median(axis=1) candidate_hosts.rename(columns={'objectId':'objID', 'coord_ra':'ra', 'coord_dec':'dec'}, inplace=True) return candidate_hosts
[docs] def fetch_panstarrs_sources(search_pos, search_rad, cat_cols, calc_host_props, logger=None, release='dr2'): """Queries the panstarrs catalogs (https://catalogs.mast.stsci.edu/panstarrs/). Parameters ---------- search_pos : astropy SkyCoord object Search position (transient location in this code). search_rad : astropy Angle object Size of the search radius. cat_cols : boolean If True, concatenates all columns from the catalog to the final output. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). release : str Data release of the catalog; can be 'dr1' or 'dr2'. Returns ------- pandas.DataFrame Dataframe containing the retrieved sources (after some basic quality cuts). """ if release not in ["dr1", "dr2"]: raise ValueError(f"Invalid Pan-STARRS version '{release}'. Please choose 'dr1' or 'dr2'.") elif (release == 'dr1') and (('redshift' in calc_host_props) or ('absmag' in calc_host_props)): raise ValueError("Redshift estimation with Pan-STARRS data can only be done with release 'dr2'.") if logger is None: logger = setup_logger() if search_rad is None: search_rad = Angle(60 * u.arcsec) rad_deg = search_rad.to(u.deg).value if rad_deg > MAX_RAD_DEG: logger.warning("Search radius at this distance >500''! Reducing to ensure a fast pan-starrs query.") rad_deg = MAX_RAD_DEG # load table metadata to avoid a query pkg_data_file = pkg_resources.files('astro_prost') / 'data' / 'panstarrs_metadata.pkl' with pkg_resources.as_file(pkg_data_file) as metadata_path: with open(metadata_path, 'rb') as f: metadata = pickle.load(f) if not cat_cols: source_cols = [ "objID", "raMean", "decMean", "iPSFMag", "gmomentXX", "rmomentXX", "imomentXX", "zmomentXX", "ymomentXX", "gmomentYY", "rmomentYY", "imomentYY", "zmomentYY", "ymomentYY", "gmomentXY", "rmomentXY", "imomentXY", "zmomentXY", "ymomentXY", "nDetections", "primaryDetection", "gKronRad", "rKronRad", "iKronRad", "zKronRad", "yKronRad", "gKronMag", "rKronMag", "iKronMag", "zKronMag", "yKronMag", "gKronMagErr", "rKronMagErr", "iKronMagErr", "zKronMagErr", "yKronMagErr", ] result = panstarrs_cone(metadata, search_pos.ra.deg, search_pos.dec.deg, rad_deg, columns=source_cols, release=release) else: result = panstarrs_cone(metadata, search_pos.ra.deg, search_pos.dec.deg, rad_deg, release=release) if not result: logging.debug(f"Found no pan-starrs {release} sources.") return None candidate_hosts = pd.read_csv(StringIO(result)) if len(candidate_hosts) < 1: return None candidate_hosts.replace(DUMMY_FILL_VAL, np.nan, inplace=True) candidate_hosts.reset_index(inplace=True, drop=True) for prop in ['momentXX', 'momentYY', 'momentXY', 'KronRad', 'KronMag', 'KronMagErr']: prop_list = [f"{flt}{prop}" for flt in 'grizy'] candidate_hosts[prop] = candidate_hosts[prop_list].median(axis=1) # some VERY basic filtering to say that it's confidently detected candidate_hosts = candidate_hosts[candidate_hosts["nDetections"] > 2] candidate_hosts = candidate_hosts[candidate_hosts["primaryDetection"] == 1] # first-order cut to get rid of most stars candidate_hosts = candidate_hosts[np.abs(candidate_hosts['iPSFMag'] - candidate_hosts['iKronMag']) > STARMAG_DIFF] drop_cols = ['raMean', 'decMean'] if 'absmag' in calc_host_props: drop_cols.append('KronMag') if 'offset' in calc_host_props: drop_cols.append('KronRad') candidate_hosts.dropna(subset=drop_cols, inplace=True) candidate_hosts.rename(columns={'raMean':'ra', 'decMean':'dec'}, inplace=True) candidate_hosts.reset_index(drop=True, inplace=True) return candidate_hosts
[docs] def fetch_decals_sources(search_pos, search_rad, cat_cols, calc_host_props, release='dr9'): """Queries the decals catalogs (https://www.legacysurvey.org/decamls/). Parameters ---------- search_pos : astropy SkyCoord object Search position (transient location in this code). search_rad : astropy Angle object Size of the search radius. cat_cols : boolean If True, concatenates all columns from the catalog to the final output. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). release : str Data release of the catalog; can be 'dr9' or 'dr10'. Returns ------- pandas.DataFrame Dataframe containing the retrieved sources. """ if release not in ["dr9", "dr10"]: raise ValueError(f"Invalid DECaLS version '{release}'. Please choose 'dr9' or 'dr10'.") if search_rad is None: search_rad = Angle(60 * u.arcsec) rad_deg = search_rad.to(u.deg).value result = qC.query( sql=f"""SELECT t.ls_id, t.shape_r, t.shape_r_ivar, t.shape_e1, t.shape_e1_ivar, t.shape_e2, t.shape_e2_ivar, t.ra, t.type, t.dec, t.dered_mag_r, t.mag_r, t.flux_r, t.flux_ivar_r, t.nobs_g, t.nobs_r, t.nobs_z, t.fitbits, t.ra_ivar, t.dec_ivar, t.dered_flux_r, pz.z_phot_mean, pz.z_phot_median, pz.z_phot_std, pz.z_spec FROM ls_{release}.tractor t INNER JOIN ls_{release}.photo_z pz ON t.ls_id= pz.ls_id WHERE q3c_radial_query(t.ra, t.dec, {search_pos.ra.deg:.5f}, {search_pos.dec.deg:.5f}, {rad_deg}) AND (t.nobs_r > 0) AND (t.dered_flux_r > 0) AND (t.snr_r > 0) AND (t.type != 'PSF') AND nullif(t.dered_mag_r, 'NaN') is not null AND (t.fitbits != 8192) AND ((pz.z_spec > 0) OR (pz.z_phot_mean > 0))""" ) candidate_hosts = pd.read_csv(StringIO(result)) candidate_hosts.rename(columns={'ls_id':'objID'}, inplace=True) if len(candidate_hosts) < 1: return None else: return candidate_hosts
[docs] def calc_shape_props_rubin(candidate_hosts, band='r'): """Wrapper to calculate the shape parameters for rubin lsst galaxies. Parameters ---------- candidate_hosts : pandas.DataFrame Dataset containing sources with shape information. Returns ------- tuple of np.ndarray (temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std), where: - temp_sizes: Semi-major axes (arcsec) - temp_sizes_std: Uncertainty in semi-major axes - a_over_b: axis ratio - a_over_b_std: Uncertainty in axis ratio - phi: Position angles (radians) - phi_std: Uncertainty in position angles """ temp_sizes = candidate_hosts[f"{band}_kronRad"].values # size uncertainty: 10% floor temp_sizes_std = SIGMA_SIZE_FLOOR * temp_sizes temp_sizes_std = np.maximum(temp_sizes_std, SHAPE_FLOOR) temp_sizes_std[temp_sizes_std < (SIGMA_SIZE_FLOOR * temp_sizes)] = SIGMA_SIZE_FLOOR * temp_sizes[temp_sizes_std < (SIGMA_SIZE_FLOOR * temp_sizes)] # Moment-based axis ratio and PA gal_u = candidate_hosts[f"{band}_ixy"].values gal_q = candidate_hosts[f"{band}_ixx"].values - candidate_hosts[f"{band}_iyy"].values phi = 0.5 * np.arctan2(gal_u, gal_q) phi_std = 0.05 * np.abs(phi) phi_std = np.maximum(SHAPE_FLOOR, phi_std) kappa = gal_q**2 + gal_u**2 kappa = np.minimum(kappa, 0.99) a_over_b = (1 + kappa + 2 * np.sqrt(kappa)) / (1 - kappa) a_over_b = np.clip(a_over_b, 0.1, 10) a_over_b_std = SIGMA_SIZE_FLOOR * np.abs(a_over_b) # uncertainty floor return temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std
[docs] def calc_shape_props_panstarrs(candidate_hosts): """Wrapper to calculate the shape parameters for pan-starrs galaxies. Parameters ---------- candidate_hosts : pandas.DataFrame Dataset containing sources with shape information. Returns ------- tuple of np.ndarray (temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std), where: - temp_sizes: Semi-major axes (arcsec) - temp_sizes_std: Uncertainty in semi-major axes - a_over_b: axis ratio - a_over_b_std: Uncertainty in axis ratio - phi: Position angles (radians) - phi_std: Uncertainty in position angles """ temp_sizes = candidate_hosts["KronRad"].values # assume some fiducial shape floor temp_sizes_std = SIGMA_SIZE_FLOOR * candidate_hosts["KronRad"].values temp_sizes_std = np.maximum(temp_sizes_std, SHAPE_FLOOR) # Prevent division by zero temp_sizes_std[temp_sizes_std < (SIGMA_SIZE_FLOOR * temp_sizes)] = SIGMA_SIZE_FLOOR* temp_sizes[temp_sizes_std < (SIGMA_SIZE_FLOOR * temp_sizes)] gal_u = candidate_hosts["momentXY"].values gal_q = candidate_hosts["momentXX"].values - candidate_hosts["momentYY"].values phi = 0.5 * np.arctan2(gal_u, gal_q) phi_std = 0.05 * np.abs(phi) phi_std = np.maximum(SHAPE_FLOOR, phi_std) kappa = gal_q**2 + gal_u**2 kappa = np.minimum(kappa, 0.99) a_over_b = (1 + kappa + 2 * np.sqrt(kappa)) / (1 - kappa) a_over_b = np.clip(a_over_b, 0.1, 10) a_over_b_std = SIGMA_SIZE_FLOOR * np.abs(a_over_b) # uncertainty floor #return result return temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std
[docs] def calc_shape_props_skymapper(candidate_hosts): """Wrapper to calculate the shape parameters for skymapper galaxies. Parameters ---------- candidate_hosts : pandas.DataFrame Dataset containing sources with shape information. Returns ------- tuple of np.ndarray (temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std), where: - temp_sizes: Semi-major axes (arcsec) - temp_sizes_std: Uncertainty in semi-major axes - a_over_b: axis ratio - a_over_b_std: Uncertainty in axis ratio - phi: Position angles (radians) - phi_std: Uncertainty in position angles """ # a and b are in units of pixels a = candidate_hosts["r_a"].values * 0.5 b = candidate_hosts["r_b"].values * 0.5 e_a = candidate_hosts["r_e_a"].values * 0.5 e_b = candidate_hosts["r_e_b"].values * 0.5 PA = candidate_hosts["r_pa"].values e_PA = candidate_hosts["r_e_pa"].values # Handle nan values in shape parameters - set to valid defaults a = np.where(np.isnan(a), SIZE_FLOOR, np.maximum(a, SIZE_FLOOR)) b = np.where(np.isnan(b), SIZE_FLOOR, np.maximum(b, SIZE_FLOOR)) # Handle nan values in error measurements - set to uncertainty floor e_a = np.where(np.isnan(e_a), SIGMA_SIZE_FLOOR * a, np.maximum(e_a, SHAPE_FLOOR)) e_b = np.where(np.isnan(e_b), SIGMA_SIZE_FLOOR * b, np.maximum(e_b, SHAPE_FLOOR)) # Handle nan values in position angle measurements PA = np.where(np.isnan(PA), 0.0, PA) e_PA = np.where(np.isnan(e_PA), SIGMA_SIZE_FLOOR * np.abs(PA), np.maximum(e_PA, SHAPE_FLOOR)) a_over_b = a / b a_over_b = np.clip(a_over_b, 0.1, 10) # combine errors in quadrature - ensure no nan values a_over_b_std = np.sqrt((e_a / b)**2 + (a * e_b / b**2)**2) a_over_b_std = np.where(np.isnan(a_over_b_std), SIGMA_SIZE_FLOOR * a_over_b, a_over_b_std) a_over_b_std = np.maximum(a_over_b_std, SHAPE_FLOOR) # degrees to radius phi = np.radians(PA) phi_std = np.radians(e_PA) phi_std = np.maximum(phi_std, SHAPE_FLOOR) #return result return a, e_a, a_over_b, a_over_b_std, phi, phi_std
[docs] def calc_shape_props_decals(candidate_hosts): """Wrapper to calculate the shape parameters for decals galaxies. Parameters ---------- candidate_hosts : pandas.DataFrame Dataset containing sources with shape information. Returns ------- tuple of np.ndarray (temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std), where: - temp_sizes: Semi-major axes (arcsec) - temp_sizes_std: Uncertainty in semi-major axes - a_over_b: axis ratio - a_over_b_std: Uncertainty in axis ratio - phi: Position angles (radians) - phi_std: Uncertainty in position angles """ temp_sizes = candidate_hosts["shape_r"].values.copy() temp_sizes[temp_sizes < SIZE_FLOOR] = SIZE_FLOOR temp_sizes_ivar = np.maximum(1/(SIGMA_SIZE_FLOOR*temp_sizes)**2, candidate_hosts["shape_r_ivar"].values) temp_sizes_std = np.sqrt(1 / temp_sizes_ivar) temp_e1 = candidate_hosts["shape_e1"].astype(float).values temp_e1_ivar = np.maximum(1/(SIGMA_SIZE_FLOOR*temp_e1)**2, candidate_hosts["shape_e1_ivar"].astype(float).values) temp_e2 = candidate_hosts["shape_e2"].astype(float).values temp_e2_ivar = np.maximum(1/(SIGMA_SIZE_FLOOR*temp_e2)**2, candidate_hosts["shape_e2_ivar"].astype(float).values) temp_e1_std = np.sqrt(1.0 / temp_e1_ivar) temp_e2_std = np.sqrt(1.0 / temp_e2_ivar) mask_e1_floor = (temp_e1_std < SIGMA_SIZE_FLOOR * np.abs(temp_e1)) temp_e1_std[mask_e1_floor] = SIGMA_SIZE_FLOOR * np.abs(temp_e1[mask_e1_floor]) mask_e2_floor = (temp_e2_std < SIGMA_SIZE_FLOOR * np.abs(temp_e2)) temp_e2_std[mask_e2_floor] = SIGMA_SIZE_FLOOR * np.abs(temp_e2[mask_e2_floor]) # 3) Also clamp to a hard floor so that no std is < SHAPE_FLOOR temp_e1_std = np.maximum(temp_e1_std, SHAPE_FLOOR) temp_e2_std = np.maximum(temp_e2_std, SHAPE_FLOOR) # Calculate ellipticity and axis ratio for all samples e = np.sqrt(temp_e1**2 + temp_e2**2) e = np.maximum(e, SHAPE_FLOOR) a_over_b = (1 + e) / (1 - e) # Compute uncertainty in e (sigma_e) e_std = (1 / e) * np.sqrt(temp_e1**2 * temp_e1_std**2 + temp_e2**2 * temp_e2_std**2) # Compute uncertainty in a_over_b (sigma_a_over_b) a_over_b_std = (2 / (1 - e) ** 2) * e_std # Position angle and angle calculations for all samples phi = -np.arctan2(temp_e2, temp_e1) / 2 # now propagate uncertainty from the shape params -- this is a bit messy because # it requires partial derivatives d/de1 and d/de2 of arctan2, but let's try: # d/de2(arctan2) = e1/(e1^2 + e2^2) # d/de1(arctan2) = -e2/(e1^2 + e2^2) # so d/de2(-arctan2/2)= -e1/2*(e1^2 + e2^2) # so d/de1(-arctan2/2)= e2/2*(e1^2 + e2^2) denom = temp_e1**2 + temp_e2**2 denom = np.maximum(denom, SHAPE_FLOOR) partial_phi_e2 = -temp_e1 / (2.0 * denom) # d(phi)/d(e2) partial_phi_e1 = temp_e2 / (2.0 * denom) # d(phi)/d(e1) # Now propagate uncertainties from e1 and e2 phi_std = np.sqrt((partial_phi_e1**2) * (temp_e1_std**2) + (partial_phi_e2**2) * (temp_e2_std**2)) # First clamp phi_std to be at least SIGMA_SIZE_FLOOR * |phi| phi_std = np.maximum(phi_std, SIGMA_SIZE_FLOOR * np.abs(phi)) phi_std = np.maximum(phi_std, SHAPE_FLOOR) return temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std
[docs] def calc_shape_props_glade(candidate_hosts): """Wrapper to calculate the shape parameters for pan-starrs galaxies. Parameters ---------- candidate_hosts : pandas.DataFrame Dataset containing sources with shape information. Returns ------- tuple of np.ndarray (temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std), where: - temp_sizes: Semi-major axes (arcsec) - temp_sizes_std: Uncertainty in semi-major axes - a_over_b: axis ratio - a_over_b_std: Uncertainty in axis ratio - phi: Position angles (radians) - phi_std: Uncertainty in position angles """ temp_pa = candidate_hosts["PAHyp"].values.copy() # assume no position angle for unmeasured gals temp_pa[temp_pa != temp_pa] = 0 # (n) HyperLEDA decimal logarithm of the length of the projected major axis # of a galaxy at the isophotal level 25mag/arcsec2 in the B-band, # to semi-major half-axis (half-light radius) in arcsec temp_sizes = 0.5 * 3 * 10 ** (candidate_hosts["logd25Hyp"].values) temp_sizes = np.maximum(SIZE_FLOOR, temp_sizes) temp_sizes_std = np.minimum( temp_sizes, np.abs(temp_sizes) * np.log(10) * candidate_hosts["e_logd25Hyp"].values ) temp_sizes_std[temp_sizes_std != temp_sizes_std] = SIGMA_SIZE_FLOOR * temp_sizes[temp_sizes_std != temp_sizes_std] temp_sizes_std[temp_sizes_std < SIGMA_SIZE_FLOOR * temp_sizes] = SIGMA_SIZE_FLOOR * temp_sizes[temp_sizes_std < SIGMA_SIZE_FLOOR * temp_sizes] a_over_b = 10 ** (candidate_hosts["logr25Hyp"].values) a_over_b_std = a_over_b * np.log(10) * candidate_hosts["e_logr25Hyp"].values # set uncertainty floor nanbool = a_over_b_std != a_over_b_std a_over_b_std[nanbool] = SIGMA_SIZE_FLOOR * a_over_b[nanbool] temp_pa = candidate_hosts["PAHyp"].values.copy() # assume no position angle for unmeasured gals # (round is a decent assumption for the most distant ones) temp_pa[temp_pa != temp_pa] = SHAPE_FLOOR phi = np.radians(temp_pa) phi_std = SIGMA_SIZE_FLOOR * phi # uncertainty floor return temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std
[docs] def build_galaxy_array(candidate_hosts, cat_cols, transient_name, catalog, release, logger): """Builds a structured NumPy array of galaxy properties for host association. Parameters ---------- candidate_hosts : pandas.DataFrame DataFrame with candidate host galaxy properties. cat_cols : bool If True, include extra catalog columns not in PROP_DTYPES. transient_name : str Name of the transient (for logging purposes). catalog : str Name of the catalog (e.g., 'panstarrs', 'decals', 'glade', 'skymapper'). release : str Catalog release version (e.g., 'dr2'). logger : logging.Logger Logger instance for messages to console or output file. Returns ------- galaxies : numpy.ndarray Structured array of galaxy properties with dtype defined by PROP_DTYPES (extended to include catalog properties if cat_cols is True). cat_col_fields : list List of extra catalog column names from the catalog. """ n_galaxies = len(candidate_hosts) base_fields = ['objID', 'ra','dec'] calc_fields = [x[0] for x in PROP_DTYPES] dtype = list(PROP_DTYPES) #local copy of global var if n_galaxies < 1: logger.info(f"No sources found around {transient_name} in {catalog} {release}! " "Double-check that the SN coords overlap the survey footprint.") return None, [] if cat_cols: # Identify new fields to add from candidate_hosts cat_col_fields = list(set(candidate_hosts.columns) - set(calc_fields)) # Extend the dtype with new fields and their corresponding data types for col in cat_col_fields: col_dtype = candidate_hosts[col].dtype # Convert pandas extension dtypes (e.g. StringDtype) to numpy-compatible dtypes if hasattr(col_dtype, 'numpy_dtype'): col_dtype = col_dtype.numpy_dtype elif not isinstance(col_dtype, np.dtype): col_dtype = np.dtype('O') dtype.append((col, col_dtype)) # Create galaxies array with updated dtype galaxies = np.zeros(n_galaxies, dtype=dtype) # Populate galaxies array with data from candidate_hosts for col in candidate_hosts.columns: galaxies[col] = candidate_hosts[col].values else: galaxies = np.zeros(n_galaxies, dtype=dtype) cat_col_fields = [] # Populate galaxies array with data from candidate_hosts for col in base_fields: galaxies[col] = candidate_hosts[col].values return galaxies, cat_col_fields
[docs] def fetch_catalog_data(self, transient, search_rad, cosmo, logger, cat_cols, calc_host_props): """ Fetch and process catalog data for a given transient. This function selects the appropriate catalog function based on the catalog name (stored in `self.catalog_functions`), sets default parameters (e.g. limiting magnitude), and passes common parameters (transient info, search radius, cosmology, etc.) to the catalog-specific function. For the 'glade' catalog, it requires that `self.data` is provided. Parameters ---------- transient : object A transient object with at least `name` and `position` attributes. search_rad : astropy.units.Angle Cone search radius. cosmo : astropy.cosmology Cosmological model for distance conversions. logger : logging.Logger Logger instance for messages to console or output file. cat_cols : bool If True, include extra catalog columns in the returned data. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). Returns ------- tuple A tuple (catalog_data, extra_columns) where: - catalog_data is the processed candidate host data (e.g. a pandas.DataFrame or structured array), - extra_columns is a list of additional catalog column names (empty if `cat_cols` is False). Raises ------ ValueError If the catalog name is unknown or if required catalog data (e.g. for 'glade') is missing. """ if self.name not in self.catalog_functions: raise ValueError(f"Unknown catalog: {self.name}. Open a pull request to add functionality for other catalogs!") init(autoreset=True) catalog_func = self.catalog_functions[self.name] self.limiting_mag = DEFAULT_LIMITING_MAG.get(self.name, None) # Common parameters for all catalogs init_params = { "transient": transient, "search_rad": search_rad, "cosmo": cosmo, "logger": logger, "calc_host_props": calc_host_props, "n_samples": self.n_samples, "cat_cols": cat_cols, "release": self.release } # Add extra parameters if needed (e.g., `glade_catalog` for `glade`) if self.name == "glade": if self.data is not None: init_params["glade_catalog"] = self.data else: raise ValueError("Please provide GLADE catalog as 'data' when initializing GalaxyCatalog object.") return np.array([]), np.array([]) # quality cut -- if True, removes candidate galaxy shreds identified at the catalog level init_params['shred_cut'] = self.shred_cut # Run the function and store results deinit() return catalog_func(**init_params)
[docs] class GalaxyCatalog: """Class for a catalog containing candidate transient host galaxies. Parameters ---------- name : str Name of the transient catalog. Currently 'glade', 'decals', and 'panstarrs' are supported. data : pandas.DataFrame or None Locally-saved catalog data (e.g., GLADE) used for host association at low redshift. n_samples : int Number of Monte Carlo samples to draw during the association process. Attributes ---------- name : str Identifier for the catalog. Current supported values are 'glade', 'decals', and 'panstarrs'. data : pandas.DataFrame or None The catalog data provided to the object. Used for catalogs that do not require an external query. n_samples : int The number of samples used for probabilistic host association calculations. release : str or None The version or data release of the catalog (e.g., 'dr1/dr2' for pan-starrs; 'dr9/dr100' for decals). Can be None if not applicable. shred_cut : bool A flag indicating whether to automatically remove likely source shreds (duplicate detections) from the candidate list. The default is determined by the catalog type (only pan-starrs is True). """ def __init__(self, name, release=None, data=None, n_samples=1000):
[docs] self.name = name
[docs] self.data = data
[docs] self.n_samples = n_samples
[docs] self.release = release
[docs] self.shred_cut = CATALOG_SHRED_SETTINGS.get(self.name, False)
# Define catalog function mapping as an instance attribute
[docs] self.catalog_functions = { "panstarrs": build_panstarrs_candidates, "decals": build_decals_candidates, "glade": build_glade_candidates, "skymapper": build_skymapper_candidates, "rubin": build_rubin_candidates, }
[docs] def get_candidates(self, transient, cosmo, logger, time_query=False, calc_host_props=['offset' ,'absmag', 'redshift'], cat_cols=False): """Hydrates the catalog attribute catalog.galaxies with a list of candidates. Parameters ---------- transient : Transient Source to associate, of custom class Transient. cosmo : astropy cosmology Assumed cosmology for conversions (defaults to LambdaCDM if not set). logger : logging.Logger Logger instance for messages to console or output file. time_query : boolean If True, times the catalog query and stores the result in self.query_time. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). cat_cols : boolean If True, contatenates catalog columns to resulting DataFrame. """ search_rad = Angle(300 * u.arcsec) if (transient.redshift == transient.redshift) and (transient.redshift > 0): search_rad = Angle( np.nanmax( [ 100 / cosmo.angular_diameter_distance(transient.redshift).to(u.kpc).value * RAD_TO_ARCSEC, search_rad.arcsec, ] ) * u.arcsec ) self.search_rad = search_rad self.search_pos = transient.position if time_query: start_time = time.time() self.galaxies, self.cat_col_fields = fetch_catalog_data(self, transient, search_rad, cosmo, logger, cat_cols, calc_host_props) if self.galaxies is None: self.ngals = 0 else: self.ngals = len(self.galaxies) if time_query: end_time = time.time() elapsed = end_time - start_time self.query_time = elapsed
[docs] class Transient: """Class for transient source to be associated. Parameters ---------- name : str Name of transient. position : astropy.coord.SkyCoord object Position of transient. position_err : tuple of quantity objects Positional uncertainty of transient. logger : logging.Logger Logger instance for messages to console or output file. redshift : float Photometric or spectroscopic redshift of transient. spec_class : str Spectroscopic class of transient, if available. phot_class : str Photometric class of transient, if available. n_samples : int Number of iterations for Monte Carlo association. Attributes ---------- best_host : int Catalog index of host with the highest posterior probability of association. Set to -1 if no valid host. best_hosts : list of int List of catalog indices for the top n_hosts host galaxies. n_hosts : int Number of host galaxies to return. redshift : float Redshift of the transient. This is either the spectroscopic/photometric redshift or an inferred value from sampling from the prior. redshift_std : float Uncertainty in transient redshift. gen_z_samples : function Generates samples for the transient's redshift from the measured redshift (if available) or a user-specified prior distribution. priors : dict Prior distributions for host properties used in association. Keys are a subset of 'offset', 'absmag', and 'redshift'. likes : dict Likelihood functions for host properties used in association. Keys are a subset of 'offset', 'absmag', and 'redshift'. name : str Name of the transient. position : astropy.coordinates.SkyCoord The transient's position. spec_class : str Spectroscopic classification of the transient, if available. Defaults to an empty string. phot_class : str Photometric classification of the transient, if available. Defaults to an empty string. n_samples : int Number of samples used for probabilistic association. Defaults to 1000. """ def __init__(self, name, position, logger, redshift=np.nan, redshift_std=np.nan, spec_class="", position_err=(0.1*u.arcsec, 0.1*u.arcsec), phot_class="", n_samples=1000, n_hosts=2):
[docs] self.name = name
[docs] self.position = position
[docs] self.position_err = position_err
[docs] self.redshift = redshift
[docs] self.redshift_std = redshift_std
[docs] self.spec_class = spec_class
[docs] self.phot_class = phot_class
[docs] self.n_samples = n_samples
[docs] self.n_hosts = n_hosts
[docs] self.best_host = -1
[docs] self.logger = logger
[docs] self.best_hosts = []
if (redshift == redshift) and (redshift_std != redshift_std): redshift_std = SIGMA_REDSHIFT_FLOOR * self.redshift self.redshift_std = redshift_std self.logger.info(f"Setting redshift uncertainty for {name} to floor of {redshift_std:.5f}.")
[docs] self.priors = {}
[docs] self.likes = {}
# draw n_samples positional samples ra_samples = np.random.normal(self.position.ra.deg, self.position_err[0].to(u.deg).value, size=n_samples) dec_samples = np.random.normal(self.position.dec.deg, self.position_err[1].to(u.deg).value, size=n_samples)
[docs] self.position_samples = SkyCoord( ra=ra_samples*u.deg, dec=dec_samples*u.deg, )
[docs] def __str__(self): # Define what should be shown when printing the Transient object redshift_str = f"redshift={self.redshift:.4f}" if (self.redshift == self.redshift) else "redshift=N/A" class_str = ( f"spec. class = {self.spec_class}" if len(self.spec_class) > 0 else f"phot. class = {self.phot_class}" if len(self.phot_class) > 0 else "unclassified" ) return f"Transient(name={self.name}, position={self.position}, {redshift_str}, {class_str}"
[docs] def get_prior(self, type): """Retrieves the transient host's prior for a given property. Parameters ---------- type : str Type of prior to retrieve (can be redshift, offset, or absmag). Returns ------- prior : scipy stats continuous distribution The prior for 'type' property. """ try: prior = self.priors[type] except KeyError: prior = None return prior
[docs] def get_likelihood(self, type): """Retrieves the transient host's likelihood for a given property. Parameters ---------- type : str Type of prior to retrieve (can be redshift, offset, absmag). Returns ------- prior : scipy stats continuous distribution The likelihood for 'type' property. """ try: like = self.likes[type] except KeyError: like = None return like
[docs] def set_likelihood(self, type, func): """Sets the transient's host prior for a given property. Parameters ---------- type : str Type of likelihood to set (can be redshift, offset, absmag). func : scipy stats continuous distribution The likelihood to set for 'type' property. """ self.likes[type] = func
[docs] def set_prior(self, type, func): """Sets the transient host's prior for a given property. Parameters ---------- type : str Type of prior to set (can be redshift, offset, absmag). func : scipy stats continuous distribution The prior to set for 'type' property. """ self.priors[type] = func if (type == "redshift") and (self.redshift != self.redshift): self.gen_z_samples()
[docs] def gen_z_samples(self, ret=False, n_samples=None): """Generates transient redshift samples for Monte Carlo association. If redshift is not measured, samples are drawn from the prior. Parameters ---------- ret : boolean If true, returns the samples. n_samples : int Number of samples to draw. Returns ------- samples : np.ndarray If ret=True, redshift samples. """ if n_samples is None: n_samples = self.n_samples # Sample initially based on whether redshift is NaN or not if np.isnan(self.redshift): samples = self.get_prior("redshift").rvs(size=n_samples) self.redshift = np.nanmean(samples) self.redshift_std = np.nanstd(samples) else: if self.redshift < REDSHIFT_FLOOR: self.redshift = REDSHIFT_FLOOR self.redshift_std = np.nanstd(self.redshift) samples = norm.rvs(self.redshift, self.redshift_std, size=n_samples) # Resample only those below the floor while np.any(samples < REDSHIFT_FLOOR): mask = samples < REDSHIFT_FLOOR samples[mask] = (self.get_prior("redshift").rvs(size=np.sum(mask)) if np.isnan(self.redshift) else norm.rvs(self.redshift, self.redshift_std, size=np.sum(mask))) # now return or assign redshift samples if ret: return samples else: self.redshift_samples = samples
[docs] def calc_prior_redshift(self, redshift_samples, reduce="mean"): """Calculates the prior probability of the transient redshift samples. Parameters ---------- redshift_samples : np.ndarray Array of transient redshift samples. reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- pdf : float or np.ndarray prior probability point estimate or samples. """ pdf = self.get_prior("redshift").pdf(redshift_samples) if reduce == "mean": return np.nanmean(pdf, axis=1) # Resulting shape: (n_galaxies,) elif reduce == "median": return np.nanmedian(pdf, axis=1) else: return pdf
[docs] def calc_prior_offset(self, fractional_offset_samples, reduce="mean"): """Calculates the prior probability of the transient's fractional offset. Parameters ---------- fractional_offset_samples : np.ndarray Array of transient fractional offset samples. reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- type Description of returned object. """ pdf = self.get_prior("offset").pdf(fractional_offset_samples) if reduce == "mean": return np.nanmean(pdf, axis=1) # Resulting shape: (n_galaxies,) elif reduce == "median": return np.nanmedian(pdf, axis=1) else: return pdf
[docs] def calc_prior_absmag(self, absmag_samples, reduce="mean"): """Computes the prior probability for host galaxy absolute magnitude. Parameters ---------- absmag_samples : np.ndarray 2D array of (n_galaxies, n_samples). reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- np.ndarray The collapsed prior probability across samples for all galaxies. """ pdf = self.get_prior("absmag").pdf(absmag_samples) if reduce == "mean": return np.nanmean(pdf, axis=1) elif reduce == "median": return np.nanmedian(pdf, axis=1) else: return pdf
[docs] def calc_like_redshift(self, redshift_mean, redshift_std, reduce="mean"): """Calculates the likelihood of a redshift with uncertainties. Parameters ---------- redshift_mean : np.ndarray Redshift means for candidate hosts. redshift_std : np.ndarray Redshift stds for candidate hosts. reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- np.ndarray The collapsed likelihood probability across samples for all galaxies. """ z_sn_samples = self.redshift_samples[np.newaxis, :] # Shape: (n_sn_samples, 1) redshift_mean = redshift_mean[:, np.newaxis] # Shape: (1, n_galaxies) redshift_std = redshift_std[:, np.newaxis] # Shape: (1, n_galaxies) # Calculate the likelihood of each SN redshift sample across each galaxy likelihoods = norm.pdf( z_sn_samples, loc=redshift_mean, scale=redshift_std ) # Shape: (n_sn_samples, n_galaxies) if reduce == "mean": return np.nanmean(likelihoods, axis=1) # Resulting shape: (n_galaxies,) elif reduce == "median": return np.nanmedian(likelihoods, axis=1) else: return likelihoods
[docs] def calc_like_offset(self, fractional_offset_samples, reduce="mean"): """Calculates the likelihoods of a set of fractional offsets. Parameters ---------- fractional_offset_samples : np.ndarray Array of (n_galaxies, n_samples) fractional offsets. reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- np.ndarray The collapsed likelihood probability across samples for all galaxies. """ likelihoods = self.get_likelihood("offset").pdf(fractional_offset_samples) if reduce == "mean": return np.nanmean(likelihoods, axis=1) # Resulting shape: (n_galaxies,) elif reduce == "median": return np.nanmedian(likelihoods, axis=1) else: return likelihoods
[docs] def calc_like_absmag(self, absmag_samples, reduce="mean"): """Calculates the likelihoods of a set of absolute magnitudes. Parameters ---------- fractional_offset_samples : np.ndarray Array of (n_galaxies, n_samples) absmag values. reduce : str How to collapse the samples into a prior point-estimate. Defaults to calculating the mean across samples. Returns ------- np.ndarray The collapsed likelihood probability across samples for all galaxies. """ # assuming a typical 0.1 SN/century/10^10 Lsol (in K-band) # TODO -- convert to K-band luminosity of the host! # https://www.aanda.org/articles/aa/pdf/2005/15/aa1411.pdf likelihoods = self.get_likelihood("absmag").pdf(absmag_samples) if reduce == "mean": return np.nanmean(likelihoods, axis=1) elif reduce == "median": return np.nanmedian(likelihoods, axis=1) else: return likelihoods
[docs] def associate(self, galaxy_catalog, cosmo, condition_host_props=['offset']): """Runs the main transient association module. Parameters ---------- galaxy_catalog : GalaxyCatalog object The catalog populated with candidate hosts and their attributes. cosmo : astropy cosmology Assumed cosmology. Returns ------- galaxy_catalog : GalaxyCatalog object The catalog, with additional attributes including posterior probabilities, best host, and unobserved probability. """ ngals = galaxy_catalog.ngals search_rad = galaxy_catalog.search_rad limiting_mag = galaxy_catalog.limiting_mag galaxies = galaxy_catalog.galaxies n_gals = len(galaxies) n_samples = galaxy_catalog.n_samples post_set = [] if 'redshift' in condition_host_props: # Extract arrays for all galaxies from the catalog redshift_mean = np.array(galaxies["redshift_mean"]) redshift_std = np.array(galaxies["redshift_std"]) redshift_samples = np.vstack(galaxies["redshift_samples"]) prior_redshift = self.calc_prior_redshift(redshift_samples, reduce=None) like_redshift = self.calc_like_redshift(redshift_mean, redshift_std, reduce=None) if np.isnan(self.redshift): # Marginalize over the sampled supernova redshifts # by integrating the likelihood over the redshift prior sorted_indices = np.argsort(redshift_samples, axis=1) sorted_redshift_samples = np.take_along_axis(redshift_samples, sorted_indices, axis=1) sorted_integrand = np.take_along_axis(prior_redshift * like_redshift, sorted_indices, axis=1) # Perform integration using simps or trapz post_redshift = np.trapz(sorted_integrand, sorted_redshift_samples, axis=1) post_redshift = post_redshift[:, np.newaxis] else: post_redshift = prior_redshift * like_redshift post_set.append(post_redshift) if 'absmag' in condition_host_props: absmag_mean = np.array(galaxies["absmag_mean"]) absmag_std = np.array(galaxies["absmag_std"]) absmag_samples = np.vstack(galaxies["absmag_samples"]) prior_absmag = self.calc_prior_absmag(absmag_samples, reduce=None) like_absmag = self.calc_like_absmag(absmag_samples, reduce=None) post_absmag = prior_absmag * like_absmag post_set.append(post_absmag) if 'offset' in condition_host_props: offset_mean = np.array(galaxies["offset_mean"]) offset_std = np.array(galaxies["offset_std"]) offset_samples = np.vstack(galaxies["offset_samples"]) galaxy_dlr_samples = np.vstack(galaxies["dlr_samples"]) # Calculate angular diameter distances for all samples fractional_offset_samples = offset_samples / galaxy_dlr_samples prior_offset = self.calc_prior_offset(fractional_offset_samples, reduce=None) # Shape (N,) like_offset = self.calc_like_offset(fractional_offset_samples, reduce=None) # Shape (N,) post_offset = prior_offset * like_offset post_set.append(post_offset) # Compute the posterior probabilities for all galaxies post_gals_stacked = np.stack(post_set, axis=0) post_gals = np.prod(post_gals_stacked, axis=0) # some very low value that the SN is actually hostless, across all samples. post_hostless = np.ones(n_samples)*PROB_FLOOR if self.redshift == self.redshift: post_outside = self.probability_host_outside_cone(search_rad=search_rad, n_samples=n_samples, cosmo=cosmo, condition_host_props=condition_host_props) else: post_outside = PROB_FLOOR post_unobs = self.probability_of_unobserved_host(search_rad=search_rad, limiting_mag=limiting_mag, n_samples=n_samples, cosmo=cosmo, condition_host_props=condition_host_props) # sum across all galaxies post_tot = np.nansum(post_gals, axis=0) + post_hostless + post_outside + post_unobs # floor to machine precision post_tot[post_tot < PROB_FLOOR] = PROB_FLOOR p_none_norm = (post_outside + post_hostless + post_unobs) / post_tot p_any_norm = np.nansum(post_gals, axis=0) / post_tot post_gals_norm = post_gals / post_tot post_outside_norm = post_outside / post_tot post_unobs_norm = post_unobs / post_tot post_hostless_norm = post_hostless / post_tot if 'offset' in condition_host_props: post_offset_norm = post_offset / post_tot if 'redshift' in condition_host_props: post_redshift_norm = post_redshift / post_tot if 'absmag' in condition_host_props: post_absmag_norm = post_absmag / post_tot # calculate the galaxies with the most matches in the MCMC all_posts = np.vstack([ post_gals_norm, post_outside_norm, post_unobs_norm, post_hostless_norm ]) winner_indices = np.argmax(all_posts, axis=0) counts = np.bincount(winner_indices, minlength=n_gals+3) win_fractions = counts / float(n_samples) top_idxs = np.argsort(win_fractions)[::-1] self.associated_catalog = galaxy_catalog.name self.any_posterior = np.nanmedian(p_any_norm) self.none_posterior = np.nanmedian(p_none_norm) self.smallcone_posterior = np.nanmedian(post_outside_norm) self.missedcat_posterior = np.nanmedian(post_unobs_norm) best_idx = top_idxs[0] if best_idx < n_gals: # This is a real galaxy self.logger.info(Fore.GREEN + "Association successful!"+Style.RESET_ALL) self.best_host = best_idx else: # no galaxy found self.best_host = -1 if best_idx == n_gals: self.logger.warning(Fore.YELLOW+"Association failed. Host is likely outside the search cone."+Style.RESET_ALL) elif best_idx == (n_gals + 1): self.logger.warning(Fore.YELLOW+"Association failed. Host is likely missing from the catalog."+Style.RESET_ALL) elif best_idx == (n_gals + 2): self.logger.warning(Fore.YELLOW+"Association failed. Host is likely hostless."+Style.RESET_ALL) # Populate best_hosts list with top n_hosts candidates self.best_hosts = [] for i in range(min(self.n_hosts, len(top_idxs))): idx = top_idxs[i] if idx < n_gals: self.best_hosts.append(idx) else: break # consolidate across samples galaxy_catalog.galaxies["total_posterior"] = np.nanmedian(post_gals_norm, axis=1) if 'offset' in condition_host_props: galaxy_catalog.galaxies["offset_posterior"] = np.nanmedian(post_offset_norm, axis=1) galaxy_catalog.galaxies["frac_offset_mean"] = np.nanmean(fractional_offset_samples, axis=1) galaxy_catalog.galaxies["frac_offset_std"] = np.nanstd(fractional_offset_samples, axis=1) if 'redshift' in condition_host_props: galaxy_catalog.galaxies["redshift_posterior"] = np.nanmedian(post_redshift_norm, axis=1) if 'absmag' in condition_host_props: absmag_best_post = np.nanmedian(post_absmag_norm, axis=1)[self.best_host] galaxy_catalog.galaxies["absmag_posterior"] = np.nanmedian(post_absmag_norm, axis=1) return galaxy_catalog
[docs] def probability_of_unobserved_host(self, search_rad, cosmo, limiting_mag=30, n_samples=1000, condition_host_props=['offset']): """Calculates the posterior probability of the host being either dimmer than the limiting magnitude of the catalog or not in the catalog at all. Parameters ---------- search_rad : float Cone search radius, in arcsec. limiting_mag : float Limiting magnitude of the survey, in AB mag. n_samples : int Number of samples for Monte Carlo association. cosmo : astropy cosmology Assumed cosmology for the run. Returns ------- post_unobserved : np.ndarray n_samples of posterior probabilities of the host not being in the catalog. """ # only set if we have absmag and redshift priors -- otherwise set to 0! if ('absmag' not in condition_host_props) or ('redshift' not in condition_host_props): return np.ones(n_samples)*PROB_FLOOR post_set = [] n_gals = int(0.5 * n_samples) z_sn = self.redshift z_sn_std = self.redshift_std sn_distance = cosmo.luminosity_distance(z_sn).to(u.pc).value # in pc if np.isnan(z_sn): # draw galaxies from the same distribution redshift_mean = self.gen_z_samples(n_samples=n_gals, ret=True) redshift_std = SIGMA_REDSHIFT_FLOOR * redshift_mean redshift_samples = np.maximum( REDSHIFT_FLOOR, norm.rvs( loc=redshift_mean[:, np.newaxis], scale=redshift_std[:, np.newaxis], size=(n_gals, n_samples) ), ) prior_redshift = self.calc_prior_redshift(redshift_samples, reduce=None) like_redshift = self.calc_like_redshift(redshift_mean, redshift_std, reduce=None) sorted_indices = np.argsort(redshift_samples, axis=1) sorted_redshift_samples = np.take_along_axis(redshift_samples, sorted_indices, axis=1) sorted_integrand = np.take_along_axis(prior_redshift * like_redshift, sorted_indices, axis=1) # Perform integration using simps or trapz post_z = np.trapz(sorted_integrand, sorted_redshift_samples, axis=1) # Shape: (n_galaxies, 1) post_z = post_z[:, np.newaxis] else: # Use the known supernova redshift redshift_mean = np.maximum(REDSHIFT_FLOOR, norm.rvs(loc=z_sn, scale=z_sn_std, size=(n_gals))) # assume all well-constrained redshifts redshift_std = SIGMA_REDSHIFT_FLOOR * redshift_mean redshift_samples = np.maximum( REDSHIFT_FLOOR, norm.rvs( loc=redshift_mean[:, np.newaxis], scale=redshift_std[:, np.newaxis], size=(n_gals, n_samples) ), ) prior_redshift = self.calc_prior_redshift(redshift_samples, reduce=None) like_redshift = self.calc_like_redshift(redshift_mean, redshift_std, reduce=None) post_z = prior_redshift * like_redshift post_set.append(post_z) absmag_lim = limiting_mag - 5 * (np.log10(sn_distance / 10)) absmag_mean = np.linspace(absmag_lim, ABSMAG_FLOOR, n_gals) absmag_std = SIGMA_ABSMAG_FLOOR*np.abs(absmag_mean) absmag_samples = norm.rvs( loc=absmag_mean[:, np.newaxis], scale=absmag_std[:, np.newaxis], size=(n_gals, n_samples) ) prior_absmag = self.calc_prior_absmag(absmag_samples, reduce=None) like_absmag = self.calc_like_absmag(absmag_samples, reduce=None) post_absmag = prior_absmag*like_absmag post_set.append(post_absmag) if 'offset' in condition_host_props: galaxy_physical_radius_prior_means = halfnorm.rvs(size=n_gals, loc=1.0, scale=10) # in kpc galaxy_physical_radius_prior_std = SIGMA_SIZE_FLOOR * galaxy_physical_radius_prior_means galaxy_physical_radius_prior_samples = norm.rvs( loc=galaxy_physical_radius_prior_means[:, np.newaxis], scale=galaxy_physical_radius_prior_std[:, np.newaxis], size=(n_gals, n_samples), ) min_phys_rad = 1.0 max_phys_rad = (search_rad.arcsec / RAD_TO_ARCSEC) * sn_distance / 1.0e3 # in kpc physical_offset_mean = np.linspace(min_phys_rad, max_phys_rad, n_gals) physical_offset_std = SIGMA_SIZE_FLOOR * physical_offset_mean physical_offset_samples = norm.rvs( physical_offset_mean[:, np.newaxis], physical_offset_std[:, np.newaxis], size=(n_gals, n_samples) ) # Shape: (n_gals, n_samples) fractional_offset_samples = ( physical_offset_samples / galaxy_physical_radius_prior_samples ) prior_offset_unobs = self.calc_prior_offset(fractional_offset_samples, reduce=None) l_offset_unobs = self.calc_like_offset(fractional_offset_samples, reduce=None) post_offset = prior_offset_unobs * l_offset_unobs post_set.append(post_offset) # Compute the posterior probabilities for all galaxies # shape -> (n_properties, n_gals, n_samples) prob_unobs_stacked = np.stack(post_set, axis=0) # shape -> (n_gals, n_samples) post_unobs = np.prod(prob_unobs_stacked, axis=0) # average over all galaxies -- keep n_samples post_unobs = np.nanmean(post_unobs, axis=0) return post_unobs
[docs] def probability_host_outside_cone(self, cosmo, search_rad=60, n_samples=1000, condition_host_props=['offset']): """Calculates the posterior probability of the host being outside the cone search chosen for the catalog query. Primarily set by the fractional offset and redshift prior. Parameters ---------- search_rad : float Cone search radius, in arcsec. n_samples : int Number of samples to draw for Monte Carlo association. cosmo : astropy cosmology Assumed cosmology. condition_host_props : list List of galaxy properties to use for association. Returns ------- post_outside : np.ndarray An array of n_samples posterior probabilities of the host being outside the search cone. """ # only calculate if we have redshift and offset priors -- otherwise fix to PROB_FLOOR if 'redshift' not in condition_host_props: return np.ones(n_samples)*PROB_FLOOR n_gals = int(n_samples / 2) post_set = [] z_sn = self.redshift z_sn_std = self.redshift_std if np.isnan(z_sn): # draw galaxies from the same distribution redshift_mean = self.gen_z_samples( n_samples=n_gals, ret=True ) # draw from prior if redshift is missing redshift_std = SIGMA_REDSHIFT_FLOOR * redshift_mean # scatter around some nominal uncertainty redshift_samples = np.maximum( REDSHIFT_FLOOR, norm.rvs( loc=redshift_mean[:, np.newaxis], scale=redshift_std[:, np.newaxis], size=(n_gals, n_samples) ), ) prior_redshift = self.calc_prior_redshift(redshift_samples, reduce=None) like_redshift = self.calc_like_redshift(redshift_mean, redshift_std, reduce=None) sorted_indices = np.argsort(redshift_samples, axis=1) sorted_redshift_samples = np.take_along_axis(redshift_samples, sorted_indices, axis=1) sorted_integrand = np.take_along_axis(prior_redshift * like_redshift, sorted_indices, axis=1) # Perform integration using trapezoidal integration p_z = np.trapz(sorted_integrand, sorted_redshift_samples, axis=1) p_z = p_z[:, np.newaxis] else: # Use the known supernova redshift # some higher spread for host redshift photo-zs redshift_mean = np.maximum(REDSHIFT_FLOOR, norm.rvs(loc=z_sn, scale=z_sn_std, size=(n_gals))) redshift_std = SIGMA_REDSHIFT_FLOOR * redshift_mean # assume all well-constrained redshifts redshift_samples = np.maximum( REDSHIFT_FLOOR, norm.rvs( loc=redshift_mean[:, np.newaxis], scale=redshift_std[:, np.newaxis], size=(n_gals, n_samples) ), ) prior_redshift = self.calc_prior_redshift(redshift_samples, reduce=None) like_redshift = self.calc_like_redshift(redshift_mean, redshift_std, reduce=None) p_z = prior_redshift * like_redshift post_set.append(p_z) # Calculate the distance to the supernova for each sampled redshift sn_distances = cosmo.comoving_distance(self.redshift_samples).value # in Mpc # Convert angular cutout radius to physical offset at each sampled redshift min_phys_rad = (search_rad.arcsec / RAD_TO_ARCSEC) * sn_distances * 1e3 # in kpc max_phys_rad = 5 * min_phys_rad galaxy_physical_radius_prior_means = halfnorm.rvs(size=n_gals, loc=0, scale=10) # in kpc galaxy_physical_radius_prior_std = SIGMA_SIZE_FLOOR * galaxy_physical_radius_prior_means galaxy_physical_radius_prior_samples = norm.rvs( loc=galaxy_physical_radius_prior_means[:, np.newaxis], scale=galaxy_physical_radius_prior_std[:, np.newaxis], size=(n_gals, n_samples), ) physical_offset_samples = np.linspace(min_phys_rad, max_phys_rad, n_gals) fractional_offset_samples = physical_offset_samples / galaxy_physical_radius_prior_samples prior_offset = self.calc_prior_offset(fractional_offset_samples, reduce=None) l_offset = self.calc_like_offset(fractional_offset_samples, reduce=None) post_offset = prior_offset * l_offset post_set.append(post_offset) if 'absmag' in condition_host_props: # sample brightnesses absmag_mean = self.get_prior("absmag").rvs(size=n_gals) absmag_std = 0.05 * np.abs(absmag_mean) absmag_samples = np.maximum(REDSHIFT_FLOOR, norm.rvs(loc=absmag_mean[:, np.newaxis], scale=absmag_std[:, np.newaxis], size=(n_gals, n_samples)), ) prior_absmag = self.calc_prior_absmag(absmag_samples, reduce=None) like_absmag = self.calc_like_absmag(absmag_samples, reduce=None) post_absmag = prior_absmag * like_absmag post_set.append(post_absmag) # Compute the posterior probabilities for all galaxies # shape -> (n_properties, n_gals, n_samples) prob_outside_stacked = np.stack(post_set, axis=0) # shape -> (n_gals, n_samples) post_outside = np.prod(prob_outside_stacked, axis=0) # average over all simulated galaxies -- keep the samples post_outside = np.nanmean(post_outside, axis=0) return post_outside
[docs] class PriorzObservedTransients(rv_continuous): """A continuous probability distribution for a redshift prior defined by an observed sample of transients with a given limiting magnitude, volumetric rate, and brightness distribution. Parameters ---------- z_min : float Minimum redshift to draw transients from. z_max : float Maximum redshift to draw transients from. n_bins : int Number of bins with which to fit the observed sample to a PDF. mag_cutoff : float Maximum apparent magnitude of the transient survey. absmag_mean : float Expected absolute brightness of the transient. absmag_min : float Description of parameter `absmag_min`. absmag_max : type Description of parameter `absmag_max`. r_transient : float Transient volumetric rate, in units of N/Mpc^3/yr. (This gets normalized, so this is not too important). t_obs : float The observing time in years. **kwargs : dict Any other params. Attributes ---------- cosmo : astropy cosmology Assumed cosmology. _generate_distribution : function Runs the experiment to build the distribution of observed transients. z_min z_max n_bins mag_cutoff absmag_mean absmag_min absmag_max r_transient t_obs """ def __init__( self, cosmo, z_min=0, z_max=1, n_bins=100, mag_cutoff=22, absmag_mean=-19, absmag_min=-24, absmag_max=-17, r_transient=1e-5, t_obs=1.0, **kwargs, ): super().__init__(**kwargs) # Assign the parameters
[docs] self.z_min = z_min
[docs] self.z_max = z_max
[docs] self.n_bins = n_bins
[docs] self.mag_cutoff = mag_cutoff
[docs] self.absmag_mean = absmag_mean
[docs] self.absmag_min = absmag_min
[docs] self.absmag_max = absmag_max
[docs] self.r_transient = r_transient
[docs] self.t_obs = t_obs
[docs] self.cosmo = cosmo
# Automatically run the internal function to generate the distribution self._generate_distribution()
[docs] def _generate_distribution(self): """Generate and store the empirical redshift distribution for the prior. This method: - Creates redshift bins between self.z_min and self.z_max. - Computes the comoving volume element (dV/dz) for each bin using self.cosmo. - Estimates the number of supernovae per bin based on the transient rate (self.r_transient) and survey solid angle. - Samples redshifts uniformly within each bin according to the estimated counts. - Calculates luminosity distances for the sampled redshifts. - Samples and clips absolute magnitudes, then computes apparent magnitudes. - Filters the sample to include only supernovae with apparent magnitudes ≤ self.mag_cutoff. - Fits a Gaussian KDE to the resulting observed redshifts, storing the KDE in self.bestFit and the observed redshifts in self.observed_redshifts. Returns ------- None The distribution is stored internally. """ # Create redshift bins z_bins = np.linspace(self.z_min, self.z_max, self.n_bins + 1) # Centers of redshift bins z_centers = (z_bins[:-1] + z_bins[1:])/2 # Calculate the comoving volume element dV/dz for each redshift bin # in Mpc^3 per steradian per dz dv_dz = self.cosmo.differential_comoving_volume(z_centers).value # Full sky solid angle (4 pi steradians) # full sky in steradians solid_angle = 4 * np.pi # Supernovae per redshift bin (for full sky) supernovae_per_bin = (self.r_transient * dv_dz * solid_angle * np.diff(z_bins)).astype(int) # Generate random redshifts for all supernovae z_scattered = np.hstack( [ np.random.uniform(z_bins[i], z_bins[i + 1], size=n) for i, n in enumerate(supernovae_per_bin) if n > 0 ] ) # Calculate luminosity distance (in parsecs) for all supernovae d_l = self.cosmo.luminosity_distance(z_scattered).to(u.pc).value # Sample absolute magnitudes from a Gaussian and clip the values to the range [Mmin, Mmax] absolute_magnitudes = np.random.normal(loc=self.absmag_mean, scale=1.5, size=len(z_scattered)) absolute_magnitudes = np.clip(absolute_magnitudes, self.absmag_min, self.absmag_max) # Calculate apparent magnitudes using distance modulus for all supernovae m_apparent = absolute_magnitudes + 5 * np.log10(d_l / 10) # Filter supernovae based on apparent magnitude cutoff observed_indices = m_apparent <= self.mag_cutoff self.observed_redshifts = z_scattered[observed_indices] # Calculate the best fit KDE for observed redshifts self.bestFit = gaussian_kde(self.observed_redshifts)
[docs] def pdf(self, z): """ Return the PDF (KDE) based on observed redshifts. Handles 1D and 2D arrays. Parameters ---------- z : np.ndarray List of input redshifts. Returns ------- flat_pdf : the pdf from a kde fit to the input redshifts. """ if z.ndim == 2: flat_z = z.flatten() flat_pdf = self.bestFit(flat_z) return flat_pdf.reshape(z.shape) else: return self.bestFit(z)
[docs] def rvs(self, size=None): """Generate random variables from the empirical distribution. Parameters ---------- size : int Number of samples to draw from the distribution Returns ------- samples : np.ndarray The redshift samples from the distribution. """ samples = self.bestFit.resample(size=size).reshape(-1) return samples
[docs] def plot(self): """Plots the empirical redshift distribution. """ z_bins = np.linspace(self.z_min, self.z_max, self.n_bins + 1) plt.figure(figsize=(8, 6)) plt.hist( self.observed_redshifts, bins=z_bins, edgecolor="k", alpha=0.7, density=True, label="Observed Histogram", ) plt.xlabel("Redshift (z)") plt.ylabel("Density") # Plot the KDE over a fine grid of redshift values z_fine = np.linspace(self.z_min, self.z_max, 1000) plt.plot(z_fine, self.bestFit(z_fine), color="k", label="KDE Fit") plt.title(f"Observed Supernovae with $m < {self.mag_cutoff}$") plt.legend() plt.show()
[docs] class SnRateAbsmag(rv_continuous): """A host-galaxy absolute magnitude likelihood distribution, where supernova rate scales as ~0.1*L_host in units of 10^10 Lsol. Based on Li, Chornock et al. 2011. Parameters ---------- a : float The minimum absolute magnitude of a host galaxy. b : float The maximum absolute magnitude of a host galaxy. Attributes ---------- normalization : float The calculated normalization constant for the distribution. _calculate_normalization : function Calculates the normalization constant for the distribution. """ def __init__(self, a, b, **kwargs): super().__init__(a=a, b=b, **kwargs)
[docs] self.normalization = self._calculate_normalization(a, b)
[docs] def _calculate_normalization(self, a, b): """Calculates the normalization constant for the distribution. Parameters ---------- a : float The minimum absolute magnitude of a host galaxy. b : float The maximum absolute magnitude of a host galaxy. Returns ------- result : float The calculated normalization constant for the distribution. """ result, _ = quad(self._unnormalized_pdf, a, b) return result
[docs] def _unnormalized_pdf(self, abs_mag_samples): """Calculates the unnormalized PDF from the supernova rate. Parameters ---------- abs_mag_samples : np.ndarray Array of galaxy absolute magnitudes. Returns ------- snrate : np.ndarray Supernovae rate for corresponding galaxies. """ msol = 4.74 lgal = 10 ** (-0.4 * (abs_mag_samples - msol)) # in units of Lsol lgal /= 1.0e10 # in units of 10^10 Lsol snrate = 0.1 * lgal return snrate
[docs] def _pdf(self, m_abs_samples): """The PDF of galaxies with m_abs_samples, after normalization. Parameters ---------- m_abs_samples : np.ndarray Absolute magnitudes of galaxies. Returns ------- normalized_pdf : np.ndarray Normalized PDF values for m_abs_samples. """ normalized_pdf = self._unnormalized_pdf(m_abs_samples) / self.normalization return normalized_pdf
[docs] def panstarrs_cone( metadata, ra, dec, radius, table="stack", release="dr2", format="csv", columns=None, baseurl="https://catalogs.mast.stsci.edu/api/v0.1/panstarrs", verbose=False, **kw, ): """Conducts a cone search of the Pan-STARRS 3PI catalog tables. Parameters ---------- metadata : dict Dictionary of astropy tables. ra : float Right ascension of search center, in decimal degrees. dec : float Declination of search center, in decimal degrees. radius : float Radius of search cone, in degrees. table : str The table to query. release : str The pan-starrs data release. Can be "dr1" or "dr2". format : str The format for the retrieved data. columns : np.ndarray A list of columns to retrieve from 'table'. baseurl : str The api endpoint to query. verbose : boolean If True, prints details about the query. **kw : dict Any additional search parameters. Returns ------- result : str String containing retrieved data (empty if none found). """ data = kw.copy() data["ra"] = ra data["dec"] = dec data["radius"] = radius result = panstarrs_search( metadata=metadata, table=table, release=release, format=format, columns=columns, baseurl=baseurl, verbose=verbose, **data ) return result
[docs] def build_glade_candidates( transient, glade_catalog, cosmo, logger, search_rad=None, n_samples=1000, cat_cols=False, calc_host_props=['redshift', 'absmag', 'offset'], shred_cut=False, release=None, ): """Populates a GalaxyCatalog object with candidates from a cone search of the GLADE catalog (See https://glade.elte.hu/ for details). Reported luminosity distances have been converted to redshifts with a 5% uncertainty floor for faster processing. Parameters ---------- transient : Transient Custom object for queried transient containing name, position, and positional uncertainty. glade_catalog : pandas.DataFrame The locally-packaged GLADE catalog (to avoid querying). search_rad : astropy.units.Angle Radius for cone search. cosmo : astropy.cosmology Assumed cosmology for conversions. n_samples : int Number of samples for Monte Carlo association. cat_cols : boolean If True, concatenates catalog fields for best host to final catalog. shred_cut : boolean If True, removes likely source shreds associated with the same candidate galaxy. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). Returns ------- galaxies : structured numpy array Array of properties for candidate sources needed for host association. cat_col_fields : list List of columns retrieved from the galaxy catalog (rather than calculated internally). """ transient_name = transient.name transient_pos = transient.position transient_pos_samples = transient.position_samples if release != "latest": logger.warning("Only GLADE+ is supported at this time. Please open a pull request to expand Prost to alternative versions or other catalogs!") elif shred_cut: logger.warning("shred_cut is not implemented for GLADE+ galaxies at this time. Running with shred_cut = False.") if search_rad is None: search_rad = Angle(60 * u.arcsec) # start with a 1 degree cut on GLADE around the transient position to speed up the cross-match ra_min = transient_pos.ra.deg - 1 ra_max = transient_pos.ra.deg + 1 dec_min = transient_pos.dec.deg - 1 dec_max = transient_pos.dec.deg + 1 glade_catalog.rename(columns={'GLADE+':'objID','RAJ2000':'ra', 'DEJ2000':'dec', 'z_best':'redshift', 'z_best_std':'redshift_std'}, inplace=True) filtered_glade = glade_catalog[(glade_catalog["ra"] > ra_min) & (glade_catalog["ra"] < ra_max) & (glade_catalog["dec"] > dec_min) & (glade_catalog["dec"] < dec_max)] glade_catalog = filtered_glade candidate_hosts = glade_catalog[ SkyCoord(glade_catalog["ra"].values * u.deg, glade_catalog["dec"].values * u.deg) .separation(transient_pos) .arcsec < search_rad.arcsec ] galaxies_pos = SkyCoord(candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg) if len(candidate_hosts) < 1: return None, [] galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "GLADE+", release, logger) if galaxies is None: return None, [] n_galaxies = len(galaxies) galaxies['objID_info'] = ['GLADE+']*n_galaxies # get alternate name for candidate (e.g., NGC) name_columns = ['HyperLEDA', 'GWGC', '2MASS', 'WISExSCOS'] galaxies['name'] = candidate_hosts[name_columns] \ .apply(lambda row: row.dropna().iloc[0].strip() if row.notna().any() else None, axis=1) if 'offset' in calc_host_props: temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std = calc_shape_props_glade(candidate_hosts) dlr_samples = calc_dlr( transient_pos_samples, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=n_samples, ) offset_samples = np.array([(SkyCoord(galaxies["ra"][i] * u.deg, galaxies["dec"][i] * u.deg).separation(transient_pos_samples).arcsec) for i in np.arange(n_galaxies)]) # Calculate angular separation between SN and all galaxies (in arcseconds) for i in range(n_galaxies): galaxies["offset_samples"][i] = offset_samples[i, :] galaxies["dlr_samples"][i] = dlr_samples[i, :] galaxies['dlr_mean'] = np.nanmean(dlr_samples, axis=1) galaxies['dlr_std'] = np.nanstd(dlr_samples, axis=1) galaxies['size_mean'] = temp_sizes galaxies['size_std'] = temp_sizes_std galaxies['offset_mean'] = np.nanmean(offset_samples, axis=1) galaxies['offset_std'] = np.nanstd(offset_samples, axis=1) galaxies['offset_info'] = 'ARCSEC' if ('redshift' in calc_host_props) or ('absmag' in calc_host_props): redshift_mean = candidate_hosts["redshift"].values redshift_std = candidate_hosts["redshift_std"].values redshift_samples = np.maximum( REDSHIFT_FLOOR, norm.rvs( loc=redshift_mean[:, np.newaxis], scale=redshift_std[:, np.newaxis], size=(n_galaxies, n_samples) ), ) galaxies["redshift_mean"] = pd.to_numeric(candidate_hosts["redshift"].values, errors='coerce') galaxies["redshift_std"] = redshift_std galaxies['redshift_info'] = ['PHOT'] # Mark objects with measured luminosity distance or spec-z has_specz = candidate_hosts['f_dL'] > 1 galaxies['redshift_info'][has_specz] = 'SPEC' # set redshift floor redshift_samples[redshift_samples < REDSHIFT_FLOOR] = REDSHIFT_FLOOR for i in range(n_galaxies): galaxies["redshift_samples"][i] = redshift_samples[i, :] if 'absmag' in calc_host_props: temp_mag_r = candidate_hosts["Bmag"].values temp_mag_r_std = np.abs(candidate_hosts["e_Bmag"].values) # set a floor of 5% temp_mag_r_std[temp_mag_r_std < SIGMA_ABSMAG_FLOOR * temp_mag_r] = SIGMA_ABSMAG_FLOOR * temp_mag_r[temp_mag_r_std < SIGMA_ABSMAG_FLOOR * temp_mag_r] absmag_samples = ( norm.rvs( loc=temp_mag_r[:, np.newaxis], scale=temp_mag_r_std[:, np.newaxis], size=(n_galaxies, n_samples), ) - cosmo.distmod(redshift_samples).value ) galaxies["absmag_mean"] = temp_mag_r - cosmo.distmod(galaxies["redshift_mean"]).value galaxies["absmag_std"] = temp_mag_r_std galaxies["absmag_info"] = ["B"]*n_galaxies for i in range(n_galaxies): galaxies["absmag_samples"][i] = absmag_samples[i, :] return galaxies, cat_col_fields
[docs] def build_skymapper_candidates(transient, cosmo, logger, glade_catalog=None, search_rad=None, n_samples=1000, calc_host_props=['redshift', 'absmag', 'offset'], cat_cols=False, shred_cut=False, release="dr4"): """Populates a GalaxyCatalog object with candidates from a cone search of the SkyMapper catalog (See https://skymapper.anu.edu.au/about-skymapper/ for details). Parameters ---------- transient : str Transient object with name, position, and positional uncertainties defined. glade_catalog : pandas.DataFrame The locally-packaged GLADE catalog (to avoid querying). search_rad : astropy Angle Radius for cone search. cosmo : astropy cosmology Assumed cosmology for conversions. n_samples : int Number of samples for Monte Carlo association. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). cat_cols : boolean If True, concatenates catalog fields for best host to final catalog. shred_cut : boolean If True, removes likely source shreds associated with the same candidate galaxy. release : str Can be 'dr1', 'dr2', or 'dr4'. Returns ------- galaxies : structured numpy array Array of properties for candidate sources needed for host association. cat_col_fields : list List of columns retrieved from the galaxy catalog (rather than calculated internally). """ transient_name = transient.name transient_pos = transient.position transient_pos_samples = transient.position_samples candidate_hosts = fetch_skymapper_sources(transient_pos, search_rad, cat_cols, calc_host_props, logger, release) if candidate_hosts is None: return None, [] galaxies_pos = SkyCoord(candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg) galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "skymapper", release, logger) if galaxies is None: return None, [] n_galaxies = len(galaxies) galaxies["objID_info"] = [f'skymapper {release}']*n_galaxies if glade_catalog is not None: glade_catalog.rename(columns={'z_best':'redshift', 'z_best_std':'redshift_std'}, inplace=True) if candidate_hosts is None: return None, [] if 'offset' in calc_host_props: temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std = calc_shape_props_skymapper(candidate_hosts) galaxies_pos = SkyCoord( candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg ) dlr_samples = calc_dlr( transient_pos_samples, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=n_samples, ) temp_mag_r = candidate_hosts["r_petro"].values temp_mag_r_std = candidate_hosts["e_r_petro"].values.copy() # cap at 50% the mag # set a floor of 5% temp_mag_r_std[temp_mag_r_std > (SIGMA_ABSMAG_CEIL*temp_mag_r)] = SIGMA_ABSMAG_CEIL * temp_mag_r[temp_mag_r_std > (SIGMA_ABSMAG_CEIL*temp_mag_r)] temp_mag_r_std[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] = SIGMA_ABSMAG_FLOOR * temp_mag_r[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] # shred logic if (shred_cut) and (len(candidate_hosts) > 1): logger.info(f"Removing skymapper {release} shreds.") shred_idxs = find_shreds( candidate_hosts["objID"].values, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, temp_mag_r, logger, ) if len(shred_idxs) > 0: left_idxs = ~candidate_hosts.index.isin(shred_idxs) candidate_hosts = candidate_hosts[left_idxs] temp_mag_r = temp_mag_r[left_idxs] temp_mag_r_std = temp_mag_r_std[left_idxs] galaxies_pos = galaxies_pos[left_idxs] dlr_samples = dlr_samples[left_idxs] temp_sizes = temp_sizes[left_idxs] temp_sizes_std = temp_sizes_std[left_idxs] logger.info(f"Removed {len(shred_idxs)} flagged skymapper sources.") else: logger.info("No skymapper shreds found.") galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "skymapper", release, logger) if galaxies is None: return None, [] n_galaxies = len(galaxies) galaxies['objID_info'] = [f'skymapper {release}']*n_galaxies if ('redshift' in calc_host_props) or ('absmag' in calc_host_props): galaxies["redshift_mean"] = np.nan galaxies["redshift_std"] = np.nan # if the source is within 1arcsec of a GLADE host, take that spec-z. if glade_catalog is not None: logger.debug("Cross-matching with GLADE for redshifts...") ra_min = transient_pos.ra.deg - 1 ra_max = transient_pos.ra.deg + 1 dec_min = transient_pos.dec.deg - 1 dec_max = transient_pos.dec.deg + 1 filtered_glade = glade_catalog[(glade_catalog["RAJ2000"] > ra_min) & (glade_catalog["RAJ2000"] < ra_max) & (glade_catalog["DEJ2000"] > dec_min) & (glade_catalog["DEJ2000"] < dec_max)] glade_catalog = filtered_glade if len(glade_catalog) >= 1: glade_coords = SkyCoord(glade_catalog["RAJ2000"], glade_catalog["DEJ2000"], unit=(u.deg, u.deg)) idx, seps, _ = match_coordinates_sky(galaxies_pos, glade_coords) mask_within_1arcsec = seps.arcsec < 1 galaxies["redshift_mean"][mask_within_1arcsec] = glade_catalog["redshift"].values[ idx[mask_within_1arcsec] ] galaxies["redshift_std"][mask_within_1arcsec] = glade_catalog["redshift_std"].values[ idx[mask_within_1arcsec] ] redshift_samples = norm.rvs( galaxies["redshift_mean"][:, np.newaxis], galaxies["redshift_std"][:, np.newaxis], size=(n_galaxies, n_samples), ) # set photometric redshift floor redshift_samples[redshift_samples < REDSHIFT_FLOOR] = REDSHIFT_FLOOR for i in range(n_galaxies): galaxies["redshift_samples"][i] = redshift_samples[i, :] if 'absmag' in calc_host_props: absmag_samples = ( norm.rvs( loc=temp_mag_r[:, np.newaxis], scale=temp_mag_r_std[:, np.newaxis], size=(n_galaxies, n_samples), ) - cosmo.distmod(redshift_samples).value ) galaxies["absmag_mean"] = temp_mag_r - cosmo.distmod(galaxies["redshift_mean"]).value galaxies['absmag_std'] = temp_mag_r_std galaxies['absmag_info'] = ["r"]*n_galaxies for i in range(n_galaxies): galaxies["absmag_samples"][i] = absmag_samples[i, :] if 'offset' in calc_host_props: # Calculate angular separation between SN and all galaxies (in arcseconds) offset_samples = galaxies_pos[:, None].separation(transient_pos_samples).arcsec for i in range(n_galaxies): galaxies['offset_samples'][i] = offset_samples[i, :] galaxies["dlr_samples"][i] = dlr_samples[i, :] galaxies['dlr_mean'] = np.nanmean(dlr_samples, axis=1) galaxies['dlr_std'] = np.nanstd(dlr_samples, axis=1) galaxies['size_mean'] = temp_sizes galaxies['size_std'] = temp_sizes_std galaxies['offset_mean'] = np.nanmean(offset_samples, axis=1) galaxies['offset_std'] = np.nanstd(offset_samples, axis=1) galaxies['offset_info'] = 'ARCSEC' return galaxies, cat_col_fields
[docs] def build_decals_candidates(transient, cosmo, logger, search_rad=None, n_samples=1000, calc_host_props=['redshift', 'absmag', 'offset'], cat_cols=False, shred_cut=False, release="dr9"): """Populates a GalaxyCatalog object with candidates from a cone search of the DECaLS catalog (See https://www.legacysurvey.org/decamls/ for details). Parameters ---------- transient : str Transient object with name, position, and positional uncertainties defined. glade_catalog : pandas.DataFrame The locally-packaged GLADE catalog (to avoid querying). search_rad : astropy Angle Radius for cone search. cosmo : astropy cosmology Assumed cosmology for conversions. n_samples : int Number of samples for Monte Carlo association. calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). cat_cols : boolean If True, concatenates catalog fields for best host to final catalog. shred_cut : boolean If True, removes likely source shreds associated with the same candidate galaxy. release : str Can be 'dr9' or 'dr10'. Returns ------- galaxies : structured numpy array Array of properties for candidate sources needed for host association. cat_col_fields : list List of columns retrieved from the galaxy catalog (rather than calculated internally). """ if shred_cut: logger.warning("shred_cut is not implemented for decals galaxies at this time. Running with shred_cut = False.") transient_name = transient.name transient_pos = transient.position transient_pos_samples = transient.position_samples candidate_hosts = fetch_decals_sources(transient_pos, search_rad, cat_cols, calc_host_props, release) if candidate_hosts is None: return None, [] galaxies_pos = SkyCoord(candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg) galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "decals", release, logger) if galaxies is None: return None, [] n_galaxies = len(galaxies) galaxies["objID_info"] = [f'decals {release}']*n_galaxies if 'offset' in calc_host_props: temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std = calc_shape_props_decals(candidate_hosts) dlr_samples = calc_dlr( transient_pos_samples, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=n_samples, ) # Calculate angular separation between SN and all galaxies (in arcseconds) offset_samples = galaxies_pos[:, None].separation(transient_pos_samples).arcsec for i in range(n_galaxies): galaxies['offset_samples'][i] = offset_samples[i, :] galaxies["dlr_samples"][i] = dlr_samples[i, :] galaxies['dlr_mean'] = np.nanmean(dlr_samples, axis=1) galaxies['dlr_std'] = np.nanstd(dlr_samples, axis=1) galaxies['size_mean'] = temp_sizes galaxies['size_std'] = temp_sizes_std galaxies['offset_mean'] = np.nanmean(offset_samples, axis=1) galaxies['offset_std'] = np.nanstd(offset_samples, axis=1) galaxies['offset_info'] = 'ARCSEC' if ('redshift' in calc_host_props) or ('absmag' in calc_host_props): galaxy_photoz_mean = candidate_hosts["z_phot_mean"].values galaxy_photoz_std = candidate_hosts["z_phot_std"].values galaxy_specz = candidate_hosts["z_spec"].values galaxies["redshift_mean"] = galaxy_photoz_mean galaxies["redshift_std"] = np.abs(galaxy_photoz_std) galaxies["redshift_info"] = ['PHOT']*n_galaxies #if we have spec-zs, replace those as the best redshift good_specz = galaxy_specz > REDSHIFT_FLOOR galaxies["redshift_mean"][good_specz] = galaxy_specz[good_specz] galaxies["redshift_std"][good_specz] = SIGMA_REDSHIFT_FLOOR * galaxy_specz[good_specz] # floor of 5% for spec-zs galaxies["redshift_info"][good_specz] = 'SPEC' galaxies["redshift_std"][galaxy_photoz_std > (SIGMA_REDSHIFT_CEIL * galaxy_photoz_mean)] = ( SIGMA_REDSHIFT_CEIL * galaxy_photoz_mean[galaxy_photoz_std > (SIGMA_REDSHIFT_CEIL * galaxy_photoz_mean)] ) # ceiling of 50% redshift_samples = norm.rvs( galaxies["redshift_mean"][:, np.newaxis], galaxies["redshift_std"][:, np.newaxis], size=(n_galaxies, n_samples), ) # set photometric redshift floor redshift_samples[redshift_samples < REDSHIFT_FLOOR] = REDSHIFT_FLOOR for i in range(n_galaxies): galaxies["redshift_samples"][i] = redshift_samples[i, :] if 'absmag' in calc_host_props: temp_mag_r = candidate_hosts["dered_mag_r"].values temp_mag_r_std = np.abs( 2.5 / np.log(10) * np.sqrt(1 / np.maximum(PROB_FLOOR, candidate_hosts["flux_ivar_r"].values)) / candidate_hosts["flux_r"].values ) # cap at 50% the mag temp_mag_r_std[temp_mag_r_std > (SIGMA_ABSMAG_CEIL * temp_mag_r)] = SIGMA_ABSMAG_CEIL * temp_mag_r[temp_mag_r_std > (SIGMA_ABSMAG_CEIL * temp_mag_r)] # set a floor of 5% temp_mag_r_std[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] = SIGMA_ABSMAG_FLOOR * temp_mag_r[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] absmag_samples = ( norm.rvs( loc=temp_mag_r[:, np.newaxis], scale=temp_mag_r_std[:, np.newaxis], size=(n_galaxies, n_samples), ) - cosmo.distmod(redshift_samples).value ) galaxies['absmag_mean'] = temp_mag_r - cosmo.distmod(galaxies["redshift_mean"]).value galaxies['absmag_std'] = temp_mag_r_std galaxies["absmag_info"] = ["r"]*n_galaxies for i in range(n_galaxies): galaxies["absmag_samples"][i] = absmag_samples[i, :] return galaxies, cat_col_fields
[docs] def build_rubin_candidates( transient, cosmo, logger, glade_catalog=None, search_rad=None, n_samples=1000, calc_host_props=['redshift', 'absmag', 'offset'], cat_cols=False, release='dp1', shred_cut=False, dust_path=DEFAULT_DUST_PATH, ): """Populates a GalaxyCatalog object with candidates from a cone search of the panstarrs catalog (See https://outerspace.stsci.edu/display/PANSTARRS/ for details). Parameters ---------- transient : Transient Custom Transient object with name, position, and positional uncertainties defined. transient_pos : astropy.coord.SkyCoord Position of transient to associate. cosmo : astropy cosmology Assumed cosmology for conversions. logger : logging.Logger Logger instance for messages to console or output file. search_rad : astropy Angle Radius for cone search. n_samples : int Number of samples for Monte Carlo association. glade_catalog : pandas.DataFrame The locally-packaged GLADE catalog (to avoid querying). calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). cat_cols : boolean If True, concatenates catalog fields for best host to final catalog. shred_cut : boolean If True, removes likely source shreds associated with the same candidate galaxy. dust_path : str Path to the dust map data files. Returns ------- galaxies : structured numpy array Array of properties for candidate sources needed for host association. cat_col_fields : list List of columns retrieved from the galaxy catalog (rather than calculated internally). """ if release not in ["dp1", "dp0.2"]: raise ValueError("Only Rubin DP0.2 and DP1 data is accessible! LSST data is coming...") ZEROPOINTS = { "u": 26.52, "g": 28.51, "r": 28.36, "i": 28.17, "z": 27.78, "y": 26.82, } transient_name = transient.name transient_pos = transient.position transient_pos_samples = transient.position_samples if shred_cut: logger.info("Running with shred_cut = True...") candidate_hosts = fetch_rubin_sources(transient_pos, search_rad, cat_cols, calc_host_props, logger, release) if glade_catalog is not None: glade_catalog.rename(columns={'z_best':'redshift', 'z_best_std':'redshift_std'}, inplace=True) if candidate_hosts is None: return None, [] if 'offset' in calc_host_props: temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std = calc_shape_props_rubin(candidate_hosts) galaxies_pos = SkyCoord( candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg ) dlr_samples = calc_dlr( transient_pos_samples, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=n_samples, ) band = 'r' flux = candidate_hosts[f"{band}_kronFlux"].values flux_err = candidate_hosts[f"{band}_kronFluxErr"].values zeropoint = ZEROPOINTS[band] temp_mag = flux_to_mag(flux, zeropoint) temp_mag_std = 2.5 / np.log(10) * (flux_err / np.maximum(flux, 1e-10)) temp_mag_std = np.clip(temp_mag_std, 0.05, 0.5 * np.abs(temp_mag)) # cap at 50% the mag # set a floor of 5% temp_mag_std[temp_mag_std > (SIGMA_ABSMAG_CEIL*temp_mag)] = SIGMA_ABSMAG_CEIL * temp_mag[temp_mag_std > (SIGMA_ABSMAG_CEIL*temp_mag)] temp_mag_std[temp_mag_std < (SIGMA_ABSMAG_FLOOR * temp_mag)] = SIGMA_ABSMAG_FLOOR * temp_mag[temp_mag_std < (SIGMA_ABSMAG_FLOOR * temp_mag)] # shred logic if (shred_cut) and (len(candidate_hosts) > 1): logger.info(f"Removing rubin {release} shreds.") shred_idxs = find_shreds( candidate_hosts["objectId"].values, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, temp_mag_r, logger ) if len(shred_idxs) > 0: left_idxs = ~candidate_hosts.index.isin(shred_idxs) candidate_hosts = candidate_hosts[left_idxs] temp_mag_r = temp_mag_r[left_idxs] temp_mag_r_std = temp_mag_r_std[left_idxs] galaxies_pos = galaxies_pos[left_idxs] dlr_samples = dlr_samples[left_idxs] temp_sizes = temp_sizes[left_idxs] temp_sizes_std = temp_sizes_std[left_idxs] logger.info(f"Removed {len(shred_idxs)} flagged rubin sources.") else: logger.info("No rubin shreds found.") galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "rubin", release, logger) if galaxies is None: return None n_galaxies = len(galaxies) galaxies['objID_info'] = [f'rubin {release}']*n_galaxies if ('redshift' in calc_host_props) or ('absmag' in calc_host_props): galaxies["redshift_mean"] = np.nan galaxies["redshift_std"] = np.nan # if the source is within 1arcsec of a GLADE host, take that spec-z. if glade_catalog is not None: logger.debug("Cross-matching with GLADE for redshifts...") ra_min = transient_pos.ra.deg - 1 ra_max = transient_pos.ra.deg + 1 dec_min = transient_pos.dec.deg - 1 dec_max = transient_pos.dec.deg + 1 filtered_glade = glade_catalog[(glade_catalog["RAJ2000"] > ra_min) & (glade_catalog["RAJ2000"] < ra_max) & (glade_catalog["DEJ2000"] > dec_min) & (glade_catalog["DEJ2000"] < dec_max)] glade_catalog = filtered_glade if len(glade_catalog) >= 1: glade_coords = SkyCoord(glade_catalog["RAJ2000"], glade_catalog["DEJ2000"], unit=(u.deg, u.deg)) idx, seps, _ = match_coordinates_sky(galaxies_pos, glade_coords) mask_within_1arcsec = seps.arcsec < 1 galaxies["redshift_mean"][mask_within_1arcsec] = glade_catalog["redshift"].values[ idx[mask_within_1arcsec] ] galaxies["redshift_std"][mask_within_1arcsec] = glade_catalog["redshift_std"].values[ idx[mask_within_1arcsec] ] redshift_samples = norm.rvs( galaxies["redshift_mean"][:, np.newaxis], galaxies["redshift_std"][:, np.newaxis], size=(n_galaxies, n_samples), ) # set photometric redshift floor redshift_samples[redshift_samples < REDSHIFT_FLOOR] = REDSHIFT_FLOOR for i in range(n_galaxies): galaxies["redshift_samples"][i] = redshift_samples[i, :] if 'absmag' in calc_host_props: absmag_samples = ( norm.rvs( loc=temp_mag[:, np.newaxis], scale=temp_mag_std[:, np.newaxis], size=(n_galaxies, n_samples), ) - cosmo.distmod(redshift_samples).value ) galaxies['absmag_mean'] = temp_mag - cosmo.distmod(galaxies["redshift_mean"]).value galaxies['absmag_std'] = temp_mag_std galaxies["absmag_info"] = [band]*n_galaxies for i in range(n_galaxies): galaxies["absmag_samples"][i] = absmag_samples[i, :] if 'offset' in calc_host_props: # Calculate angular separation between SN and all galaxies (in arcseconds) offset_samples = galaxies_pos[:, None].separation(transient_pos_samples).arcsec for i in range(n_galaxies): galaxies['offset_samples'][i] = offset_samples[i, :] galaxies["dlr_samples"][i] = dlr_samples[i, :] galaxies['dlr_mean'] = np.nanmean(dlr_samples, axis=1) galaxies['dlr_std'] = np.nanstd(dlr_samples, axis=1) galaxies['size_mean'] = temp_sizes galaxies['size_std'] = temp_sizes_std galaxies['offset_mean'] = np.nanmean(offset_samples, axis=1) galaxies['offset_std'] = np.nanstd(offset_samples, axis=1) galaxies['offset_info'] = 'ARCSEC' return galaxies, cat_col_fields
[docs] def build_panstarrs_candidates( transient, cosmo, logger, glade_catalog=None, search_rad=None, n_samples=1000, calc_host_props=['redshift', 'absmag', 'offset'], cat_cols=False, release='dr2', shred_cut=True, dust_path=DEFAULT_DUST_PATH, ): """Populates a GalaxyCatalog object with candidates from a cone search of the panstarrs catalog (See https://outerspace.stsci.edu/display/PANSTARRS/ for details). Parameters ---------- transient : Transient Custom Transient object with name, position, and positional uncertainties defined. transient_pos : astropy.coord.SkyCoord Position of transient to associate. cosmo : astropy cosmology Assumed cosmology for conversions. logger : logging.Logger Logger instance for messages to console or output file. search_rad : astropy Angle Radius for cone search. n_samples : int Number of samples for Monte Carlo association. glade_catalog : pandas.DataFrame The locally-packaged GLADE catalog (to avoid querying). calc_host_props : list Properties to calculate internally for each host ('offset', 'redshift', 'absmag'). cat_cols : boolean If True, concatenates catalog fields for best host to final catalog. shred_cut : boolean If True, removes likely source shreds associated with the same candidate galaxy. dust_path : str Path to the dust map data files. Returns ------- galaxies : structured numpy array Array of properties for candidate sources needed for host association. cat_col_fields : list List of columns retrieved from the galaxy catalog (rather than calculated internally). """ transient_name = transient.name transient_pos = transient.position transient_pos_samples = transient.position_samples if shred_cut: logger.info("Running with shred_cut = True...") candidate_hosts = fetch_panstarrs_sources(transient_pos, search_rad, cat_cols, calc_host_props, logger, release) if glade_catalog is not None: glade_catalog.rename(columns={'z_best':'redshift', 'z_best_std':'redshift_std'}, inplace=True) if candidate_hosts is None: return None, [] if 'offset' in calc_host_props: temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std = calc_shape_props_panstarrs(candidate_hosts) galaxies_pos = SkyCoord( candidate_hosts["ra"].values * u.deg, candidate_hosts["dec"].values * u.deg ) dlr_samples = calc_dlr( transient_pos_samples, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=n_samples, ) temp_mag_r = candidate_hosts["KronMag"].values temp_mag_r_std = candidate_hosts["KronMagErr"].values.copy() # cap at 50% the mag # set a floor of 5% temp_mag_r_std[temp_mag_r_std > (SIGMA_ABSMAG_CEIL*temp_mag_r)] = SIGMA_ABSMAG_CEIL * temp_mag_r[temp_mag_r_std > (SIGMA_ABSMAG_CEIL*temp_mag_r)] temp_mag_r_std[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] = SIGMA_ABSMAG_FLOOR * temp_mag_r[temp_mag_r_std < (SIGMA_ABSMAG_FLOOR * temp_mag_r)] # shred logic if (shred_cut) and (len(candidate_hosts) > 1): logger.info(f"Removing panstarrs {release} shreds.") shred_idxs = find_shreds( candidate_hosts["objID"].values, galaxies_pos, temp_sizes, temp_sizes_std, a_over_b, a_over_b_std, phi, phi_std, temp_mag_r, logger, ) if len(shred_idxs) > 0: left_idxs = ~candidate_hosts.index.isin(shred_idxs) candidate_hosts = candidate_hosts[left_idxs] temp_mag_r = temp_mag_r[left_idxs] temp_mag_r_std = temp_mag_r_std[left_idxs] galaxies_pos = galaxies_pos[left_idxs] dlr_samples = dlr_samples[left_idxs] temp_sizes = temp_sizes[left_idxs] temp_sizes_std = temp_sizes_std[left_idxs] logger.info(f"Removed {len(shred_idxs)} flagged panstarrs sources.") else: logger.info("No panstarrs shreds found.") galaxies, cat_col_fields = build_galaxy_array(candidate_hosts, cat_cols, transient_name, "panstarrs", release, logger) if galaxies is None: return None, [] n_galaxies = len(galaxies) galaxies['objID_info'] = [f'pan-starrs {release}']*n_galaxies if ('redshift' in calc_host_props) or ('absmag' in calc_host_props): galaxies["redshift_mean"] = np.nan galaxies["redshift_std"] = np.nan # if the source is within 1arcsec of a GLADE host, take that spec-z. if glade_catalog is not None: logger.debug("Cross-matching with GLADE for redshifts...") ra_min = transient_pos.ra.deg - 1 ra_max = transient_pos.ra.deg + 1 dec_min = transient_pos.dec.deg - 1 dec_max = transient_pos.dec.deg + 1 filtered_glade = glade_catalog[(glade_catalog["RAJ2000"] > ra_min) & (glade_catalog["RAJ2000"] < ra_max) & (glade_catalog["DEJ2000"] > dec_min) & (glade_catalog["DEJ2000"] < dec_max)] glade_catalog = filtered_glade if len(glade_catalog) >= 1: glade_coords = SkyCoord(glade_catalog["RAJ2000"], glade_catalog["DEJ2000"], unit=(u.deg, u.deg)) idx, seps, _ = match_coordinates_sky(galaxies_pos, glade_coords) mask_within_1arcsec = seps.arcsec < 1 galaxies["redshift_mean"][mask_within_1arcsec] = glade_catalog["redshift"].values[ idx[mask_within_1arcsec] ] galaxies["redshift_std"][mask_within_1arcsec] = glade_catalog["redshift_std"].values[ idx[mask_within_1arcsec] ] redshift_samples = norm.rvs( galaxies["redshift_mean"][:, np.newaxis], galaxies["redshift_std"][:, np.newaxis], size=(n_galaxies, n_samples), ) # set photometric redshift floor redshift_samples[redshift_samples < REDSHIFT_FLOOR] = REDSHIFT_FLOOR for i in range(n_galaxies): galaxies["redshift_samples"][i] = redshift_samples[i, :] if 'absmag' in calc_host_props: absmag_samples = ( norm.rvs( loc=temp_mag_r[:, np.newaxis], scale=temp_mag_r_std[:, np.newaxis], size=(n_galaxies, n_samples), ) - cosmo.distmod(redshift_samples).value ) galaxies["absmag_mean"] = temp_mag_r - cosmo.distmod(galaxies["redshift_mean"]).value galaxies['absmag_std'] = temp_mag_r_std galaxies['absmag_info'] = ["r"]*n_galaxies for i in range(n_galaxies): galaxies["absmag_samples"][i] = absmag_samples[i, :] if 'offset' in calc_host_props: # Calculate angular separation between SN and all galaxies (in arcseconds) offset_samples = galaxies_pos[:, None].separation(transient_pos_samples).arcsec for i in range(n_galaxies): galaxies['offset_samples'][i] = offset_samples[i, :] galaxies["dlr_samples"][i] = dlr_samples[i, :] galaxies['dlr_mean'] = np.nanmean(dlr_samples, axis=1) galaxies['dlr_std'] = np.nanstd(dlr_samples, axis=1) galaxies['size_mean'] = temp_sizes galaxies['size_std'] = temp_sizes_std galaxies['offset_mean'] = np.nanmean(offset_samples, axis=1) galaxies['offset_std'] = np.nanstd(offset_samples, axis=1) galaxies['offset_info'] = 'ARCSEC' return galaxies, cat_col_fields
[docs] def calc_dlr(transient_pos_samples, galaxies_pos, a, a_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=1000): """Calculates the directional light radius (DLR) between candidate host and transient, following the general framework in Gupta et al. (2016). Parameters ---------- transient_pos_samples : array of astropy.coord.SkyCoord objects Array of transient positions with positional uncertainties galaxies_pos : array of astropy.coord.SkyCoord objects Positions of candidate host galaxies. a : np.ndarray Semi-major axes of candidates, in arcsec. a_std : np.ndarray Error in semi-major axes of candidates, in arcsec. a_over_b : np.ndarray Axis ratio (major/minor) of candidates. a_over_b_std : np.ndarray Error in axis ratio. phi : np.ndarray Position angles of sources, in radians. phi_std : np.ndarray Error in position angle. n_samples : type Number of DLR samples to draw for Monte Carlo association. Returns ------- dlr : np.ndarray 2D array of DLR samples of dimensionality (n_galaxies, n_samples). """ n_gals = len(galaxies_pos) transient_ra = transient_pos_samples.ra.deg transient_dec = transient_pos_samples.dec.deg if n_samples > 1: hosts_ra = galaxies_pos.ra.deg[:, np.newaxis] hosts_dec = galaxies_pos.dec.deg[:, np.newaxis] a = norm.rvs(loc=a[:, np.newaxis], scale=a_std[:, np.newaxis], size=(n_gals, n_samples)) a_over_b = norm.rvs( loc=a_over_b[:, np.newaxis], scale=a_over_b_std[:, np.newaxis], size=(n_gals, n_samples) ) phi = norm.rvs(loc=phi[:, np.newaxis], scale=phi_std[:, np.newaxis], size=(n_gals, n_samples)) else: hosts_ra = galaxies_pos.ra.deg hosts_dec = galaxies_pos.dec.deg xr = (transient_ra - hosts_ra) * 3600 yr = (transient_dec - hosts_dec) * 3600 gam = np.arctan2(xr, yr) theta = phi - gam dlr = a / np.sqrt(((a_over_b) * np.sin(theta)) ** 2 + (np.cos(theta)) ** 2) return dlr
[docs] def find_shreds(objids, coords, a, a_std, a_over_b, a_over_b_std, phi, phi_std, appmag, logger): """ Finds and removes potentially shredded sources. Drops the dimmer source in any pair where the separation is less than either galaxy's DLR (sep/DLR < 1). Now vectorized! Parameters ---------- objids : np.ndarray Catalog IDs of sources. coords : np.ndarray Astropy SkyCoord objects. a : np.ndarray Semi-major axes (arcsec). a_std : np.ndarray Errors in semi-major axes (arcsec). a_over_b : np.ndarray Axis ratios (major/minor). a_over_b_std : np.ndarray Errors in axis ratio. phi : np.ndarray Position angles (radians). phi_std : np.ndarray Errors in position angle. appmag : np.ndarray Apparent magnitude, in r. Returns ------- dropidxs : np.ndarray Indices of sources flagged as shreds. """ n = len(coords) if n < 2: return np.array([]) # Compute pairwise separations (n x n) seps = coords[:, None].separation(coords[None, :]).arcsec np.fill_diagonal(seps, np.inf) # Ignore self-comparisons #assume no positional uncertainty ra_err = np.finfo(float).eps * u.arcsec dec_err = np.finfo(float).eps * u.arcsec coord_samples = SkyCoord( ra=np.random.normal(coords.ra.deg, ra_err.to(u.deg).value, size=n)*u.deg, dec=np.random.normal(coords.dec.deg, dec_err.to(u.deg).value, size=n)*u.deg ) # Compute DLR for each source; assume calc_dlr returns a 1D array of length n dlr = calc_dlr(coord_samples, coords, a, a_std, a_over_b, a_over_b_std, phi, phi_std, n_samples=1) # Create the shred condition: compare each pair’s separation to both sources’ DLRs. # Broadcasting: for each (i,j), compare seps[i,j] < dlr[i] or seps[i,j] < dlr[j] shred_mask = (seps < dlr[:, None]) | (seps < dlr[None, :]) # Use upper-triangle indices to avoid duplicate comparisons. upper_i, upper_j = np.triu_indices(n, k=1) valid = shred_mask[upper_i, upper_j] pairs = np.column_stack((upper_i[valid], upper_j[valid])) # For each pair, choose the index of the dimmer source. i_vals, j_vals = pairs[:, 0], pairs[:, 1] dropidxs = np.where(appmag[i_vals] > appmag[j_vals], i_vals, j_vals) dropidxs = np.unique(dropidxs) # (Optional) Log remaining sources keepidxs = np.setdiff1d(np.arange(n), dropidxs) logger.debug(f"{len(keepidxs)} sources remaining after shredding:") for idx in keepidxs: logger.debug( f" objID {objids[idx]} | ra, dec: {coords[idx].ra.deg:.6f}, {coords[idx].dec.deg:.6f} | " f"App. Mag (r): {appmag[idx]:.2f} | DLR: {dlr[idx]:.2f}" ) return dropidxs
[docs] def is_service_available(url, timeout=5): try: requests.head(url, timeout=timeout) return True except Exception: return False
[docs] def get_ned_specz(ra, dec, search_radius=1.0, logger=None): """Query NED for spectroscopic redshift at given coordinates. This function queries the NASA/IPAC Extragalactic Database (NED) for objects near the specified coordinates and retrieves spectroscopic redshift information if available. It filters out photometric redshifts and attempts to get detailed redshift measurements with uncertainties. Parameters ---------- ra : float Right ascension in degrees. dec : float Declination in degrees. search_radius : float, optional Search radius in arcseconds. Default is 1.0 arcsec. logger : logging.Logger, optional Logger instance for messages. If None, no logging is performed. Returns ------- z_spec : float or None Spectroscopic redshift if found, otherwise None. z_spec_err : float or None Uncertainty in spectroscopic redshift if found, otherwise None. has_specz : bool True if a spectroscopic redshift was found, False otherwise. Notes ----- The function uses NED's 'Redshift Flag' to distinguish between spectroscopic and photometric redshifts. It rejects objects flagged as 'PHOT' and accepts those with empty flags (typically spectroscopic) or 'PUN' (published, uncorrected). When detailed redshift tables are available, the function filters out measurements marked with "Uncertain origin" and converts velocities to redshifts using z = v/c. If uncertainty information is not available, the function estimates a conservative uncertainty of either 50 km/s (converted to redshift) or 5% of the redshift value. Examples -------- >>> z, z_err, has_z = get_ned_specz(69.438805, -20.507317, search_radius=1.0) >>> if has_z: ... print(f"Spectroscopic redshift: {z:.4f} ± {z_err:.4f}") """ try: from astroquery.ipac.ned import Ned except ImportError: if logger: logger.warning("astroquery not available; cannot query NED for spectroscopic redshifts") return None, None, False try: coord = SkyCoord(ra=ra*u.deg, dec=dec*u.deg) # Query NED for nearby objects result_table = Ned.query_region(coord, radius=search_radius*u.arcsec) if len(result_table) == 0: if logger: logger.debug(f"No NED objects found within {search_radius} arcsec of RA={ra:.6f}, Dec={dec:.6f}") return None, None, False # Look for objects with spectroscopic redshifts # NED uses 'Redshift Flag' to indicate quality/type # Common flags: blank (usually spec-z), 'PHOT', 'PUN' (published, uncorrected) for row in result_table: if row['Redshift'] > 0: # Valid redshift exists z_flag = row['Redshift Flag'].strip() if row['Redshift Flag'] else '' # Accept if flag is empty (usually spec-z) or explicitly 'PUN' (published, uncorrected but usually spec) # Reject if flag is 'PHOT' (photometric) if z_flag != 'PHOT': z_spec = float(row['Redshift']) separation = row['Separation'] # in arcmin # Try to get detailed redshift info to find uncertainties try: obj_name = row['Object Name'] z_table = Ned.get_table(obj_name, table='redshifts') # Get the most reliable velocity measurement # Filter out entries marked as "Uncertain origin" reliable_z = z_table[~z_table['Qualifiers'].astype(str).str.contains('Uncertain', case=False, na=False)] if len(reliable_z) > 0: # Use first reliable measurement velocity = reliable_z['Published Velocity'][0] # km/s # Convert to redshift: z = v/c z_spec = velocity / 299792.458 # Estimate uncertainty as ~50 km/s / c (~0.00017) if not provided z_spec_err = 50.0 / 299792.458 else: # Use the value from main table, estimate 5% uncertainty z_spec_err = 0.05 * z_spec except Exception as e: # If detailed query fails, use 5% uncertainty estimate if logger: logger.debug(f"Could not retrieve detailed redshift info from NED: {e}") z_spec_err = 0.05 * z_spec if logger: logger.info( f"Found NED spectroscopic redshift: z={z_spec:.4f}±{z_spec_err:.4f} " f"at separation={separation:.3f} arcmin" ) return z_spec, z_spec_err, True # No spectroscopic redshift found if logger: logger.debug(f"NED objects found but no spectroscopic redshift within {search_radius} arcsec") return None, None, False except Exception as e: if logger: logger.warning(f"NED query failed: {e}") return None, None, False