Using AI with Multiple Nodes
This page introduces distributed learning methods on the Neuron system for dividing the computation required for deep learning training across many GPUs on multiple nodes, with the results aggregated over a high-speed network.
A. HOROVOD
Horovod uses the standard MPI model for message passing and communication management in high-performance distributed computing environments. Its MPI-based approach offers a simpler programming model than TensorFlow's native distributed training.
1. HOROVOD (TensorFlow) Installation and Verification
1) Installation method
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1 cmake/3.16.9
$ conda create -n my_tensorflow
$ source activate my_tensorflow
(my_tensorflow) $ conda install tensorflow-gpu=2.0.0 tensorboard=2.0.0 \
tensorflow-estimator=2.0.0 python=3.7 cudnn cudatoolkit=10 nccl=2.8.3
(my_tensorflow) $ HOROVOD_WITH_MPI=1 HOROVOD_GPU_OPERATIONS=NCCL \
HOROVOD_NCCL_LINK=SHARED HOROVOD_WITH_TENSORFLOW=1 \
pip install --no-cache-dir horovod==0.23.0
2) Installation verification
(my_tensorflow) $ pip list | grep horovod
horovod 0.23.0
(my_tensorflow) $ python
>>> import horovod
>>> horovod.__version__
'0.23.0'
(my_tensorflow) $ horovodrun -cb
Horovod v0.23.0:
Available Frameworks:
[X] TensorFlow
[X] PyTorch
[ ] MXNet
Available Controllers:
[X] MPI
[X] Gloo
Available Tensor Operations:
[X] NCCL
[ ] DDL
[ ] CCL
[X] MPI
[X] Gloo
2. HOROVOD (TensorFlow) Execution Example
1) Execution using a job submission script
#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment tensorflow
module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_tensorflow
horovodrun -np 2 python tensorflow2_mnist.py
2) Execution using interactive job submission
$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
--gres=gpu:2 --comment=tensorflow
$ echo $SLURM_NODELIST
gpu[12-13]
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_tensorflow
(my_tensorflow) $ horovodrun -np 4 -H gpu12:2,gpu13:2 python tensorflow2_mnist.py
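The tensorflow2_mnist.py script used above is the MNIST example distributed with Horovod. For reference, the following is a minimal sketch (not the full example; the toy model and data are placeholders) of the pattern a Horovod-enabled TensorFlow 2 script follows: initialize Horovod, pin one GPU per process, scale the learning rate by the number of workers, wrap the gradient tape with hvd.DistributedGradientTape, and broadcast the initial state from rank 0.
import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod and pin one GPU to this process (by local rank)
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Toy model; scale the learning rate by the number of workers
model = tf.keras.Sequential([tf.keras.layers.Flatten(input_shape=(28, 28)),
                             tf.keras.layers.Dense(10)])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
opt = tf.optimizers.Adam(0.001 * hvd.size())

@tf.function
def training_step(images, labels, first_batch):
    with tf.GradientTape() as tape:
        loss = loss_fn(labels, model(images, training=True))
    # Average gradients across all workers (NCCL allreduce)
    tape = hvd.DistributedGradientTape(tape)
    grads = tape.gradient(loss, model.trainable_variables)
    opt.apply_gradients(zip(grads, model.trainable_variables))
    # Broadcast initial variables and optimizer state from rank 0 once
    if first_batch:
        hvd.broadcast_variables(model.variables, root_rank=0)
        hvd.broadcast_variables(opt.variables(), root_rank=0)
    return loss

# Dummy data; the real example iterates over the MNIST dataset
dataset = tf.data.Dataset.from_tensor_slices(
    (tf.random.uniform([256, 28, 28]),
     tf.random.uniform([256], maxval=10, dtype=tf.int64))).batch(32)
for batch, (images, labels) in enumerate(dataset):
    loss = training_step(images, labels, batch == 0)
    if hvd.rank() == 0:
        print('step %d, loss %.4f' % (batch, loss.numpy()))
Each process started by horovodrun (-np) runs this same script on its own GPU; only rank 0 prints.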
3. HOROVOD (PyTorch) Installation and Verification
1) Installation method
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1
$ module load python/3.7.1 cmake/3.16.9
$ conda create -n my_pytorch
$ source activate my_pytorch
(my_pytorch) $ conda install pytorch=1.11.0 python=3.9 \
torchvision=0.12.0 torchaudio=0.11.0 cudatoolkit=10.2 -c pytorch
(my_pytorch) $ HOROVOD_WITH_MPI=1 HOROVOD_NCCL_LINK=SHARED \
HOROVOD_GPU_OPERATIONS=NCCL HOROVOD_WITH_PYTORCH=1 \
pip install --no-cache-dir horovod==0.24.0
2) Installation verification
(my_pytorch) $ pip list | grep horovod
horovod 0.24.0
(my_pytorch) $ python
>>> import horovod
>>> horovod.__version__
'0.24.0'
(my_pytorch) $ horovodrun -cb
Horovod v0.24.0:
Available Frameworks:
[ ] TensorFlow
[X] PyTorch
[ ] MXNet
Available Controllers:
[X] MPI
[X] Gloo
Available Tensor Operations:
[X] NCCL
[ ] DDL
[ ] CCL
[X] MPI
[X] Gloo
4. HOROVOD (PyTorch) Execution Example
1) Job submission script example
#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment pytorch
module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_pytorch
horovodrun -np 2 python pytorch_ex.py
2) Execution using interactive job submission
$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
--gres=gpu:2 --comment=pytorch
$ echo $SLURM_NODELIST
gpu[22-23]
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_pytorch
(my_pytorch) $ horovodrun -np 4 -H gpu22:2,gpu23:2 python pytorch_ex.py
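pytorch_ex.py above stands for any Horovod-enabled PyTorch script. A minimal sketch of the typical pattern is shown below (this is not the actual pytorch_ex.py; the toy model and data are placeholders): initialize Horovod, pin the GPU by local rank, shard the data with a DistributedSampler, wrap the optimizer with hvd.DistributedOptimizer, and broadcast the initial state from rank 0.
import torch
import torch.nn as nn
import torch.utils.data.distributed
import horovod.torch as hvd

# Initialize Horovod and pin one GPU to this process
hvd.init()
torch.cuda.set_device(hvd.local_rank())

# Toy dataset, sharded across workers by rank
dataset = torch.utils.data.TensorDataset(torch.randn(1024, 100),
                                         torch.randint(0, 10, (1024,)))
sampler = torch.utils.data.distributed.DistributedSampler(
    dataset, num_replicas=hvd.size(), rank=hvd.rank())
loader = torch.utils.data.DataLoader(dataset, batch_size=32, sampler=sampler)

model = nn.Linear(100, 10).cuda()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Average gradients across workers and synchronize the initial state
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters())
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

loss_fn = nn.CrossEntropyLoss()
for epoch in range(2):
    sampler.set_epoch(epoch)
    for x, y in loader:
        x, y = x.cuda(), y.cuda()
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()
    if hvd.rank() == 0:
        print(f"epoch {epoch} finished, last loss {loss.item():.4f}")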
B. GLOO
GLOO is an open-source collective communication library developed by Facebook. It is bundled with Horovod and allows Horovod to run multi-node jobs without requiring an MPI installation.
GLOO is built and installed together with Horovod as part of the Horovod installation process.
※ For the Horovod installation method, refer to the Horovod installation and verification section above
1. GLOO Execution Example
1) Job submission script example
#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment tensorflow
module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_tensorflow
horovodrun --gloo -np 4 python tensorflow2_mnist.py
2) Execution using interactive job submission
$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
--gres=gpu:2 --comment=tensorflow
$ echo $SLURM_NODELIST
gpu[12-13]
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_tensorflow
(my_tensorflow) $ horovodrun -np 4 --gloo --network-interface ib0 \
-H gpu12:2,gpu13:2 python tensorflow2_mnist.py
C. Ray
Ray is a Python-based framework that provides parallel execution in multi-node environments. It can be used with deep learning models such as PyTorch through various libraries. For more details, see the following website: https://docs.ray.io/en/latest/cluster/index.html
1. Ray Installation and Single Node Execution Example
1) Installation method
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ conda create -n my_ray
$ source activate my_ray
(my_ray) $ pip install --no-cache-dir ray
$ export PATH=/home01/${USER}/.local/bin:$PATH
2) Job submission script example
#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 1
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment xxx
module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_ray
python test.py
test.py

import ray

ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))  # [1, 1, 1, 1]
3) Execution using interactive job submission
$ salloc --partition=cas_v100_4 -J debug --nodes=1 -n 4 --time=08:00:00 \
--gres=gpu:2 --comment=xxx
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_ray
(my_ray) $ python test.py
[0, 1, 4, 9]
[1, 1, 1, 1]
2. Ray Cluster Installation and Multi Node Execution Example
When executing Ray in a multi-node setup, it runs with one head node and multiple worker nodes, efficiently scheduling tasks to the allocated resources.
The example created by NERSC can be downloaded from GITHUB (https://github.com/NERSC/slurm-ray-cluster.git) and executed as follows.
1) Installation method
$ git clone https://github.com/NERSC/slurm-ray-cluster.git
$ cd slurm-ray-cluster
[edit submit-ray-cluster.sbatch, start-head.sh, start-worker.sh]
$ sbatch submit-ray-cluster.sbatch
2) Job submission script example
#!/bin/bash
#SBATCH -J ray_test
#SBATCH -p cas_v100_4
#SBATCH --time=00:10:00
### This script works for any number of nodes, Ray will find and manage all resources
#SBATCH --nodes=2
### Give all resources to a single Ray task, ray can manage the resources internally
#SBATCH --ntasks-per-node=4
#SBATCH --comment etc
#SBATCH --gres=gpu:2
# Load modules or your own conda environment here
module load gcc/8.3.0 cuda/10.0 cudampi/openmpi-3.1.0 python/3.7.1
source activate my_ray
export PATH=/home01/${USER}/.local/bin:$PATH
…
#### call your code below
python test.py
exit
start-head.sh
#!/bin/bash
#export LC_ALL=C.UTF-8
#export LANG=C.UTF-8
echo "starting ray head node"
# Launch the head node
ray start --head --node-ip-address=$1 --port=12345 --redis-password=$2
sleep infinity
start-worker.sh
#!/bin/bash
#export LC_ALL=C.UTF-8
#export LANG=C.UTF-8
echo "starting ray worker node"
ray start --address $1 --redis-password=$2
sleep infinity
3) Output
==========================================
SLURM_JOB_ID = 107575
SLURM_NODELIST = gpu[21-22]
==========================================
'cuda/10.0' supports the {CUDA_MPI}.
IP Head: 10.151.0.21:12345
STARTING HEAD at gpu21
starting ray head node
2022-04-27 10:57:38,976 INFO services.py:1092 -- View the Ray dashboard at http://localhost:8265
2022-04-27 10:57:38,599 INFO scripts.py:467 -- Local node IP: 10.151.0.21
2022-04-27 10:57:39,000 SUCC scripts.py:497 -- --------------------
2022-04-27 10:57:39,000 SUCC scripts.py:498 -- Ray runtime started.
2022-04-27 10:57:39,000 SUCC scripts.py:499 -- --------------------
2022-04-27 10:57:39,000 INFO scripts.py:501 -- Next steps
2022-04-27 10:57:39,000 INFO scripts.py:503 -- To connect to this Ray runtime from another node, run
2022-04-27 10:57:39,000 INFO scripts.py:507 -- ray start --address='10.151.0.21:12345' --redis-password='90268c54-9df9-4d98-b363-ed6ed4d61a61'
2022-04-27 10:57:39,000 INFO scripts.py:509 -- Alternatively, use the following Python code:
2022-04-27 10:57:39,001 INFO scripts.py:512 -- import ray
2022-04-27 10:57:39,001 INFO scripts.py:519 -- ray.init(address='auto', _redis_password='90268c54-9df9-4d98-b363-ed6ed4d61a61')
2022-04-27 10:57:39,001 INFO scripts.py:522 -- If connection fails, check your firewall settings and network configuration.
2022-04-27 10:57:39,001 INFO scripts.py:526 -- To terminate the Ray runtime, run
2022-04-27 10:57:39,001 INFO scripts.py:527 -- ray stop
STARTING WORKER 1 at gpu22
starting ray worker node
2022-04-27 10:58:08,922 INFO scripts.py:591 -- Local node IP: 10.151.0.22
2022-04-27 10:58:09,069 SUCC scripts.py:606 -- --------------------
2022-04-27 10:58:09,069 SUCC scripts.py:607 -- Ray runtime started.
2022-04-27 10:58:09,069 SUCC scripts.py:608 -- --------------------
2022-04-27 10:58:09,069 INFO scripts.py:610 -- To terminate the Ray runtime, run
2022-04-27 10:58:09,069 INFO scripts.py:611 -- ray stop
2022-04-27 10:58:13,915 INFO services.py:1092 -- View the Ray dashboard at http://127.0.0.1:8265
[0, 1, 4, 9]
[1, 1, 1, 1]
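To run your own driver script against the cluster started by start-head.sh and start-worker.sh, rather than starting a fresh local Ray instance, connect in ray.init() using the ip_head and redis_password environment variables that the NERSC job script provides (the PyTorch example in the next section reads the same variables). A minimal sketch, with which_node() as a purely illustrative task:
import os
import ray

# Attach to the running multi-node cluster; ip_head and redis_password
# come from the NERSC submit-ray-cluster.sbatch environment
ray.init(address=os.environ["ip_head"],
         _redis_password=os.environ["redis_password"])

@ray.remote
def which_node():
    # Runs on whichever node the head schedules it to
    import socket
    return socket.gethostname()

print(ray.get([which_node.remote() for _ in range(8)]))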
3. Ray Cluster (PyTorch) Installation and Multi Node Execution Example
1) Installation method
$ pip install --user torch torchvision torchaudio tabulate tensorboardX
[edit submit-ray-cluster.sbatch]
$ sbatch submit-ray-cluster.sbatch
2) Job submission script example
#!/bin/bash
#SBATCH -J ray_test
#SBATCH -p cas_v100_4
#SBATCH --time=00:10:00
### This script works for any number of nodes,
### Ray will find and manage all resources
#SBATCH --nodes=2
### Give all resources to a single Ray task,
### ray can manage the resources internally
#SBATCH --ntasks-per-node=4
#SBATCH --comment etc
#SBATCH --gres=gpu:2
# Load modules or your own conda environment here
module load gcc/8.3.0 cuda/10.0 cudampi/openmpi-3.1.0 python/3.7.1
source activate my_ray
export PATH=/home01/${USER}/.local/bin:$PATH
…
#### call your code below
python examples/mnist_pytorch_trainable.py
exit
mnist_pytorch_trainable.py
from __future__ import print_function
import argparse
import os
import torch
import torch.optim as optim
import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.examples.mnist_pytorch import (train, test, get_data_loaders,
ConvNet)
...
ray.init(address='auto', _node_ip_address=os.environ["ip_head"].split(":")[0], _redis_password=os.environ["redis_password"])
sched = ASHAScheduler(metric="mean_accuracy", mode="max")
analysis = tune.run(TrainMNIST,
                    scheduler=sched,
                    stop={"mean_accuracy": 0.99,
                          "training_iteration": 100},
                    resources_per_trial={"cpu": 10, "gpu": 1},
                    num_samples=128,
                    checkpoint_at_end=True,
                    config={"lr": tune.uniform(0.001, 1.0),
                            "momentum": tune.uniform(0.1, 0.9),
                            "use_gpu": True})
print("Best config is:", analysis.get_best_config(metric="mean_accuracy", mode="max"))
3) Output
...
== Status ==
Memory usage on this node: 57.4/377.4 GiB
Using AsyncHyperBand: num_stopped=11
Bracket: Iter 64.000: None | Iter 16.000: 0.9390625000000001 | Iter 4.000: 0.85625 | Iter 1.000: 0.534375
Resources requested: 80/80 CPUs, 8/8 GPUs, 0.0/454.88 GiB heap, 0.0/137.26 GiB objects
Result logdir: /home01/${USER}/ray_results/TrainMNIST_2022-04-27_11-24-20
Number of trials: 20/128 (1 PENDING, 8 RUNNING, 11 TERMINATED)
+------------------------+------------+------------------+-----------+------------+----------+--------+------------------+
| Trial name | status | loc | lr | momentum | acc | iter | total time (s) |
|------------------------+------------+------------------+-----------+------------+----------+--------+------------------|
| TrainMNIST_25518_00000 | RUNNING | 10.151.0.22:4128 | 0.0831396 | 0.691582 | 0.953125 | 26 | 4.98667 |
| TrainMNIST_25518_00001 | RUNNING | 10.151.0.22:4127 | 0.0581841 | 0.685648 | 0.94375 | 25 | 5.08798 |
| TrainMNIST_25518_00013 | RUNNING | | 0.0114732 | 0.386122 | | | |
| TrainMNIST_25518_00014 | RUNNING | | 0.686305 | 0.546706 | | | |
| TrainMNIST_25518_00015 | RUNNING | | 0.442195 | 0.525069 | | | |
| TrainMNIST_25518_00016 | RUNNING | | 0.647866 | 0.397167 | | | |
| TrainMNIST_25518_00017 | RUNNING | | 0.479493 | 0.429876 | | | |
| TrainMNIST_25518_00018 | RUNNING | | 0.341561 | 0.42485 | | | |
| TrainMNIST_25518_00019 | PENDING | | 0.205629 | 0.551851 | | | |
| TrainMNIST_25518_00002 | TERMINATED | | 0.740295 | 0.209155 | 0.078125 | 1 | 0.206633 |
| TrainMNIST_25518_00003 | TERMINATED | | 0.202496 | 0.102844 | 0.853125 | 4 | 1.17559 |
| TrainMNIST_25518_00004 | TERMINATED | | 0.431773 | 0.449912 | 0.85625 | 4 | 0.811173 |
| TrainMNIST_25518_00005 | TERMINATED | | 0.595764 | 0.643525 | 0.121875 | 1 | 0.214556 |
| TrainMNIST_25518_00006 | TERMINATED | | 0.480667 | 0.412854 | 0.728125 | 4 | 0.885571 |
| TrainMNIST_25518_00007 | TERMINATED | | 0.544958 | 0.280743 | 0.15625 | 1 | 0.185517 |
| TrainMNIST_25518_00008 | TERMINATED | | 0.277231 | 0.258283 | 0.48125 | 1 | 0.186344 |
| TrainMNIST_25518_00009 | TERMINATED | | 0.87852 | 0.28864 | 0.10625 | 1 | 0.203304 |
| TrainMNIST_25518_00010 | TERMINATED | | 0.691046 | 0.351471 | 0.103125 | 1 | 0.23274 |
| TrainMNIST_25518_00011 | TERMINATED | | 0.926629 | 0.17118 | 0.121875 | 1 | 0.267205 |
| TrainMNIST_25518_00012 | TERMINATED | | 0.618234 | 0.881444 | 0.046875 | 1 | 0.226228 |
+------------------------+------------+------------------+-----------+------------+----------+--------+------------------+
...
D. Submitit
Submitit is a lightweight tool for submitting Python functions for computation within a Slurm cluster. It primarily organizes submitted jobs and provides access to results, logs, etc.
1. Example (1)
add.py

#!/usr/bin/env python3
import submitit

def add(a, b):
    return a + b

# create slurm wrapper object
executor = submitit.AutoExecutor(folder="log_test")
# update slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python')
# submit job
job = executor.submit(add, 5, 7)
# print job ID
print(job.job_id)
# print result
print(job.result())
$ ./add.py
110651
12
110651_submission.sh
#!/bin/bash
# Parameters
#SBATCH --comment="python"
#SBATCH --error=/scratch/${USER}/2022/02-submitit/test/log_test/%j_0_log.err
#SBATCH --job-name=submitit
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --open-mode=append
#SBATCH --output=/scratch/${USER}/2022/02-submitit/test/log_test/%j_0_log.out
#SBATCH --partition=cas_v100_2
#SBATCH --signal=USR1@90
#SBATCH --wckey=submitit
# command
export SUBMITIT_EXECUTOR=slurm
srun \
    --output /scratch/${USER}/2022/02-submitit/test/log_test/%j_%t_log.out \
    --error /scratch/${USER}/2022/02-submitit/test/log_test/%j_%t_log.err \
    --unbuffered \
    /scratch/${USER}/.conda/submitit/bin/python3 \
    -u \
    -m submitit.core._submit \
    /scratch/${USER}/2022/02-submitit/test/log_test
The submitted Python function is stored as a pickle file in the log folder:
-rw-r--r-- 1 testuser01 tu0000 0 May 4 21:28 log_test/110651_0_log.err
-rw-r--r-- 1 testuser01 tu0000 476 May 4 21:28 log_test/110651_0_log.out
-rw-r--r-- 1 testuser01 tu0000 26 May 4 21:28 log_test/110651_0_result.pkl
-rw-r--r-- 1 testuser01 tu0000 634 May 4 21:28 log_test/110651_submitted.pkl
-rw-r--r-- 1 testuser01 tu0000 735 May 4 21:28 log_test/110651_submission.sh
2. Example (2)
add.py

#!/usr/bin/env python3
def add(a, b):
    return a + b

print(add(5, 7))
run.sh
#!/bin/bash
#SBATCH --comment="python"
#SBATCH --job-name=submitit
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --partition=cas_v100_2
./add.py
$ sbatch run.sh
110650
add.py

#!/usr/bin/env python3
import submitit

def add(a, b):
    return a + b

# create slurm wrapper object
executor = submitit.AutoExecutor(folder="log_test")
# update slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python'
)
# submit job
job = executor.submit(add, 5, 7)
# print job ID
print(job.job_id)
# print result
print(job.result())
$ ./add.py
110651
12
3. Submitit Multitask Job Example
add_para.py

#!/usr/bin/env python3
import submitit

def add(a, b):
    return a + b

# create slurm object
executor = submitit.AutoExecutor(folder="log_test")
# slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python',
    ntasks_per_node=3)
# submit job
job = executor.submit(add, 5, 7)
# print job ID
print(job.job_id)
# print results
print(job.results())
$ ./add_para.py
110658
[12, 12, 12]
The Slurm options that can be set through update_parameters() correspond to the arguments of _make_sbatch_string() in submitit/submitit/slurm/slurm.py:
def _make_sbatch_string(
command: str,
folder: tp.Union[str, Path],
job_name: str = "submitit",
partition: tp.Optional[str] = None,
time: int = 5,
nodes: int = 1,
ntasks_per_node: tp.Optional[int] = None,
cpus_per_task: tp.Optional[int] = None,
cpus_per_gpu: tp.Optional[int] = None,
num_gpus: tp.Optional[int] = None, # legacy
gpus_per_node: tp.Optional[int] = None,
gpus_per_task: tp.Optional[int] = None,
qos: tp.Optional[str] = None, # quality of service
setup: tp.Optional[tp.List[str]] = None,
mem: tp.Optional[str] = None,
mem_per_gpu: tp.Optional[str] = None,
mem_per_cpu: tp.Optional[str] = None,
signal_delay_s: int = 90,
comment: tp.Optional[str] = None,
constraint: tp.Optional[str] = None,
exclude: tp.Optional[str] = None,
account: tp.Optional[str] = None,
gres: tp.Optional[str] = None,
exclusive: tp.Optional[tp.Union[bool, str]] = None,
array_parallelism: int = 256,
wckey: str = "submitit",
stderr_to_stdout: bool = False,
map_count: tp.Optional[int] = None, # used internally
additional_parameters: tp.Optional[tp.Dict[str, tp.Any]] = None,
srun_args: tp.Optional[tp.Iterable[str]] = None ) -> str:
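For example, GPU resources can be requested by passing the parameter names listed above to update_parameters(). The sketch below uses submitit.SlurmExecutor, whose update_parameters() accepts these names directly; the resource values are placeholders for illustration.
#!/usr/bin/env python3
import submitit

def add(a, b):
    return a + b

# SlurmExecutor accepts the _make_sbatch_string() parameter names directly
executor = submitit.SlurmExecutor(folder="log_test")
executor.update_parameters(
    partition='cas_v100_2',
    comment='python',
    nodes=1,
    ntasks_per_node=2,
    gres='gpu:2',       # placeholder resource request
    time=30)            # walltime in minutes

job = executor.submit(add, 5, 7)
print(job.job_id)
print(job.results())    # one result per task, e.g. [12, 12]
The per-task log files of the multitask job above (110658) look as follows.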
- 110658_0_log.out
submitit INFO (2022-04-08 12:32:19,583) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=0(3), node=0(1), global_rank=0(3))
submitit INFO (2022-04-08 12:32:19,584) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,612) - Job completed successfully
- 110658_1_log.out
submitit INFO (2022-04-08 12:32:19,584) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=1(3), node=0(1), global_rank=1(3))
submitit INFO (2022-04-08 12:32:19,620) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,624) - Job completed successfully
- 110658_2_log.out
submitit INFO (2022-04-08 12:32:19,583) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=2(3), node=0(1), global_rank=2(3))
submitit INFO (2022-04-08 12:32:19,676) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,681) - Job completed successfully
An object with a __call__ method can also be submitted, which is useful for launching distributed PyTorch training through submitit:

import submitit
import torch
import torch.multiprocessing as mp
import torchvision

class NeuralNet:
    def __init__(self):
        self.layer1 = …
        self.layer2 = …

    # Allow passing an NN object to submit()
    def __call__(self):
        job_env = submitit.JobEnvironment()  # local rank, global rank
        dataset = torchvision.datasets.MNIST(…)
        loader = torch.utils.data.DataLoader(…)
        mp.spawn(…)  # data distributed training

mnist = NeuralNet()
...
job = executor.submit(mnist)
E. NCCL
This section introduces the installation method and example execution of NCCL, a multi-GPU and multi-node collective communication library optimized for NVIDIA GPUs on the Neuron system.
1. NCCL Installation and Verification
1) Installation method
Download the desired version from https://developer.nvidia.com/nccl/nccl-legacy-downloads.
$ tar Jxvf nccl_2.11.4-1+cuda11.4_x86_64.txz
2) Installation verification
$ ls -al nccl_2.11.4-1+cuda11.4_x86_64/
include/ lib/ LICENSE.txt
2. NCCL Execution Example.
1) Download the execution example.
$ git clone https://github.com/1duo/nccl-examples
2) Compile the execution example.
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1
$ cd nccl-examples
$ mkdir build
$ cd build
$ cmake -DNCCL_INCLUDE_DIR=$NCCL_HOME/include \
    -DNCCL_LIBRARY=$NCCL_HOME/lib/libnccl.so ..
$ make
3) Check the execution results.
$ ./run.sh
Example 1: Single Process, Single Thread, Multiple Devices
Success
Example 2: One Device Per Process Or Thread
[MPI Rank 1] Success
[MPI Rank 0] Success
Example 3: Multiple Devices Per Thread
[MPI Rank 1] Success
[MPI Rank 0] Success
4) Execution example of 8 GPUs across 2 nodes using Example 3 above
4-1) Job script
#!/bin/sh
#SBATCH -J STREAM
#SBATCH --time=10:00:00 # walltime
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --gres=gpu:4
#SBATCH -o %N_%j.out
#SBATCH -e %N_%j.err
#SBATCH -p amd_a100_4
srun build/examples/example-3
4-2) Example code
//
// Example 3: Multiple Devices Per Thread
//
#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>
#define MPICHECK(cmd) do { \
int e = cmd; \
if( e != MPI_SUCCESS ) { \
printf("Failed: MPI error %s:%d '%d'\n", \
__FILE__,__LINE__, e); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define CUDACHECK(cmd) do { \
cudaError_t e = cmd; \
if( e != cudaSuccess ) { \
printf("Failed: Cuda error %s:%d '%s'\n", \
__FILE__,__LINE__,cudaGetErrorString(e)); \
exit(EXIT_FAILURE); \
} \
} while(0)
#define NCCLCHECK(cmd) do { \
ncclResult_t r = cmd; \
if (r!= ncclSuccess) { \
printf("Failed, NCCL error %s:%d '%s'\n", \
__FILE__,__LINE__,ncclGetErrorString(r)); \
exit(EXIT_FAILURE); \
} \
} while(0)
static uint64_t getHostHash(const char *string) {
// Based on DJB2, result = result * 33 + char
uint64_t result = 5381;
for (int c = 0; string[c] != '\0'; c++) {
result = ((result << 5) + result) + string[c];
}
return result;
}
static void getHostName(char *hostname, int maxlen) {
gethostname(hostname, maxlen);
for (int i = 0; i < maxlen; i++) {
if (hostname[i] == '.') {
hostname[i] = '\0';
return;
}
}
}
int main(int argc, char *argv[]) {
//int size = 32 * 1024 * 1024;
int size = 5;
int myRank, nRanks, localRank = 0;
// initializing MPI
MPICHECK(MPI_Init(&argc, &argv));
MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));
// calculating localRank which is used in selecting a GPU
uint64_t hostHashs[nRanks];
char hostname[1024];
getHostName(hostname, 1024);
hostHashs[myRank] = getHostHash(hostname);
MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));
for (int p = 0; p < nRanks; p++) {
if (p == myRank) {
break;
}
if (hostHashs[p] == hostHashs[myRank]) {
localRank++;
}
}
// each process is using four GPUs
int nDev = 4;
int **hostsendbuff = (int **) malloc(nDev * sizeof(int *));
int **hostrecvbuff = (int **) malloc(nDev * sizeof(int *));
int **sendbuff = (int **) malloc(nDev * sizeof(int *));
int **recvbuff = (int **) malloc(nDev * sizeof(int *));
cudaStream_t *s = (cudaStream_t *) malloc(sizeof(cudaStream_t) * nDev);
// picking GPUs based on localRank
for (int i = 0; i < nDev; ++i) {
hostsendbuff[i] = (int *) malloc(size * sizeof(int));
for (int j = 0; j < size; j++) {
hostsendbuff[i][j]=1;
}
hostrecvbuff[i] = (int *) malloc(size * sizeof(int));
CUDACHECK(cudaSetDevice(localRank * nDev + i));
CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(int)));
CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(int)));
CUDACHECK(cudaMemcpy(sendbuff[i], hostsendbuff[i], size * sizeof(int),
cudaMemcpyHostToDevice));
CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(int)));
CUDACHECK(cudaStreamCreate(s + i));
}
ncclUniqueId id;
ncclComm_t comms[nDev];
// generating NCCL unique ID at one process and broadcasting it to all
if (myRank == 0) {
ncclGetUniqueId(&id);
}
MPICHECK(MPI_Bcast((void *) &id, sizeof(id), MPI_BYTE, 0,
MPI_COMM_WORLD));
// initializing NCCL, group API is required around ncclCommInitRank
// as it is called across multiple GPUs in each thread/process
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nDev; i++) {
CUDACHECK(cudaSetDevice(localRank * nDev + i));
NCCLCHECK(ncclCommInitRank(comms + i, nRanks * nDev, id,
myRank * nDev + i));
}
NCCLCHECK(ncclGroupEnd());
// calling NCCL communication API. Group API is required when
// using multiple devices per thread/process
NCCLCHECK(ncclGroupStart());
for (int i = 0; i < nDev; i++) {
NCCLCHECK(ncclAllReduce((const void *) sendbuff[i],
(void *) recvbuff[i], size, ncclInt, ncclSum,
comms[i], s[i]));
}
NCCLCHECK(ncclGroupEnd());
// synchronizing on CUDA stream to complete NCCL communication
for (int i = 0; i < nDev; i++) {
CUDACHECK(cudaStreamSynchronize(s[i]));
}
// freeing device memory
for (int i = 0; i < nDev; i++) {
CUDACHECK(cudaMemcpy(hostrecvbuff[i], recvbuff[i], size * sizeof(int),
cudaMemcpyDeviceToHost));
CUDACHECK(cudaFree(sendbuff[i]));
CUDACHECK(cudaFree(recvbuff[i]));
}
printf(" \n");
for (int i = 0; i < nDev; i++) {
printf("%s rank:%d gpu%d ", hostname, myRank, i);
for (int j = 0; j < size; j++) {
printf("%d ", hostsendbuff[i][j]);
}
printf("\n");
}
printf(" \n");
for (int i = 0; i < nDev; i++) {
printf("%s rank:%d gpu%d ", hostname, myRank, i);
for (int j = 0; j < size; j++) {
printf("%d ", hostrecvbuff[i][j]);
}
printf("\n");
}
// finalizing NCCL
for (int i = 0; i < nDev; i++) {
ncclCommDestroy(comms[i]);
}
// finalizing MPI
MPICHECK(MPI_Finalize());
printf("[MPI Rank %d] Success \n", myRank);
return 0;
}
4-3) Execution results
gpu37 rank:1 gpu0 1 1 1 1 1 1 1 1
gpu37 rank:1 gpu1 2 2 2 2 2 2 2 2
gpu37 rank:1 gpu2 3 3 3 3 3 3 3 3
gpu37 rank:1 gpu3 4 4 4 4 4 4 4 4
gpu37 rank:1 gpu0 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu1 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu2 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu3 0 0 0 0 0 0 0 0
[MPI Rank 1] Success
gpu36 rank:0 gpu0 1 1 1 1 1 1 1 1
gpu36 rank:0 gpu1 2 2 2 2 2 2 2 2
gpu36 rank:0 gpu2 3 3 3 3 3 3 3 3
gpu36 rank:0 gpu3 4 4 4 4 4 4 4 4
gpu36 rank:0 gpu0 20 20 20 20 20 20 20 20
gpu36 rank:0 gpu1 0 0 0 0 0 0 0 0
gpu36 rank:0 gpu2 0 0 0 0 0 0 0 0
gpu36 rank:0 gpu3 0 0 0 0 0 0 0 0
[MPI Rank 0] Success
F. Tensorflow Distribute
TensorFlow Distribute is a TensorFlow API that enables distributed training using multiple GPUs or multiple servers. (Uses TensorFlow 2.0)
1. TensorFlow Installation and Verification in the Conda Environment
1) Installation method
$ module load python/3.7.1
$ conda create -n tf_test
$ conda activate tf_test
(tf_test) $ conda install tensorflow
2) Installation verification
(tf_test) $ conda list | grep tensorflow
tensorflow 2.0.0 gpu_py37h768510d_0
tensorflow-base 2.0.0 gpu_py37h0ec5d1f_0
tensorflow-estimator 2.0.0 pyh2649769_0
tensorflow-gpu 2.0.0 h0d30ee6_0
tensorflow-metadata 1.7.0 pypi_0 pypi
2. Utilization of Single Node, Multi-GPU (using tf.distribute.MirroredStrategy()).
1) Code example (tf_multi_keras.py)
import tensorflow as tf
import numpy as np
import os
strategy = tf.distribute.MirroredStrategy()
BUFFER_SIZE = 1000
n_workers = 1
batch_size_per_gpu = 64
global_batch_size = batch_size_per_gpu * n_workers
def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

dataset = mnist_dataset(global_batch_size)
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()
multi_worker_model.fit(dataset, epochs=5, steps_per_epoch=BUFFER_SIZE)
2) Interactive job submission (1 node, 4 GPUs)
$ salloc --partition=cas_v100_4 --nodes=1 --ntasks-per-node=4 \
--gres=gpu:4 --comment=etc
$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ python tf_multi_keras.py
3) Batch job submission script (1 node, 4 GPUs) (tf_dist_run.sh)
#!/bin/bash
#SBATCH -J tf_dist_test
#SBATCH -p cas_v100_4
#SBATCH -N 1
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:4
#SBATCH --comment etc
module purge
module load python/3.7.1
source activate tf_test
python tf_multi_keras.py
3. Utilization of Multi Node, Multi-GPU (using tf.distribute.MultiWorkerMirroredStrategy())
Modify the strategy for use on multiple nodes and set the environment variable TF_CONFIG on each node.
1) Modify the code example
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
2) Interactive job submission (2 nodes, each with 4 GPUs)
$ salloc --partition=cas_v100_4 --nodes=2 --ntasks-per-node=4 \
--gres=gpu:4 --comment=etc
$ scontrol show hostnames
gpu01
gpu02
gpu01$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ export TF_CONFIG='{"cluster": {"worker": ["gpu01:12345", "gpu02:12345"]}, "task": {"index": 0, "type": "worker"}}'
(tf_test) $ python tf_multi_keras.py
gpu02$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ export TF_CONFIG='{"cluster": {"worker": ["gpu01:12345", "gpu02:12345"]}, "task": {"index": 1, "type": "worker"}}'
(tf_test) $ python tf_multi_keras.py
3) Batch job submission script (2 nodes, each with 4 GPUs) (tf_multi_run.sh)
#!/bin/bash
#SBATCH -J tf_multi_test
#SBATCH -p 4gpu
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o ./out/%x.o%j
#SBATCH -e ./out/%x.e%j
#SBATCH --time 01:00:00
###SBATCH --gres=gpu:4
#SBATCH --comment etc
module purge
module load python/3.7.1
source activate tf_test
worker=(`scontrol show hostnames`)
num_worker=${#worker[@]}
PORT=12345
unset TF_CONFIG
for i in ${worker[@]}
do
tmp_TF_CONFIG=\"$i":"$PORT\"
TF_CONFIG=$TF_CONFIG", "$tmp_TF_CONFIG
done
TF_CONFIG=${TF_CONFIG:2}
cluster=\"cluster\"": "\{\"worker\"": "\[$TF_CONFIG\]\}
j=0
while [ $j -lt $num_worker ]
do
task=\"task\"": "\{\"index\"": "$j", "\"type\"": "\"worker\"\}
tmp=${cluster}", "${task}
tmp2=\'\{${tmp}\}\'
ssh ${worker[$j]} "conda activate tf_test; export TF_CONFIG=$tmp2; python tf_multi_keras.py" &
j=$(($j+1))
done
4. References
Distributed training using Keras (https://www.tensorflow.org/tutorials/distribute/keras)
[TensorFlow 2] DNN training using MNIST data as the training dataset (http://www.gisdeveloper.co.kr/?p=8534)
G. PyTorch DDP
PyTorch DDP (DistributedDataParallel) provides distributed data parallelism and can be run in a multi-node, multi-GPU environment. For PyTorch DDP features and tutorials, refer to the websites below.
https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
https://github.com/pytorch/examples/blob/main/distributed/ddp/README.md
The following example demonstrates how to run PyTorch DDP using the Slurm scheduler.
1. Job Submission Script Example
1) Single-node example (single node, 2 GPUs)
#!/bin/bash -l
#SBATCH -J PytorchDDP
#SBATCH -p cas_v100_4
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:2
#SBATCH --comment pytorch
#SBATCH --time 1:00:00
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
# Configuration
traindata='{path}'
master_port="$((RANDOM%55535+10000))"
# Load software
conda activate pytorchDDP
# Launch one SLURM task, and use torch distributed launch utility
# to spawn training worker processes; one per GPU
srun -N 1 -n 1 python main.py -a config \
--dist-url "tcp://127.0.0.1:${master_port}" \
--dist-backend 'nccl' \
--multiprocessing-distributed \
--world-size $SLURM_TASKS_PER_NODE \
--rank 0 \
$traindata
2) Multi-node example (2 nodes, 2 GPUs)
#!/bin/bash -l
#SBATCH -J PytorchDDP
#SBATCH -p cas_v100_4
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:1
#SBATCH --comment pytorch
#SBATCH --time 10:00:0
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
# Load software list
module load {module name}
conda activate {conda name}
# Setup node list
nodes=$(scontrol show hostnames $SLURM_JOB_NODELIST) # Getting the node names
nodes_array=( $nodes )
master_node=${nodes_array[0]}
master_addr=$(srun --nodes=1 --ntasks=1 -w $master_node hostname --ip-address)
master_port=$((RANDOM%55535+10000))
worker_num=$(($SLURM_JOB_NUM_NODES))
# Loop over nodes and submit training tasks
for (( node_rank=0; node_rank<$worker_num; node_rank++ )); do
node=${nodes_array[$node_rank]}
echo "Submitting node # $node_rank, $node"
# Launch one SLURM task per node, and use torch distributed launch utility
# to spawn training worker processes; one per GPU
srun -N 1 -n 1 -w $node python main.py -a $config \
--dist-url tcp://$master_addr:$master_port \
--dist-backend 'nccl' \
--multiprocessing-distributed \
--world-size $SLURM_JOB_NUM_NODES \
--rank $node_rank &
pids[${node_rank}]=$!
done
# Wait for completion
for pid in ${pids[*]}; do
wait $pid
done
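The scripts above assume a DDP-aware entry point (main.py, which, judging by its arguments, follows the style of the PyTorch ImageNet example: it parses --dist-url, --world-size, and --rank itself and spawns one worker per GPU when --multiprocessing-distributed is set). As a reference, a minimal sketch of the core DDP pattern in its simpler one-process-per-GPU form is shown below; the model, data, and argument handling are illustrative only and do not reproduce the actual main.py.
import argparse

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--dist-url', default='tcp://127.0.0.1:23456')
    parser.add_argument('--world-size', type=int, default=1)
    parser.add_argument('--rank', type=int, default=0)
    args = parser.parse_args()

    # One process per GPU: join the NCCL process group
    dist.init_process_group(backend='nccl', init_method=args.dist_url,
                            world_size=args.world_size, rank=args.rank)
    local_gpu = args.rank % torch.cuda.device_count()
    torch.cuda.set_device(local_gpu)
    device = torch.device(f'cuda:{local_gpu}')

    # Wrap the model; gradients are all-reduced across ranks on backward()
    model = nn.Linear(100, 10).to(device)
    model = DDP(model, device_ids=[local_gpu])

    # Toy training loop; a real script would shard its dataset
    # with torch.utils.data.distributed.DistributedSampler
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    loss_fn = nn.CrossEntropyLoss()
    for step in range(10):
        x = torch.randn(32, 100, device=device)
        y = torch.randint(0, 10, (32,), device=device)
        optimizer.zero_grad()
        loss_fn(model(x), y).backward()
        optimizer.step()

    dist.destroy_process_group()

if __name__ == '__main__':
    main()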
Last updated on November 11, 2024.