Skip to content

Commit

Permalink
enables default data step in megatron parallel to operate on a wider …
Browse files Browse the repository at this point in the history
…variety of tensors - second try (NVIDIA#9671)

* enables default data step in megatron parallel to operate on a wider variety of tensors coming out of the dataloader

Signed-off-by: Jonathan Mitchell <[email protected]>

* handles the case where a batch is empty

Signed-off-by: Jonathan Mitchell <[email protected]>

* Apply isort and black reformatting

Signed-off-by: jomitchellnv <[email protected]>
Signed-off-by: Jonathan Mitchell <[email protected]>

* Allows the default data step to operate on more types
than just dictionaries

Signed-off-by: Jonathan Mitchell <[email protected]>

* Apply isort and black reformatting

Signed-off-by: jomitchellnv <[email protected]>

---------

Signed-off-by: Jonathan Mitchell <[email protected]>
Signed-off-by: jomitchellnv <[email protected]>
Co-authored-by: jomitchellnv <[email protected]>
Co-authored-by: John St. John <[email protected]>
  • Loading branch information
3 people authored and nikitaved committed Jul 16, 2024
1 parent 5eeb436 commit 2eb87fe
Showing 1 changed file with 26 additions and 4 deletions.
30 changes: 26 additions & 4 deletions nemo/lightning/megatron_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@

import torch
import torch.distributed
from megatron.core import parallel_state
from megatron.core.distributed import DistributedDataParallel as McoreDDP
from megatron.core.distributed import DistributedDataParallelConfig
from megatron.core.transformer.transformer_config import TransformerConfig
from pytorch_lightning.utilities import move_data_to_device
from torch import Tensor, nn
from typing_extensions import override

Expand All @@ -43,15 +45,35 @@ def convert_output(self, output: torch.Tensor) -> torch.Tensor: ...


def default_data_step(dataloader_iter: Iterator[DataT]) -> DataT:
"""
Moves the data to a device.
In this case we unpack the dataloader iterator. There may be a wrapper on the dataloader
iter from here: https://github.com/NVIDIA/NeMo/blob/main/nemo/lightning/fabric/strategies.py#L441.
This will not subset the data for your with context parallel so please override this function if you
want to use context parallel.
Examples:
If the dataloader_iter returns: [Tuple[<tensor>, <int>, <int>]] -> move to device
If the dataloader_iter returns: [<tensor>, <tensor>] -> move to device
Returns:
DataT: The data moved to the device.
"""
if parallel_state.get_context_parallel_world_size() > 1:
raise ValueError(
"Default data step is being used in a context parallel environment."
"Please define your own data step that appropriately slices the data for context parallel."
)

batch = next(dataloader_iter)

# If its wrapped in a tuple, unpack it.
if isinstance(batch, tuple) and len(batch) == 3:
batch = batch[0]

if isinstance(batch, dict):
batch = {k: v.cuda(non_blocking=True) for k, v in batch.items()}

return batch
return move_data_to_device(batch, torch.cuda.current_device())


def default_forward_step(model: nn.Module, batch, *args, **kwargs) -> torch.Tensor:
Expand Down

0 comments on commit 2eb87fe

Please sign in to comment.