diff --git a/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py b/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py index 60f882fa9821..be1edd66aeb0 100644 --- a/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py +++ b/examples/multimodal/multimodal_llm/neva/sequence_packing/preprocess_dataset.py @@ -243,37 +243,9 @@ def pack_sequence(args, seq_lens): bins = packing_fn(seq_lens, args.max_seq_length) return bins - -def main(): - torch.multiprocessing.set_sharing_strategy('file_system') - - args = get_args() - nemo_config = OmegaConf.load(args.hparams_file) - nemo_config.model.mm_cfg.vision_encoder.from_pretrained = args.hf_vision_encoder - nemo_config.model.data.data_path = args.data_path - nemo_config.model.data.image_folder = args.image_folder - nemo_config.model.data.conv_template = args.conv_template - nemo_config.model.data.image_aspect_ratio = args.image_aspect_ratio - - tokenizer = get_nmt_tokenizer( - library="sentencepiece", - tokenizer_model=args.tokenizer_path, - ) - image_processor = create_image_processor(nemo_config.model.mm_cfg) - train_ds = make_supervised_data_module( - tokenizer=tokenizer, image_processor=image_processor, model_cfg=nemo_config.model - )["train_dataset"] - train_dl = DataLoader(train_ds, num_workers=32, collate_fn=None, shuffle=False) - # Example shape: {'tokens': torch.Size([1, 344]), 'labels': torch.Size([1, 344]), 'image': torch.Size([1, 1, 3, 224, 224])} - - output_dir = args.output_dir - os.makedirs(output_dir, exist_ok=True) - logging.info(f"Output directory: {output_dir}") - - prefix_path = f"{output_dir}/packed_seq_dataset" - # Original Datasets to Sequence Lengths Files +def process_data_file(train_dl, prefix_path, data_file): builders = {} - for item_dict in tqdm(train_dl, desc="Building indexed datasets"): + for item_dict in tqdm(train_dl, desc=f"Building indexed datasets for {data_file}"): item_dict = {k: v[0] for k, v in item_dict.items()} seq_len = len(item_dict['tokens']) if seq_len in builders: @@ -294,7 +266,7 @@ def main(): logging.info(f"Finalizing builder for sequence length {seq_len} at {idx_path}") builder.finalize(idx_path) - # Packing Sequences into Bins +def pack_sequences_into_bins(args, output_dir, prefix_path): files = os.listdir(f"{output_dir}/packed_seq_dataset") pattern = rf"seqlen_(\d+).bin" seq_len_list = [] @@ -303,16 +275,22 @@ def main(): if match: seq_len = int(match.group(1)) seq_len_list.append(seq_len) - + aggregated_seq_lens = [] doc_pop_order = {} indexed_datasets = {} + error_len = 0 for seq_len in seq_len_list: dataset_path = f"{prefix_path}/seqlen_{seq_len}" - dataset = IndexedDataset(dataset_path, multimodal=True) - aggregated_seq_lens.extend([seq_len] * (len(dataset.document_indices) - 1)) - doc_pop_order[seq_len] = list(np.random.permutation(len(dataset.document_indices) - 1)) - indexed_datasets[seq_len] = dataset + try: + dataset = IndexedDataset(dataset_path, multimodal=True) + aggregated_seq_lens.extend([seq_len] * (len(dataset.document_indices) - 1)) + doc_pop_order[seq_len] = list(np.random.permutation(len(dataset.document_indices) - 1)) + indexed_datasets[seq_len] = dataset + except Exception as e: + error_len += 1 + logging.error(f"Error while processing {dataset_path}: {e}") + logging.info(f"Number of errors: {error_len}") logging.info("Getting bins") bins = pack_sequence(args, aggregated_seq_lens) @@ -323,7 +301,6 @@ def main(): avg_bins_sum = sum([sum(x) for x in bins]) / num_bins logging.info(f"Number of bins: {num_bins}, Average bin length: {avg_bins_len}, Average bin sum: {avg_bins_sum}") - # Reading Sequence Lengths and Packing into New Files final_builder_path = get_bin_path(f"{prefix_path}") logging.info(f"Creating final builder at {final_builder_path}") final_builder = IndexedDatasetBuilder(final_builder_path, dtype=np.float32, multimodal=True) @@ -356,6 +333,41 @@ def main(): final_builder.finalize(idx_path) logging.info(f"Number of bins: {num_bins}, Average bin length: {avg_bins_len}, Average bin sum: {avg_bins_sum}") +def main(): + torch.multiprocessing.set_sharing_strategy('file_system') + + args = get_args() + nemo_config = OmegaConf.load(args.hparams_file) + nemo_config.model.mm_cfg.vision_encoder.from_pretrained = args.hf_vision_encoder + nemo_config.model.data.data_path = args.data_path + nemo_config.model.data.image_folder = args.image_folder + nemo_config.model.data.conv_template = args.conv_template + nemo_config.model.data.image_aspect_ratio = args.image_aspect_ratio + + tokenizer = get_nmt_tokenizer( + library="sentencepiece", + tokenizer_model=args.tokenizer_path, + ) + image_processor = create_image_processor(nemo_config.model.mm_cfg) + output_dir = args.output_dir + os.makedirs(output_dir, exist_ok=True) + logging.info(f"Output directory: {output_dir}") + prefix_path = f"{output_dir}/packed_seq_dataset" + os.makedirs(prefix_path, exist_ok=True) + + data_files = nemo_config.model.data.data_file_names + for data_file in data_files: + logging.info(f"Processing data file: {data_file}") + + train_ds = make_supervised_data_module( + tokenizer=tokenizer, image_processor=image_processor, model_cfg=nemo_config.model, data_file=data_file + )["train_dataset"] + train_dl = DataLoader(train_ds, num_workers=32, collate_fn=None, shuffle=False) + + process_data_file(train_dl, prefix_path, data_file) + + pack_sequences_into_bins(args, output_dir, prefix_path) + if __name__ == "__main__": main()