CmdStanPy ValueError with multiprocessing

I am using cmdstanpy in a package I put together.

I am able to run the program sequentially, but when I try to run it in parallel I get the following error:

ValueError: line 7977: bad draw, expecting 18 items, found 14

import numpy as np
import pandas as pd
import tablespoon as tbsp
from multiprocessing import Pool

# pull and clean data
# the data must have the columns "ds" and "y"
# this time series is at the daily level. the seasonality is 7 days.
def pred_things(df):
    sn = tbsp.Snaive()
    df_pred = sn.predict(df, horizon=10, frequency="D", lag=7, uncertainty_samples=8000)
    return df_pred


df_ca = (
    pd.read_csv("https://storage.googleapis.com/data_xvzf/m5_state_sales.csv")
    .query("state_id == 'CA'")
    .rename(columns={"date": "ds", "sales": "y"})
)
df_tx = (
    pd.read_csv("https://storage.googleapis.com/data_xvzf/m5_state_sales.csv")
    .query("state_id == 'TX'")
    .rename(columns={"date": "ds", "sales": "y"})
)
list_df = [df_ca, df_tx]
# this works
list(map(pred_things, list_df))
# this does not
with Pool() as P:
    parallel_prod = list(P.map(pred_things, list_df))

print(parallel_prod)

The backend lines of code used in the package are here

Here’s my understanding, based on trying to get CmdStanPy to work with dask distributed:

CmdStanPy is very filesystem-oriented: it writes the Stan model code into a given directory, transpiles that code to C++, and compiles the C++ into an executable. The executable then writes its output as CSV files into a directory.

If you want to run multiple instances of a Stan model using CmdStanPy in parallel, the simplest way to make it work without crashing is to ensure that a unique directory is used for each of the parallel CmdStanPy operations – e.g. if you have n independent things to predict in your input list_df, allocate n empty temporary working directories.

Otherwise, with a single shared directory, each of the n CmdStanPy operations will assume it has exclusive access to that shared directory, and they will overwrite each other’s CSV output, and perhaps also each other’s C++ code or compiled model executables! (ugh).

This really derails any attempt to write simple Python code that treats model fitting and prediction with CmdStanPy as pure functions with no side effects: CmdStanPy is not at all pure-functional, it is heavily coupled to the filesystem as an assumed shared resource, so when writing parallel code you need to carefully manage and coordinate use of that shared filesystem resource to use CmdStanPy correctly.

I believe you could achieve that by allocating a unique temporary directory using Python’s tempfile module (“tempfile — Generate temporary files and directories” in the Python documentation). I believe the most reliable and correct way to do this is to write a copy of the same Stan model code into each temp directory, point the stan_file argument of CmdStanModel at the copy in that unique temp directory, and then also specify a unique directory for output_dir when calling the sample method.
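Something along these lines is what I have in mind as a minimal sketch – the model.stan filename, the fit_in_own_dir helper name, and the data dict are placeholders I made up, not anything from tablespoon:

import shutil
import tempfile
from pathlib import Path

from cmdstanpy import CmdStanModel


def fit_in_own_dir(stan_src, data):
    """Compile and sample one model inside its own temporary directory."""
    with tempfile.TemporaryDirectory() as tmp:
        # each parallel task gets its own private copy of the model source ...
        stan_copy = Path(tmp) / "model.stan"
        shutil.copy(stan_src, stan_copy)
        model = CmdStanModel(stan_file=str(stan_copy))
        # ... and its own output_dir, so the output CSV files never collide
        fit = model.sample(data=data, output_dir=tmp)
        # materialize the draws before the temp dir (and its CSVs) is removed
        return fit.draws_pd()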

One downside of this simple and reliable approach is that you will end up wastefully compiling the same Stan model n times, which will be much slower than compiling the Stan model once and then executing it n times with n different data sets, writing the output CSV files to n different output directories. Perhaps one of the CmdStanPy developers may be able to suggest a reliable and supported way to use the CmdStanPy API that would avoid this.

If you are curious, I have some experimental code from a month ago where I tried to get a similar thing to work: compiling the same CmdStanPy model once and then running it in parallel across a distributed cluster of machines, managed by Dask, that don’t share a filesystem. To get that to work, I added code that uses CmdStanPy to generate a compiled Stan model executable and then reads the binary data of that executable from disk. Dask sends the binary data over the wire to a worker machine, and the worker is then responsible for rewriting the model executable into a new temporary directory on its own filesystem when it comes time to sample from the model. The technique is completely independent of Dask, so you might be able to reuse it, or something similar:
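Stripped of the Dask plumbing, the shape of it is roughly this sketch – the helper names are mine and the details may not match my actual experimental code exactly:

import stat
import tempfile
from pathlib import Path

from cmdstanpy import CmdStanModel


def read_exe_bytes(stan_file):
    """On the head node: compile the model once and grab the executable's bytes."""
    model = CmdStanModel(stan_file=stan_file)
    return Path(model.exe_file).read_bytes()


def sample_from_exe_bytes(exe_bytes, data):
    """On a worker: rewrite the executable locally and sample from it."""
    tmp = tempfile.mkdtemp()
    exe_path = Path(tmp) / "model"
    exe_path.write_bytes(exe_bytes)
    # the rewritten file must be marked executable before CmdStan can run it
    exe_path.chmod(exe_path.stat().st_mode | stat.S_IEXEC)
    model = CmdStanModel(exe_file=str(exe_path))
    fit = model.sample(data=data, output_dir=tmp)
    return fit.draws_pd()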

What I got working also seems like a complete hack that depends on CmdStanPy implementation details, and it might break in the future if a new version of CmdStanPy or CmdStan makes internal changes to how things are arranged on disk. I wouldn’t trust using this technique in production code without a lot of careful testing and code review.

It’d be safer and more reliable, but slower, not to attempt to trick CmdStanPy into reusing the same model executable, and just let it compile and sample from n unrelated models with n different data sets that happen to share the same Stan model code.


This is a great response. Thanks for the detailed description.

I am going to start with the laziest thing first by chatting with the devs. While that conversation is underway I am going to study your implementation.

hi Alex,

I looked at your code, and if you supply a unique chain_id to your predict function and pass it along to your call to the CmdStanPy sample method here: tablespoon/forecasters.py at 61d23c0f44e418e9b840a9356be833f2f992bfa4 · alexhallam/tablespoon · GitHub,
then the resulting CSV filenames will be unique.
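in plain CmdStanPy terms the idea is something like this – model.stan and the empty data dicts are just stand-ins for whatever tablespoon builds internally:

from cmdstanpy import CmdStanModel

model = CmdStanModel(stan_file="model.stan")  # placeholder model
data_ca = {}  # placeholder data dicts for the two series
data_tx = {}

# both runs share output_dir, but the distinct chain ids end up in the
# output CSV filenames, so the runs no longer clobber each other
fit_ca = model.sample(data=data_ca, chains=1, chain_ids=[1], output_dir="out")
fit_tx = model.sample(data=data_tx, chains=1, chain_ids=[2], output_dir="out")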

If this helps, below is something I wrote a long time ago when I was playing around with a cluster running SLURM. This is a proof-of-concept demo of how to compile a Stan program once on the head node and then run 100 chains on the cluster, saving all CSV output files in a shared directory. The key is that each run has a unique chain id, so the output files don’t get clobbered.

SLURM script - assumes that the compiled model exe file already exists

#!/bin/bash
# distribute N chains, M chains per node on the cluster
#
# common args (assumed to be exported as environment variables):
#   cmdstan_path, model_exe, seed, output_dir, data_path
# unique arg: chain_id - taken from the job-array index %a
#SBATCH --job-name=cmdstanpy_runs
#SBATCH --output=cmdstanpy_stdout-%j-%a.out
#SBATCH --error=cmdstanpy_stderr-%j-%a.err
#SBATCH --nodes=20
#SBATCH --cpus-per-task=1
#SBATCH -a 1-100
python run_chain.py "$CMDSTAN_PATH" "$MODEL_EXE" "$SEED" "$SLURM_ARRAY_TASK_ID" "$OUTPUT_DIR" "$DATA_PATH"

program run_chain.py

# Use CmdStanPy to run one chain
# Required args:
# - cmdstan_path
# - model_exe
# - seed
# - chain_id
# - output_dir
# - data_file

import sys

from cmdstanpy import CmdStanModel, set_cmdstan_path, cmdstan_path

usage = """\
run_chain.py <cmdstan_path> <model_exe> <seed> <chain_id> <output_dir> (<data_path>)\
"""

def main():
    if len(sys.argv) < 6:
        print("missing arguments")
        print(usage)
        sys.exit(1)
    a_cmdstan_path = sys.argv[1]
    a_model_exe = sys.argv[2]
    a_seed = int(sys.argv[3])
    a_chain_id = int(sys.argv[4])
    a_output_dir = sys.argv[5]
    a_data_file = None
    if len(sys.argv) > 6:
        a_data_file = sys.argv[6]

    set_cmdstan_path(a_cmdstan_path)
    print(cmdstan_path())

    mod = CmdStanModel(exe_file=a_model_exe)
    mod.sample(
        chains=1,
        chain_ids=a_chain_id,
        seed=a_seed,
        output_dir=a_output_dir,
        data=a_data_file,
    )

if __name__ == "__main__":
    main()

Thanks! I was able to add the chain_ids parameter to my function and pass that to sample. Seems to be working great!
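For anyone who lands here later, the change looks roughly like this - it assumes the updated predict forwards a chain_ids argument to sample, as discussed above, and reuses df_ca and df_tx from my first snippet:

from multiprocessing import Pool

import tablespoon as tbsp


def pred_things(df, chain_id):
    sn = tbsp.Snaive()
    # forward a unique chain id so the CmdStanPy output CSVs don't collide
    return sn.predict(
        df, horizon=10, frequency="D", lag=7,
        uncertainty_samples=8000, chain_ids=chain_id,
    )


if __name__ == "__main__":
    jobs = [(df_ca, 1), (df_tx, 2)]  # one unique chain id per series
    with Pool() as pool:
        parallel_prod = pool.starmap(pred_things, jobs)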
