efast-phenocam-validation/run-pipeline.py
2026-06-11 17:49:52 +02:00

142 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Pipeline wrapper: run steps 1 → 2 → 3 → 4 → 5.
Steps 1 and 2 run once for the whole year (skipped when their output already
exists). Steps 3–5 run site-by-site for every PASS site from
``data/phenocam_screening/{year}.json``; a site is skipped entirely when
``data/metrics/{year}/{site}/metrics.json`` already exists.
Any failure stops the run immediately. Fix the issue and re-run — completed
steps and sites are skipped automatically.
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` single site to run steps 3–5 for (default: all PASS sites)
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
# Root directory of every pipeline artefact. The path is relative, so it is
# resolved against the current working directory.
DATA_DIR = Path("data")

# Year used when ``--evaluation-year`` is not given on the command line.
DEFAULT_YEAR = 2025

# Steps 1 and 2 are global (once per year); steps 3–5 repeat per site.
# Each entry pairs a script with the output file (relative to DATA_DIR) whose
# presence means the step can be skipped. "{year}" is a literal placeholder
# that is substituted with the evaluation year at run time.
GLOBAL_STEPS: list[tuple[str, Path]] = [
    ("1-phenocam.py", Path("phenocam") / "{year}.json"),
    ("2-phenocam-screening.py", Path("phenocam_screening") / "{year}.json"),
]

# Scripts executed, in this order, for every site.
PER_SITE_STEPS = ["3-sentinel-data.py", "4-fusion.py", "5-metrics.py"]
def _dig(node: Any, *keys: str) -> Any:
    """Follow *keys* through nested dicts; return ``None`` at the first gap."""
    for key in keys:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def _load_pass_sites(year: int) -> list[str]:
    """Return the sorted sitenames that passed step-2 screening for *year*.

    Reads ``DATA_DIR/phenocam_screening/{year}.json`` and keeps every entry of
    its ``"sites"`` list whose ``calculations.status`` equals ``"PASS"`` and
    whose ``response.camera.Sitename`` is non-empty.

    Malformed entries (not a JSON object, or with a nested level that is
    missing or ``null``) are ignored rather than aborting the run with an
    ``AttributeError``.

    Raises:
        SystemExit: if the screening manifest file does not exist.
    """
    path = DATA_DIR / "phenocam_screening" / f"{year}.json"
    if not path.is_file():
        raise SystemExit(f"Step-2 screening manifest not found: {path}")
    payload: Any = json.loads(path.read_text(encoding="utf-8"))

    # ``"sites": null`` or a non-list value is treated like an empty list.
    rows = _dig(payload, "sites")
    if not isinstance(rows, list):
        rows = []

    sites = []
    for row in rows:
        if _dig(row, "calculations", "status") != "PASS":
            continue
        name = _dig(row, "response", "camera", "Sitename")
        if name:
            sites.append(str(name))
    return sorted(sites)
def _is_site_complete(year: int, site: str) -> bool:
    """Tell whether *site* already has a metrics file for *year*.

    The marker checked is ``DATA_DIR/metrics/<year>/<site>/metrics.json``;
    it only counts when it exists and is a file.
    """
    metrics_file = DATA_DIR.joinpath("metrics", str(year), site, "metrics.json")
    return metrics_file.is_file()
def _run_global_step(step: str, year: int) -> int:
    """Run one year-wide step script and return its exit status.

    The script is launched with the interpreter that runs this wrapper and
    receives ``--evaluation-year <year>``. A non-zero status is returned,
    not raised.
    """
    command = [sys.executable, step, "--evaluation-year", str(year)]
    completed = subprocess.run(command, check=False)
    return completed.returncode
def _run_site_step(step: str, year: int, site: str) -> int:
    """Run one per-site step script and return its exit status.

    The script is launched with the interpreter that runs this wrapper and
    receives ``--evaluation-year <year> --site <site>``. A non-zero status
    is returned, not raised.
    """
    command = [sys.executable, step]
    command += ["--evaluation-year", str(year)]
    command += ["--site", site]
    completed = subprocess.run(command, check=False)
    return completed.returncode
def main(argv: list[str] | None = None) -> int:
    """Run the whole validation pipeline and return the process exit code.

    Order of work:

    1. Each entry of ``GLOBAL_STEPS`` runs once for the year, unless its
       marker file under ``DATA_DIR`` already exists.
    2. The PASS sites are loaded from the step-2 screening manifest and, when
       ``--site`` is given, narrowed to that single site.
    3. For each site every script in ``PER_SITE_STEPS`` runs in order; a site
       whose ``metrics.json`` already exists is skipped.

    Progress lines are flushed as soon as they are printed. The step scripts
    run as child processes sharing this process's stdout, so when stdout is
    redirected to a file or pipe (block-buffered) unflushed progress lines
    would otherwise appear after the output of the step they announce.

    Args:
        argv: Command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        ``0`` when every requested step and site either ran successfully or
        was skipped.

    Raises:
        SystemExit: as soon as a step exits non-zero, when the screening
            manifest is missing, or when ``--site`` is not a PASS site.
    """

    def say(message: str) -> None:
        # Flush so the line lands before any child-process output.
        print(message, flush=True)

    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
    parser.add_argument(
        "--site",
        type=str,
        default=None,
        help="Single sitename to process (default: all PASS sites)",
    )
    args = parser.parse_args(argv)
    year = args.evaluation_year

    # --- Global steps (steps 1 and 2) ---
    for script, marker_template in GLOBAL_STEPS:
        # The template holds a literal "{year}" placeholder.
        marker = DATA_DIR / str(marker_template).replace("{year}", str(year))
        if marker.is_file():
            say(f"[pipeline] {script} — skipping (output already exists)")
            continue
        say(f"[pipeline] Running {script}...")
        rc = _run_global_step(script, year)
        if rc != 0:
            raise SystemExit(
                f"[pipeline] {script} failed (exit {rc}); cannot continue"
            )

    # --- Per-site steps (steps 3–5) ---
    sites = _load_pass_sites(year)
    if args.site:
        if args.site not in sites:
            raise SystemExit(
                f"Site '{args.site}' not found in step-2 PASS sites for {year}"
            )
        sites = [args.site]

    n_total = len(sites)
    skipped: list[str] = []
    succeeded: list[str] = []
    say(f"[pipeline] {n_total} PASS site(s) for {year}")

    for i, site in enumerate(sites, 1):
        prefix = f"[pipeline] ({i}/{n_total}) {site}"
        if _is_site_complete(year, site):
            say(f"{prefix} — skipping (already complete)")
            skipped.append(site)
            continue
        say(f"{prefix}")
        for step in PER_SITE_STEPS:
            rc = _run_site_step(step, year, site)
            if rc != 0:
                # Stop at the first failure; finished sites are skipped on re-run.
                raise SystemExit(f"[pipeline] {step} failed for {site} (exit {rc})")
        succeeded.append(site)

    say(
        f"\n[pipeline] Done: {len(skipped)} skipped, {len(succeeded)} succeeded"
        f" (of {n_total} total)"
    )
    return 0
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())