efast-phenocam-validation/3-sentinel-data.py
Felix Delattre 79ee099809 foo
2026-06-11 00:38:27 +02:00

883 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Step 3: Download S2 and S3 rasters and prepare EFAST inputs.
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
- ``phenocam_screening/{year}.json`` — step-2 PASS sites (coordinates included)
Outputs (``data/``):
- ``sentinel_data/{year}/{sitename}/raw/s3/*.tif`` — S3 SYN L2 per-date GeoTIFFs
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — S2 REFL + DIST_CLOUD GeoTIFFs
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — S3 composite GeoTIFFs
- ``sentinel_data/{year}/{sitename}/data.json`` — run summary
Requires ``CDSE_USER`` / ``CDSE_PASSWORD`` in ``../.env`` (workspace root; ``uv sync`` installs efast).
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` (optional; default: all step-2 PASS sites)
Prior step: :mod:`2-phenocam-screening`.
Next step: :mod:`4-fusion`.
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Any
import netCDF4
import numpy as np
import openeo
import rasterio
import requests
from dotenv import load_dotenv
from pystac_client import Client
from rasterio import shutil as rio_shutil
from rasterio.enums import Resampling
from rasterio.errors import WindowError
from rasterio.transform import from_bounds
from rasterio.vrt import WarpedVRT
from rasterio.warp import transform_geom
from rasterio.windows import Window
from rasterio.windows import from_bounds as window_from_bounds
from rasterio.windows import transform as window_transform
from shapely import wkt as shapely_wkt
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Public constants — edit here to change pipeline behaviour
# ---------------------------------------------------------------------------
S2_BANDS = ["B02", "B03", "B04"]
S3_BANDS = [
"Syn_Oa04_reflectance",
"Syn_Oa06_reflectance",
"Syn_Oa08_reflectance",
"Syn_Oa17_reflectance",
]
S3_BAND_NAMES = ["SDR_Oa04", "SDR_Oa06", "SDR_Oa08", "SDR_Oa17"]
RESOLUTION_RATIO = 30
S3_MOSAIC_DAYS = 100
S3_COMPOSITE_STEP = 2
S3_COMPOSITE_SIGMA_DOY = 10
S3_COMPOSITE_D = 20
S3_SMOOTHING_STD = 1
S3_REFLECTANCE_SCALE = 10_000 # OpenEO SYN L2 SDR → 01 (EFAST expects < 5)
# ---------------------------------------------------------------------------
# Internal S2 constants
# ---------------------------------------------------------------------------
EARTH_SEARCH_URL = "https://earth-search.aws.element84.com/v1"
_BAND_ASSETS: dict[str, str] = {
"B02": "blue",
"B03": "green",
"B04": "red",
"B05": "rededge1",
"B06": "rededge2",
"B07": "rededge3",
"B08": "nir",
"B8A": "nir08",
"B11": "swir16",
"B12": "swir22",
}
_SCL_ASSET = "scl"
_MIN_BBOX_HALF_DEG = 0.008
_GDAL_COG_ENV = {
# HTTP/1.1 avoids HTTP/2 multiplexing connection-reset cascades on S3.
"GDAL_HTTP_VERSION": "1.1",
"GDAL_HTTP_MERGE_CONSECUTIVE_RANGES": "YES",
"GDAL_HTTP_TCP_KEEPALIVE": "YES",
"GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR",
"CPL_VSIL_CURL_CACHE_SIZE": "200000000",
# Built-in GDAL retries for 429/502/503/504 and transient resets.
"GDAL_HTTP_MAX_RETRY": "3",
"GDAL_HTTP_RETRY_DELAY": "0.5",
"AWS_NO_SIGN_REQUEST": "YES",
}
# ---------------------------------------------------------------------------
# Internal S3 constants
# ---------------------------------------------------------------------------
CDSE_TOKEN_URL = (
"https://identity.dataspace.copernicus.eu/auth/realms/CDSE/"
"protocol/openid-connect/token"
)
OPENEO_URL = "openeo.dataspace.copernicus.eu"
S3_COLLECTION = "SENTINEL3_SYN_L2_SYN"
DATA_DIR = Path("data")
DEFAULT_YEAR = 2025
WORKSPACE_ROOT = Path(__file__).resolve().parent.parent
# ---------------------------------------------------------------------------
# Credentials
# ---------------------------------------------------------------------------
def _cdse_credentials() -> dict[str, str | None]:
load_dotenv(WORKSPACE_ROOT / ".env")
return {
"username": os.getenv("CDSE_USER"),
"password": os.getenv("CDSE_PASSWORD"),
}
# ---------------------------------------------------------------------------
# Screening manifest helpers
# ---------------------------------------------------------------------------
def _load_screening_pass_sites(year: int) -> list[dict[str, Any]]:
"""Return list of PASS-site dicts from step-2 screening JSON.
Each entry has ``sitename``, ``lat``, ``lon`` keys.
"""
path = DATA_DIR / "phenocam_screening" / f"{year}.json"
if not path.is_file():
raise FileNotFoundError(f"Step-2 screening manifest not found: {path}")
payload = json.loads(path.read_text(encoding="utf-8"))
sites = []
for row in payload.get("sites", []):
calc = row.get("calculations", {})
if calc.get("status") != "PASS":
continue
camera = row.get("response", {}).get("camera", {})
name = camera.get("Sitename")
lat = camera.get("Lat")
lon = camera.get("Lon")
if name and lat is not None and lon is not None:
sites.append({"sitename": str(name), "lat": float(lat), "lon": float(lon)})
return sites
# ---------------------------------------------------------------------------
# S2: geometry helpers (from s2_cloud_native.py)
# ---------------------------------------------------------------------------
def wkt_to_bbox(geometry_wkt: str) -> list[float]:
"""Convert a WKT geometry to a ``[west, south, east, north]`` bbox."""
geom = shapely_wkt.loads(geometry_wkt)
minx, miny, maxx, maxy = geom.bounds
if minx == maxx and miny == maxy:
minx -= _MIN_BBOX_HALF_DEG
maxx += _MIN_BBOX_HALF_DEG
miny -= _MIN_BBOX_HALF_DEG
maxy += _MIN_BBOX_HALF_DEG
return [minx, miny, maxx, maxy]
def _boa_offset(item: Any) -> int:
"""Return the BOA additive offset for a STAC item.
Processing baseline >= 04.00 applies a -1000 offset; earlier baselines use 0.
"""
if item.properties.get("earthsearch:boa_offset_applied"):
return 0
baseline_str = str(
item.properties.get("processing:baseline")
or item.properties.get("s2:processing_baseline")
or "0"
)
try:
baseline = float(baseline_str)
except ValueError:
baseline = 0.0
return -1000 if baseline >= 4.0 else 0
def _window_for_bbox(
src: rasterio.io.DatasetReader,
bbox_4326: list[float],
) -> Window | None:
"""Return the rasterio Window for a EPSG:4326 bbox clipped to src bounds."""
bbox_geom = {
"type": "Polygon",
"coordinates": [
[
[bbox_4326[0], bbox_4326[1]],
[bbox_4326[2], bbox_4326[1]],
[bbox_4326[2], bbox_4326[3]],
[bbox_4326[0], bbox_4326[3]],
[bbox_4326[0], bbox_4326[1]],
]
],
}
src_geom = transform_geom("EPSG:4326", src.crs.to_wkt(), bbox_geom)
xs = [c[0] for c in src_geom["coordinates"][0][:4]]
ys = [c[1] for c in src_geom["coordinates"][0][:4]]
win = window_from_bounds(min(xs), min(ys), max(xs), max(ys), src.transform)
try:
return win.intersection(Window(0, 0, src.width, src.height))
except WindowError:
return None
def _read_window(
href: str,
bbox_4326: list[float],
out_shape: tuple[int, int] | None = None,
resampling: Resampling = Resampling.bilinear,
) -> tuple[np.ndarray, dict[str, Any]] | None:
"""Range-read a single-band array for the bbox window from a COG URL."""
with rasterio.open(href) as src:
win = _window_for_bbox(src, bbox_4326)
if win is None:
return None
data = src.read(1, window=win, out_shape=out_shape, resampling=resampling)
profile: dict[str, Any] = {
"crs": src.crs,
"transform": window_transform(win, src.transform),
"height": data.shape[0],
"width": data.shape[1],
"dtype": src.dtypes[0],
}
return data, profile
def _read_bands(
item: Any,
bbox: list[float],
bands: list[str],
) -> tuple[list[np.ndarray], dict[str, Any]] | None:
"""Range-read all requested bands for one STAC item."""
band_arrays: list[np.ndarray] = []
ref_profile: dict[str, Any] | None = None
for band_name in bands:
asset_key = _BAND_ASSETS.get(band_name)
if asset_key is None or asset_key not in item.assets:
return None
ref_shape = (
(ref_profile["height"], ref_profile["width"]) if ref_profile else None
)
result = _read_window(item.assets[asset_key].href, bbox, out_shape=ref_shape)
if result is None:
return None
data, profile = result
if ref_profile is None:
ref_profile = profile
band_arrays.append(data.astype("float32"))
return (band_arrays, ref_profile) if ref_profile is not None else None
def _cloud_mask(item: Any, bbox: list[float], shape: tuple[int, int]) -> np.ndarray:
"""Return a boolean cloud/shadow mask from the item's SCL band.
Masks SCL classes 0 (no data), 3 (cloud shadow), and >7 (clouds, cirrus, snow).
"""
scl = item.assets.get(_SCL_ASSET)
result = (
_read_window(scl.href, bbox, out_shape=shape, resampling=Resampling.nearest)
if scl
else None
)
if result is None:
return np.zeros(shape, dtype=bool)
scl_data, _ = result
return (scl_data == 0) | (scl_data == 3) | (scl_data > 7)
def _pad_to_multiple(arr: np.ndarray, ratio: int) -> np.ndarray:
"""Zero-pad (bands, H, W) so H and W are multiples of ``ratio``."""
pad_h = (ratio - arr.shape[1] % ratio) % ratio
pad_w = (ratio - arr.shape[2] % ratio) % ratio
if pad_h or pad_w:
arr = np.pad(arr, ((0, 0), (0, pad_h), (0, pad_w)), constant_values=0)
return arr
# ---------------------------------------------------------------------------
# S2: STAC search + download (from s2_cloud_native.py)
# ---------------------------------------------------------------------------
def stac_search_s2(
bbox: list[float],
start_date: datetime,
end_date: datetime,
) -> list[Any]:
"""Search Earth Search for S2 L2A items intersecting a bbox."""
client = Client.open(EARTH_SEARCH_URL)
search = client.search(
collections=["sentinel-2-l2a"],
bbox=bbox,
datetime=(
f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/"
f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}"
),
max_items=10_000,
)
return list({item.id: item for item in search.items()}.values())
def _process_item(
item: Any,
bbox: list[float],
bands: list[str],
output_dir: Path,
ratio: int,
) -> str | None:
"""Range-read one S2 item and write a masked REFL GeoTIFF.
Returns a skip-message string when the item cannot be processed, else None.
"""
out_path = output_dir / f"{item.id}_REFL.tif"
if out_path.is_file():
return None
bands_result = _read_bands(item, bbox, bands)
if bands_result is None:
return f"[S2] Skipping {item.id}: missing asset or no bbox overlap"
band_arrays, ref_profile = bands_result
mask = _cloud_mask(item, bbox, (ref_profile["height"], ref_profile["width"]))
stacked = (np.stack(band_arrays) + _boa_offset(item)) / 10_000.0
np.clip(stacked, 0, None, out=stacked)
stacked[:, mask] = 0.0
stacked = _pad_to_multiple(stacked, ratio)
out_profile = {
"driver": "GTiff",
"count": len(bands),
"dtype": "float32",
"nodata": 0,
"crs": ref_profile["crs"],
"transform": ref_profile["transform"],
"height": stacked.shape[1],
"width": stacked.shape[2],
"compress": "lzw",
}
with rasterio.open(out_path, "w", **out_profile) as dst:
dst.write(stacked)
for i, band_name in enumerate(bands, 1):
dst.set_band_description(i, band_name)
return None
def download_s2_window(
items: list[Any],
bbox: list[float],
output_dir: Path,
bands: list[str],
ratio: int = RESOLUTION_RATIO,
max_workers: int = 12,
) -> None:
"""Range-read S2 L2A COG windows and write masked REFL GeoTIFFs.
Writes ``{item.id}_REFL.tif`` directly — no intermediate raw download.
Cloud/shadow pixels (SCL 0, 3, >7) are zeroed. BOA offset is inferred from
``processing:baseline``. Output is zero-padded to multiples of ``ratio``.
Items are fetched in parallel using ``max_workers`` threads.
"""
output_dir.mkdir(parents=True, exist_ok=True)
with rasterio.Env(**_GDAL_COG_ENV):
with ThreadPoolExecutor(max_workers=max_workers) as pool:
futures = {
pool.submit(_process_item, item, bbox, bands, output_dir, ratio): item.id
for item in items
}
with tqdm(total=len(futures), unit="granule", desc="S2 COG window read") as pbar:
for fut in as_completed(futures):
msg = fut.result()
if msg:
tqdm.write(msg)
pbar.update(1)
# ---------------------------------------------------------------------------
# S3: download (from s3_openeo.py)
# ---------------------------------------------------------------------------
def _utm_epsg(bbox: list[float]) -> int:
"""Return the UTM EPSG code for the centre of a ``[W, S, E, N]`` bbox."""
lon = (bbox[0] + bbox[2]) / 2
lat = (bbox[1] + bbox[3]) / 2
zone = int((lon + 180) / 6) + 1
return 32600 + zone if lat >= 0 else 32700 + zone
def _cdse_token(username: str, password: str) -> str:
"""Obtain a CDSE bearer token via password grant."""
resp = requests.post(
CDSE_TOKEN_URL,
data={
"grant_type": "password",
"username": username,
"password": password,
"client_id": "cdse-public",
},
timeout=30,
)
resp.raise_for_status()
return resp.json()["access_token"]
def _netcdf_to_geotiffs(nc_path: Path, output_dir: Path, epsg: int) -> int:
"""Split an OpenEO NetCDF into per-date GeoTIFFs.
Output filenames match the ``S3*__YYYYMMDDTHHMMSS.tif`` pattern that
``s3_processing.produce_median_composite`` expects.
Handles half-pixel cell-centre coordinates, ascending y-axis (flip_y),
and fills NetCDF masked values with NaN.
"""
written = 0
with netCDF4.Dataset(str(nc_path), "r") as nc:
times = netCDF4.num2date(nc.variables["t"][:], nc.variables["t"].units)
x_coords = np.asarray(nc.variables["x"][:], dtype=float)
y_coords = np.asarray(nc.variables["y"][:], dtype=float)
half_x = abs(x_coords[1] - x_coords[0]) / 2 if len(x_coords) > 1 else 0.0
half_y = abs(y_coords[1] - y_coords[0]) / 2 if len(y_coords) > 1 else 0.0
transform = from_bounds(
x_coords.min() - half_x,
y_coords.min() - half_y,
x_coords.max() + half_x,
y_coords.max() + half_y,
len(x_coords),
len(y_coords),
)
flip_y = len(y_coords) > 1 and y_coords[0] < y_coords[-1]
date_counts: dict[str, int] = {}
for t_idx, time_val in enumerate(times):
date_str = time_val.strftime("%Y%m%d")
n = date_counts.get(date_str, 0)
date_counts[date_str] = n + 1
raw = np.stack(
[nc.variables[b][t_idx, :, :] for b in S3_BANDS], axis=0
)
stacked = (
np.ma.filled(raw, fill_value=np.nan).astype("float32")
/ S3_REFLECTANCE_SCALE
)
if flip_y:
stacked = stacked[:, ::-1, :]
filename = f"S3_{date_str}_{n}__{date_str}T120000.tif"
with rasterio.open(
output_dir / filename,
"w",
driver="GTiff",
height=len(y_coords),
width=len(x_coords),
count=len(S3_BANDS),
dtype="float32",
nodata=float("nan"),
crs=f"EPSG:{epsg}",
transform=transform,
compress="lzw",
) as dst:
dst.write(stacked)
for i, band_name in enumerate(S3_BAND_NAMES, 1):
dst.set_band_description(i, band_name)
written += 1
return written
_S3_DOWNLOAD_RETRIES = 4
_S3_DOWNLOAD_BACKOFF = 30 # seconds; doubled on each retry
def _download_with_retry(datacube: Any, nc_path: Path) -> None:
"""Download an OpenEO datacube to *nc_path*, retrying on transient errors.
Retries up to ``_S3_DOWNLOAD_RETRIES`` times with exponential backoff
starting at ``_S3_DOWNLOAD_BACKOFF`` seconds. Re-authenticates on each
attempt so an expired token never blocks a retry.
"""
delay = _S3_DOWNLOAD_BACKOFF
last_exc: Exception | None = None
for attempt in range(1, _S3_DOWNLOAD_RETRIES + 1):
try:
if nc_path.exists():
nc_path.unlink()
datacube.download(str(nc_path), format="NetCDF")
return
except Exception as exc:
last_exc = exc
if attempt < _S3_DOWNLOAD_RETRIES:
print(
f"[S3-OEO] Download attempt {attempt} failed ({exc}); "
f"retrying in {delay}s..."
)
time.sleep(delay)
delay *= 2
else:
print(f"[S3-OEO] All {_S3_DOWNLOAD_RETRIES} download attempts failed")
raise RuntimeError(f"S3 download failed after {_S3_DOWNLOAD_RETRIES} attempts") from last_exc
def download_s3_openeo(
start_date: datetime,
end_date: datetime,
aoi_geometry: str,
output_dir: Path,
credentials: dict[str, str | None],
) -> None:
"""Download S3 SYN L2 SDR for an AOI via CDSE OpenEO, server-side clipped.
Writes per-date ``S3_{YYYYMMDD}_{n}__{YYYYMMDD}T120000.tif`` files to
``output_dir``, ready for ``s3_processing.produce_median_composite``.
Skips if any ``S3*.tif`` files already exist.
"""
output_dir.mkdir(parents=True, exist_ok=True)
if any(output_dir.glob("S3*.tif")):
print("[S3-OEO] Skipping — output_dir already contains S3 GeoTIFFs")
return
bbox = wkt_to_bbox(aoi_geometry)
epsg = _utm_epsg(bbox)
spatial_extent = {
"west": bbox[0],
"east": bbox[2],
"south": bbox[1],
"north": bbox[3],
}
print("[S3-OEO] Authenticating with CDSE...")
token = _cdse_token(credentials["username"], credentials["password"]) # type: ignore[arg-type]
conn = openeo.connect(OPENEO_URL)
conn.authenticate_oidc_access_token(token)
start_str = start_date.strftime("%Y-%m-%d")
end_str = end_date.strftime("%Y-%m-%d")
print(f"[S3-OEO] Loading {S3_COLLECTION} ({start_str}{end_str})...")
datacube = conn.load_collection(
S3_COLLECTION,
spatial_extent=spatial_extent,
temporal_extent=[start_str, end_str],
bands=S3_BANDS,
).resample_spatial(projection=epsg)
nc_path = output_dir / "_s3_syn_l2.nc"
print(f"[S3-OEO] Downloading NetCDF to {nc_path}...")
t0 = time.time()
_download_with_retry(datacube, nc_path)
print(f"[S3-OEO] Download completed in {time.time() - t0:.1f}s")
print("[S3-OEO] Splitting into per-date GeoTIFFs...")
written = _netcdf_to_geotiffs(nc_path, output_dir, epsg)
nc_path.unlink(missing_ok=True)
print(f"[S3-OEO] {written} GeoTIFFs written to {output_dir}")
# ---------------------------------------------------------------------------
# S2: distance_to_clouds helper
# ---------------------------------------------------------------------------
def _import_distance_to_clouds():
try:
from efast.s2_processing import distance_to_clouds
return distance_to_clouds
except ImportError as exc:
raise ImportError(
"efast not found. Install with: uv sync"
) from exc
def _rescale_dist_cloud(s2_dir: Path) -> None:
"""Ensure DIST_CLOUD values are in pixel units (not normalised to [0,1])."""
for dc_path in s2_dir.glob("*DIST_CLOUD.tif"):
with rasterio.open(dc_path) as src:
d = src.read(1)
if float(np.nanmax(d)) <= 1:
with rasterio.open(dc_path, "r+") as dst:
dst.write(np.where(d > 0, 2.0, d).astype(np.float32), 1)
# ---------------------------------------------------------------------------
# S3: compositing + reprojection helpers (from 4-sentinel-data.py)
# ---------------------------------------------------------------------------
def _import_s3_processing():
try:
from efast import s3_processing
return s3_processing
except ImportError as exc:
raise ImportError(
"efast not found. Install with: uv sync"
) from exc
def _reproject_s3_composites_to_s2_grid(
composite_dir: Path,
s2_refl_path: Path,
s3_out_dir: Path,
*,
resolution_ratio: int = RESOLUTION_RATIO,
) -> None:
"""Reproject S3 composites to the S2 spatial grid at LR resolution."""
s3_out_dir.mkdir(parents=True, exist_ok=True)
with rasterio.open(s2_refl_path) as s2_ref:
target_bounds = s2_ref.bounds
target_crs = s2_ref.crs
width = s2_ref.width // resolution_ratio
height = s2_ref.height // resolution_ratio
s3_transform = rasterio.transform.from_bounds(
target_bounds.left,
target_bounds.bottom,
target_bounds.right,
target_bounds.top,
width,
height,
)
for sen3_path in sorted(composite_dir.glob("composite_*.tif")):
date_part = sen3_path.stem.split("_", 1)[1].replace("-", "")
outfile = s3_out_dir / f"composite_{date_part}.tif"
vrt_options = {
"transform": s3_transform,
"height": height,
"width": width,
"crs": target_crs,
"resampling": Resampling.cubic,
}
with rasterio.open(sen3_path) as s3_src:
with WarpedVRT(s3_src, **vrt_options) as vrt:
profile = vrt.profile.copy()
profile.update({"dtype": "float32", "nodata": 0, "driver": "GTiff"})
rio_shutil.copy(vrt, outfile, **profile)
def _s3_reflectance_scale(raw_s3_dir: Path) -> float:
"""Return multiplier that maps raw SYN L2 SDR values to 01 reflectance."""
for path in raw_s3_dir.glob("S3*.tif"):
with rasterio.open(path) as src:
data = src.read()
if not np.any(np.isfinite(data)):
continue
mx = float(np.nanmax(data))
if np.isfinite(mx) and mx > 5:
return 1.0 / S3_REFLECTANCE_SCALE
return 1.0
def _stage_s3_for_efast(raw_s3_dir: Path, staging_dir: Path) -> int:
"""Copy ``S3_*.tif`` inputs, scaling reflectance when still in DN form."""
scale = _s3_reflectance_scale(raw_s3_dir)
if staging_dir.exists():
shutil.rmtree(staging_dir)
staging_dir.mkdir(parents=True)
count = 0
for src_path in sorted(raw_s3_dir.glob("S3*.tif")):
dst_path = staging_dir / src_path.name
with rasterio.open(src_path) as src:
data = src.read().astype("float32") * scale
profile = src.profile.copy()
profile.update(dtype="float32")
descriptions = src.descriptions
with rasterio.open(dst_path, "w", **profile) as dst:
dst.write(data)
for i, desc in enumerate(descriptions, 1):
if desc:
dst.set_band_description(i, desc)
count += 1
if scale != 1.0:
print(f"[S3-PREP] Scaled raw SDR by {scale:g} for EFAST compositing")
return count
def _prepare_s3(
raw_s3_dir: Path,
s2_refl_path: Path,
s3_out_dir: Path,
*,
work_dir: Path | None = None,
) -> None:
"""Run EFAST S3 compositing pipeline and reproject to S2 grid."""
s3 = _import_s3_processing()
base = work_dir or (s3_out_dir / "_efast_work")
staging = base / "scaled"
composites = base / "composites"
blurred = base / "blurred"
calibrated = base / "calibrated"
for directory in (staging, composites, blurred, calibrated):
if directory.exists():
shutil.rmtree(directory)
directory.mkdir(parents=True, exist_ok=True)
staged = _stage_s3_for_efast(raw_s3_dir, staging)
if staged == 0:
raise ValueError(f"No S3*.tif files found in {raw_s3_dir}")
print(
f"[S3-PREP] produce_median_composite: mosaic_days={S3_MOSAIC_DAYS}, "
f"step={S3_COMPOSITE_STEP}, sigma_doy={S3_COMPOSITE_SIGMA_DOY}, "
f"D={S3_COMPOSITE_D}"
)
s3.produce_median_composite(
staging,
composites,
step=S3_COMPOSITE_STEP,
mosaic_days=S3_MOSAIC_DAYS,
s3_bands=[1, 2, 3, 4],
D=S3_COMPOSITE_D,
sigma_doy=S3_COMPOSITE_SIGMA_DOY,
)
s3.smoothing(
composites,
blurred,
product="composite",
std=S3_SMOOTHING_STD,
preserve_nan=False,
)
s3.reformat_s3(blurred, calibrated, product="composite", scaling_factor=1)
for old in s3_out_dir.glob("composite_*.tif"):
old.unlink()
_reproject_s3_composites_to_s2_grid(calibrated, s2_refl_path, s3_out_dir)
if work_dir is None and base.exists():
shutil.rmtree(base)
n_out = len(list(s3_out_dir.glob("composite_*.tif")))
print(f"[S3-PREP] Wrote {n_out} composites")
# ---------------------------------------------------------------------------
# Per-site pipeline
# ---------------------------------------------------------------------------
def process_site(
sitename: str,
lat: float,
lon: float,
year: int,
) -> dict[str, Any]:
"""Download S2 + S3 and run EFAST preparation for one site."""
site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename
s2_out = site_dir / "prepared" / "s2"
s3_raw = site_dir / "raw" / "s3"
s3_out = site_dir / "prepared" / "s3"
aoi_wkt = f"POINT ({lon} {lat})"
bbox = wkt_to_bbox(aoi_wkt)
creds = _cdse_credentials()
# S3 download
print(f"[{sitename}] Downloading S3...")
download_s3_openeo(
start_date=datetime(year, 1, 1),
end_date=datetime(year, 12, 31),
aoi_geometry=aoi_wkt,
output_dir=s3_raw,
credentials=creds,
)
# S2 download
print(f"[{sitename}] Searching S2 on Earth Search...")
items = stac_search_s2(bbox, datetime(year, 1, 1), datetime(year, 12, 31))
print(f"[{sitename}] {len(items)} S2 items found — downloading windows...")
download_s2_window(items, bbox, s2_out, S2_BANDS, RESOLUTION_RATIO)
# S2 distance-to-clouds
print(f"[{sitename}] Computing distance-to-clouds...")
distance_to_clouds = _import_distance_to_clouds()
distance_to_clouds(s2_out, ratio=RESOLUTION_RATIO)
_rescale_dist_cloud(s2_out)
# S3 compositing
s2_refl_path = next(iter(s2_out.glob("*_REFL.tif")), None)
if s2_refl_path is None:
raise ValueError(f"No REFL files in {s2_out} — S2 download may have failed")
s3_out.mkdir(parents=True, exist_ok=True)
print(f"[{sitename}] Running S3 compositing pipeline...")
_prepare_s3(s3_raw, s2_refl_path, s3_out)
summary = {
"sitename": sitename,
"evaluation_year": year,
"lat": lat,
"lon": lon,
"s2_refl_count": len(list(s2_out.glob("*_REFL.tif"))),
"s2_dist_cloud_count": len(list(s2_out.glob("*_DIST_CLOUD.tif"))),
"s3_raw_count": len(list(s3_raw.glob("S3*.tif"))),
"s3_composite_count": len(list(s3_out.glob("composite_*.tif"))),
}
site_dir.mkdir(parents=True, exist_ok=True)
(site_dir / "data.json").write_text(
json.dumps(summary, indent=2) + "\n", encoding="utf-8"
)
return summary
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> int:
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
parser.add_argument(
"--site",
type=str,
default=None,
help="Single sitename to process (default: all step-2 PASS sites)",
)
parser.add_argument(
"--skip-downloaded",
action="store_true",
help="Skip sites whose directory already exists under data/sentinel_data/{year}/",
)
args = parser.parse_args(argv)
year = args.evaluation_year
pass_sites = _load_screening_pass_sites(year)
if not pass_sites:
print("[Sentinel-3] No PASS sites found in step-2 screening output")
return 1
if args.site:
pass_sites = [s for s in pass_sites if s["sitename"] == args.site]
if not pass_sites:
print(f"[Sentinel-3] Site '{args.site}' not found in step-2 PASS sites")
return 1
print(f"[Sentinel-3] Processing {len(pass_sites)} site(s)")
for i, site in enumerate(pass_sites, 1):
sitename = site["sitename"]
site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename
if args.skip_downloaded and site_dir.exists():
print(f"[Sentinel-3] ({i}/{len(pass_sites)}) {sitename} — skipping (directory exists)")
continue
print(f"[Sentinel-3] ({i}/{len(pass_sites)}) {sitename}")
summary = process_site(sitename, site["lat"], site["lon"], year)
print(
f"[Sentinel-3] {sitename} done — "
f"{summary['s2_refl_count']} REFL, "
f"{summary['s3_composite_count']} composites"
)
return 0
if __name__ == "__main__":
raise SystemExit(main())