diff --git a/src/nanotron/parallel/pipeline_parallel/block.py b/src/nanotron/parallel/pipeline_parallel/block.py
index 57bd8e0a..4e8cfeb5 100644
--- a/src/nanotron/parallel/pipeline_parallel/block.py
+++ b/src/nanotron/parallel/pipeline_parallel/block.py
@@ -93,7 +93,6 @@ def forward(self, **kwargs):
                     pipeline_state=self.pipeline_state,
                 )
                 continue
-
             if v.requires_grad is True:
                 raise ValueError(
                     f"Pipeline engine is None and tensor requires grad. Tried sending a tensor to {self.rank}. Usually that means that your model is pipeline sharded and you haven't chosen a specific pipeline engine."
@@ -152,6 +151,29 @@ def forward(self, **kwargs):
                 # We don't store result in a buffer
                 recv_id = batch_send_recv.add_recv(from_rank=tensor.group_rank)
                 name_to_recv_id[name] = recv_id
+            elif isinstance(tensor, dict):
+                new_kwargs[name] = {}
+                for k, v in tensor.items():
+                    # Same as the TensorPointer branch above, applied to each value of the dict
+                    if isinstance(v, TensorPointer):
+                        if isinstance(self.pipeline_state, PipelineTrainBatchState):
+                            for _ in range(len(self.pipeline_state.microbatches_activations_to_send)):
+                                send_activation = self.pipeline_state.microbatches_activations_to_send.popleft()
+                                # Execute
+                                send_activation()
+
+                        if self.pipeline_state is not None:
+                            new_kwargs[name][k] = recv_from_pipeline_state_buffer(
+                                from_rank=v.group_rank,
+                                p2p=self.p2p,
+                                pipeline_state=self.pipeline_state,
+                            )
+                            continue
+                        # We don't store result in a buffer
+                        recv_id = batch_send_recv.add_recv(from_rank=v.group_rank)
+                        name_to_recv_id[name] = recv_id
+                    else:
+                        new_kwargs[name][k] = v
             else:
                 new_kwargs[name] = tensor
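For review context, here is a minimal standalone sketch of the dispatch pattern the second hunk adds: kwargs values that are plain TensorPointers are resolved as before, dict values are walked key by key and only their TensorPointer entries are received, everything else passes through. `TensorPointer` and `recv_tensor` below are simplified stand-ins for nanotron's TensorPointer and p2p receive path, not the library code.

from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class TensorPointer:
    """Simplified stand-in: marks a value that lives on another pipeline rank."""
    group_rank: int


def recv_tensor(from_rank: int) -> str:
    """Simplified stand-in for the p2p receive; returns a dummy payload."""
    return f"tensor received from rank {from_rank}"


def resolve_kwargs(kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Replace TensorPointer values (top-level or nested one dict deep) with received tensors."""
    new_kwargs: Dict[str, Any] = {}
    for name, value in kwargs.items():
        if isinstance(value, TensorPointer):
            # Top-level pointer: receive directly
            new_kwargs[name] = recv_tensor(from_rank=value.group_rank)
        elif isinstance(value, dict):
            # Dict value: resolve each entry individually
            new_kwargs[name] = {}
            for k, v in value.items():
                if isinstance(v, TensorPointer):
                    new_kwargs[name][k] = recv_tensor(from_rank=v.group_rank)
                else:
                    new_kwargs[name][k] = v
        else:
            # Local value: pass through unchanged
            new_kwargs[name] = value
    return new_kwargs


if __name__ == "__main__":
    example = {
        "hidden_states": TensorPointer(group_rank=0),
        "aux": {"mask": "local tensor", "router_logits": TensorPointer(group_rank=1)},
        "scale": 0.5,
    }
    print(resolve_kwargs(example))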