tensor_parallel method distributed=True #114

Open
Johnno1011 opened this issue Aug 2, 2023 · 2 comments

@Johnno1011

Hey, really liking this library!

I want to benchmark the difference between running TP normally (as in the demo notebook) and adding the distributed=True flag to the tp.tensor_parallel method, which I think will use torch.distributed rather than the NCCL backend for communication between devices. Please correct me if I've misunderstood.

I can see in the docstring that torchrun is required, i.e. starting the script with 'torchrun --nproc_per_node=4'.
I have done this and also experimented with init_process_group in the code in combination with torchrun.

To be honest, I'm struggling to understand how to implement this with distributed=True, especially since with this flag the device_ids parameter accepts only one GPU.
I'm working with 4x 3090 GPUs that have NVLink and the Falcon-40B-Instruct model.

Any guidance on how to set this up would be much appreciated.
Let me know if you need any additional info from me.

Thanks and keep going!

@BlackSamorez
Owner

With distributed=True you pass the device associated with the current worker. For example, if you have 4 GPUs, you launch with --nproc_per_node=4, extract the local worker rank with LOCAL_RANK = int(os.environ['LOCAL_RANK']) and assign a specific GPU with

model_tp = tensor_parallel(
    module=model, device_ids=["cuda:{}".format(LOCAL_RANK)], distributed=True
)

This way each worker will have its own GPU and a portion of the model, and they will all communicate through the torch.distributed backend.
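As a minimal sketch of the rank-to-device mapping described above: torchrun exports LOCAL_RANK to every worker process it starts. The helper name worker_device and the fallback to rank 0 (so the snippet also runs outside torchrun) are assumptions made for illustration and are not part of tensor_parallel.

```python
import os


def worker_device(env=None):
    """Return the CUDA device string for this worker, e.g. 'cuda:2'."""
    env = os.environ if env is None else env
    # torchrun sets LOCAL_RANK per process; fall back to 0 (an assumption)
    # when the script is started directly with `python`.
    local_rank = int(env.get("LOCAL_RANK", "0"))
    return "cuda:{}".format(local_rank)


if __name__ == "__main__":
    print(worker_device())
```

The returned string is what would go into device_ids=[...] for that worker.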

Here's a full example of how you could initialize and use the model:

import os
import argparse

import torch
import torch.distributed as dist
from torch.distributed.elastic.multiprocessing.errors import record

from transformers import AutoModelForCausalLM
from tensor_parallel import tensor_parallel

NAME = "bigscience/bloom-560m"

# Environment variables set by torchrun (or torch.distributed.launch)
LOCAL_RANK = int(os.environ['LOCAL_RANK'])
WORLD_SIZE = int(os.environ['WORLD_SIZE'])
WORLD_RANK = int(os.environ['RANK'])

def init_process(backend):
    """ Initialize the distributed environment. """
    # Use the global rank here; LOCAL_RANK only matches it on a single node.
    dist.init_process_group(backend, rank=WORLD_RANK, world_size=WORLD_SIZE)
    run(backend)

@record
def run(backend):
    torch.manual_seed(0)
    
    if backend == 'nccl':
        device=torch.device("cuda:{}".format(LOCAL_RANK))
    else:
        device=torch.device("cpu")
    
    model = AutoModelForCausalLM.from_pretrained(NAME).to(device)
    
    inp1 = torch.randint(1, 1000, size=(2, 3), device=device)
    inp2 = torch.randint(1, 1000, size=(2, 1), device=device)
    inp3 = torch.randint(1, 1000, size=(2, 2), device=device)

    out1_ref = model(inp1, use_cache=True, output_hidden_states=True)
    out2_ref = model(inp2, use_cache=True, past_key_values=out1_ref.past_key_values)
    out3_ref = model(inp3, use_cache=True, past_key_values=out2_ref.past_key_values)

    model_tp = tensor_parallel(
        module=model, device_ids=[device], distributed=True
    )
    del model

    out1 = model_tp(inp1, use_cache=True, output_hidden_states=True)
    # print([key for key in out1])
    out2 = model_tp(inp2, use_cache=True, past_key_values=out1.past_key_values)
    out3 = model_tp(inp3, use_cache=True, past_key_values=out2.past_key_values)

    torch.testing.assert_close(out1_ref.hidden_states[-1], out1.hidden_states[-1], atol=3e-3, rtol=1e-05)
    
    # print(out1_ref.logits, out1.logits)
    
    torch.testing.assert_close(out1_ref.logits, out1.logits, atol=3e-3, rtol=1e-05)
    torch.testing.assert_close(out2_ref.logits, out2.logits, atol=3e-3, rtol=1e-05)
    torch.testing.assert_close(out3_ref.logits, out3.logits, atol=3e-3, rtol=1e-05)
    print(f"Everything seems to work at worker {LOCAL_RANK}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", type=str, default="nccl", choices=['nccl', 'gloo'])
    args = parser.parse_args()
    
    init_process(backend=args.backend)
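To illustrate why the sharded model can be compared against the reference outputs with assert_close, here is a toy sketch of the general idea: each worker holds a slice of a weight matrix, computes its part of the output, and the parts are gathered. This is not the library's actual implementation; the function names and the plain-list "all-gather" are hypothetical simplifications.

```python
def matvec(weight, x):
    """Multiply a matrix (list of rows) by a vector."""
    return [sum(w * v for w, v in zip(row, x)) for row in weight]


def shard_rows(weight, world_size):
    """Split the output rows into world_size contiguous shards, one per worker."""
    per_rank = len(weight) // world_size  # assumes the rows divide evenly
    return [weight[r * per_rank:(r + 1) * per_rank] for r in range(world_size)]


weight = [[1, 2], [3, 4], [5, 6], [7, 8]]  # 4 output features, 2 input features
x = [10, 1]

reference = matvec(weight, x)
# Each "worker" computes only its slice of the output...
partials = [matvec(shard, x) for shard in shard_rows(weight, world_size=2)]
# ...and a gather step (here: plain concatenation) rebuilds the full output.
gathered = [value for part in partials for value in part]
assert gathered == reference
```

In the real setting the gather would go through the torch.distributed backend rather than list concatenation.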

@Johnno1011
Author

Ah, I see now what I was doing wrong. Thanks for sharing this example, and so quickly too!

It looks like this is for distributed and tensor parallelism in one, is that right?
If so, I think I misunderstood the documentation: I thought the flag only controlled the communication backend between the processes in TP...
I did try to implement something similar to your example, but because the model is so big, in my case I:

  1. init_empty_weights with accelerate
  2. get the device map with infer_sharded_device_map
  3. then use the accelerate set_module_tensor_to_device to move the tensors appropriately.

These steps work with distributed=False, but maybe they don't apply when it is True.
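The device lookup behind step 3 (the same walk up the dotted module path that appears in the commented-out loader code further down) can be sketched on its own. The function name find_device and the example device map are hypothetical; only the lookup logic is taken from the code in this post.

```python
def find_device(param_name, device_map):
    """Walk up the dotted module path until a prefix is present in device_map."""
    module_name = param_name
    while module_name and module_name not in device_map:
        # Drop the last path component: 'a.b.weight' -> 'a.b'
        module_name = ".".join(module_name.split(".")[:-1])
    if module_name not in device_map:
        raise KeyError("no device_map entry covers {!r}".format(param_name))
    return device_map[module_name]


# Hypothetical device map, for illustration only.
device_map = {
    "transformer.h.0": "cuda:0",
    "transformer.h.1": "cuda:1",
    "lm_head": "cuda:1",
}

if __name__ == "__main__":
    print(find_device("transformer.h.1.mlp.weight", device_map))
```

Unlike the original loop, this version raises a clear error when no prefix of the parameter name is covered by the map.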

I tried your method (no init_empty_weights, device map or set_module_tensor_to_device), but it consumes all of my server's RAM while loading the weights, before they are moved onto the GPU. Since I have 4 processes, RAM maxes out and the script crashes.
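A rough back-of-the-envelope estimate shows why this runs out of memory when every process loads a full copy of the weights. The parameter count of about 40 billion is an assumption taken from the model name, and the estimate ignores activations, buffers and loader overhead.

```python
GIB = 1024 ** 3


def host_ram_gib(num_params, bytes_per_param, num_processes):
    """Host RAM needed if every process loads a full copy of the weights."""
    return num_params * bytes_per_param * num_processes / GIB


NUM_PARAMS = 40e9   # assumption: ~40 billion parameters, from the model name
NUM_PROCESSES = 4   # one torchrun worker per GPU

if __name__ == "__main__":
    for label, nbytes in [("float32", 4), ("float16", 2)]:
        total = host_ram_gib(NUM_PARAMS, nbytes, NUM_PROCESSES)
        print("{}: ~{:.0f} GiB across {} processes".format(label, total, NUM_PROCESSES))
```

Even a single half-precision copy (roughly 75 GiB under the same assumption) is larger than the 24 GB of one 3090, so calling .to(device) on the full model before sharding would not fit on one GPU either.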

If I'm right in thinking that distributed=True leads to a copy of the model on each device, then you can close this issue.
Otherwise, I'm sharing my code below; please point out anything I can do to resolve the problem.

import torch
import torch.distributed as dist
import torch.quantization
import os
import tensor_parallel as tp
from transformers.utils.bitsandbytes import replace_with_bnb_linear
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig, BitsAndBytesConfig

LOCAL_RANK = int(os.environ['LOCAL_RANK'])
WORLD_SIZE = int(os.environ['WORLD_SIZE'])
WORLD_RANK = int(os.environ['RANK'])

def init_process():
    # Use the global rank here; LOCAL_RANK only matches it on a single node.
    dist.init_process_group(backend='nccl', init_method='env://', rank=WORLD_RANK, world_size=WORLD_SIZE)
    tensor_parallel_load_model()


def tensor_parallel_load_model():
    device = torch.device('cuda:{}'.format(LOCAL_RANK))
    model_path = '/PATH/TO/MODEL/models--tiiuae--falcon-40b-instruct/snapshots/1e7fdcc9f45d13704f3826e99937917e007cd975/'
    bnb_config = BitsAndBytesConfig(
            load_in_4bit=True,
            llm_int8_threshold=6.0,
            llm_int8_has_fp16_weight=False,
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_use_double_quant=True,
            bnb_4bit_quant_type="nf4"
    )
    config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    config.max_new_tokens = 2048
    # Quantization, device_map and dtype arguments apply to the model, not the tokenizer.
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    
    model = AutoModelForCausalLM.from_pretrained(model_path, config=config, trust_remote_code=True).half().to(device)
    print('loaded model on {}'.format(device))
    model_tp = tp.tensor_parallel(model, device_ids=[device], distributed=True) # device_ids
    model_tp = replace_with_bnb_linear(model_tp, quantization_config=bnb_config)
    model_tp.is_loaded_in_4bit = True
    # device_map = tp.infer_sharded_device_map(model_tp)


    # with open(model_path + 'pytorch_model.bin.index.json', 'r') as index_file:
    #     shard_filenames = set(json.load(index_file)["weight_map"].values())

    # for shard_filename in sorted(shard_filenames):
    #     converted_state_dict = tp.convert_state_dict(
    #         torch.load(model_path + shard_filename),
    #         model_tp.tensor_parallel_config,
    #         world_size=WORLD_SIZE,
    #         for_pretrained=True,
    #     )

    #     for param_name, param in converted_state_dict.items():
    #         module_name = param_name
            
    #         while len(module_name) > 0 and module_name not in device_map:
    #             module_name = ".".join(module_name.split(".")[:-1])
    #         param_device = device_map[module_name]

    #         accelerate.utils.set_module_tensor_to_device(model_tp, param_name, param_device, value=param)
    #         converted_state_dict[param_name] = None
    #     del converted_state_dict
    #     del model
    #     print('Loaded shard', shard_filename)
    del model

    print('Model on worker {} ready'.format(LOCAL_RANK))
    prompt = """Hello Falcon, """
    # Each worker keeps its inputs on its own GPU rather than on cuda:0.
    input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
    # Generate text
    output = model_tp.generate(input_ids, temperature=0.0, repetition_penalty=1.0, max_new_tokens=150)
    print(tokenizer.decode(output[0], skip_special_tokens=True))

if __name__ == '__main__':
    init_process()

I've left my older code for moving tensors to devices in comments for your convenience.
Thank you!
