Skip to content

ezpz.jobs

Helpers for persisting and replaying scheduler job environments.

jobs.py

add_to_jobslog(hostfile=None)

Append the current job directory to the jobs log if it is new.

Source code in src/ezpz/jobs.py
def add_to_jobslog(hostfile: Optional[Union[str, Path]] = None):
    """Append the current job directory to the jobs log if it is new.

    The jobs log (see :func:`get_jobslog_file`) stores one job directory per
    line.  The current job directory (from :func:`get_jobdir_from_env`) is
    appended only when it differs from the last line already in the log, so
    repeated calls within the same job do not create duplicate entries.

    Parameters
    ----------
    hostfile:
        Optional hostfile forwarded to :func:`get_jobenv`.
    """
    jobenv = get_jobenv(hostfile=hostfile)
    # Check for ``None`` before inspecting keys so the assertion order makes sense.
    assert jobenv is not None
    assert len(jobenv.keys()) > 0
    jobdir = get_jobdir_from_env()
    assert jobdir is not None
    jobslog_file = get_jobslog_file()
    Path(jobslog_file).parent.mkdir(exist_ok=True, parents=True)
    # Compare against the last entry actually stored in the log file.
    # ``get_jobdir_from_jobslog`` is not used here: it returns the *current*
    # job directory, which would always match and suppress every append.
    recorded_jobdirs = get_jobdirs_from_jobslog()
    last_jobdir = recorded_jobdirs[-1] if recorded_jobdirs else None
    if jobdir.as_posix() != last_jobdir:
        with jobslog_file.open("a") as f:
            _ = f.write(f"{jobdir}\n")
    else:
        log.warning(
            " ".join(
                [
                    f"{jobdir.as_posix()} ",
                    f"already in {jobslog_file.as_posix()}, ",
                    "not appending !!",
                ]
            )
        )

check_scheduler(scheduler=None)

Ensure the detected scheduler is supported.

Parameters

scheduler: Optional override. When `None`, the value returned by `ezpz.configs.get_scheduler()` is used.

Returns

bool: `True` when the scheduler is recognised; otherwise a `TypeError` is raised.

Source code in src/ezpz/jobs.py
def check_scheduler(scheduler: Optional[str] = None) -> bool:
    """Ensure the detected scheduler is supported.

    Parameters
    ----------
    scheduler:
        Optional override.  When ``None`` the value from
        :func:`ezpz.configs.get_scheduler` is used.

    Returns
    -------
    bool
        ``True`` when the scheduler is recognised (currently only PBS).

    Raises
    ------
    TypeError
        If the scheduler is missing/empty or is anything other than PBS.
    AssertionError
        If a non-empty scheduler name is not one of
        ``ezpz.configs.SCHEDULERS.values()`` (compared upper-cased).
    """
    from ezpz.configs import SCHEDULERS

    scheduler = get_scheduler() if scheduler is None else scheduler
    # A missing or empty scheduler is unsupported.  Handle it explicitly so a
    # ``None`` value raises the documented ``TypeError`` instead of an
    # ``AttributeError`` from ``None.lower()``.
    if not scheduler:
        raise TypeError(f"{scheduler} not yet implemented!")
    assert scheduler.upper() in SCHEDULERS.values()
    if scheduler.lower() != "pbs":
        raise TypeError(f"{scheduler} not yet implemented!")
    return True

get_jobdir_from_env()

Return the directory used to persist job metadata on the login node.

