MPI framework for parallelized warmups

As mentioned at Parallel dynamic HMC merits we can improve adaptation by collecting adapt info from multiple chains during warmup. The following concept is an MPI framework to do that through master-slave paradigm.(code at https://github.com/yizhang-yiz/stan/tree/mpi_warmup_framework).
We use n+1 processes to run n chains, with proc 0 being master and the rest being slaves. Each slave runs a chain that communicates with master every m iterations during warmup. That is, at iteration j*m, j=1, 2, \dots, master proc and slave proc do:

  1. Slave i generates adapt information and send it to master. The adapt information to be sent is defined by functor F_s:
struct Fs {
  template<typename Sampler, typename Model>
  void operator()(Sampler& sampler, Model& model,
                  Eigen::MatrixXd& adapt_info, int slave_id) {...}
};
  1. Master collects adapt information from each slave, and use it to generate ensemble adapt information, through functor E_m
struct Em {
  template<typename Sampler, typename Model>
  void operator()(Sampler& sampler, Model& model,
                  Eigen::MatrixXd& ensemble_adaptation_info) {...}
};
  1. Master distribute ensemble adaptation information to slaves.
  2. Slave receive new adaptation information and combine it with its current adaptation through functor G_s
struct Gs {
  template<typename Sampler, typename Model>
  void operator()(Sampler& sampler, Model& model,
                  Eigen::MatrixXd& adapt_info) {...}
};
  1. Slaves do another m iterations before next round of ensemble adaptation.

Interval m and functor F_s, E_m, and G_s define the ensemble adaptation algorithm. But the asynchronized parallel communication framework remains the same(which is the point of the design).

Which an algorithm defined, the run_adaptive_sampler would be something like this

  for (i = 0; i < num_intervals; ++i) {
    if (is_master_proc) {
      // construct master
      warmup_dynamic_loader_master master(warmup_comm, internval);
     // master recv/send adapt info
      master(sampler, model, f, g, h);
    } else {
      // construct slave
      warmup_dynamic_loader_slave slave(warmup_comm, internval);
      // slave move forward
      util::generate_transitions(sampler, interval, nbegin, num_warmup + num_samples,
                                 num_thin, refresh, save_warmup, true, writer, s,
                                 model, rng, interrupt, logger);    
      nbegin += interval;
      // slave send/receive adapt info
      slave(sampler, model, f, g);
    }
  }

Tagging @betanalpha and @Bob_Carpenter for the soundness of the design and possible algorithm(functors mentioned above) to try.

3 Likes

As an initial side note, I recommend not using the master/slave terminology. We had this discussion previously and ended using worker for the latter but I can’t remember what we did for the former.

There are two important questions to address here.

The first is parallelization technology – are we going to use MPI or threading with the TBB? Most of the discussion so far has been with threading and the map_rect speed up from using TBB instead of MPI was sufficiently large that I am currently inclined towards pushing everything to threading.

The second question is persistency and communication. Your design here presumes that each parallel chain is persistent and communication is handled with functors/callbacks. The alternative is to have each chain segment run until completion then return the samples (or even just adaptation info), letting main process handle the creation and configuration of new chain segments. My personal inclination is towards the latter because if we’re not saving warmup iterations then there’s no reason to have persistent states, and all the communication can be handled by receiving outputs/reconfiguring inputs.

For your particular design my only major question is whether the Sampler instance needs to be passed in each functor. If the MPI process is persistent won’t it have it’s own sampler and hence be capable of independently consuming and incorporating the global updates to adaptation info?

wasn’t involved in the previous discussion, why did we forgo an established term(see, for example, 3.13 of https://www.google.com/books/edition/Using_MPI/xpBZ0RyRb-oC?hl=en&gbpv=1&printsec=frontcover)

The first is parallelization technology – are we going to use MPI or threading with the TBB? Most of the discussion so far has been with threading and the map_rect speed up from using TBB instead of MPI was sufficiently large that I am currently inclined towards pushing everything to threading.

I believe @wds15 mentioned somewhere that TBB is not in conflict with MPI, though that’s not my argument. The models I’d like to solve may involve dedicated 3rd party/external solver utilizing thousands of cores for a super AD node. How would current TBB design address that? In practice it’s easier to have a cluster node to dispatch fine grain jobs to threading, no the other way around.

Why is persistency a disadvantage? This is distributed parallelization, at the end of warmup slave nodes can just move on to sampling.

Sampler is not passed around through MPI, it’s passed locally as an argument so that it’s adaptation information can be retrieved and passed to master. Without passing sampler, how does F_s know nom_epsilon_ if we want to get current stepsize at certain slave node?

I agree that they’re not in conflict, but I personally would prefer to have one common parallelization framework to minimize what developers have to learn and increase maintenance capabilities.

In my opinion because it makes the logic harder to follow. I’d rather the code for handling the reading of adaptation info from previous chain segments, the updating of global adaptation info, and the reconfiguring of each new chain segment all happen in the same block so that the parallel chain code only has to worry about sampling.

What does adapt_info do, then? Something seems redundant to me.

Was expecting a technical argument, didn’t see this coming. Term change ok to me then.

Like said earlier, there’s need to have high-level distributed parallelization for many problems. We shouldn’t expect every developer to know every part of Stan.

I agree. And this design is to suit this pattern as much as distributed computing allows. Note that we already have MPI logic, so there’s no reason to believe this is harder than what’s now in math.

It contains the adaptation information retrieved from sampler and ready to be passed to master. Take stepsize for example, it would contain local epsilon and counter_.

I was not aware that @yizhang works on this. In fact, I have started this threaded warmup already and things are working on a branch I have locally; so sorry for not sharing earlier.

My plan was to go with threading only for this, since it is much easier to code up. Moreover, the threading with TBB is available on all of our platforms by a mere makefile configuration switch. No need to deal with setting up MPI, which is a pain to do for most people (and not even available on Windows). That said, for super-large stan runs MPI is better as it can scale across machines.

Anyway… I realised that we should actually first design things such that it

  1. easily allows to parallelise with threads
  2. we setup things such that we actually gain from this threaded warmup

So what I have figured would make sense here is that I address point 1 with the following logic:

  • We sample num_chains which is by default 1
  • Sampling multiple chains means that we increase the chain_id accordingly => so when we used to call id=0 to sample, we will now sample id=0,1,2,3 when num_chains=4
  • the run_sampler methods then sample the different chains in turn. At the moment we sample in the run_sample method by looping over the iterations. Now we do the same, but nest the loop over the chains into that. So this becomes
for i in 1:num_iter {
   for j in 1:num_chains {
      ... get one iteration for chain j ....
  }
}

The logic used to now pool the adaptation info is to stop this loop at the end of each adaptation window. At the moment I am simply taking the average of the learned diagonal of the mass matrix - the stepsize stays untouched per sampler. This is probably not ideal yet.

The advantage of this approach is that the output is automatically ordered just as we specified it earlier (iteration x, chain 0-3, iteration x+1…) and this scheme can easily be paralleled using threads. There is only chain-snchronizaion needed at the end of the adaptation windows and one needs to think about some buffering scheme to deal with the asynchronous output being generated.

However, before now throwing in threads I would like to get a handle on what makes sense. The above scheme already works in my local branch - I just haven’t implemented the threading yet as I think we should first learn what we can gain from all this extra work. The benchmarks I have in mind are

  • Run 1 long chain with 1000 warmup / 1000 iteration
  • Run 2 half as long chains 500 warmup / 500 iteration each => same number of iterations
  • Run 4 chains 1/4 as long so 250 warmup / 250 iterations each => same number of iterations

This needs to be run on a reasonable example for many times. This way we can learn what pooling strategy is a good one. I would very much welcome comments on this benchmarking strategy (and how to pool the adaptation info) as I think this is really the first step here to learn what’s the gain here.

@yizhang I would think that doing all of this with threads only is much easier to implement. We should probably work on how MPI resources are shared across samplers which run in threads. Splitting MPI resources cleverly is something we anyway need.

What?

The MPI communication framework above was setup rather quickly, and it’s independent of current Stan’s MPI design.

Not sure I understand what you mean by this.

I agree. I was actually talking to @seantalts yesterday on we should work out a design doc for parallelization hierarchy, partially because current MPI setup in Stan isn’t really up to what I’m doing here(which is why I designed it independent of map_rect). I’ll write out something for this.

1 Like

MPI is not available on Windows for stan at the moment since I never managed to get boost mpi working on that platform. That does not mean its not possible, of course.

Managing resources for parallelisation is something I would ideal just leave to the TBB. That’s one argument from my perspective for the thread pool approach where you just dump your work. How MPI blends into that is something we should work out if MPI is supposed to work in such a setting… but I better stop here as I don’t want to derail the thread. We should probably start a new discourse thread for that.

As I pointed out at early stage of MPI, we don’t need boost. All my MPI implementation doesn’t require boost MPI and works on MS MPI.

How does TBB manage distributed parallel resource?

Yes. I’ve said this earlier, we’re not doing things in a coordinated way.

FWIW, I have Boost::Mpi working on Windows right now in my CMake monorepo. It is also true that it is a huge pain as a dependency.

1 Like

From Bob’s thread on parallel performance It sounds like the goal here is to decrease the time to ‘convergence’. Could we do something like

  1. For K chains, do adaption for N iterations and sample for M iterations, saving the mass matrix
  2. Calculate ESS
  3. Rerun with supplied mass matrix for additional N adaption iterations and sample for another M iterations
  4. Goto 2 until some arbitrary number of adaption steps have happened.
  5. Take ESS and divide it by the number of M samples to get a normalized ESS
  6. Divide that by the number of warmups to get a Normalized ESS per adaption iterations.
NESS= \frac{ESS}{Samples} \\ Hurdle = \frac{NESS}{Adaption}

5 and 6 I’m just sort of throwing out there, but it’s nice because it at least sets some form of a lower and upper bound on the performance metric (max is (1/1) and min of 0 can be either poor ESS or adaption going to infinity). Though I’m not sure it’s that interpretable which is a shame. It also can’t handle 0 adaption steps (though we could just slap a 1 in the denominator)

Would wall time be a metric here? It could be nice as I’m not sure how much the overhead vs. any data we can share between chains without making copies would effect overall speed.

1 Like

Well, I need boost MPI to implement what I did…much cleaner interface. The map_rect stuff in math is my very first MPI thing I did. I haven’t committed to windows because it is beyond me to setup MPI on windows and there is more trouble with things under Windows. I would assume that you can make it work under Windows - with boost MPI and the current source it should work; someone “just” has to work out the build hell.

Not at all to my knowledge.

Cool!

then why do you propose

I am not against adaptive warmup at all. I would just prefer to go simple first.

A problem with all these calculations with the ESS is that we are adapting the tuning parameters of the sampler continuously… so all of this gets relatively hand-waving to my understanding.

Having as a starting point a static scheme which pools together the information along the windows we have already defined appears to me like a sensible thing to do.

Maybe it’s time to refactor that part of Stan then.

1 Like

Think of the TBB like something which you just throw work at it on a local machine - and it will handle it for you. MPI should be used to distribute work between machines while the TBB is used for distributing work across machines.

So we could have run multiple chains on the head node which we control with the TBB. Each chain then has a number of MPI workers assigned. That could make sense, no?

Other than that - I really do think that TBB coding is much easier than MPI coding, MPI is a tool from the C past. The TBB blends nicely into C++ and is much easier to deal with for us C++ developers.

Sure - go ahead! A refactor which does a better static data distribution which is better integrated with the stanc3 parser is an obvious thing to do. I am personally hesitant to put resources into MPI. We have threading under control, it’s fast, easy to setup, easy to code - and there is a ton of performance to gain from these 48 core CPUs nowadays. So I do that.

1 Like

Pretty sure you cannot do that in Stan’s current MPI setup without introducing MPI bottleneck, namely one chain is taking over MPI workers while other chains waiting. So no, it doesn’t make sense to me.

Sorry I’m not following what this means. Isn’t the main idea to pool information across adaptation across the chains? It may just be that I have not looked over the algorithms stuff enought.

Confused, I thought the mass matrix was the main thing we get out of the adaptation step? If that’s the main thing then yeah what I’m proposing above is doing adaptation in chunks. If there is other stuff we would need can we also have stan pull that out at the end of adaptation so it can be stopped and restarted?

I’m for whatever as a first sketch of a benchmark! Might be good to lay out the rough steps of the benchmark you have in mind

It feels like I would want to flip flop this in a different way and give MPI N chains to distribute across machines then use the TBB within each MPI process. Then the chains only need to pass back and forth a small amount of info about adaptation stuff across machines. <- is under the assumption we have map_rect use tbb so we are not interweaving mpi to tbb to mpi

1 Like

I agree that this is a good opportunity for design docs here!

We have the design for how to run and manage chains in parallel from within Stan, which parallelization architecture they should use, and checkpointing/callbacking to allow for adaptation.

Then we have the actual adaptation strategy itself, which is independent of the parallelization implementation modulo the intra-chain communication issues. I think the strategy can be developed independently before an implementation is considered, however, allowing both to make progress on designs for each without needed decisions on either yet (and avoiding redundant work in the meantime).

1 Like

What do you think about the benchmarks I am suggesting?

Before doing any work here we should settle on how we evaluate it first. That’s my view. So I would like to know first if all the effort here is really worth doing it. It sounds like a no-brainer that we want this - yes, but we must be able to articulate first the advantages and how we measure them. So comments on the benchmarks are very welcome. I am happy to start a new thread.

Finally, before writing a design doc, I would much prefer to have a prototype around. That does not need to be the final implementation, sure not, but it will lead us through all the obstacles we will have. At least for me who has never worked on stan services it too a while to get the idea of the interface (I am still not sure if it did sink in).

The design we come up with needs to fulfil quite a few constrains

  • fit into stan services API
  • enable easy parallelisation (synchronisation)
  • enable the pooling adapter to get the information they need easily

That are quite a lot of constraints.

I would, personally, not start discussing the parallelisation technology. I would simply use the TBB and that’s it. MPI is too complicated to setup for users and too hard to program. Coding something up which fits MPI or TBB is great - but that will likely slow down this feature significantly and we should not hold this back.

1 Like