MPI Design Discussion


Hmm… I'm curious about your comments, but I think I missed your point about MPIWorld: it is non-templated on purpose, right?

I think we need a bit more boilerplate code to manage MPI things (BTW, this is much like a thread pool). So let me know your thoughts and I'll revise the above.


I’m not sure I understand the question.

The stuff I was writing was not intended to be the finished product :P Just a guide to the large architectural landscape features, if you will.

mpi_map seems confusing - map as a noun is usually a key value store.

I think a pattern I would like to see is that things are generic; i.e. that they can take any type. If we have to transmit only basic primitives for some MPI-related reason, then we should at least support anything convertible to double or int (and specifically var and fvar). At least ideally - if this becomes prohibitive we can fall back to specializations.

Some comments on the draft:

  1. We will need both CombineF and ReduceF as type parameters
  2. I’d prefer to have versions of reduce and map available that are generic and not have parameters or types specific to this problem - so for example I think there should be a version of reduce that accepts any collection of elements (again, we could code it such that only elements convertible to primitives will compile if need be).
  3. I’m not sure if I’d want to restrict the general purpose MPI primitives to working only on Eigen types, but I could probably be convinced. Using Eigen types under the hood is fine though.

I’m not sure what you mean - is there some complication I’m not seeing that would cause a split into map and reduce not to work? I definitely haven’t actually written the code that will do remote work, was hoping you could fill that into this architectural framework.



Anything that needs to be communicated over MPI should be done via arrays of primitives; otherwise we will lose quite a bit of performance. This is why I would like to ship around Eigen matrices and operate on columns. So to enable generic things to go into reduce, they would need to be convertible to Eigen columns.

I suggest starting another branch where I put together another prototype (still without an actual implementation, but meta code). Does that make sense?

The bits and pieces to do remote calls are worked out and I will fill them in.

Your point about mpi_map as a bad name is good… would mpi_map_reduce be better?



Gotcha. Branch makes sense, thanks! Exciting.

re: name… hmm, let’s brainstorm a little… It’s wrapping up the underlying MPI interface and adding caching, higher-level distributed computation primitives, and bookkeeping for the communicator, and there is one of these per higher-level “callsite”, which I guess is just whichever callers want to share a cache. So maybe there’s a cached_MPI and an MPI_functional_wrapper in there, or something like that? mpi_map_reduce isn’t bad, but it makes me think of actual MapReduce, which this isn’t quite. We also might want to add filter at some point (it can also be built on top of reduce) 🤣


You seem to like capital MPI, right? I don’t mind either way, so let’s go with capital MPI.

So far I have prefixed the MPI stuff that is needed for stan-math with “MPI_”. So let’s start with that. In some sense it is a helper class for doing a map-reduce call… what about




Sorry, I also don’t care about capitalization, that was just my default. I think the style guide prefers lower-case mpi, actually.

The class provides more than map_rect. If you don’t like mpi_functional_wrapper maybe something like mpi_parallel_call or mpi_wrapper?


Let’s go with mpi_parallel_call then.


With C++11, local static variables are guaranteed to be constructed exactly once, even when multiple threads race to initialize them. So the whole singleton is really simple, as described in:

Yes, if you mean the above pattern, that’s the way to go. Member variables or functions can’t be static by definition. Static functions are nonmember class functions and static variables are nonmember class variables.

There are only two big efficiency differences. Eigen vectors can be constructed at a given size without initializing their elements. Standard vectors are much more efficient to append to because you can resize them. Of course, Eigen vectors are necessary if you’re going to do linear algebra, and you’d have to pay a penalty to copy from a standard vector. Otherwise, they work exactly the same way as far as memory allocation is concerned—they both use the standard RAII pattern (not from GoF, and only capitalized because it’s an acronym).

This is a general principle. The problem is that you need them for generality in code and for static evaluations. Eigen::Tensor will have a template parameter, right?

I think it’s not just allocation, it’s also simple indexing and slice/block mapping. The allocation means we don’t need collections of collections, which means we get memory locality. That’s a big deal if we have more than 1D containers. Eigen::Matrix provides the same advantages if we only need 2D structures. If sizes are known at compile time, we can use the new array classes in C++11.

Right—that’ll make accessing the columns memory-local. They’ll be cheap to access via Map or .col(), but won’t be efficient to copy into a full VectorXd. So it’ll depend on how they get used and/or serialized over the network. A plain old matrix should be much easier to serialize than a sequence of structures.


