Compare commits

..

No commits in common. "sources" and "main" have entirely different histories.

43 changed files with 7209 additions and 59075 deletions

75
.gitignore vendored Normal file
View file

@ -0,0 +1,75 @@
# Generated caches and downloads (regenerate via pipeline steps)
data/
# Environment and secrets
.env
.env.*
!.env.example
.venv/
venv/
env/
ENV/
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
*.pyd
.Python
# uv
.uv/
# Testing & coverage
.pytest_cache/
.coverage
.coverage.*
coverage.xml
nosetests.xml
htmlcov/
.tox/
# Linting & type checking
.ruff_cache/
.mypy_cache/
.dmypy.json
dmypy.json
.pytype/
# Distribution / packaging
build/
dist/
*.egg-info/
*.egg
MANIFEST
pip-log.txt
pip-delete-this-directory.txt
# Logs
*.log
nohup.out
# Jupyter
.ipynb_checkpoints/
profile_default/
ipython_config.py
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
desktop.ini
# Merge artifacts
*.orig
*.rej
# Agents
AGENTS.md

8
.pre-commit-config.yaml Normal file
View file

@ -0,0 +1,8 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.4
hooks:
- id: ruff
args: [--fix]
- id: ruff-format

1
.python-version Normal file
View file

@ -0,0 +1 @@
3.11.10

284
1-phenocam.py Normal file
View file

@ -0,0 +1,284 @@
"""Step 1: download worldwide PhenoCam sites for a calendar year.
Inputs (``data/``): none queries the PhenoCam API.
Outputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam/{year}.json`` site list manifest
- ``phenocam/{year}/{sitename}.json`` camera + ROI metadata
- ``phenocam/{year}/{sitename}_1day.csv`` ``one_day_summary`` GCC CSV
CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional comma-separated filter).
Next step: :mod:`2-phenocam-screening`.
"""
from __future__ import annotations
import argparse
import json
import sys
from datetime import date
from pathlib import Path
from typing import Any
import requests
PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing"
if str(PROCESSING_DIR) not in sys.path:
sys.path.insert(0, str(PROCESSING_DIR))
from acquisition_phenocam import PHENOCAM_API # noqa: E402
from acquisition_phenocam_all_europe import _paginate_cameras, _parse_iso_date # noqa: E402
EVALUATION_YEAR = 2025
HOST_PROBE = "https://phenocam.nau.edu/api/cameras/?limit=1"
ONE_DAY_CSV_SUFFIX = "_1day.csv"
def check_phenocam_host() -> None:
try:
response = requests.get(HOST_PROBE, timeout=30)
response.raise_for_status()
except requests.RequestException as exc:
raise RuntimeError(
f"PhenoCam API unreachable (phenocam.nau.edu): "
f"{exc.__class__.__name__}: {exc}"
) from exc
def _overlaps_year(first: str | None, last: str | None, season: int) -> bool:
start = _parse_iso_date(first)
end = _parse_iso_date(last)
if start is None or end is None:
return False
return start <= date(season, 12, 31) and end >= date(season, 1, 1)
def sites_dir(cache_dir: Path, evaluation_year: int) -> Path:
return cache_dir / "phenocam" / str(evaluation_year)
def site_json_path(cache_dir: Path, evaluation_year: int, sitename: str) -> Path:
return sites_dir(cache_dir, evaluation_year) / f"{sitename}.json"
def site_csv_path(cache_dir: Path, evaluation_year: int, sitename: str) -> Path:
return sites_dir(cache_dir, evaluation_year) / f"{sitename}{ONE_DAY_CSV_SUFFIX}"
def load_candidate_cameras(
evaluation_year: int,
*,
site_filter: set[str] | None = None,
active_only: bool = False,
limit: int | None = None,
) -> list[dict[str, Any]]:
cameras: list[dict[str, Any]] = []
for camera in _paginate_cameras():
if active_only and not camera.get("active"):
continue
sitename = str(camera["Sitename"])
if site_filter is not None and sitename not in site_filter:
continue
if not _overlaps_year(
camera.get("date_first"), camera.get("date_last"), evaluation_year
):
continue
cameras.append(dict(camera))
cameras.sort(key=lambda item: str(item["Sitename"]))
if limit is not None:
cameras = cameras[:limit]
return cameras
def fetch_roi_record(site_name: str) -> dict[str, Any] | None:
rois: list[dict[str, Any]] = []
url = f"{PHENOCAM_API}/roilists/"
params: dict[str, Any] | None = {"site": site_name}
while url:
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
payload = response.json()
rois.extend(
item for item in payload.get("results", []) if item.get("site") == site_name
)
url = payload.get("next")
params = None
if rois:
break
return dict(rois[0]) if rois else None
def download_one_day_csv(csv_url: str, output_path: Path) -> None:
response = requests.get(csv_url, timeout=60)
response.raise_for_status()
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(response.text, encoding="utf-8")
def download_site(
camera: dict[str, Any],
evaluation_year: int,
cache_dir: Path,
) -> str:
sitename = str(camera["Sitename"])
roi = fetch_roi_record(sitename)
payload = {"response": {"camera": camera, "roi": roi}}
json_path = site_json_path(cache_dir, evaluation_year, sitename)
json_path.parent.mkdir(parents=True, exist_ok=True)
json_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
csv_url = roi.get("one_day_summary") if roi else None
if csv_url:
download_one_day_csv(
csv_url, site_csv_path(cache_dir, evaluation_year, sitename)
)
return sitename
def load_or_download_site(
camera: dict[str, Any],
evaluation_year: int,
cache_dir: Path,
*,
refresh: bool,
) -> str:
sitename = str(camera["Sitename"])
json_path = site_json_path(cache_dir, evaluation_year, sitename)
csv_path = site_csv_path(cache_dir, evaluation_year, sitename)
if not refresh and json_path.is_file():
if not csv_path.is_file():
payload = json.loads(json_path.read_text(encoding="utf-8"))
roi = payload.get("response", {}).get("roi") or {}
csv_url = roi.get("one_day_summary")
if csv_url:
download_one_day_csv(csv_url, csv_path)
return sitename
return download_site(camera, evaluation_year, cache_dir)
def run_download(
*,
cache_dir: Path,
evaluation_year: int,
active_only: bool = False,
site_filter: set[str] | None = None,
limit: int | None = None,
refresh: bool = False,
) -> list[str]:
check_phenocam_host()
candidates = load_candidate_cameras(
evaluation_year,
site_filter=site_filter,
active_only=active_only,
limit=limit,
)
print(
f"[PhenoCam-1] {len(candidates)} candidate(s) with archive overlap for "
f"{evaluation_year}"
)
sitenames: list[str] = []
for index, camera in enumerate(candidates, start=1):
sitename = str(camera["Sitename"])
print(
f"[PhenoCam-1] ({index}/{len(candidates)}) {sitename} "
f"({float(camera['Lat']):.4f}, {float(camera['Lon']):.4f})"
)
sitenames.append(
load_or_download_site(
camera,
evaluation_year,
cache_dir,
refresh=refresh,
)
)
return sorted(sitenames)
def write_manifest(
sitenames: list[str],
output_path: Path,
cache_dir: Path,
evaluation_year: int,
) -> None:
rel_sites_dir = sites_dir(cache_dir, evaluation_year).relative_to(
output_path.parent
)
payload = {
"evaluation_year": evaluation_year,
"count": len(sitenames),
"sites_dir": rel_sites_dir.as_posix(),
"sites": sitenames,
}
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
print(f"[PhenoCam-1] Wrote {output_path}")
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--cache-dir",
type=Path,
default=Path("data"),
help="Base directory for per-site files and manifest",
)
parser.add_argument(
"--evaluation-year",
type=int,
default=EVALUATION_YEAR,
help=f"Calendar year to download (default: {EVALUATION_YEAR})",
)
parser.add_argument(
"--active-only",
action="store_true",
help="Restrict candidates to cameras marked active in the API",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Process only the first N candidate sites (testing)",
)
parser.add_argument(
"--sites",
type=str,
default=None,
help="Comma-separated sitenames to download (testing)",
)
parser.add_argument(
"--refresh",
action="store_true",
help="Re-download sites even when cache files exist",
)
parser.add_argument(
"--output-json",
type=Path,
default=None,
help="Manifest output path (default: data/phenocam/{year}.json)",
)
args = parser.parse_args(argv)
site_filter = None
if args.sites:
site_filter = {name.strip() for name in args.sites.split(",") if name.strip()}
sitenames = run_download(
cache_dir=args.cache_dir,
evaluation_year=args.evaluation_year,
active_only=args.active_only,
site_filter=site_filter,
limit=args.limit,
refresh=args.refresh,
)
manifest_path = args.output_json or (
args.cache_dir / "phenocam" / f"{args.evaluation_year}.json"
)
write_manifest(sitenames, manifest_path, args.cache_dir, args.evaluation_year)
return 0
if __name__ == "__main__":
raise SystemExit(main())

494
2-phenocam-screening.py Normal file
View file

@ -0,0 +1,494 @@
"""Step 2: PhenoCam GCC + SNR screening on step-1 cache.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam/{year}.json`` step-1 manifest
- ``phenocam/{year}/{sitename}.json`` per-site metadata
- ``phenocam/{year}/{sitename}_1day.csv`` GCC timeseries
Outputs (``data/phenocam_screening/``):
- ``{year}.json`` full per-site results
- ``{year}.csv`` flat summary table
CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional; default: all manifest sites).
Next step: :mod:`3-sentinel-clouds`.
"""
from __future__ import annotations
import argparse
import csv
import json
import math
import sys
from datetime import date, datetime
from pathlib import Path
from typing import Any
import numpy as np
from scipy.interpolate import UnivariateSpline
PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing"
if str(PROCESSING_DIR) not in sys.path:
sys.path.insert(0, str(PROCESSING_DIR))
from acquisition_phenocam import _phenocam_summary_gcc_value # noqa: E402
MIN_GCC_POINTS = 30
SNR_THRESHOLD = 2.0
CLUSTER_RADIUS_M = 500.0
GATE_ORDER = ("phenocam", "snr", "cluster")
ONE_DAY_CSV_SUFFIX = "_1day.csv"
_EARTH_RADIUS_M = 6371000.0
def load_manifest(path: Path) -> dict[str, Any]:
payload = json.loads(path.read_text(encoding="utf-8"))
for key in ("evaluation_year", "sites_dir", "sites"):
if key not in payload:
raise ValueError(f"Expected '{key}' in manifest {path}")
return payload
def resolve_sites_dir(manifest_path: Path, manifest: dict[str, Any]) -> Path:
return (manifest_path.parent / manifest["sites_dir"]).resolve()
def load_site_entry(sites_dir: Path, sitename: str) -> dict[str, Any]:
json_path = sites_dir / f"{sitename}.json"
payload = json.loads(json_path.read_text(encoding="utf-8"))
csv_path = sites_dir / f"{sitename}{ONE_DAY_CSV_SUFFIX}"
payload["_one_day_csv"] = csv_path if csv_path.is_file() else None
return payload
def parse_gcc90_series(csv_path: Path, evaluation_year: int) -> list[tuple[str, float]]:
lines = [
line
for line in csv_path.read_text(encoding="utf-8").split("\n")
if line and not line.startswith("#")
]
reader = csv.DictReader(lines)
fieldnames = reader.fieldnames or ()
use_mean_fallback = "gcc_90" not in fieldnames
year_start = date(evaluation_year, 1, 1)
year_end = date(evaluation_year, 12, 31)
series: list[tuple[str, float]] = []
for row in reader:
date_str = row.get("date")
if not date_str:
continue
try:
row_date = datetime.strptime(date_str, "%Y-%m-%d").date()
except ValueError:
continue
if not (year_start <= row_date <= year_end):
continue
gcc = _phenocam_summary_gcc_value(row, use_mean_fallback)
if gcc is None:
continue
series.append((row_date.isoformat(), float(gcc)))
series.sort(key=lambda item: item[0])
return series
def _months_covered(day_strings: list[str]) -> int:
months: set[int] = set()
for day in day_strings:
months.add(datetime.strptime(day, "%Y-%m-%d").month)
return len(months)
def _aic_for_spline(x: np.ndarray, y: np.ndarray, spline: UnivariateSpline) -> float:
residuals = y - spline(x)
rss = float(np.sum(residuals**2))
n = len(y)
if rss <= 0 or n < 4:
return math.inf
edf = float(spline.get_knots().shape[0] + spline.get_coeffs().shape[0])
return n * math.log(rss / n) + 2.0 * edf
def compute_snr_aic_spline(series: list[tuple[str, float]]) -> float | None:
if len(series) < MIN_GCC_POINTS:
return None
dates = [datetime.strptime(day, "%Y-%m-%d").date() for day, _ in series]
x = np.array([(d - dates[0]).days for d in dates], dtype=float)
y = np.array([value for _, value in series], dtype=float)
if len(np.unique(x)) < 5:
return None
y_var = float(np.var(y))
if y_var <= 0:
return None
candidates = np.logspace(-4, 2, 40) * y_var * len(y)
best_spline: UnivariateSpline | None = None
best_aic = math.inf
for smoothing in candidates:
try:
spline = UnivariateSpline(x, y, k=3, s=float(smoothing))
except Exception:
continue
aic = _aic_for_spline(x, y, spline)
if aic < best_aic:
best_aic = aic
best_spline = spline
if best_spline is None:
return None
residuals = y - best_spline(x)
rmse = float(np.sqrt(np.mean(residuals**2)))
amplitude = float(np.max(y) - np.min(y))
if rmse <= 0:
return None
return amplitude / rmse
def screen_site(
site_entry: dict[str, Any],
*,
evaluation_year: int,
min_gcc_points: int,
snr_threshold: float,
) -> dict[str, Any]:
response = site_entry["response"]
roi = response.get("roi")
csv_path = site_entry.get("_one_day_csv")
calculations: dict[str, Any] = {
"evaluation_year": evaluation_year,
"n_gcc_points": 0,
"first_gcc_date": None,
"last_gcc_date": None,
"months_with_gcc": 0,
"snr": None,
"min_gcc_points": min_gcc_points,
"snr_threshold": snr_threshold,
"status": "FAIL",
"failing_gate": None,
"passed_gates": [],
"reason": None,
}
if roi is None or not roi.get("one_day_summary") or csv_path is None:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "no_roi"
return {"response": response, "calculations": calculations}
series = parse_gcc90_series(csv_path, evaluation_year)
calculations["n_gcc_points"] = len(series)
if calculations["n_gcc_points"] == 0:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "no_gcc_in_year"
return {"response": response, "calculations": calculations}
day_strings = [day for day, _ in series]
calculations["first_gcc_date"] = day_strings[0]
calculations["last_gcc_date"] = day_strings[-1]
calculations["months_with_gcc"] = _months_covered(day_strings)
if calculations["n_gcc_points"] < min_gcc_points:
calculations["failing_gate"] = "phenocam"
calculations["reason"] = "insufficient_gcc_points"
return {"response": response, "calculations": calculations}
calculations["passed_gates"].append("phenocam")
snr = compute_snr_aic_spline(series)
calculations["snr"] = snr
if snr is None or snr < snr_threshold:
calculations["failing_gate"] = "snr"
calculations["reason"] = (
"insufficient_snr" if snr is not None else "snr_undefined"
)
return {"response": response, "calculations": calculations}
calculations["passed_gates"].append("snr")
calculations["status"] = "PASS"
calculations["failing_gate"] = None
calculations["reason"] = None
return {"response": response, "calculations": calculations}
def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
p1, p2 = math.radians(lat1), math.radians(lat2)
dlat = math.radians(lat2 - lat1)
dlon = math.radians(lon2 - lon1)
a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2
return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a))
def _site_coords(row: dict[str, Any]) -> tuple[float, float] | None:
camera = row["response"]["camera"]
lat, lon = camera.get("Lat"), camera.get("Lon")
if lat is None or lon is None:
return None
return float(lat), float(lon)
def _cluster_rank(row: dict[str, Any]) -> tuple[int, float]:
calc = row["calculations"]
return calc["n_gcc_points"], float(calc.get("snr") or 0.0)
def apply_cluster_gate(results: list[dict[str, Any]], *, radius_m: float) -> int:
pool: list[tuple[int, float, float]] = []
for idx, row in enumerate(results):
if "snr" not in row["calculations"]["passed_gates"]:
continue
coords = _site_coords(row)
if coords is None:
row["calculations"]["passed_gates"].append("cluster")
continue
pool.append((idx, coords[0], coords[1]))
n = len(pool)
parent = list(range(n))
def find(x: int) -> int:
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
def union(a: int, b: int) -> None:
ra, rb = find(a), find(b)
if ra != rb:
parent[rb] = ra
for i in range(n):
_, lat1, lon1 = pool[i]
for j in range(i + 1, n):
_, lat2, lon2 = pool[j]
if _haversine_m(lat1, lon1, lat2, lon2) <= radius_m:
union(i, j)
clusters: dict[int, list[int]] = {}
for i in range(n):
clusters.setdefault(find(i), []).append(i)
demoted = 0
for members in clusters.values():
result_indices = [pool[i][0] for i in members]
cluster_size = len(result_indices)
winner_idx = max(result_indices, key=lambda idx: _cluster_rank(results[idx]))
winner_name = str(results[winner_idx]["response"]["camera"]["Sitename"])
for idx in result_indices:
calc = results[idx]["calculations"]
calc["cluster_size"] = cluster_size
if idx == winner_idx:
calc["passed_gates"].append("cluster")
else:
calc["status"] = "FAIL"
calc["failing_gate"] = "cluster"
calc["reason"] = "nearby_duplicate"
calc["cluster_winner"] = winner_name
demoted += 1
return demoted
def run_screening(
manifest: dict[str, Any],
sites_dir: Path,
*,
evaluation_year: int,
min_gcc_points: int,
snr_threshold: float,
site_filter: set[str] | None = None,
) -> list[dict[str, Any]]:
results: list[dict[str, Any]] = []
sitenames = manifest["sites"]
if site_filter is not None:
sitenames = [name for name in sitenames if name in site_filter]
for index, sitename in enumerate(sitenames, start=1):
print(f"[PhenoCam-2] ({index}/{len(sitenames)}) {sitename}")
site_entry = load_site_entry(sites_dir, sitename)
results.append(
screen_site(
site_entry,
evaluation_year=evaluation_year,
min_gcc_points=min_gcc_points,
snr_threshold=snr_threshold,
)
)
return results
def print_summary(results: list[dict[str, Any]], evaluation_year: int) -> None:
passing = [row for row in results if row["calculations"]["status"] == "PASS"]
gates_label = " + ".join(GATE_ORDER)
print(
f"\n[PhenoCam-2] Screening for {evaluation_year}: "
f"{len(passing)}/{len(results)} pass ({gates_label})"
)
for gate in GATE_ORDER:
fails = sum(1 for row in results if row["calculations"]["failing_gate"] == gate)
after = sum(1 for row in results if gate in row["calculations"]["passed_gates"])
print(f" after_{gate}: {after}, fail_at_{gate}: {fails}")
print("\nPer-site table")
print(f"{'site':<24} {'n':>4} {'mon':>3} {'snr':>6} {'status':>6} gate reason")
print("-" * 72)
for row in sorted(
results,
key=lambda item: str(item["response"]["camera"]["Sitename"]),
):
camera = row["response"]["camera"]
calc = row["calculations"]
snr_text = f"{calc['snr']:.2f}" if calc["snr"] is not None else ""
print(
f"{camera['Sitename']:<24} {calc['n_gcc_points']:4d} "
f"{calc['months_with_gcc']:3d} {snr_text:>6} "
f"{calc['status']:>6} {(calc['failing_gate'] or '-'):<8} "
f"{calc['reason'] or '-'}"
)
def write_screening_json(
results: list[dict[str, Any]],
output_path: Path,
evaluation_year: int,
) -> None:
passing = [row for row in results if row["calculations"]["status"] == "PASS"]
payload = {
"evaluation_year": evaluation_year,
"count": len(results),
"qualifying_count": len(passing),
"sites": sorted(
results,
key=lambda item: str(item["response"]["camera"]["Sitename"]),
),
}
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
print(f"[PhenoCam-2] Wrote {output_path}")
def write_screening_csv(results: list[dict[str, Any]], output_path: Path) -> None:
rows: list[dict[str, Any]] = []
for row in results:
camera = row["response"]["camera"]
metadata = camera.get("sitemetadata") or {}
roi = row["response"].get("roi") or {}
calc = row["calculations"]
rows.append(
{
"Sitename": camera.get("Sitename"),
"Lat": camera.get("Lat"),
"Lon": camera.get("Lon"),
"site_description": metadata.get("site_description"),
"primary_veg_type": metadata.get("primary_veg_type"),
"site_type": metadata.get("site_type"),
"one_day_summary": roi.get("one_day_summary"),
**calc,
}
)
fieldnames = list(rows[0].keys()) if rows else ["Sitename", "status"]
if rows:
extra = [k for row in rows for k in row if k not in fieldnames]
fieldnames.extend(dict.fromkeys(extra))
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8", newline="") as handle:
writer = csv.DictWriter(handle, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
print(f"[PhenoCam-2] Wrote {output_path}")
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"--evaluation-year",
type=int,
default=2025,
help="Evaluation year (default: 2025)",
)
parser.add_argument(
"--sites",
type=str,
default=None,
help="Comma-separated sitenames (default: all sites in step-1 manifest)",
)
parser.add_argument(
"--min-gcc-points",
type=int,
default=MIN_GCC_POINTS,
help=f"Minimum valid gcc_90 observations in-year (default: {MIN_GCC_POINTS})",
)
parser.add_argument(
"--snr-threshold",
type=float,
default=SNR_THRESHOLD,
help=f"Minimum AIC-spline SNR (default: {SNR_THRESHOLD})",
)
parser.add_argument(
"--output-json",
type=Path,
default=None,
help="Screening output (default: data/phenocam_screening/{year}.json)",
)
parser.add_argument(
"--output-csv",
type=Path,
default=None,
help="Flat CSV summary path",
)
parser.add_argument(
"--cluster-radius-m",
type=float,
default=CLUSTER_RADIUS_M,
help=f"Deduplicate SNR-passed sites within this radius (default: {CLUSTER_RADIUS_M})",
)
parser.add_argument(
"--no-cluster",
action="store_true",
help="Skip nearby-site deduplication gate",
)
args = parser.parse_args(argv)
evaluation_year = args.evaluation_year
manifest_path = Path("data") / "phenocam" / f"{evaluation_year}.json"
if not manifest_path.is_file():
raise SystemExit(f"Step-1 manifest not found: {manifest_path}")
site_filter = None
if args.sites:
site_filter = {name.strip() for name in args.sites.split(",") if name.strip()}
manifest = load_manifest(manifest_path)
sites_dir_path = resolve_sites_dir(manifest_path, manifest)
results = run_screening(
manifest,
sites_dir_path,
evaluation_year=evaluation_year,
min_gcc_points=args.min_gcc_points,
snr_threshold=args.snr_threshold,
site_filter=site_filter,
)
if not args.no_cluster:
demoted = apply_cluster_gate(results, radius_m=args.cluster_radius_m)
if demoted:
print(f"[PhenoCam-2] Cluster dedup: demoted {demoted} nearby duplicate(s)")
print_summary(results, evaluation_year)
default_dir = Path("data") / "phenocam_screening"
json_name = f"{evaluation_year}.json"
csv_name = f"{evaluation_year}.csv"
write_screening_json(
results,
args.output_json or (default_dir / json_name),
evaluation_year,
)
write_screening_csv(results, args.output_csv or (default_dir / csv_name))
return 0
if __name__ == "__main__":
raise SystemExit(main())

