Skip to content

ezpz.pbs¶

pbs.py

Contains helper functions for working with the PBS Pro scheduler @ ALCF

See: docs/pbs.md for more information.

build_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None) ¶

Build the launch command for the current job.

Returns:

Name Type Description
str str

The launch command.

Source code in src/ezpz/pbs.py
def build_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Union[str, Path, os.PathLike]] = None,
) -> str:
    """Build the launch command for the current job.

    Dispatches on the detected scheduler: PBS jobs are handled here via
    ``get_pbs_launch_cmd``; Slurm jobs are delegated to ``ezpz.slurm``.

    Returns:
        str: The launch command.

    Raises:
        ValueError: If the detected scheduler is neither PBS nor Slurm.
    """
    from ezpz.configs import get_scheduler

    scheduler_name = get_scheduler().lower()
    if scheduler_name == "slurm":
        import ezpz.slurm

        return ezpz.slurm.build_launch_cmd()
    if scheduler_name == "pbs":
        return get_pbs_launch_cmd(
            ngpus=ngpus,
            nhosts=nhosts,
            ngpu_per_host=ngpu_per_host,
            hostfile=hostfile,
        )
    raise ValueError(f"Unsupported scheduler: {scheduler_name}")

get_pbs_env(hostfile=None, jobid=None, verbose=None) ¶

Get the PBS environment variables

