efast-phenocam-validation/4-fusion.py
Felix Delattre d29754e4a5 Ran linter.
2026-06-11 16:33:14 +02:00

344 lines
12 KiB
Python

"""Step 4: Compute GCC and run EFAST BtI + ItB fusion for prepared sites.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — ``*_REFL.tif`` + ``*_DIST_CLOUD.tif``
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — ``composite_*.tif`` (4-band)
Outputs (``data/``):
- ``sentinel_data/{year}/{sitename}/prepared/s2/*_GCC.tif`` — S2 GCC (in-place)
- ``sentinel_data/{year}/{sitename}/prepared/gcc_s3/*.tif`` — S3 GCC composites
- ``fusion/{year}/{sitename}/bti/fusion/REFL_*.tif`` — BtI fused 4-band reflectance
- ``fusion/{year}/{sitename}/bti/gcc/GCC_*.tif`` — GCC derived from BtI fusion
- ``fusion/{year}/{sitename}/itb/s2/GCC_*.tif`` — per-acquisition S2 GCC (simplified names)
- ``fusion/{year}/{sitename}/itb/s3/GCC_*.tif`` — per-composite S3 GCC (simplified names)
- ``fusion/{year}/{sitename}/itb/fusion/GCC_*.tif`` — ItB fused GCC
Requires ``uv sync`` (efast).
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` (optional; default: all prepared sites under ``sentinel_data/{year}/``)
Prior step: :mod:`3-sentinel-data`.
"""
from __future__ import annotations
import argparse
import shutil
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
import numpy as np
import rasterio
from dateutil import rrule
# ---------------------------------------------------------------------------
# Public constants
# ---------------------------------------------------------------------------
RESOLUTION_RATIO = 30
MOSAIC_STEP = 2
MAX_DAYS = 100
MINIMUM_ACQUISITION_IMPORTANCE = 0
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
# ---------------------------------------------------------------------------
# efast import helper
# ---------------------------------------------------------------------------
def _import_efast():
try:
import efast.efast as efast_module
return efast_module
except ImportError as exc:
raise ImportError("efast not found. Install with: uv sync") from exc
# ---------------------------------------------------------------------------
# GCC computation (from s2_cloud_native.py and s3_openeo.py)
# ---------------------------------------------------------------------------
def compute_gcc_s2(s2_dir: Path, output_dir: Path) -> None:
"""Compute GCC from S2 REFL files and write ``*_GCC.tif`` to ``output_dir``.
Reads every ``*_REFL.tif`` (band order B02/B03/B04) and writes a co-located
single-band GCC file. Cloud-masked pixels (zero in all bands) remain zero.
"""
output_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(s2_dir.glob("*_REFL.tif")):
out_path = output_dir / src_path.name.replace("_REFL.tif", "_GCC.tif")
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
gcc = g / (total + 1e-10)
gcc[total == 0] = 0
profile.update(count=1)
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
def compute_gcc_s3(s3_dir: Path, output_dir: Path) -> None:
"""Compute GCC from S3 composite files and write single-band GeoTIFFs.
Reads every ``composite_*.tif`` (band order Oa04/Oa06/Oa08/Oa17) and writes
a single-band GCC file. NaN pixels in the input remain NaN.
"""
output_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(s3_dir.glob("composite_*.tif")):
out_path = output_dir / src_path.name
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
gcc = g / (total + 1e-10)
gcc[np.isnan(total)] = np.nan
profile.update(count=1, dtype="float32")
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
def compute_gcc_from_refl(refl_dir: Path, gcc_dir: Path) -> None:
"""Derive GCC from ``REFL_YYYYMMDD.tif`` files (BtI fusion output).
Reads every ``REFL_*.tif`` and writes a co-located single-band
``GCC_YYYYMMDD.tif``. Zero pixels remain zero.
"""
gcc_dir.mkdir(parents=True, exist_ok=True)
for src_path in sorted(refl_dir.glob("REFL_*.tif")):
out_path = gcc_dir / src_path.name.replace("REFL_", "GCC_")
if out_path.is_file():
continue
with rasterio.open(src_path) as src:
b, g, r = src.read(1), src.read(2), src.read(3)
profile = src.profile
total = b + g + r
gcc = g / (total + 1e-10)
gcc[total == 0] = 0
profile.update(count=1)
with rasterio.open(out_path, "w", **profile) as dst:
dst.write(gcc[np.newaxis].astype("float32"))
# ---------------------------------------------------------------------------
# Date-range detection
# ---------------------------------------------------------------------------
def _refl_date_range(s2_dir: Path) -> tuple[datetime, datetime] | None:
"""Return (start, end) datetime from REFL filenames in ``s2_dir``.
Filenames are expected to follow the S2 product naming convention, where
the acquisition date ``YYYYMMDD`` appears at position index 2 when the
stem is split by ``_``, e.g.
``S2A_MSIL2A_20230911T114111_N0509_R025_T29PKT_20230911T153131_REFL.tif``.
"""
dates: list[datetime] = []
for p in s2_dir.glob("*_REFL.tif"):
parts = p.stem.split("_")
if len(parts) >= 3:
try:
dates.append(datetime.strptime(parts[2][:8], "%Y%m%d"))
except ValueError:
pass
if not dates:
return None
return min(dates), max(dates)
# ---------------------------------------------------------------------------
# Per-site fusion
# ---------------------------------------------------------------------------
def fuse_site(sitename: str, year: int) -> dict[str, Any]:
"""Run GCC computation and EFAST BtI + ItB fusion for one prepared site."""
efast = _import_efast()
s2_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s2"
s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3"
gcc_s3_dir = (
DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "gcc_s3"
)
base = DATA_DIR / "fusion" / str(year) / sitename
if not s2_dir.is_dir() or not any(s2_dir.glob("*_REFL.tif")):
raise FileNotFoundError(f"No REFL files in {s2_dir}")
if not s3_dir.is_dir() or not any(s3_dir.glob("composite_*.tif")):
raise FileNotFoundError(f"No composite files in {s3_dir}")
print(f"[{sitename}] Computing S2 GCC (in-place)...")
compute_gcc_s2(s2_dir, s2_dir)
print(f"[{sitename}] Computing S3 GCC...")
compute_gcc_s3(s3_dir, gcc_s3_dir)
date_range = _refl_date_range(s2_dir)
if date_range is None:
raise ValueError(f"Could not detect date range from REFL filenames in {s2_dir}")
start, end = date_range
print(f"[{sitename}] Date range: {start.date()}{end.date()}")
fusion_dates = list(
rrule.rrule(
rrule.DAILY,
dtstart=start + timedelta(MOSAIC_STEP),
until=end - timedelta(MOSAIC_STEP),
interval=MOSAIC_STEP,
)
)
_fusion_kwargs = dict(
ratio=RESOLUTION_RATIO,
max_days=MAX_DAYS,
minimum_acquisition_importance=MINIMUM_ACQUISITION_IMPORTANCE,
)
# --- ItB: GCC first, then fuse GCC ---
itb_s2 = base / "itb" / "s2"
itb_s3 = base / "itb" / "s3"
itb_fusion = base / "itb" / "fusion"
itb_s2.mkdir(parents=True, exist_ok=True)
itb_s3.mkdir(parents=True, exist_ok=True)
itb_fusion.mkdir(parents=True, exist_ok=True)
for p in sorted(s2_dir.glob("*_GCC.tif")):
dst = itb_s2 / f"GCC_{p.stem.split('_')[2][:8]}.tif"
if not dst.exists():
shutil.copy2(p, dst)
for p in sorted(gcc_s3_dir.glob("composite_*.tif")):
dst = itb_s3 / f"GCC_{p.stem.split('_')[1]}.tif"
if not dst.exists():
shutil.copy2(p, dst)
print(f"[{sitename}] ItB: fusing GCC over {len(fusion_dates)} dates...")
for date in fusion_dates:
efast.fusion(
date, gcc_s3_dir, s2_dir, itb_fusion, product="GCC", **_fusion_kwargs
)
# --- BtI: fuse reflectance (3-band, matching S2 B02/B03/B04), then derive GCC ---
# S3 composites have 4 bands; strip band 4 (Oa17/NIR) so shapes match S2 REFL.
s3_rgb_dir = (
DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3_rgb"
)
s3_rgb_dir.mkdir(parents=True, exist_ok=True)
for p in sorted(s3_dir.glob("composite_*.tif")):
out = s3_rgb_dir / p.name
if not out.exists():
with rasterio.open(p) as src:
data = src.read([1, 2, 3])
profile = src.profile.copy()
profile.update(count=3)
with rasterio.open(out, "w", **profile) as dst:
dst.write(data)
bti_fusion = base / "bti" / "fusion"
bti_gcc = base / "bti" / "gcc"
bti_fusion.mkdir(parents=True, exist_ok=True)
print(f"[{sitename}] BtI: fusing REFL over {len(fusion_dates)} dates...")
for date in fusion_dates:
efast.fusion(
date, s3_rgb_dir, s2_dir, bti_fusion, product="REFL", **_fusion_kwargs
)
print(f"[{sitename}] BtI: deriving GCC from fused REFL...")
compute_gcc_from_refl(bti_fusion, bti_gcc)
return {
"sitename": sitename,
"evaluation_year": year,
"start": start.date().isoformat(),
"end": end.date().isoformat(),
"fusion_dates": len(fusion_dates),
"itb_fusion_files": len(list(itb_fusion.glob("*.tif"))),
"bti_fusion_files": len(list(bti_fusion.glob("*.tif"))),
"bti_gcc_files": len(list(bti_gcc.glob("*.tif"))),
}
# ---------------------------------------------------------------------------
# Site discovery
# ---------------------------------------------------------------------------
def _discover_sites(year: int) -> list[str]:
"""Return sitenames that have prepared S2 REFL files under sentinel_data."""
base = DATA_DIR / "sentinel_data" / str(year)
if not base.is_dir():
return []
return sorted(
d.name
for d in base.iterdir()
if d.is_dir() and any((d / "prepared" / "s2").glob("*_REFL.tif"))
)
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--site",
type=str,
default=None,
help="Single sitename to fuse (default: all prepared sites)",
)
parser.add_argument(
"--skip-blended",
action="store_true",
help="Skip sites whose fusion directory already exists under data/fusion/{year}/",
)
args = parser.parse_args(argv)
year = args.evaluation_year
if args.site:
sites = [args.site]
else:
sites = _discover_sites(year)
if not sites:
print(f"[Fusion] No prepared sites found under data/sentinel_data/{year}/")
return 1
print(f"[Fusion] Processing {len(sites)} site(s)")
for i, sitename in enumerate(sites, 1):
fusion_dir = DATA_DIR / "fusion" / str(year) / sitename
if args.skip_blended and fusion_dir.exists():
print(
f"[Fusion] ({i}/{len(sites)}) {sitename} — skipping (fusion directory exists)"
)
continue
print(f"[Fusion] ({i}/{len(sites)}) {sitename}")
summary = fuse_site(sitename, year)
print(
f"[Fusion] {sitename} done — "
f"{summary['fusion_dates']} dates, "
f"itb={summary['itb_fusion_files']} bti={summary['bti_fusion_files']} "
f"bti_gcc={summary['bti_gcc_files']}"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())