Source code in src/ezpz/jobs.py
def get_jobdir_from_env() -> Path:
    """Return the directory used to persist job metadata on the login node.

    The directory is ``~/<scheduler>-jobs/<jobid>``, where:

    * ``<jobid>`` is ``PBS_JOBID`` (as reported by
      :func:`ezpz.pbs.get_pbs_env`) truncated at its first ``.``;
    * ``<scheduler>`` is the value of ``get_scheduler()``, replaced by
      ``"PBS"`` when it is not already PBS (case-insensitive) but a non-empty
      ``PBS_JOBID`` is present in ``os.environ``.

    The directory (and any missing parents) is created before returning.

    Raises
    ------
    KeyError
        Presumably when ``get_pbs_env()`` has no ``PBS_JOBID`` entry, e.g.
        outside a PBS job — NOTE(review): confirm against ``get_pbs_env``.
    """
    from ezpz.pbs import get_pbs_env

    pbs_env = get_pbs_env()
    scheduler = get_scheduler()
    # A live PBS job wins over whatever scheduler was auto-detected.
    if scheduler.lower() != "pbs" and os.environ.get("PBS_JOBID"):
        scheduler = "PBS"
    jobid = pbs_env["PBS_JOBID"].split(".")[0]
    jobdir = Path.home() / f"{scheduler}-jobs" / f"{jobid}"
    jobdir.mkdir(exist_ok=True, parents=True)
    return jobdir

get_jobdir_from_jobslog(idx=-1)

Return the current job's directory as a POSIX string (the `idx` argument is currently ignored).

Source code in src/ezpz/jobs.py
def get_jobdir_from_jobslog(idx: Optional[int] = -1) -> str:  # noqa  type:ignore
    """Return the current job's directory as a POSIX path string.

    NOTE(review): despite its name, this function does not read the jobs log,
    and ``idx`` is currently ignored.  The directory always comes from
    :func:`get_jobdir_from_env`, which also creates it if missing.
    """
    current_jobdir = get_jobdir_from_env()
    return current_jobdir.as_posix()

get_jobdirs_from_jobslog()

Return all previously recorded job directories from the jobs log.

Source code in src/ezpz/jobs.py
def get_jobdirs_from_jobslog() -> list[str]:
    """Return every job directory recorded in the jobs log, oldest first.

    Each line of the log file (see :func:`get_jobslog_file`) is one entry,
    with its trailing newline removed.  Returns an empty list when the log
    file does not exist yet.
    """
    log_path = get_jobslog_file()
    if not log_path.is_file():
        return []
    with log_path.open("r") as handle:
        return [line.rstrip("\n") for line in handle]

get_jobenv(framework=None, hostfile=None, verbose=None, verbose_dist_info=None, verbose_pbs_env=None)

Collect runtime information describing the active batch job.

Source code in src/ezpz/jobs.py
def get_jobenv(
    framework: Optional[str] = None,
    hostfile: Optional[Union[str, Path]] = None,
    verbose: Optional[bool] = None,
    verbose_dist_info: Optional[bool] = None,
    verbose_pbs_env: Optional[bool] = None,
) -> dict:
    """Collect runtime information describing the active batch job.

    The hostfile defaults to ``$HOSTFILE`` and then ``$PBS_NODEFILE``.
    Distributed-run information from :func:`ezpz.dist.get_dist_info` is
    merged in on a best-effort basis: any failure is swallowed and only
    logged at debug level when ``verbose`` is set.  The PBS environment from
    :func:`ezpz.pbs.get_pbs_env` is then merged on top.

    Raises
    ------
    ValueError
        If the scheduler is not PBS and no ``PBS_JOBID`` is set.
    """
    from ezpz.dist import get_dist_info
    from ezpz.pbs import get_pbs_env

    if hostfile is None:
        hostfile = os.environ.get("HOSTFILE")
    if hostfile is None:
        hostfile = os.environ.get("PBS_NODEFILE")

    jobenv: dict[str, str | int | list[Any]] = {}
    try:
        jobenv |= get_dist_info(
            hostfile=hostfile,
            framework=framework,
            verbose=verbose_dist_info,
        )
    except Exception as err:
        if verbose:
            log.debug(f"Falling back to minimal jobenv: {err}")

    scheduler = get_scheduler()
    is_pbs = scheduler.lower() == "pbs"
    if not is_pbs and os.environ.get("PBS_JOBID"):
        # A live PBS job overrides the auto-detected scheduler.
        scheduler = "PBS"
        is_pbs = True
    if not is_pbs:
        # TODO: Add Slurm support to Python API
        raise ValueError(f"{scheduler} not yet implemented!")

    jobenv |= get_pbs_env(hostfile=hostfile, verbose=verbose_pbs_env)
    if verbose:
        report = ["\n", "[JOB ENV]:"]
        report.extend(f"  • {k}={v}" for k, v in jobenv.items())
        report.append("\n")
        log.info("\n".join(report))
    return jobenv

