sequence packing support try 1
Signed-off-by: Vivian Chen <[email protected]>
xuanzic authored and Vivian Chen committed Aug 1, 2024
1 parent 6150584 commit 5a8e906
Showing 1 changed file with 49 additions and 37 deletions.
@@ -243,37 +243,9 @@ def pack_sequence(args, seq_lens):
    bins = packing_fn(seq_lens, args.max_seq_length)
    return bins

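Note: `packing_fn` is resolved elsewhere in this script (outside the hunks shown), so its body does not appear in this diff. As a reference for the technique, here is a minimal first-fit-decreasing packer with the same call shape; it is an illustrative sketch, not the repository's implementation.

```python
def first_fit_decreasing(seq_lens, max_seq_length):
    """Illustrative packer: sort sequences longest-first, then drop each
    one into the first bin that still has room, opening bins as needed."""
    bins, remaining = [], []
    for seq_len in sorted(seq_lens, reverse=True):
        for i, room in enumerate(remaining):
            if seq_len <= room:
                bins[i].append(seq_len)
                remaining[i] -= seq_len
                break
        else:  # no existing bin fits this sequence; open a new one
            bins.append([seq_len])
            remaining.append(max_seq_length - seq_len)
    return bins
```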

def main():
    torch.multiprocessing.set_sharing_strategy('file_system')

    args = get_args()
    nemo_config = OmegaConf.load(args.hparams_file)
    nemo_config.model.mm_cfg.vision_encoder.from_pretrained = args.hf_vision_encoder
    nemo_config.model.data.data_path = args.data_path
    nemo_config.model.data.image_folder = args.image_folder
    nemo_config.model.data.conv_template = args.conv_template
    nemo_config.model.data.image_aspect_ratio = args.image_aspect_ratio

    tokenizer = get_nmt_tokenizer(
        library="sentencepiece",
        tokenizer_model=args.tokenizer_path,
    )
    image_processor = create_image_processor(nemo_config.model.mm_cfg)
    train_ds = make_supervised_data_module(
        tokenizer=tokenizer, image_processor=image_processor, model_cfg=nemo_config.model
    )["train_dataset"]
    train_dl = DataLoader(train_ds, num_workers=32, collate_fn=None, shuffle=False)
    # Example shape: {'tokens': torch.Size([1, 344]), 'labels': torch.Size([1, 344]), 'image': torch.Size([1, 1, 3, 224, 224])}

    output_dir = args.output_dir
    os.makedirs(output_dir, exist_ok=True)
    logging.info(f"Output directory: {output_dir}")

    prefix_path = f"{output_dir}/packed_seq_dataset"
    # Original Datasets to Sequence Lengths Files
def process_data_file(train_dl, prefix_path, data_file):
    builders = {}
    for item_dict in tqdm(train_dl, desc="Building indexed datasets"):
    for item_dict in tqdm(train_dl, desc=f"Building indexed datasets for {data_file}"):
        item_dict = {k: v[0] for k, v in item_dict.items()}
        seq_len = len(item_dict['tokens'])
        if seq_len in builders:
@@ -294,7 +266,7 @@ def main():
        logging.info(f"Finalizing builder for sequence length {seq_len} at {idx_path}")
        builder.finalize(idx_path)

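The new `process_data_file` above buckets every sample by its exact token count, so each distinct sequence length accumulates into its own indexed dataset (`seqlen_{N}`). Stripped of the on-disk builder machinery, the core pattern is lazy per-length grouping, as in this sketch (plain lists stand in for the Megatron-style builders):

```python
from collections import defaultdict

def group_by_length(samples):
    """Sketch of the builders pattern: bucket samples by token count so
    that same-length items land in the same per-length dataset."""
    buckets = defaultdict(list)  # seq_len -> list of samples of that length
    for sample in samples:
        buckets[len(sample["tokens"])].append(sample)
    return buckets
```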
    # Packing Sequences into Bins
def pack_sequences_into_bins(args, output_dir, prefix_path):
    files = os.listdir(f"{output_dir}/packed_seq_dataset")
    pattern = r"seqlen_(\d+)\.bin"
    seq_len_list = []
@@ -303,16 +275,22 @@ def main():
        if match:
            seq_len = int(match.group(1))
            seq_len_list.append(seq_len)

    aggregated_seq_lens = []
    doc_pop_order = {}
    indexed_datasets = {}
    error_len = 0
    for seq_len in seq_len_list:
        dataset_path = f"{prefix_path}/seqlen_{seq_len}"
        dataset = IndexedDataset(dataset_path, multimodal=True)
        aggregated_seq_lens.extend([seq_len] * (len(dataset.document_indices) - 1))
        doc_pop_order[seq_len] = list(np.random.permutation(len(dataset.document_indices) - 1))
        indexed_datasets[seq_len] = dataset
        try:
            dataset = IndexedDataset(dataset_path, multimodal=True)
            aggregated_seq_lens.extend([seq_len] * (len(dataset.document_indices) - 1))
            doc_pop_order[seq_len] = list(np.random.permutation(len(dataset.document_indices) - 1))
            indexed_datasets[seq_len] = dataset
        except Exception as e:
            error_len += 1
            logging.error(f"Error while processing {dataset_path}: {e}")
    logging.info(f"Number of errors: {error_len}")

logging.info("Getting bins")
bins = pack_sequence(args, aggregated_seq_lens)
@@ -323,7 +301,6 @@ def main():
    avg_bins_sum = sum([sum(x) for x in bins]) / num_bins
    logging.info(f"Number of bins: {num_bins}, Average bin length: {avg_bins_len}, Average bin sum: {avg_bins_sum}")

    # Reading Sequence Lengths and Packing into New Files
    final_builder_path = get_bin_path(f"{prefix_path}")
    logging.info(f"Creating final builder at {final_builder_path}")
    final_builder = IndexedDatasetBuilder(final_builder_path, dtype=np.float32, multimodal=True)
@@ -356,6 +333,41 @@ def main():
    final_builder.finalize(idx_path)
    logging.info(f"Number of bins: {num_bins}, Average bin length: {avg_bins_len}, Average bin sum: {avg_bins_sum}")

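The collapsed region between the last two hunks is where the chosen bins are materialized into `final_builder`. Judging from the visible setup, each bin is likely filled by popping the next shuffled document index for every length it lists; a hedged reconstruction follows, with `read_document` as a hypothetical stand-in for the real `IndexedDataset` access:

```python
def materialize_bins(bins, doc_pop_order, read_document):
    """Hedged reconstruction of the elided packing loop: for each bin,
    pull one shuffled document per listed length and emit the pack."""
    for bin_lens in bins:
        packed = []
        for seq_len in bin_lens:
            doc_idx = doc_pop_order[seq_len].pop()  # consume the shuffled order
            packed.append(read_document(seq_len, doc_idx))  # hypothetical helper
        yield packed  # the actual code appends each pack to final_builder
```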
def main():
    torch.multiprocessing.set_sharing_strategy('file_system')

    args = get_args()
    nemo_config = OmegaConf.load(args.hparams_file)
    nemo_config.model.mm_cfg.vision_encoder.from_pretrained = args.hf_vision_encoder
    nemo_config.model.data.data_path = args.data_path
    nemo_config.model.data.image_folder = args.image_folder
    nemo_config.model.data.conv_template = args.conv_template
    nemo_config.model.data.image_aspect_ratio = args.image_aspect_ratio

    tokenizer = get_nmt_tokenizer(
        library="sentencepiece",
        tokenizer_model=args.tokenizer_path,
    )
    image_processor = create_image_processor(nemo_config.model.mm_cfg)
    output_dir = args.output_dir
    os.makedirs(output_dir, exist_ok=True)
    logging.info(f"Output directory: {output_dir}")

    prefix_path = f"{output_dir}/packed_seq_dataset"
    os.makedirs(prefix_path, exist_ok=True)

    data_files = nemo_config.model.data.data_file_names
    for data_file in data_files:
        logging.info(f"Processing data file: {data_file}")

        train_ds = make_supervised_data_module(
            tokenizer=tokenizer, image_processor=image_processor, model_cfg=nemo_config.model, data_file=data_file
        )["train_dataset"]
        train_dl = DataLoader(train_ds, num_workers=32, collate_fn=None, shuffle=False)

        process_data_file(train_dl, prefix_path, data_file)

    pack_sequences_into_bins(args, output_dir, prefix_path)

if __name__ == "__main__":
    main()
