Parallel reduce in the Stan language

Hi!

I have been working on parallel map and parallel reduce functions for Stan. This is based on the Intel TBB (so no more sharding), and the parallel reduce allows us to combine vectorization with parallelization. I am seeing very promising results with the prototype I have, so I think it’s about time to start thinking about what this could look like signature-wise.

Currently I am envisioning the following C++ signature for the reduce (summation reduction only):

template <class InputIt, class T, class BinaryFunction>
constexpr T parallel_reduce_sum(InputIt first, InputIt last, T init, BinaryFunction f);

So you give it an arbitrary input iterator sequence, the initial element to which the sum is initialized, and a binary function f. The binary function will be called on sub-slices as f(*slice_start, *slice_end), and it is expected to return the sum reduction over the sub-range represented by those iterators.

The idea is to glue things together using C++ lambdas. This is how it could look when used in C++ code:

typedef boost::counting_iterator<std::size_t> count_iter;
std::vector<int> data(10000, 4);
var lambda = 6.0;

var poisson_lpdf = stan::math::parallel_reduce_sum(
        count_iter(0), count_iter(10000), var(0.0),
        [&](std::size_t start, std::size_t end) {
          const std::vector<int> partial_data(data.begin() + start,
                                              data.begin() + end);
          return stan::math::poisson_lpmf(partial_data, lambda);
        });

I think that is very neat and useful. The user can reference any objects in the environment, and plugging things together is very easy.

The question now is: how can we expose this in the Stan language, and do we have to wait for Stan3 for this type of thing?

One possibility could be

functions {
  real reducer(int start, int end, int[] data, real lambda) {
     return poisson_lpmf(data[start:end] | lambda);
  }
}
...
model {
   target += parallel_reduce(reducer, data, lambda);
}

So the Stan function parallel_reduce would take as its first argument the function doing the reduction. The reducer function then gets passed as its first two arguments the start and end index of the slice to be reduced. Following those would be an arbitrary number of additional arguments which get passed on to the reducer function (as const refs, of course).
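
To make the mapping concrete, here is a minimal hypothetical sketch of how such a call could be lowered onto the C++ parallel_reduce_sum above; the reducer_functor name, the translation from Stan’s 1-based inclusive ranges to 0-based half-open C++ ranges, and the calling convention are all assumptions, not a committed design:

#include <stan/math.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <vector>

// Hypothetical stand-in for the C++ generated from the user's Stan reducer;
// the name and calling convention are assumptions.
stan::math::var reducer_functor(std::size_t start, std::size_t end,
                                const std::vector<int>& data,
                                const stan::math::var& lambda) {
  const std::vector<int> slice(data.begin() + (start - 1), data.begin() + end);
  return stan::math::poisson_lpmf(slice, lambda);
}

// Sketch of lowering `target += parallel_reduce(reducer, data, lambda);`
// onto the proposed parallel_reduce_sum.
stan::math::var lowered_call(const std::vector<int>& data,
                             const stan::math::var& lambda) {
  typedef boost::counting_iterator<std::size_t> count_iter;
  return stan::math::parallel_reduce_sum(
      count_iter(0), count_iter(data.size()), stan::math::var(0.0),
      [&](std::size_t start, std::size_t end) {
        // Translate the 0-based half-open C++ range [start, end) into the
        // 1-based inclusive indices the Stan reducer expects and forward the
        // remaining arguments by const reference.
        return reducer_functor(start + 1, end, data, lambda);
      });
}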

Does that make sense to people? Can we pull this off with the current parser, or should we wait for Stan3? I would hope that we can get this in without too much effort in the current parser, though I have to say that getting this to work will require a bit of refactoring at the backend. However, I have coded up a prototype which reduces 10^4 data items of a Poisson likelihood, and this approach can double the evaluation speed of such a loop! So modestly large models already get a speedup from this, and large models will get a scalable program as we combine vectorization with parallelization.

Needless to say, all of this will be some work to pull off… and I will happily take any help I can get here.

Cheers,
Sebastian

For completeness, the parallel map would look like this in C++:

template <class InputIt, class UnaryFunction>
constexpr auto parallel_map(InputIt first, InputIt last, UnaryFunction f);

The function then returns a std::vector, and each element of that sequence stores a function evaluation f(*elem), where elem is an iterator between first and last. With the logic above, this basically enables a much easier-to-use variant of map_rect, because no data packing or unpacking is needed at all. This should allow a much bigger user base to use parallelization, as it avoids index wars for the user.
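
As a rough usage sketch, assuming parallel_map returns a std::vector of the per-element results as described (the per-element normal_lpdf term is purely illustrative):

#include <stan/math.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <vector>

// Minimal usage sketch of the proposed parallel_map: each result element
// stores f(i) for the corresponding index i, with no packing of the inputs
// into shared/job parameter arrays.
std::vector<stan::math::var> map_example(const std::vector<double>& x,
                                         const stan::math::var& mu,
                                         const stan::math::var& sigma) {
  typedef boost::counting_iterator<std::size_t> count_iter;
  return stan::math::parallel_map(
      count_iter(0), count_iter(x.size()),
      [&](std::size_t i) { return stan::math::normal_lpdf(x[i], mu, sigma); });
}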


At least wait for stanc3, the new transpiler, because we’ve put a hold on all work on the C++ compiler under the expectation that the new one will be done in a couple of months and will replace it. We’re definitely building it all with closures in mind, so they should be much easier to add than they would be in C++.

I’m not sure why the reducer is responsible for slicing the data. Shouldn’t parallel_reduce do that and send the data out?

With how many cores?

Much less than it would’ve required before stanc3, that’s for sure.

This makes it a lot easier for me to implement, and I do not think that this is such a super-inconvenient thing for the user to do. Having parallel_reduce do the slicing automatically would be a nice-to-have. It’s just that the user knows the data structure far better (what is a vector / a scalar / what needs to be sliced). So I would opt for a bit more user knowledge in the function and a much reduced implementation headache for me/us. At this point my concern is building a solid prototype which convinces more people to work on this with me. I think/hope that the prototype is now at that point.

Off the top of my head it was:

  • 1 core: 320ms
  • 2 cores: 170ms
  • 3 cores: 150ms

and starting from 4 cores the performance degrades right now.

Using C++17’s transform_reduce double-functor signature (one functor for the transform, one for the reduce) would make it more future-proof. I don’t mind naming it the same, actually.
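
For reference, the single-range C++17 overload of std::transform_reduce in <numeric> with separate reduce and transform functors looks like this (there are further overloads taking an execution policy or a second input range):

template <class InputIt, class T, class BinaryOp, class UnaryOp>
T transform_reduce(InputIt first, InputIt last, T init,
                   BinaryOp reduce, UnaryOp transform);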


I do not think that this would be a good thing. When you transform each element, you throw each element onto the AD stack, which is exactly what we want to prevent with vectorization of the reduce operation.

So I would suggest instead having a parallel map (which basically is the transform) and a parallel reduce, separate from one another. You can then feed the output of parallel map into parallel reduce if you want to. While there is some gain in doing both at once, I do not think it is worthwhile to combine them in Stan-math. Also, the user can always include the transform step in the reduce operation if they want.
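
For example, here is a minimal sketch of folding a transform into the reduce, assuming the parallel_reduce_sum signature proposed above; the log-transform of the data is purely illustrative:

#include <stan/math.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <cmath>
#include <vector>

// Sketch: transform the data slice as plain doubles inside the reducer, then
// reduce the slice with a single vectorized call, so only the partial results
// per slice (not every transformed element) land on the AD stack.
stan::math::var fused_example(const std::vector<double>& x,
                              const stan::math::var& mu,
                              const stan::math::var& sigma) {
  typedef boost::counting_iterator<std::size_t> count_iter;
  return stan::math::parallel_reduce_sum(
      count_iter(0), count_iter(x.size()), stan::math::var(0.0),
      [&](std::size_t start, std::size_t end) {
        std::vector<double> log_x(x.begin() + start, x.begin() + end);
        for (double& v : log_x)
          v = std::log(v);
        return stan::math::normal_lpdf(log_x, mu, sigma);
      });
}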

Off the current convo but relevant to the topic: is the reason for map_rect’s signature so that you can pull out the var and data types? If so, could we make it variadic by pulling those out with something like I have below?

This gets cl::Buffers, but I think you could do the same scheme to pull out vars / data and have a void on the non-var / data inputs when passing the other.

We use that function here to unpack and get the buffers for the kernel.

In Torsten we use a similar signature, something like:

template<typename F, typename T1, typename T2, typename... Ts>
auto omp_binary(const F& f, const std::vector<T1>& v1, const std::vector<T2>& v2, const Ts&... other) {
  std::vector<decltype(F()(v1[0], v2[0], other...))> result(v1.size());
....
}

Provided that I understand the slicing correctly, I agree with @wds15 that giving the user the responsibility for slicing is a good idea. Imagine having a complex custom log-density function in which some arguments need to be sliced in weird ways; then it’s far easier for me, as a user, to do that myself. In fact, it could sometimes be the only way to achieve the desired slicing.

I think I would also be able to automate that in brms by writing reducer functions on the fly tuned to the specific likelihood structure of the model.

Nice! I’m trying to pseudo out how that would look with just the map_rect signature

template <int call_id, typename F, typename T_shared_param, typename T_job_param>
Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type, Eigen::Dynamic, 1>
map_rect(
  const Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1>& shared_params,
  const std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>>& job_params,
  const std::vector<std::vector<double>>& x_r,
  const std::vector<std::vector<int>>& x_i,
  std::ostream* msgs = nullptr
)

Something like (?)

// to_shared_params returns Eigen::Matrix<T_shared_param, Eigen::Dynamic, 1> or nothing (I think that's possible?)
// to_job_params returns std::vector<Eigen::Matrix<T_job_param, Eigen::Dynamic, 1>> or nothing
// to_data_vars returns std::vector<std::vector<T>> where T is an arithmetic type
template <int call_id, typename F, typename T_shared_param, typename T_job_param, typename... Args>
Eigen::Matrix<typename stan::return_type<T_shared_param, T_job_param>::type, Eigen::Dynamic, 1>
map_rect(
  const to_shared_params<T_shared_param, Args>::type&... shared_params,
  const to_job_params<T_job_param, Args>::type&... job_params,
  const to_data_vars<Args>&... data_var,
  std::ostream* msgs = nullptr)

I’m not terribly familiar with how the rest of the code works though. What is the difference between the T_shared_param and T_job_param? Will they be vars or doubles or just whatever? Looking at the docs it looks like they are params and therefore vars?

Also the above is very ‘give a man a hammer and everything is a nail’, probably not the nicest way, just the way I’ve done something like this recently.

If we expect this to work with complex custom functions and arbitrary slicing of the data, how can we ensure parallel performance?

You are probably right, perhaps I am expecting too much of this.

Let’s make a simple example. Suppose we have some kind of autoregressive model; then we could write:

real reducer(int start, int end, vector y, real rho, vector b, matrix X, real sigma) {
   vector[end - start + 1] ym1;
   if (start == 1) {
     ym1 = append_row(0, y[start:(end - 1)]);
   } else {
     ym1 = y[(start - 1):(end - 1)];
   }
   return normal_lpdf(y[start:end] | X[start:end] * b + ym1 * rho, sigma);
}

Not sure if this example makes any sense at all in this context, and I am not saying we couldn’t build this model with parallel_reduce doing the slicing. I just mean, we can make the interface cleaner by letting everything happen inside a user-defined function.


This is exactly as I had it in my mind… the user knows best how to summarize the slices.

To ensure parallel performance, all we need as a guarantee is that all operations are independent, which we can ensure through the function signature, since inputs are constant refs. In case some index ranges are heavier to compute than others, we can (hopefully) rely on the work-stealing scheduling of the TBB, which will balance that.

…and in addition, things get a lot easier to implement if we let users handle slicing… Moreover, we could use such a parallel reduce to implement a reduce with automatic slicing; it’s an additional feature which can be enabled later if really needed.

Another simplification I have made is to focus on the summation reduction, but I suppose that is a good starting point and will be the major reduction operation used in Stan.

Yes. And the packing into arrays is so that we can get away with a single internal function signature.

In the future, this should all be unrolled with parameter packs and variadic arguments.

I couldn’t see the connection here. But for the unpacking function, that’s sort of what the parameter pack version would look like.

I think we’ll be able to use traits to figure out what is data and what needs a derivative. We’ll still need to be told which are the state variables.

When you have N jobs running in parallel, the shared parameters are shared among all 1, 2, …, N jobs, whereas the job parameters are specific to each job. For instance, in a model with a latent parameter for each observation and some prior parameters, the priors would be shared parameters and the data-specific latent parameters would be job-specific parameters divided up along with the data.
