Sure. The files in arr are the cluster-handling pieces, which do not have any dependency on Eigen; hence I placed them there. I just made my best guess at where they should go, but if you think the mpi_* files should go in a single directory, then we can do that.
To work with Stan, the map_rect functions are going to need to accept functors, not just template parameters. I'll build map_rect_serial in its own file. Right now, the serial version looks like it's implemented with parallelism. From the Stan language, we'll require a simple function (that is, not an lpdf/lpmf/lcdf/lccdf, not an rng, and not an lp modifier).
I'll create a very simple direct implementation of all of this. I seem to recall you mentioning something about wanting the autodiff stack to be the same each way. That seems like a bad way to implement the map non-serially, but it could probably be done. I'll do the simple version first and then we can think about the more complex one after you've implemented the parallel version, so I know what the target is going to be.
Reference to the messages: we should be converting everything to references going forward.
For the second, we do not want to be checking whether pointers are NULL everywhere; it's super error prone and also too verbose. We'll be moving everything to references.
This does not even try to get the exact same answer as the MPI version.
As far as functors go, this is a hard choice that can go two ways:
1. We figure out how to serialize proper functors. This would let us use lambdas and closures. Sounds like it's complicated.
2. We just pass type information and forego using lambdas in the code. This will work with what we have now, as our functors are stateless and have nullary constructors. It won't work with lambdas. I was really hoping we could move to lambdas so that we could pass system functions with implicit reference to parameter and data variables in scope (rather than having to pack and unpack).
The implementation on the branch doesn't match the design document:
The design doc passed in a functor (though somehow it got registered as a non-reference; it should be a constant reference, not a copy).
Your implementation of map_rect_serial would allow only a single value to be returned by each function call. However, the signature of the user function returns an Eigen column vector of arbitrary length. Each job is allowed to return an arbitrary number of elements in that vector, so the final output is a ragged vector. A very straightforward implementation of that is in map_rect.hpp on line 121, see here.
The functor is passed as type only at the moment, which we agreed on a while ago. Thinking about it, we should be able to support static functors efficiently, since we would only need to serialize them once. However, this would require us to make these functors serializable via boost. I doubt this will work with lambdas, and it will certainly require additional serialization code. Looks like a headache to me.
map_rect should compile without any MPI stuff if you do not define STAN_HAS_MPI. If it pulls in any MPI stuff when STAN_HAS_MPI is not defined, that is a mistake and should be easy to fix. The design has this specifically in mind to keep things separate.
The map_rect_serial implementation, the one here, is not parallelized, but it does work exactly the same way as the MPI implementation. I do not understand why it is a bad idea to do it like this. After all, this implementation cut a larger problem I gave it from 17h down to 10h on a single-core run. I explain the rough design in a nutshell below.
So the basic idea @seantalts and I developed was to split the map/reduce operation such that we essentially have two steps:
a reduce step: this functor is passed, as doubles only, all the things needed to calculate the results of a single job (shared & job-specific params / real data / int data). The function is expected to return all outputs as an Eigen Matrix in column-major order, so each column contains the result of the functor plus additional information such as the gradients of the functor wrt the operands which are vars.
a combine step: this functor is given all the computed results and is responsible for inserting them into the AD graph on the root.
As you see, the reduce step is fully ignorant of MPI. The combine step, however, can be split into two parts: the first collects all local results from the workers onto the root and requires MPI; the second is again independent of MPI.
This is the reason why map_rect_serial uses map_rect_reduce as its reduce step and map_rect_combine as its combine step; both are MPI free. On the other hand, map_rect_mpi uses map_rect_reduce (the same as before, no difference) and mpi_map_rect_combine. The mpi_map_rect_combine internally does the combine step by first collecting things at the root and then just using map_rect_combine.
You see, things are intended to be rather modular.
From the two points you put down: let me first ensure that map_rect_serial builds without any MPI stuff. While I have tried to respect the includes, you can probably do this part better than I did.
I hope that makes sense. Let me know if things are unclear.
I am not sure where you have been seeing a problem. I was able to compile & run test/unit/math/prim/mat/functor/map_rect_test.cpp without STAN_HAS_MPI defined. I added a test/unit/math/rev/mat/functor/map_rect_test.cpp to verify the same thing there. So I think the code is MPI free whenever you do not define STAN_HAS_MPI, but I could have missed something.
Here again, quickly checking things, I think I did the right thing: whenever I am in prim, I only ever include things from prim and nothing else. I thought that was the right way, no? Or have I missed something? What I haven't done yet is run make test-headers on the new code, as the pace of rewriting things was too fast for me in the past; I hope this settles down now.
That won't work with Stan: we don't have a ragged vector type yet.
That'll cover our current functions. It won't allow lambdas, though, which produce what is essentially a functor.
Great. I must have misunderstood what I was reading.
That sounds good to me.
Itās great you coded it that way.
OK, I think that answers my question above, which I'll pull down here just for confirmation.
By "works exactly the same way", do you mean it depends on MPI and just runs multiple processes on one core? Or do you just mean that it reduces the autodiff tree by doing nested autodiff and producing smaller expression graphs overall? If it's the latter, that's fantastic. Even if it's the former, that's good to know, and it would be good to know how it scales. 70% faster code is fantastic and it'd be nice to exploit this trick elsewhere.
Yes, that looks right. I was confused about where the file I was looking at lives. It's in mat, where it should be.
I was confused by the MPI includes, but now I see what's going on with the ifdefs. I always find it hard to keep in mind that not everything in a file will make it past the preprocessor.
While there is no ragged vector type yet, we can still return a ragged array in the sense that the output length of each job can differ and we just paste everything together into one big output vector. Since the user must know the output sizes himself, he will be able to make sense of the large vector being returned. I favored making this behavior explicit in the argument list by including an additional int array with the output count per job as an argument to map_rect. @seantalts felt very strongly about not doing so, since we can figure out the output sizes as we execute things; I think he has a good point about not over-burdening the user, and I was able to figure out a solution for this (since not knowing output sizes makes things a bit more tricky).
I mean that my map_rect_serial version performs nested autodiff, which makes the global expression graph smaller, and this type of approach allows reusing the smaller nested graphs where possible (if you have many outputs from a given job). That 70% is really cool, but I am not sure yet whether this generalizes to other models as well; I would hope so.
I have dumped the boost things into the feature/concept-mpi-2 branch such that a make stan-mpi builds whatever is needed for boost. Only the MPI installation (openmpi or mpich) needs to be on the local system. See MPI_NOTES for some details.
I also aligned the cmdstan branch feature/proto-mpi to work easily with this simplified setup. However, I had to hard-code Mac-specific instructions into make/models to ensure that the dynamic boost libraries can be found. I tried to get around this using rpath linker options, but I failed... maybe someone else has experience with this?
In short: the boost build is now bundled in the stan-math branch. It does work nicely, but I have no clue how we can make the linker link against the dynamic libraries with absolute paths (I never got static linking working with mpi/serialization). The current makefile additions are very basic and probably need refinement.
Finally, I also addressed recursive calls to map_rect, which so far would have led to program termination. So when we have nested map_rect calls, only the outer map_rect call will use MPI; the inner map_rect will just use a serial version of map_rect. I am not sure how I can test this behavior adequately with google test...
Since the MPI code is fairly mature (from my view), I applied it to one of our bigger models. The runtime on a single core with Stan is 7h-9h (chain variation), and on 20 cores the execution time goes down to 35-45 minutes. So about 12x-14x faster on 20 cores! The problem uses analytic approximations to an ODE problem and includes >1000 subjects. Needless to say, the results are exactly the same.
This is a huge step forward, as it changes the usual model development iteration from a once-per-day process to a 3-4 models per day thing.
Now, I have to say that coding this up is not super nice, as you have to force all the data into rectangular structures and then be very careful when you access the data to avoid it getting cast to var internally. But my hope would be to fine-tune some Stan utility functions which make data conversion and extraction easier.
Overall this is really nice. No more worries about compute-intensive models, as long as you can throw hardware at them.
I'm actually working on the build stuff for both this and GPU. I have two goals:
make it easy to configure and use MPI and GPU
make sure it doesn't break for everyone else.
It's taking a while because there's been feature creep in the makefile over the years, and in order to make sure users on all platforms can keep working without breakage, I'm having to rebuild the makefiles.
For anyone else that's interested in the build process, we can start a new, dedicated thread.
I'm paying the price of being late by going through this long thread.
Thanks to @wds15 I'm also going through his branch. Here's my understanding of how the slaves and master step through the communication:
Slaves
- cluster listen
- receive mpi_command through broadcast
- construct slave version of mpi_parallel_call
- reduce
- gatherv
- cluster stop_listen

