Source code for alma_search.io

"""Input parsing and output-table helpers.

This module is responsible for normalizing user-supplied target catalogs,
combining and cleaning intermediate result rows, computing the final
observed-species flag, and writing the canonical CSV schema.
"""

from __future__ import annotations

import re
from pathlib import Path
from typing import Any, Sequence

import pandas as pd

from .lines import (
    DEFAULT_OBSERVED_SPECIES,
    INTERNAL_OBSERVED_COLUMN,
    has_observed_species_line,
    observed_species_column_name,
)
from .utils import (
    combine_scalar_values,
    format_float_text,
    format_ra_dec_strings,
    is_blank,
    normalize_whitespace,
    parse_ra_dec_to_degrees,
    stable_sort_numeric_strings,
    unique_preserve_order,
)

DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC = 30.0
DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC = 100.0
ARRAY_ORDER = ("12m", "7m", "TP")

INTERNAL_OUTPUT_COLUMNS = [
    "Name",
    "ra",
    "dec",
    "project_code",
    "alma_target_name",
    "alma_ra",
    "alma_dec",
    "distance_arcsec",
    "fov_arcsec",
    "observing_band",
    "telescope",
    "target_lines",
    "spectral_resolution_khz",
    "velocity_resolution_kms",
    "sensitivity_10kms_mjy_beam",
    "proposal_title",
    INTERNAL_OBSERVED_COLUMN,
]


