Skip to content

ezpz.doctor¶

Runtime diagnostics for cluster readiness and local environment health.

CheckResult dataclass ¶

Structured outcome for individual diagnostic checks.

Source code in src/ezpz/doctor.py
@dataclass(frozen=True, slots=True)
class CheckResult:
    """Structured outcome for individual diagnostic checks."""

    name: str
    status: Status
    message: str
    remedy: Optional[str] = None

    def _get_status(self) -> str:
        if self.status == "ok":
            return f"βœ… {colors.green}OKAY{colors.reset}"
        if self.status == "warning":
            return f"⚠️ {colors.yellow}WARN{colors.reset}"
        if self.status == "error":
            return f"🚨 {colors.red}ERROR{colors.reset}"
        raise ValueError(f"Unexpected value for {self.status=}")

    def get_status(self) -> str:
        lines: list[str] = []
        summary = " ".join(
            [
                f"[{colors.reset}{self._get_status():>7}{colors.reset}]",
                f"[{colors.blue}{self.name:<9}{colors.reset}]: {self.message}",
            ]
        )
        lines.append(summary)
        if self.remedy:
            lines.append(f"          ↳ {self.remedy}")

        return "\n".join(lines)

    def to_dict(self) -> dict[str, str | None]:
        return {
            "name": self.name,
            "status": self.status,
            "message": self.message,
            "remedy": self.remedy,
        }

check_mpi(which=shutil.which) ¶

Verify mpi4py importability and presence of a launcher command.

Source code in src/ezpz/doctor.py
def check_mpi(
    which: Callable[[str], Optional[str]] = shutil.which,
) -> CheckResult:
    """Report whether mpi4py imports and an MPI launcher can be located.

    Args:
        which: Lookup callable forwarded to ``_command_exists`` for each
            candidate launcher name; defaults to ``shutil.which``.

    Returns:
        A ``CheckResult`` named ``"mpi"``: ``ok`` when both mpi4py and a
        launcher are present, ``warning`` when exactly one is, and
        ``error`` when neither is.
    """
    # Any failure while importing mpi4py counts as "not available".
    try:
        from mpi4py import MPI  # noqa: F401
    except Exception:  # pragma: no cover - exercised via negative paths
        importable = False
    else:
        importable = True

    # Stop at the first launcher name that resolves.
    launcher_found = False
    for launcher in ("mpiexec", "mpirun"):
        if _command_exists(launcher, which=which):
            launcher_found = True
            break

    remedy: Optional[str] = None
    if importable and launcher_found:
        status = "ok"
        message = "mpi4py import succeeded and an MPI launcher was found."
    elif importable:
        status = "warning"
        message = "mpi4py is importable, but no MPI launcher was detected on PATH."
        remedy = "Install mpiexec/mpirun or load the appropriate module before launching distributed jobs."
    elif launcher_found:
        status = "warning"
        message = "An MPI launcher is available, but mpi4py could not be imported."
        remedy = "Install mpi4py into the active environment so Python workers can join the MPI communicator."
    else:
        status = "error"
        message = "Neither mpi4py nor a launcher (mpiexec/mpirun) is available."
        remedy = "Install mpi4py and ensure an MPI runtime is accessible on PATH."

    return CheckResult(
        name="mpi",
        status=status,
        message=message,
        remedy=remedy,
    )

check_scheduler(*, get_scheduler=ezpz.configs.get_scheduler, environ=None) ¶

Determine scheduler visibility from environment variables.

