Four chains vs four jobs

Hey everyone,

We’re running into a bit of an issue on our cluster where Stan only seems to want to run two of the four chains in parallel (this is a new issue, Stan has not been recompiled, and the issue is replicated in old models with old data that we knew to run four chains in parallel previously.) My question is: are there any important differences between running four jobs with one chain per job, and running one job with four chains? One potential workaround for us, assuming that there are no important differences, is to just send our jobs to the cluster one chain at a time to take advantage of within-chain parallelization, while enjoying an ad hoc between-chains parallelization through the simultaneous running of multiple jobs.

As always, I appreciate your help!

Hi

There is no difference between running 4 parallel jobs each with a single chain, or running a single job with 4 chains. In essences, this is what stan does. It creates 4 parallel processes which is executed independently from one another. When some chains finish before others (assuming here you have compiled with STAN_THREADS enabled) the cores from the completed chains are redistributed to running chains.

After all chains have finished, diagnostics (rhat and neff) are computed on all 4 chains. If you run a single chains per job, the diagnostics obtained would be that a single chain (which would be the only difference). There should however be a way of combining these csv files such that you can obtain the diagnostics across all jobs (not completely sure though).

It may be worth while to post the errors you are encountering when running you model model with 4 chains. Maybe one of the developers would be able to assist with why this is occurring.

1 Like

If you are using cmdstanr, the function as_cmdstan_fit combines the individual csv files from each of your fits:

1 Like

Unfortunately, this seems like it could be an issue on the cluster side. We’re not experiencing any errors from the Stan program that I could share. I just see in the slurm file that it’s running chains 1 and 3 in parallel, and then, when one of them finishes, starts chain 2, and when one more of them finishes, starts chain 4. So it will only run two chains simultaneously, but throws no errors.

Just to clarify @Garren_Hermanus’s response above. Stan uses a threadpool internally for both running multiple chains and for parallelism within those chains. If you set num_threads >= num_chains, Stan will allocate a thread to each chain. If the model uses within-chain parallelism (e.g., reduce_sum) then additional threads will be requested (if available). Once a chain finishes, then those threads are also made available to the parallelism within other chains.

However, this is only the case when running cmdstan models directly. CmdStanR’s sampling interface was implemented before cmdstan supported this, and so the parallel_chains and threads_per_chain arguments are used to manually spawn separate processes and allocate threads for parallelism. This is in the works to be changed in the next release though.

EDIT: Edited to say that @Garren_Hermanus was accurate, just adding some technical clarification to their response

I use num_thread = -1 based on this documentation in the CmdStan User’s Guide:

“Valid values for num_threads are positive integers and -1. If num_threads is set to -1, all available cores will be used.”

Would you suggest not approaching it this way?

It sounds like the automatic detection is underestimating the number of cores available, so the easy solution is to explicitly specify how many you want to be used

1 Like

Understood! Thank you!

Our cluster administrator has asked me to clarify that we are using MPI, and is suggesting to me that only map_rect, and not reduce_sum can take advantage of it.

In an unrelated question (in that this is not the cause of the loss of between-chain paralellization), we are running a new model that requires us to run two likelihood functions. As such, we are using two reduce_sum wrap functions now – one for each likelihood. My assumption, perhaps naively was that it would run those functions as independent parallel operations, but one-at-a-time. That is, I didn’t expect it to run both reduce_sum functions in parallel, but I expected it to run parallelization within each reduce_sum function, and run the two reduce_sum functions in sequence. But it occurs to me that perhaps when confronted with two reduce_sum functions, Stan simply doesn’t know what to do and the parallelization breaks down entirely. Is this the case?

Our cluster administrator has asked me to clarify that we are using MPI, and is suggesting to me that only map_rect, and not reduce_sum can take advantage of it.

Your admin is correct, MPI is only used by map_rect. See this CmdStan Manual Chapter for more info

But it occurs to me that perhaps when confronted with two reduce_sum functions, Stan simply doesn’t know what to do and the parallelization breaks down entirely. Is this the case?

No, Stan has no issues with multiple reduce_sum functions in a single model. Your initial assumption (independent parallel operations) is correct

1 Like

I see. We have been using reduce_sum on cmdstan-2.30.1-mpi. Would this be expected to fail? I’ve looked at all of the documentation, and I don’t see what needs to be set up in order for reduce_sum to work. If it doesn’t depend on MPI, could there be any steps that I’m missing in correctly preparing Stan to run reduce_sum in parallel? Would reduce_sum be expected to run regardless of whether or not the CmdStan is running on an MPI module? And I’ve been compiling the models with make STAN_THREADS=TRUE ModelFolder/ModelName using openmpi/4.1.3/gcc-11.3.0. Would I have been making problems for the model to utilize reduce_sum?