[docs] def get_output_columns(species: Any) -> list[str]: """Return the exported column order for a specific observed-species label. Parameters ---------- species : Any User-supplied species name, for example ``"CO"`` or ``"HCN"``. Returns ------- list[str] Public CSV columns in the exact order used for export, with the final internal flag column renamed to the user-facing label. """ return [ observed_species_column_name(species) if column == INTERNAL_OBSERVED_COLUMN else column for column in INTERNAL_OUTPUT_COLUMNS ]
[docs] def compute_observed_species_flag( target_lines: Any, distance_arcsec: Any, fov_arcsec: Any, observed_species: Any, observed_distance_threshold_arcsec: float, observed_fov_threshold_arcsec: float, ) -> float: """Compute the internal observed-species score for one result row. The internal score is later converted to a simple ``Yes`` or ``No`` in the exported CSV. A positive value means the selected species is considered observed for that source. The score values are: - ``1.0`` when the inferred line is present and the ALMA pointing is within the distance threshold. - ``0.5`` when the inferred line is present, the pointing is farther away, but the field of view is large enough to still count as coverage. - ``0.0`` otherwise. Parameters ---------- target_lines : Any Comma-separated inferred line names for the row. distance_arcsec : Any Angular separation between the input target and ALMA pointing center. fov_arcsec : Any Approximate ALMA field of view in arcseconds. observed_species : Any Species to test, such as ``"CO"`` or ``"HCN"``. observed_distance_threshold_arcsec : float Distance threshold for a definite match. observed_fov_threshold_arcsec : float FOV threshold for the looser coverage case. Returns ------- float Internal score used during post-processing. """ if not has_observed_species_line(target_lines, observed_species): return 0.0 try: distance_value = float(distance_arcsec) except (TypeError, ValueError): return 0.0 if distance_value < observed_distance_threshold_arcsec: return 1.0 try: fov_value = float(fov_arcsec) except (TypeError, ValueError): return 0.0 if ( distance_value >= observed_distance_threshold_arcsec and fov_value > observed_fov_threshold_arcsec ): return 0.5 return 0.0
[docs] def observed_species_flag_to_label(value: Any) -> str: """Convert an internal observed-species score into a CSV label. Parameters ---------- value : Any Numeric score produced by :func:`compute_observed_species_flag`. Returns ------- str ``"Yes"`` when the score is greater than zero, otherwise ``"No"``. """ try: return "Yes" if float(value) > 0 else "No" except (TypeError, ValueError): return "No"
[docs] def build_no_match_row(input_name: str, input_ra_deg: float, input_dec_deg: float) -> dict[str, Any]: """Create a placeholder row for a target with no returned ALMA matches. Parameters ---------- input_name : str Original target name from the input catalog. input_ra_deg : float Input right ascension in decimal degrees. input_dec_deg : float Input declination in decimal degrees. Returns ------- dict[str, Any] Output row with coordinate fields filled and science metadata set to missing values. """ ra_text, dec_text = format_ra_dec_strings(input_ra_deg, input_dec_deg) return { "Name": input_name, "ra": ra_text, "dec": dec_text, "project_code": pd.NA, "alma_target_name": pd.NA, "alma_ra": pd.NA, "alma_dec": pd.NA, "distance_arcsec": pd.NA, "fov_arcsec": pd.NA, "observing_band": pd.NA, "telescope": pd.NA, "target_lines": pd.NA, "spectral_resolution_khz": pd.NA, "velocity_resolution_kms": pd.NA, "sensitivity_10kms_mjy_beam": pd.NA, "proposal_title": pd.NA, INTERNAL_OBSERVED_COLUMN: 0.0, }
[docs] def load_targets_from_table(df: pd.DataFrame) -> pd.DataFrame: """Normalize a tabular input catalog into ``Name``, ``ra_deg``, ``dec_deg``. Supported schemas are: - ``Name, ra_deg, dec_deg`` for decimal-degree coordinates. - ``Name, ra, dec`` for sexagesimal or decimal text coordinates. Parameters ---------- df : pandas.DataFrame Raw table read from a CSV-like input file. Returns ------- pandas.DataFrame Normalized target table with decimal-degree coordinates. Raises ------ ValueError If the required columns are missing or any coordinate row cannot be parsed. """ columns = {str(col): col for col in df.columns} required_degree = {"Name", "ra_deg", "dec_deg"} if required_degree.issubset(columns): targets = df.loc[:, ["Name", "ra_deg", "dec_deg"]].copy() targets["Name"] = targets["Name"].astype(str) targets["ra_deg"] = pd.to_numeric(targets["ra_deg"], errors="coerce") targets["dec_deg"] = pd.to_numeric(targets["dec_deg"], errors="coerce") elif {"Name", "ra", "dec"}.issubset(columns): normalized_rows: list[dict[str, Any]] = [] for idx, row in df.iterrows(): try: ra_deg, dec_deg = parse_ra_dec_to_degrees(row["ra"], row["dec"]) except Exception as exc: raise ValueError(f"Invalid RA/Dec values in input row {idx + 2}: {exc}") from exc normalized_rows.append( { "Name": str(row["Name"]), "ra_deg": ra_deg, "dec_deg": dec_deg, } ) targets = pd.DataFrame(normalized_rows, columns=["Name", "ra_deg", "dec_deg"]) else: raise ValueError( "Input table must contain either Name,ra_deg,dec_deg or Name,ra,dec columns" ) invalid = targets["ra_deg"].isna() | targets["dec_deg"].isna() if invalid.any(): bad_rows = (targets.index[invalid] + 2).tolist() raise ValueError(f"Invalid RA/Dec values in input rows: {bad_rows}") return targets
[docs] def load_targets_from_text(path: str) -> pd.DataFrame: """Load targets from a plain-text coordinate list. Each non-comment line must have the form ``Name,RA DEC`` where the coordinate tokens can be decimal degrees or sexagesimal text. Parameters ---------- path : str Path to the plain-text input file. Returns ------- pandas.DataFrame Normalized target table with columns ``Name``, ``ra_deg``, and ``dec_deg``. Raises ------ ValueError If any line cannot be parsed into a valid name and coordinate pair. """ normalized_rows: list[dict[str, Any]] = [] with open(path, "r", encoding="utf-8") as handle: for line_number, raw_line in enumerate(handle, start=1): line = raw_line.strip() if not line or line.startswith("#"): continue parts = [part.strip() for part in line.split(",", maxsplit=1)] if len(parts) != 2: raise ValueError( f"Could not parse line {line_number}: expected 'Name,RA DEC', got {raw_line.rstrip()!r}" ) name, coord_text = parts coord_parts = coord_text.split() if len(coord_parts) != 2: raise ValueError( f"Could not parse coordinates on line {line_number}: {coord_text!r}" ) try: ra_deg, dec_deg = parse_ra_dec_to_degrees(coord_parts[0], coord_parts[1]) except Exception as exc: raise ValueError(f"Invalid coordinates on line {line_number}: {exc}") from exc normalized_rows.append( { "Name": name, "ra_deg": ra_deg, "dec_deg": dec_deg, } ) return pd.DataFrame(normalized_rows, columns=["Name", "ra_deg", "dec_deg"])
[docs] def load_targets(path: str, logger: Any | None = None) -> pd.DataFrame: """Load a target catalog from CSV-like or plain-text input. The function first attempts structured CSV parsing. If that fails, it falls back to the plain-text parser used for line-based coordinate lists. Parameters ---------- path : str Input file path. logger : Any | None, optional Optional logger used to report fallback decisions. Returns ------- pandas.DataFrame Normalized target catalog in decimal degrees. """ input_path = Path(path) try: df = pd.read_csv(path) return load_targets_from_table(df) except (pd.errors.EmptyDataError, pd.errors.ParserError, UnicodeDecodeError, ValueError): if logger is not None: logger.debug("Falling back to plain-text target parsing for %s", input_path) if input_path.suffix.lower() not in {".txt", ".dat", ".list", ".csv"} and logger is not None: logger.debug("Attempting plain-text parse for unsupported extension %s", input_path.suffix) return load_targets_from_text(path)
[docs] def combine_arrays(values: Sequence[str]) -> str: """Combine classified array labels into the canonical order. Parameters ---------- values : sequence[str] Array classifications such as ``"12m"``, ``"7m"``, or comma-separated combinations from multiple rows. Returns ------- str Unique array labels joined in ``12m,7m,TP`` order. """ flattened: list[str] = [] for value in values: if is_blank(value): continue flattened.extend(part.strip() for part in str(value).split(",") if part.strip()) unique = unique_preserve_order(flattened) ordered = [array for array in ARRAY_ORDER if array in unique] return ",".join(ordered)
[docs] def combine_bands(values: Sequence[Any]) -> str: """Combine ALMA band metadata into a sorted comma-separated string. Parameters ---------- values : sequence[Any] Raw ``band_list`` field values, potentially containing repeated values or multiple delimiters. Returns ------- str Unique bands sorted numerically when possible. """ tokens: list[str] = [] for value in values: if is_blank(value): continue for piece in re.split(r"[,\s;/|]+", str(value)): cleaned = piece.strip() if cleaned: tokens.append(cleaned) return ",".join(stable_sort_numeric_strings(tokens))
[docs] def combine_lines(values: Sequence[str]) -> str: """Combine inferred line labels from multiple rows. Parameters ---------- values : sequence[str] Comma-separated line lists, often from multiple observations being merged into one output row. Returns ------- str Unique line names in first-seen order. Returns ``"Unknown"`` only when no explicit line name is available but at least one source row was marked as unknown. """ items: list[str] = [] unknown_seen = False for value in values: if is_blank(value): continue for piece in str(value).split(","): cleaned = piece.strip() if not cleaned: continue if cleaned == "Unknown": unknown_seen = True continue items.append(cleaned) unique_items = unique_preserve_order(items) if unique_items: return ",".join(unique_items) return "Unknown" if unknown_seen else ""
[docs] def blank_string_to_na(value: Any) -> Any: """Convert blank strings to :data:`pandas.NA`. Parameters ---------- value : Any Scalar value to normalize. Returns ------- Any ``pandas.NA`` for empty strings, otherwise the original value. """ if isinstance(value, str) and not value.strip(): return pd.NA return value
[docs] def finalize_results( df: pd.DataFrame, observed_species: Any = DEFAULT_OBSERVED_SPECIES, observed_distance_threshold_arcsec: float = DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC, observed_fov_threshold_arcsec: float = DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC, ) -> pd.DataFrame: """Apply final cleanup and derive the observed-species flag column. Parameters ---------- df : pandas.DataFrame Intermediate result table using the internal schema. observed_species : Any, optional Species used for the final observed-in-ALMA decision. observed_distance_threshold_arcsec : float, optional Distance threshold for a definite observed flag. observed_fov_threshold_arcsec : float, optional FOV threshold for the looser observed flag. Returns ------- pandas.DataFrame Cleaned result table with normalized missing values and a groupwise observed-species score propagated across rows with the same source ``Name``. """ if df.empty: return df.copy() output = df.copy() for field in ( "project_code", "alma_target_name", "alma_ra", "alma_dec", "observing_band", "telescope", "target_lines", "spectral_resolution_khz", "velocity_resolution_kms", "sensitivity_10kms_mjy_beam", "proposal_title", ): output[field] = output[field].map(blank_string_to_na) output[INTERNAL_OBSERVED_COLUMN] = output.apply( lambda row: compute_observed_species_flag( target_lines=row.get("target_lines"), distance_arcsec=row.get("distance_arcsec"), fov_arcsec=row.get("fov_arcsec"), observed_species=observed_species, observed_distance_threshold_arcsec=observed_distance_threshold_arcsec, observed_fov_threshold_arcsec=observed_fov_threshold_arcsec, ), axis=1, ) output[INTERNAL_OBSERVED_COLUMN] = output.groupby("Name", dropna=False)[INTERNAL_OBSERVED_COLUMN].transform("max") return output
[docs] def select_cleaner_rows( df: pd.DataFrame, observed_species: Any = DEFAULT_OBSERVED_SPECIES, max_observed_rows_per_name: int = 5, ) -> pd.DataFrame: """ Reduce the final table to a smaller human-review subset. Rules ----- - Keep one unmatched row when a source has no ALMA match. - Keep up to ``max_observed_rows_per_name`` closest rows when the selected observed species exists. - Otherwise keep the single closest row for that source. Parameters ---------- df : pandas.DataFrame Finalized result table. observed_species : Any, optional Species used to decide whether a source has relevant rows. max_observed_rows_per_name : int, optional Maximum number of closest species-matching rows to keep per source. Returns ------- pandas.DataFrame Filtered table sorted by source name and distance. """ if df.empty: return df.copy() ordered = df.copy() ordered["_distance_sort"] = pd.to_numeric(ordered["distance_arcsec"], errors="coerce") ordered = ordered.sort_values(["Name", "_distance_sort"], kind="stable", na_position="last") selected_groups: list[pd.DataFrame] = [] for _, group in ordered.groupby("Name", sort=False, dropna=False): unmatched = group[group["project_code"].isna()] if not unmatched.empty: selected_groups.append(unmatched.head(1)) continue species_rows = group[ group["target_lines"].fillna("").map( lambda value: has_observed_species_line(value, observed_species) ) ] if not species_rows.empty: selected_groups.append(species_rows.head(max_observed_rows_per_name)) else: selected_groups.append(group.head(1)) cleaned = pd.concat(selected_groups, ignore_index=True) cleaned = cleaned.sort_values(["Name", "_distance_sort"], kind="stable", na_position="last") return cleaned.drop(columns=["_distance_sort"])
[docs] def deduplicate_results( df: pd.DataFrame, dedup_level: str, observed_species: Any = DEFAULT_OBSERVED_SPECIES, observed_distance_threshold_arcsec: float = DEFAULT_OBSERVED_DISTANCE_THRESHOLD_ARCSEC, observed_fov_threshold_arcsec: float = DEFAULT_OBSERVED_FOV_THRESHOLD_ARCSEC, ) -> pd.DataFrame: """Deduplicate raw result rows before final export. Parameters ---------- df : pandas.DataFrame Raw per-observation result rows in the internal schema. dedup_level : str Deduplication mode. Supported values are ``"none"``, ``"project"``, and ``"project_target"``. observed_species : Any, optional Species used for the final observed-species flag. observed_distance_threshold_arcsec : float, optional Distance threshold for a definite observed flag. observed_fov_threshold_arcsec : float, optional FOV threshold for the looser observed flag. Returns ------- pandas.DataFrame Deduplicated and finalized output rows. Raises ------ ValueError If ``dedup_level`` is not one of the supported values. """ if df.empty or dedup_level == "none": return finalize_results( df, observed_species=observed_species, observed_distance_threshold_arcsec=observed_distance_threshold_arcsec, observed_fov_threshold_arcsec=observed_fov_threshold_arcsec, ) group_keys = ["Name", "ra", "dec", "project_code"] if dedup_level == "project_target": group_keys.append("alma_target_name") elif dedup_level != "project": raise ValueError(f"Unsupported dedup_level: {dedup_level}") grouped_rows: list[dict[str, Any]] = [] for _, group in df.groupby(group_keys, dropna=False, sort=False): numeric_distance = pd.to_numeric(group["distance_arcsec"], errors="coerce") if numeric_distance.notna().any(): best_idx = numeric_distance.idxmin() else: best_idx = group.index[0] best = group.loc[best_idx].to_dict() numeric_fov = pd.to_numeric(group["fov_arcsec"], errors="coerce") best["fov_arcsec"] = round(numeric_fov.max(), 3) if numeric_fov.notna().any() else pd.NA best["observing_band"] = blank_string_to_na(combine_bands(group["observing_band"].tolist())) best["telescope"] = blank_string_to_na(combine_arrays(group["telescope"].tolist())) best["target_lines"] = blank_string_to_na(combine_lines(group["target_lines"].tolist())) best["spectral_resolution_khz"] = combine_scalar_values(group["spectral_resolution_khz"].tolist(), digits=3) best["velocity_resolution_kms"] = combine_scalar_values(group["velocity_resolution_kms"].tolist(), digits=3) best["sensitivity_10kms_mjy_beam"] = combine_scalar_values(group["sensitivity_10kms_mjy_beam"].tolist(), digits=3) if is_blank(best.get("proposal_title")): candidates = [normalize_whitespace(v) for v in group["proposal_title"].tolist()] for candidate in candidates: if candidate: best["proposal_title"] = candidate break grouped_rows.append(best) return finalize_results( pd.DataFrame(grouped_rows, columns=INTERNAL_OUTPUT_COLUMNS), observed_species=observed_species, observed_distance_threshold_arcsec=observed_distance_threshold_arcsec, observed_fov_threshold_arcsec=observed_fov_threshold_arcsec, )
[docs] def write_csv(df: pd.DataFrame, path: str, observed_species: Any = DEFAULT_OBSERVED_SPECIES) -> None: """Write the public CSV output file. Parameters ---------- df : pandas.DataFrame Final result table in the internal schema. path : str Destination CSV file path. observed_species : Any, optional Species name used to label the final ``Observed ... in ALMA?`` column. """ output = df.copy() for column in INTERNAL_OUTPUT_COLUMNS: if column not in output.columns: output[column] = pd.NA output = output.loc[:, INTERNAL_OUTPUT_COLUMNS] output[INTERNAL_OBSERVED_COLUMN] = output[INTERNAL_OBSERVED_COLUMN].map(observed_species_flag_to_label) output = output.rename(columns={INTERNAL_OBSERVED_COLUMN: observed_species_column_name(observed_species)}) output["_distance_sort"] = pd.to_numeric(output["distance_arcsec"], errors="coerce") output = output.sort_values(by=["Name", "_distance_sort"], ascending=[True, True], kind="stable", na_position="last") output = output.drop(columns=["_distance_sort"]) output.to_csv(path, index=False, na_rep="NaN")