Added run-pipeline.py script.

This commit is contained in:
Felix Delattre 2026-06-11 17:47:50 +02:00
parent 60dbf932f8
commit ae88e2291c
2 changed files with 158 additions and 0 deletions

View file

@ -18,6 +18,22 @@ End-to-end pipeline from selecting sites from the global [PhenoCam Network](http
## Quick start
### Run pipeline wrapper (recommended)
```bash
uv sync
uv run python run-pipeline.py --evaluation-year 2025
```
Runs all five steps in order. Steps 1 and 2 are skipped when their output already exists. Each site in steps 3–5 is skipped when `data/metrics/{year}/{site}/metrics.json` is present. Any failure stops the run immediately, so one can fix the issue and re-run; completed work is never repeated.
```bash
# single site (steps 1 and 2 still skip if already done)
uv run python run-pipeline.py --evaluation-year 2025 --site ICOSFR-Fon1
```
### Step by step
```bash
uv sync
uv run python 1-phenocam.py --evaluation-year 2025

142
run-pipeline.py Normal file
View file

@ -0,0 +1,142 @@
"""Pipeline wrapper: run steps 1 → 2 → 3 → 4 → 5.
Steps 1 and 2 run once for the whole year (skipped when their output already
exists). Steps 3–5 run site-by-site for every PASS site from
``data/phenocam_screening/{year}.json``; a site is skipped entirely when
``data/metrics/{year}/{site}/metrics.json`` already exists.
Any failure stops the run immediately. Fix the issue and re-run — completed
steps and sites are skipped automatically.
CLI:
- ``--evaluation-year`` (default 2025)
- ``--site`` single site to run steps 3–5 for (default: all PASS sites)
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from pathlib import Path
from typing import Any
# Root directory for all pipeline inputs and outputs (relative path).
DATA_DIR = Path("data")
# Evaluation year used when --evaluation-year is not given.
DEFAULT_YEAR = 2025
# Steps 1 and 2 are global (once per year); steps 3–5 repeat per site.
GLOBAL_STEPS: list[tuple[str, Path]] = [
    # (script, skip-if-this-file-exists)
    # The marker path is relative to DATA_DIR; its "{year}" placeholder is
    # substituted with the evaluation year in main().
    ("1-phenocam.py", Path("phenocam/{year}.json")),
    ("2-phenocam-screening.py", Path("phenocam_screening/{year}.json")),
]
# Scripts run in this order for every site that is not yet complete.
PER_SITE_STEPS: list[str] = [
    "3-sentinel-data.py",
    "4-fusion.py",
    "5-metrics.py",
]
def _load_pass_sites(year: int) -> list[str]:
path = DATA_DIR / "phenocam_screening" / f"{year}.json"
if not path.is_file():
raise SystemExit(f"Step-2 screening manifest not found: {path}")
payload: dict[str, Any] = json.loads(path.read_text(encoding="utf-8"))
sites = []
for row in payload.get("sites", []):
if row.get("calculations", {}).get("status") != "PASS":
continue
name = row.get("response", {}).get("camera", {}).get("Sitename")
if name:
sites.append(str(name))
return sorted(sites)
def _is_site_complete(year: int, site: str) -> bool:
return (DATA_DIR / "metrics" / str(year) / site / "metrics.json").is_file()
def _run_global_step(step: str, year: int) -> int:
result = subprocess.run(
[sys.executable, step, "--evaluation-year", str(year)],
check=False,
)
return result.returncode
def _run_site_step(step: str, year: int, site: str) -> int:
result = subprocess.run(
[sys.executable, step, "--evaluation-year", str(year), "--site", site],
check=False,
)
return result.returncode
def main(argv: list[str] | None = None) -> int:
    """Run the whole pipeline for one evaluation year.

    Global steps from ``GLOBAL_STEPS`` run first; each is skipped when its
    marker file under ``DATA_DIR`` already exists. Then every PASS site (or
    only ``--site`` when given) is taken through ``PER_SITE_STEPS`` unless
    ``_is_site_complete`` reports it as done.

    Args:
        argv: Command-line arguments without the program name; ``None`` lets
            argparse read ``sys.argv``.

    Returns:
        ``0`` when every requested step finished successfully.

    Raises:
        SystemExit: when a step exits non-zero, when the requested site is
            not among the PASS sites, or when the PASS-site list cannot be
            loaded.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--evaluation-year", type=int, default=DEFAULT_YEAR)
    parser.add_argument(
        "--site",
        type=str,
        default=None,
        help="Single sitename to process (default: all PASS sites)",
    )
    args = parser.parse_args(argv)
    year = args.evaluation_year

    # --- Global steps (steps 1 and 2) ---
    for script, marker_template in GLOBAL_STEPS:
        marker = DATA_DIR / str(marker_template).replace("{year}", str(year))
        if marker.is_file():
            print(f"[pipeline] {script} — skipping (output already exists)")
            continue
        # Flush before spawning the child: when stdout is redirected it is
        # block-buffered, and without the flush this line would appear after
        # the child's own output.
        print(f"[pipeline] Running {script}...", flush=True)
        rc = _run_global_step(script, year)
        if rc != 0:
            raise SystemExit(
                f"[pipeline] {script} failed (exit {rc}); cannot continue"
            )

    # --- Per-site steps (steps 3–5) ---
    sites = _load_pass_sites(year)
    # Compare against None so that an explicit empty --site value is rejected
    # below instead of silently selecting every PASS site.
    if args.site is not None:
        if args.site not in sites:
            raise SystemExit(
                f"Site '{args.site}' not found in step-2 PASS sites for {year}"
            )
        sites = [args.site]

    n_total = len(sites)
    skipped: list[str] = []
    succeeded: list[str] = []
    print(f"[pipeline] {n_total} PASS site(s) for {year}")
    for i, site in enumerate(sites, 1):
        prefix = f"[pipeline] ({i}/{n_total}) {site}"
        if _is_site_complete(year, site):
            print(f"{prefix} — skipping (already complete)")
            skipped.append(site)
            continue
        # Flush so the site header precedes the child processes' output.
        print(f"{prefix}", flush=True)
        for step in PER_SITE_STEPS:
            rc = _run_site_step(step, year, site)
            if rc != 0:
                raise SystemExit(f"[pipeline] {step} failed for {site} (exit {rc})")
        succeeded.append(site)

    print(
        f"\n[pipeline] Done: {len(skipped)} skipped, {len(succeeded)} succeeded"
        f" (of {n_total} total)"
    )
    return 0
# Script entry point: run the pipeline and exit with main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())