# Source code for alma_search.search

"""ALMA archive querying and result-row construction.

This module contains the logic that talks to the ALMA TAP service, interprets
frequency metadata, infers likely spectral lines from frequency coverage, and
translates archive rows into the package's internal output schema.
"""

from __future__ import annotations

import logging
import re
from typing import Any

import pandas as pd

from .io import INTERNAL_OBSERVED_COLUMN, combine_bands
from .lines import LINE_CATALOG_GHZ
from .utils import (
    format_ra_dec_strings,
    is_blank,
    normalize_whitespace,
    safe_get,
    to_optional_float,
    unique_preserve_order,
)

# Shared logger for this module. The logger name is a fixed string rather than
# ``__name__``, so log configuration must target "alma_nearby_search".
LOGGER = logging.getLogger("alma_nearby_search")
# Default TAP endpoint; used as the default argument of ``create_tap_service``.
ALMA_TAP_URL = "https://almascience.eso.org/tap"
# Default cone-search radius. Going by the name the unit is arcminutes; it is
# not referenced by the functions in this module, so presumably callers use it.
DEFAULT_RADIUS_ARCMIN = 5.0
# Default velocity tolerance for line matching. Going by the name the unit is
# km/s; presumably passed by callers as ``line_velocity_tolerance_kms``.
DEFAULT_LINE_TOLERANCE_KMS = 350.0
# Canonical ordering of array labels; ``classify_array_from_metadata`` emits
# the labels it detects in exactly this order.
ARRAY_ORDER = ("12m", "7m", "TP")


def create_tap_service(tap_url: str = ALMA_TAP_URL) -> Any:
    """Build a TAP client bound to the ALMA science archive.

    Parameters
    ----------
    tap_url : str, optional
        TAP endpoint URL. Defaults to the module-level ``ALMA_TAP_URL``.

    Returns
    -------
    Any
        A ``pyvo.dal.TAPService`` instance constructed from ``tap_url``.

    Raises
    ------
    ImportError
        If :mod:`pyvo` cannot be imported in the current environment.
    """
    # pyvo is imported lazily so that the rest of the module stays importable
    # when the optional dependency is missing.
    try:
        import pyvo
    except ImportError as import_error:
        raise ImportError("pyvo is not installed. Install it with: pip install pyvo") from import_error
    else:
        return pyvo.dal.TAPService(tap_url)
def build_adql_query(ra_deg: float, dec_deg: float, radius_deg: float) -> str:
    """Assemble the ADQL cone-search statement run against ``ivoa.obscore``.

    Parameters
    ----------
    ra_deg : float
        Right ascension of the cone center, in decimal degrees.
    dec_deg : float
        Declination of the cone center, in decimal degrees.
    radius_deg : float
        Cone radius, in decimal degrees.

    Returns
    -------
    str
        ADQL text selecting the ObsCore columns this package consumes,
        restricted to rows whose ``(s_ra, s_dec)`` point lies inside the
        requested ICRS circle. All three numbers are rendered with ten
        decimal places.
    """
    obscore_columns = (
        "proposal_id",
        "target_name",
        "s_ra",
        "s_dec",
        "s_fov",
        "band_list",
        "frequency_support",
        "obs_title",
        "obs_creator_name",
        "instrument_name",
        "antenna_arrays",
        "spectral_resolution",
        "velocity_resolution",
        "sensitivity_10kms",
        "em_min",
        "em_max",
        "obs_id",
        "member_ous_uid",
    )
    select_list = ",\n        ".join(obscore_columns)
    search_circle = f"CIRCLE('ICRS', {ra_deg:.10f}, {dec_deg:.10f}, {radius_deg:.10f})"
    return f"""
    SELECT
        {select_list}
    FROM ivoa.obscore
    WHERE 1 = CONTAINS(
        POINT('ICRS', s_ra, s_dec),
        {search_circle}
    )
    """
