190 lines
7.1 KiB
Python
190 lines
7.1 KiB
Python
"""Sentinel-2-MSI acquisition from AWS Element84 Earth Search (STAC catalog)."""
|
|
import numpy as np
|
|
import rasterio
|
|
import xml.etree.ElementTree as ET
|
|
import requests
|
|
from pathlib import Path
|
|
from rasterio.crs import CRS
|
|
from rasterio.warp import Resampling, calculate_default_transform, reproject, transform_geom
|
|
from rasterio.windows import from_bounds, transform as window_transform
|
|
from pystac_client import Client
|
|
|
|
BBOX_SIZE = 0.011
|
|
TARGET_CRS = CRS.from_epsg(32632)
|
|
|
|
|
|
def _get_bbox(lon, lat):
|
|
half = BBOX_SIZE / 2
|
|
return [lon - half, lat - half, lon + half, lat + half]
|
|
|
|
|
|
def _get_window_for_bbox(src, bbox):
|
|
bbox_geom = {
|
|
"type": "Polygon",
|
|
"coordinates": [
|
|
[
|
|
[bbox[0], bbox[1]],
|
|
[bbox[2], bbox[1]],
|
|
[bbox[2], bbox[3]],
|
|
[bbox[0], bbox[3]],
|
|
[bbox[0], bbox[1]],
|
|
]
|
|
],
|
|
}
|
|
bbox_transformed = transform_geom("EPSG:4326", src.crs, bbox_geom)
|
|
coords = bbox_transformed["coordinates"][0]
|
|
x_coords = [c[0] for c in coords[:4]]
|
|
y_coords = [c[1] for c in coords[:4]]
|
|
bbox_crs = [min(x_coords), min(y_coords), max(x_coords), max(y_coords)]
|
|
src_bounds = src.bounds
|
|
intersect_bbox = [
|
|
max(bbox_crs[0], src_bounds.left),
|
|
max(bbox_crs[1], src_bounds.bottom),
|
|
min(bbox_crs[2], src_bounds.right),
|
|
min(bbox_crs[3], src_bounds.top),
|
|
]
|
|
return from_bounds(*intersect_bbox, src.transform)
|
|
|
|
|
|
def _extract_viewing_angle(item):
|
|
if "granule_metadata" not in item.assets:
|
|
return None
|
|
try:
|
|
xml_url = item.assets["granule_metadata"].href
|
|
xml_resp = requests.get(xml_url, timeout=10)
|
|
xml_resp.raise_for_status()
|
|
root = ET.fromstring(xml_resp.content)
|
|
angles = [
|
|
abs(float(zenith_elem.text))
|
|
for angle_elem in root.findall(".//Mean_Viewing_Incidence_Angle")
|
|
if (zenith_elem := angle_elem.find("ZENITH_ANGLE")) is not None
|
|
]
|
|
return angles[0] if angles else None
|
|
except Exception as e:
|
|
print(f"[S2] Warning: Could not extract viewing angle: {e}")
|
|
return None
|
|
|
|
|
|
def download_s2(season, site_position, site_name, date_range=None):
|
|
lat, lon = site_position
|
|
datetime_range = date_range or f"{season}-01-01/{season}-12-31"
|
|
output_dir = Path(f"data/{site_name}/{season}/raw/s2/")
|
|
|
|
print(f"[S2] Starting download: {site_name} ({lat:.6f}, {lon:.6f}), {season}")
|
|
|
|
bbox = _get_bbox(lon, lat)
|
|
bands = {"B02": "blue", "B03": "green", "B04": "red", "B8A": "nir"}
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
print("[S2] Connecting to STAC catalog...")
|
|
client = Client.open("https://earth-search.aws.element84.com/v1")
|
|
search = client.search(
|
|
collections=["sentinel-2-l2a"],
|
|
intersects={"type": "Point", "coordinates": [lon, lat]},
|
|
datetime=datetime_range,
|
|
max_items=1000,
|
|
)
|
|
|
|
print("[S2] Searching items...")
|
|
items_by_key = {}
|
|
for item in search.items():
|
|
date = item.datetime.strftime("%Y%m%d")
|
|
parts = item.id.split("_")
|
|
increment = parts[3] if len(parts) > 3 else "0"
|
|
key = (date, increment)
|
|
if key not in items_by_key:
|
|
items_by_key[key] = item
|
|
|
|
print(f"[S2] Found {len(items_by_key)} unique items")
|
|
|
|
for (date, increment), item in items_by_key.items():
|
|
filepath = output_dir / f"{date}_{increment}.geotiff"
|
|
if filepath.exists():
|
|
print(f"[S2] Skipping {date}_{increment}.geotiff (exists)")
|
|
continue
|
|
|
|
print(f"[S2] Processing {date}_{increment}...")
|
|
band_data = {}
|
|
profile = None
|
|
|
|
for band_name, asset_name in bands.items():
|
|
if asset_name not in item.assets:
|
|
continue
|
|
asset = item.assets[asset_name]
|
|
with rasterio.open(asset.href) as src:
|
|
window = _get_window_for_bbox(src, bbox)
|
|
if window.height <= 0 or window.width <= 0:
|
|
continue
|
|
data = src.read(window=window)
|
|
new_transform = window_transform(window, src.transform)
|
|
if profile is None:
|
|
profile = {
|
|
"driver": "GTiff",
|
|
"height": window.height,
|
|
"width": window.width,
|
|
"count": len(bands),
|
|
"dtype": data.dtype,
|
|
"crs": src.crs,
|
|
"transform": new_transform,
|
|
"compress": "lzw",
|
|
}
|
|
band_idx = list(bands.keys()).index(band_name)
|
|
band_data[band_idx] = data[0]
|
|
|
|
if profile and len(band_data) == len(bands):
|
|
stacked = np.array([band_data[i] for i in sorted(band_data.keys())])
|
|
band_names = [list(bands.keys())[i] for i in sorted(band_data.keys())]
|
|
viewing_angle = _extract_viewing_angle(item)
|
|
|
|
if profile["crs"] != TARGET_CRS:
|
|
src_transform = profile["transform"]
|
|
src_height, src_width = profile["height"], profile["width"]
|
|
left, bottom, right, top = rasterio.transform.array_bounds(
|
|
src_height, src_width, src_transform
|
|
)
|
|
dst_transform, dst_width, dst_height = calculate_default_transform(
|
|
profile["crs"], TARGET_CRS, src_width, src_height,
|
|
left=left, bottom=bottom, right=right, top=top,
|
|
)
|
|
reprojected = np.empty(
|
|
(len(stacked), dst_height, dst_width), dtype=stacked.dtype
|
|
)
|
|
for i in range(len(stacked)):
|
|
reproject(
|
|
source=stacked[i],
|
|
destination=reprojected[i],
|
|
src_transform=src_transform,
|
|
src_crs=profile["crs"],
|
|
dst_transform=dst_transform,
|
|
dst_crs=TARGET_CRS,
|
|
resampling=Resampling.bilinear,
|
|
)
|
|
stacked = reprojected
|
|
profile.update({
|
|
"crs": TARGET_CRS,
|
|
"transform": dst_transform,
|
|
"width": dst_width,
|
|
"height": dst_height,
|
|
})
|
|
|
|
with rasterio.open(filepath, "w", **profile) as dst:
|
|
for i, data in enumerate(stacked, 1):
|
|
dst.write(data, i)
|
|
dst.set_band_description(i, band_names[i - 1])
|
|
tags = {}
|
|
if viewing_angle is not None:
|
|
tags["VIEWING_ZENITH_ANGLE"] = str(viewing_angle)
|
|
pb = item.properties.get("s2:processing_baseline")
|
|
if pb is not None:
|
|
tags["PROCESSING_BASELINE"] = str(pb)
|
|
if tags:
|
|
dst.update_tags(**tags)
|
|
|
|
angle_msg = (
|
|
f" (viewing angle: {viewing_angle:.2f}°)" if viewing_angle else ""
|
|
)
|
|
print(f"[S2] Saved: {filepath}{angle_msg}")
|
|
else:
|
|
print(f"[S2] Skipping {date}_{increment} (missing bands)")
|
|
|
|
print("[S2] Completed")
|