Source code in src/ezpz/doctor.py
def check_scheduler(
    *,
    get_scheduler: Callable[[], str] = ezpz.configs.get_scheduler,
    environ: Optional[dict[str, str]] = None,
) -> CheckResult:
    """Report whether a job scheduler is visible to this process.

    Args:
        get_scheduler: Zero-argument callable returning the scheduler name.
        environ: Mapping consulted for scheduler job-id variables; falls
            back to ``os.environ`` when ``None``.

    Returns:
        A ``CheckResult`` named ``"scheduler"``: ``ok`` when the callable
        reports ``"PBS"`` or ``"SLURM"``, otherwise ``warning`` (with a
        different message depending on whether job-id variables are set).
    """
    env = environ if environ is not None else os.environ
    scheduler = get_scheduler()

    if scheduler in {"PBS", "SLURM"}:
        return CheckResult(
            name="scheduler",
            status="ok",
            message=f"Detected active scheduler: {scheduler}.",
        )

    # Job-id variables with a non-empty value hint that a scheduler is in
    # play even though the name lookup did not recognise it.
    job_vars = tuple(
        filter(env.get, ("PBS_JOBID", "SLURM_JOB_ID", "SLURM_JOBID"))
    )
    if job_vars:
        message = "Scheduler variables detected but mapping returned UNKNOWN."
        remedy = "Confirm ezpz.configs.get_scheduler recognises this host or provide a plug-in adapter."
    else:
        message = "No scheduler detected – assuming local launch mode."
        remedy = "Set scheduler environment variables or configure a custom adapter if running under a job queue."

    return CheckResult(
        name="scheduler",
        status="warning",
        message=message,
        remedy=remedy,
    )

check_torch_device() ¶

Check torch availability and configured accelerator.

Source code in src/ezpz/doctor.py
def check_torch_device() -> CheckResult:
    """Report whether PyTorch imports and an accelerator is usable.

    Returns:
        A ``CheckResult`` named ``"torch"``:

        * ``error`` when ``import torch`` fails;
        * ``ok`` when the ``TORCH_DEVICE`` environment variable is set to a
          non-empty value (the value is reported, not validated);
        * ``ok`` when CUDA, XPU or MPS reports an available device;
        * ``warning`` otherwise.
    """
    try:
        import torch
    except Exception:  # pragma: no cover - optional dependency
        return CheckResult(
            name="torch",
            status="error",
            message="PyTorch is not importable in the current environment.",
            remedy="Install torch (matching your accelerator) or activate the environment that provides it.",
        )

    # An explicit override short-circuits hardware probing.
    requested = os.environ.get("TORCH_DEVICE")
    if requested:
        return CheckResult(
            name="torch",
            status="ok",
            message=f"TORCH_DEVICE={requested} (PyTorch {torch.__version__}).",
        )

    # Probe backends in order: CUDA, then XPU (only if the attribute
    # exists), then MPS. Later probes run only when earlier ones fail.
    if torch.cuda.is_available() and torch.cuda.device_count() > 0:
        accelerator_found = True
    elif (
        hasattr(torch, "xpu")
        and torch.xpu.is_available()
        and torch.xpu.device_count() > 0
    ):
        accelerator_found = True
    else:
        accelerator_found = (
            torch.backends.mps.is_built() and torch.backends.mps.is_available()
        )

    if not accelerator_found:
        return CheckResult(
            name="torch",
            status="warning",
            message="PyTorch import succeeded but no accelerators were detected.",
            remedy="Confirm drivers are available or set TORCH_DEVICE=cpu for CPU-only execution.",
        )
    return CheckResult(
        name="torch",
        status="ok",
        message="PyTorch detected at least one accelerator.",
    )

check_wandb(environ=None) ¶

Advise on Weights & Biases connectivity expectations.