Today I learned that “member” in C++ is the same as “instance” in Python and Java (and whatever else). Why do these languages with practically identical concepts choose different names?! I was thinking it referred to anything scoped to a class - class or instance based.

Fair - I was mostly referring to that kind of stuff vs. using the Tensor library’s “reduce” with MPI somehow in a clever way, which I don’t think will actually work out.


Hi Bob!

I think we can live with 2D storage such that a matrix is just fine. And yes, it’s the memory locality, and keeping things in a single big chunk of memory, which I care about. The Eigen matrix class has a conservativeResize() method which allows resizing while keeping the already existing entries as they were.

Unfortunately, we do not know the sizes in advance, because the user function can return as many elements as it likes. For this reason I was considering letting the ReduceF functor do the evaluation in multiple steps:

  1. evaluate the function; it returns the output size and stores the result internally in the ReduceF instance
  2. then the big container where we collect things is checked for size and potentially resized
  3. pass the ReduceF instance the container where we want things to be stored

The alternative is to call ReduceF so that it returns a dynamically sized matrix which we then copy over. Doing the more complex thing above avoids one copy of the results, but I am not sure the extra hassle is worth it.

What I also have in mind is to cache the output sizes per job. These will usually never change between evaluations, so we can pre-allocate the correct output container size after the first call. Since we cache things anyway, this is just an additional number to save.



We could enforce that the sizes don’t change somehow, I suppose; it’s not so easy to actually enforce within Stan functions. Memory allocation is constant time whereas copying grows linearly with data size, so you may find that allocation isn’t an issue if you need to copy anyway. One copy of something result-sized shouldn’t be a big deal in this context.


If everyone is fine with enforcing the same output sizes for repeated evaluations, then I think this should be considered, as it may make the design easier. Upon the first call we would just save the number of outputs per job and then use that for any further invocation. Fine with me.


Before doing that, I’d like to know that preallocating will save a noticeable amount of time.

Most of the applications will have an assignment to something that’s of a fixed size for the output.


It’s more than just memory stuff… we also save on MPI communication. If the output sizes can change each time, we need an extra MPI communication for them each time. Saving a full MPI call should make a difference.


Ah, I see. In that case, we probably should lock down the size to be consistent call to call. Any suggestions on how to try to enforce that? That is, what happens if the user’s function tries to return something of different length?


I see two options:

  1. Ask the user for the output sizes per job. They must know these in advance anyway to make any use of the ragged output. This is my preferred choice, as it makes the implementation a lot easier: we can share this output-size vector once and then just cache it like everything else.

  2. Upon first invocation we cache the output sizes.

Whenever the output sizes change in a future call, we simply throw an exception as usual.

The upside of all of this is that we can allocate memory up front and we save an MPI call each iteration.


I like the idea of having the users specify it. It has the benefit of making what’s going on clearer.



@seantalts your voice on this?


Can we use gatherv to get back dynamically sized output from ReduceF run on each node? And the copy you’re talking about has to happen anyway across the network, from worker child to root node, right? This would let us make the API more dynamic, similar to what you wrote above with ReduceF returning a dynamically sized matrix. Then on the root, if the provided CombineF and the initial value passed in want to raise an exception (because e.g. the initial value was a matrix and doesn’t support ragged arrays), that’s up to the user (of reduce).

When we call map, we can provide a CombineF and an initial value that together do indeed raise some kind of exception if we want to be able to fit things into an Eigen Matrix rather than something that supports raggedness.

That’d be my preference if that all makes sense - I might be missing something in practice here.


Hi Sean!

gatherv allows you to collect a variable amount of data per child node on the root. However, you need to know in advance, on the root node, how many data items you are going to collect from each child.

So without fixing the output sizes in advance, what you need to do to get the results on the root is a two-step process; say we have N jobs on M nodes, then:

  1. Use gatherv to collect N/M output sizes per node into a vector of length N on the root -> now we know how many outputs are sitting on each node
  2. Now we can use gatherv to collect all the outputs from the children

If we already know the output size for each of the N jobs, then we can drop step 1 from above.

If we target all of this to scale to multiple machines (which I think we are doing), then we should minimize MPI communication by all means.

That is why I prefer to fix the output sizes by the user in advance. Does that make sense?