We have been using reduce_sum on cmdstan-2.30.1-mpi.

Well that changes some things for your original issue. 2.30 is two years old, and predates the parallel handling of the num_chains argument

Would this be expected to fail? I’ve looked at all of the documentation, and I don’t see what needs to be set up in order for reduce_sum to work.

What do you mean by “expected to fail” or reduce_sum not working? What failures or errors are you seeing?

1 Like

So for CmdStan 2.30.1 I would have to use parallel_chains and threads_per_chain?

And when I say reduce_sum failing or not working I mean that the model will run, but there will be no benefit from reduce_sum – that it will run the model completely sequentially, with no within-chain parallelization. We do not get back any errors from our models using reduce_sum, the models run to completion, and the posteriors look good. The concern is that we are wasting cluster resources and our time because reduce_sum is there, but not actually being taken advantage of by the CmdStan

Can you clarify how you’re running the models now (what Cmdstan/cmdstanr call you’re using), and why you think that reduce_sum isn’t being executed in parallel?

1 Like

This is the way it’s called on the cluster:

#! /bin/bash
#SBATCH --ntasks=128
#SBATCH --ntasks-per-core=1
#SBATCH --constraint=x2680
#SBATCH --exclusive
#SBATCH --partition=multinode
#SBATCH --mem-per-cpu=2g

module load openmpi/4.1.3/gcc-11.3.0

srun --mpi=pmix ./ModelName \
sample \
num_chains=4 \
num_samples=1000 \
num_warmup=1000 \
num_threads=-1 \
data file=filename.json \
output file=output.csv \

This is saved as a .sh file, which we then execute using a Terminal command from the model folder.

I’ve been advised by the cluster administrator that they are not seeing any evidence of the model benefiting from within-chain parallelization.

This is what they said:

“Speaking of which. My scaling experiments aren’t complete but I alread
see that runtime DECREASES when you go from 1 node (28 tasks, 56 CPUs)
to 4 nodes (112 tasks, 224 CPUs) for a single, relatively short chain.
Your model is probably getting zero benefit from the added
parallelism. That’s not good for cluster efficiency and negatively
affects the priority of your future jobs.”

This is our model code:

// Modeling code: fit the CASANDRE parameters for 1. a perceptual task with multiple confidence conditions and sensitivity values, 2. a value-based decision-making task, and 3. a correlation coefficient in a Bayesian hierarchical model

