Hi!
I have been advocating the use of a thread pool as implemented in the Intel TBB, as opposed to our current C++11 threading model, which creates new threads every time asynchronous work is dispatched. I finally have a good way to benchmark this and to observe the speed difference.
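To make the contrast concrete, here is a minimal, self-contained sketch (plain C++11 and TBB only, independent of stan-math and of the benchmark below) of the two scheduling models: std::async typically dispatches each task onto a fresh thread, whereas tbb::parallel_reduce chops the range into chunks and runs them on a shared thread pool.

#include <tbb/parallel_reduce.h>
#include <tbb/blocked_range.h>
#include <functional>
#include <future>
#include <numeric>
#include <vector>
#include <iostream>

int main() {
  std::vector<double> x(1 << 20, 1.0);

  // C++11 style: each async dispatch may spin up a new thread.
  auto fut = std::async(std::launch::async, [&] {
    return std::accumulate(x.begin(), x.end(), 0.0);
  });
  double sum_async = fut.get();

  // TBB style: the range is split into chunks which are scheduled onto a
  // shared thread pool; no threads are created per call.
  double sum_tbb = tbb::parallel_reduce(
      tbb::blocked_range<std::size_t>(0, x.size()), 0.0,
      [&](const tbb::blocked_range<std::size_t>& r, double partial) {
        for (std::size_t i = r.begin(); i != r.end(); ++i)
          partial += x[i];
        return partial;
      },
      std::plus<double>());

  std::cout << sum_async << " " << sum_tbb << std::endl;
  return 0;
}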
As an example I am using a hierarchical Poisson reduce with G=500 groups and N=10 observations per group. The Stan code is below. I am comparing 4 different approaches:
- TBB parallel reduce (very easy to write down for the user)
- TBB parallel map (somewhat more data prep required)
- vanilla map_rect (coded in the same way as the map, so data prep is needed and it's clunky to code with)
- serial code written in a very straightforward way
The results are here:
To me they show that:
- the TBB with the thread pool clearly has the best scaling behavior and outperforms the current C++11 thread based approach
- the TBB parallel map 1-core performance is faster than the serial code, probably due to a better/smaller AD graph footprint, at the cost of somewhat cumbersome coding
- the map_rect runs are given the scalable memory allocator from the TBB, so without the TBB (as of now in Stan) things will be even worse for map_rect
What is super amazing here is that the execution on 6 cores is 5x faster than the respective serial program - that is, 70s of run time go down to just 14s!
Here are the codes (the Stan model and the C++ user header), which run with the parallel-ad-tape-3 branch of stan-math:
functions {
  // runs the reduce over the index range start to end; the mapping from the
  // data-index set to the group indices is pre-stored in gidx
  real hierarchical_reduce(int start, int end, int[] y, vector log_lambda_group, int[] gidx) {
    return poisson_log_lpmf(y[start:end] | log_lambda_group[gidx[start:end]]);
  }

  // function defined in C++ which calls parallel_reduce_sum with a
  // lambda functor which simply binds the arguments passed into it
  real parallel_hierarchical_reduce(int[] y, vector log_lambda_group, int[] gidx, int grainsize);

  // map_rect alternative
  vector mr_reduce(vector beta, vector log_lambda, real[] xr, int[] xi) {
    real lp = poisson_log_lpmf(xi | log_lambda[1]);
    return [lp]';
  }

  real hierarchical_map(int g, vector log_lambda, int[,] yg) {
    real lp = poisson_log_lpmf(yg[g] | log_lambda[g]);
    return lp;
  }

  // parallel_map TBB based alternative
  real[] parallel_hierarchical_map(int[] group, vector log_lambda, int[,] yg);
}
data {
  int<lower=0> N;
  int<lower=0> G;
  int<lower=0> grainsize;
  int<lower=0,upper=3> method;
}
transformed data {
  real true_log_lambda = log(5.0);
  real true_tau = log(10)/1.96;
  int y[N*G];
  int yg[G,N];
  real xr[G,0];
  int gidx[N*G];
  int start[G];
  int end[G];
  int group[G];
  vector[0] theta_dummy;

  print("Simulating problem size: G = ", G, "; N = ", N);
  if (method == 0) {
    print("Using parallel reduce TBB with grainsize = ", grainsize);
  } else if (method == 1) {
    print("Using parallel map TBB");
  } else if (method == 2) {
    print("Using map_rect.");
  } else if (method == 3) {
    print("Using serial evaluation only.");
  }

  for(g in 1:G) {
    real lambda_group = lognormal_rng(true_log_lambda, true_tau);
    int elem = 1;
    group[g] = g;
    start[g] = (g-1) * N + 1;
    end[g] = g * N;
    for(i in start[g]:end[g]) {
      y[i] = poisson_rng(lambda_group);
      yg[g,elem] = y[i];
      gidx[i] = g;
      elem += 1;
    }
  }
}
parameters {
  real log_lambda;
  real<lower=0> tau;
  vector[G] eta;
}
model {
  vector[G] log_lambda_group = log_lambda + eta * tau;
  real lpmf = 0;
  if (method == 0) {
    lpmf = parallel_hierarchical_reduce(y, log_lambda_group, gidx, grainsize);
  } else if (method == 1) {
    lpmf = sum(parallel_hierarchical_map(group, log_lambda_group, yg));
  } else if (method == 2) {
    vector[1] log_lambda_group_tilde[G];
    for(g in 1:G)
      log_lambda_group_tilde[g,1] = log_lambda_group[g];
    lpmf = sum(map_rect(mr_reduce, theta_dummy, log_lambda_group_tilde, xr, yg));
  } else if (method == 3) {
    lpmf = poisson_log_lpmf(y | log_lambda_group[gidx]);
  }
  target += lpmf;
  target += std_normal_lpdf(log_lambda);
  target += std_normal_lpdf(tau);
  target += std_normal_lpdf(eta);
}
generated quantities {
  real msq_log_lambda = square(true_log_lambda - log_lambda);
  real msq_tau = square(true_tau - tau);
}
#include <stan/math.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <stan/math/prim/scal/functor/parallel_reduce_sum.hpp>
#include <stan/math/rev/scal/functor/parallel_reduce_sum.hpp>
#include <stan/math/prim/scal/functor/parallel_for_each.hpp>
#include <stan/math/rev/scal/functor/parallel_for_each.hpp>
namespace poisson_hierarchical_scale_model_namespace {

// user provided function
template <typename T3__>
typename boost::math::tools::promote_args<T3__>::type
hierarchical_reduce(const int& start, const int& end, const std::vector<int>& y,
                    const Eigen::Matrix<T3__, Eigen::Dynamic, 1>& log_lambda_group,
                    const std::vector<int>& gidx, std::ostream* pstream__);
template <typename T3>
inline typename boost::math::tools::promote_args<T3>::type
parallel_hierarchical_reduce(const std::vector<int>& y,
                             const Eigen::Matrix<T3, Eigen::Dynamic, 1>& log_lambda_group,
                             const std::vector<int>& gidx,
                             const int& grainsize,
                             std::ostream* pstream__) {
  typedef boost::counting_iterator<int> count_iter;
  typedef typename boost::math::tools::promote_args<T3>::type return_t;
  const int elems = y.size();
  typedef typename boost::math::tools::promote_args<T3>::type local_scalar_t__;
  typedef local_scalar_t__ fun_return_scalar_t__;

  // the C++ side only binds the arguments of the user function with a lambda
  return_t lpmf = stan::math::parallel_reduce_sum(
      count_iter(1), count_iter(elems + 1), return_t(0.0),
      [&](int start, int end) {
        return hierarchical_reduce(start, end, y, log_lambda_group, gidx, pstream__);
      },
      grainsize);

  /* C++ only version
  return_t lpmf = stan::math::parallel_reduce_sum(
      count_iter(1), count_iter(elems+1), return_t(0.0),
      [&](int start, int end) {
        const std::size_t partial_elems = end - start + 1;
        std::vector<int> partial_data;
        partial_data.insert(partial_data.end(), y.begin() + start - 1,
                            y.begin() + end);
        std::vector<return_t> partial_log_lambda(partial_elems);
        for (std::size_t i = 0; i != partial_elems; ++i)
          partial_log_lambda[i] = log_lambda_group[gidx[start + i - 1] - 1];
        return stan::math::poisson_log_lpmf(partial_data, partial_log_lambda);
      }, grainsize);
  */

  return lpmf;
}
template <typename T1__>
typename boost::math::tools::promote_args<T1__>::type
hierarchical_map(const int& g,
                 const Eigen::Matrix<T1__, Eigen::Dynamic, 1>& log_lambda,
                 const std::vector<std::vector<int> >& yg, std::ostream* pstream__);

template <typename T1__>
std::vector<typename boost::math::tools::promote_args<T1__>::type>
parallel_hierarchical_map(const std::vector<int>& group,
                          const Eigen::Matrix<T1__, Eigen::Dynamic, 1>& log_lambda,
                          const std::vector<std::vector<int> >& yg,
                          std::ostream* pstream__) {
  typedef typename boost::math::tools::promote_args<T1__>::type return_t;
  const int elems = group.size();

  // the C++ side only binds the arguments of the user function with a lambda
  std::vector<return_t> lpmf = stan::math::parallel_map(
      group.begin(), group.end(),
      [&](int g) -> return_t {
        return hierarchical_map(g, log_lambda, yg, pstream__);
      });

  return lpmf;
}

}
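For readers who have not used the TBB before, here is a minimal, self-contained sketch of the primitive that sits underneath such a parallel map: tbb::parallel_for over a blocked_range schedules chunks of the index range onto the shared thread pool, and each element writes into its own output slot. This is only an illustration, not the stan-math implementation; expensive_term() is a hypothetical stand-in for the per-element work.

#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
#include <cmath>
#include <vector>
#include <iostream>

// hypothetical placeholder for the per-element computation
static double expensive_term(int i) {
  return std::lgamma(i + 1.0);
}

int main() {
  const int n = 1000;
  std::vector<double> out(n);
  // The index range is split into chunks which run on the TBB thread pool;
  // distinct elements write to distinct slots, so no locking is needed.
  tbb::parallel_for(tbb::blocked_range<int>(0, n),
                    [&](const tbb::blocked_range<int>& r) {
                      for (int i = r.begin(); i != r.end(); ++i)
                        out[i] = expensive_term(i);
                    });
  std::cout << "out[0] = " << out[0]
            << ", out[n-1] = " << out[n - 1] << std::endl;
  return 0;
}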