MPI Design Discussion

We should be able to map out ahead of time where each expression is going to go in the expression graph then deal with return values and Jacobians asynchronously. I may be talking at cross purposes here, but it’d be good to get rid of any synchronization and buffering bottlenecks that aren’t necessary.

YES!!! I just mapped my small PK/PD library which is formulated using ragged arrays to use rectangular data structures. That means I should rather quickly be able to run real project problems with this approach. The key was to figure out a conversion from ragged structures to a (padded) rectangular data storage.

@Bob_Carpenter: For me exact reproducibility is really important, and we should by all means make it at least possible to guarantee exact reproducibility. Moreover, the MPI stuff works by chunking the N jobs onto the W workers. Often N >> W, as we do not have a gazillion workers. Since W won’t be too large, the gain from asynchrony may not be huge.

In pharma it would not be acceptable to have results that are not exactly reproducible. So if we can make this optional (with a performance hit in that case), that would be the best approach. I would expect that other fields are similar in this regard.

I thought you were saying you’d need to run everything not in parallel anyway for a final run?

But I think we can have our cake and eat it too, so to speak. We can allocate the memory for the vari in the expression graph as we send the jobs out—we know the operands and I think we know the size of the output. Then we can just fill in the Jacobian asynchronously when we get it. The final result will be identical no matter how it gets filled in.
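A minimal sketch of that bookkeeping, with purely hypothetical names (none of this is existing Stan code): the sizes are known at dispatch time, so the Jacobian storage can be allocated up front and a completion handler only writes numbers into it, leaving the graph layout independent of arrival order.

#include <cstddef>
#include <vector>

// Placeholder for one dispatched job: the graph layout and the storage are
// fixed when the job is sent out; only the numeric values arrive later.
struct pending_job {
  std::size_t num_outputs;       // known when the job is dispatched
  std::size_t num_operands;      // known when the job is dispatched
  std::vector<double> jacobian;  // pre-allocated to num_outputs * num_operands
};

// Called whenever a worker's result arrives (possibly out of order); it only
// fills in the pre-allocated partials, so the final graph is identical.
inline void fill_jacobian(pending_job& job, const std::vector<double>& partials) {
  job.jacobian.assign(partials.begin(), partials.end());
}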

Currently we have Stan 2.15.0 in production. So I can develop models with MPI using a dev version. At the end things have to run again with 2.15.0 serially.

Once the Stan-MPI release is out (let’s say it’s 2.18.0), I will immediately ask IT to deploy that version. Once an MPI-enabled Stan is in our production, I will never run non-parallel unless I have to.

… we only know the size of the output after evaluating the functions, but all of this is already solved in my code, and I do ensure ordering in my current code.

Can you add some quotes to your post so I can tell what each bullet point is responding to? There’s a lot of stuff going on in this thread.

It’d be good to have a microbenchmark that ran quickly and didn’t use proprietary data so we can all run it locally.

I think even if we don’t use Boost’s serialization, it would still be good to abstract away the serialization and deserialization of nested vectors, so that you aren’t writing the same code for each set of nested vectors (e.g. x_r, x_i, theta, etc.).
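For example, a single flatten/unflatten pair along these lines could serve all of those containers (a sketch, not tied to Boost serialization; the names are illustrative):

#include <vector>

// Flatten a ragged/nested vector into one contiguous buffer plus the sizes
// needed to reconstruct it on the other side of the MPI call.
template <typename T>
void flatten(const std::vector<std::vector<T>>& in,
             std::vector<T>& flat, std::vector<int>& sizes) {
  flat.clear();
  sizes.clear();
  for (const auto& row : in) {
    sizes.push_back(static_cast<int>(row.size()));
    flat.insert(flat.end(), row.begin(), row.end());
  }
}

// Inverse operation, run on the receiving side.
template <typename T>
std::vector<std::vector<T>> unflatten(const std::vector<T>& flat,
                                      const std::vector<int>& sizes) {
  std::vector<std::vector<T>> out;
  out.reserve(sizes.size());
  auto it = flat.begin();
  for (int n : sizes) {
    out.emplace_back(it, it + n);
    it += n;
  }
  return out;
}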

Uhm… I never managed to edit posts, and this was just what I remembered coming up.

Maybe this is a good point to talk once more?

The benchmarks I have in mind would use toy examples… nothing proprietary, of course.

If we can work out microbenchmarks, that would be really good; so far I have used not micro- but mega-benchmarks, so that I always see performance as if I were running the code on a real project.

So Sebastian, what do you need going forward from me? Would it help if I wrote out the API that I was imagining and we could iterate on it?

If you could sketch that API and we iterate on it, that would be very helpful, yes. Should we do that with dummy code? If you have some tool in mind for this (UML), I am open to learning it if that promises to speed up our progress.

Two things came to my mind in the meantime:

  • We should settle on the nested vector vs Eigen structure question as one of the early decisions.

  • You suggested writing a scatterv function which can cache results locally. Maybe we go even one step further here: if we write our own flavors of the communication functions (scatter + gather), then we could make these functions take care of chunking the data. So let’s say they take array data as input; we could then declare the first index to be the one we split over the workers. Example: we have to distribute to W workers and have N jobs to process. We would then always pass arrays whose first index has size N into the communication functions, and the scatterv function would split the array along that first index (see the sketch below).
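To make that concrete, a minimal sketch of the chunking arithmetic such functions could use (chunk_counts is a hypothetical helper, and the actual MPI scatter call is omitted):

#include <vector>

// Split N jobs (the size of the first index) over W workers; the first
// N % W workers receive one extra job each. Worker w then gets
// counts[w] consecutive slices along the first index.
inline std::vector<int> chunk_counts(int N, int W) {
  std::vector<int> counts(W, N / W);
  for (int w = 0; w < N % W; ++w)
    ++counts[w];
  return counts;
}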

Ignore my last comments if they distract from writing down the API you have in mind.

Let me know if we need a hangout to sync in order to move forward.

UML is for OO designs to express containment and inheritance relationships.

A wiki with an API design would be better.

Yeah, I’ve never found UML to be all that helpful. While I’m working on an API document, do you (@wds15) think you could go through and give everything a descriptive variable name? I’m trying to reload the context into my head and encountering the same problems I did the first time around, where I have to write out and keep a glossary mapping variable letters and scope to meaning. It’s possible I’d finish the API doc before you finish the renaming, but the renaming has to be done anyway, so I figured I’d ask. Thanks!

Okay, here’s a rough draft that fleshes out my design review from here a little more:

class MPIWorld {
  boost::mpi::communicator world_;
  int callsite_id_;

public:
  MPIWorld(int callsite_id) : callsite_id_(callsite_id) {}

  template<typename T>
  std::vector<T> scatter(const std::vector<T>& in_values);

  template<typename T>
  std::vector<T> broadcast(const std::vector<T>& in_values);

  // F: In -> Out
  template<typename F, typename In, typename Out>
  std::vector<Out> map(const std::vector<In>& in);

  // F: (Accum, Element) -> Accum
  template<typename F, typename Accum, typename Element>
  Accum reduce(const std::vector<Element>& collection,
               Accum initial_value);
};

template <typename F, typename T>
class nests_autodiff {
    static std::vector<T> apply(const std::vector<T>& shared_params,
                                const std::vector<std::vector<T>>& params,
                                const std::vector<std::vector<double>>& x_r,
                                const std::vector<std::vector<int>>& x_i) {
      start_nested();
      // forward all arguments to the user functor inside the nested sweep
      auto result = F::apply(shared_params, params, x_r, x_i);
      recover_memory_nested();
      return result;
    }
};

template <typename F, typename T>
std::vector<T>
map_rect_mpi(const std::vector<T>& shared_params,
             const std::vector<std::vector<T>>& params,
             const std::vector<std::vector<double>>& x_r,
             const std::vector<std::vector<int>>& x_i,
             int callsite_id) {
  MPIWorld mpi_world(callsite_id);
  auto x_r_ = mpi_world.broadcast(x_r);
  auto x_i_ = mpi_world.broadcast(x_i);
  auto shared_params_ = mpi_world.broadcast(shared_params);
  //need to figure out how to access x_r et al from F
  return mpi_world.map<nests_autodiff<F, T>>(params);
}

Let me know if this clarifies things, if you want more stuff fleshed out in the MPIWorld primitives, or if some API can’t work because of some MPI constraints I’m forgetting, that kind of thing! Thanks :)

Great. Let me digest that for a moment. A few questions off the top of my head:

  • where are x_r and x_i stored per callsite_id? I introduced a singleton for that. Should that be there as well?

  • did you mean to send the result from map through reduce? The reduce step would do the gather call and register things on the AD stack, right? Maybe one needs a nest thing there as well. Not sure yet.

Will think more about it.

I think a singleton is a pretty good way to do that, yeah (and it’s not in the above code). In this case it might make sense to have that singleton map be implemented as a static member of MPIWorld (PS that name can change, I’m not totally happy with it).
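Something like this, as a sketch with hypothetical names (callsite_data is made up here, and it assumes x_r/x_i stay nested std::vectors for the moment):

#include <map>
#include <vector>

// Hypothetical cached payload for one callsite.
struct callsite_data {
  std::vector<std::vector<double>> x_r;
  std::vector<std::vector<int>> x_i;
};

class MPIWorld {
public:
  // Process-wide cache keyed by callsite_id; scatter/broadcast would check
  // here first and only transfer the data on a cache miss.
  static std::map<int, callsite_data>& cache() {
    static std::map<int, callsite_data> cache_;
    return cache_;
  }
  // ... rest as sketched above
};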

Map is just a wrapper around reduce where the initial value is a list and the reduction operation is to append things to the list. Something like this (rough sketch again):

template <typename F, typename In, typename Out>
class map_reduction_op {
  static std::vector<Out>&
  apply(std::vector<Out>& accum,
        const In& element) {
    accum.push_back(F::apply(element));
    return accum;
  }
};
...
  // F: In -> Out
  template<typename F, typename In, typename Out>
  std::vector<Out> map(const std::vector<In>& in) {
    // start from an empty accumulator; map_reduction_op appends each result
    return reduce<map_reduction_op<F, In, Out>>(in, std::vector<Out>());
  }
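For illustration, a toy use of the map primitive might then look like this (square_op is made up for the example):

// Illustrative functor following the F: In -> Out convention above.
struct square_op {
  static double apply(const double& in) { return in * in; }
};

// MPIWorld mpi_world(callsite_id);
// std::vector<double> squared = mpi_world.map<square_op, double, double>(in);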

Hi!

I am not that happy with MPIWorld either, as this terminology is already taken by the Boost MPI library; I need to think about a better name.

Some more thoughts:

  • Parts of the code run remotely, and I don’t quite see where you currently glue this into the framework I suggested. I would consider making a function which runs distributed and handles the tasks of distribution, execution, and collection of the double results on the root node. What is then left to do is register the double results on the AD stack. So which steps did you envision running remotely, and how would that work conceptually?

  • You are using std::vector, which will probably make our life easier for abstracting things, but it comes at the cost of inefficient memory handling (and on top of that, we will have to flatten things each time we transfer them via MPI, since communication must be done block-wise). I have been looking at the suggested Eigen::Tensor API and it could be a great choice here: (a) memory is handled block-wise and (b) the API supports user-defined reduce operations. Would you consider this an option?

I am currently trying to absorb your suggestion and aim to come up with the next iteration once I see how things fit together (for me); so sorry about the many questions.

Re: std::vector vs Eigen vector - I don’t have a preference, totally fine with Eigen Vectors or Tensors or whatever if you think that is more computationally efficient for our use cases. How does reduce work for them and how would we plan to use it?

Re: parts of the code that run remotely: reduce is the only primitive that actually runs things remotely (broadcast and scatter are basically nicely-wrapped versions of the MPI functions that handle caching and chunking). Reduce will do something similar to what your lpdf function was doing. I also think we need to change its API, now that I think about it, to break apart the combination operation and the reduction operation, similar to how Clojure’s reducers allow for this separation. Here’s the new API signature:

  // CombineF: (Accum, Accum) -> Accum
  // ReduceF: (Accum, Element) -> Accum
  template<typename CombineF, typename ReduceF, typename Accum, typename Element>
  Accum reduce(const std::vector<Element>& collection,
               Accum initial_value);

Here’s what it will do:

  1. flatten data
  2. scatter data
  3. unflatten it on the nodes
  4. execute ReduceF on each item in the chunk distributed to each node
  5. gather the results from the children on the root
  6. execute CombineF on the root to combine the accumulators built up by each of the children into a single accumulator, and return it
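In code, a serial stand-in sketch of those six steps could look like the following; the MPI transfer and the flatten/unflatten helpers are only indicated by comments:

#include <cstddef>
#include <vector>

// CombineF: (Accum, Accum) -> Accum, ReduceF: (Accum, Element) -> Accum.
// Serial stand-in: the MPI transfer steps are marked by comments only.
template <typename CombineF, typename ReduceF, typename Accum, typename Element>
Accum reduce(const std::vector<Element>& collection, Accum initial_value) {
  // 1.-3. flatten to basic types, scatter the chunks, unflatten locally;
  // in this single-process sketch the "local chunk" is simply the whole input
  std::vector<Element> local_chunk = collection;

  // 4. fold ReduceF over the elements of the local chunk
  Accum local_accum = initial_value;
  for (const Element& e : local_chunk)
    local_accum = ReduceF::apply(local_accum, e);

  // 5. gather the per-node accumulators on the root
  std::vector<Accum> node_accums = {local_accum};

  // 6. combine the per-node accumulators into one on the root
  Accum result = node_accums.front();
  for (std::size_t i = 1; i < node_accums.size(); ++i)
    result = CombineF::apply(result, node_accums[i]);
  return result;
}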

Does that make sense? Do you think that can work for us?


Hi!

We are getting there… yes I think that will do what you wrote. We need those two operations: ReduceF for parameters -> function output/gradients and CombineF for merging things into the AD stack.

From looking more closely at the Eigen::Tensor library, I think we should use it. It allocates memory as a single chunk in column-major format. Hence, we can convert data into this format and then just slice it up for the different nodes. On each node we can then hopefully use a reduction operation from their framework, see here:

https://bitbucket.org/eigen/eigen/src/e99c823108f4e1cfabd27b083501f465ec98b40d/unsupported/Eigen/CXX11/src/Tensor/TensorFunctors.h?at=default&fileviewer=file-view-default#TensorFunctors.h-95

I am not yet sure if we can use their reduce facility, because our reduce operation has a dynamically sized output: we do not know in advance how many elements a given function call returns. I need to check that (if we cannot use Eigen’s reduction, we just write our own).

So in step 4 each node generates another Eigen Tensor, which is put together on the root node, and step 6 can be represented as a reduction operation as well, I think.

Let me try to get the Eigen reductions working and then I hopefully can sketch the next version.

Ahh… if you don’t mind, I would try to avoid templates where we can. For example, if we go with Eigen::Tensor, then we will only ever have to deal with those objects at type double (only the integer data must stay as a nested vector).

Just one more thought: we could make our life a bit easier by implementing a map_rect_lpdf version first. This version has the advantage that we know only a single value is returned per function call. Returning only the log-lik value is what I would recommend to anyone using MPI anyway, as this cuts down communication cost. Do you have any thoughts on this? Pro: easier for us to implement and fast anyway; Con: not as flexible and possibly even more cumbersome for the user to program.
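To make the comparison concrete, a hypothetical signature for that simpler variant (mirroring map_rect_mpi above, not an agreed API) could be:

#include <vector>

// Hypothetical lpdf-flavored signature (a sketch): each job returns a single
// log-density contribution, so only the scalar sum and its gradient need to
// travel back to the root, keeping communication per evaluation small.
template <typename F, typename T>
T map_rect_lpdf_mpi(const std::vector<T>& shared_params,
                    const std::vector<std::vector<T>>& params,
                    const std::vector<std::vector<double>>& x_r,
                    const std::vector<std::vector<int>>& x_i,
                    int callsite_id);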

BTW… while I don’t think we have to burden ourselves with this as a requirement, we should be able to use this design for the ODE integrators as well (write function outputs and their gradients to an Eigen::Tensor and put that onto the AD stack).

Their reduce has the more limited type signature reduce(const T t, T* accum), i.e. an element has to be the same type as the accumulator, which won’t work as a primitive upon which we could base map. Is their version supposed to do some kind of distributed computation? I don’t see that either.
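For reference, the reducer shape that header expects looks roughly like this (an approximate reconstruction around the quoted signature), which shows how the element and accumulator types are tied together:

// Approximate shape of an Eigen::Tensor reducer, built around the quoted
// reduce(const T t, T* accum) signature; element and accumulator share T,
// so an In -> std::vector<Out> style map cannot be expressed this way.
template <typename T>
struct sum_reducer_sketch {
  T initialize() const { return T(0); }
  void reduce(const T t, T* accum) const { *accum += t; }
  T finalize(const T accum) const { return accum; }
};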

You’re in the wrong library, buddy :P

I’m still not quite wrapping my head around what Tensors can do for us outside of being a collection that allocates memory in a nice way, but if you think it has more to offer I’m happy to try to dive in more.


My main motivation with Eigen::Tensor is efficiency in storage - which we can also get if we simply use Eigen::MatrixXd. Going over your code in more detail, I think there are a few things I am missing:

  • a given function evaluation leads to multiple outputs. Since the function is user-supplied, we do not know how many elements are returned until after the function has been evaluated.

  • communication with the children should only happen with basic types (doubles, ints); information on what things are (var or not) should be carried by the type, not per element.

  • I don’t see how you handle the complication that code needs to be called on the remotes.

So here is what I would suggest:

  • before things are transferred to the children we convert stuff to basic types
  • we use Eigen matrices as storage. So what has been elem so far becomes a column, and the accum type is a matrix.

Here is a draft of the code (I renamed MPIWorld to mpi_map):

template<typename F, typename T_shared_params, typename T_params, typename ReduceF>
class mpi_map {
  boost::mpi::communicator world_;
  int callsite_id_;

  // subset of data needed on local machine
  const Eigen::MatrixXd* local_x_r_;
  const std::vector<std::vector<int>>* local_x_i_;

  typedef mpi_map<F, T_shared_params, T_params, ReduceF> mpi_map_t;

  // relevant Stan types
  typedef Eigen::Matrix<T_shared_params, Eigen::Dynamic, 1> shared_params_t;
  typedef Eigen::Matrix<T_params, Eigen::Dynamic, Eigen::Dynamic> params_t;

  // internal types
  typedef Eigen::Matrix<double, Eigen::Dynamic, 1> elem_t;
  typedef Eigen::Matrix<double, Eigen::Dynamic, Eigen::Dynamic> accum_t;

public:
  // constructor called on root node only
  mpi_map(const Eigen::MatrixXd& x_r, const std::vector<std::vector<int>>& x_i, int callsite_id)
    : callsite_id_(callsite_id) {
    // make child nodes aware of upcoming map (triggers call of
    // distributed_apply on remotes)
    mpi_broadcast_command<mpi_distributed_apply<mpi_map_t>>();

    // scatter data to children, use cached version if available =>
    // local_x_* is then populated
  }

  // called on the remotes
  static void distributed_apply() {
    // default construct an instance
    mpi_map_t remote_work;
  }

  template <typename CombineF>
  typename CombineF::return_type reduce(const shared_params_t& shared_params, const params_t& params) {
    // 1. convert to doubles both parameter sets
    // 2. scatter out from root the double only parameters
    // 3. apply the ReduceF per element locally: Input is elem_t,
    // output is accum_t (one parameter yields multiple outputs)
    // 4. return CombineF(accum)
  }

private:

  // entry point on remotes
  mpi_map() {
    // call scatter functions to receive data => sets up local_x_*
    reduce<null_combine>(dummy_arg1, dummy_arg2);
    // dummy args are ignored on the remotes as local_apply scatters
    // from the root the actual work.
    // null_combine does nothing at all; just returns void.. on the
    // root we call the reduce with a combine which takes accum and
    // inserts things into the AD stack and finally returns a column
    // vector of correct type.
  }

  // scatter + broadcast defs with caching
};