Parallel loops with foreach & doFuture
The foreach package
The foreach package implements a looping construct without an explicit counter. It doesn’t require preallocating an output container, it brings to R an equivalent of Python or Julia list comprehensions, and, most importantly, it makes it easy to run loops in parallel. Unlike a loop, it returns a value (loops are used for their side effects).
We will explore an example to calculate the sum of 1e3 random vectors of length 3.
First, let’s launch an interactive job with a single task (by default Slurm grants one CPU per task, so we are asking for one CPU):
salloc --time=50 --mem-per-cpu=3700M --ntasks=1
You can see the full list of salloc options here.
Then we can launch R interactively:
R
We are now in the R terminal and can start typing R commands.
Let’s load the foreach package:
library(foreach)
Below is a classic while loop:
set.seed(2)
result1 <- numeric(3) # Preallocate output container
i <- 0 # Initialise counter variable
while(i < 1e3) {
result1 <- result1 + runif(3) # Calculate the sum
i <- i + 1 # Update the counter
}
result1
[1] 509.4325 504.8698 496.6869
Here is the equivalent code using foreach:
set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% runif(3)
result2
[1] 509.4325 504.8698 496.6869
We can verify that both expressions return the same result:
identical(result1, result2)
[1] TRUE
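Note that without a .combine function, foreach returns its results as a list, which is what makes it akin to a list comprehension. A quick illustration (the sqrt(x) body is an arbitrary example):
foreach(x = 1:3) %do% sqrt(x)
[[1]]
[1] 1

[[2]]
[1] 1.414214

[[3]]
[1] 1.732051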
The best part of foreach is that you can turn sequential loops into parallel ones.
There are many parallelization backends available: doFuture, doMC, doMPI, doParallel, doRedis, doRNG, doSNOW, and doAzureParallel.
In this lesson, we will use doFuture, a modern package which allows foreach expressions to be evaluated following any of the strategies of the future package.
So first, what is the future package?
The future package
A future is an object that acts as an abstract representation of a value which may not have been computed yet. A future can be resolved (the value has been computed) or unresolved. If the value is queried while the future is unresolved, the process blocks until the future is resolved. Futures thus allow for asynchronous and parallel evaluations.
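Here is a minimal sketch of this behaviour (assuming the future package and at least two available CPU cores; plan(multisession), described below, evaluates futures in background R sessions, and the one-second sleep stands in for a long computation):
library(future)
plan(multisession)
f <- future({ Sys.sleep(1); 42 }) # Evaluation starts asynchronously in the background
resolved(f) # Likely FALSE: the computation is still running
value(f) # Blocks until the future is resolved, then returns 42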
The future package lets you evaluate futures sequentially or in various forms of parallelism while keeping the code simple and consistent. The evaluation strategy is set with the plan() function:
plan(sequential): futures are evaluated sequentially in the current R session.
plan(multisession): futures are evaluated by new R sessions spawned in the background (multi-processing in shared memory).
plan(multicore): futures are evaluated in processes forked from the existing process (multi-processing in shared memory).
plan(cluster): futures are evaluated on an ad-hoc cluster (distributed parallelism across multiple nodes).
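For instance (a quick sketch; the worker count of 2 is arbitrary):
library(future)
plan(multisession, workers = 2) # Two R sessions spawned in the background
nbrOfWorkers() # Returns 2 under this plan
plan(sequential) # Switch back to sequential evaluation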
Consistency
To ensure a consistent behaviour across plans, all evaluations are done in a local environment:
library(future)
a <- 1
b %<-% { # %<-% is used instead of <- to use futures
a <- 2
}
a
[1] 1
The assignment a <- 2 inside the future happened in a local environment, so the global a is unchanged.
The doFuture package
The doFuture package makes it easy to evaluate foreach expressions using any of the evaluation strategies of the future package.
Let’s load the doFuture package:
library(doFuture)
This automatically loads the foreach and future packages.
We need to choose an evaluation strategy for our futures (e.g. plan(multicore)):
plan(multicore)
To run the code in parallel, we can now replace %do% with %dofuture%.
There is however one last twist: whenever you generate random numbers in parallel, it isn’t enough to use set.seed() to ensure reproducibility. You also need to make sure to generate parallel-safe random numbers. Using the %seed% operator (as %seed% TRUE) or the option .options.future = list(seed = TRUE) pregenerates the random seeds for all iterations using L’Ecuyer-CMRG RNG streams.
Here are the two equivalent syntaxes:
set.seed(2)
result3 <- foreach(
i = 1:1e3,
.options.future = list(seed = TRUE),
.combine = '+'
) %dofuture% {
runif(3)
}
set.seed(2)
result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
runif(3)
} %seed% TRUE
Of course, remember that we asked Slurm for a single CPU (--ntasks=1), so our current job doesn’t have the hardware to run any code in parallel.
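We can still verify, even with a single CPU, that parallel-safe seeding makes results reproducible. A minimal sketch under plan(sequential): running the same seeded loop twice returns identical results.
plan(sequential)
set.seed(2)
r1 <- foreach(i = 1:10, .combine = 'c') %dofuture% {
  runif(1)
} %seed% TRUE
set.seed(2)
r2 <- foreach(i = 1:10, .combine = 'c') %dofuture% {
  runif(1)
} %seed% TRUE
identical(r1, r2)
[1] TRUE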
It is now time to experiment with our code using the various serial and parallel methods and to do some benchmarking.
Benchmarks
With the overhead of parallelization, it doesn’t make sense to parallelize such fast code: the parallel version would take longer than the serial one.
Let’s artificially make our code much slower without adding any complexity that would distract us from the parallelization question. To do that, we will simply add a delay at each iteration:
set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
Sys.sleep(0.01) # Wait for 0.01s
runif(3)
}
Now, let’s load the bench package that we will use for benchmarking our various tests:
library(bench)
Reference timing
Let’s first time this to get a reference:
set.seed(2)
bm <- mark(
result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
Sys.sleep(0.01)
runif(3)
}
)
bm$median
[1] 11.4s
Plan sequential
This is the parallelizable foreach code, but run sequentially:
plan(sequential) # Set the evaluation strategy
set.seed(2)
bm <- mark(
result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
Sys.sleep(0.01)
runif(3)
} %seed% TRUE
)
bm$median
[1] 10.6s
No surprise: this is similar to the previous timing.
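Although we don’t benchmark it here, a job with several CPU cores on a single node (e.g. submitted with --cpus-per-task=8) could test shared-memory parallelism with plan(multicore). A sketch (availableCores() detects the number of CPUs allocated by Slurm; the actual timing would depend on the hardware):
plan(multicore, workers = availableCores()) # Forked processes in shared memory
set.seed(2)
bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)
bm$median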
Multi-processing in distributed memory
Create a cluster of workers
To test parallel execution in distributed memory, let’s ask Slurm for 8 tasks by editing our rf.sh script:
rf.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=3700M
#SBATCH --ntasks=8
Rscript rf.R # This is the code that we are running
Let’s verify that we do get 8 tasks by accessing the SLURM_NTASKS environment variable from within R.
Edit rf.R to contain the following:
rf.R
as.numeric(Sys.getenv("SLURM_NTASKS"))
Run the job:
sbatch rf.sh
We get:
[1] 8
Let’s see which nodes we are using:
rf.R
system("srun hostname -s", intern = T)We get:
[1] "node1" "node1" "node1" "node1" "node2" "node2" "node2" "node2"
To run our code with distributed parallelism using 8 CPU cores across both nodes, we need to create a cluster of workers. We do this with the makeCluster() function from the base R parallel package: we create a character vector with the names of the nodes our tasks are running on and pass this vector to makeCluster():
## Create a character vector with the node names
hosts <- system("srun hostname -s", intern = TRUE)
## Create the cluster of workers
cl <- parallel::makeCluster(hosts)
Let’s test it:
rf.R
library(doFuture)
hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)
cl
If we run this code, we get:
Loading required package: foreach
Loading required package: future
socket cluster with 8 nodes on hosts ‘node1’, ‘node2’
Make sure that your code has finished running before printing the output file. Remember that you can monitor the job with sq.
Plan cluster
We can now run the code in distributed memory parallelism:
rf.R
library(doFuture)
library(bench)
hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)
plan(cluster, workers = cl)
set.seed(2)
bm <- mark(
result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
Sys.sleep(0.01)
runif(3)
} %seed% TRUE
)
bm$median
We get:
Loading required package: foreach
Loading required package: future
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
[1] 1.94s
Speedup: 11.4 / 1.94 = 5.9. This is not bad with 8 CPU cores, considering the added overhead of message passing between the two nodes.
The cluster of workers can be stopped with:
parallel::stopCluster(cl)
Here, this is not necessary since our job stops running as soon as the execution is complete, but in other settings this prevents you from monopolizing hardware unnecessarily.