diff --git a/acquisition_phenocam.py b/acquisition_phenocam.py index ff4ba9c..5e782a7 100644 --- a/acquisition_phenocam.py +++ b/acquisition_phenocam.py @@ -276,3 +276,7 @@ def _download_phenocam_gcc(season, site_position, site_name, date_range=None): print( f"[PhenoCam-GI] Saved: {json_path} and {csv_path} ({len(timeseries)} entries)" ) + + from phenocam_snr import write_phenocam_snr + + write_phenocam_snr(site_name, season, base=Path("data")) diff --git a/metrics_stats.py b/metrics_stats.py index 1e49e77..530652e 100644 --- a/metrics_stats.py +++ b/metrics_stats.py @@ -309,6 +309,25 @@ def calculate_all_metrics(season, site_name, site_position): if phenocam_stats: results["phenocam_stats"] = phenocam_stats + from phenocam_snr import compute_snr, load_phenocam_snr, write_phenocam_snr + + snr_info = load_phenocam_snr(site_name, season, base=Path("data")) + if not snr_info: + write_phenocam_snr( + site_name, season, base=Path("data"), metrics=results, fetch_if_missing=True + ) + snr_info = load_phenocam_snr(site_name, season, base=Path("data")) + if not snr_info: + snr_info = compute_snr( + site_name, season, base=Path("data"), metrics=results, fetch_if_missing=True + ) + if snr_info.get("snr") is not None: + results["phenocam_snr"] = { + "amplitude": snr_info.get("amplitude"), + "spline_rmse_gcc90": snr_info.get("spline_rmse_gcc90"), + "snr": snr_info.get("snr"), + } + baseline = {} all_gcc, flags = _s2_gcc_series_from_preselection(base) if all_gcc: diff --git a/phenocam_snr.py b/phenocam_snr.py new file mode 100644 index 0000000..d333f00 --- /dev/null +++ b/phenocam_snr.py @@ -0,0 +1,328 @@ +"""PhenoCam signal-to-noise ratio for aggregate utility eligibility (Richardson et al., 2018).""" + +from __future__ import annotations + +import json +import re +from pathlib import Path + +import requests + +PHENOCAM_API = "https://phenocam.nau.edu/api" +SPLINE_RMSE_RE = re.compile( + r"^\s*#\s*Spline\s+RMSE\s+gcc_90\s*:\s*([0-9.eE+-]+)\s*$", + re.IGNORECASE, +) + +PRIMARY_SEASON: dict[str, int] = { + "forthgr": 2024, + "innsbruck": 2024, + "pitsalu": 2024, + "vindeln2": 2023, + "sunflowerjerez1": 2024, + "institutekarnobat": 2024, +} + +# PhenoCam ROI type codes for archive URLs (first ROI used by acquisition when multiple exist). +SITE_ROITYPE: dict[str, str] = { + "forthgr": "AG", + "innsbruck": "GR", + "pitsalu": "WL", + "vindeln2": "MX", + "sunflowerjerez1": "AG", + "institutekarnobat": "AG", +} + +PHENOCAM_ARCHIVE = "https://phenocam.nau.edu/data/archive" + + +def phenocam_snr_path(site_name: str, season: int, base: Path | None = None) -> Path: + root = base or Path("data") + return root / site_name / str(season) / "raw" / "phenocam" / "phenocam_snr.json" + + +def parse_spline_rmse_gcc90(text: str) -> float | None: + """Parse ``# Spline RMSE gcc_90: `` from transition-dates CSV header.""" + for line in text.splitlines(): + m = SPLINE_RMSE_RE.match(line) + if m: + try: + return float(m.group(1)) + except ValueError: + return None + return None + + +def transition_dates_archive_url(site_name: str, roitype: str, seq: int = 1000) -> str: + return ( + f"{PHENOCAM_ARCHIVE}/{site_name}/ROI/" + f"{site_name}_{roitype}_{seq}_1day_transition_dates.csv" + ) + + +def transition_dates_url(site_name: str) -> str | None: + """Return ``one_day_transition_dates`` URL for the site's primary ROI.""" + roitype = SITE_ROITYPE.get(site_name) + if roitype: + for seq in (1000, 2000, 1001): + url = transition_dates_archive_url(site_name, roitype, seq) + try: + r = requests.head(url, timeout=15, allow_redirects=True) + if r.status_code == 200: + return url + except requests.RequestException: + continue + try: + url = f"{PHENOCAM_API}/roilists/" + params: dict | None = {"site": site_name} + while url: + r = requests.get(url, params=params, timeout=30) + r.raise_for_status() + data = r.json() + for roi in data.get("results", []): + if roi.get("site") == site_name: + td = roi.get("one_day_transition_dates") + if td: + return td + url = data.get("next") + params = None + except requests.RequestException: + pass + return None + + +def fetch_spline_rmse_from_archive(site_name: str) -> float | None: + """Fetch spline RMSE via PhenoCam archive URL (fast path).""" + roitype = SITE_ROITYPE.get(site_name) + if not roitype: + return None + for seq in (1000, 2000, 1001): + url = transition_dates_archive_url(site_name, roitype, seq) + try: + r = requests.get(url, timeout=20) + if r.status_code != 200: + continue + rmse = parse_spline_rmse_gcc90(r.text) + if rmse is not None: + return rmse + except requests.RequestException: + continue + return None + + +def fetch_spline_rmse_gcc90(site_name: str) -> float | None: + """Download transition-dates file header and return spline RMSE for gcc_90.""" + rmse = fetch_spline_rmse_from_archive(site_name) + if rmse is not None: + return rmse + td_url = transition_dates_url(site_name) + if not td_url: + return None + try: + r = requests.get(td_url, timeout=30) + r.raise_for_status() + return parse_spline_rmse_gcc90(r.text) + except requests.RequestException: + return None + + +def season_amplitude( + site_name: str, + season: int, + *, + base: Path | None = None, + metrics: dict | None = None, +) -> float | None: + """Seasonal amplitude max(gcc_90) - min(gcc_90) over the evaluation season.""" + if metrics: + ps = metrics.get("phenocam_stats") or {} + mn, mx = ps.get("min"), ps.get("max") + if isinstance(mn, (int, float)) and isinstance(mx, (int, float)): + return float(mx - mn) + + root = base or Path("data") + p = root / site_name / str(season) / "raw" / "phenocam" / "phenocam_gcc.json" + if not p.is_file(): + return None + data = json.loads(p.read_text(encoding="utf-8")) + if isinstance(data, list): + vals = [ + it.get("greenness_index") + for it in data + if isinstance(it.get("greenness_index"), (int, float)) + ] + elif isinstance(data, dict): + vals = [v for v in data.values() if isinstance(v, (int, float))] + else: + return None + if not vals: + return None + return float(max(vals) - min(vals)) + + +def compute_snr( + site_name: str, + season: int, + *, + base: Path | None = None, + metrics: dict | None = None, + spline_rmse: float | None = None, + fetch_if_missing: bool = True, +) -> dict: + """Return amplitude, spline RMSE, and SNR; may fetch RMSE from PhenoCam API.""" + root = base or Path("data") + amp = season_amplitude(site_name, season, base=root, metrics=metrics) + rmse = spline_rmse + if rmse is None: + sidecar = phenocam_snr_path(site_name, season, root) + if sidecar.is_file(): + cached = json.loads(sidecar.read_text(encoding="utf-8")) + rmse = cached.get("spline_rmse_gcc90") + elif fetch_if_missing: + rmse = fetch_spline_rmse_gcc90(site_name) + snr = None + if isinstance(amp, (int, float)) and isinstance(rmse, (int, float)) and rmse > 0: + snr = float(amp) / float(rmse) + return { + "site": site_name, + "season": season, + "amplitude": amp, + "spline_rmse_gcc90": rmse, + "snr": snr, + } + + +def write_phenocam_snr( + site_name: str, + season: int, + *, + base: Path | None = None, + metrics: dict | None = None, + fetch_if_missing: bool = True, +) -> Path | None: + """Compute SNR and write ``phenocam_snr.json``; returns path or None on failure.""" + root = base or Path("data") + info = compute_snr( + site_name, + season, + base=root, + metrics=metrics, + fetch_if_missing=fetch_if_missing, + ) + if info.get("spline_rmse_gcc90") is None: + print( + f"[PhenoCam-SNR] Warning: no spline RMSE for {site_name} {season}; " + "skipping phenocam_snr.json" + ) + return None + out = phenocam_snr_path(site_name, season, root) + out.parent.mkdir(parents=True, exist_ok=True) + td_url = transition_dates_url(site_name) + payload = { + "site": site_name, + "season": season, + "amplitude": info.get("amplitude"), + "spline_rmse_gcc90": info.get("spline_rmse_gcc90"), + "snr": info.get("snr"), + "source": "phenocam_1day_transition_dates_header", + "transition_dates_url": td_url, + "roitype": SITE_ROITYPE.get(site_name), + } + out.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + print(f"[PhenoCam-SNR] Saved: {out} (SNR={info.get('snr')})") + return out + + +def load_phenocam_snr( + site_name: str, season: int, *, base: Path | None = None +) -> dict | None: + """Load cached SNR sidecar if present.""" + p = phenocam_snr_path(site_name, season, base) + if not p.is_file(): + return None + return json.loads(p.read_text(encoding="utf-8")) + + +def suggest_snr_threshold(snrs: list[float]) -> tuple[float, str]: + """ + Choose eligibility threshold from cross-site SNR distribution. + + Returns (threshold, rationale). Uses a distribution-based split only when it + separates a low-SNR group (max below 2) from a high-SNR group (min at or above 2). + Otherwise defaults to SNR >= 2. + """ + if not snrs: + return 2.0, "default SNR >= 2 (no site SNR values available)" + sorted_snrs = sorted(snrs) + if len(sorted_snrs) == 1: + return 2.0, "default SNR >= 2 (single site only)" + if all(s >= 2.0 for s in sorted_snrs): + return 2.0, "default SNR >= 2 (all sites exceed 2; no low-SNR exclusion group)" + + for i in range(1, len(sorted_snrs)): + low, high = sorted_snrs[:i], sorted_snrs[i:] + if not low or not high: + continue + gap = high[0] - low[-1] + if gap >= 0.5 and low[-1] < 2.0 <= high[0]: + threshold = (low[-1] + high[0]) / 2.0 + return ( + round(threshold, 3), + f"gap between {low[-1]:.3f} and {high[0]:.3f} straddles SNR=2 " + f"(midpoint {threshold:.3f})", + ) + return 2.0, "default SNR >= 2 (no clear low/high cluster separation)" + + +def report_all_sites( + *, + base: Path | None = None, + sites: dict[str, int] | None = None, + fetch_if_missing: bool = True, +) -> list[dict]: + """Compute SNR for all primary-season sites; print table and return rows.""" + root = base or Path("data") + site_seasons = sites or PRIMARY_SEASON + rows: list[dict] = [] + for site in sorted(site_seasons.keys()): + season = site_seasons[site] + metrics_path = root / site / str(season) / "metrics.json" + metrics = None + if metrics_path.is_file(): + metrics = json.loads(metrics_path.read_text(encoding="utf-8")) + info = compute_snr( + site, + season, + base=root, + metrics=metrics, + fetch_if_missing=fetch_if_missing, + ) + rows.append(info) + + print(f"{'site':<20} {'season':>6} {'amplitude':>10} {'rmse_spl':>10} {'SNR':>8}") + print("-" * 58) + for r in rows: + amp = r.get("amplitude") + rmse = r.get("spline_rmse_gcc90") + snr = r.get("snr") + print( + f"{r['site']:<20} {r['season']:>6} " + f"{amp if amp is not None else '---':>10} " + f"{rmse if rmse is not None else '---':>10} " + f"{snr if snr is not None else '---':>8}" + ) + + valid_snrs = [r["snr"] for r in rows if isinstance(r.get("snr"), (int, float))] + threshold, rationale = suggest_snr_threshold(valid_snrs) + print(f"\nSuggested threshold: SNR >= {threshold} ({rationale})") + for r in rows: + snr = r.get("snr") + if isinstance(snr, (int, float)): + r["eligible_at_2"] = snr >= 2.0 + r["eligible_at_3"] = snr >= 3.0 + r["eligible_at_suggested"] = snr >= threshold + return rows + + +if __name__ == "__main__": + report_all_sites()