ezpz.examples.deepspeed.tp.train_batch_length¶

DeepSpeed TP training script that pads sequences to max_length for benchmarking.

Launch with:

ezpz launch -m ezpz.examples.deepspeed.tp.train_batch_length --model_name_or_path facebook/opt-125m

Argparse help is available once the optional dependencies (transformers, deepspeed) are installed:

python3 -m ezpz.examples.deepspeed.tp.train_batch_length --help
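
For a complete run you also need the data and output paths. A minimal sketch (the paths and alpaca_data.json are placeholders, not files shipped with ezpz):

ezpz launch -m ezpz.examples.deepspeed.tp.train_batch_length \
    --model_name_or_path facebook/opt-125m \
    --data_path ./alpaca_data.json \
    --output_dir ./outputs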

DataArguments dataclass ¶

Data path configuration for supervised training.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
@dataclass
class DataArguments:
    """Data path configuration for supervised training."""

    data_path: Optional[str] = field(
        default=None, metadata={"help": "Path to the training data."}
    )

DataCollatorForSupervisedDataset dataclass ¶

Bases: object

Collate examples for supervised fine-tuning.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
@dataclass
class DataCollatorForSupervisedDataset(object):
    """Collate examples for supervised fine-tuning."""

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        """Pad and batch token/label tensors."""
        input_ids, labels = tuple(
            [instance[key] for instance in instances] for key in ("input_ids", "labels")
        )
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
        )
        labels = torch.nn.utils.rnn.pad_sequence(
            labels, batch_first=True, padding_value=IGNORE_INDEX
        )
        return dict(
            input_ids=input_ids,
            labels=labels,
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )

__call__(instances) ¶

Pad and batch token/label tensors.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
    """Pad and batch token/label tensors."""
    input_ids, labels = tuple(
        [instance[key] for instance in instances] for key in ("input_ids", "labels")
    )
    input_ids = torch.nn.utils.rnn.pad_sequence(
        input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
    )
    labels = torch.nn.utils.rnn.pad_sequence(
        labels, batch_first=True, padding_value=IGNORE_INDEX
    )
    return dict(
        input_ids=input_ids,
        labels=labels,
        attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
    )
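
A minimal usage sketch of the collator, assuming tokenizer is already loaded with a valid pad_token_id (the example tensors are illustrative):

import torch

examples = [
    dict(input_ids=torch.tensor([5, 6, 7]), labels=torch.tensor([5, 6, 7])),
    dict(input_ids=torch.tensor([8, 9]), labels=torch.tensor([8, 9])),
]
collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
batch = collator(examples)
# batch["input_ids"] has shape (2, 3): the shorter row is right-padded with
# pad_token_id, batch["labels"] is padded with IGNORE_INDEX instead, and
# batch["attention_mask"] is False exactly at the padded positions.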

ModelArguments dataclass ¶

Model loading arguments for HF/DeepSpeed training.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
@dataclass
class ModelArguments:
    """Model loading arguments for HF/DeepSpeed training."""

    model_name_or_path: Optional[str] = field(default="facebook/opt-125m")

SupervisedDataset ¶

Bases: Dataset

Dataset for supervised fine-tuning.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
class SupervisedDataset(Dataset):
    """Dataset for supervised fine-tuning."""

    def __init__(self, data_path: str, tokenizer: transformers.PreTrainedTokenizer):
        """Load, tokenize, and cache dataset for instruction tuning."""
        super().__init__()
        logging.warning("Loading data...")
        list_data_dict = utils.jload(data_path)

        logging.warning("Formatting inputs...")
        prompt_input, prompt_no_input = (
            PROMPT_DICT["prompt_input"],
            PROMPT_DICT["prompt_no_input"],
        )
        sources = [
            prompt_input.format_map(example)
            if example.get("input", "") != ""
            else prompt_no_input.format_map(example)
            for example in list_data_dict
        ]
        targets = [
            f"{example['output']}{tokenizer.eos_token}" for example in list_data_dict
        ]

        # This is only for a quick test: cache the tokenized dataset on disk.
        import os
        import pickle

        dataset_cache_name = "dataset_dict" + str(tokenizer.model_max_length) + ".pkl"
        if os.path.exists(dataset_cache_name):
            with open(dataset_cache_name, "rb") as file:
                data_dict = pickle.load(file)
                logging.warning("loaded dataset dict cache")
        else:
            logging.warning("Tokenizing inputs... This may take some time...")
            data_dict = preprocess(sources, targets, tokenizer)
            logging.warning("Saving dataset cache")
            with open(dataset_cache_name, "wb") as file:
                pickle.dump(data_dict, file)
            logging.warning("Finished saving")

        self.input_ids = data_dict["input_ids"]
        self.labels = data_dict["labels"]

    def __len__(self):
        """Return dataset size."""
        return len(self.input_ids)

    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        """Return one tokenized example."""
        return dict(input_ids=self.input_ids[i], labels=self.labels[i])
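
The loader reads the file with utils.jload and formats each record through PROMPT_DICT, so it expects an Alpaca-style JSON list. A hypothetical record (the "instruction" key is an assumption based on the standard Alpaca schema; only "input" and "output" are visible in the code above):

# One entry of the JSON list passed via --data_path. When "input" is empty,
# PROMPT_DICT["prompt_no_input"] is used instead of PROMPT_DICT["prompt_input"].
example = {
    "instruction": "Summarize the following text.",  # assumed Alpaca-style key
    "input": "ezpz simplifies launching distributed jobs.",
    "output": "ezpz makes distributed launches easy.",
}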

__getitem__(i) ¶

Return one tokenized example.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def __getitem__(self, i) -> Dict[str, torch.Tensor]:
    """Return one tokenized example."""
    return dict(input_ids=self.input_ids[i], labels=self.labels[i])

__init__(data_path, tokenizer) ¶

Load, tokenize, and cache dataset for instruction tuning.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def __init__(self, data_path: str, tokenizer: transformers.PreTrainedTokenizer):
    """Load, tokenize, and cache dataset for instruction tuning."""
    super().__init__()
    logging.warning("Loading data...")
    list_data_dict = utils.jload(data_path)

    logging.warning("Formatting inputs...")
    prompt_input, prompt_no_input = (
        PROMPT_DICT["prompt_input"],
        PROMPT_DICT["prompt_no_input"],
    )
    sources = [
        prompt_input.format_map(example)
        if example.get("input", "") != ""
        else prompt_no_input.format_map(example)
        for example in list_data_dict
    ]
    targets = [
        f"{example['output']}{tokenizer.eos_token}" for example in list_data_dict
    ]

    # This is only for a quick test: cache the tokenized dataset on disk.
    import os
    import pickle

    dataset_cache_name = "dataset_dict" + str(tokenizer.model_max_length) + ".pkl"
    if os.path.exists(dataset_cache_name):
        with open(dataset_cache_name, "rb") as file:
            data_dict = pickle.load(file)
            logging.warning("loaded dataset dict cache")
    else:
        logging.warning("Tokenizing inputs... This may take some time...")
        data_dict = preprocess(sources, targets, tokenizer)
        logging.warning("Saving dataset cache")
        with open(dataset_cache_name, "wb") as file:
            pickle.dump(data_dict, file)
        logging.warning("Finished saving")

    self.input_ids = data_dict["input_ids"]
    self.labels = data_dict["labels"]

__len__() ¶

Return dataset size.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def __len__(self):
    """Return dataset size."""
    return len(self.input_ids)

TrainingArguments dataclass ¶

Bases: TrainingArguments

Extended training arguments with padding/max length controls.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
@dataclass
class TrainingArguments(transformers.TrainingArguments):
    """Extended training arguments with padding/max length controls."""

    cache_dir: Optional[str] = field(default=None)
    optim: str = field(default="adamw_torch")
    model_max_length: int = field(
        default=512,
        metadata={
            "help": "Maximum sequence length. Sequences will be right padded (and possibly truncated)."
        },
    )
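
These fields become CLI flags via HfArgumentParser in train() below, alongside the stock transformers.TrainingArguments flags; for example:

python3 -m ezpz.examples.deepspeed.tp.train_batch_length \
    --model_name_or_path facebook/opt-125m \
    --data_path ./alpaca_data.json \
    --output_dir ./outputs \
    --model_max_length 1024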

make_supervised_data_module(tokenizer, data_args) ¶

Make dataset and collator for supervised fine-tuning.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def make_supervised_data_module(
    tokenizer: transformers.PreTrainedTokenizer, data_args
) -> Dict:
    """Make dataset and collator for supervised fine-tuning."""
    train_dataset = SupervisedDataset(
        tokenizer=tokenizer, data_path=data_args.data_path
    )
    data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)
    return dict(
        train_dataset=train_dataset, eval_dataset=None, data_collator=data_collator
    )

preprocess(sources, targets, tokenizer) ¶

Preprocess the data by tokenizing.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def preprocess(
    sources: Sequence[str],
    targets: Sequence[str],
    tokenizer: transformers.PreTrainedTokenizer,
) -> Dict:
    """Preprocess the data by tokenizing."""
    examples = [s + t for s, t in zip(sources, targets)]
    examples_tokenized, sources_tokenized = [
        _tokenize_fn(strings, tokenizer) for strings in (examples, sources)
    ]
    input_ids = examples_tokenized["input_ids"]
    labels = copy.deepcopy(input_ids)
    for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
        label[:source_len] = IGNORE_INDEX
    for i, l in enumerate(labels):
        l_arr = l.numpy()
        labels[i] = torch.tensor(
            [
                elem if elem != tokenizer.pad_token_id else IGNORE_INDEX
                for elem in l_arr
            ],
            dtype=l.dtype,
            device=l.device,
        )
    return dict(input_ids=input_ids, labels=labels)
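
As a small worked sketch of the masking above: prompt tokens and pad tokens both end up as IGNORE_INDEX in the labels, so the loss is computed only on response tokens (token values are illustrative; IGNORE_INDEX is conventionally -100):

# Tokenized example = [prompt tokens | response tokens]:
#   input_ids             = [12, 47, 88, 301, 302, 2]   (source_len = 3)
#   labels (copy)         = [12, 47, 88, 301, 302, 2]
#   after prompt masking  = [-100, -100, -100, 301, 302, 2]
# Any pad_token_id positions are likewise replaced with -100 by the second loop.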

smart_tokenizer_and_embedding_resize(special_tokens_dict, tokenizer, model) ¶

Resize tokenizer and embedding.

Note: This is the unoptimized version that may make your embedding size not be divisible by 64.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def smart_tokenizer_and_embedding_resize(
    special_tokens_dict: Dict,
    tokenizer: transformers.PreTrainedTokenizer,
    model: transformers.PreTrainedModel,
):
    """Resize tokenizer and embedding.

    Note: This is the unoptimized version that may make your embedding size not be divisible by 64.
    """
    num_new_tokens = tokenizer.add_special_tokens(special_tokens_dict)
    # This is only for a quick test.
    model.resize_token_embeddings(len(tokenizer), mean_resizing=False)

    if num_new_tokens > 0:
        input_embeddings = model.get_input_embeddings().weight.data
        output_embeddings = model.get_output_embeddings().weight.data

        input_embeddings_avg = input_embeddings[:-num_new_tokens].mean(
            dim=0, keepdim=True
        )
        output_embeddings_avg = output_embeddings[:-num_new_tokens].mean(
            dim=0, keepdim=True
        )

        input_embeddings[-num_new_tokens:] = input_embeddings_avg
        output_embeddings[-num_new_tokens:] = output_embeddings_avg
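
If you want to keep the embedding size divisible by 64 (the optimization the note above alludes to), transformers' resize_token_embeddings accepts a pad_to_multiple_of argument; a hedged alternative sketch:

# Alternative to the plain resize above: pad the vocabulary dimension up to a
# multiple of 64 so the embedding matrix stays hardware-friendly.
model.resize_token_embeddings(len(tokenizer), pad_to_multiple_of=64)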

train() ¶

Run supervised causal LM fine-tuning with DeepSpeed/transformers.

Source code in src/ezpz/examples/deepspeed/tp/train_batch_length.py
def train():
    """Run supervised causal LM fine-tuning with DeepSpeed/transformers."""
    parser = transformers.HfArgumentParser(
        (ModelArguments, DataArguments, TrainingArguments)
    )
    model_args, data_args, training_args = parser.parse_args_into_dataclasses()

    model = transformers.AutoModelForCausalLM.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
    )

    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_args.model_name_or_path,
        cache_dir=training_args.cache_dir,
        model_max_length=training_args.model_max_length,
        padding_side="right",
        use_fast=False,
    )
    special_tokens_dict = dict()
    if tokenizer.pad_token is None:
        special_tokens_dict["pad_token"] = DEFAULT_PAD_TOKEN
    if tokenizer.eos_token is None:
        special_tokens_dict["eos_token"] = DEFAULT_EOS_TOKEN
    if tokenizer.bos_token is None:
        special_tokens_dict["bos_token"] = DEFAULT_BOS_TOKEN
    if tokenizer.unk_token is None:
        special_tokens_dict["unk_token"] = DEFAULT_UNK_TOKEN

    smart_tokenizer_and_embedding_resize(
        special_tokens_dict=special_tokens_dict,
        tokenizer=tokenizer,
        model=model,
    )
    from transformers import TrainerCallback

    def see_memory_usage(message, force=False):
        """Log GPU/CPU memory usage for debugging."""
        import deepspeed.comm as dist
        import gc, psutil

        from deepspeed import get_accelerator

        if dist.is_initialized() and dist.get_rank() != 0:
            return

        # Python does not collect garbage in real time, so collect explicitly
        # to get accurate RAM reports.
        gc.collect()

        # Print the message (we only reach here on rank 0 when distributed).
        print(message)
        print(
            f"MA {round(get_accelerator().memory_allocated() / (1024 * 1024 * 1024), 2)} GB \
            Max_MA {round(get_accelerator().max_memory_allocated() / (1024 * 1024 * 1024), 2)} GB \
            CA {round(get_accelerator().memory_reserved() / (1024 * 1024 * 1024), 2)} GB \
            Max_CA {round(get_accelerator().max_memory_reserved() / (1024 * 1024 * 1024), 2)} GB "
        )

        vm_stats = psutil.virtual_memory()
        used_GB = round(((vm_stats.total - vm_stats.available) / (1024**3)), 2)
        print(
            f"CPU Virtual Memory:  used = {used_GB} GB, percent = {vm_stats.percent}%"
        )

        # Reset the peak-memory counters so the next call reports fresh peaks.
        get_accelerator().reset_peak_memory_stats()

    class MemoryCallback(TrainerCallback):
        """Callback to log memory usage after each training step."""

        def on_step_end(self, args, state, control, **kwargs):
            """Emit memory metrics after each training step."""
            see_memory_usage("After step end", force=True)

    data_module = make_supervised_data_module(tokenizer=tokenizer, data_args=data_args)
    trainer = Trainer(
        model=model,
        tokenizer=tokenizer,
        args=training_args,
        callbacks=[MemoryCallback],
        **data_module,
    )

    trainer.train()
    # Load & save a distributed checkpoint:
    # trainer.save_state()
    # trainer.train(resume_from_checkpoint="/host/ssd/mingzhil/autotp_training/stanford_alpaca/out_best/zero2tp/checkpoint-1200")

    # Save HF model weights;
    # set `gather_16bit_weights_on_model_save=True` in the DeepSpeed config.
    # trainer.save_model(output_dir=training_args.output_dir)
    trainer.save_model(output_dir=training_args.output_dir)
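
To run with a DeepSpeed config, as the commented-out lines suggest, pass the standard --deepspeed flag from transformers.TrainingArguments (ds_config.json here is a placeholder for your own config file):

ezpz launch -m ezpz.examples.deepspeed.tp.train_batch_length \
    --model_name_or_path facebook/opt-125m \
    --data_path ./alpaca_data.json \
    --output_dir ./outputs \
    --deepspeed ds_config.json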