diff --git a/RFC-0001-economic-dataloader.md b/RFC-0001-economic-dataloader.md
new file mode 100644
index 0000000..61c7336
--- /dev/null
+++ b/RFC-0001-economic-dataloader.md
@@ -0,0 +1,120 @@

# Economic DataLoader: Multiprocessing Pipeline Design Suggestion

**Authors:**
* @yoadbs

## **Summary**
A new dataloader multiprocessing pipeline design is suggested. This pipeline splits the task of batch generation between two types of workers:\
item-generating workers (which call `dataset.__getitem__`) and batch-generating workers (which call `collate_fn`).\
This pipeline is designed to significantly reduce random-access-memory (RAM) usage, without any significant reduction in throughput (TPT).

## **Motivation**
In several applications, such as video processing and 3D graphics, the input batch of a PyTorch model may require a large amount of RAM.

In the current dataloader multiprocessing pipeline, workers prepare batches in parallel and send them into shared memory via a queue.
In practice, about _num_workers_ prepared batches are stored in shared memory simultaneously, shortly after epoch start.
At most, (_num_workers_ * _prefetch_factor_) prepared batches may be stored in shared memory simultaneously.
The main process operates in parallel to the workers, extracting one batch after another from shared memory and feeding it into the model for training/validation/test.

Simultaneously storing about _num_workers_ batches in shared memory imposes a limit on _num_workers_:\
_num_workers_ < (_total_available_ram_in_bytes_ / _batch_size_in_bytes_)\
This limitation can create a bottleneck on training TPT, because _num_workers_ cannot be increased beyond the server's RAM limits.
Alternatively, increasing _num_workers_ requires a server with more available RAM, which increases server cost.

A new dataloader multiprocessing pipeline design is suggested. In this pipeline, there are two types of workers:
item-generating workers (which call `dataset.__getitem__`) and batch-generating workers (which call `collate_fn`).
This design allows at most _prefetch_factor_ batches to be processed simultaneously by all the workers together.
Decoupling the number of in-flight batches from _num_workers_ allows increasing _num_workers_ without any significant increase in shared memory consumption.
As in the current implementation, the workers continuously generate items throughout the epoch and are not expected to become idle, so no TPT reduction is expected.

Another, smaller advantage is that in the proposed implementation the first batch of each epoch is generated by multiple workers, while in the current implementation it is generated by a single worker.
Hence, an epoch can potentially start faster.

The new flow introduces only minor modifications to the dataloader interface, making the transition almost transparent to the user.

## **Proposed Implementation**

### **Definitions**

| symbol                 | description                                                                                                                         |
|------------------------|:------------------------------------------------------------------------------------------------------------------------------------|
| _index_queue_          | A queue to send item indices and metadata from the main process to an item_worker. There is a separate queue for each item_worker    |
| _item_queue_           | A queue to send items from item_workers to a batch_worker. There is a separate queue for each batch_worker                           |
| _worker_result_queue_  | A queue to send prepared batches from batch_workers to the main process                                                              |
| _item_idx_             | Item serial index within the epoch (0 for the first item, 1 for the next item, etc.)                                                 |
| _item_idx_in_batch_    | Item serial index within its batch                                                                                                   |
| _batch_idx_            | Batch serial index within the epoch (0 for the first batch, 1 for the next batch, etc.)                                              |
| _item_index_           | Item's dataset index, as in `dataset.__getitem__(index=item_index)`                                                                  |
| _iw_idx_               | Item_worker index {0, 1, ..., _num_workers_ - 1}                                                                                     |
| _bw_idx_               | Batch_worker index {0, 1, ..., _num_batch_workers_ - 1}                                                                              |
| _batch_size_           | Batch size (may be smaller for the last batch in the epoch)                                                                          |
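For concreteness, the per-item metadata listed above could be represented roughly as follows. This is a minimal sketch; the class and field names are illustrative and are not part of the existing DataLoader code:

```python
from dataclasses import dataclass


@dataclass
class ItemMeta:
    """Metadata attached to every item sent through the queues (illustrative names)."""
    item_idx_in_batch: int  # position of the item inside its batch
    batch_idx: int          # serial index of the batch within the epoch
    item_index: int         # dataset index passed to dataset.__getitem__
    iw_idx: int             # item_worker assigned to produce this item
    bw_idx: int             # batch_worker assigned to collate this item's batch
    batch_size: int         # size of this item's batch (smaller for the last batch)
```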
### **High Level Description**

In the current multiprocessing pipeline, a single level of workers is used.
The main process sends _prefetch_factor_ batches to each worker via its _index_queue_.
Each worker prepares one batch at a time and sends it back to the main process via _worker_result_queue_.
After a batch is retrieved by the main process, another batch is sent.

In the suggested pipeline, there are 2 levels of workers:
* Item_worker - Generates one item at a time (by running `dataset.__getitem__`) and sends it to a designated batch_worker via _item_queue_
  * The item_worker is similar to the workers in the current design, but it receives and sends one item at a time (not one batch at a time)
* Batch_worker - Retrieves items from _item_queue_, prepares batches by running `collate_fn`, and sends them back to the main process via _worker_result_queue_

Current design dataflow: main_process -> workers -> main_process

Suggested design dataflow: main_process -> item_workers -> batch_workers -> main_process

#### **Main Process Flow**
* Retrieve and store prepared batches from _worker_result_queue_
  * Track the number of items at work (workload) for each worker. Make sure to decrease the workload counters of the relevant batch_worker, and of each of the relevant item_workers, when retrieving a batch
* Send batches of items for preparation to the index_queues, one batch at a time (see the scheduling sketch after this list)
  * Each item should include the following metadata: (_item_idx_in_batch_, _batch_idx_, _item_index_, _iw_idx_, _bw_idx_, _batch_size_)
  * A possibly different item_worker may be assigned to each item
    * Select _iw_idx_ as the item_worker with the minimal workload
  * The same batch_worker should be assigned to all items of the same batch
    * Select _bw_idx_ as the batch_worker with the minimal workload
  * Make sure that the sum of the item_workers' workload is always <= (_prefetch_factor_ * _batch_size_). Stop sending batches when reaching this limit
  * Make sure to increase the workload counters of the relevant batch_worker, and of each of the relevant item_workers, when sending a batch of items
* Once the next required batch is available (by _batch_idx_), return it to the caller function
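The following is a minimal sketch of the send step described above, assuming the main process keeps a simple per-worker workload counter for each item_worker and batch_worker. All names (`try_send_batch`, `index_queues`, `iw_load`, `bw_load`) are illustrative and do not exist in the current DataLoader code:

```python
def try_send_batch(batch_idx, batch_indices, index_queues, iw_load, bw_load,
                   prefetch_factor, batch_size):
    """Assign one batch of dataset indices to workers, or return False when the
    in-flight limit of (prefetch_factor * batch_size) items would be exceeded."""
    if sum(iw_load) + len(batch_indices) > prefetch_factor * batch_size:
        return False  # stop sending batches until some workload is retired

    # All items of the batch go to the same batch_worker: pick the least loaded one.
    bw_idx = min(range(len(bw_load)), key=bw_load.__getitem__)
    bw_load[bw_idx] += len(batch_indices)

    for item_idx_in_batch, item_index in enumerate(batch_indices):
        # Each item may go to a different item_worker: pick the least loaded one.
        iw_idx = min(range(len(iw_load)), key=iw_load.__getitem__)
        iw_load[iw_idx] += 1
        index_queues[iw_idx].put(
            (item_idx_in_batch, batch_idx, item_index, iw_idx, bw_idx,
             len(batch_indices)))
    return True
```

On the retrieval side, the main process would decrease the same counters whenever a prepared batch arrives from _worker_result_queue_.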
#### **Item_worker Flow**
* Get item metadata from _index_queue_
* Generate the item by running `dataset.__getitem__(item_index)`
* Send the item to the appropriate _item_queue_ (by the item's _bw_idx_)

#### **Batch_worker Flow**
* Get one item at a time from _item_queue_ and collect the items into batches, using each item's metadata (_batch_idx_, _item_idx_in_batch_, and _batch_size_)
* Once all items of a given batch have been received, run `collate_fn` and send the prepared batch to _worker_result_queue_

#### **New Parameters**
The following dataloader input parameters are modified / added (an illustrative usage sketch appears at the end of this document):

| name                         | description                                                                                                                                                  |
|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| _num_workers_ (modified)     | Number of item_workers. Setting it to 0 disables multiprocessing (as today). There is no benefit in increasing it beyond (_prefetch_factor_ * _batch_size_)     |
| _prefetch_factor_ (modified) | Number of batches simultaneously sent for processing to all workers (2 by default)                                                                              |
| _num_batch_workers_ (new)    | Number of batch_workers (defaults to _prefetch_factor_). There is no benefit in increasing it beyond _prefetch_factor_                                          |

## **Metrics**
The suggested flow should require significantly less shared memory, while preserving TPT, under similar configurations. \
To monitor shared memory usage, type in the Linux server terminal: \
`$ watch -n0.1 df -h` \
and review the "Used" column of /dev/shm.

## **Drawbacks**
* An additional layer of batch_workers is required, somewhat increasing flow complexity
* CPU usage is somewhat higher in the suggested flow, due to the additional _num_batch_workers_ processes
* The user should be aware that if `collate_fn` is very slow and becomes a bottleneck, an increase in _prefetch_factor_ should be considered

## **How We Teach This**
* Update the DataLoader documentation to include a description of the suggested pipeline
* Add/update descriptions of the new/modified parameters
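For illustration only, constructing a DataLoader under the proposed interface might look like the sketch below. The `num_batch_workers` argument does not exist in the current PyTorch API; the dataset, tensor shape, and worker counts are placeholder values:

```python
import torch
from torch.utils.data import DataLoader, Dataset


class VideoClips(Dataset):
    """Placeholder map-style dataset with a large per-item payload."""
    def __len__(self):
        return 1024

    def __getitem__(self, index):
        return torch.zeros(16, 3, 512, 512)  # a few tens of MB per item


loader = DataLoader(
    VideoClips(),
    batch_size=4,
    num_workers=16,        # item_workers: call dataset.__getitem__
    num_batch_workers=2,   # proposed new argument: batch_workers that run collate_fn
    prefetch_factor=2,     # at most 2 batches in flight across all workers
)
```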