"""Step 3: Download S2 and S3 rasters and prepare EFAST inputs. Inputs (``data/``, ``{year}`` = ``--evaluation-year``): - ``phenocam_screening/{year}.json`` — step-2 PASS sites (coordinates included) Outputs (``data/``): - ``sentinel_data/{year}/{sitename}/raw/s3/*.tif`` — S3 SYN L2 per-date GeoTIFFs - ``sentinel_data/{year}/{sitename}/prepared/s2/`` — S2 REFL + DIST_CLOUD GeoTIFFs - ``sentinel_data/{year}/{sitename}/prepared/s3/`` — S3 composite GeoTIFFs - ``sentinel_data/{year}/{sitename}/data.json`` — run summary Requires ``CDSE_USER`` / ``CDSE_PASSWORD`` in ``../.env`` (workspace root; ``uv sync`` installs efast). CLI: - ``--evaluation-year`` (default 2025) - ``--site`` (optional; default: all step-2 PASS sites) Prior step: :mod:`2-phenocam-screening`. Next step: :mod:`4-fusion`. """ from __future__ import annotations import argparse import json import os import shutil import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path from typing import Any import netCDF4 import numpy as np import openeo import rasterio import requests from dotenv import load_dotenv from pystac_client import Client from rasterio import shutil as rio_shutil from rasterio.enums import Resampling from rasterio.errors import WindowError from rasterio.transform import from_bounds from rasterio.vrt import WarpedVRT from rasterio.warp import transform_geom from rasterio.windows import Window from rasterio.windows import from_bounds as window_from_bounds from rasterio.windows import transform as window_transform from shapely import wkt as shapely_wkt from tqdm import tqdm # --------------------------------------------------------------------------- # Public constants — edit here to change pipeline behaviour # --------------------------------------------------------------------------- S2_BANDS = ["B02", "B03", "B04"] S3_BANDS = [ "Syn_Oa04_reflectance", "Syn_Oa06_reflectance", "Syn_Oa08_reflectance", "Syn_Oa17_reflectance", ] S3_BAND_NAMES = ["SDR_Oa04", "SDR_Oa06", "SDR_Oa08", "SDR_Oa17"] RESOLUTION_RATIO = 30 S3_MOSAIC_DAYS = 100 S3_COMPOSITE_STEP = 2 S3_COMPOSITE_SIGMA_DOY = 10 S3_COMPOSITE_D = 20 S3_SMOOTHING_STD = 1 S3_REFLECTANCE_SCALE = 10_000 # OpenEO SYN L2 SDR → 0–1 (EFAST expects < 5) # --------------------------------------------------------------------------- # Internal S2 constants # --------------------------------------------------------------------------- EARTH_SEARCH_URL = "https://earth-search.aws.element84.com/v1" _BAND_ASSETS: dict[str, str] = { "B02": "blue", "B03": "green", "B04": "red", "B05": "rededge1", "B06": "rededge2", "B07": "rededge3", "B08": "nir", "B8A": "nir08", "B11": "swir16", "B12": "swir22", } _SCL_ASSET = "scl" _MIN_BBOX_HALF_DEG = 0.008 _GDAL_COG_ENV = { # HTTP/1.1 avoids HTTP/2 multiplexing connection-reset cascades on S3. "GDAL_HTTP_VERSION": "1.1", "GDAL_HTTP_MERGE_CONSECUTIVE_RANGES": "YES", "GDAL_HTTP_TCP_KEEPALIVE": "YES", "GDAL_DISABLE_READDIR_ON_OPEN": "EMPTY_DIR", "CPL_VSIL_CURL_CACHE_SIZE": "200000000", # Built-in GDAL retries for 429/502/503/504 and transient resets. "GDAL_HTTP_MAX_RETRY": "3", "GDAL_HTTP_RETRY_DELAY": "0.5", "AWS_NO_SIGN_REQUEST": "YES", } # --------------------------------------------------------------------------- # Internal S3 constants # --------------------------------------------------------------------------- CDSE_TOKEN_URL = ( "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/" "protocol/openid-connect/token" ) OPENEO_URL = "openeo.dataspace.copernicus.eu" S3_COLLECTION = "SENTINEL3_SYN_L2_SYN" DATA_DIR = Path("data") DEFAULT_YEAR = 2025 WORKSPACE_ROOT = Path(__file__).resolve().parent.parent # --------------------------------------------------------------------------- # Credentials # --------------------------------------------------------------------------- def _cdse_credentials() -> dict[str, str | None]: load_dotenv(WORKSPACE_ROOT / ".env") return { "username": os.getenv("CDSE_USER"), "password": os.getenv("CDSE_PASSWORD"), } # --------------------------------------------------------------------------- # Screening manifest helpers # --------------------------------------------------------------------------- def _load_screening_pass_sites(year: int) -> list[dict[str, Any]]: """Return list of PASS-site dicts from step-2 screening JSON. Each entry has ``sitename``, ``lat``, ``lon`` keys. """ path = DATA_DIR / "phenocam_screening" / f"{year}.json" if not path.is_file(): raise FileNotFoundError(f"Step-2 screening manifest not found: {path}") payload = json.loads(path.read_text(encoding="utf-8")) sites = [] for row in payload.get("sites", []): calc = row.get("calculations", {}) if calc.get("status") != "PASS": continue camera = row.get("response", {}).get("camera", {}) name = camera.get("Sitename") lat = camera.get("Lat") lon = camera.get("Lon") if name and lat is not None and lon is not None: sites.append({"sitename": str(name), "lat": float(lat), "lon": float(lon)}) return sites # --------------------------------------------------------------------------- # S2: geometry helpers (from s2_cloud_native.py) # --------------------------------------------------------------------------- def wkt_to_bbox(geometry_wkt: str) -> list[float]: """Convert a WKT geometry to a ``[west, south, east, north]`` bbox.""" geom = shapely_wkt.loads(geometry_wkt) minx, miny, maxx, maxy = geom.bounds if minx == maxx and miny == maxy: minx -= _MIN_BBOX_HALF_DEG maxx += _MIN_BBOX_HALF_DEG miny -= _MIN_BBOX_HALF_DEG maxy += _MIN_BBOX_HALF_DEG return [minx, miny, maxx, maxy] def _boa_offset(item: Any) -> int: """Return the BOA additive offset for a STAC item. Processing baseline >= 04.00 applies a -1000 offset; earlier baselines use 0. """ if item.properties.get("earthsearch:boa_offset_applied"): return 0 baseline_str = str( item.properties.get("processing:baseline") or item.properties.get("s2:processing_baseline") or "0" ) try: baseline = float(baseline_str) except ValueError: baseline = 0.0 return -1000 if baseline >= 4.0 else 0 def _window_for_bbox( src: rasterio.io.DatasetReader, bbox_4326: list[float], ) -> Window | None: """Return the rasterio Window for a EPSG:4326 bbox clipped to src bounds.""" bbox_geom = { "type": "Polygon", "coordinates": [ [ [bbox_4326[0], bbox_4326[1]], [bbox_4326[2], bbox_4326[1]], [bbox_4326[2], bbox_4326[3]], [bbox_4326[0], bbox_4326[3]], [bbox_4326[0], bbox_4326[1]], ] ], } src_geom = transform_geom("EPSG:4326", src.crs.to_wkt(), bbox_geom) xs = [c[0] for c in src_geom["coordinates"][0][:4]] ys = [c[1] for c in src_geom["coordinates"][0][:4]] win = window_from_bounds(min(xs), min(ys), max(xs), max(ys), src.transform) try: return win.intersection(Window(0, 0, src.width, src.height)) except WindowError: return None def _read_window( href: str, bbox_4326: list[float], out_shape: tuple[int, int] | None = None, resampling: Resampling = Resampling.bilinear, ) -> tuple[np.ndarray, dict[str, Any]] | None: """Range-read a single-band array for the bbox window from a COG URL.""" with rasterio.open(href) as src: win = _window_for_bbox(src, bbox_4326) if win is None: return None data = src.read(1, window=win, out_shape=out_shape, resampling=resampling) profile: dict[str, Any] = { "crs": src.crs, "transform": window_transform(win, src.transform), "height": data.shape[0], "width": data.shape[1], "dtype": src.dtypes[0], } return data, profile def _read_bands( item: Any, bbox: list[float], bands: list[str], ) -> tuple[list[np.ndarray], dict[str, Any]] | None: """Range-read all requested bands for one STAC item.""" band_arrays: list[np.ndarray] = [] ref_profile: dict[str, Any] | None = None for band_name in bands: asset_key = _BAND_ASSETS.get(band_name) if asset_key is None or asset_key not in item.assets: return None ref_shape = ( (ref_profile["height"], ref_profile["width"]) if ref_profile else None ) result = _read_window(item.assets[asset_key].href, bbox, out_shape=ref_shape) if result is None: return None data, profile = result if ref_profile is None: ref_profile = profile band_arrays.append(data.astype("float32")) return (band_arrays, ref_profile) if ref_profile is not None else None def _cloud_mask(item: Any, bbox: list[float], shape: tuple[int, int]) -> np.ndarray: """Return a boolean cloud/shadow mask from the item's SCL band. Masks SCL classes 0 (no data), 3 (cloud shadow), and >7 (clouds, cirrus, snow). """ scl = item.assets.get(_SCL_ASSET) result = ( _read_window(scl.href, bbox, out_shape=shape, resampling=Resampling.nearest) if scl else None ) if result is None: return np.zeros(shape, dtype=bool) scl_data, _ = result return (scl_data == 0) | (scl_data == 3) | (scl_data > 7) def _pad_to_multiple(arr: np.ndarray, ratio: int) -> np.ndarray: """Zero-pad (bands, H, W) so H and W are multiples of ``ratio``.""" pad_h = (ratio - arr.shape[1] % ratio) % ratio pad_w = (ratio - arr.shape[2] % ratio) % ratio if pad_h or pad_w: arr = np.pad(arr, ((0, 0), (0, pad_h), (0, pad_w)), constant_values=0) return arr # --------------------------------------------------------------------------- # S2: STAC search + download (from s2_cloud_native.py) # --------------------------------------------------------------------------- def stac_search_s2( bbox: list[float], start_date: datetime, end_date: datetime, ) -> list[Any]: """Search Earth Search for S2 L2A items intersecting a bbox.""" client = Client.open(EARTH_SEARCH_URL) search = client.search( collections=["sentinel-2-l2a"], bbox=bbox, datetime=( f"{start_date.strftime('%Y-%m-%dT%H:%M:%SZ')}/" f"{end_date.strftime('%Y-%m-%dT23:59:59Z')}" ), max_items=10_000, ) return list({item.id: item for item in search.items()}.values()) def _process_item( item: Any, bbox: list[float], bands: list[str], output_dir: Path, ratio: int, ) -> str | None: """Range-read one S2 item and write a masked REFL GeoTIFF. Returns a skip-message string when the item cannot be processed, else None. """ out_path = output_dir / f"{item.id}_REFL.tif" if out_path.is_file(): return None bands_result = _read_bands(item, bbox, bands) if bands_result is None: return f"[S2] Skipping {item.id}: missing asset or no bbox overlap" band_arrays, ref_profile = bands_result mask = _cloud_mask(item, bbox, (ref_profile["height"], ref_profile["width"])) stacked = (np.stack(band_arrays) + _boa_offset(item)) / 10_000.0 np.clip(stacked, 0, None, out=stacked) stacked[:, mask] = 0.0 stacked = _pad_to_multiple(stacked, ratio) out_profile = { "driver": "GTiff", "count": len(bands), "dtype": "float32", "nodata": 0, "crs": ref_profile["crs"], "transform": ref_profile["transform"], "height": stacked.shape[1], "width": stacked.shape[2], "compress": "lzw", } with rasterio.open(out_path, "w", **out_profile) as dst: dst.write(stacked) for i, band_name in enumerate(bands, 1): dst.set_band_description(i, band_name) return None def download_s2_window( items: list[Any], bbox: list[float], output_dir: Path, bands: list[str], ratio: int = RESOLUTION_RATIO, max_workers: int = 12, ) -> None: """Range-read S2 L2A COG windows and write masked REFL GeoTIFFs. Writes ``{item.id}_REFL.tif`` directly — no intermediate raw download. Cloud/shadow pixels (SCL 0, 3, >7) are zeroed. BOA offset is inferred from ``processing:baseline``. Output is zero-padded to multiples of ``ratio``. Items are fetched in parallel using ``max_workers`` threads. """ output_dir.mkdir(parents=True, exist_ok=True) with rasterio.Env(**_GDAL_COG_ENV): with ThreadPoolExecutor(max_workers=max_workers) as pool: futures = { pool.submit( _process_item, item, bbox, bands, output_dir, ratio ): item.id for item in items } with tqdm( total=len(futures), unit="granule", desc="S2 COG window read" ) as pbar: for fut in as_completed(futures): msg = fut.result() if msg: tqdm.write(msg) pbar.update(1) # --------------------------------------------------------------------------- # S3: download (from s3_openeo.py) # --------------------------------------------------------------------------- def _utm_epsg(bbox: list[float]) -> int: """Return the UTM EPSG code for the centre of a ``[W, S, E, N]`` bbox.""" lon = (bbox[0] + bbox[2]) / 2 lat = (bbox[1] + bbox[3]) / 2 zone = int((lon + 180) / 6) + 1 return 32600 + zone if lat >= 0 else 32700 + zone def _cdse_token(username: str, password: str) -> str: """Obtain a CDSE bearer token via password grant.""" resp = requests.post( CDSE_TOKEN_URL, data={ "grant_type": "password", "username": username, "password": password, "client_id": "cdse-public", }, timeout=30, ) resp.raise_for_status() return resp.json()["access_token"] def _netcdf_to_geotiffs(nc_path: Path, output_dir: Path, epsg: int) -> int: """Split an OpenEO NetCDF into per-date GeoTIFFs. Output filenames match the ``S3*__YYYYMMDDTHHMMSS.tif`` pattern that ``s3_processing.produce_median_composite`` expects. Handles half-pixel cell-centre coordinates, ascending y-axis (flip_y), and fills NetCDF masked values with NaN. """ written = 0 with netCDF4.Dataset(str(nc_path), "r") as nc: times = netCDF4.num2date(nc.variables["t"][:], nc.variables["t"].units) x_coords = np.asarray(nc.variables["x"][:], dtype=float) y_coords = np.asarray(nc.variables["y"][:], dtype=float) half_x = abs(x_coords[1] - x_coords[0]) / 2 if len(x_coords) > 1 else 0.0 half_y = abs(y_coords[1] - y_coords[0]) / 2 if len(y_coords) > 1 else 0.0 transform = from_bounds( x_coords.min() - half_x, y_coords.min() - half_y, x_coords.max() + half_x, y_coords.max() + half_y, len(x_coords), len(y_coords), ) flip_y = len(y_coords) > 1 and y_coords[0] < y_coords[-1] date_counts: dict[str, int] = {} for t_idx, time_val in enumerate(times): date_str = time_val.strftime("%Y%m%d") n = date_counts.get(date_str, 0) date_counts[date_str] = n + 1 raw = np.stack([nc.variables[b][t_idx, :, :] for b in S3_BANDS], axis=0) stacked = ( np.ma.filled(raw, fill_value=np.nan).astype("float32") / S3_REFLECTANCE_SCALE ) if flip_y: stacked = stacked[:, ::-1, :] filename = f"S3_{date_str}_{n}__{date_str}T120000.tif" with rasterio.open( output_dir / filename, "w", driver="GTiff", height=len(y_coords), width=len(x_coords), count=len(S3_BANDS), dtype="float32", nodata=float("nan"), crs=f"EPSG:{epsg}", transform=transform, compress="lzw", ) as dst: dst.write(stacked) for i, band_name in enumerate(S3_BAND_NAMES, 1): dst.set_band_description(i, band_name) written += 1 return written _S3_DOWNLOAD_RETRIES = 4 _S3_DOWNLOAD_BACKOFF = 30 # seconds; doubled on each retry def _download_with_retry(datacube: Any, nc_path: Path) -> None: """Download an OpenEO datacube to *nc_path*, retrying on transient errors. Retries up to ``_S3_DOWNLOAD_RETRIES`` times with exponential backoff starting at ``_S3_DOWNLOAD_BACKOFF`` seconds. Re-authenticates on each attempt so an expired token never blocks a retry. """ delay = _S3_DOWNLOAD_BACKOFF last_exc: Exception | None = None for attempt in range(1, _S3_DOWNLOAD_RETRIES + 1): try: if nc_path.exists(): nc_path.unlink() datacube.download(str(nc_path), format="NetCDF") return except Exception as exc: last_exc = exc if attempt < _S3_DOWNLOAD_RETRIES: print( f"[S3-OEO] Download attempt {attempt} failed ({exc}); " f"retrying in {delay}s..." ) time.sleep(delay) delay *= 2 else: print(f"[S3-OEO] All {_S3_DOWNLOAD_RETRIES} download attempts failed") raise RuntimeError( f"S3 download failed after {_S3_DOWNLOAD_RETRIES} attempts" ) from last_exc def download_s3_openeo( start_date: datetime, end_date: datetime, aoi_geometry: str, output_dir: Path, credentials: dict[str, str | None], ) -> None: """Download S3 SYN L2 SDR for an AOI via CDSE OpenEO, server-side clipped. Writes per-date ``S3_{YYYYMMDD}_{n}__{YYYYMMDD}T120000.tif`` files to ``output_dir``, ready for ``s3_processing.produce_median_composite``. Skips if any ``S3*.tif`` files already exist. """ output_dir.mkdir(parents=True, exist_ok=True) if any(output_dir.glob("S3*.tif")): print("[S3-OEO] Skipping — output_dir already contains S3 GeoTIFFs") return bbox = wkt_to_bbox(aoi_geometry) epsg = _utm_epsg(bbox) spatial_extent = { "west": bbox[0], "east": bbox[2], "south": bbox[1], "north": bbox[3], } print("[S3-OEO] Authenticating with CDSE...") token = _cdse_token(credentials["username"], credentials["password"]) # type: ignore[arg-type] conn = openeo.connect(OPENEO_URL) conn.authenticate_oidc_access_token(token) start_str = start_date.strftime("%Y-%m-%d") end_str = end_date.strftime("%Y-%m-%d") print(f"[S3-OEO] Loading {S3_COLLECTION} ({start_str} → {end_str})...") datacube = conn.load_collection( S3_COLLECTION, spatial_extent=spatial_extent, temporal_extent=[start_str, end_str], bands=S3_BANDS, ).resample_spatial(projection=epsg) nc_path = output_dir / "_s3_syn_l2.nc" print(f"[S3-OEO] Downloading NetCDF to {nc_path}...") t0 = time.time() _download_with_retry(datacube, nc_path) print(f"[S3-OEO] Download completed in {time.time() - t0:.1f}s") print("[S3-OEO] Splitting into per-date GeoTIFFs...") written = _netcdf_to_geotiffs(nc_path, output_dir, epsg) nc_path.unlink(missing_ok=True) print(f"[S3-OEO] {written} GeoTIFFs written to {output_dir}") # --------------------------------------------------------------------------- # S2: distance_to_clouds helper # --------------------------------------------------------------------------- def _import_distance_to_clouds(): try: from efast.s2_processing import distance_to_clouds return distance_to_clouds except ImportError as exc: raise ImportError("efast not found. Install with: uv sync") from exc def _normalize_s2_grid(s2_dir: Path) -> None: """Remove REFL (and companion DIST_CLOUD/GCC) files that don't match the dominant shape. Sites at MGRS tile boundaries can have S2 downloads from two tiles with different spatial extents. For example, a site at the top edge of tile 16SDA will produce narrow (e.g. 30×180) REFL files from 16SDA and full-extent (180×180) files from 16SDB. EFAST requires all S2 files to share the same grid, so we keep only the shape with the most pixels (largest tile coverage) and discard the rest together with any already-computed companions. """ refl_files = sorted(s2_dir.glob("*_REFL.tif")) if not refl_files: return shape_to_files: dict[tuple[int, int], list[Path]] = {} for f in refl_files: with rasterio.open(f) as src: shape: tuple[int, int] = src.shape # type: ignore[assignment] shape_to_files.setdefault(shape, []).append(f) if len(shape_to_files) <= 1: return ref_shape = max(shape_to_files.keys(), key=lambda s: s[0] * s[1]) shapes_summary = ", ".join( f"{s[0]}×{s[1]}({len(fs)})" for s, fs in shape_to_files.items() ) n_removed = 0 for shape, files in shape_to_files.items(): if shape == ref_shape: continue for refl_path in files: stem = refl_path.stem[: -len("_REFL")] # e.g. "S2A_16SDA_20250106_0_L2A" for companion in s2_dir.glob(f"{stem}_*.tif"): companion.unlink() refl_path.unlink(missing_ok=True) n_removed += 1 print( f"[S2-NORM] Tile boundary — shapes: {shapes_summary}; " f"removed {n_removed} file-sets, kept {ref_shape[0]}×{ref_shape[1]}" ) def _rescale_dist_cloud(s2_dir: Path) -> None: """Ensure DIST_CLOUD values are in pixel units (not normalised to [0,1]).""" for dc_path in s2_dir.glob("*DIST_CLOUD.tif"): with rasterio.open(dc_path) as src: d = src.read(1) if float(np.nanmax(d)) <= 1: with rasterio.open(dc_path, "r+") as dst: dst.write(np.where(d > 0, 2.0, d).astype(np.float32), 1) # --------------------------------------------------------------------------- # S3: compositing + reprojection helpers (from 4-sentinel-data.py) # --------------------------------------------------------------------------- def _import_s3_processing(): try: from efast import s3_processing return s3_processing except ImportError as exc: raise ImportError("efast not found. Install with: uv sync") from exc def _reproject_s3_composites_to_s2_grid( composite_dir: Path, s2_refl_path: Path, s3_out_dir: Path, *, resolution_ratio: int = RESOLUTION_RATIO, ) -> None: """Reproject S3 composites to the S2 spatial grid at LR resolution.""" s3_out_dir.mkdir(parents=True, exist_ok=True) with rasterio.open(s2_refl_path) as s2_ref: target_bounds = s2_ref.bounds target_crs = s2_ref.crs width = s2_ref.width // resolution_ratio height = s2_ref.height // resolution_ratio s3_transform = rasterio.transform.from_bounds( target_bounds.left, target_bounds.bottom, target_bounds.right, target_bounds.top, width, height, ) for sen3_path in sorted(composite_dir.glob("composite_*.tif")): date_part = sen3_path.stem.split("_", 1)[1].replace("-", "") outfile = s3_out_dir / f"composite_{date_part}.tif" vrt_options = { "transform": s3_transform, "height": height, "width": width, "crs": target_crs, "resampling": Resampling.cubic, } with rasterio.open(sen3_path) as s3_src: with WarpedVRT(s3_src, **vrt_options) as vrt: profile = vrt.profile.copy() profile.update({"dtype": "float32", "nodata": 0, "driver": "GTiff"}) rio_shutil.copy(vrt, outfile, **profile) def _s3_reflectance_scale(raw_s3_dir: Path) -> float: """Return multiplier that maps raw SYN L2 SDR values to 0–1 reflectance.""" for path in raw_s3_dir.glob("S3*.tif"): with rasterio.open(path) as src: data = src.read() if not np.any(np.isfinite(data)): continue mx = float(np.nanmax(data)) if np.isfinite(mx) and mx > 5: return 1.0 / S3_REFLECTANCE_SCALE return 1.0 def _stage_s3_for_efast(raw_s3_dir: Path, staging_dir: Path) -> int: """Copy ``S3_*.tif`` inputs, scaling reflectance when still in DN form.""" scale = _s3_reflectance_scale(raw_s3_dir) if staging_dir.exists(): shutil.rmtree(staging_dir) staging_dir.mkdir(parents=True) count = 0 for src_path in sorted(raw_s3_dir.glob("S3*.tif")): dst_path = staging_dir / src_path.name with rasterio.open(src_path) as src: data = src.read().astype("float32") * scale profile = src.profile.copy() profile.update(dtype="float32") descriptions = src.descriptions with rasterio.open(dst_path, "w", **profile) as dst: dst.write(data) for i, desc in enumerate(descriptions, 1): if desc: dst.set_band_description(i, desc) count += 1 if scale != 1.0: print(f"[S3-PREP] Scaled raw SDR by {scale:g} for EFAST compositing") return count def _prepare_s3( raw_s3_dir: Path, s2_refl_path: Path, s3_out_dir: Path, *, work_dir: Path | None = None, ) -> None: """Run EFAST S3 compositing pipeline and reproject to S2 grid.""" s3 = _import_s3_processing() base = work_dir or (s3_out_dir / "_efast_work") staging = base / "scaled" composites = base / "composites" blurred = base / "blurred" calibrated = base / "calibrated" for directory in (staging, composites, blurred, calibrated): if directory.exists(): shutil.rmtree(directory) directory.mkdir(parents=True, exist_ok=True) staged = _stage_s3_for_efast(raw_s3_dir, staging) if staged == 0: raise ValueError(f"No S3*.tif files found in {raw_s3_dir}") print( f"[S3-PREP] produce_median_composite: mosaic_days={S3_MOSAIC_DAYS}, " f"step={S3_COMPOSITE_STEP}, sigma_doy={S3_COMPOSITE_SIGMA_DOY}, " f"D={S3_COMPOSITE_D}" ) s3.produce_median_composite( staging, composites, step=S3_COMPOSITE_STEP, mosaic_days=S3_MOSAIC_DAYS, s3_bands=[1, 2, 3, 4], D=S3_COMPOSITE_D, sigma_doy=S3_COMPOSITE_SIGMA_DOY, ) s3.smoothing( composites, blurred, product="composite", std=S3_SMOOTHING_STD, preserve_nan=False, ) s3.reformat_s3(blurred, calibrated, product="composite", scaling_factor=1) for old in s3_out_dir.glob("composite_*.tif"): old.unlink() _reproject_s3_composites_to_s2_grid(calibrated, s2_refl_path, s3_out_dir) if work_dir is None and base.exists(): shutil.rmtree(base) n_out = len(list(s3_out_dir.glob("composite_*.tif"))) print(f"[S3-PREP] Wrote {n_out} composites") # --------------------------------------------------------------------------- # Per-site pipeline # --------------------------------------------------------------------------- def process_site( sitename: str, lat: float, lon: float, year: int, ) -> dict[str, Any]: """Download S2 + S3 and run EFAST preparation for one site.""" site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename s2_out = site_dir / "prepared" / "s2" s3_raw = site_dir / "raw" / "s3" s3_out = site_dir / "prepared" / "s3" aoi_wkt = f"POINT ({lon} {lat})" bbox = wkt_to_bbox(aoi_wkt) creds = _cdse_credentials() # S3 download print(f"[{sitename}] Downloading S3...") download_s3_openeo( start_date=datetime(year, 1, 1), end_date=datetime(year, 12, 31), aoi_geometry=aoi_wkt, output_dir=s3_raw, credentials=creds, ) # S2 download print(f"[{sitename}] Searching S2 on Earth Search...") items = stac_search_s2(bbox, datetime(year, 1, 1), datetime(year, 12, 31)) print(f"[{sitename}] {len(items)} S2 items found — downloading windows...") download_s2_window(items, bbox, s2_out, S2_BANDS, RESOLUTION_RATIO) _normalize_s2_grid(s2_out) # S2 distance-to-clouds print(f"[{sitename}] Computing distance-to-clouds...") distance_to_clouds = _import_distance_to_clouds() distance_to_clouds(s2_out, ratio=RESOLUTION_RATIO) _rescale_dist_cloud(s2_out) # S3 compositing s2_refl_path = next(iter(s2_out.glob("*_REFL.tif")), None) if s2_refl_path is None: raise ValueError(f"No REFL files in {s2_out} — S2 download may have failed") s3_out.mkdir(parents=True, exist_ok=True) print(f"[{sitename}] Running S3 compositing pipeline...") _prepare_s3(s3_raw, s2_refl_path, s3_out) summary = { "sitename": sitename, "evaluation_year": year, "lat": lat, "lon": lon, "s2_refl_count": len(list(s2_out.glob("*_REFL.tif"))), "s2_dist_cloud_count": len(list(s2_out.glob("*_DIST_CLOUD.tif"))), "s3_raw_count": len(list(s3_raw.glob("S3*.tif"))), "s3_composite_count": len(list(s3_out.glob("composite_*.tif"))), } site_dir.mkdir(parents=True, exist_ok=True) (site_dir / "data.json").write_text( json.dumps(summary, indent=2) + "\n", encoding="utf-8" ) return summary # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR) parser.add_argument( "--site", type=str, default=None, help="Single sitename to process (default: all step-2 PASS sites)", ) parser.add_argument( "--skip-downloaded", action="store_true", help="Skip sites whose directory already exists under data/sentinel_data/{year}/", ) args = parser.parse_args(argv) year = args.evaluation_year pass_sites = _load_screening_pass_sites(year) if not pass_sites: print("[Sentinel data] No PASS sites found in step-2 screening output") return 1 if args.site: pass_sites = [s for s in pass_sites if s["sitename"] == args.site] if not pass_sites: print(f"[Sentinel data] Site '{args.site}' not found in step-2 PASS sites") return 1 print(f"[Sentinel data] Processing {len(pass_sites)} site(s)") for i, site in enumerate(pass_sites, 1): sitename = site["sitename"] site_dir = DATA_DIR / "sentinel_data" / str(year) / sitename if args.skip_downloaded and site_dir.exists(): print( f"[Sentinel data] ({i}/{len(pass_sites)}) {sitename} — skipping (directory exists)" ) continue print(f"[Sentinel data] ({i}/{len(pass_sites)}) {sitename}") summary = process_site(sitename, site["lat"], site["lon"], year) print( f"[Sentinel data] {sitename} done — " f"{summary['s2_refl_count']} REFL, " f"{summary['s3_composite_count']} composites" ) return 0 if __name__ == "__main__": raise SystemExit(main())