def query_alma_cone(
    service: Any,
    ra_deg: float,
    dec_deg: float,
    radius_arcmin: float,
) -> pd.DataFrame:
    """Run one cone search against the ALMA archive and return the rows.

    Parameters
    ----------
    service : Any
        TAP client exposing ``search(adql)``; normally the object returned by
        :func:`create_tap_service`.
    ra_deg : float
        Right ascension of the cone center, in decimal degrees.
    dec_deg : float
        Declination of the cone center, in decimal degrees.
    radius_arcmin : float
        Cone radius in arcminutes; divided by 60 before it is placed in the
        query.

    Returns
    -------
    pandas.DataFrame
        The query result with all column labels coerced to ``str``. A
        column-less empty frame is returned when the query yields no rows.
    """
    query_text = build_adql_query(
        ra_deg=ra_deg,
        dec_deg=dec_deg,
        radius_deg=radius_arcmin / 60.0,
    )
    LOGGER.debug("Submitting ADQL query for RA=%.6f Dec=%.6f", ra_deg, dec_deg)

    result_table = service.search(query_text).to_table()
    if len(result_table) == 0:
        return pd.DataFrame()

    frame = result_table.to_pandas()
    # Force plain-string labels so later lookups by column name are uniform.
    frame.columns = [str(label) for label in frame.columns]
    return frame
def parse_frequency_support(frequency_support: Any) -> list[tuple[float, float]]:
    """Extract GHz frequency intervals from an ObsCore ``frequency_support`` value.

    The field typically carries fragments such as::

        [87.30..89.17GHz, ...]
        1.23456E+11..1.24567E+11Hz
        230.1 .. 232.0 GHz U 234.0 .. 236.0 GHz

    Parsing is deliberately lenient: every occurrence of
    ``number .. number unit`` anywhere in the text is collected, where the
    unit is one of GHz, MHz, kHz or Hz (matched case-insensitively).

    Parameters
    ----------
    frequency_support : Any
        Raw ObsCore ``frequency_support`` value; it is converted with
        ``str()`` before matching.

    Returns
    -------
    list[tuple[float, float]]
        ``(low_ghz, high_ghz)`` pairs in order of appearance, each sorted so
        that the lower bound comes first. Empty when the value is blank or
        contains no recognizable interval.
    """
    if is_blank(frequency_support):
        return []

    # Unsigned decimal with optional fraction and optional exponent.
    number = r"([0-9]+(?:\.[0-9]+)?(?:[eE][+\-]?[0-9]+)?)"
    interval_regex = re.compile(
        number + r"\s*\.\.\s*" + number + r"\s*" + r"(GHz|MHz|kHz|Hz)",
        flags=re.IGNORECASE,
    )
    to_ghz = {"ghz": 1.0, "mhz": 1e-3, "khz": 1e-6, "hz": 1e-9}

    parsed: list[tuple[float, float]] = []
    for found in interval_regex.finditer(str(frequency_support)):
        first_text, second_text, unit_text = found.groups()
        scale = to_ghz[unit_text.lower()]
        first_ghz = float(first_text) * scale
        second_ghz = float(second_text) * scale
        # The archive text is not guaranteed to list the lower edge first.
        parsed.append((min(first_ghz, second_ghz), max(first_ghz, second_ghz)))
    return parsed
def coarse_frequency_interval_from_em(em_min: Any, em_max: Any) -> list[tuple[float, float]]:
    """Turn a pair of wavelength bounds into one coarse frequency interval.

    Parameters
    ----------
    em_min : Any
        Lower wavelength bound from ObsCore, in meters.
    em_max : Any
        Upper wavelength bound from ObsCore, in meters.

    Returns
    -------
    list[tuple[float, float]]
        A one-element list holding ``(low_ghz, high_ghz)``. The list is empty
        when either bound is blank, cannot be converted to ``float``, or is
        not strictly positive.
    """
    if is_blank(em_min) or is_blank(em_max):
        return []

    try:
        wavelengths_m = (float(em_min), float(em_max))
    except (TypeError, ValueError):
        return []

    if any(wavelength <= 0 for wavelength in wavelengths_m):
        return []

    speed_of_light_m_s = 299792458.0
    # nu = c / lambda, then Hz -> GHz.
    freq_a_ghz, freq_b_ghz = [
        (speed_of_light_m_s / wavelength) / 1e9 for wavelength in wavelengths_m
    ]
    return [(min(freq_a_ghz, freq_b_ghz), max(freq_a_ghz, freq_b_ghz))]