Source code in src/ezpz/doctor.py
def check_wandb(environ: Optional[dict[str, str]] = None) -> CheckResult:
    """Advise on Weights & Biases connectivity expectations.

    Args:
        environ: Mapping consulted for ``WANDB_API_KEY`` and ``WANDB_MODE``;
            falls back to ``os.environ`` when ``None``.

    Returns:
        A ``CheckResult`` named ``"wandb"``.

        When wandb cannot be imported: ``warning`` only if an API key is set
        and offline mode is not selected; otherwise ``ok``.

        When wandb can be imported: ``ok`` if offline mode is selected, an
        API key is set, or the netrc verification helper succeeds;
        otherwise ``warning``.
    """
    env = os.environ if environ is None else environ
    try:
        import wandb  # type: ignore # pragma: no cover - optional dependency

        # Touch an attribute so a broken/partial install also counts as
        # "not importable".
        _ = wandb.__version__
        wandb_importable = True
    except Exception:
        wandb_importable = False

    api_key = env.get("WANDB_API_KEY")
    offline_mode = env.get("WANDB_MODE", "").lower() == "offline"

    if not wandb_importable:
        # Warn only when remote logging is actually being requested, i.e.
        # credentials are present AND offline mode is not selected. Using
        # `or` here reported "credentials present" even when no key was set
        # and suggested WANDB_MODE=offline when it was already active.
        if api_key and not offline_mode:
            return CheckResult(
                name="wandb",
                status="warning",
                message="WANDB credentials present but the library could not be imported.",
                remedy="Install `ezpz[monitoring]` or set WANDB_MODE=offline to suppress remote logging.",
            )
        return CheckResult(
            name="wandb",
            status="ok",
            message="wandb not installed and no cloud logging requested.",
        )

    if offline_mode:
        return CheckResult(
            name="wandb",
            status="ok",
            message="wandb is available and offline logging is configured.",
        )
    if api_key:
        return CheckResult(
            name="wandb",
            status="ok",
            message="wandb is available and WANDB_API_KEY is set for cloud logging.",
        )
    # Fall back to credentials stored outside the environment.
    if ezpz.dist._verify_wandb_from_netrc_config():
        return CheckResult(
            name="wandb",
            status="ok",
            message="wandb authentication provided in '~/.netrc' Should be all set.",
        )
    return CheckResult(
        name="wandb",
        status="warning",
        message="wandb installed but WANDB_API_KEY is not configured.",
        remedy="Set WANDB_MODE=offline for air-gapped runs or export WANDB_API_KEY for remote tracking.",
    )

parse_args(argv=None) ¶

Parse CLI arguments for the doctor command.

Source code in src/ezpz/doctor.py
def parse_args(argv: Optional[Sequence[str]] = None):
    """Build the doctor command's parser and parse ``argv`` with it.

    Args:
        argv: Argument strings handed to the parser's ``parse_args``;
            ``None`` is passed through unchanged.

    Returns:
        Whatever the parser's ``parse_args`` returns.
    """
    return build_doctor_parser().parse_args(argv)

run(argv=None) ¶

Entry point used by the CLI glue.

Source code in src/ezpz/doctor.py
def run(argv: Optional[Sequence[str]] = None) -> int:
    """Run every diagnostic check, print the results and return an exit code.

    Args:
        argv: Command-line arguments forwarded to ``parse_args``.

    Returns:
        ``0`` when the most severe result ranks below ``"error"`` in
        ``STATUS_PRIORITY``, otherwise ``1``.
    """
    options = parse_args(argv)
    outcomes = run_checks()
    # Severity of the worst result, computed before any output is produced.
    highest = max(STATUS_PRIORITY[outcome.status] for outcome in outcomes)

    if options.json:
        payload = [outcome.to_dict() for outcome in outcomes]
        print(json.dumps(payload, indent=2))
    else:
        _print_runtime_context()
        # Render everything first so a rendering failure prints no results.
        rendered = [outcome.get_status() for outcome in outcomes]
        for text in rendered:
            print(text)

    if highest < STATUS_PRIORITY["error"]:
        return 0
    return 1

run_checks(checks=None) ¶

Execute all diagnostic checks, returning structured results.

Source code in src/ezpz/doctor.py
def run_checks(
    checks: list[Callable] | None = None,
) -> list[CheckResult]:
    """Call each diagnostic check and collect what it returns.

    Args:
        checks: Zero-argument callables to run, in order. When ``None``,
            the built-in set (mpi, wandb, torch device, hostfile,
            scheduler) is used.

    Returns:
        One entry per check, in call order. A check that raises is logged
        with its traceback and represented by an ``error`` ``CheckResult``
        instead of propagating the exception.
    """
    if checks is None:
        checks = [
            check_mpi,
            check_wandb,
            check_torch_device,
            check_hostfile,
            check_scheduler,
        ]

    results: list[CheckResult] = []
    for check in checks:
        try:
            outcome = check()
        except Exception as exc:  # pragma: no cover - defensive
            # Callables without a __name__ are reported with an empty name.
            name = getattr(check, "__name__", "")
            logger.exception(f"Diagnostic check {name} crashed.")
            outcome = CheckResult(
                name=name,
                status="error",
                message=f"Check raised {exc!r}",
                remedy="Inspect the full stack trace above and report the failure.",
            )
        results.append(outcome)
    return results