"""PhenoCam signal-to-noise ratio for aggregate utility eligibility (Richardson et al., 2018).""" from __future__ import annotations import json import re from pathlib import Path import requests PHENOCAM_API = "https://phenocam.nau.edu/api" SPLINE_RMSE_RE = re.compile( r"^\s*#\s*Spline\s+RMSE\s+gcc_90\s*:\s*([0-9.eE+-]+)\s*$", re.IGNORECASE, ) PRIMARY_SEASON: dict[str, int] = { "forthgr": 2024, "innsbruck": 2024, "pitsalu": 2024, "vindeln2": 2023, "sunflowerjerez1": 2024, "institutekarnobat": 2024, } # PhenoCam ROI type codes for archive URLs (first ROI used by acquisition when multiple exist). SITE_ROITYPE: dict[str, str] = { "forthgr": "AG", "innsbruck": "GR", "pitsalu": "WL", "vindeln2": "MX", "sunflowerjerez1": "AG", "institutekarnobat": "AG", } PHENOCAM_ARCHIVE = "https://phenocam.nau.edu/data/archive" def phenocam_snr_path(site_name: str, season: int, base: Path | None = None) -> Path: root = base or Path("data") return root / site_name / str(season) / "raw" / "phenocam" / "phenocam_snr.json" def parse_spline_rmse_gcc90(text: str) -> float | None: """Parse ``# Spline RMSE gcc_90: `` from transition-dates CSV header.""" for line in text.splitlines(): m = SPLINE_RMSE_RE.match(line) if m: try: return float(m.group(1)) except ValueError: return None return None def transition_dates_archive_url(site_name: str, roitype: str, seq: int = 1000) -> str: return ( f"{PHENOCAM_ARCHIVE}/{site_name}/ROI/" f"{site_name}_{roitype}_{seq}_1day_transition_dates.csv" ) def transition_dates_url(site_name: str) -> str | None: """Return ``one_day_transition_dates`` URL for the site's primary ROI.""" roitype = SITE_ROITYPE.get(site_name) if roitype: for seq in (1000, 2000, 1001): url = transition_dates_archive_url(site_name, roitype, seq) try: r = requests.head(url, timeout=15, allow_redirects=True) if r.status_code == 200: return url except requests.RequestException: continue try: url = f"{PHENOCAM_API}/roilists/" params: dict | None = {"site": site_name} while url: r = requests.get(url, params=params, timeout=30) r.raise_for_status() data = r.json() for roi in data.get("results", []): if roi.get("site") == site_name: td = roi.get("one_day_transition_dates") if td: return td url = data.get("next") params = None except requests.RequestException: pass return None def fetch_spline_rmse_from_archive(site_name: str) -> float | None: """Fetch spline RMSE via PhenoCam archive URL (fast path).""" roitype = SITE_ROITYPE.get(site_name) if not roitype: return None for seq in (1000, 2000, 1001): url = transition_dates_archive_url(site_name, roitype, seq) try: r = requests.get(url, timeout=20) if r.status_code != 200: continue rmse = parse_spline_rmse_gcc90(r.text) if rmse is not None: return rmse except requests.RequestException: continue return None def fetch_spline_rmse_gcc90(site_name: str) -> float | None: """Download transition-dates file header and return spline RMSE for gcc_90.""" rmse = fetch_spline_rmse_from_archive(site_name) if rmse is not None: return rmse td_url = transition_dates_url(site_name) if not td_url: return None try: r = requests.get(td_url, timeout=30) r.raise_for_status() return parse_spline_rmse_gcc90(r.text) except requests.RequestException: return None def season_amplitude( site_name: str, season: int, *, base: Path | None = None, metrics: dict | None = None, ) -> float | None: """Seasonal amplitude max(gcc_90) - min(gcc_90) over the evaluation season.""" if metrics: ps = metrics.get("phenocam_stats") or {} mn, mx = ps.get("min"), ps.get("max") if isinstance(mn, (int, float)) and isinstance(mx, (int, float)): return float(mx - mn) root = base or Path("data") p = root / site_name / str(season) / "raw" / "phenocam" / "phenocam_gcc.json" if not p.is_file(): return None data = json.loads(p.read_text(encoding="utf-8")) if isinstance(data, list): vals = [ it.get("greenness_index") for it in data if isinstance(it.get("greenness_index"), (int, float)) ] elif isinstance(data, dict): vals = [v for v in data.values() if isinstance(v, (int, float))] else: return None if not vals: return None return float(max(vals) - min(vals)) def compute_snr( site_name: str, season: int, *, base: Path | None = None, metrics: dict | None = None, spline_rmse: float | None = None, fetch_if_missing: bool = True, ) -> dict: """Return amplitude, spline RMSE, and SNR; may fetch RMSE from PhenoCam API.""" root = base or Path("data") amp = season_amplitude(site_name, season, base=root, metrics=metrics) rmse = spline_rmse if rmse is None: sidecar = phenocam_snr_path(site_name, season, root) if sidecar.is_file(): cached = json.loads(sidecar.read_text(encoding="utf-8")) rmse = cached.get("spline_rmse_gcc90") elif fetch_if_missing: rmse = fetch_spline_rmse_gcc90(site_name) snr = None if isinstance(amp, (int, float)) and isinstance(rmse, (int, float)) and rmse > 0: snr = float(amp) / float(rmse) return { "site": site_name, "season": season, "amplitude": amp, "spline_rmse_gcc90": rmse, "snr": snr, } def write_phenocam_snr( site_name: str, season: int, *, base: Path | None = None, metrics: dict | None = None, fetch_if_missing: bool = True, ) -> Path | None: """Compute SNR and write ``phenocam_snr.json``; returns path or None on failure.""" root = base or Path("data") info = compute_snr( site_name, season, base=root, metrics=metrics, fetch_if_missing=fetch_if_missing, ) if info.get("spline_rmse_gcc90") is None: print( f"[PhenoCam-SNR] Warning: no spline RMSE for {site_name} {season}; " "skipping phenocam_snr.json" ) return None out = phenocam_snr_path(site_name, season, root) out.parent.mkdir(parents=True, exist_ok=True) td_url = transition_dates_url(site_name) payload = { "site": site_name, "season": season, "amplitude": info.get("amplitude"), "spline_rmse_gcc90": info.get("spline_rmse_gcc90"), "snr": info.get("snr"), "source": "phenocam_1day_transition_dates_header", "transition_dates_url": td_url, "roitype": SITE_ROITYPE.get(site_name), } out.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") print(f"[PhenoCam-SNR] Saved: {out} (SNR={info.get('snr')})") return out def load_phenocam_snr( site_name: str, season: int, *, base: Path | None = None ) -> dict | None: """Load cached SNR sidecar if present.""" p = phenocam_snr_path(site_name, season, base) if not p.is_file(): return None return json.loads(p.read_text(encoding="utf-8")) def suggest_snr_threshold(snrs: list[float]) -> tuple[float, str]: """ Choose eligibility threshold from cross-site SNR distribution. Returns (threshold, rationale). Uses a distribution-based split only when it separates a low-SNR group (max below 2) from a high-SNR group (min at or above 2). Otherwise defaults to SNR >= 2. """ if not snrs: return 2.0, "default SNR >= 2 (no site SNR values available)" sorted_snrs = sorted(snrs) if len(sorted_snrs) == 1: return 2.0, "default SNR >= 2 (single site only)" if all(s >= 2.0 for s in sorted_snrs): return 2.0, "default SNR >= 2 (all sites exceed 2; no low-SNR exclusion group)" for i in range(1, len(sorted_snrs)): low, high = sorted_snrs[:i], sorted_snrs[i:] if not low or not high: continue gap = high[0] - low[-1] if gap >= 0.5 and low[-1] < 2.0 <= high[0]: threshold = (low[-1] + high[0]) / 2.0 return ( round(threshold, 3), f"gap between {low[-1]:.3f} and {high[0]:.3f} straddles SNR=2 " f"(midpoint {threshold:.3f})", ) return 2.0, "default SNR >= 2 (no clear low/high cluster separation)" def report_all_sites( *, base: Path | None = None, sites: dict[str, int] | None = None, fetch_if_missing: bool = True, ) -> list[dict]: """Compute SNR for all primary-season sites; print table and return rows.""" root = base or Path("data") site_seasons = sites or PRIMARY_SEASON rows: list[dict] = [] for site in sorted(site_seasons.keys()): season = site_seasons[site] metrics_path = root / site / str(season) / "metrics.json" metrics = None if metrics_path.is_file(): metrics = json.loads(metrics_path.read_text(encoding="utf-8")) info = compute_snr( site, season, base=root, metrics=metrics, fetch_if_missing=fetch_if_missing, ) rows.append(info) print(f"{'site':<20} {'season':>6} {'amplitude':>10} {'rmse_spl':>10} {'SNR':>8}") print("-" * 58) for r in rows: amp = r.get("amplitude") rmse = r.get("spline_rmse_gcc90") snr = r.get("snr") print( f"{r['site']:<20} {r['season']:>6} " f"{amp if amp is not None else '---':>10} " f"{rmse if rmse is not None else '---':>10} " f"{snr if snr is not None else '---':>8}" ) valid_snrs = [r["snr"] for r in rows if isinstance(r.get("snr"), (int, float))] threshold, rationale = suggest_snr_threshold(valid_snrs) print(f"\nSuggested threshold: SNR >= {threshold} ({rationale})") for r in rows: snr = r.get("snr") if isinstance(snr, (int, float)): r["eligible_at_2"] = snr >= 2.0 r["eligible_at_3"] = snr >= 3.0 r["eligible_at_suggested"] = snr >= threshold return rows if __name__ == "__main__": report_all_sites()