Source code in src/ezpz/pbs.py
def get_pbs_env(
    hostfile: Optional[Union[str, Path]] = None,
    jobid: Optional[Union[int, str]] = None,
    verbose: Optional[bool] = None,
) -> dict[str, str]:
    """Get the PBS environment variables.

    Collects every environment variable containing ``PBS`` in its name,
    augments them with launch info derived from the hostfile, exports the
    result back into ``os.environ``, and returns the collected mapping.
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    env = {key: val for key, val in os.environ.items() if "PBS" in key}
    # Resolve the hostfile: explicit arg > $PBS_NODEFILE > lookup by jobid.
    if hostfile is None:
        hostfile = os.environ.get("PBS_NODEFILE")
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    hostfile_path = Path(hostfile)
    if hostfile_path.is_file():
        for key, val in get_pbs_launch_info(hostfile_path).items():
            env[key.upper()] = str(val)
        env["LAUNCH_CMD"] = get_pbs_launch_cmd(hostfile=hostfile)
    # Export back into the process environment so child processes inherit it.
    os.environ |= env
    if verbose and ezpz.dist.get_rank() == 0:
        ezpz.dist.log_dict_as_bulleted_list(env, name="pbsenv")
    return env

get_pbs_jobid_of_active_job() ¶

Get the jobid of the currently active job.

Source code in src/ezpz/pbs.py
def get_pbs_jobid_of_active_job() -> str | None:
    """Get the jobid of the currently active job.

    Looks up all of the user's currently running PBS jobs (as a mapping of
    ``jobid -> [hosts]``) and returns the jobid of the entry whose host
    list contains this machine, or ``None`` if no running job does.
    """
    # `socket.getfqdn()` returns the fully qualified name, e.g.
    # `x[0-9]+.cm.aurora.alcf.anl.gov`; the job host lists only carry the
    # short hostname before the first '.'.
    this_host = socket.getfqdn().split(".")[0]
    for jobid, hosts in get_pbs_running_jobs_for_user().items():
        if this_host in hosts:
            return jobid
    return None

get_pbs_launch_cmd(ngpus=None, nhosts=None, ngpu_per_host=None, hostfile=None, *, verbose=False) ¶

Get the PBS launch command.

Parameters¶

ngpus : int, optional
    Total number of GPUs to use. If None, inferred from other args or max.
nhosts : int, optional
    Number of hosts (nodes). If None, defaults to max available.
ngpu_per_host : int, optional
    GPUs per host. If None, inferred when possible.
hostfile : path-like, optional
    Hostfile to use. If None, uses a fallback from get_hostfile_with_fallback.
verbose : bool, keyword-only
    If True, log more and pass --verbose (where applicable).

Source code in src/ezpz/pbs.py
def _pbs_cpu_bind_args(machine_name: str, ngpus: int) -> list[str]:
    """Build the CPU-binding arguments for the `mpiexec` launch command.

    An explicit ``CPU_BIND`` environment variable wins; otherwise fall back
    to a per-machine default binding.
    """
    # Verbose binding output is useful, but too noisy at very large scale.
    prefix = "--cpu-bind=verbose," if ngpus < 1024 else "--cpu-bind="
    cpu_bind_env = os.environ.get("CPU_BIND")
    if cpu_bind_env:
        logger.warning(f"Detected CPU_BIND from environment: {cpu_bind_env}")
        # Strip any leading flag text so we can re-apply our own prefix.
        bind_value = cpu_bind_env.replace("--cpu-bind=", "")
        return [f"{prefix}{bind_value}"]
    if machine_name in {"aurora", "sunspot"}:
        # Explicit core list used on the Intel XPU machines.
        intel_xpu_bind = (
            "list:2-4:10-12:18-20:26-28:34-36:42-44:"
            "54-56:62-64:70-72:78-80:86-88:94-96"
        )
        return ["--no-vni", f"{prefix}{intel_xpu_bind}"]
    # Generic depth-based CPU binding for other machines.
    return ["--cpu-bind=depth", "--depth=8"]


def get_pbs_launch_cmd(
    ngpus: Optional[int] = None,
    nhosts: Optional[int] = None,
    ngpu_per_host: Optional[int] = None,
    hostfile: Optional[Pathish] = None,
    *,
    verbose: bool = False,
) -> str:
    """Get the PBS launch command.

    Parameters
    ----------
    ngpus : int, optional
        Total number of GPUs to use. If None, inferred from other args or max.
    nhosts : int, optional
        Number of hosts (nodes). If None, defaults to max available.
    ngpu_per_host : int, optional
        GPUs per host. If None, inferred when possible.
    hostfile : path-like, optional
        Hostfile to use. If None, uses a fallback from `get_hostfile_with_fallback`.
    verbose : bool, keyword-only
        If True, log more and pass `--verbose` (where applicable).

    Returns
    -------
    str
        The fully-assembled launch command (``mpirun`` on Sophia,
        ``mpiexec`` elsewhere).
    """
    hostfile = hostfile or get_hostfile_with_fallback(hostfile)
    hostfile_path = Path(hostfile)

    # Fill in any topology values the caller left unspecified.
    ngpus, nhosts, ngpu_per_host = _infer_topology(
        ngpus=ngpus,
        nhosts=nhosts,
        ngpu_per_host=ngpu_per_host,
        hostfile=hostfile_path,
    )

    ngpus_max = ezpz.get_world_size(total=True)
    emoji = "✅" if ngpus == ngpus_max else "⚠️"
    logger.info(
        f"{emoji} Using [{ngpus}/{ngpus_max}] GPUs "
        f"[{nhosts} hosts] x [{ngpu_per_host} GPU/host]"
    )
    if ngpus != ngpus_max:
        logger.warning(
            f"[🚧 WARNING] Using only [{ngpus}/{ngpus_max}] available GPUs!!"
        )

    machine_name = ezpz.get_machine().lower()
    hostfile_str = str(hostfile_path)

    if machine_name == "sophia":
        # Sophia uses mpirun-style launching.
        cmd_list = [
            "mpirun",
            f"-n={ngpus}",
            f"-N={ngpu_per_host}",
            f"--hostfile={hostfile_str}",
            "-x PATH",
            "-x LD_LIBRARY_PATH",
        ]
        if verbose:
            cmd_list.append("--verbose")
        return " ".join(cmd_list)

    # Default: mpiexec-style launching.
    cmd_list = [
        "mpiexec",
        "--envall",
        f"--np={ngpus}",
        f"--ppn={ngpu_per_host}",
        f"--hostfile={hostfile_str}",
    ]
    if verbose:
        cmd_list.append("--verbose")
    cmd_list.extend(_pbs_cpu_bind_args(machine_name, ngpus))
    return " ".join(cmd_list)

get_pbs_launch_info(hostfile=None, jobid=None) ¶

Get the PBS launch info

Source code in src/ezpz/pbs.py
def get_pbs_launch_info(
    hostfile: Optional[str | Path] = None,
    jobid: Optional[int | str] = None,
) -> dict[str, str]:
    """Get the PBS launch info.

    Returns a mapping of launch-related metadata (hosts, GPU counts,
    machine/device/backend names, and the launch command), with every
    value rendered as a string.
    """
    from ezpz.configs import get_scheduler

    assert get_scheduler() == "PBS"
    if hostfile is None:
        hostfile = get_pbs_nodefile(jobid=jobid)
    assert hostfile is not None
    hostfile_path = Path(hostfile)
    # Keep only the short hostnames (strip any domain suffix).
    node_names = [
        host.split(".")[0]
        for host in ezpz.dist.get_nodes_from_hostfile(hostfile_path)
    ]
    num_hosts = len(node_names)
    gpus_per_host = ezpz.dist.get_gpus_per_node()
    gpus_available = ezpz.dist.get_world_size(total=True)
    total_gpus = num_hosts * gpus_per_host
    world_size_total = ezpz.dist.get_world_size_total()
    launch_cmd = get_pbs_launch_cmd(hostfile=hostfile)
    # Avoid dumping an enormous host list for very large jobs.
    hosts_repr = (
        f"[{', '.join(node_names)}]"
        if num_hosts < 1000
        else "[truncated (>1000 nodes)]"
    )
    return {
        "HOSTFILE": hostfile_path.as_posix(),
        "HOSTS": hosts_repr,
        "NHOSTS": str(num_hosts),
        "NGPU_PER_HOST": str(gpus_per_host),
        "NGPUS": str(total_gpus),
        "NGPUS_AVAILABLE": str(gpus_available),
        "MACHINE": ezpz.dist.get_machine(),
        "DEVICE": ezpz.dist.get_torch_device_type(),
        "BACKEND": ezpz.dist.get_torch_backend(),
        "LAUNCH_CMD": launch_cmd,
        "world_size_total": str(world_size_total),
    }

get_pbs_nodefile(jobid=None) ¶

Get the nodefile for a given jobid.

Parameters:

Name Type Description Default
jobid int | str

The jobid to get the nodefile for. Defaults to None.

None

Returns:

Name Type Description
str str | None

The path to the nodefile.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile(jobid: Optional[int | str] = None) -> str | None:
    """Get the nodefile for a given jobid.

    Args:
        jobid (int | str, optional): The jobid to get the nodefile for.
            When omitted, the jobid of the currently active job is used.

    Returns:
        str | None: The path to the nodefile, or ``None`` when no jobid was
            given and no active job could be found.
    """
    resolved = jobid if jobid is not None else get_pbs_jobid_of_active_job()
    if resolved is None:
        logger.warning("No active job found.")
        return None
    return get_pbs_nodefile_from_jobid(resolved)

get_pbs_nodefile_from_jobid(jobid) ¶

Get the nodefile for a given jobid.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile_from_jobid(jobid: int | str) -> str:
    """Get the nodefile for a given jobid.

    PBS keeps one nodefile per job under ``/var/spool/pbs/aux``; the
    filename contains the jobid, so we match on that substring.

    Args:
        jobid (int | str): The jobid whose nodefile to locate.

    Returns:
        str: Absolute (resolved) path to the nodefile, as a POSIX string.

    Raises:
        AssertionError: If ``jobid`` is None, if zero or multiple files
            match, or if the matched path is not a regular file.
    """
    assert jobid is not None, "No jobid provided and no active job found."

    aux_dir = Path("/var/spool/pbs/aux")
    # Substring match; the uniqueness assert below catches ambiguity.
    matches = [
        aux_dir / name for name in os.listdir(aux_dir) if str(jobid) in name
    ]
    assert len(matches) == 1, (
        f"Found {len(matches)} files matching {jobid} in {aux_dir}"
    )
    nodefile = matches[0]
    assert nodefile.is_file(), f"Nodefile {nodefile} does not exist."
    # `resolve()` already yields an absolute path, so `.absolute()` is
    # unnecessary here.
    return nodefile.resolve().as_posix()

get_pbs_nodefile_of_active_job() ¶

Get the nodefile for the currently active job.

Source code in src/ezpz/pbs.py
def get_pbs_nodefile_of_active_job() -> str | None:
    """Get the nodefile for the currently active job.

    Returns ``None`` when this host is not part of any running job.
    """
    if (jobid := get_pbs_jobid_of_active_job()) is not None:
        return get_pbs_nodefile_from_jobid(jobid)
    return None

get_pbs_nodelist_from_jobid(jobid) ¶

Get the nodelist for a given jobid.

Source code in src/ezpz/pbs.py
def get_pbs_nodelist_from_jobid(jobid: int | str) -> list[str]:
    """Get the nodelist for a given jobid.

    Raises:
        AssertionError: If ``jobid`` is None or not among the user's
            currently running jobs.
    """
    assert jobid is not None, "No jobid provided and no active job found."
    key = str(jobid)
    running = get_pbs_running_jobs_for_user()
    assert key in running, (
        f"Job ID {jobid} not found in running jobs for user {getuser()}"
    )
    return running[key]

get_pbs_running_jobs_for_user() ¶

Get all running jobs for the current user.

Source code in src/ezpz/pbs.py
def get_pbs_running_jobs_for_user() -> dict[str, list[str]]:
    """Get all running jobs for the current user.

    Parses ``qstat -fn1wru <user>`` output, keeping only rows in the
    running (' R ') state.

    Returns:
        dict[str, list[str]]: Mapping of jobid to the list of (short)
            hostnames allocated to that job.

    Raises:
        ImportError: If the optional ``sh`` dependency is unavailable.
    """
    try:
        from sh import qstat  # type:ignore
    except Exception:
        # Report through the module logger (not print) and re-raise with
        # the original traceback intact.
        logger.exception("Error importing sh.qstat")
        raise

    running_rows = [
        row
        for row in qstat(f"-fn1wru {getuser()}").split("\n")
        if " R " in row
    ]
    jobs: dict[str, list[str]] = {}
    for row in running_rows:
        fields = [tok for tok in row.split(" ") if tok]
        # First field is `<jobid>.<server>`; keep only the jobid part.
        jobid = fields[0].split(".")[0]
        # Last field is the vnode assignment (entries joined by '+', each
        # with a '/<slot>' suffix) — keep just the hostnames.
        jobs[jobid] = [vnode.split("/")[0] for vnode in fields[-1].split("+")]
    return jobs

get_running_jobs_from_qstat() ¶

Get the running jobs from qstat

Source code in src/ezpz/pbs.py
def get_running_jobs_from_qstat() -> list[int]:
    """Get the running jobs from qstat.

    Returns:
        list[int]: Numeric jobids of the current user's running (' R ') jobs.

    Raises:
        ImportError: If the optional ``sh`` dependency is unavailable.
    """
    # Previously this was wrapped in `try: ... except Exception as e:
    # raise e`, which only mangled the traceback — let any import error
    # propagate as-is.
    from sh import qstat as shqstat  # type: ignore

    # [2:-1] skips the qstat header lines and the trailing blank line.
    return [
        int(line.split(".")[0])
        for line in shqstat("-u", os.environ.get("USER")).split("\n")[2:-1]
        if " R " in line
    ]