efast-phenocam-validation/2-phenocam-screening.py
Felix Delattre d29754e4a5 Ran linter.
2026-06-11 16:33:14 +02:00

494 lines
16 KiB
Python

"""Step 2: PhenoCam GCC + SNR screening on step-1 cache.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam/{year}.json`` — step-1 manifest
- ``phenocam/{year}/{sitename}.json`` — per-site metadata
- ``phenocam/{year}/{sitename}_1day.csv`` — GCC timeseries
Outputs (``data/phenocam_screening/``):
- ``{year}.json`` — full per-site results
- ``{year}.csv`` — flat summary table
CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional; default: all manifest sites).
Next step: :mod:`3-sentinel-clouds`.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import sys
from datetime import date, datetime
from pathlib import Path
from typing import Any
import numpy as np
from scipy.interpolate import UnivariateSpline
PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing"
if str(PROCESSING_DIR) not in sys.path:
sys.path.insert(0, str(PROCESSING_DIR))
from acquisition_phenocam import _phenocam_summary_gcc_value # noqa: E402
MIN_GCC_POINTS = 30
SNR_THRESHOLD = 2.0
CLUSTER_RADIUS_M = 500.0
GATE_ORDER = ("phenocam", "snr", "cluster")
ONE_DAY_CSV_SUFFIX = "_1day.csv"
_EARTH_RADIUS_M = 6371000.0
def load_manifest(path: Path) -> dict[str, Any]:
payload = json.loads(path.read_text(encoding="utf-8"))
for key in ("evaluation_year", "sites_dir", "sites"):
if key not in payload:
raise ValueError(f"Expected '{key}' in manifest {path}")
return payload
def resolve_sites_dir(manifest_path: Path, manifest: dict[str, Any]) -> Path:
return (manifest_path.parent / manifest["sites_dir"]).resolve()
def load_site_entry(sites_dir: Path, sitename: str) -> dict[str, Any]:
json_path = sites_dir / f"{sitename}.json"
payload = json.loads(json_path.read_text(encoding="utf-8"))
csv_path = sites_dir / f"{sitename}{ONE_DAY_CSV_SUFFIX}"
payload["_one_day_csv"] = csv_path if csv_path.is_file() else None
return payload
def parse_gcc90_series(csv_path: Path, evaluation_year: int) -> list[tuple[str, float]]:
lines = [
line
for line in csv_path.read_text(encoding="utf-8").split("\n")
if line and not line.startswith("#")
]
reader = csv.DictReader(lines)
fieldnames = reader.fieldnames or ()
use_mean_fallback = "gcc_90" not in fieldnames
year_start = date(evaluation_year, 1, 1)
year_end = date(evaluation_year, 12, 31)
series: list[tuple[str, float]] = []
for row in reader:
date_str = row.get("date")
if not date_str:
continue
try:
row_date = datetime.strptime(date_str, "%Y-%m-%d").date()
except ValueError:
continue
if not (year_start <= row_date <= year_end):
continue
gcc = _phenocam_summary_gcc_value(row, use_mean_fallback)
if gcc is None:
continue
series.append((row_date.isoformat(), float(gcc)))
series.sort(key=lambda item: item[0])
return series
def _months_covered(day_strings: list[str]) -> int:
months: set[int] = set()
for day in day_strings:
months.add(datetime.strptime(day, "%Y-%m-%d").month)
return len(months)
def _aic_for_spline(x: np.ndarray, y: np.ndarray, spline: UnivariateSpline) -> float:
residuals = y - spline(x)
rss = float(np.sum(residuals**2))
n = len(y)
if rss <= 0 or n < 4:
return math.inf
edf = float(spline.get_knots().shape[0] + spline.get_coeffs().shape[0])
return n * math.log(rss / n) + 2.0 * edf
def compute_snr_aic_spline(series: list[tuple[str, float]]) -> float | None:
if len(series) < MIN_GCC_POINTS:
return None
dates = [datetime.strptime(day, "%Y-%m-%d").date() for day, _ in series]
x = np.array([(d - dates[0]).days for d in dates], dtype=float)
y = np.array([value for _, value in series], dtype=float)
if len(np.unique(x)) < 5:
return None
y_var = float(np.var(y))
if y_var <= 0:
return None
candidates = np.logspace(-4, 2, 40) * y_var * len(y)
best_spline: UnivariateSpline | None = None
best_aic = math.inf
for smoothing in candidates:
try:
spline = UnivariateSpline(x, y, k=3, s=float(smoothing))
except Exception:
continue
aic = _aic_for_spline(x, y, spline)
if aic < best_aic:
best_aic = aic
best_spline = spline
if best_spline is None:
return None
residuals = y - best_spline(x)
rmse = float(np.sqrt(np.mean(residuals**2)))
amplitude = float(np.max(y) - np.min(y))
if rmse <= 0:
return None
return amplitude / rmse
def screen_site(
site_entry: dict[str, Any],
*,
evaluation_year: int,
min_gcc_points: int,
snr_threshold: float,
) -> dict[str, Any]:
response = site_entry["response"]
roi = response.get("roi")
csv_path = site_entry.get("_one_day_csv")
calculations: dict[str, Any] = {
"evaluation_year": evaluation_year,
"n_gcc_points": 0,
"first_gcc_date": None,
"last_gcc_date": None,
"months_with_gcc": 0,
"snr": None,
"min_gcc_points": min_gcc_points,
"snr_threshold": snr_threshold,
"status": "FAIL",
"failing_gate": None,
"passed_gates": [],
"reason": None,
}
if roi is None or not roi.get("one_day_summary") or csv_path is None:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "no_roi"
return {"response": response, "calculations": calculations}
series = parse_gcc90_series(csv_path, evaluation_year)
calculations["n_gcc_points"] = len(series)
if calculations["n_gcc_points"] == 0:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "no_gcc_in_year"
return {"response": response, "calculations": calculations}
day_strings = [day for day, _ in series]
calculations["first_gcc_date"] = day_strings[0]
calculations["last_gcc_date"] = day_strings[-1]
calculations["months_with_gcc"] = _months_covered(day_strings)
if calculations["n_gcc_points"] < min_gcc_points:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "insufficient_gcc_points"
return {"response": response, "calculations": calculations}
calculations["passed_gates"].append("phenocam")
snr = compute_snr_aic_spline(series)
calculations["snr"] = snr
if snr is None or snr < snr_threshold:
calculations["failing_gate"] = "snr"
calculations["reason"] = (
"insufficient_snr" if snr is not None else "snr_undefined"
)
return {"response": response, "calculations": calculations}
calculations["passed_gates"].append("snr")
calculations["status"] = "PASS"
calculations["failing_gate"] = None
calculations["reason"] = None
return {"response": response, "calculations": calculations}
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
p1, p2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))
def _site_coords(row: dict[str, Any]) -> tuple[float, float] | None:
camera = row["response"]["camera"]
lat, lon = camera.get("Lat"), camera.get("Lon")
if lat is None or lon is None:
return None
return float(lat), float(lon)
def _cluster_rank(row: dict[str, Any]) -> tuple[int, float]:
calc = row["calculations"]
return calc["n_gcc_points"], float(calc.get("snr") or 0.0)
def apply_cluster_gate(results: list[dict[str, Any]], *, radius_m: float) -> int:
pool: list[tuple[int, float, float]] = []
for idx, row in enumerate(results):
if "snr" not in row["calculations"]["passed_gates"]:
continue
coords = _site_coords(row)
if coords is None:
row["calculations"]["passed_gates"].append("cluster")
continue
pool.append((idx, coords[0], coords[1]))
n = len(pool)
parent = list(range(n))
def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a: int, b: int) -> None:
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
for i in range(n):
_, lat1, lon1 = pool[i]
for j in range(i + 1, n):
_, lat2, lon2 = pool[j]
if _haversine_m(lat1, lon1, lat2, lon2) <= radius_m:
union(i, j)
clusters: dict[int, list[int]] = {}
for i in range(n):
clusters.setdefault(find(i), []).append(i)
demoted = 0
for members in clusters.values():
result_indices = [pool[i][0] for i in members]
cluster_size = len(result_indices)
winner_idx = max(result_indices, key=lambda idx: _cluster_rank(results[idx]))
winner_name = str(results[winner_idx]["response"]["camera"]["Sitename"])
for idx in result_indices:
calc = results[idx]["calculations"]
calc["cluster_size"] = cluster_size
if idx == winner_idx:
calc["passed_gates"].append("cluster")
else:
calc["status"] = "FAIL"
calc["failing_gate"] = "cluster"
calc["reason"] = "nearby_duplicate"
calc["cluster_winner"] = winner_name
demoted += 1
return demoted
def run_screening(
manifest: dict[str, Any],
sites_dir: Path,
*,
evaluation_year: int,
min_gcc_points: int,
snr_threshold: float,
site_filter: set[str] | None = None,
) -> list[dict[str, Any]]:
results: list[dict[str, Any]] = []
sitenames = manifest["sites"]
if site_filter is not None:
sitenames = [name for name in sitenames if name in site_filter]
for index, sitename in enumerate(sitenames, start=1):
print(f"[PhenoCam-2] ({index}/{len(sitenames)}) {sitename}")
site_entry = load_site_entry(sites_dir, sitename)
results.append(
screen_site(
site_entry,
evaluation_year=evaluation_year,
min_gcc_points=min_gcc_points,
snr_threshold=snr_threshold,
)
)
return results
def print_summary(results: list[dict[str, Any]], evaluation_year: int) -> None:
passing = [row for row in results if row["calculations"]["status"] == "PASS"]
gates_label = " + ".join(GATE_ORDER)
print(
f"\n[PhenoCam-2] Screening for {evaluation_year}: "
f"{len(passing)}/{len(results)} pass ({gates_label})"
)
for gate in GATE_ORDER:
fails = sum(1 for row in results if row["calculations"]["failing_gate"] == gate)
after = sum(1 for row in results if gate in row["calculations"]["passed_gates"])
print(f" after_{gate}: {after}, fail_at_{gate}: {fails}")
print("\nPer-site table")
print(f"{'site':<24} {'n':>4} {'mon':>3} {'snr':>6} {'status':>6} gate reason")
print("-" * 72)
for row in sorted(
results,
key=lambda item: str(item["response"]["camera"]["Sitename"]),
):
camera = row["response"]["camera"]
calc = row["calculations"]
snr_text = f"{calc['snr']:.2f}" if calc["snr"] is not None else ""
print(
f"{camera['Sitename']:<24} {calc['n_gcc_points']:4d} "
f"{calc['months_with_gcc']:3d} {snr_text:>6} "
f"{calc['status']:>6} {(calc['failing_gate'] or '-'):<8} "
f"{calc['reason'] or '-'}"
)
def write_screening_json(
results: list[dict[str, Any]],
output_path: Path,
evaluation_year: int,
) -> None:
passing = [row for row in results if row["calculations"]["status"] == "PASS"]
payload = {
"evaluation_year": evaluation_year,
"count": len(results),
"qualifying_count": len(passing),
"sites": sorted(
results,
key=lambda item: str(item["response"]["camera"]["Sitename"]),
),
}
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
print(f"[PhenoCam-2] Wrote {output_path}")
def write_screening_csv(results: list[dict[str, Any]], output_path: Path) -> None:
rows: list[dict[str, Any]] = []
for row in results:
camera = row["response"]["camera"]
metadata = camera.get("sitemetadata") or {}
roi = row["response"].get("roi") or {}
calc = row["calculations"]
rows.append(
{
"Sitename": camera.get("Sitename"),
"Lat": camera.get("Lat"),
"Lon": camera.get("Lon"),
"site_description": metadata.get("site_description"),
"primary_veg_type": metadata.get("primary_veg_type"),
"site_type": metadata.get("site_type"),
"one_day_summary": roi.get("one_day_summary"),
**calc,
}
)
fieldnames = list(rows[0].keys()) if rows else ["Sitename", "status"]
if rows:
extra = [k for row in rows for k in row if k not in fieldnames]
fieldnames.extend(dict.fromkeys(extra))
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"[PhenoCam-2] Wrote {output_path}")
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--evaluation-year",
type=int,
default=2025,
help="Evaluation year (default: 2025)",
)
parser.add_argument(
"--sites",
type=str,
default=None,
help="Comma-separated sitenames (default: all sites in step-1 manifest)",
)
parser.add_argument(
"--min-gcc-points",
type=int,
default=MIN_GCC_POINTS,
help=f"Minimum valid gcc_90 observations in-year (default: {MIN_GCC_POINTS})",
)
parser.add_argument(
"--snr-threshold",
type=float,
default=SNR_THRESHOLD,
help=f"Minimum AIC-spline SNR (default: {SNR_THRESHOLD})",
)
parser.add_argument(
"--output-json",
type=Path,
default=None,
help="Screening output (default: data/phenocam_screening/{year}.json)",
)
parser.add_argument(
"--output-csv",
type=Path,
default=None,
help="Flat CSV summary path",
)
parser.add_argument(
"--cluster-radius-m",
type=float,
default=CLUSTER_RADIUS_M,
help=f"Deduplicate SNR-passed sites within this radius (default: {CLUSTER_RADIUS_M})",
)
parser.add_argument(
"--no-cluster",
action="store_true",
help="Skip nearby-site deduplication gate",
)
args = parser.parse_args(argv)
evaluation_year = args.evaluation_year
manifest_path = Path("data") / "phenocam" / f"{evaluation_year}.json"
if not manifest_path.is_file():
raise SystemExit(f"Step-1 manifest not found: {manifest_path}")
site_filter = None
if args.sites:
site_filter = {name.strip() for name in args.sites.split(",") if name.strip()}
manifest = load_manifest(manifest_path)
sites_dir_path = resolve_sites_dir(manifest_path, manifest)
results = run_screening(
manifest,
sites_dir_path,
evaluation_year=evaluation_year,
min_gcc_points=args.min_gcc_points,
snr_threshold=args.snr_threshold,
site_filter=site_filter,
)
if not args.no_cluster:
demoted = apply_cluster_gate(results, radius_m=args.cluster_radius_m)
if demoted:
print(f"[PhenoCam-2] Cluster dedup: demoted {demoted} nearby duplicate(s)")
print_summary(results, evaluation_year)
default_dir = Path("data") / "phenocam_screening"
json_name = f"{evaluation_year}.json"
csv_name = f"{evaluation_year}.csv"
write_screening_json(
results,
args.output_json or (default_dir / json_name),
evaluation_year,
)
write_screening_csv(results, args.output_csv or (default_dir / csv_name))
return 0
if __name__ == "__main__":
raise SystemExit(main())