Switching horses.
This commit is contained in:
parent
25cbd97662
commit
e3e14027fc
51 changed files with 5078 additions and 11678 deletions
495
2-phenocam-screening.py
Normal file
495
2-phenocam-screening.py
Normal file
|
|
@ -0,0 +1,495 @@
|
|||
"""Step 2: PhenoCam GCC + SNR screening on step-1 cache.
|
||||
|
||||
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
|
||||
|
||||
- ``phenocam/{year}.json`` — step-1 manifest
|
||||
- ``phenocam/{year}/{sitename}.json`` — per-site metadata
|
||||
- ``phenocam/{year}/{sitename}_1day.csv`` — GCC timeseries
|
||||
|
||||
Outputs (``data/phenocam_screening/``):
|
||||
|
||||
- ``{year}.json`` — full per-site results
|
||||
- ``{year}.csv`` — flat summary table
|
||||
|
||||
CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional; default: all manifest sites).
|
||||
|
||||
Next step: :mod:`3-sentinel-clouds`.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import csv
|
||||
import json
|
||||
import math
|
||||
import sys
|
||||
from datetime import date, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
from scipy.interpolate import UnivariateSpline
|
||||
|
||||
# ``processing`` directory located next to the directory that contains this
# script (two levels up from the resolved file path, then ``processing``).
PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing"

# Prepend it to the module search path exactly once so modules living in that
# directory can be imported by plain name.
if sys.path.count(str(PROCESSING_DIR)) == 0:
    sys.path.insert(0, str(PROCESSING_DIR))
|
||||
|
||||
from acquisition_phenocam import _phenocam_summary_gcc_value # noqa: E402
|
||||
|
||||
# Default for ``--min-gcc-points``: minimum number of in-year GCC observations
# a site needs to clear the "phenocam" gate.
MIN_GCC_POINTS = 30

# Default for ``--snr-threshold``: minimum AIC-spline SNR to clear the "snr" gate.
SNR_THRESHOLD = 2.0

# Default for ``--cluster-radius-m``: SNR-passed sites within this distance
# (metres) of each other are deduplicated by the "cluster" gate.
CLUSTER_RADIUS_M = 500.0

# Gate names in the order they are reported by the summary printout.
GATE_ORDER = ("phenocam", "snr", "cluster")

# Filename suffix of the per-site daily GCC timeseries CSV.
ONE_DAY_CSV_SUFFIX = "_1day.csv"

# Sphere radius (metres) used by the haversine distance.
_EARTH_RADIUS_M = 6_371_000.0
|
||||
|
||||
|
||||
def load_manifest(path: Path) -> dict[str, Any]:
    """Load the step-1 manifest JSON and check its required keys.

    Args:
        path: Location of the manifest file (read as UTF-8).

    Returns:
        The decoded manifest mapping.

    Raises:
        ValueError: If the top-level JSON value is not an object, or if any of
            ``evaluation_year``, ``sites_dir`` or ``sites`` is missing.
    """
    payload = json.loads(path.read_text(encoding="utf-8"))
    # Guard the key checks below: ``in`` on a JSON string would do substring
    # matching and on a number would raise TypeError instead of ValueError.
    if not isinstance(payload, dict):
        raise ValueError(
            f"Expected a JSON object in manifest {path}, "
            f"got {type(payload).__name__}"
        )
    for key in ("evaluation_year", "sites_dir", "sites"):
        if key not in payload:
            raise ValueError(f"Expected '{key}' in manifest {path}")
    return payload
|
||||
|
||||
|
||||
def resolve_sites_dir(manifest_path: Path, manifest: dict[str, Any]) -> Path:
    """Return the absolute per-site directory named by the manifest.

    ``manifest["sites_dir"]`` is interpreted relative to the directory that
    contains the manifest file, and the result is resolved.
    """
    manifest_dir = manifest_path.parent
    return manifest_dir.joinpath(manifest["sites_dir"]).resolve()
|
||||
|
||||
|
||||
def load_site_entry(sites_dir: Path, sitename: str) -> dict[str, Any]:
    """Load one site's cached metadata and attach its daily CSV path.

    Reads ``{sitename}.json`` from ``sites_dir`` and stores, under the
    ``"_one_day_csv"`` key, the path of ``{sitename}`` + ``ONE_DAY_CSV_SUFFIX``
    when that file exists, or ``None`` when it does not.
    """
    metadata_file = sites_dir / f"{sitename}.json"
    entry = json.loads(metadata_file.read_text(encoding="utf-8"))

    timeseries_file = sites_dir / f"{sitename}{ONE_DAY_CSV_SUFFIX}"
    if timeseries_file.is_file():
        entry["_one_day_csv"] = timeseries_file
    else:
        entry["_one_day_csv"] = None
    return entry
|
||||
|
||||
|
||||
def parse_gcc90_series(csv_path: Path, evaluation_year: int) -> list[tuple[str, float]]:
    """Extract the in-year GCC series from a daily summary CSV.

    Empty lines and lines starting with ``#`` are dropped before the CSV is
    parsed. Rows are kept only when their ``date`` column parses as
    ``YYYY-MM-DD``, falls inside ``evaluation_year``, and
    ``_phenocam_summary_gcc_value`` yields a non-``None`` value for them. The
    helper is told to use its mean fallback when the header has no ``gcc_90``
    column.

    Returns:
        ``(iso_date, gcc)`` pairs sorted by date string.
    """
    raw_text = csv_path.read_text(encoding="utf-8")
    data_lines: list[str] = []
    for raw_line in raw_text.split("\n"):
        if not raw_line or raw_line.startswith("#"):
            continue
        data_lines.append(raw_line)

    reader = csv.DictReader(data_lines)
    header = reader.fieldnames or ()
    use_mean_fallback = "gcc_90" not in header

    first_day = date(evaluation_year, 1, 1)
    last_day = date(evaluation_year, 12, 31)

    points: list[tuple[str, float]] = []
    for record in reader:
        raw_date = record.get("date")
        if not raw_date:
            continue
        try:
            day = datetime.strptime(raw_date, "%Y-%m-%d").date()
        except ValueError:
            # Unparseable date: skip the row rather than abort the site.
            continue
        if day < first_day or day > last_day:
            continue
        value = _phenocam_summary_gcc_value(record, use_mean_fallback)
        if value is None:
            continue
        points.append((day.isoformat(), float(value)))

    # ISO date strings sort chronologically.
    return sorted(points, key=lambda point: point[0])
|
||||
|
||||
|
||||
def _months_covered(day_strings: list[str]) -> int:
|
||||
months: set[int] = set()
|
||||
for day in day_strings:
|
||||
months.add(datetime.strptime(day, "%Y-%m-%d").month)
|
||||
return len(months)
|
||||
|
||||
|
||||
def _aic_for_spline(x: np.ndarray, y: np.ndarray, spline: UnivariateSpline) -> float:
|
||||
residuals = y - spline(x)
|
||||
rss = float(np.sum(residuals**2))
|
||||
n = len(y)
|
||||
if rss <= 0 or n < 4:
|
||||
return math.inf
|
||||
edf = float(spline.get_knots().shape[0] + spline.get_coeffs().shape[0])
|
||||
return n * math.log(rss / n) + 2.0 * edf
|
||||
|
||||
|
||||
def compute_snr_aic_spline(series: list[tuple[str, float]]) -> float | None:
    """Estimate a signal-to-noise ratio from an AIC-selected smoothing spline.

    Cubic ``UnivariateSpline`` fits are tried over 40 log-spaced smoothing
    factors (scaled by ``var(y) * len(y)``). The fit with the lowest
    ``_aic_for_spline`` score is kept, and the SNR is the raw data range
    (max - min) divided by the RMSE of that fit.

    Args:
        series: ``(YYYY-MM-DD, value)`` pairs; the first entry's date is used
            as day zero for the x axis.

    Returns:
        The SNR, or ``None`` when the series has fewer than ``MIN_GCC_POINTS``
        entries, fewer than five distinct days, zero variance, no usable fit,
        or a fit with non-positive RMSE.
    """
    if len(series) < MIN_GCC_POINTS:
        return None

    days = [datetime.strptime(label, "%Y-%m-%d").date() for label, _ in series]
    x = np.array([(day - days[0]).days for day in days], dtype=float)
    y = np.array([gcc for _, gcc in series], dtype=float)
    if len(np.unique(x)) < 5:
        return None

    y_var = float(np.var(y))
    if y_var <= 0:
        return None

    best_fit: UnivariateSpline | None = None
    lowest_aic = math.inf
    for smoothing in np.logspace(-4, 2, 40) * y_var * len(y):
        try:
            fit = UnivariateSpline(x, y, k=3, s=float(smoothing))
        except Exception:
            # Best effort: a smoothing factor that cannot be fitted is skipped.
            continue
        score = _aic_for_spline(x, y, fit)
        if score < lowest_aic:
            lowest_aic, best_fit = score, fit

    if best_fit is None:
        return None

    fit_errors = y - best_fit(x)
    rmse = float(np.sqrt(np.mean(fit_errors**2)))
    amplitude = float(np.max(y) - np.min(y))
    if rmse <= 0:
        return None
    return amplitude / rmse
|
||||
|
||||
|
||||
def screen_site(
    site_entry: dict[str, Any],
    *,
    evaluation_year: int,
    min_gcc_points: int,
    snr_threshold: float,
) -> dict[str, Any]:
    """Run the "phenocam" and "snr" gates for a single site.

    Args:
        site_entry: Site payload with a ``"response"`` mapping and, optionally,
            a ``"_one_day_csv"`` path.
        evaluation_year: Year whose observations are screened.
        min_gcc_points: Minimum in-year observation count for the "phenocam" gate.
        snr_threshold: Minimum SNR for the "snr" gate.

    Returns:
        ``{"response": ..., "calculations": ...}``. ``calculations["status"]``
        is ``"PASS"`` only when both gates are cleared; otherwise
        ``failing_gate`` and ``reason`` name the first gate that failed.
    """
    response = site_entry["response"]
    roi = response.get("roi")
    csv_path = site_entry.get("_one_day_csv")
    calculations: dict[str, Any] = {
        "evaluation_year": evaluation_year,
        "n_gcc_points": 0,
        "first_gcc_date": None,
        "last_gcc_date": None,
        "months_with_gcc": 0,
        "snr": None,
        "min_gcc_points": min_gcc_points,
        "snr_threshold": snr_threshold,
        "status": "FAIL",
        "failing_gate": None,
        "passed_gates": [],
        "reason": None,
    }

    def finish(gate: str | None, reason: str | None) -> dict[str, Any]:
        # Record the verdict and build the result record.
        calculations["failing_gate"] = gate
        calculations["reason"] = reason
        return {"response": response, "calculations": calculations}

    # Gate 1 ("phenocam"): a daily summary must be available on disk...
    has_summary = roi is not None and bool(roi.get("one_day_summary"))
    if not has_summary or csv_path is None:
        return finish("phenocam", "no_roi")

    # ...and contain enough observations inside the evaluation year.
    series = parse_gcc90_series(csv_path, evaluation_year)
    n_points = len(series)
    calculations["n_gcc_points"] = n_points
    if n_points == 0:
        return finish("phenocam", "no_gcc_in_year")

    day_strings = [day for day, _ in series]
    calculations["first_gcc_date"] = day_strings[0]
    calculations["last_gcc_date"] = day_strings[-1]
    calculations["months_with_gcc"] = _months_covered(day_strings)

    if n_points < min_gcc_points:
        return finish("phenocam", "insufficient_gcc_points")
    calculations["passed_gates"].append("phenocam")

    # Gate 2 ("snr"): the SNR must be defined and reach the threshold.
    snr = compute_snr_aic_spline(series)
    calculations["snr"] = snr
    if snr is None:
        return finish("snr", "snr_undefined")
    if snr < snr_threshold:
        return finish("snr", "insufficient_snr")
    calculations["passed_gates"].append("snr")

    calculations["status"] = "PASS"
    return finish(None, None)
|
||||
|
||||
|
||||
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance between two points, in metres.

    Uses the haversine formula on a sphere of radius ``_EARTH_RADIUS_M``.

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.
    """
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
    # Floating-point rounding can leave ``a`` marginally above 1 for
    # (near-)antipodal points, which would make asin raise a domain error.
    # Written as a comparison so a NaN input still propagates as NaN.
    if a > 1.0:
        a = 1.0
    return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))