931
3-sentinel-data.py Normal file
View file

@ -0,0 +1,931 @@
"""Step 3: Download S2 and S3 rasters and prepare EFAST inputs.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam_screening/{year}.json`` step-2 PASS sites (coordinates included)
Outputs (``data/``):
- ``sentinel_data/{year}/{sitename}/raw/s3/*.tif`` S3 SYN L2 per-date GeoTIFFs
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` S2 REFL + DIST_CLOUD GeoTIFFs
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` S3 composite GeoTIFFs
- ``sentinel_data/{year}/{sitename}/data.json`` run summary
Requires ``CDSE_USER`` / ``CDSE_PASSWORD`` in ``../.env`` (workspace root; ``uv sync`` installs efast).
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` (optional; default: all step-2 PASS sites)
Prior step: :mod:`2-phenocam-screening`.
Next step: :mod:`4-fusion`.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any
import netCDF4
import numpy as np
import openeo
import rasterio
import requests
from dotenv import load_dotenv
from pystac_client import Client
from rasterio import shutil as rio_shutil
from rasterio.enums import Resampling
from rasterio.errors import WindowError
from rasterio.transform import from_bounds
from rasterio.vrt import WarpedVRT
from rasterio.warp import transform_geom
from rasterio.windows import Window
from rasterio.windows import from_bounds as window_from_bounds
from rasterio.windows import transform as window_transform
from shapely import wkt as shapely_wkt
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Public constants — edit here to change pipeline behaviour
# ---------------------------------------------------------------------------
S2_BANDS = ["B02", "B03", "B04"]
S3_BANDS = [
"Syn_Oa04_reflectance",
"Syn_Oa06_reflectance",
"Syn_Oa08_reflectance",
"Syn_Oa17_reflectance",
]
S3_BAND_NAMES = ["SDR_Oa04", "SDR_Oa06", "SDR_Oa08", "SDR_Oa17"]
RESOLUTION_RATIO = 30
S3_MOSAIC_DAYS = 100
S3_COMPOSITE_STEP = 2
S3_COMPOSITE_SIGMA_DOY = 10
S3_COMPOSITE_D = 20
S3_SMOOTHING_STD = 1
S3_REFLECTANCE_SCALE = 10_000 # OpenEO SYN L2 SDR → 01 (EFAST expects < 5)
# ---------------------------------------------------------------------------
# Internal S2 constants
# ---------------------------------------------------------------------------
EARTH_SEARCH_URL = "https://earth-search.aws.element84.com/v1"
_BAND_ASSETS: dict[str, str] = {
"B02": "blue",
"B03": "green",
"B04": "red",
"B05": "rededge1",
"B06": "rededge2",
"B07": "rededge3",
"B08": "nir",
"B8A": "nir08",
"B11": "swir16",
"B12": "swir22",
}
_SCL_ASSET = "scl"
_MIN_BBOX_HALF_DEG = 0.008
_GDAL_COG_ENV = {
# HTTP/1.1 avoids HTTP/2 multiplexing connection-reset cascades on S3.
"GDAL_HTTP_VERSION": "1.1",
"GDAL_HTTP_MERGE_CONSECUTIVE_RANGES": "YES",
"GDAL_HTTP_TCP_KEEPALIVE": "YES",
"GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR",
"CPL_VSIL_CURL_CACHE_SIZE": "200000000",
# Built-in GDAL retries for 429/502/503/504 and transient resets.
"GDAL_HTTP_MAX_RETRY": "3",
"GDAL_HTTP_RETRY_DELAY": "0.5",
"AWS_NO_SIGN_REQUEST": "YES",
}
# ---------------------------------------------------------------------------
# Internal S3 constants
# ---------------------------------------------------------------------------
CDSE_TOKEN_URL = (
"https://identity.dataspace.copernicus.eu/auth/realms/CDSE/"
"protocol/openid-connect/token"
)
OPENEO_URL = "openeo.dataspace.copernicus.eu"
S3_COLLECTION = "SENTINEL3_SYN_L2_SYN"
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
WORKSPACE_ROOT = Path(__file__).resolve().parent.parent
# ---------------------------------------------------------------------------
# Credentials
# ---------------------------------------------------------------------------
def _cdse_credentials() -> dict[str, str | None]:
load_dotenv(WORKSPACE_ROOT / ".env")
return {
"username": os.getenv("CDSE_USER"),
"password": os.getenv("CDSE_PASSWORD"),
}
# ---------------------------------------------------------------------------
# Screening manifest helpers
# ---------------------------------------------------------------------------
def _load_screening_pass_sites(year: int) -> list[dict[str, Any]]:
"""Return list of PASS-site dicts from step-2 screening JSON.
Each entry has ``sitename``, ``lat``, ``lon`` keys.
"""
path = DATA_DIR / "phenocam_screening" / f"{year}.json"
if not path.is_file():
raise FileNotFoundError(f"Step-2 screening manifest not found: {path}")
payload = json.loads(path.read_text(encoding="utf-8"))
sites = []
for row in payload.get("sites", []):
calc = row.get("calculations", {})
if calc.get("status") != "PASS":
continue
camera = row.get("response", {}).get("camera", {})
name = camera.get("Sitename")
lat = camera.get("Lat")
lon = camera.get("Lon")
if name and lat is not None and lon is not None:
sites.append({"sitename": str(name), "lat": float(lat), "lon": float(lon)})
return sites
# ---------------------------------------------------------------------------
# S2: geometry helpers (from s2_cloud_native.py)
# ---------------------------------------------------------------------------
def wkt_to_bbox(geometry_wkt: str) -> list[float]:
"""Convert a WKT geometry to a ``[west, south, east, north]`` bbox."""
geom = shapely_wkt.loads(geometry_wkt)
minx, miny, maxx, maxy = geom.bounds
if minx == maxx and miny == maxy:
minx -= _MIN_BBOX_HALF_DEG
maxx += _MIN_BBOX_HALF_DEG
miny -= _MIN_BBOX_HALF_DEG
maxy += _MIN_BBOX_HALF_DEG
return [minx, miny, maxx, maxy]
def _boa_offset(item: Any) -> int:
"""Return the BOA additive offset for a STAC item.
Processing baseline >= 04.00 applies a -1000 offset; earlier baselines use 0.
"""
if item.properties.get("earthsearch:boa_offset_applied"):
return 0
baseline_str = str(
item.properties.get("processing:baseline")
or item.properties.get("s2:processing_baseline")
or "0"
)
try:
baseline = float(baseline_str)
except ValueError:
baseline = 0.0
return -1000 if baseline >= 4.0 else 0
def _window_for_bbox(
src: rasterio.io.DatasetReader,
bbox_4326: list[float],
) -> Window | None:
"""Return the rasterio Window for a EPSG:4326 bbox clipped to src bounds."""
bbox_geom = {
"type": "Polygon",
"coordinates": [
[
[bbox_4326[0], bbox_4326[1]],
[bbox_4326[2], bbox_4326[1]],
[bbox_4326[2], bbox_4326[3]],
[bbox_4326[0], bbox_4326[3]],
[bbox_4326[0], bbox_4326[1]],
]
],
}
src_geom = transform_geom("EPSG:4326", src.crs.to_wkt(), bbox_geom)
xs = [c[0] for c in src_geom["coordinates"][0][:4]]
ys = [c[1] for c in src_geom["coordinates"][0][:4]]
win = window_from_bounds(min(xs), min(ys), max(xs), max(ys), src.transform)
try:
return win.intersection(Window(0, 0, src.width, src.height))
except WindowError:
return None
def _read_window(
href: str,
bbox_4326: list[float],
out_shape: tuple[int, int] | None = None,
resampling: Resampling = Resampling.bilinear,
) -> tuple[np.ndarray, dict[str, Any]] | None:
"""Range-read a single-band array for the bbox window from a COG URL."""
with rasterio.open(href) as src:
win = _window_for_bbox(src, bbox_4326)
if win is None:
return None
data = src.read(1, window=win, out_shape=out_shape, resampling=resampling)
profile: dict[str, Any] = {
"crs": src.crs,
"transform": window_transform(win, src.transform),
"height": data.shape[0],
"width": data.shape[1],
"dtype": src.dtypes[0],
}
return data, profile
def _read_bands(
item: Any,
bbox: list[float],
bands: list[str],
) -> tuple[list[np.ndarray], dict[str, Any]] | None:
"""Range-read all requested bands for one STAC item."""
band_arrays: list[np.ndarray] = []
ref_profile: dict[str, Any] | None = None
for band_name in bands:
asset_key = _BAND_ASSETS.get(band_name)
if asset_key is None or asset_key not in item.assets:
return None
ref_shape = (
(ref_profile["height"], ref_profile["width"]) if ref_profile else None
)
result = _read_window(item.assets[asset_key].href, bbox, out_shape=ref_shape)
if result is None:
return None
data, profile = result
if ref_profile is None:
ref_profile = profile
band_arrays.append(data.astype("float32"))
return (band_arrays, ref_profile) if ref_profile is not None else None
def _cloud_mask(item: Any, bbox: list[float], shape: tuple[int, int]) -> np.ndarray:
"""Return a boolean cloud/shadow mask from the item's SCL band.
Masks SCL classes 0 (no data), 3 (cloud shadow), and >7 (clouds, cirrus, snow).
"""
scl = item.assets.get(_SCL_ASSET)
result = (
_read_window(scl.href, bbox, out_shape=shape, resampling=Resampling.nearest)
if scl
else None
)
if result is None:
return np.zeros(shape, dtype=bool)
scl_data, _ = result
return (scl_data == 0) | (scl_data == 3) | (scl_data > 7)
def _pad_to_multiple(arr: np.ndarray, ratio: int) -> np.ndarray:
"""Zero-pad (bands, H, W) so H and W are multiples of ``ratio``."""
pad_h = (ratio - arr.shape[1] % ratio) % ratio
pad_w = (ratio - arr.shape[2] % ratio) % ratio
if pad_h or pad_w:
arr = np.pad(arr, ((0, 0), (0, pad_h), (0, pad_w)), constant_values=0)
return arr
# ---------------------------------------------------------------------------
# S2: STAC search + download (from s2_cloud_native.py)
# ---------------------------------------------------------------------------
def stac_search_s2(
bbox: list[float],
start_date: datetime,
end_date: datetime,
) -> list[Any]:
"""Search Earth Search for S2 L2A items intersecting a bbox."""
client = Client.open(EARTH_SEARCH_URL)
search = client.search(
collections=["sentinel-2-l2a"],
bbox=bbox,
datetime=(
f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/"
f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}"
),
max_items=10_000,
)
return list({item.id: item for item in search.items()}.values())
def _process_item(
item: Any,
bbox: list[float],
bands: list[str],
output_dir: Path,
ratio: int,
) -> str | None:
"""Range-read one S2 item and write a masked REFL GeoTIFF.
Returns a skip-message string when the item cannot be processed, else None.
"""
out_path = output_dir / f"{item.id}_REFL.tif"
if out_path.is_file():
return None
bands_result = _read_bands(item, bbox, bands)
if bands_result is None:
return f"[S2] Skipping {item.id}: missing asset or no bbox overlap"
band_arrays, ref_profile = bands_result
mask = _cloud_mask(item, bbox, (ref_profile["height"], ref_profile["width"]))
stacked = (np.stack(band_arrays) + _boa_offset(item)) / 10_000.0
np.clip(stacked, 0, None, out=stacked)
stacked[:, mask] = 0.0
stacked = _pad_to_multiple(stacked, ratio)
out_profile = {
"driver": "GTiff",
"count": len(bands),
"dtype": "float32",
"nodata": 0,
"crs": ref_profile["crs"],
"transform": ref_profile["transform"],
"height": stacked.shape[1],
"width": stacked.shape[2],
"compress": "lzw",
}
with rasterio.open(out_path, "w", **out_profile) as dst:
dst.write(stacked)
for i, band_name in enumerate(bands, 1):
dst.set_band_description(i, band_name)
return None
def download_s2_window(
items: list[Any],
bbox: list[float],
output_dir: Path,
bands: list[str],
ratio: int = RESOLUTION_RATIO,
max_workers: int = 12,
) -> None:
"""Range-read S2 L2A COG windows and write masked REFL GeoTIFFs.
Writes ``{item.id}_REFL.tif`` directly no intermediate raw download.
Cloud/shadow pixels (SCL 0, 3, >7) are zeroed. BOA offset is inferred from
``processing:baseline``. Output is zero-padded to multiples of ``ratio``.
Items are fetched in parallel using ``max_workers`` threads.
"""
output_dir.mkdir(parents=True, exist_ok=True)
with rasterio.Env(**_GDAL_COG_ENV):
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(
_process_item, item, bbox, bands, output_dir, ratio
): item.id
for item in items
}
with tqdm(
total=len(futures), unit="granule", desc="S2 COG window read"
) as pbar:
for fut in as_completed(futures):
msg = fut.result()
if msg:
tqdm.write(msg)
pbar.update(1)
# ---------------------------------------------------------------------------
# S3: download (from s3_openeo.py)
# ---------------------------------------------------------------------------
def _utm_epsg(bbox: list[float]) -> int:
"""Return the UTM EPSG code for the centre of a ``[W, S, E, N]`` bbox."""
lon = (bbox[0] + bbox[2]) / 2
lat = (bbox[1] + bbox[3]) / 2
zone = int((lon + 180) / 6) + 1
return 32600 + zone if lat >= 0 else 32700 + zone
def _cdse_token(username: str, password: str) -> str:
"""Obtain a CDSE bearer token via password grant."""
resp = requests.post(
CDSE_TOKEN_URL,
data={
"grant_type": "password",
"username": username,
"password": password,
"client_id": "cdse-public",
},
timeout=30,
)
resp.raise_for_status()
return resp.json()["access_token"]
def _netcdf_to_geotiffs(nc_path: Path, output_dir: Path, epsg: int) -> int:
"""Split an OpenEO NetCDF into per-date GeoTIFFs.
Output filenames match the ``S3*__YYYYMMDDTHHMMSS.tif`` pattern that
``s3_processing.produce_median_composite`` expects.
Handles half-pixel cell-centre coordinates, ascending y-axis (flip_y),
and fills NetCDF masked values with NaN.
"""
written = 0
with netCDF4.Dataset(str(nc_path), "r") as nc:
times = netCDF4.num2date(nc.variables["t"][:], nc.variables["t"].units)
x_coords = np.asarray(nc.variables["x"][:], dtype=float)
y_coords = np.asarray(nc.variables["y"][:], dtype=float)
half_x = abs(x_coords[1] - x_coords[0]) / 2 if len(x_coords) > 1 else 0.0
half_y = abs(y_coords[1] - y_coords[0]) / 2 if len(y_coords) > 1 else 0.0
transform = from_bounds(
x_coords.min() - half_x,
y_coords.min() - half_y,
x_coords.max() + half_x,
y_coords.max() + half_y,
len(x_coords),
len(y_coords),
)
flip_y = len(y_coords) > 1 and y_coords[0] < y_coords[-1]
date_counts: dict[str, int] = {}
for t_idx, time_val in enumerate(times):
date_str = time_val.strftime("%Y%m%d")
n = date_counts.get(date_str, 0)
date_counts[date_str] = n + 1
raw = np.stack([nc.variables[b][t_idx, :, :] for b in S3_BANDS], axis=0)
stacked = (
np.ma.filled(raw, fill_value=np.nan).astype("float32")
/ S3_REFLECTANCE_SCALE
)
if flip_y:
stacked = stacked[:, ::-1, :]
filename = f"S3_{date_str}_{n}__{date_str}T120000.tif"
with rasterio.open(
output_dir / filename,
"w",
driver="GTiff",
height=len(y_coords),
width=len(x_coords),
count=len(S3_BANDS),
dtype="float32",
nodata=float("nan"),
crs=f"EPSG:{epsg}",
transform=transform,
compress="lzw",
) as dst:
dst.write(stacked)
for i, band_name in enumerate(S3_BAND_NAMES, 1):
dst.set_band_description(i, band_name)
written += 1
return written
_S3_DOWNLOAD_RETRIES = 4
_S3_DOWNLOAD_BACKOFF = 30 # seconds; doubled on each retry
def _download_with_retry(datacube: Any, nc_path: Path) -> None:
"""Download an OpenEO datacube to *nc_path*, retrying on transient errors.
Retries up to ``_S3_DOWNLOAD_RETRIES`` times with exponential backoff
starting at ``_S3_DOWNLOAD_BACKOFF`` seconds. Re-authenticates on each
attempt so an expired token never blocks a retry.
"""
delay = _S3_DOWNLOAD_BACKOFF
last_exc: Exception | None = None
for attempt in range(1, _S3_DOWNLOAD_RETRIES + 1):
try:
if nc_path.exists():
nc_path.unlink()
datacube.download(str(nc_path), format="NetCDF")
return
except Exception as exc:
last_exc = exc
if attempt < _S3_DOWNLOAD_RETRIES:
print(
f"[S3-OEO] Download attempt {attempt} failed ({exc}); "
f"retrying in {delay}s..."
)
time.sleep(delay)
delay *= 2
else:
print(f"[S3-OEO] All {_S3_DOWNLOAD_RETRIES} download attempts failed")
raise RuntimeError(
f"S3 download failed after {_S3_DOWNLOAD_RETRIES} attempts"
) from last_exc
def download_s3_openeo(
start_date: datetime,
end_date: datetime,
aoi_geometry: str,
output_dir: Path,
credentials: dict[str, str | None],
) -> None:
"""Download S3 SYN L2 SDR for an AOI via CDSE OpenEO, server-side clipped.
Writes per-date ``S3_{YYYYMMDD}_{n}__{YYYYMMDD}T120000.tif`` files to
``output_dir``, ready for ``s3_processing.produce_median_composite``.
Skips if any ``S3*.tif`` files already exist.
"""
output_dir.mkdir(parents=True, exist_ok=True)
if any(output_dir.glob("S3*.tif")):
print("[S3-OEO] Skipping — output_dir already contains S3 GeoTIFFs")
return
bbox = wkt_to_bbox(aoi_geometry)
epsg = _utm_epsg(bbox)
spatial_extent = {
"west": bbox[0],
"east": bbox[2],
"south": bbox[1],
"north": bbox[3],
}
print("[S3-OEO] Authenticating with CDSE...")
token = _cdse_token(credentials["username"], credentials["password"]) # type: ignore[arg-type]
conn = openeo.connect(OPENEO_URL)
conn.authenticate_oidc_access_token(token)
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d")
print(f"[S3-OEO] Loading {S3_COLLECTION} ({start_str}{end_str})...")
datacube = conn.load_collection(
S3_COLLECTION,
spatial_extent=spatial_extent,
temporal_extent=[start_str, end_str],
bands=S3_BANDS,
).resample_spatial(projection=epsg)
nc_path = output_dir / "_s3_syn_l2.nc"
print(f"[S3-OEO] Downloading NetCDF to {nc_path}...")
t0 = time.time()
_download_with_retry(datacube, nc_path)
print(f"[S3-OEO] Download completed in {time.time() - t0:.1f}s")
print("[S3-OEO] Splitting into per-date GeoTIFFs...")
written = _netcdf_to_geotiffs(nc_path, output_dir, epsg)
nc_path.unlink(missing_ok=True)
print(f"[S3-OEO] {written} GeoTIFFs written to {output_dir}")
# ---------------------------------------------------------------------------
# S2: distance_to_clouds helper
# ---------------------------------------------------------------------------
def _import_distance_to_clouds():
try:
from efast.s2_processing import distance_to_clouds
return distance_to_clouds
except ImportError as exc:
raise ImportError("efast not found. Install with: uv sync") from exc
def _normalize_s2_grid(s2_dir: Path) -> None:
"""Remove REFL (and companion DIST_CLOUD/GCC) files that don't match the dominant shape.
Sites at MGRS tile boundaries can have S2 downloads from two tiles with different
spatial extents. For example, a site at the top edge of tile 16SDA will produce
narrow (e.g. 30×180) REFL files from 16SDA and full-extent (180×180) files from
16SDB. EFAST requires all S2 files to share the same grid, so we keep only the
shape with the most pixels (largest tile coverage) and discard the rest together
with any already-computed companions.
"""
refl_files = sorted(s2_dir.glob("*_REFL.tif"))
if not refl_files:
return
shape_to_files: dict[tuple[int, int], list[Path]] = {}
for f in refl_files:
with rasterio.open(f) as src:
shape: tuple[int, int] = src.shape # type: ignore[assignment]
shape_to_files.setdefault(shape, []).append(f)
if len(shape_to_files) <= 1:
return
ref_shape = max(shape_to_files.keys(), key=lambda s: s[0] * s[1])
shapes_summary = ", ".join(
f"{s[0]}×{s[1]}({len(fs)})" for s, fs in shape_to_files.items()
)
n_removed = 0
for shape, files in shape_to_files.items():
if shape == ref_shape:
continue
for refl_path in files:
stem = refl_path.stem[: -len("_REFL")] # e.g. "S2A_16SDA_20250106_0_L2A"
for companion in s2_dir.glob(f"{stem}_*.tif"):
companion.unlink()
refl_path.unlink(missing_ok=True)
n_removed += 1
print(
f"[S2-NORM] Tile boundary — shapes: {shapes_summary}; "
f"removed {n_removed} file-sets, kept {ref_shape[0]}×{ref_shape[1]}"
)
def _rescale_dist_cloud(s2_dir: Path) -> None:
"""Ensure DIST_CLOUD values are in pixel units (not normalised to [0,1])."""
for dc_path in s2_dir.glob("*DIST_CLOUD.tif"):
with rasterio.open(dc_path) as src:
d = src.read(1)
if float(np.nanmax(d)) <= 1:
with rasterio.open(dc_path, "r+") as dst:
dst.write(np.where(d > 0, 2.0, d).astype(np.float32), 1)
# ---------------------------------------------------------------------------
# S3: compositing + reprojection helpers (from 4-sentinel-data.py)
# ---------------------------------------------------------------------------
def _import_s3_processing():
try:
from efast import s3_processing
return s3_processing
except ImportError as exc:
raise ImportError("efast not found. Install with: uv sync") from exc
def _reproject_s3_composites_to_s2_grid(
composite_dir: Path,
s2_refl_path: Path,
s3_out_dir: Path,
*,
resolution_ratio: int = RESOLUTION_RATIO,
) -> None:
"""Reproject S3 composites to the S2 spatial grid at LR resolution."""
s3_out_dir.mkdir(parents=True, exist_ok=True)
with rasterio.open(s2_refl_path) as s2_ref:
target_bounds = s2_ref.bounds
target_crs = s2_ref.crs
width = s2_ref.width // resolution_ratio
height = s2_ref.height // resolution_ratio
s3_transform = rasterio.transform.from_bounds(
target_bounds.left,
target_bounds.bottom,
target_bounds.right,
target_bounds.top,
width,
height,
)
for sen3_path in sorted(composite_dir.glob("composite_*.tif")):
date_part = sen3_path.stem.split("_", 1)[1].replace("-", "")
outfile = s3_out_dir / f"composite_{date_part}.tif"
vrt_options = {
"transform": s3_transform,
"height": height,
"width": width,
"crs": target_crs,
"resampling": Resampling.cubic,
}
with rasterio.open(sen3_path) as s3_src:
with WarpedVRT(s3_src, **vrt_options) as vrt:
profile = vrt.profile.copy()
profile.update({"dtype": "float32", "nodata": 0, "driver": "GTiff"})
rio_shutil.copy(vrt, outfile, **profile)
def _s3_reflectance_scale(raw_s3_dir: Path) -> float:
"""Return multiplier that maps raw SYN L2 SDR values to 01 reflectance."""
for path in raw_s3_dir.glob("S3*.tif"):
with rasterio.open(path) as src:
data = src.read()
if not np.any(np.isfinite(data)):
continue
mx = float(np.nanmax(data))
if np.isfinite(mx) and mx > 5:
return 1.0 / S3_REFLECTANCE_SCALE
return 1.0
def _stage_s3_for_efast(raw_s3_dir: Path, staging_dir: Path) -> int:
"""Copy ``S3_*.tif`` inputs, scaling reflectance when still in DN form."""
scale = _s3_reflectance_scale(raw_s3_dir)
if staging_dir.exists():
shutil.rmtree(staging_dir)
staging_dir.mkdir(parents=True)
count = 0
for src_path in sorted(raw_s3_dir.glob("S3*.tif")):
dst_path = staging_dir / src_path.name
with rasterio.open(src_path) as src:
data = src.read().astype("float32") * scale
profile = src.profile.copy()
profile.update(dtype="float32")
descriptions = src.descriptions
with rasterio.open(dst_path, "w", **profile) as dst:
dst.write(data)
for i, desc in enumerate(descriptions, 1):
if desc:
dst.set_band_description(i, desc)
count += 1
if scale != 1.0:
print(f"[S3-PREP] Scaled raw SDR by {scale:g} for EFAST compositing")
return count
def _prepare_s3(
raw_s3_dir: Path,
s2_refl_path: Path,
s3_out_dir: Path,
*,
work_dir: Path | None = None,
) -> None:
"""Run EFAST S3 compositing pipeline and reproject to S2 grid."""
s3 = _import_s3_processing()
base = work_dir or (s3_out_dir / "_efast_work")
staging = base / "scaled"
composites = base / "composites"
blurred = base / "blurred"
calibrated = base / "calibrated"
for directory in (staging, composites, blurred, calibrated):
if directory.exists():
shutil.rmtree(directory)
directory.mkdir(parents=True, exist_ok=True)
staged = _stage_s3_for_efast(raw_s3_dir, staging)
if staged == 0:
raise ValueError(f"No S3*.tif files found in {raw_s3_dir}")
print(
f"[S3-PREP] produce_median_composite: mosaic_days={S3_MOSAIC_DAYS}, "
f"step={S3_COMPOSITE_STEP}, sigma_doy={S3_COMPOSITE_SIGMA_DOY}, "
f"D={S3_COMPOSITE_D}"
)
s3.produce_median_composite(
staging,
composites,
step=S3_COMPOSITE_STEP,
mosaic_days=S3_MOSAIC_DAYS,
s3_bands=[1, 2, 3, 4],
D=S3_COMPOSITE_D,
sigma_doy=S3_COMPOSITE_SIGMA_DOY,
)
s3.smoothing(
composites,
blurred,
product="composite",
std=S3_SMOOTHING_STD,
preserve_nan=False,
)
s3.reformat_s3(blurred, calibrated, product="composite", scaling_factor=1)
for old in s3_out_dir.glob("composite_*.tif"):
old.unlink()
_reproject_s3_composites_to_s2_grid(calibrated, s2_refl_path, s3_out_dir)
if work_dir is None and base.exists():
shutil.rmtree(base)
n_out = len(list(s3_out_dir.glob("composite_*.tif")))
print(f"[S3-PREP] Wrote {n_out} composites")
# ---------------------------------------------------------------------------
# Per-site pipeline
# ---------------------------------------------------------------------------
def process_site(
sitename: str,
lat: float,
lon: float,
year: int,
) -> dict[str, Any]:
"""Download S2 + S3 and run EFAST preparation for one site."""
site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename
s2_out = site_dir / "prepared" / "s2"
s3_raw = site_dir / "raw" / "s3"
s3_out = site_dir / "prepared" / "s3"
aoi_wkt = f"POINT ({lon} {lat})"
bbox = wkt_to_bbox(aoi_wkt)
creds = _cdse_credentials()
# S3 download
print(f"[{sitename}] Downloading S3...")
download_s3_openeo(
start_date=datetime(year, 1, 1),
end_date=datetime(year, 12, 31),
aoi_geometry=aoi_wkt,
output_dir=s3_raw,
credentials=creds,
)
# S2 download
print(f"[{sitename}] Searching S2 on Earth Search...")
items = stac_search_s2(bbox, datetime(year, 1, 1), datetime(year, 12, 31))
print(f"[{sitename}] {len(items)} S2 items found — downloading windows...")
download_s2_window(items, bbox, s2_out, S2_BANDS, RESOLUTION_RATIO)
_normalize_s2_grid(s2_out)
# S2 distance-to-clouds
print(f"[{sitename}] Computing distance-to-clouds...")
distance_to_clouds = _import_distance_to_clouds()
distance_to_clouds(s2_out, ratio=RESOLUTION_RATIO)
_rescale_dist_cloud(s2_out)
# S3 compositing
s2_refl_path = next(iter(s2_out.glob("*_REFL.tif")), None)
if s2_refl_path is None:
raise ValueError(f"No REFL files in {s2_out} — S2 download may have failed")
s3_out.mkdir(parents=True, exist_ok=True)
print(f"[{sitename}] Running S3 compositing pipeline...")
_prepare_s3(s3_raw, s2_refl_path, s3_out)
summary = {
"sitename": sitename,
"evaluation_year": year,
"lat": lat,
"lon": lon,
"s2_refl_count": len(list(s2_out.glob("*_REFL.tif"))),
"s2_dist_cloud_count": len(list(s2_out.glob("*_DIST_CLOUD.tif"))),
"s3_raw_count": len(list(s3_raw.glob("S3*.tif"))),
"s3_composite_count": len(list(s3_out.glob("composite_*.tif"))),
}
site_dir.mkdir(parents=True, exist_ok=True)
(site_dir / "data.json").write_text(
json.dumps(summary, indent=2) + "\n", encoding="utf-8"
)
return summary
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--site",
type=str,
default=None,
help="Single sitename to process (default: all step-2 PASS sites)",
)
parser.add_argument(
"--skip-downloaded",
action="store_true",
help="Skip sites whose directory already exists under data/sentinel_data/{year}/",
)
args = parser.parse_args(argv)
year = args.evaluation_year
pass_sites = _load_screening_pass_sites(year)
if not pass_sites:
print("[Sentinel data] No PASS sites found in step-2 screening output")
return 1
if args.site:
pass_sites = [s for s in pass_sites if s["sitename"] == args.site]
if not pass_sites:
print(f"[Sentinel data] Site '{args.site}' not found in step-2 PASS sites")
return 1
print(f"[Sentinel data] Processing {len(pass_sites)} site(s)")
for i, site in enumerate(pass_sites, 1):
sitename = site["sitename"]
site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename
if args.skip_downloaded and site_dir.exists():
print(
f"[Sentinel data] ({i}/{len(pass_sites)}) {sitename} — skipping (directory exists)"
)
continue
print(f"[Sentinel data] ({i}/{len(pass_sites)}) {sitename}")
summary = process_site(sitename, site["lat"], site["lon"], year)
print(
f"[Sentinel data] {sitename} done — "
f"{summary['s2_refl_count']} REFL, "
f"{summary['s3_composite_count']} composites"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

349
4-fusion.py Normal file
View file

@ -0,0 +1,349 @@
"""Step 4: Compute GCC and run EFAST BtI + ItB fusion for prepared sites.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` ``*_REFL.tif`` + ``*_DIST_CLOUD.tif``
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` ``composite_*.tif`` (4-band)
Outputs (``data/``):
- ``sentinel_data/{year}/{sitename}/prepared/s2/*_GCC.tif`` S2 GCC (in-place)
- ``sentinel_data/{year}/{sitename}/prepared/gcc_s3/*.tif`` S3 GCC composites
- ``fusion/{year}/{sitename}/bti/fusion/REFL_*.tif`` BtI fused 4-band reflectance
- ``fusion/{year}/{sitename}/bti/gcc/GCC_*.tif`` GCC derived from BtI fusion
- ``fusion/{year}/{sitename}/itb/s2/GCC_*.tif`` per-acquisition S2 GCC (simplified names)
- ``fusion/{year}/{sitename}/itb/s3/GCC_*.tif`` per-composite S3 GCC (simplified names)
- ``fusion/{year}/{sitename}/itb/fusion/GCC_*.tif`` ItB fused GCC
Requires ``uv sync`` (efast).
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` (optional; default: all prepared sites under ``sentinel_data/{year}/``)
Prior step: :mod:`3-sentinel-data`.
"""
from __future__ import annotations
import argparse
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import numpy as np
import rasterio
from dateutil import rrule
# ---------------------------------------------------------------------------
# Public constants
# ---------------------------------------------------------------------------
RESOLUTION_RATIO = 30
MOSAIC_STEP = 2
MAX_DAYS = 100
MINIMUM_ACQUISITION_IMPORTANCE = 0
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
# ---------------------------------------------------------------------------
# efast import helper
# ---------------------------------------------------------------------------
def _import_efast():
try:
import efast.efast as efast_module
return efast_module
except ImportError as exc:
raise ImportError("efast not found. Install with: uv sync") from exc
# ---------------------------------------------------------------------------
# GCC computation (from s2_cloud_native.py and s3_openeo.py)
# ---------------------------------------------------------------------------
def compute_gcc_s2(s2_dir: Path, output_dir: Path) -> None:
"""Compute GCC from S2 REFL files and write ``*_GCC.tif`` to ``output_dir``.
Reads every ``*_REFL.tif`` (band order B02/B03/B04) and writes a co-located
single-band GCC file. Cloud-masked pixels (zero in all bands) remain zero.
"""
output_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(s2_dir.glob("*_REFL.tif")):
out_path = output_dir / src_path.name.replace("_REFL.tif", "_GCC.tif")
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
gcc = g / (total + 1e-10)
gcc[total == 0] = 0
profile.update(count=1)
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
def compute_gcc_s3(s3_dir: Path, output_dir: Path) -> None:
"""Compute GCC from S3 composite files and write single-band GeoTIFFs.
Reads every ``composite_*.tif`` (band order Oa04/Oa06/Oa08/Oa17) and writes
a single-band GCC file. NaN pixels in the input remain NaN.
"""
output_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(s3_dir.glob("composite_*.tif")):
out_path = output_dir / src_path.name
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
gcc = g / (total + 1e-10)
gcc[np.isnan(total)] = np.nan
profile.update(count=1, dtype="float32")
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
def compute_gcc_from_refl(refl_dir: Path, gcc_dir: Path) -> None:
"""Derive GCC from ``REFL_YYYYMMDD.tif`` files (BtI fusion output).
Reads every ``REFL_*.tif`` and writes a co-located single-band
``GCC_YYYYMMDD.tif``. Pixels where any reflectance band is negative (which
can arise from EFAST's temporal correction) are written as NaN; the GCC
ratio is undefined there because the denominator B+G+R is near-zero or
negative due to band cancellation. All-zero pixels (cloud mask) remain
zero.
"""
gcc_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(refl_dir.glob("REFL_*.tif")):
out_path = gcc_dir / src_path.name.replace("REFL_", "GCC_")
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
invalid = (b < 0) | (g < 0) | (r < 0)
gcc = np.where(invalid, np.nan, g / (total + 1e-10))
gcc[total == 0] = np.nan
profile.update(count=1, dtype="float32")
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
# ---------------------------------------------------------------------------
# Date-range detection
# ---------------------------------------------------------------------------
def _refl_date_range(s2_dir: Path) -> tuple[datetime, datetime] | None:
"""Return (start, end) datetime from REFL filenames in ``s2_dir``.
Filenames are expected to follow the S2 product naming convention, where
the acquisition date ``YYYYMMDD`` appears at position index 2 when the
stem is split by ``_``, e.g.
``S2A_MSIL2A_20230911T114111_N0509_R025_T29PKT_20230911T153131_REFL.tif``.
"""
dates: list[datetime] = []
for p in s2_dir.glob("*_REFL.tif"):
parts = p.stem.split("_")
if len(parts) >= 3:
try:
dates.append(datetime.strptime(parts[2][:8], "%Y%m%d"))
except ValueError:
pass
if not dates:
return None
return min(dates), max(dates)
# ---------------------------------------------------------------------------
# Per-site fusion
# ---------------------------------------------------------------------------
def fuse_site(sitename: str, year: int) -> dict[str, Any]:
"""Run GCC computation and EFAST BtI + ItB fusion for one prepared site."""
efast = _import_efast()
s2_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s2"
s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3"
gcc_s3_dir = (
DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "gcc_s3"
)
base = DATA_DIR / "fusion" / str(year) / sitename
if not s2_dir.is_dir() or not any(s2_dir.glob("*_REFL.tif")):
raise FileNotFoundError(f"No REFL files in {s2_dir}")
if not s3_dir.is_dir() or not any(s3_dir.glob("composite_*.tif")):
raise FileNotFoundError(f"No composite files in {s3_dir}")
print(f"[{sitename}] Computing S2 GCC (in-place)...")
compute_gcc_s2(s2_dir, s2_dir)
print(f"[{sitename}] Computing S3 GCC...")
compute_gcc_s3(s3_dir, gcc_s3_dir)
date_range = _refl_date_range(s2_dir)
if date_range is None:
raise ValueError(f"Could not detect date range from REFL filenames in {s2_dir}")
start, end = date_range
print(f"[{sitename}] Date range: {start.date()}{end.date()}")
fusion_dates = list(
rrule.rrule(
rrule.DAILY,
dtstart=start + timedelta(MOSAIC_STEP),
until=end - timedelta(MOSAIC_STEP),
interval=MOSAIC_STEP,
)
)
_fusion_kwargs = dict(
ratio=RESOLUTION_RATIO,
max_days=MAX_DAYS,
minimum_acquisition_importance=MINIMUM_ACQUISITION_IMPORTANCE,
)
# --- ItB: GCC first, then fuse GCC ---
itb_s2 = base / "itb" / "s2"
itb_s3 = base / "itb" / "s3"
itb_fusion = base / "itb" / "fusion"
itb_s2.mkdir(parents=True, exist_ok=True)
itb_s3.mkdir(parents=True, exist_ok=True)
itb_fusion.mkdir(parents=True, exist_ok=True)
for p in sorted(s2_dir.glob("*_GCC.tif")):
dst = itb_s2 / f"GCC_{p.stem.split('_')[2][:8]}.tif"
if not dst.exists():
shutil.copy2(p, dst)
for p in sorted(gcc_s3_dir.glob("composite_*.tif")):
dst = itb_s3 / f"GCC_{p.stem.split('_')[1]}.tif"
if not dst.exists():
shutil.copy2(p, dst)
print(f"[{sitename}] ItB: fusing GCC over {len(fusion_dates)} dates...")
for date in fusion_dates:
efast.fusion(
date, gcc_s3_dir, s2_dir, itb_fusion, product="GCC", **_fusion_kwargs
)
# --- BtI: fuse reflectance (3-band, matching S2 B02/B03/B04), then derive GCC ---
# S3 composites have 4 bands; strip band 4 (Oa17/NIR) so shapes match S2 REFL.
s3_rgb_dir = (
DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3_rgb"
)
s3_rgb_dir.mkdir(parents=True, exist_ok=True)
for p in sorted(s3_dir.glob("composite_*.tif")):
out = s3_rgb_dir / p.name
if not out.exists():
with rasterio.open(p) as src:
data = src.read([1, 2, 3])
profile = src.profile.copy()
profile.update(count=3)
with rasterio.open(out, "w", **profile) as dst:
dst.write(data)
bti_fusion = base / "bti" / "fusion"
bti_gcc = base / "bti" / "gcc"
bti_fusion.mkdir(parents=True, exist_ok=True)
print(f"[{sitename}] BtI: fusing REFL over {len(fusion_dates)} dates...")
for date in fusion_dates:
efast.fusion(
date, s3_rgb_dir, s2_dir, bti_fusion, product="REFL", **_fusion_kwargs
)
print(f"[{sitename}] BtI: deriving GCC from fused REFL...")
compute_gcc_from_refl(bti_fusion, bti_gcc)
return {
"sitename": sitename,
"evaluation_year": year,
"start": start.date().isoformat(),
"end": end.date().isoformat(),
"fusion_dates": len(fusion_dates),
"itb_fusion_files": len(list(itb_fusion.glob("*.tif"))),
"bti_fusion_files": len(list(bti_fusion.glob("*.tif"))),
"bti_gcc_files": len(list(bti_gcc.glob("*.tif"))),
}
# ---------------------------------------------------------------------------
# Site discovery
# ---------------------------------------------------------------------------
def _discover_sites(year: int) -> list[str]:
"""Return sitenames that have prepared S2 REFL files under sentinel_data."""
base = DATA_DIR / "sentinel_data" / str(year)
if not base.is_dir():
return []
return sorted(
d.name
for d in base.iterdir()
if d.is_dir() and any((d / "prepared" / "s2").glob("*_REFL.tif"))
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--site",
type=str,
default=None,
help="Single sitename to fuse (default: all prepared sites)",
)
parser.add_argument(
"--skip-blended",
action="store_true",
help="Skip sites whose fusion directory already exists under data/fusion/{year}/",
)
args = parser.parse_args(argv)
year = args.evaluation_year
if args.site:
sites = [args.site]
else:
sites = _discover_sites(year)
if not sites:
print(f"[Fusion] No prepared sites found under data/sentinel_data/{year}/")
return 1
print(f"[Fusion] Processing {len(sites)} site(s)")
for i, sitename in enumerate(sites, 1):
fusion_dir = DATA_DIR / "fusion" / str(year) / sitename
if args.skip_blended and fusion_dir.exists():
print(
f"[Fusion] ({i}/{len(sites)}) {sitename} — skipping (fusion directory exists)"
)
continue
print(f"[Fusion] ({i}/{len(sites)}) {sitename}")
summary = fuse_site(sitename, year)
print(
f"[Fusion] {sitename} done — "
f"{summary['fusion_dates']} dates, "
f"itb={summary['itb_fusion_files']} bti={summary['bti_fusion_files']} "
f"bti_gcc={summary['bti_gcc_files']}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

788
5-metrics.py Normal file
View file

@ -0,0 +1,788 @@
"""Step 5: Pre-compute per-site GCC timeseries + raster index for the webapp.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam_screening/{year}.json`` qualifying sites + metadata
- ``phenocam/{year}/{site}_1day.csv`` daily GCC timeseries
- ``sentinel_data/{year}/{site}/prepared/s2/*_GCC.tif`` S2 GCC rasters
- ``sentinel_data/{year}/{site}/prepared/gcc_s3/composite_*.tif`` S3 GCC rasters
- ``fusion/{year}/{site}/bti/gcc/GCC_*.tif`` BtI GCC rasters
- ``fusion/{year}/{site}/itb/fusion/GCC_*.tif`` ItB GCC rasters
Outputs (``data/metrics/``):
- ``manifest.json`` years + per-site metadata
- ``{year}/{site}/gcc_phenocam.json`` PhenoCam ``gcc_90`` at matched dates
- ``{year}/{site}/gcc_s2.json`` S2 GCC (center pixel, cloud-free scenes)
- ``{year}/{site}/gcc_s2_whittaker.json`` Whittaker-smoothed S2 GCC
- ``{year}/{site}/gcc_s3.json`` S3 composite GCC
- ``{year}/{site}/gcc_s3_smooth.json`` S3 5-day moving average
- ``{year}/{site}/gcc_fusion_bti.json`` BtI fused GCC
- ``{year}/{site}/gcc_fusion_itb.json`` ItB fused GCC
- ``{year}/{site}/phenocam_images.json`` midday photo URLs for the viewer
- ``{year}/{site}/rasters_s2_refl.json`` S2 REFL paths (BtI view)
- ``{year}/{site}/rasters_s3_composite.json`` S3 composite paths (BtI view)
- ``{year}/{site}/rasters_s2_gcc.json`` S2 GCC paths (ItB view)
- ``{year}/{site}/rasters_s3_gcc.json`` S3 GCC paths (ItB view)
- ``{year}/{site}/rasters_fusion_bti_refl.json`` BtI fused REFL paths
- ``{year}/{site}/rasters_fusion_itb_gcc.json`` ItB fused GCC paths
- ``{year}/{site}/metrics.json`` NSE, RMSE, nRMSE, Pearson r vs PhenoCam per series
- ``{year}/{site}/bands_s2.json`` S2 center-pixel reflectance (B02, B03, B04) per scene
- ``{year}/{site}/bands_s3.json`` S3 center-pixel reflectance (Oa04, Oa06, Oa08, Oa17) per composite
- ``{year}/{site}/covariates.json`` spatial CV/std, S2/S3 counts, gap stats
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` (optional; default: all qualifying sites with sentinel data)
"""
from __future__ import annotations
import argparse
import csv
import json
import re
from pathlib import Path
from typing import Any
import datetime
import numpy as np
import rasterio
from rasterio.crs import CRS
from rasterio.transform import rowcol
from pyproj import Transformer
from scipy.stats import pearsonr
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
# GCC smoothing window for S3 moving average (days)
S3_SMOOTH_WINDOW = 5
# Whittaker lambda (penalised smoothing strength for S2)
WHITTAKER_LAMBDA = 400.0
# Half-width in metres for the spatial heterogeneity footprint (~300 m = 1 S3 pixel)
SPATIAL_CV_HALF_M = 150
# PhenoCam archive image URL pattern
PHENOCAM_IMAGE_URL = (
"https://phenocam.nau.edu/data/archive/{site}/{year}/{month}/{filename}"
)
# ---------------------------------------------------------------------------
# Helpers: raster pixel extraction
# ---------------------------------------------------------------------------
def _window_mean(data: np.ndarray) -> float | None:
"""Mean of a pixel window; ``None`` when every value is NaN."""
valid = data[~np.isnan(data)]
if valid.size == 0:
return None
return float(np.mean(valid))
def _read_center_pixel(path: Path, lat: float, lon: float) -> float | None:
"""Return the 3×3 mean GCC value at (lat, lon) from a single-band raster.
Returns ``None`` when the pixel is masked/zero/NaN.
"""
try:
with rasterio.open(path) as src:
transformer = Transformer.from_crs(
CRS.from_epsg(4326), src.crs, always_xy=True
)
x, y = transformer.transform(lon, lat)
row, col = rowcol(src.transform, x, y)
h, w = src.height, src.width
r0, r1 = max(0, row - 1), min(h, row + 2)
c0, c1 = max(0, col - 1), min(w, col + 2)
window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0)
data = src.read(1, window=window).astype(float)
nodata = src.nodata
if nodata is not None:
data = np.where(data == nodata, np.nan, data)
data[data == 0] = np.nan
return _window_mean(data)
except Exception:
return None
# ---------------------------------------------------------------------------
# Helpers: date extraction from filenames
# ---------------------------------------------------------------------------
def _date_from_gcc_tif(path: Path) -> str | None:
"""Extract YYYYMMDD from ``GCC_YYYYMMDD.tif`` or ``composite_YYYYMMDD.tif``."""
m = re.search(r"(\d{8})", path.stem)
return m.group(1) if m else None
def _date_from_s2_tif(path: Path) -> str | None:
"""Extract YYYYMMDD from S2 product name ``S2X_TTTT_YYYYMMDD_…``."""
parts = path.stem.split("_")
if len(parts) >= 3:
m = re.match(r"(\d{8})", parts[2])
return m.group(1) if m else None
return None
# ---------------------------------------------------------------------------
# Helpers: Whittaker smoother (2nd-order differences, tridiagonal solver)
# ---------------------------------------------------------------------------
def _whittaker_smooth(
values: list[float | None], lam: float = WHITTAKER_LAMBDA
) -> list[float | None]:
"""Penalised least-squares smoother (Whittaker, 2nd-order differences).
Masked (None) values are filled via the smooth and then re-set to None in
the output so the caller can distinguish observed from gap-filled points.
"""
n = len(values)
if n < 4:
return values[:]
obs_mask = [v is not None for v in values]
y = np.array([v if v is not None else 0.0 for v in values], dtype=float)
w = np.array([1.0 if m else 0.0 for m in obs_mask], dtype=float)
W = np.diag(w)
D = np.diff(np.eye(n), n=2, axis=0) # (n-2) x n second-difference matrix
A = W + lam * D.T @ D
try:
z = np.linalg.solve(A, w * y)
except np.linalg.LinAlgError:
return values[:]
result: list[float | None] = []
for i, m in enumerate(obs_mask):
result.append(float(z[i]) if m else None)
return result
# ---------------------------------------------------------------------------
# Helpers: PhenoCam CSV parsing
# ---------------------------------------------------------------------------
def _parse_phenocam_csv(
csv_path: Path, year: int, site: str
) -> tuple[list[dict], list[dict]]:
"""Return (gcc_series, image_list) filtered to ``year``.
``gcc_series`` entries: ``{"date": "YYYY-MM-DD", "gcc_90": float}``
``image_list`` entries: ``{"date": "YYYY-MM-DD", "url": str}``
"""
gcc_series: list[dict] = []
image_list: list[dict] = []
year_str = str(year)
if not csv_path.is_file():
return gcc_series, image_list
with csv_path.open() as f:
lines = [line for line in f if not line.startswith("#")]
reader = csv.DictReader(lines)
for row in reader:
if row.get("year") != year_str:
continue
date = row.get("date", "")
gcc_raw = row.get("gcc_90")
if gcc_raw and gcc_raw not in ("NA", ""):
try:
gcc_series.append({"date": date, "gcc_90": float(gcc_raw)})
except ValueError:
pass
fn = row.get("midday_filename", "").strip()
if fn and fn != "NA" and date:
month = date[5:7]
url = PHENOCAM_IMAGE_URL.format(
site=site, year=year_str, month=month, filename=fn
)
image_list.append({"date": date, "url": url})
return gcc_series, image_list
# ---------------------------------------------------------------------------
# Helpers: moving average
# ---------------------------------------------------------------------------
def _moving_average(series: list[dict], value_key: str, window: int) -> list[dict]:
"""Compute centred moving average; returns new list with ``_smooth`` suffix key."""
if not series:
return []
vals = [p[value_key] for p in series]
half = window // 2
smoothed = []
for i, pt in enumerate(series):
chunk = [v for v in vals[max(0, i - half) : i + half + 1] if v is not None]
smoothed.append(
{
"date": pt["date"],
value_key + "_smooth": (sum(chunk) / len(chunk)) if chunk else None,
}
)
return smoothed
# ---------------------------------------------------------------------------
# Helpers: validation metrics
# ---------------------------------------------------------------------------
MATCH_TOLERANCE_DAYS = 5
def compute_metrics(
ref: list[dict],
ref_key: str,
pred: list[dict],
pred_key: str,
) -> dict | None:
"""Compute NSE, RMSE, nRMSE, Pearson r between pred and ref.
Each pred point is matched to the nearest ref date within
``MATCH_TOLERANCE_DAYS``. Returns a dict or ``None`` if fewer than
2 matched pairs exist.
"""
ref_lookup: dict[str, float] = {
p["date"]: p[ref_key] for p in ref if p.get(ref_key) is not None
}
if not ref_lookup:
return None
ref_dates = sorted(ref_lookup)
obs, sim = [], []
for pt in pred:
v = pt.get(pred_key)
if v is None:
continue
nearest = min(
ref_dates,
key=lambda d: abs(
(np.datetime64(pt["date"]) - np.datetime64(d)) / np.timedelta64(1, "D")
),
)
gap = abs(
(np.datetime64(pt["date"]) - np.datetime64(nearest))
/ np.timedelta64(1, "D")
)
if gap <= MATCH_TOLERANCE_DAYS and nearest in ref_lookup:
obs.append(ref_lookup[nearest])
sim.append(v)
if len(obs) < 2:
return None
obs_arr = np.array(obs)
sim_arr = np.array(sim)
obs_mean = obs_arr.mean()
rmse = float(np.sqrt(np.mean((sim_arr - obs_arr) ** 2)))
nrmse = rmse / obs_mean if obs_mean else None
ss_res = float(np.sum((obs_arr - sim_arr) ** 2))
ss_tot = float(np.sum((obs_arr - obs_mean) ** 2))
nse = (1.0 - ss_res / ss_tot) if ss_tot else None
r, _ = pearsonr(obs_arr, sim_arr)
def _r4(v: float | None) -> float | None:
return round(v, 4) if v is not None else None
return {
"n": len(obs),
"rmse": _r4(rmse),
"nrmse": _r4(nrmse),
"nse": _r4(nse),
"r": _r4(float(r)),
}
S2_BAND_NAMES = ["B02", "B03", "B04"]
S3_BAND_NAMES = ["Oa04", "Oa06", "Oa08", "Oa17"]
def _read_multiband_center(
path: Path, lat: float, lon: float, band_names: list[str]
) -> dict[str, float | None]:
"""Return 3×3 mean per band at (lat, lon). Keys are ``band_names``, values float or None."""
try:
with rasterio.open(path) as src:
transformer = Transformer.from_crs(
CRS.from_epsg(4326), src.crs, always_xy=True
)
x, y = transformer.transform(lon, lat)
row, col = rowcol(src.transform, x, y)
h, w = src.height, src.width
r0, r1 = max(0, row - 1), min(h, row + 2)
c0, c1 = max(0, col - 1), min(w, col + 2)
window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0)
nodata = src.nodata
result = {}
for i, name in enumerate(band_names, 1):
if i > src.count:
result[name] = None
continue
data = src.read(i, window=window).astype(float)
if nodata is not None:
data = np.where(data == nodata, np.nan, data)
data[data == 0] = np.nan
val = _window_mean(data)
result[name] = None if val is None else round(val, 6)
return result
except Exception:
return {name: None for name in band_names}
def _multiband_series(
tif_paths: list[Path],
date_fn,
lat: float,
lon: float,
band_names: list[str],
desc: str,
) -> list[dict]:
"""Extract center-pixel values for all bands; return ``[{date, band1, band2, …}]``."""
result = []
for p in tqdm(tif_paths, desc=desc, leave=False):
date = date_fn(p)
if date is None:
continue
vals = _read_multiband_center(p, lat, lon, band_names)
if any(v is not None for v in vals.values()):
result.append({"date": f"{date[:4]}-{date[4:6]}-{date[6:]}", **vals})
return sorted(result, key=lambda x: x["date"])
# ---------------------------------------------------------------------------
# Helpers: spatial heterogeneity + observation density
# ---------------------------------------------------------------------------
def _read_footprint_stats(
path: Path, lat: float, lon: float, half_m: float = SPATIAL_CV_HALF_M
) -> tuple[float, float] | tuple[None, None]:
"""Return (mean, std) of valid GCC pixels within a ±half_m metre square window.
Returns ``(None, None)`` on any error or when fewer than 4 valid pixels exist.
"""
try:
with rasterio.open(path) as src:
transformer = Transformer.from_crs(
CRS.from_epsg(4326), src.crs, always_xy=True
)
x, y = transformer.transform(lon, lat)
res = abs(src.transform.a) # pixel size in CRS units (metres for UTM)
half_px = max(1, int(round(half_m / res)))
row, col = rowcol(src.transform, x, y)
h, w = src.height, src.width
r0, r1 = max(0, row - half_px), min(h, row + half_px + 1)
c0, c1 = max(0, col - half_px), min(w, col + half_px + 1)
window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0)
data = src.read(1, window=window).astype(float)
nodata = src.nodata
if nodata is not None:
data = np.where(data == nodata, np.nan, data)
data[data <= 0] = np.nan
valid = data[~np.isnan(data)]
if len(valid) < 4:
return None, None
return float(np.mean(valid)), float(np.std(valid))
except Exception:
return None, None
def compute_covariates(
s2_gcc_paths: list[Path],
s2_series: list[dict],
s3_series: list[dict],
n_gcc_points: int | None,
lat: float,
lon: float,
) -> dict:
"""Compute spatial heterogeneity and temporal observation density covariates."""
# Spatial GCC statistics over ~300 m footprint
means, stds = [], []
for p in s2_gcc_paths:
m, s = _read_footprint_stats(p, lat, lon)
if m is not None and m > 0:
means.append(m)
stds.append(s)
spatial_gcc_cv = (
round(float(np.mean([s / m for s, m in zip(stds, means)])), 4)
if means
else None
)
spatial_gcc_std = round(float(np.mean(stds)), 4) if stds else None
# S2 temporal gap statistics
s2_dates = [datetime.date.fromisoformat(p["date"]) for p in s2_series]
if len(s2_dates) >= 2:
gaps = [(s2_dates[i + 1] - s2_dates[i]).days for i in range(len(s2_dates) - 1)]
s2_mean_gap = round(float(np.mean(gaps)), 1)
s2_max_gap = int(max(gaps))
else:
s2_mean_gap = None
s2_max_gap = None
return {
"spatial_gcc_cv": spatial_gcc_cv,
"spatial_gcc_std": spatial_gcc_std,
"s2_scene_count": len(s2_series),
"s2_mean_gap_days": s2_mean_gap,
"s2_max_gap_days": s2_max_gap,
"s3_composite_count": len(s3_series),
"n_gcc_points": n_gcc_points,
}
# ---------------------------------------------------------------------------
# Per-site export
# ---------------------------------------------------------------------------
def _write_json(path: Path, data: Any) -> None:
path.write_text(json.dumps(data, separators=(",", ":")))
def _raster_series(
tif_paths: list[Path],
date_fn,
lat: float,
lon: float,
desc: str,
) -> list[dict]:
"""Extract center-pixel GCC from each tif, return ``[{date, gcc}]`` sorted."""
result = []
for p in tqdm(tif_paths, desc=desc, leave=False):
date = date_fn(p)
if date is None:
continue
val = _read_center_pixel(p, lat, lon)
if val is not None:
result.append({"date": f"{date[:4]}-{date[4:6]}-{date[6:]}", "gcc": val})
return sorted(result, key=lambda x: x["date"])
def _raster_index(tif_paths: list[Path], date_fn, rel_root: Path) -> list[dict]:
"""Build raster index: ``[{date, path}]`` sorted by date."""
result = []
for p in tif_paths:
date = date_fn(p)
if date is None:
continue
try:
rel = str(p.relative_to(rel_root))
except ValueError:
rel = str(p)
result.append({"date": date, "path": rel})
return sorted(result, key=lambda x: x["date"])
def export_site(
site: str,
year: int,
lat: float,
lon: float,
out_dir: Path,
n_gcc_points: int | None = None,
) -> bool:
"""Export timeseries.json and rasters.json for one site. Returns True on success."""
sentinel_base = DATA_DIR / "sentinel_data" / str(year) / site / "prepared"
fusion_base = DATA_DIR / "fusion" / str(year) / site
s2_gcc_dir = sentinel_base / "s2"
s3_gcc_dir = sentinel_base / "gcc_s3"
bti_gcc_dir = fusion_base / "bti" / "gcc"
itb_gcc_dir = fusion_base / "itb" / "fusion"
# Raster slider sources
s2_refl_dir = sentinel_base / "s2"
s3_comp_dir = sentinel_base / "s3"
bti_refl_dir = fusion_base / "bti" / "fusion"
has_fusion = bti_gcc_dir.is_dir() and any(bti_gcc_dir.glob("GCC_*.tif"))
if not has_fusion:
return False
out_dir.mkdir(parents=True, exist_ok=True)
# --- GCC timeseries from rasters ---
s2_gcc_paths = sorted(s2_gcc_dir.glob("*_GCC.tif"))
s3_gcc_paths = sorted(s3_gcc_dir.glob("composite_*.tif"))
bti_paths = sorted(bti_gcc_dir.glob("GCC_*.tif"))
itb_paths = sorted(itb_gcc_dir.glob("GCC_*.tif"))
s2_series = _raster_series(s2_gcc_paths, _date_from_s2_tif, lat, lon, f"{site} S2")
s3_series = _raster_series(s3_gcc_paths, _date_from_gcc_tif, lat, lon, f"{site} S3")
bti_series = _raster_series(bti_paths, _date_from_gcc_tif, lat, lon, f"{site} BtI")
itb_series = _raster_series(itb_paths, _date_from_gcc_tif, lat, lon, f"{site} ItB")
# Whittaker on S2
s2_vals = [p["gcc"] for p in s2_series]
s2_smooth_vals = _whittaker_smooth(s2_vals)
s2_whittaker = [
{"date": p["date"], "gcc": v}
for p, v in zip(s2_series, s2_smooth_vals)
if v is not None
]
# S3 5-day moving average
s3_smooth = _moving_average(s3_series, "gcc", S3_SMOOTH_WINDOW)
# PhenoCam CSV
csv_path = DATA_DIR / "phenocam" / str(year) / f"{site}_1day.csv"
phenocam_series, image_list = _parse_phenocam_csv(csv_path, year, site)
s3_smooth_series = [
{"date": p["date"], "gcc": p["gcc_smooth"]}
for p in s3_smooth
if p.get("gcc_smooth") is not None
]
# Band reflectance timeseries (multi-band center-pixel)
bands_s2 = _multiband_series(
sorted(s2_refl_dir.glob("*_REFL.tif")),
_date_from_s2_tif,
lat,
lon,
S2_BAND_NAMES,
f"{site} S2 bands",
)
bands_s3 = _multiband_series(
sorted(s3_comp_dir.glob("composite_*.tif")),
_date_from_gcc_tif,
lat,
lon,
S3_BAND_NAMES,
f"{site} S3 bands",
)
# --- Per-metric JSON outputs ---
_write_json(out_dir / "gcc_phenocam.json", phenocam_series)
_write_json(out_dir / "gcc_s2.json", s2_series)
_write_json(out_dir / "gcc_s2_whittaker.json", s2_whittaker)
_write_json(out_dir / "gcc_s3.json", s3_series)
_write_json(out_dir / "gcc_s3_smooth.json", s3_smooth_series)
_write_json(out_dir / "gcc_fusion_bti.json", bti_series)
_write_json(out_dir / "gcc_fusion_itb.json", itb_series)
_write_json(out_dir / "phenocam_images.json", image_list)
_write_json(out_dir / "bands_s2.json", bands_s2)
_write_json(out_dir / "bands_s3.json", bands_s3)
# --- Raster index for slider ---
rel_root = DATA_DIR.parent # paths relative to project root
# Valid-pixel sets: only show S2/S3 rasters where the center pixel had
# usable data (non-zero GCC). This excludes cloud-masked / snow-covered
# scenes that would render as black or visually nonsensical.
s2_valid_dates = {p["date"].replace("-", "") for p in s2_series}
s3_valid_dates = {p["date"].replace("-", "") for p in s3_series}
s2_refl = [
r
for r in _raster_index(
sorted(s2_refl_dir.glob("*_REFL.tif")), _date_from_s2_tif, rel_root
)
if r["date"] in s2_valid_dates
]
s3_comp = [
r
for r in _raster_index(
sorted(s3_comp_dir.glob("composite_*.tif")), _date_from_gcc_tif, rel_root
)
if r["date"] in s3_valid_dates
]
s2_gcc = [
r
for r in _raster_index(
sorted(s2_gcc_dir.glob("*_GCC.tif")), _date_from_s2_tif, rel_root
)
if r["date"] in s2_valid_dates
]
s3_gcc = [
r
for r in _raster_index(
sorted(s3_gcc_dir.glob("composite_*.tif")), _date_from_gcc_tif, rel_root
)
if r["date"] in s3_valid_dates
]
bti_refl = _raster_index(
sorted(bti_refl_dir.glob("REFL_*.tif")), _date_from_gcc_tif, rel_root
)
itb_gcc = _raster_index(
sorted(itb_gcc_dir.glob("GCC_*.tif")), _date_from_gcc_tif, rel_root
)
_write_json(out_dir / "rasters_s2_refl.json", s2_refl)
_write_json(out_dir / "rasters_s3_composite.json", s3_comp)
_write_json(out_dir / "rasters_s2_gcc.json", s2_gcc)
_write_json(out_dir / "rasters_s3_gcc.json", s3_gcc)
_write_json(out_dir / "rasters_fusion_bti_refl.json", bti_refl)
_write_json(out_dir / "rasters_fusion_itb_gcc.json", itb_gcc)
# --- Site covariates (heterogeneity + observation density) ---
_write_json(
out_dir / "covariates.json",
compute_covariates(s2_gcc_paths, s2_series, s3_series, n_gcc_points, lat, lon),
)
# --- Validation metrics vs PhenoCam gcc_90 ---
_write_json(
out_dir / "metrics.json",
{
"bti": compute_metrics(phenocam_series, "gcc_90", bti_series, "gcc"),
"itb": compute_metrics(phenocam_series, "gcc_90", itb_series, "gcc"),
"s2_whittaker": compute_metrics(
phenocam_series, "gcc_90", s2_whittaker, "gcc"
),
"s3_smooth": compute_metrics(
phenocam_series, "gcc_90", s3_smooth_series, "gcc"
),
"s2": compute_metrics(phenocam_series, "gcc_90", s2_series, "gcc"),
"s3": compute_metrics(phenocam_series, "gcc_90", s3_series, "gcc"),
},
)
# Remove legacy bundled outputs if present
for legacy in ("timeseries.json", "rasters.json"):
(out_dir / legacy).unlink(missing_ok=True)
return True
# ---------------------------------------------------------------------------
# Manifest
# ---------------------------------------------------------------------------
VEG_TYPE_LABELS = {
"AG": "Agriculture",
"DB": "Deciduous broadleaf",
"DN": "Deciduous needleleaf",
"EB": "Evergreen broadleaf",
"EN": "Evergreen needleleaf",
"GR": "Grassland",
"MX": "Mixed",
"SH": "Shrubland",
"TN": "Tundra",
"UN": "Unknown",
"WL": "Wetland",
"RF": "Reference",
}
def build_manifest(years: list[int], filter_site: str | None = None) -> dict:
manifest: dict[str, Any] = {"years": years, "sites": {}}
for year in years:
screening_path = DATA_DIR / "phenocam_screening" / f"{year}.json"
if not screening_path.is_file():
continue
data = json.loads(screening_path.read_text())
sites_meta: dict[str, Any] = {}
for entry in data.get("sites", []):
if entry.get("calculations", {}).get("status") != "PASS":
continue
cam = entry.get("response", {}).get("camera", {})
roi = entry.get("response", {}).get("roi", {})
calc = entry.get("calculations", {})
site = cam.get("Sitename", "")
if not site:
continue
if filter_site and site != filter_site:
continue
sm = cam.get("sitemetadata", {})
veg_raw = sm.get("primary_veg_type") or roi.get("roitype") or "UN"
fusion_dir = DATA_DIR / "fusion" / str(year) / site / "bti" / "gcc"
has_fusion = fusion_dir.is_dir() and any(fusion_dir.glob("GCC_*.tif"))
sites_meta[site] = {
"lat": cam.get("Lat"),
"lon": cam.get("Lon"),
"veg_type": veg_raw,
"veg_label": VEG_TYPE_LABELS.get(veg_raw, veg_raw),
"description": sm.get("site_description", ""),
"dominant_species": sm.get("dominant_species", ""),
"group": sm.get("group", ""),
"snr": calc.get("snr"),
"n_gcc_points": calc.get("n_gcc_points"),
"has_fusion": has_fusion,
}
manifest["sites"][str(year)] = sites_meta
return manifest
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument("--site", type=str, default=None)
args = parser.parse_args()
year = args.evaluation_year
filter_site = args.site
out_base = DATA_DIR / "metrics"
out_base.mkdir(parents=True, exist_ok=True)
# Determine years with screening data
screening_dir = DATA_DIR / "phenocam_screening"
years = sorted(
int(p.stem) for p in screening_dir.glob("*.json") if p.stem.isdigit()
)
if not years:
years = [year]
print(f"Building manifest for years: {years}")
manifest = build_manifest(years, filter_site)
# Export per-site data for the requested year
year_sites = manifest["sites"].get(str(year), {})
fusion_sites = {s: m for s, m in year_sites.items() if m["has_fusion"]}
print(f"Exporting {len(fusion_sites)} site(s) with fusion data for {year}")
for site, meta in tqdm(fusion_sites.items(), desc="Sites"):
out_dir = out_base / str(year) / site
ok = export_site(
site, year, meta["lat"], meta["lon"], out_dir, meta.get("n_gcc_points")
)
if ok:
print(f"{site}")
else:
print(f"{site} — no fusion data found")
manifest_path = out_base / "manifest.json"
if filter_site and manifest_path.is_file():
# Merge: update only the filtered site(s); preserve all other entries.
existing: dict = json.loads(manifest_path.read_text())
for year_key, year_sites_new in manifest["sites"].items():
existing.setdefault("sites", {}).setdefault(year_key, {}).update(
year_sites_new
)
all_years = sorted(set(existing.get("years", [])) | set(manifest["years"]))
existing["years"] = all_years
manifest_path.write_text(json.dumps(existing, separators=(",", ":")))
else:
manifest_path.write_text(json.dumps(manifest, separators=(",", ":")))
print(f"Manifest written → {manifest_path}")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,257 @@
"""Step 6: Paired ItB-vs-BtI significance test across the full sample.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``metrics/{year}/{site}/metrics.json`` per-site validation metrics (Step 5)
Outputs (``data/statistics_fusion_order/``):
- ``{year}.json`` paired Wilcoxon + t-test summary for NSE, RMSE, nRMSE, r;
includes ``dropped_sites`` (union) and per-metric ``dropped_sites`` lists
CLI:
- ``--evaluation-year`` (default 2025)
- ``--alpha`` (default 0.05) significance threshold for ``better_order``
This step aggregates across all sites with Step 5 output; it does not accept
``--site`` (a single-site filter would not support a sample-level test).
"""
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
import numpy as np
from scipy.stats import ttest_rel, wilcoxon
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
DEFAULT_ALPHA = 0.05
METRICS = ["nse", "rmse", "nrmse", "r"]
LOWER_IS_BETTER = {"rmse", "nrmse"}
MIN_PAIRS_WARNING = 6
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _r4(v: float | None) -> float | None:
return round(v, 4) if v is not None else None
def _load_site_metrics(year: int) -> list[tuple[str, dict[str, Any]]]:
"""Return ``(sitename, metrics.json payload)`` for every site under ``{year}``."""
metrics_dir = DATA_DIR / "metrics" / str(year)
if not metrics_dir.is_dir():
return []
payloads: list[tuple[str, dict[str, Any]]] = []
for site_dir in sorted(metrics_dir.iterdir()):
if not site_dir.is_dir():
continue
path = site_dir / "metrics.json"
if not path.is_file():
continue
payloads.append((site_dir.name, json.loads(path.read_text())))
return payloads
def collect_pairs(
site_metrics: list[tuple[str, dict[str, Any]]], metric: str
) -> tuple[list[float], list[float], list[str]]:
"""Return paired BtI / ItB values for ``metric`` and dropped site names."""
bti_vals: list[float] = []
itb_vals: list[float] = []
dropped_sites: list[str] = []
for site, payload in site_metrics:
bti = payload.get("bti")
itb = payload.get("itb")
if not isinstance(bti, dict) or not isinstance(itb, dict):
dropped_sites.append(site)
continue
bti_v = bti.get(metric)
itb_v = itb.get(metric)
if bti_v is None or itb_v is None:
dropped_sites.append(site)
continue
bti_vals.append(float(bti_v))
itb_vals.append(float(itb_v))
return bti_vals, itb_vals, dropped_sites
def _better_order(
bti_vals: list[float],
itb_vals: list[float],
metric: str,
p_value: float | None,
alpha: float,
) -> str:
"""Name the better fusion order when Wilcoxon p < alpha, else no difference."""
if p_value is None or p_value >= alpha:
return "no significant difference"
mean_diff = float(np.mean(itb_vals) - np.mean(bti_vals))
if metric in LOWER_IS_BETTER:
return "itb" if mean_diff < 0 else "bti"
return "itb" if mean_diff > 0 else "bti"
def paired_test(
bti_vals: list[float],
itb_vals: list[float],
metric: str,
alpha: float,
) -> dict[str, Any]:
"""Run paired Wilcoxon (primary) and t-test; return summary dict."""
n_pairs = len(bti_vals)
bti_arr = np.array(bti_vals, dtype=float)
itb_arr = np.array(itb_vals, dtype=float)
diffs = itb_arr - bti_arr
result: dict[str, Any] = {
"n_pairs": n_pairs,
"bti_mean": _r4(float(bti_arr.mean())) if n_pairs else None,
"bti_median": _r4(float(np.median(bti_arr))) if n_pairs else None,
"itb_mean": _r4(float(itb_arr.mean())) if n_pairs else None,
"itb_median": _r4(float(np.median(itb_arr))) if n_pairs else None,
"mean_diff": _r4(float(diffs.mean())) if n_pairs else None,
"median_diff": _r4(float(np.median(diffs))) if n_pairs else None,
"wilcoxon": {"statistic": None, "p_value": None},
"ttest": {"statistic": None, "p_value": None},
"better_order": "insufficient data",
}
if n_pairs < 2:
return result
wilcoxon_stat: float | None = None
wilcoxon_p: float | None = None
if np.any(diffs != 0):
try:
w_stat, w_p = wilcoxon(itb_arr, bti_arr)
wilcoxon_stat = float(w_stat)
wilcoxon_p = float(w_p)
except ValueError:
pass
t_stat, t_p = ttest_rel(itb_arr, bti_arr)
result["wilcoxon"] = {
"statistic": _r4(wilcoxon_stat),
"p_value": _r4(wilcoxon_p),
}
result["ttest"] = {
"statistic": _r4(float(t_stat)),
"p_value": _r4(float(t_p)),
}
result["better_order"] = _better_order(
bti_vals, itb_vals, metric, wilcoxon_p, alpha
)
return result
def _print_summary(
year: int, alpha: float, n_sites_total: int, metrics_out: dict
) -> None:
print(f"\nPaired ItB vs BtI test — {year} (alpha={alpha}, sites={n_sites_total})")
print(
f"{'metric':<8} {'n':>4} {'BtI mean':>10} {'ItB mean':>10} "
f"{'diff':>10} {'W p':>8} {'t p':>8} better"
)
print("-" * 78)
for metric in METRICS:
m = metrics_out[metric]
bti_mean = m["bti_mean"]
itb_mean = m["itb_mean"]
mean_diff = m["mean_diff"]
w_p = m["wilcoxon"]["p_value"]
t_p = m["ttest"]["p_value"]
better = m["better_order"]
def _fmt(v: float | None) -> str:
return f"{v:10.4f}" if v is not None else f"{'':>10}"
print(
f"{metric:<8} {m['n_pairs']:>4} {_fmt(bti_mean)} {_fmt(itb_mean)} "
f"{_fmt(mean_diff)} "
f"{w_p if w_p is not None else '':>8} "
f"{t_p if t_p is not None else '':>8} {better}"
)
if 0 < m["n_pairs"] < MIN_PAIRS_WARNING:
print(
f" warning: only {m['n_pairs']} pair(s) for {metric}; "
"interpret p-values cautiously"
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--alpha",
type=float,
default=DEFAULT_ALPHA,
help="Significance threshold for better_order (default 0.05)",
)
args = parser.parse_args()
year = args.evaluation_year
alpha = args.alpha
site_metrics = _load_site_metrics(year)
n_sites_total = len(site_metrics)
if n_sites_total == 0:
raise SystemExit(
f"No Step 5 metrics found under {DATA_DIR / 'metrics' / str(year)}"
)
metrics_out: dict[str, Any] = {}
all_dropped: set[str] = set()
for metric in METRICS:
bti_vals, itb_vals, dropped_sites = collect_pairs(site_metrics, metric)
summary = paired_test(bti_vals, itb_vals, metric, alpha)
summary["n_dropped"] = len(dropped_sites)
summary["dropped_sites"] = dropped_sites
all_dropped.update(dropped_sites)
metrics_out[metric] = summary
payload = {
"year": year,
"alpha": alpha,
"n_sites_total": n_sites_total,
"dropped_sites": sorted(all_dropped),
"metrics": metrics_out,
}
out_dir = DATA_DIR / "statistics_fusion_order"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"{year}.json"
out_path.write_text(json.dumps(payload, separators=(",", ":")))
_print_summary(year, alpha, n_sites_total, metrics_out)
print(f"\nWritten → {out_path}")
if __name__ == "__main__":
main()

742
7-gcc-suitability.py Normal file
View file

@ -0,0 +1,742 @@
"""Step 7: PhenoCam GCC suitability as a fusion-accuracy reference.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``metrics/{year}/{site}/gcc_s2.json``, ``gcc_phenocam.json`` Step 5 timeseries
- ``metrics/manifest.json`` site lat/lon
- ``sentinel_data/{year}/{site}/prepared/s2/`` S2 REFL/GCC/DIST_CLOUD (Steps 34)
- ``sentinel_data/{year}/{site}/prepared/gcc_s3/``, ``prepared/s3_rgb/`` Step 4
Outputs (``data/gcc_suitability/``):
- ``{year}.json`` representativeness (Line A), LOOCV concordance (Line B),
per-site and aggregate suitability verdict
CLI:
- ``--evaluation-year`` (default 2025)
- ``--min-cloudfree-s2`` (default 10) minimum cloud-free S2 dates for LOOCV
- ``--alpha`` (default 0.05) reserved for future significance tests
Full-sample aggregate; does not accept ``--site``.
"""
from __future__ import annotations
import argparse
import json
import re
import shutil
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
import numpy as np
import rasterio
from pyproj import Transformer
from rasterio.crs import CRS
from rasterio.transform import rowcol
from scipy.stats import linregress, pearsonr, spearmanr
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
DEFAULT_ALPHA = 0.05
MIN_CLOUDFREE_S2 = 10
REPR_R_THRESHOLD = 0.7
MATCH_TOLERANCE_DAYS = 5
RESOLUTION_RATIO = 30
MAX_DAYS = 100
MINIMUM_ACQUISITION_IMPORTANCE = 0
SMALL_SAMPLE_SITES = 6
# ---------------------------------------------------------------------------
# efast import
# ---------------------------------------------------------------------------
def _import_efast():
try:
import efast.efast as efast_module
return efast_module
except ImportError as exc:
raise ImportError("efast not found. Install with: uv sync") from exc
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _r4(v: float | None) -> float | None:
return round(v, 4) if v is not None else None
def _window_mean(data: np.ndarray) -> float | None:
valid = data[~np.isnan(data)]
if valid.size == 0:
return None
return float(np.mean(valid))
def _read_center_pixel(path: Path, lat: float, lon: float) -> float | None:
try:
with rasterio.open(path) as src:
transformer = Transformer.from_crs(
CRS.from_epsg(4326), src.crs, always_xy=True
)
x, y = transformer.transform(lon, lat)
row, col = rowcol(src.transform, x, y)
h, w = src.height, src.width
r0, r1 = max(0, row - 1), min(h, row + 2)
c0, c1 = max(0, col - 1), min(w, col + 2)
window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0)
data = src.read(1, window=window).astype(float)
nodata = src.nodata
if nodata is not None:
data = np.where(data == nodata, np.nan, data)
data[data == 0] = np.nan
return _window_mean(data)
except Exception:
return None
def _date_from_s2_tif(path: Path) -> str | None:
parts = path.stem.split("_")
if len(parts) >= 3:
m = re.match(r"(\d{8})", parts[2])
return m.group(1) if m else None
return None
def _iso_to_yyyymmdd(iso: str) -> str:
return iso.replace("-", "")
def _yyyymmdd_to_iso(d: str) -> str:
return f"{d[:4]}-{d[4:6]}-{d[6:]}"
def _day_gap(a: str, b: str) -> float:
return abs((np.datetime64(a) - np.datetime64(b)) / np.timedelta64(1, "D"))
def _match_series(
ref: list[dict],
ref_key: str,
pred: list[dict],
pred_key: str,
tolerance_days: int = MATCH_TOLERANCE_DAYS,
) -> tuple[list[float], list[float], list[str]]:
"""Return paired (ref_vals, pred_vals, ref_dates) within tolerance."""
ref_lookup: dict[str, float] = {
p["date"]: p[ref_key] for p in ref if p.get(ref_key) is not None
}
if not ref_lookup:
return [], [], []
ref_dates = sorted(ref_lookup)
obs, sim, matched_dates = [], [], []
for pt in pred:
v = pt.get(pred_key)
if v is None:
continue
nearest = min(ref_dates, key=lambda d: _day_gap(pt["date"], d))
if _day_gap(pt["date"], nearest) <= tolerance_days:
obs.append(ref_lookup[nearest])
sim.append(v)
matched_dates.append(nearest)
return obs, sim, matched_dates
def _accuracy_metrics(obs: list[float], sim: list[float]) -> dict[str, Any] | None:
if len(obs) < 2:
return None
obs_arr = np.array(obs, dtype=float)
sim_arr = np.array(sim, dtype=float)
diff = sim_arr - obs_arr
rmse = float(np.sqrt(np.mean(diff**2)))
mae = float(np.mean(np.abs(diff)))
bias = float(np.mean(diff))
r, _ = pearsonr(obs_arr, sim_arr)
return {
"n": len(obs),
"rmse": _r4(rmse),
"mae": _r4(mae),
"bias": _r4(bias),
"r": _r4(float(r)),
}
def _gcc_from_refl_file(refl_path: Path, gcc_path: Path) -> None:
with rasterio.open(refl_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
invalid = (b < 0) | (g < 0) | (r < 0)
gcc = np.where(invalid, np.nan, g / (total + 1e-10))
gcc[total == 0] = np.nan
profile.update(count=1, dtype="float32")
with rasterio.open(gcc_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
def _load_json_series(path: Path) -> list[dict]:
if not path.is_file():
return []
return json.loads(path.read_text())
def _load_site_coords(year: int) -> dict[str, tuple[float, float]]:
manifest_path = DATA_DIR / "metrics" / "manifest.json"
if not manifest_path.is_file():
return {}
manifest = json.loads(manifest_path.read_text())
sites = manifest.get("sites", {}).get(str(year), {})
coords: dict[str, tuple[float, float]] = {}
for site, meta in sites.items():
lat, lon = meta.get("lat"), meta.get("lon")
if lat is not None and lon is not None:
coords[site] = (float(lat), float(lon))
return coords
def _discover_sites(year: int) -> list[str]:
metrics_dir = DATA_DIR / "metrics" / str(year)
if not metrics_dir.is_dir():
return []
return sorted(
d.name
for d in metrics_dir.iterdir()
if d.is_dir() and (d / "gcc_s2.json").is_file()
)
def _build_hr_symlink_dir(s2_dir: Path, holdout_yyyymmdd: str, dest: Path) -> None:
"""Symlink all S2 hr inputs except the held-out acquisition date."""
dest.mkdir(parents=True, exist_ok=True)
for pattern in ("*_REFL.tif", "*_GCC.tif", "*_DIST_CLOUD.tif"):
for src in sorted(s2_dir.glob(pattern)):
date_token = (
src.stem.split("_")[2][:8] if len(src.stem.split("_")) >= 3 else ""
)
if date_token == holdout_yyyymmdd:
continue
link = dest / src.name
if link.exists() or link.is_symlink():
link.unlink()
link.symlink_to(src.resolve())
# ---------------------------------------------------------------------------
# Line A — representativeness
# ---------------------------------------------------------------------------
def compute_representativeness(phenocam: list[dict], s2: list[dict]) -> dict[str, Any]:
"""PhenoCam gcc_90 vs co-located observed S2 GCC."""
obs, sim, _ = _match_series(phenocam, "gcc_90", s2, "gcc")
result: dict[str, Any] = {
"n": len(obs),
"r": None,
"spearman": None,
"slope": None,
"intercept": None,
"rmse": None,
"bias": None,
"peak_offset_days": None,
"representative": False,
}
if len(obs) < 2:
return result
obs_arr = np.array(obs, dtype=float)
sim_arr = np.array(sim, dtype=float)
r, _ = pearsonr(obs_arr, sim_arr)
sp, _ = spearmanr(obs_arr, sim_arr)
reg = linregress(sim_arr, obs_arr)
diff = sim_arr - obs_arr
result.update(
{
"r": _r4(float(r)),
"spearman": _r4(float(sp)),
"slope": _r4(float(reg.slope)),
"intercept": _r4(float(reg.intercept)),
"rmse": _r4(float(np.sqrt(np.mean(diff**2)))),
"bias": _r4(float(np.mean(diff))),
"representative": float(r) >= REPR_R_THRESHOLD,
}
)
pc_dates = [p["date"] for p in phenocam if p.get("gcc_90") is not None]
s2_dates = [p["date"] for p in s2 if p.get("gcc") is not None]
if pc_dates and s2_dates:
pc_peak = max(
phenocam,
key=lambda p: p["gcc_90"] if p.get("gcc_90") is not None else -1,
)["date"]
s2_peak = max(s2, key=lambda p: p["gcc"] if p.get("gcc") is not None else -1)[
"date"
]
result["peak_offset_days"] = int(_day_gap(pc_peak, s2_peak))
return result
# ---------------------------------------------------------------------------
# Line B — LOOCV
# ---------------------------------------------------------------------------
def _phenocam_lookup(phenocam: list[dict]) -> dict[str, float]:
return {p["date"]: p["gcc_90"] for p in phenocam if p.get("gcc_90") is not None}
def _nearest_phenocam(iso_date: str, lookup: dict[str, float]) -> float | None:
if not lookup:
return None
dates = sorted(lookup)
nearest = min(dates, key=lambda d: _day_gap(iso_date, d))
if _day_gap(iso_date, nearest) <= MATCH_TOLERANCE_DAYS:
return lookup[nearest]
return None
def run_loocv_site(
site: str,
year: int,
lat: float,
lon: float,
s2_series: list[dict],
phenocam: list[dict],
efast,
) -> list[dict[str, Any]]:
"""Leave-one-out EFAST for each cloud-free S2 date; return per-date records."""
s2_dir = DATA_DIR / "sentinel_data" / str(year) / site / "prepared" / "s2"
gcc_s3_dir = DATA_DIR / "sentinel_data" / str(year) / site / "prepared" / "gcc_s3"
s3_rgb_dir = DATA_DIR / "sentinel_data" / str(year) / site / "prepared" / "s3_rgb"
pc_lookup = _phenocam_lookup(phenocam)
s2_truth = {p["date"]: p["gcc"] for p in s2_series}
fusion_kwargs = dict(
ratio=RESOLUTION_RATIO,
max_days=MAX_DAYS,
minimum_acquisition_importance=MINIMUM_ACQUISITION_IMPORTANCE,
)
records: list[dict[str, Any]] = []
dates = [p["date"] for p in s2_series]
with tempfile.TemporaryDirectory(prefix=f"loocv_{site}_") as tmp_root:
tmp = Path(tmp_root)
hr_dir = tmp / "hr"
itb_out = tmp / "itb"
bti_out = tmp / "bti"
bti_gcc = tmp / "bti_gcc"
itb_out.mkdir()
bti_out.mkdir()
bti_gcc.mkdir()
for iso_date in tqdm(dates, desc=f"{site} LOOCV", leave=False):
yyyymmdd = _iso_to_yyyymmdd(iso_date)
truth = s2_truth.get(iso_date)
if truth is None:
continue
if hr_dir.exists():
shutil.rmtree(hr_dir)
_build_hr_symlink_dir(s2_dir, yyyymmdd, hr_dir)
pred_date = datetime.strptime(yyyymmdd, "%Y%m%d")
for f in itb_out.glob("*.tif"):
f.unlink()
for f in bti_out.glob("*.tif"):
f.unlink()
for f in bti_gcc.glob("*.tif"):
f.unlink()
efast.fusion(
pred_date,
gcc_s3_dir,
hr_dir,
itb_out,
product="GCC",
**fusion_kwargs,
)
efast.fusion(
pred_date,
s3_rgb_dir,
hr_dir,
bti_out,
product="REFL",
**fusion_kwargs,
)
itb_path = itb_out / f"GCC_{yyyymmdd}.tif"
refl_path = bti_out / f"REFL_{yyyymmdd}.tif"
bti_path = bti_gcc / f"GCC_{yyyymmdd}.tif"
pred_itb = (
_read_center_pixel(itb_path, lat, lon) if itb_path.is_file() else None
)
pred_bti = None
if refl_path.is_file():
_gcc_from_refl_file(refl_path, bti_path)
if bti_path.is_file():
pred_bti = _read_center_pixel(bti_path, lat, lon)
pc_val = _nearest_phenocam(iso_date, pc_lookup)
records.append(
{
"date": iso_date,
"s2_truth": truth,
"pred_bti": pred_bti,
"pred_itb": pred_itb,
"phenocam": pc_val,
}
)
return records
def _method_accuracy(records: list[dict], pred_key: str, ref_key: str) -> dict | None:
obs, sim = [], []
for rec in records:
pred = rec.get(pred_key)
ref = rec.get(ref_key)
if pred is None or ref is None:
continue
obs.append(ref)
sim.append(pred)
return _accuracy_metrics(obs, sim)
def _winner(rmse_bti: float | None, rmse_itb: float | None) -> str | None:
if rmse_bti is None or rmse_itb is None:
return None
if rmse_bti < rmse_itb:
return "bti"
if rmse_itb < rmse_bti:
return "itb"
return "tie"
def summarize_loocv(records: list[dict]) -> dict[str, Any]:
bti_vs_s2 = _method_accuracy(records, "pred_bti", "s2_truth")
itb_vs_s2 = _method_accuracy(records, "pred_itb", "s2_truth")
bti_vs_pc = _method_accuracy(records, "pred_bti", "phenocam")
itb_vs_pc = _method_accuracy(records, "pred_itb", "phenocam")
winner_s2 = _winner(
bti_vs_s2["rmse"] if bti_vs_s2 else None,
itb_vs_s2["rmse"] if itb_vs_s2 else None,
)
winner_pc = _winner(
bti_vs_pc["rmse"] if bti_vs_pc else None,
itb_vs_pc["rmse"] if itb_vs_pc else None,
)
agreement = (
winner_s2 == winner_pc
if winner_s2 and winner_pc and winner_s2 != "tie" and winner_pc != "tie"
else None
)
return {
"n_dates": len(records),
"bti": {"vs_s2": bti_vs_s2, "vs_phenocam": bti_vs_pc},
"itb": {"vs_s2": itb_vs_s2, "vs_phenocam": itb_vs_pc},
"winner_s2": winner_s2,
"winner_phenocam": winner_pc,
"winner_agreement": agreement,
}
# ---------------------------------------------------------------------------
# Aggregate concordance
# ---------------------------------------------------------------------------
def _pooled_concordance(
all_records: list[dict[str, Any]],
) -> dict[str, Any]:
"""Pooled metrics across all held-out dates."""
residual_pairs: list[tuple[float, float]] = []
vec_s2: list[float] = []
vec_pc: list[float] = []
for site_data in all_records:
for rec in site_data.get("records", []):
truth = rec.get("s2_truth")
pc = rec.get("phenocam")
for key in ("pred_bti", "pred_itb"):
pred = rec.get(key)
if pred is None or truth is None:
continue
err_s2 = abs(pred - truth)
if pc is not None:
err_pc = abs(pred - pc)
vec_s2.append(err_s2)
vec_pc.append(err_pc)
residual_pairs.append((err_s2, err_pc))
pooled_spearman = None
if len(vec_s2) >= 3:
sp, _ = spearmanr(vec_s2, vec_pc)
if not np.isnan(sp):
pooled_spearman = _r4(float(sp))
residual_corr = None
if len(residual_pairs) >= 3:
xs = np.array([p[0] for p in residual_pairs])
ys = np.array([p[1] for p in residual_pairs])
rc, _ = pearsonr(xs, ys)
residual_corr = _r4(float(rc))
agreements = [
s.get("winner_agreement")
for s in all_records
if s.get("eligible") and s.get("winner_agreement") is not None
]
winner_agreement_rate = (
_r4(sum(1 for a in agreements if a) / len(agreements)) if agreements else None
)
n_loocv_dates = sum(len(s.get("records", [])) for s in all_records)
return {
"pooled_spearman": pooled_spearman,
"residual_corr": residual_corr,
"winner_agreement_rate": winner_agreement_rate,
"n_loocv_dates": n_loocv_dates,
}
def _suitability_verdict(
n_repr_pass: int,
n_eligible: int,
n_total: int,
pooled: dict[str, Any],
) -> str:
if n_eligible == 0:
return "insufficient data"
repr_rate = n_repr_pass / n_total if n_total else 0
agree = pooled.get("winner_agreement_rate")
sp = pooled.get("pooled_spearman")
rc = pooled.get("residual_corr")
strong = 0
if repr_rate >= 0.6:
strong += 1
if agree is not None and agree >= 0.7:
strong += 1
if sp is not None and sp >= 0.8:
strong += 1
if rc is not None and rc >= 0.5:
strong += 1
if strong >= 3:
return "suitable"
if strong >= 1 or repr_rate >= 0.4:
return "partially suitable"
return "not suitable"
# ---------------------------------------------------------------------------
# Per-site processing
# ---------------------------------------------------------------------------
def process_site(
site: str,
year: int,
lat: float,
lon: float,
min_cloudfree: int,
efast,
) -> dict[str, Any]:
metrics_dir = DATA_DIR / "metrics" / str(year) / site
phenocam = _load_json_series(metrics_dir / "gcc_phenocam.json")
s2_series = _load_json_series(metrics_dir / "gcc_s2.json")
repr_metrics = compute_representativeness(phenocam, s2_series)
n_cloudfree = len(s2_series)
eligible = n_cloudfree >= min_cloudfree
result: dict[str, Any] = {
"eligible": eligible,
"n_cloudfree_s2": n_cloudfree,
"representativeness": repr_metrics,
"loocv": None,
"winner_s2": None,
"winner_phenocam": None,
"winner_agreement": None,
"records": [],
}
if not eligible:
return result
records = run_loocv_site(site, year, lat, lon, s2_series, phenocam, efast)
loocv = summarize_loocv(records)
result["loocv"] = loocv
result["winner_s2"] = loocv["winner_s2"]
result["winner_phenocam"] = loocv["winner_phenocam"]
result["winner_agreement"] = loocv["winner_agreement"]
result["records"] = records
return result
# ---------------------------------------------------------------------------
# Output / summary
# ---------------------------------------------------------------------------
def _compact_site_payload(site_result: dict[str, Any]) -> dict[str, Any]:
"""Drop raw LOOCV records from JSON output (keep summaries only)."""
out = {
"eligible": site_result["eligible"],
"n_cloudfree_s2": site_result["n_cloudfree_s2"],
"representativeness": site_result["representativeness"],
"winner_s2": site_result.get("winner_s2"),
"winner_phenocam": site_result.get("winner_phenocam"),
"winner_agreement": site_result.get("winner_agreement"),
}
if site_result.get("loocv"):
out["loocv"] = site_result["loocv"]
return out
def _print_summary(payload: dict[str, Any]) -> None:
year = payload["year"]
agg = payload["aggregate"]
print(
f"\nPhenoCam GCC suitability — {year} "
f"({payload['n_sites_total']} site(s), "
f"{payload['n_sites_eligible']} LOOCV-eligible, "
f"{payload['n_sites_repr_pass']} representative)"
)
print(f"Verdict: {agg['suitability_verdict']}")
print(
f" pooled Spearman (method errors): {agg.get('pooled_spearman', '')} "
f"residual corr: {agg.get('residual_corr', '')} "
f"winner agreement: {agg.get('winner_agreement_rate', '')} "
f"LOOCV dates: {agg.get('n_loocv_dates', '')}"
)
print(f"\n{'site':<28} {'repr r':>8} {'pass':>5} {'LOOCV n':>8} {'win agree':>10}")
print("-" * 65)
for site, data in sorted(payload["sites"].items()):
rep = data["representativeness"]
loocv_n = data.get("loocv", {}).get("n_dates") if data.get("loocv") else ""
agree = data.get("winner_agreement")
agree_s = "yes" if agree else ("no" if agree is False else "")
pass_s = "yes" if rep.get("representative") else "no"
print(
f"{site:<28} {rep.get('r') or '':>8} {pass_s:>5} "
f"{loocv_n!s:>8} {agree_s:>10}"
)
if payload["n_sites_total"] < SMALL_SAMPLE_SITES:
print(
f"\nNote: only {payload['n_sites_total']} site(s); "
"interpret cross-site aggregates cautiously."
)
if payload.get("dropped_sites"):
print(f"Dropped/ineligible: {', '.join(payload['dropped_sites'])}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--min-cloudfree-s2",
type=int,
default=MIN_CLOUDFREE_S2,
help="Minimum cloud-free S2 dates for LOOCV (default 10)",
)
parser.add_argument(
"--alpha",
type=float,
default=DEFAULT_ALPHA,
help="Significance threshold (reserved; default 0.05)",
)
args = parser.parse_args()
year = args.evaluation_year
min_cloudfree = args.min_cloudfree_s2
sites = _discover_sites(year)
if not sites:
raise SystemExit(
f"No Step 5 metrics found under {DATA_DIR / 'metrics' / str(year)}"
)
coords = _load_site_coords(year)
efast = _import_efast()
site_results: dict[str, dict[str, Any]] = {}
dropped: list[str] = []
for site in tqdm(sites, desc="Sites"):
if site not in coords:
dropped.append(site)
continue
lat, lon = coords[site]
site_results[site] = process_site(site, year, lat, lon, min_cloudfree, efast)
if not site_results[site]["eligible"]:
dropped.append(site)
n_eligible = sum(1 for s in site_results.values() if s["eligible"])
n_repr_pass = sum(
1
for s in site_results.values()
if s["representativeness"].get("representative")
)
pooled = _pooled_concordance(list(site_results.values()))
verdict = _suitability_verdict(n_repr_pass, n_eligible, len(sites), pooled)
payload = {
"year": year,
"alpha": args.alpha,
"repr_r_threshold": REPR_R_THRESHOLD,
"min_cloudfree_s2": min_cloudfree,
"n_sites_total": len(sites),
"n_sites_eligible": n_eligible,
"n_sites_repr_pass": n_repr_pass,
"aggregate": {
"suitability_verdict": verdict,
**pooled,
},
"sites": {
site: _compact_site_payload(data)
for site, data in sorted(site_results.items())
},
"dropped_sites": sorted(set(dropped)),
}
out_dir = DATA_DIR / "gcc_suitability"
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"{year}.json"
out_path.write_text(json.dumps(payload, separators=(",", ":")))
_print_summary(payload)
print(f"\nWritten → {out_path}")
if __name__ == "__main__":
main()

21
LICENSE Normal file
View file

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Felix Delattre
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

92
README.md Normal file
View file

@ -0,0 +1,92 @@
# EFAST fusion with phenocam validation.
End-to-end pipeline from selecting sites from the global [PhenoCam Network](https://phenocam.nau.edu/) to run [EFAST](https://github.com/DHI-GRAS/efast) spatio-temporal fusion with Sentinel-2 / Sentinel-3 and validate GCCs accross sensors. The numbered steps cover site selection, Sentinel data acquisition, different fusion orders, accuracy metrics, and sample-level statistics, all feeding a static web QA viewer.
---
## Pipeline overview
| Step | Script | What it does |
|------|--------|--------------|
| 1 | `1-phenocam.py` | Download PhenoCam metadata and `one_day_summary` GCC CSVs |
| 2 | `2-phenocam-screening.py` | Apply PhenoCam count, SNR, and proximity gates to select feasible sites |
| 3 | `3-sentinel-data.py` | Acquire S2 (Earth Search COG) and S3 OLCI SYN L2 (CDSE OpenEO); prepare REFL, DIST_CLOUD, and composite GeoTIFFs |
| 4 | `4-fusion.py` | Run EFAST BtI (fuse reflectance → GCC) and ItB (fuse GCC directly) for each screened site |
| 5 | `5-metrics.py` | Extract PhenoCam-matched timeseries, compute NSE/RMSE/r baselines and fusion metrics, emit per-site JSON and webapp manifest |
| 6 | `6-statistics-fusion-order.py` | Paired ItB-vs-BtI significance test (Wilcoxon + t-test) across all sites |
| 7 | `7-gcc-suitability.py` | PhenoCam GCC suitability as a fusion-accuracy reference (representativeness + LOOCV concordance) |
---
## Quick start
### Run pipeline wrapper (recommended)
```bash
uv sync
uv run python run-pipeline.py --evaluation-year 2025
```
Runs all five steps in order. Steps 1 and 2 are skipped when their output already exists. Each site in steps 35 is skipped when `data/metrics/{year}/{site}/metrics.json` is present. Any failure stops the run immediately, so one can fix the issue and re-run; completed work is never repeated.
```bash
# single site (steps 1 and 2 still skip if already done)
uv run python run-pipeline.py --evaluation-year 2025 --site ICOSFR-Fon1
```
### Step by step
```bash
uv sync
uv run python 1-phenocam.py --evaluation-year 2025
uv run python 2-phenocam-screening.py --evaluation-year 2025
uv run python 3-sentinel-data.py --evaluation-year 2025
uv run python 4-fusion.py --evaluation-year 2025
uv run python 5-metrics.py --evaluation-year 2025
uv run python 6-statistics-fusion-order.py --evaluation-year 2025
uv run python 7-gcc-suitability.py --evaluation-year 2025
```
Steps 15 accept `--evaluation-year` (default `2025`) and `--site` (optional, for single-site runs). Steps 67 are full-sample aggregates and only accept `--evaluation-year` (Step 6 and 7 also accept `--alpha`; Step 7 adds `--min-cloudfree-s2`, default `10`). Steps 35 are resumable — existing output files are skipped.
```bash
# single site
uv run python 3-sentinel-data.py --evaluation-year 2025 --site ICOSFR-Fon1
uv run python 4-fusion.py --evaluation-year 2025 --site ICOSFR-Fon1
uv run python 5-metrics.py --evaluation-year 2025 --site ICOSFR-Fon1
```
### Credentials
Step 3 S3 download uses CDSE OpenEO (`SENTINEL3_SYN_L2_SYN`). Set `CDSE_USER` and `CDSE_PASSWORD` in `../.env` at the workspace root. S2 uses AWS Earth Search COG range reads (no auth required).
---
## Outputs (under `data/`)
| Artifact | Step | Role |
|----------|------|------|
| `phenocam/{year}.json` | 1 | Site list + `sites_dir` pointer |
| `phenocam/{year}/{site}.json`, `{site}_1day.csv` | 1 | Raw API payload and GCC CSV |
| `phenocam_screening/{year}.json` / `.csv` | 2 | Gate results (pass/fail per site) |
| `sentinel_data/{year}/{site}/prepared/s2/` | 3 | S2 REFL + DIST_CLOUD GeoTIFFs |
| `sentinel_data/{year}/{site}/prepared/s3/` | 3 | S3 composite GeoTIFFs |
| `fusion/{year}/{site}/bti/`, `.../itb/` | 4 | BtI fused reflectance + GCC; ItB fused GCC |
| `metrics/{year}/{site}/` | 5 | Per-site timeseries, metrics, covariates JSON |
| `metrics/manifest.json` | 5 | Webapp manifest (years + site metadata) |
| `statistics_fusion_order/{year}.json` | 6 | Paired ItB-vs-BtI test summary (NSE, RMSE, nRMSE, r) |
| `gcc_suitability/{year}.json` | 7 | PhenoCam GCC suitability summary (representativeness + LOOCV concordance) |
---
## Web viewer
`python3 -m http.server 8080` runs the webapp on [http://localhost:8000/index.html](http://localhost:8000/index.html). Requires step 5 output (`data/metrics/manifest.json`). The Statistics overlay GCC suitability tab uses step 7 output (`data/gcc_suitability/{year}.json`).
---
## License
[MIT](LICENSE)

File diff suppressed because it is too large Load diff

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

57
common.js Normal file
View file

@ -0,0 +1,57 @@
/** Shared GeoTIFF → canvas for RGB (≥3 bands) or grayscale (1 band). Requires global GeoTIFF. */
async function geotiffToCanvasDataUrl(arrayBuffer) {
const tiff = await GeoTIFF.fromArrayBuffer(arrayBuffer);
const image = await tiff.getImage();
const rasters = await image.readRasters();
const width = image.getWidth();
const height = image.getHeight();
const bbox = image.getBoundingBox();
const geoKeys = image.getGeoKeys();
const crsCode = geoKeys.ProjectedCSTypeGeoKey
? `EPSG:${geoKeys.ProjectedCSTypeGeoKey}`
: geoKeys.GeographicTypeGeoKey !== 4326
? `EPSG:${geoKeys.GeographicTypeGeoKey}`
: "EPSG:4326";
const normalize = (arr) => {
let min = Infinity,
max = -Infinity;
for (const v of arr) if (!isNaN(v) && v > 0) {
min = Math.min(min, v);
max = Math.max(max, v);
}
return arr.map((v) => Math.max(0, Math.min(255, ((v - min) / (max - min || 1)) * 255)));
};
const spp = typeof image.getSamplesPerPixel === "function" ? image.getSamplesPerPixel() : 0;
const multi = spp >= 3 || (rasters[1] != null && rasters[2] != null);
const b0 = Array.from(rasters[0]);
let rN, gN, bN;
if (multi) {
const blue = b0,
green = Array.from(rasters[1]),
red = Array.from(rasters[2]);
rN = normalize(red);
gN = normalize(green);
bN = normalize(blue);
} else {
const g = normalize(b0);
rN = g;
gN = g;
bN = g;
}
const canvas = Object.assign(document.createElement("canvas"), { width, height });
const ctx = canvas.getContext("2d");
ctx.imageSmoothingEnabled = false;
const imgData = ctx.createImageData(width, height);
for (let i = 0; i < rN.length; i++) {
const idx = i * 4;
if (rN[i] === 0 && gN[i] === 0 && bN[i] === 0) imgData.data[idx + 3] = 0;
else {
imgData.data[idx] = rN[i];
imgData.data[idx + 1] = gN[i];
imgData.data[idx + 2] = bN[i];
imgData.data[idx + 3] = 255;
}
}
ctx.putImageData(imgData, 0, 0);
return { dataUrl: canvas.toDataURL(), bbox, crsCode };
}

