Accelerating R with multi-node parallelism – Rmpi, BatchJobs and OpenLava

Gord Sissons, Feng Li

In a previous blog we showed how we could use the R BatchJobs package with OpenLava to accelerate a single-threaded k-means calculation by breaking the workload into chunks and running them as serial jobs.

R users frequently need ways to parallelize their workloads, and while approaches like multicore and socket-level parallelism are good for some problems, when it comes to large problems there is nothing like a distributed cluster.

The message passing interface (MPI) is a staple technique among HPC aficionados for achieving parallelism. MPI is meant to operate in a distributed, shared-nothing environment and provides primitives for tasks (referred to as ranks, or slaves) to share state with one another by efficiently passing messages. Rmpi provides an R wrapper around the MPI API, making MPI functions accessible to R programmers.
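As a minimal sketch of what the Rmpi wrapper looks like in practice — this requires a working MPI installation (e.g. Open MPI) to run, and the slave count and broadcast object below are purely illustrative:

```r
library(Rmpi)  # requires an MPI runtime to be installed

# Spawn two R slave processes across the MPI universe
mpi.spawn.Rslaves(nslaves = 2)

# Execute a call on every slave; results come back as a list, one entry per slave
ranks <- mpi.remote.exec(mpi.comm.rank())
print(ranks)

# Broadcast an object from the master to all slaves
chunk.size <- 1000
mpi.bcast.Robj2slave(chunk.size)

# Tear down the slaves and exit MPI cleanly
mpi.close.Rslaves()
mpi.quit()
```

The functions shown (mpi.spawn.Rslaves, mpi.remote.exec, mpi.bcast.Robj2slave) are the core of the Rmpi master/slave model that the k-means example later in this post builds on.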

While Rmpi can be used with snow, a snow environment assumes that the MPI job is the only job running on the cluster. Often in academic or workgroup environments, however, multiple users or departments share a cluster and need to run multiple MPI jobs concurrently, or mix serial and MPI workloads. When it comes to scheduling shared workloads, there's nothing like a workload manager.

Here we’ll expand on our previous blog and show how we can express our k-means problem as an Rmpi job that executes in parallel across multiple cluster nodes and cores. Although not an ideal problem to solve with MPI, it serves to illustrate the programming model.

Setting up a cluster with open-source components like OpenLava and Open MPI can be tricky, so for simplicity we are running this example on a four-node cluster deployed on Amazon Web Services via the Teraproc cluster-as-a-service offering. R-ready clusters of up to three nodes (using t2.micro instances) can be run for free with the Teraproc service on the AWS free tier. Deploying the cluster this way also provides a shared NFS file system, so all of our cluster nodes have the same view of the file system under the /home directory.

The program below reads a 250,000-line data set comprised of x, y pairs and runs k-means to group the data points into clusters, identifying the coordinates of the cluster centers. We run the k-means calculation with nstart=100, which starts the algorithm from 100 sets of randomly generated centers and compares the runs to find the best result. The higher the value of nstart, the more simulations and the more accurate our result. As explained in our previous blog, running four independent k-means calculations with nstart=25 is equivalent to running a single calculation with nstart=100, so long as we aggregate the results from our parallel tasks on completion. Below is the script that runs k-means without parallelization.

#!/usr/bin/env Rscript
#
# serial.R - Sample script that performs a k-means analysis on the generated
# dataset.csv file to find and plot the four cluster centers.
#
# This script will run unmodified on any size compute cluster because the kmeans
# function can only use a single processor core.
#
library(ggplot2)
 
setwd("~/examples")
 
data <- read.csv('./data/dataset.csv')
start.time <- Sys.time()
result <- kmeans(data, centers=4, nstart=100)
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
print(result$centers)
data$cluster = factor(result$cluster)
centers = as.data.frame(result$centers)
plot <- ggplot(data=data, aes(x=x, y=y, color=cluster)) +
  geom_point() +
  geom_point(data=centers, aes(x=x, y=y, color='Center')) +
  geom_point(data=centers, aes(x=x, y=y, color='Center'), size=52, alpha=.3, show_guide=FALSE)
print(plot)

Running the script yields the result below showing an execution time of 1.79 minutes.

> source ('examples/kmeans-serial.R')
Time difference of 1.78692 mins
           x          y
1 -0.5074524  0.7377786
2  1.1726992  1.1448767
3  0.6350942 -1.1341592
4 -1.2821850 -0.9832719
>
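The split-and-aggregate idea described above — several independent runs with a smaller nstart, keeping the run with the lowest tot.withinss — can be sketched in a few lines of plain R. This is a toy illustration with synthetic data, not the blog's dataset:

```r
# Toy illustration: split one kmeans() run with nstart = 100 into four runs
# with nstart = 25 each and keep the best result by tot.withinss.
set.seed(42)
data <- data.frame(x = c(rnorm(500, -1), rnorm(500, 1)),
                   y = c(rnorm(500, -1), rnorm(500, 1)))

# four independent quarter-sized runs, each from its own random seed
runs <- lapply(1:4, function(i) {
  set.seed(i)
  kmeans(data, centers = 4, nstart = 25, iter.max = 50)
})

# aggregate exactly as the parallel version will: minimum tot.withinss wins
wss <- sapply(runs, function(r) r$tot.withinss)
best <- runs[[which.min(wss)]]
print(best$tot.withinss)
```

Because each run already picks the best of its 25 random starts, taking the minimum tot.withinss across the four runs is equivalent to a single run with 100 starts.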

To take advantage of parallel processing, rather than breaking the algorithm into discrete batch jobs as we did in the previous example, this time we re-code the example to use the Rmpi package and run a single parallel job.

The modified code is shown below. The function mpi.function() is essentially our parallel MPI batch job. Inside mpi.function() we define another function, parallel.function(), that will be executed by each MPI slave. Because each MPI slave can execute on any host in the cluster, we load the dataset from the shared NFS file system inside the function so that each slave has a local copy of the data. We adjust nstart to scale the k-means calculation according to how many parallel slaves are executing: in the example below, clusterApply() runs six instances of parallel.function(), each with nstart set to 17, for a total of 102 randomly selected sets of centers in the overall k-means calculation.

As before, we aggregate the individual results into a single result by finding the run with the minimum value of tot.withinss (see https://stat.ethz.ch/R-manual/R-devel/library/stats/html/kmeans.html), the total within-cluster sum of squares, which reflects the distance between clustered data points and their cluster centers. The script then goes on to plot the results.

#!/usr/bin/env Rscript
#
# kmeans-mpi.R - Sample script that performs a kmeans analysis on the generated dataset.csv file
#
# This example is modified to use six MPI slaves, each running nstart=17, to
# generate a workload equivalent to kmeans-serial.R with nstart=100
 
library("BatchJobs")
library(parallel)
library(ggplot2)
 
setwd("~/examples")
 
conf = BatchJobs:::getBatchJobsConf()
conf$cluster.functions = makeClusterFunctionsOpenLava("../rmpi-batch.tmpl")
 
# this is the function that represents one MPI job
mpi.function <- function(i) {
  library(parallel)
  library(Rmpi)
 
  slaveno <- mpi.universe.size() - 1
  if (slaveno < 1) {
    slaveno <- 1
  }
  parallel.function <- function(i) {
    set.seed(i)   # a different seed on each slave so the random starts differ
    data <- read.csv('data/dataset.csv')
    result <- kmeans(data, centers=5, nstart=17)
    result
  }
 
  cl <- makeCluster(slaveno, type = "MPI")
  results <- clusterApply(cl, 1:6, fun=parallel.function)
 
  temp.vector <- sapply( results, function(result) { result$tot.withinss } )
  result <- results[[which.min(temp.vector)]]
  saveRDS(result, file = "~/examples/data/kmeans-result.rds")
  stopCluster(cl)
  result
}
 
# Make registry for one OpenLava MPI job
reg <- makeRegistry(id="TPcalc")
ids <- batchMap(reg, fun=mpi.function, 1)
print(ids)
print(reg)
 
# submit the job to the OpenLava cluster, requesting seven MPI processes
# (one master plus six slaves)
start.time <- Sys.time()
done <- submitJobs(reg, resources=list(np=7))
waitForJobs(reg)
 
end.time <- Sys.time()
time.taken <- end.time - start.time
print(time.taken)
 
# Load results of the OpenLava MPI job
final_result <- readRDS("~/examples/data/kmeans-result.rds")
 
# show the results
print(final_result$centers)
 
# plot the results
data <- read.csv('data/dataset.csv')
data$cluster = factor(final_result$cluster)
centers = as.data.frame(final_result$centers)
plot <- ggplot(data=data, aes(x=x, y=y, color=cluster)) +
  geom_point() +
  geom_point(data=centers, aes(x=x, y=y, color='Center')) +
  geom_point(data=centers, aes(x=x, y=y, color='Center'), size=52, alpha=.3, show_guide=FALSE)
print(plot)
 
# clean up after the batch job by removing the registry
removeRegistry(reg,"no")
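The clusterApply() pattern at the heart of the script above can also be tried on a single machine by substituting a local PSOCK cluster for the MPI cluster — a stand-in sketch useful for testing without an MPI installation. The worker count, seeds and synthetic data here are illustrative, not the blog's setup:

```r
library(parallel)

# Stand-in for the MPI version: same clusterApply() pattern, local PSOCK workers
cl <- makeCluster(2)

parallel.function <- function(i) {
  set.seed(i)  # a different seed per worker so the random starts differ
  data <- data.frame(x = rnorm(500), y = rnorm(500))  # synthetic stand-in for dataset.csv
  kmeans(data, centers = 5, nstart = 17, iter.max = 50)
}

results <- clusterApply(cl, 1:2, fun = parallel.function)

# aggregate: keep the run with the minimum total within-cluster sum of squares
best <- results[[which.min(sapply(results, function(r) r$tot.withinss))]]
stopCluster(cl)
print(best$tot.withinss)
```

Swapping makeCluster(2) for makeCluster(slaveno, type = "MPI") is essentially the only change needed to move this pattern back onto an MPI cluster.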

In this case the simulation was run several times with different numbers of parallel slaves and different nstart values. The results are shown in the graph below for 2, 3, 4 and 6 MPI slaves. Using the code example above we were able to reduce our run-time from 1.79 minutes (107 seconds) down to approximately 20 seconds, more than a 5-fold improvement.

[Graph: k-means run-time versus number of MPI slaves]

Monitoring execution at run-time, we see the difference from the previous example: instead of our simulation showing up as six discrete jobs, querying our running jobs with OpenLava's bjobs command shows a single job comprised of six parallel tasks, with four MPI tasks running on one EC2 host and two on another.

gordsissons@ip-10-0-18-72:~/examples/TPcalc-files/jobs/01$ bjobs
JOBID USER STAT QUEUE FROM_HOST EXEC_HOST JOB_NAME SUBMIT_TIME
149 gordsis RUN batch ip-10-0-18- 4*ip-10-0-2 *s/01/1.R Apr 9 14:49 
                                  2*ip-10-0-26-174

We can see this more clearly using the bhosts command to examine the allocation of the job across the managed hosts in the cluster. Note that one host is full (closed) because all four scheduling slots, corresponding here to virtual CPUs, are busy, while on a second host we are using two of four vCPUs to support our 6-way job.

gordsissons@ip-10-0-18-72:~/examples/TPcalc-files/jobs/01$ bhosts
HOST_NAME          STATUS       JL/U    MAX  NJOBS    RUN  SSUSP  USUSP    RSV
ip-10-0-18-72      ok              -      4      0      0      0      0      0
ip-10-0-26-174     ok              -      4      2      2      0      0      0
ip-10-0-26-175     closed          -      4      4      4      0      0      0
ip-10-0-26-176     ok              -      4      0      0      0      0      0
gordsissons@ip-10-0-18-72:~/examples/TPcalc-files/jobs/01$

We haven't bothered explaining how to set up Open MPI on the cluster hosts because the Teraproc service does this for us automatically, but the template file (rmpi-batch.tmpl) required by BatchJobs is shown below, illustrating the syntax used to launch the parallel job with the openmpi-mpirun command.

#BSUB -q batch
/opt/openlava/bin/openmpi-mpirun -np 1 R CMD BATCH <%= rscript %>
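BatchJobs templates are brew files, so values passed via the resources argument of submitJobs() can be substituted into the job script. A slightly fuller template might look like the sketch below — the #BSUB -n line and the resources$np and log.file variables are assumptions based on BatchJobs' template conventions, not part of the original template:

```
#BSUB -q batch
#BSUB -n <%= resources$np %>
#BSUB -o <%= log.file %>
/opt/openlava/bin/openmpi-mpirun -np 1 R CMD BATCH <%= rscript %>
```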

Hopefully this post will be useful to anyone looking for an example of how to use Rmpi together with BatchJobs and the OpenLava workload manager.