Trouble with PyStan 3 and Python multiprocessing

Trouble running Stan estimates in parallel

I am using Python’s multiprocessing module to run a number of Stan estimates at once. I manage the pool of jobs myself, launching new ones as old ones finish, so that only 5 or 10 are running at a time on a 56-core machine.
It’s hard to diagnose exactly what is going on, but a few jobs finish at first and then everything slows down. Soon the jobs hang: the processors go idle and the remaining sampling jobs stall completely.

PyStan 2 ran fine in this environment with many jobs at once; I’ve just rewritten things for PyStan 3. I’m running on an Ubuntu 20.04 (default) x86-64 server with 0.7 TB of RAM and 56 cores. The jobs I’m testing on are small; they should take 30-60 seconds each (and the first couple do).

The jobs all run the same model, which has already been built, so there is no building going on in parallel.
I could try to build a MWE, but I use my own module (parallel.py · master · Christopher Barrington-Leigh / cpblUtilities · GitLab) for managing jobs, so before I invest a lot of time (it may be easier for me to go back to PyStan 2) I would like to know to what extent multiprocessing is expected to work with a PyStan 3 model, and whether others have had success running many PyStan 3 fits at once.

Thank you!
Chris

Chris,

Can you provide some code showing what it looks like when you call your many PyStan jobs via your parallel script? This sort of hanging behavior smells like a relatively common issue in which forking in Python’s multiprocessing leaves child processes holding locks that will never be released.
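
If it is the fork issue, one cheap check is to switch the default start method to "spawn" before any worker processes are created. A minimal sketch (this only changes the default; your parallel.py would still create and manage the processes exactly as it does now):

import multiprocessing as mp

if __name__ == "__main__":
    # On Linux the default start method is "fork"; "spawn" starts each worker
    # from a fresh interpreter, so children cannot inherit locks that some
    # thread in the parent happened to hold at the moment of the fork.
    mp.set_start_method("spawn", force=True)
    print(mp.get_start_method())  # -> spawn

Note that with "spawn" everything passed to a Process must be picklable and the target function importable, so this sometimes needs small changes to the calling code.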

It’s somewhat hard to disentangle, but you mentioned that the jobs are all running the same model, already built.

By this do you mean that you build the model before you create the worker processes, and then just run model.sample() within each one?

I would try the following as diagnostic steps:

  1. Try running one chain per model. PyStan (really httpstan) runs multiprocessing under the hood, and I wonder whether that nested multiprocessing (if it is happening) is causing issues.
  2. If you aren’t already doing so, try building the model within each process. PyStan 3 now caches compiled models, so it shouldn’t really be a performance hit, and multiprocessing is less temperamental when everything a job needs is bundled inside the function each worker calls (see the sketch after this list).
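
To make point 2 concrete, here is a minimal sketch of what I mean, with a toy one-parameter model standing in for yours (none of the names below come from your code): the build, which PyStan 3 serves from its cache after the first call, and a single-chain sample both happen inside the worker function.

import multiprocessing as mp

def fit_one(seed):
    # everything Stan-related lives inside the worker process
    import stan
    toy_code = "parameters { real x; } model { x ~ normal(0, 1); }"
    posterior = stan.build(toy_code, random_seed=seed)
    fit = posterior.sample(num_chains=1, num_samples=500)
    return float(fit["x"].mean())

if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        print(pool.map(fit_one, [1, 2, 3, 4]))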

I’m sure I’ve misunderstood something about what you’re doing, but these are at least initial thoughts you could try.

Thank you Andrew!

Here is what I see when I press Ctrl-C in my process, which has ground to a halt and has been doing absolutely nothing for 8 hours:

Process Process-31:
Process Process-27:
Process Process-24:
Process Process-13:
Process Process-22:
Process Process-16:
Process Process-18:
Process Process-21:
Process Process-19:
Process Process-12:
Process Process-15:
Traceback (most recent call last):
  File "./stanmaster_syntheticdata.py", line 1212, in <module>
    runFunctionsInParallel(fns, names=names,  parallel=defaults['server']['parallel'], maxAtOnce=MAX_AT_ONCE, # Four cores, or 8, it seems, per estimate
  File "/home/meuser/bin/cpblUtilities/parallel.py", line 134, in runFunctionsInParallel
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/home/meuser/bin/cpblUtilities/parallel.py", line 342, in launch_jobs
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
    self.updateStatus()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
Traceback (most recent call last):
  File "/home/meuser/bin/cpblUtilities/parallel.py", line 365, in updateStatus
KeyboardInterrupt
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
    self.status[ii] = self.jobs[ii].status()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
KeyboardInterrupt
KeyboardInterrupt
    if not self.queue.empty():
  File "/usr/lib/python3.8/multiprocessing/queues.py", line 123, in empty
    return not self._poll()
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 257, in poll
Traceback (most recent call last):
    return self._poll(timeout)
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 424, in _poll
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
    r = wait([self], timeout)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 318, in _bootstrap
    util._exit_function()
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/connection.py", line 925, in wait
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/util.py", line 357, in _exit_function
    p.join()
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 47, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.8/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
KeyboardInterrupt
    selector.register(obj, selectors.EVENT_READ)
  File "/usr/lib/python3.8/selectors.py", line 352, in register
    key = super().register(fileobj, events, data)
  File "/usr/lib/python3.8/selectors.py", line 238, in register
    key = SelectorKey(fileobj, self._fileobj_lookup(fileobj), events, data)
  File "/usr/lib/python3.8/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/usr/lib/python3.8/selectors.py", line 33, in _fileobj_to_fd
    if isinstance(fileobj, int):
KeyboardInterrupt

And even after I’ve killed Python, htop shows a couple of hundred zombie processes that look like my command-line call (by zombie I mean zero CPU use):

/usr/bin/python3 ./stanmaster_syntheticdata.py estimate --experiment=fig8 --sampleN=302 --allow-cheating --max-at-once=11

The actual set-up of multiprocessing is like this:

        fns, names = [], []
        for ii, row in dfe.iterrows():
            pdict = row[pcols].to_dict(into=OrderedDict)
            print('{}\n{}\nN={}\nmodel={}\n{}'.format('=' * 80, row, SAMPLE_N, MODELN, '=' * 80))
            fns += [[run_synthetic_dataset, [pdict],
                     dict(model=MODELN, N=SAMPLE_N, recreate_data=False, ologit=False,
                          stata=False, nIter=1000, allow_cheating=ALLOW_CHEATING)]]
            names += ['V{}{}-stanmixture-{}'.format(list(row[pcols].values), row['hashname'], SAMPLE_N)]

        runFunctionsInParallel(fns, names=names, parallel=defaults['server']['parallel'],
                               maxAtOnce=MAX_AT_ONCE)  # Four cores, or 8, it seems, per estimate

And that run_synthetic_dataset calls both build and sample:

        posterior = stan.build(stancode,  data=datadict)
        fit = posterior.sample(num_chains=4, num_samples=nIter)

The model just happens to have been built already, earlier on; i.e., every process still calls build.

I think the multiprocessing jobs are actually never returning; i.e., if I set a maximum of 11 jobs, then 11 estimates run but never return.

Running with “num_chains=1” in .sample makes no difference (except for fewer samples).

I hope I have understood your two suggestions correctly; I tried both.

I may try launching each of the jobs through os.system().

Thank you!
Chris

Well, I have not solved this problem, but I have worked around it by giving my module the ability to pass arguments back to itself for a single estimate, so the parallel manager just launches os.system() jobs, each of which calls Stan once. That way, everything runs through, using exactly 11x4 cores at a time.

This is not a good solution for me, because I will inevitably launch a large job and then forget that I must not edit (continue developing) the Python module while that job is running.
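
For the record, the workaround is essentially the pattern sketched below; the subcommand and flag are placeholders here, not my real command-line interface:

import subprocess

def launch_single_estimate(hashname):
    # one fresh Python process per estimate; "estimate-one" and --hashname
    # are placeholder arguments standing in for the real ones
    cmd = ["/usr/bin/python3", "./stanmaster_syntheticdata.py",
           "estimate-one", "--hashname={}".format(hashname)]
    return subprocess.Popen(cmd)

if __name__ == "__main__":
    procs = [launch_single_estimate(h) for h in ["abc123", "def456"]]
    for p in procs:
        p.wait()  # the real manager keeps at most MAX_AT_ONCE running at once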

This is all in the murky world of parallel programming, so it’s hard to pinpoint the exact issue here. It sounds like some of the warnings in the pipes-and-queues section of Python’s multiprocessing documentation; taking a look at that might help uncover what is going on.
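
The specific warning I have in mind: a process that has put items on a multiprocessing.Queue will not exit until all of those items have been flushed to the underlying pipe, so joining such a process before the parent has drained the queue can deadlock. A minimal sketch of the safe ordering (the names here are illustrative, not taken from parallel.py):

import multiprocessing as mp

def worker(q):
    # a large result keeps the queue's feeder thread, and hence the process,
    # alive until the parent actually reads the item off the queue
    q.put("x" * 10_000_000)

if __name__ == "__main__":
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    result = q.get()  # drain the queue first ...
    p.join()          # ... and only then join; the other order can hang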

Also:

Perhaps this is a good impetus for maintaining separate development and main branches of your code. Keeping development separate from currently running processes is a good idea regardless, although I recognize this isn’t the core of the issue you’re having.

Try the following code. (If you are running in Jupyter, remember to call this line first.)

import nest_asyncio; nest_asyncio.apply()

Run each build + sample call in a thread (httpstan’s async behaviour makes this run in parallel without being blocked by the GIL).
This example uses the same data for every call, but in your application you can change the data in each loop iteration.

import asyncio
import arviz as az
import stan

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_fit_thread(program_code, data, num_chains, num_samples, random_seed):
    """Sample posterior model in a thread."""
    # create event loop for the thread
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # build and fit
    posterior = stan.build(
        program_code, 
        data=data, 
        random_seed=random_seed
    )
    fit = posterior.sample(
        num_chains=num_chains, 
        num_samples=num_samples
    )
    return fit

if __name__ == "__main__":

    schools_code = """
    data {
      int<lower=0> J;         // number of schools
      real y[J];              // estimated treatment effects
      real<lower=0> sigma[J]; // standard error of effect estimates
    }
    parameters {
      real mu;                // population treatment effect
      real<lower=0> tau;      // standard deviation in treatment effects
      vector[J] eta;          // unscaled deviation from mu by school
    }
    transformed parameters {
      vector[J] theta = mu + tau * eta;        // school treatment effects
    }
    model {
      target += normal_lpdf(eta | 0, 1);       // prior log-density
      target += normal_lpdf(y | theta, sigma); // log-likelihood
    }
    """

    schools_data = {"J": 8,
                    "y": [28,  8, -3,  7, -1,  1, 18, 12],
                    "sigma": [15, 10, 16, 11,  9, 11, 10, 18]}

    # build to cache
    posterior = stan.build(schools_code, data=schools_data)
    # run once so threads won't crash (why?)
    fit = posterior.sample(num_chains=1, num_samples=1)

    n_workers = 4
    n_calls = 4
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        futures = {}
        for i in range(n_calls):
            # remember to set random_seed manually
            # or all chains are the same (assuming same data)
            futures[executor.submit(
                get_fit_thread,
                program_code=schools_code,
                data=schools_data,
                num_chains=1,
                num_samples=10_000,
                random_seed=i + 1,  # seed needs to be 1 or larger
            )] = i
        outputs = {futures[future]: future.result() for future in as_completed(futures)}

    # example for combining fits, (sorted due to async order)
    idata = az.concat([az.from_pystan(outputs[i]) for i in sorted(outputs)], dim="chain")
    print(az.summary(idata))

For some reason one needs to build and sample the posterior model at least once before making the threaded calls. I don’t know why this is, but probably something in httpstan needs to be triggered first. (cc @ariddell, any ideas what is going on?)

It’s really hard to say. There’s a lot that has changed between PyStan 2.19 (Stan Math 2.19) and PyStan 3 (Stan Math 4.1.0) that could be interacting with uses of multiprocessing.

Perhaps CmdStan or CmdStanPy would work in this particular case?

Would this work with cmdstanpy as well?

It might, but I would use processes for it.
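
Roughly along these lines, as a sketch (it assumes an eight_schools.stan file on disk; the data dictionary is the same toy example used above). CmdStan runs as a separate executable, so each worker process mostly just waits on it:

from concurrent.futures import ProcessPoolExecutor
from cmdstanpy import CmdStanModel

SCHOOLS_DATA = {"J": 8,
                "y": [28, 8, -3, 7, -1, 1, 18, 12],
                "sigma": [15, 10, 16, 11, 9, 11, 10, 18]}

def fit_one(seed):
    # the compiled executable is reused; each worker drives its own CmdStan run
    model = CmdStanModel(stan_file="eight_schools.stan")
    fit = model.sample(data=SCHOOLS_DATA, chains=1, iter_sampling=1000, seed=seed)
    return fit.summary()

if __name__ == "__main__":
    # compile once up front so the workers don't race to compile the model
    CmdStanModel(stan_file="eight_schools.stan")
    with ProcessPoolExecutor(max_workers=4) as executor:
        summaries = list(executor.map(fit_one, range(1, 5)))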

Another relatively simple option for sweeping over a lot of compute-heavy number-crunching scenarios is to write a Makefile that expresses each bit of number crunching as a rule for computing a results file from a scenario input file; you can then get make to execute the rules in parallel with something like make -j 16 to throw 16 cores at it. If each rule in the Makefile is implemented as launching a Python process to run a script, then each script executes as a simple, self-contained process that reads and writes the files specified on the command line by the Makefile, and is not responsible for forking new processes or spawning threads and so on.

This is probably pretty similar to the os.system() workaround given above, but instead of using a top-level Python script to kick off all the Python “worker” processes, that can just be done with make. It can be a pretty simple and effective tool for some batch parameter-sweep workflows, but it might be a bad choice if the PyStan calls are but a small subroutine in a much larger existing Python workflow.
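
As a rough sketch of what that Makefile might look like (the directory layout and the fit_one_scenario.py script are invented here):

# scenarios/*.json hold the inputs; fit_one_scenario.py is a hypothetical
# script that runs a single PyStan fit and writes one results/<name>.csv.
# `make -j 16` then runs up to 16 fits at a time.
SCENARIOS := $(wildcard scenarios/*.json)
RESULTS   := $(patsubst scenarios/%.json,results/%.csv,$(SCENARIOS))

all: $(RESULTS)

# the recipe line below must be indented with a tab, as make requires
results/%.csv: scenarios/%.json
	python3 fit_one_scenario.py --input $< --output $@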