def infer_lines(
    frequency_support: Any,
    em_min: Any,
    em_max: Any,
    line_velocity_tolerance_kms: float,
    line_catalog_ghz: dict[str, float] | None = None,
) -> str:
    """Infer likely spectral lines covered by an ALMA observation.

    Coverage intervals are taken from ``frequency_support`` when it can be
    parsed; otherwise a single coarse interval is derived from the wavelength
    bounds. A catalog line is reported when its rest frequency lies inside
    any interval widened on both sides by the velocity tolerance.

    Parameters
    ----------
    frequency_support : Any
        Raw spectral-coverage metadata string from ObsCore.
    em_min : Any
        Minimum wavelength in meters, used as a fallback when
        ``frequency_support`` is absent or unparsable.
    em_max : Any
        Maximum wavelength in meters, used alongside ``em_min``.
    line_velocity_tolerance_kms : float
        Velocity tolerance in km/s; converted per line to a frequency margin
        of ``rest_ghz * tolerance / c``.
    line_catalog_ghz : dict[str, float] | None, optional
        Replacement catalog mapping line labels to rest frequencies in GHz.
        ``None`` selects the packaged ``LINE_CATALOG_GHZ``. An explicitly
        supplied empty mapping is honoured and therefore matches nothing.

    Returns
    -------
    str
        Comma-separated matched line names in catalog order, or ``"Unknown"``
        when no coverage interval could be determined or no catalog line
        falls inside the coverage.
    """
    # Fall back to the packaged catalog only when the caller passed nothing.
    # A truthiness test here would silently replace an intentionally empty
    # catalog with the default one.
    catalog = LINE_CATALOG_GHZ if line_catalog_ghz is None else line_catalog_ghz

    intervals = parse_frequency_support(frequency_support)
    if not intervals:
        intervals = coarse_frequency_interval_from_em(em_min, em_max)
    if not intervals:
        return "Unknown"

    c_kms = 299792.458
    matched: list[str] = []
    for line_name, rest_ghz in catalog.items():
        # First-order Doppler width of the velocity tolerance at this line.
        tol_ghz = rest_ghz * line_velocity_tolerance_kms / c_kms
        if any(
            (low_ghz - tol_ghz) <= rest_ghz <= (high_ghz + tol_ghz)
            for low_ghz, high_ghz in intervals
        ):
            matched.append(line_name)

    matches = unique_preserve_order(matched)
    return ",".join(matches) if matches else "Unknown"
def classify_array(instrument_name: Any) -> str:
    """Derive the ALMA array labels from the instrument name alone.

    Thin convenience wrapper that delegates to
    :func:`classify_array_from_metadata` with empty antenna metadata.

    Parameters
    ----------
    instrument_name : Any
        Raw ALMA instrument metadata.

    Returns
    -------
    str
        Comma-separated array labels such as ``"12m"`` or ``"7m,TP"``.
    """
    no_antenna_metadata = ""
    return classify_array_from_metadata(
        instrument_name=instrument_name,
        antenna_arrays=no_antenna_metadata,
    )
def classify_array_from_metadata(instrument_name: Any, antenna_arrays: Any) -> str:
    """Determine which ALMA arrays an observation used.

    Each array is detected either by a keyword found in the lower-cased
    instrument text or by an antenna-ID pattern found in the antenna text,
    which is searched with its original casing.

    Parameters
    ----------
    instrument_name : Any
        Raw ObsCore instrument metadata.
    antenna_arrays : Any
        Raw antenna-array metadata; antenna IDs of the form ``:PM<n>``,
        ``:CM<n>`` and ``:DA<n>``/``:DV<n>`` are taken as evidence for the
        TP, 7m and 12m arrays respectively.

    Returns
    -------
    str
        Detected labels joined by commas in ``ARRAY_ORDER`` order; an empty
        string when nothing is detected.
    """
    instrument_text = "" if is_blank(instrument_name) else str(instrument_name).lower()
    antenna_text = "" if is_blank(antenna_arrays) else str(antenna_arrays)

    # (label, instrument keywords, antenna-ID regex) for each array type.
    detection_rules = (
        ("TP", ("total power", "tp array", "tp-", " tp", "aca tp"), r":PM\d+\b"),
        ("7m", ("7m", "7-m", "morita", "aca", "compact array"), r":CM\d+\b"),
        ("12m", ("12m", "12-m", "main array"), r":(?:DA|DV)\d+\b"),
    )

    detected: set[str] = set()
    for label, keywords, antenna_regex in detection_rules:
        keyword_hit = any(keyword in instrument_text for keyword in keywords)
        if keyword_hit or re.search(antenna_regex, antenna_text):
            detected.add(label)

    return ",".join(label for label in ARRAY_ORDER if label in detected)
