Using the TBB to parallelize the distributions

The tl;dr to the whole thing below is that I think we can refactor the marginals in the distributions with lazily evaluated lambdas then evaluate those in parallel and end up at the final log probability with TBBs parallel_deterministic_reduce. For models that use parallelism like this they will only be exactly reproducible when threading is turned on, but nothing should change wrt single threaded models.


Been thinking more about automated parallel stuff in the math library and started trying to simplify some of the distributions. Things like cleaning up multiple loops and ifs so that I could have a bit of a better picture at what’s going on. I was going on a bit of a spree and refactored things like normal so that they are just one for loop instead of several.

The weird thing to me was the multiple loops in the current implementation along with VectorBuilder, which in the case of a scalar gives [] access to the scalar and otherwise just access the elements in the original container. I was looking at seq_scalar_view and then VectorBuilder and it looks like they are v v similar? So I was like “Well what if I remove VectorBuilder?” and everything worked. Though I haven’t evaluated the performance yet so maybe removing those was not as clever as I thought.

But was thinking about what bob mentioned about simplifying the distributions in this thread

For example, in evaluatingly

normal_lpdf(y | mu, sigma)

if y is a vector of size N and sigma is a scalar, then we should be able to do this:

if (log_sigma.size() == 1)
  lpdf -= N * log_sigma[0];
else 
  for (int n = 0; n < N; ++n)
    lpdf -= log_sigma[n];

But I sort of worry about how the code will look with a bunch of new it statements, also if we calculate something out of the loop then we need to either switch to have multiple loops over the vectors or have a bunch of if statements in the loop. It would look kind of icky and while I’m a speed boy I like my code to caché when it sits in cache

Bear with me, that doesn’t make any sense in relation to the TBB but I’ll pull a Matt Levine in 30 seconds and make it relevant.

I think, with the TBB, what we could do is refactor the probability distributions to be automatically parallel. We can do this by changing the marginals to be lambdas that are given to the operands_and_partials , then the results of those lambdas (which can be done in parallel) will be accumulated in parallel into logp . We will also do the thing Bob mentioned above when we have the v simple case.

I’ll write down what I think it would look like and then comment on each section

Right now inside the loop we have stuff like

// Inside of loop over N
    if (include_summand<propto>::value) {
      logp += NEG_LOG_SQRT_TWO_PI;
    }
    if (include_summand<propto, T_scale>::value) {
      logp -= log(value_of(sigma_vec[n]));
    }
// Do this for all the log prob accumulations
  return ops_partials.build(logp);

But what I’m saying is we could get rid of the loop and do something like

T_partials logp(0.0);
  // Build operands and partials
  operands_and_partials<T_y, T_loc, T_scale> ops_partials(y, mu, sigma);
  // If it's dumb, then let it be so
  if (include_summand<propto>::value) {
    logp += N * NEG_LOG_SQRT_TWO_PI;
  }
  // The check for each marginal
  if (include_summand<propto, T_scale>::value) {
    // We can do Bobs thing and simplify stuff
    if (sigma_vec.size() == 1) {
      logp -= N * log(value_of(sigma_vec[0])); // 1
    } else {
      // Add a marginal
      ops_partials.add_marginal([](auto& sigma) { // 2
        return -log(value_of(sigma));
      }, sigma_vec); // 3 Value used in lambda
    }
  }
///////
// Pretend I did the above for each marginal here
//////
  // 6 We could run everything here. Orrr we could just initialize it...
  return ops_partials.initialize(logp);

That looks a little weird, but essentially in (1) we are doing what Bob recommended and having no loop in the case of just one variable. In (2) we have this new method for operands_and_partials, add_marginal() which takes in a lambda for calculating the logp of that marginal. We also give it the inputs to the lambda, but maybe we can be more clever about that. The inputs for operands_and_partials are put in the class as op1, op2, ... So we could expose something like ops_partials.op1 that will retrieve y for the lambda without making a copy.

But cool, at this point we have all the lambdas for the marginals defined, so what do we do in operands_and_partials ? Well, I haven’t totally fleshed that out yet, but I can tell you two things.

  1. We will probably need to refactor operands_and_partials as well as scalar_seq_view ( scalar_seq_view needs to conceptually be like an STL container). This is not a giant leap, mostly adding a few methods and type traits.
  2. For operands_and_partials we will need to run each of the marginals and then accumulate their results to logp .

How we run those marginals is an interesting Q. We could run them all in parallel getting back a vector for each log prob calculation, another step to accumulate those into single values per marginal, then a simple sum over the much smaller sum of logp for each marginal. Or maybe we can compose the lambdas into one larger one so we get better memory access? It’s an open question to me

