Source code for alma_search.cli

"""Command-line entry point for ALMA Search.

This module turns user input files into ALMA archive queries, collects the raw
matches, applies deduplication and cleaner-row selection, and finally writes
the public CSV output.
"""

from __future__ import annotations

import argparse
import logging
import sys
from typing import Sequence

import pandas as pd

from .io import (
    DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC,
    DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC,
    INTERNAL_OUTPUT_COLUMNS,
    build_no_match_row,
    deduplicate_results,
    load_targets,
    select_cleaner_rows,
    write_csv,
)
from .lines import DEFAULT_OBSERVED_SPECIES
from .search import DEFAULT_LINE_TOLERANCE_KMS, DEFAULT_RADIUS_ARCMIN, create_tap_service, query_alma_cone, rows_from_query_results
from .utils import configure_logging

LOGGER = logging.getLogger("alma_nearby_search")


[docs] def parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: """Parse command-line arguments for the ``alma_search`` executable. Parameters ---------- argv : sequence[str] | None, optional Optional argument list. When ``None``, :mod:`argparse` reads from the live command line. Returns ------- argparse.Namespace Parsed options and positional arguments used by :func:`main`. """ parser = argparse.ArgumentParser( description="Search the ALMA archive near input sky positions and export matches to CSV." ) parser.add_argument("input_csv", help="Input CSV/text with Name and coordinates") parser.add_argument("output_csv", help="Output CSV path") parser.add_argument( "--radius-arcmin", type=float, default=DEFAULT_RADIUS_ARCMIN, help=f"Cone-search radius in arcmin. Default: {DEFAULT_RADIUS_ARCMIN}", ) parser.add_argument( "--dedup-level", choices=("none", "project", "project_target"), default="project_target", help="Deduplication level for output rows. Default: project_target", ) parser.add_argument( "--line-velocity-tolerance-kms", type=float, default=DEFAULT_LINE_TOLERANCE_KMS, help=f"Velocity tolerance for line matching in km/s. Default: {DEFAULT_LINE_TOLERANCE_KMS}", ) parser.add_argument( "--verbose", action="store_true", help="Enable debug logging", ) parser.add_argument( "--observed-species", default=DEFAULT_OBSERVED_SPECIES, help=f"Species used for the final observed-in-ALMA flag and cleaner selection. Default: {DEFAULT_OBSERVED_SPECIES}", ) parser.add_argument( "--observed-distance-threshold-arcsec", type=float, default=DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC, help=( "Distance threshold in arcsec for assigning a value of 1 to the observed-species flag. " f"Default: {DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC}" ), ) parser.add_argument( "--observed-fov-threshold-arcsec", type=float, default=DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC, help=( "FOV threshold in arcsec for assigning a value of 0.5 when outside the distance threshold. " f"Default: {DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC}" ), ) parser.add_argument( "--cleaner", action="store_true", help="Write a reduced output table: keep unmatched rows, up to N closest rows for the selected observed species per source, otherwise one closest row", ) parser.add_argument( "--cleaner-max-observed-rows-per-name", "--cleaner-max-co-rows-per-name", dest="cleaner_max_observed_rows_per_name", type=int, default=5, help="Maximum number of closest rows to keep per source for the selected observed species when --cleaner is used. Default: 5", ) return parser.parse_args(argv)
[docs] def main(argv: Sequence[str] | None = None) -> int: """Execute the end-to-end ALMA search workflow. The workflow is: 1. Parse command-line options. 2. Load and normalize target coordinates. 3. Query the ALMA TAP service once per target. 4. Convert raw archive rows into output rows. 5. Deduplicate, optionally apply the cleaner filter, and write CSV. Parameters ---------- argv : sequence[str] | None, optional Optional command-line arguments to parse. Returns ------- int Exit status code. ``0`` means success, ``1`` means a local validation or file-processing error, and ``2`` means every remote ALMA query failed so no trustworthy output could be produced. """ args = parse_args(argv) configure_logging(verbose=args.verbose) if args.radius_arcmin <= 0: LOGGER.error("--radius-arcmin must be positive") return 1 if args.line_velocity_tolerance_kms < 0: LOGGER.error("--line-velocity-tolerance-kms must be non-negative") return 1 if args.observed_distance_threshold_arcsec < 0: LOGGER.error("--observed-distance-threshold-arcsec must be non-negative") return 1 if args.observed_fov_threshold_arcsec < 0: LOGGER.error("--observed-fov-threshold-arcsec must be non-negative") return 1 if args.cleaner_max_observed_rows_per_name <= 0: LOGGER.error("--cleaner-max-observed-rows-per-name must be positive") return 1 try: targets = load_targets(args.input_csv, logger=LOGGER) except Exception as exc: LOGGER.error("Failed to load input targets: %s", exc) return 1 try: service = create_tap_service() except Exception as exc: LOGGER.error("Failed to initialize ALMA TAP service: %s", exc) return 1 all_rows: list[dict[str, object]] = [] query_failures: list[str] = [] successful_queries = 0 for target in targets.itertuples(index=False): LOGGER.info( "Searching ALMA archive for %s at RA=%.6f Dec=%.6f within %.3f arcmin", target.Name, target.ra_deg, target.dec_deg, args.radius_arcmin, ) try: query_df = query_alma_cone( service=service, ra_deg=float(target.ra_deg), dec_deg=float(target.dec_deg), radius_arcmin=float(args.radius_arcmin), ) except Exception as exc: LOGGER.exception("ALMA TAP query failed for target %s: %s", target.Name, exc) query_failures.append(str(target.Name)) continue successful_queries += 1 rows = rows_from_query_results( input_name=str(target.Name), input_ra_deg=float(target.ra_deg), input_dec_deg=float(target.dec_deg), query_df=query_df, line_velocity_tolerance_kms=float(args.line_velocity_tolerance_kms), ) LOGGER.info("Found %d raw matches for %s", len(rows), target.Name) if rows: all_rows.extend(rows) else: all_rows.append( build_no_match_row( input_name=str(target.Name), input_ra_deg=float(target.ra_deg), input_dec_deg=float(target.dec_deg), ) ) if all_rows: raw_df = pd.DataFrame(all_rows, columns=INTERNAL_OUTPUT_COLUMNS) result_df = deduplicate_results( raw_df, args.dedup_level, observed_species=args.observed_species, observed_distance_threshold_arcsec=float(args.observed_distance_threshold_arcsec), observed_fov_threshold_arcsec=float(args.observed_fov_threshold_arcsec), ) else: result_df = pd.DataFrame(columns=INTERNAL_OUTPUT_COLUMNS) if args.cleaner: result_df = select_cleaner_rows( result_df, observed_species=args.observed_species, max_observed_rows_per_name=int(args.cleaner_max_observed_rows_per_name), ) if query_failures and successful_queries == 0: LOGGER.error( "All %d ALMA TAP queries failed. No output file was written. " "This is most likely a network or ALMA TAP service problem, not a true no-match result.", len(query_failures), ) return 2 if query_failures: LOGGER.warning( "%d target queries failed and were omitted from the output: %s", len(query_failures), ", ".join(query_failures[:10]) + ("..." if len(query_failures) > 10 else ""), ) try: write_csv(result_df, args.output_csv, observed_species=args.observed_species) except Exception as exc: LOGGER.error("Failed to write output CSV: %s", exc) return 1 LOGGER.info("Wrote %d rows to %s", len(result_df), args.output_csv) return 0
if __name__ == "__main__": sys.exit(main())