def rows_from_query_results(
    input_name: str,
    input_ra_deg: float,
    input_dec_deg: float,
    query_df: pd.DataFrame,
    line_velocity_tolerance_kms: float,
) -> list[dict[str, Any]]:
    """Convert raw ALMA query rows into the package's internal row dictionaries.

    Parameters
    ----------
    input_name : str
        Source name from the user's input catalog; copied into every row.
    input_ra_deg : float
        Right ascension of the input source, in decimal degrees.
    input_dec_deg : float
        Declination of the input source, in decimal degrees.
    query_df : pandas.DataFrame
        Raw ALMA query results for this source.
    line_velocity_tolerance_kms : float
        Velocity tolerance forwarded to :func:`infer_lines`.

    Returns
    -------
    list[dict[str, Any]]
        One dictionary per archive row whose ``s_ra``/``s_dec`` values convert
        to ``float``; other rows are skipped with a debug log message. An
        empty frame yields an empty list.
    """
    if query_df.empty:
        return []

    # astropy is only needed once there is at least one row to process.
    import astropy.units as u
    from astropy.coordinates import SkyCoord

    def field(record: Any, column: str) -> Any:
        """Read one column from a result row, defaulting to an empty string."""
        return safe_get(record, column, "")

    source_coord = SkyCoord(input_ra_deg * u.deg, input_dec_deg * u.deg, frame="icrs")
    source_ra_text, source_dec_text = format_ra_dec_strings(input_ra_deg, input_dec_deg)

    converted: list[dict[str, Any]] = []
    for _, record in query_df.iterrows():
        raw_ra = field(record, "s_ra")
        raw_dec = field(record, "s_dec")
        try:
            pointing_ra_deg = float(raw_ra)
            pointing_dec_deg = float(raw_dec)
        except (TypeError, ValueError):
            LOGGER.debug("Skipping row with invalid ALMA position: %s", record.to_dict())
            continue

        pointing_coord = SkyCoord(pointing_ra_deg * u.deg, pointing_dec_deg * u.deg, frame="icrs")
        separation_arcsec = source_coord.separation(pointing_coord).arcsecond
        pointing_ra_text, pointing_dec_text = format_ra_dec_strings(pointing_ra_deg, pointing_dec_deg)

        # scale=3600.0 presumably converts s_fov from degrees to arcseconds.
        field_of_view_arcsec = to_optional_float(field(record, "s_fov"), scale=3600.0, digits=3)

        proposal_code = normalize_whitespace(field(record, "proposal_id"))
        archive_target = normalize_whitespace(field(record, "target_name"))
        band_label = combine_bands([field(record, "band_list")])
        array_label = classify_array_from_metadata(
            instrument_name=field(record, "instrument_name"),
            antenna_arrays=field(record, "antenna_arrays"),
        )
        line_label = infer_lines(
            frequency_support=field(record, "frequency_support"),
            em_min=field(record, "em_min"),
            em_max=field(record, "em_max"),
            line_velocity_tolerance_kms=line_velocity_tolerance_kms,
        )

        spectral_resolution = to_optional_float(field(record, "spectral_resolution"))
        # scale=1e-3 presumably converts velocity_resolution from m/s to km/s.
        velocity_resolution = to_optional_float(
            field(record, "velocity_resolution"),
            scale=1e-3,
        )
        sensitivity = to_optional_float(field(record, "sensitivity_10kms"))
        title = normalize_whitespace(field(record, "obs_title"))

        converted.append(
            {
                "Name": input_name,
                "ra": source_ra_text,
                "dec": source_dec_text,
                "project_code": proposal_code,
                "alma_target_name": archive_target,
                "alma_ra": pointing_ra_text,
                "alma_dec": pointing_dec_text,
                "distance_arcsec": round(separation_arcsec, 3),
                "fov_arcsec": field_of_view_arcsec,
                "observing_band": band_label,
                "telescope": array_label,
                "target_lines": line_label,
                "spectral_resolution_khz": spectral_resolution,
                "velocity_resolution_kms": velocity_resolution,
                "sensitivity_10kms_mjy_beam": sensitivity,
                "proposal_title": title,
                # Observed flag is left unset here.
                INTERNAL_OBSERVED_COLUMN: pd.NA,
            }
        )

    return converted