diff --git a/python/paddle/distributed/fleet/dataset/dataset.py b/python/paddle/distributed/fleet/dataset/dataset.py index bdaa8cbb5ea08..d0c7ca3b7b644 100755 --- a/python/paddle/distributed/fleet/dataset/dataset.py +++ b/python/paddle/distributed/fleet/dataset/dataset.py @@ -79,9 +79,9 @@ def _set_pipe_command(self, pipe_command): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.dataset.DatasetBase() - dataset._set_pipe_command("python my_script.py") + >>> import paddle + >>> dataset = paddle.distributed.fleet.dataset.DatasetBase() + >>> dataset._set_pipe_command("python my_script.py") Args: pipe_command(str): pipe command @@ -96,9 +96,9 @@ def _set_batch_size(self, batch_size): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_batch_size(128) + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_batch_size(128) Args: batch_size(int): batch size @@ -113,9 +113,9 @@ def _set_thread(self, thread_num): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_thread(12) + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_thread(12) Args: thread_num(int): thread num @@ -130,9 +130,9 @@ def set_filelist(self, filelist): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset.set_filelist(['a.txt', 'b.txt']) + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset.set_filelist(['a.txt', 'b.txt']) Args: filelist(list[str]): list of file names of inputs. @@ -150,9 +150,9 @@ def _set_uid_slot(self, uid_slot): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_uid_slot('6048') + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_uid_slot('6048') Args: set_uid_slot(string): user slot name @@ -167,9 +167,9 @@ def _set_use_var(self, var_list): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_use_var([data, label]) + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_use_var([data, label]) Args: var_list(list): variable list @@ -198,9 +198,9 @@ def _set_hdfs_config(self, fs_name, fs_ugi): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_hdfs_config("my_fs_name", "my_fs_ugi") + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_hdfs_config("my_fs_name", "my_fs_ugi") Args: fs_name(str): fs name @@ -215,9 +215,9 @@ def _set_download_cmd(self, download_cmd): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - dataset._set_download_cmd("./read_from_afs") + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> dataset._set_download_cmd("./read_from_afs") Args: download_cmd(str): customized download command @@ -259,9 +259,10 @@ def _desc(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.DatasetBase() - print(dataset._desc()) + >>> import paddle + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> print(dataset._desc()) + pipe_command: "cat" Returns: A string message @@ -283,12 +284,12 @@ def _check_use_var_with_data_generator( Examples: .. 
code-block:: python - # required: skiptest - import paddle - from dataset_generator import CTRDataset - dataset = paddle.distributed.fleet.DatasetBase() - generator_class = CTRDataset() - dataset._check_use_var_with_data_generator([data, label], generator_class, "data/part-00000") + >>> # doctest: +SKIP('need to work with real dataset') + >>> import paddle + >>> from dataset_generator import CTRDataset + >>> dataset = paddle.distributed.fleet.DatasetBase() + >>> generator_class = CTRDataset() + >>> dataset._check_use_var_with_data_generator([data, label], generator_class, "data/part-00000") Args: var_list(list): variable list @@ -357,9 +358,9 @@ class InMemoryDataset(DatasetBase): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() """ @@ -400,20 +401,21 @@ def _init_distributed_settings(self, **kwargs): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=[]) - dataset._init_distributed_settings( - parse_ins_id=True, - parse_content=True, - fea_eval=True, - candidate_size=10000) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=[]) + + >>> dataset._init_distributed_settings( + ... parse_ins_id=True, + ... parse_content=True, + ... fea_eval=True, + ... candidate_size=10000) """ merge_size = kwargs.get("merge_size", -1) @@ -473,22 +475,22 @@ def update_settings(self, **kwargs): Examples: .. code-block:: python - import paddle - paddle.enable_static() + >>> import paddle + >>> paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=[]) - dataset._init_distributed_settings( - parse_ins_id=True, - parse_content=True, - fea_eval=True, - candidate_size=10000) - dataset.update_settings(batch_size=2) + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=[]) + >>> dataset._init_distributed_settings( + ... parse_ins_id=True, + ... parse_content=True, + ... fea_eval=True, + ... candidate_size=10000) + >>> dataset.update_settings(batch_size=2) """ for key in kwargs: @@ -543,45 +545,44 @@ def init(self, **kwargs): Examples: .. 
code-block:: python - import paddle - import os - paddle.enable_static() - - with open("test_queue_dataset_run_a.txt", "w") as f: - data = "2 1 2 2 5 4 2 2 7 2 1 3" - f.write(data) - with open("test_queue_dataset_run_b.txt", "w") as f: - data = "2 1 2 2 5 4 2 2 7 2 1 3" - f.write(data) - - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - - dataset = paddle.distributed.InMemoryDataset() - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - dataset.set_filelist( - ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) - dataset.load_into_memory() - - place = paddle.CPUPlace() - exe = paddle.static.Executor(place) - startup_program = paddle.static.Program() - main_program = paddle.static.Program() - exe.run(startup_program) - - exe.train_from_dataset(main_program, dataset) - - os.remove("./test_queue_dataset_run_a.txt") - os.remove("./test_queue_dataset_run_b.txt") + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> import os + >>> paddle.enable_static() + + >>> with open("test_queue_dataset_run_a.txt", "w") as f: + ... data = "2 1 2 2 5 4 2 2 7 2 1 3" + ... f.write(data) + >>> with open("test_queue_dataset_run_b.txt", "w") as f: + ... data = "2 1 2 2 5 4 2 2 7 2 1 3" + ... f.write(data) + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> dataset.set_filelist( + ... ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) + >>> dataset.load_into_memory() + + >>> place = paddle.CPUPlace() + >>> exe = paddle.static.Executor(place) + >>> startup_program = paddle.static.Program() + >>> main_program = paddle.static.Program() + >>> exe.run(startup_program) + + >>> exe.train_from_dataset(main_program, dataset) + + >>> os.remove("./test_queue_dataset_run_a.txt") + >>> os.remove("./test_queue_dataset_run_b.txt") """ batch_size = kwargs.get("batch_size", 1) @@ -668,10 +669,10 @@ def _set_queue_num(self, queue_num): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_queue_num(12) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_queue_num(12) """ self.is_user_set_queue_num = True @@ -687,10 +688,10 @@ def _set_parse_ins_id(self, parse_ins_id): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_parse_ins_id(True) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_parse_ins_id(True) """ self.parse_ins_id = parse_ins_id @@ -705,10 +706,10 @@ def _set_parse_content(self, parse_content): Examples: .. 
code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_parse_content(True) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_parse_content(True) """ self.parse_content = parse_content @@ -723,10 +724,10 @@ def _set_fleet_send_batch_size(self, fleet_send_batch_size=1024): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_fleet_send_batch_size(800) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_fleet_send_batch_size(800) """ self.fleet_send_batch_size = fleet_send_batch_size @@ -741,10 +742,10 @@ def _set_fleet_send_sleep_seconds(self, fleet_send_sleep_seconds=0): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_fleet_send_sleep_seconds(2) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_fleet_send_sleep_seconds(2) """ self.fleet_send_sleep_seconds = fleet_send_sleep_seconds @@ -760,10 +761,10 @@ def _set_merge_by_lineid(self, merge_size=2): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_merge_by_lineid() + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_merge_by_lineid() """ self.dataset.set_merge_by_lineid(merge_size) @@ -780,10 +781,11 @@ def _set_shuffle_by_uid(self, enable_shuffle_uid): Examples: .. code-block:: python - import paddle - paddle.enable_static() - dataset = paddle.distributed.InMemoryDataset() - dataset._set_shuffle_by_uid(True) + >>> import paddle + >>> paddle.enable_static() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._set_shuffle_by_uid(True) + """ self.dataset.set_shuffle_by_uid(enable_shuffle_uid) @@ -811,23 +813,24 @@ def set_date(self, date): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - dataset.set_date("20211111") + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> dataset.set_date("20211111") + """ year = int(date[:4]) month = int(date[4:6]) @@ -867,25 +870,27 @@ def load_into_memory(self, is_shuffle=False): Examples: .. 
code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + """ self._prepare_to_run() if not self.use_ps_gpu: @@ -906,26 +911,28 @@ def preload_into_memory(self, thread_num=None): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() - dataset.wait_preload_done() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.preload_into_memory() + >>> dataset.wait_preload_done() + """ self._prepare_to_run() if thread_num is None: @@ -943,26 +950,28 @@ def wait_preload_done(self): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() - dataset.wait_preload_done() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... 
pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.preload_into_memory() + >>> dataset.wait_preload_done() + """ self.dataset.wait_preload_done() self.dataset.destroy_preload_readers() @@ -976,26 +985,28 @@ def local_shuffle(self): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.local_shuffle() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.local_shuffle() + """ self.dataset.local_shuffle() @@ -1011,26 +1022,27 @@ def global_shuffle(self, fleet=None, thread_num=12): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.global_shuffle() Args: fleet(Fleet): fleet singleton. Default None. @@ -1068,32 +1080,33 @@ def release_memory(self): Examples: .. 
code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle() - exe = paddle.static.Executor(paddle.CPUPlace()) - startup_program = paddle.static.Program() - main_program = paddle.static.Program() - exe.run(startup_program) - exe.train_from_dataset(main_program, dataset) - dataset.release_memory() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.global_shuffle() + >>> exe = paddle.static.Executor(paddle.CPUPlace()) + >>> startup_program = paddle.static.Program() + >>> main_program = paddle.static.Program() + >>> exe.run(startup_program) + >>> exe.train_from_dataset(main_program, dataset) + >>> dataset.release_memory() """ self.dataset.release_memory() @@ -1117,26 +1130,29 @@ def get_memory_data_size(self, fleet=None): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - print dataset.get_memory_data_size() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> print(dataset.get_memory_data_size()) """ import numpy as np @@ -1171,28 +1187,31 @@ def get_shuffle_data_size(self, fleet=None): Examples: .. 
code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - dataset = paddle.distributed.InMemoryDataset() - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.global_shuffle() - print dataset.get_shuffle_data_size() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset = paddle.distributed.InMemoryDataset() + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.global_shuffle() + >>> print(dataset.get_shuffle_data_size()) """ import numpy as np @@ -1245,27 +1264,28 @@ def slots_shuffle(self, slots): Examples: .. code-block:: python - import paddle - paddle.enable_static() - - dataset = paddle.distributed.InMemoryDataset() - dataset._init_distributed_settings(fea_eval=True) - slots = ["slot1", "slot2", "slot3", "slot4"] - slots_vars = [] - for slot in slots: - var = paddle.static.data( - name=slot, shape=[None, 1], dtype="int64", lod_level=1) - slots_vars.append(var) - dataset.init( - batch_size=1, - thread_num=2, - input_type=1, - pipe_command="cat", - use_var=slots_vars) - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.slots_shuffle(['slot1']) + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> paddle.enable_static() + + >>> dataset = paddle.distributed.InMemoryDataset() + >>> dataset._init_distributed_settings(fea_eval=True) + >>> slots = ["slot1", "slot2", "slot3", "slot4"] + >>> slots_vars = [] + >>> for slot in slots: + ... var = paddle.static.data( + ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) + ... slots_vars.append(var) + >>> dataset.init( + ... batch_size=1, + ... thread_num=2, + ... input_type=1, + ... pipe_command="cat", + ... use_var=slots_vars) + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.slots_shuffle(['slot1']) """ if self.fea_eval: slots_set = set(slots) @@ -1325,8 +1345,8 @@ class FileInstantDataset(DatasetBase): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.FileInstantDataset() + >>> import paddle + >>> dataset = paddle.distributed.fleet.FileInstantDataset() """ def __init__(self): @@ -1350,8 +1370,8 @@ class BoxPSDataset(InMemoryDataset): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() """ def __init__(self): @@ -1386,9 +1406,9 @@ def _set_rank_offset(self, rank_offset): Examples: .. 
code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset._set_rank_offset("rank_offset") + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset._set_rank_offset("rank_offset") Args: rank_offset(str): rank_offset's name @@ -1403,9 +1423,9 @@ def _set_pv_batch_size(self, pv_batch_size): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset._set_pv_batch_size(128) + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset._set_pv_batch_size(128) Args: pv_batch_size(int): pv batch size @@ -1422,9 +1442,9 @@ def _set_parse_logkey(self, parse_logkey): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset._set_parse_logkey(True) + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset._set_parse_logkey(True) """ self.parse_logkey = parse_logkey @@ -1439,9 +1459,9 @@ def _set_merge_by_sid(self, merge_by_sid): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset._set_merge_by_sid(True) + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset._set_merge_by_sid(True) """ self.merge_by_sid = merge_by_sid @@ -1456,9 +1476,9 @@ def _set_enable_pv_merge(self, enable_pv_merge): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset._set_enable_pv_merge(True) + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset._set_enable_pv_merge(True) """ self.enable_pv_merge = enable_pv_merge @@ -1480,9 +1500,9 @@ def begin_pass(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset.begin_pass() + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset.begin_pass() """ self.boxps.begin_pass() @@ -1493,9 +1513,9 @@ def end_pass(self, need_save_delta): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - dataset.end_pass(True) + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> dataset.end_pass(True) """ self.boxps.end_pass(need_save_delta) @@ -1506,12 +1526,13 @@ def wait_preload_done(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() - dataset.wait_preload_done() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.preload_into_memory() + >>> dataset.wait_preload_done() """ self.boxps.wait_feed_pass_done() @@ -1521,11 +1542,12 @@ def load_into_memory(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() """ self._prepare_to_run() self.boxps.load_into_memory() @@ -1536,11 +1558,12 @@ def preload_into_memory(self): Examples: .. 
code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.preload_into_memory() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.preload_into_memory() """ self._prepare_to_run() self.boxps.preload_into_memory() @@ -1582,12 +1605,13 @@ def set_current_phase(self, current_phase): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.set_current_phase(1) + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.set_current_phase(1) """ self.dataset.set_current_phase(current_phase) @@ -1606,12 +1630,13 @@ def get_pv_data_size(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - print dataset.get_pv_data_size() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> print(dataset.get_pv_data_size()) """ return self.dataset.get_pv_data_size() @@ -1624,12 +1649,13 @@ def preprocess_instance(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.preprocess_instance() """ self.dataset.preprocess_instance() @@ -1641,14 +1667,15 @@ def postprocess_instance(self): Examples: .. code-block:: python - import paddle - dataset = paddle.distributed.fleet.BoxPSDataset() - filelist = ["a.txt", "b.txt"] - dataset.set_filelist(filelist) - dataset.load_into_memory() - dataset.preprocess_instance() - exe.train_from_dataset(dataset) - dataset.postprocess_instance() + >>> # doctest: +SKIP('No files to read') + >>> import paddle + >>> dataset = paddle.distributed.fleet.BoxPSDataset() + >>> filelist = ["a.txt", "b.txt"] + >>> dataset.set_filelist(filelist) + >>> dataset.load_into_memory() + >>> dataset.preprocess_instance() + >>> exe.train_from_dataset(dataset) + >>> dataset.postprocess_instance() """ self.dataset.postprocess_instance() diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_layers.py b/python/paddle/distributed/fleet/layers/mpu/mp_layers.py index 67b88cb52ab45..c3ebb10c32140 100644 --- a/python/paddle/distributed/fleet/layers/mpu/mp_layers.py +++ b/python/paddle/distributed/fleet/layers/mpu/mp_layers.py @@ -61,36 +61,34 @@ class VocabParallelEmbedding(paddle.nn.Layer): Examples: .. 
code-block:: python - import paddle - from paddle.distributed import fleet - - class SimpleMPNet(paddle.nn.Layer): - def __init__(self, vocab_size, hidden_size, inner_size, output_size): - super().__init__() - self.linear1 = fleet.meta_parallel.ColumnParallelLinear( - hidden_size, - inner_size, - gather_output=False, - has_bias=True) - - self.linear2 = fleet.meta_parallel.RowParallelLinear( - inner_size, - hidden_size, - input_is_parallel=True, - has_bias=True) - - self.linear3 = paddle.nn.Linear(hidden_size, output_size) - - self.embedding = fleet.meta_parallel.VocabParallelEmbedding( - vocab_size, - hidden_size) - - def forward(self, x): - x = self.embedding(x) - x = self.linear1(x) - x = self.linear2(x) - x = self.linear3(x) - return x + + >>> import paddle + >>> from paddle.distributed import fleet + + >>> class SimpleMPNet(paddle.nn.Layer): + ... def __init__(self, vocab_size, hidden_size, inner_size, output_size): + ... super().__init__() + ... self.linear1 = fleet.meta_parallel.ColumnParallelLinear( + ... hidden_size, + ... inner_size, + ... gather_output=False, + ... has_bias=True) + ... self.linear2 = fleet.meta_parallel.RowParallelLinear( + ... inner_size, + ... hidden_size, + ... input_is_parallel=True, + ... has_bias=True) + ... self.linear3 = paddle.nn.Linear(hidden_size, output_size) + ... self.embedding = fleet.meta_parallel.VocabParallelEmbedding( + ... vocab_size, + ... hidden_size) + ... def forward(self, x): + ... x = self.embedding(x) + ... x = self.linear1(x) + ... x = self.linear2(x) + ... x = self.linear3(x) + ... return x + """ def __init__( @@ -327,36 +325,33 @@ class ColumnParallelLinear(paddle.nn.Layer): Examples: .. code-block:: python - import paddle - from paddle.distributed import fleet - - class SimpleMPNet(paddle.nn.Layer): - def __init__(self, vocab_size, hidden_size, inner_size, output_size): - super().__init__() - self.linear1 = fleet.meta_parallel.ColumnParallelLinear( - hidden_size, - inner_size, - gather_output=False, - has_bias=True) - - self.linear2 = fleet.meta_parallel.RowParallelLinear( - inner_size, - hidden_size, - input_is_parallel=True, - has_bias=True) - - self.linear3 = paddle.nn.Linear(hidden_size, output_size) - - self.embedding = fleet.meta_parallel.VocabParallelEmbedding( - vocab_size, - hidden_size) - - def forward(self, x): - x = self.embedding(x) - x = self.linear1(x) - x = self.linear2(x) - x = self.linear3(x) - return x + + >>> import paddle + >>> from paddle.distributed import fleet + + >>> class SimpleMPNet(paddle.nn.Layer): + ... def __init__(self, vocab_size, hidden_size, inner_size, output_size): + ... super().__init__() + ... self.linear1 = fleet.meta_parallel.ColumnParallelLinear( + ... hidden_size, + ... inner_size, + ... gather_output=False, + ... has_bias=True) + ... self.linear2 = fleet.meta_parallel.RowParallelLinear( + ... inner_size, + ... hidden_size, + ... input_is_parallel=True, + ... has_bias=True) + ... self.linear3 = paddle.nn.Linear(hidden_size, output_size) + ... self.embedding = fleet.meta_parallel.VocabParallelEmbedding( + ... vocab_size, + ... hidden_size) + ... def forward(self, x): + ... x = self.embedding(x) + ... x = self.linear1(x) + ... x = self.linear2(x) + ... x = self.linear3(x) + ... return x """ def __init__( @@ -537,36 +532,34 @@ class RowParallelLinear(paddle.nn.Layer): Examples: .. 
code-block:: python - import paddle - from paddle.distributed import fleet - - class SimpleMPNet(paddle.nn.Layer): - def __init__(self, vocab_size, hidden_size, inner_size, output_size): - super().__init__() - self.linear1 = fleet.meta_parallel.ColumnParallelLinear( - hidden_size, - inner_size, - gather_output=False, - has_bias=True) - - self.linear2 = fleet.meta_parallel.RowParallelLinear( - inner_size, - hidden_size, - input_is_parallel=True, - has_bias=True) - - self.linear3 = paddle.nn.Linear(hidden_size, output_size) - - self.embedding = fleet.meta_parallel.VocabParallelEmbedding( - vocab_size, - hidden_size) - - def forward(self, x): - x = self.embedding(x) - x = self.linear1(x) - x = self.linear2(x) - x = self.linear3(x) - return x + + >>> import paddle + >>> from paddle.distributed import fleet + + >>> class SimpleMPNet(paddle.nn.Layer): + ... def __init__(self, vocab_size, hidden_size, inner_size, output_size): + ... super().__init__() + ... self.linear1 = fleet.meta_parallel.ColumnParallelLinear( + ... hidden_size, + ... inner_size, + ... gather_output=False, + ... has_bias=True) + ... self.linear2 = fleet.meta_parallel.RowParallelLinear( + ... inner_size, + ... hidden_size, + ... input_is_parallel=True, + ... has_bias=True) + ... self.linear3 = paddle.nn.Linear(hidden_size, output_size) + ... self.embedding = fleet.meta_parallel.VocabParallelEmbedding( + ... vocab_size, + ... hidden_size) + ... def forward(self, x): + ... x = self.embedding(x) + ... x = self.linear1(x) + ... x = self.linear2(x) + ... x = self.linear3(x) + ... return x + """ def __init__( @@ -736,8 +729,12 @@ class ParallelCrossEntropy(paddle.nn.Layer): Examples: .. code-block:: python - loss_func = ParallelCrossEntropy() - loss = loss_func(img, lable) + + >>> # doctest: +SKIP('No img to demonstrate') + >>> from paddle.distributed.fleet.layers.mpu import ParallelCrossEntropy + >>> loss_func = ParallelCrossEntropy() + >>> loss = loss_func(img, label) + """ def __init__(self, mp_group=None, name=None, ignore_index=-100): diff --git a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py index 5a726dd5ab141..360c186103565 100644 --- a/python/paddle/distributed/fleet/layers/mpu/mp_ops.py +++ b/python/paddle/distributed/fleet/layers/mpu/mp_ops.py @@ -809,19 +809,19 @@ def split( Examples: .. code-block:: python - # required: distributed - import paddle - import paddle.distributed.fleet as fleet - - paddle.enable_static() - paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id) - fleet.init(is_collective=True) - data = paddle.randint(0, 8, shape=[10,4]) - emb_out = paddle.distributed.split( - data, - (8, 8), - operation="embedding", - num_partitions=2) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> import paddle + >>> import paddle.distributed.fleet as fleet + + >>> paddle.enable_static() + >>> paddle.set_device('gpu:%d'%paddle.distributed.ParallelEnv().dev_id) + >>> fleet.init(is_collective=True) + >>> data = paddle.randint(0, 8, shape=[10,4]) + >>> emb_out = paddle.distributed.split( + ... data, + ... (8, 8), + ... operation="embedding", + ...
num_partitions=2) """ assert isinstance(size, (list, tuple)), ( diff --git a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py index 4222d80a4e374..26b0c7a12ace7 100755 --- a/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py +++ b/python/paddle/distributed/fleet/meta_parallel/parallel_layers/pp_layers.py @@ -247,68 +247,69 @@ class PipelineLayer(nn.Layer): num_virtual_pipeline_stages(int, optional): the num of virtual pipeline stages for interleave pp. Examples: .. code-block:: python - import paddle.nn as nn - import paddle.nn.functional as F - from paddle.distributed import fleet - from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer - - pipeline_parallel_size = 2 - strategy = fleet.DistributedStrategy() - strategy.hybrid_configs = { - "dp_degree": 1, - "mp_degree": 1, - "pp_degree": pipeline_parallel_size - } - strategy.pipeline_configs = { - "accumulate_steps": 4, - "micro_batch_size": 2 - } - - fleet.init(is_collective=True, strategy=strategy) - - hcg = fleet.get_hybrid_communicate_group() - - class ReshapeHelp(nn.Layer): - def __init__(self, shape): - super().__init__() - self.shape = shape - - def forward(self, x): - return x.reshape(shape=self.shape) - - class AlexNetPipeDesc(PipelineLayer): - def __init__(self, num_classes=10, **kwargs): - self.num_classes = num_classes - decs = [ - LayerDesc( - nn.Conv2D, 1, 64, kernel_size=11, stride=4, padding=5), - LayerDesc(nn.ReLU), - LayerDesc( - nn.MaxPool2D, kernel_size=2, stride=2), - LayerDesc( - nn.Conv2D, 64, 192, kernel_size=5, padding=2), - F.relu, - LayerDesc( - nn.MaxPool2D, kernel_size=2, stride=2), - LayerDesc( - nn.Conv2D, 192, 384, kernel_size=3, padding=1), - F.relu, - LayerDesc( - nn.Conv2D, 384, 256, kernel_size=3, padding=1), - F.relu, - LayerDesc( - nn.Conv2D, 256, 256, kernel_size=3, padding=1), - F.relu, - LayerDesc( - nn.MaxPool2D, kernel_size=2, stride=2), - LayerDesc( - ReshapeHelp, shape=[-1, 256]), - LayerDesc(nn.Linear, 256, self.num_classes), # classifier - ] - super().__init__( - layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs) - model = AlexNetPipeDesc(num_stages=pipeline_parallel_size, topology=hcg._topo) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> import paddle.nn as nn + >>> import paddle.nn.functional as F + >>> from paddle.distributed import fleet + >>> from paddle.distributed.fleet.meta_parallel import LayerDesc, PipelineLayer + + >>> pipeline_parallel_size = 2 + >>> strategy = fleet.DistributedStrategy() + >>> strategy.hybrid_configs = { + ... "dp_degree": 1, + ... "mp_degree": 1, + ... "pp_degree": pipeline_parallel_size + ... } + >>> strategy.pipeline_configs = { + ... "accumulate_steps": 4, + ... "micro_batch_size": 2 + ... } + + >>> fleet.init(is_collective=True, strategy=strategy) + + >>> hcg = fleet.get_hybrid_communicate_group() + + >>> class ReshapeHelp(nn.Layer): + ... def __init__(self, shape): + ... super().__init__() + ... self.shape = shape + ... def forward(self, x): + ... return x.reshape(shape=self.shape) + + >>> class AlexNetPipeDesc(PipelineLayer): + ... def __init__(self, num_classes=10, **kwargs): + ... self.num_classes = num_classes + ... decs = [ + ... LayerDesc( + ... nn.Conv2D, 1, 64, kernel_size=11, stride=4, padding=5), + ... LayerDesc(nn.ReLU), + ... LayerDesc( + ... nn.MaxPool2D, kernel_size=2, stride=2), + ... LayerDesc( + ... nn.Conv2D, 64, 192, kernel_size=5, padding=2), + ... F.relu, + ... LayerDesc( + ...
nn.MaxPool2D, kernel_size=2, stride=2), + ... LayerDesc( + ... nn.Conv2D, 192, 384, kernel_size=3, padding=1), + ... F.relu, + ... LayerDesc( + ... nn.Conv2D, 384, 256, kernel_size=3, padding=1), + ... F.relu, + ... LayerDesc( + ... nn.Conv2D, 256, 256, kernel_size=3, padding=1), + ... F.relu, + ... LayerDesc( + ... nn.MaxPool2D, kernel_size=2, stride=2), + ... LayerDesc( + ... ReshapeHelp, shape=[-1, 256]), + ... LayerDesc(nn.Linear, 256, self.num_classes), # classifier + ... ] + ... super().__init__( + ... layers=decs, loss_fn=nn.CrossEntropyLoss(), **kwargs) + + >>> model = AlexNetPipeDesc(num_stages=pipeline_parallel_size, topology=hcg._topo) """ diff --git a/python/paddle/distributed/fleet/metrics/metric.py b/python/paddle/distributed/fleet/metrics/metric.py index 0d744d17cdd4a..746cb4d817aa1 100644 --- a/python/paddle/distributed/fleet/metrics/metric.py +++ b/python/paddle/distributed/fleet/metrics/metric.py @@ -37,16 +37,17 @@ def sum(input, scope=None, util=None): Example: .. code-block:: python - # in model.py - input = paddle.cast(some_input, dtype='float32') - cnt = paddle.sum(input) - global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) - tmp = paddle.add(cnt, global_cnt) - paddle.assign(tmp, global_cnt) - - # in train.py, after train or infer - res = np.array(scope.find_var(global_cnt.name).get_tensor()) - print("sum array: ", paddle.distributed.fleet.sum(res)) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> input = paddle.cast(some_input, dtype='float32') + >>> cnt = paddle.sum(input) + >>> global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) + >>> tmp = paddle.add(cnt, global_cnt) + >>> paddle.assign(tmp, global_cnt) + + >>> # in train.py, after train or infer + >>> res = np.array(scope.find_var(global_cnt.name).get_tensor()) + >>> print("sum array: ", paddle.distributed.fleet.sum(res)) """ if scope is None: scope = paddle.static.global_scope() @@ -77,16 +78,17 @@ def max(input, scope=None, util=None): Example: .. code-block:: python - # in model.py - input = paddle.cast(some_input, dtype='float32') - cnt = paddle.sum(input) - global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) - tmp = paddle.maximum(cnt, global_cnt) - paddle.assign(tmp, global_cnt) - - # in train.py, after train or infer - res = np.array(scope.find_var(global_cnt.name).get_tensor()) - print("max array: ", paddle.distributed.fleet.max(res)) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> input = paddle.cast(some_input, dtype='float32') + >>> cnt = paddle.sum(input) + >>> global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) + >>> tmp = paddle.maximum(cnt, global_cnt) + >>> paddle.assign(tmp, global_cnt) + + >>> # in train.py, after train or infer + >>> res = np.array(scope.find_var(global_cnt.name).get_tensor()) + >>> print("max array: ", paddle.distributed.fleet.max(res)) """ if scope is None: scope = paddle.static.global_scope() @@ -117,16 +119,17 @@ def min(input, scope=None, util=None): Example: .. 
code-block:: python - # in model.py - input = paddle.cast(some_input, dtype='float32') - cnt = paddle.sum(input) - global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) - tmp = paddle.minimum(cnt, global_cnt) - paddle.assign(tmp, global_cnt) - - # in train.py, after train or infer - res = np.array(scope.find_var(global_cnt.name).get_tensor()) - print("min array: ", paddle.distributed.fleet.min(res)) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> input = paddle.cast(some_input, dtype='float32') + >>> cnt = paddle.sum(input) + >>> global_cnt = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[], value=0) + >>> tmp = paddle.minimum(cnt, global_cnt) + >>> paddle.assign(tmp, global_cnt) + + >>> # in train.py, after train or infer + >>> res = np.array(scope.find_var(global_cnt.name).get_tensor()) + >>> print("min array: ", paddle.distributed.fleet.min(res)) """ if scope is None: scope = paddle.static.global_scope() @@ -158,17 +161,18 @@ def auc(stat_pos, stat_neg, scope=None, util=None): Example: .. code-block:: python - # in model.py - similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(output, min=-15.0, max=15.0)) - binary_predict = paddle.concat( - input=[paddle.subtract(paddle.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1) - self.auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = - paddle.static.auc(input=binary_predict, label=label, curve='ROC', num_thresholds=4096) - - # in train.py, after train or infer - pos = np.array(scope.find_var(stat_pos.name).get_tensor()) - neg = np.array(scope.find_var(stat_neg.name).get_tensor()) - print("auc: ", paddle.distributed.fleet.auc(pos, neg)) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> similarity_norm = paddle.nn.functional.sigmoid(paddle.clip(output, min=-15.0, max=15.0)) + >>> binary_predict = paddle.concat( + ... input=[paddle.subtract(paddle.ceil(similarity_norm), similarity_norm), similarity_norm], axis=1) + >>> self.auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, stat_neg] = \ + ... paddle.static.auc(input=binary_predict, label=label, curve='ROC', num_thresholds=4096) + + >>> # in train.py, after train or infer + >>> pos = np.array(scope.find_var(stat_pos.name).get_tensor()) + >>> neg = np.array(scope.find_var(stat_neg.name).get_tensor()) + >>> print("auc: ", paddle.distributed.fleet.auc(pos, neg)) """ if scope is None: scope = paddle.static.global_scope() @@ -241,12 +245,13 @@ def mae(abserr, total_ins_num, scope=None, util=None): Example: .. code-block:: python - # in model.py - sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) - # in train.py, after train or infer - res = np.array(scope.find_var(abserr.name).get_tensor()) - print("mae: ", paddle.distributed.fleet.mae(res, total_ins_num)) + >>> # in train.py, after train or infer + >>> res = np.array(scope.find_var(abserr.name).get_tensor()) + >>> print("mae: ", paddle.distributed.fleet.mae(res, total_ins_num)) """ if scope is None: scope = paddle.static.global_scope() @@ -291,12 +296,13 @@ def rmse(sqrerr, total_ins_num, scope=None, util=None): Example: ..
code-block:: python - # in model.py - sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) - # in train.py, after train or infer - res = np.array(scope.find_var(sqrerr.name).get_tensor()) - print("rmse: ", paddle.distributed.fleet.rmse(res, total_ins_num)) + >>> # in train.py, after train or infer + >>> res = np.array(scope.find_var(sqrerr.name).get_tensor()) + >>> print("rmse: ", paddle.distributed.fleet.rmse(res, total_ins_num)) """ if scope is None: scope = paddle.static.global_scope() @@ -341,12 +347,13 @@ def mse(sqrerr, total_ins_num, scope=None, util=None): Example: .. code-block:: python - # in model.py - sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> sqrerr, abserr, prob, q, pos, total = paddle.static.ctr_metric_bundle(similarity_norm, paddle.cast(x=label, dtype='float32')) - # in train.py, after train or infer - metric = np.array(scope.find_var(sqrerr.name).get_tensor()) - print("mse: ", paddle.distributed.fleet.mse(metric, total_ins_num)) + >>> # in train.py, after train or infer + >>> metric = np.array(scope.find_var(sqrerr.name).get_tensor()) + >>> print("mse: ", paddle.distributed.fleet.mse(metric, total_ins_num)) """ if scope is None: scope = paddle.static.global_scope() @@ -390,23 +397,24 @@ def acc(correct, total, scope=None, util=None): Example: .. code-block:: python - # in model.py - correct = paddle.static.create_global_var(dtype='float32', shape=[1], value=0) - total = paddle.static.create_global_var(dtype='float32', shape=[1], value=0) - acc = paddle.metric.accuracy(predict, label, k=1, correct=correct, total=total) + >>> # doctest: +REQUIRES(env:DISTRIBUTED) + >>> # in model.py + >>> correct = paddle.static.create_global_var(dtype='float32', shape=[1], value=0) + >>> total = paddle.static.create_global_var(dtype='float32', shape=[1], value=0) + >>> acc = paddle.metric.accuracy(predict, label, k=1, correct=correct, total=total) - global_correct = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0) - tmp1 = paddle.minimum(correct, global_correct) - paddle.assign(tmp1, global_correct) + >>> global_correct = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0) + >>> tmp1 = paddle.minimum(correct, global_correct) + >>> paddle.assign(tmp1, global_correct) - global_total = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0) - tmp2 = paddle.minimum(total, global_total) - paddle.assign(tmp2, global_total) + >>> global_total = paddle.static.create_global_var(persistable=True, dtype='float32', shape=[1], value=0) + >>> tmp2 = paddle.minimum(total, global_total) + >>> paddle.assign(tmp2, global_total) - # in train.py, after train or infer - correct_num = np.array(scope.find_var(correct.name).get_tensor()) - total_num = np.array(scope.find_var(total.name).get_tensor()) - print("accuracy: ", paddle.distributed.fleet.acc(correct_num, total_num)) + >>> # in train.py, after train or infer + >>> correct_num = np.array(scope.find_var(correct.name).get_tensor()) + >>> total_num = np.array(scope.find_var(total.name).get_tensor()) + >>> print("accuracy: ", 
paddle.distributed.fleet.acc(correct_num, total_num)) """ if scope is None: scope = paddle.static.global_scope()
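Note on the conversion pattern used throughout this patch: each plain code-block example is rewritten in doctest form, with `>>> ` prefixed to every new statement, `... ` prefixed to every continuation line, and a `# doctest: +SKIP(...)` or `# doctest: +REQUIRES(env:DISTRIBUTED)` directive added where the example cannot run in the doc-checking environment. The Python sketch below is illustrative only and is not part of the patch; the helper name `to_doctest` and its indentation-based continuation heuristic are assumptions introduced here to show the prefixing rule, not Paddle API.

    # Illustrative sketch (not part of this patch): mimics the >>> / ... prefixing
    # rule applied manually in the docstrings above. A line is treated as a
    # continuation when it is indented, which matches the simple examples in
    # these files but is not a general Python parser.
    def to_doctest(example: str) -> str:
        out = []
        for line in example.splitlines():
            if not line.strip():
                out.append("")                 # blank separators stay blank
            elif line.startswith((" ", "\t")):
                out.append("... " + line)      # continuation of the previous statement
            else:
                out.append(">>> " + line)      # a new top-level statement
        return "\n".join(out)

    print(to_doctest("import paddle\ndataset = paddle.distributed.InMemoryDataset()"))

Applied to that two-line example, the sketch prints `>>> import paddle` followed by `>>> dataset = paddle.distributed.InMemoryDataset()`, which is the same shape as the converted examples above.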