MPI Design Discussion

Hi!

A few more things came up and comments are welcome:

  • sticking the mpi-evaluate function into stan-math makes things a bit harder. My current prototype takes advantage of the fact that the workers run the Stan program in the same way as the root, and hence the data is “transferred” for me implicitly. We will lose this elegant solution and need to worry about sending data from the root to the workers. Ok by me, just more effort.

  • Can we go with an mpi thing for now which can only be called in the scope of the transformed data block, when we are sure that the data hasn’t been messed with after the transformed data block? In that case, we can always cache the data on the workers.

  • This mpi thing will be the first stateful function, and I plan to implement it with a singleton-type design - other ideas?

  • Should we maybe go to a design where we have rectangular parameter blocks, but we send all data to all nodes and we also send the index i of the job? That would avoid the need for ragged arrays, as dealing with raggedness is pushed onto the user. This design does not scale to huge datasets, but it would be a compromise until we have ragged arrays.

  • Can we have variadic functions? My idea here is to do the following: the function which is passed by the user has a signature like

real[] foo(real[] params, int i, ... data-arguments ...)

So, I leave the data arguments here to be flexible. For a given function from the user this is of course a fixed set of arguments. The … data-arguments … can be int, real, or arrays of those. The Stan function itself would not be variadic, but the C++ implementation inside would have to deal with it (the Stan parser and the mpi evaluation function in stan-math). C++11 has support for this kind of thing, as I understand it.

Comments are very welcome.

Best,
Sebastian

I missed this thread—still can’t figure out how to configure Discourse so that topics don’t just disappear on me without my reading them.

I think I see the problem you’re getting at with trying to put things in the math library. If we try to treat it as a function in the math lib, then we have no way of making sure the data hasn’t changed since the last time we called it. If we don’t treat it as a function in the math lib, then it’s not an operation like any of our other operations (and we lose the utility of having a simple map function in the math lib).

I’m confused as to why you’d want to call MPI in the transformed data block.

Are you talking about statefulness in terms of whether to resend data or not? That sounds dangerous if the state is static as it will mess with any threading we may eventually do. This is making it harder to see how we’d put this in the math library.

If we went with indexes, it’d need to be a closure that could link to data. But if we try to make it a closure, we’ll need to set up the whole model again. Maybe that’s what you meant by having the whole program run on each node?

The handling of multiple types is tricky with the ... in C++11—you have to write old-school tail-recursive list peelers.
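Something like this toy sketch, where each argument gets peeled off the front of the pack (the real thing would pack arguments for MPI transport rather than print them):

#include <iostream>

// base case: an empty argument pack terminates the recursion
void pack_args() {}

// general case: peel off the head argument, handle it, recurse on the tail
template <typename T, typename... Rest>
void pack_args(const T& head, const Rest&... tail) {
  std::cout << head << std::endl;  // stand-in for serializing one argument
  pack_args(tail...);
}

int main() {
  pack_args(1, 2.5, "three");  // handles mixed int / real / string arguments
  return 0;
}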

With MPI we have to cut down on communication, and therefore data should not be sent multiple times. As I learned from you guys, whenever something has the type “data” it is not safe to assume that it never changes. Only data which has been declared in the transformed data block can be assumed to be static.

In my second item, where I speak of “in transformed data block scope”, I was asking if we can restrict our first map implementation to deal with static data only. As I understand it, the parser can detect that this is the case.

BTW, assuming static data is not enough; we also need to assume a fixed number of parameter sets for each evaluation of the function, so that the N parameter sets are always mapped in a fixed manner to the M workers. This is an assumption in my prototype as well and makes my life a little easier. If we go to the all-data-to-all-workers design, then this assumption is not needed any more.

In my current prototype the model is set up on the root and on the workers. On all nodes the program will enter the transformed data block and eventually call the setup_mpi_function, see here

This setup function will only return on the root while on all other nodes the program goes into a loop where it waits for parameters to process. The upside of such an approach is that the data is being transferred to the worker nodes automagically.
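In skeleton form, that control flow looks like this (names are illustrative, not the actual prototype code):

#include <boost/mpi.hpp>
#include <boost/serialization/vector.hpp>
#include <vector>

// sketch: the setup function only returns on the root; workers never
// return and instead serve parameter requests until the whole process
// group is shut down
void setup_mpi_function(const boost::mpi::communicator& world) {
  if (world.rank() == 0)
    return;  // root: continue executing the rest of the Stan program
  while (true) {
    std::vector<double> params;
    world.recv(0, 0, params);  // block until the root sends a parameter chunk
    // ... evaluate the user function and gradients on locally held data ...
    // ... send function values and gradients back to the root ...
  }
}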

Having the transformed data stuff available on the worker nodes as static data which the user can access in the mpi worker function would be very convenient. Would this be doable now that we have closures with C++11? I know there have been discussions around this in the past, but we always decided against it.

What we need is a client / server structure where the server sends blocks of parameter sets to the clients and the clients return, for each parameter set, the function value and the gradients. The server is the root process and the clients are the workers.

Splitting things between math and stan makes things a bit more involved, yes. From the math library’s perspective, we want to calculate, in a distributed way, the gradient of a function with some parameters. I am not sure yet how, but if we could use closures to leave the data-handling bit out of what the math library has to do, that would possibly make things easier. So the signature of a functor given to math is real[] foo(real[] params) (or, for the index design, real[] foo(real[] params, int i)). My thinking is to have in math a singleton facility where we can register such functor objects, which essentially follow the “Command Design Pattern” as described in Modern C++. Once things are registered we can do repeated evaluations of the functor with varying parameter sets.
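Roughly what I have in mind for that facility (just a sketch; all names and signatures are placeholders, not actual stan-math code):

#include <functional>
#include <map>
#include <string>
#include <vector>

// sketch of a singleton facility for registering command-style functors
class mpi_command_registry {
 public:
  // a registered command maps one parameter set to outputs
  typedef std::function<std::vector<double>(const std::vector<double>&)>
      command_t;

  static mpi_command_registry& instance() {
    static mpi_command_registry registry;  // Meyers singleton
    return registry;
  }

  void register_command(const std::string& name, const command_t& cmd) {
    commands_[name] = cmd;
  }

  // repeated evaluations of a registered functor with varying parameter sets
  std::vector<double> evaluate(const std::string& name,
                               const std::vector<double>& params) const {
    return commands_.at(name)(params);
  }

 private:
  mpi_command_registry() {}
  std::map<std::string, command_t> commands_;
};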

The ... stuff can be done at a later stage. I was hoping that the new C++11 features would make this easy to deal with and allow nicer syntax, but we can just defer it, as it would be a generalization anyway.

… one more advantage of having the transformed data read in locally on the workers would be that this data has to be transferred only once by the user. That is, multiple chains would be able to read in the same data on the local worker machine. Any other design would copy the data around many times.

Sebastian

Thinking a bit more about this: we could get the map which I suggested into Stan in a very easy way if we are willing to accept the same hacks as in my prototype:

  • static data
  • static size of the parameters block
  • a single parallel region only
  • the parallel region may only occur in transformed parameters, model or generated quantities block

If that holds, then we can just let the Stan program start on the root and on the workers as usual. The program would reach the map on the root and on the workers in the same way. Therefore, all data gets loaded in the same way and the map will be called on all nodes in exactly the same way. The behavior of the map function then just differs between the root and the workers:

  • we will assume a fixed assignment of jobs to the workers
  • the workers will simply subset the data relevant to their jobs and then go into a waiting position, waiting for the parameters sent from the root for which they are supposed to do the work
  • the root will slice the parameter block of size N into as many slices as we have workers and send these chunks out to the workers (a sketch of this fixed assignment follows the list). The workers do their work and return things to the root.
  • the map function is only exited on the root process, where the Stan program continues as usual. The worker nodes never leave the map function again; they keep sitting there waiting for new parameter blocks.
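The fixed assignment could be as simple as contiguous chunks, e.g. (just a sketch):

#include <utility>
#include <vector>

// sketch of the fixed job-to-worker assignment: the N parameter sets are
// split into contiguous chunks, one per worker, and this assignment never
// changes between evaluations of the map
std::vector<std::pair<int, int> > chunk_jobs(int num_jobs, int num_workers) {
  std::vector<std::pair<int, int> > chunks;
  for (int w = 0; w < num_workers; ++w) {
    int start = w * num_jobs / num_workers;
    int end = (w + 1) * num_jobs / num_workers;
    chunks.push_back(std::make_pair(start, end));  // worker w gets [start, end)
  }
  return chunks;
}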

This would mimic the design of my prototype. The only real drawback is that we have to live with the above assumptions. My suggestion would be to go with this for now and generalize later. Having a single parallel region is already a huge step forward versus having none.

Sebastian

I’m still not clear on what this would look like from the perspective of the Stan language. Are you suggesting we let the functions access variables in the data or transformed data block and then make sure they’re in scope on the workers?

The other option we’ve bandied about is requiring the data to be the name of a data variable rather than just being data. Again, this won’t work as a function in the math library—it’d have to be plumbed through specially.

Maybe we have a hangout to go over the options? I can’t make it to this week’s meeting. Maybe tomorrow (or even today) in my afternoon?

I only have a limited sense of what is possible in the Stan language / the actual Stan model framework. So maybe if we talk briefly, we’ll converge somewhat faster.

The problem to solve is that we will be running Stan concurrently in a distributed way.

As said, with the (strong) assumptions above we can get an easy solution quickly, I think. Maybe another, more general solution is also quickly possible, but I do not see it yet.

Looks like I am one important step further… I have now figured out a way for the root to tell the workers to initiate whatever work I want to be executed. This lifts the restriction to a single operation which I was suggesting. In essence, I “convinced” Boost serialization to send a pointer to an abstract base class “command” to the workers. The workers then just listen for these objects and call the “run()” method on them. The code being executed then depends on the exact implementation of the derived concrete class being sent. This should work out… but may cause headaches when we template this… let’s see.

Next we need to figure out how to deal with the data. I think referring to variables by name would make sense. Then we can register the transformed data variables that way and access them by string name. Possibly not too efficient, but fit for purpose?

It’s a holiday weekend here and I’m out until Wednesday. I’m trying to make some progress on coding.

I think it’s finally sinking in to me why you keep talking about transformed data. If we somehow have the function that’s evaluated in parallel have access to transformed data, we know it doesn’t change. Ideally we’d be able to carve out some of the transformed data, but at worst we’d do what Sean suggested and ship all the data to each node.

We could just use the variables rather than strings—it’s all the same when I have control over the parser. They can take variable names and act like they’re mapping on the function. The question is whether we want to impose a condition on the variables such that they represent the values in a ragged array and the end indexes are also present.

vector[] map_rect(function fun, vector[] foo, vector[] bar, int[,] baz);

where the arguments are

  • fun: name of the function with signature (vector, vector, int[]) : vector
  • foo: name of parameter variable N x I
  • bar: name of real data variable N x J
  • baz: name of integer data variable, N x K

and the return value will be N x L. By data variable, I mean a variable declared in the data or transformed data block.

If we wanted to make a ragged version, we could instead take the arguments to be simple 1D collections, each with a matching size-N set of endpoints, e.g., foo_ends, bar_ends, baz_ends, declared as data variables. This is all relatively easy to verify in the parser.
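To pin down the end-index convention, job n’s slice would be recovered roughly like this (a sketch in 0-based C++ terms; the ends are cumulative counts, so the last end equals the total size):

#include <vector>

// job n's slice of the flat "ragged" array runs from the previous
// end index to its own end index
std::vector<double> job_slice(const std::vector<double>& flat,
                              const std::vector<int>& ends, int n) {
  int start = (n == 0) ? 0 : ends[n - 1];
  return std::vector<double>(flat.begin() + start, flat.begin() + ends[n]);
}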

Clearly it wouldn’t be something we could drop in the math lib. I’m not sure how we’d map the right pieces out to the compute nodes, but the program will have everything it needs at compile time to figure that out.

Ah, I forgot about July 4th.

Looks like we are on the same page now. With the mechanism to send commands, I think it’s now feasible to put things into math, as I am now able to tell the workers “do task X” or “do task Y”.

What is left to be done is to be able to tell the workers which data to use, in a way such that we only pass references to the data over the network while the data itself is already local.

One idea could be to use a flyweight implementation. On all nodes we throw the data and transformed data into a flyweight singleton holder. Then we can instruct the workers to “do task X with data Z”. The task X is essentially identified by a type, and the data Z by the index we use for the flyweight holder.
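As a sketch (with made-up names), the holder could look like:

#include <map>

// flyweight-style singleton holder: every node inserts its data and
// transformed data under the same indexes, so an index sent over the
// network suffices to identify the local copy
template <typename T>
class mpi_data_store {
 public:
  static void set(int id, const T& data) { storage()[id] = data; }
  static const T& get(int id) { return storage().at(id); }

 private:
  static std::map<int, T>& storage() {
    static std::map<int, T> storage_;  // one holder per stored data type
    return storage_;
  }
};

With that, “do task X with data Z” is a message carrying only the type of X and the integer Z.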

Does that make sense?

In your code…from where could I get L inside the map function? That would be good to have for pre-allocation of all output sizes.

To make it more concrete, here is an example implementation of what I mean by sending pointers to abstract classes around:
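In condensed form, using Boost.MPI and Boost.Serialization (a sketch of the mechanism; the registration details can vary):

#include <boost/mpi.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/assume_abstract.hpp>
#include <iostream>

namespace mpi = boost::mpi;

// abstract base class; workers receive pointers to this and call run()
struct command {
  virtual ~command() {}
  virtual void run(const mpi::communicator& world) const = 0;
  template <class Archive>
  void serialize(Archive& ar, const unsigned int version) {}
};
BOOST_SERIALIZATION_ASSUME_ABSTRACT(command)

// concrete command; what gets executed depends on this derived type
struct hard_work : public command {
  int job;
  hard_work() : job(0) {}
  explicit hard_work(int job) : job(job) {}
  void run(const mpi::communicator& world) const {
    std::cout << "Hello world from " << world.rank() << "!" << std::endl;
    std::cout << "Job " << job << " says " << (job + 1) << std::endl;
  }
  template <class Archive>
  void serialize(Archive& ar, const unsigned int version) {
    ar & boost::serialization::base_object<command>(*this);
    ar & job;
  }
};
BOOST_CLASS_EXPORT(hard_work)  // register the derived type for pointer sends

int main() {
  mpi::environment env;
  mpi::communicator world;
  std::cout << "Started worker " << world.rank() << std::endl;
  if (world.rank() == 0) {
    // root: send each worker a command through a base-class pointer,
    // then a null pointer as the termination signal
    for (int i = 1; i < world.size(); ++i) {
      command* work = new hard_work(i);
      world.send(i, 0, work);
      delete work;
    }
    std::cout << "Hello world from " << world.rank() << "!" << std::endl;
    command* stop = 0;
    for (int i = 1; i < world.size(); ++i)
      world.send(i, 0, stop);
    std::cout << "Finally finishing the main worker 0" << std::endl;
  } else {
    // workers: deserialize commands and run them until told to stop
    while (true) {
      command* work = 0;
      world.recv(0, 0, work);
      if (!work)
        break;
      work->run(world);
      delete work;
    }
    std::cout << "Terminating worker " << world.rank() << std::endl;
  }
  return 0;
}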

The output I get on my machine is

Started worker 0
Started worker 1
Started worker 2
Started worker 3
Hello world from 0!
Hello world from 1!
Hello world from 3!
Finally finishing the main worker 0
Hello world from 2!
Job 2 says 3
Job 1 says 2
Terminating worker 1
Job 3 says 4
Terminating worker 3
Terminating worker 2

So you see, the hard_work functor could be user-supplied. The data would need to be encoded in a similar way. That means that the root sends (as part of mapped_functor, for example) a pointer to the worker node which is sufficient to find the data at the local node in, for example, a flyweight singleton or whatever other technique makes data items persistent, referenceable, and static on the workers.

I think you’re right and we need to do something like that. Not sure how that ties into math, though—it won’t be a standalone function if the data’s coming in by another route. So just putting this all in stan-dev/stan would be OK with me.

It depends on what we want to achieve, I think. For one, we aim at a very tight integration with math by dumping results directly into the AD stack… and moreover an MPI-enabled autodiff library is a cool thing to have, no?

However, I share your concerns, which is why I was not sure at the beginning where to put this thing. One could even think about a math-mpi addon library, but I don’t think it is worth the effort. The thing is that we have to introduce MPI-specific concepts into math which allow it to run distributed, which requires sending commands and solving the data-locality problem.

BTW, I am really not an expert on MPI. In fact, I have never programmed MPI before. While my prototype looks very promising, it would be great to run our ideas by some expert. I mean, I don’t think what we are doing is lame, but learning from experts is usually a good thing to do. I am just not aware of anyone we could ask.

I also don’t know any MPI experts. And nobody’s jumped into this discussion, but it’s on the developers list to which users can’t post.

I think we could drop a map() function into stan-dev/math, but there’s no good way to deal with data that doesn’t change through a simple function.

I think I now have all the pieces together, including the data stuff.

So we are going to focus on static data only, which is anything declared in data or transformed data. Problem 1 to solve is to be able to refer to that data in some symbolic way. An easy way of doing that is taking advantage of the fact that the data variables are declared in a fixed order. Hence, we can throw all static data into a huge tuple, where the declaration order defines the position in that tuple. Let’s call this the mega-tuple for now.

In the code above I managed to essentially encode, as a type, the functor which gets executed. Now we can simply enhance that functor with a few int template arguments. These ints then represent positions in the mega-tuple. This way we can form messages from the root to the workers which only involve type information (functor and ints), and these encode what to do (the functor) and what data to use (the positions in the mega-tuple).

The mega-tuple will, of course, be generated in all processes in the same way, such that the same position leads to the same data. This will lead to huge tuples, but those are no problem to deal with in C++11.

Problem solved!

This solution pushes a bit of work onto the Stan parser to generate the right code. The good thing is that only the map function needs special attention: its data arguments have to be turned into int template arguments. So it will look like this:

Stan code:

real parallel_result[N] = map(user_function, parameter_matrix, Xr, Xi);

Now assume that Xr was the first and Xi the second declared data item. Then this would lead to C++ code which will instantiate a map-functor of the type

mpi_map<user_function, 1, 2>(parameter_matrix);

I hope the idea is clear from that pseudo-code. I will try to put together a prototype soon.
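To sketch this in C++ (with 0-based tuple positions and made-up names; the real tuple and functor wrapper would be generated by the parser):

#include <iostream>
#include <tuple>
#include <vector>

// the mega-tuple: every data / transformed data variable in declaration
// order, built identically on all processes
typedef std::tuple<std::vector<double>,  // position 0: Xr
                   std::vector<int> >    // position 1: Xi
    mega_tuple_t;

mega_tuple_t mega_tuple;

// the user-defined function, wrapped by the parser into a functor type
struct user_function_functor {
  void operator()(const std::vector<double>& params,
                  const std::vector<double>& xr,
                  const std::vector<int>& xi) const {
    std::cout << params.size() << " params, " << xr.size() << " reals, "
              << xi.size() << " ints" << std::endl;
  }
};

// the message from root to workers is pure type information: F says what
// to do, the int template arguments say which mega-tuple entries to use
template <typename F, int XrPos, int XiPos>
struct mpi_map {
  void operator()(const std::vector<double>& params) const {
    F()(params, std::get<XrPos>(mega_tuple), std::get<XiPos>(mega_tuple));
  }
};

int main() {
  std::get<0>(mega_tuple) = std::vector<double>(10, 1.5);  // Xr
  std::get<1>(mega_tuple) = std::vector<int>(10, 2);       // Xi
  mpi_map<user_function_functor, 0, 1>()(std::vector<double>(3, 0.1));
  return 0;
}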

I think I’ll have to see more of mpi_map to understand what you’re trying to do here. I don’t even understand why we have a matrix rather than an array of parameters here.

Is the user_function a functor as suggested by your first usage or purely templated like the second use suggests?

oh, the parameters should be a rectangular block - either a matrix or an array.

And I was a bit sloppy… user_function is what the user defines, but then Stan always creates a user_function_functor, which is what the map should get as its first template argument.

I think what I have in mind will work, but let me code it up, which is probably an easier way to communicate…

Before I forget: we should disallow any operations in the mapped user function which involve random numbers. Making things exactly reproducible in that situation would be a huge headache!

Not sure where you mean. The RNGs can happen in transformed data. But they will use their own chunk of the RNG chain based on the seed. Then each chain, based on chain ID, will jump ahead from there.

I think he is saying that the mapped function cannot call anything that ends in _rng.