efast-phenocam-validation/gap_validation/spatial_metrics.py
Felix Delattre e3af4bf2f4 foo
2026-05-29 08:41:44 +02:00

234 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Per-pixel GCC vs withheld S2; NSE (nse_s2); no-gap baseline; deltas."""
from __future__ import annotations
from pathlib import Path
import numpy as np
import rasterio
from rasterio.warp import reproject, Resampling
from scipy.stats import pearsonr
# Match postprocessing valid mask on reflectance (METH / postprocessing.py).
VALID_REFL_THRESHOLD = 0.001
GCC_DENOM_EPS = 1e-3
MAX_REPORTED_NSE_S2 = 20.0
def _gcc_from_rgb(blue: np.ndarray, green: np.ndarray, red: np.ndarray) -> np.ndarray:
t = red.astype(np.float64) + green.astype(np.float64) + blue.astype(np.float64)
out = np.full_like(blue, np.nan, dtype=np.float64)
m = (
np.isfinite(t)
& (t >= GCC_DENOM_EPS)
& np.isfinite(blue)
& np.isfinite(green)
& np.isfinite(red)
& (blue > GCC_DENOM_EPS)
& (green > GCC_DENOM_EPS)
& (red > GCC_DENOM_EPS)
)
out[m] = green[m].astype(np.float64) / t[m]
return out.astype(np.float32)
def _positive_bgr_mask(fusion_path: Path) -> np.ndarray | None:
"""Pixels with strictly positive blue, green, red (BtI REFL); None if not applicable."""
with rasterio.open(fusion_path) as src:
if src.count < 3:
return None
stacks = src.read(indexes=[1, 2, 3]).astype(np.float32)
return np.isfinite(stacks).all(axis=0) & (stacks > GCC_DENOM_EPS).all(axis=0)
def read_fused_gcc(fusion_path: Path) -> tuple[np.ndarray, dict]:
"""Fused GCC: BtI from 4-band REFL or ItB single-band GCC."""
with rasterio.open(fusion_path) as src:
if src.count >= 4:
b = src.read(1).astype(np.float32)
g = src.read(2).astype(np.float32)
r = src.read(3).astype(np.float32)
gcc = _gcc_from_rgb(b, g, r)
else:
gcc = src.read(1).astype(np.float32)
prof = src.profile.copy()
return gcc, prof
def warp_refl_bands_to_grid(
refl_path: Path,
height: int,
width: int,
transform,
crs,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Resample S2 REFL blue/green/red to fusion grid (bilinear)."""
with rasterio.open(refl_path) as src:
b = np.empty((height, width), dtype=np.float32)
g = np.empty((height, width), dtype=np.float32)
r = np.empty((height, width), dtype=np.float32)
for i, dst in enumerate((b, g, r), start=1):
reproject(
source=rasterio.band(src, i),
destination=dst,
src_transform=src.transform,
src_crs=src.crs,
dst_transform=transform,
dst_crs=crs,
resampling=Resampling.bilinear,
)
return b, g, r
def valid_mask_fused(fusion_path: Path, mode: str) -> np.ndarray:
"""Valid pixels: BtI uses REFL-style mask; ItB uses single-band GCC (postprocessing ItB)."""
with rasterio.open(fusion_path) as src:
if mode == "itb" or src.count < 4:
d = src.read(1).astype(np.float32)
return np.isfinite(d) & (d > VALID_REFL_THRESHOLD)
stacks = src.read().astype(np.float32)
with np.errstate(all="ignore"):
mx = np.nanmax(stacks, axis=0)
ok = np.isfinite(stacks).all(axis=0) & np.isfinite(mx) & (
mx > VALID_REFL_THRESHOLD
)
return ok
def spatial_scores(
y_true_gcc: np.ndarray,
y_pred_gcc: np.ndarray,
mask: np.ndarray,
) -> dict:
"""RMSE, MAE, mean bias, Pearson r, nse_s2 (NashSutcliffe vs spatial truth)."""
yt = y_true_gcc[mask].astype(np.float64).ravel()
yp = y_pred_gcc[mask].astype(np.float64).ravel()
n = int(yt.size)
if n < 2:
return {"n_pixels": n}
mean_t = float(np.mean(yt))
rmse = float(np.sqrt(np.mean((yt - yp) ** 2)))
mae = float(np.mean(np.abs(yt - yp)))
bias = float(np.mean(yp - yt))
den = float(np.sum((yt - mean_t) ** 2))
nse_s2 = None
if den > 0:
raw = float(1.0 - np.sum((yt - yp) ** 2) / den)
if abs(raw) <= MAX_REPORTED_NSE_S2:
nse_s2 = raw
r = None
if np.std(yt) > 0 and np.std(yp) > 0:
r = float(pearsonr(yt, yp)[0])
return {
"n_pixels": n,
"rmse": rmse,
"mae": mae,
"mean_bias": bias,
"pearson_r": r,
"nse_s2": nse_s2,
}
def withheld_gcc_on_fusion_grid(
withheld_refl_path: Path, fused_path: Path
) -> tuple[np.ndarray, np.ndarray, dict]:
"""``y_true`` GCC (withheld S2) and ``y_pred`` GCC from ``fused_path``, same grid."""
yp, prof = read_fused_gcc(fused_path)
h, w = yp.shape
b, g, r = warp_refl_bands_to_grid(
withheld_refl_path, h, w, prof["transform"], prof["crs"]
)
yt = _gcc_from_rgb(b, g, r)
return yt, yp, prof
def mask_gap_whittaker(
yt: np.ndarray,
y_gap: np.ndarray,
fused_gap_path: Path,
mode: str,
) -> np.ndarray:
"""Mask for gap fusion and Whittaker vs withheld S2 (does not require no-gap fusion)."""
m = (
valid_mask_fused(fused_gap_path, mode)
& np.isfinite(yt)
& np.isfinite(y_gap)
& (yt > VALID_REFL_THRESHOLD)
& (yt <= 1.0)
& (y_gap > VALID_REFL_THRESHOLD)
& (y_gap <= 1.0)
)
pos = _positive_bgr_mask(fused_gap_path)
if pos is not None:
m &= pos
return m
def common_valid_mask(
yt: np.ndarray,
y_gap: np.ndarray,
y_nogap: np.ndarray | None,
fused_gap_path: Path,
mode: str,
) -> np.ndarray:
"""Mask including no-gap fusion when computing gap-vs-no-gap deltas (internal QA)."""
m = mask_gap_whittaker(yt, y_gap, fused_gap_path, mode)
if y_nogap is not None:
m &= (
np.isfinite(y_nogap)
& (y_nogap > VALID_REFL_THRESHOLD)
& (y_nogap <= 1.0)
)
return m
def evaluate_gap_vs_withheld(
withheld_refl_path: Path,
fused_gap_path: Path,
fused_nogap_path: Path | None,
mode: str,
*,
whittaker_context: tuple[Path, str, str, str, str, str] | None = None,
) -> dict:
"""Spatial metrics for gap and no-gap; optional Whittaker constant-field vs withheld S2.
``delta_rmse`` / ``delta_nse`` compare gap vs no-gap fusion on a shared mask (QA only;
``delta_nse`` = NSE_no_gap NSE_gap, not exported to thesis tables).
"""
yt, y_gap, _prof = withheld_gcc_on_fusion_grid(withheld_refl_path, fused_gap_path)
y_nogap = None
if fused_nogap_path is not None and fused_nogap_path.is_file():
y_nogap, _ = read_fused_gcc(fused_nogap_path)
mask_gw = mask_gap_whittaker(yt, y_gap, fused_gap_path, mode)
out: dict = {"gap": spatial_scores(yt, y_gap, mask_gw)}
if y_nogap is not None:
mask_full = common_valid_mask(yt, y_gap, y_nogap, fused_gap_path, mode)
out["no_gap"] = spatial_scores(yt, y_nogap, mask_full)
g, ng = out["gap"], out["no_gap"]
if g.get("rmse") is not None and ng.get("rmse") is not None:
out["delta_rmse"] = float(g["rmse"] - ng["rmse"])
if g.get("nse_s2") is not None and ng.get("nse_s2") is not None:
out["delta_nse"] = float(ng["nse_s2"] - g["nse_s2"])
if whittaker_context is not None:
from gap_validation.whittaker_compare import whittaker_gcc_on_gap_masked_series
base, strategy, prediction_iso, withheld_iso, w0, w1 = whittaker_context
wgcc = whittaker_gcc_on_gap_masked_series(
base,
strategy,
prediction_iso,
withheld_iso,
window_start_iso=w0,
window_end_iso=w1,
)
if wgcc is not None:
out["whittaker"] = constant_field_scores(yt, float(wgcc), mask_gw)
return out
def constant_field_scores(
y_true_gcc: np.ndarray, scalar: float, mask: np.ndarray
) -> dict:
"""NSE / RMSE when prediction is a spatially constant Whittaker value (same mask as fusion)."""
yp = np.full_like(y_true_gcc, scalar, dtype=np.float32)
return spatial_scores(y_true_gcc, yp, mask)