330 lines
12 KiB
Python
330 lines
12 KiB
Python
"""Step 4: Compute GCC and run EFAST BtI + ItB fusion for prepared sites.
|
|
|
|
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
|
|
|
|
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — ``*_REFL.tif`` + ``*_DIST_CLOUD.tif``
|
|
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — ``composite_*.tif`` (4-band)
|
|
|
|
Outputs (``data/``):
|
|
|
|
- ``sentinel_data/{year}/{sitename}/prepared/s2/*_GCC.tif`` — S2 GCC (in-place)
|
|
- ``sentinel_data/{year}/{sitename}/prepared/gcc_s3/*.tif`` — S3 GCC composites
|
|
- ``fusion/{year}/{sitename}/bti/fusion/REFL_*.tif`` — BtI fused 4-band reflectance
|
|
- ``fusion/{year}/{sitename}/bti/gcc/GCC_*.tif`` — GCC derived from BtI fusion
|
|
- ``fusion/{year}/{sitename}/itb/s2/GCC_*.tif`` — per-acquisition S2 GCC (simplified names)
|
|
- ``fusion/{year}/{sitename}/itb/s3/GCC_*.tif`` — per-composite S3 GCC (simplified names)
|
|
- ``fusion/{year}/{sitename}/itb/fusion/GCC_*.tif`` — ItB fused GCC
|
|
|
|
Requires ``uv sync`` (efast).
|
|
|
|
CLI:
|
|
|
|
- ``--evaluation-year`` (default 2025)
|
|
- ``--site`` (optional; default: all prepared sites under ``sentinel_data/{year}/``)
|
|
|
|
Prior step: :mod:`3-sentinel-data`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import shutil
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
import rasterio
|
|
from dateutil import rrule
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public constants
|
|
# ---------------------------------------------------------------------------
|
|
|
|
RESOLUTION_RATIO = 30
|
|
MOSAIC_STEP = 2
|
|
MAX_DAYS = 100
|
|
MINIMUM_ACQUISITION_IMPORTANCE = 0
|
|
|
|
DATA_DIR = Path("data")
|
|
DEFAULT_YEAR = 2025
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# efast import helper
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _import_efast():
|
|
try:
|
|
import efast.efast as efast_module
|
|
|
|
return efast_module
|
|
except ImportError as exc:
|
|
raise ImportError(
|
|
"efast not found. Install with: uv sync"
|
|
) from exc
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GCC computation (from s2_cloud_native.py and s3_openeo.py)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def compute_gcc_s2(s2_dir: Path, output_dir: Path) -> None:
|
|
"""Compute GCC from S2 REFL files and write ``*_GCC.tif`` to ``output_dir``.
|
|
|
|
Reads every ``*_REFL.tif`` (band order B02/B03/B04) and writes a co-located
|
|
single-band GCC file. Cloud-masked pixels (zero in all bands) remain zero.
|
|
"""
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
for src_path in sorted(s2_dir.glob("*_REFL.tif")):
|
|
out_path = output_dir / src_path.name.replace("_REFL.tif", "_GCC.tif")
|
|
if out_path.is_file():
|
|
continue
|
|
with rasterio.open(src_path) as src:
|
|
b, g, r = src.read(1), src.read(2), src.read(3)
|
|
profile = src.profile
|
|
total = b + g + r
|
|
gcc = g / (total + 1e-10)
|
|
gcc[total == 0] = 0
|
|
profile.update(count=1)
|
|
with rasterio.open(out_path, "w", **profile) as dst:
|
|
dst.write(gcc[np.newaxis].astype("float32"))
|
|
|
|
|
|
def compute_gcc_s3(s3_dir: Path, output_dir: Path) -> None:
|
|
"""Compute GCC from S3 composite files and write single-band GeoTIFFs.
|
|
|
|
Reads every ``composite_*.tif`` (band order Oa04/Oa06/Oa08/Oa17) and writes
|
|
a single-band GCC file. NaN pixels in the input remain NaN.
|
|
"""
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
for src_path in sorted(s3_dir.glob("composite_*.tif")):
|
|
out_path = output_dir / src_path.name
|
|
if out_path.is_file():
|
|
continue
|
|
with rasterio.open(src_path) as src:
|
|
b, g, r = src.read(1), src.read(2), src.read(3)
|
|
profile = src.profile
|
|
total = b + g + r
|
|
gcc = g / (total + 1e-10)
|
|
gcc[np.isnan(total)] = np.nan
|
|
profile.update(count=1, dtype="float32")
|
|
with rasterio.open(out_path, "w", **profile) as dst:
|
|
dst.write(gcc[np.newaxis].astype("float32"))
|
|
|
|
|
|
def compute_gcc_from_refl(refl_dir: Path, gcc_dir: Path) -> None:
|
|
"""Derive GCC from ``REFL_YYYYMMDD.tif`` files (BtI fusion output).
|
|
|
|
Reads every ``REFL_*.tif`` and writes a co-located single-band
|
|
``GCC_YYYYMMDD.tif``. Zero pixels remain zero.
|
|
"""
|
|
gcc_dir.mkdir(parents=True, exist_ok=True)
|
|
for src_path in sorted(refl_dir.glob("REFL_*.tif")):
|
|
out_path = gcc_dir / src_path.name.replace("REFL_", "GCC_")
|
|
if out_path.is_file():
|
|
continue
|
|
with rasterio.open(src_path) as src:
|
|
b, g, r = src.read(1), src.read(2), src.read(3)
|
|
profile = src.profile
|
|
total = b + g + r
|
|
gcc = g / (total + 1e-10)
|
|
gcc[total == 0] = 0
|
|
profile.update(count=1)
|
|
with rasterio.open(out_path, "w", **profile) as dst:
|
|
dst.write(gcc[np.newaxis].astype("float32"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Date-range detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _refl_date_range(s2_dir: Path) -> tuple[datetime, datetime] | None:
|
|
"""Return (start, end) datetime from REFL filenames in ``s2_dir``.
|
|
|
|
Filenames are expected to follow the S2 product naming convention, where
|
|
the acquisition date ``YYYYMMDD`` appears at position index 2 when the
|
|
stem is split by ``_``, e.g.
|
|
``S2A_MSIL2A_20230911T114111_N0509_R025_T29PKT_20230911T153131_REFL.tif``.
|
|
"""
|
|
dates: list[datetime] = []
|
|
for p in s2_dir.glob("*_REFL.tif"):
|
|
parts = p.stem.split("_")
|
|
if len(parts) >= 3:
|
|
try:
|
|
dates.append(datetime.strptime(parts[2][:8], "%Y%m%d"))
|
|
except ValueError:
|
|
pass
|
|
if not dates:
|
|
return None
|
|
return min(dates), max(dates)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-site fusion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def fuse_site(sitename: str, year: int) -> dict[str, Any]:
|
|
"""Run GCC computation and EFAST BtI + ItB fusion for one prepared site."""
|
|
efast = _import_efast()
|
|
|
|
s2_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s2"
|
|
s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3"
|
|
gcc_s3_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "gcc_s3"
|
|
base = DATA_DIR / "fusion" / str(year) / sitename
|
|
|
|
if not s2_dir.is_dir() or not any(s2_dir.glob("*_REFL.tif")):
|
|
raise FileNotFoundError(f"No REFL files in {s2_dir}")
|
|
if not s3_dir.is_dir() or not any(s3_dir.glob("composite_*.tif")):
|
|
raise FileNotFoundError(f"No composite files in {s3_dir}")
|
|
|
|
print(f"[{sitename}] Computing S2 GCC (in-place)...")
|
|
compute_gcc_s2(s2_dir, s2_dir)
|
|
|
|
print(f"[{sitename}] Computing S3 GCC...")
|
|
compute_gcc_s3(s3_dir, gcc_s3_dir)
|
|
|
|
date_range = _refl_date_range(s2_dir)
|
|
if date_range is None:
|
|
raise ValueError(f"Could not detect date range from REFL filenames in {s2_dir}")
|
|
start, end = date_range
|
|
print(f"[{sitename}] Date range: {start.date()} → {end.date()}")
|
|
|
|
fusion_dates = list(
|
|
rrule.rrule(
|
|
rrule.DAILY,
|
|
dtstart=start + timedelta(MOSAIC_STEP),
|
|
until=end - timedelta(MOSAIC_STEP),
|
|
interval=MOSAIC_STEP,
|
|
)
|
|
)
|
|
|
|
_fusion_kwargs = dict(
|
|
ratio=RESOLUTION_RATIO,
|
|
max_days=MAX_DAYS,
|
|
minimum_acquisition_importance=MINIMUM_ACQUISITION_IMPORTANCE,
|
|
)
|
|
|
|
# --- ItB: GCC first, then fuse GCC ---
|
|
itb_s2 = base / "itb" / "s2"
|
|
itb_s3 = base / "itb" / "s3"
|
|
itb_fusion = base / "itb" / "fusion"
|
|
itb_s2.mkdir(parents=True, exist_ok=True)
|
|
itb_s3.mkdir(parents=True, exist_ok=True)
|
|
itb_fusion.mkdir(parents=True, exist_ok=True)
|
|
|
|
for p in sorted(s2_dir.glob("*_GCC.tif")):
|
|
dst = itb_s2 / f"GCC_{p.stem.split('_')[2][:8]}.tif"
|
|
if not dst.exists():
|
|
shutil.copy2(p, dst)
|
|
for p in sorted(gcc_s3_dir.glob("composite_*.tif")):
|
|
dst = itb_s3 / f"GCC_{p.stem.split('_')[1]}.tif"
|
|
if not dst.exists():
|
|
shutil.copy2(p, dst)
|
|
|
|
print(f"[{sitename}] ItB: fusing GCC over {len(fusion_dates)} dates...")
|
|
for date in fusion_dates:
|
|
efast.fusion(date, gcc_s3_dir, s2_dir, itb_fusion, product="GCC", **_fusion_kwargs)
|
|
|
|
# --- BtI: fuse reflectance (3-band, matching S2 B02/B03/B04), then derive GCC ---
|
|
# S3 composites have 4 bands; strip band 4 (Oa17/NIR) so shapes match S2 REFL.
|
|
s3_rgb_dir = DATA_DIR / "sentinel_data" / str(year) / sitename / "prepared" / "s3_rgb"
|
|
s3_rgb_dir.mkdir(parents=True, exist_ok=True)
|
|
for p in sorted(s3_dir.glob("composite_*.tif")):
|
|
out = s3_rgb_dir / p.name
|
|
if not out.exists():
|
|
with rasterio.open(p) as src:
|
|
data = src.read([1, 2, 3])
|
|
profile = src.profile.copy()
|
|
profile.update(count=3)
|
|
with rasterio.open(out, "w", **profile) as dst:
|
|
dst.write(data)
|
|
|
|
bti_fusion = base / "bti" / "fusion"
|
|
bti_gcc = base / "bti" / "gcc"
|
|
bti_fusion.mkdir(parents=True, exist_ok=True)
|
|
|
|
print(f"[{sitename}] BtI: fusing REFL over {len(fusion_dates)} dates...")
|
|
for date in fusion_dates:
|
|
efast.fusion(date, s3_rgb_dir, s2_dir, bti_fusion, product="REFL", **_fusion_kwargs)
|
|
|
|
print(f"[{sitename}] BtI: deriving GCC from fused REFL...")
|
|
compute_gcc_from_refl(bti_fusion, bti_gcc)
|
|
|
|
return {
|
|
"sitename": sitename,
|
|
"evaluation_year": year,
|
|
"start": start.date().isoformat(),
|
|
"end": end.date().isoformat(),
|
|
"fusion_dates": len(fusion_dates),
|
|
"itb_fusion_files": len(list(itb_fusion.glob("*.tif"))),
|
|
"bti_fusion_files": len(list(bti_fusion.glob("*.tif"))),
|
|
"bti_gcc_files": len(list(bti_gcc.glob("*.tif"))),
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Site discovery
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _discover_sites(year: int) -> list[str]:
|
|
"""Return sitenames that have prepared S2 REFL files under sentinel_data."""
|
|
base = DATA_DIR / "sentinel_data" / str(year)
|
|
if not base.is_dir():
|
|
return []
|
|
return sorted(
|
|
d.name
|
|
for d in base.iterdir()
|
|
if d.is_dir() and any((d / "prepared" / "s2").glob("*_REFL.tif"))
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
|
|
parser.add_argument(
|
|
"--site",
|
|
type=str,
|
|
default=None,
|
|
help="Single sitename to fuse (default: all prepared sites)",
|
|
)
|
|
args = parser.parse_args(argv)
|
|
year = args.evaluation_year
|
|
|
|
if args.site:
|
|
sites = [args.site]
|
|
else:
|
|
sites = _discover_sites(year)
|
|
if not sites:
|
|
print(f"[Fusion] No prepared sites found under data/sentinel_data/{year}/")
|
|
return 1
|
|
|
|
print(f"[Fusion] Processing {len(sites)} site(s)")
|
|
for i, sitename in enumerate(sites, 1):
|
|
print(f"[Fusion] ({i}/{len(sites)}) {sitename}")
|
|
try:
|
|
summary = fuse_site(sitename, year)
|
|
print(
|
|
f"[Fusion] {sitename} done — "
|
|
f"{summary['fusion_dates']} dates, "
|
|
f"itb={summary['itb_fusion_files']} bti={summary['bti_fusion_files']} "
|
|
f"bti_gcc={summary['bti_gcc_files']}"
|
|
)
|
|
except Exception as exc:
|
|
print(f"[Fusion] {sitename} FAILED: {exc}")
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|