|
||||
|
||||
|
||||
def _site_coords(row: dict[str, Any]) -> tuple[float, float] | None:
|
||||
camera = row["response"]["camera"]
|
||||
lat, lon = camera.get("Lat"), camera.get("Lon")
|
||||
if lat is None or lon is None:
|
||||
return None
|
||||
return float(lat), float(lon)
|
||||
|
||||
|
||||
def _cluster_rank(row: dict[str, Any]) -> tuple[int, float]:
|
||||
calc = row["calculations"]
|
||||
return calc["n_gcc_points"], float(calc.get("snr") or 0.0)
|
||||
|
||||
|
||||
def apply_cluster_gate(results: list[dict[str, Any]], *, radius_m: float) -> int:
    """Deduplicate nearby SNR-passed sites, keeping one winner per cluster.

    Only rows whose ``passed_gates`` contain ``"snr"`` take part. Rows without
    usable coordinates clear the gate untouched. The rest are linked whenever
    two sites lie within ``radius_m`` of each other, and linked groups are
    merged transitively with a union-find. In each group the row with the
    highest ``_cluster_rank`` keeps its status and gains the ``"cluster"``
    gate; every other member is marked ``FAIL`` / ``nearby_duplicate``.
    ``results`` is modified in place.

    Returns:
        The number of rows demoted.
    """
    candidates: list[tuple[int, float, float]] = []
    for result_idx, row in enumerate(results):
        gates = row["calculations"]["passed_gates"]
        if "snr" not in gates:
            continue
        coords = _site_coords(row)
        if coords is None:
            gates.append("cluster")
            continue
        candidates.append((result_idx, coords[0], coords[1]))

    count = len(candidates)
    root_of = list(range(count))

    def find(node: int) -> int:
        # Follow parent links to the root, halving the path on the way.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    # Link every pair of candidates that lie within the radius.
    for i, (_, lat_a, lon_a) in enumerate(candidates):
        for j in range(i + 1, count):
            _, lat_b, lon_b = candidates[j]
            if _haversine_m(lat_a, lon_a, lat_b, lon_b) <= radius_m:
                root_i, root_j = find(i), find(j)
                if root_i != root_j:
                    root_of[root_j] = root_i

    # Group result indices by cluster root, in first-seen order.
    groups: dict[int, list[int]] = {}
    for pos in range(count):
        root = find(pos)
        if root not in groups:
            groups[root] = []
        groups[root].append(candidates[pos][0])

    demoted = 0
    for member_indices in groups.values():
        size = len(member_indices)
        winner = max(member_indices, key=lambda idx: _cluster_rank(results[idx]))
        winner_name = str(results[winner]["response"]["camera"]["Sitename"])
        for idx in member_indices:
            calc = results[idx]["calculations"]
            calc["cluster_size"] = size
            if idx == winner:
                calc["passed_gates"].append("cluster")
                continue
            calc["status"] = "FAIL"
            calc["failing_gate"] = "cluster"
            calc["reason"] = "nearby_duplicate"
            calc["cluster_winner"] = winner_name
            demoted += 1
    return demoted
