# Parallelization (again) - MPI to the rescue!

Hi!

Damn…took me forever to get this idea: The bottleneck to parallelize Stan programs is autodiff which cannot be used in a threaded program due to its design with singletons.

Now, MPI runs a program in separate processes and hence AD can be run within each MPI process without a problem. Attached is a toy example program which demonstrates the feasibility to calculate the gradient in each sub-process. Running the program with mpirun -np 4 ./hello gives me this output:

1
./hello
Hello World! from process 1
Worker 1: Gradient of f for theta: 1, 2; gradient: 1, 4
1
./hello
Hello World! from process 2
Worker 2: Gradient of f for theta: 1, 4; gradient: 1, 8
1
./hello
Hello World! from process 0
1
./hello
Hello World! from process 3
Worker 0: Gradient of f for theta: 1, 0; gradient: 1, 0
Worker 3: Gradient of f for theta: 1, 6; gradient: 1, 12


The output is garbled due to concurrent writing to std::cout, but the important points are:

1. It does not crash
3. The gradients per process are correct

Voila!

I think there is now a very clear path now forward to get within chain parallelization going:

• We need to distributed the transformed data to all childs once
• The really expensive functions (ODE, whatever) are handled by the MPI childs
• The MPI childs only recieve parameters for which function values and their gradients gets calculate - for any function as AD can simply be run. Results get passed around in vector only format which is then feed into the main-process AD stack via pre-computed gradients.

That should solve any performance problems given enough machines. The only thing to watch out for is to cut the Stan programs into clever blocks such that the communication cost is low.

And to integrate this with the stan language we could probably think about a _mpi ending for functions which can be used with this approach.

Sebastian

hello.cpp:

#include <boost/mpi.hpp>

#include <boost/math/tools/promotion.hpp>

#include <iostream>

#include <stan/math.hpp>

struct fun {
template <typename T>
T operator()(const Eigen::Matrix<T, Eigen::Dynamic, 1>& theta) const {
using std::pow;
T a = theta[0];
T b = theta[1];
return a + pow(b, 2);
}
};

int main(int argc, char* argv[])
{
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;

std::cout << argc << std::endl;
std::cout << argv[0] << std::endl;
std::cout << "Hello World! from process " << world.rank() << std::endl;

fun f;

Eigen::VectorXd theta(2);
theta(0) = 1;
theta(1) = 2*world.rank();

double fx;

std::cout << "Worker " << world.rank() << ": Gradient of f for theta: " << theta(0) << ", " << theta(1) << "; gradient: " << grad_fx(0) << ", " << grad_fx(1) << std::endl;

return 0;
}


Note: To get the above running, you need openmpi and boost mpi and boost serialization library installed and linked.

4 Likes

You have to make the memory for the singletons thread local. Then you can run in parallel. But you still need to put the entire expression graph back together in one place.

I didn’t mean to imply that your approach wouldn’t also work. You’ve essentially recreated the scaling out approach used by TensorFlow. Specifically, it will let you scale out over processes.

The multi-threading will let you scale up on a single process.

Then there’s scaling (down?) to GPUs.

Each MPI process runs as its own process, which is why it can run on different machines. MPI simply organizes the message passing between these fully independent processes.

Of course, inside the main process one has to put together the expression graph using the pre computed gradients from each of the children.

The key point about this scale out approach is that it can facilitate AD. So a parallel_mpi_for loop is realistically thinkable - with all the nice comfort for the user.

I hope to be able to figure out a prototype which runs also inside a Stan program with reasonable effort. The main problem to solve is to minimize and organize the between-process communication.

Right. You just need a way to ship the gradients back and reinsert them where necessary into the autodiff expression graph.

So cool: I got MPI working for a function which takes a vector of parameters and returns a vector. The code ships the parameter vector to the nodes and ships the function result and the jacobian back to the root where the AD graph is assembled.

Now begins the coding of the task blocking per node and the additional distribution of data only stuff. However, all is working out so far very nicely. If anyone has experience with boosts mpi scatterv and gatherv function, then I am certainly listening.

Looking forward to a benchmark to see how well or bad the communication cost is.

I was talking to a parallel system architect last night and realized that I didn’t know what the time required for each of the ODE solvers you’re doing. Is it more than 1ms per solve? If not, you could potentially organize them into batches.

I’m curious how you’re doing the synch to make sure you can reinsert into the auto-diff graph. We know the shape of the gradients before the call, so should in theory be able to pre-allocate and then actually fill in the values asynchronously. Every node’s parents need to complete before we can run reverse-mode, but that’s much less synch than doing it every call.

Oh yes, odes are expensive and no fun to with stan unless you use the openmp stuff which i have done. I really hope that the mpi will also enable suitable parallelization for non ode problems, which depends on the communication cost.

