"""Metrics and statistics: temporal metrics and PhenoCam stats.""" import json import numpy as np from pathlib import Path from datetime import datetime, timedelta from scipy import sparse from scipy.sparse.linalg import spsolve from scipy.stats import pearsonr WHITTAKER_LAMBDA_DAYS_SQ = 400.0 def _norm_date_key(s): if s is None: return None t = str(s).strip() return t.split("T")[0][:10] if "T" in t else t[:10] def load_timeseries(filepath): """Load JSON timeseries and return dict mapping date -> value.""" if not Path(filepath).exists(): return {} with open(filepath) as f: data = json.load(f) return {item["date"]: item.get("greenness_index") for item in data} def match_dates(fusion_ts, phenocam_ts): """Match dates between timeseries, return aligned numpy arrays (filter None values).""" def _bundle(m): out = {} for k, v in m.items(): nk = _norm_date_key(k) if nk and nk not in out: out[nk] = v return out fa, pa = _bundle(fusion_ts), _bundle(phenocam_ts) common_dates = set(fa) & set(pa) fusion_vals = [] phenocam_vals = [] dates = [] for date in sorted(common_dates): fusion_val = fa[date] phenocam_val = pa[date] if fusion_val is not None and phenocam_val is not None: fusion_vals.append(fusion_val) phenocam_vals.append(phenocam_val) dates.append(date) return np.array(fusion_vals), np.array(phenocam_vals), dates def pearson_correlation(y_true, y_pred): """Calculate Pearson correlation coefficient r.""" if len(y_true) < 2 or np.std(y_true) == 0 or np.std(y_pred) == 0: return None r, _ = pearsonr(y_true, y_pred) return float(r) def r_squared(y_true, y_pred): """Calculate coefficient of determination R².""" if len(y_true) < 2 or np.std(y_true) == 0: return None ss_res = np.sum((y_true - y_pred) ** 2) ss_tot = np.sum((y_true - np.mean(y_true)) ** 2) if ss_tot == 0: return None return float(1 - (ss_res / ss_tot)) def rmse(y_true, y_pred): """Calculate Root Mean Square Error.""" if len(y_true) == 0: return None return float(np.sqrt(np.mean((y_true - y_pred) ** 2))) def mae(y_true, y_pred): """Calculate Mean Absolute Error.""" if len(y_true) == 0: return None return float(np.mean(np.abs(y_true - y_pred))) def nrmse(y_true, y_pred): """Calculate normalized RMSE (RMSE / mean(y_true)).""" if len(y_true) == 0: return None mean_val = np.mean(y_true) if mean_val == 0: return None rmse_val = rmse(y_true, y_pred) return float(rmse_val / mean_val) if rmse_val is not None else None def nse(y_true, y_pred): """Calculate Nash-Sutcliffe Efficiency.""" if len(y_true) < 2: return None numerator = np.sum((y_true - y_pred) ** 2) denominator = np.sum((y_true - np.mean(y_true)) ** 2) if denominator == 0: return None return float(1 - (numerator / denominator)) def calculate_temporal_metrics(fusion_ts, phenocam_ts): """Temporal metrics vs PhenoCam (nse_pc; nse is the same value).""" fusion_vals, phenocam_vals, dates = match_dates(fusion_ts, phenocam_ts) if len(fusion_vals) < 2: return None n_pc = nse(phenocam_vals, fusion_vals) metrics = { "pearson_r": pearson_correlation(phenocam_vals, fusion_vals), "r_squared": r_squared(phenocam_vals, fusion_vals), "rmse": rmse(phenocam_vals, fusion_vals), "mae": mae(phenocam_vals, fusion_vals), "nrmse": nrmse(phenocam_vals, fusion_vals), "nse_pc": n_pc, "nse": n_pc, "n_samples": len(fusion_vals), "date_range": {"start": dates[0], "end": dates[-1]} if dates else None, } return metrics def calculate_phenocam_stats(phenocam_ts): """Calculate phenocam summary statistics.""" values = [v for v in phenocam_ts.values() if v is not None] if len(values) == 0: return None vals = np.array(values) return { "mean": float(np.mean(vals)), "std": float(np.std(vals)), "min": float(np.min(vals)), "max": float(np.max(vals)), "n_samples": len(vals), } def _s2_kept_date_set(base: Path, strategy: str) -> set: path = base / "raw" / "preselection" / "s2_preselection.json" if not path.exists(): return set() with open(path) as f: rows = json.load(f) key = f"excluded_{strategy}" out = set() for e in rows: if e.get(key): continue nk = _norm_date_key(e.get("date")) if nk: out.add(nk) return out def _whittaker_smooth_dict(obs_dates, obs_values, lam: float, n_min: int = 3): """Daily Whittaker (weights 1 at obs); returns {YYYY-MM-DD: z}.""" pairs = [ (_norm_date_key(d), float(v)) for d, v in zip(obs_dates, obs_values) if v is not None and _norm_date_key(d) ] if len(pairs) < 2: return {} days = sorted({p[0] for p in pairs}) t0 = datetime.strptime(days[0], "%Y-%m-%d").date() t1 = datetime.strptime(days[-1], "%Y-%m-%d").date() n = (t1 - t0).days + 1 if n < n_min: return {} w = np.zeros(n) y = np.zeros(n) for dk, val in pairs: i = (datetime.strptime(dk, "%Y-%m-%d").date() - t0).days if 0 <= i < n: w[i] = 1.0 y[i] = val D = sparse.diags( [1.0, -2.0, 1.0], [0, 1, 2], shape=(n - 2, n), format="csc", dtype=np.float64 ) H = D.T @ D Wm = sparse.diags(w.astype(np.float64), format="csc") z = spsolve(Wm + lam * H, w * y) out = {} for i in range(n): out[(t0 + timedelta(days=i)).isoformat()] = float(z[i]) return out def calculate_all_metrics(season, site_name, site_position): """Calculate metrics for all 4 scenarios and save to JSON.""" del site_position results = {"temporal": {}} base = Path(f"data/{site_name}/{season}") # Load phenocam timeseries once (same for all scenarios) phenocam_ts_path = base / "raw" / "phenocam" / "phenocam_gcc.json" phenocam_ts = load_timeseries(phenocam_ts_path) if not phenocam_ts: print("[METRICS] Warning: No phenocam data found") return results # Calculate phenocam stats phenocam_stats = calculate_phenocam_stats(phenocam_ts) if phenocam_stats: results["phenocam_stats"] = phenocam_stats baseline = {} s2_ts = {} for sub in ("processed_aggressive_sigma20", "processed_nonaggressive_sigma20"): p = base / sub / "gcc" / "s2" / "timeseries.json" if p.exists(): s2_ts = load_timeseries(p) if s2_ts: break if s2_ts: m0 = calculate_temporal_metrics(s2_ts, phenocam_ts) if m0: baseline["s2"] = m0 for strategy in ("aggressive", "nonaggressive"): kept = _s2_kept_date_set(base, strategy) filtered = [ (k, v) for k, v in sorted( s2_ts.items(), key=lambda x: _norm_date_key(x[0]) or "" ) if _norm_date_key(k) in kept and v is not None ] if not filtered: continue sub_ts = dict(filtered) mcf = calculate_temporal_metrics(sub_ts, phenocam_ts) if mcf: baseline.setdefault("s2_cloudfree", {})[strategy] = mcf obs_d, obs_v = zip(*filtered) smooth = _whittaker_smooth_dict(obs_d, obs_v, WHITTAKER_LAMBDA_DAYS_SQ) if smooth: mw = calculate_temporal_metrics(smooth, phenocam_ts) if mw: baseline.setdefault("s2_whittaker_lambda400", {})[strategy] = mw for strategy in ("aggressive", "nonaggressive"): p = base / f"processed_{strategy}_sigma20" / "gcc" / "s3" / "timeseries.json" if not p.exists(): continue s3_ts = load_timeseries(p) if s3_ts: m3 = calculate_temporal_metrics(s3_ts, phenocam_ts) if m3: baseline.setdefault("s3", {})[strategy] = m3 if baseline: results["baseline"] = baseline # Calculate fusion metrics for each scenario for strategy in ["aggressive", "nonaggressive"]: for sigma in [20, 30]: scenario_name = f"{strategy}_sigma{sigma}" print(f"[METRICS] Calculating metrics for {scenario_name}...") processed_dir = f"processed_{strategy}_sigma{sigma}" # Load fusion timeseries fusion_ts_path = base / processed_dir / "gcc" / "fusion" / "timeseries.json" fusion_ts = load_timeseries(fusion_ts_path) if not fusion_ts: print( f"[METRICS] Warning: Missing fusion data for {scenario_name}, skipping" ) continue temporal_metrics = calculate_temporal_metrics(fusion_ts, phenocam_ts) if temporal_metrics: results["temporal"][scenario_name] = temporal_metrics for strategy in ["aggressive", "nonaggressive"]: for sigma in [20, 30]: scenario_name = f"{strategy}_sigma{sigma}_itb" processed_dir = f"processed_{strategy}_itb_sigma{sigma}" fusion_ts_path = base / processed_dir / "gcc" / "fusion" / "timeseries.json" fusion_ts = load_timeseries(fusion_ts_path) if not fusion_ts: print( f"[METRICS] Warning: Missing ItB fusion data for {scenario_name}, skipping" ) continue temporal_metrics = calculate_temporal_metrics(fusion_ts, phenocam_ts) if temporal_metrics: results["temporal"][scenario_name] = temporal_metrics # Save results output_path = Path(f"data/{site_name}/{season}/metrics.json") output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w") as f: json.dump(results, f, indent=2) print(f"[METRICS] Saved results to {output_path}") return results def main(): """Standalone script entry point.""" import sys if len(sys.argv) < 4: print("Usage: metrics_stats.py ") print("Example: metrics_stats.py 2024 innsbruck 47.116171 11.320308") sys.exit(1) season = int(sys.argv[1]) site_name = sys.argv[2] site_position = (float(sys.argv[3]), float(sys.argv[4])) results = calculate_all_metrics(season, site_name, site_position) # Save results output_path = Path(f"data/{site_name}/{season}/metrics.json") output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w") as f: json.dump(results, f, indent=2) print(f"[METRICS] Saved results to {output_path}") if __name__ == "__main__": main()