get_jobfile_ext(ext)

Return the path to the stored job metadata with a given extension.

Source code in src/ezpz/jobs.py
def get_jobfile_ext(ext: str) -> Path:
    """Return ``<jobdir>/jobenv.<ext>`` for the current job.

    ``<jobdir>`` comes from :func:`get_jobdir_from_env`.  The file itself is
    not created.
    """
    filename = f"jobenv.{ext}"
    return get_jobdir_from_env() / filename

get_jobfile_json()

Return the path to `jobenv.json`, creating parent directories as needed.

Source code in src/ezpz/jobs.py
def get_jobfile_json() -> Path:
    """Return the path of the job's ``jobenv.json``, ensuring its folder exists."""
    json_path = get_jobfile_ext("json")
    json_path.parent.mkdir(parents=True, exist_ok=True)
    return json_path

get_jobfile_sh()

Return the path to `jobenv.sh`, creating parent directories as needed.

Source code in src/ezpz/jobs.py
def get_jobfile_sh() -> Path:
    """Return the path of the job's ``jobenv.sh``, ensuring its folder exists."""
    sh_path = get_jobfile_ext("sh")
    sh_path.parent.mkdir(parents=True, exist_ok=True)
    return sh_path

get_jobfile_yaml()

Return the path to `jobenv.yaml`, creating parent directories as needed.

Source code in src/ezpz/jobs.py
def get_jobfile_yaml() -> Path:
    """Return the path of the job's ``jobenv.yaml``, ensuring its folder exists."""
    yaml_path = get_jobfile_ext("yaml")
    yaml_path.parent.mkdir(parents=True, exist_ok=True)
    return yaml_path

get_jobid()

Return the job identifier (without scheduler suffix).

Source code in src/ezpz/jobs.py
def get_jobid() -> str:
    """Return the job identifier (without scheduler suffix).

    This is ``PBS_JOBID`` from :func:`get_jobenv`, with everything from the
    first ``.`` onward removed (e.g. ``"123.server"`` -> ``"123"``).
    """
    full_jobid = get_jobenv()["PBS_JOBID"]
    short_jobid, _, _ = full_jobid.partition(".")
    return short_jobid

get_jobslog_file()

Return the path to the rolling job history log in $HOME.

Source code in src/ezpz/jobs.py
def get_jobslog_file() -> Path:
    """Return ``~/<scheduler>-jobs.log``, the rolling job-history file.

    ``<scheduler>`` is the value of ``get_scheduler()`` used as-is.  The
    parent directory (``$HOME``) is created if missing; the file itself is
    not.
    """
    log_name = f"{get_scheduler()}-jobs.log"
    jobslog_file = Path.home() / log_name
    jobslog_file.parent.mkdir(parents=True, exist_ok=True)
    return jobslog_file

loadjobenv(jobdir=None)

Load job environment metadata from the cache under jobdir.

Source code in src/ezpz/jobs.py
def loadjobenv(jobdir: Optional[str | Path] = None) -> dict[str, str]:
    """Load cached job metadata and export it into the current process.

    Steps:

    1. Read the YAML job environment from ``jobdir`` (defaulting to
       :func:`get_jobdir_from_jobslog`).
    2. Merge in PBS launch info and upper-cased, stringified
       ``get_dist_info("pytorch")`` entries.
    3. Export every entry to ``os.environ``.
    4. Write a ``.jobenv`` file via :func:`save_to_dotenv_file` and log how to
       ``source`` it.

    Returns the merged mapping.
    """
    from ezpz.dist import get_dist_info
    from ezpz.pbs import get_pbs_launch_info

    if jobdir is None:
        jobdir = get_jobdir_from_jobslog(-1)
    jobdir = Path(jobdir)
    assert jobdir.is_dir()

    jobenv = loadjobenv_from_yaml(jobdir=jobdir)
    jobenv |= get_pbs_launch_info()
    dist_info = get_dist_info("pytorch", verbose=False)
    jobenv |= {f"{key.upper()}": f"{value}" for key, value in dist_info.items()}

    for name, value in jobenv.items():
        if isinstance(value, Path):
            os.environ[name] = value.as_posix()
        else:
            os.environ[name] = f"{value}"

    dotenv_file = save_to_dotenv_file(jobenv)
    log.info(f"jobenv={json.dumps(jobenv, indent=4, sort_keys=True)}")
    instructions = [
        "",
        "Run:",
        "",
        f"    source {dotenv_file.as_posix()}",
        "",
        "to set these environment variables.",
        "",
    ]
    log.critical("\n".join(instructions))
    return jobenv