At the moment i am planning to do the sinplest batching approach possible, i.e. Form deterministic batches by dividing the number of jobs by the number of workers. I may allocate a bit less work to the root node. For the moment being i will first process things in parallel and just collect the double only information at the root node. At the root node i will then use the precomputed gradients calls to build up the ad graph.

Your suggestion to asynchronously build the ad graph and populate it in a second sweep sounds like a good idea, but i want to first do the low hanging fruits and demonstrate a speedup with this simple minded way.

I may ned a hand or some more advise on doing the asynchronous approach…

Looks promising… I am now able to ship real and int data to the mpi nodes, calculate there the result of a function with the signature

template <typename T>
Eigen::Matrix<T, Eigen::Dynamic, 1>
mpi_function(const Eigen::Matrix<T, Eigen::Dynamic, 1>& theta,
const std::vector<double>& x_r,
const std::vector<int>& x_i) {}


The function result and the Jacobian are then computed on the node and the results are shipped back to the main process.

Performing 1000 integrations of a 2-cmt system (3 states, 4 parameters) gives me these timings:

4 cores:
real	0m10,420s
user	0m41,035s
sys	0m0,134s

2 cores:
real	0m20,266s
user	0m40,202s
sys	0m0,165s

1 cores:
real	0m37,563s
user	0m37,431s
sys	0m0,085s


Next step is to plug this into a running Stan program.

Where this is getting really interesting is if we start to combine OpenMP or whatever threading on a machine with parallelization across machines via MPI.

This reminds me on Dunsons talk on putting MCMC onto multiple machines… but here its without any scarification by some approximation (just raw computing power).

Sebastian

Yes! Getting the right result in parallel is super exciting.

Presumably this is on the same machine. The cost for transporting data is very different across machines. Much higher latency and much slower throughput compared to doing it all in memory on a single machine. That’s usually mitigating by cutting down on what gets communicated and doing larger batches of work in each external call.

Looks like I am now at the point where the MPI is correctly running in my Stan program. I am getting the same results from the MPI parallelized program with 4 cores vs 1 core only. Looking at the running times, there is apparently some overhead:

MPI 4 cores, 40 jobs:
real	1m1.516s
user	4m3.943s
sys	0m0.727s

1 cores, 40 jobs:
real	3m7.077s
user	3m6.306s
sys	0m0.324s


However, this is the very first working prototype, i.e. memory alignment has not yet been given much attention. We should think about a cluster-stan edition of cmdstan (or a contrib directory in cmdstan)?

I am curious as to how programs with analytic Gradients can benefit from this approach. Lets see.

Sebastian

Three times as fast with four cores is fantastic. I’m very curious how it scales with CPUs. What winds up happening is that memory contention and synchronization take up proportionally more of each CPU’s work, so minimizing them becomes much more important for throughput.

And the backward pass is still going to be serial, so scaling with overall problem size is another consideration—the time to take the (reduced) backward pass is still going to lower-bound the amount of time it takes per iteration, and it will go up with problem size.

To solve the synch problem, we should be able to insert operands-and-partials results from the different processes asynchronously since we know how big the gradient is going to be in the autodiff expression graph ahead of the call and just need to write it there; it’s then immutable until the actual backward pass. The only problem is then that we have to still wait until some loop-end point if anything past that computation point depends on the value.

A thought just occurred to me—we can often compute the value quickly and that’s all we really need to go on with reverse mode. We don’t actually have to wait for all the gradients to come back. I should hold that thought until much later…

Anyway, I’m very excited about this and want to start thinking about how we can build it into the language. The operands-and-partials refactor Sean’s doing might help, as it gives you a neat way to collect up results. And I’m hoping it will play nicely with some closure-based functions I’m thinking of adding (so the autodiff functions automatically get access to data and parameters—though parameters will still have to be flattened for Jacobians internally).

P.S. I always find these timing reports super confusing, so I finally searched for a definition and it’s what you’d expect from the numbers but not the names.

Real refers to actual elapsed time; User and Sys refer to CPU time used only by the process. Real is wall clock time - time from start to finish of the call."
1 Like

Having this in Stan soon would be really amazing; it lets Stan scale with number of computers. Just a few points:

• In other problems I tried I saw a 2.8x speedup when going from 1 to 4 cores. I will throw this onto a bigger problem on our cluster with 20 cores per machine and run similar benchmarks as I did for the OpenMP stuff.

• The way boost::mpi works is to sent around the data using boost::serialization. My idea to speed things up is to either attempt to ship the full ChainableStack from the childs to the root which has the drawback that the stack can be much larger than what is actually needed, but has the advantage that its hopefully simple to do. The other idea is to write the Jacobian directly into data-structures like the stored_gradients and then ship these around. The upside of doing so is that with the anyway needed step to restore these objects inside the root, then the restoring process itself would directly build the AD tree. I am not yet sure how to do that given the AD stack design and the boost serialization

• Your thought to maybe not compute gradients which are not needed could be doable with the futures concept in C++11. There you can declare a future and give it a lazy evaluation execution strategy. Hence, if its never evaluated it never gets executed.

