MPI Design Discussion

I am not sure where you have been seeing a problem. I was able to compile & run test/unit/math/prim/mat/functor/map_rect_test.cpp without STAN_HAS_MPI being defined. I added a test/unit/math/rev/mat/functor/map_rect_test.cpp to verify the same thing there. So I think the code is MPI-free whenever you do not define STAN_HAS_MPI, but I could have missed something.

Here again, quickly checking things, I think I have done the right thing. Whenever I am in prim, I only ever include things from prim and nothing else. I thought that was the right way, no? Or have I missed something? What I haven’t done yet is run make test-headers on the new code, as the pace of rewriting things was too fast for me in the past; I hope this settles now.

That won’t work with Stan—we don’t have a ragged vector type yet.

That’ll cover our current functions. It won’t allow lambdas, though, which produce what is essentially a functor.

Great. I must have misunderstood what I was reading.

That sounds good to me.

It’s great you coded it that way.

OK, I think that answers my question above, which I’ll pull down here just for confirmation.

By “works exactly the same way”, does it depend on MPI and just run multiple processes on one core? Or do you just mean that it reduces the autodiff tree by doing nested autodiff and producing smaller expression graphs overall? If it’s the latter, that’s fantastic. Even if it’s the former, that’s good to know and would be good to know how it scales. 70% faster code is fantastic and it’d be nice to exploit this trick elsewhere.

Yes, that looks right. I was confused about where the file was that I was looking at. It’s in mat, where it should be.

I was confused by the MPI includes, but now I see what’s going on with the ifdefs. I always find it tricky to keep in mind that not everything in a file survives the preprocessor.

While there is no ragged vector type yet, we can still return a ragged array in the sense that the output length from each job can differ and we just paste everything together into one big output vector. Since users must know the output sizes themselves, they will be able to make sense of the large vector being returned. I favored making this behavior explicit in the argument list by including an additional int array with the output count per job as an argument to map_rect. @seantalts felt very strongly about not doing so, since we can figure out the output sizes as we execute things - and I think he has a good point about not over-burdening the user; I was able to figure out a solution for this (not knowing the output sizes makes things a bit more tricky).

I mean that my map_rect_serial version performs nested autodiff, which makes the global expression graph smaller, and this approach allows the smaller nested graphs to be reused when possible (if you have many outputs from a given job). That 70% is really cool, but I am not sure yet whether it generalizes to other models as well - I would hope so.
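In case it helps, here is roughly the pattern I mean as a sketch, not the actual map_rect_serial code (the job function below is a made-up placeholder; start_nested, recover_memory_nested, and precomputed_gradients are Stan Math’s nested-autodiff facilities):

#include <stan/math/rev/mat.hpp>
#include <vector>

using stan::math::var;

// Sketch: evaluate one job inside a nested autodiff sweep so that only a
// single precomputed_gradients node enters the global expression graph.
var one_job(const std::vector<var>& theta) {
  stan::math::start_nested();
  try {
    // fresh leaves inside the nested sweep
    std::vector<var> theta_local(theta.size());
    for (size_t i = 0; i < theta.size(); ++i)
      theta_local[i] = theta[i].val();

    var f = theta_local[0] * theta_local[0];  // placeholder job function

    f.grad();  // small, job-local reverse sweep
    double val = f.val();
    std::vector<double> grads(theta.size());
    for (size_t i = 0; i < theta.size(); ++i)
      grads[i] = theta_local[i].adj();

    stan::math::recover_memory_nested();
    // one flat node in the outer graph instead of the whole job graph:
    return stan::math::precomputed_gradients(val, theta, grads);
  } catch (const std::exception& e) {
    stan::math::recover_memory_nested();
    throw;
  }
}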

Some progress:

  • I have dumped the boost pieces into the feature/concept-mpi-2 branch such that a make stan-mpi builds whatever is needed for boost. Only an MPI installation (OpenMPI or MPICH) needs to be on the local system. See MPI_NOTES for some details.

  • I also aligned the cmdstan branch feature/proto-mpi to work easily with this simplified setup. However, I had to hard-code Mac-specific instructions into make/models to ensure that the dynamic boost libraries can be found. I tried to get around this using rpath linker options, but I failed … maybe someone else has experience with this?

In short: the boost build is now bundled in the stan-math branch. It works nicely, but I have no clue how we can make the linker link against the dynamic libraries with absolute paths (I never got static linking working with mpi/serialization). The current makefile additions are very basic and probably need refinement.
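In case someone wants to pick up the rpath issue: the standard trick I am aware of is to embed the library directory into the binary with an rpath linker flag, along these lines (the variable name and path below are placeholders, not what the branch actually uses):

# hypothetical makefile fragment; BOOST_LIB is a placeholder for wherever
# the bundled boost libraries get built
BOOST_LIB ?= lib/boost/stage/lib
LDLIBS  += -L$(BOOST_LIB) -lboost_mpi -lboost_serialization
LDFLAGS += -Wl,-rpath,$(abspath $(BOOST_LIB))
# note: on macOS this only helps if the dylibs' install names start with
# @rpath; otherwise they need fixing with install_name_tool first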

Finally, I also addressed recursive calls to map_rect, which until now would have led to program termination. So when we have nested map_rect calls, only the outer map_rect call will use MPI; the inner map_rect will just use a serial version of map_rect. I am not sure how I can test this behavior adequately with google test…
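Conceptually the dispatch is something like the following guard (purely illustrative; the names and the actual mechanism in the branch differ, and a real implementation would reset the flag exception-safely):

#include <vector>

// Illustrative only: nested calls go to the serial implementation because
// the MPI cluster is already owned by the outer call.
static bool cluster_busy = false;

std::vector<double> map_rect_mpi(const std::vector<double>& job_params);
std::vector<double> map_rect_serial(const std::vector<double>& job_params);

std::vector<double> map_rect(const std::vector<double>& job_params) {
  if (cluster_busy)
    return map_rect_serial(job_params);  // inner call: stay serial
  cluster_busy = true;
  std::vector<double> out = map_rect_mpi(job_params);  // outer call: MPI
  cluster_busy = false;
  return out;
}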

@seantalts — want to check the build stuff for this?

Since the MPI code is fairly mature (from my view), I have now applied it to one of our bigger models. The runtime on a single core with Stan is 7h-9h (chain variation), and on 20 cores the execution time goes down to 35-45 minutes. So about 12x-14x faster on 20 cores! The problem uses analytic approximations to an ODE problem and includes >1000 subjects. Needless to say, the results are exactly the same.

This is a huge step forward, as it changes the usual model development iteration process from once per day to 3-4 models per day.

Now, I have to say that coding this up is not super nice, as you have to force all the data into rectangular structures and then be very careful when you access the data to avoid it getting cast to var internally. But my hope would be to fine-tune some Stan utility functions which make data conversion and extraction easier.
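For example, the kind of helper I have in mind would pad ragged per-subject data into rectangular containers, roughly like this hypothetical sketch (in practice these would be Stan utility functions, not C++):

#include <algorithm>
#include <vector>

// Hypothetical helper: pad ragged rows to a common length so the data
// fits the rectangular shape map_rect expects.
std::vector<std::vector<double> > pad_ragged(
    const std::vector<std::vector<double> >& rows, double pad_value) {
  size_t max_len = 0;
  for (size_t i = 0; i < rows.size(); ++i)
    max_len = std::max(max_len, rows[i].size());
  std::vector<std::vector<double> > out(rows.size());
  for (size_t i = 0; i < rows.size(); ++i) {
    out[i] = rows[i];
    out[i].resize(max_len, pad_value);  // fill the tail with pad_value
  }
  return out;
}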

Overall this is really nice. No more worries about compute-intensive models, as long as you can throw hardware at them.


I’m actually working on the build stuff for both this and GPU. I have two goals:

  1. make it easy to configure and use MPI and GPU
  2. make sure it doesn’t break for everyone else.

It’s taking a while because there’s been feature creep in the makefiles over the years, and to make sure users on all platforms can keep working without breakage, I’m having to rebuild the makefiles.

For anyone else that’s interested in the build process, we can start a new, dedicated thread.

I’m paying the price of being late by going through this long thread.

Thanks to @wds15 I’m also going through his branch. Here’s my understanding of how slaves and master step through the communication:

  • Slaves
    • cluster listen
    • receive mpi_command through broadcast
    • construct slave version of mpi_parallel_call
    • reduce
    • gatherv
    • cluster stop_listen
  • Master
    • map_rect_reduce
    • mpi_parallel_call
    • send mpi_command through broadcast
    • reduce
    • gatherv

Am I getting this right? Also, the reduce uses ReduceF followed by CombineF, so it’s like it contains both a map and a fold functional. I was misled by the name for quite a while. What’s the logic behind this naming?

Another question is about the setting of listen(). To me it seems like a blocking operation: the slaves stay in listen mode until the cluster goes out of scope. Does this imply a slave has to wait till all the others finish before it can stop listening?


Re: names - I think those are maybe my fault. They follow Clojure’s reducers naming conventions. I think every language uses something slightly different, and many of them don’t have this separate combination step. Here’s what they mean:

ReduceF is your standard reduction operator (e.g. from fold in Haskell) with the Haskell-style signature accumulator -> element -> accumulator. So reduce gives ReduceF an accumulator and an element from the collection being reduced over, and it returns a new accumulator. Map can be implemented on top of this where the accumulator is a list and ReduceF is append.

CombineF is the final aggregation step, combining the results of all of the reduce calls on each node. If we only allowed reduce with an associative ReduceF, we could just use ReduceF here. An example of an associative ReduceF is +, used something like sum = reduce(+, 0, [1, 2, 3, 4]). But you can’t implement map on top of a reduce restricted in that way, because appending an element to a list is not an associative binary operation (the accumulator and element types differ). Versions of fold that aren’t very parallelism-aware seem to skip this step, I think, but it’s important when you’re thinking about how the work is going to get farmed across many machines and then recombined (unless you do what many frameworks do and limit the accumulator types to something with an obvious CombineF step, like list concatenation for lists or dictionary merging for dictionaries in Google’s MapReduce framework).
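As a toy illustration of the two roles (made-up signatures, not the branch’s actual ones): ReduceF folds one element into a per-node accumulator, CombineF merges the per-node accumulators on the root, and with append/concat you recover map.

#include <vector>

// ReduceF for "map": accumulator -> element -> accumulator
std::vector<double> append(std::vector<double> acc, double x) {
  acc.push_back(x);
  return acc;
}

// CombineF for "map": merge per-node accumulators in node order
std::vector<double> concat(std::vector<double> a,
                           const std::vector<double>& b) {
  a.insert(a.end(), b.begin(), b.end());
  return a;
}

// per node k:   acc_k = fold(append, {}, chunk_k)
// on the root:  result = concat(concat(acc_0, acc_1), acc_2), ...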

What names would you suggest? It’s pretty quick to change them now before the PR goes in.

Thanks. Your link really clears things up, as I’m not familiar with Clojure lingo. IMO there’s no need for renaming.

A few details on the master/slave questions you had: what I have essentially recreated is a thread-pool type of implementation (without knowing it). The idea is that the cluster goes into a listen mode, which means the slaves wait for command broadcasts which encapsulate code to be executed on the slaves. The master initiates these commands, and once a command is initiated the cluster is locked in the sense that all resources are then dedicated to the command being executed. Hence, once the command is started, all following communication must be initiated and controlled by the command which is being run. Once the command finishes, the cluster automatically returns to the listen state.

The mpi_parallel_call has two entry points: one for the root and one for the slaves. Besides the different entry points, the code has been written along the lines of the boost mpi library, which means the same code runs on the slaves and the root; there are only very few if statements which discriminate behavior between the root (rank==0) and non-root nodes. For example, the gather commands from boost mpi trigger sending data on the slaves and receiving data on the root. Doing so maximizes shared code, as there is essentially only a single version of the code which runs on the master and on the slaves.
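A minimal sketch of that shared-code idea (hypothetical names; the actual mpi_parallel_call is more involved): every rank issues the same gather call, and boost mpi turns it into a send on the slaves and a receive on the root.

#include <boost/mpi.hpp>
#include <vector>

double local_work(int rank);  // assumed per-rank computation

void run(const boost::mpi::communicator& world) {
  double local_result = local_work(world.rank());
  std::vector<double> all_results;  // only filled on rank 0
  // identical call on every rank: sends on the slaves, receives on the root
  boost::mpi::gather(world, local_result, all_results, 0);
  if (world.rank() == 0) {
    // root-only branch, e.g. insert all_results into the AD stack
  }
}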

The listen operation makes the slaves wait for commands. To me that is not really a blocking operation - the slaves just sit there and wait for whatever work the master sends. The design permits sending any command you like, i.e. mpi_parallel_call is just one of them; given this design, we can easily add other commands at a later point.

One last point: currently the dispatching of the N jobs to the W workers is kept really simple in that the order of the jobs determines what goes where. So jobs 1…N/W are sent to rank 0, jobs N/W + 1 … 2*N/W are sent to rank 1, and so on. Should it not be possible to split into equal chunks of N/W jobs, the remainder is split evenly over ranks 1 to W-1 (so rank 0 usually has one job fewer than the others). As a consequence, the execution order should be chosen by the user such that we split things into equal-sized packages. This is really simple for us to implement and puts a bit of responsibility onto the user to make best use of the facility. However, under the assumption that data sets come in random order, this allocation strategy should do fine (and we can tweak it later).
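My reading of that allocation, as a small runnable sketch (not the actual code):

#include <vector>

// N jobs over W workers: base chunks of N/W; the remainder is spread over
// ranks 1..W-1 so that rank 0 (the root) usually gets one job fewer.
std::vector<int> chunk_sizes(int num_jobs, int num_workers) {
  std::vector<int> sizes(num_workers, num_jobs / num_workers);
  int remainder = num_jobs % num_workers;
  for (int rank = num_workers - 1; rank > 0 && remainder > 0; --rank) {
    ++sizes[rank];
    --remainder;
  }
  return sizes;
}

// e.g. chunk_sizes(10, 4) yields {2, 2, 3, 3}: rank 0 gets jobs 1-2,
// rank 1 gets jobs 3-4, rank 2 gets jobs 5-7, rank 3 gets jobs 8-10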

Finally, another key concept in this MPI business is the idea of static data. The static data is distributed a single time only and then reused.
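The caching idea in spirit (hypothetical sketch; the real implementation keys and stores things differently):

#include <map>
#include <vector>

// Hypothetical: each map_rect call site gets an id; its static data is
// transmitted and stored once, and all later calls hit the local cache.
static std::map<int, std::vector<double> > static_data_cache;

bool is_cached(int callsite_id) {
  return static_data_cache.count(callsite_id) > 0;
}

void cache_static_data(int callsite_id, const std::vector<double>& x) {
  static_data_cache[callsite_id] = x;  // first (and only) transmission
}

const std::vector<double>& get_static_data(int callsite_id) {
  return static_data_cache.at(callsite_id);  // reuse on every later call
}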

Let me know if you need more details.

But the master has the blocking gatherv in its reduce() method. Unless the master finishes gathering from every slave, it won’t be able to dispatch the next job.

Yes, that’s correct. Now, bear in mind that the way I am distributing the job chunks to the slaves leads to roughly the same execution time for each slave. Recall that I am always sending large job chunks to each slave. Therefore, each slave will have its result block ready at about the same time as every other slave. The master must wait for all results to return in order to insert them into the AD tree at once. Remember that we can only operate serially on the master, since the AD stack is not thread-safe. Thus, I don’t think we can do any asynchronous magic on the master.

In short, I distribute all N jobs to the cluster at once and then collect all the results.

Also: I have benchmarked the code using ODE problems and analytically solvable problems. For ODE problems the speedup is essentially linear in the number of cores, and for analytic problems I got (for the model I tested) ~13x speedup when using 20 cores, if I remember right. Getting linear speedup in the ODE case is to be expected, but 13x on 20 cores is not so bad after all, I think.

What I want to say is that the speedups are already nice. I have never programmed this type of stuff before and I am very happy with the performance. That said, I am sure one can do this even more elegantly and cleverly. Right now I am expecting the user to supply the jobs in an order which leads to good utilization, and I think this will be the case for many problems anyway without the user doing anything special.

So we are assuming sequential processing on the master side, no load-balancing concern (or we leave it to users), and the need for a slave-side listening state. In that case, would a simple MPI skeleton like the one below be easier to reason about? (I’m using raw MPI, not boost::mpi, but the idea is the same; I’m also skipping data scattering as well as serialization & deserialization.)

Also, do we need an mpi namespace within math?

#include <mpi.h>

#define MPI_WORK_TAG     1
#define MPI_EXIT_TAG     2

void master(void);
void slave(void);

int main(int argc, char** argv)
{
  int rank;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  (rank == 0) ? master() : slave();
  MPI_Finalize();
  return 0;
}

void master(void)
{
  int nproc, rank, data;
  double result;
  MPI_Status status;
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);

  {
    /* first job: send work to every slave, then collect all results */
    for (rank = 1; rank < nproc; ++rank) {
      data = 1;
      MPI_Send(&data, 1, MPI_INT, rank, MPI_WORK_TAG, MPI_COMM_WORLD);
    }
    for (rank = 1; rank < nproc; ++rank) {
      MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
               MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }
  }

  {
    /* second job */
    for (rank = 1; rank < nproc; ++rank) {
      data = 2;
      MPI_Send(&data, 1, MPI_INT, rank, MPI_WORK_TAG, MPI_COMM_WORLD);
    }
    for (rank = 1; rank < nproc; ++rank) {
      MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
               MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }
  }

  {
    /* next job ... */
  }

  /* exit tag ends each slave's listening loop */
  for (rank = 1; rank < nproc; ++rank) {
    MPI_Send(0, 0, MPI_INT, rank, MPI_EXIT_TAG, MPI_COMM_WORLD);
  }
}

void slave(void)
{
  double result;
  int data;
  MPI_Status status;
  while (1) {
    /* "listening state": block until the master sends work or exit */
    MPI_Recv(&data, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

    if (status.MPI_TAG == MPI_EXIT_TAG)
      break;

    result = (double)data * (double)data; /* hard work */

    MPI_Send(&result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  }
}

The intention is to put the MPI calls at the same level, and to use tags to flag the listening state. Will this design achieve the goal? If not, what am I missing here?

EDIT: typo

I am not sure I follow on this one. What do you mean here, and which problem does this help us with?

Again… what is the benefit of tags here? In the current implementation I am essentially using C++ types to tell the system what to do. The types are meaningful to our C++ system, and the translation to what they trigger is handled by the boost libraries automatically. All I need to do is register the types using macros. So there is also a define, but managing these lookup tables is done by boost.
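To illustrate the pattern (a sketch with made-up names, not the branch’s actual base class or macro): a command is a serializable polymorphic type, and a single macro call registers it so boost serialization can reconstruct it on the receiving side without any hand-maintained tag table.

#include <boost/archive/text_iarchive.hpp>
#include <boost/archive/text_oarchive.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/export.hpp>

// made-up abstract command; the slaves execute run() after deserializing
struct mpi_command {
  virtual ~mpi_command() {}
  virtual void run() const = 0;
  template <class Archive>
  void serialize(Archive& /* ar */, const unsigned int /* version */) {}
};

struct parallel_call_command : public mpi_command {
  void run() const { /* kick off the parallel map on this node */ }
  template <class Archive>
  void serialize(Archive& ar, const unsigned int /* version */) {
    ar & boost::serialization::base_object<mpi_command>(*this);
  }
};

// the only "registration" needed; boost maintains the lookup table
BOOST_CLASS_EXPORT(parallel_call_command)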

So where does the current system have shortcomings, and how do you address them?

The design elements of what we have are, from my view:

  • mpi_cluster

    • provides management of the cluster resource
    • allows execution of any function object you like (so we can easily introduce other mpi facilities besides mpi_parallel_call)
    • no need to set up lookup tables (like tag defines); these are handled for us by boost serialization + boost mpi.
  • mpi_parallel_call

    • can call any functor you like with parameters and data
    • supports the concept of static data, which ensures that unchanging data is only ever transmitted once and then cached locally for future reuse.
    • the parameters passed into the functors can be var or double (this changes the type of the functor), but reuse of static data is still guaranteed.
    • ensures that communication is done in big chunks which are fast to transmit.
  • map_rect_mpi

    • supports nested calls to the facility. Thus you can nest map_rect calls inside each other if you wish.
  • map_rect_serial

    • computes things in exactly the same way as the MPI version, so we get exactly reproducible results no matter which version you use.

All of the above is implemented with minimal impact on the overall stan-math architecture and minimal demands on the Stan parser.

I’m probably the least familiar with Stan in this thread and definitely know less than @wds15 about the current design. For problems like the ODEs reported here, IMO the current design suits perfectly. My main question is what future uses of MPI will emerge and how the current design would adapt to them.

For example, it seems that we are not concerned about load balancing. Will this change in the future? What if the number of tasks >> the number of slaves and we need to dispatch them in a round-robin fashion? Or, even further, what if the number of tasks is determined dynamically?

Also, does the current design suit the needs of @anon75146577 and @avehtari for their sparse matrix project? (Or do we even care?) For example, parallel matrix operations often require inter-slave communication; how do we do that in the current setup?

Like I said, I don’t know if we have those concerns. If yes, I’d suggest keeping things simple, by exposing MPI structures in master and slave in common SPMD fashion. If not, then all is great, IMO.

So far the MPI specifics have only ever been designed by myself. The mpi_cluster concept is a very lightweight and yet general implementation, as it allows you to execute whatever you like on the cluster in a way which integrates nicely with C++ idioms.

I think the biggest concern we have is to get this branch into Stan ASAP, since it performs darn well already. However, you raise valid points - I have chosen the most simplistic scheduling you can do, and more sophistication may speed things up further. Let me explain once more: if I have 1000 jobs and 10 cores, then I slice the work into 10 packets, each of size 100. Hence all workers receive an equal amount of work. The rationale is that each work packet should have the same computational cost on average. Hence, this split should work OK, and another upside of this approach is that I can transmit all parameters block-wise a single time.

In the current setup you would create a command which is initiated once the sparse matrix stuff starts to execute. Then you have full control over the cluster and can do whatever you like. You would use the full cluster to compute the operation itself and the gradients, store things on the AD stack, and then execution continues.

Ok, I have already shown the good performance of MPI a few times, but so far mostly with synthetic examples. I just compiled a realistic example:

  • Hierarchical ODE based model
  • 1300 subjects
  • real world data set

The run on a single core takes 2.6 days to finish. I set things up such that the 10 & 15 core runs were on a single machine, while the 20, 40 & 80 core runs were distributed in blocks of 10 (so 2, 4, and 8 machines) onto the cluster, which is networked using InfiniBand. The key question to me was how well the performance scales, and the result is stunning: the 64h on a single core go down to about 1h, which is a 62x speedup, but look for yourself.

… and all results match exactly.


My biggest concern now is to get the interfaces and installs sorted. As long as the interface to users (here map_rect) is OK, we can change the infrastructure later. I think we’re in good shape on the interface now, as it’s simple.

We’ll probably want to move to some kind of more mature queue-based load balancing. But then there’s the issue of getting the data transported. I’m not an expert, but I believe there’ll be a lot of work to do here.

This is going to be a matter of interleaving management of implicit operations called by matrix operations and user-defined parallelism.

I don’t think we can anticipate what they’re going to need yet. There are GPUs (probably not much use for sparse matrices), multi-process and multi-threading as contenders.

:-)