Binary file not shown.

Binary file not shown.

File diff suppressed because one or more lines are too long

Binary file not shown.

Binary file not shown.

1446
index.html Normal file

File diff suppressed because it is too large Load diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

31
pyproject.toml Normal file
View file

@ -0,0 +1,31 @@
[project]
name = "efast-phenocam-validation"
version = "0.1.0"
description = "EFAST Sentinel-2/Sentinel-3 fusion pipeline with PhenoCam GCC validation"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"efast @ git+https://github.com/DHI-GRAS/efast.git",
"netCDF4",
"numpy",
"openeo",
"pystac-client",
"python-dateutil",
"python-dotenv",
"rasterio",
"requests",
"scipy",
"shapely",
"tqdm",
]
[dependency-groups]
dev = [
"ruff",
]
[tool.ruff.lint.per-file-ignores]
"1-phenocam.py" = ["E402"]
"2-phenocam-screening.py" = ["E402"]
"3-sentinel-data.py" = ["E402"]
"4-fusion.py" = ["E402"]

Binary file not shown.

Binary file not shown.

Binary file not shown.

142
run-pipeline.py Normal file
View file

@ -0,0 +1,142 @@
"""Pipeline wrapper: run steps 1 → 2 → 3 → 4 → 5.
Steps 1 and 2 run once for the whole year (skipped when their output already
exists). Steps 35 run site-by-site for every PASS site from
``data/phenocam_screening/{year}.json``; a site is skipped entirely when
``data/metrics/{year}/{site}/metrics.json`` already exists.
Any failure stops the run immediately. Fix the issue and re-run completed
steps and sites are skipped automatically.
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` single site to run steps 35 for (default: all PASS sites)
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
# Steps 1 and 2 are global (once per year); steps 35 repeat per site.
GLOBAL_STEPS: list[tuple[str, Path]] = [
# (script, skip-if-this-file-exists)
("1-phenocam.py", Path("phenocam/{year}.json")),
("2-phenocam-screening.py", Path("phenocam_screening/{year}.json")),
]
PER_SITE_STEPS = [
"3-sentinel-data.py",
"4-fusion.py",
"5-metrics.py",
]
def _load_pass_sites(year: int) -> list[str]:
path = DATA_DIR / "phenocam_screening" / f"{year}.json"
if not path.is_file():
raise SystemExit(f"Step-2 screening manifest not found: {path}")
payload: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
sites = []
for row in payload.get("sites", []):
if row.get("calculations", {}).get("status") != "PASS":
continue
name = row.get("response", {}).get("camera", {}).get("Sitename")
if name:
sites.append(str(name))
return sorted(sites)
def _is_site_complete(year: int, site: str) -> bool:
return (DATA_DIR / "metrics" / str(year) / site / "metrics.json").is_file()
def _run_global_step(step: str, year: int) -> int:
result = subprocess.run(
[sys.executable, step, "--evaluation-year", str(year)],
check=False,
)
return result.returncode
def _run_site_step(step: str, year: int, site: str) -> int:
result = subprocess.run(
[sys.executable, step, "--evaluation-year", str(year), "--site", site],
check=False,
)
return result.returncode
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--site",
type=str,
default=None,
help="Single sitename to process (default: all PASS sites)",
)
args = parser.parse_args(argv)
year = args.evaluation_year
# --- Global steps (steps 1 and 2) ---
for script, marker_template in GLOBAL_STEPS:
marker = DATA_DIR / str(marker_template).replace("{year}", str(year))
if marker.is_file():
print(f"[pipeline] {script} — skipping (output already exists)")
else:
print(f"[pipeline] Running {script}...")
rc = _run_global_step(script, year)
if rc != 0:
raise SystemExit(
f"[pipeline] {script} failed (exit {rc}); cannot continue"
)
# --- Per-site steps (steps 3, 4, 5) ---
sites = _load_pass_sites(year)
if args.site:
if args.site not in sites:
raise SystemExit(
f"Site '{args.site}' not found in step-2 PASS sites for {year}"
)
sites = [args.site]
n_total = len(sites)
skipped: list[str] = []
succeeded: list[str] = []
print(f"[pipeline] {n_total} PASS site(s) for {year}")
for i, site in enumerate(sites, 1):
prefix = f"[pipeline] ({i}/{n_total}) {site}"
if _is_site_complete(year, site):
print(f"{prefix} — skipping (already complete)")
skipped.append(site)
continue
print(f"{prefix}")
for step in PER_SITE_STEPS:
rc = _run_site_step(step, year, site)
if rc != 0:
raise SystemExit(f"[pipeline] {step} failed for {site} (exit {rc})")
succeeded.append(site)
print(
f"\n[pipeline] Done: {len(skipped)} skipped, {len(succeeded)} succeeded"
f" (of {n_total} total)"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())

Binary file not shown.

1491
uv.lock generated Normal file

File diff suppressed because it is too large Load diff

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.