torchtraining.accelerators.horovod module¶
This module allows users to train networks in a distributed manner using Horovod.

Note

IMPORTANT: This module is experimental and may not work correctly. Use at your own risk and report any issues you find.

Note

IMPORTANT: This module needs the horovod Python package to be visible. You can install it with pip install -U torchtraining[horovod]. You should also export the CUDA_HOME variable, for example: CUDA_HOME=/opt/cuda pip install -U torchtraining[horovod] (your path may vary).
See Horovod documentation for details about the framework (installation, capabilities etc.).
Example:

import torchtraining as tt
import torchtraining.accelerators.horovod as horovod


class TrainStep(tt.steps.Train):
    def forward(self, module, sample):
        # Dummy step returning loss for a single batch
        images, labels = sample
        predictions = module(images)
        loss = self.criterion(predictions, labels)
        return loss


model = ...
criterion = ...
dataset = ...
optimizer = ...
writer = ...
device = ...

# Accelerate!
accelerator = tt.accelerators.Horovod(model)

# Distributed optimization with gradient accumulation
optimizer = horovod.optimizer(optimizer, model.named_parameters())

# Special distributed DataLoader
dataloader = horovod.DataLoader(dataset, batch_size=64)

step = (
    TrainStep(criterion, device)
    ** tt.pytorch.ZeroGrad()
    ** tt.pytorch.Backward()
    ** tt.pytorch.Optimize(optimizer)
)

iteration = (
    tt.iterations.TrainIteration(step, model, dataloader)
    ** horovod.AllReduce()
    ** tt.accumulators.Mean()
    ** horovod.OnRank(tt.callbacks.Tensorboard(writer, "Loss"))
)
Specific operations integrated by torchtraining are described below.
class torchtraining.accelerators.horovod.AllGather(name: str = None)[source]¶

Concatenate input tensors from all processes.

The tensor after concatenation will be available to all processes. Concatenation is done over the 0th dimension, so it is the only dimension in which torch.Tensor is allowed to differ between processes.

If data requires gradient you can backpropagate through this operation.

Parameters
    name (str, optional) – Name of the reduction operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Tensor with the same shape as data except for the 0th dimension (which will be larger, as it is a concatenation of data from all processes).

Return type
    torch.Tensor

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be gathered across all processes.
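Example (a minimal sketch, assuming a multi-process Horovod run and that operations can be invoked directly like torch.nn.Module):

import torch

import torchtraining as tt

# Each process holds a tensor which may differ in the 0th dimension
local_predictions = torch.rand(16, 10)

gather = tt.accelerators.horovod.AllGather()
# Identical on every process: shape (16 * world_size, 10)
all_predictions = gather(local_predictions)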
class torchtraining.accelerators.horovod.AllReduce(reduction: str = 'mean', compression='none', name=None)[source]¶

Perform reduction of the input tensor over all the processes.

If data requires gradient you can backpropagate through this operation.

Parameters
    reduction (str, optional) – The reduction operation to use when combining gradients across different processes. Can be one of ["mean", "sum"], corresponding to [hvd.mpi_ops.Average, hvd.mpi_ops.Sum] respectively. Default: "mean"
    compression (str, optional) – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Can be one of "none" or "fp16". Default: "none"
    name (str, optional) – Name of the reduction operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Tensor with the same shape as data, averaged (reduction="mean") or summed (reduction="sum") across all processes.

Return type
    torch.Tensor

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be reduced across all processes.
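Example (a minimal sketch; assumes direct invocation as above):

import torch

import torchtraining as tt

reduce = tt.accelerators.horovod.AllReduce(reduction="mean", compression="fp16")

local_loss = torch.tensor(0.42)
# Every process receives the mean of all processes' local values
mean_loss = reduce(local_loss)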
class torchtraining.accelerators.horovod.AsyncAllGather(name=None)[source]¶

Asynchronously concatenate input tensors from all processes.

The tensor after concatenation will be available to all processes. Concatenation is done over the 0th dimension, so it is the only dimension in which torch.Tensor is allowed to differ between processes.

Parameters
    name (str, optional) – Name of the reduction operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Handle to be used with tt.accelerators.horovod.Synchronize()

Return type
    Handle

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be gathered across all processes.
class torchtraining.accelerators.horovod.AsyncAllReduce(reduction: str = 'mean', compression='none', name=None)[source]¶

Perform asynchronous reduction of the input tensor over all the processes.

Users should pipe this object into tt.accelerators.horovod.Synchronize() in order to get the value (see the example below).

Parameters
    reduction (str, optional) – The reduction operation to use when combining gradients across different processes. Can be one of ["mean", "sum"], corresponding to [hvd.mpi_ops.Average, hvd.mpi_ops.Sum] respectively. Default: "mean"
    compression (str, optional) – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Can be one of "none" or "fp16". Default: "none"
    name (str, optional) – Name of the reduction operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Handle to be used with tt.accelerators.horovod.Synchronize()

Return type
    Handle

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be reduced across all processes.
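Example (a sketch of the intended pattern; assumes operations can be invoked directly and that communication can overlap with other work):

import torch

import torchtraining.accelerators.horovod as horovod

local_gradient_norm = torch.tensor(1.3)

# Start the reduction without blocking
handle = horovod.AsyncAllReduce(reduction="mean")(local_gradient_norm)

# ... overlap communication with other computation here ...

# Block until the reduction finishes and retrieve the result
mean_gradient_norm = horovod.Synchronize()(handle)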
class torchtraining.accelerators.horovod.AsyncBroadcast(rank: int = 0, name=None)[source]¶

Asynchronously broadcast tensor from the rank process to all other processes.

Parameters
    rank (int, optional) – Rank (process) from which the tensor will be broadcast. Default: 0 (main process)
    name (str, optional) – Name of the broadcast operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Handle to be used with tt.accelerators.horovod.Synchronize()

Return type
    Handle

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be broadcast across all processes.
class torchtraining.accelerators.horovod.Broadcast(rank: int = 0, name=None)[source]¶

Broadcast tensor from the rank process to all other processes.

If data requires gradient you can backpropagate through this operation.

Parameters
    rank (int, optional) – Rank (process) from which the tensor will be broadcast. Default: 0 (main process)
    name (str, optional) – Name of the broadcast operator. If not provided it will be generated automatically. Default: None (automatic generation)

Returns
    Tensor with the same shape as data with broadcast values.

Return type
    torch.Tensor

forward(data)[source]¶

Parameters
    data (torch.Tensor) – Tensor to be broadcast across all processes.
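Example (a minimal sketch; the tensor here is a stand-in):

import torch

import torchtraining as tt

broadcast = tt.accelerators.horovod.Broadcast(rank=0)

# After this call every process holds the values from rank 0
threshold = broadcast(torch.tensor(0.5))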
class torchtraining.accelerators.horovod.DataLoader(dataset, batch_size=1, shuffle=False, num_workers=0, collate_fn=None, pin_memory=False, drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None, generator=None, sampler_seed=0)[source]¶

PyTorch torch.utils.data.DataLoader suited for horovod integration.

Works exactly like its PyTorch counterpart but creates an appropriate torch.utils.data.DistributedSampler under the hood (hence users cannot specify sampler or batch_sampler).

dataset: torch.utils.data.Dataset
    Dataset from which to load the data.
batch_size: int, optional
    How many samples per batch to load. Default: 1
shuffle: bool, optional
    Set to True to have the data reshuffled at every epoch. Default: False
num_workers: int, optional
    How many subprocesses to use for data loading. 0 means that the data will be loaded in the main process. Default: 0
collate_fn: Callable, optional
    Merges a list of samples to form a mini-batch of Tensor(s). Used when using batched loading from a map-style dataset. Default: None (default PyTorch collation)
pin_memory: bool, optional
    If True, the data loader will copy torch.Tensors into CUDA pinned memory before returning them. Default: False
drop_last: bool, optional
    Set to True to drop the last incomplete batch if the dataset size is not divisible by the batch size. If False and the size of the dataset is not divisible by the batch size, then the last batch will be smaller. Default: False
timeout: Numeric, optional
    If positive, the timeout value for collecting a batch from workers. Should always be non-negative. Default: 0
worker_init_fn: Callable, optional
    If not None, this will be called on each worker subprocess with the worker id (an int in [0, num_workers - 1]) as input, after seeding and before data loading. Default: None
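Example (a minimal sketch; the dataset is a stand-in):

import torch
from torch.utils.data import TensorDataset

import torchtraining.accelerators.horovod as horovod

dataset = TensorDataset(
    torch.rand(1000, 3, 32, 32), torch.randint(0, 10, (1000,))
)

# Each process automatically receives its own shard of the dataset
dataloader = horovod.DataLoader(dataset, batch_size=64, shuffle=True, num_workers=2)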
class torchtraining.accelerators.horovod.OnRank(operation: torchtraining._base.Operation, rank: int = 0)[source]¶

Run any operation only if it runs in the specified process (specified rank).

Otherwise return data unchanged.

Parameters
    operation (tt.Operation) – Operation to run (callbacks, metrics and whatever else you want).
    rank (int, optional) – Rank (process) on which the operation will be run. Default: 0 (main process)

Returns
    If run in the specified process, return operation(data). Otherwise forward data without changes.

Return type
    data | operation(data)
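Example (a sketch wrapping a logging callback so only the main process writes; the writer is a stand-in):

from torch.utils.tensorboard import SummaryWriter

import torchtraining as tt

writer = SummaryWriter()

# Loss is logged only on rank 0; other ranks forward data unchanged
log_on_main = tt.accelerators.horovod.OnRank(
    tt.callbacks.Tensorboard(writer, "Loss"), rank=0
)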
class torchtraining.accelerators.horovod.Synchronize[source]¶

Synchronize an asynchronous operation and return its result. See the AsyncAllReduce example above for the intended usage pattern.

Returns
    Value of the previous asynchronous operation after synchronization, whatever it should return.

Return type
    torch.Tensor

forward(handle)[source]¶

Parameters
    handle (Handle) – Handle returned by an AsyncAllReduce, AsyncAllGather or AsyncBroadcast which will be used to retrieve the torch.Tensor.
torchtraining.accelerators.horovod.load(f, rank: int = 0, map_location=None, pickle_module=pickle, **pickle_load_args)[source]¶

Load an object saved with torch.save in a single process and distribute it to all other processes.

Useful when loading a saved torch.nn.Module (or other torch objects like optimizer) which was saved on a single machine. It can be easily distributed to other processes this way.

If you wish to torch.save on a single process you can create an object like this:

save = tt.accelerators.horovod.OnRank(torch.save)
save(your_module)

Parameters
    f (file-like) – A file-like object (has to implement read(), readline(), tell(), and seek()) or a string or os.PathLike object containing a file name.
    rank (int, optional) – Process rank on which data will be loaded. Default: 0
    map_location (Callable | torch.device | string | dict, optional) – Specifies how to remap storage locations. Default: None
    pickle_module (module, optional) – Module used for unpickling metadata and objects (has to match the pickle_module used to serialize the file). Default: pickle
    **pickle_load_args – Optional keyword arguments passed over to pickle_module.load() and pickle_module.Unpickler(), e.g., errors=....

Returns
    Anything you saved with torch.save.

Return type
    torch.Tensor | torch.nn.Module | Any
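Example (a minimal sketch; the checkpoint path and model are stand-ins):

import torch

import torchtraining as tt

model = torch.nn.Linear(10, 2)

# Rank 0 reads the checkpoint; the loaded object is distributed to every process
state_dict = tt.accelerators.horovod.load("checkpoint.pt", rank=0)
model.load_state_dict(state_dict)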
torchtraining.accelerators.horovod.optimizer(optimizer, named_parameters, reduction: str = 'sum', compression: str = 'none', accumulate: int = 1, rank: int = 0)[source]¶

Create a Horovod compatible optimizer.

State of the optimizer will be broadcast from the specified rank. Should be used after the torchtraining.accelerators.Horovod object was created.

Parameters
    optimizer (torch.optim.Optimizer) – Instance of optimizer-like object with interface aligned with torch.optim.Optimizer.
    named_parameters (torch.nn.parameter.Parameter) – A mapping between parameter names and values. Used for naming of allreduce operations. Typically just model.named_parameters().
    reduction (str, optional) – The reduction operation to use when combining gradients across different processes. Can be one of ["mean", "sum"], corresponding to [hvd.mpi_ops.Average, hvd.mpi_ops.Sum] respectively. Default: "sum"
    compression (str, optional) – Compression algorithm used during allreduce to reduce the amount of data sent during each parameter update step. Can be one of "none" or "fp16". Default: "none"
    accumulate (int, optional) – Divide loss by accumulate if gradient accumulation is used. This approach averages gradients from multiple batches. Default: 1 (no accumulation)
    rank (int, optional) – Rank from which the optimizer's state will be broadcast. Default: 0

Returns
    Instance of optimizer, but distributed across workers.

Return type
    horovod.torch.DistributedOptimizer
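Example (a minimal sketch with gradient accumulation over 4 batches; the model is a stand-in):

import torch

import torchtraining.accelerators.horovod as horovod

model = torch.nn.Linear(10, 2)
base_optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Wrap into a distributed optimizer; loss will be divided by accumulate=4
optimizer = horovod.optimizer(
    base_optimizer,
    model.named_parameters(),
    reduction="mean",
    accumulate=4,
)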