Multiprocessing and/or multithreading problem - CmdStanPy

Hi Stan Community,

I’m trying to use reduce_sum in my Stan model to parallelize computations across multiple threads. While the model runs without errors, I noticed in the Mac Activity Monitor that each chain only uses 1 thread, even though I set threads_per_chain to 4 and STAN_NUM_THREADS=4. I’m using a MacBook Pro 2021 (M1 Pro, 16 GB RAM, 8 CPU cores), macOS Sequoia 15.2, CmdStan 2.32.1, and CmdStanPy 1.1.0.

Here is the setup and code I’m using:

  • bernoulli_reduce_sum.stan

functions {
  real partial_sum(array[] int y_slice, int start, int end, real theta) {
    return bernoulli_lpmf(y_slice | theta);
  }
}

data {
  int<lower=0> N;
  array[N] int<lower=0, upper=1> y;
}

transformed data {
  int grainsize = 1;
}

parameters {
  real<lower=0, upper=1> theta;
}

model {
  theta ~ beta(1,1);  // uniform prior on interval 0,1
  target += reduce_sum(partial_sum, y, grainsize, theta);
}
  • Python Code
import os
from cmdstanpy import CmdStanModel

# Set the number of threads
os.environ['STAN_NUM_THREADS'] = '4'  # 4 threads per chain

# Paths to Stan model and data
stan_file = 'models/examples/bernoulli/bernoulli_reduce_sum.stan'
data_file = 'models/examples/bernoulli/bernoulli.data.json'

# Compile the model
model = CmdStanModel(stan_file=stan_file)

# Here I tried with 'STAN_THREADS': True, but it only used one CPU with a lot of threads.
# model = CmdStanModel(stan_file=stan_file, cpp_options={"STAN_THREADS": True}, compile="force")

# Run sampling with multi-threading
fit = model.sample(
    data=data_file,
    chains=4,               # Number of parallel chains
    parallel_chains=4,      # Each chain runs in its own process
    threads_per_chain=4     # Number of threads per chain
)

The problem is that the Mac Activity Monitor shows only 4 cores in use, and when the model is compiled with STAN_THREADS=True it uses only 1 core with 8 threads and is slower than the original run. The expected behaviour would be to have all 8 cores in use, with each chain running 4 threads. Is that really possible?

Thanks a lot!

Hi, to use Stan threads you need to compile the Stan model with cpp_options={'STAN_THREADS': True}.

So adding the additional argument:
model = CmdStanModel(stan_file=stan_file, cpp_options={'STAN_THREADS': True})

Hi Garren, thanks for your answer. As shown in the code (commented out), I already tried that, but it doesn’t achieve the desired behavior.

# Compile the model
model = CmdStanModel(stan_file=stan_file)

# Here I tried with 'STAN_THREADS': True, but it only used one CPU with a lot of threads.
# model = CmdStanModel(stan_file=stan_file, cpp_options={"STAN_THREADS": True}, compile="force")

What do you mean by this?

Using the attached image as an example, when I run the sample with STAN_THREADS=True the Activity Monitor shows:

Just one process, with Threads = 8:
bernoulli_red… (Threads = 8)

Thanks!

Here is the example.

That is the expected behavior with STAN_THREADS=true. All chains will be run from the same process in different threads, and if they spawn additional threads for things like reduce_sum you will have num threads > num chains.

This is advantageous over one process per chain mainly because the data will not be redundantly loaded into memory multiple times. But if you want to avoid it for some reason, you can pass force_one_process_per_chain=True to sample().

Is it possible to have N processes (N=4, for example) and run M threads (M=4, for example) inside each one?

That should be possible, though I would expect it to be slightly slower than just running one process with M*N threads.
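A minimal sketch of that layout, reusing the bernoulli example from earlier in the thread (the paths and the 4x4 split are assumptions, adjust them to your setup):

import os
from cmdstanpy import CmdStanModel

# Paths taken from the earlier example in this thread; adjust as needed.
stan_file = 'models/examples/bernoulli/bernoulli_reduce_sum.stan'
data_file = 'models/examples/bernoulli/bernoulli.data.json'

# The model must be built with threading support for threads_per_chain to have any effect.
model = CmdStanModel(stan_file=stan_file, cpp_options={'STAN_THREADS': True})

# N = 4 processes (one per chain), M = 4 threads inside each process.
fit = model.sample(
    data=data_file,
    chains=4,
    parallel_chains=4,
    threads_per_chain=4,
    force_one_process_per_chain=True,  # one OS process per chain instead of a single shared process
)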

I’m running an instance with N=4 chains and M=5 threads per chain with STAN_THREADS=True, so, as I understood it, it should run just one process with 20 threads (N*M), shouldn’t it?

I checked in the Activity Monitor and it’s running one process with only 8 threads. Is that a constraint of my computer? Should I modify another variable or environment variable?

Thanks for your help and support, btw!

It’s entirely possible that the thread scheduler is deciding only 2 threads per chain are necessary; you might find differences tuning the grainsize. It’s also possible something is misconfiguring the number of threads available. Can you share the code that you were running to produce that image?
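As an illustration of grainsize tuning (not from the original thread): one way to experiment without recompiling is to declare grainsize in the model's data block (e.g. int<lower=1> grainsize;) instead of hard-coding it in transformed data, and pass it from Python. A minimal sketch, assuming that change has been made to the bernoulli example:

from cmdstanpy import CmdStanModel

# Hypothetical path; the data values mirror the bernoulli example.
stan_file = 'models/examples/bernoulli/bernoulli_reduce_sum.stan'
data = {
    'N': 10,
    'y': [0, 1, 0, 0, 0, 0, 0, 0, 0, 1],
    'grainsize': 2,  # suggested slice size for reduce_sum; 1 lets the scheduler decide
}

model = CmdStanModel(stan_file=stan_file, cpp_options={'STAN_THREADS': True})
fit = model.sample(data=data, chains=4, threads_per_chain=4)

With only 10 observations the slices are tiny, so the real gains from grainsize tuning only show up on models with substantially more data per likelihood evaluation.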

Of course, here is the full code:

import os
import cmdstanpy
from cmdstanpy import cmdstan_path, CmdStanModel, write_stan_json, rebuild_cmdstan
import numpy as np
import pandas as pd
import json
import collections
import time
from datetime import datetime
from multiprocessing import cpu_count
from basics import *

cwd = os.getcwd()
cmdstan_path()

# Set the number of threads to use
os.environ['STAN_NUM_THREADS'] = '19' # big M

def fit_stan_model(df_events,
                   model,
                   dict_model_event,
                   dict_model_columns,
                   tournament,
                   n_partition=0
                  ):
    
    data = get_processed_data(df_events,
                              tournament,
                              model,
                              dict_model_event,
                              dict_model_columns
                              )
    
    # data_file = "/Users/pablogalaz/Dropbox/Doctorado/Tesis/cap_2/PG/stan_inputs/"+model+"/model.data.json"
    data_file = cwd+"/models/"+model+"/model.data.json"
    write_stan_json(data_file, data)

    # stan_file = "/Users/pablogalaz/Dropbox/Doctorado/Tesis/cap_2/PG/stan_inputs/"+model+"/model.stan"
    stan_file = cwd+"/models/"+model+"/model.stan"
    model_built = CmdStanModel(stan_file=stan_file, 
                               cpp_options={"STAN_THREADS": True}, # "STAN_OPENCL": True}, 
                               compile="force"
                               )
    
    print(model_built.exe_info())

    print("Data and stan file were created!")
    
    iter_warmup = 1000 # 0
    iter_sampling = 1000 # 0
    if model == "passes":
        iter_warmup=50 # 200
        iter_sampling=50 # 200
        
    if model == "dribblings":
        iter_warmup=50 # 0
        iter_sampling=50 # 0
        
    if model == "choice":
        iter_warmup=50 # 200
        iter_sampling=50 # 200
    
    # num_cpu_to_use = max(1, cpu_count()-1)
    parallel_chains = cpu_count() # num_cpu_to_use
    chains=4
    threads_per_chain=5

    print("Starting sampling!")
    
    model_sampled = model_built.sample(data=data_file, 
                                       show_progress=True, 
                                       # show_console=True,
                                       iter_warmup=iter_warmup, 
                                       iter_sampling=iter_sampling,
                                       chains=chains,
                                       parallel_chains=parallel_chains,
                                       # force_one_process_per_chain=True,
                                       threads_per_chain=threads_per_chain,
                                       # thin=1 # check this parameter
                                      )
    
    ##### get values from model ######
    df_results = model_sampled.summary()
    df_draws = model_sampled.draws_pd()
    
    print("Saving results!")
    
    #Save results
    path_to_save_results_csv = os.path.join(os.getcwd(),'results',tournament,model+'.csv')
    path_to_save_samples_csv = os.path.join(os.getcwd(),'samples',tournament,model+'.csv')
    
    df_results.to_csv(path_to_save_results_csv)
    df_draws.to_csv(path_to_save_samples_csv)
    
    return model_sampled

models = ["goals","dribblings","choice","passes"]
tournaments = ["France", "Italy", "Spain", "England", "Germany"]

dict_model_event = get_dict_model_event()
dict_model_columns = get_dict_model_columns()

for model in models:
    if model == "passes":
        for tournament in tournaments:
            if tournament == "France":
                print(model, tournament)
                start = time.time()
                print(datetime.now().strftime("%H:%M:%S"))

                df_events = pd.read_csv("data/df_"+tournament+"_events.csv", index_col=0)

                data = get_processed_data(df_events,
                                        tournament,
                                        model,
                                        dict_model_event,
                                        dict_model_columns,
                                        )

                model_sampled = fit_stan_model(df_events=df_events,
                                            model=model,
                                            dict_model_event=dict_model_event,
                                            dict_model_columns=dict_model_columns,
                                            tournament=tournament
                                            )
                end = time.time()
                print(datetime.now().strftime("%H:%M:%S"))
                ex_time = round((end - start)/60, 1)
                df_results = model_sampled.summary()
                print("N_params > 1.1: ", len(df_results[df_results.R_hat>1.1]), "out of", len(df_results), "Exe. Time:", ex_time, "minutes")

Setting this will not do anything, because cmdstanpy will override it according to the other settings:

os.environ['STAN_NUM_THREADS'] = '19' # big M

This is a somewhat odd line, and especially if the cpu count is at least 4, will do nothing:

parallel_chains = cpu_count() # num_cpu_to_use

Following the rest of the control flow in the cmdstanpy code, the STAN_NUM_THREADS variable should end up being set to 20 in your code. So you should end up with one process and somewhere between 4 (one for each chain) and 20 (5 for each chain) threads being used, depending on how much work the reduce_sum is really doing.
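One way to check what actually got configured (an illustration, not from the original thread; the paths are hypothetical): exe_info() reports the flags the executable was built with, and since cmdstanpy overrides STAN_NUM_THREADS itself, you can inspect the environment variable after sample() returns:

import os
from cmdstanpy import CmdStanModel

# Hypothetical path; use whichever model you compiled with STAN_THREADS=True.
model = CmdStanModel(stan_file='models/passes/model.stan',
                     cpp_options={'STAN_THREADS': True})

# Build flags baked into the executable; STAN_THREADS should be reported as true here.
print(model.exe_info())

fit = model.sample(data='models/passes/model.data.json',
                   chains=4, parallel_chains=4, threads_per_chain=5)

# cmdstanpy sets STAN_NUM_THREADS based on the sample() arguments,
# so the value visible here is what the chains actually ran with (expected: 20).
print(os.environ.get('STAN_NUM_THREADS'))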

To confirm CmdStanPy isn’t getting in the way here, you could always run it on the command line directly and see what happens in Activity Monitor.
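A rough sketch of that check (paths are assumptions; a plain terminal command works the same way, this just keeps it copy-pasteable from Python): call the compiled executable directly with STAN_NUM_THREADS set, bypassing CmdStanPy's thread logic entirely, and watch Activity Monitor while it runs:

import os
import subprocess

# Hypothetical paths to the compiled executable and its data file.
exe = 'models/passes/model'
data_file = 'models/passes/model.data.json'

env = dict(os.environ, STAN_NUM_THREADS='5')  # threads available to this single chain
subprocess.run(
    [exe, 'sample', 'data', f'file={data_file}', 'output', 'file=output.csv'],
    env=env,
    check=True,
)

This runs a single chain with 5 threads; Activity Monitor should then show one process whose thread count reflects how much work reduce_sum can actually parallelize.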