Proposed parallelism RFC - Stan language bits

Hi!

In the course of the Stan parallel design RFC it was requested that I show how the proposed parallel building blocks can be used from the Stan language by the user. This part was left out of the RFC because it requires higher-level language functions which we did not want to introduce there. I therefore went ahead anyway and assumed that a Stan 3 parser can easily be adapted to what I am suggesting in the RFC. To facilitate continued review, I am sharing here a fully working Stan program where the adapter code is provided as a user-defined C++ function.

The example is a simulated hierarchical Poisson problem:

functions {
  // runs reduce over the index range start to end. Mapping from
  // data-index set to group indices pre-stored in gidx
  real hierarchical_reduce(int start, int end, int[] y, vector log_lambda_group, int[] gidx) {
    return poisson_log_lpmf(y[start:end]| log_lambda_group[gidx[start:end]]);   
  }

  // function defined in C++ which calls parallel_reduce_sum with a
  // lambda functor which simply binds the arguments passed into it
  real parallel_hierarchical_reduce(int[] y, vector log_lambda_group, int[] gidx, int grainsize);

  // map_rect alternative
  vector mr_reduce(vector beta, vector log_lambda, real[] xr, int[] xi) {
    real lp = poisson_log_lpmf(xi| log_lambda[1]);
    return [lp]';
  }
}
data {
  int<lower=0> N;
  int<lower=0> G;
  int<lower=1> grainsize;
  int<lower=0,upper=1> use_tbb;
  int<lower=0,upper=1> use_mr;
}
transformed data {
  real true_log_lambda = log(5.0);
  real true_tau = log(10)/1.96;
  int y[N*G];
  int yg[G,N];
  real xr[G,0];
  int gidx[N*G];
  int start[G];
  int end[G];
  vector[0] theta_dummy;

  print("Simulating problem size: G = ", G, "; N = ", N);
  if (use_tbb) {
    print("Using parallel TBB with grainsize = ", grainsize);
  } else {
    if(use_mr) {
      print("Using map_rect code.");
    } else {
      print("Using serial code only.");
    }
  }

  for(g in 1:G) {
    real lambda_group = lognormal_rng(true_log_lambda, true_tau);
    int elem = 1;
    start[g] = (g-1) * N + 1;
    end[g] = g * N;
    for(i in start[g]:end[g]) {
      y[i] = poisson_rng(lambda_group);
      yg[g,elem] = y[i];
      gidx[i] = g;
      elem += 1; 
    }
  }
}
parameters { 
  real log_lambda;
  real<lower=0> tau;
  vector[G] eta;
}
model {
  vector[G] log_lambda_group = log_lambda + eta * tau;
  real lpmf = 0;

  if (use_tbb) {
    lpmf = parallel_hierarchical_reduce(y, log_lambda_group, gidx, grainsize);
  } else {
    if(use_mr) {
      vector[1] log_lambda_group_tilde[G];
      for(g in 1:G)
        log_lambda_group_tilde[g,1] = log_lambda_group[g];
      lpmf = sum(map_rect(mr_reduce, theta_dummy, log_lambda_group_tilde, xr, yg));
    } else {
      lpmf = poisson_log_lpmf(y| log_lambda_group[gidx]);
    }
  }

  target += lpmf;
  target += std_normal_lpdf(log_lambda);
  target += std_normal_lpdf(tau);
  target += std_normal_lpdf(eta);
}
generated quantities {
  real msq_log_lambda = square(true_log_lambda - log_lambda);
  real msq_tau = square(true_tau - tau);
}

The user_header.hpp file contains this:

#include <stan/math.hpp>

#include <boost/iterator/counting_iterator.hpp>
#include <stan/math/prim/scal/functor/parallel_reduce_sum.hpp>
#include <stan/math/rev/scal/functor/parallel_reduce_sum.hpp>

namespace poisson_hierarchical_scale_model_namespace {

// user provided function
template <typename T3__>
typename boost::math::tools::promote_args<T3__>::type
hierarchical_reduce(const int& start,
                    const int& end,
                    const std::vector<int>& y,
                    const Eigen::Matrix<T3__, Eigen::Dynamic, 1>& log_lambda_group,
                    const std::vector<int>& gidx, std::ostream* pstream__);

template <typename T3>
inline typename boost::math::tools::promote_args<T3>::type
parallel_hierarchical_reduce(const std::vector<int>& y,
                             const Eigen::Matrix<T3, Eigen::Dynamic, 1>& log_lambda_group,
                             const std::vector<int>& gidx,
                             const int& grainsize,
                             std::ostream* pstream__) {
  typedef boost::counting_iterator<int> count_iter;
  typedef typename boost::math::tools::promote_args<T3>::type return_t;
  const int elems = y.size();

  typedef typename boost::math::tools::promote_args<T3>::type local_scalar_t__;
  typedef local_scalar_t__ fun_return_scalar_t__;

  // C++ only binds with a lambda the arguments of the user function
  // this adapter code would need to be generated by the Stan parser
  return_t lpmf = stan::math::parallel_reduce_sum(
      count_iter(1), count_iter(elems+1), return_t(0.0),
        [&](int start, int end) {
          return hierarchical_reduce(start, end, y, log_lambda_group, gidx, pstream__);
        }, grainsize);


  /* C++ only version
  return_t lpmf = stan::math::parallel_reduce_sum(
      count_iter(1), count_iter(elems+1), return_t(0.0),
        [&](int start, int end) {
          const std::size_t partial_elems = end-start+1;
          std::vector<int> partial_data;
          partial_data.insert(partial_data.end(), y.begin() + start - 1,
                              y.begin() + end);
          std::vector<return_t> partial_log_lambda(partial_elems);
          for(std::size_t i = 0; i != partial_elems; ++i)
            partial_log_lambda[i] = log_lambda_group[gidx[start + i-1]-1];
          return stan::math::poisson_log_lpmf(partial_data, partial_log_lambda);
        }, grainsize);
  */
  
  return lpmf;
}

}

The Stan data file which makes the above fully runnable with the stan-math branch parallel-ad-tape-3 is this:

G <- 400
N <- 10
grainsize <- 1100
use_tbb <- 1
use_mr <- 0

Now, the reason it took me a while is that I really wanted to make sure that all of this actually speeds up Stan programs - and it does… but: I see about a 2x speedup on 2 cores (great); however, with more cores the speed degrades, and with 4-6 cores the execution times explode. Clearly there is still some issue (probably threads going to sleep or something similar).

In the meantime I also coded up the same example using the performance benchmark suite from @seantalts, see here. When running benchmarks there, I again get very nice speedups:

1 core
BM_tbbM_median        467394 ns     467134 ns       1242
2 cores => 1.8x
BM_tbbM_median        255672 ns     255380 ns       2660
4 cores => 2.9x
BM_tbbM_median        161253 ns     160471 ns       4579

All in all this convinces me that we are on the right track - the weirdness which happens in a real Stan program needs to be sorted out, but a stripped down problem points in the correct direction.

@syclik I hope this clarifies how I think the user should be using the new parallel reduce facility. Bear in mind that the user-provided function must take start and end as its first two arguments. All remaining arguments are whatever the user needs to write down the sliced likelihood evaluation. The slicing is intentionally left to the user, who knows best how to subset their data structures (and in addition the Stan language has good support for subsetting which we do not have in stan-math).

Best,
Sebastian


Thanks. That’s really helpful.

My question's always been how all the different forms of parallelism will be managed. If I have a parallel map_rect, then what happens if the function inside map_rect wants to use multiple threads?

When you nest map_rect calls right now with only STAN_THREADS turned on, each nested map_rect will again spawn STAN_NUM_THREADS child threads - so you end up with a lot of threads (e.g. with STAN_NUM_THREADS=4, a map_rect inside a map_rect can end up running up to 4 x 4 = 16 worker threads).

In the bright future with the Intel TBB as I am proposing it, we do not need to worry about actual threads any more. Instead we tell the TBB the maximal concurrency we want to allow. From the Stan side we then only enqueue so-called task objects, which are dispatched by the TBB task scheduler to its backend, which runs at the desired maximal concurrency. There is no need to manage threads ourselves anymore. We only need to worry about coding up the tasks whose results we want, and the TBB will keep the worker threads busy as needed.

In practice, the TBB will create a thread pool for us and distribute the work within it - but we have no control over that, since it is considered an implementation detail from the point of view of the TBB's client code.
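
To make this a bit more concrete, here is roughly what this looks like at the raw TBB level (generic TBB code for illustration only, not the stan-math implementation): we cap the allowed concurrency once and then just submit a reduction over a range; how the range gets split into tasks and which worker threads run them is entirely the scheduler's business.

#include <tbb/blocked_range.h>
#include <tbb/global_control.h>
#include <tbb/parallel_reduce.h>

#include <functional>
#include <iostream>
#include <vector>

int main() {
  // cap the concurrency the scheduler may use; the worker threads
  // themselves are created and managed by the TBB, not by us
  tbb::global_control concurrency_limit(
      tbb::global_control::max_allowed_parallelism, 4);

  std::vector<double> terms(10000, 1.0);

  // submit one reduction; the scheduler splits the range into tasks
  // and dispatches them onto its internal thread pool
  double sum = tbb::parallel_reduce(
      tbb::blocked_range<std::size_t>(0, terms.size()), 0.0,
      [&](const tbb::blocked_range<std::size_t>& r, double partial) {
        for (std::size_t i = r.begin(); i != r.end(); ++i)
          partial += terms[i];
        return partial;
      },
      std::plus<double>());

  std::cout << "sum = " << sum << std::endl;
  return 0;
}

The point is that the code only describes the work (the range and how to reduce it); the mapping of that work onto threads is left to the scheduler.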

Does that help?

I’ve heard for years I won’t need to worry about memory any more. I don’t believe it.

A thread pool with queuing isn’t a magic bullet—it comes with substantial overhead.

How are the queued jobs selected for processing within the thread pool?

How do we deal with all the ordering constraints? If we have a parallel loop, we have to wait for all the threads to finish before going on. Is that kind of thing built into some kind of structured thread pool?

(Sorry if this is off-topic for the current thread).

What’s the scope for developers to build parallelism into functions/distributions themselves? At the moment it seems like the burden is on the user to write their Stan code with map_rect, but it would be great if we could write that on the back-end and just have the user enable threading (similar to how the GPU code is going to work).

I don’t really care as long as it works nicely. If you are interested in the details, you may start here: Documentation Library

That’s easy, I think: Just look at the C++ bit above. The statement

return_t lpmf = stan::math::parallel_reduce_sum(
      count_iter(1), count_iter(elems+1), return_t(0.0),
        [&](int start, int end) {
          return hierarchical_reduce(start, end, y, log_lambda_group, gidx, pstream__);
        }, grainsize);

will internally start a parallel reduce from the TBB. Only after the parallel reduce has completed in its entirety does the function return. If you nest a call to another parallel function inside it, that is not a problem, as the nested parallel region will also run to completion within the nested call. Having said that, it is not impossible to create dead-locks when using these techniques - so things are not automatically safe just by using the TBB. In particular, nested parallelism sometimes needs special consideration, see here: Documentation Library

The TBB offers a concept called "isolation", which confines a thread that is waiting inside a nested parallel region to tasks belonging to that region, so it cannot pick up unrelated work in the meantime. I don't think we need this feature, but it's good that it is there.
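
Just to illustrate what nesting and isolation look like in plain TBB terms (illustrative code, not from the branch): an outer parallel loop over groups where each group runs its own inner reduce; the outer call only returns once all nested work below it has finished, and the isolate wrapper keeps a thread that is waiting on the inner reduce from picking up unrelated outer-loop tasks in the meantime.

#include <tbb/blocked_range.h>
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/task_arena.h>

#include <functional>
#include <vector>

// outer parallel loop over groups; each group runs its own inner
// parallel reduce and the outer call returns only once all of the
// nested work has completed
void nested_example(const std::vector<std::vector<double> >& groups,
                    std::vector<double>& group_sums) {
  tbb::parallel_for(std::size_t(0), groups.size(), [&](std::size_t g) {
    double sum = 0.0;
    // isolate: while this thread waits for the inner reduce to finish,
    // it may only work on tasks from this nested region and will not
    // steal unrelated outer-loop work
    tbb::this_task_arena::isolate([&] {
      sum = tbb::parallel_reduce(
          tbb::blocked_range<std::size_t>(0, groups[g].size()), 0.0,
          [&](const tbb::blocked_range<std::size_t>& r, double partial) {
            for (std::size_t i = r.begin(); i != r.end(); ++i)
              partial += groups[g][i];
            return partial;
          },
          std::plus<double>());
    });
    group_sums[g] = sum;
  });
}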

@andrjohns This is off topic for this thread and it has been discussed as “super-nodes” in the context of the parallel design RFC here: adding parallel_autodiff RFC by wds15 · Pull Request #5 · stan-dev/design-docs · GitHub

In super-brevity:

  • what I propose will allow us to create super-nodes easily (if we implement sub-slicing of data in stan-math, which I don't think we should waste time on since it is already implemented in Stan)
  • map_rect puts some burden on users to program their function, but it gives them ultimate flexibility. The real problems with map_rect are that (i) it is user-unfriendly due to the packing/unpacking of arguments, (ii) it gives up vectorisation, and (iii) it requires the user to pre-specify the sharding. All of these points are addressed by the proposed parallel reduce.
  • A really good example of why the flexibility of a general reduce is needed is given by @paul.buerkner in this post: Parallel reduce in the Stan language - #11 by paul.buerkner

Super-nodes are useful, yes, but a general facility is far more useful, since otherwise the particular super-node you need for your model may simply not exist.

…and @syclik, the post Parallel reduce in the Stan language is actually another write-up of how I think the new parallel reduce facility is to be used in the Stan language. I am going to link it in the parallel RFC as well.


I thought that was the topic of this thread—how to enable parallelism within developer-defined Stan C++ functions.

As in performance tests with all these things turned on?

I generally find it hard to optimize coding without knowing what the underlying system is going to do with the code. In particular, the synchronization is where this stuff starts to get expensive, especially when jobs start to get very granular.

Thanks. I’ll take a look.

Sorry if that was not clear. The intention of this post is to demonstrate how the proposed stan-math parallel bits from the RFC can be used by a Stan user - that is, what a Stan user needs to provide and what a future stanc3 parser would need to generate and handle. It makes the use case clear, I hope.

I do try to avoid the need to do any synchronization - that is the key to good parallel performance.

(I also still need to dig into the details here, since my toy example scales nicely while the actual Stan model only scales to 2x and then performance gets really bad, which should not be the case.)

Maybe we mean something different by “Stan user”. I’m thinking of someone writing a program in the Stan language.


I have similar thoughts on "Stan user"… I think.

Another high-level intro to the TBB could be this:

That’s from 2008, but still gives a good intro.


Sure… and I actually got hit by an issue of the TBB which seems to happen if you use their parallel reduce but have longer waits between iterations. This seems to be a case their scheduler is not up to; it causes a huge overhead due to a bad interaction between how their reduce creates tasks and how the threads compete for work via work-stealing. I finally found a solution which essentially chunks the evaluation of the large reduce-sum into a fixed number of tasks. The upside of this approach is that it stays completely reproducible, at the cost of not exploiting the full parallelization efficiency. In essence I am now pre-chunking the work myself before handing it to the TBB.
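
The pattern is roughly the following (my own sketch of the idea with made-up names, not the code from the branch): split the index range into a fixed number of blocks up front, let the TBB run one task per block, and combine the per-block partial sums in a fixed order afterwards; the fixed chunking plus the ordered accumulation is what keeps the result reproducible.

#include <tbb/parallel_for.h>

#include <algorithm>
#include <numeric>
#include <vector>

// split the index range [1, elems] into a fixed number of blocks and
// hand each block to the TBB as a single task; the per-block partial
// sums are combined afterwards in a fixed order, which keeps the
// result reproducible regardless of how many threads ran the blocks
template <typename PartialReducer>
double chunked_reduce(int elems, int num_chunks, PartialReducer partial_sum) {
  std::vector<double> partials(num_chunks, 0.0);
  const int chunk_size = (elems + num_chunks - 1) / num_chunks;

  tbb::parallel_for(0, num_chunks, [&](int chunk) {
    const int start = chunk * chunk_size + 1;
    const int end = std::min(elems, (chunk + 1) * chunk_size);
    if (start <= end)
      partials[chunk] = partial_sum(start, end);  // user slice: f(start, end)
  });

  // ordered, deterministic accumulation of the partial results
  return std::accumulate(partials.begin(), partials.end(), 0.0);
}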

You may now say…then why bother with the TBB? There are still a few reasons:

  • nested or multiple parallel regions still need scheduling to happen
  • the use of a thread pool is far more efficient since we avoid repeated thread spin-up and tear-down
  • the parallel map functionality which we have still benefits from automatic scheduling

The upside of this new pattern of the parallel reduce is that it is rather simple now, see here.

With this I am now getting reliable performance increases, in the sense that more CPUs increase performance, and with larger problem sizes the scaling improves:

Here you see a hierarchical Poisson likelihood evaluated in an end-to-end benchmark with the Stan program shown above. The cool thing is that you get speedups with just 5000 terms already, and things improve the more terms there are to deal with.

We should really move forward with this! I don't know how much more evidence I am supposed to present to demonstrate the merits of this approach. If I am missing something, let me know.

As a next step I will look into parallelizing adj_jac_apply. I think the terms needed for the adjoint-vector product can be pre-computed while the forward sweep continues, which would be really nice.

Best,
Sebastian


@syclik ?

On a related note, I still need someone to test the test framework PR I have. I wasn’t going to test the rest of the matrix lib until that gets merged.

But isn’t that application-specific aside from the final += to the operand adjoints?

What should be easy to speed up is Jacobian and Hessian calculations.

Cool. That’s exactly the kind of thing that I was thinking would cause problems and require manual solutions for. I’m so glad you’re looking into this.

@wds15, thank you for going through and making that example. Here are the next steps:

  • please update the actual doc so it reflects the discussion. adding parallel_autodiff RFC by wds15 · Pull Request #5 · stan-dev/design-docs · GitHub. What you should be discussing is the design of the Stan language for parallelism. I think the design doc should focus on that aspect of it with a secondary focus on implementation, but this is not an evaluation of the implementation.
  • I’ll leave comments in the original PR so you can see where you’re not addressing design / where it’s focused on the wrong thing. The work you’ve done, especially in clarifying it over the last few weeks, is really helpful. It needs to make it into the doc.
  • The goal of the doc should be that it’s a stand-alone document that describes the problem, the proposal, and we should be discussing aspects of the design there.

Does that make sense? Sorry, but this means there’s more writing / shuffling of the doc to make it a clear document.

I just added review comments.

It would really, really help if you could separate out the concerns. There is:

  1. design of the parallelism in the Stan language that affects the Stan user. This is what you should be talking about with your design document.
  2. adopting a new dependency for threading, Intel TBB. This is orthogonal. This has challenges as we’ve been discussing elsewhere. We should just leave that as is.
  3. software design or architecture. We need to design how we’d adopt Intel TBB in a sensible way. Or just threading for that matter. Right now, it’s not a great design. I think it’s prudent that we actually walk through software architecture of the existing threading; this would also help with adopting a different threading mechanism. This is strictly a Math library discussion and doesn’t have to do with the design repo.
  4. implementation. This is at the Math library and interacts with the software design.

Things that make it faster: make it easier for the reviewers to understand the scope of the problem, the solutions, and whatever complicated parts there are to understand. One thing to notice: we’ve had multiple people ask about how and when this parallelism should be used. That should be written up more clearly. I think it needs to be made clear that this is for Stan users (users that write in the Stan language) to write their own parallel routines. It is not for Math developers to use for parallelizing computations.