loadjobenv_from_yaml(jobdir=None, idx=-1)

Load a job environment dictionary from YAML stored in jobdir.

Source code in src/ezpz/jobs.py
def loadjobenv_from_yaml(
    jobdir: Optional[str | Path] = None,  # type:ignore[reportDeprecated]
    idx: Optional[int] = -1,
) -> dict[str, str]:
    """Load a job environment dictionary from YAML stored in ``jobdir``.

    Parameters
    ----------
    jobdir:
        Directory to search.  Defaults to ``get_jobdir_from_jobslog(idx)``.
    idx:
        Forwarded to :func:`get_jobdir_from_jobslog` when ``jobdir`` is
        ``None``.

    Returns
    -------
    dict
        The parsed mapping, or ``{}`` if the YAML document is empty.

    Raises
    ------
    FileNotFoundError
        If no ``.yaml`` file exists anywhere under ``jobdir``.
    """
    jobdir = Path(get_jobdir_from_jobslog(idx) if jobdir is None else jobdir)
    assert jobdir.is_dir()
    # Prefer the file written by ``savejobenv_yaml`` (``jobenv.yaml``).
    # Otherwise take the first match in sorted order, so the choice does not
    # depend on the filesystem's ``rglob`` iteration order.
    preferred = jobdir / "jobenv.yaml"
    if preferred.is_file():
        jobenv_file = preferred
    else:
        candidates = sorted(jobdir.rglob("*.yaml"))
        if not candidates:
            raise FileNotFoundError(
                f"Unable to find `.yaml` file(s) in `{jobdir=}`"
            )
        jobenv_file = candidates[0]
    with jobenv_file.open("r") as stream:
        data = yaml.safe_load(stream)
    # An empty YAML document loads as ``None``; treat it as an empty mapping
    # instead of failing with ``TypeError`` in ``dict(None)``.
    return dict(data) if data is not None else {}

main(hostfile=None)

Entry point that captures the current job environment and saves it.

Source code in src/ezpz/jobs.py
def main(
    hostfile: Optional[str] = None,
):
    """Entry point that captures the current job environment and saves it.

    Calls :func:`savejobenv` for the ``"pytorch"`` framework with verbose
    top-level and dist-info output.  Any exception is logged with its
    traceback via ``log.exception`` instead of being propagated.
    """
    try:
        savejobenv(
            hostfile=hostfile,
            framework="pytorch",
            verbose=True,
            verbose_dist_info=True,
            verbose_dotenv=False,
            verbose_get_jobenv=False,
            print_jobenv=False,
        )
    except Exception as err:
        log.exception(err)

save_to_dotenv_file(jobenv=None, hostfile=None, verbose=None)

Write a .jobenv file capturing scheduler environment variables.