|
||||
|
||||
|
||||
def run_screening(
    manifest: dict[str, Any],
    sites_dir: Path,
    *,
    evaluation_year: int,
    min_gcc_points: int,
    snr_threshold: float,
    site_filter: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Screen every manifest site, optionally restricted to ``site_filter``.

    Sites are processed in manifest order; a progress line is printed for
    each one before it is loaded and screened.

    Returns:
        One ``screen_site`` result per processed site, in processing order.
    """
    sitenames = manifest["sites"]
    if site_filter is not None:
        sitenames = [name for name in sitenames if name in site_filter]

    results: list[dict[str, Any]] = []
    for position, sitename in enumerate(sitenames, start=1):
        print(f"[PhenoCam-2] ({position}/{len(sitenames)}) {sitename}")
        entry = load_site_entry(sites_dir, sitename)
        outcome = screen_site(
            entry,
            evaluation_year=evaluation_year,
            min_gcc_points=min_gcc_points,
            snr_threshold=snr_threshold,
        )
        results.append(outcome)
    return results
|
||||
|
||||
|
||||
def print_summary(results: list[dict[str, Any]], evaluation_year: int) -> None:
    """Print the screening totals, per-gate counts and a per-site table.

    For each gate in ``GATE_ORDER`` the printout shows how many rows list it
    in ``passed_gates`` and how many have it as ``failing_gate``. The table is
    sorted by site name.
    """
    n_pass = sum(1 for row in results if row["calculations"]["status"] == "PASS")
    gates_label = " + ".join(GATE_ORDER)
    print(
        f"\n[PhenoCam-2] Screening for {evaluation_year}: "
        f"{n_pass}/{len(results)} pass ({gates_label})"
    )

    for gate in GATE_ORDER:
        fails = 0
        after = 0
        for row in results:
            calc = row["calculations"]
            if calc["failing_gate"] == gate:
                fails += 1
            if gate in calc["passed_gates"]:
                after += 1
        print(f" after_{gate}: {after}, fail_at_{gate}: {fails}")

    print("\nPer-site table")
    print(
        f"{'site':<24} {'n':>4} {'mon':>3} {'snr':>6} "
        f"{'status':>6} gate reason"
    )
    print("-" * 72)

    ordered = sorted(
        results,
        key=lambda item: str(item["response"]["camera"]["Sitename"]),
    )
    for row in ordered:
        camera = row["response"]["camera"]
        calc = row["calculations"]
        # A missing SNR is shown as a blank cell.
        if calc["snr"] is not None:
            snr_text = f"{calc['snr']:.2f}"
        else:
            snr_text = ""
        print(
            f"{camera['Sitename']:<24} {calc['n_gcc_points']:4d} "
            f"{calc['months_with_gcc']:3d} {snr_text:>6} "
            f"{calc['status']:>6} {(calc['failing_gate'] or '-'):<8} "
            f"{calc['reason'] or '-'}"
        )
|
||||
|
||||
|
||||
def write_screening_json(
    results: list[dict[str, Any]],
    output_path: Path,
    evaluation_year: int,
) -> None:
    """Write the full per-site results as indented JSON.

    The payload holds the evaluation year, the total and passing site counts,
    and the result rows sorted by site name. Parent directories are created
    as needed, and the written path is printed.
    """
    qualifying = sum(1 for row in results if row["calculations"]["status"] == "PASS")
    ordered = sorted(
        results,
        key=lambda item: str(item["response"]["camera"]["Sitename"]),
    )
    payload = {
        "evaluation_year": evaluation_year,
        "count": len(results),
        "qualifying_count": qualifying,
        "sites": ordered,
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(payload, indent=2) + "\n"
    output_path.write_text(text, encoding="utf-8")
    print(f"[PhenoCam-2] Wrote {output_path}")
|
||||
|
||||
|
||||
def write_screening_csv(results: list[dict[str, Any]], output_path: Path) -> None:
    """Write a flat one-row-per-site CSV summary.

    Each row combines selected camera and ROI fields with every key of the
    site's ``calculations``. Columns follow first-seen key order across all
    rows, so keys present on only some rows still get a column. With no
    results only a ``Sitename,status`` header is written. Parent directories
    are created as needed, and the written path is printed.
    """
    flat_rows: list[dict[str, Any]] = []
    for result in results:
        camera = result["response"]["camera"]
        metadata = camera.get("sitemetadata") or {}
        roi = result["response"].get("roi") or {}
        calc = result["calculations"]
        record: dict[str, Any] = {
            "Sitename": camera.get("Sitename"),
            "Lat": camera.get("Lat"),
            "Lon": camera.get("Lon"),
            "site_description": metadata.get("site_description"),
            "primary_veg_type": metadata.get("primary_veg_type"),
            "site_type": metadata.get("site_type"),
            "one_day_summary": roi.get("one_day_summary"),
        }
        record.update(calc)
        flat_rows.append(record)

    if flat_rows:
        # dict.fromkeys keeps the first occurrence of each key, in order.
        fieldnames = list(dict.fromkeys(key for record in flat_rows for key in record))
    else:
        fieldnames = ["Sitename", "status"]

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(flat_rows)
    print(f"[PhenoCam-2] Wrote {output_path}")
|
||||
|
||||
|
||||
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for step-2 screening.

    Loads the step-1 manifest for the chosen year from
    ``data/phenocam/{year}.json`` (relative to the working directory), screens
    the selected sites, applies the cluster gate unless ``--no-cluster`` is
    given, prints a summary and writes the JSON and CSV outputs.

    Args:
        argv: Argument list to parse; ``None`` lets argparse use ``sys.argv``.

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: If the step-1 manifest file does not exist.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--evaluation-year",
        type=int,
        default=2025,
        help="Evaluation year (default: 2025)",
    )
    parser.add_argument(
        "--sites",
        type=str,
        default=None,
        help="Comma-separated sitenames (default: all sites in step-1 manifest)",
    )
    parser.add_argument(
        "--min-gcc-points",
        type=int,
        default=MIN_GCC_POINTS,
        help=f"Minimum valid gcc_90 observations in-year (default: {MIN_GCC_POINTS})",
    )
    parser.add_argument(
        "--snr-threshold",
        type=float,
        default=SNR_THRESHOLD,
        help=f"Minimum AIC-spline SNR (default: {SNR_THRESHOLD})",
    )
    parser.add_argument(
        "--output-json",
        type=Path,
        default=None,
        help="Screening output (default: data/phenocam_screening/{year}.json)",
    )
    parser.add_argument(
        "--output-csv",
        type=Path,
        default=None,
        help="Flat CSV summary path",
    )
    parser.add_argument(
        "--cluster-radius-m",
        type=float,
        default=CLUSTER_RADIUS_M,
        help=f"Deduplicate SNR-passed sites within this radius (default: {CLUSTER_RADIUS_M})",
    )
    parser.add_argument(
        "--no-cluster",
        action="store_true",
        help="Skip nearby-site deduplication gate",
    )
    args = parser.parse_args(argv)

    evaluation_year = args.evaluation_year
    manifest_path = Path("data") / "phenocam" / f"{evaluation_year}.json"
    if not manifest_path.is_file():
        raise SystemExit(f"Step-1 manifest not found: {manifest_path}")

    site_filter = None
    if args.sites:
        site_filter = {name.strip() for name in args.sites.split(",") if name.strip()}

    manifest = load_manifest(manifest_path)
    sites_dir_path = resolve_sites_dir(manifest_path, manifest)

    if site_filter is not None:
        # Requested names that the manifest does not list are skipped by the
        # screening loop; say so, because the outputs are still (over)written
        # and a typo would otherwise silently produce a partial result.
        unknown = sorted(site_filter.difference(manifest["sites"]))
        if unknown:
            print(
                "[PhenoCam-2] Warning: ignoring --sites not in manifest: "
                + ", ".join(unknown)
            )

    results = run_screening(
        manifest,
        sites_dir_path,
        evaluation_year=evaluation_year,
        min_gcc_points=args.min_gcc_points,
        snr_threshold=args.snr_threshold,
        site_filter=site_filter,
    )
    if not args.no_cluster:
        demoted = apply_cluster_gate(results, radius_m=args.cluster_radius_m)
        if demoted:
            print(f"[PhenoCam-2] Cluster dedup: demoted {demoted} nearby duplicate(s)")
    print_summary(results, evaluation_year)

    default_dir = Path("data") / "phenocam_screening"
    json_name = f"{evaluation_year}.json"
    csv_name = f"{evaluation_year}.csv"
    write_screening_json(
        results,
        args.output_json or (default_dir / json_name),
        evaluation_year,
    )
    write_screening_csv(results, args.output_csv or (default_dir / csv_name))
    return 0
|
||||
|
||||
|
||||
# Run the CLI when executed as a script; main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Loading…
Add table
Add a link
Reference in a new issue