MPI Design Discussion

Ah, sorry - was talking about using reduce as a backbone for map_rect - was leaving the _lpdf discussion for later.

(You can implement map on top of reduce with something like std::vector<var>.push_back as the binary operation and an empty vector as the initializer. You could also create a custom binary operation that takes care of things like nesting the autodiff, etc.)
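In ordinary C++ terms, ignoring autodiff and MPI for a moment, what I mean is just a fold whose accumulator is a vector. A toy sketch (not proposed code, just the shape of the idea):

    #include <numeric>
    #include <vector>

    // Toy sketch: "map" f over xs by folding with push_back as the binary
    // operation and an empty vector as the initializer. Autodiff nesting,
    // MPI, etc. are ignored here.
    template <typename F>
    std::vector<double> map_via_reduce(const F& f, const std::vector<double>& xs) {
      return std::accumulate(xs.begin(), xs.end(), std::vector<double>{},
                             [&](std::vector<double> acc, double x) {
                               acc.push_back(f(x));
                               return acc;
                             });
    }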

I might try out a simple example if I can get it to compile on my machine… I'm mostly worried about the data being available globally on each node. It would be a huge win if we could avoid writing our own code for mapping, data distribution, etc. and instead rely on a higher-level primitive like this.

Well, I am curious about your results. However, I have to say that I went in completely the opposite direction: I initially used some higher-level functionality of boost::mpi which relied on serializing structs. Once I figured out that one can avoid all that C++ abstraction by using the right boost::mpi calls, which map straight onto the C implementations, I tried that and got much better performance. So in essence I would recommend the following:

  • use scatterv to transfer chunks from the root to the children
  • use gatherv for the reverse process
  • use broadcast for the rest (rough sketch below)
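As a rough sketch of that pattern (names and the exact gatherv overloads should be checked against the Boost.MPI docs; this is not the branch code):

    #include <boost/mpi.hpp>
    #include <vector>

    // Sketch: scatter chunks of one flat double array from the root to the
    // children, do the per-chunk work locally, then gather the flat results
    // back on the root. These calls map straight onto MPI_Scatterv/MPI_Gatherv.
    void scatter_work_gather(boost::mpi::communicator& world,
                             const std::vector<double>& flat_input,  // significant on the root only
                             const std::vector<int>& chunk_sizes,    // number of doubles per child
                             std::vector<double>& flat_output) {     // filled on the root only
      std::vector<double> local(chunk_sizes[world.rank()]);
      boost::mpi::scatterv(world, flat_input.data(), chunk_sizes, local.data(), 0);

      // ... per-chunk computation writes its results back into local ...

      if (world.rank() == 0) {
        flat_output.resize(flat_input.size());
        boost::mpi::gatherv(world, local.data(), static_cast<int>(local.size()),
                            flat_output.data(), chunk_sizes, 0);
      } else {
        boost::mpi::gatherv(world, local.data(), static_cast<int>(local.size()), 0);
      }
      // shared parameters / meta-info would go out once via boost::mpi::broadcast
    }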

Are you worried about the global data on the children because it is global, or is your concern that it's too much data?

Using a push_back operation as the "reduce" is creative! From my experience I am quite skeptical that we will get good performance out of such an approach, since it is not low-level, but I agree that it would probably be easier to code up and maintain, which needs to be balanced against the performance penalty. I would be surprised if there were no price to pay for the abstraction in this setting. Let's see…

Yeah, you're probably right about the performance, but I think it'd be good to get some timings to see how it affects things, for this and my own future reference.

So the worker nodes will start up without any data files or data available, is that right? I guess they're all running some separate main that just sets them up to receive commands and data, and then everything that isn't code has to be sent over the wire? That makes it a bit less likely that reduce will be within reasonable striking distance performance-wise, unless we do as you have already done and copy the data separately… I just got your branch compiling, so I'll play around with it a little and see what happens.

Okay, so I played around with MPI and reduce a bunch and boy oh boy, I did not realize how low-level this API was. For example, I thought reduce was a high-level function, and it sort of is, but it is constrained in that the type T of the items in the collection being reduced over must be the same type T that the reduction operation returns! So my idea about implementing map via reduce only works in other languages (where the type signature is (a -> b -> a) -> a -> [b] -> a, i.e. the accumulator is of a type unrelated to the list elements).
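To spell out the mismatch (roughly, from the Boost.MPI docs versus the fold signature I had in mind):

    // Boost.MPI (boost/mpi/collectives.hpp), roughly: the input elements and
    // the reduced result share the same type T.
    //
    //   template <typename T, typename Op>
    //   void reduce(const communicator& comm, const T& in_value, T& out_value,
    //               Op op, int root);
    //
    // The fold I was assuming, where the accumulator type a is unrelated to
    // the element type b:
    //
    //   reduce :: (a -> b -> a) -> a -> [b] -> a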

All this said, I do like the idea of making composable building blocks instead of big specific-purpose functions that get copy-pasted to make new functions that are only slightly different. I will try to make some we could use here…

I am not surprised by your finding. After all, MPI is a very old technology developed in the 90s for high-performance computing, where low-level approaches were very common.

I already tried to compose some building blocks for the MPI infrastructure (getting the workers to do what I want, plus the very basic scheduling). So you intend to break the distributed_map_rect struct (if I recall right) into smaller pieces which we can use more flexibly? Sounds good to me. However, having Bob in my ear saying "copy twice, then generalize" would justify a first version based on what is there now, I think… but maybe there is an obvious way to modularize.

That’s going to depend on the size of the parameters and bus arrangement or network arrangement on a multi-core setup.

I also didn’t see that.

How do we send data and dependencies with closures? This is the piece I don’t understand. Sebastian was suggesting that we somehow make a structure for the data and then retool functions to use that environment. I’d rather not do that unless absolutely necessary as it duplicates all of our maintenance for this one application. It’s important, but I’m very reluctant to add all the code duplication.

Won’t this also depend on the size of the Jacobians?

I didn’t intend to be an ear worm! Generally, you want to reduce copies, but overall, copying things is relatively cheap compared to everything else we do.

Well, the fact that MPI introduces separate processes is a blessing in that AD can run in parallel, but this makes things harder.

I think we will save ourselves a lot of trouble if we do not allow functions which are wrapped in closures to be given to the map_rect function. It's simply really hard to ensure proper serialization of these objects, let alone making this efficient performance-wise. Remember that MPI seems to be efficient only when copying array chunks; at least this is my experience.

I do not see this as a huge limitation, because we can still use closures - inside the mapped function.

Would that be a hard limitation?

To me that’s a fair restriction for getting the benefits of MPI.

Regarding the need for a _lpdf version: of course, my conclusion only holds for the example I looked into, which would be a very typical use case for my applications. Moreover, for higher-order AD, the benefits of a _lpdf version with shared parameters will be a lot larger, I think.

Re: Closures: Agreed we don’t want to get into anything resembling serialization of closures, sorry! So yeah, at least in something similar to the current API we wouldn’t want to pass in a closure and expect the data referenced inside to be serialized appropriately. Maybe there is some version where data is already in scope and on the machine and so closures are okay… But sorry, let’s ignore that for now.

Good.

Although, the closure thing will come back to us in a different way when we want to do the more user-friendly version we discussed above. Essentially, the immutable data, which is locally available on each child, needs to form a big enclosure for the functions to be called. This way we can call the enclosed functions so that we only pass in the parameter set of the given iteration, while the immutable data is taken from the enclosure.
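Something along these lines is what I have in mind (a sketch only; the actual signature of the mapped functor is still to be decided):

    #include <cstddef>
    #include <vector>

    // Sketch of the "enclosure": each child holds its chunk of the immutable
    // data locally; per iteration only the parameters are shipped and the
    // functor pulls the data out of its local enclosure.
    template <typename F>
    struct enclosed_fun {
      std::vector<std::vector<double> > x_r_;  // filled once when the data is distributed
      std::vector<std::vector<int> > x_i_;

      template <typename T>
      std::vector<T> operator()(std::size_t job, const std::vector<T>& theta) const {
        // only theta crosses the wire each iteration
        return F()(theta, x_r_[job], x_i_[job]);
      }
    };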


Okay, I think I understand a lot more now than when I started. Within this map_rect API, here’s what I might like to see in the design:

low-level independent functions:

  • flatten a vector<vector<T>>
  • unflatten a vector<vector<T>>
  • Send data to all nodes (maybe this is just broadcast) (not cached)
  • cached scatterv
  • maybe even cached scatterv for a doubly-nested vector (cached)

map_rect_mpi would look something like:

  1. broadcast shared params & data
  2. map across thetas
  3. var-related memory management (not sure where this goes exactly)

map looks like:

  1. reduce(std::vector.push_back composed with F, …)

Reduce looks like:

  1. distributes data to map over with cached scatterv
  2. Applies reduction op
  3. gathers results with gatherv
  4. Finish reductions by applying reduction op to gathered data on root node

The reduce functor for map_rect might involve separate entry and exit "context manager" style fixtures as independent functions or classes (rough sketch after this list):

  1. start nested & un-nest / cleanup autodiff
  2. perhaps the var memory stuff can go here?
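For (1), I'm imagining an RAII fixture along these lines (just a sketch, using stan::math::start_nested / recover_memory_nested; whether the var memory management from (2) also belongs here is still open):

    #include <stan/math/rev/core.hpp>

    // Sketch of the entry/exit fixture: start a nested autodiff scope on
    // construction and recover the nested memory on destruction, so each
    // per-job gradient evaluation cleans up after itself.
    struct nested_autodiff_fixture {
      nested_autodiff_fixture() { stan::math::start_nested(); }
      ~nested_autodiff_fixture() { stan::math::recover_memory_nested(); }
      nested_autodiff_fixture(const nested_autodiff_fixture&) = delete;
      nested_autodiff_fixture& operator=(const nested_autodiff_fixture&) = delete;
    };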

Does this make sense? I’m happy to provide more example code as well if that helps. I was originally starting to do some refactoring on my own as its own example but figured I should put this out here first.

As I understand it, you intend to put this into the map-reduce paradigm, which sounds like a good approach to me. The exact steps are not yet 100% clear to me, so some dummy design code would be great. This does not need to be fully working code, but rather spell out the functions and how they call each other, without an implementation.

The only thing which concerns me a little bit is that my code works block-wise, while yours looks like it is doing things at a more granular level, which may not be as fast. However, I may misunderstand your idea / my concerns may not be relevant / it may still be a good thing to do it your way, given that it's probably better at abstracting things.

Some comments:

  • We already have a flatten operation which is the to_array_1d function
  • We also have a to_array_2d function which currently only works for matrices, but we should probably just add another to_array_2d for arrays which is what we need here
  • I like the cached scatterv idea! This makes sense; I am curious how you will implement it. This was meant to be a cache for data, right (so T=double)? Would the cache then be for a vector<vector<double>> structure?

Other than that, I am also curious how you will refactor the MPI utilities like mpi_cluster and the mechanism I put forward to handle work distribution to the children. BTW, have you seen my proposal for how to lock the MPI cluster during execution of a map command by using a mutex? I think my code there should throw at runtime should you nest a map_rect_mpi call… but these things are secondary.

All in all, this looks like it is going in a great direction. Once we have this working… should we benchmark it with the examples I have already worked out? I think it would be good to ensure that the new design works as well as (or better than) what we have. Does that make sense?

Great to see this moving! I am planning to throw this technique ASAP onto one of our projects.

We can punt on the closure discussion. It’s beginning to look like closures will be too confusing and too incompatible with other parts of the way Stan’s called to be worthwhile.

I think what @wds15 is suggesting here is that we don’t use closures from C++ but define our own data-variable-only binding operator for MPI. Or maybe there’s a way to grab a C++11 closure and find out what it binds so we can send just that data?

Is there a way to avoid vector<vector<double>>? It's a very inefficient data structure compared to just MatrixXd. Eigen::Matrix and std::vector both manage memory through malloc/free in their constructors/destructors (standard RAII), so for vector<vector<T>>, every element does its own memory management and has constructors and destructors called, compared to doing it once for a Matrix. Eigen gives you views of underlying memory as matrices or vectors, which can save a lot of copying. We might also be able to use C++11 move semantics to avoid what would otherwise involve copying.
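For instance, something like this (a sketch with made-up names) gives each job a view into one contiguous buffer with no per-job allocation or copying:

    #include <Eigen/Dense>
    #include <vector>

    // Sketch: keep all real data for the local jobs in one contiguous buffer
    // and hand each job an Eigen::Map view of its slice -- no per-job vectors,
    // no copies.
    void use_views(const std::vector<double>& flat_x_r, int num_jobs, int X_r) {
      for (int job = 0; job < num_jobs; ++job) {
        Eigen::Map<const Eigen::VectorXd> x_r_job(flat_x_r.data() + job * X_r, X_r);
        // ... pass x_r_job (or x_r_job.data()) to the per-job computation ...
      }
    }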

I don’t understand why map is calling reduce.

I really like the reduce semantics here as it provides the right kind of algebraic structure.

One issue we've discussed is reproducibility with the same seed. If we allow asynchronous reduction, we won't maintain that unless we get fancier about how things get inserted back into memory (like preallocating for each result; we will know the size it needs for operands and partials going in, just not the value or partials).

I don’t think there’s much, if any, overhead in that organization and it provides the right kind of algebraic structure over the types.

Also it might be worth considering an Eigen::Tensor for contiguous storage with arbitrary dimensions. It is in the unsupported directory but it is implemented by Googlers and used in TensorFlow.

Slightly different from Google's MapReduce paradigm, which maps a function over key-value pairs and then reduces over key-value pairs and returns a new value for the key. My proposal is just using the more primitive map and reduce constructs from functional programming (with an added cache key or something like it). But similar enough - I wonder if the associative approach Google takes leads to easier caching…

Will do! This is pretty exciting stuff.

So I was thinking that we'd make them generic and thus allow the elements that map and reduce operate over to be themselves vectors, which might end up involving Boost's serialization library in a naive version - could that be fine for just the non-shared parameters, since there aren't that many of them? Can you describe in English the chunking you're using, what each chunk is, and what the reasoning is, again? I think I lost track of it, I'm sorry! I can take another look later too to try to reload it all into RAM, but I need to head out and wanted to get a reply out today.

This is true - I think what I meant to say was basically that you have these very similar chunks of code for x_r and x_i, like this:

        if(X_r > 0) {
          const std::vector<double> world_x_r = to_array_1d(x_r);
          const std::vector<int> chunks_x_r = mpi_map_chunks(N, X_r);
          std::vector<double> flat_x_r_local(chunks_x_r[R]);

          boost::mpi::scatterv(world, world_x_r.data(), chunks_x_r, flat_x_r_local.data(), 0);

          // now register data
          for(std::size_t i = 0; i != chunks[R]; ++i)
            data.x_r_[i] = std::vector<double>(flat_x_r_local.begin() + i * X_r, flat_x_r_local.begin() + (i+1) * X_r);

          //std::cout << "Job " << R << " got " << flat_x_r_local.size() << " real data " << std::endl;
        }

Seems like you could put this into a function templated on the type of data, and I was thinking of maybe wrapping up some of the flattening and unflattening as its own primitive, but maybe that's already broken down as much as it should be. This was intended to be something of a rough draft 😅

I was thinking of a generic cache that operates on a user-provided cache key (uid in your code). This implies some Boost serialization, I guess? Maybe we can write our own version that's specialized for std::vector (or Eigen vectors or whatever we end up using) and does this flattening and unflattening. I dislike the way we have to send metadata separately; my hope was that we can wrap that up in a single cached scatterv call that ends up giving each node back a de-flattened std::vector but under the hood does the flattening, sends the metadata required to unflatten, and then unflattens on each node.
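Roughly what I'm picturing, just as a sketch (all names made up; cache invalidation, the non-root call forms, and error handling are glossed over):

    #include <boost/mpi.hpp>
    #include <map>
    #include <vector>

    // Sketch of a "cached scatterv" for rectangular data: flatten on the root,
    // broadcast the metadata needed to unflatten (the row width), scatter the
    // flat chunks, and rebuild vector<vector<T>> locally. The static cache
    // keyed on uid means the data only crosses the wire once.
    template <typename T>
    const std::vector<std::vector<T> >&
    cached_scatterv(boost::mpi::communicator& world, int uid,
                    const std::vector<std::vector<T> >& data,   // significant on the root only
                    const std::vector<int>& rows_per_rank) {    // jobs per rank, known everywhere
      static std::map<int, std::vector<std::vector<T> > > cache;
      auto hit = cache.find(uid);
      if (hit != cache.end())
        return hit->second;  // already distributed under this uid

      // metadata: width of each row (data assumed rectangular)
      int width = (world.rank() == 0 && !data.empty()) ? static_cast<int>(data[0].size()) : 0;
      boost::mpi::broadcast(world, width, 0);

      // flatten on the root
      std::vector<T> flat;
      if (world.rank() == 0)
        for (std::size_t i = 0; i < data.size(); ++i)
          flat.insert(flat.end(), data[i].begin(), data[i].end());

      // element counts per rank = rows * width
      std::vector<int> counts(rows_per_rank.size());
      for (std::size_t r = 0; r < counts.size(); ++r)
        counts[r] = rows_per_rank[r] * width;

      std::vector<T> local(counts[world.rank()]);
      boost::mpi::scatterv(world, flat.data(), counts, local.data(), 0);

      // unflatten locally and cache
      std::vector<std::vector<T> >& out = cache[uid];
      out.resize(rows_per_rank[world.rank()]);
      for (std::size_t i = 0; i < out.size(); ++i)
        out[i].assign(local.begin() + i * width, local.begin() + (i + 1) * width);
      return out;
    }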

So the std::mutex mpi_cluster_mutex is used to keep the root node from issuing more than one command at once? And it unlocks when it exits scope, I guess? I need to read up on std::mutex, but if that static declaration won't re-declare when it enters scope, and if it will get unlocked when it exits scope, that seems like a reasonable method for preventing re-entry.
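In other words, I'm reading the semantics as roughly this (a sketch with a plain flag standing in for the actual mutex-based lock; correct me if I'm off):

    #include <stdexcept>

    // Sketch of the semantics as I understand them: the guard marks the
    // cluster busy for as long as it is in scope, and a nested attempt to
    // issue a command throws instead of deadlocking.
    struct cluster_lock {
      static bool& busy() { static bool flag = false; return flag; }
      cluster_lock() {
        if (busy())
          throw std::runtime_error("MPI cluster is already executing a command");
        busy() = true;
      }
      ~cluster_lock() { busy() = false; }
    };

    void broadcast_command_sketch() {
      cluster_lock guard;  // throws if a command is already in flight
      // ... tell the children what to run ...
    }                      // guard goes out of scope here and releases the lock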

I am in favor of benchmarks as often as possible :) Maybe we can check in benchmark results with major changes, at least during development on this branch, so we can easily compare versions of the code and make decisions about performance tradeoffs, etc.

Reduce is fundamentally more general than map - map and filter can be implemented in terms of reduce, and arguably should be unless there is some reason not to. We want both map and sum here already so I think having a shared reduce primitive makes a lot of sense.

I think the version of reduce we can easily build with scatterv and gatherv will be deterministic - only a more advanced version sacrificing ordering for performance would lose this.

Hmmm… quite a lot of stuff to answer. I hope I don’t forget things:

  • I do not like the nested vector stuff either, for the reasons you mention. I opted for nested vectors for consistency with the ODE functions and because there is no int version of a Matrix. Performance-wise I do not think that we will be hurt much, since the data is only ever handled once and the parameters need to be converted one by one from var to double anyway. The only case where we certainly lose performance is the double-only case, which is the least interesting for a Stan program. However, if people are fine with a mixed interface vector map_rect(fun, vector eta, matrix theta, matrix x_r, int[,] x_i), then I am fine with using that… @seantalts Any thoughts? We should just nail that decision down now.

  • My code essentially maps the N jobs from the user into chunks of size N/W, where W is the number of processes (world size), modulo rounding. The function which does that is map_chunks (see the sketch after this list). BTW, does it make sense to make this splitting of the mapping more explicit? I mean, we have a big and a small split: the outer, big split maps the N jobs to the W workers, and the inner one then processes the N/W jobs on each node, which are then collected. Not sure if this makes things easier or just complicates life.

  • My current code also takes into account how the logic of Boost MPI works. That is, you can execute the gatherv, for example, with the same signature on the root and on the children, but different things happen on the children and on the root. In turn this means that you can write a single function with few ifs in it, and the same code can be used on the root and on the children. I think this saves some code repetition, but we do not need to follow that principle.

  • For performance tests: I can set up an ODE and a non-ODE problem. The single-core run should take ~24h, and we then always compare how the scaling behaves when increasing the # of CPUs up to 20. The ODE case should always scale linearly, while the really interesting case is the analytic one, where the performance will be constrained by the efficiency of our code.

  • Reproducibility won’t be an issue for map_rect which we do first. The map_rect_lpdf may not be able to guarantee exact reproducibility if the # of processes varies, but we should be able to guarantee exact reproducibility if the # of processes is the same.

  • The mutex in broadcast_command works exactly as you described. As long as the mutex is in scope it is in effect, and a second call to the function will trigger an exception, such that there can only ever be a single command running at the same time.

  • BTW, one thing which I have left out so far is that we should define macros which facilitate the declaration of a mapped function call. That is, the serialization library must be made aware of those mapped functors.

  • There is one last optimization which I am aware of: currently, I send a command, then some meta-info, and then the actual work starts. We could possibly send the command directly with some meta-info contained in it. I haven't done that, as the command mechanism which I ended up using is quite generic and I did not see an obvious way to include the meta-info, but that can probably be changed.

  • Somewhere you, @seantalts, mentioned that vectors should be serialized… I am not sure about that. It can be done, of course, but from what I saw it's best to collect everything that should be sent over MPI into big arrays and then send it all at once. What I did is to allocate huge vectors which I hand over to boost::mpi using pointers, such that it can treat them as plain arrays suitable for direct C MPI calls.

  • The code for caching the data can surely be templated (if we stay with nested vectors for both kinds of data).
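Regarding the chunking mentioned above: the split itself is essentially N/W with the remainder spread over the first workers. A sketch of the idea (not the actual map_chunks code; the variant used for the data additionally scales each count by the per-job width, e.g. X_r, so the counts can be fed to scatterv as element counts):

    #include <vector>

    // Sketch of the job-to-worker split: N jobs over W processes,
    // i.e. N/W each, with the remainder spread over the first N % W workers.
    std::vector<int> chunk_sizes(int num_jobs, int world_size) {
      std::vector<int> chunks(world_size, num_jobs / world_size);
      for (int r = 0; r < num_jobs % world_size; ++r)
        ++chunks[r];
      return chunks;
    }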

I will make my small PK/PD library compatible with this MPI stuff such that I can easily write the performance tests, and I should be able to try this out on a real project.

Boost has general array support that’s now part of the standard:

http://www.cplusplus.com/reference/array/array/

But it’s just statically (template parameter) fixed sizes.

I think we need run-time sizes for anything interesting.


I would guess that it was just the easiest way to build a generic interface. A lot of what they were doing was sorting and aggregating counts for big sparse structures, so dictionary-type structures make sense.

That’s how static constants behave as of C++11. I just put in an issue to handle the way we do autodiff globals using this technique. It allows general singletons for class values without static initialization undefined behavior. I always mean lower-case patterns, by the way, as I can’t remember the GoF specifics.
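In case it's useful, the C++11 guarantee in question is just this (a minimal illustration, not our actual autodiff globals):

    // C++11 guarantees that a function-local static is initialized exactly
    // once, the first time control passes through the declaration, and in a
    // thread-safe way. That gives a general singleton for class values without
    // static-initialization-order problems.
    struct autodiff_globals_sketch {
      // ... whatever global state needs to live here ...
    };

    autodiff_globals_sketch& globals() {
      static autodiff_globals_sketch instance;  // initialized once, on first use
      return instance;
    }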

Probably not, but being careful with this can reduce things like memory contention and fragmentation in longer-running processes if there might otherwise be a lot of std::vector construction and destruction.

How? Do the mapped elements go into the expression graph in a deterministic order? There’s no reason they would need to. We’d been discussing preallocating in the expression graph so that the “reduce” operation only needs to plug in values.

The mapping from the N jobs to the W workers is deterministic. On each worker the summation occurs in order. Putting things together at the root is then always done in order again. That should ensure exact reproducibility, which I find a very important property that I would like to maintain if we can at reasonable cost. As I said, this means the same number of cores leads to the same results.

Such an approach leads to less optimal performance if the ordering of the jobs is not good, but the user is free to shuffle the jobs to optimize throughput.