Map_rect causing substantial slowdown; trying to understand how to fix

So, my real model is a time series with ~1000 observations (3 years with daily data) and ~25 predictors.

You can imagine this as a 1000x25 matrix, with each cell representing the contribution of the ith predictor to the jth observation. To get the final prediction, sum across each row.

The columns are independent, whereas the rows are not. For each of the 25 predictors, there is a fairly expensive set of operations to fill in the column.

It takes ~2 days to fit the model as-is. And I’d like to run it for longer! We expect to make regular tweaks with new data, so bringing the runtime down is a major priority.

Without multithreading, the gradient evaluation takes ~0.35 seconds.

Since the unit of parallelization has to be the column, reduce_sum seemed out of the question: each unit of work needs to return a vector, not a real. But map_rect is very much in play.

However, when I “repacked” the model and used map_rect with 8 threads, the gradient evaluation jumped to 6.2 seconds – about a ~20x slowdown. Within this setup, more threads is better (with 4 threads, gradient eval was ~8 seconds), but it comes nowhere near the efficiency of the non-multithreaded model. Aside from passing the data back and forth, there are no additional operations in the model between the non-multithreaded and the multithreaded versions.

To test how much of an impact the passing of data back and forth had, I wrote a toy model to illustrate the issue. It’s all in the notebook here.

This is on a Mac. I used the release candidate for cmdstanr to compile the model. I haven’t done anything with MPI and I don’t know how to! (Sidenote, the stan wiki for MPI parallelism which is linked-to in several places seems to be dead. I have not found an alternative).

I am wondering if there is anything that can be done!


Really? Just transpose your data… or just do not slice your data. You can simply give reduce_sum, as its slicing argument, an integer array over the index set of your columns. I would start with that, so

int cols_idx[num_cols];
for(j in 1:num_cols) cols_idx[j] = j;

Then you just give cols_idx to the slicing argument of reduce_sum and pass along the entire data as a shared argument (not a problem, because shared data is never copied).
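Roughly, the shape would be something like this (the function name, grainsize, and shared arguments are placeholders, with the per-column work left out):

```stan
functions {
  real partial_cols(int[] col_slice, int start, int end,
                    matrix all_data, vector theta) {
    real lp = 0;
    for (j in col_slice) {
      // do the per-column work on all_data[, j] here and add its
      // contribution to lp; all_data and theta are shared arguments,
      // so they are never copied between threads
    }
    return lp;
  }
}
```

and then in the model block:

```stan
target += reduce_sum(partial_cols, cols_idx, grainsize, all_data, theta);
```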

Sorry, that description of the situation was just a conceptual motivation. As I mentioned above,

The long computation in question is filling in the columns. Specifically, there’s a recursive operation (a convolution) going down each column, so you can’t parallelize across rows. E.g. what goes in the 10th row depends on the 9th, 8th, etc.

Well, you could parallelize, but that would be parallelizing the “rowsum” operation which is not the bottleneck.

The relevant part of the stan model looks like this conceptually*:

for (i in 1:n_cols) {
  the_matrix[, i] = some_expensive_operation();
}
for (i in 1:n_rows) {
  prediction[i] = sum(the_matrix[i, ]);
}

actual ~ normal(prediction, sigma);

I was hoping to parallelize the first loop, where each iteration produces a vector. This is why reduce_sum does not fit the bill.

*Actually, it’s written more like this

vector[n_timesteps] prediction = rep_vector(0, n_timesteps);
for (i in 1:n_cols) {
  prediction += some_expensive_operation();
}

actual ~ normal(prediction, sigma);

The only point where anything gets summarized as a real is the sampling statement at the end, which can hardly be worth parallelizing, as it’s only ~1000 observations.

Does that make sense?

Is this a discrete convolution? How is it coded?

(you are right in that this structure is not really amenable to reduce_sum)

Looking at the dummy code you provide for map_rect, it is not surprising that map_rect does not do any good here. There is always overhead with parallelisation, so each unit should represent a significant amount of work to offset the extra burden of going parallel.

Are you using threads or MPI?

Yes, exactly. It is coded as follows. The max_horizon term just ensures that we do not go to the trouble of computing near-zero weights over and over; it limits the number of computations.

vector shift_decay(vector x, real shift, real concentration, int max_horizon) {
    int n = num_elements(x);
    vector[n] shift_decayed = rep_vector(0, n);
    vector[max_horizon] weights = rep_vector(0, max_horizon);

    // decay weights, stored in reverse so they align with trailing slices of x
    for (t in 1:max_horizon)
      weights[max_horizon - t + 1] = exp(neg_binomial_2_lpmf(t - 1 | shift, concentration));

    for (i in 1:n) {
      if (i <= max_horizon) {
        // not enough history yet: use only the first i weights/observations
        shift_decayed[i] = dot_product(weights[(max_horizon - i + 1):max_horizon], x[1:i]);
      } else {
        shift_decayed[i] = dot_product(weights, x[(i - max_horizon + 1):i]);
      }
    }

    return shift_decayed;
  }

I am using threads!

I don’t know how to compile the model to use MPI instead, and the instructions linked-to in the manual are no longer live on GitHub. I haven’t been able to find something that I (a layperson in all things C++) could understand. These links take you to a “new page” in the wiki:

If you use Stan >= 2.21, then the TBB is used for threading and there should not be a speed gain from going MPI.

Is it maybe possible to share the weights calculation across all the columns?

If not, then I don’t quite see how to speedup this thing. Using dot_product is key here, which you already do.


Just checking – is this true on a Mac as well? Is it worth trying with MPI, in any case? It seems that the slowdown is caused by passing the data between threads, so if one of these approaches dominates the other on that dimension, it may be worth trying.

Unfortunately not. The shift and concentration terms are unique to each column :/

Ok! Thanks for trying, though. Much appreciated.

Maybe I am off here… but looking at your shift_decay function again suggests that this function acts on a full column, right? In addition, every row returned is independent of the previous rows, since the dot product does not involve earlier shift_decayed values. The prediction per time-point can be represented as a sum over dot products. I have the impression that you can re-order how the sums are executed, and then each row in the prediction becomes independent of the others.

All you would probably need is to pre-calculate this weights matrix first, which will likely become a shared argument to reduce_sum.

Anyway, this is apparently an index war which is easily messed up… and no guarantees that this will speed things up rather than waste your time ;).
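Roughly, the swapped ordering could look like this (I am assuming X is your n_rows x n_cols data matrix and that shift[k] and concentration[k] are the per-column parameters; the names are placeholders):

```stan
// precompute one column of reversed decay weights per predictor;
// after this, each prediction row depends only on X and W
matrix[max_horizon, n_cols] W;
for (k in 1:n_cols)
  for (t in 1:max_horizon)
    W[max_horizon - t + 1, k] = exp(neg_binomial_2_lpmf(t - 1 | shift[k], concentration[k]));

// rows of the prediction are now independent of one another
for (i in 1:n_rows) {
  prediction[i] = 0;
  for (k in 1:n_cols) {
    if (i <= max_horizon)
      prediction[i] += dot_product(W[(max_horizon - i + 1):max_horizon, k], X[1:i, k]);
    else
      prediction[i] += dot_product(W[, k], X[(i - max_horizon + 1):i, k]);
  }
}
```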

Absolutely, but if I had 1000 time points, wouldn’t I need 1000 reduce_sum calls? Something like:

for(i in 1:n_timesteps) {
  prediction[i] = reduce_sum(...)
}

Is that a totally crazy thing to do?

No… you would need to work out how the sums can be swapped. Right now you calculate per column and then per row, and rows represent observations. You need to swap that: calculate per row (per data item) the stuff over the columns. Then you hopefully end up with something which is

target += reduce_sum(do-the-data-likelihood-in-partial-sums, ...)

Firing off one reduce_sum per prediction is not what you want, no.
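To make that concrete, a sketch of the partial-sum function under the precomputed-weights idea (W as the max_horizon x n_cols weights matrix and X as the data matrix are my assumptions; no guarantees on the indexing):

```stan
functions {
  real partial_log_lik(vector y_slice, int start, int end,
                       matrix X, matrix W, real sigma, int max_horizon) {
    int n = end - start + 1;
    vector[n] mu = rep_vector(0, n);
    for (i in 1:n) {
      int t = start + i - 1;  // global time index of this observation
      for (k in 1:cols(X)) {
        if (t <= max_horizon)
          mu[i] += dot_product(W[(max_horizon - t + 1):max_horizon, k], X[1:t, k]);
        else
          mu[i] += dot_product(W[, k], X[(t - max_horizon + 1):t, k]);
      }
    }
    return normal_lpdf(y_slice | mu, sigma);
  }
}
```

and then the likelihood becomes target += reduce_sum(partial_log_lik, actual, grainsize, X, W, sigma, max_horizon);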

Ah, of course. When you put it like that, it’s very clear.

reduce_sum is just a device to break up large sums of independent terms. Nothing more. The data likelihood is just that. So you need to provide a function which can calculate any arbitrary partial sum. Which partial sum is going to be calculated will not be under your control. That’s it. I thought this comes out in the user manual + reference manual.

It does – not the user manual’s fault. It just seemed that, since the “rows” were not conditionally independent, this would be intractable. But with a bit of creative thinking on your part, seemingly not!

I see.

Coming up with reduce_sum took some time and some creativity to get there. I am relatively happy with it so far, given its flexibility.

I am going to present on it during StanCon. Here is my “recipe” to convert any model (as a dump from my slide right now):

Use reduce_sum only once everything works!

Recommended workflow

  1. Define a test of correctness for the model.

  2. Optimize the model for single-core performance first… test!

  3. Choose a meaningful per-term l_i(y_i, η_i, ξ).

  4. Restructure the model in steps:

     1. Define as a function the per-term l_i(y_i, η_i, ξ).

     2. Define a function calculating the partial sum f(j, k).

     3. Test these by writing the log-likelihood as target += f(1, m_1) + f(m_1+1, m_2) + f(m_2+1, N);

     4. Optionally rewrite f(j, k) to use vectorization over multiple terms l_i(y_i, η_i, ξ)… test!

  5. Finally, use reduce_sum (or reduce_sum_static).

  6. For reduce_sum_static you must find a meaningful grainsize.
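As a skeleton, the per-term function, the partial-sum function, and the split-sum correctness check could look like this for a normal likelihood (all names here are placeholders):

```stan
functions {
  // per-term log likelihood l_i(y_i, eta_i, xi)
  real log_lik_term(int i, vector y, vector eta, real xi) {
    return normal_lpdf(y[i] | eta[i], xi);
  }
  // partial sum f(j, k) over terms j..k
  real f(int j, int k, vector y, vector eta, real xi) {
    real lp = 0;
    for (i in j:k)
      lp += log_lik_term(i, y, eta, xi);
    return lp;
  }
}
```

The correctness check is then that, e.g. with N = 1000, target += f(1, 300, y, eta, xi) + f(301, 700, y, eta, xi) + f(701, 1000, y, eta, xi); gives the same target as target += normal_lpdf(y | eta, xi);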
