MPI Design Discussion

I documented mpi_command, mpi_cluster and mpi_distributed_apply. In the documentation I also explain which macros I think we should introduce.

It also helps to look at the map_rect_mpi tests, which are in prim and rev under mat/functor in the test tree.

Thanks, @wds15. I'll just ask questions on this thread.

Why is the code split between arr and mat? Not sure if both are used.

Sure. The files in arr are the cluster-handling things which do not have any dependency on Eigen, hence I placed them there. I just made my best guess at where things should go, but if you think the mpi_* files should go in a single directory, then we can do that.

Sebastian:

To work with Stan, the map_rect functions are going to need to accept functors, not just template parameters. I'll build map_rect_serial in its own file. Right now, the serial version looks like it's implemented with parallelism. From the Stan language, we'll require a simple function (that is, not an lpdf/lpmf/lcdf/lccdf, not an rng, and not an lp modifier).

I'll create a very simple direct implementation of all of this. I seem to recall you mentioning something about wanting the autodiff stack to be the same each way. That seems like a bad way to implement the map non-serially, but it could probably be done. I'll do the simple version first and then we can think about the more complex one after you've implemented the parallel version so I know what the target is going to be.

Here's what I have for the serial version:

template <typename F, typename T, typename U>
Eigen::Matrix<typename stan::return_type<T, U>::type, -1, 1>
map_rect_serial(const F& f,
                const Eigen::Matrix<T, -1, 1>& shared_params,
                const std::vector<Eigen::Matrix<U, -1, 1>>& job_params,
                const std::vector<std::vector<double>>& x_r,
                const std::vector<std::vector<int>>& x_i,
                std::ostream& msgs) {
  const char* fun = "map_rect (_serial)";
  check_matching_sizes(fun, "job parameters", job_params, "real data", x_r);
  check_matching_sizes(fun, "job parameters", job_params, "int data", x_i);

  // each job contributes exactly one scalar output here, so one entry per job
  Eigen::Matrix<typename stan::return_type<T, U>::type, -1, 1>
    result(job_params.size());
  for (int i = 0; i < result.size(); ++i)
    result[i] = f(shared_params, job_params[i], x_r[i], x_i[i], &msgs);
  return result;
}

Two changes:

  • constant functor argument
  • reference to the messages; we should be converting everything to references going forward

For the second, we do not want to be checking whether pointers are NULL everywhere; it's super error prone and also too verbose. We'll be moving everything to references.

This does not even try to get the exact same answer as the MPI version.

The code all needs to be reorganized toward two goals:

  1. Allowing map_rect to compile without MPI (right now, map_rect.hpp includes all of MPI)

  2. Respecting our include order and not having files in prim directories include rev.

Should I start trying to do that?

As far as functors go, this is a hard choice that can go two ways:

  1. We figure out how to serialize proper functors. This would let us use lambdas and closures. Sounds like it's complicated.

  2. We just pass type information and forgo using lambdas in the code. This will work with what we have now, as our functors are stateless and have nullary constructors. It won't work with lambdas. I was really hoping we could move to lambdas so that we could pass system functions with implicit reference to parameter and data variables in scope (rather than having to pack and unpack).
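To make option 2 concrete, here is a minimal, self-contained sketch (simplified to doubles, not the actual map_rect signature, and with made-up names): the functor is passed as a template type and default-constructed where needed, which is exactly what rules out capturing lambdas.

#include <iostream>
#include <vector>

struct square_fun {  // stateless functor with a nullary constructor
  double operator()(double x) const { return x * x; }
};

// The functor type F is only a template parameter; no instance is passed
// around (and none would have to be serialized for MPI).
template <typename F>
std::vector<double> map_by_type(const std::vector<double>& xs) {
  F f;  // requires F to be default-constructible, which rules out closures
  std::vector<double> out;
  out.reserve(xs.size());
  for (double x : xs)
    out.push_back(f(x));
  return out;
}

int main() {
  std::cout << map_by_type<square_fun>({1, 2, 3})[2] << std::endl;  // prints 9
}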

The implementation on the branch doesn't match the design document:

The design doc passed in a functor (though somehow it got registered as a non-reference; it should be a constant reference, not a copy).

Let me answer those things which came up:

  • your implementation of map_rect_serial would allow only a single value to be returned by each function call. However, the function signature of the user function returns an Eigen column vector of arbitrary length. Each job is allowed to return an arbitrary number of elements in that vector, such that the final output is a ragged vector (see the sketch after this list). A very straightforward implementation of that is in map_rect.hpp on line 121, see here.

  • the functor is passed as type only at the moment, which we agreed on a while ago. Thinking about it, we should be able to efficiently support functors which are static, such that we would only need to serialize them once. However, this would require us to make these functors serializable via boost. I doubt this will work with lambdas and it will certainly require additional serialization code. Looks like a headache to me.

  • map_rect should compile without any MPI stuff if you do not define STAN_HAS_MPI. If it pulls in any MPI stuff when STAN_HAS_MPI is not defined, that is a mistake and should be easy to fix. The design has this specifically in mind to keep things separate.

  • the map_rect_serial implementation, the one here, is not parallelized, but it does work exactly the same way as the MPI implementation. I do not understand why it is a bad idea to do it like this. After all, this implementation cut a larger problem I gave it down from 17h to 10h on a single-core run. I explain the rough design in a nutshell below.
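Here is a double-only sketch of the ragged-output behavior described in the first bullet (illustrative only, not the code from map_rect.hpp): each job may return a column vector of any length, and the per-job outputs are concatenated into one big result vector.

#include <vector>
#include <Eigen/Dense>

// Each job returns an Eigen column vector of arbitrary length; the per-job
// outputs are pasted together into one concatenated result vector.
template <typename F>
Eigen::VectorXd map_rect_ragged(const F& f,
                                const Eigen::VectorXd& shared_params,
                                const std::vector<Eigen::VectorXd>& job_params) {
  std::vector<Eigen::VectorXd> outputs;
  Eigen::Index total_size = 0;
  for (const auto& theta : job_params) {
    outputs.push_back(f(shared_params, theta));  // output length may differ per job
    total_size += outputs.back().size();
  }
  Eigen::VectorXd result(total_size);
  Eigen::Index pos = 0;
  for (const auto& out : outputs) {
    result.segment(pos, out.size()) = out;
    pos += out.size();
  }
  return result;
}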

So the basic idea @seantalts and I developed was to split the map/reduce operation such that we essentially have two steps:

  • a reduce step: this functor is passed, as doubles only, all the things needed to calculate the results of a single job (shared and job-specific params / real data / int data). The function is expected to return all outputs as an Eigen matrix in column-major order, so each column contains the result of the functor plus additional information such as the gradients of the functor with respect to the operands which are vars.

  • a combine step: this functor is given all the computed results and is responsible for inserting them into the AD graph on the root.

As you see, the first (reduce) step is fully ignorant of MPI. The second (combine) step, however, can be split into two parts. The first is to collect all local results from the workers on the root; this part requires MPI. The second part is again independent of MPI.

This is the reason why map_rect_serial uses map_rect_reduce as the reduce step and map_rect_combine as the combine step. Both are MPI-free. On the other hand, map_rect_mpi uses map_rect_reduce (the same as before, no difference) and mpi_map_rect_combine. The mpi_map_rect_combine internally does the combine step by first collecting things at the root and then just using map_rect_combine.

You see, things are intended to be rather modular.
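A toy, double-only illustration of that split (names are schematic, and the gradients are analytic here; the real reduce step obtains values and gradients via nested autodiff and the real combine inserts them into the AD graph):

#include <iostream>
#include <vector>
#include <Eigen/Dense>

// Reduce step (MPI-free): one job, doubles in, a matrix out. Column j holds
// the j-th output of the job: row 0 the value, the remaining rows its
// gradients with respect to the operands.
Eigen::MatrixXd reduce_job(const Eigen::VectorXd& shared,
                           const Eigen::VectorXd& job) {
  Eigen::MatrixXd out(3, 1);        // toy job with a single output
  out(0, 0) = shared[0] * job[0];   // value: f = shared[0] * job[0]
  out(1, 0) = job[0];               // d f / d shared[0]
  out(2, 0) = shared[0];            // d f / d job[0]
  return out;
}

// Combine step (MPI-free): paste the per-job results together; in the real
// code this is where the results get inserted into the AD graph on the root.
Eigen::VectorXd combine(const std::vector<Eigen::MatrixXd>& results) {
  Eigen::Index total = 0;
  for (const auto& r : results)
    total += r.cols();
  Eigen::VectorXd out(total);
  Eigen::Index pos = 0;
  for (const auto& r : results)
    for (Eigen::Index j = 0; j < r.cols(); ++j)
      out(pos++) = r(0, j);
  return out;
}

int main() {
  Eigen::VectorXd shared(1);
  shared << 2.0;
  std::vector<Eigen::MatrixXd> results;
  for (double theta : {1.0, 2.0, 3.0}) {
    Eigen::VectorXd job(1);
    job << theta;
    results.push_back(reduce_job(shared, job));  // run per job (or per worker)
  }
  std::cout << combine(results).transpose() << std::endl;  // 2 4 6
}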

On the two points you put down: let me first ensure that map_rect_serial builds without any MPI stuff. While I have tried to respect the includes, you can probably do this part better than I did.

I hope that makes sense. Let me know if things are unclear.

I am not sure where you have been seeing a problem. I was able to compile and run test/unit/math/prim/mat/functor/map_rect_test.cpp without STAN_HAS_MPI being defined. I added a test/unit/math/rev/mat/functor/map_rect_test.cpp to verify the same thing there. So I think the code is MPI-free whenever you do not define STAN_HAS_MPI, but I could have missed something.

Here again, quickly checking things, I think I have done the right thing. Whenever I am in prim I only ever include things from prim and nothing else. I thought that was the right way, no? Or have I missed something? What I haven't done yet is run make test-headers on the new code, as the pace of rewriting things was too fast for me in the past; I hope this settles down now.

That won't work with Stan; we don't have a ragged vector type yet.

That'll cover our current functions. It won't allow lambdas, though, which produce what is essentially a functor.

Great. I must have misunderstood what I was reading.

That sounds good to me.

It's great you coded it that way.

OK, I think that answers my question above, which I'll pull down here just for confirmation.

By "works exactly the same way", does it depend on MPI and just run multiple processes on one core? Or do you just mean that it reduces the autodiff tree by doing nested autodiff and producing smaller expression graphs overall? If it's the latter, that's fantastic. Even if it's the former, that's good to know, and it would be good to know how it scales. 70% faster code is fantastic and it'd be nice to exploit this trick elsewhere.

Yes, that looks right. I was confused about where the file I was looking at lived. It's in mat, where it should be.

I was confused by the MPI includes, but now I see what's going on with the ifdefs. I always find it confusing to keep in mind that not everything in a file will be seen after the preprocessor runs.
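The pattern in question looks roughly like this (an illustrative guard with made-up function names, not the code on the branch): only one implementation is visible to the compiler, depending on whether STAN_HAS_MPI is defined.

#include <iostream>

double run_job_serial(double x) { return x * x; }

#ifdef STAN_HAS_MPI
double run_job_mpi(double x) { return x * x; }  // this is where MPI headers would get pulled in
#endif

double run_job(double x) {
#ifdef STAN_HAS_MPI
  return run_job_mpi(x);     // distributed path, only compiled with MPI
#else
  return run_job_serial(x);  // without STAN_HAS_MPI the MPI code is never seen
#endif
}

int main() { std::cout << run_job(3.0) << std::endl; }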

While there is no ragged vector type yet, we can still return a ragged array in the sense that the output length from each job can be different and we just paste things together into one big output vector. Since users must know the output sizes themselves, they will be able to make sense of the large vector being returned. I favored making this behavior explicit in the argument list by including an additional int array with the output count per job as an argument to map_rect. @seantalts felt very strongly about not doing so, since we can figure out the output sizes as we execute things, and I think he has a good point about not over-burdening the user; I was able to figure out a solution for this (since not knowing the output sizes makes things a bit more tricky).

I mean that my map_rect_serial version performs nested autodiff, which makes the global expression graph smaller, and this approach allows the smaller nested graphs to be reused whenever possible (if you have many outputs from a given job). That 70% is really cool, but I am not sure yet whether this generalizes to other models as well; I would hope so.
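A minimal sketch of the nested-autodiff trick being described, using Stan Math's nested AD API directly (this illustrates the idea only, not the branch's map_rect_reduce/map_rect_combine code):

#include <stan/math.hpp>
#include <iostream>
#include <vector>

int main() {
  using stan::math::var;

  var x = 2.0;  // operand living on the outer (global) AD stack

  // Reduce-like step: evaluate the "job" on a nested stack and extract the
  // value and gradient as plain doubles.
  stan::math::start_nested();
  double val = 0, grad_x = 0;
  {
    var x_local = x.val();               // double-valued copy on the nested stack
    var fx = x_local * x_local + 3 * x_local;
    val = fx.val();
    fx.grad();                           // sweep only the small nested graph
    grad_x = x_local.adj();
  }
  stan::math::recover_memory_nested();   // the nested graph is discarded

  // Combine-like step: a single precomputed-gradients node goes onto the
  // outer graph instead of the whole expression.
  var result = stan::math::precomputed_gradients(
      val, std::vector<var>{x}, std::vector<double>{grad_x});

  result.grad();
  std::cout << "value = " << result.val()        // 10
            << ", d result / d x = " << x.adj()  // 7
            << std::endl;
}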

Some progress:

  • I have dumped the boost things into the feature/concept-mpi-2 branch such that a make stan-mpi builds whatever is needed for boost. Only the MPI installation (OpenMPI or MPICH) needs to be on the local system. See MPI_NOTES for some details.

  • I also aligned the cmdstan branch feature/proto-mpi to work easily with this simplified setup. However, I had to hard-code Mac-specific instructions into make/models to ensure that the dynamic boost libraries can be found. I tried to get around this using rpath linker options, but I failed... maybe someone else has experience with this?

In short: the boost build is now bundled in the stan-math branch. It works nicely, but I have no clue how we can make the linker link against the dynamic libraries with absolute paths (I never got static linking working with mpi/serialization). The current makefile additions are very basic and probably need refinement.

Finally, I also addressed recursive calls to map_rect, which until now would have led to program termination. When we have nested map_rect calls, only the outer map_rect call will use MPI; the inner map_rect will just use a serial version of map_rect. I am not sure how I can test this behavior adequately with google test...
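A hypothetical sketch of such a fallback (invented names and a plain bool flag, not the branch's actual mechanism): a flag records that an MPI map_rect is already running, and any nested call drops down to the serial implementation.

#include <iostream>
#include <vector>

// Placeholder implementations for illustration only.
std::vector<double> map_rect_serial_impl(const std::vector<double>& jobs) {
  return jobs;
}
std::vector<double> map_rect_mpi_impl(const std::vector<double>& jobs) {
  return jobs;  // would distribute the jobs over the cluster
}

static bool map_rect_mpi_in_use = false;

std::vector<double> map_rect(const std::vector<double>& jobs) {
  if (map_rect_mpi_in_use)
    return map_rect_serial_impl(jobs);  // nested call: fall back to serial
  map_rect_mpi_in_use = true;
  try {
    std::vector<double> res = map_rect_mpi_impl(jobs);  // outer call: use the cluster
    map_rect_mpi_in_use = false;
    return res;
  } catch (...) {
    map_rect_mpi_in_use = false;
    throw;
  }
}

int main() {
  std::cout << map_rect({1, 2, 3}).size() << std::endl;  // 3
}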

@seantalts - want to check the build stuff for this?

Since the MPI code is fairly mature (from my point of view), I have now applied it to one of our bigger models. The runtime on a single core with Stan is 7h-9h (chain variation), and on 20 cores the execution time goes down to 35-45 minutes. So about 12x-14x faster on 20 cores! The problem uses analytic approximations to an ODE problem and includes >1000 subjects. Needless to say, the results are exactly the same.

This is a huge step forward, as it changes the usual model development iteration process from once per day to 3-4 models per day.

Now, I have to say that coding this up is not super nice, as you have to force all the data into rectangular structures and then be very careful when you access the data to avoid it getting cast to var internally. But my hope would be to fine-tune some Stan utility functions which make data conversion and extraction easier.

Overall this is really nice. No more worries about compute-intensive models as long as you can throw hardware at them.


I'm actually working on the build stuff for both this and GPU. I have two goals:

  1. make it easy to configure and use MPI and GPU
  2. make sure it doesn't break for everyone else.

It's taking a while because there has been feature creep in the makefiles over the years, and to make sure users on all platforms can continue to work without breakage, I'm having to rebuild the makefiles.

For anyone else that's interested in the build process, we can start a new, dedicated thread.

I'm paying the price of being late by going through this long thread.

Thanks to @wds15 I'm also going through his branch. Here's my understanding of how slaves and master step through the communication:

  • Slaves
    • cluster listen
    • receive mpi_command through broadcast
    • construct slave version of mpi_parallel_call
    • reduce
    • gatherv
    • cluster stop_listen
  • Master
    • map_rect_reduce
    • mpi_parallel_call
    • send mpi_command through broadcast
    • reduce
    • gatherv

Am I getting this right? Also, the reduce uses ReduceF followed by CombineF, so it's like it contains both the map and the fold functional. I was misled by the name for quite a while. What's the logic behind this naming?

Another question is about the setting of listen(). To me it seems like a blocking operation: the slaves stay in listen mode until the cluster goes out of scope. Does this imply a slave has to wait till all the others finish before it can stop listening?


Re: names - I think those are maybe my fault. They follow Clojure's reducers naming conventions. I think every language uses something slightly different, and many of them don't have this separate combination step. Here's what they mean:

ReduceF is your standard reduction operator (from, e.g., fold in Haskell) with Haskell-style signature accumulator -> element -> accumulator. So reduce gives ReduceF an accumulator and an element from the collection being reduced over, and it returns an accumulator. Map can be implemented on top of this where the accumulator is a list and ReduceF is append.

CombineF is the final aggregation step combining the results of all of the reduce calls on each node. If we were only allowing reduce with an associative ReduceF, we could just use ReduceF here. An example of an associative ReduceF is +, used something like sum = reduce(+, 0, [1, 2, 3, 4]). But you can't implement map on top of a reduce restricted in that way, because list append is not associative. Versions of fold that aren't very parallelism-aware seem to skip this step, I think, but it's important when you're thinking about how the work is going to get farmed out across many machines and then recombined (unless you do what many frameworks do and limit the accumulator types to something with an obvious CombineF step, like list concatenation for lists or dictionary merging for dictionaries, as in Google's MapReduce framework).
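A toy C++ rendering of these roles (illustrative only, not the Stan Math templates): ReduceF folds elements into a per-node accumulator, and CombineF then merges the per-node accumulators; with + the two steps coincide, while map via append needs a genuinely different CombineF.

#include <iostream>
#include <vector>

template <typename ReduceF, typename CombineF, typename Acc, typename T>
Acc map_reduce(ReduceF reduce, CombineF combine, Acc init,
               const std::vector<std::vector<T>>& per_node_data) {
  std::vector<Acc> partials;
  for (const auto& node_data : per_node_data) {  // conceptually: one per worker
    Acc acc = init;
    for (const auto& x : node_data)
      acc = reduce(acc, x);                      // accumulator -> element -> accumulator
    partials.push_back(acc);
  }
  Acc out = init;
  for (const auto& p : partials)                 // conceptually: on the root
    out = combine(out, p);
  return out;
}

int main() {
  // sum = reduce(+, 0, ...): here ReduceF and CombineF can both be +.
  auto plus = [](int a, int b) { return a + b; };
  std::cout << map_reduce(plus, plus, 0,
                          std::vector<std::vector<int>>{{1, 2}, {3, 4}})
            << std::endl;  // 10

  // map via append: ReduceF appends one element, CombineF concatenates whole
  // lists, so the two steps are different operations.
  auto append = [](std::vector<int> acc, int x) { acc.push_back(x); return acc; };
  auto concat = [](std::vector<int> a, const std::vector<int>& b) {
    a.insert(a.end(), b.begin(), b.end());
    return a;
  };
  std::cout << map_reduce(append, concat, std::vector<int>{},
                          std::vector<std::vector<int>>{{1, 2}, {3, 4}}).size()
            << std::endl;  // 4
}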

What names would you suggest? It's pretty quick to change them now, before the PR goes in.

Thanks. Your link really clears things up, as I'm not familiar with the Clojure lingo. IMO there's no need for renaming.

A few details on the master/slave questions you had: what I have essentially recreated is a thread-pool type of implementation (without knowing it). The idea is that the cluster goes into a listen mode, which means that the slaves are waiting for command broadcasts that encapsulate code to be executed on the slaves. The master initiates these commands, and once a command is initiated the cluster is locked in the sense that all resources are then dedicated to the command being executed. Hence, once the command is started, all following communication must be initiated and controlled by the command which is being run. Once the command finishes, the cluster automatically returns to the listen state.

The mpi_parallel_call has two entry points: one for the root and one for the slaves. Besides the different entry points, the code has been written along the lines of the boost mpi library, which means that the same code runs on the slaves and on the root, with very few if statements which discriminate between behavior on the root and on non-root nodes (the root has rank==0). For example, the gather commands from boost mpi trigger sending data on the slaves and receiving data on the root. Doing so maximizes the code which is shared, as there is essentially only a single version of the code which runs on the master and on the slaves.

The listen operation makes the slaves wait for commands. To me that is not really a blocking operation; the slaves just need to sit there and wait for whatever work the master sends. The design permits sending any command you like, i.e. mpi_parallel_call is just one of these, but given this design we can easily add other commands at a later point.
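A rough, self-contained sketch of this listen/command cycle and the shared root/slave code path (it uses plain integer command ids instead of the serialized mpi_command objects on the branch, and boost::mpi::gather rather than gatherv, purely for illustration):

#include <boost/mpi.hpp>
#include <iostream>
#include <vector>

enum command_id { CMD_STOP = 0, CMD_WORK = 1 };

// The "command" body: the same function runs on root and slaves; only the
// gather discriminates between them (send on slaves, receive on rank 0).
void run_work(const boost::mpi::communicator& world) {
  double local_result = 10.0 * world.rank();  // stand-in for map_rect_reduce
  if (world.rank() == 0) {
    std::vector<double> all_results;
    boost::mpi::gather(world, local_result, all_results, 0);
    std::cout << "gathered " << all_results.size() << " results" << std::endl;
    // ... the combine step would insert these into the AD graph here ...
  } else {
    boost::mpi::gather(world, local_result, 0);
  }
}

int main(int argc, char* argv[]) {
  boost::mpi::environment env(argc, argv);
  boost::mpi::communicator world;

  if (world.rank() != 0) {
    // Slaves: listen until told to stop; each broadcast hands control to
    // the command, which then drives all further communication itself.
    while (true) {
      int cmd;
      boost::mpi::broadcast(world, cmd, 0);
      if (cmd == CMD_STOP) break;
      if (cmd == CMD_WORK) run_work(world);
      // command finished: fall back into listen mode
    }
    return 0;
  }

  // Root: initiate a command, then run its root-side code.
  int cmd = CMD_WORK;
  boost::mpi::broadcast(world, cmd, 0);
  run_work(world);

  cmd = CMD_STOP;  // analogous to stop_listen when the cluster is torn down
  boost::mpi::broadcast(world, cmd, 0);
  return 0;
}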

One last point: currently the dispatching of the N jobs to the W workers is kept really simple in that the order of the jobs determines what goes where. So jobs 1...N/W are sent to rank 0, jobs N/W + 1 ... 2*N/W are sent to rank 1, and so on. Should it not be possible to split into equal chunks of N/W jobs, then the remainder is split evenly over ranks 1 to W-1 (so rank 0 usually has one job less than the others). As a consequence, the execution order should be chosen by the user in a way that splits things into equal-sized packages. This is really simple for us to implement and puts a bit of responsibility onto the user to make the best use of the facility. However, under the assumption that data sets come in random order, this allocation strategy should do fine (and we can tweak it later on).
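A small sketch of this chunking rule as I read it (illustrative, not the branch's code):

#include <iostream>
#include <vector>

// Number of jobs assigned to each rank: equal N/W blocks, with the remainder
// spread over ranks 1 .. W-1 so that rank 0 usually gets one job less.
std::vector<int> chunk_sizes(int num_jobs, int num_workers) {
  std::vector<int> chunks(num_workers, num_jobs / num_workers);
  int remainder = num_jobs % num_workers;
  for (int r = 1; r <= remainder; ++r)
    chunks[r] += 1;
  return chunks;
}

int main() {
  for (int c : chunk_sizes(11, 4))
    std::cout << c << " ";  // prints: 2 3 3 3
  std::cout << std::endl;
}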

Finally, another key concept in this MPI business is the idea of static data. The static data is distributed a single time only and then reused.

Let me know if you need more details.