Parallelizing Deep Learning Frameworks with Horovod

A. How to Use Horovod in TensorFlow

When utilizing multiple GPUs across multiple nodes, Horovod can be integrated with TensorFlow for parallelization by adding the Horovod-related code shown in the examples below. Both TensorFlow itself and the Keras API within TensorFlow can be integrated with Horovod; we first introduce how to integrate Horovod with TensorFlow. (Example: MNIST dataset and LeNet-5 CNN structure)

※ For detailed instructions on using Horovod with TensorFlow, refer to the Horovod official guide (https://github.com/horovod/horovod#usage).

  • Import Horovod and initialize it in the main function for use with TensorFlow.

 import horovod.tensorflow as hvd
 ...
 hvd.init()

※ horovod.tensorflow: Module for integrating Horovod with TensorFlow

※ Initialize Horovod to enable its use.

  • Set the dataset in the main function for use with Horovod.

 (x_train, y_train), (x_test, y_test) = \
 keras.datasets.mnist.load_data('MNIST-data-%d' % hvd.rank())

※ Create the dataset with a rank-specific path so that each task has its own copy of the data.

  • Apply Horovod-related settings to the optimizer in the main function, and configure broadcasting and the number of training steps.

 opt = tf.train.AdamOptimizer(0.001 * hvd.size())
 opt = hvd.DistributedOptimizer(opt)
 global_step = tf.train.get_or_create_global_step()
 train_op = opt.minimize(loss, global_step=global_step)
 hooks = [hvd.BroadcastGlobalVariablesHook(0),
          tf.train.StopAtStepHook(last_step=20000 // hvd.size()), ... ]

※ Wrap the optimizer with hvd.DistributedOptimizer, scale the learning rate by the number of tasks, and use the broadcast hook to send the initial variable values from rank 0 to every task so that all tasks start from the same state.

※ Set the training steps for each task according to the number of Horovod tasks.

  • Assign GPU devices based on the Horovod process rank

 config = tf.ConfigProto()
 config.gpu_options.allow_growth = True
 config.gpu_options.visible_device_list = str(hvd.local_rank())

※ Allocate one task per GPU according to Horovod's local rank

  • Set the checkpoint on the Rank 0 task

 checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None
 ...
 with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                        hooks=hooks,
                                        config=config) as mon_sess:

※ Since checkpoint saving and loading should be performed by a single process, configure it on rank 0
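
For reference, inside the monitored session the training loop typically runs until the StopAtStepHook configured above stops it. A minimal sketch, assuming the input pipeline is already wired into the graph (if placeholders are used, a feed_dict would be passed to mon_sess.run):

     while not mon_sess.should_stop():
         # One training step per iteration; the StopAtStepHook above ends
         # training after 20000 // hvd.size() global steps on each task.
         mon_sess.run(train_op)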

B. Using Horovod with Keras

Within TensorFlow, Horovod can also be integrated with the Keras API for parallelization by adding the Horovod-related code shown in the examples below. (Example: MNIST dataset and LeNet-5 CNN structure)

※ For detailed instructions on using Horovod with Keras, refer to the Horovod official guide. (https://github.com/horovod/horovod/blob/master/docs/keras.rst)

  • Import Horovod and initialize it in the main function for use with Keras.

 import horovod.tensorflow.keras as hvd
 ...
 hvd.init()

※ horovod.tensorflow.keras: Module for integrating Horovod with Keras in TensorFlow

※ Initialize Horovod to enable its use.

  • Assign GPU devices based on the Horovod process rank

 config = tf.ConfigProto()
 config.gpu_options.allow_growth = True
 config.gpu_options.visible_device_list = str(hvd.local_rank())

※ Allocate one task per GPU according to the Horovod local rank.
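
In the Keras API, this ConfigProto is then typically attached to the Keras backend session so that each task actually uses its pinned GPU. A minimal sketch, assuming the TensorFlow 1.x-style Keras backend:

 import tensorflow.keras.backend as K
 K.set_session(tf.Session(config=config))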

  • Apply Horovod-related settings to the optimizer in the main function, and configure broadcasting and the number of training steps.

 epochs = int(math.ceil(12.0 / hvd.size()))
 ...
 opt = keras.optimizers.Adadelta(1.0 * hvd.size())
 opt = hvd.DistributedOptimizer(opt)
 callbacks = [ hvd.callbacks.BroadcastGlobalVariablesCallback(0), ]

※ Scale the number of training epochs on each task according to the number of Horovod tasks.

※ Wrap the optimizer with hvd.DistributedOptimizer, scale the learning rate by the number of tasks, and use the broadcast callback to send the initial variable states from rank 0 to every task so that all tasks start from the same state.
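
The wrapped optimizer is then passed to model.compile as usual. A minimal sketch, assuming a categorical cross-entropy loss for the MNIST example:

 model.compile(loss=keras.losses.categorical_crossentropy,
               optimizer=opt,
               metrics=['accuracy'])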

  • Set the checkpoint on the Rank 0 task

 if hvd.rank() == 0:
     callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

※ Since checkpoint saving and loading should be performed by a single process, configure it on rank 0

  • Train the model, displaying progress output only on the rank 0 task.

 model.fit(x_train, y_train, batch_size=batch_size, callbacks=callbacks,
           epochs=epochs, verbose=1 if hvd.rank() == 0 else 0,
           validation_data=(x_test, y_test))

※ To ensure that training output is displayed only by the Rank 0 task, set the verbose value to 1 only for the Rank 0 task.

C. Using Horovod with PyTorch

When utilizing multiple GPUs across multiple nodes, Horovod can be integrated with PyTorch for parallelization by adding the Horovod-related code shown in the examples below. (Example: MNIST dataset and LeNet-5 CNN structure)

※ For detailed instructions on using Horovod with PyTorch, refer to the Horovod official guide. (https://github.com/horovod/horovod/blob/master/docs/pytorch.rst)

  • Import Horovod and initialize it in the main function for use with PyTorch, then configure the GPU device and CPU thread settings.

 import torch.utils.data.distributed
 import horovod.torch as hvd
 ...
 hvd.init()
 if args.cuda:
     torch.cuda.set_device(hvd.local_rank())
     torch.set_num_threads(1)

※ torch.utils.data.distributed: Module for performing distributed training in PyTorch

※ horovod.torch: Module for integrating Horovod with PyTorch

※ Initialize Horovod and set the GPU device to use according to the local rank assigned during initialization.

※ Use torch.set_num_threads(1) to assign one CPU thread per task.
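
The model itself, a LeNet-5-style CNN whose definition is omitted here, is then typically moved onto the pinned GPU before the optimizer is created. A minimal sketch, where Net is a hypothetical name for the CNN class:

 model = Net()        # LeNet-5-style CNN defined elsewhere (hypothetical name)
 if args.cuda:
     model.cuda()     # place the model on the GPU selected via hvd.local_rank()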

  • Add Horovod-related content to the training process.

 def train(args, model, train_loader, optimizer, epoch, train_sampler):
     ...
     train_sampler.set_epoch(epoch)
     ...
         if batch_idx % args.log_interval == 0:
             print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                 epoch, batch_idx * len(data), len(train_sampler),
                 100. * batch_idx / len(train_loader), loss.item()))

※ train_sampler.set_epoch(epoch): Sets the current epoch on the distributed sampler so that the data is reshuffled differently in each epoch.

※ Since the training dataset is divided among multiple tasks, use len(train_sampler) to check the total dataset size.

  • Calculate the average value using Horovod.

 def metric_average(val, name):
     tensor = torch.tensor(val)
     avg_tensor = hvd.allreduce(tensor, name=name)
     return avg_tensor.item()

※ To calculate the average across multiple nodes, use Horovod's Allreduce communication.

  • Add Horovod-related content to the testing process.

 test_loss /= len(test_sampler)
 test_accuracy /= len(test_sampler)
 test_loss = metric_average(test_loss, 'avg_loss')
 test_accuracy = metric_average(test_accuracy, 'avg_accuracy')
 if hvd.rank() == 0:
     print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
            test_loss, 100. * test_accuracy))

※ Since the average needs to be calculated across multiple nodes, use the metric_average function declared above.

※ After performing Allreduce communication across nodes, each node has the same calculated values for loss and accuracy, so the print function is executed on rank 0.
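
For context, the snippet above assumes that test_loss and test_accuracy were first accumulated as sums over this task's shard of the test set, roughly as in the sketch below (F stands for torch.nn.functional, and the model is assumed to return log-probabilities as in the standard MNIST example):

 model.eval()
 test_loss, test_accuracy = 0., 0.
 with torch.no_grad():
     for data, target in test_loader:          # iterates only over this task's shard
         if args.cuda:
             data, target = data.cuda(), target.cuda()
         output = model(data)
         # accumulate the summed batch loss and the number of correct predictions
         test_loss += F.nll_loss(output, target, reduction='sum').item()
         pred = output.argmax(dim=1, keepdim=True)
         test_accuracy += pred.eq(target.view_as(pred)).float().sum().item()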

  • Set the dataset in the main function for use with Horovod

 train_dataset = datasets.MNIST('data-%d' % hvd.rank(), train=True, download=True,
                                transform=transforms.Compose([
                                    transforms.ToTensor(),
                                    transforms.Normalize((0.1307,), (0.3081,))]))
 train_sampler = torch.utils.data.distributed.DistributedSampler(
     train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
 train_loader = torch.utils.data.DataLoader(
     train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
 test_dataset = datasets.MNIST('data-%d' % hvd.rank(), train=False,
                               transform=transforms.Compose([
                                   transforms.ToTensor(),
                                   transforms.Normalize((0.1307,), (0.3081,))]))
 test_sampler = torch.utils.data.distributed.DistributedSampler(
     test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
 test_loader = torch.utils.data.DataLoader(
     test_dataset, batch_size=args.test_batch_size, sampler=test_sampler, **kwargs)

※ Create the dataset with a rank-specific path so that each task has its own copy of the data.

※ Set up PyTorch's distributed sampler and assign it to the data loader.

  • Apply Horovod-related settings to the optimizer in the main function and add the sampler to the training and testing processes.

 optimizer = optim.SGD(model.parameters(), lr=args.lr * hvd.size(), momentum=args.momentum)
 hvd.broadcast_parameters(model.state_dict(), root_rank=0)
 hvd.broadcast_optimizer_state(optimizer, root_rank=0)
 optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
 for epoch in range(1, args.epochs + 1):
     train(args, model, train_loader, optimizer, epoch, train_sampler)
     test(args, model, test_loader, test_sampler)

※ Wrap the optimizer with hvd.DistributedOptimizer and broadcast the initial model parameters and optimizer state from rank 0 to every task so that all tasks start from the same state.

※ Add the sampler to the training and testing processes and pass it to each function.

Last updated on November 11, 2024.