• An inclusion into the main Stan would be great…the dependencies are manageable as they are all in boost. That is, one needs the build boost mpi and serialization library in binary form installed, but the biggest problem for the user is the mpi installation which is either done from macports or is already there in a cluster installation (no clue on windows though).

Sebastian

I wasn’t intentionally talking about not computing gradients (though that would be great). I meant we didn’t have to wait for results if we pre-allocate.

I think we want to do all the derivatives on the remote processes and just ship back the partials. The caller should know how to insert them all. As I keep saying, I think that can be preallocated and filled asynchronously on return.

Have you thought at all about how we could express the parallelism in the Stan language?

Is there an example in the stan math library which illustrates the pre-allocation design pattern? I have looked at code for calculating the variance and I think this goes in the direction of what you have in mind. Although, you want to go one step further and link parents to childs already in advance as I understand you. Once the real function values and Jacobians stream in they are filled into the right places… if I got you right.

The trick will be to make the deserialization process work exactly this way. I think then it will run asynchronously, although I have to check the boost mpi implementation here.

I have invested some thought into how this goes in the language, but this is far from user-friendly what I have. Essentially the design follows the ODE pattern. So the user defines a function with the signature

real[] mpi_function(real[] theta, real[] x_r, int[] x_i)

which does the work for each task. The user first has to call the function

int[] setup_mpi_function(int M, real[,] X_r, int[,] X_i)

which behaves differently on the root node and on the workers. On the root node this function does not do much except to calculate the work chunk assignment, but on the worker node this function goes essentially into an infinite loop and it waits for sets of parameters to receive from the root for which to calculate the function and the Jacobian. The setup function is intended to be called inside the transformed data block. I think in a proper Stan implementation we could call this function after the transformed data block.

Inside the model the user then calls a function with the signature

vector run_mpi_function(real[,] Theta, int[] chunks, real[,] X_r, int[,] X_i)

The chunks are from the setup function, Theta are all the parameters, X_r/X_i is real and integer data. These data-structures define per row one job (a ragged array and possibly tuples would be nicer here).

This way of coding works, is similar to the ODE pattern, but maybe not super user friendly.

If you want I can throw my research code on a branch on cmdstan so that you can have a look if you like.

Sebastian

No, there aren’t examples yet. But it’ll be simple. We just have to reserve the space the eventual vari will take up. If that’s something like precomputed_gradients_vari, we can back out the exact memory layout from the class (C++ is great that way), and just fill in the values later. We’ll know the operand pointers when we start, but not the result value or the gradients, which we can fill in asynchronously.

I was thinking something either more or less ambitious than what you lay out. At one level, we can encapsulate things like the ODE solvers to run multiple instances as data. That’d be a lot less ambitious as there’d be nothing for the user to do but call another function. At the other extreme, we could add loop parallelism and try to sort it out in general. I don’t think we could do that without some serious synching.

I was able to to get a 3.6x speedup (when going 1 to 4 cores) on an example with not too large communication cost. The trick was to make MPI perform bit-wise copying. So if I have M parameters and J jobs, then I sent out from the root to the nodes a M x J matrix (sliced into pieces accordingly). Then the nodes return a (M+1) x (J x \sum N_i) matrix. The first row is the function result and the M remaining rows contain the gradient. The number of columns is J, the number of jobs, times the total output size over all jobs.

All data is saved in column major Eigen matrices which I can pass to the MPI library as raw memory buffers.

From my understanding it would be best to allocate that chunk of memory on the AD stack directly, setup with precomputed_gradients_vari the needed AD tree pointers directly into that chunk of memory. Doing so would enable that the later “fill-in” is done by the deserialization process automatically given that things align in memory. That should make is relatively efficient.

Hmm… I just moved to an implementation where the function values and gradients are directly written to the AD stack. However, I cannot pre-allocate all precomputed_gradients_vari on the root prior to recieving the results from the nodes. The reason is that in the var implementation the value is declared const such that one can only set it at the time of construction. So either there is another way or the vari needs to be changed…

Hmm, indeed. As you point out, the problem isn’t the gradients, it’s the value. We could easily write a version that doesn’t take the gradients and fill those in later. But the value’s a problem.

Now that I think about it, there’s really no reason to preallocate any of this. You can just allocate as things get inserted back. The idea is that whatever is getting executed is independent in the expression graph so the order they get added won’t matter at all. And we have to synch up to wait for values to go to the next step, anyway.

Sorry about the misleading advice. If it comes down to it, we can probalby modify vari so that the value isn’t const—I doubt it’ll do anything to the efficiency.

What I can’t tell is how we get the operands, partials and values back into the expression graph. So they can be done asynchronously, but you need the operand pointers to reinsert, but we don’t need to send the pointers over the network, only the values. Then we get the value and gradients back, right?

Will that be possible with MPI?