efast-phenocam-validation/metrics_stats.py
Felix Delattre 5ceeeabd11 foo:
2026-05-03 17:07:25 +02:00

361 lines
12 KiB
Python

"""Metrics and statistics: temporal metrics and PhenoCam stats."""
import json
import numpy as np
from pathlib import Path
from datetime import datetime, timedelta
from scipy import sparse
from scipy.sparse.linalg import spsolve
from scipy.stats import pearsonr
WHITTAKER_LAMBDA_DAYS_SQ = 400.0
def _norm_date_key(s):
if s is None:
return None
t = str(s).strip()
return t.split("T")[0][:10] if "T" in t else t[:10]
def load_timeseries(filepath):
"""Load JSON timeseries and return dict mapping date -> value."""
if not Path(filepath).exists():
return {}
with open(filepath) as f:
data = json.load(f)
return {item["date"]: item.get("greenness_index") for item in data}
def match_dates(fusion_ts, phenocam_ts):
"""Match dates between timeseries, return aligned numpy arrays (filter None values)."""
def _bundle(m):
out = {}
for k, v in m.items():
nk = _norm_date_key(k)
if nk and nk not in out:
out[nk] = v
return out
fa, pa = _bundle(fusion_ts), _bundle(phenocam_ts)
common_dates = set(fa) & set(pa)
fusion_vals = []
phenocam_vals = []
dates = []
for date in sorted(common_dates):
fusion_val = fa[date]
phenocam_val = pa[date]
if fusion_val is not None and phenocam_val is not None:
fusion_vals.append(fusion_val)
phenocam_vals.append(phenocam_val)
dates.append(date)
return np.array(fusion_vals), np.array(phenocam_vals), dates
def pearson_correlation(y_true, y_pred):
"""Calculate Pearson correlation coefficient r."""
if len(y_true) < 2 or np.std(y_true) == 0 or np.std(y_pred) == 0:
return None
r, _ = pearsonr(y_true, y_pred)
return float(r)
def r_squared(y_true, y_pred):
"""Calculate coefficient of determination R²."""
if len(y_true) < 2 or np.std(y_true) == 0:
return None
ss_res = np.sum((y_true - y_pred) ** 2)
ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
if ss_tot == 0:
return None
return float(1 - (ss_res / ss_tot))
def rmse(y_true, y_pred):
"""Calculate Root Mean Square Error."""
if len(y_true) == 0:
return None
return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))
def mae(y_true, y_pred):
"""Calculate Mean Absolute Error."""
if len(y_true) == 0:
return None
return float(np.mean(np.abs(y_true - y_pred)))
def nrmse(y_true, y_pred):
"""Calculate normalized RMSE (RMSE / mean(y_true))."""
if len(y_true) == 0:
return None
mean_val = np.mean(y_true)
if mean_val == 0:
return None
rmse_val = rmse(y_true, y_pred)
return float(rmse_val / mean_val) if rmse_val is not None else None
def nse(y_true, y_pred):
"""Calculate Nash-Sutcliffe Efficiency."""
if len(y_true) < 2:
return None
numerator = np.sum((y_true - y_pred) ** 2)
denominator = np.sum((y_true - np.mean(y_true)) ** 2)
if denominator == 0:
return None
return float(1 - (numerator / denominator))
def calculate_temporal_metrics(fusion_ts, phenocam_ts):
"""Temporal metrics vs PhenoCam (nse_pc; nse is the same value)."""
fusion_vals, phenocam_vals, dates = match_dates(fusion_ts, phenocam_ts)
if len(fusion_vals) < 2:
return None
n_pc = nse(phenocam_vals, fusion_vals)
metrics = {
"pearson_r": pearson_correlation(phenocam_vals, fusion_vals),
"r_squared": r_squared(phenocam_vals, fusion_vals),
"rmse": rmse(phenocam_vals, fusion_vals),
"mae": mae(phenocam_vals, fusion_vals),
"nrmse": nrmse(phenocam_vals, fusion_vals),
"nse_pc": n_pc,
"nse": n_pc,
"n_samples": len(fusion_vals),
"date_range": {"start": dates[0], "end": dates[-1]} if dates else None,
}
return metrics
def calculate_phenocam_stats(phenocam_ts):
"""Calculate phenocam summary statistics."""
values = [v for v in phenocam_ts.values() if v is not None]
if len(values) == 0:
return None
vals = np.array(values)
return {
"mean": float(np.mean(vals)),
"std": float(np.std(vals)),
"min": float(np.min(vals)),
"max": float(np.max(vals)),
"n_samples": len(vals),
}
def _s2_gcc_series_from_preselection(base: Path):
"""Build the raw S2 GCC series from s2_preselection.json.
Uses the 3x3 site-window band means stored per raw S2 acquisition and
computes GCC = b03 / (b02 + b03 + b04). Scale cancels, so DN vs
reflectance is irrelevant. Returns (all_gcc, flags) where all_gcc maps
YYYY-MM-DD -> gcc for every row with a positive band sum, and flags maps
the same date key -> (excluded_aggressive, excluded_nonaggressive).
"""
path = base / "raw" / "preselection" / "s2_preselection.json"
if not path.exists():
return {}, {}
with open(path) as f:
rows = json.load(f)
all_gcc: dict = {}
flags: dict = {}
for e in rows:
nk = _norm_date_key(e.get("date"))
if not nk:
continue
try:
b02 = float(e.get("b02"))
b03 = float(e.get("b03"))
b04 = float(e.get("b04"))
except (TypeError, ValueError):
continue
total = b02 + b03 + b04
if not np.isfinite(total) or total <= 0:
continue
gcc = b03 / total
if not np.isfinite(gcc):
continue
if nk in all_gcc:
continue
all_gcc[nk] = float(gcc)
flags[nk] = (
bool(e.get("excluded_aggressive")),
bool(e.get("excluded_nonaggressive")),
)
return all_gcc, flags
def _whittaker_smooth_dict(obs_dates, obs_values, lam: float, n_min: int = 3):
"""Daily Whittaker (weights 1 at obs); returns {YYYY-MM-DD: z}."""
pairs = [
(_norm_date_key(d), float(v))
for d, v in zip(obs_dates, obs_values)
if v is not None and _norm_date_key(d)
]
if len(pairs) < 2:
return {}
days = sorted({p[0] for p in pairs})
t0 = datetime.strptime(days[0], "%Y-%m-%d").date()
t1 = datetime.strptime(days[-1], "%Y-%m-%d").date()
n = (t1 - t0).days + 1
if n < n_min:
return {}
w = np.zeros(n)
y = np.zeros(n)
for dk, val in pairs:
i = (datetime.strptime(dk, "%Y-%m-%d").date() - t0).days
if 0 <= i < n:
w[i] = 1.0
y[i] = val
D = sparse.diags(
[1.0, -2.0, 1.0], [0, 1, 2], shape=(n - 2, n), format="csc", dtype=np.float64
)
H = D.T @ D
Wm = sparse.diags(w.astype(np.float64), format="csc")
z = spsolve(Wm + lam * H, w * y)
out = {}
for i in range(n):
out[(t0 + timedelta(days=i)).isoformat()] = float(z[i])
return out
def calculate_all_metrics(season, site_name, site_position):
"""Calculate metrics for all 4 scenarios and save to JSON."""
del site_position
results = {"temporal": {}}
base = Path(f"data/{site_name}/{season}")
# Load phenocam timeseries once (same for all scenarios)
phenocam_ts_path = base / "raw" / "phenocam" / "phenocam_gcc.json"
phenocam_ts = load_timeseries(phenocam_ts_path)
if not phenocam_ts:
print("[METRICS] Warning: No phenocam data found")
return results
# Calculate phenocam stats
phenocam_stats = calculate_phenocam_stats(phenocam_ts)
if phenocam_stats:
results["phenocam_stats"] = phenocam_stats
baseline = {}
all_gcc, flags = _s2_gcc_series_from_preselection(base)
if all_gcc:
m0 = calculate_temporal_metrics(all_gcc, phenocam_ts)
if m0:
baseline["s2"] = m0
for strategy, flag_idx in (("aggressive", 0), ("nonaggressive", 1)):
kept_items = sorted(
(
(d, g)
for d, g in all_gcc.items()
if d in flags and not flags[d][flag_idx]
),
key=lambda x: x[0],
)
if not kept_items:
continue
kept_ts = dict(kept_items)
mcf = calculate_temporal_metrics(kept_ts, phenocam_ts)
if mcf:
baseline.setdefault("s2_cloudfree", {})[strategy] = mcf
obs_d, obs_v = zip(*kept_items)
smooth = _whittaker_smooth_dict(obs_d, obs_v, WHITTAKER_LAMBDA_DAYS_SQ)
if smooth:
mw = calculate_temporal_metrics(smooth, phenocam_ts)
if mw:
baseline.setdefault("s2_whittaker_lambda400", {})[strategy] = mw
for strategy in ("aggressive", "nonaggressive"):
p = base / f"processed_{strategy}_sigma20" / "gcc" / "s3" / "timeseries.json"
if not p.exists():
continue
s3_ts = load_timeseries(p)
if s3_ts:
m3 = calculate_temporal_metrics(s3_ts, phenocam_ts)
if m3:
baseline.setdefault("s3", {})[strategy] = m3
if baseline:
results["baseline"] = baseline
# Calculate fusion metrics for each scenario
for strategy in ["aggressive", "nonaggressive"]:
for sigma in [20, 30]:
scenario_name = f"{strategy}_sigma{sigma}"
print(f"[METRICS] Calculating metrics for {scenario_name}...")
processed_dir = f"processed_{strategy}_sigma{sigma}"
# Load fusion timeseries
fusion_ts_path = base / processed_dir / "gcc" / "fusion" / "timeseries.json"
fusion_ts = load_timeseries(fusion_ts_path)
if not fusion_ts:
print(
f"[METRICS] Warning: Missing fusion data for {scenario_name}, skipping"
)
continue
temporal_metrics = calculate_temporal_metrics(fusion_ts, phenocam_ts)
if temporal_metrics:
results["temporal"][scenario_name] = temporal_metrics
for strategy in ["aggressive", "nonaggressive"]:
for sigma in [20, 30]:
scenario_name = f"{strategy}_sigma{sigma}_itb"
processed_dir = f"processed_{strategy}_itb_sigma{sigma}"
fusion_ts_path = base / processed_dir / "gcc" / "fusion" / "timeseries.json"
fusion_ts = load_timeseries(fusion_ts_path)
if not fusion_ts:
print(
f"[METRICS] Warning: Missing ItB fusion data for {scenario_name}, skipping"
)
continue
temporal_metrics = calculate_temporal_metrics(fusion_ts, phenocam_ts)
if temporal_metrics:
results["temporal"][scenario_name] = temporal_metrics
# Save results
output_path = Path(f"data/{site_name}/{season}/metrics.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
print(f"[METRICS] Saved results to {output_path}")
return results
def main():
"""Standalone script entry point."""
import sys
if len(sys.argv) < 4:
print("Usage: metrics_stats.py <season> <site_name> <lat> <lon>")
print("Example: metrics_stats.py 2024 innsbruck 47.116171 11.320308")
sys.exit(1)
season = int(sys.argv[1])
site_name = sys.argv[2]
site_position = (float(sys.argv[3]), float(sys.argv[4]))
results = calculate_all_metrics(season, site_name, site_position)
# Save results
output_path = Path(f"data/{site_name}/{season}/metrics.json")
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
print(f"[METRICS] Saved results to {output_path}")
if __name__ == "__main__":
main()