diff --git a/gap_validation/batch_spatial.py b/gap_validation/batch_spatial.py index 5c57ec1..9b3b5d8 100644 --- a/gap_validation/batch_spatial.py +++ b/gap_validation/batch_spatial.py @@ -7,7 +7,6 @@ import json import re from pathlib import Path -from gap_validation.calendar import DEFAULT_GAP_LENGTHS, TRANSITIONS from gap_validation.run import run_validation # Primary season per site (matches scripts/export_thesis_tables.py). @@ -47,26 +46,49 @@ def _parse_scenario(key: str) -> tuple[str, int | None, str]: return strategy, sigma if sigma == 30 else (None if sigma == 20 else sigma), mode -def _best_bti_from_metrics(metrics_path: Path) -> str | None: +def _best_from_metrics(metrics_path: Path, workflow: str) -> str | None: + """Best scenario key (max no-gap NSE_PC) for ``workflow`` (``bti`` or ``itb``).""" + if workflow not in ("bti", "itb"): + raise ValueError(f"workflow must be bti or itb, got {workflow!r}") if not metrics_path.is_file(): return None temporal = json.loads(metrics_path.read_text(encoding="utf-8")).get("temporal") or {} + want_itb = workflow == "itb" best_key, best_nse = None, None for k, v in temporal.items(): - if not k.endswith("_itb") and isinstance(v, dict): - n = v.get("nse_pc") - if isinstance(n, (int, float)) and (best_nse is None or n > best_nse): - best_nse = n - best_key = k + if k.endswith("_itb") != want_itb or not isinstance(v, dict): + continue + n = v.get("nse_pc") + if isinstance(n, (int, float)) and (best_nse is None or n > best_nse): + best_nse = n + best_key = k return best_key +def _best_bti_from_metrics(metrics_path: Path) -> str | None: + return _best_from_metrics(metrics_path, "bti") + + +def _best_itb_from_metrics(metrics_path: Path) -> str | None: + return _best_from_metrics(metrics_path, "itb") + + +def _resolve_workflows(workflow: str) -> tuple[str, ...]: + return ("bti", "itb") if workflow == "both" else (workflow,) + + def main() -> None: ap = argparse.ArgumentParser(description="Batch spatial gap validation (six sites).") ap.add_argument("--data-dir", type=Path, default=Path("data")) ap.add_argument("--sites-geojson", type=Path, default=Path("data/sites.geojson")) ap.add_argument("--skip-fusion", action="store_true") ap.add_argument("--write-manifest-only", action="store_true") + ap.add_argument( + "--workflow", + choices=["bti", "itb", "both"], + default="both", + help="Fusion workflow(s) to validate (default: both best BtI and best ItB).", + ) ap.add_argument( "--gap-days", type=int, @@ -76,6 +98,7 @@ def main() -> None: args = ap.parse_args() positions = _site_positions(args.sites_geojson) gap_filter = args.gap_days + workflows = _resolve_workflows(args.workflow) for site, season in sorted(PRIMARY_SEASON.items()): pos = positions.get(site) @@ -83,28 +106,29 @@ def main() -> None: print(f"[skip] No coordinates for {site}") continue metrics_path = args.data_dir / site / str(season) / "metrics.json" - scenario_key = _best_bti_from_metrics(metrics_path) - if not scenario_key: - print(f"[skip] {site} {season}: no metrics.json / BtI scenarios") - continue - strategy, sigma, mode = _parse_scenario(scenario_key) - sigma_kw = 30 if sigma == 30 else None - print(f"=== {site} {season} {scenario_key} ===") - out = run_validation( - site, - season, - pos, - strategy, - sigma_kw, - mode, - skip_manifest=False, - skip_fusion=args.skip_fusion, - write_manifest_only=args.write_manifest_only, - gap_days_filter=gap_filter, - transition_filter=None, - s2_calendar_strategy=strategy, - ) - print(out) + for workflow in workflows: + scenario_key = _best_from_metrics(metrics_path, workflow) + if not scenario_key: + print(f"[skip] {site} {season}: no metrics.json / {workflow} scenarios") + continue + strategy, sigma, mode = _parse_scenario(scenario_key) + sigma_kw = 30 if sigma == 30 else None + print(f"=== {site} {season} {scenario_key} ===") + out = run_validation( + site, + season, + pos, + strategy, + sigma_kw, + mode, + skip_manifest=False, + skip_fusion=args.skip_fusion, + write_manifest_only=args.write_manifest_only, + gap_days_filter=gap_filter, + transition_filter=None, + s2_calendar_strategy=strategy, + ) + print(out) if __name__ == "__main__": diff --git a/gap_validation/batch_temporal.py b/gap_validation/batch_temporal.py index 2e8ef84..3a286f7 100644 --- a/gap_validation/batch_temporal.py +++ b/gap_validation/batch_temporal.py @@ -7,8 +7,9 @@ from pathlib import Path from gap_validation.batch_spatial import ( PRIMARY_SEASON, - _best_bti_from_metrics, + _best_from_metrics, _parse_scenario, + _resolve_workflows, _site_positions, ) from gap_validation.temporal_pc import run_temporal_pc @@ -19,9 +20,16 @@ def main() -> None: ap.add_argument("--data-dir", type=Path, default=Path("data")) ap.add_argument("--sites-geojson", type=Path, default=Path("data/sites.geojson")) ap.add_argument("--skip-fusion", action="store_true") + ap.add_argument( + "--workflow", + choices=["bti", "itb", "both"], + default="both", + help="Fusion workflow(s) to validate (default: both best BtI and best ItB).", + ) ap.add_argument("--gap-days", type=int, action="append") args = ap.parse_args() positions = _site_positions(args.sites_geojson) + workflows = _resolve_workflows(args.workflow) for site, season in sorted(PRIMARY_SEASON.items()): pos = positions.get(site) @@ -29,27 +37,28 @@ def main() -> None: print(f"[skip] No coordinates for {site}") continue metrics_path = args.data_dir / site / str(season) / "metrics.json" - scenario_key = _best_bti_from_metrics(metrics_path) - if not scenario_key: - print(f"[skip] {site} {season}: no metrics.json") - continue - strategy, sigma, mode = _parse_scenario(scenario_key) - sigma_kw = 30 if sigma == 30 else None - print(f"=== {site} {season} temporal {scenario_key} ===") - out = run_temporal_pc( - site, - season, - pos, - strategy, - sigma_kw, - mode, - skip_manifest=False, - skip_fusion=args.skip_fusion, - gap_days_filter=args.gap_days, - transition_filter=None, - s2_calendar_strategy=strategy, - ) - print(out) + for workflow in workflows: + scenario_key = _best_from_metrics(metrics_path, workflow) + if not scenario_key: + print(f"[skip] {site} {season}: no metrics.json / {workflow} scenarios") + continue + strategy, sigma, mode = _parse_scenario(scenario_key) + sigma_kw = 30 if sigma == 30 else None + print(f"=== {site} {season} temporal {scenario_key} ===") + out = run_temporal_pc( + site, + season, + pos, + strategy, + sigma_kw, + mode, + skip_manifest=False, + skip_fusion=args.skip_fusion, + gap_days_filter=args.gap_days, + transition_filter=None, + s2_calendar_strategy=strategy, + ) + print(out) if __name__ == "__main__": diff --git a/gap_validation/phenology_offsets.py b/gap_validation/phenology_offsets.py index 7f779a7..19d8dea 100644 --- a/gap_validation/phenology_offsets.py +++ b/gap_validation/phenology_offsets.py @@ -12,8 +12,9 @@ from phenology_timesat import phenocam_phenology_path from gap_validation.batch_spatial import ( PRIMARY_SEASON, - _best_bti_from_metrics, + _best_from_metrics, _parse_scenario, + _resolve_workflows, _site_positions, ) from gap_validation.calendar import load_manifest, validation_dir @@ -57,11 +58,12 @@ def compute_offsets_for_site( season: int, site_position: tuple[float, float], *, + workflow: str = "bti", gap_days_list: tuple[int, ...] = (15, 30), ) -> list[dict]: base = Path(f"data/{site}/{season}") metrics_path = base / "metrics.json" - scenario_key = _best_bti_from_metrics(metrics_path) + scenario_key = _best_from_metrics(metrics_path, workflow) if not scenario_key: return [] ref_path = phenocam_phenology_path(site, season) @@ -112,14 +114,26 @@ def write_phenology_offsets( season: int, site_position: tuple[float, float], *, + workflow: str = "bti", gap_days_list: tuple[int, ...] = (15, 30), ) -> Path: rows = compute_offsets_for_site( - site, season, site_position, gap_days_list=gap_days_list + site, season, site_position, workflow=workflow, gap_days_list=gap_days_list ) - out = validation_dir(site, season) / "gap_phenology_offsets.json" - payload = {"site_name": site, "season": season, "records": rows} + vdir = validation_dir(site, season) + payload = { + "site_name": site, + "season": season, + "workflow": workflow, + "records": rows, + } + out = vdir / f"gap_phenology_offsets_{workflow}.json" out.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + if workflow == "bti": + # Legacy alias for backward-compatible readers. + (vdir / "gap_phenology_offsets.json").write_text( + json.dumps(payload, indent=2) + "\n", encoding="utf-8" + ) return out @@ -127,14 +141,22 @@ def main() -> None: ap = argparse.ArgumentParser(description="Gap fusion TIMESAT offsets vs PhenoCam.") ap.add_argument("--data-dir", type=Path, default=Path("data")) ap.add_argument("--sites-geojson", type=Path, default=Path("data/sites.geojson")) + ap.add_argument( + "--workflow", + choices=["bti", "itb", "both"], + default="both", + help="Fusion workflow(s) (default: both best BtI and best ItB).", + ) args = ap.parse_args() positions = _site_positions(args.sites_geojson) + workflows = _resolve_workflows(args.workflow) for site, season in sorted(PRIMARY_SEASON.items()): pos = positions.get(site) if not pos: continue - p = write_phenology_offsets(site, season, pos) - print(p) + for workflow in workflows: + p = write_phenology_offsets(site, season, pos, workflow=workflow) + print(p) if __name__ == "__main__": diff --git a/gap_validation/run.py b/gap_validation/run.py index 753fe56..30bf24f 100644 --- a/gap_validation/run.py +++ b/gap_validation/run.py @@ -273,8 +273,13 @@ def run_validation( } }, } - out_path = vdir / "gap_validation_summary.json" + out_path = vdir / f"gap_validation_summary_{mode}.json" out_path.write_text(json.dumps(summary, indent=2) + "\n", encoding="utf-8") + if mode == "bti": + # Legacy alias for backward-compatible readers (webapp, older scripts). + (vdir / "gap_validation_summary.json").write_text( + json.dumps(summary, indent=2) + "\n", encoding="utf-8" + ) return out_path diff --git a/gap_validation/temporal_pc.py b/gap_validation/temporal_pc.py index 40429e7..aaeaffb 100644 --- a/gap_validation/temporal_pc.py +++ b/gap_validation/temporal_pc.py @@ -19,7 +19,7 @@ from metrics_stats import ( ) from gap_validation.calendar import TRANSITIONS, load_manifest, validation_dir, write_manifest -from gap_validation.fusion_masked import run_masked_fusion_season, validation_fusion_dir +from gap_validation.fusion_masked import run_masked_fusion_season from gap_validation.run import ( _filter_entries, _scenario_key, @@ -247,8 +247,13 @@ def run_temporal_pc( } }, } - out_path = vdir / "gap_metrics.json" + out_path = vdir / f"gap_metrics_{mode}.json" out_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8") + if mode == "bti": + # Legacy alias for backward-compatible readers. + (vdir / "gap_metrics.json").write_text( + json.dumps(payload, indent=2) + "\n", encoding="utf-8" + ) return out_path diff --git a/suitability_screening.py b/suitability_screening.py new file mode 100644 index 0000000..ed0d2de --- /dev/null +++ b/suitability_screening.py @@ -0,0 +1,634 @@ +#!/usr/bin/env python3 +"""Compute per-site suitability indicators from existing pipeline outputs. + +The script is intentionally schema-tolerant: it prints one site's discovered JSON +structure first, then uses a small set of common field-name conventions to compute +SNR, S2 archive density, and S2-S3 GCC coherence. +""" + +from __future__ import annotations + +import argparse +import json +import math +import re +from collections.abc import Iterable +from pathlib import Path +from typing import Any + +import numpy as np +import pandas as pd +from scipy.interpolate import UnivariateSpline +from scipy.stats import pearsonr + + +OUTPUT_NAME = "suitability_screening.json" +SNR_THRESHOLD = 2.0 +MATCH_TOLERANCE_DAYS = 2 + + +def load_json(path: Path) -> Any | None: + if not path.is_file(): + return None + try: + with path.open("r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError) as exc: + print(f"[WARN] Could not read JSON {path}: {exc}") + return None + + +def jsonable_float(value: Any) -> float | None: + if isinstance(value, bool): + return None + try: + out = float(value) + except (TypeError, ValueError): + return None + if not math.isfinite(out): + return None + return out + + +def parse_date(value: Any) -> pd.Timestamp | None: + if value is None: + return None + if isinstance(value, pd.Timestamp): + return value.normalize() + text = str(value).strip() + if not text: + return None + match = re.search(r"(? Any: + """Return a short representation suitable for discovery logging.""" + if isinstance(value, dict): + return {k: compact(v, max_text=max_text) for k, v in list(value.items())[:12]} + if isinstance(value, list): + return [compact(v, max_text=max_text) for v in value[:2]] + text = repr(value) + if len(text) > max_text: + return text[: max_text - 3] + "..." + return value + + +def top_keys(data: Any) -> list[str]: + if isinstance(data, dict): + return list(data.keys()) + if isinstance(data, list) and data and isinstance(data[0], dict): + keys: set[str] = set() + for entry in data[:5]: + keys.update(entry.keys()) + return sorted(keys) + return [] + + +def normalize_records(data: Any) -> list[dict[str, Any]]: + """Convert common JSON shapes into a list of record dictionaries.""" + if data is None: + return [] + if isinstance(data, list): + records = [] + for item in data: + if isinstance(item, dict): + records.append(dict(item)) + else: + records.append({"value": item}) + return records + if not isinstance(data, dict): + return [{"value": data}] + + for key in ("timeseries", "time_series", "data", "entries", "results", "records"): + value = data.get(key) + if isinstance(value, list): + return normalize_records(value) + + # Dict keyed by date or filename. + if data and all(not isinstance(v, (list, tuple)) for v in data.values()): + records = [] + for key, value in data.items(): + if isinstance(value, dict): + record = dict(value) + record.setdefault("date", key) + else: + record = {"date": key, "value": value} + records.append(record) + return records + + return [dict(data)] + + +def first_records(data: Any, count: int = 2) -> list[Any]: + records = normalize_records(data) + return records[:count] + + +def recursive_snr_candidates(data: Any, prefix: str = "") -> list[tuple[str, Any]]: + found: list[tuple[str, Any]] = [] + if isinstance(data, dict): + for key, value in data.items(): + path = f"{prefix}.{key}" if prefix else str(key) + if "snr" in str(key).lower(): + found.append((path, value)) + found.extend(recursive_snr_candidates(value, path)) + elif isinstance(data, list): + for i, value in enumerate(data[:10]): + found.extend(recursive_snr_candidates(value, f"{prefix}[{i}]")) + return found + + +def find_numeric_snr(data: Any) -> float | None: + candidates = recursive_snr_candidates(data) + # Prefer exact leaf keys named "snr"; fall back to any numeric snr-containing key. + candidates.sort(key=lambda kv: 0 if kv[0].split(".")[-1].lower() == "snr" else 1) + for _, value in candidates: + numeric = jsonable_float(value) + if numeric is not None: + return numeric + if isinstance(value, dict): + nested = value.get("snr") + numeric = jsonable_float(nested) + if numeric is not None: + return numeric + return None + + +def find_site_roots(base_dir: Path) -> list[tuple[str, Path]]: + """Find direct site roots, plus the repo's common site/year layout.""" + roots: list[tuple[str, Path]] = [] + if not base_dir.is_dir(): + return roots + + def looks_like_site_root(path: Path) -> bool: + return any( + ( + (path / "metrics.json").exists(), + (path / "raw" / "preselection").exists(), + (path / "phenocam").exists(), + (path / "raw" / "phenocam").exists(), + ) + ) + + for child in sorted(p for p in base_dir.iterdir() if p.is_dir()): + if looks_like_site_root(child): + roots.append((child.name, child)) + continue + for grandchild in sorted(p for p in child.iterdir() if p.is_dir()): + if looks_like_site_root(grandchild): + name = child.name if grandchild.name.isdigit() else f"{child.name}_{grandchild.name}" + roots.append((name, grandchild)) + + return roots + + +def find_s2_preselection(site_root: Path) -> Path | None: + candidates = [ + site_root / "raw" / "preselection" / "s2_preselection.json", + site_root / "preselection" / "s2_preselection.json", + ] + return next((p for p in candidates if p.is_file()), None) + + +def find_s3_timeseries(site_root: Path) -> Path | None: + candidates = [ + site_root / "processed_aggressive_sigma20" / "gcc" / "s3" / "timeseries.json", + site_root / "processed_aggressive_itb_sigma20" / "gcc" / "s3" / "timeseries.json", + ] + for candidate in candidates: + if candidate.is_file(): + return candidate + matches = sorted(site_root.glob("processed*aggressive*sigma20*/gcc/s3/timeseries.json")) + return matches[0] if matches else None + + +def find_metrics(site_root: Path) -> Path | None: + path = site_root / "metrics.json" + return path if path.is_file() else None + + +def find_phenocam(site_root: Path) -> Path | None: + candidates = [ + site_root / "phenocam" / "gcc_90.json", + site_root / "phenocam" / "phenocam_gcc.json", + site_root / "raw" / "phenocam" / "gcc_90.json", + site_root / "raw" / "phenocam" / "phenocam_gcc.json", + ] + for candidate in candidates: + if candidate.is_file(): + return candidate + patterns = [ + "phenocam/*gcc*90*.json", + "phenocam/*gcc*.json", + "raw/phenocam/*gcc*90*.json", + "raw/phenocam/*gcc*.json", + "raw/phenocam/*.json", + ] + for pattern in patterns: + matches = sorted(site_root.glob(pattern)) + if matches: + return matches[0] + return None + + +def print_structure(label: str, path: Path | None) -> None: + print(f"\n[{label}]") + if path is None: + print("missing") + return + data = load_json(path) + print(f"path: {path}") + print(f"type: {type(data).__name__}") + print(f"keys: {top_keys(data)}") + records = [] if label == "metrics.json" else first_records(data, 2) + if records: + print(f"first {len(records)} entr{'y' if len(records) == 1 else 'ies'}:") + print(json.dumps(compact(records), indent=2, default=str)) + if label == "metrics.json": + snr = recursive_snr_candidates(data) + phenocam_keys = [] + if isinstance(data, dict): + for key, value in data.items(): + if "phenocam" in str(key).lower(): + phenocam_keys.append((key, top_keys(value))) + print(f"phenocam-like keys: {phenocam_keys}") + print(f"snr-like keys: {[(path, compact(value)) for path, value in snr]}") + + +def run_discovery(site_name: str, site_root: Path) -> None: + print("\n=== Discovery mode ===") + print(f"Using site: {site_name} ({site_root})") + print_structure("s2_preselection.json", find_s2_preselection(site_root)) + print_structure("S3 timeseries.json", find_s3_timeseries(site_root)) + print_structure("metrics.json", find_metrics(site_root)) + print_structure("PhenoCam gcc_90 file", find_phenocam(site_root)) + print("\n=== Computing indicators ===") + + +def choose_discovery_site(site_roots: list[tuple[str, Path]]) -> tuple[str, Path]: + def score(item: tuple[str, Path]) -> int: + _, root = item + return sum( + int(path is not None) + for path in ( + find_s2_preselection(root), + find_s3_timeseries(root), + find_metrics(root), + find_phenocam(root), + ) + ) + + return max(site_roots, key=score) + + +def truthy_status(value: Any, *, field_name: str | None = None) -> bool | None: + if isinstance(value, bool): + if field_name and any(word in field_name.lower() for word in ("reject", "exclude")): + return not value + return value + if value is None: + return True + if isinstance(value, (int, float)) and not isinstance(value, bool): + if field_name and any(word in field_name.lower() for word in ("reject", "exclude")): + return not bool(value) + return bool(value) + text = str(value).strip().lower() + if text in {"", "none", "null", "nan", "ok", "pass", "passed", "keep", "kept", "valid", "selected"}: + return True + if text in { + "fail", + "failed", + "false", + "reject", + "rejected", + "exclude", + "excluded", + "invalid", + "cloud", + "cloudy", + "dark", + "bad", + }: + return False + if field_name and any(word in field_name.lower() for word in ("reason", "status")): + return False + return None + + +def acquisition_passes(entry: dict[str, Any], strategy: str) -> bool: + strategy_aliases = { + strategy, + strategy.replace("nonaggressive", "non_aggressive"), + strategy.replace("nonaggressive", "non-aggressive"), + } + negative_prefixes = ("excluded", "exclude", "rejected", "reject") + positive_prefixes = ("passed", "pass", "keep", "kept", "valid", "selected") + + for alias in strategy_aliases: + for prefix in negative_prefixes: + key = f"{prefix}_{alias}" + if key in entry: + return not bool(entry[key]) + for prefix in positive_prefixes: + key = f"{prefix}_{alias}" + if key in entry: + return bool(entry[key]) + + for alias in strategy_aliases: + nested = entry.get(alias) + if isinstance(nested, dict): + for key, value in nested.items(): + passed = truthy_status(value, field_name=key) + if passed is not None: + return passed + elif nested is not None: + passed = truthy_status(nested, field_name=alias) + if passed is not None: + return passed + + # Generic status fields. + for key in (*negative_prefixes, *positive_prefixes, "status", "strategy", "reason", "rejection_reason"): + if key in entry: + passed = truthy_status(entry[key], field_name=key) + if passed is not None: + return passed + + # Dict keyed by date with a scalar rejection reason. + if "value" in entry and len(entry) <= 3: + passed = truthy_status(entry.get("value"), field_name="value") + if passed is not None: + return passed + + # Existing pipeline entries with band means and no rejection marker are usable. + return True + + +def band_value(entry: dict[str, Any], names: Iterable[str]) -> float | None: + lowered = {str(k).lower(): v for k, v in entry.items()} + for name in names: + if name.lower() in lowered: + value = jsonable_float(lowered[name.lower()]) + if value is not None: + return value + for container_key in ("bands", "band_means", "reflectance", "reflectances", "means", "window_means"): + container = entry.get(container_key) + if isinstance(container, dict): + value = band_value(container, names) + if value is not None: + return value + return None + + +def entry_date(entry: dict[str, Any]) -> pd.Timestamp | None: + for key in ("date", "datetime", "time", "timestamp", "acquisition_date"): + if key in entry: + date = parse_date(entry[key]) + if date is not None: + return date + for key in ("filename", "file", "path", "name"): + if key in entry: + date = parse_date(entry[key]) + if date is not None: + return date + return None + + +def s2_gcc_series(s2_data: Any) -> pd.DataFrame: + rows = [] + for entry in normalize_records(s2_data): + if not isinstance(entry, dict) or not acquisition_passes(entry, "aggressive"): + continue + date = entry_date(entry) + blue = band_value(entry, ("b02", "blue", "B02", "band_1", "band1")) + green = band_value(entry, ("b03", "green", "B03", "band_2", "band2")) + red = band_value(entry, ("b04", "red", "B04", "band_3", "band3")) + if date is None or blue is None or green is None or red is None: + continue + denom = blue + green + red + if denom <= 0: + continue + rows.append({"date": date, "s2_gcc": green / denom}) + if not rows: + return pd.DataFrame(columns=["date", "s2_gcc"]) + return pd.DataFrame(rows).groupby("date", as_index=False)["s2_gcc"].mean().sort_values("date") + + +def value_from_record(entry: dict[str, Any], preferred: Iterable[str]) -> float | None: + lowered = {str(k).lower(): v for k, v in entry.items()} + for name in preferred: + value = jsonable_float(lowered.get(name.lower())) + if value is not None: + return value + for key, value in lowered.items(): + if any(token in key for token in ("gcc", "greenness")): + numeric = jsonable_float(value) + if numeric is not None: + return numeric + return None + + +def gcc_timeseries(data: Any, value_name: str) -> pd.DataFrame: + rows = [] + for entry in normalize_records(data): + if not isinstance(entry, dict): + continue + date = entry_date(entry) + value = value_from_record( + entry, + ("greenness_index", "gcc_90", "gcc", "value", "mean", "site_value"), + ) + if date is not None and value is not None: + rows.append({"date": date, value_name: value}) + if not rows: + return pd.DataFrame(columns=["date", value_name]) + return pd.DataFrame(rows).groupby("date", as_index=False)[value_name].mean().sort_values("date") + + +def compute_archive_density(s2_data: Any | None) -> tuple[int | None, int | None]: + if s2_data is None: + return None, None + records = [entry for entry in normalize_records(s2_data) if isinstance(entry, dict)] + if not records: + return None, None + aggressive = sum(1 for entry in records if acquisition_passes(entry, "aggressive")) + nonaggressive = sum(1 for entry in records if acquisition_passes(entry, "nonaggressive")) + return aggressive, nonaggressive + + +def compute_coherence(s2_data: Any | None, s3_data: Any | None) -> tuple[int | None, float | None, float | None]: + if s2_data is None or s3_data is None: + return None, None, None + s2 = s2_gcc_series(s2_data) + s3 = gcc_timeseries(s3_data, "s3_gcc") + if s2.empty or s3.empty: + return 0, None, None + + matched = pd.merge_asof( + s2.sort_values("date"), + s3.sort_values("date"), + on="date", + direction="nearest", + tolerance=pd.Timedelta(days=MATCH_TOLERANCE_DAYS), + ).dropna(subset=["s2_gcc", "s3_gcc"]) + n = int(len(matched)) + if n < 2: + return n, None, None + r, p_value = pearsonr(matched["s2_gcc"].to_numpy(), matched["s3_gcc"].to_numpy()) + return n, jsonable_float(r), jsonable_float(p_value) + + +def phenocam_series(data: Any | None) -> pd.DataFrame: + if data is None: + return pd.DataFrame(columns=["date", "gcc"]) + rows = [] + for entry in normalize_records(data): + if isinstance(entry, dict): + date = entry_date(entry) + value = value_from_record( + entry, + ("gcc_90", "greenness_index", "gcc", "gcc_mean", "value"), + ) + else: + date = None + value = jsonable_float(entry) + if date is not None and value is not None: + rows.append({"date": date, "gcc": value}) + if not rows: + return pd.DataFrame(columns=["date", "gcc"]) + return pd.DataFrame(rows).groupby("date", as_index=False)["gcc"].mean().sort_values("date") + + +def compute_snr_from_phenocam(phenocam_data: Any | None) -> float | None: + series = phenocam_series(phenocam_data) + if len(series) < 5: + return None + x = (series["date"] - series["date"].min()).dt.days.to_numpy(dtype=float) + y = series["gcc"].to_numpy(dtype=float) + if len(np.unique(x)) < 5: + return None + try: + spline = UnivariateSpline(x, y, k=3) + residual = y - spline(x) + except Exception as exc: + print(f"[WARN] Could not fit PhenoCam smoothing spline: {exc}") + return None + rmse = float(np.sqrt(np.mean(residual**2))) + amplitude = float(np.max(y) - np.min(y)) + if rmse <= 0: + return None + return amplitude / rmse + + +def compute_snr(metrics_data: Any | None, phenocam_data: Any | None) -> float | None: + from_metrics = find_numeric_snr(metrics_data) + if from_metrics is not None: + return from_metrics + return compute_snr_from_phenocam(phenocam_data) + + +def compute_site(site_root: Path) -> dict[str, Any]: + s2_data = load_json(find_s2_preselection(site_root) or Path("__missing__")) + s3_data = load_json(find_s3_timeseries(site_root) or Path("__missing__")) + metrics_data = load_json(find_metrics(site_root) or Path("__missing__")) + phenocam_data = load_json(find_phenocam(site_root) or Path("__missing__")) + + snr = compute_snr(metrics_data, phenocam_data) + n_s2_aggressive, n_s2_nonaggressive = compute_archive_density(s2_data) + n_matched, pearson_r, p_value = compute_coherence(s2_data, s3_data) + + return { + "snr": snr, + "snr_pass": None if snr is None else snr >= SNR_THRESHOLD, + "n_s2_aggressive": n_s2_aggressive, + "n_s2_nonaggressive": n_s2_nonaggressive, + "coherence_n_matched": n_matched, + "coherence_pearson_r": pearson_r, + "coherence_p_value": p_value, + } + + +def print_summary(results: dict[str, dict[str, Any]]) -> None: + print("\nSuitability summary") + if not results: + print("(no sites found)") + return + + columns = [ + ("site", "site"), + ("snr", "snr"), + ("snr_pass", "pass"), + ("n_s2_aggressive", "n_s2_agg"), + ("n_s2_nonaggressive", "n_s2_nonagg"), + ("coherence_n_matched", "n_match"), + ("coherence_pearson_r", "pearson_r"), + ("coherence_p_value", "p_value"), + ] + + def fmt(value: Any, key: str) -> str: + if value is None: + return "null" + if key.startswith("n_") or key == "coherence_n_matched": + return str(int(value)) + if isinstance(value, bool): + return "true" if value else "false" + if isinstance(value, (int, float)): + return f"{float(value):.4g}" + return str(value) + + rows = [] + for site, values in results.items(): + rows.append([site, *[fmt(values.get(key), key) for key, _ in columns[1:]]]) + widths = [ + max(len(header), *(len(row[i]) for row in rows)) + for i, (_, header) in enumerate(columns) + ] + header = " ".join(header.ljust(widths[i]) for i, (_, header) in enumerate(columns)) + print(header) + print(" ".join("-" * width for width in widths)) + for row in rows: + print(" ".join(row[i].ljust(widths[i]) for i in range(len(columns)))) + + +def main() -> int: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--base-dir", + required=True, + type=Path, + help="Pipeline output root containing one subdirectory per site.", + ) + args = parser.parse_args() + + base_dir = args.base_dir.expanduser().resolve() + site_roots = find_site_roots(base_dir) + if site_roots: + run_discovery(*choose_discovery_site(site_roots)) + else: + print(f"[WARN] No site directories found under {base_dir}") + + results = {site_name: compute_site(site_root) for site_name, site_root in site_roots} + output_path = base_dir / OUTPUT_NAME + with output_path.open("w", encoding="utf-8") as f: + json.dump(results, f, indent=2, allow_nan=False) + f.write("\n") + print_summary(results) + print(f"\nWrote {output_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())