Source code in src/ezpz/jobs.py
def save_to_dotenv_file(
    jobenv: Optional[dict[str, str]] = None,  # type:ignore[reportDeprecated]
    hostfile: Optional[Union[str, Path]] = None,
    verbose: Optional[bool] = None,
) -> Path:
    """Write a ``.jobenv`` file capturing scheduler environment variables.

    The same content is written to two places: ``<jobdir>/.jobenv`` (jobdir
    from :func:`get_jobdir_from_env`) and ``./.jobenv`` in the current working
    directory.  The file contains one ``KEY=value`` line per ``jobenv`` entry
    (keys upper-cased, values shell-quoted), followed by a ``launch`` alias
    for the command returned by :func:`ezpz.pbs.get_pbs_launch_cmd`.

    Parameters
    ----------
    jobenv:
        Mapping to write.  Defaults to ``get_jobenv(hostfile=hostfile)``.
    hostfile:
        Forwarded to :func:`get_jobenv` and ``get_pbs_launch_cmd``.
    verbose:
        When truthy, log instructions for sourcing the file.

    Returns
    -------
    Path
        The ``.jobenv`` path in the current working directory.
    """
    import shlex

    jobenv = get_jobenv(hostfile=hostfile) if jobenv is None else jobenv
    denvf1 = Path(get_jobdir_from_env()).joinpath(".jobenv")
    denvf2 = Path(os.getcwd()).joinpath(".jobenv")
    from ezpz.pbs import get_pbs_launch_cmd

    launch_cmd = get_pbs_launch_cmd(hostfile=hostfile)
    assert launch_cmd is not None
    # ``shlex.quote`` stops values containing quotes, ``$`` or backticks from
    # breaking the file or being expanded/executed when it is sourced.
    lines = ["#!/bin/bash --login\n"]
    for key, val in jobenv.items():
        lines.append(f"{key.upper()}={shlex.quote(f'{val}')}\n")
    lines.append(f"alias launch={shlex.quote(f'{launch_cmd}')}\n")
    lines.append('echo "$(which launch)"\n')
    for denvf in (denvf1, denvf2):
        log.info(" ".join(["Saving job env to", f"{denvf.parent.as_posix()}/.jobenv"]))
        with denvf.open("w") as f:
            f.writelines(lines)
    if verbose:
        log.critical(
            "\n".join(
                [
                    "",
                    "Run:",
                    "",
                    f"    source {denvf2.relative_to(os.getcwd()).as_posix()}",
                    "",
                    "to set environment variables.",
                    "",
                    'Then, running `echo "${LAUNCH_CMD}"` should print:',
                    "",
                    f"    {launch_cmd}",
                    "",
                ]
            )
        )
    return denvf2

savejobenv(verbose=None, framework=None, hostfile=None, print_jobenv=None, verbose_dotenv=None, verbose_get_jobenv=None, verbose_dist_info=None, verbose_pbs_env=None)

Persist job metadata to disk and optionally echo the environment.

Source code in src/ezpz/jobs.py
def savejobenv(
    verbose: Optional[bool] = None,
    framework: Optional[str] = None,
    hostfile: Optional[Union[str, Path]] = None,
    print_jobenv: Optional[bool] = None,
    verbose_dotenv: Optional[bool] = None,
    verbose_get_jobenv: Optional[bool] = None,
    verbose_dist_info: Optional[bool] = None,
    verbose_pbs_env: Optional[bool] = None,
):
    """Persist job metadata to disk and optionally echo the environment.

    Steps:

    1. Collect the job environment with :func:`get_jobenv`.
    2. Write ``.jobenv`` dotenv files via :func:`save_to_dotenv_file`.
    3. If no explicit ``hostfile`` was given, ``PBS_JOBID`` is set, and
       ``PBS_NODEFILE`` points at an existing file, record the job directory
       in the jobs log and write ``jobenv.{sh,json,yaml}`` into it.
    4. Export every ``jobenv`` entry to ``os.environ``.

    Parameters
    ----------
    verbose:
        Log instructions for sourcing the dotenv file.
    framework, hostfile:
        Forwarded to :func:`get_jobenv`.
    print_jobenv:
        Log the final ``jobenv`` as JSON.
    verbose_dotenv, verbose_get_jobenv, verbose_dist_info, verbose_pbs_env:
        Verbosity flags forwarded to the corresponding helpers.
    """
    jobenv: dict[str, Any] = get_jobenv(
        verbose=verbose_get_jobenv,
        hostfile=hostfile,
        framework=framework,
        verbose_pbs_env=verbose_pbs_env,
        verbose_dist_info=verbose_dist_info,
    )
    assert len(jobenv.keys()) > 0
    dotenv_file = save_to_dotenv_file(
        jobenv=jobenv, hostfile=hostfile, verbose=verbose_dotenv
    )
    pbs_jobid = os.environ.get("PBS_JOBID")
    pbs_nodefile = Path(os.environ.get("PBS_NODEFILE", ""))
    # ``is_file`` must be *called*: the bound method object is always truthy,
    # which would make the nodefile check a no-op.
    if hostfile is None and pbs_jobid is not None and pbs_nodefile.is_file():
        jobdir = get_jobdir_from_env()
        assert jobdir is not None
        log.info(
            " ".join(
                [
                    f"Caught {pbs_jobid=}, {pbs_nodefile=} from env.",
                    "Saving jobenv!",
                ]
            )
        )
        # -------------------------------------------------------------------
        # Append {jobdir} as a new line at the end of ~/{scheduler}-jobs.log
        # where:
        #   jobdir = Path.home() / f'{scheduler}-jobs' / f'{jobid}'
        add_to_jobslog()
        # -------------------------------------------------------------------
        # Save {scheduler}-related environment variables to
        # `{.sh,.yaml,.json}` files INSIDE {jobdir}
        # for easy loading in other processes
        log.info(
            " ".join(
                [
                    f"Writing {SCHEDULER} env vars to ",
                    f"{jobdir} / jobenv" + "{.sh, .yaml, .json}",
                ]
            )
        )
        jobenv = savejobenv_sh(jobenv)
        jobenv = savejobenv_json(jobenv)
        jobenv = savejobenv_yaml(jobenv)
    for key, val in jobenv.items():
        os.environ[key] = f"{val}"
    if print_jobenv:
        log.info(f"jobenv={json.dumps(jobenv, indent=4, sort_keys=True)}")
    if verbose:
        log.critical(
            "\n".join(
                [
                    "",
                    "Run:",
                    "",
                    f"    source {dotenv_file.relative_to(os.getcwd()).as_posix()}",
                    "",
                    "to set these environment variables.",
                    "",
                ]
            )
        )

