Parallelization (again) - MPI to the rescue!

The MPI interface has commands for collecting smaller memory blocks into a single large one on the root. So I preallocate a large chunk of memory on the root and then loop over the results to create the precomputed gradients. The pointers are only set up on the root; on the workers I only call grad and write the gradients to the local chunk of memory. One could do the setup of the pointers asynchronously on the root; maybe that buys us a little bit. Apart from that, we are getting a 3.6x speedup where 4x is the maximum, which is not too bad. Let's see how it scales with larger problems on a cluster.
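
For readers who have not used the MPI collectives involved: the gather step can be done with MPI_Gatherv. This is only an illustrative sketch with made-up names, not the prototype code itself:

#include <mpi.h>
#include <vector>

// Gather variable-sized per-rank gradient blocks into one preallocated
// buffer on the root; counts[r] is the number of doubles sent by rank r
// (counts only needs to be meaningful on the root).
std::vector<double> gather_gradients(const std::vector<double>& local_grads,
                                     std::vector<int> counts,
                                     int root, MPI_Comm comm) {
  int rank, size;
  MPI_Comm_rank(comm, &rank);
  MPI_Comm_size(comm, &size);

  std::vector<int> displs;
  std::vector<double> all_grads;
  if (rank == root) {
    // offsets of each rank's block inside the root's big buffer
    displs.assign(size, 0);
    for (int r = 1; r < size; ++r)
      displs[r] = displs[r - 1] + counts[r - 1];
    all_grads.resize(displs[size - 1] + counts[size - 1]);  // preallocate once on the root
  }

  MPI_Gatherv(local_grads.data(), static_cast<int>(local_grads.size()), MPI_DOUBLE,
              all_grads.data(), counts.data(), displs.data(), MPI_DOUBLE,
              root, comm);
  return all_grads;  // empty on the workers, the full gradient table on the root
}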

Hi Bob!

I finally got to push my prototype code to GitHub. Things are now on a branch of cmdstan. The key file is this:

The code runs provided that you supply binary builds of Boost Serialization and Boost MPI (and some MPI implementation; OpenMPI from MacPorts will do).

Let me know if you have questions or need some more comments for documentation in the code. This is not pretty code, just a prototype…

I will try this approach out on a larger problem in a cluster environment to see how it really scales.

Sebastian

Wow… I am stunned by these results, but have a look yourself. Note that the 1-core run has not finished yet, but I am not patient enough to wait any longer (I assumed an exact 2x speed increase when going from 1 to 2 cores, which may not be true).

Long story short:

  • Starting from 2 CPUs upward, the scaling of speed with additional CPU cores is essentially linear.
  • The problem size was 1600 subjects.
  • The problem was a rather simple model.
  • All MPI runs were run on the same machine, so no network traffic.
  • Per subject, 10 observations were calculated and the problem had 7 parameters (hence 80 doubles need to be transferred per subject).
  • Results match exactly to all digits.

Before rolling this out to a real model, I need to introduce ragged array structures. Otherwise it would be too complicated to adapt this code to the messy real-world data.

This looks very promising; I hope this can land in Stan sooner rather than later!

Best,
Sebastian

ode_mpi_parallel.pdf (5.7 KB)

Bump.

Minor update now that the 1-core and 20-core results are finished. Going from 1 to 2 cores gives “only” a 1.9x speedup (probably due to the Turbo mode Intel processors can use when there is not much load on the server during the run).

However, all in all this is great news for big models and I am very curious how this approach performs on one of our bigger models and datasets.

Sebastian

ode_mpi_parallel.pdf (6.0 KB)


Awesome. This is fantastic. 15x speedup with 16 cores is more than I would’ve hoped for.

The first graph would be more visually appealing if you used a log scale for time; then the line would be pretty straight. As is, it doesn’t look like efficient parallelism when just scanning by eye (until you get to graph 2).

I’m replying to an off-list question from Sebastian about an issue where running across multiple machines has good scaling properties but does not return exactly the same results.

With good inter-machine communication, I’m not too surprised. It could even work faster if there were a lot of memory contention on a single machine.

Results aren’t guaranteed to be identical across hardware, OS, C++ compiler, or even optimization levels.

We need to work out what to do with NaN transmission in general—this is coming up in other areas.

As to how we can get this into Stan, what we need is an abstraction that we can code. We were talking during the meeting about a general map function (same “map” as in map-reduce and in the Lisp function). In C++, this will look like

#include <vector>

// apply f elementwise to x; R must match the return type of f(x[i])
template <typename F, typename T, typename R>
std::vector<R> map(const F& f, const std::vector<T>& x) {
  std::vector<R> result;
  result.reserve(x.size());
  for (std::size_t i = 0; i < x.size(); ++i)
    result.push_back(f(x[i]));
  return result;
}

As long as f and x and f(x) are not stateful, this can be done safely in parallel. The easiest parallelization would be to have something like a parallel map.
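
For illustration only (not part of the proposal): a shared-memory version of such a parallel map is only a few lines with std::async, assuming f is thread-safe, which reverse-mode autodiff with a global stack currently is not; hence the interest in distributing over processes with MPI.

#include <future>
#include <vector>

// launch one asynchronous task per element, then collect the results in order;
// R must match the return type of f(x[i])
template <typename F, typename T, typename R>
std::vector<R> parallel_map(const F& f, const std::vector<T>& x) {
  std::vector<std::future<R>> pending;
  pending.reserve(x.size());
  for (std::size_t i = 0; i < x.size(); ++i)
    pending.push_back(std::async(std::launch::async, f, x[i]));

  std::vector<R> result;
  result.reserve(x.size());
  for (std::future<R>& fut : pending)
    result.push_back(fut.get());  // blocks until the corresponding task is done
  return result;
}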

A perhaps easier first step would be to write a very specific parallel ODE function. It would just turn all the inputs into a vector, but given that we don’t have ragged arrays, this seems tricky either way.

When you say you want to get this into Stan quickly, what specifically do you think we should be adding?

My apologies, there was no special reason why this could not have gone directly here (besides the fact that this is almost exclusively a conversation between the two of us):

I just wanted to share that I tried out MPI across machines on our cluster. I ran the program on 40 cores so that it would be spread over at least 2 machines, but the queuing system spread it over even more machines. The results did blow me away: I got essentially linear scaling of the performance even in this scenario. Probably our high-performance InfiniBand network contributes to that, but I am still shocked.

That’s the good news. The bad news is that when running across different machines the results differ from those obtained on a single machine. At least I always get exactly the same results when running on multiple machines. Since I use NaN to indicate failures (which triggers the root to raise an exception), I suspect that NaN is not transmitted correctly, but I have to follow this up.

All in all, if this turns out to hold and assuming I am able to fix that glitch, this is an incredible speedup (5.5 h down to a few minutes).

Thanks for posting on the group list. We’re trying to keep all the design discussion for the project as public as possible, so I keep encouraging people to post here rather than send me personal email.

This is super exciting. Even more so than GPUs in a lot of ways because we have more code that isn’t GPU-friendly and pretty much any scalable likelihood should be parallelizable this way. The really awesome part is that the two speedups are completely orthogonal. So we could do distributed Cholesky factorization on GPUs.

Following your discussion is useful also for those who are not commenting. Very interesting results.

Aki


As to how to add this to Stan my thoughts are as follows:

I do not think that an ODE-only function is really a good choice. One would have to deal with ragged arrays there as well anyway. In addition, I almost always need the solution of the ODE for only a single compartment. Hence, subsetting to just that one compartment directly in the mapped function cuts down the communication cost significantly.

Moreover, my experiments so far show impressive speedups, and I am convinced that any Stan program where you build a hierarchical model over many units will benefit from this, as long as the computational cost per unit is reasonably high. This also applies to population PK models with analytic solutions, due to the tedious dosing that has to be taken care of.

My goal would be to be able to run this MPI stuff from a vanilla cmdstan installation. For this I see two steps:

  1. Generalize the build system so that I have control over the build process with user-defined variables. Specifically, I would need to be able to define my own main.cpp. Having that would allow me to use the cmdstan installation on our production system while injecting the necessary code. This is a grey area, but probably OK.

  2. I do think that a general map is most useful. The problem to work around is sharing the data with the child processes just once. A great solution would be to ship the transformed data to the child processes and have the map function pass the index of the current job into the mapped function. Then the user can take care of the ragged-array business himself. As an alternative, one can use a structure like in my example, which is based on regular matrices/arrays.

This stuff is stunning news for our applications of Stan… I shared the good news with a colleague of mine who has already been burned by long Stan runs (days), and this would be THE solution to it.


Good news. It’s not pretty, but I figured out how to solve all of our problems with one ugly map function. First, without thinking about data, this could be

R[] map(F f,
        real[] params, int[] param_end,
        real[] data_r, int[] data_r_end,
        int[] data_i, int[] data_i_end);

where f is a function with signature

R f(real[] params, real[] data_r, int[] data_i);

Like with the ODE functions, the param variables can be parameters and the data variables have to be constant.

The definition of the function in Stan-ish pseudocode would be

R[] map(F f,
        real[] params, int[] param_end,
        real[] data_r, int[] data_r_end,
        int[] data_i, int[] data_i_end) {
  ... validate all ends are same size, in bounds, ordered
  R y[size(param_end)];
  int param_start = 1;
  int data_r_start = 1;
  int data_i_start = 1;
  for (i in 1:size(param_end) *in parallel*) {
    y[i] = f(params[param_start:param_end[i]],
             data_r[data_r_start:data_r_end[i]],
             data_i[data_i_start:data_i_end[i]]);
    param_start = param_end[i] + 1;
    data_r_start = data_r_end[i] + 1;
    data_i_start = data_i_end[i] + 1;
  }
  return y;
}
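
To make the ragged packing concrete, here is a hypothetical call in the same Stan-ish pseudocode, with made-up sizes: three units with 2, 3, and 2 real data points and two parameters each.

real params[6];        // unit parameters packed back to back
int  param_end[3];     // = { 2, 4, 6 }
real data_r[7];        // concatenated per-unit real data
int  data_r_end[3];    // = { 2, 5, 7 }
int  data_i[3];        // e.g. per-unit observation counts
int  data_i_end[3];    // = { 1, 2, 3 }

R y[3];                // one result per unit
y = map(f, params, param_end, data_r, data_r_end, data_i, data_i_end);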

Now, if we want to restrict this so that the data only has to be posted to the remote processes once, we could restrict all the data arguments to be variables declared in the data or transformed data blocks. The only thing we lose is the ability to make the data arguments dependent on parameters, which is probably a bad idea anyway.

This will work for PK/PD and it’ll work for likelihoods.

It will be much, much easier to implement if we can pick one type for R, such as a vector. The downside is that it again leaves the unpacking to the calling function.

Essentially, this will be the map part of map-reduce (the etymology is the same as the map function in Lisp)—it computes the Jacobian of y[i] w.r.t. params and then messages the result back to the controller, which will then merge the Jacobians into the expression graph and block until all Jacobians are merged before returning a value from map. We know the size of y is the same as that of param_end, but we don’t know the size of each entry of y ahead of time. It will have to be rectangular (or padded) to fit into Stan’s existing data types.
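
On the controller side, the merge step can be expressed with stan::math::precomputed_gradients (presumably what the precomputed gradients mentioned at the top of the thread refer to). A minimal sketch with illustrative names, one call per scalar output:

#include <stan/math.hpp>
#include <vector>

// Insert one remotely computed scalar output into the local autodiff graph.
// value      : f(...) as computed on the worker
// operands   : the local parameter vars this output depends on
// jacobian_i : the corresponding Jacobian row received from the worker
stan::math::var merge_result(double value,
                             const std::vector<stan::math::var>& operands,
                             const std::vector<double>& jacobian_i) {
  // the returned var's reverse pass adds jacobian_i[k] * adjoint to operand k
  return stan::math::precomputed_gradients(value, operands, jacobian_i);
}

For a multivariate y[i], this would be called once per scalar component, each with the matching row of the Jacobian.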


Hi!

I know it’s a bit of a pain to code a lot with these ragged structures… but this is how most of my Stan programs look right now anyway. So this sounds great to me. A few questions/comments:

  • Will this backtracing of the data also work for data which is passed into functions? This is not a requirement, but it would be nice to have.

  • In all the applications I have in mind, I would not need a ragged structure over the parameters; a simple 2D array real[,] params would be enough. However, I can imagine that this could be restrictive, and I am totally fine with also making it ragged.

  • The output of the map function should again be a ragged array. The user has to know the output structure in advance anyway to make sense of the (padded) output. Hence, I would suggest passing another int[] f_starts into the map function which essentially holds the output size of each block (see the sketch after this list). With that, we can just concatenate the output from each function evaluation into one big ragged array. This would avoid lots of padding, which would be very costly as all of it ends up on the AD stack. And again, the user has to know the ragged structure (and hence the int array) in advance anyway to make any sense of the output.

  • Last: we haven’t discussed this yet, I think, but we should go for a map which we configure at compile time to execute via MPI, via some other yet-to-be-designed parallel engine, or serially. This way we can ensure that programs like this also work in RStan, where we do not have MPI available. Would that make sense?
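
For the output-structure point above, the signature from the proposal would just grow by one integer array; a hypothetical sketch in the same Stan-ish pseudocode:

R[] map(F f,
        real[] params, int[] param_end,
        real[] data_r, int[] data_r_end,
        int[] data_i, int[] data_i_end,
        int[] f_starts);   // f_starts[i]: output size of the i-th block, known to the user in advance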

I agree that this is a bit awkward, but it will have a big impact for the big models that Stan is built for.

Sebastian

  • I’m not sure what use case you have in mind with functions. I don’t think you’d be able to access the data-only variables from a function, because they wouldn’t be in scope.

  • I realized from your second bullet point that if one wanted to share parameters (which should be OK for autodiff), then they’d have to be duplicated.

  • Absolutely, this function needs to work without MPI. The discussion around GPUs is that we will have a top-level compiler flag that turns them on; then each function will look at the size of the matrices it’s dealing with and decide if they’re worth sending to the GPU. We could do the same thing here (see the sketch after this list). I don’t know if we want to somehow configure the number of nodes or how any of that is done. Ideally, we’d then want to be able to use GPUs on the distributed nodes. Then we’d have the same kind of flexibility in scaling out and scaling up as TensorFlow (though without the node-based design).
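
As a sketch of such a compile-time switch (the flag name STAN_MPI is made up here, as is mpi_map), the same map signature could simply dispatch to different backends:

#include <vector>

// hypothetical flag, defined by the build system only when an MPI backend is available
template <typename F, typename T, typename R>
std::vector<R> map(const F& f, const std::vector<T>& x) {
#ifdef STAN_MPI
  return mpi_map<F, T, R>(f, x);   // hypothetical MPI backend: ships jobs to workers
#else
  std::vector<R> result;           // serial fallback for builds without MPI
  result.reserve(x.size());
  for (std::size_t i = 0; i < x.size(); ++i)
    result.push_back(f(x[i]));
  return result;
#endif
}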

Glad you’re OK with the general design—it’ll make using this clunky, but do-able, which I always think is the first step if an elegant solution would require way too much work and you need the functionality.

That’s fine: if it is only data inside the scope of transformed data that can be pre-shared with the workers and cached locally on the worker nodes, that is enough. I was just wondering whether you could do even more magic with the language.

And yes, parameters need to be duplicated if you want to use a shared parameter.

I was arguing for a regular 2D data structure for the parameters. However, I think there are applications where we may even have a varying number of parameters per case. So, in short, a ragged array structure there would be good as well.

What are your thoughts on asking the user to input an f_starts int array which gives the ragged structure of the output? This would avoid unneeded padding and allow for a more efficient implementation, since we would know all sizes in advance.

I think we already have a good plan. How about we move this to a wiki page to make the design more concrete?

Do others have thoughts on the design which has been put forward so far?

Sebastian

Starting a design wiki sounds good. You can just put it on stan-dev/stan.

Here is a start:

We should settle on the open points and then we should be able to drill down further.

Of course, if you want to add anything, go ahead!

What’s the best way to engage with this proposal? Should we make it a Google Doc or a pull request so we can do comments/questions/etc. inline? Or just edit the wiki with some text markers indicating who wrote what, something like that?

We’ve been just editing wikis, sometimes leaving multiple proposals for comparisons.

This looks great. I edited the wiki doc with some questions. Maybe it would be easier to have the design conversation here and just keep the wiki for our current thinking. We never did that with Google Groups because it was hard to manage.

Assuming all the MPI stuff is doable, I’m happy with the spec as is. Is there more you think we need?

Discussing here and keeping the wiki consistent with our current thinking sounds good. Let me try to answer the questions:

  • Question: is there a way to do this on the first call and then set a flag we can check later to make sure the variables have already been shipped?

Shipping the data on the first call only and then setting a flag can be done, yes. We could make the workers wait for data on the very first invocation only. The root would then send data only during the first iteration, if we can do that (roughly like the sketch below).
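
This is only an illustrative sketch of that flag idea using Boost.MPI's broadcast (names are made up, not the prototype code):

#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>
#include <boost/serialization/vector.hpp>
#include <vector>

// Broadcast the (transformed) data from the root to all workers exactly once;
// later calls see the flag set and skip the communication entirely.
void ship_data_once(const boost::mpi::communicator& world,
                    std::vector<double>& data_r, std::vector<int>& data_i) {
  static bool shipped = false;  // per-process flag
  if (shipped)
    return;
  boost::mpi::broadcast(world, data_r, 0);  // rank 0 is the root
  boost::mpi::broadcast(world, data_i, 0);
  shipped = true;
}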

  • Would it be easier to generate a single main.cpp and have the compilation of it controlled by some kind of flag?

My initial thought when defining a new main.cpp was that we could make the build system more flexible by allowing a choice of main.cpp. However, I don’t have a preference between using different mains and using some compiler flags for implementing this detail. Also note that the prototype was really just intended to get things up and running and to gain some experience with the MPI approach, nothing more.

  • Can we do this more efficiently than spin-waiting?

The waiting for new packets of work is done with this code:

I am not sure what you have in mind to improve it, but what I have done was the obvious way to do it. From my perspective, one could complement this with some code that first waits for an instruction telling the worker what happens next, and then receives its work chunk or does whatever else the root wants it to do (for example, exit gracefully). A worker loop along those lines could look roughly like the sketch below.
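
Just to illustrate that command-loop idea (the command tags and the work handler are made up; this is not the prototype code):

#include <boost/mpi/communicator.hpp>
#include <boost/mpi/collectives.hpp>

// hypothetical command tags broadcast by the root
enum Command { WORK = 1, SHUTDOWN = 2 };

// placeholder for the existing per-chunk logic: receive inputs, compute, send results back
void process_work_chunk(const boost::mpi::communicator& world) { /* ... */ }

// Worker event loop: block on the next command from the root,
// then either process a chunk of work or exit gracefully.
void worker_loop(const boost::mpi::communicator& world) {
  while (true) {
    int cmd = 0;
    boost::mpi::broadcast(world, cmd, 0);  // blocks until the root broadcasts a command
    if (cmd == SHUTDOWN)
      return;
    if (cmd == WORK)
      process_work_chunk(world);
  }
}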

What about the goals and non-goals… are these fine? In particular, is it fine to have only a single map call in a Stan program? Also… thinking about the flexibility of the Stan language, how can we ensure that the data is really constant? The user could, depending on parameter values, choose different sub-index vectors to subset the data. Could this be detected?

I think that if we detect that the data is not guaranteed to be persistent across calls, then we should warn the user at the compilation stage, if we can do that.

Something I never thought about: this code will go into the stan repo, right? This is my first addition to that layer, so I am not 100% familiar with it yet.

Sebastian