328 lines
10 KiB
Python
328 lines
10 KiB
Python
"""PhenoCam signal-to-noise ratio for aggregate utility eligibility (Richardson et al., 2018)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
|
|
import requests
|
|
|
|
PHENOCAM_API = "https://phenocam.nau.edu/api"
|
|
SPLINE_RMSE_RE = re.compile(
|
|
r"^\s*#\s*Spline\s+RMSE\s+gcc_90\s*:\s*([0-9.eE+-]+)\s*$",
|
|
re.IGNORECASE,
|
|
)
|
|
|
|
PRIMARY_SEASON: dict[str, int] = {
|
|
"forthgr": 2024,
|
|
"innsbruck": 2024,
|
|
"pitsalu": 2024,
|
|
"vindeln2": 2023,
|
|
"sunflowerjerez1": 2024,
|
|
"institutekarnobat": 2024,
|
|
}
|
|
|
|
# PhenoCam ROI type codes for archive URLs (first ROI used by acquisition when multiple exist).
|
|
SITE_ROITYPE: dict[str, str] = {
|
|
"forthgr": "AG",
|
|
"innsbruck": "GR",
|
|
"pitsalu": "WL",
|
|
"vindeln2": "MX",
|
|
"sunflowerjerez1": "AG",
|
|
"institutekarnobat": "AG",
|
|
}
|
|
|
|
PHENOCAM_ARCHIVE = "https://phenocam.nau.edu/data/archive"
|
|
|
|
|
|
def phenocam_snr_path(site_name: str, season: int, base: Path | None = None) -> Path:
|
|
root = base or Path("data")
|
|
return root / site_name / str(season) / "raw" / "phenocam" / "phenocam_snr.json"
|
|
|
|
|
|
def parse_spline_rmse_gcc90(text: str) -> float | None:
|
|
"""Parse ``# Spline RMSE gcc_90: <value>`` from transition-dates CSV header."""
|
|
for line in text.splitlines():
|
|
m = SPLINE_RMSE_RE.match(line)
|
|
if m:
|
|
try:
|
|
return float(m.group(1))
|
|
except ValueError:
|
|
return None
|
|
return None
|
|
|
|
|
|
def transition_dates_archive_url(site_name: str, roitype: str, seq: int = 1000) -> str:
|
|
return (
|
|
f"{PHENOCAM_ARCHIVE}/{site_name}/ROI/"
|
|
f"{site_name}_{roitype}_{seq}_1day_transition_dates.csv"
|
|
)
|
|
|
|
|
|
def transition_dates_url(site_name: str) -> str | None:
|
|
"""Return ``one_day_transition_dates`` URL for the site's primary ROI."""
|
|
roitype = SITE_ROITYPE.get(site_name)
|
|
if roitype:
|
|
for seq in (1000, 2000, 1001):
|
|
url = transition_dates_archive_url(site_name, roitype, seq)
|
|
try:
|
|
r = requests.head(url, timeout=15, allow_redirects=True)
|
|
if r.status_code == 200:
|
|
return url
|
|
except requests.RequestException:
|
|
continue
|
|
try:
|
|
url = f"{PHENOCAM_API}/roilists/"
|
|
params: dict | None = {"site": site_name}
|
|
while url:
|
|
r = requests.get(url, params=params, timeout=30)
|
|
r.raise_for_status()
|
|
data = r.json()
|
|
for roi in data.get("results", []):
|
|
if roi.get("site") == site_name:
|
|
td = roi.get("one_day_transition_dates")
|
|
if td:
|
|
return td
|
|
url = data.get("next")
|
|
params = None
|
|
except requests.RequestException:
|
|
pass
|
|
return None
|
|
|
|
|
|
def fetch_spline_rmse_from_archive(site_name: str) -> float | None:
|
|
"""Fetch spline RMSE via PhenoCam archive URL (fast path)."""
|
|
roitype = SITE_ROITYPE.get(site_name)
|
|
if not roitype:
|
|
return None
|
|
for seq in (1000, 2000, 1001):
|
|
url = transition_dates_archive_url(site_name, roitype, seq)
|
|
try:
|
|
r = requests.get(url, timeout=20)
|
|
if r.status_code != 200:
|
|
continue
|
|
rmse = parse_spline_rmse_gcc90(r.text)
|
|
if rmse is not None:
|
|
return rmse
|
|
except requests.RequestException:
|
|
continue
|
|
return None
|
|
|
|
|
|
def fetch_spline_rmse_gcc90(site_name: str) -> float | None:
|
|
"""Download transition-dates file header and return spline RMSE for gcc_90."""
|
|
rmse = fetch_spline_rmse_from_archive(site_name)
|
|
if rmse is not None:
|
|
return rmse
|
|
td_url = transition_dates_url(site_name)
|
|
if not td_url:
|
|
return None
|
|
try:
|
|
r = requests.get(td_url, timeout=30)
|
|
r.raise_for_status()
|
|
return parse_spline_rmse_gcc90(r.text)
|
|
except requests.RequestException:
|
|
return None
|
|
|
|
|
|
def season_amplitude(
|
|
site_name: str,
|
|
season: int,
|
|
*,
|
|
base: Path | None = None,
|
|
metrics: dict | None = None,
|
|
) -> float | None:
|
|
"""Seasonal amplitude max(gcc_90) - min(gcc_90) over the evaluation season."""
|
|
if metrics:
|
|
ps = metrics.get("phenocam_stats") or {}
|
|
mn, mx = ps.get("min"), ps.get("max")
|
|
if isinstance(mn, (int, float)) and isinstance(mx, (int, float)):
|
|
return float(mx - mn)
|
|
|
|
root = base or Path("data")
|
|
p = root / site_name / str(season) / "raw" / "phenocam" / "phenocam_gcc.json"
|
|
if not p.is_file():
|
|
return None
|
|
data = json.loads(p.read_text(encoding="utf-8"))
|
|
if isinstance(data, list):
|
|
vals = [
|
|
it.get("greenness_index")
|
|
for it in data
|
|
if isinstance(it.get("greenness_index"), (int, float))
|
|
]
|
|
elif isinstance(data, dict):
|
|
vals = [v for v in data.values() if isinstance(v, (int, float))]
|
|
else:
|
|
return None
|
|
if not vals:
|
|
return None
|
|
return float(max(vals) - min(vals))
|
|
|
|
|
|
def compute_snr(
|
|
site_name: str,
|
|
season: int,
|
|
*,
|
|
base: Path | None = None,
|
|
metrics: dict | None = None,
|
|
spline_rmse: float | None = None,
|
|
fetch_if_missing: bool = True,
|
|
) -> dict:
|
|
"""Return amplitude, spline RMSE, and SNR; may fetch RMSE from PhenoCam API."""
|
|
root = base or Path("data")
|
|
amp = season_amplitude(site_name, season, base=root, metrics=metrics)
|
|
rmse = spline_rmse
|
|
if rmse is None:
|
|
sidecar = phenocam_snr_path(site_name, season, root)
|
|
if sidecar.is_file():
|
|
cached = json.loads(sidecar.read_text(encoding="utf-8"))
|
|
rmse = cached.get("spline_rmse_gcc90")
|
|
elif fetch_if_missing:
|
|
rmse = fetch_spline_rmse_gcc90(site_name)
|
|
snr = None
|
|
if isinstance(amp, (int, float)) and isinstance(rmse, (int, float)) and rmse > 0:
|
|
snr = float(amp) / float(rmse)
|
|
return {
|
|
"site": site_name,
|
|
"season": season,
|
|
"amplitude": amp,
|
|
"spline_rmse_gcc90": rmse,
|
|
"snr": snr,
|
|
}
|
|
|
|
|
|
def write_phenocam_snr(
|
|
site_name: str,
|
|
season: int,
|
|
*,
|
|
base: Path | None = None,
|
|
metrics: dict | None = None,
|
|
fetch_if_missing: bool = True,
|
|
) -> Path | None:
|
|
"""Compute SNR and write ``phenocam_snr.json``; returns path or None on failure."""
|
|
root = base or Path("data")
|
|
info = compute_snr(
|
|
site_name,
|
|
season,
|
|
base=root,
|
|
metrics=metrics,
|
|
fetch_if_missing=fetch_if_missing,
|
|
)
|
|
if info.get("spline_rmse_gcc90") is None:
|
|
print(
|
|
f"[PhenoCam-SNR] Warning: no spline RMSE for {site_name} {season}; "
|
|
"skipping phenocam_snr.json"
|
|
)
|
|
return None
|
|
out = phenocam_snr_path(site_name, season, root)
|
|
out.parent.mkdir(parents=True, exist_ok=True)
|
|
td_url = transition_dates_url(site_name)
|
|
payload = {
|
|
"site": site_name,
|
|
"season": season,
|
|
"amplitude": info.get("amplitude"),
|
|
"spline_rmse_gcc90": info.get("spline_rmse_gcc90"),
|
|
"snr": info.get("snr"),
|
|
"source": "phenocam_1day_transition_dates_header",
|
|
"transition_dates_url": td_url,
|
|
"roitype": SITE_ROITYPE.get(site_name),
|
|
}
|
|
out.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
|
|
print(f"[PhenoCam-SNR] Saved: {out} (SNR={info.get('snr')})")
|
|
return out
|
|
|
|
|
|
def load_phenocam_snr(
|
|
site_name: str, season: int, *, base: Path | None = None
|
|
) -> dict | None:
|
|
"""Load cached SNR sidecar if present."""
|
|
p = phenocam_snr_path(site_name, season, base)
|
|
if not p.is_file():
|
|
return None
|
|
return json.loads(p.read_text(encoding="utf-8"))
|
|
|
|
|
|
def suggest_snr_threshold(snrs: list[float]) -> tuple[float, str]:
|
|
"""
|
|
Choose eligibility threshold from cross-site SNR distribution.
|
|
|
|
Returns (threshold, rationale). Uses a distribution-based split only when it
|
|
separates a low-SNR group (max below 2) from a high-SNR group (min at or above 2).
|
|
Otherwise defaults to SNR >= 2.
|
|
"""
|
|
if not snrs:
|
|
return 2.0, "default SNR >= 2 (no site SNR values available)"
|
|
sorted_snrs = sorted(snrs)
|
|
if len(sorted_snrs) == 1:
|
|
return 2.0, "default SNR >= 2 (single site only)"
|
|
if all(s >= 2.0 for s in sorted_snrs):
|
|
return 2.0, "default SNR >= 2 (all sites exceed 2; no low-SNR exclusion group)"
|
|
|
|
for i in range(1, len(sorted_snrs)):
|
|
low, high = sorted_snrs[:i], sorted_snrs[i:]
|
|
if not low or not high:
|
|
continue
|
|
gap = high[0] - low[-1]
|
|
if gap >= 0.5 and low[-1] < 2.0 <= high[0]:
|
|
threshold = (low[-1] + high[0]) / 2.0
|
|
return (
|
|
round(threshold, 3),
|
|
f"gap between {low[-1]:.3f} and {high[0]:.3f} straddles SNR=2 "
|
|
f"(midpoint {threshold:.3f})",
|
|
)
|
|
return 2.0, "default SNR >= 2 (no clear low/high cluster separation)"
|
|
|
|
|
|
def report_all_sites(
|
|
*,
|
|
base: Path | None = None,
|
|
sites: dict[str, int] | None = None,
|
|
fetch_if_missing: bool = True,
|
|
) -> list[dict]:
|
|
"""Compute SNR for all primary-season sites; print table and return rows."""
|
|
root = base or Path("data")
|
|
site_seasons = sites or PRIMARY_SEASON
|
|
rows: list[dict] = []
|
|
for site in sorted(site_seasons.keys()):
|
|
season = site_seasons[site]
|
|
metrics_path = root / site / str(season) / "metrics.json"
|
|
metrics = None
|
|
if metrics_path.is_file():
|
|
metrics = json.loads(metrics_path.read_text(encoding="utf-8"))
|
|
info = compute_snr(
|
|
site,
|
|
season,
|
|
base=root,
|
|
metrics=metrics,
|
|
fetch_if_missing=fetch_if_missing,
|
|
)
|
|
rows.append(info)
|
|
|
|
print(f"{'site':<20} {'season':>6} {'amplitude':>10} {'rmse_spl':>10} {'SNR':>8}")
|
|
print("-" * 58)
|
|
for r in rows:
|
|
amp = r.get("amplitude")
|
|
rmse = r.get("spline_rmse_gcc90")
|
|
snr = r.get("snr")
|
|
print(
|
|
f"{r['site']:<20} {r['season']:>6} "
|
|
f"{amp if amp is not None else '---':>10} "
|
|
f"{rmse if rmse is not None else '---':>10} "
|
|
f"{snr if snr is not None else '---':>8}"
|
|
)
|
|
|
|
valid_snrs = [r["snr"] for r in rows if isinstance(r.get("snr"), (int, float))]
|
|
threshold, rationale = suggest_snr_threshold(valid_snrs)
|
|
print(f"\nSuggested threshold: SNR >= {threshold} ({rationale})")
|
|
for r in rows:
|
|
snr = r.get("snr")
|
|
if isinstance(snr, (int, float)):
|
|
r["eligible_at_2"] = snr >= 2.0
|
|
r["eligible_at_3"] = snr >= 3.0
|
|
r["eligible_at_suggested"] = snr >= threshold
|
|
return rows
|
|
|
|
|
|
if __name__ == "__main__":
|
|
report_all_sites()
|