Map-Reduce examples?

Are there any additional resources for learning how to do map-reduce with Stan other than section 22 of the user guide? Are there any examples with public data out there? Thanks!


I found Richard McElreath’s multithreading and map-reduce example, and I was able to follow it with CmdStan.

I’m now trying to make the same thing work with rstan, and it isn’t working. In case it is relevant, I’m running this on Ubuntu with an i7 that has 12 threads. I have a few questions:

  1. Because I have 12 threads, I thought I should run this with 4 chains, one core each, and STAN_NUM_THREADS=3. Is this right?
  2. To set the number of threads, I’m running Sys.setenv(STAN_NUM_THREADS = 3). Is this the right way of doing that?
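Here is a minimal base-R sketch of the setup in those two questions. `Sys.setenv()` changes the environment of the running R session, and processes started from it inherit that environment, so it has to run before `sampling()`. Stan only reads it if the model was compiled with `-DSTAN_THREADS`. The thread arithmetic is just 4 chains × 3 threads = 12, compared here against the logical CPU count. `n_chains` and `requested` are illustrative names, not part of rstan.

```r
# Threads that map_rect may use per chain (read by Stan from the environment).
Sys.setenv(STAN_NUM_THREADS = 3)
n_threads <- as.integer(Sys.getenv("STAN_NUM_THREADS"))

# Total threads requested when all chains run in parallel (cores = chains).
n_chains <- 4
requested <- n_chains * n_threads

# detectCores() can return NA on some platforms, so guard the comparison.
logical_cpus <- parallel::detectCores(logical = TRUE)
cat("Requested", requested, "threads on", logical_cpus, "logical CPUs\n")
if (!is.na(logical_cpus) && requested > logical_cpus) {
  warning("More threads requested than logical CPUs")
}
```

Matching the logical CPU count does not mean matching the number of physical cores. On a CPU with Hyper-Threading, 12 logical CPUs can correspond to fewer physical cores.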

I know I’m doing something wrong, because the version without multithreading runs in 235 seconds and the one with multithreading runs in 420 seconds. This is my Makevars file:

CXXFLAGS += -DSTAN_THREADS
CXXFLAGS += -pthread
CXX14FLAGS=-O3 -march=native -mtune=native
CXX14FLAGS += -fPIC

and this is my sessionInfo():

> sessionInfo()
R version 3.5.2 (2018-12-20)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 18.04.2 LTS

Matrix products: default
BLAS: /usr/lib/x86_64-linux-gnu/blas/libblas.so.3.7.1
LAPACK: /usr/lib/x86_64-linux-gnu/lapack/liblapack.so.3.7.1

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8       
 [4] LC_COLLATE=en_US.UTF-8     LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C                  LC_ADDRESS=C              
[10] LC_TELEPHONE=C             LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
[1] rstan_2.18.2       StanHeaders_2.18.1 ggplot2_3.1.0      tictoc_1.0         dplyr_0.8.0.1     

loaded via a namespace (and not attached):
 [1] Rcpp_1.0.0         plyr_1.8.4         pillar_1.3.1       compiler_3.5.2     prettyunits_1.0.2 
 [6] tools_3.5.2        digest_0.6.18      pkgbuild_1.0.2     evaluate_0.13      tibble_2.0.1      
[11] gtable_0.2.0       pkgconfig_2.0.2    rlang_0.3.1        cli_1.0.1          rstudioapi_0.9.0  
[16] yaml_2.2.0         parallel_3.5.2     xfun_0.5           loo_2.0.0          gridExtra_2.3     
[21] withr_2.1.2        knitr_1.21         stats4_3.5.2       grid_3.5.2         tidyselect_0.2.5  
[26] glue_1.3.0         inline_0.3.15      R6_2.4.0           processx_3.2.1     fansi_0.4.0       
[31] rmarkdown_1.11     callr_3.1.1        purrr_0.3.0        magrittr_1.5       htmltools_0.3.6   
[36] codetools_0.2-16   matrixStats_0.54.0 scales_1.0.0       ps_1.3.0           rsconnect_0.8.13  
[41] assertthat_0.2.0   colorspace_1.4-0   utf8_1.1.4         lazyeval_0.2.1     munsell_0.5.0     
[46] crayon_1.3.4   

What am I doing wrong?

In case it helps, here is the whole markdown:

---
title: "Multithreading and Map-Reduce in Stan"
output: html_document
---

```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```

Data

library(dplyr)
library(tictoc)
d <- read.csv( "RedcardData.csv" , stringsAsFactors=FALSE )
table(d$redCards )
glimpse(d)
d2 <- d[ !is.na(d$rater1) , ]

Without Multithreading

data {
  int N;
  int n_redcards[N];
  int n_games[N];
  real rating[N];
}
parameters {
  vector[2] beta;
}
model {
  beta ~ normal(0,1);
  n_redcards ~ binomial_logit( n_games , beta[1] + beta[2] * to_vector(rating) );
}

library(rstan)
stan_data <- list(N = nrow(d2), n_redcards = d2$redCards, n_games = d2$games, rating = d2$rater1)
tic('Without Multithreading')
fit <- sampling(logistic0, stan_data, chains=4, cores=4, seed=1982)
toc()
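If you don't have `RedcardData.csv`, this sketch simulates a data list with the same shape as `stan_data`, so both models can be run on synthetic data. Everything here is hypothetical: the sample size, the rating scale (uniform on [0, 1]), the games-per-row distribution, and the "true" coefficients are all made up for illustration.

```r
set.seed(1982)
N <- 7 * 150                         # a multiple of 7, so the 7-shard model drops no rows
rating <- runif(N)                   # hypothetical stand-in for d2$rater1
n_games <- rpois(N, lambda = 3) + 1L # at least one game per row
beta_true <- c(-4, 0.3)              # hypothetical intercept and slope
p <- plogis(beta_true[1] + beta_true[2] * rating)
n_redcards <- rbinom(N, size = n_games, prob = p)

# Same names as the data block of both Stan models.
stan_data_sim <- list(N = N, n_redcards = n_redcards,
                      n_games = n_games, rating = rating)
str(stan_data_sim)
```

You can then call `sampling(logistic0, stan_data_sim, ...)` in place of `stan_data`.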

With Multithreading

functions {
  vector lp_reduce( vector beta , vector theta , real[] xr , int[] xi ) {
    int n = size(xr);
    int y[n] = xi[1:n];
    int m[n] = xi[(n+1):(2*n)];
    real lp = binomial_logit_lpmf( y | m , beta[1] + to_vector(xr) * beta[2] );
    return [lp]';
  }
} 

data {
  int N;
  int n_redcards[N];
  int n_games[N];
  real rating[N];
}

transformed data {
  // 7 shards
  // M = N/7 = 124621/7 = 17803
  int n_shards = 7;
  int M = N/n_shards;
  int xi[n_shards, 2*M];  // 2M because two variables, and they get stacked in array
  real xr[n_shards, M];
  // an empty set of per-shard parameters
  vector[0] theta[n_shards];
  // split into shards
  for ( i in 1:n_shards ) {
    int j = 1 + (i-1)*M;
    int k = i*M;
    xi[i,1:M] = n_redcards[ j:k ];
    xi[i,(M+1):(2*M)] = n_games[ j:k ];
    xr[i] = rating[ j:k ];
  }
}

parameters {
  vector[2] beta;
}

model {
  beta ~ normal(0,1);
  target += sum( map_rect( lp_reduce , beta , theta , xr , xi ) );
}
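To see what `sum(map_rect(...))` computes, this sketch reproduces the sharding and the reduction in plain R on a tiny hypothetical data set (21 rows, 7 shards). It packs `xi` and `xr` the same way as the `transformed data` block. It then evaluates `lp_shard`, an R stand-in for the Stan `lp_reduce`, on each shard and checks that the per-shard log-likelihoods add up to the full-data log-likelihood. The sketch also exposes a limitation of the Stan code: `M = N/n_shards` is integer division, so if `N` is not a multiple of `n_shards`, the last `N %% n_shards` rows are silently left out of the likelihood. With N = 124621 = 7 × 17803, as in the comment, nothing is lost, but this is worth checking whenever the number of shards changes.

```r
set.seed(1)
N <- 21
n_shards <- 7
n_games <- rpois(N, lambda = 3) + 1L
n_redcards <- rbinom(N, size = n_games, prob = 0.1)
rating <- runif(N)
beta <- c(-2, 0.5)

# Integer division, as in the Stan code; a non-zero remainder would be dropped.
M <- N %/% n_shards
stopifnot(N %% n_shards == 0)

# Pack each shard like transformed data: xi[i, ] = c(y, m), xr[i, ] = rating.
xi <- matrix(0L, nrow = n_shards, ncol = 2 * M)
xr <- matrix(0, nrow = n_shards, ncol = M)
for (i in 1:n_shards) {
  j <- 1 + (i - 1) * M
  k <- i * M
  xi[i, 1:M] <- n_redcards[j:k]
  xi[i, (M + 1):(2 * M)] <- n_games[j:k]
  xr[i, ] <- rating[j:k]
}

# R stand-in for lp_reduce: binomial-logit log-likelihood of one shard.
lp_shard <- function(beta, xr_row, xi_row) {
  n <- length(xr_row)
  y <- xi_row[1:n]
  m <- xi_row[(n + 1):(2 * n)]
  sum(dbinom(y, size = m, prob = plogis(beta[1] + beta[2] * xr_row), log = TRUE))
}

# "Map" over shards, then "reduce" with sum(), as sum(map_rect(...)) does.
lp_shards <- vapply(seq_len(n_shards),
                    function(i) lp_shard(beta, xr[i, ], xi[i, ]),
                    numeric(1))
lp_full <- sum(dbinom(n_redcards, size = n_games,
                      prob = plogis(beta[1] + beta[2] * rating), log = TRUE))
all.equal(sum(lp_shards), lp_full)  # TRUE
```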


Sys.setenv(STAN_NUM_THREADS = 3) ### Is this the right way of doing this???
tic('With Multithreading')
fit <- sampling(logistic1, stan_data, chains=4, cores=4, seed=1982)
toc()

Thanks for the help!


That should be

CXX14FLAGS=-O3 -march=native -mtune=native 
CXX14FLAGS += -fPIC
CXX14FLAGS += -DSTAN_THREADS -pthread

Thanks, @bgoodri! I got this to work with a slightly different Makevars, copied from @Gregory’s post:

CXX14FLAGS = -DSTAN_THREADS -pthread
CXX14FLAGS += -O3 -march=native -mtune=native
CXX14FLAGS += -fPIC

However, the multithreaded version took 318.84 seconds, while the single-threaded one took 247.153 seconds. Any advice? For example, should I run fewer chains and more threads? Or should I set cores=1 and threads=12?

Threading is less efficient than using one core per chain, so if you have enough RAM, I would set cores equal to the number of chains. My guess is that your parallel::detectCores(logical = FALSE) is not 12.
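One way to turn that advice into arithmetic, as a sketch under one assumption: treat the number of physical cores as the budget, and give each parallel chain an equal share, with at least one thread. `suggest_threads` is a hypothetical helper, not an rstan function. It ignores Hyper-Threading, RAM, and map_rect overhead.

```r
# Hypothetical helper: physical cores shared evenly across parallel chains.
suggest_threads <- function(physical_cores, chains) {
  max(1L, as.integer(physical_cores %/% chains))
}

suggest_threads(6, 4)   # 1: four chains already use four of six cores
suggest_threads(6, 1)   # 6: a single chain can use all six cores
suggest_threads(12, 4)  # 3: the original plan, which assumes 12 physical cores
```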

parallel::detectCores(logical = FALSE)
#> [1] 12

Created on 2019-03-02 by the reprex package (v0.2.1)

It is an Intel® Core™ i7-8700 CPU @ 3.20GHz.
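For reference, Intel’s specifications list the i7-8700 as 6 cores / 12 threads. As I read the `?detectCores` help page, `logical = FALSE` is only honoured on some platforms (macOS, Solaris, and Windows), so on Linux it may still report logical CPUs, which would explain the 12 above. This minimal sketch counts physical cores from `/proc/cpuinfo` text, assuming the x86 Linux format with `physical id` and `core id` fields. `count_physical_cores` is a hypothetical helper, and the sample input is made up. On the i7-8700, it should return 6.

```r
# Count physical cores as unique (physical id, core id) pairs.
count_physical_cores <- function(lines) {
  field <- function(name) {
    hits <- grep(paste0("^", name, "[[:space:]]*:"), lines, value = TRUE)
    trimws(sub("^[^:]*:", "", hits))
  }
  length(unique(paste(field("physical id"), field("core id"))))
}

# On Linux: count_physical_cores(readLines("/proc/cpuinfo"))
# Hypothetical input: 2 physical cores, each listed twice (Hyper-Threading).
cpuinfo <- c(
  "processor\t: 0", "physical id\t: 0", "core id\t\t: 0",
  "processor\t: 1", "physical id\t: 0", "core id\t\t: 1",
  "processor\t: 2", "physical id\t: 0", "core id\t\t: 0",
  "processor\t: 3", "physical id\t: 0", "core id\t\t: 1"
)
count_physical_cores(cpuinfo)  # 2
```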

I also ran 1 chain with 1 core, just to make sure I was getting some speed gain from using 12 threads. The single-threaded version took 161 seconds to run, while the multithreaded one took 105.
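Here are the reported timings side by side. These are only the numbers from this thread, not new measurements. With one chain, 12 threads gave roughly a 1.5× speedup, which is about 13% parallel efficiency per thread. With four chains, the threaded run was about 1.29× slower.

```r
t_single <- 161      # 1 chain, no threading (seconds)
t_threaded <- 105    # 1 chain, 12 threads (seconds)
speedup <- t_single / t_threaded         # about 1.53
efficiency <- speedup / 12               # about 0.13 per thread
slowdown <- 318.84 / 247.153             # 4 chains, threaded vs not: about 1.29
round(c(speedup = speedup, efficiency = efficiency, slowdown = slowdown), 2)
```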

Any advice on the best I can do with this CPU?