"""Step 2: PhenoCam GCC + SNR screening on step-1 cache. Inputs (``data/``, ``{year}`` = ``--evaluation-year``): - ``phenocam/{year}.json`` — step-1 manifest - ``phenocam/{year}/{sitename}.json`` — per-site metadata - ``phenocam/{year}/{sitename}_1day.csv`` — GCC timeseries Outputs (``data/phenocam_screening/``): - ``{year}.json`` — full per-site results - ``{year}.csv`` — flat summary table CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional; default: all manifest sites). Next step: :mod:`3-sentinel-clouds`. """ from __future__ import annotations import argparse import csv import json import math import sys from datetime import date, datetime from pathlib import Path from typing import Any import numpy as np from scipy.interpolate import UnivariateSpline PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing" if str(PROCESSING_DIR) not in sys.path: sys.path.insert(0, str(PROCESSING_DIR)) from acquisition_phenocam import _phenocam_summary_gcc_value # noqa: E402 MIN_GCC_POINTS = 30 SNR_THRESHOLD = 2.0 CLUSTER_RADIUS_M = 500.0 GATE_ORDER = ("phenocam", "snr", "cluster") ONE_DAY_CSV_SUFFIX = "_1day.csv" _EARTH_RADIUS_M = 6371000.0 def load_manifest(path: Path) -> dict[str, Any]: payload = json.loads(path.read_text(encoding="utf-8")) for key in ("evaluation_year", "sites_dir", "sites"): if key not in payload: raise ValueError(f"Expected '{key}' in manifest {path}") return payload def resolve_sites_dir(manifest_path: Path, manifest: dict[str, Any]) -> Path: return (manifest_path.parent / manifest["sites_dir"]).resolve() def load_site_entry(sites_dir: Path, sitename: str) -> dict[str, Any]: json_path = sites_dir / f"{sitename}.json" payload = json.loads(json_path.read_text(encoding="utf-8")) csv_path = sites_dir / f"{sitename}{ONE_DAY_CSV_SUFFIX}" payload["_one_day_csv"] = csv_path if csv_path.is_file() else None return payload def parse_gcc90_series(csv_path: Path, evaluation_year: int) -> list[tuple[str, float]]: lines = [ line for line in csv_path.read_text(encoding="utf-8").split("\n") if line and not line.startswith("#") ] reader = csv.DictReader(lines) fieldnames = reader.fieldnames or () use_mean_fallback = "gcc_90" not in fieldnames year_start = date(evaluation_year, 1, 1) year_end = date(evaluation_year, 12, 31) series: list[tuple[str, float]] = [] for row in reader: date_str = row.get("date") if not date_str: continue try: row_date = datetime.strptime(date_str, "%Y-%m-%d").date() except ValueError: continue if not (year_start <= row_date <= year_end): continue gcc = _phenocam_summary_gcc_value(row, use_mean_fallback) if gcc is None: continue series.append((row_date.isoformat(), float(gcc))) series.sort(key=lambda item: item[0]) return series def _months_covered(day_strings: list[str]) -> int: months: set[int] = set() for day in day_strings: months.add(datetime.strptime(day, "%Y-%m-%d").month) return len(months) def _aic_for_spline(x: np.ndarray, y: np.ndarray, spline: UnivariateSpline) -> float: residuals = y - spline(x) rss = float(np.sum(residuals**2)) n = len(y) if rss <= 0 or n < 4: return math.inf edf = float(spline.get_knots().shape[0] + spline.get_coeffs().shape[0]) return n * math.log(rss / n) + 2.0 * edf def compute_snr_aic_spline(series: list[tuple[str, float]]) -> float | None: if len(series) < MIN_GCC_POINTS: return None dates = [datetime.strptime(day, "%Y-%m-%d").date() for day, _ in series] x = np.array([(d - dates[0]).days for d in dates], dtype=float) y = np.array([value for _, value in series], dtype=float) if len(np.unique(x)) < 5: return None y_var = float(np.var(y)) if y_var <= 0: return None candidates = np.logspace(-4, 2, 40) * y_var * len(y) best_spline: UnivariateSpline | None = None best_aic = math.inf for smoothing in candidates: try: spline = UnivariateSpline(x, y, k=3, s=float(smoothing)) except Exception: continue aic = _aic_for_spline(x, y, spline) if aic < best_aic: best_aic = aic best_spline = spline if best_spline is None: return None residuals = y - best_spline(x) rmse = float(np.sqrt(np.mean(residuals**2))) amplitude = float(np.max(y) - np.min(y)) if rmse <= 0: return None return amplitude / rmse def screen_site( site_entry: dict[str, Any], *, evaluation_year: int, min_gcc_points: int, snr_threshold: float, ) -> dict[str, Any]: response = site_entry["response"] roi = response.get("roi") csv_path = site_entry.get("_one_day_csv") calculations: dict[str, Any] = { "evaluation_year": evaluation_year, "n_gcc_points": 0, "first_gcc_date": None, "last_gcc_date": None, "months_with_gcc": 0, "snr": None, "min_gcc_points": min_gcc_points, "snr_threshold": snr_threshold, "status": "FAIL", "failing_gate": None, "passed_gates": [], "reason": None, } if roi is None or not roi.get("one_day_summary") or csv_path is None: calculations["failing_gate"] = "phenocam" calculations["reason"] = "no_roi" return {"response": response, "calculations": calculations} series = parse_gcc90_series(csv_path, evaluation_year) calculations["n_gcc_points"] = len(series) if calculations["n_gcc_points"] == 0: calculations["failing_gate"] = "phenocam" calculations["reason"] = "no_gcc_in_year" return {"response": response, "calculations": calculations} day_strings = [day for day, _ in series] calculations["first_gcc_date"] = day_strings[0] calculations["last_gcc_date"] = day_strings[-1] calculations["months_with_gcc"] = _months_covered(day_strings) if calculations["n_gcc_points"] < min_gcc_points: calculations["failing_gate"] = "phenocam" calculations["reason"] = "insufficient_gcc_points" return {"response": response, "calculations": calculations} calculations["passed_gates"].append("phenocam") snr = compute_snr_aic_spline(series) calculations["snr"] = snr if snr is None or snr < snr_threshold: calculations["failing_gate"] = "snr" calculations["reason"] = "insufficient_snr" if snr is not None else "snr_undefined" return {"response": response, "calculations": calculations} calculations["passed_gates"].append("snr") calculations["status"] = "PASS" calculations["failing_gate"] = None calculations["reason"] = None return {"response": response, "calculations": calculations} def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: p1, p2 = math.radians(lat1), math.radians(lat2) dlat = math.radians(lat2 - lat1) dlon = math.radians(lon2 - lon1) a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2 return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a)) def _site_coords(row: dict[str, Any]) -> tuple[float, float] | None: camera = row["response"]["camera"] lat, lon = camera.get("Lat"), camera.get("Lon") if lat is None or lon is None: return None return float(lat), float(lon) def _cluster_rank(row: dict[str, Any]) -> tuple[int, float]: calc = row["calculations"] return calc["n_gcc_points"], float(calc.get("snr") or 0.0) def apply_cluster_gate(results: list[dict[str, Any]], *, radius_m: float) -> int: pool: list[tuple[int, float, float]] = [] for idx, row in enumerate(results): if "snr" not in row["calculations"]["passed_gates"]: continue coords = _site_coords(row) if coords is None: row["calculations"]["passed_gates"].append("cluster") continue pool.append((idx, coords[0], coords[1])) n = len(pool) parent = list(range(n)) def find(x: int) -> int: while parent[x] != x: parent[x] = parent[parent[x]] x = parent[x] return x def union(a: int, b: int) -> None: ra, rb = find(a), find(b) if ra != rb: parent[rb] = ra for i in range(n): _, lat1, lon1 = pool[i] for j in range(i + 1, n): _, lat2, lon2 = pool[j] if _haversine_m(lat1, lon1, lat2, lon2) <= radius_m: union(i, j) clusters: dict[int, list[int]] = {} for i in range(n): clusters.setdefault(find(i), []).append(i) demoted = 0 for members in clusters.values(): result_indices = [pool[i][0] for i in members] cluster_size = len(result_indices) winner_idx = max(result_indices, key=lambda idx: _cluster_rank(results[idx])) winner_name = str(results[winner_idx]["response"]["camera"]["Sitename"]) for idx in result_indices: calc = results[idx]["calculations"] calc["cluster_size"] = cluster_size if idx == winner_idx: calc["passed_gates"].append("cluster") else: calc["status"] = "FAIL" calc["failing_gate"] = "cluster" calc["reason"] = "nearby_duplicate" calc["cluster_winner"] = winner_name demoted += 1 return demoted def run_screening( manifest: dict[str, Any], sites_dir: Path, *, evaluation_year: int, min_gcc_points: int, snr_threshold: float, site_filter: set[str] | None = None, ) -> list[dict[str, Any]]: results: list[dict[str, Any]] = [] sitenames = manifest["sites"] if site_filter is not None: sitenames = [name for name in sitenames if name in site_filter] for index, sitename in enumerate(sitenames, start=1): print(f"[PhenoCam-2] ({index}/{len(sitenames)}) {sitename}") site_entry = load_site_entry(sites_dir, sitename) results.append( screen_site( site_entry, evaluation_year=evaluation_year, min_gcc_points=min_gcc_points, snr_threshold=snr_threshold, ) ) return results def print_summary(results: list[dict[str, Any]], evaluation_year: int) -> None: passing = [row for row in results if row["calculations"]["status"] == "PASS"] gates_label = " + ".join(GATE_ORDER) print( f"\n[PhenoCam-2] Screening for {evaluation_year}: " f"{len(passing)}/{len(results)} pass ({gates_label})" ) for gate in GATE_ORDER: fails = sum(1 for row in results if row["calculations"]["failing_gate"] == gate) after = sum(1 for row in results if gate in row["calculations"]["passed_gates"]) print(f" after_{gate}: {after}, fail_at_{gate}: {fails}") print("\nPer-site table") print( f"{'site':<24} {'n':>4} {'mon':>3} {'snr':>6} " f"{'status':>6} gate reason" ) print("-" * 72) for row in sorted( results, key=lambda item: str(item["response"]["camera"]["Sitename"]), ): camera = row["response"]["camera"] calc = row["calculations"] snr_text = f"{calc['snr']:.2f}" if calc["snr"] is not None else "" print( f"{camera['Sitename']:<24} {calc['n_gcc_points']:4d} " f"{calc['months_with_gcc']:3d} {snr_text:>6} " f"{calc['status']:>6} {(calc['failing_gate'] or '-'):<8} " f"{calc['reason'] or '-'}" ) def write_screening_json( results: list[dict[str, Any]], output_path: Path, evaluation_year: int, ) -> None: passing = [row for row in results if row["calculations"]["status"] == "PASS"] payload = { "evaluation_year": evaluation_year, "count": len(results), "qualifying_count": len(passing), "sites": sorted( results, key=lambda item: str(item["response"]["camera"]["Sitename"]), ), } output_path.parent.mkdir(parents=True, exist_ok=True) output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") print(f"[PhenoCam-2] Wrote {output_path}") def write_screening_csv(results: list[dict[str, Any]], output_path: Path) -> None: rows: list[dict[str, Any]] = [] for row in results: camera = row["response"]["camera"] metadata = camera.get("sitemetadata") or {} roi = row["response"].get("roi") or {} calc = row["calculations"] rows.append( { "Sitename": camera.get("Sitename"), "Lat": camera.get("Lat"), "Lon": camera.get("Lon"), "site_description": metadata.get("site_description"), "primary_veg_type": metadata.get("primary_veg_type"), "site_type": metadata.get("site_type"), "one_day_summary": roi.get("one_day_summary"), **calc, } ) fieldnames = list(rows[0].keys()) if rows else ["Sitename", "status"] if rows: extra = [k for row in rows for k in row if k not in fieldnames] fieldnames.extend(dict.fromkeys(extra)) output_path.parent.mkdir(parents=True, exist_ok=True) with output_path.open("w", encoding="utf-8", newline="") as handle: writer = csv.DictWriter(handle, fieldnames=fieldnames) writer.writeheader() writer.writerows(rows) print(f"[PhenoCam-2] Wrote {output_path}") def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument( "--evaluation-year", type=int, default=2025, help="Evaluation year (default: 2025)", ) parser.add_argument( "--sites", type=str, default=None, help="Comma-separated sitenames (default: all sites in step-1 manifest)", ) parser.add_argument( "--min-gcc-points", type=int, default=MIN_GCC_POINTS, help=f"Minimum valid gcc_90 observations in-year (default: {MIN_GCC_POINTS})", ) parser.add_argument( "--snr-threshold", type=float, default=SNR_THRESHOLD, help=f"Minimum AIC-spline SNR (default: {SNR_THRESHOLD})", ) parser.add_argument( "--output-json", type=Path, default=None, help="Screening output (default: data/phenocam_screening/{year}.json)", ) parser.add_argument( "--output-csv", type=Path, default=None, help="Flat CSV summary path", ) parser.add_argument( "--cluster-radius-m", type=float, default=CLUSTER_RADIUS_M, help=f"Deduplicate SNR-passed sites within this radius (default: {CLUSTER_RADIUS_M})", ) parser.add_argument( "--no-cluster", action="store_true", help="Skip nearby-site deduplication gate", ) args = parser.parse_args(argv) evaluation_year = args.evaluation_year manifest_path = Path("data") / "phenocam" / f"{evaluation_year}.json" if not manifest_path.is_file(): raise SystemExit(f"Step-1 manifest not found: {manifest_path}") site_filter = None if args.sites: site_filter = {name.strip() for name in args.sites.split(",") if name.strip()} manifest = load_manifest(manifest_path) sites_dir_path = resolve_sites_dir(manifest_path, manifest) results = run_screening( manifest, sites_dir_path, evaluation_year=evaluation_year, min_gcc_points=args.min_gcc_points, snr_threshold=args.snr_threshold, site_filter=site_filter, ) if not args.no_cluster: demoted = apply_cluster_gate(results, radius_m=args.cluster_radius_m) if demoted: print(f"[PhenoCam-2] Cluster dedup: demoted {demoted} nearby duplicate(s)") print_summary(results, evaluation_year) default_dir = Path("data") / "phenocam_screening" json_name = f"{evaluation_year}.json" csv_name = f"{evaluation_year}.csv" write_screening_json( results, args.output_json or (default_dir / json_name), evaluation_year, ) write_screening_csv(results, args.output_csv or (default_dir / csv_name)) return 0 if __name__ == "__main__": raise SystemExit(main())