library(foreach)
Parallel loops with foreach & doFuture
The foreach package
The foreach package implements a looping construct without an explicit counter. It doesn’t require the preallocation of an output container, it brings to R an equivalent of Python or Julia list comprehensions, and, most importantly, it makes it easy to execute loops in parallel. Unlike loops, which are used for their side effects, foreach returns a value.
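To make the comparison with list comprehensions concrete, here is a minimal sketch (the variable names are illustrative only). By default foreach returns a list with one element per iteration; the .combine argument controls how the elements are merged:

```r
library(foreach)

# Default: one list element per iteration
squares_list <- foreach(i = 1:5) %do% i^2

# With .combine = c, the results are concatenated into a vector
squares_vec <- foreach(i = 1:5, .combine = c) %do% i^2

squares_vec
# [1]  1  4  9 16 25
```

No output container was preallocated and no counter was updated by hand: the value of the whole expression is the result.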
We will explore an example to calculate the sum of 1e3 random vectors of length 3.
First, let’s launch an interactive job with a single task (by default Slurm grants one CPU per task, so we are asking for one CPU):
salloc --time=50 --mem-per-cpu=3700M --ntasks=1
You can find the full list of salloc options in the Slurm documentation.
Then we can launch R interactively:
R
We are now in the R terminal and can start typing R commands.
Let’s load the foreach package:
library(foreach)
Below is a classic while loop:
set.seed(2)
result1 <- numeric(3) # Preallocate output container
i <- 0 # Initialise counter variable

while(i < 1e3) {
  result1 <- result1 + runif(3) # Calculate the sum
  i <- i + 1 # Update the counter
}

result1
[1] 509.4325 504.8698 496.6869
Here is the equivalent code using foreach:
set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% runif(3)

result2
[1] 509.4325 504.8698 496.6869
We can verify that both expressions return the same result:
identical(result1, result2)
[1] TRUE
The best part of foreach is that you can turn sequential loops into parallel ones.
There are many parallelization backends available: doFuture, doMC, doMPI, doParallel, doRedis, doRNG, doSNOW, and doAzureParallel.
In this lesson, we will use doFuture, a modern package which makes it possible to evaluate foreach expressions following any of the strategies of the future package.
So first, what is the future package?
The future package
A future is an object that acts as an abstract representation for a value in the future. A future can be resolved (if the value has been computed) or unresolved. If the value is queried while the future is unresolved, the process is blocked until the future is resolved. Futures thus allow for asynchronous and parallel evaluations.
The future package makes it possible to evaluate futures sequentially or in various forms of parallelism while keeping code simple and consistent. The evaluation strategy is set with the plan function:
plan(sequential): Futures are evaluated sequentially in the current R session.
plan(multisession): Futures are evaluated by new R sessions spawned in the background (multi-processing in shared memory).
plan(multicore): Futures are evaluated in processes forked from the existing process (multi-processing in shared memory).
plan(cluster): Futures are evaluated on an ad-hoc cluster (distributed parallelism across multiple nodes).
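As a minimal sketch of how the plan changes where a future is evaluated without changing the code that creates it, the snippet below runs the same function under two plans. The helper name compute and the choice of 2 workers are assumptions made for illustration:

```r
library(future)

# Hypothetical helper: create a future, then query its value
compute <- function() {
  f <- future({ sum(1:10) })  # Create the future
  value(f)                    # Block until it is resolved, then return the value
}

plan(sequential)                 # Evaluate in the current R session
res_seq <- compute()

plan(multisession, workers = 2)  # Evaluate in 2 background R sessions (assumed available)
res_par <- compute()

plan(sequential)                 # Reset the plan, which shuts down the background sessions

c(res_seq, res_par)
# [1] 55 55
```

Only the call to plan() differs between the two runs; the code creating and querying the future is untouched.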
Consistency
To ensure a consistent behaviour across plans, all evaluations are done in a local environment:
library(future)

a <- 1

b %<-% { # %<-% is used instead of <- to use futures
  a <- 2
}

a
[1] 1
The doFuture package
The doFuture package makes it easy to evaluate foreach expressions with any of the evaluation strategies of the future package.
Let’s load the doFuture package:
library(doFuture)
This automatically loads the foreach and future packages.
We need to choose an evaluation strategy for our futures (e.g. plan(multicore)):
plan(multicore)
To run the code in parallel, we can now replace %do% with %dofuture%.
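As a small illustration of this swap, the sketch below runs a deterministic loop (no random numbers, so no seed handling is involved) with both operators. It assumes plan(sequential) so that it runs anywhere; any other plan could be substituted:

```r
library(doFuture)

plan(sequential)  # Assumed here for portability; any plan works

# Sequential foreach
res_do <- foreach(i = 1:10, .combine = '+') %do% {
  i^2
}

# Same loop, evaluated through the future framework
res_dofuture <- foreach(i = 1:10, .combine = '+') %dofuture% {
  i^2
}

c(res_do, res_dofuture)
# [1] 385 385
```

The loop body is identical in both cases; only the operator changes.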
There is however one last twist: whenever you create random numbers in parallel, it isn’t enough to use set.seed() to ensure reproducibility. You also need to make sure to generate parallel-safe random numbers. Using the %seed% operator (with %seed% TRUE) or the option .options.future = list(seed = TRUE) pregenerates the random seeds for all iterations using L’Ecuyer-CMRG RNG streams.
Here are the two equivalent syntaxes:
set.seed(2)
result3 <- foreach(
  i = 1:1e3,
  .options.future = list(seed = TRUE),
  .combine = '+'
) %dofuture% {
  runif(3)
}

set.seed(2)
result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
  runif(3)
} %seed% TRUE
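To check that this seeding makes a run repeatable, one possible test is to execute the same loop twice from the same initial seed and compare the results. This is a sketch: the helper name draw, the 5 iterations, and plan(sequential) are assumptions made to keep the example small and runnable anywhere:

```r
library(doFuture)

plan(sequential)  # Assumed for portability; the seeding works the same under other plans

# Hypothetical helper: draw 5 random vectors of length 3 from a fixed initial seed
draw <- function() {
  set.seed(2)
  foreach(
    i = 1:5,
    .combine = rbind,
    .options.future = list(seed = TRUE)  # Parallel-safe RNG streams
  ) %dofuture% {
    runif(3)
  }
}

run1 <- draw()
run2 <- draw()

identical(run1, run2)  # Expected to be TRUE
```

Because the per-iteration seeds are derived from the state set by set.seed(), both runs should produce the same matrix regardless of how iterations are distributed among workers.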
Of course, remember that we asked Slurm for a single CPU (--ntasks=1), so we don’t have the hardware to run any code in parallel with our current job.
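One way to confirm what the current R session can actually use is to query the future package. In this sketch, availableCores() reports the number of cores available to the session (it takes scheduler settings such as Slurm allocations into account) and nbrOfWorkers() reports how many workers the current plan uses. The variable name n_cores is illustrative:

```r
library(future)

n_cores <- availableCores()  # Cores available to this R session
n_cores

plan(multisession, workers = n_cores)
nbrOfWorkers()               # Number of workers used by the current plan

plan(sequential)             # Reset the plan
```

In a job with a single CPU, both values are expected to be 1, which confirms that nothing can run in parallel.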
It is now time to play with our code with all serial and parallel methods and do some benchmarking.
Benchmarks
With the overhead of parallelization, it doesn’t make sense to parallelize such fast code: the parallel version will take longer than the serial one.
Let’s artificially make our code much slower without adding any complexity that would distract us from the parallelization question. To do that, we will simply add a delay at each iteration:
set.seed(2)
result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
  Sys.sleep(0.01) # Wait for 0.01s
  runif(3)
}
Now, let’s load the bench package that we will use for benchmarking our various tests:
library(bench)
Reference timing
Let’s first time this to get a reference:
set.seed(2)
bm <- mark(
  result2 <- foreach(i = 1:1e3, .combine = '+') %do% {
    Sys.sleep(0.01)
    runif(3)
  }
)

bm$median
[1] 11.4s
Plan sequential
This is the parallelizable foreach code, but run sequentially:
plan(sequential) # Set the evaluation strategy

set.seed(2)
bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median
[1] 10.6s
No surprise: this is similar to the previous timing.
Multi-processing in distributed memory
Create a cluster of workers
To test parallel execution in distributed memory, let’s ask Slurm for 8 tasks by editing our rf.sh script:
rf.sh
#!/bin/bash
#SBATCH --time=10
#SBATCH --mem-per-cpu=3700M
#SBATCH --ntasks=8
Rscript rf.R # This is the code that we are running
Let’s verify that we do get 8 tasks by accessing the SLURM_NTASKS environment variable from within R.
Edit rf.R to contain the following:
rf.R
as.numeric(Sys.getenv("SLURM_NTASKS"))
Run the job:
sbatch rf.sh
We get:
[1] 8
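If the same script may also be run outside a Slurm job, where SLURM_NTASKS is not set, a fallback value avoids getting NA. This is a sketch: the default of "1" and the variable name n_tasks are assumptions:

```r
# Read the number of Slurm tasks, defaulting to 1 outside of a Slurm job
n_tasks <- as.integer(Sys.getenv("SLURM_NTASKS", unset = "1"))
n_tasks
```

Inside the job defined above, this should still return 8.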
Let’s see which nodes we are using:
rf.R
system("srun hostname -s", intern = TRUE)
We get:
[1] "node1" "node1" "node1" "node1" "node2" "node2" "node2" "node2"
To run our code with distributed parallelism using 8 CPU cores across both nodes, we need to create a cluster of workers. We do this with the makeCluster() function from the base R parallel package: we create a character vector with the names of the nodes our tasks are running on and pass this vector to the makeCluster() function:
## Create a character vector with the node names
hosts <- system("srun hostname -s", intern = TRUE)

## Create the cluster of workers
cl <- parallel::makeCluster(hosts)
Let’s test it:
rf.R
library(doFuture)

hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)

cl
If we run this code, we get:
Loading required package: foreach
Loading required package: future
socket cluster with 8 nodes on hosts ‘node1’, ‘node2’
Make sure that your code has finished running before printing the output file. Remember that you can monitor the job with sq.
Plan cluster
We can now run the code in distributed memory parallelism:
rf.R
library(doFuture)
library(bench)

hosts <- system("srun hostname -s", intern = TRUE)
cl <- parallel::makeCluster(hosts)
plan(cluster, workers = cl)

set.seed(2)
bm <- mark(
  result3 <- foreach(i = 1:1e3, .combine = '+') %dofuture% {
    Sys.sleep(0.01)
    runif(3)
  } %seed% TRUE
)

bm$median
We get:
Loading required package: foreach
Loading required package: future
Warning message:
Some expressions had a GC in every iteration; so filtering is disabled.
[1] 1.94s
Speedup: 11.4 / 1.94 = 5.9. This is not bad with 8 CPU cores, considering the added overhead of message passing between both nodes.
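The speedup, and the corresponding parallel efficiency (speedup divided by the number of workers), can be computed from the two timings reported above. The variable names are illustrative:

```r
t_serial   <- 11.4  # Reference timing in seconds (%do%)
t_parallel <- 1.94  # Timing in seconds with plan(cluster) on 8 workers
n_workers  <- 8

speedup    <- t_serial / t_parallel
efficiency <- speedup / n_workers

round(speedup, 1)
# [1] 5.9
round(efficiency, 2)
# [1] 0.73
```

An efficiency of 1 would mean a perfect 8-fold speedup; the gap reflects the cost of starting workers and passing messages between nodes.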
The cluster of workers can be stopped with:
parallel::stopCluster(cl)
Here, this is not necessary since our job stops running as soon as the execution is complete, but in other systems, this will prevent you from monopolizing hardware unnecessarily.
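On systems where cleanup does matter, one possible pattern is to wrap the work in a function and register the cleanup with on.exit(), so that the cluster is stopped even if the computation fails. This is a sketch only: the function name run_sum and the 2 local workers are assumptions chosen so that the example runs on a single machine:

```r
library(doFuture)

# Hypothetical helper: run a small foreach loop on a temporary cluster
run_sum <- function(n_workers = 2) {
  cl <- parallel::makeCluster(n_workers)  # Local workers (assumption)
  # Always reset the plan and stop the cluster, even on error
  on.exit({
    plan(sequential)
    parallel::stopCluster(cl)
  }, add = TRUE)

  plan(cluster, workers = cl)
  foreach(i = 1:4, .combine = '+') %dofuture% {
    i^2
  }
}

run_sum()
# [1] 30
```

In the Slurm job above, the character vector of host names would be passed to makeCluster() instead of a number of local workers.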