Help with reduce_sum

Hello again. My struggle with within-chain parallelization using reduce_sum is unfortunately ramping up. I moved to using CmdStanR (after CmdStanPy did not work with multi threading), but when I run this model using reduce_sum I have two problems:
(1) the model with reduce_sum takes twice as much time as the model without reduce_sum.
(2) even though I designed the model to have two threads and defined threads_per_chain = 2, I see that I have 3 running threads. Maybe these problems are related.
I am really stuck…

model {
    for (n in 1:N) {
        for (w in 1:W) {
            if (w == 1) {                                                                                 
                X[n,w] ~ normal(mu_prior_x,sigma_v);                           
            else {         
                 X[n, w] ~  normal((A * X[n, w-1] + B * U[n, w-1]), Q);                                                          
            theta_pr[n, w] ~ normal(C[1,] * X[n, w],sigma_r);                             
            int grainsize = 13;
            vector[tr_max] a;
            vector[tr_max] b;
            vector[tr_max] c;

            a[ :tr[n, w]]   = data1[n, w, :tr[n, w]]  ./ (1 + theta[n, w] * data2[n, w, :tr[n, w]]);
            b[ :tr[n, w]]  = data3[n, w, :tr[n, w]];
            c[ :tr[n, w]] = a[ :tr[n, w]] - b[ :tr[n, w]];
            target += reduce_sum(partial_sum, choice[n, w, :tr[n, w]], grainsize, c[:tr[n, w]], theta[n, w]);      

cmdstan version 2.23.0
macOS Sierra
R version 3.3.2.

1 Like

Hi @nerpa. It will help if we can read your partial_sum function (or ideally show a minimal working example), along with your R code that calls the program, including what you set as the grainsize. Further, it is generally best to a) place as much of the program inside the partial_sum function(s) and b) move things like

X[n,w] ~ normal(mu_prior_x,sigma_v);

outside loops in favor of summing over a vector and incrementing target. Also, what hardware are you trying to use for threading?

1 Like

Thank you! Here is the partial_sum function - it’s pretty standard

functions {

    real partial_sum(int[] y_slice, int start, int end, vector x, vector beta) {
        return bernoulli_logit_lpmf(y_slice | beta * x[start:end]);

My model is really big, so I am not even sure how to put smaller parts of it to keep people here going through it.

that’s what I call from R:

model_parallel <- cmdstan_model("model_parallel.stan", cpp_options = list(stan_threads = TRUE))
time1 = system.time(fit1 <- model_parallel$sample(data_model,
                                             chains = 2,
                                             parallel_chains = 2,
                                             threads_per_chain = 2,
                                             refresh = 1000))

grainsize is hardcoded at this level to be 13 (it’s in the model part I attached) since the overall number of trials is 26 and I am testing with 2 threads.

UPDATE - I use macbook with 16GB 2.9 GHz Intel Core i7.

I wish I could move things like X[n,w] ~ normal(mu_prior_x,sigma_v); but I am coding an evolving process in time (w) so I am not sure if that’s possible to move it out.

Sorry I missed that you defined it in the model block, I was expecting that to be data. You can move

int grainsize = 13;

to transformed data and it will only be called once when the model first runs, instead of every iteration within both loops, for example.

By trials, do you mean you only have 26 rows of data? The function bernoulli_logit_lpmf is already highly optimized and the reduce_sum function has some overhead for parameters, so on balance, it may or may not be faster, but if your model is mostly running code outside that function, then you may not get much speedup anyway because the code outside reduce_sum is the limiting factor + the overhead in the function call, and you can search discourse for previous discussions on this.

Also you can slice vectors to avoid loops, see for example, Stan User Guide section 2.1 on time series:

y[2:N] ~ normal(alpha + beta * y[1:(N - 1)], sigma);
1 Like

yes, i will move int grainsize = 13; to transformed data, thanks.
My test data has 26 rows, but the actual data is much larger + I have more models that I fit in every N-W loop with different parameters and I hoped reduce_sum will speed things up.

By slicing, you mean that I could slice X instead of looping over it?

1 Like

With such a small test set, parallelizing may well be slower, but the more data the larger the benefit.

Yes, among other things.

1 Like

That’s helpful and encouraging! I will try with larger data sets.

What else could I slice here? At this point, my full model with a subset of subjects (n=15) fits for a week. So every suggestion how to speed things up will be highly appreciated.

I have one more question - does reduce_sum really wait until all shards are being processed before moving forward? It is still unclear why I saw 3 threads running instead of 2.

It’s not just slicing. This is getting off-topic, but I would focus on building the model piece by piece, trying to apply the various concepts in the user guide, and post a different question with the full model (it can be hard to diagnose efficiency issues without seeing how the whole model is coded and fits together; even the direction of indexing in arrays versus matrices can give speedups), if you can, for efficiency advice.

Hard to say with the information here … the way you specified the call, it should have 4 total threads, two per chain.

1 Like

I wasn’t clear, sorry - I had 3 threads per chain while I defined only 2.

Will do in the future. Thanks!

sad, but true - just filed implement threading logic · Issue #256 · stan-dev/cmdstanpy · GitHub to correct this.

Thank so much for keeping up with this! Do you have any rough estimate when it might work? (I really don’t like R so far :)
Btw, I read your filed note, and multithreading doesn’t work in cmdstanpy even when I set
`os.environ[‘STAN_NUM_THREADS’]. Hope this helps!

chains are run via Python’s subprocess module - setting the environment variable isn’t enough - according to SO, it needs to be specified in the call to Popen:

1 Like

I think we already do that

you’re correct, we do -

will investigate.

update - this seems to be the reason why this isn’t working:

will be fixed by

1 Like

I didn’t have any problems with this example using CmdStanPy

import cmdstanpy

model_code = """
functions {
  real partial_sum(int[] slice_n_redcards,
                   int start, int end,
                   int[] n_games,
                   vector rating,
                   vector beta) {
    return binomial_logit_lpmf(slice_n_redcards |
                               beta[1] + beta[2] * rating[start:end]);
data {
  int<lower=0> N;
  int<lower=0> n_redcards[N];
  int<lower=0> n_games[N];
  vector[N] rating;
  int<lower=1> grainsize;
parameters {
  vector[2] beta;
model {

  beta[1] ~ normal(0, 10);
  beta[2] ~ normal(0, 1);

  target += reduce_sum(partial_sum, n_redcards, grainsize,
                       n_games, rating, beta);
model_file = "./part_sum_example.stan"
with open(model_file, "w") as f:
    print(model_code, file=f)

model = cmdstanpy.CmdStanModel(stan_file=model_file, cpp_options={"STAN_THREADS" : 1})

import os
os.environ["STAN_NUM_THREADS"] = "4"

import pandas as pd
data = pd.read_csv("./RedcardData.csv")
data = data.dropna(subset=["rater1"])

stan_data = {
    "N" : data.shape[0],
    "n_redcards" : data["redCards"].values,
    "n_games" : data["games"].values,
    "rating" : data["rater1"].values,
    "grainsize" : 1,

# see cpu use, just to make sure threading works
fit = model.sample(data=stan_data, chains=1) 

on MacOSX High Sierra (10.13.6), running similar script -

import os
from cmdstanpy  import CmdStanModel
test_path = os.path.join('rs_test')
logistic1_stan = os.path.join(test_path,'logistic1.stan')
redcard_data = os.path.join(test_path,'redcard.json')
logistic1 = CmdStanModel(stan_file=logistic1_stan, cpp_options={"STAN_THREADS":True})
os.environ['STAN_NUM_THREADS'] = '4'
fit = logistic1.sample(data=redcard_data, chains=1)

Utility Monitor app shows logistic running with a single thread.

I’ve got a fix that implements the same logic as CmdStanR -

e.g. sample call looks like:

fit = logistic1.sample(data=redcard_data, chains=3, parallel_chains=3,threads_per_chain=2)

tested via Utility Monitor - not sure how this can be tested via unit test framework - in any case,
adding minimal unit tests and updating docs now.

Do you think this will work? I think it needs to be 1(not True).

yes, it works - the makefiles only check to see if the variable is defined. Even cpp_options={"STAN_THREADS":False} works.

Ok, I need to test this. I think I tested first with None and it didn’t work.