CmdStanPy; compiling in parallel on Dask

Greetings,

We’ve set up a workflow to run Stan with Prefect and Dask, and we are experiencing a lot of flakiness when running Stan in this parallel way. The Dask workers run their tasks inside a single container, which means tasks may share process memory/semaphores and file descriptors, and of course there’s a shared file system.

We are running about 7 parallel work units in a single Docker container.

We get errors like these: ValueError('Unable to compile Stan model file: /opt/app/stan/lognormal.stan.') and

RuntimeError("
Error during sampling:
Command and output files:
RunSet: chains=4
 cmd:
    ['/opt/app/stan/lognormal', 'id=1', 'random', 'seed=54321', 'data', 'file=/tmp/tmp_n7k2_fl/bg5uamfe.json', 'output',
    'file=/tmp/tmp_n7k2_fl/lognormal-202111010332-1-1yrrium8.csv',
    'method=sample', 'num_samples=1000', 'num_warmup=500',
    'algorithm=hmc', 'engine=nuts', 'max_depth=15', 'adapt',
    'engaged=1', 'delta=0.9']
   retcodes=[-1, -1, -1, -1]
   csv_files:
    /tmp/tmp_n7k2_fl/lognormal-202111010332-1-1yrrium8.csv
    /tmp/tmp_n7k2_fl/lognormal-202111010332-2-4g2xu7tm.csv
    /tmp/tmp_n7k2_fl/lognormal-202111010332-3-8xvcc0yi.csv
    /tmp/tmp_n7k2_fl/lognormal-202111010332-4-tnh9qg9z.csv") 

All of our data is already in memory, so there’s no specific need for us to go to disk, other than the fact that that’s how CmdStanPy works.

How can I make this computation / compilation robust?

I have tried using a container-wide mutex:

from cmdstanpy import CmdStanModel
from ilock import ILock

class Snippet:
    def __init__(self, stan_file: str, ...):
        # Defer compilation so it can be guarded by the container-wide lock below.
        self.__model = CmdStanModel(stan_file=stan_file, compile=False)

# ... and then

    @lazy_property
    def compiled_model(self) -> CmdStanModel:
        # Container-wide mutex: only one process compiles the model at a time.
        with ILock(f"{self.__class__.__name__}", timeout=80):
            self.__model.compile()
            return self.__model

but without success.

Also, any insights on what concurrency invariants I have to take into account would be appreciated!

2 Likes

Compiling uses a simple subprocess.Popen command to run stanc3 in a shell. Sampling is a bit more complicated, using a ThreadedProcessPool - I’m not sure how either of these interacts with Dask. With CmdStanPy 1.0.0 (currently released as a release candidate, or available on GitHub), these functions have a show_console argument which should stream the stdout/stderr output directly. This could be helpful for further debugging.
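For example, a minimal sketch of turning that on during sampling (assuming CmdStanPy >= 1.0, and with data_dict standing in for your in-memory data) might look like:

    from cmdstanpy import CmdStanModel

    model = CmdStanModel(stan_file="/opt/app/stan/lognormal.stan")
    fit = model.sample(
        data=data_dict,        # hypothetical: the in-memory data you already have
        chains=4,
        seed=54321,
        show_console=True,     # stream CmdStan's stdout/stderr as the chains run
    )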

1 Like

Tagging @bparbhu, as I think you might know something about Dask here?

1 Like

I’m not sure that there’s a good fit between what Dask offers and what the Stan inference engines need - at least not given the current CmdStan and services interfaces. CmdStanPy just wraps CmdStan and is completely disk-file-centric.

CmdStan 2.28 introduced its own mechanism for more efficient multi-chain parallelization via the arguments num_chains and num_threads, which uses the Intel TBB scheduler for threading. This feature shares the model’s data and transformed data across all chains. The Stan constructs reduce_sum and map_rect further allow within-chain parallelization.

cf - parallelization: 14 Parallelization | CmdStan User’s Guide
num_chains, num_threads: 4 MCMC Sampling | CmdStan User’s Guide
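For illustration (hypothetical file names, and assuming the model was compiled with STAN_THREADS=true), the single-process multi-chain invocation looks roughly like the cmd list in the error output above, but with num_chains and num_threads:

    ['/opt/app/stan/lognormal', 'sample', 'num_chains=4',
     'data', 'file=data.json', 'output', 'file=lognormal.csv',
     'num_threads=4']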

That said, it would be great to rethink the Stan services and user-facing command API to take advantage of Dask.

1 Like

@WardBrian 's suggestion to capture more log streams from cmdstanpy sounds like a good idea. It’s possible there might be other clues inside Dask log streams as well.

Another possible source of information, to help diagnose precisely what is going on, would be to reproduce the problem with the processes instrumented with some kind of tracing (strace? perf events?) that logs which files or syscalls the offending processes touch, although this might be tricky to set up and interpret.

Speculation:

The behaviour might be influenced by exactly how Dask (and Prefect?) are configured to execute the tasks. The Dask docs suggest each dask-worker can be configured with a number of worker processes, and a thread pool size per worker process: Worker — Dask.distributed 2021.10.0 documentation

How many processes are the dask-workers configured with? What is the thread pool size? Do the symptoms disappear if you configure each dask-worker to use a thread pool of size 1?
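As one concrete way to test that, here is a minimal sketch, assuming you can stand the cluster up from Python with dask.distributed's LocalCluster (your real deployment via Prefect may configure workers differently):

    from dask.distributed import Client, LocalCluster

    # Separate worker processes, each with a thread pool of size 1.
    cluster = LocalCluster(n_workers=7, threads_per_worker=1, processes=True)
    client = Client(cluster)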

Can the problems happen on the first task processed by a single Dask worker process, or do they only happen after 2+ tasks have been processed by the same worker?

I’m not familiar with Dask, but other task execution frameworks I’m more familiar with (Temporal/Cadence; Celery) often expose lots of ways to configure how tasks are executed. E.g. Temporal has config options to restrict how many “activities” (aka tasks) can be run concurrently by a single worker process – hopefully the Dask thread pool size per worker can be used to apply a similar constraint.

The older Python task distribution framework Celery also exposed a config option to automatically restart a worker process after every n task executions – it could be used to mitigate problems where long-running Celery workers got into bad states (due to memory leaks in your task code or libraries, etc.) when there wasn’t a way to actually fix the underlying problem. If Dask supports a similar option, I wonder what would happen if worker processes were configured to restart after every task execution?

1 Like

Hey all,

So there was an issue we opened for integrating CmdStanPy with Dask here.

I’m still interested in working on this but would definitely need some help even with the planning because there’s probably a lot of work to do around this.

I think this also requires more thought and planning about how someone would integrate even pieces of Dask with CmdStan, and which pieces you should look to integrate depending on what you’re trying to achieve.

There are some examples on using PyStan with Dask on the forum here as well.

Though, despite all of the work ahead, there would be plenty of benefits to integrating any Stan framework into the Dask ecosystem.

One of the advantages would be easy access to distributed workloads on cloud-based systems and vendors like Coiled and Saturn Cloud.

You would also essentially take care of a huge amount of DevOps groundwork for setting up things like Snowflake clusters and Kubernetes orchestration through dask-kubernetes clusters.

Another advantage would be that you could run memory-heavy Stan programs much more easily.

Though, the biggest challenge I see is integrating a file-system-based framework with Dask’s (really good) parallel and distributed computing capabilities, which lean more towards in-memory workloads, and making sure things translate well enough that you can actually take advantage of them.

I think lazy evaluation is one approach to try to take advantage of Dask’s benefits with a CmdStan model, but you could also look at how joblib might help you: there’s a Dask backend for that scikit-learn-oriented framework, which can hand work to the Dask executor for Prefect.
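As a rough sketch of what the joblib route could look like (fit_one and datasets are hypothetical names; this assumes a dask.distributed Client is already connected, which registers joblib's "dask" backend):

    from dask.distributed import Client
    from joblib import Parallel, delayed, parallel_backend

    client = Client()  # connect to (or start) a Dask cluster

    with parallel_backend("dask"):
        # fit_one: hypothetical function that compiles and samples one CmdStanPy model
        results = Parallel()(delayed(fit_one)(d) for d in datasets)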

I know because I’m also implementing an LDA model at work in scikit-learn, gensim, and Stan in the same way.

There are also Dask delayed functions and Dask Future objects which you can make use of, if you design a framework around them from the ground up. A safe bet, though, would be to use Dask delayed functions around libraries external to Dask that don’t have a Dask-native version.
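For instance, a minimal dask.delayed sketch (the stan_file path and datasets are hypothetical; compiling and sampling happen inside a single task, so both steps see the same worker filesystem):

    import dask
    from cmdstanpy import CmdStanModel

    @dask.delayed
    def fit_model(stan_file, data):
        # Compile and sample within one task, then return a plain pandas DataFrame
        # instead of the file-backed fit object.
        model = CmdStanModel(stan_file=stan_file)
        fit = model.sample(data=data, chains=4)
        return fit.summary()

    results = dask.compute(*[fit_model("/opt/app/stan/lognormal.stan", d) for d in datasets])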

You might also want to see how the Dask executor for Prefect and lazy evaluation play together on simple Stan programs first. The executor around Prefect is great, but the Airflow Dask executor is also more mature and might play better.

What I would recommend is that it might be worth thinking of a Stan interface that is not CmdStan but something based on CmdStan, where the file system is replaced with in-memory generation of outputs – something built from the ground up purposely for Dask, and most likely Numba as well.

Something else to consider is whether or not Dask is the best possible solution for Bayesian computing workloads. Something like Vaex (out-of-core) might also seem appropriate for Bayesian workloads, but nothing quite like it exists for Dask (out-of-memory), and Vaex doesn’t have as ecosystem-friendly a sandbox as Dask does.

Though ultimately, if you get this right, you could do a lot: you would open Stan programs up to easier access to more diverse hardware solutions, and give people things like GPU clusters and extremely easy access to EC2 instances that are spun up on the fly and can be driven from your laptop.

Well, that’s my 2 cents. If anyone wants to work on something like this let me know. We can talk more about it on the weekend or something.

-Brian

2 Likes

@haf, I’ve tried to reproduce the symptoms you reported.

First, I tried to see how CmdStanPy behaved when run without Prefect or Dask, simply using the Python 3 stdlib’s concurrent.futures to compile and sample many CmdStanPy models in parallel with a concurrent.futures.ProcessPoolExecutor or a concurrent.futures.ThreadPoolExecutor on a single machine. This all worked fine, without any crashes or instability, when I ran experiments submitting 1000 tasks to sample from a compiled model (I was surprised – good work CmdStanPy team).
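Roughly, the experiment looked like the following sketch (the path, data_dict, and worker count are hypothetical):

    from concurrent.futures import ProcessPoolExecutor
    from cmdstanpy import CmdStanModel

    def sample_once(seed):
        # The model is already compiled, so this just reuses the existing executable.
        model = CmdStanModel(stan_file="lognormal.stan")
        fit = model.sample(data=data_dict, chains=4, seed=seed)
        return fit.summary()

    with ProcessPoolExecutor(max_workers=8) as pool:
        summaries = list(pool.map(sample_once, range(1, 1001)))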

Next, I tried using Dask distributed instead of concurrent.futures to distribute and execute the work. Depending on how I structure the task execution in Dask, I can reproduce some of the same symptoms you reported, provided I do something like the following:

  1. tell dask to execute a function that compiles a stan model. This executes on worker machine A, say
  2. tell dask to execute another function that samples from a compiled stan model, using the model returned from the previous step. This happens to execute on worker machine B, say

This fails immediately (without any clear or actionable error message), since CmdStanPy model objects are not self-contained data values but instead seem to be thin wrappers referring to a bunch of filesystem locations – and Dask worker machines A and B don’t share a filesystem, so the CmdStanPy code is unable to read anything that was written to disk earlier.
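Schematically, the failing pattern looks something like this (the scheduler address, paths, and data are hypothetical):

    from dask.distributed import Client
    from cmdstanpy import CmdStanModel

    client = Client("tcp://scheduler:8786")

    def compile_model(stan_file):
        return CmdStanModel(stan_file=stan_file)   # compiles on whichever worker runs this (machine A)

    def run_sampling(model, data):
        return model.sample(data=data, chains=4)   # may run on a different worker (machine B)

    model_future = client.submit(compile_model, "/opt/app/stan/lognormal.stan")
    fit_future = client.submit(run_sampling, model_future, data_dict)
    fit_future.result()   # fails: the .stan file and exe only exist on machine A's filesystem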

It’s also possible to observe similar failures on a single machine with a single filesystem, without using Dask at all – e.g. have CmdStanPy compile a model, reading the Stan model file from some user-specified temp directory, then delete that temp directory, then try to fit and do something with the model.
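For example, something like this sketch (hypothetical paths and data) should reproduce it:

    import shutil, tempfile
    from cmdstanpy import CmdStanModel

    tmp = tempfile.mkdtemp()
    shutil.copy("lognormal.stan", tmp)

    model = CmdStanModel(stan_file=f"{tmp}/lognormal.stan")   # compiles the exe into tmp
    shutil.rmtree(tmp)                                        # delete the directory out from under it

    fit = model.sample(data=data_dict, chains=4)              # fails: the exe no longer exists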

Is it possible that the symptoms you are experiencing are due to CmdStanPy model compilation and sampling executing on different machines with different filesystems (unlike what CmdStanPy assumes)? Or alternatively, is it possible that you are writing the Stan model file into a temp directory during the compilation phase, but then deleting that temp directory before the model has finished sampling?

1 Like

I got a proof-of-concept demo working that successfully distributes the work of compiling and sampling CmdStanPy models between multiple Dask distributed worker processes (running in containers with isolated filesystems, etc.):

To get this to work, I can’t literally pass CmdStanPy models across the network between different worker processes using Dask, as those models are not self-contained and no longer have access to the files they assume persist in the filesystem of the worker process that compiled them. What does appear to work (and is a complete hack relying on CmdStanPy implementation details) is distributing a complete copy of the compiled Stan model binary – then some code can “fix up” the received CmdStanPy model before sampling, by writing the Stan model binary into a new temp directory on the new filesystem and modifying the CmdStanPy model to refer to that.
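This isn’t the demo code itself, but a rough sketch of the idea, assuming a CmdStanModel can be wrapped around an existing executable via exe_file (package_model/restore_model are hypothetical helper names):

    import pathlib, tempfile
    from cmdstanpy import CmdStanModel

    def package_model(model):
        # On the compiling worker: ship the compiled binary as plain bytes.
        return pathlib.Path(model.exe_file).read_bytes()

    def restore_model(exe_bytes, name="lognormal"):
        # On the sampling worker: write the binary into a fresh temp dir and re-wrap it.
        exe_path = pathlib.Path(tempfile.mkdtemp()) / name
        exe_path.write_bytes(exe_bytes)
        exe_path.chmod(0o755)
        return CmdStanModel(exe_file=str(exe_path))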

2 Likes