Master
- map_rect_reduce
- mpi_parallel_call
- send mpi_command through broadcast
- reduce
- gatherv
Am I getting this right? Also, the reduce uses ReduceF followed by CombineF, so it's like it contains both the map and fold functionals. I was misled by the name for quite a while. What's the logic behind this naming?
Another question is about the setting of listen(). To me it seems like a blocking operation: the slaves stay in listen mode until the cluster goes out of scope. Does this imply a slave has to wait till all the others finish before it can stop listening?
Re: names - I think those are maybe my fault. They follow Clojure's reducers naming conventions. I think every language uses something slightly different, and many of them don't have this separate combination step. Here's what they mean:
ReduceF is your standard reduction operator (from e.g. fold in Haskell) with Haskell style signature accumulator -> element -> accumulator. So reduce gives ReduceF an accumulator and an element from the collection to be reduced over and it returns an accumulator. Map can be implemented on top of this where the accumulator is a list and ReduceF is append.
CombineF is the final aggregation step, combining the results of all of the reduce calls on each node. If we were only allowing reduce with an associative ReduceF, we could just use ReduceF here. An example of an associative ReduceF is +, used as in sum = reduce(+, 0, [1, 2, 3, 4]). But you can't implement map on top of a reduce restricted in that way, because list append is not associative. Versions of fold that aren't very parallelism-aware seem to skip this step, I think, but it's important when you're thinking about how the work is going to get farmed out across many machines and then recombined (unless you do what many frameworks do and limit the accumulator types to something with an obvious CombineF step, like list concatenation for lists or dictionary merging for dictionaries, as in Google's MapReduce framework).
What names would you suggest? It's pretty quick to change them now, before the PR goes in.
A few details on the master/slave questions you had: what I have essentially recreated is a thread-pool type of implementation (without knowing it). The idea is that the cluster goes into a listen mode, which means the slaves are waiting for command broadcasts which encapsulate code to be executed on the slaves. The master initiates these commands, and once a command is initiated the cluster is locked, in the sense that all resources are then dedicated to the command being executed. Hence, once the command is started, all following communication must be initiated and controlled by the command which is being run. Once the command finishes, the cluster automatically returns to the listen state.
The mpi_parallel_call has two entry points: one for the root and one for the slaves. Aside from the different entry points, the code has been written along the lines of the boost mpi library, which means that the same code runs on the slaves and the root, with only a very few if statements discriminating behavior between the root and non-root nodes (the root has rank==0). For example, the gather commands from boost mpi trigger sending data on the slaves and receiving data on the root. Doing so maximizes shared code, as there is essentially only a single version of the code which runs on the master and on the slaves.
The listen operation makes the slaves wait for commands. To me that is not really a blocking operation; the slaves just sit there and wait for whatever work the master sends. The design permits sending any command you like, i.e. mpi_parallel_call is just one such command, and with this design we can easily add other commands at a later point.
One last point: currently the dispatching of the N jobs to the W workers is kept really simple, in that the order of the jobs determines what goes where. So jobs 1...N/W are sent to rank 0, jobs N/W + 1 ... 2*N/W are sent to rank 1, and so on. Should it not be possible to split into equal chunks of N/W jobs, the remainder is split evenly over ranks 1 to W-1 (so rank 0 should usually have one job less than the others). As a consequence, the execution order should be chosen by the user such that things split into equal-sized packages. This is really simple for us to implement and puts a bit of responsibility onto the user to make best use of the facility. However, under the assumption that data sets come in at random order, this allocation strategy should do fine (and we can tweak it later on).
Finally, another key concept in this MPI business is the idea of static data. The static data is distributed a single time only and then reused.