functions {

 //Custom quantile function for lognormal distribution
 matrix log_normal_qf(vector x, vector m, vector s) {

  //takes a vector sampling the x-axis in terms of cumulative density, plus vectors of mu and sigma parameters
  return exp(rep_matrix(s' * sqrt(2), size(x)) .* rep_matrix(inv_erfc(-2 * x + 2), size(m)) + rep_matrix(m', size(x)));

 }

 //Custom copula function
 real normal_copula(real u, real v, real rho) {

  real rho_sq = square(rho);
  
  return (0.5 * rho * (-2. * u * v + square(u) * rho + square(v) * rho)) / (-1. + rho_sq) - 0.5 * log1m(rho_sq);

 }

 //Likelihood model for CASANDRE
 real casandre_log(array[] matrix resps, vector guess, matrix sds, matrix sm, vector nconf, vector pconf) {

  //declare and initialize log likelihood variable
  real llhC;
  llhC = 0;

  //loop over participants to calculate log likelihood
  for (n in 1:size(resps)) {

   //declare the index-sizing variable
   int m[rows(resps[n])];

   //build the logical vector of non-zero trials
   for (tt in 1:size(m)) {
      m[tt] = sum(resps[n][tt]) == 1;
   }

   //declare the likelihood variable
   matrix[rows(sm),4] lhC;

   //check if subject has any non-zero data before running
   if (sum(resps[n]) != 0) {

    //declare incrementing and index variables
    int t;
    int ind[sum(m)];

    //initialize incrementing variable
    t = 1;

    //declare and calculate mu parameters for normal cdfs
    matrix[rows(sm),rows(sds)] avgs;
    avgs = rep_matrix(sm[,n],rows(sds)).*rep_matrix(sds[,n]',rows(sm));
  
    //loop over trials
    for (tr in 1:rows(sm)){

     //check if trial has any non-zero data before running
     if (sum(resps[n][tr]) != 0) {

      //declare sample vector
      matrix[3,rows(sds)] raws;

      //sample the cumulative density of normals along the transformed x-axis for each confidence bin
      for (rws in 1:rows(sds)) {
       raws[1,rws] = normal_cdf(-nconf[n],avgs[tr,rws],sds[rws,n]);
      }
      for (rws in 1:rows(sds)) {
       raws[2,rws] = normal_cdf(0,avgs[tr,rws],sds[rws,n]);
      }
      for (rws in 1:rows(sds)) {
       raws[3,rws] = normal_cdf(pconf[n],avgs[tr,rws],sds[rws,n]);
      }

      //declare the cumulative likelihood variable
      vector[3] ratiodist;

      //take the mean of the sampled densities
      ratiodist[1] = mean(raws[1]);
      ratiodist[2] = mean(raws[2]);
      ratiodist[3] = mean(raws[3]);

      //implement a lapse rate parameter to absorb unmodeled noise, and calculate likelihoods of each choice possibility in the trial
      lhC[tr,1] = ((guess[n]/4) + ((1-guess[n])*ratiodist[1]));
      lhC[tr,2] = ((guess[n]/4) + ((1-guess[n])*(ratiodist[2]-ratiodist[1])));
      lhC[tr,3] = ((guess[n]/4) + ((1-guess[n])*(ratiodist[3]-ratiodist[2])));
      lhC[tr,4] = ((guess[n]/4) + ((1-guess[n])*(1-ratiodist[3])));

      //add this trial to the index
      ind[t] = tr;

      //increment the index
      t = t + 1;

     }

    }

    //multiply the choice matrix by the log of the likelihood matrix
    llhC += sum(columns_dot_product(resps[n][ind], log(lhC[ind])));

   }
  
  }

  //return the log likelihood for all data given the sampled parameters
  return llhC;
  
 }

 //Partial sum function
 real partial_sum_casandre_log(array[] matrix slice_n_resps, int start, int end, vector guess, matrix sds, matrix sm, vector nconf, vector pconf) {

 //dynamically slice data into the log likelihood function
 return casandre_log(slice_n_resps, guess[start:end], sds[:,start:end], sm[:,start:end], nconf[start:end], pconf[start:end]);

 }
  
}
data {

 //declare the number of subjects
 int ns;

 //declare the number of trials in the largest unpadded data set for each experiment
 int nt_p;
 int nt_v;

 //declare the number of conditions and number of contrast levels in the perceptual domain
 int ncond;
 int ncont;

 //declare the response matrix (one subject per array) for perceptual domain
 array[ncond,ncont,ns] matrix[nt_p,4] resps_p;

 //declare the response matrix (one per subject) for the value-based domain
 array[ns] matrix[nt_v,4] resps_v;

 //declare the experimental values matrices (one subject per matrix in each array) for perceptual domain
 array[ncond,ncont] matrix[nt_p,ns] vals_p;

 //declare the experimental values matrices (one subject per matrix) for value-based domain
 matrix[nt_v,ns] vals_v;

 //declare the number of x-axis samples and the vector sampling the x-axis in terms of cumulative density
 int sampn;
 vector[sampn] sampx;

}
parameters {

 //Perceptual parameters
 vector<lower=0,upper=1>[ns] guess_p; //lapse rate
 matrix<lower=0>[ncont,ns] sens_p; //stimulus sensitivities
 matrix[ncont,ns] crit_p; //decision criteria
 vector<lower=0>[ns] meta_p; //meta-uncertainties
 matrix<lower=0>[ncond,ns] nconf_p; //negative confidence criteria
 matrix<lower=0>[ncond,ns] pconf_p; //positive confidence criteria

 //Value-based parameters
 vector<lower=0,upper=1>[ns] guess_v; //lapse rate
 vector<lower=0>[ns] sens_v; //stimulus sensitivities
 vector<lower=0>[ns] meta_v; //meta-uncertainties
 vector<lower=0>[ns] nconf_v; //negative confidence criteria
 vector<lower=0>[ns] pconf_v; //positive confidence criteria

 //Perceptual hyperparameters
 vector[ncont] ssm_p; //mu hyperparameters of each contrast level's stimulus sensitivity lognormal prior
 vector<lower=0>[ncont] sss_p; //sigma hyperparameters of each contrast level's stimulus sensitivity lognormal prior
 vector[ncont] scm_p; //mu hyperparameters of each contrast level's decision criterion normal prior
 vector<lower=0>[ncont] scs_p; //sigma hyperparameters of each contrast level's s decision criterion normal prior
 real mum_p; //mu hyperparameters of each condition's meta-uncertainty lognormal prior
 real<lower=0> mus_p; //sigma hyperparameters of each condition's meta-uncertainty lognormal prior
 vector[ncond] nccm_p; //mu hyperparameters of each condition's negative confidence criterion lognormal prior
 vector<lower=0>[ncond] nccs_p; //sigma hyperparameters of each condition's negative confidence criterion lognormal prior
 vector[ncond] pccm_p; //mu hyperparameters of each condition's positive confidence criterion lognormal prior
 vector<lower=0>[ncond] pccs_p; //sigma hyperparameters of each condition's positive confidence criterion lognormal prior

 //Value-based hyperparameters
 real ssm_v;
 real<lower=1> sss_v;
 real mum_v;
 real<lower=1>mus_v;
 real nccm_v;
 real<lower=1> nccs_v;
 real pccm_v;
 real<lower=1> pccs_v;

 //Correlation parameter
 real<lower=-1,upper=1> rho;

}
model {

 //Perceptual hyperpriors
 ssm_p ~ normal(0,1);
 sss_p ~ lognormal(0,1);
 scm_p ~ normal(0,1);
 scs_p ~ lognormal(0,1);
 mum_p ~ normal(0,1);
 mus_p ~ lognormal(0,1);
 nccm_p ~ normal(0,1);
 nccs_p ~ lognormal(0,1);
 pccm_p ~ normal(0,1);
 pccs_p ~ lognormal(0,1);

 //Value-based hyperpriors
 ssm_v ~ normal(0,1);
 sss_v ~ lognormal(0,1);
 mum_v ~ normal(0,1);
 mus_v ~ lognormal(0,1);
 nccm_v ~ normal(0,1);
 nccs_v ~ lognormal(0,1);
 pccm_v ~ normal(0,1);
 pccs_v ~ lognormal(0,1);

 //Perceptial priors
 guess_p  ~ beta(1,197.0/3.0);
 meta_p ~ lognormal(mum_p,mus_p);

 for (cont in 1:ncont) {
  sens_p[cont] ~ lognormal(ssm_p[cont],sss_p[cont]);
  crit_p[cont] ~ normal(scm_p[cont],scs_p[cont]);
 }

 for (cond in 1:ncond) {
  nconf_p[cond] ~ lognormal(nccm_p[cond],nccs_p[cond]);
  pconf_p[cond] ~ lognormal(pccm_p[cond],pccs_p[cond]);
 }

 //Value-based priors
 guess_v ~ beta(1,197.0/3.0);
 sens_v ~ lognormal(ssm_v,sss_v);
 meta_v ~ lognormal(mum_v,mus_v);
 nconf_v ~ lognormal(nccm_v,nccs_v);
 pconf_v ~ lognormal(pccm_v,pccs_v);

 //Perceptual likelihood model (with local variable calculations)
 for (cond in 1:ncond) {

  //declare the transformed x-axis matrices
  matrix[sampn,ns] xtrans_p;
  matrix[sampn,ns] sds_p;

  //calculate the transformed x-axis matrices
  xtrans_p = log_normal_qf(sampx,-0.5*log1p(meta_p.^2),sqrt(log1p(meta_p.^2)));
  sds_p = 1./xtrans_p;

  for (cont in 1:ncont) {

   //check if condition and contrast level combination has non-zero data
   if (sum(abs(vals_p[cond][cont])) != 0) {

    //declare matrices for calculating each contrast level's transformed data
    array[ncont] matrix[nt_p,ns] om_p;
    array[ncont] row_vector[ns] cm_p;
    array[ncont] matrix[nt_p,ns] sm_p;

    //orientations and decision criteria in terms of standard deviations
    om_p[cont] = vals_p[cond][cont].*rep_matrix(sens_p[cont],nt_p);
    cm_p[cont] = crit_p[cont].*sens_p[cont];
    
    //transform the data
    sm_p[cont] = om_p[cont]-rep_matrix(cm_p[cont],nt_p);

    //hand the data and parameters to the reduce sum wrap function for dynamic within-chain parallelization
    target += reduce_sum(partial_sum_casandre_log, resps_p[cond][cont], 1, guess_p, sds_p, sm_p[cont], nconf_p[cond]', pconf_p[cond]');

   }

  }

 }

 //Value-based likelihood model (with local variable calculations)

 //declare the transformed x-axis matrices
 matrix[sampn,ns] xtrans_v;
 matrix[sampn,ns] sds_v;

 //calculate the transformed x-axis matrices
 xtrans_v = log_normal_qf(sampx,-0.5*log1p(meta_v.^2),sqrt(log1p(meta_v.^2)));
 sds_v = 1./xtrans_v;

 //declare matrices for calculating transformed data
 matrix[nt_v,ns] om_v;
 matrix[nt_v,ns] sm_v;

 //oreintations in terms of standard deviations
 om_v = vals_v.*rep_matrix(sens_v',nt_v);

 //transform the data
 sm_v = om_v;
 
 //hand the data and parameter to the reduce sum wrap function for dynamic within-chain parallelization
 target += reduce_sum(partial_sum_casandre_log, resps_v, 1, guess_v, sds_v, sm_v, nconf_v, pconf_v);

 //Correlation copula
 real y_p;
 real y_v;

 for (n in 1:ns) {

  y_p = inv_Phi(lognormal_cdf(meta_p[n],mum_p,mus_p));
  y_v = inv_Phi(lognormal_cdf(meta_v[n],mum_v,mus_v));

  target += normal_copula(y_p,y_v,rho);

 }

}

The first thing to emphasise here is that you cannot parallelise across multiple nodes with reduce_sum - parallelising over multiple nodes requires MPI (and map_rect).

Additionally, rather than setting num_threads=-1 and requesting higher and higher CPU counts, it’s more effective to simply compare num_threads=1 with num_threads=4 (or some other small value). You can also use profiling statements to explicitly evaluate computation times.

Also, what is the value of ns in this model? Your reduce_sum call is parallelising over an array of length ns - so you won’t see any benefit from requesting more than ns cores per chain

1 Like

I see. Yes. This is utilizing multiple nodes, which serve us with 128 CPU. So I assume that reduce_sum is either doing nothing, or is just giving us whatever it can within the CPU granted to it by a single node?

ns is our number of subjects, which runs as high as 136 for our experiments.

I can see about switching over to map_rect to take advantage of multiple cores if that’s our only option, but from what I recall, that has a steeper learning curve than reduce_sum. Am I looking at a very heavy rewrite of the code in order to take advantage of map_rect and MPI across multiple cores? I’m guessing it’s not as easy as substituting in map_rect where reduce_sum once was and changing a few things around.

ns is our number of subjects, which runs as high as 136 for our experiments.

I can see about switching over to map_rect to take advantage of multiple cores if that’s our only option

Given that you only have 136 elements to parallelise over, you don’t need that many cores (unless each element is extremely computationally expensive). You have to keep in mind that there is non-trivial overhead in copying parameters to each thread, so having a thread per element isn’t always faster.

I’d really strongly suggest just starting with a much smaller number of requested cores, so that you can have them all in a single node, and explicitly specifying num_threads.

1 Like

I’m going to give it a shot on one node. It looks like I can ask for up to 128 cpu on a single node if I use the correct one. I’ll match that with num_thread = 128 in the batch code.

It seems that I have no choice, at least as of now, but to use CmdStan-2.30.1-mpi, and to compile my code using the openmpi module with STAN_THREADS=TRUE, because there is no non-mpi version of CmdStan on our cluster. With MPI not being utilized, this shouldn’t change anything about the behavior of the model or reduce_sum, right?

Otherwise, I may have to see if it’s possible to request that the current non-mpi CmdStan module be added to the cluster, though I don’t know a. whether these type of requests are honored, or b. how long it would take for it to be added if a request is accepted, so I may be stuck with CmdStan-2.30.1-mpi for the time being. That’s why I ask.

I’m going to give it a shot on one node. It looks like I can ask for up to 128 cpu on a single node if I use the correct one. I’ll match that with num_thread = 128 in the batch code.

Again, that is likely to be far far too many for only 136 elements. If you want to see if there is a difference with reduce_sum being used, just start with 1 chain and 1 thread and then compare to 1 chain and 2 threads.

There is only a benefit from parallelism if the amount of work completed by each thread outweighs the cost of spawning, preparing, and then cleaning up the thread.

You can also use the profiling blocks I linked above to investigate in more detail which parts of the likelihood are using the most time computationally, and how much benefit is being seen from parallelisation.

With MPI not being utilized, this shouldn’t change anything about the behavior of the model or reduce_sum, right?

Yes, as you are not using map_rect there would be no impact from MPI.

1 Like