I’ve looked over the code and idt either of these would be that bad. But we would probably want to refactor the templates to make thing more general, add the add_marginal function to store the lambdas and their inputs inside of some vector or something, and then change build to execute all this stuff with the tbb.

(6) is kind of weird and optional, but instead of starting all of the marginal calculations at the return like we do now, the return can just give back a promise that will hold the value of the marginal calculations. That’s kind of nice because then we can interweave a bunch of operations as resources become available.

@wds15 had more knowledge of this and I liked his below a lot

Your thought of returning a promise is a good one! I actually figured we could go one step further: If you use the adjoint-things from Ben you can return something which represents already the function value (which needs to be calculated instantly), but the partials can be calculated while the forward sweep continues. The partials are need only when the backward sweep goes over the respective node. This way you can continue to build the forward sweep while you calculate the partials. Basically the idea is like this, https://github.com/stan-dev/math/wiki/Parallel-Asynchronous-ODE-Integration only general.

What’s nice about the TBB infrastructure is that we can do the distribution parallelization at the stan math level, and have the parallelization at the algorithm level, and the user specified parallelization in the @wds15’s parallelization proposal! The TBB does this through a pretty clever load balancer and scheduler. If we send all of our parallelization through the TBB task graph we can give it some info so that it won’t clog up the thread pool with too much. Which is nice!

I shared this with @wds15 earlier and will include some parts of our discussion here

  • can you still get exactly reproducible results (summation order matters)?

The parallel_deterministic_reduce does help us have reproducibility across runs which use threading. But if someone used threading for the original model and then another person tried to replicate with threading turned off they would not get the same answer. Whether that’s a cost we can pay is open to debate, though worst case we could treat threading like cigarettes and have a big big warning label about how threading is bad for reproduciblity

  • how much overhead do you introduce by the parallel glue?
    […] This will likely mean that we need two versions of the distributions - a parallel and a serial one; and this is what ultimately turned me down in pursuing this avenue as it gets into a lot of work to code up two versions. If the overhead is really small then we may not need two version, then the picture is different…but I doubt that unless I am proven wrong… and that’s the magic of the general reduce as proposed. It’s clear there is overhead and since the user tells us that he wants this bit of code to be paralleled, then it must be worth it.

The parallel glue will surely make overhead, both at the performance and maintenance level. With respect to maintenance, I think my proposal above is kind of nice, it clearly encapsulates each marginal and lays out where the math tricks can happen. We can also hide a lot of the TBB stuff in operands_and_partials so that people who want to add distributions do not need to be aware of the TBB system.

As per the literal glue cost i.e. running the lambdas in the single threaded case and setting up the TTB threads. I’m not sure, but what I can say is yes definitely. As I have everything in my mindspace at the moment, iterating over the containers more than once seems horrendous. Maybe we could be clever with how we compose the lambdas to reduce that, like wrapping them in one larger lambda in the single threaded case.

I also agreed with @wds15’s note below. EOD, the parallelization I’m proposing is naive in it’s application. A user is going to have a lot more knowledge about which parts of their model should be run over the map-reduce scheme. And knowledge is power!

Both of these lead me to conclude we need a parallel user-controlled reduce as I have proposed. A parallel version of our distributions would still be cool - no question about that. We would just need to be careful about when we use them should they introduce overhead which you don’t want whenever data is small or we are already using other parallelism. The TBB can handle nested parallelism, yes, but we need to be careful to not do harm to the overall performance.

6 Likes

I’m a bit confused by what you’re calling a marginal in the evaluation of normal_lpdf(y | mu, sigma). I see calls like ops_partials.add_marginal() but I’m not sure what that’s doing, as sigma is going to be of size > 1 in this branch—so it looks like it’ll be vectorized.

We don’t want to be using dynamic calls in conditionsl ike if (sigma_vec.size() == 1). I think we can use traits to test if the original sigma is a scalar.

That’s probably bad wording on my part, it should probably be add_partial. So we add each of the partial calculations that we need into ops_partials and then execute those when we run ops_partials.build(logp).

So if we had 16 cores and 4 partial calculations, we can give 4 threads to each partial calculation to accumulate logp for each. Then just do a serial sum over the logp we get back from the 4 partials.

I think this would be nice for large N, small N would def pay too much overhead to beat the vectorized version. I I theres a clever way to compose those lambdas in the single core setting so everything is still vectorized

Does that make sense?

Yes def agree we could use type traits to deduce that

Now that I’m rereading this I can see its incomplete because of how hand wavey I’m being about what it looks like in ops_partials. I’ll write out the tbb and execution components this week