ezpz.utils

Utility functions for the ezpz package.

Overview

The ezpz.utils module provides various utility functions for common tasks such as:

  • Debugging and breakpoint management
  • Timestamp generation and formatting
  • String normalization and formatting
  • Tensor/array conversion utilities
  • Memory monitoring
  • Model summary generation
  • Data serialization and deserialization

Key Functions

ezpz/utils/__init__.py

DistributedPdb

Bases: Pdb

Supports using PDB from inside a multiprocessing child process.

Usage: DistributedPdb().set_trace()

Source code in src/ezpz/utils/__init__.py
class DistributedPdb(pdb.Pdb):
    """
    Supports using PDB from inside a multiprocessing child process.

    Usage:
    DistributedPdb().set_trace()
    """

    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        try:
            sys.stdin = open("/dev/stdin")
            pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin

breakpoint(rank=0)

Set a breakpoint, but only on a single rank. All other ranks will wait for you to be done with the breakpoint before continuing.

Parameters:

  • rank (int): Which rank to break on. Default: 0
Source code in src/ezpz/utils/__init__.py
def breakpoint(rank: int = 0):
    """
    Set a breakpoint, but only on a single rank.  All other ranks will wait for you to be
    done with the breakpoint before continuing.

    Args:
        rank (int): Which rank to break on.  Default: ``0``
    """
    if ezpz.get_rank() == rank:
        pdb = DistributedPdb()
        pdb.message(
            "\n!!! ATTENTION !!!\n\n"
            f"Type 'up' to get to the frame that called dist.breakpoint(rank={rank})\n"
        )
        pdb.set_trace()
    torch.distributed.barrier()
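
For example, pausing rank 0 while the other ranks wait at the barrier (a minimal sketch; ezpz.setup_torch() is the assumed initialization step for the process group):

import ezpz
import ezpz.utils as utils

_ = ezpz.setup_torch()  # assumed: initializes the distributed process group

# ... run some steps, then pause on rank 0 only; all other ranks block
# at torch.distributed.barrier() until rank 0 exits the debugger
utils.breakpoint(rank=0)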

format_pair(k, v, precision=6)

Format a key-value pair as a string.

Formats a key-value pair where the value can be an integer, boolean, or float. Integers and booleans are formatted without decimal places, while floats are formatted with the specified precision.

Parameters:

  • k (str): The key/name of the parameter.
  • v (ScalarLike): The value to format (int, bool, float, or numpy scalar).
  • precision (int): Number of decimal places for float values. Defaults to 6.

Returns:

  • str: Formatted key-value pair string in the format "key=value".

Example

>>> format_pair("lr", 0.001)
'lr=0.001000'
>>> format_pair("epochs", 10)
'epochs=10'
>>> format_pair("verbose", True)
'verbose=True'

Source code in src/ezpz/utils/__init__.py
def format_pair(k: str, v: ScalarLike, precision: int = 6) -> str:
    """Format a key-value pair as a string.

    Formats a key-value pair where the value can be an integer, boolean, or float.
    Integers and booleans are formatted without decimal places, while floats are
    formatted with the specified precision.

    Args:
        k (str): The key/name of the parameter.
        v (ScalarLike): The value to format (int, bool, float, or numpy scalar).
        precision (int, optional): Number of decimal places for float values.
            Defaults to 6.

    Returns:
        str: Formatted key-value pair string in the format "key=value".

    Example:
        >>> format_pair("lr", 0.001)
        'lr=0.001000'
        >>> format_pair("epochs", 10)
        'epochs=10'
        >>> format_pair("verbose", True)
        'verbose=True'
    """
    if isinstance(v, (int, bool, np.integer)):
        # return f'{k}={v:<3}'
        return f"{k}={v}"
    # return f'{k}={v:<3.4f}'
    return f"{k}={v:<.{precision}f}"

get_bf16_config_json(enabled=True)

Get the deepspeed bf16 config json.

Parameters:

  • enabled (bool): Whether to use bf16. Default: True.

Returns:

  • dict: Deepspeed bf16 config.

Source code in src/ezpz/utils/__init__.py
def get_bf16_config_json(
    enabled: bool = True,
) -> dict:
    """
    Get the deepspeed bf16 config json.

    Args:
        enabled (bool): Whether to use bf16. Default: ``True``.

    Returns:
        dict: Deepspeed bf16 config.
    """
    return {"enabled": enabled}

get_deepspeed_adamw_optimizer_config_json(auto_config=True)

Get the deepspeed adamw optimizer config json.

Parameters:

  • auto_config (bool): Whether to use the auto config. Default: True.

Returns:

  • dict: Deepspeed adamw optimizer config.

Source code in src/ezpz/utils/__init__.py
def get_deepspeed_adamw_optimizer_config_json(
    auto_config: Optional[bool] = True,
) -> dict:
    """
    Get the deepspeed adamw optimizer config json.

    Args:
        auto_config (bool): Whether to use the auto config. Default: ``True``.

    Returns:
        dict: Deepspeed adamw optimizer config.
    """
    return (
        {"type": "AdamW"}
        if not auto_config
        else {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "weight_decay": "auto",
                "torch_adam": True,
                "adam_w_mode": True,
            },
        }
    )
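
Both branches are visible in the source above; with auto-configuration disabled only the optimizer type is set:

>>> get_deepspeed_adamw_optimizer_config_json(auto_config=False)
{'type': 'AdamW'}
>>> get_deepspeed_adamw_optimizer_config_json()["params"]["lr"]
'auto'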

get_deepspeed_config_json(auto_config=True, gradient_accumulation_steps=1, gradient_clipping='auto', steps_per_print=10, train_batch_size='auto', train_micro_batch_size_per_gpu='auto', wall_clock_breakdown=False, wandb=True, bf16=True, fp16=None, flops_profiler=None, optimizer=None, scheduler=None, zero_optimization=None, stage=0, allgather_partitions=None, allgather_bucket_size=int(5e8), overlap_comm=None, reduce_scatter=True, reduce_bucket_size=int(5e8), contiguous_gradients=None, offload_param=None, offload_optimizer=None, stage3_max_live_parameters=int(1e9), stage3_max_reuse_distance=int(1e9), stage3_prefetch_bucket_size=int(5e8), stage3_param_persistence_threshold=int(1e6), sub_group_size=None, elastic_checkpoint=None, stage3_gather_16bit_weights_on_model_save=None, ignore_unused_parameters=None, round_robin_gradients=None, zero_hpz_partition_size=None, zero_quantized_weights=None, zero_quantized_gradients=None, log_trace_cache_warnings=None, save_config=True, output_file=None, output_dir=None)

Build a DeepSpeed config dict and, when save_config is True, write it to the output directory.

Source code in src/ezpz/utils/__init__.py
def get_deepspeed_config_json(
    auto_config: Optional[bool] = True,
    gradient_accumulation_steps: int = 1,
    gradient_clipping: Optional[str | float] = "auto",
    steps_per_print: Optional[int] = 10,
    train_batch_size: str = "auto",
    train_micro_batch_size_per_gpu: str = "auto",
    wall_clock_breakdown: bool = False,
    wandb: bool = True,  # NOTE: Opinionated, W&B is enabled by default
    bf16: bool = True,  # NOTE: Opinionated, BF16 is enabled by default
    fp16: Optional[bool] = None,
    flops_profiler: Optional[dict] = None,
    optimizer: Optional[dict] = None,
    scheduler: Optional[dict] = None,
    zero_optimization: Optional[dict] = None,
    stage: Optional[int] = 0,
    allgather_partitions: Optional[bool] = None,
    allgather_bucket_size: Optional[int] = int(5e8),
    overlap_comm: Optional[bool] = None,
    reduce_scatter: Optional[bool] = True,
    reduce_bucket_size: Optional[int] = int(5e8),
    contiguous_gradients: Optional[bool] = None,
    offload_param: Optional[dict] = None,
    offload_optimizer: Optional[dict] = None,
    stage3_max_live_parameters: Optional[int] = int(1e9),
    stage3_max_reuse_distance: Optional[int] = int(1e9),
    stage3_prefetch_bucket_size: Optional[int] = int(5e8),
    stage3_param_persistence_threshold: Optional[int] = int(1e6),
    sub_group_size: Optional[int] = None,
    elastic_checkpoint: Optional[dict] = None,
    stage3_gather_16bit_weights_on_model_save: Optional[bool] = None,
    ignore_unused_parameters: Optional[bool] = None,
    round_robin_gradients: Optional[bool] = None,
    zero_hpz_partition_size: Optional[int] = None,
    zero_quantized_weights: Optional[bool] = None,
    zero_quantized_gradients: Optional[bool] = None,
    log_trace_cache_warnings: Optional[bool] = None,
    save_config: bool = True,
    output_file: Optional[str] = None,
    output_dir: Optional[PathLike] = None,
) -> dict:
    """
    Build a deepspeed config dict and, when save_config is True, write it to the output directory.
    """
    import json

    wandb_config = {"enabled": wandb}
    bf16_config = {"enabled": bf16}
    fp16_config = {"enabled": fp16}
    flops_profiler_config = (
        get_flops_profiler_config_json() if flops_profiler is None else flops_profiler
    )

    optimizer = (
        get_deepspeed_adamw_optimizer_config_json() if optimizer is None else optimizer
    )
    scheduler = (
        get_deepspeed_warmup_decay_scheduler_config_json()
        if scheduler is None
        else scheduler
    )

    if stage is not None and int(stage) > 0:
        zero_optimization = (
            get_deepspeed_zero_config_json(
                stage=stage,
                allgather_partitions=allgather_partitions,
                allgather_bucket_size=allgather_bucket_size,
                overlap_comm=overlap_comm,
                reduce_scatter=reduce_scatter,
                reduce_bucket_size=reduce_bucket_size,
                contiguous_gradients=contiguous_gradients,
                offload_param=offload_param,
                offload_optimizer=offload_optimizer,
                stage3_max_live_parameters=stage3_max_live_parameters,
                stage3_max_reuse_distance=stage3_max_reuse_distance,
                stage3_prefetch_bucket_size=stage3_prefetch_bucket_size,
                stage3_param_persistence_threshold=stage3_param_persistence_threshold,
                sub_group_size=sub_group_size,
                elastic_checkpoint=elastic_checkpoint,
                stage3_gather_16bit_weights_on_model_save=stage3_gather_16bit_weights_on_model_save,
                ignore_unused_parameters=ignore_unused_parameters,
                round_robin_gradients=round_robin_gradients,
                zero_hpz_partition_size=zero_hpz_partition_size,
                zero_quantized_weights=zero_quantized_weights,
                zero_quantized_gradients=zero_quantized_gradients,
                log_trace_cache_warnings=log_trace_cache_warnings,
            )
            if zero_optimization is None
            else zero_optimization
        )
    else:
        zero_optimization = None
    ds_config = {
        "gradient_accumulation_steps": gradient_accumulation_steps,
        "gradient_clipping": gradient_clipping,
        "steps_per_print": steps_per_print,
        "train_batch_size": train_batch_size,
        "train_micro_batch_size_per_gpu": train_micro_batch_size_per_gpu,
        "wall_clock_breakdown": wall_clock_breakdown,
        "wandb": wandb,
        "bf16": bf16,
        "fp16": fp16,
        "flops_profiler": flops_profiler,
        "optimizer": optimizer,
        "scheduler": scheduler,
        "zero_optimization": zero_optimization,
    }
    if save_config:
        if output_file is None:
            if output_dir is None:
                output_dir = Path(os.getcwd()).joinpath("ds_configs")
            output_dir = Path(output_dir)
            output_dir.mkdir(exist_ok=True, parents=True)
            outfile = output_dir.joinpath("deepspeed_config.json")
        else:
            outfile = Path(output_file)
        logger.info(f"Saving DeepSpeed config to: {outfile.as_posix()}")
        logger.info(json.dumps(ds_config, indent=4))
        with outfile.open("w") as f:
            json.dump(
                ds_config,
                fp=f,
                indent=4,
            )

    return ds_config
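
For example, building (but not saving) a config dict with the opinionated defaults (keyword values here are illustrative):

from ezpz.utils import get_deepspeed_config_json

ds_config = get_deepspeed_config_json(
    gradient_accumulation_steps=2,
    steps_per_print=100,
    save_config=False,  # return the dict without writing JSON to disk
)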

get_deepspeed_warmup_decay_scheduler_config_json(auto_config=True)

Get the deepspeed warmup decay scheduler config json.

Parameters:

  • auto_config (bool): Whether to use the auto config. Default: True.

Returns:

  • dict: Deepspeed warmup decay scheduler config.

Source code in src/ezpz/utils/__init__.py
def get_deepspeed_warmup_decay_scheduler_config_json(
    auto_config: Optional[bool] = True,
) -> dict:
    """
    Get the deepspeed warmup decay scheduler config json.

    Args:
        auto_config (bool): Whether to use the auto config. Default: ``True``.

    Returns:
        dict: Deepspeed warmup decay scheduler config.
    """
    return (
        {"type": "WarmupDecayLR"}
        if not auto_config
        else {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": "auto",
                "warmup_max_lr": "auto",
                "warmup_num_steps": "auto",
                "total_num_steps": "auto",
            },
        }
    )

get_deepspeed_zero_config_json(zero_config)

Return the DeepSpeed zero config as a dict.

Source code in src/ezpz/utils/__init__.py
def get_deepspeed_zero_config_json(zero_config: ZeroConfig) -> dict:
    """Return the DeepSpeed zero config as a dict."""
    return asdict(zero_config)

get_flops_profiler_config_json(enabled=True, profile_step=1, module_depth=-1, top_modules=1, detailed=True)

Get the deepspeed flops profiler config json.

Parameters:

  • enabled (bool): Whether to use the flops profiler. Default: True.
  • profile_step (int): The step to profile. Default: 1.
  • module_depth (int): The depth of the module. Default: -1.
  • top_modules (int): The number of top modules to show. Default: 1.
  • detailed (bool): Whether to show detailed profiling. Default: True.

Returns:

  • dict: Deepspeed flops profiler config.

Source code in src/ezpz/utils/__init__.py
def get_flops_profiler_config_json(
    enabled: bool = True,
    profile_step: int = 1,
    module_depth: int = -1,
    top_modules: int = 1,
    detailed: bool = True,
) -> dict:
    """
    Get the deepspeed flops profiler config json.

    Args:
        enabled (bool): Whether to use the flops profiler. Default: ``True``.
        profile_step (int): The step to profile. Default: ``1``.
        module_depth (int): The depth of the module. Default: ``-1``.
        top_modules (int): The number of top modules to show. Default: ``1``.
        detailed (bool): Whether to show detailed profiling. Default: ``True``.

    Returns:
        dict: Deepspeed flops profiler config.
    """
    return {
        "enabled": enabled,
        "profile_step": profile_step,
        "module_depth": module_depth,
        "top_modules": top_modules,
        "detailed": detailed,
    }
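
With the defaults, this returns:

>>> get_flops_profiler_config_json()
{'enabled': True, 'profile_step': 1, 'module_depth': -1, 'top_modules': 1, 'detailed': True}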

get_fp16_config_json(enabled=True)

Get the deepspeed fp16 config json.

Parameters:

  • enabled (bool): Whether to use fp16. Default: True.

Returns:

  • dict: Deepspeed fp16 config.

Source code in src/ezpz/utils/__init__.py
def get_fp16_config_json(
    enabled: bool = True,
):
    """
    Get the deepspeed fp16 config json.

    Args:
        enabled (bool): Whether to use fp16. Default: ``True``.

    Returns:
        dict: Deepspeed fp16 config.
    """
    return {"enabled": enabled}

get_max_memory_allocated(device)

Get the peak memory allocated on the specified device, in bytes.

Parameters:

  • device (torch.device): The device to check memory allocation for.

Returns:

  • float: Peak memory allocated, in bytes. Returns -1.0 if an XPU is available but intel_extension_for_pytorch cannot be imported; raises RuntimeError if no supported accelerator is found.
Source code in src/ezpz/utils/__init__.py
def get_max_memory_allocated(device: torch.device) -> float:
    """
    Get the maximum memory allocated on the specified device.

    Args:
        device (torch.device): The device to check memory allocation for.
    """
    if torch.cuda.is_available():
        return torch.cuda.max_memory_allocated(device)
    elif torch.xpu.is_available():  #  and ipex is not None:
        try:
            import intel_extension_for_pytorch as ipex

            return ipex.xpu.max_memory_allocated(device)
        except ImportError:
            return -1.0
    raise RuntimeError(f"Memory allocation not available for {device=}")
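
A sketch of checking peak usage after some device work (assumes a CUDA device is available):

import torch
from ezpz.utils import get_max_memory_allocated

device = torch.device("cuda:0")
x = torch.randn(4096, 4096, device=device)
y = x @ x  # allocate and use device memory
peak = get_max_memory_allocated(device)
print(f"peak memory: {peak / 1024**3:.2f} GiB")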

get_timestamp(fstr=None)

Get formatted timestamp.

Returns the current date and time as a formatted string. By default, returns a timestamp in the format 'YYYY-MM-DD-HHMMSS'. A custom format string can be provided to change the output format.

Parameters:

  • fstr (str, optional): Format string for strftime. If None, uses the default format '%Y-%m-%d-%H%M%S'. Defaults to None.

Returns:

  • str: Formatted timestamp string.

Example

>>> get_timestamp()  # Returns something like '2023-12-01-143022'
>>> get_timestamp("%Y-%m-%d")  # Returns something like '2023-12-01'

Source code in src/ezpz/utils/__init__.py
def get_timestamp(fstr: Optional[str] = None) -> str:
    """Get formatted timestamp.

    Returns the current date and time as a formatted string. By default, returns
    a timestamp in the format 'YYYY-MM-DD-HHMMSS'. A custom format string can
    be provided to change the output format.

    Args:
        fstr (str, optional): Format string for strftime. If None, uses default
            format '%Y-%m-%d-%H%M%S'. Defaults to None.

    Returns:
        str: Formatted timestamp string.

    Example:
        >>> get_timestamp()  # Returns something like '2023-12-01-143022'
        >>> get_timestamp("%Y-%m-%d")  # Returns something like '2023-12-01'
    """
    import datetime

    now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d-%H%M%S") if fstr is None else now.strftime(fstr)

grab_tensor(x, force=False)

Convert various tensor/array-like objects to numpy arrays.

This function converts different types of array-like objects (tensors, lists, etc.) to numpy arrays for consistent handling. Supports PyTorch tensors, numpy arrays, and nested lists.

Parameters:

  • x (Any): The object to convert to a numpy array. Can be None, scalar values, lists, numpy arrays, or PyTorch tensors.
  • force (bool, optional): Force conversion even if it requires copying data. Defaults to False.

Returns:

  • Union[np.ndarray, ScalarLike, None]: Numpy array representation of the input, or the original scalar value, or None if input was None.

Raises:

  • ValueError: If unable to convert a list to array.

Example

>>> import torch
>>> import numpy as np
>>> grab_tensor([1, 2, 3])
array([1, 2, 3])
>>> grab_tensor(torch.tensor([1, 2, 3]))
array([1, 2, 3])
>>> grab_tensor(np.array([1, 2, 3]))
array([1, 2, 3])

Source code in src/ezpz/utils/__init__.py
def grab_tensor(x: Any, force: bool = False) -> Union[np.ndarray, ScalarLike, None]:
    """Convert various tensor/array-like objects to numpy arrays.

    This function converts different types of array-like objects (tensors, lists, etc.)
    to numpy arrays for consistent handling. Supports PyTorch tensors, numpy arrays,
    and nested lists.

    Args:
        x (Any): The object to convert to a numpy array. Can be None, scalar values,
            lists, numpy arrays, or PyTorch tensors.
        force (bool, optional): Force conversion even if it requires copying data.
            Defaults to False.

    Returns:
        Union[np.ndarray, ScalarLike, None]: Numpy array representation of the input,
            or the original scalar value, or None if input was None.

    Raises:
        ValueError: If unable to convert a list to array.

    Example:
        >>> import torch
        >>> import numpy as np
        >>> grab_tensor([1, 2, 3])
        array([1, 2, 3])
        >>> grab_tensor(torch.tensor([1, 2, 3]))
        array([1, 2, 3])
        >>> grab_tensor(np.array([1, 2, 3]))
        array([1, 2, 3])
    """
    if x is None:
        return None
    if isinstance(x, (int, float, bool, np.floating)):
        return x
    if isinstance(x, list):
        if isinstance(x[0], torch.Tensor):
            return grab_tensor(torch.stack(x))
        if isinstance(x[0], np.ndarray):
            return np.stack(x)
        if isinstance(x[0], (int, float, bool, np.floating)):
            return np.array(x)
        else:
            raise ValueError(f"Unable to convert list: \n {x=}\n to array")
        # else:
        #     try:
        #         import tensorflow as tf  # type:ignore
        #     except (ImportError, ModuleNotFoundError) as exc:
        #         raise exc
        #     if isinstance(x[0], tf.Tensor):
        #         return grab_tensor(tf.stack(x))
    elif isinstance(x, np.ndarray):
        return x
    elif isinstance(x, torch.Tensor):
        return x.numpy(force=force)
        # return x.detach().cpu().numpy()
    elif callable(getattr(x, "numpy", None)):
        assert callable(getattr(x, "numpy"))
        return x.numpy(force=force)
    breakpoint(0)

model_summary(model, verbose=False, depth=1, input_size=None)

Print a summary of the model using torchinfo.

Parameters:

  • model: The model to summarize.
  • verbose (bool): Whether to print the summary. Default: False.
  • depth (int): The depth of the summary. Default: 1.
  • input_size (Optional[Sequence[int]]): The input size for the model. Default: None.

Returns:

  • ModelStatistics | None: The model summary if torchinfo is available, otherwise None.

Source code in src/ezpz/utils/__init__.py
def model_summary(
    model,
    verbose: bool = False,
    depth: int = 1,
    input_size: Optional[Sequence[int]] = None,
) -> ModelStatistics | None:
    """
    Print a summary of the model using torchinfo.

    Args:
        model: The model to summarize.
        verbose (bool): Whether to print the summary. Default: ``False``.
        depth (int): The depth of the summary. Default: ``1``.
        input_size (Optional[Sequence[int]]): The input size for the model. Default: ``None``.

    Returns:
        ModelStatistics | None: The model summary if torchinfo is available, otherwise None.
    """
    try:
        from torchinfo import summary

        return summary(
            model,
            input_size=input_size,
            depth=depth,
            verbose=verbose,
        )
        # logger.info(f'\n{summary_str}')

    except (ImportError, ModuleNotFoundError):
        logger.warning("torchinfo not installed, unable to print model summary!")
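
For example (a minimal sketch; requires torchinfo, and input_size follows torchinfo's (batch, features) convention):

import torch
from ezpz.utils import model_summary

model = torch.nn.Sequential(
    torch.nn.Linear(128, 64),
    torch.nn.ReLU(),
    torch.nn.Linear(64, 10),
)
stats = model_summary(model, depth=2, input_size=(1, 128))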

normalize(name)

Normalize a name by replacing special characters with dashes and converting to lowercase.

This function replaces hyphens, underscores, and periods with single dashes, then converts the result to lowercase.

Parameters:

  • name (str): The name to normalize.

Returns:

  • str: The normalized name with only lowercase letters, numbers, and dashes.

Example

>>> normalize("Test_Name.Sub-Name")
'test-name-sub-name'
>>> normalize("example__file..name")
'example-file-name'

Source code in src/ezpz/utils/__init__.py
def normalize(name: str) -> str:
    """Normalize a name by replacing special characters with dashes and converting to lowercase.

    This function replaces hyphens, underscores, and periods with single dashes,
    then converts the result to lowercase.

    Args:
        name (str): The name to normalize.

    Returns:
        str: The normalized name with only lowercase letters, numbers, and dashes.

    Example:
        >>> normalize("Test_Name.Sub-Name")
        'test-name-sub-name'
        >>> normalize("example__file..name")
        'example-file-name'
    """
    return re.sub(r"[-_.]+", "-", name).lower()

summarize_dict(d, precision=6)

Summarize a dictionary into a string with formatted key-value pairs.

Parameters:

  • d (dict): The dictionary to summarize.
  • precision (int): The precision for floating point values. Default: 6.

Returns:

  • str: A string representation of the dictionary with formatted key-value pairs.

Source code in src/ezpz/utils/__init__.py
def summarize_dict(d: dict, precision: int = 6) -> str:
    """
    Summarize a dictionary into a string with formatted key-value pairs.

    Args:
        d (dict): The dictionary to summarize.
        precision (int): The precision for floating point values. Default: ``6``.

    Returns:
        str: A string representation of the dictionary with formatted key-value pairs.
    """
    return " ".join([format_pair(k, v, precision=precision) for k, v in d.items()])

write_deepspeed_zero12_auto_config(zero_stage=1, output_dir=None)

Write a DeepSpeed ZeRO stage 1/2 auto config to the output directory.

Source code in src/ezpz/utils/__init__.py
def write_deepspeed_zero12_auto_config(
    zero_stage: int = 1, output_dir: Optional[PathLike] = None
) -> dict:
    """
    Write a deepspeed ZeRO stage 1/2 auto config to the output directory.
    """
    import json

    ds_config = {
        "gradient_accumulation_steps": 1,
        "gradient_clipping": "auto",
        "steps_per_print": 1,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": True,
        "wandb": {"enabled": True},
        "bf16": {"enabled": True},
        "flops_profiler": {
            "enabled": True,
            "profile_step": 1,
            "module_depth": -1,
            "top_modules": 1,
            "detailed": True,
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "weight_decay": "auto",
                "torch_adam": True,
                "adam_w_mode": True,
            },
        },
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": "auto",
                "warmup_max_lr": "auto",
                "warmup_num_steps": "auto",
                "total_num_steps": "auto",
            },
        },
        "zero_optimization": {
            "stage": zero_stage,
            "allgather_partitions": True,
            "allgather_bucket_size": 2e8,
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": "auto",
            "contiguous_gradients": True,
        },
    }
    if output_dir is None:
        output_dir = Path(os.getcwd()).joinpath("ds_configs")

    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)
    outfile = output_dir.joinpath(f"deepspeed_zero{zero_stage}_auto_config.json")
    logger.info(
        f"Saving DeepSpeed ZeRO Stage {zero_stage} "
        f"auto config to: {outfile.as_posix()}"
    )
    with outfile.open("w") as f:
        json.dump(
            ds_config,
            fp=f,
            indent=4,
        )

    return ds_config

write_deepspeed_zero3_auto_config(zero_stage=3, output_dir=None)

Write a DeepSpeed ZeRO stage 3 auto config to the output directory.

Source code in src/ezpz/utils/__init__.py
def write_deepspeed_zero3_auto_config(
    zero_stage: int = 3, output_dir: Optional[PathLike] = None
) -> dict:
    """
    Write a deepspeed ZeRO stage 3 auto config to the output directory.
    """
    import json

    ds_config = {
        "gradient_accumulation_steps": 1,
        "gradient_clipping": "auto",
        "steps_per_print": 1,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": True,
        "wandb": {"enabled": True},
        "bf16": {"enabled": True},
        "flops_profiler": {
            "enabled": True,
            "profile_step": 1,
            "module_depth": -1,
            "top_modules": 1,
            "detailed": True,
        },
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "weight_decay": "auto",
                "torch_adam": True,
                "adam_w_mode": True,
            },
        },
        "scheduler": {
            "type": "WarmupDecayLR",
            "params": {
                "warmup_min_lr": "auto",
                "warmup_max_lr": "auto",
                "warmup_num_steps": "auto",
                "total_num_steps": "auto",
            },
        },
        "zero_optimization": {
            "stage": zero_stage,
            "allgather_partitions": True,
            "allgather_bucket_size": 2e8,
            "overlap_comm": True,
            "reduce_scatter": True,
            "reduce_bucket_size": "auto",
            "contiguous_gradients": True,
        },
    }
    if output_dir is None:
        output_dir = Path(os.getcwd()).joinpath("ds_configs")

    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)
    outfile = output_dir.joinpath(f"deepspeed_zero{zero_stage}_auto_config.json")
    logger.info(
        f"Saving DeepSpeed ZeRO Stage {zero_stage} "
        f"auto config to: {outfile.as_posix()}"
    )
    with outfile.open("w") as f:
        json.dump(
            ds_config,
            fp=f,
            indent=4,
        )

    return ds_config

write_generic_deepspeed_config(gradient_accumulation_steps=1, gradient_clipping='auto', steps_per_print=10, train_batch_size='auto', train_micro_batch_size_per_gpu='auto', wall_clock_breakdown=False, wandb=None, bf16=None, fp16=None, flops_profiler=None, optimizer=None, scheduler=None, zero_optimization=None)

Build a generic DeepSpeed config dict from the given options; the dict is returned, not written to disk.

Source code in src/ezpz/utils/__init__.py
def write_generic_deepspeed_config(
    gradient_accumulation_steps: int = 1,
    gradient_clipping: str | float = "auto",
    steps_per_print: int = 10,
    train_batch_size: str = "auto",
    train_micro_batch_size_per_gpu: str = "auto",
    wall_clock_breakdown: bool = False,
    wandb: Optional[dict] = None,
    bf16: Optional[dict] = None,
    fp16: Optional[dict] = None,
    flops_profiler: Optional[dict] = None,
    optimizer: Optional[dict] = None,
    scheduler: Optional[dict] = None,
    zero_optimization: Optional[dict] = None,
):
    """
    Build a generic deepspeed config dict from the given options.
    """
    ds_config = {
        "gradient_accumulation_steps": gradient_accumulation_steps,
        "gradient_clipping": gradient_clipping,
        "steps_per_print": steps_per_print,
        "train_batch_size": train_batch_size,
        "train_micro_batch_size_per_gpu": train_micro_batch_size_per_gpu,
        "wall_clock_breakdown": wall_clock_breakdown,
        "wandb": wandb,
        "bf16": bf16,
        "fp16": fp16,
        "flops_profiler": flops_profiler,
        "optimizer": optimizer,
        "scheduler": scheduler,
        "zero_optimization": zero_optimization,
    }
    return ds_config
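
Despite its name, this helper only assembles and returns the dict; nothing is written to disk:

>>> cfg = write_generic_deepspeed_config(bf16={"enabled": True})
>>> cfg["train_batch_size"]
'auto'
>>> cfg["bf16"]
{'enabled': True}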

Examples

Debugging with Distributed Breakpoints

import ezpz.utils as utils

# Set a breakpoint only on rank 0
utils.breakpoint(rank=0)

Timestamp Generation

import ezpz.utils as utils

# Get current timestamp
timestamp = utils.get_timestamp()
print(timestamp)  # Output: "2023-12-01-143022"

# Get timestamp with custom format
date_only = utils.get_timestamp("%Y-%m-%d")
print(date_only)  # Output: "2023-12-01"

String Normalization

import ezpz.utils as utils

# Normalize strings for consistent naming
name = utils.normalize("Test_Name.Sub-Name")
print(name)  # Output: "test-name-sub-name"

Tensor Conversion

import ezpz.utils as utils
import torch
import numpy as np

# Convert various tensor types to numpy arrays
torch_tensor = torch.tensor([1, 2, 3])
numpy_array = utils.grab_tensor(torch_tensor)
print(numpy_array)  # Output: [1 2 3]

# Convert lists to arrays
list_data = [1, 2, 3]
array_data = utils.grab_tensor(list_data)
print(array_data)  # Output: [1 2 3]
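
DeepSpeed Configuration

import ezpz.utils as utils

# Write a ZeRO stage 2 auto config to ./ds_configs/ and get it back as a dict
ds_config = utils.write_deepspeed_zero12_auto_config(zero_stage=2)
print(ds_config["zero_optimization"]["stage"])  # Output: 2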