839 lines
28 KiB
Python
839 lines
28 KiB
Python
"""Step 3: Download S2 and S3 rasters and prepare EFAST inputs.
|
||
|
||
Inputs (``data/``, ``{year}`` = ``--evaluation-year``):
|
||
|
||
- ``phenocam_screening/{year}.json`` — step-2 PASS sites (coordinates included)
|
||
|
||
Outputs (``data/``):
|
||
|
||
- ``sentinel_data/{year}/{sitename}/raw/s3/*.tif`` — S3 SYN L2 per-date GeoTIFFs
|
||
- ``sentinel_data/{year}/{sitename}/prepared/s2/`` — S2 REFL + DIST_CLOUD GeoTIFFs
|
||
- ``sentinel_data/{year}/{sitename}/prepared/s3/`` — S3 composite GeoTIFFs
|
||
- ``sentinel_data/{year}/{sitename}/data.json`` — run summary
|
||
|
||
Requires ``CDSE_USER`` / ``CDSE_PASSWORD`` in ``../.env`` (workspace root; ``uv sync`` installs efast).
|
||
|
||
CLI:
|
||
|
||
- ``--evaluation-year`` (default 2025)
|
||
- ``--site`` (optional; default: all step-2 PASS sites)
|
||
|
||
Prior step: :mod:`2-phenocam-screening`.
|
||
Next step: :mod:`4-fusion`.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import shutil
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Any
|
||
|
||
import netCDF4
|
||
import numpy as np
|
||
import openeo
|
||
import rasterio
|
||
import requests
|
||
from dotenv import load_dotenv
|
||
from pystac_client import Client
|
||
from rasterio import shutil as rio_shutil
|
||
from rasterio.enums import Resampling
|
||
from rasterio.errors import WindowError
|
||
from rasterio.transform import from_bounds
|
||
from rasterio.vrt import WarpedVRT
|
||
from rasterio.warp import transform_geom
|
||
from rasterio.windows import Window
|
||
from rasterio.windows import from_bounds as window_from_bounds
|
||
from rasterio.windows import transform as window_transform
|
||
from shapely import wkt as shapely_wkt
|
||
from tqdm import tqdm
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Public constants — edit here to change pipeline behaviour
|
||
# ---------------------------------------------------------------------------
|
||
|
||
S2_BANDS = ["B02", "B03", "B04"]
|
||
|
||
S3_BANDS = [
|
||
"Syn_Oa04_reflectance",
|
||
"Syn_Oa06_reflectance",
|
||
"Syn_Oa08_reflectance",
|
||
"Syn_Oa17_reflectance",
|
||
]
|
||
S3_BAND_NAMES = ["SDR_Oa04", "SDR_Oa06", "SDR_Oa08", "SDR_Oa17"]
|
||
|
||
RESOLUTION_RATIO = 30
|
||
S3_MOSAIC_DAYS = 100
|
||
S3_COMPOSITE_STEP = 2
|
||
S3_COMPOSITE_SIGMA_DOY = 10
|
||
S3_COMPOSITE_D = 20
|
||
S3_SMOOTHING_STD = 1
|
||
S3_REFLECTANCE_SCALE = 10_000 # OpenEO SYN L2 SDR → 0–1 (EFAST expects < 5)
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Internal S2 constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
EARTH_SEARCH_URL = "https://earth-search.aws.element84.com/v1"
|
||
|
||
_BAND_ASSETS: dict[str, str] = {
|
||
"B02": "blue",
|
||
"B03": "green",
|
||
"B04": "red",
|
||
"B05": "rededge1",
|
||
"B06": "rededge2",
|
||
"B07": "rededge3",
|
||
"B08": "nir",
|
||
"B8A": "nir08",
|
||
"B11": "swir16",
|
||
"B12": "swir22",
|
||
}
|
||
_SCL_ASSET = "scl"
|
||
_MIN_BBOX_HALF_DEG = 0.008
|
||
|
||
_GDAL_COG_ENV = {
|
||
"GDAL_HTTP_VERSION": "2",
|
||
"GDAL_HTTP_MERGE_CONSECUTIVE_RANGES": "YES",
|
||
"GDAL_HTTP_MULTIPLEX": "YES",
|
||
"GDAL_HTTP_TCP_KEEPALIVE": "YES",
|
||
"GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR",
|
||
"CPL_VSIL_CURL_CACHE_SIZE": "200000000",
|
||
"GDAL_MAX_CONNECTIONS": "100",
|
||
"AWS_NO_SIGN_REQUEST": "YES",
|
||
}
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Internal S3 constants
|
||
# ---------------------------------------------------------------------------
|
||
|
||
CDSE_TOKEN_URL = (
|
||
"https://identity.dataspace.copernicus.eu/auth/realms/CDSE/"
|
||
"protocol/openid-connect/token"
|
||
)
|
||
OPENEO_URL = "openeo.dataspace.copernicus.eu"
|
||
S3_COLLECTION = "SENTINEL3_SYN_L2_SYN"
|
||
|
||
DATA_DIR = Path("data")
|
||
DEFAULT_YEAR = 2025
|
||
WORKSPACE_ROOT = Path(__file__).resolve().parent.parent
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Credentials
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _cdse_credentials() -> dict[str, str | None]:
|
||
load_dotenv(WORKSPACE_ROOT / ".env")
|
||
return {
|
||
"username": os.getenv("CDSE_USER"),
|
||
"password": os.getenv("CDSE_PASSWORD"),
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Screening manifest helpers
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _load_screening_pass_sites(year: int) -> list[dict[str, Any]]:
|
||
"""Return list of PASS-site dicts from step-2 screening JSON.
|
||
|
||
Each entry has ``sitename``, ``lat``, ``lon`` keys.
|
||
"""
|
||
path = DATA_DIR / "phenocam_screening" / f"{year}.json"
|
||
if not path.is_file():
|
||
raise FileNotFoundError(f"Step-2 screening manifest not found: {path}")
|
||
payload = json.loads(path.read_text(encoding="utf-8"))
|
||
sites = []
|
||
for row in payload.get("sites", []):
|
||
calc = row.get("calculations", {})
|
||
if calc.get("status") != "PASS":
|
||
continue
|
||
camera = row.get("response", {}).get("camera", {})
|
||
name = camera.get("Sitename")
|
||
lat = camera.get("Lat")
|
||
lon = camera.get("Lon")
|
||
if name and lat is not None and lon is not None:
|
||
sites.append({"sitename": str(name), "lat": float(lat), "lon": float(lon)})
|
||
return sites
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# S2: geometry helpers (from s2_cloud_native.py)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def wkt_to_bbox(geometry_wkt: str) -> list[float]:
|
||
"""Convert a WKT geometry to a ``[west, south, east, north]`` bbox."""
|
||
geom = shapely_wkt.loads(geometry_wkt)
|
||
minx, miny, maxx, maxy = geom.bounds
|
||
if minx == maxx and miny == maxy:
|
||
minx -= _MIN_BBOX_HALF_DEG
|
||
maxx += _MIN_BBOX_HALF_DEG
|
||
miny -= _MIN_BBOX_HALF_DEG
|
||
maxy += _MIN_BBOX_HALF_DEG
|
||
return [minx, miny, maxx, maxy]
|
||
|
||
|
||
def _boa_offset(item: Any) -> int:
|
||
"""Return the BOA additive offset for a STAC item.
|
||
|
||
Processing baseline >= 04.00 applies a -1000 offset; earlier baselines use 0.
|
||
"""
|
||
if item.properties.get("earthsearch:boa_offset_applied"):
|
||
return 0
|
||
baseline_str = str(
|
||
item.properties.get("processing:baseline")
|
||
or item.properties.get("s2:processing_baseline")
|
||
or "0"
|
||
)
|
||
try:
|
||
baseline = float(baseline_str)
|
||
except ValueError:
|
||
baseline = 0.0
|
||
return -1000 if baseline >= 4.0 else 0
|
||
|
||
|
||
def _window_for_bbox(
|
||
src: rasterio.io.DatasetReader,
|
||
bbox_4326: list[float],
|
||
) -> Window | None:
|
||
"""Return the rasterio Window for a EPSG:4326 bbox clipped to src bounds."""
|
||
bbox_geom = {
|
||
"type": "Polygon",
|
||
"coordinates": [
|
||
[
|
||
[bbox_4326[0], bbox_4326[1]],
|
||
[bbox_4326[2], bbox_4326[1]],
|
||
[bbox_4326[2], bbox_4326[3]],
|
||
[bbox_4326[0], bbox_4326[3]],
|
||
[bbox_4326[0], bbox_4326[1]],
|
||
]
|
||
],
|
||
}
|
||
src_geom = transform_geom("EPSG:4326", src.crs.to_wkt(), bbox_geom)
|
||
xs = [c[0] for c in src_geom["coordinates"][0][:4]]
|
||
ys = [c[1] for c in src_geom["coordinates"][0][:4]]
|
||
win = window_from_bounds(min(xs), min(ys), max(xs), max(ys), src.transform)
|
||
try:
|
||
return win.intersection(Window(0, 0, src.width, src.height))
|
||
except WindowError:
|
||
return None
|
||
|
||
|
||
def _read_window(
|
||
href: str,
|
||
bbox_4326: list[float],
|
||
out_shape: tuple[int, int] | None = None,
|
||
resampling: Resampling = Resampling.bilinear,
|
||
) -> tuple[np.ndarray, dict[str, Any]] | None:
|
||
"""Range-read a single-band array for the bbox window from a COG URL."""
|
||
with rasterio.open(href) as src:
|
||
win = _window_for_bbox(src, bbox_4326)
|
||
if win is None:
|
||
return None
|
||
data = src.read(1, window=win, out_shape=out_shape, resampling=resampling)
|
||
profile: dict[str, Any] = {
|
||
"crs": src.crs,
|
||
"transform": window_transform(win, src.transform),
|
||
"height": data.shape[0],
|
||
"width": data.shape[1],
|
||
"dtype": src.dtypes[0],
|
||
}
|
||
return data, profile
|
||
|
||
|
||
def _read_bands(
|
||
item: Any,
|
||
bbox: list[float],
|
||
bands: list[str],
|
||
) -> tuple[list[np.ndarray], dict[str, Any]] | None:
|
||
"""Range-read all requested bands for one STAC item."""
|
||
band_arrays: list[np.ndarray] = []
|
||
ref_profile: dict[str, Any] | None = None
|
||
|
||
for band_name in bands:
|
||
asset_key = _BAND_ASSETS.get(band_name)
|
||
if asset_key is None or asset_key not in item.assets:
|
||
return None
|
||
ref_shape = (
|
||
(ref_profile["height"], ref_profile["width"]) if ref_profile else None
|
||
)
|
||
result = _read_window(item.assets[asset_key].href, bbox, out_shape=ref_shape)
|
||
if result is None:
|
||
return None
|
||
data, profile = result
|
||
if ref_profile is None:
|
||
ref_profile = profile
|
||
band_arrays.append(data.astype("float32"))
|
||
|
||
return (band_arrays, ref_profile) if ref_profile is not None else None
|
||
|
||
|
||
def _cloud_mask(item: Any, bbox: list[float], shape: tuple[int, int]) -> np.ndarray:
|
||
"""Return a boolean cloud/shadow mask from the item's SCL band.
|
||
|
||
Masks SCL classes 0 (no data), 3 (cloud shadow), and >7 (clouds, cirrus, snow).
|
||
"""
|
||
scl = item.assets.get(_SCL_ASSET)
|
||
result = (
|
||
_read_window(scl.href, bbox, out_shape=shape, resampling=Resampling.nearest)
|
||
if scl
|
||
else None
|
||
)
|
||
if result is None:
|
||
return np.zeros(shape, dtype=bool)
|
||
scl_data, _ = result
|
||
return (scl_data == 0) | (scl_data == 3) | (scl_data > 7)
|
||
|
||
|
||
def _pad_to_multiple(arr: np.ndarray, ratio: int) -> np.ndarray:
|
||
"""Zero-pad (bands, H, W) so H and W are multiples of ``ratio``."""
|
||
pad_h = (ratio - arr.shape[1] % ratio) % ratio
|
||
pad_w = (ratio - arr.shape[2] % ratio) % ratio
|
||
if pad_h or pad_w:
|
||
arr = np.pad(arr, ((0, 0), (0, pad_h), (0, pad_w)), constant_values=0)
|
||
return arr
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# S2: STAC search + download (from s2_cloud_native.py)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def stac_search_s2(
|
||
bbox: list[float],
|
||
start_date: datetime,
|
||
end_date: datetime,
|
||
) -> list[Any]:
|
||
"""Search Earth Search for S2 L2A items intersecting a bbox."""
|
||
client = Client.open(EARTH_SEARCH_URL)
|
||
search = client.search(
|
||
collections=["sentinel-2-l2a"],
|
||
bbox=bbox,
|
||
datetime=(
|
||
f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/"
|
||
f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}"
|
||
),
|
||
max_items=10_000,
|
||
)
|
||
return list({item.id: item for item in search.items()}.values())
|
||
|
||
|
||
def _process_item(
|
||
item: Any,
|
||
bbox: list[float],
|
||
bands: list[str],
|
||
output_dir: Path,
|
||
ratio: int,
|
||
) -> str | None:
|
||
"""Range-read one S2 item and write a masked REFL GeoTIFF.
|
||
|
||
Returns a skip-message string when the item cannot be processed, else None.
|
||
"""
|
||
out_path = output_dir / f"{item.id}_REFL.tif"
|
||
if out_path.is_file():
|
||
return None
|
||
bands_result = _read_bands(item, bbox, bands)
|
||
if bands_result is None:
|
||
return f"[S2] Skipping {item.id}: missing asset or no bbox overlap"
|
||
band_arrays, ref_profile = bands_result
|
||
mask = _cloud_mask(item, bbox, (ref_profile["height"], ref_profile["width"]))
|
||
stacked = (np.stack(band_arrays) + _boa_offset(item)) / 10_000.0
|
||
np.clip(stacked, 0, None, out=stacked)
|
||
stacked[:, mask] = 0.0
|
||
stacked = _pad_to_multiple(stacked, ratio)
|
||
out_profile = {
|
||
"driver": "GTiff",
|
||
"count": len(bands),
|
||
"dtype": "float32",
|
||
"nodata": 0,
|
||
"crs": ref_profile["crs"],
|
||
"transform": ref_profile["transform"],
|
||
"height": stacked.shape[1],
|
||
"width": stacked.shape[2],
|
||
"compress": "lzw",
|
||
}
|
||
with rasterio.open(out_path, "w", **out_profile) as dst:
|
||
dst.write(stacked)
|
||
for i, band_name in enumerate(bands, 1):
|
||
dst.set_band_description(i, band_name)
|
||
return None
|
||
|
||
|
||
def download_s2_window(
|
||
items: list[Any],
|
||
bbox: list[float],
|
||
output_dir: Path,
|
||
bands: list[str],
|
||
ratio: int = RESOLUTION_RATIO,
|
||
max_workers: int = 32,
|
||
) -> None:
|
||
"""Range-read S2 L2A COG windows and write masked REFL GeoTIFFs.
|
||
|
||
Writes ``{item.id}_REFL.tif`` directly — no intermediate raw download.
|
||
Cloud/shadow pixels (SCL 0, 3, >7) are zeroed. BOA offset is inferred from
|
||
``processing:baseline``. Output is zero-padded to multiples of ``ratio``.
|
||
Items are fetched in parallel using ``max_workers`` threads.
|
||
"""
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
with rasterio.Env(**_GDAL_COG_ENV):
|
||
with ThreadPoolExecutor(max_workers=max_workers) as pool:
|
||
futures = {
|
||
pool.submit(_process_item, item, bbox, bands, output_dir, ratio): item.id
|
||
for item in items
|
||
}
|
||
with tqdm(total=len(futures), unit="granule", desc="S2 COG window read") as pbar:
|
||
for fut in as_completed(futures):
|
||
msg = fut.result()
|
||
if msg:
|
||
tqdm.write(msg)
|
||
pbar.update(1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# S3: download (from s3_openeo.py)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _utm_epsg(bbox: list[float]) -> int:
|
||
"""Return the UTM EPSG code for the centre of a ``[W, S, E, N]`` bbox."""
|
||
lon = (bbox[0] + bbox[2]) / 2
|
||
lat = (bbox[1] + bbox[3]) / 2
|
||
zone = int((lon + 180) / 6) + 1
|
||
return 32600 + zone if lat >= 0 else 32700 + zone
|
||
|
||
|
||
def _cdse_token(username: str, password: str) -> str:
|
||
"""Obtain a CDSE bearer token via password grant."""
|
||
resp = requests.post(
|
||
CDSE_TOKEN_URL,
|
||
data={
|
||
"grant_type": "password",
|
||
"username": username,
|
||
"password": password,
|
||
"client_id": "cdse-public",
|
||
},
|
||
timeout=30,
|
||
)
|
||
resp.raise_for_status()
|
||
return resp.json()["access_token"]
|
||
|
||
|
||
def _netcdf_to_geotiffs(nc_path: Path, output_dir: Path, epsg: int) -> int:
|
||
"""Split an OpenEO NetCDF into per-date GeoTIFFs.
|
||
|
||
Output filenames match the ``S3*__YYYYMMDDTHHMMSS.tif`` pattern that
|
||
``s3_processing.produce_median_composite`` expects.
|
||
|
||
Handles half-pixel cell-centre coordinates, ascending y-axis (flip_y),
|
||
and fills NetCDF masked values with NaN.
|
||
"""
|
||
written = 0
|
||
with netCDF4.Dataset(str(nc_path), "r") as nc:
|
||
times = netCDF4.num2date(nc.variables["t"][:], nc.variables["t"].units)
|
||
x_coords = np.asarray(nc.variables["x"][:], dtype=float)
|
||
y_coords = np.asarray(nc.variables["y"][:], dtype=float)
|
||
|
||
half_x = abs(x_coords[1] - x_coords[0]) / 2 if len(x_coords) > 1 else 0.0
|
||
half_y = abs(y_coords[1] - y_coords[0]) / 2 if len(y_coords) > 1 else 0.0
|
||
transform = from_bounds(
|
||
x_coords.min() - half_x,
|
||
y_coords.min() - half_y,
|
||
x_coords.max() + half_x,
|
||
y_coords.max() + half_y,
|
||
len(x_coords),
|
||
len(y_coords),
|
||
)
|
||
flip_y = len(y_coords) > 1 and y_coords[0] < y_coords[-1]
|
||
|
||
date_counts: dict[str, int] = {}
|
||
for t_idx, time_val in enumerate(times):
|
||
date_str = time_val.strftime("%Y%m%d")
|
||
n = date_counts.get(date_str, 0)
|
||
date_counts[date_str] = n + 1
|
||
|
||
raw = np.stack(
|
||
[nc.variables[b][t_idx, :, :] for b in S3_BANDS], axis=0
|
||
)
|
||
stacked = (
|
||
np.ma.filled(raw, fill_value=np.nan).astype("float32")
|
||
/ S3_REFLECTANCE_SCALE
|
||
)
|
||
if flip_y:
|
||
stacked = stacked[:, ::-1, :]
|
||
|
||
filename = f"S3_{date_str}_{n}__{date_str}T120000.tif"
|
||
with rasterio.open(
|
||
output_dir / filename,
|
||
"w",
|
||
driver="GTiff",
|
||
height=len(y_coords),
|
||
width=len(x_coords),
|
||
count=len(S3_BANDS),
|
||
dtype="float32",
|
||
nodata=float("nan"),
|
||
crs=f"EPSG:{epsg}",
|
||
transform=transform,
|
||
compress="lzw",
|
||
) as dst:
|
||
dst.write(stacked)
|
||
for i, band_name in enumerate(S3_BAND_NAMES, 1):
|
||
dst.set_band_description(i, band_name)
|
||
written += 1
|
||
|
||
return written
|
||
|
||
|
||
def download_s3_openeo(
|
||
start_date: datetime,
|
||
end_date: datetime,
|
||
aoi_geometry: str,
|
||
output_dir: Path,
|
||
credentials: dict[str, str | None],
|
||
) -> None:
|
||
"""Download S3 SYN L2 SDR for an AOI via CDSE OpenEO, server-side clipped.
|
||
|
||
Writes per-date ``S3_{YYYYMMDD}_{n}__{YYYYMMDD}T120000.tif`` files to
|
||
``output_dir``, ready for ``s3_processing.produce_median_composite``.
|
||
Skips if any ``S3*.tif`` files already exist.
|
||
"""
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
if any(output_dir.glob("S3*.tif")):
|
||
print("[S3-OEO] Skipping — output_dir already contains S3 GeoTIFFs")
|
||
return
|
||
|
||
bbox = wkt_to_bbox(aoi_geometry)
|
||
epsg = _utm_epsg(bbox)
|
||
spatial_extent = {
|
||
"west": bbox[0],
|
||
"east": bbox[2],
|
||
"south": bbox[1],
|
||
"north": bbox[3],
|
||
}
|
||
|
||
print("[S3-OEO] Authenticating with CDSE...")
|
||
token = _cdse_token(credentials["username"], credentials["password"]) # type: ignore[arg-type]
|
||
conn = openeo.connect(OPENEO_URL)
|
||
conn.authenticate_oidc_access_token(token)
|
||
|
||
start_str = start_date.strftime("%Y-%m-%d")
|
||
end_str = end_date.strftime("%Y-%m-%d")
|
||
print(f"[S3-OEO] Loading {S3_COLLECTION} ({start_str} → {end_str})...")
|
||
datacube = conn.load_collection(
|
||
S3_COLLECTION,
|
||
spatial_extent=spatial_extent,
|
||
temporal_extent=[start_str, end_str],
|
||
bands=S3_BANDS,
|
||
).resample_spatial(projection=epsg)
|
||
|
||
nc_path = output_dir / "_s3_syn_l2.nc"
|
||
print(f"[S3-OEO] Downloading NetCDF to {nc_path}...")
|
||
t0 = time.time()
|
||
datacube.download(str(nc_path), format="NetCDF")
|
||
print(f"[S3-OEO] Download completed in {time.time() - t0:.1f}s")
|
||
|
||
print("[S3-OEO] Splitting into per-date GeoTIFFs...")
|
||
written = _netcdf_to_geotiffs(nc_path, output_dir, epsg)
|
||
nc_path.unlink(missing_ok=True)
|
||
print(f"[S3-OEO] {written} GeoTIFFs written to {output_dir}")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# S2: distance_to_clouds helper
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _import_distance_to_clouds():
|
||
try:
|
||
from efast.s2_processing import distance_to_clouds
|
||
|
||
return distance_to_clouds
|
||
except ImportError as exc:
|
||
raise ImportError(
|
||
"efast not found. Install with: uv sync"
|
||
) from exc
|
||
|
||
|
||
def _rescale_dist_cloud(s2_dir: Path) -> None:
|
||
"""Ensure DIST_CLOUD values are in pixel units (not normalised to [0,1])."""
|
||
for dc_path in s2_dir.glob("*DIST_CLOUD.tif"):
|
||
with rasterio.open(dc_path) as src:
|
||
d = src.read(1)
|
||
if float(np.nanmax(d)) <= 1:
|
||
with rasterio.open(dc_path, "r+") as dst:
|
||
dst.write(np.where(d > 0, 2.0, d).astype(np.float32), 1)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# S3: compositing + reprojection helpers (from 4-sentinel-data.py)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def _import_s3_processing():
|
||
try:
|
||
from efast import s3_processing
|
||
|
||
return s3_processing
|
||
except ImportError as exc:
|
||
raise ImportError(
|
||
"efast not found. Install with: uv sync"
|
||
) from exc
|
||
|
||
|
||
def _reproject_s3_composites_to_s2_grid(
|
||
composite_dir: Path,
|
||
s2_refl_path: Path,
|
||
s3_out_dir: Path,
|
||
*,
|
||
resolution_ratio: int = RESOLUTION_RATIO,
|
||
) -> None:
|
||
"""Reproject S3 composites to the S2 spatial grid at LR resolution."""
|
||
s3_out_dir.mkdir(parents=True, exist_ok=True)
|
||
with rasterio.open(s2_refl_path) as s2_ref:
|
||
target_bounds = s2_ref.bounds
|
||
target_crs = s2_ref.crs
|
||
width = s2_ref.width // resolution_ratio
|
||
height = s2_ref.height // resolution_ratio
|
||
s3_transform = rasterio.transform.from_bounds(
|
||
target_bounds.left,
|
||
target_bounds.bottom,
|
||
target_bounds.right,
|
||
target_bounds.top,
|
||
width,
|
||
height,
|
||
)
|
||
|
||
for sen3_path in sorted(composite_dir.glob("composite_*.tif")):
|
||
date_part = sen3_path.stem.split("_", 1)[1].replace("-", "")
|
||
outfile = s3_out_dir / f"composite_{date_part}.tif"
|
||
vrt_options = {
|
||
"transform": s3_transform,
|
||
"height": height,
|
||
"width": width,
|
||
"crs": target_crs,
|
||
"resampling": Resampling.cubic,
|
||
}
|
||
with rasterio.open(sen3_path) as s3_src:
|
||
with WarpedVRT(s3_src, **vrt_options) as vrt:
|
||
profile = vrt.profile.copy()
|
||
profile.update({"dtype": "float32", "nodata": 0, "driver": "GTiff"})
|
||
rio_shutil.copy(vrt, outfile, **profile)
|
||
|
||
|
||
def _s3_reflectance_scale(raw_s3_dir: Path) -> float:
|
||
"""Return multiplier that maps raw SYN L2 SDR values to 0–1 reflectance."""
|
||
for path in raw_s3_dir.glob("S3*.tif"):
|
||
with rasterio.open(path) as src:
|
||
mx = float(np.nanmax(src.read()))
|
||
if np.isfinite(mx) and mx > 5:
|
||
return 1.0 / S3_REFLECTANCE_SCALE
|
||
return 1.0
|
||
|
||
|
||
def _stage_s3_for_efast(raw_s3_dir: Path, staging_dir: Path) -> int:
|
||
"""Copy ``S3_*.tif`` inputs, scaling reflectance when still in DN form."""
|
||
scale = _s3_reflectance_scale(raw_s3_dir)
|
||
if staging_dir.exists():
|
||
shutil.rmtree(staging_dir)
|
||
staging_dir.mkdir(parents=True)
|
||
|
||
count = 0
|
||
for src_path in sorted(raw_s3_dir.glob("S3*.tif")):
|
||
dst_path = staging_dir / src_path.name
|
||
with rasterio.open(src_path) as src:
|
||
data = src.read().astype("float32") * scale
|
||
profile = src.profile.copy()
|
||
profile.update(dtype="float32")
|
||
descriptions = src.descriptions
|
||
with rasterio.open(dst_path, "w", **profile) as dst:
|
||
dst.write(data)
|
||
for i, desc in enumerate(descriptions, 1):
|
||
if desc:
|
||
dst.set_band_description(i, desc)
|
||
count += 1
|
||
|
||
if scale != 1.0:
|
||
print(f"[S3-PREP] Scaled raw SDR by {scale:g} for EFAST compositing")
|
||
return count
|
||
|
||
|
||
def _prepare_s3(
|
||
raw_s3_dir: Path,
|
||
s2_refl_path: Path,
|
||
s3_out_dir: Path,
|
||
*,
|
||
work_dir: Path | None = None,
|
||
) -> None:
|
||
"""Run EFAST S3 compositing pipeline and reproject to S2 grid."""
|
||
s3 = _import_s3_processing()
|
||
base = work_dir or (s3_out_dir / "_efast_work")
|
||
staging = base / "scaled"
|
||
composites = base / "composites"
|
||
blurred = base / "blurred"
|
||
calibrated = base / "calibrated"
|
||
|
||
for directory in (staging, composites, blurred, calibrated):
|
||
if directory.exists():
|
||
shutil.rmtree(directory)
|
||
directory.mkdir(parents=True, exist_ok=True)
|
||
|
||
staged = _stage_s3_for_efast(raw_s3_dir, staging)
|
||
if staged == 0:
|
||
raise ValueError(f"No S3*.tif files found in {raw_s3_dir}")
|
||
|
||
print(
|
||
f"[S3-PREP] produce_median_composite: mosaic_days={S3_MOSAIC_DAYS}, "
|
||
f"step={S3_COMPOSITE_STEP}, sigma_doy={S3_COMPOSITE_SIGMA_DOY}, "
|
||
f"D={S3_COMPOSITE_D}"
|
||
)
|
||
s3.produce_median_composite(
|
||
staging,
|
||
composites,
|
||
step=S3_COMPOSITE_STEP,
|
||
mosaic_days=S3_MOSAIC_DAYS,
|
||
s3_bands=[1, 2, 3, 4],
|
||
D=S3_COMPOSITE_D,
|
||
sigma_doy=S3_COMPOSITE_SIGMA_DOY,
|
||
)
|
||
s3.smoothing(
|
||
composites,
|
||
blurred,
|
||
product="composite",
|
||
std=S3_SMOOTHING_STD,
|
||
preserve_nan=False,
|
||
)
|
||
s3.reformat_s3(blurred, calibrated, product="composite", scaling_factor=1)
|
||
|
||
for old in s3_out_dir.glob("composite_*.tif"):
|
||
old.unlink()
|
||
_reproject_s3_composites_to_s2_grid(calibrated, s2_refl_path, s3_out_dir)
|
||
|
||
if work_dir is None and base.exists():
|
||
shutil.rmtree(base)
|
||
|
||
n_out = len(list(s3_out_dir.glob("composite_*.tif")))
|
||
print(f"[S3-PREP] Wrote {n_out} composites")
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Per-site pipeline
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def process_site(
|
||
sitename: str,
|
||
lat: float,
|
||
lon: float,
|
||
year: int,
|
||
) -> dict[str, Any]:
|
||
"""Download S2 + S3 and run EFAST preparation for one site."""
|
||
site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename
|
||
s2_out = site_dir / "prepared" / "s2"
|
||
s3_raw = site_dir / "raw" / "s3"
|
||
s3_out = site_dir / "prepared" / "s3"
|
||
aoi_wkt = f"POINT ({lon} {lat})"
|
||
bbox = wkt_to_bbox(aoi_wkt)
|
||
creds = _cdse_credentials()
|
||
|
||
# S3 download
|
||
print(f"[{sitename}] Downloading S3...")
|
||
download_s3_openeo(
|
||
start_date=datetime(year, 1, 1),
|
||
end_date=datetime(year, 12, 31),
|
||
aoi_geometry=aoi_wkt,
|
||
output_dir=s3_raw,
|
||
credentials=creds,
|
||
)
|
||
|
||
# S2 download
|
||
print(f"[{sitename}] Searching S2 on Earth Search...")
|
||
items = stac_search_s2(bbox, datetime(year, 1, 1), datetime(year, 12, 31))
|
||
print(f"[{sitename}] {len(items)} S2 items found — downloading windows...")
|
||
download_s2_window(items, bbox, s2_out, S2_BANDS, RESOLUTION_RATIO)
|
||
|
||
# S2 distance-to-clouds
|
||
print(f"[{sitename}] Computing distance-to-clouds...")
|
||
distance_to_clouds = _import_distance_to_clouds()
|
||
distance_to_clouds(s2_out, ratio=RESOLUTION_RATIO)
|
||
_rescale_dist_cloud(s2_out)
|
||
|
||
# S3 compositing
|
||
s2_refl_path = next(iter(s2_out.glob("*_REFL.tif")), None)
|
||
if s2_refl_path is None:
|
||
raise ValueError(f"No REFL files in {s2_out} — S2 download may have failed")
|
||
s3_out.mkdir(parents=True, exist_ok=True)
|
||
print(f"[{sitename}] Running S3 compositing pipeline...")
|
||
_prepare_s3(s3_raw, s2_refl_path, s3_out)
|
||
|
||
summary = {
|
||
"sitename": sitename,
|
||
"evaluation_year": year,
|
||
"lat": lat,
|
||
"lon": lon,
|
||
"s2_refl_count": len(list(s2_out.glob("*_REFL.tif"))),
|
||
"s2_dist_cloud_count": len(list(s2_out.glob("*_DIST_CLOUD.tif"))),
|
||
"s3_raw_count": len(list(s3_raw.glob("S3*.tif"))),
|
||
"s3_composite_count": len(list(s3_out.glob("composite_*.tif"))),
|
||
}
|
||
site_dir.mkdir(parents=True, exist_ok=True)
|
||
(site_dir / "data.json").write_text(
|
||
json.dumps(summary, indent=2) + "\n", encoding="utf-8"
|
||
)
|
||
return summary
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# CLI
|
||
# ---------------------------------------------------------------------------
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
|
||
parser = argparse.ArgumentParser(description=__doc__)
|
||
parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
|
||
parser.add_argument(
|
||
"--site",
|
||
type=str,
|
||
default=None,
|
||
help="Single sitename to process (default: all step-2 PASS sites)",
|
||
)
|
||
args = parser.parse_args(argv)
|
||
year = args.evaluation_year
|
||
|
||
pass_sites = _load_screening_pass_sites(year)
|
||
if not pass_sites:
|
||
print("[Sentinel-3] No PASS sites found in step-2 screening output")
|
||
return 1
|
||
|
||
if args.site:
|
||
pass_sites = [s for s in pass_sites if s["sitename"] == args.site]
|
||
if not pass_sites:
|
||
print(f"[Sentinel-3] Site '{args.site}' not found in step-2 PASS sites")
|
||
return 1
|
||
|
||
print(f"[Sentinel-3] Processing {len(pass_sites)} site(s)")
|
||
for i, site in enumerate(pass_sites, 1):
|
||
sitename = site["sitename"]
|
||
print(f"[Sentinel-3] ({i}/{len(pass_sites)}) {sitename}")
|
||
try:
|
||
summary = process_site(sitename, site["lat"], site["lon"], year)
|
||
print(
|
||
f"[Sentinel-3] {sitename} done — "
|
||
f"{summary['s2_refl_count']} REFL, "
|
||
f"{summary['s3_composite_count']} composites"
|
||
)
|
||
except Exception as exc:
|
||
print(f"[Sentinel-3] {sitename} FAILED: {exc}")
|
||
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|