savejobenv_json(jobenv=None)

Write jobenv.json containing the captured job metadata.

Source code in src/ezpz/jobs.py
def savejobenv_json(
    jobenv: Optional[dict[str, str]] = None,  # type:ignore[reportDeprecated]
) -> dict[str, str]:
    """Write ``jobenv.json`` containing the captured job metadata.

    ``jobenv`` (default: :func:`get_jobenv`) is updated *in place* with a
    ``"jobfile_json"`` entry holding the output path, then serialized as a
    JSON object to :func:`get_jobfile_json`.

    Returns
    -------
    dict
        The same (mutated) ``jobenv`` mapping.
    """
    jobenv = get_jobenv() if jobenv is None else jobenv
    assert len(jobenv.keys()) > 0
    jobfile_json = get_jobfile_json()
    jobenv |= {"jobfile_json": jobfile_json.as_posix()}
    log.info(f"Saving job env to {jobfile_json}")
    with jobfile_json.open("w") as f:
        # Serialize the mapping itself, not a pre-encoded string, so that
        # ``json.load`` on this file returns a dict rather than a str.
        json.dump(jobenv, f, indent=4)
    return jobenv

savejobenv_sh(jobenv=None)

Write jobenv.sh exporting scheduler environment variables.

Source code in src/ezpz/jobs.py
def savejobenv_sh(
    jobenv: Optional[dict[str, str]] = None,  # type:ignore[reportDeprecated]
) -> dict[str, str]:
    """Write ``jobenv.sh`` exporting scheduler environment variables.

    ``jobenv`` (default: :func:`get_jobenv`) is updated *in place* with a
    ``"jobfile_sh"`` entry holding the output path.  Every entry is then
    written as ``export KEY=value`` (key upper-cased, value shell-quoted).
    If ``jobenv`` has a ``"LAUNCH_CMD"`` entry, a ``launch`` alias is added.

    Returns
    -------
    dict
        The same (mutated) ``jobenv`` mapping.
    """
    import shlex

    jobenv = get_jobenv() if jobenv is None else jobenv
    jobfile_sh = get_jobfile_sh()
    jobenv |= {"jobfile_sh": jobfile_sh.as_posix()}
    launch_cmd = jobenv.get("LAUNCH_CMD")
    log.info(f"Saving job env to {jobfile_sh}")
    with jobfile_sh.open("w") as f:
        _ = f.write("#!/bin/bash --login\n")
        for key, val in jobenv.items():
            # Quote so values with quotes/``$``/backticks survive ``source``.
            quoted = shlex.quote(f"{val}")
            _ = f.write(f"export {key.upper()}={quoted}\n")
        if launch_cmd is not None:
            quoted_cmd = shlex.quote(f"{launch_cmd}")
            # Newline-terminated so the file ends cleanly and can be appended.
            _ = f.write(f"alias launch={quoted_cmd}\n")
    return jobenv

