Using AI with Multiple Nodes

This section introduces distributed training methods for the Neuron system that split the computation required for deep learning training across dozens of GPUs on multiple nodes and aggregate the results over a high-speed network.

A. HOROVOD

Horovod uses the standard MPI model for message passing and communication management, which is widely used in high-performance distributed computing. Its MPI-based approach offers a simpler programming model than TensorFlow's standard distributed training model.
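
As a rough illustration of the data-parallel pattern that Horovod builds on, the following pure-Python sketch simulates an allreduce average: each worker computes a local gradient, and after the allreduce every worker holds the same averaged gradient. It does not call Horovod, MPI, or NCCL; the function name allreduce_average and the sample values are made up for illustration.

```python
# Conceptual sketch only: simulates what an allreduce "average" achieves.
# This is not Horovod's implementation and uses no MPI or NCCL calls.

def allreduce_average(local_grads):
    """Return, for every worker, the element-wise mean of all workers' gradients.

    local_grads: list with one gradient vector (list of floats) per worker.
    """
    n_workers = len(local_grads)
    length = len(local_grads[0])
    # Element-wise sum across workers, then divide by the worker count.
    mean = [sum(g[i] for g in local_grads) / n_workers for i in range(length)]
    # After an allreduce, every worker holds an identical copy of the result.
    return [list(mean) for _ in range(n_workers)]

# Hypothetical gradients computed by 4 workers on different data shards.
grads = [
    [1.0, 2.0],
    [3.0, 4.0],
    [5.0, 6.0],
    [7.0, 8.0],
]
reduced = allreduce_average(grads)
print(reduced[0])  # [4.0, 5.0]
```

In a real job the averaging is performed by the communication backend (MPI, NCCL, or Gloo) rather than by Python code like this.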

1. HOROVOD (TensorFlow) Installation and Verification

1) Installation method

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1 cmake/3.16.9

$ conda create -n my_tensorflow
$ source activate my_tensorflow

(my_tensorflow) $ conda install tensorflow-gpu=2.0.0 tensorboard=2.0.0 \
 tensorflow-estimator=2.0.0 python=3.7 cudnn cudatoolkit=10 nccl=2.8.3

(my_tensorflow) $ HOROVOD_WITH_MPI=1 HOROVOD_GPU_OPERATIONS=NCCL \
HOROVOD_NCCL_LINK=SHARED HOROVOD_WITH_TENSORFLOW=1 \
pip install --no-cache-dir horovod==0.23.0

2) Installation verification

(my_tensorflow) $ pip list | grep horovod 
horovod 0.23.0

(my_tensorflow) $ python 
>>> import horovod 
>>> horovod.__version__
'0.23.0'

(my_tensorflow) $ horovodrun -cb
Horovod v0.23.0: 

 Available Frameworks: 
    [X] TensorFlow 
    [X] PyTorch 
    [ ] MXNet 

 Available Controllers: 
    [X] MPI 
    [X] Gloo 

 Available Tensor Operations: 
    [X] NCCL 
    [ ] DDL 
    [ ] CCL 
    [X] MPI 
    [X] Gloo

2. HOROVOD (TensorFlow) Execution Example

1) Execution using a job submission script

#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment tensorflow

module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1

source activate my_tensorflow

horovodrun -np 2 python tensorflow2_mnist.py

2) Execution using interactive job submission

$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
--gres=gpu:2 --comment=tensorflow

$ echo $SLURM_NODELIST
gpu[12-13]

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_tensorflow

(my_tensorflow) $ horovodrun -np 4 -H gpu12:2,gpu13:2 python tensorflow2_mnist.py
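
The -H argument above is written by hand from the node list. As a convenience, a small helper along the following lines could build it from $SLURM_NODELIST. This is a simplified sketch: the function names are hypothetical, and the parser only handles one bracket group per entry (for example gpu[12-13] or gpu[01-03,07]); running scontrol show hostnames remains the more robust way to expand a node list.

```python
import re

def expand_nodelist(nodelist):
    """Expand a simple Slurm node list such as 'gpu[12-13]' into host names.

    Simplified: handles one bracket group per entry, e.g. 'gpu[12-13,20]' or 'gpu07'.
    """
    hosts = []
    # Split on commas that are not inside brackets.
    for entry in re.findall(r"[^,\[]+(?:\[[^\]]*\])?", nodelist):
        match = re.fullmatch(r"(.+?)\[(.+)\]", entry)
        if not match:
            hosts.append(entry)
            continue
        prefix, ranges = match.groups()
        for part in ranges.split(","):
            if "-" in part:
                start, end = part.split("-")
                width = len(start)  # keep zero padding, e.g. 01..03
                for n in range(int(start), int(end) + 1):
                    hosts.append(f"{prefix}{n:0{width}d}")
            else:
                hosts.append(prefix + part)
    return hosts

def horovod_hosts(nodelist, slots_per_node):
    """Build the value for horovodrun's -H option (host:slots pairs)."""
    return ",".join(f"{h}:{slots_per_node}" for h in expand_nodelist(nodelist))

print(horovod_hosts("gpu[12-13]", 2))  # gpu12:2,gpu13:2
```

The printed string can be passed directly as the -H value, with -np set to the number of hosts multiplied by the slots per node.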

3. HOROVOD (PyTorch) Installation and Verification

1) Installation method

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 
$ module load python/3.7.1 cmake/3.16.9

$ conda create -n my_pytorch
$ source activate my_pytorch

(my_pytorch) $ conda install pytorch=1.11.0 python=3.9 \
torchvision=0.12.0  torchaudio=0.11.0 cudatoolkit=10.2 -c pytorch
(my_pytorch) $ HOROVOD_WITH_MPI=1 HOROVOD_NCCL_LINK=SHARED \
 HOROVOD_GPU_OPERATIONS=NCCL HOROVOD_WITH_PYTORCH=1 \
pip install --no-cache-dir horovod==0.24.0

2) Installation verification

(my_pytorch) $ pip list | grep horovod
horovod 0.24.0

(my_pytorch) $ python
>>> import horovod
>>> horovod.__version__
'0.24.0'

(my_pytorch) $ horovodrun -cb
Horovod v0.24.0:

Available Frameworks:
    [ ] TensorFlow
    [X] PyTorch
    [ ] MXNet

Available Controllers:
    [X] MPI
    [X] Gloo

Available Tensor Operations:
    [X] NCCL
    [ ] DDL
    [ ] CCL
    [X] MPI
    [X] Gloo

4. HOROVOD (PyTorch) Execution Example

1) Job submission script example

#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment pytorch

module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1

source activate my_pytorch

horovodrun -np 2 python pytorch_ex.py

2) Execution using interactive job submission

$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
 --gres=gpu:2 --comment=pytorch

$ echo $SLURM_NODELIST
gpu[22-23]

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1 
$ source activate my_pytorch 

(my_pytorch) $ horovodrun -np 4 -H gpu22:2,gpu23:2 python pytorch_ex.py

B. GLOO

GLOO is an open-source collective communication library developed by Facebook. It is included in Horovod and allows multi-node Horovod jobs to run without installing MPI.

Because GLOO is bundled with Horovod, it is installed together with Horovod.

※ For the Horovod installation method, refer to the Horovod installation and verification section above

1. GLOO Execution Example

1) Job submission script example

#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment tensorflow

module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_tensorflow

horovodrun --gloo -np 4 python tensorflow2_mnist.py

2) Execution using interactive job submission

$ salloc --partition=cas_v100_4 -J debug --nodes=2 -n 4 --time=08:00:00 \
 --gres=gpu:2 --comment=tensorflow

$ echo $SLURM_NODELIST
  gpu[12-13]

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ source activate my_tensorflow

(my_tensorflow) $ horovodrun -np 4 --gloo --network-interface ib0 \
-H gpu12:2,gpu13:2 python tensorflow2_mnist.py

C. Ray

Ray is a Python-based framework for parallel execution in multi-node environments. It can be used with deep learning frameworks such as PyTorch through various libraries. For more details, see https://docs.ray.io/en/latest/cluster/index.html

1. Ray Installation and Single Node Execution Example

1) Installation method

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
$ conda create -n my_ray
$ source activate my_ray

(my_ray) $ pip install --no-cache-dir ray

$ export PATH=/home01/${USER}/.local/bin:$PATH

2) Job submission script example

#!/bin/bash
#SBATCH -J test_job
#SBATCH -p cas_v100_4
#SBATCH -N 1
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:2
#SBATCH --comment xxx

module purge
module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1
source activate my_ray

python test.py

  • test.py

import ray

# Start a local Ray runtime
ray.init()

# A remote function runs as a Ray task
@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures))  # [0, 1, 4, 9]

# A remote class runs as a Ray actor
@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures))  # [1, 1, 1, 1]

3) Execution using interactive job submission

$ salloc --partition=cas_v100_4 -J debug --nodes=1 -n 4 --time=08:00:00 \
 --gres=gpu:2 --comment=xxx
$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1 python/3.7.1

$ source activate my_ray

(my_ray) $ python test.py
[0, 1, 4, 9]
[1, 1, 1, 1]

2. Ray Cluster Installation and Multi Node Execution Example

In a multi-node setup, Ray runs with one head node and multiple worker nodes and schedules tasks across the allocated resources.

An example created by NERSC can be downloaded from GitHub (https://github.com/NERSC/slurm-ray-cluster.git) and executed as follows.

1) Installation method

$ git clone https://github.com/NERSC/slurm-ray-cluster.git
$ cd slurm-ray-cluster

[edit submit-ray-cluster.sbatch, start-head.sh, start-worker.sh]

$ sbatch submit-ray-cluster.sbatch

2) Job submission script example

#!/bin/bash

#SBATCH -J ray_test
#SBATCH -p cas_v100_4
#SBATCH --time=00:10:00

### This script works for any number of nodes, Ray will find and manage all resources
#SBATCH --nodes=2

### Give all resources to a single Ray task, ray can manage the resources internally
#SBATCH --ntasks-per-node=4
#SBATCH --comment etc
#SBATCH --gres=gpu:2

# Load modules or your own conda environment here
module load gcc/8.3.0 cuda/10.0 cudampi/openmpi-3.1.0 python/3.7.1
source activate my_ray
export PATH=/home01/${USER}/.local/bin:$PATH




#### call your code below
python test.py
exit

  • start-head.sh

#!/bin/bash 

#export LC_ALL=C.UTF-8 
#export LANG=C.UTF-8 

echo "starting ray head node" 
# Launch the head node 
ray start --head --node-ip-address=$1 --port=12345 --redis-password=$2 
sleep infinity

  • start-worker.sh

#!/bin/bash 

#export LC_ALL=C.UTF-8 
#export LANG=C.UTF-8 

echo "starting ray worker node" 
ray start --address $1 --redis-password=$2 
sleep infinity

3) Output

==========================================
SLURM_JOB_ID = 107575
SLURM_NODELIST = gpu[21-22]
==========================================
         'cuda/10.0' supports the {CUDA_MPI}. 
IP Head: 10.151.0.21:12345
STARTING HEAD at gpu21
starting ray head node
2022-04-27 10:57:38,976 INFO services.py:1092 -- View the Ray dashboard at http://localhost:8265
2022-04-27 10:57:38,599 INFO scripts.py:467 -- Local node IP: 10.151.0.21
2022-04-27 10:57:39,000 SUCC scripts.py:497 -- --------------------
2022-04-27 10:57:39,000 SUCC scripts.py:498 -- Ray runtime started.
2022-04-27 10:57:39,000 SUCC scripts.py:499 -- --------------------
2022-04-27 10:57:39,000 INFO scripts.py:501 -- Next steps
2022-04-27 10:57:39,000 INFO scripts.py:503 -- To connect to this Ray runtime from another node, run
2022-04-27 10:57:39,000 INFO scripts.py:507 -- ray start --address='10.151.0.21:12345' --redis-password='90268c54-9df9-4d98-b363-ed6ed4d61a61'
2022-04-27 10:57:39,000 INFO scripts.py:509 -- Alternatively, use the following Python code:
2022-04-27 10:57:39,001 INFO scripts.py:512 -- import ray
2022-04-27 10:57:39,001 INFO scripts.py:519 -- ray.init(address='auto', _redis_password='90268c54-9df9-4d98-b363-ed6ed4d61a61')
2022-04-27 10:57:39,001 INFO scripts.py:522 -- If connection fails, check your firewall settings and network configuration.
2022-04-27 10:57:39,001 INFO scripts.py:526 -- To terminate the Ray runtime, run
2022-04-27 10:57:39,001 INFO scripts.py:527 -- ray stop
STARTING WORKER 1 at gpu22
starting ray worker node
2022-04-27 10:58:08,922 INFO scripts.py:591 -- Local node IP: 10.151.0.22
2022-04-27 10:58:09,069 SUCC scripts.py:606 -- --------------------
2022-04-27 10:58:09,069 SUCC scripts.py:607 -- Ray runtime started.
2022-04-27 10:58:09,069 SUCC scripts.py:608 -- --------------------
2022-04-27 10:58:09,069 INFO scripts.py:610 -- To terminate the Ray runtime, run
2022-04-27 10:58:09,069 INFO scripts.py:611 -- ray stop
2022-04-27 10:58:13,915 INFO services.py:1092 -- View the Ray dashboard at http://127.0.0.1:8265
[0, 1, 4, 9]
[1, 1, 1, 1]

3. Ray Cluster (PyTorch) Installation and Multi Node Execution Example

1) Installation method

$ pip install --user torch torchvision torchaudio tabulate tensorboardX

[edit submit-ray-cluster.sbatch]

$ sbatch submit-ray-cluster.sbatch

2) Job submission script example

#!/bin/bash

#SBATCH -J ray_test
#SBATCH -p cas_v100_4
#SBATCH --time=00:10:00

### This script works for any number of nodes, 
### Ray will find and manage all resources
#SBATCH --nodes=2

### Give all resources to a single Ray task, 
### ray can manage the resources internally
#SBATCH --ntasks-per-node=4
#SBATCH --comment etc
#SBATCH --gres=gpu:2

# Load modules or your own conda environment here
module load gcc/8.3.0 cuda/10.0 cudampi/openmpi-3.1.0 python/3.7.1
source activate my_ray
export PATH=/home01/${USER}/.local/bin:$PATH



#### call your code below
python examples/mnist_pytorch_trainable.py
exit

  • mnist_pytorch_trainable.py

from __future__ import print_function

import argparse
import os
import torch
import torch.optim as optim

import ray
from ray import tune
from ray.tune.schedulers import ASHAScheduler
from ray.tune.examples.mnist_pytorch import (train, test, get_data_loaders,
                                             ConvNet)

...

ray.init(address='auto',
         _node_ip_address=os.environ["ip_head"].split(":")[0],
         _redis_password=os.environ["redis_password"])
sched = ASHAScheduler(metric="mean_accuracy", mode="max")
analysis = tune.run(TrainMNIST,
                    scheduler=sched,
                    stop={"mean_accuracy": 0.99,
                          "training_iteration": 100},
                    resources_per_trial={"cpu": 10, "gpu": 1},
                    num_samples=128,
                    checkpoint_at_end=True,
                    config={"lr": tune.uniform(0.001, 1.0),
                            "momentum": tune.uniform(0.1, 0.9),
                            "use_gpu": True})
print("Best config is:", analysis.get_best_config(metric="mean_accuracy", mode="max"))

3) Output

...

== Status == 
Memory usage on this node: 57.4/377.4 GiB 
Using AsyncHyperBand: num_stopped=11 
Bracket: Iter 64.000: None | Iter 16.000: 0.9390625000000001 | Iter 4.000: 0.85625 | Iter 1.000: 0.534375 
Resources requested: 80/80 CPUs, 8/8 GPUs, 0.0/454.88 GiB heap, 0.0/137.26 GiB objects 
Result logdir: /home01/${USER}/ray_results/TrainMNIST_2022-04-27_11-24-20 
Number of trials: 20/128 (1 PENDING, 8 RUNNING, 11 TERMINATED) 
+------------------------+------------+------------------+-----------+------------+----------+--------+------------------+
| Trial name             | status     | loc              |        lr |   momentum |      acc |   iter |   total time (s) |
|------------------------+------------+------------------+-----------+------------+----------+--------+------------------|
| TrainMNIST_25518_00000 | RUNNING    | 10.151.0.22:4128 | 0.0831396 |   0.691582 | 0.953125 |     26 |         4.98667  |
| TrainMNIST_25518_00001 | RUNNING    | 10.151.0.22:4127 | 0.0581841 |   0.685648 | 0.94375  |     25 |         5.08798  |
| TrainMNIST_25518_00013 | RUNNING    |                  | 0.0114732 |   0.386122 |          |        |                  |
| TrainMNIST_25518_00014 | RUNNING    |                  | 0.686305  |   0.546706 |          |        |                  |
| TrainMNIST_25518_00015 | RUNNING    |                  | 0.442195  |   0.525069 |          |        |                  |
| TrainMNIST_25518_00016 | RUNNING    |                  | 0.647866  |   0.397167 |          |        |                  |
| TrainMNIST_25518_00017 | RUNNING    |                  | 0.479493  |   0.429876 |          |        |                  |
| TrainMNIST_25518_00018 | RUNNING    |                  | 0.341561  |   0.42485  |          |        |                  |
| TrainMNIST_25518_00019 | PENDING    |                  | 0.205629  |   0.551851 |          |        |                  |
| TrainMNIST_25518_00002 | TERMINATED |                  | 0.740295  |   0.209155 | 0.078125 |      1 |         0.206633 |
| TrainMNIST_25518_00003 | TERMINATED |                  | 0.202496  |   0.102844 | 0.853125 |      4 |         1.17559  |
| TrainMNIST_25518_00004 | TERMINATED |                  | 0.431773  |   0.449912 | 0.85625  |      4 |         0.811173 |
| TrainMNIST_25518_00005 | TERMINATED |                  | 0.595764  |   0.643525 | 0.121875 |      1 |         0.214556 |
| TrainMNIST_25518_00006 | TERMINATED |                  | 0.480667  |   0.412854 | 0.728125 |      4 |         0.885571 |
| TrainMNIST_25518_00007 | TERMINATED |                  | 0.544958  |   0.280743 | 0.15625  |      1 |         0.185517 |
| TrainMNIST_25518_00008 | TERMINATED |                  | 0.277231  |   0.258283 | 0.48125  |      1 |         0.186344 |
| TrainMNIST_25518_00009 | TERMINATED |                  | 0.87852   |   0.28864  | 0.10625  |      1 |         0.203304 |
| TrainMNIST_25518_00010 | TERMINATED |                  | 0.691046  |   0.351471 | 0.103125 |      1 |         0.23274  |
| TrainMNIST_25518_00011 | TERMINATED |                  | 0.926629  |   0.17118  | 0.121875 |      1 |         0.267205 |
| TrainMNIST_25518_00012 | TERMINATED |                  | 0.618234  |   0.881444 | 0.046875 |      1 |         0.226228 |
+------------------------+------------+------------------+-----------+------------+----------+--------+------------------+
...
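
The status output above reports 80 CPUs and 8 GPUs in use with 8 trials running, which is consistent with the resources_per_trial setting in the script. The following sketch reproduces that arithmetic under a simplified model in which only the CPU and GPU requests limit concurrency; the helper name max_concurrent_trials is hypothetical, and Ray's actual scheduler may take other factors into account.

```python
def max_concurrent_trials(total_cpus, total_gpus, cpus_per_trial, gpus_per_trial):
    """Number of trials that fit at once given per-trial resource requests."""
    by_cpu = total_cpus // cpus_per_trial
    by_gpu = total_gpus // gpus_per_trial
    # The scarcer resource decides how many trials can run side by side.
    return min(by_cpu, by_gpu)

# Values from the status output: 80 CPUs and 8 GPUs in the cluster,
# and resources_per_trial={"cpu": 10, "gpu": 1} in the script.
running = max_concurrent_trials(80, 8, 10, 1)
print(running)  # 8
```

Lowering the per-trial CPU request would not raise the concurrency here, because the 8 GPUs are already the limiting resource.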

D. Submitit

Submitit is a lightweight tool for submitting Python functions for computation within a Slurm cluster. It primarily organizes submitted jobs and provides access to results, logs, etc.

1. Example (1)

  • add.py

#!/usr/bin/env python3

import submitit

def add(a, b):
    return a + b

# create slurm wrapper object
executor = submitit.AutoExecutor(folder="log_test")

# update slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python')

# submit job
job = executor.submit(add, 5, 7)

# print job ID
print(job.job_id)

# print result
print(job.result())

$ ./add.py 
110651 
12

  • 110651_submission.sh

#!/bin/bash 

# Parameters
#SBATCH --comment="python"
#SBATCH --error=/scratch/${USER}/2022/02-submitit/test/log_test/%j_0_log.err
#SBATCH --job-name=submitit
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --open-mode=append
#SBATCH --output=/scratch/${USER}/2022/02-submitit/test/log_test/%j_0_log.out
#SBATCH --partition=cas_v100_2
#SBATCH --signal=USR1@90
#SBATCH --wckey=submitit

# command
export SUBMITIT_EXECUTOR=slurm
srun \
    --output /scratch/${USER}/2022/02-submitit/test/log_test/%j_%t_log.out \
    --error /scratch/${USER}/2022/02-submitit/test/log_test/%j_%t_log.err \
    --unbuffered \
    /scratch/${USER}/.conda/submitit/bin/python3 \
    -u \
    -m submitit.core._submit \
    /scratch/${USER}/2022/02-submitit/test/log_test

The submitted Python function and its result are stored as pickle files in the log folder:

-rw-r--r-- 1 testuser01 tu0000   0 May  4 21:28 log_test/110651_0_log.err
-rw-r--r-- 1 testuser01 tu0000 476 May  4 21:28 log_test/110651_0_log.out
-rw-r--r-- 1 testuser01 tu0000  26 May  4 21:28 log_test/110651_0_result.pkl
-rw-r--r-- 1 testuser01 tu0000 634 May  4 21:28 log_test/110651_submitted.pkl
-rw-r--r-- 1 testuser01 tu0000 735 May  4 21:28 log_test/110651_submission.sh
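
The listing above shows a submitted pickle and a result pickle next to the logs. The following standard-library sketch illustrates the general idea of passing work through pickle files: the callable and its arguments are serialized on the submitting side, loaded and run on the worker side, and the result is written back as another pickle. The file names are hypothetical, and this is not submitit's actual file format or code.

```python
import functools
import operator
import pickle
import tempfile
from pathlib import Path

# Illustration of the general "submit by pickle" idea using only the standard
# library. The file names are hypothetical and this is not submitit's format.
folder = Path(tempfile.mkdtemp())

# Submitting side: serialize the callable together with its arguments.
task = functools.partial(operator.add, 5, 7)
submitted = folder / "example_submitted.pkl"
submitted.write_bytes(pickle.dumps(task))

# Worker side (normally another process on a compute node):
# load the callable, run it, and store the result as a pickle file.
loaded = pickle.loads(submitted.read_bytes())
result_file = folder / "example_result.pkl"
result_file.write_bytes(pickle.dumps(loaded()))

# Submitting side again: read the result back.
result = pickle.loads(result_file.read_bytes())
print(result)  # 12
```

Because the work travels as a pickle, the function and its arguments need to be picklable and importable on the compute node.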

2. Example (2)

  • add.py

#!/usr/bin/env python3

def add(a, b):
    return a + b

print(add(5, 7))

  • run.sh

#!/bin/bash
 
#SBATCH --comment="python" 
#SBATCH --job-name=submitit 
#SBATCH --nodes=1 
#SBATCH --ntasks-per-node=1 
#SBATCH --partition=cas_v100_2 

./add.py

$ sbatch run.sh
110650

  • add.py

#!/usr/bin/env python3

import submitit

def add(a, b):
    return a + b

# create slurm wrapper object
executor = submitit.AutoExecutor(folder="log_test")

# update slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python'
)

# submit job
job = executor.submit(add, 5, 7)

# print job ID
print(job.job_id)

# print result
print(job.result())

$ ./add.py
110651 
12

3. Submitit: Multitask Job Example

  • add_para.py

#!/usr/bin/env python3

import submitit

def add(a, b):
    return a + b

# create slurm object
executor = submitit.AutoExecutor(folder="log_test")

# slurm parameters
executor.update_parameters(
    partition='cas_v100_2',
    comment='python',
    ntasks_per_node=3
)

# submit job
job = executor.submit(add, 5, 7)

# print job ID
print(job.job_id)

# print results (one entry per task)
print(job.results())

$ ./add_para.py
110658
[12, 12, 12]

  • submitit/submitit/slurm/slurm.py

def _make_sbatch_string( 
    command: str, 
    folder: tp.Union[str, Path], 
    job_name: str = "submitit", 
    partition: tp.Optional[str] = None, 
    time: int = 5, 
    nodes: int = 1, 
    ntasks_per_node: tp.Optional[int] = None, 
    cpus_per_task: tp.Optional[int] = None, 
    cpus_per_gpu: tp.Optional[int] = None, 
    num_gpus: tp.Optional[int] = None,  # legacy 
    gpus_per_node: tp.Optional[int] = None, 
    gpus_per_task: tp.Optional[int] = None, 
    qos: tp.Optional[str] = None,  # quality of service 
    setup: tp.Optional[tp.List[str]] = None, 
    mem: tp.Optional[str] = None, 
    mem_per_gpu: tp.Optional[str] = None, 
    mem_per_cpu: tp.Optional[str] = None, 
    signal_delay_s: int = 90, 
    comment: tp.Optional[str] = None, 
    constraint: tp.Optional[str] = None, 
    exclude: tp.Optional[str] = None, 
    account: tp.Optional[str] = None, 
    gres: tp.Optional[str] = None, 
    exclusive: tp.Optional[tp.Union[bool, str]] = None, 
    array_parallelism: int = 256, 
    wckey: str = "submitit", 
    stderr_to_stdout: bool = False, 
    map_count: tp.Optional[int] = None,  # used internally 
    additional_parameters: tp.Optional[tp.Dict[str, tp.Any]] = None, 
    srun_args: tp.Optional[tp.Iterable[str]] = None ) -> str:

  • 110658_0_log.out

submitit INFO (2022-04-08 12:32:19,583) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=0(3), node=0(1), global_rank=0(3))
submitit INFO (2022-04-08 12:32:19,584) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,612) - Job completed successfully

  • 110658_1_log.out

submitit INFO (2022-04-08 12:32:19,584) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=1(3), node=0(1), global_rank=1(3))
submitit INFO (2022-04-08 12:32:19,620) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,624) - Job completed successfully

  • 110658_2_log.out

submitit INFO (2022-04-08 12:32:19,583) - Starting with JobEnvironment(job_id=110658, hostname=gpu25, local_rank=2(3), node=0(1), global_rank=2(3))
submitit INFO (2022-04-08 12:32:19,676) - Loading pickle: /scratch/${USER}/2022/02-submitit/test/log_test/110658_submitted.pkl
submitit INFO (2022-04-08 12:32:19,681) - Job completed successfully

import submitit
import torch
import torch.multiprocessing as mp
import torchvision

class NeuralNet:
    def __init__(self):
        self.layer1 = …
        self.layer2 = …

    # Defining __call__ allows a NeuralNet object to be passed to submit()
    def __call__(self):
        job_env = submitit.JobEnvironment()  # local rank, global rank
        dataset = torchvision.datasets.MNIST(…)
        loader = torch.utils.data.DataLoader(…)

        mp.spawn(…)  # distributed data-parallel training

mnist = NeuralNet()
...
job = executor.submit(mnist)

E. NCCL

This section describes how to install NCCL on the Neuron system and run an example. NCCL is a multi-GPU, multi-node collective communication library optimized for NVIDIA GPUs.

1. NCCL Installation and Verification

1) Installation method

Download the desired version from https://developer.nvidia.com/nccl/nccl-legacy-downloads.

$ tar Jxvf nccl_2.11.4-1+cuda11.4_x86_64.txz

2) Installation verification

$ ls -al nccl_2.11.4-1+cuda11.4_x86_64/
include/  lib/    LICENSE.txt

2. NCCL Execution Example

1) Download the execution example.

$ git clone https://github.com/1duo/nccl-examples

2) Compile the execution example.

$ module load gcc/10.2.0 cuda/11.4 cudampi/openmpi-4.1.1

$ cd nccl-examples
$ mkdir build
$ cd build
$ cmake -DNCCL_INCLUDE_DIR=$NCCL_HOME/include \
        -DNCCL_LIBRARY=$NCCL_HOME/lib/libnccl.so ..
$ make

3) Check the execution results.

$ ./run.sh
Example 1: Single Process, Single Thread, Multiple Devices
Success
Example 2: One Device Per Process Or Thread
[MPI Rank 1] Success
[MPI Rank 0] Success
Example 3: Multiple Devices Per Thread
[MPI Rank 1] Success
[MPI Rank 0] Success

4) Execution example of 8 GPUs across 2 nodes using Example 3 above

4-1) Job script

#!/bin/sh
#SBATCH -J STREAM
#SBATCH --time=10:00:00 # walltime
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=1
#SBATCH --gres=gpu:4
#SBATCH -o %N_%j.out
#SBATCH -e %N_%j.err
#SBATCH -p amd_a100_4

srun build/examples/example-3

4-2) Example code

//
// Example 3: Multiple Devices Per Thread
//

#include <stdio.h>
#include "cuda_runtime.h"
#include "nccl.h"
#include "mpi.h"
#include <unistd.h>
#include <stdint.h>
#include <stdlib.h>

#define MPICHECK(cmd) do {                      \
  int e = cmd;                                    \
  if( e != MPI_SUCCESS ) {                        \
    printf("Failed: MPI error %s:%d '%d'\n",       \
        __FILE__,__LINE__, e);                     \
    exit(EXIT_FAILURE);                            \
  }                                                 \
} while(0)

#define CUDACHECK(cmd) do {                     \
  cudaError_t e = cmd;                             \
  if( e != cudaSuccess ) {                           \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                  \
} while(0)


#define NCCLCHECK(cmd) do {                      \
  ncclResult_t r = cmd;                             \
  if (r!= ncclSuccess) {                              \
    printf("Failed, NCCL error %s:%d '%s'\n",       \
        __FILE__,__LINE__,ncclGetErrorString(r));    \
    exit(EXIT_FAILURE);                             \
  }                                                  \
} while(0)

static uint64_t getHostHash(const char *string) {
    // Based on DJB2, result = result * 33 + char
    uint64_t result = 5381;
    for (int c = 0; string[c] != '\0'; c++) {
        result = ((result << 5) + result) + string[c];
    }
    return result;
}

static void getHostName(char *hostname, int maxlen) {
    gethostname(hostname, maxlen);
    for (int i = 0; i < maxlen; i++) {
        if (hostname[i] == '.') {
            hostname[i] = '\0';
            return;
        }
    }
}

int main(int argc, char *argv[]) {
    //int size = 32 * 1024 * 1024;
    int size = 5;
    int myRank, nRanks, localRank = 0;

    // initializing MPI
    MPICHECK(MPI_Init(&argc, &argv));
    MPICHECK(MPI_Comm_rank(MPI_COMM_WORLD, &myRank));
    MPICHECK(MPI_Comm_size(MPI_COMM_WORLD, &nRanks));

    // calculating localRank which is used in selecting a GPU
    uint64_t hostHashs[nRanks];
    char hostname[1024];
    getHostName(hostname, 1024);
    hostHashs[myRank] = getHostHash(hostname);
    MPICHECK(MPI_Allgather(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
                           hostHashs, sizeof(uint64_t), MPI_BYTE, MPI_COMM_WORLD));

    for (int p = 0; p < nRanks; p++) {
        if (p == myRank) {
            break;
        }

        if (hostHashs[p] == hostHashs[myRank]) {
            localRank++;
        }
    }

    // each process is using four GPUs
    int nDev = 4;
    int **hostsendbuff = (int **) malloc(nDev * sizeof(int *));
    int **hostrecvbuff = (int **) malloc(nDev * sizeof(int *));
    int **sendbuff = (int **) malloc(nDev * sizeof(int *));
    int **recvbuff = (int **) malloc(nDev * sizeof(int *));

    cudaStream_t *s = (cudaStream_t *) malloc(sizeof(cudaStream_t) * nDev);

    // picking GPUs based on localRank
    for (int i = 0; i < nDev; ++i) {
        hostsendbuff[i] = (int *) malloc(size * sizeof(int));
        for (int j = 0; j < size; j++) {
            hostsendbuff[i][j] = 1;
        }
        hostrecvbuff[i] = (int *) malloc(size * sizeof(int));

        CUDACHECK(cudaSetDevice(localRank * nDev + i));
        CUDACHECK(cudaMalloc(sendbuff + i, size * sizeof(int)));
        CUDACHECK(cudaMalloc(recvbuff + i, size * sizeof(int)));
        CUDACHECK(cudaMemcpy(sendbuff[i], hostsendbuff[i], size * sizeof(int),
                              cudaMemcpyHostToDevice));
        CUDACHECK(cudaMemset(recvbuff[i], 0, size * sizeof(int)));
        CUDACHECK(cudaStreamCreate(s + i));
    }

    ncclUniqueId id;
    ncclComm_t comms[nDev];

    // generating NCCL unique ID at one process and broadcasting it to all
    if (myRank == 0) {
        ncclGetUniqueId(&id);
    }

    MPICHECK(MPI_Bcast((void *) &id, sizeof(id), MPI_BYTE, 0,
                       MPI_COMM_WORLD));
    // initializing NCCL, group API is required around ncclCommInitRank
    // as it is called across multiple GPUs in each thread/process
    NCCLCHECK(ncclGroupStart());
    for (int i = 0; i < nDev; i++) {
        CUDACHECK(cudaSetDevice(localRank * nDev + i));
        NCCLCHECK(ncclCommInitRank(comms + i, nRanks * nDev, id,
                                   myRank * nDev + i));
    }
    NCCLCHECK(ncclGroupEnd());

    // calling NCCL communication API. Group API is required when
    // using multiple devices per thread/process
    NCCLCHECK(ncclGroupStart());
    for (int i = 0; i < nDev; i++) {
        NCCLCHECK(ncclAllReduce((const void *) sendbuff[i],
                                (void *) recvbuff[i], size, ncclInt, ncclSum,
                                comms[i], s[i]));
    }
    NCCLCHECK(ncclGroupEnd());

    // synchronizing on CUDA stream to complete NCCL communication
    for (int i = 0; i < nDev; i++) {
        CUDACHECK(cudaStreamSynchronize(s[i]));
    }

    // copying results back to the host, then freeing device memory
    for (int i = 0; i < nDev; i++) {
        CUDACHECK(cudaMemcpy(hostrecvbuff[i], recvbuff[i], size * sizeof(int),
                             cudaMemcpyDeviceToHost));
        CUDACHECK(cudaFree(sendbuff[i]));
        CUDACHECK(cudaFree(recvbuff[i]));
    }

    printf(" \n");
    for (int i = 0; i < nDev; i++) {
        printf("%s rank:%d gpu%d ", hostname, myRank, i);
        for (int j = 0; j < size; j++) {
                printf("%d ", hostsendbuff[i][j]);
        }
        printf("\n");
    }

    printf(" \n");
    for (int i = 0; i < nDev; i++) {
        printf("%s rank:%d gpu%d ", hostname, myRank, i);
        for (int j = 0; j < size; j++) {
                printf("%d ", hostrecvbuff[i][j]);
        }
        printf("\n");
    }

    // finalizing NCCL

    for (int i = 0; i < nDev; i++) {
        ncclCommDestroy(comms[i]);
    }

    // finalizing MPI
    MPICHECK(MPI_Finalize());
    printf("[MPI Rank %d] Success \n", myRank);
    return 0;
}
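
To make the rank arithmetic in the listing easier to follow, the sketch below recomputes it in plain Python: each MPI process drives nDev GPUs, the NCCL communicator spans nRanks * nDev devices, and each device gets NCCL rank myRank * nDev + i. It also simulates the sum AllReduce for the listing as printed (size = 5, every element set to 1). The function names are hypothetical and no NCCL or MPI calls are involved, so the values are not meant to reproduce the logged results that follow.

```python
def nccl_rank_layout(n_ranks, n_dev):
    """Map (MPI rank, local device index) to the NCCL rank used in the listing.

    Mirrors ncclCommInitRank(comms + i, nRanks * nDev, id, myRank * nDev + i).
    """
    total = n_ranks * n_dev
    layout = {(rank, dev): rank * n_dev + dev
              for rank in range(n_ranks) for dev in range(n_dev)}
    return total, layout

def allreduce_sum(buffers):
    """Element-wise sum over all buffers, returned once per participant."""
    summed = [sum(column) for column in zip(*buffers)]
    return [list(summed) for _ in buffers]

# 2 MPI processes (one per node) with 4 GPUs each, as in the job script.
total, layout = nccl_rank_layout(n_ranks=2, n_dev=4)
print(total)           # 8
print(layout[(1, 2)])  # 6

# Every GPU sends 5 integers set to 1, as in the listing (size = 5).
send = [[1] * 5 for _ in range(total)]
recv = allreduce_sum(send)
print(recv[0])  # [8, 8, 8, 8, 8]
```

After a sum AllReduce, every participating buffer is expected to hold the same totals, which is why the simulation returns one identical copy per device.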

4-3) Execution results

gpu37 rank:1 gpu0 1 1 1 1 1 1 1 1
gpu37 rank:1 gpu1 2 2 2 2 2 2 2 2
gpu37 rank:1 gpu2 3 3 3 3 3 3 3 3
gpu37 rank:1 gpu3 4 4 4 4 4 4 4 4

gpu37 rank:1 gpu0 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu1 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu2 0 0 0 0 0 0 0 0
gpu37 rank:1 gpu3 0 0 0 0 0 0 0 0
[MPI Rank 1] Success

gpu36 rank:0 gpu0 1 1 1 1 1 1 1 1
gpu36 rank:0 gpu1 2 2 2 2 2 2 2 2
gpu36 rank:0 gpu2 3 3 3 3 3 3 3 3
gpu36 rank:0 gpu3 4 4 4 4 4 4 4 4

gpu36 rank:0 gpu0 20 20 20 20 20 20 20 20
gpu36 rank:0 gpu1 0 0 0 0 0 0 0 0
gpu36 rank:0 gpu2 0 0 0 0 0 0 0 0
gpu36 rank:0 gpu3 0 0 0 0 0 0 0 0

[MPI Rank 0] Success

F. TensorFlow Distribute

TensorFlow Distribute is a TensorFlow API that enables distributed training using multiple GPUs or multiple servers. The examples in this section use TensorFlow 2.0.

1. TensorFlow Installation and Verification in the Conda Environment

1) Installation method

$ module load python/3.7.1
$ conda create -n tf_test
$ conda activate tf_test
(tf_test) $ conda install tensorflow

2) Installation verification

(tf_test) $ conda list | grep tensorflow
tensorflow                2.0.0           gpu_py37h768510d_0
tensorflow-base           2.0.0           gpu_py37h0ec5d1f_0
tensorflow-estimator      2.0.0              pyh2649769_0
tensorflow-gpu            2.0.0                h0d30ee6_0
tensorflow-metadata       1.7.0                    pypi_0    pypi

2. Single-Node, Multi-GPU Usage (using tf.distribute.MirroredStrategy())

1) Code example (tf_multi_keras.py)

import tensorflow as tf
import numpy as np

# MirroredStrategy replicates the model on every GPU visible on this node.
strategy = tf.distribute.MirroredStrategy()

BUFFER_SIZE = 1000  # used below as the number of steps per epoch

n_workers = 1
batch_size_per_gpu = 64
# The dataset is batched with this value; the strategy splits each batch across the GPUs.
global_batch_size = batch_size_per_gpu * n_workers


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # Scale pixel values from [0, 255] to [0, 1].
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = (
        tf.data.Dataset.from_tensor_slices((x_train, y_train))
        .shuffle(60000)
        .repeat()
        .batch(batch_size)
    )
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])

    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model


dataset = mnist_dataset(global_batch_size)

# The model must be built and compiled inside the strategy scope.
with strategy.scope():
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(dataset, epochs=5, steps_per_epoch=BUFFER_SIZE)
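
The script above batches the dataset with global_batch_size, which equals batch_size_per_gpu * n_workers with n_workers = 1. Under a tf.distribute strategy this value is the global batch, and each GPU (replica) receives a share of it. The following is a minimal sketch of that arithmetic in plain Python, with no TensorFlow dependency. The function names are hypothetical, and the even-split check is an assumption made for illustration.

```python
def per_replica_batch(global_batch_size, num_replicas):
    """Samples each replica receives when a global batch is split evenly."""
    if global_batch_size % num_replicas != 0:
        # Assumption for this sketch: only evenly divisible batches are considered.
        raise ValueError("global batch size must be divisible by the replica count")
    return global_batch_size // num_replicas


def scaled_global_batch(batch_size_per_gpu, num_replicas):
    """Global batch size that gives every replica batch_size_per_gpu samples."""
    return batch_size_per_gpu * num_replicas


# With the script's settings on a 4-GPU node, each GPU sees 64 / 4 samples per step.
print(per_replica_batch(64, 4))
# To keep 64 samples on each of the 4 GPUs, the global batch would have to be scaled.
print(scaled_global_batch(64, 4))
```

In the training script, the replica count could be read from strategy.num_replicas_in_sync rather than hard-coded.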

2) Interactive job submission (1 node, 4 GPUs)

$ salloc --partition=cas_v100_4 --nodes=1 --ntasks-per-node=4 \
--gres=gpu:4 --comment=etc
$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ python tf_multi_keras.py

3) Batch job submission script (1 node, 4 GPUs) (tf_dist_run.sh)

#!/bin/bash
#SBATCH -J tf_dist_test
#SBATCH -p cas_v100_4
#SBATCH -N 1
#SBATCH -n 4
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j
#SBATCH --time 00:30:00
#SBATCH --gres=gpu:4
#SBATCH --comment etc

module purge
module load python/3.7.1

source activate tf_test

python tf_multi_keras.py

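3. Utilization of Multi Node, Multi-GPU (using tf.distribute.experimental.MultiWorkerMirroredStrategy())

To run on multiple nodes, change the strategy and set the TF_CONFIG environment variable on each node. TF_CONFIG lists every worker in the cluster and identifies the current node by its task index.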

1) Modify the code example

strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()

2) Interactive job submission (2 nodes, each with 4 GPUs)

$ salloc --partition=cas_v100_4 --nodes=2 --ntasks-per-node=4 \
 --gres=gpu:4 --comment=etc
$ scontrol show hostnames
gpu01
gpu02

gpu01$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ export TF_CONFIG='{"cluster": {"worker": ["gpu01:12345", "gpu02:12345"]}, "task": {"index": 0, "type": "worker"}}'
(tf_test) $ python tf_multi_keras.py

gpu02$ conda activate tf_test
(tf_test) $ module load python/3.7.1
(tf_test) $ export TF_CONFIG='{"cluster": {"worker": ["gpu01:12345", "gpu02:12345"]}, "task": {"index": 1, "type": "worker"}}'
(tf_test) $ python tf_multi_keras.py
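
The two TF_CONFIG values above differ only in the task index. As an illustration, the string for each worker could be generated rather than typed by hand. The sketch below uses only the Python standard library; build_tf_config is a hypothetical helper, and the host names and port are the ones from the example above.

```python
import json


def build_tf_config(hosts, task_index, port=12345):
    """Return the TF_CONFIG JSON string for the worker at position task_index."""
    if not 0 <= task_index < len(hosts):
        raise ValueError("task_index must refer to an entry in hosts")
    config = {
        "cluster": {"worker": ["{}:{}".format(host, port) for host in hosts]},
        "task": {"type": "worker", "index": task_index},
    }
    return json.dumps(config)


hosts = ["gpu01", "gpu02"]
for index in range(len(hosts)):
    # One line per worker; each line is the value to export on that node.
    print(build_tf_config(hosts, index))
```

Generating the JSON with json.dumps avoids manual quoting mistakes, which are easy to make when the string is assembled in the shell.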

3) Batch job submission script (2 nodes, each with 4 GPUs) (tf_multi_run.sh)

#!/bin/bash
#SBATCH -J tf_multi_test
#SBATCH -p 4gpu
#SBATCH -N 2
#SBATCH -n 4
#SBATCH -o ./out/%x.o%j
#SBATCH -e ./out/%x.e%j
#SBATCH --time 01:00:00
###SBATCH --gres=gpu:4
#SBATCH --comment etc

module purge
module load python/3.7.1
source activate tf_test

worker=(`scontrol show hostnames`)
num_worker=${#worker[@]}
PORT=12345

unset TF_CONFIG
for i in ${worker[@]}
do
        tmp_TF_CONFIG=\"$i":"$PORT\"
        TF_CONFIG=$TF_CONFIG", "$tmp_TF_CONFIG
done

TF_CONFIG=${TF_CONFIG:2}
cluster=\"cluster\"": "\{\"worker\"": "\[$TF_CONFIG\]\}

j=0

while [ $j -lt $num_worker ]
do
        task=\"task\"": "\{\"index\"": "$j", "\"type\"": "\"worker\"\}
        tmp=${cluster}", "${task}
        tmp2=\'\{${tmp}\}\'
        ssh ${worker[$j]} "conda activate tf_test; export TF_CONFIG=$tmp2; python tf_multi_keras.py" &
        j=$(($j+1))
done

# Keep the batch script alive until every background ssh session has finished
wait
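
As an alternative to assembling the JSON string in the shell, TF_CONFIG could be set from inside the Python script before the strategy is created. The sketch below is one possible approach, not part of the original script. worker_index and tf_config_for are hypothetical helpers, and it assumes that the host names reported by scontrol match the short host name of each node.

```python
import json
import os


def worker_index(hosts, this_host):
    """Return the position of this_host in hosts, ignoring any domain suffix."""
    short_name = this_host.split(".")[0]
    return hosts.index(short_name)  # raises ValueError if the host is not listed


def tf_config_for(hosts, this_host, port=12345):
    """Build the TF_CONFIG JSON string for the node named this_host."""
    workers = ["{}:{}".format(host, port) for host in hosts]
    task = {"type": "worker", "index": worker_index(hosts, this_host)}
    return json.dumps({"cluster": {"worker": workers}, "task": task})


# Example with a fixed host name; in a real run, this_host would come from
# socket.gethostname() in the standard library.
os.environ["TF_CONFIG"] = tf_config_for(["gpu01", "gpu02"], "gpu02")
print(os.environ["TF_CONFIG"])
```

With this approach every node can run the same command, since each process derives its own task index from its host name.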

4. References

  • Distributed training using Keras (https://www.tensorflow.org/tutorials/distribute/keras)

  • [TensorFlow 2] DNN training using MNIST data as the training dataset (http://www.gisdeveloper.co.kr/?p=8534)

G. PyTorch DDP

PyTorch DDP (DistributedDataParallel) provides distributed data parallelism and can be run in a multi-node, multi-GPU environment. For PyTorch DDP features and tutorials, refer to the websites below.

  • https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

  • https://github.com/pytorch/examples/blob/main/distributed/ddp/README.md

The following example demonstrates how to run PyTorch DDP using the Slurm scheduler.

1. Job Submission Script Example

1) Single-node example (single node, 2 GPUs)

#!/bin/bash -l
#SBATCH -J PytorchDDP
#SBATCH -p cas_v100_4
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:2
#SBATCH --comment pytorch
#SBATCH --time 1:00:00
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j

# Configuration
traindata='{path}'
master_port="$((RANDOM%55535+10000))"

# Load software
conda activate pytorchDDP

# Launch one SLURM task, and use torch distributed launch utility
# to spawn training worker processes; one per GPU
srun -N 1 -n 1 python main.py -a config \
                                --dist-url "tcp://127.0.0.1:${master_port}" \
                                --dist-backend 'nccl' \
                                --multiprocessing-distributed \
                                --world-size $SLURM_TASKS_PER_NODE \
                                --rank 0 \
                                $traindata
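
The script above passes --world-size and --rank as node-level values. The source does not show main.py; assuming it follows the convention of the PyTorch ImageNet example, where --multiprocessing-distributed starts one process per GPU and converts the node-level values into process-level ones, the conversion can be sketched as follows. The function names are hypothetical.

```python
def global_world_size(num_nodes, gpus_per_node):
    """Total number of training processes when one process runs per GPU."""
    return num_nodes * gpus_per_node


def global_rank(node_rank, gpus_per_node, local_gpu):
    """Process-level rank derived from the node rank and the local GPU index."""
    return node_rank * gpus_per_node + local_gpu


# One node with 2 GPUs: --world-size 1 and --rank 0 are given on the command line.
print(global_world_size(1, 2))
print([global_rank(0, 2, gpu) for gpu in range(2)])

# Two nodes with 1 GPU each: each node is launched with its own --rank.
print(global_world_size(2, 1))
print([global_rank(node, 1, 0) for node in range(2)])
```

Under this assumption, the values given to --world-size and --rank count nodes, not GPUs.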

2) Multi-node example (2 nodes, 1 GPU per node)

#!/bin/bash -l
#SBATCH -J PytorchDDP
#SBATCH -p cas_v100_4
#SBATCH --nodes=2
#SBATCH --ntasks-per-node=1
#SBATCH --gres=gpu:1
#SBATCH --comment pytorch
#SBATCH --time 10:00:00
#SBATCH -o %x.o%j
#SBATCH -e %x.e%j

# Load software list
module load {module name}
conda activate {conda name}

# Setup node list
nodes=$(scontrol show hostnames $SLURM_JOB_NODELIST) # Getting the node names
nodes_array=( $nodes )
master_node=${nodes_array[0]}
master_addr=$(srun --nodes=1 --ntasks=1 -w $master_node hostname --ip-address)
master_port=$((RANDOM%55535+10000))
worker_num=$(($SLURM_JOB_NUM_NODES))

# Loop over nodes and submit training tasks
for ((  node_rank=0; node_rank<$worker_num; node_rank++ )); do
          node=${nodes_array[$node_rank]}
          echo "Submitting node # $node_rank, $node"
          # Launch one SLURM task per node, and use torch distributed launch utility
          # to spawn training worker processes; one per GPU
          srun -N 1 -n 1 -w $node python main.py -a $config \
                           --dist-url tcp://$master_addr:$master_port \
                           --dist-backend 'nccl' \
                          --multiprocessing-distributed \
                          --world-size $SLURM_JOB_NUM_NODES \
                          --rank $node_rank &

          pids[${node_rank}]=$!
done

# Wait for completion
for pid in ${pids[*]}; do
          wait $pid
done
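
The scripts pick the rendezvous port with $((RANDOM%55535+10000)). Because bash's RANDOM yields an integer from 0 to 32767, the modulo has no effect and the port falls between 10000 and 42767. The short sketch below reproduces that arithmetic in Python; master_port is a hypothetical name used only for illustration.

```python
BASH_RANDOM_MAX = 32767  # upper bound of bash's $RANDOM


def master_port(random_value):
    """Same formula as the job script: RANDOM % 55535 + 10000."""
    return random_value % 55535 + 10000


lowest = master_port(0)
highest = max(master_port(value) for value in range(BASH_RANDOM_MAX + 1))
print(lowest, highest)
```

The port is chosen at random and is not checked for availability, so a job may occasionally need to be resubmitted if the port is already in use.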

Last updated on November 11, 2024.
