diff --git a/.gitignore b/.gitignore index b4c999e..1177266 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,9 @@ -# Project data -data/* -webapp/data +# Generated caches and downloads (regenerate via pipeline steps) +data/ -# Environment +# Environment and secrets .env -.venv +.venv/ venv/ env/ @@ -42,6 +41,3 @@ dist/ # OS .DS_Store Thumbs.db - -AGENTS.md -.vibe \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml deleted file mode 100644 index 6ba9060..0000000 --- a/.pre-commit-config.yaml +++ /dev/null @@ -1,8 +0,0 @@ -repos: - - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.8.4 - hooks: - - id: ruff - args: [--fix] - - id: ruff-format - diff --git a/1-phenocam.py b/1-phenocam.py new file mode 100644 index 0000000..9276080 --- /dev/null +++ b/1-phenocam.py @@ -0,0 +1,278 @@ +"""Step 1: download worldwide PhenoCam sites for a calendar year. + +Inputs (``data/``): none — queries the PhenoCam API. + +Outputs (``data/``, ``{year}`` = ``--evaluation-year``): + +- ``phenocam/{year}.json`` — site list manifest +- ``phenocam/{year}/{sitename}.json`` — camera + ROI metadata +- ``phenocam/{year}/{sitename}_1day.csv`` — ``one_day_summary`` GCC CSV + +CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional comma-separated filter). + +Next step: :mod:`2-phenocam-screening`. +""" + +from __future__ import annotations + +import argparse +import json +import sys +from datetime import date +from pathlib import Path +from typing import Any + +import requests + +PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing" +if str(PROCESSING_DIR) not in sys.path: + sys.path.insert(0, str(PROCESSING_DIR)) + +from acquisition_phenocam import PHENOCAM_API # noqa: E402 +from acquisition_phenocam_all_europe import _paginate_cameras, _parse_iso_date # noqa: E402 + +EVALUATION_YEAR = 2025 +HOST_PROBE = "https://phenocam.nau.edu/api/cameras/?limit=1" +ONE_DAY_CSV_SUFFIX = "_1day.csv" + + +def check_phenocam_host() -> None: + try: + response = requests.get(HOST_PROBE, timeout=30) + response.raise_for_status() + except requests.RequestException as exc: + raise RuntimeError( + f"PhenoCam API unreachable (phenocam.nau.edu): " + f"{exc.__class__.__name__}: {exc}" + ) from exc + + +def _overlaps_year(first: str | None, last: str | None, season: int) -> bool: + start = _parse_iso_date(first) + end = _parse_iso_date(last) + if start is None or end is None: + return False + return start <= date(season, 12, 31) and end >= date(season, 1, 1) + + +def sites_dir(cache_dir: Path, evaluation_year: int) -> Path: + return cache_dir / "phenocam" / str(evaluation_year) + + +def site_json_path(cache_dir: Path, evaluation_year: int, sitename: str) -> Path: + return sites_dir(cache_dir, evaluation_year) / f"{sitename}.json" + + +def site_csv_path(cache_dir: Path, evaluation_year: int, sitename: str) -> Path: + return sites_dir(cache_dir, evaluation_year) / f"{sitename}{ONE_DAY_CSV_SUFFIX}" + + +def load_candidate_cameras( + evaluation_year: int, + *, + site_filter: set[str] | None = None, + active_only: bool = False, + limit: int | None = None, +) -> list[dict[str, Any]]: + cameras: list[dict[str, Any]] = [] + for camera in _paginate_cameras(): + if active_only and not camera.get("active"): + continue + sitename = str(camera["Sitename"]) + if site_filter is not None and sitename not in site_filter: + continue + if not _overlaps_year(camera.get("date_first"), camera.get("date_last"), evaluation_year): + continue + cameras.append(dict(camera)) + cameras.sort(key=lambda item: str(item["Sitename"])) + if limit is not None: + cameras = cameras[:limit] + return cameras + + +def fetch_roi_record(site_name: str) -> dict[str, Any] | None: + rois: list[dict[str, Any]] = [] + url = f"{PHENOCAM_API}/roilists/" + params: dict[str, Any] | None = {"site": site_name} + while url: + response = requests.get(url, params=params, timeout=60) + response.raise_for_status() + payload = response.json() + rois.extend( + item for item in payload.get("results", []) if item.get("site") == site_name + ) + url = payload.get("next") + params = None + if rois: + break + return dict(rois[0]) if rois else None + + +def download_one_day_csv(csv_url: str, output_path: Path) -> None: + response = requests.get(csv_url, timeout=60) + response.raise_for_status() + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(response.text, encoding="utf-8") + + +def download_site( + camera: dict[str, Any], + evaluation_year: int, + cache_dir: Path, +) -> str: + sitename = str(camera["Sitename"]) + roi = fetch_roi_record(sitename) + payload = {"response": {"camera": camera, "roi": roi}} + json_path = site_json_path(cache_dir, evaluation_year, sitename) + json_path.parent.mkdir(parents=True, exist_ok=True) + json_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + + csv_url = roi.get("one_day_summary") if roi else None + if csv_url: + download_one_day_csv(csv_url, site_csv_path(cache_dir, evaluation_year, sitename)) + return sitename + + +def load_or_download_site( + camera: dict[str, Any], + evaluation_year: int, + cache_dir: Path, + *, + refresh: bool, +) -> str: + sitename = str(camera["Sitename"]) + json_path = site_json_path(cache_dir, evaluation_year, sitename) + csv_path = site_csv_path(cache_dir, evaluation_year, sitename) + if not refresh and json_path.is_file(): + if not csv_path.is_file(): + payload = json.loads(json_path.read_text(encoding="utf-8")) + roi = payload.get("response", {}).get("roi") or {} + csv_url = roi.get("one_day_summary") + if csv_url: + download_one_day_csv(csv_url, csv_path) + return sitename + return download_site(camera, evaluation_year, cache_dir) + + +def run_download( + *, + cache_dir: Path, + evaluation_year: int, + active_only: bool = False, + site_filter: set[str] | None = None, + limit: int | None = None, + refresh: bool = False, +) -> list[str]: + check_phenocam_host() + candidates = load_candidate_cameras( + evaluation_year, + site_filter=site_filter, + active_only=active_only, + limit=limit, + ) + print( + f"[PhenoCam-1] {len(candidates)} candidate(s) with archive overlap for " + f"{evaluation_year}" + ) + + sitenames: list[str] = [] + for index, camera in enumerate(candidates, start=1): + sitename = str(camera["Sitename"]) + print( + f"[PhenoCam-1] ({index}/{len(candidates)}) {sitename} " + f"({float(camera['Lat']):.4f}, {float(camera['Lon']):.4f})" + ) + sitenames.append( + load_or_download_site( + camera, + evaluation_year, + cache_dir, + refresh=refresh, + ) + ) + return sorted(sitenames) + + +def write_manifest( + sitenames: list[str], + output_path: Path, + cache_dir: Path, + evaluation_year: int, +) -> None: + rel_sites_dir = sites_dir(cache_dir, evaluation_year).relative_to(output_path.parent) + payload = { + "evaluation_year": evaluation_year, + "count": len(sitenames), + "sites_dir": rel_sites_dir.as_posix(), + "sites": sitenames, + } + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + print(f"[PhenoCam-1] Wrote {output_path}") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--cache-dir", + type=Path, + default=Path("data"), + help="Base directory for per-site files and manifest", + ) + parser.add_argument( + "--evaluation-year", + type=int, + default=EVALUATION_YEAR, + help=f"Calendar year to download (default: {EVALUATION_YEAR})", + ) + parser.add_argument( + "--active-only", + action="store_true", + help="Restrict candidates to cameras marked active in the API", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Process only the first N candidate sites (testing)", + ) + parser.add_argument( + "--sites", + type=str, + default=None, + help="Comma-separated sitenames to download (testing)", + ) + parser.add_argument( + "--refresh", + action="store_true", + help="Re-download sites even when cache files exist", + ) + parser.add_argument( + "--output-json", + type=Path, + default=None, + help="Manifest output path (default: data/phenocam/{year}.json)", + ) + args = parser.parse_args(argv) + + site_filter = None + if args.sites: + site_filter = {name.strip() for name in args.sites.split(",") if name.strip()} + + sitenames = run_download( + cache_dir=args.cache_dir, + evaluation_year=args.evaluation_year, + active_only=args.active_only, + site_filter=site_filter, + limit=args.limit, + refresh=args.refresh, + ) + manifest_path = args.output_json or ( + args.cache_dir / "phenocam" / f"{args.evaluation_year}.json" + ) + write_manifest(sitenames, manifest_path, args.cache_dir, args.evaluation_year) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/2-phenocam-screening.py b/2-phenocam-screening.py new file mode 100644 index 0000000..954f44e --- /dev/null +++ b/2-phenocam-screening.py @@ -0,0 +1,495 @@ +"""Step 2: PhenoCam GCC + SNR screening on step-1 cache. + +Inputs (``data/``, ``{year}`` = ``--evaluation-year``): + +- ``phenocam/{year}.json`` — step-1 manifest +- ``phenocam/{year}/{sitename}.json`` — per-site metadata +- ``phenocam/{year}/{sitename}_1day.csv`` — GCC timeseries + +Outputs (``data/phenocam_screening/``): + +- ``{year}.json`` — full per-site results +- ``{year}.csv`` — flat summary table + +CLI: ``--evaluation-year`` (default 2025), ``--sites`` (optional; default: all manifest sites). + +Next step: :mod:`3-sentinel-clouds`. +""" + +from __future__ import annotations + +import argparse +import csv +import json +import math +import sys +from datetime import date, datetime +from pathlib import Path +from typing import Any + +import numpy as np +from scipy.interpolate import UnivariateSpline + +PROCESSING_DIR = Path(__file__).resolve().parents[1] / "processing" +if str(PROCESSING_DIR) not in sys.path: + sys.path.insert(0, str(PROCESSING_DIR)) + +from acquisition_phenocam import _phenocam_summary_gcc_value # noqa: E402 + +MIN_GCC_POINTS = 30 +SNR_THRESHOLD = 2.0 +CLUSTER_RADIUS_M = 500.0 +GATE_ORDER = ("phenocam", "snr", "cluster") +ONE_DAY_CSV_SUFFIX = "_1day.csv" +_EARTH_RADIUS_M = 6371000.0 + + +def load_manifest(path: Path) -> dict[str, Any]: + payload = json.loads(path.read_text(encoding="utf-8")) + for key in ("evaluation_year", "sites_dir", "sites"): + if key not in payload: + raise ValueError(f"Expected '{key}' in manifest {path}") + return payload + + +def resolve_sites_dir(manifest_path: Path, manifest: dict[str, Any]) -> Path: + return (manifest_path.parent / manifest["sites_dir"]).resolve() + + +def load_site_entry(sites_dir: Path, sitename: str) -> dict[str, Any]: + json_path = sites_dir / f"{sitename}.json" + payload = json.loads(json_path.read_text(encoding="utf-8")) + csv_path = sites_dir / f"{sitename}{ONE_DAY_CSV_SUFFIX}" + payload["_one_day_csv"] = csv_path if csv_path.is_file() else None + return payload + + +def parse_gcc90_series(csv_path: Path, evaluation_year: int) -> list[tuple[str, float]]: + lines = [ + line + for line in csv_path.read_text(encoding="utf-8").split("\n") + if line and not line.startswith("#") + ] + reader = csv.DictReader(lines) + fieldnames = reader.fieldnames or () + use_mean_fallback = "gcc_90" not in fieldnames + + year_start = date(evaluation_year, 1, 1) + year_end = date(evaluation_year, 12, 31) + series: list[tuple[str, float]] = [] + for row in reader: + date_str = row.get("date") + if not date_str: + continue + try: + row_date = datetime.strptime(date_str, "%Y-%m-%d").date() + except ValueError: + continue + if not (year_start <= row_date <= year_end): + continue + gcc = _phenocam_summary_gcc_value(row, use_mean_fallback) + if gcc is None: + continue + series.append((row_date.isoformat(), float(gcc))) + series.sort(key=lambda item: item[0]) + return series + + +def _months_covered(day_strings: list[str]) -> int: + months: set[int] = set() + for day in day_strings: + months.add(datetime.strptime(day, "%Y-%m-%d").month) + return len(months) + + +def _aic_for_spline(x: np.ndarray, y: np.ndarray, spline: UnivariateSpline) -> float: + residuals = y - spline(x) + rss = float(np.sum(residuals**2)) + n = len(y) + if rss <= 0 or n < 4: + return math.inf + edf = float(spline.get_knots().shape[0] + spline.get_coeffs().shape[0]) + return n * math.log(rss / n) + 2.0 * edf + + +def compute_snr_aic_spline(series: list[tuple[str, float]]) -> float | None: + if len(series) < MIN_GCC_POINTS: + return None + + dates = [datetime.strptime(day, "%Y-%m-%d").date() for day, _ in series] + x = np.array([(d - dates[0]).days for d in dates], dtype=float) + y = np.array([value for _, value in series], dtype=float) + if len(np.unique(x)) < 5: + return None + + y_var = float(np.var(y)) + if y_var <= 0: + return None + + candidates = np.logspace(-4, 2, 40) * y_var * len(y) + best_spline: UnivariateSpline | None = None + best_aic = math.inf + for smoothing in candidates: + try: + spline = UnivariateSpline(x, y, k=3, s=float(smoothing)) + except Exception: + continue + aic = _aic_for_spline(x, y, spline) + if aic < best_aic: + best_aic = aic + best_spline = spline + + if best_spline is None: + return None + + residuals = y - best_spline(x) + rmse = float(np.sqrt(np.mean(residuals**2))) + amplitude = float(np.max(y) - np.min(y)) + if rmse <= 0: + return None + return amplitude / rmse + + +def screen_site( + site_entry: dict[str, Any], + *, + evaluation_year: int, + min_gcc_points: int, + snr_threshold: float, +) -> dict[str, Any]: + response = site_entry["response"] + roi = response.get("roi") + csv_path = site_entry.get("_one_day_csv") + calculations: dict[str, Any] = { + "evaluation_year": evaluation_year, + "n_gcc_points": 0, + "first_gcc_date": None, + "last_gcc_date": None, + "months_with_gcc": 0, + "snr": None, + "min_gcc_points": min_gcc_points, + "snr_threshold": snr_threshold, + "status": "FAIL", + "failing_gate": None, + "passed_gates": [], + "reason": None, + } + + if roi is None or not roi.get("one_day_summary") or csv_path is None: + calculations["failing_gate"] = "phenocam" + calculations["reason"] = "no_roi" + return {"response": response, "calculations": calculations} + + series = parse_gcc90_series(csv_path, evaluation_year) + calculations["n_gcc_points"] = len(series) + if calculations["n_gcc_points"] == 0: + calculations["failing_gate"] = "phenocam" + calculations["reason"] = "no_gcc_in_year" + return {"response": response, "calculations": calculations} + + day_strings = [day for day, _ in series] + calculations["first_gcc_date"] = day_strings[0] + calculations["last_gcc_date"] = day_strings[-1] + calculations["months_with_gcc"] = _months_covered(day_strings) + + if calculations["n_gcc_points"] < min_gcc_points: + calculations["failing_gate"] = "phenocam" + calculations["reason"] = "insufficient_gcc_points" + return {"response": response, "calculations": calculations} + + calculations["passed_gates"].append("phenocam") + + snr = compute_snr_aic_spline(series) + calculations["snr"] = snr + if snr is None or snr < snr_threshold: + calculations["failing_gate"] = "snr" + calculations["reason"] = "insufficient_snr" if snr is not None else "snr_undefined" + return {"response": response, "calculations": calculations} + + calculations["passed_gates"].append("snr") + calculations["status"] = "PASS" + calculations["failing_gate"] = None + calculations["reason"] = None + return {"response": response, "calculations": calculations} + + +def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: + p1, p2 = math.radians(lat1), math.radians(lat2) + dlat = math.radians(lat2 - lat1) + dlon = math.radians(lon2 - lon1) + a = math.sin(dlat / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dlon / 2) ** 2 + return 2 * _EARTH_RADIUS_M * math.asin(math.sqrt(a)) + + +def _site_coords(row: dict[str, Any]) -> tuple[float, float] | None: + camera = row["response"]["camera"] + lat, lon = camera.get("Lat"), camera.get("Lon") + if lat is None or lon is None: + return None + return float(lat), float(lon) + + +def _cluster_rank(row: dict[str, Any]) -> tuple[int, float]: + calc = row["calculations"] + return calc["n_gcc_points"], float(calc.get("snr") or 0.0) + + +def apply_cluster_gate(results: list[dict[str, Any]], *, radius_m: float) -> int: + pool: list[tuple[int, float, float]] = [] + for idx, row in enumerate(results): + if "snr" not in row["calculations"]["passed_gates"]: + continue + coords = _site_coords(row) + if coords is None: + row["calculations"]["passed_gates"].append("cluster") + continue + pool.append((idx, coords[0], coords[1])) + + n = len(pool) + parent = list(range(n)) + + def find(x: int) -> int: + while parent[x] != x: + parent[x] = parent[parent[x]] + x = parent[x] + return x + + def union(a: int, b: int) -> None: + ra, rb = find(a), find(b) + if ra != rb: + parent[rb] = ra + + for i in range(n): + _, lat1, lon1 = pool[i] + for j in range(i + 1, n): + _, lat2, lon2 = pool[j] + if _haversine_m(lat1, lon1, lat2, lon2) <= radius_m: + union(i, j) + + clusters: dict[int, list[int]] = {} + for i in range(n): + clusters.setdefault(find(i), []).append(i) + + demoted = 0 + for members in clusters.values(): + result_indices = [pool[i][0] for i in members] + cluster_size = len(result_indices) + winner_idx = max(result_indices, key=lambda idx: _cluster_rank(results[idx])) + winner_name = str(results[winner_idx]["response"]["camera"]["Sitename"]) + for idx in result_indices: + calc = results[idx]["calculations"] + calc["cluster_size"] = cluster_size + if idx == winner_idx: + calc["passed_gates"].append("cluster") + else: + calc["status"] = "FAIL" + calc["failing_gate"] = "cluster" + calc["reason"] = "nearby_duplicate" + calc["cluster_winner"] = winner_name + demoted += 1 + return demoted + + +def run_screening( + manifest: dict[str, Any], + sites_dir: Path, + *, + evaluation_year: int, + min_gcc_points: int, + snr_threshold: float, + site_filter: set[str] | None = None, +) -> list[dict[str, Any]]: + results: list[dict[str, Any]] = [] + sitenames = manifest["sites"] + if site_filter is not None: + sitenames = [name for name in sitenames if name in site_filter] + for index, sitename in enumerate(sitenames, start=1): + print(f"[PhenoCam-2] ({index}/{len(sitenames)}) {sitename}") + site_entry = load_site_entry(sites_dir, sitename) + results.append( + screen_site( + site_entry, + evaluation_year=evaluation_year, + min_gcc_points=min_gcc_points, + snr_threshold=snr_threshold, + ) + ) + return results + + +def print_summary(results: list[dict[str, Any]], evaluation_year: int) -> None: + passing = [row for row in results if row["calculations"]["status"] == "PASS"] + gates_label = " + ".join(GATE_ORDER) + print( + f"\n[PhenoCam-2] Screening for {evaluation_year}: " + f"{len(passing)}/{len(results)} pass ({gates_label})" + ) + + for gate in GATE_ORDER: + fails = sum(1 for row in results if row["calculations"]["failing_gate"] == gate) + after = sum(1 for row in results if gate in row["calculations"]["passed_gates"]) + print(f" after_{gate}: {after}, fail_at_{gate}: {fails}") + + print("\nPer-site table") + print( + f"{'site':<24} {'n':>4} {'mon':>3} {'snr':>6} " + f"{'status':>6} gate reason" + ) + print("-" * 72) + for row in sorted( + results, + key=lambda item: str(item["response"]["camera"]["Sitename"]), + ): + camera = row["response"]["camera"] + calc = row["calculations"] + snr_text = f"{calc['snr']:.2f}" if calc["snr"] is not None else "" + print( + f"{camera['Sitename']:<24} {calc['n_gcc_points']:4d} " + f"{calc['months_with_gcc']:3d} {snr_text:>6} " + f"{calc['status']:>6} {(calc['failing_gate'] or '-'):<8} " + f"{calc['reason'] or '-'}" + ) + + +def write_screening_json( + results: list[dict[str, Any]], + output_path: Path, + evaluation_year: int, +) -> None: + passing = [row for row in results if row["calculations"]["status"] == "PASS"] + payload = { + "evaluation_year": evaluation_year, + "count": len(results), + "qualifying_count": len(passing), + "sites": sorted( + results, + key=lambda item: str(item["response"]["camera"]["Sitename"]), + ), + } + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + print(f"[PhenoCam-2] Wrote {output_path}") + + +def write_screening_csv(results: list[dict[str, Any]], output_path: Path) -> None: + rows: list[dict[str, Any]] = [] + for row in results: + camera = row["response"]["camera"] + metadata = camera.get("sitemetadata") or {} + roi = row["response"].get("roi") or {} + calc = row["calculations"] + rows.append( + { + "Sitename": camera.get("Sitename"), + "Lat": camera.get("Lat"), + "Lon": camera.get("Lon"), + "site_description": metadata.get("site_description"), + "primary_veg_type": metadata.get("primary_veg_type"), + "site_type": metadata.get("site_type"), + "one_day_summary": roi.get("one_day_summary"), + **calc, + } + ) + fieldnames = list(rows[0].keys()) if rows else ["Sitename", "status"] + if rows: + extra = [k for row in rows for k in row if k not in fieldnames] + fieldnames.extend(dict.fromkeys(extra)) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("w", encoding="utf-8", newline="") as handle: + writer = csv.DictWriter(handle, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(rows) + print(f"[PhenoCam-2] Wrote {output_path}") + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--evaluation-year", + type=int, + default=2025, + help="Evaluation year (default: 2025)", + ) + parser.add_argument( + "--sites", + type=str, + default=None, + help="Comma-separated sitenames (default: all sites in step-1 manifest)", + ) + parser.add_argument( + "--min-gcc-points", + type=int, + default=MIN_GCC_POINTS, + help=f"Minimum valid gcc_90 observations in-year (default: {MIN_GCC_POINTS})", + ) + parser.add_argument( + "--snr-threshold", + type=float, + default=SNR_THRESHOLD, + help=f"Minimum AIC-spline SNR (default: {SNR_THRESHOLD})", + ) + parser.add_argument( + "--output-json", + type=Path, + default=None, + help="Screening output (default: data/phenocam_screening/{year}.json)", + ) + parser.add_argument( + "--output-csv", + type=Path, + default=None, + help="Flat CSV summary path", + ) + parser.add_argument( + "--cluster-radius-m", + type=float, + default=CLUSTER_RADIUS_M, + help=f"Deduplicate SNR-passed sites within this radius (default: {CLUSTER_RADIUS_M})", + ) + parser.add_argument( + "--no-cluster", + action="store_true", + help="Skip nearby-site deduplication gate", + ) + args = parser.parse_args(argv) + + evaluation_year = args.evaluation_year + manifest_path = Path("data") / "phenocam" / f"{evaluation_year}.json" + if not manifest_path.is_file(): + raise SystemExit(f"Step-1 manifest not found: {manifest_path}") + + site_filter = None + if args.sites: + site_filter = {name.strip() for name in args.sites.split(",") if name.strip()} + + manifest = load_manifest(manifest_path) + sites_dir_path = resolve_sites_dir(manifest_path, manifest) + + results = run_screening( + manifest, + sites_dir_path, + evaluation_year=evaluation_year, + min_gcc_points=args.min_gcc_points, + snr_threshold=args.snr_threshold, + site_filter=site_filter, + ) + if not args.no_cluster: + demoted = apply_cluster_gate(results, radius_m=args.cluster_radius_m) + if demoted: + print(f"[PhenoCam-2] Cluster dedup: demoted {demoted} nearby duplicate(s)") + print_summary(results, evaluation_year) + + default_dir = Path("data") / "phenocam_screening" + json_name = f"{evaluation_year}.json" + csv_name = f"{evaluation_year}.csv" + write_screening_json( + results, + args.output_json or (default_dir / json_name), + evaluation_year, + ) + write_screening_csv(results, args.output_csv or (default_dir / csv_name)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/3-sentinel-data.py b/3-sentinel-data.py new file mode 100644 index 0000000..16287f8 --- /dev/null +++ b/3-sentinel-data.py @@ -0,0 +1,805 @@ +"""Step 3: Download S2 and S3 rasters and prepare EFAST inputs. + +Inputs (``data/``, ``{year}`` = ``--evaluation-year``): + +- ``phenocam_screening/{year}.json`` — step-2 PASS sites (coordinates included) + +Outputs (``data/``): + +- ``sentinel_data/{year}/{sitename}/raw/s3/*.tif`` — S3 SYN L2 per-date GeoTIFFs +- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — S2 REFL + DIST_CLOUD GeoTIFFs +- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — S3 composite GeoTIFFs +- ``sentinel_data/{year}/{sitename}/data.json`` — run summary + +Requires ``CDSE_USER`` / ``CDSE_PASSWORD`` (``uv sync`` installs efast). + +CLI: + +- ``--evaluation-year`` (default 2025) +- ``--site`` (optional; default: all step-2 PASS sites) + +Prior step: :mod:`2-phenocam-screening`. +Next step: :mod:`4-fusion`. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +import time +from datetime import datetime +from pathlib import Path +from typing import Any + +import netCDF4 +import numpy as np +import openeo +import rasterio +import requests +from dotenv import load_dotenv +from pystac_client import Client +from rasterio import shutil as rio_shutil +from rasterio.enums import Resampling +from rasterio.errors import WindowError +from rasterio.transform import from_bounds +from rasterio.vrt import WarpedVRT +from rasterio.warp import transform_geom +from rasterio.windows import Window +from rasterio.windows import from_bounds as window_from_bounds +from rasterio.windows import transform as window_transform +from shapely import wkt as shapely_wkt +from tqdm import tqdm + +# --------------------------------------------------------------------------- +# Public constants — edit here to change pipeline behaviour +# --------------------------------------------------------------------------- + +S2_BANDS = ["B02", "B03", "B04"] + +S3_BANDS = [ + "Syn_Oa04_reflectance", + "Syn_Oa06_reflectance", + "Syn_Oa08_reflectance", + "Syn_Oa17_reflectance", +] +S3_BAND_NAMES = ["SDR_Oa04", "SDR_Oa06", "SDR_Oa08", "SDR_Oa17"] + +RESOLUTION_RATIO = 30 +S3_MOSAIC_DAYS = 100 +S3_COMPOSITE_STEP = 2 +S3_COMPOSITE_SIGMA_DOY = 10 +S3_COMPOSITE_D = 20 +S3_SMOOTHING_STD = 1 +S3_REFLECTANCE_SCALE = 10_000 # OpenEO SYN L2 SDR → 0–1 (EFAST expects < 5) + +# --------------------------------------------------------------------------- +# Internal S2 constants +# --------------------------------------------------------------------------- + +EARTH_SEARCH_URL = "https://earth-search.aws.element84.com/v1" + +_BAND_ASSETS: dict[str, str] = { + "B02": "blue", + "B03": "green", + "B04": "red", + "B05": "rededge1", + "B06": "rededge2", + "B07": "rededge3", + "B08": "nir", + "B8A": "nir08", + "B11": "swir16", + "B12": "swir22", +} +_SCL_ASSET = "scl" +_MIN_BBOX_HALF_DEG = 0.008 + +# --------------------------------------------------------------------------- +# Internal S3 constants +# --------------------------------------------------------------------------- + +CDSE_TOKEN_URL = ( + "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/" + "protocol/openid-connect/token" +) +OPENEO_URL = "openeo.dataspace.copernicus.eu" +S3_COLLECTION = "SENTINEL3_SYN_L2_SYN" + +DATA_DIR = Path("data") +DEFAULT_YEAR = 2025 + + +# --------------------------------------------------------------------------- +# Credentials +# --------------------------------------------------------------------------- + + +def _cdse_credentials() -> dict[str, str | None]: + load_dotenv() + return { + "username": os.getenv("CDSE_USER"), + "password": os.getenv("CDSE_PASSWORD"), + } + + +# --------------------------------------------------------------------------- +# Screening manifest helpers +# --------------------------------------------------------------------------- + + +def _load_screening_pass_sites(year: int) -> list[dict[str, Any]]: + """Return list of PASS-site dicts from step-2 screening JSON. + + Each entry has ``sitename``, ``lat``, ``lon`` keys. + """ + path = DATA_DIR / "phenocam_screening" / f"{year}.json" + if not path.is_file(): + raise FileNotFoundError(f"Step-2 screening manifest not found: {path}") + payload = json.loads(path.read_text(encoding="utf-8")) + sites = [] + for row in payload.get("sites", []): + calc = row.get("calculations", {}) + if calc.get("status") != "PASS": + continue + camera = row.get("response", {}).get("camera", {}) + name = camera.get("Sitename") + lat = camera.get("Lat") + lon = camera.get("Lon") + if name and lat is not None and lon is not None: + sites.append({"sitename": str(name), "lat": float(lat), "lon": float(lon)}) + return sites + + +# --------------------------------------------------------------------------- +# S2: geometry helpers (from s2_cloud_native.py) +# --------------------------------------------------------------------------- + + +def wkt_to_bbox(geometry_wkt: str) -> list[float]: + """Convert a WKT geometry to a ``[west, south, east, north]`` bbox.""" + geom = shapely_wkt.loads(geometry_wkt) + minx, miny, maxx, maxy = geom.bounds + if minx == maxx and miny == maxy: + minx -= _MIN_BBOX_HALF_DEG + maxx += _MIN_BBOX_HALF_DEG + miny -= _MIN_BBOX_HALF_DEG + maxy += _MIN_BBOX_HALF_DEG + return [minx, miny, maxx, maxy] + + +def _boa_offset(item: Any) -> int: + """Return the BOA additive offset for a STAC item. + + Processing baseline >= 04.00 applies a -1000 offset; earlier baselines use 0. + """ + if item.properties.get("earthsearch:boa_offset_applied"): + return 0 + baseline_str = str( + item.properties.get("processing:baseline") + or item.properties.get("s2:processing_baseline") + or "0" + ) + try: + baseline = float(baseline_str) + except ValueError: + baseline = 0.0 + return -1000 if baseline >= 4.0 else 0 + + +def _window_for_bbox( + src: rasterio.io.DatasetReader, + bbox_4326: list[float], +) -> Window | None: + """Return the rasterio Window for a EPSG:4326 bbox clipped to src bounds.""" + bbox_geom = { + "type": "Polygon", + "coordinates": [ + [ + [bbox_4326[0], bbox_4326[1]], + [bbox_4326[2], bbox_4326[1]], + [bbox_4326[2], bbox_4326[3]], + [bbox_4326[0], bbox_4326[3]], + [bbox_4326[0], bbox_4326[1]], + ] + ], + } + src_geom = transform_geom("EPSG:4326", src.crs.to_wkt(), bbox_geom) + xs = [c[0] for c in src_geom["coordinates"][0][:4]] + ys = [c[1] for c in src_geom["coordinates"][0][:4]] + win = window_from_bounds(min(xs), min(ys), max(xs), max(ys), src.transform) + try: + return win.intersection(Window(0, 0, src.width, src.height)) + except WindowError: + return None + + +def _read_window( + href: str, + bbox_4326: list[float], + out_shape: tuple[int, int] | None = None, + resampling: Resampling = Resampling.bilinear, +) -> tuple[np.ndarray, dict[str, Any]] | None: + """Range-read a single-band array for the bbox window from a COG URL.""" + with rasterio.open(href) as src: + win = _window_for_bbox(src, bbox_4326) + if win is None: + return None + data = src.read(1, window=win, out_shape=out_shape, resampling=resampling) + profile: dict[str, Any] = { + "crs": src.crs, + "transform": window_transform(win, src.transform), + "height": data.shape[0], + "width": data.shape[1], + "dtype": src.dtypes[0], + } + return data, profile + + +def _read_bands( + item: Any, + bbox: list[float], + bands: list[str], +) -> tuple[list[np.ndarray], dict[str, Any]] | None: + """Range-read all requested bands for one STAC item.""" + band_arrays: list[np.ndarray] = [] + ref_profile: dict[str, Any] | None = None + + for band_name in bands: + asset_key = _BAND_ASSETS.get(band_name) + if asset_key is None or asset_key not in item.assets: + return None + ref_shape = ( + (ref_profile["height"], ref_profile["width"]) if ref_profile else None + ) + result = _read_window(item.assets[asset_key].href, bbox, out_shape=ref_shape) + if result is None: + return None + data, profile = result + if ref_profile is None: + ref_profile = profile + band_arrays.append(data.astype("float32")) + + return (band_arrays, ref_profile) if ref_profile is not None else None + + +def _cloud_mask(item: Any, bbox: list[float], shape: tuple[int, int]) -> np.ndarray: + """Return a boolean cloud/shadow mask from the item's SCL band. + + Masks SCL classes 0 (no data), 3 (cloud shadow), and >7 (clouds, cirrus, snow). + """ + scl = item.assets.get(_SCL_ASSET) + result = ( + _read_window(scl.href, bbox, out_shape=shape, resampling=Resampling.nearest) + if scl + else None + ) + if result is None: + return np.zeros(shape, dtype=bool) + scl_data, _ = result + return (scl_data == 0) | (scl_data == 3) | (scl_data > 7) + + +def _pad_to_multiple(arr: np.ndarray, ratio: int) -> np.ndarray: + """Zero-pad (bands, H, W) so H and W are multiples of ``ratio``.""" + pad_h = (ratio - arr.shape[1] % ratio) % ratio + pad_w = (ratio - arr.shape[2] % ratio) % ratio + if pad_h or pad_w: + arr = np.pad(arr, ((0, 0), (0, pad_h), (0, pad_w)), constant_values=0) + return arr + + +# --------------------------------------------------------------------------- +# S2: STAC search + download (from s2_cloud_native.py) +# --------------------------------------------------------------------------- + + +def stac_search_s2( + bbox: list[float], + start_date: datetime, + end_date: datetime, +) -> list[Any]: + """Search Earth Search for S2 L2A items intersecting a bbox.""" + client = Client.open(EARTH_SEARCH_URL) + search = client.search( + collections=["sentinel-2-l2a"], + bbox=bbox, + datetime=( + f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/" + f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}" + ), + max_items=10_000, + ) + return list({item.id: item for item in search.items()}.values()) + + +def download_s2_window( + items: list[Any], + bbox: list[float], + output_dir: Path, + bands: list[str], + ratio: int = RESOLUTION_RATIO, +) -> None: + """Range-read S2 L2A COG windows and write masked REFL GeoTIFFs. + + Writes ``{item.id}_REFL.tif`` directly — no intermediate raw download. + Cloud/shadow pixels (SCL 0, 3, >7) are zeroed. BOA offset is inferred from + ``processing:baseline``. Output is zero-padded to multiples of ``ratio``. + """ + output_dir.mkdir(parents=True, exist_ok=True) + + for item in tqdm(items, unit="granule", desc="S2 COG window read"): + out_path = output_dir / f"{item.id}_REFL.tif" + if out_path.is_file(): + continue + + bands_result = _read_bands(item, bbox, bands) + if bands_result is None: + tqdm.write(f"[S2] Skipping {item.id}: missing asset or no bbox overlap") + continue + band_arrays, ref_profile = bands_result + target_shape = (ref_profile["height"], ref_profile["width"]) + mask = _cloud_mask(item, bbox, target_shape) + + stacked = (np.stack(band_arrays) + _boa_offset(item)) / 10_000.0 + np.clip(stacked, 0, None, out=stacked) + stacked[:, mask] = 0.0 + stacked = _pad_to_multiple(stacked, ratio) + + out_profile = { + "driver": "GTiff", + "count": len(bands), + "dtype": "float32", + "nodata": 0, + "crs": ref_profile["crs"], + "transform": ref_profile["transform"], + "height": stacked.shape[1], + "width": stacked.shape[2], + "compress": "lzw", + } + with rasterio.open(out_path, "w", **out_profile) as dst: + dst.write(stacked) + for i, band_name in enumerate(bands, 1): + dst.set_band_description(i, band_name) + + +# --------------------------------------------------------------------------- +# S3: download (from s3_openeo.py) +# --------------------------------------------------------------------------- + + +def _utm_epsg(bbox: list[float]) -> int: + """Return the UTM EPSG code for the centre of a ``[W, S, E, N]`` bbox.""" + lon = (bbox[0] + bbox[2]) / 2 + lat = (bbox[1] + bbox[3]) / 2 + zone = int((lon + 180) / 6) + 1 + return 32600 + zone if lat >= 0 else 32700 + zone + + +def _cdse_token(username: str, password: str) -> str: + """Obtain a CDSE bearer token via password grant.""" + resp = requests.post( + CDSE_TOKEN_URL, + data={ + "grant_type": "password", + "username": username, + "password": password, + "client_id": "cdse-public", + }, + timeout=30, + ) + resp.raise_for_status() + return resp.json()["access_token"] + + +def _netcdf_to_geotiffs(nc_path: Path, output_dir: Path, epsg: int) -> int: + """Split an OpenEO NetCDF into per-date GeoTIFFs. + + Output filenames match the ``S3*__YYYYMMDDTHHMMSS.tif`` pattern that + ``s3_processing.produce_median_composite`` expects. + + Handles half-pixel cell-centre coordinates, ascending y-axis (flip_y), + and fills NetCDF masked values with NaN. + """ + written = 0 + with netCDF4.Dataset(str(nc_path), "r") as nc: + times = netCDF4.num2date(nc.variables["t"][:], nc.variables["t"].units) + x_coords = np.asarray(nc.variables["x"][:], dtype=float) + y_coords = np.asarray(nc.variables["y"][:], dtype=float) + + half_x = abs(x_coords[1] - x_coords[0]) / 2 if len(x_coords) > 1 else 0.0 + half_y = abs(y_coords[1] - y_coords[0]) / 2 if len(y_coords) > 1 else 0.0 + transform = from_bounds( + x_coords.min() - half_x, + y_coords.min() - half_y, + x_coords.max() + half_x, + y_coords.max() + half_y, + len(x_coords), + len(y_coords), + ) + flip_y = len(y_coords) > 1 and y_coords[0] < y_coords[-1] + + date_counts: dict[str, int] = {} + for t_idx, time_val in enumerate(times): + date_str = time_val.strftime("%Y%m%d") + n = date_counts.get(date_str, 0) + date_counts[date_str] = n + 1 + + raw = np.stack( + [nc.variables[b][t_idx, :, :] for b in S3_BANDS], axis=0 + ) + stacked = ( + np.ma.filled(raw, fill_value=np.nan).astype("float32") + / S3_REFLECTANCE_SCALE + ) + if flip_y: + stacked = stacked[:, ::-1, :] + + filename = f"S3_{date_str}_{n}__{date_str}T120000.tif" + with rasterio.open( + output_dir / filename, + "w", + driver="GTiff", + height=len(y_coords), + width=len(x_coords), + count=len(S3_BANDS), + dtype="float32", + nodata=float("nan"), + crs=f"EPSG:{epsg}", + transform=transform, + compress="lzw", + ) as dst: + dst.write(stacked) + for i, band_name in enumerate(S3_BAND_NAMES, 1): + dst.set_band_description(i, band_name) + written += 1 + + return written + + +def download_s3_openeo( + start_date: datetime, + end_date: datetime, + aoi_geometry: str, + output_dir: Path, + credentials: dict[str, str | None], +) -> None: + """Download S3 SYN L2 SDR for an AOI via CDSE OpenEO, server-side clipped. + + Writes per-date ``S3_{YYYYMMDD}_{n}__{YYYYMMDD}T120000.tif`` files to + ``output_dir``, ready for ``s3_processing.produce_median_composite``. + Skips if any ``S3*.tif`` files already exist. + """ + output_dir.mkdir(parents=True, exist_ok=True) + + if any(output_dir.glob("S3*.tif")): + print("[S3-OEO] Skipping — output_dir already contains S3 GeoTIFFs") + return + + bbox = wkt_to_bbox(aoi_geometry) + epsg = _utm_epsg(bbox) + spatial_extent = { + "west": bbox[0], + "east": bbox[2], + "south": bbox[1], + "north": bbox[3], + } + + print("[S3-OEO] Authenticating with CDSE...") + token = _cdse_token(credentials["username"], credentials["password"]) # type: ignore[arg-type] + conn = openeo.connect(OPENEO_URL) + conn.authenticate_oidc_access_token(token) + + start_str = start_date.strftime("%Y-%m-%d") + end_str = end_date.strftime("%Y-%m-%d") + print(f"[S3-OEO] Loading {S3_COLLECTION} ({start_str} → {end_str})...") + datacube = conn.load_collection( + S3_COLLECTION, + spatial_extent=spatial_extent, + temporal_extent=[start_str, end_str], + bands=S3_BANDS, + ).resample_spatial(projection=epsg) + + nc_path = output_dir / "_s3_syn_l2.nc" + print(f"[S3-OEO] Downloading NetCDF to {nc_path}...") + t0 = time.time() + datacube.download(str(nc_path), format="NetCDF") + print(f"[S3-OEO] Download completed in {time.time() - t0:.1f}s") + + print("[S3-OEO] Splitting into per-date GeoTIFFs...") + written = _netcdf_to_geotiffs(nc_path, output_dir, epsg) + nc_path.unlink(missing_ok=True) + print(f"[S3-OEO] {written} GeoTIFFs written to {output_dir}") + + +# --------------------------------------------------------------------------- +# S2: distance_to_clouds helper +# --------------------------------------------------------------------------- + + +def _import_distance_to_clouds(): + try: + from efast.s2_processing import distance_to_clouds + + return distance_to_clouds + except ImportError as exc: + raise ImportError( + "efast not found. Install with: uv sync" + ) from exc + + +def _rescale_dist_cloud(s2_dir: Path) -> None: + """Ensure DIST_CLOUD values are in pixel units (not normalised to [0,1]).""" + for dc_path in s2_dir.glob("*DIST_CLOUD.tif"): + with rasterio.open(dc_path) as src: + d = src.read(1) + if float(np.nanmax(d)) <= 1: + with rasterio.open(dc_path, "r+") as dst: + dst.write(np.where(d > 0, 2.0, d).astype(np.float32), 1) + + +# --------------------------------------------------------------------------- +# S3: compositing + reprojection helpers (from 4-sentinel-data.py) +# --------------------------------------------------------------------------- + + +def _import_s3_processing(): + try: + from efast import s3_processing + + return s3_processing + except ImportError as exc: + raise ImportError( + "efast not found. Install with: uv sync" + ) from exc + + +def _reproject_s3_composites_to_s2_grid( + composite_dir: Path, + s2_refl_path: Path, + s3_out_dir: Path, + *, + resolution_ratio: int = RESOLUTION_RATIO, +) -> None: + """Reproject S3 composites to the S2 spatial grid at LR resolution.""" + s3_out_dir.mkdir(parents=True, exist_ok=True) + with rasterio.open(s2_refl_path) as s2_ref: + target_bounds = s2_ref.bounds + target_crs = s2_ref.crs + width = s2_ref.width // resolution_ratio + height = s2_ref.height // resolution_ratio + s3_transform = rasterio.transform.from_bounds( + target_bounds.left, + target_bounds.bottom, + target_bounds.right, + target_bounds.top, + width, + height, + ) + + for sen3_path in sorted(composite_dir.glob("composite_*.tif")): + date_part = sen3_path.stem.split("_", 1)[1].replace("-", "") + outfile = s3_out_dir / f"composite_{date_part}.tif" + vrt_options = { + "transform": s3_transform, + "height": height, + "width": width, + "crs": target_crs, + "resampling": Resampling.cubic, + } + with rasterio.open(sen3_path) as s3_src: + with WarpedVRT(s3_src, **vrt_options) as vrt: + profile = vrt.profile.copy() + profile.update({"dtype": "float32", "nodata": 0, "driver": "GTiff"}) + rio_shutil.copy(vrt, outfile, **profile) + + +def _s3_reflectance_scale(raw_s3_dir: Path) -> float: + """Return multiplier that maps raw SYN L2 SDR values to 0–1 reflectance.""" + for path in raw_s3_dir.glob("S3*.tif"): + with rasterio.open(path) as src: + mx = float(np.nanmax(src.read())) + if np.isfinite(mx) and mx > 5: + return 1.0 / S3_REFLECTANCE_SCALE + return 1.0 + + +def _stage_s3_for_efast(raw_s3_dir: Path, staging_dir: Path) -> int: + """Copy ``S3_*.tif`` inputs, scaling reflectance when still in DN form.""" + scale = _s3_reflectance_scale(raw_s3_dir) + if staging_dir.exists(): + shutil.rmtree(staging_dir) + staging_dir.mkdir(parents=True) + + count = 0 + for src_path in sorted(raw_s3_dir.glob("S3*.tif")): + dst_path = staging_dir / src_path.name + with rasterio.open(src_path) as src: + data = src.read().astype("float32") * scale + profile = src.profile.copy() + profile.update(dtype="float32") + descriptions = src.descriptions + with rasterio.open(dst_path, "w", **profile) as dst: + dst.write(data) + for i, desc in enumerate(descriptions, 1): + if desc: + dst.set_band_description(i, desc) + count += 1 + + if scale != 1.0: + print(f"[S3-PREP] Scaled raw SDR by {scale:g} for EFAST compositing") + return count + + +def _prepare_s3( + raw_s3_dir: Path, + s2_refl_path: Path, + s3_out_dir: Path, + *, + work_dir: Path | None = None, +) -> None: + """Run EFAST S3 compositing pipeline and reproject to S2 grid.""" + s3 = _import_s3_processing() + base = work_dir or (s3_out_dir / "_efast_work") + staging = base / "scaled" + composites = base / "composites" + blurred = base / "blurred" + calibrated = base / "calibrated" + + for directory in (staging, composites, blurred, calibrated): + if directory.exists(): + shutil.rmtree(directory) + directory.mkdir(parents=True, exist_ok=True) + + staged = _stage_s3_for_efast(raw_s3_dir, staging) + if staged == 0: + raise ValueError(f"No S3*.tif files found in {raw_s3_dir}") + + print( + f"[S3-PREP] produce_median_composite: mosaic_days={S3_MOSAIC_DAYS}, " + f"step={S3_COMPOSITE_STEP}, sigma_doy={S3_COMPOSITE_SIGMA_DOY}, " + f"D={S3_COMPOSITE_D}" + ) + s3.produce_median_composite( + staging, + composites, + step=S3_COMPOSITE_STEP, + mosaic_days=S3_MOSAIC_DAYS, + s3_bands=[1, 2, 3, 4], + D=S3_COMPOSITE_D, + sigma_doy=S3_COMPOSITE_SIGMA_DOY, + ) + s3.smoothing( + composites, + blurred, + product="composite", + std=S3_SMOOTHING_STD, + preserve_nan=False, + ) + s3.reformat_s3(blurred, calibrated, product="composite", scaling_factor=1) + + for old in s3_out_dir.glob("composite_*.tif"): + old.unlink() + _reproject_s3_composites_to_s2_grid(calibrated, s2_refl_path, s3_out_dir) + + if work_dir is None and base.exists(): + shutil.rmtree(base) + + n_out = len(list(s3_out_dir.glob("composite_*.tif"))) + print(f"[S3-PREP] Wrote {n_out} composites") + + +# --------------------------------------------------------------------------- +# Per-site pipeline +# --------------------------------------------------------------------------- + + +def process_site( + sitename: str, + lat: float, + lon: float, + year: int, +) -> dict[str, Any]: + """Download S2 + S3 and run EFAST preparation for one site.""" + site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename + s2_out = site_dir / "prepared" / "s2" + s3_raw = site_dir / "raw" / "s3" + s3_out = site_dir / "prepared" / "s3" + aoi_wkt = f"POINT ({lon} {lat})" + bbox = wkt_to_bbox(aoi_wkt) + creds = _cdse_credentials() + + # S3 download + print(f"[{sitename}] Downloading S3...") + download_s3_openeo( + start_date=datetime(year, 1, 1), + end_date=datetime(year, 12, 31), + aoi_geometry=aoi_wkt, + output_dir=s3_raw, + credentials=creds, + ) + + # S2 download + print(f"[{sitename}] Searching S2 on Earth Search...") + items = stac_search_s2(bbox, datetime(year, 1, 1), datetime(year, 12, 31)) + print(f"[{sitename}] {len(items)} S2 items found — downloading windows...") + download_s2_window(items, bbox, s2_out, S2_BANDS, RESOLUTION_RATIO) + + # S2 distance-to-clouds + print(f"[{sitename}] Computing distance-to-clouds...") + distance_to_clouds = _import_distance_to_clouds() + distance_to_clouds(s2_out, ratio=RESOLUTION_RATIO) + _rescale_dist_cloud(s2_out) + + # S3 compositing + s2_refl_path = next(iter(s2_out.glob("*_REFL.tif")), None) + if s2_refl_path is None: + raise ValueError(f"No REFL files in {s2_out} — S2 download may have failed") + s3_out.mkdir(parents=True, exist_ok=True) + print(f"[{sitename}] Running S3 compositing pipeline...") + _prepare_s3(s3_raw, s2_refl_path, s3_out) + + summary = { + "sitename": sitename, + "evaluation_year": year, + "lat": lat, + "lon": lon, + "s2_refl_count": len(list(s2_out.glob("*_REFL.tif"))), + "s2_dist_cloud_count": len(list(s2_out.glob("*_DIST_CLOUD.tif"))), + "s3_raw_count": len(list(s3_raw.glob("S3*.tif"))), + "s3_composite_count": len(list(s3_out.glob("composite_*.tif"))), + } + site_dir.mkdir(parents=True, exist_ok=True) + (site_dir / "data.json").write_text( + json.dumps(summary, indent=2) + "\n", encoding="utf-8" + ) + return summary + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR) + parser.add_argument( + "--site", + type=str, + default=None, + help="Single sitename to process (default: all step-2 PASS sites)", + ) + args = parser.parse_args(argv) + year = args.evaluation_year + + pass_sites = _load_screening_pass_sites(year) + if not pass_sites: + print("[Sentinel-3] No PASS sites found in step-2 screening output") + return 1 + + if args.site: + pass_sites = [s for s in pass_sites if s["sitename"] == args.site] + if not pass_sites: + print(f"[Sentinel-3] Site '{args.site}' not found in step-2 PASS sites") + return 1 + + print(f"[Sentinel-3] Processing {len(pass_sites)} site(s)") + for i, site in enumerate(pass_sites, 1): + sitename = site["sitename"] + print(f"[Sentinel-3] ({i}/{len(pass_sites)}) {sitename}") + try: + summary = process_site(sitename, site["lat"], site["lon"], year) + print( + f"[Sentinel-3] {sitename} done — " + f"{summary['s2_refl_count']} REFL, " + f"{summary['s3_composite_count']} composites" + ) + except Exception as exc: + print(f"[Sentinel-3] {sitename} FAILED: {exc}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/4-fusion.py b/4-fusion.py new file mode 100644 index 0000000..eeac275 --- /dev/null +++ b/4-fusion.py @@ -0,0 +1,330 @@ +"""Step 4: Compute GCC and run EFAST BtI + ItB fusion for prepared sites. + +Inputs (``data/``, ``{year}`` = ``--evaluation-year``): + +- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — ``*_REFL.tif`` + ``*_DIST_CLOUD.tif`` +- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — ``composite_*.tif`` (4-band) + +Outputs (``data/``): + +- ``sentinel_data/{year}/{sitename}/prepared/s2/*_GCC.tif`` — S2 GCC (in-place) +- ``sentinel_data/{year}/{sitename}/prepared/gcc_s3/*.tif`` — S3 GCC composites +- ``fusion/{year}/{sitename}/bti/fusion/REFL_*.tif`` — BtI fused 4-band reflectance +- ``fusion/{year}/{sitename}/bti/gcc/GCC_*.tif`` — GCC derived from BtI fusion +- ``fusion/{year}/{sitename}/itb/s2/GCC_*.tif`` — per-acquisition S2 GCC (simplified names) +- ``fusion/{year}/{sitename}/itb/s3/GCC_*.tif`` — per-composite S3 GCC (simplified names) +- ``fusion/{year}/{sitename}/itb/fusion/GCC_*.tif`` — ItB fused GCC + +Requires ``uv sync`` (efast). + +CLI: + +- ``--evaluation-year`` (default 2025) +- ``--site`` (optional; default: all prepared sites under ``sentinel_data/{year}/``) + +Prior step: :mod:`3-sentinel-data`. +""" + +from __future__ import annotations + +import argparse +import shutil +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any + +import numpy as np +import rasterio +from dateutil import rrule + +# --------------------------------------------------------------------------- +# Public constants +# --------------------------------------------------------------------------- + +RESOLUTION_RATIO = 30 +MOSAIC_STEP = 2 +MAX_DAYS = 100 +MINIMUM_ACQUISITION_IMPORTANCE = 0 + +DATA_DIR = Path("data") +DEFAULT_YEAR = 2025 + + +# --------------------------------------------------------------------------- +# efast import helper +# --------------------------------------------------------------------------- + + +def _import_efast(): + try: + import efast.efast as efast_module + + return efast_module + except ImportError as exc: + raise ImportError( + "efast not found. Install with: uv sync" + ) from exc + + +# --------------------------------------------------------------------------- +# GCC computation (from s2_cloud_native.py and s3_openeo.py) +# --------------------------------------------------------------------------- + + +def compute_gcc_s2(s2_dir: Path, output_dir: Path) -> None: + """Compute GCC from S2 REFL files and write ``*_GCC.tif`` to ``output_dir``. + + Reads every ``*_REFL.tif`` (band order B02/B03/B04) and writes a co-located + single-band GCC file. Cloud-masked pixels (zero in all bands) remain zero. + """ + output_dir.mkdir(parents=True, exist_ok=True) + for src_path in sorted(s2_dir.glob("*_REFL.tif")): + out_path = output_dir / src_path.name.replace("_REFL.tif", "_GCC.tif") + if out_path.is_file(): + continue + with rasterio.open(src_path) as src: + b, g, r = src.read(1), src.read(2), src.read(3) + profile = src.profile + total = b + g + r + gcc = g / (total + 1e-10) + gcc[total == 0] = 0 + profile.update(count=1) + with rasterio.open(out_path, "w", **profile) as dst: + dst.write(gcc[np.newaxis].astype("float32")) + + +def compute_gcc_s3(s3_dir: Path, output_dir: Path) -> None: + """Compute GCC from S3 composite files and write single-band GeoTIFFs. + + Reads every ``composite_*.tif`` (band order Oa04/Oa06/Oa08/Oa17) and writes + a single-band GCC file. NaN pixels in the input remain NaN. + """ + output_dir.mkdir(parents=True, exist_ok=True) + for src_path in sorted(s3_dir.glob("composite_*.tif")): + out_path = output_dir / src_path.name + if out_path.is_file(): + continue + with rasterio.open(src_path) as src: + b, g, r = src.read(1), src.read(2), src.read(3) + profile = src.profile + total = b + g + r + gcc = g / (total + 1e-10) + gcc[np.isnan(total)] = np.nan + profile.update(count=1, dtype="float32") + with rasterio.open(out_path, "w", **profile) as dst: + dst.write(gcc[np.newaxis].astype("float32")) + + +def compute_gcc_from_refl(refl_dir: Path, gcc_dir: Path) -> None: + """Derive GCC from ``REFL_YYYYMMDD.tif`` files (BtI fusion output). + + Reads every ``REFL_*.tif`` and writes a co-located single-band + ``GCC_YYYYMMDD.tif``. Zero pixels remain zero. + """ + gcc_dir.mkdir(parents=True, exist_ok=True) + for src_path in sorted(refl_dir.glob("REFL_*.tif")): + out_path = gcc_dir / src_path.name.replace("REFL_", "GCC_") + if out_path.is_file(): + continue + with rasterio.open(src_path) as src: + b, g, r = src.read(1), src.read(2), src.read(3) + profile = src.profile + total = b + g + r + gcc = g / (total + 1e-10) + gcc[total == 0] = 0 + profile.update(count=1) + with rasterio.open(out_path, "w", **profile) as dst: + dst.write(gcc[np.newaxis].astype("float32")) + + +# --------------------------------------------------------------------------- +# Date-range detection +# --------------------------------------------------------------------------- + + +def _refl_date_range(s2_dir: Path) -> tuple[datetime, datetime] | None: + """Return (start, end) datetime from REFL filenames in ``s2_dir``. + + Filenames are expected to follow the S2 product naming convention, where + the acquisition date ``YYYYMMDD`` appears at position index 2 when the + stem is split by ``_``, e.g. + ``S2A_MSIL2A_20230911T114111_N0509_R025_T29PKT_20230911T153131_REFL.tif``. + """ + dates: list[datetime] = [] + for p in s2_dir.glob("*_REFL.tif"): + parts = p.stem.split("_") + if len(parts) >= 3: + try: + dates.append(datetime.strptime(parts[2][:8], "%Y%m%d")) + except ValueError: + pass + if not dates: + return None + return min(dates), max(dates) + + +# --------------------------------------------------------------------------- +# Per-site fusion +# --------------------------------------------------------------------------- + + +def fuse_site(sitename: str, year: int) -> dict[str, Any]: + """Run GCC computation and EFAST BtI + ItB fusion for one prepared site.""" + efast = _import_efast() + + s2_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s2" + s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3" + gcc_s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "gcc_s3" + base = DATA_DIR / "fusion" / str(year) / sitename + + if not s2_dir.is_dir() or not any(s2_dir.glob("*_REFL.tif")): + raise FileNotFoundError(f"No REFL files in {s2_dir}") + if not s3_dir.is_dir() or not any(s3_dir.glob("composite_*.tif")): + raise FileNotFoundError(f"No composite files in {s3_dir}") + + print(f"[{sitename}] Computing S2 GCC (in-place)...") + compute_gcc_s2(s2_dir, s2_dir) + + print(f"[{sitename}] Computing S3 GCC...") + compute_gcc_s3(s3_dir, gcc_s3_dir) + + date_range = _refl_date_range(s2_dir) + if date_range is None: + raise ValueError(f"Could not detect date range from REFL filenames in {s2_dir}") + start, end = date_range + print(f"[{sitename}] Date range: {start.date()} → {end.date()}") + + fusion_dates = list( + rrule.rrule( + rrule.DAILY, + dtstart=start + timedelta(MOSAIC_STEP), + until=end - timedelta(MOSAIC_STEP), + interval=MOSAIC_STEP, + ) + ) + + _fusion_kwargs = dict( + ratio=RESOLUTION_RATIO, + max_days=MAX_DAYS, + minimum_acquisition_importance=MINIMUM_ACQUISITION_IMPORTANCE, + ) + + # --- ItB: GCC first, then fuse GCC --- + itb_s2 = base / "itb" / "s2" + itb_s3 = base / "itb" / "s3" + itb_fusion = base / "itb" / "fusion" + itb_s2.mkdir(parents=True, exist_ok=True) + itb_s3.mkdir(parents=True, exist_ok=True) + itb_fusion.mkdir(parents=True, exist_ok=True) + + for p in sorted(s2_dir.glob("*_GCC.tif")): + dst = itb_s2 / f"GCC_{p.stem.split('_')[2][:8]}.tif" + if not dst.exists(): + shutil.copy2(p, dst) + for p in sorted(gcc_s3_dir.glob("composite_*.tif")): + dst = itb_s3 / f"GCC_{p.stem.split('_')[1]}.tif" + if not dst.exists(): + shutil.copy2(p, dst) + + print(f"[{sitename}] ItB: fusing GCC over {len(fusion_dates)} dates...") + for date in fusion_dates: + efast.fusion(date, gcc_s3_dir, s2_dir, itb_fusion, product="GCC", **_fusion_kwargs) + + # --- BtI: fuse reflectance (3-band, matching S2 B02/B03/B04), then derive GCC --- + # S3 composites have 4 bands; strip band 4 (Oa17/NIR) so shapes match S2 REFL. + s3_rgb_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3_rgb" + s3_rgb_dir.mkdir(parents=True, exist_ok=True) + for p in sorted(s3_dir.glob("composite_*.tif")): + out = s3_rgb_dir / p.name + if not out.exists(): + with rasterio.open(p) as src: + data = src.read([1, 2, 3]) + profile = src.profile.copy() + profile.update(count=3) + with rasterio.open(out, "w", **profile) as dst: + dst.write(data) + + bti_fusion = base / "bti" / "fusion" + bti_gcc = base / "bti" / "gcc" + bti_fusion.mkdir(parents=True, exist_ok=True) + + print(f"[{sitename}] BtI: fusing REFL over {len(fusion_dates)} dates...") + for date in fusion_dates: + efast.fusion(date, s3_rgb_dir, s2_dir, bti_fusion, product="REFL", **_fusion_kwargs) + + print(f"[{sitename}] BtI: deriving GCC from fused REFL...") + compute_gcc_from_refl(bti_fusion, bti_gcc) + + return { + "sitename": sitename, + "evaluation_year": year, + "start": start.date().isoformat(), + "end": end.date().isoformat(), + "fusion_dates": len(fusion_dates), + "itb_fusion_files": len(list(itb_fusion.glob("*.tif"))), + "bti_fusion_files": len(list(bti_fusion.glob("*.tif"))), + "bti_gcc_files": len(list(bti_gcc.glob("*.tif"))), + } + + +# --------------------------------------------------------------------------- +# Site discovery +# --------------------------------------------------------------------------- + + +def _discover_sites(year: int) -> list[str]: + """Return sitenames that have prepared S2 REFL files under sentinel_data.""" + base = DATA_DIR / "sentinel_data" / str(year) + if not base.is_dir(): + return [] + return sorted( + d.name + for d in base.iterdir() + if d.is_dir() and any((d / "prepared" / "s2").glob("*_REFL.tif")) + ) + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main(argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR) + parser.add_argument( + "--site", + type=str, + default=None, + help="Single sitename to fuse (default: all prepared sites)", + ) + args = parser.parse_args(argv) + year = args.evaluation_year + + if args.site: + sites = [args.site] + else: + sites = _discover_sites(year) + if not sites: + print(f"[Fusion] No prepared sites found under data/sentinel_data/{year}/") + return 1 + + print(f"[Fusion] Processing {len(sites)} site(s)") + for i, sitename in enumerate(sites, 1): + print(f"[Fusion] ({i}/{len(sites)}) {sitename}") + try: + summary = fuse_site(sitename, year) + print( + f"[Fusion] {sitename} done — " + f"{summary['fusion_dates']} dates, " + f"itb={summary['itb_fusion_files']} bti={summary['bti_fusion_files']} " + f"bti_gcc={summary['bti_gcc_files']}" + ) + except Exception as exc: + print(f"[Fusion] {sitename} FAILED: {exc}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/5-metrics.py b/5-metrics.py new file mode 100644 index 0000000..2f26fdc --- /dev/null +++ b/5-metrics.py @@ -0,0 +1,695 @@ +"""Step 5: Pre-compute per-site GCC timeseries + raster index for the webapp. + +Inputs (``data/``, ``{year}`` = ``--evaluation-year``): + +- ``phenocam_screening/{year}.json`` — qualifying sites + metadata +- ``phenocam/{year}/{site}_1day.csv`` — daily GCC timeseries +- ``sentinel_data/{year}/{site}/prepared/s2/*_GCC.tif`` — S2 GCC rasters +- ``sentinel_data/{year}/{site}/prepared/gcc_s3/composite_*.tif`` — S3 GCC rasters +- ``fusion/{year}/{site}/bti/gcc/GCC_*.tif`` — BtI GCC rasters +- ``fusion/{year}/{site}/itb/fusion/GCC_*.tif`` — ItB GCC rasters + +Outputs (``data/metrics/``): + +- ``manifest.json`` — years + per-site metadata +- ``{year}/{site}/gcc_phenocam.json`` — PhenoCam ``gcc_90`` at matched dates +- ``{year}/{site}/gcc_s2.json`` — S2 GCC (center pixel, cloud-free scenes) +- ``{year}/{site}/gcc_s2_whittaker.json`` — Whittaker-smoothed S2 GCC +- ``{year}/{site}/gcc_s3.json`` — S3 composite GCC +- ``{year}/{site}/gcc_s3_smooth.json`` — S3 5-day moving average +- ``{year}/{site}/gcc_fusion_bti.json`` — BtI fused GCC +- ``{year}/{site}/gcc_fusion_itb.json`` — ItB fused GCC +- ``{year}/{site}/phenocam_images.json`` — midday photo URLs for the viewer +- ``{year}/{site}/rasters_s2_refl.json`` — S2 REFL paths (BtI view) +- ``{year}/{site}/rasters_s3_composite.json`` — S3 composite paths (BtI view) +- ``{year}/{site}/rasters_s2_gcc.json`` — S2 GCC paths (ItB view) +- ``{year}/{site}/rasters_s3_gcc.json`` — S3 GCC paths (ItB view) +- ``{year}/{site}/rasters_fusion_bti_refl.json`` — BtI fused REFL paths +- ``{year}/{site}/rasters_fusion_itb_gcc.json`` — ItB fused GCC paths +- ``{year}/{site}/metrics.json`` — NSE, RMSE, nRMSE, Pearson r vs PhenoCam per series +- ``{year}/{site}/bands_s2.json`` — S2 center-pixel reflectance (B02, B03, B04) per scene +- ``{year}/{site}/bands_s3.json`` — S3 center-pixel reflectance (Oa04, Oa06, Oa08, Oa17) per composite +- ``{year}/{site}/covariates.json`` — spatial CV/std, S2/S3 counts, gap stats + +CLI: + +- ``--evaluation-year`` (default 2025) +- ``--site`` (optional; default: all qualifying sites with sentinel data) +""" + +from __future__ import annotations + +import argparse +import csv +import json +import re +from pathlib import Path +from typing import Any + +import datetime +import numpy as np +import rasterio +from rasterio.crs import CRS +from rasterio.transform import rowcol +from pyproj import Transformer +from scipy.stats import pearsonr +from tqdm import tqdm + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +DATA_DIR = Path("data") +DEFAULT_YEAR = 2025 + +# GCC smoothing window for S3 moving average (days) +S3_SMOOTH_WINDOW = 5 + +# Whittaker lambda (penalised smoothing strength for S2) +WHITTAKER_LAMBDA = 400.0 + +# Half-width in metres for the spatial heterogeneity footprint (~300 m = 1 S3 pixel) +SPATIAL_CV_HALF_M = 150 + +# PhenoCam archive image URL pattern +PHENOCAM_IMAGE_URL = "https://phenocam.nau.edu/data/archive/{site}/{year}/{month}/{filename}" + + +# --------------------------------------------------------------------------- +# Helpers: raster pixel extraction +# --------------------------------------------------------------------------- + + +def _read_center_pixel(path: Path, lat: float, lon: float) -> float | None: + """Return the 3×3 mean GCC value at (lat, lon) from a single-band raster. + + Returns ``None`` when the pixel is masked/zero/NaN. + """ + try: + with rasterio.open(path) as src: + transformer = Transformer.from_crs( + CRS.from_epsg(4326), src.crs, always_xy=True + ) + x, y = transformer.transform(lon, lat) + row, col = rowcol(src.transform, x, y) + h, w = src.height, src.width + r0, r1 = max(0, row - 1), min(h, row + 2) + c0, c1 = max(0, col - 1), min(w, col + 2) + window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0) + data = src.read(1, window=window).astype(float) + nodata = src.nodata + if nodata is not None: + data = np.where(data == nodata, np.nan, data) + data[data == 0] = np.nan + val = np.nanmean(data) + return None if np.isnan(val) else float(val) + except Exception: + return None + + +# --------------------------------------------------------------------------- +# Helpers: date extraction from filenames +# --------------------------------------------------------------------------- + + +def _date_from_gcc_tif(path: Path) -> str | None: + """Extract YYYYMMDD from ``GCC_YYYYMMDD.tif`` or ``composite_YYYYMMDD.tif``.""" + m = re.search(r"(\d{8})", path.stem) + return m.group(1) if m else None + + +def _date_from_s2_tif(path: Path) -> str | None: + """Extract YYYYMMDD from S2 product name ``S2X_TTTT_YYYYMMDD_…``.""" + parts = path.stem.split("_") + if len(parts) >= 3: + m = re.match(r"(\d{8})", parts[2]) + return m.group(1) if m else None + return None + + +# --------------------------------------------------------------------------- +# Helpers: Whittaker smoother (2nd-order differences, tridiagonal solver) +# --------------------------------------------------------------------------- + + +def _whittaker_smooth(values: list[float | None], lam: float = WHITTAKER_LAMBDA) -> list[float | None]: + """Penalised least-squares smoother (Whittaker, 2nd-order differences). + + Masked (None) values are filled via the smooth and then re-set to None in + the output so the caller can distinguish observed from gap-filled points. + """ + n = len(values) + if n < 4: + return values[:] + + obs_mask = [v is not None for v in values] + y = np.array([v if v is not None else 0.0 for v in values], dtype=float) + w = np.array([1.0 if m else 0.0 for m in obs_mask], dtype=float) + + W = np.diag(w) + D = np.diff(np.eye(n), n=2, axis=0) # (n-2) x n second-difference matrix + A = W + lam * D.T @ D + try: + z = np.linalg.solve(A, w * y) + except np.linalg.LinAlgError: + return values[:] + + result: list[float | None] = [] + for i, m in enumerate(obs_mask): + result.append(float(z[i]) if m else None) + return result + + +# --------------------------------------------------------------------------- +# Helpers: PhenoCam CSV parsing +# --------------------------------------------------------------------------- + + +def _parse_phenocam_csv( + csv_path: Path, year: int, site: str +) -> tuple[list[dict], list[dict]]: + """Return (gcc_series, image_list) filtered to ``year``. + + ``gcc_series`` entries: ``{"date": "YYYY-MM-DD", "gcc_90": float}`` + ``image_list`` entries: ``{"date": "YYYY-MM-DD", "url": str}`` + """ + gcc_series: list[dict] = [] + image_list: list[dict] = [] + year_str = str(year) + + if not csv_path.is_file(): + return gcc_series, image_list + + with csv_path.open() as f: + lines = [l for l in f if not l.startswith("#")] + + reader = csv.DictReader(lines) + for row in reader: + if row.get("year") != year_str: + continue + date = row.get("date", "") + gcc_raw = row.get("gcc_90") + if gcc_raw and gcc_raw not in ("NA", ""): + try: + gcc_series.append({"date": date, "gcc_90": float(gcc_raw)}) + except ValueError: + pass + fn = row.get("midday_filename", "").strip() + if fn and fn != "NA" and date: + month = date[5:7] + url = PHENOCAM_IMAGE_URL.format( + site=site, year=year_str, month=month, filename=fn + ) + image_list.append({"date": date, "url": url}) + + return gcc_series, image_list + + +# --------------------------------------------------------------------------- +# Helpers: moving average +# --------------------------------------------------------------------------- + + +def _moving_average( + series: list[dict], value_key: str, window: int +) -> list[dict]: + """Compute centred moving average; returns new list with ``_smooth`` suffix key.""" + if not series: + return [] + vals = [p[value_key] for p in series] + half = window // 2 + smoothed = [] + for i, pt in enumerate(series): + chunk = [v for v in vals[max(0, i - half): i + half + 1] if v is not None] + smoothed.append({ + "date": pt["date"], + value_key + "_smooth": (sum(chunk) / len(chunk)) if chunk else None, + }) + return smoothed + + +# --------------------------------------------------------------------------- +# Helpers: validation metrics +# --------------------------------------------------------------------------- + +MATCH_TOLERANCE_DAYS = 5 + + +def compute_metrics( + ref: list[dict], ref_key: str, + pred: list[dict], pred_key: str, +) -> dict | None: + """Compute NSE, RMSE, nRMSE, Pearson r between pred and ref. + + Each pred point is matched to the nearest ref date within + ``MATCH_TOLERANCE_DAYS``. Returns a dict or ``None`` if fewer than + 2 matched pairs exist. + """ + ref_lookup: dict[str, float] = {p["date"]: p[ref_key] for p in ref if p.get(ref_key) is not None} + if not ref_lookup: + return None + + ref_dates = sorted(ref_lookup) + + obs, sim = [], [] + for pt in pred: + v = pt.get(pred_key) + if v is None: + continue + nearest = min(ref_dates, key=lambda d: abs(( + np.datetime64(pt["date"]) - np.datetime64(d)) / np.timedelta64(1, "D"))) + gap = abs((np.datetime64(pt["date"]) - np.datetime64(nearest)) / np.timedelta64(1, "D")) + if gap <= MATCH_TOLERANCE_DAYS and nearest in ref_lookup: + obs.append(ref_lookup[nearest]) + sim.append(v) + + if len(obs) < 2: + return None + + obs_arr = np.array(obs) + sim_arr = np.array(sim) + obs_mean = obs_arr.mean() + + rmse = float(np.sqrt(np.mean((sim_arr - obs_arr) ** 2))) + nrmse = rmse / obs_mean if obs_mean else None + ss_res = float(np.sum((obs_arr - sim_arr) ** 2)) + ss_tot = float(np.sum((obs_arr - obs_mean) ** 2)) + nse = (1.0 - ss_res / ss_tot) if ss_tot else None + r, _ = pearsonr(obs_arr, sim_arr) + + def _r4(v: float | None) -> float | None: + return round(v, 4) if v is not None else None + + return {"n": len(obs), "rmse": _r4(rmse), "nrmse": _r4(nrmse), "nse": _r4(nse), "r": _r4(float(r))} + + +S2_BAND_NAMES = ["B02", "B03", "B04"] +S3_BAND_NAMES = ["Oa04", "Oa06", "Oa08", "Oa17"] + + +def _read_multiband_center( + path: Path, lat: float, lon: float, band_names: list[str] +) -> dict[str, float | None]: + """Return 3×3 mean per band at (lat, lon). Keys are ``band_names``, values float or None.""" + try: + with rasterio.open(path) as src: + transformer = Transformer.from_crs(CRS.from_epsg(4326), src.crs, always_xy=True) + x, y = transformer.transform(lon, lat) + row, col = rowcol(src.transform, x, y) + h, w = src.height, src.width + r0, r1 = max(0, row - 1), min(h, row + 2) + c0, c1 = max(0, col - 1), min(w, col + 2) + window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0) + nodata = src.nodata + result = {} + for i, name in enumerate(band_names, 1): + if i > src.count: + result[name] = None + continue + data = src.read(i, window=window).astype(float) + if nodata is not None: + data = np.where(data == nodata, np.nan, data) + data[data == 0] = np.nan + val = np.nanmean(data) + result[name] = None if np.isnan(val) else round(float(val), 6) + return result + except Exception: + return {name: None for name in band_names} + + +def _multiband_series( + tif_paths: list[Path], + date_fn, + lat: float, + lon: float, + band_names: list[str], + desc: str, +) -> list[dict]: + """Extract center-pixel values for all bands; return ``[{date, band1, band2, …}]``.""" + result = [] + for p in tqdm(tif_paths, desc=desc, leave=False): + date = date_fn(p) + if date is None: + continue + vals = _read_multiband_center(p, lat, lon, band_names) + if any(v is not None for v in vals.values()): + result.append({"date": f"{date[:4]}-{date[4:6]}-{date[6:]}", **vals}) + return sorted(result, key=lambda x: x["date"]) + + +# --------------------------------------------------------------------------- +# Helpers: spatial heterogeneity + observation density +# --------------------------------------------------------------------------- + + +def _read_footprint_stats( + path: Path, lat: float, lon: float, half_m: float = SPATIAL_CV_HALF_M +) -> tuple[float, float] | tuple[None, None]: + """Return (mean, std) of valid GCC pixels within a ±half_m metre square window. + + Returns ``(None, None)`` on any error or when fewer than 4 valid pixels exist. + """ + try: + with rasterio.open(path) as src: + transformer = Transformer.from_crs(CRS.from_epsg(4326), src.crs, always_xy=True) + x, y = transformer.transform(lon, lat) + res = abs(src.transform.a) # pixel size in CRS units (metres for UTM) + half_px = max(1, int(round(half_m / res))) + row, col = rowcol(src.transform, x, y) + h, w = src.height, src.width + r0, r1 = max(0, row - half_px), min(h, row + half_px + 1) + c0, c1 = max(0, col - half_px), min(w, col + half_px + 1) + window = rasterio.windows.Window(c0, r0, c1 - c0, r1 - r0) + data = src.read(1, window=window).astype(float) + nodata = src.nodata + if nodata is not None: + data = np.where(data == nodata, np.nan, data) + data[data <= 0] = np.nan + valid = data[~np.isnan(data)] + if len(valid) < 4: + return None, None + return float(np.mean(valid)), float(np.std(valid)) + except Exception: + return None, None + + +def compute_covariates( + s2_gcc_paths: list[Path], + s2_series: list[dict], + s3_series: list[dict], + n_gcc_points: int | None, + lat: float, + lon: float, +) -> dict: + """Compute spatial heterogeneity and temporal observation density covariates.""" + # Spatial GCC statistics over ~300 m footprint + means, stds = [], [] + for p in s2_gcc_paths: + m, s = _read_footprint_stats(p, lat, lon) + if m is not None and m > 0: + means.append(m) + stds.append(s) + + spatial_gcc_cv = round(float(np.mean([s / m for s, m in zip(stds, means)])), 4) if means else None + spatial_gcc_std = round(float(np.mean(stds)), 4) if stds else None + + # S2 temporal gap statistics + s2_dates = [datetime.date.fromisoformat(p["date"]) for p in s2_series] + if len(s2_dates) >= 2: + gaps = [(s2_dates[i + 1] - s2_dates[i]).days for i in range(len(s2_dates) - 1)] + s2_mean_gap = round(float(np.mean(gaps)), 1) + s2_max_gap = int(max(gaps)) + else: + s2_mean_gap = None + s2_max_gap = None + + return { + "spatial_gcc_cv": spatial_gcc_cv, + "spatial_gcc_std": spatial_gcc_std, + "s2_scene_count": len(s2_series), + "s2_mean_gap_days": s2_mean_gap, + "s2_max_gap_days": s2_max_gap, + "s3_composite_count": len(s3_series), + "n_gcc_points": n_gcc_points, + } + + +# --------------------------------------------------------------------------- +# Per-site export +# --------------------------------------------------------------------------- + + +def _write_json(path: Path, data: Any) -> None: + path.write_text(json.dumps(data, separators=(",", ":"))) + + +def _raster_series( + tif_paths: list[Path], + date_fn, + lat: float, + lon: float, + desc: str, +) -> list[dict]: + """Extract center-pixel GCC from each tif, return ``[{date, gcc}]`` sorted.""" + result = [] + for p in tqdm(tif_paths, desc=desc, leave=False): + date = date_fn(p) + if date is None: + continue + val = _read_center_pixel(p, lat, lon) + if val is not None: + result.append({"date": f"{date[:4]}-{date[4:6]}-{date[6:]}", "gcc": val}) + return sorted(result, key=lambda x: x["date"]) + + +def _raster_index(tif_paths: list[Path], date_fn, rel_root: Path) -> list[dict]: + """Build raster index: ``[{date, path}]`` sorted by date.""" + result = [] + for p in tif_paths: + date = date_fn(p) + if date is None: + continue + try: + rel = str(p.relative_to(rel_root)) + except ValueError: + rel = str(p) + result.append({"date": date, "path": rel}) + return sorted(result, key=lambda x: x["date"]) + + +def export_site( + site: str, + year: int, + lat: float, + lon: float, + out_dir: Path, + n_gcc_points: int | None = None, +) -> bool: + """Export timeseries.json and rasters.json for one site. Returns True on success.""" + sentinel_base = DATA_DIR / "sentinel_data" / str(year) / site / "prepared" + fusion_base = DATA_DIR / "fusion" / str(year) / site + + s2_gcc_dir = sentinel_base / "s2" + s3_gcc_dir = sentinel_base / "gcc_s3" + bti_gcc_dir = fusion_base / "bti" / "gcc" + itb_gcc_dir = fusion_base / "itb" / "fusion" + + # Raster slider sources + s2_refl_dir = sentinel_base / "s2" + s3_comp_dir = sentinel_base / "s3" + bti_refl_dir = fusion_base / "bti" / "fusion" + + has_fusion = bti_gcc_dir.is_dir() and any(bti_gcc_dir.glob("GCC_*.tif")) + if not has_fusion: + return False + + out_dir.mkdir(parents=True, exist_ok=True) + + # --- GCC timeseries from rasters --- + s2_gcc_paths = sorted(s2_gcc_dir.glob("*_GCC.tif")) + s3_gcc_paths = sorted(s3_gcc_dir.glob("composite_*.tif")) + bti_paths = sorted(bti_gcc_dir.glob("GCC_*.tif")) + itb_paths = sorted(itb_gcc_dir.glob("GCC_*.tif")) + + s2_series = _raster_series(s2_gcc_paths, _date_from_s2_tif, lat, lon, f"{site} S2") + s3_series = _raster_series(s3_gcc_paths, _date_from_gcc_tif, lat, lon, f"{site} S3") + bti_series = _raster_series(bti_paths, _date_from_gcc_tif, lat, lon, f"{site} BtI") + itb_series = _raster_series(itb_paths, _date_from_gcc_tif, lat, lon, f"{site} ItB") + + # Whittaker on S2 + s2_vals = [p["gcc"] for p in s2_series] + s2_smooth_vals = _whittaker_smooth(s2_vals) + s2_whittaker = [ + {"date": p["date"], "gcc": v} + for p, v in zip(s2_series, s2_smooth_vals) + if v is not None + ] + + # S3 5-day moving average + s3_smooth = _moving_average(s3_series, "gcc", S3_SMOOTH_WINDOW) + + # PhenoCam CSV + csv_path = DATA_DIR / "phenocam" / str(year) / f"{site}_1day.csv" + phenocam_series, image_list = _parse_phenocam_csv(csv_path, year, site) + + s3_smooth_series = [ + {"date": p["date"], "gcc": p["gcc_smooth"]} + for p in s3_smooth + if p.get("gcc_smooth") is not None + ] + + # Band reflectance timeseries (multi-band center-pixel) + bands_s2 = _multiband_series(sorted(s2_refl_dir.glob("*_REFL.tif")), _date_from_s2_tif, lat, lon, S2_BAND_NAMES, f"{site} S2 bands") + bands_s3 = _multiband_series(sorted(s3_comp_dir.glob("composite_*.tif")), _date_from_gcc_tif, lat, lon, S3_BAND_NAMES, f"{site} S3 bands") + + # --- Per-metric JSON outputs --- + _write_json(out_dir / "gcc_phenocam.json", phenocam_series) + _write_json(out_dir / "gcc_s2.json", s2_series) + _write_json(out_dir / "gcc_s2_whittaker.json", s2_whittaker) + _write_json(out_dir / "gcc_s3.json", s3_series) + _write_json(out_dir / "gcc_s3_smooth.json", s3_smooth_series) + _write_json(out_dir / "gcc_fusion_bti.json", bti_series) + _write_json(out_dir / "gcc_fusion_itb.json", itb_series) + _write_json(out_dir / "phenocam_images.json", image_list) + _write_json(out_dir / "bands_s2.json", bands_s2) + _write_json(out_dir / "bands_s3.json", bands_s3) + + # --- Raster index for slider --- + rel_root = DATA_DIR.parent # paths relative to project root + + # Valid-pixel sets: only show S2/S3 rasters where the center pixel had + # usable data (non-zero GCC). This excludes cloud-masked / snow-covered + # scenes that would render as black or visually nonsensical. + s2_valid_dates = {p["date"].replace("-", "") for p in s2_series} + s3_valid_dates = {p["date"].replace("-", "") for p in s3_series} + + s2_refl = [r for r in _raster_index(sorted(s2_refl_dir.glob("*_REFL.tif")), _date_from_s2_tif, rel_root) + if r["date"] in s2_valid_dates] + s3_comp = [r for r in _raster_index(sorted(s3_comp_dir.glob("composite_*.tif")), _date_from_gcc_tif, rel_root) + if r["date"] in s3_valid_dates] + s2_gcc = [r for r in _raster_index(sorted(s2_gcc_dir.glob("*_GCC.tif")), _date_from_s2_tif, rel_root) + if r["date"] in s2_valid_dates] + s3_gcc = [r for r in _raster_index(sorted(s3_gcc_dir.glob("composite_*.tif")), _date_from_gcc_tif, rel_root) + if r["date"] in s3_valid_dates] + bti_refl = _raster_index(sorted(bti_refl_dir.glob("REFL_*.tif")), _date_from_gcc_tif, rel_root) + itb_gcc = _raster_index(sorted(itb_gcc_dir.glob("GCC_*.tif")), _date_from_gcc_tif, rel_root) + + _write_json(out_dir / "rasters_s2_refl.json", s2_refl) + _write_json(out_dir / "rasters_s3_composite.json", s3_comp) + _write_json(out_dir / "rasters_s2_gcc.json", s2_gcc) + _write_json(out_dir / "rasters_s3_gcc.json", s3_gcc) + _write_json(out_dir / "rasters_fusion_bti_refl.json", bti_refl) + _write_json(out_dir / "rasters_fusion_itb_gcc.json", itb_gcc) + + # --- Site covariates (heterogeneity + observation density) --- + _write_json(out_dir / "covariates.json", compute_covariates( + s2_gcc_paths, s2_series, s3_series, n_gcc_points, lat, lon + )) + + # --- Validation metrics vs PhenoCam gcc_90 --- + _write_json(out_dir / "metrics.json", { + "bti": compute_metrics(phenocam_series, "gcc_90", bti_series, "gcc"), + "itb": compute_metrics(phenocam_series, "gcc_90", itb_series, "gcc"), + "s2_whittaker": compute_metrics(phenocam_series, "gcc_90", s2_whittaker, "gcc"), + "s3_smooth": compute_metrics(phenocam_series, "gcc_90", s3_smooth_series, "gcc"), + "s2": compute_metrics(phenocam_series, "gcc_90", s2_series, "gcc"), + "s3": compute_metrics(phenocam_series, "gcc_90", s3_series, "gcc"), + }) + + # Remove legacy bundled outputs if present + for legacy in ("timeseries.json", "rasters.json"): + (out_dir / legacy).unlink(missing_ok=True) + return True + + +# --------------------------------------------------------------------------- +# Manifest +# --------------------------------------------------------------------------- + +VEG_TYPE_LABELS = { + "AG": "Agriculture", + "DB": "Deciduous broadleaf", + "DN": "Deciduous needleleaf", + "EB": "Evergreen broadleaf", + "EN": "Evergreen needleleaf", + "GR": "Grassland", + "MX": "Mixed", + "SH": "Shrubland", + "TN": "Tundra", + "UN": "Unknown", + "WL": "Wetland", + "RF": "Reference", +} + + +def build_manifest(years: list[int], filter_site: str | None = None) -> dict: + manifest: dict[str, Any] = {"years": years, "sites": {}} + + for year in years: + screening_path = DATA_DIR / "phenocam_screening" / f"{year}.json" + if not screening_path.is_file(): + continue + data = json.loads(screening_path.read_text()) + sites_meta: dict[str, Any] = {} + for entry in data.get("sites", []): + if entry.get("calculations", {}).get("status") != "PASS": + continue + cam = entry.get("response", {}).get("camera", {}) + roi = entry.get("response", {}).get("roi", {}) + calc = entry.get("calculations", {}) + site = cam.get("Sitename", "") + if not site: + continue + if filter_site and site != filter_site: + continue + sm = cam.get("sitemetadata", {}) + veg_raw = sm.get("primary_veg_type") or roi.get("roitype") or "UN" + fusion_dir = DATA_DIR / "fusion" / str(year) / site / "bti" / "gcc" + has_fusion = fusion_dir.is_dir() and any(fusion_dir.glob("GCC_*.tif")) + sites_meta[site] = { + "lat": cam.get("Lat"), + "lon": cam.get("Lon"), + "veg_type": veg_raw, + "veg_label": VEG_TYPE_LABELS.get(veg_raw, veg_raw), + "description": sm.get("site_description", ""), + "dominant_species": sm.get("dominant_species", ""), + "group": sm.get("group", ""), + "snr": calc.get("snr"), + "n_gcc_points": calc.get("n_gcc_points"), + "has_fusion": has_fusion, + } + manifest["sites"][str(year)] = sites_meta + + return manifest + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR) + parser.add_argument("--site", type=str, default=None) + args = parser.parse_args() + + year = args.evaluation_year + filter_site = args.site + + out_base = DATA_DIR / "metrics" + out_base.mkdir(parents=True, exist_ok=True) + + # Determine years with screening data + screening_dir = DATA_DIR / "phenocam_screening" + years = sorted( + int(p.stem) for p in screening_dir.glob("*.json") if p.stem.isdigit() + ) + if not years: + years = [year] + + print(f"Building manifest for years: {years}") + manifest = build_manifest(years, filter_site) + + # Export per-site data for the requested year + year_sites = manifest["sites"].get(str(year), {}) + fusion_sites = {s: m for s, m in year_sites.items() if m["has_fusion"]} + if filter_site: + fusion_sites = {s: m for s, m in fusion_sites.items() if s == filter_site} + + print(f"Exporting {len(fusion_sites)} site(s) with fusion data for {year}") + for site, meta in tqdm(fusion_sites.items(), desc="Sites"): + out_dir = out_base / str(year) / site + ok = export_site(site, year, meta["lat"], meta["lon"], out_dir, meta.get("n_gcc_points")) + if ok: + print(f" ✓ {site}") + else: + print(f" ✗ {site} — no fusion data found") + + manifest_path = out_base / "manifest.json" + manifest_path.write_text(json.dumps(manifest, separators=(",", ":"))) + print(f"Manifest written → {manifest_path}") + + +if __name__ == "__main__": + main() diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..2e1029f --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,151 @@ +# AGENTS.md + +Worldwide PhenoCam EFAST feasibility screening. Human summary: [`README.md`](README.md). + +--- + +## Layout + +| Path | Role | +|------|------| +| `1-phenocam.py` | Step 1: download PhenoCam metadata + `one_day_summary` CSV | +| `2-phenocam-screening.py` | Step 2: PhenoCam + SNR gates on cached CSVs | +| `3-sentinel-data.py` | Step 3: S2 (Earth Search COG) + S3 (CDSE OpenEO) download + EFAST prep | +| `4-fusion.py` | Step 4: GCC computation + EFAST BtI/ItB fusion loop | +| `5-metrics.py` | Step 5: timeseries, covariates, `metrics.json`, webapp manifest | +| `data/` | Manifests, per-site caches, screening outputs (large; mostly generated) | +| `webapp/` | Static QA viewer (`make serve` from workspace root) | + +Workspace orchestration: [`../AGENTS.md`](../AGENTS.md). + +--- + +## Where to work + +| Task | Location | +|------|----------| +| PhenoCam bulk download | `1-phenocam.py` | +| GCC/SNR screening on disk | `2-phenocam-screening.py` | +| S2/S3 download + EFAST prep | `3-sentinel-data.py` | +| GCC + fusion | `4-fusion.py` | +| Metrics + webapp index | `5-metrics.py` | +| Web QA | `../Makefile` target `serve` → `webapp/index.html` | + +--- + +## Setup + +**Preferred (uv):** from `processing/`: + +```bash +uv sync # all deps from pyproject.toml (incl. efast) +``` + +Run any script as `uv run python - - - - - - -
-
- -

Innsbruck

-

2024

-
- - - - - - - - - - -
- -
2024-01-01
-
-
Fusion RGB (closest available)
-
-
-
-
NDVI
-
GCC
-
B02 (Blue)
-
B03 (Green)
-
B04 (Red)
-
B8A (NIR)
-
-
- - - diff --git a/webapp/gap_validation.html b/webapp/gap_validation.html deleted file mode 100644 index 73e4d9a..0000000 --- a/webapp/gap_validation.html +++ /dev/null @@ -1,284 +0,0 @@ - - - - - Gap validation - - - -
- -

Gap validation

-
- - - - -
-
-
- - - diff --git a/webapp/index.html b/webapp/index.html index 40ed804..9f40375 100644 --- a/webapp/index.html +++ b/webapp/index.html @@ -1,1076 +1,787 @@ - + - Full - - - - - + +EFAST PhenoCam analysis + + + + + + -
-