savejobenv_yaml(jobenv=None)

Write jobenv.yaml containing the captured job metadata.

Source code in src/ezpz/jobs.py
def savejobenv_yaml(
    jobenv: Optional[dict[str, str]] = None,  # type:ignore[reportDeprecated]
) -> dict[str, str]:
    """Dump the job metadata to ``jobenv.yaml`` and return it.

    Uses :func:`get_jobenv` when no mapping is supplied.  The mapping is
    modified in place to include ``"jobfile_yaml"`` (the output path) before
    being dumped to :func:`get_jobfile_yaml`.
    """
    if jobenv is None:
        jobenv = get_jobenv()
    assert len(jobenv.keys()) > 0
    target = get_jobfile_yaml()
    jobenv["jobfile_yaml"] = target.as_posix()
    log.info(f"Saving job env to {target}")
    with target.open("w") as stream:
        yaml.dump(jobenv, stream)
    return jobenv

write_launch_shell_script()

Create ~/.local/bin/launch.sh exporting the launch alias.

Source code in src/ezpz/jobs.py
def write_launch_shell_script():
    """Create ``~/.local/bin/launch.sh`` exporting the launch alias.

    The script defines ``launch`` as an alias for ``${LAUNCH}`` (expanded by
    the shell when the script runs or is sourced) and an ``ezLaunch``
    wrapper function.  The file is made executable (mode ``0o755``), and
    ``~/.local/bin`` is appended to ``PATH`` for the current process if it is
    not already listed.
    """
    import textwrap

    # Dedented, with no leading blank line, so ``#!`` is the first two bytes
    # of the file and the shebang is honoured.
    contents = textwrap.dedent(
        """\
        #!/bin/bash --login

        alias launch="${LAUNCH}"
        echo $(which launch)

        function ezLaunch() {
            launch "$@"
        }
        """
    )
    local_bin = Path.home().joinpath(".local", "bin")
    local_bin.mkdir(exist_ok=True, parents=True)
    launch_file = local_bin.joinpath("launch.sh")
    log.info(f"Saving launch command to {launch_file} and adding to PATH")
    with launch_file.open("w") as f:
        _ = f.write(contents)
    # Octal rwxr-xr-x; a decimal ``755`` would be 0o1363 (sticky bit plus
    # odd permission bits).
    os.chmod(path=launch_file, mode=0o755)
    # Append the *expanded* directory: a literal "$HOME" inside os.environ is
    # never expanded during PATH lookup.  Avoid "None:" when PATH is unset
    # and avoid duplicate entries on repeated calls.
    local_bin_str = local_bin.as_posix()
    current_path = os.environ.get("PATH", "")
    entries = current_path.split(os.pathsep) if current_path else []
    if local_bin_str not in entries:
        entries.append(local_bin_str)
    os.environ["PATH"] = os.pathsep.join(entries)

Usage Examples

Capture the Current Job Environment

import ezpz.jobs as jobs

# Write ./.jobenv (and <jobdir>/.jobenv); inside a PBS job (PBS_JOBID set) also
# record the job in ~/<SCHEDULER>-jobs.log and write
# ~/<SCHEDULER>-jobs/<jobid>/jobenv.{sh,json,yaml}
jobs.savejobenv(verbose=True)

Load the Most Recent Job Environment

import ezpz.jobs as jobs

# Read the saved YAML job env from the job directory, export every entry to
# os.environ, and write ./.jobenv so the same variables can be sourced in a shell.
jobenv = jobs.loadjobenv()
print(jobenv["PBS_JOBID"])