MPI Design Discussion

But the master has the blocking gatherv in its reduce() method. Until the master finishes gathering from every slave, it won’t be able to dispatch the next job.

Yes, that’s correct. Now, bear in mind that the way I distribute the job chunks to the slaves leads to roughly the same execution time for each slave. Recall that I always send large, equally sized job chunks to each slave, so each slave will have its result block ready at about the same time as any other slave. The master must wait for all results to return in order to insert them into the AD tree at once. Remember that we can only operate serially on the master since the AD stack is not thread safe, so I don’t think we can do any asynchronous magic on the master.

In short, I distribute all N jobs at once out to the cluster and then collect all the results.
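
To make that concrete, here is a minimal sketch of this scatter/compute/gather pattern in raw MPI (the real implementation uses boost::mpi and serialized C++ types; compute_block and the counts/displacements here are placeholders for illustration only):

#include <mpi.h>

#include <cstddef>
#include <vector>

// placeholder for the per-chunk work (one result per job, for simplicity)
static std::vector<double> compute_block(const std::vector<double>& jobs) {
  std::vector<double> out(jobs.size());
  for (std::size_t i = 0; i < jobs.size(); ++i)
    out[i] = jobs[i] * jobs[i];
  return out;
}

// counts/displs describe the equally sized blocks and must be known on
// every rank; all_jobs/all_results only need to be filled on the master
void scatter_compute_gather(const std::vector<double>& all_jobs,
                            std::vector<double>& all_results,
                            const std::vector<int>& counts,
                            const std::vector<int>& displs) {
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);

  // every rank receives its block of jobs
  std::vector<double> my_jobs(counts[rank]);
  MPI_Scatterv(all_jobs.data(), counts.data(), displs.data(), MPI_DOUBLE,
               my_jobs.data(), counts[rank], MPI_DOUBLE, 0, MPI_COMM_WORLD);

  // all blocks are roughly the same size, so they finish at about the same time
  std::vector<double> my_results = compute_block(my_jobs);

  // blocking gather: the master waits here until every block has arrived
  // and only then inserts everything into the AD stack serially
  MPI_Gatherv(my_results.data(), static_cast<int>(my_results.size()),
              MPI_DOUBLE, all_results.data(), counts.data(), displs.data(),
              MPI_DOUBLE, 0, MPI_COMM_WORLD);
}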

Also: I have benchmarked the code using ODE problems and analytically solvable problems. For ODE problems the speedup is essentially linear in the number of cores, and for analytic problems I am getting (for the model I tested) about a 13x speedup when using 20 cores, if I remember right. Getting linear speedup in the ODE case is to be expected, but 13x on 20 cores is not so bad after all, I think.

What I want to say is that the speedups are already nice. I have never programmed this type of stuff before and I am very happy with the performance. That said, I am sure one could do this even more elegantly and cleverly. Right now I expect the user to supply the jobs in an order which leads to good utilization, and I think this will be the case for many problems anyway, without the user doing anything special.

So we are assuming sequential processing on the master side, no load-balancing concerns (or we leave those to users), and the need for a slave-side listening state. In that case, would a simple MPI skeleton like the one below be easier to reason about? (I’m using raw MPI rather than boost::mpi, but the idea is the same; I’m also skipping data scattering as well as serialization and deserialization.)

Also, do we need an mpi namespace within math?

#include <mpi.h>

#define MPI_WORK_TAG     1
#define MPI_EXIT_TAG     2

void master();
void slave();

int main(int argc, char** argv)
{
  int rank;
  MPI_Init(&argc, &argv);
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  (rank == 0) ? master() : slave();
  MPI_Finalize();
  return 0;
}

void master()
{
  int nproc, rank, data;
  double result;
  MPI_Status status;
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);  // ranks 1..nproc-1 are slaves

  {
    // first job: send one unit of work to every slave ...
    for (rank = 1; rank < nproc; ++rank) {
      data = 1;
      MPI_Send(&data, 1, MPI_INT, rank, MPI_WORK_TAG, MPI_COMM_WORLD);
    }

    // ... then block until every slave has reported its result back
    for (rank = 1; rank < nproc; ++rank) {
      MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
               MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }
  }

  {
    // second job, same pattern
    for (rank = 1; rank < nproc; ++rank) {
      data = 2;
      MPI_Send(&data, 1, MPI_INT, rank, MPI_WORK_TAG, MPI_COMM_WORLD);
    }

    for (rank = 1; rank < nproc; ++rank) {
      MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE,
               MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    }
  }

  {
    // next job
  }

  // no more work: tell every slave to leave its listening loop
  for (rank = 1; rank < nproc; ++rank) {
    MPI_Send(nullptr, 0, MPI_INT, rank, MPI_EXIT_TAG, MPI_COMM_WORLD);
  }
}

void slave()
{
  double result;
  int data;
  MPI_Status status;
  while (1) {
    // listening state: wait for either work or the exit signal
    MPI_Recv(&data, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

    if (status.MPI_TAG == MPI_EXIT_TAG) break;

    result = double(data) * double(data); // hard work

    MPI_Send(&result, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
  }
}

The intention is to put the MPI calls at the same level and to use tags to flag the listening state. Will this design achieve the goal? If not, what am I missing here?


I am not sure if I follow on this one. What do you mean here and how does this help us with what problem?

Again… what is the benefit of tags here? In the current implementation I am essentially using C++ types to tell the system what to do. The types are meaningful to our C++ system, and the translation to what they trigger is handled by the boost libraries automatically. All I need to do is register the types using macros. So there is also a define, but managing these lookup tables is done by boost.
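
To illustrate what I mean, here is a rough sketch of the general boost::mpi + boost::serialization pattern (not our actual code; command, do_hard_work and the null-pointer shutdown are made up for illustration). The C++ type itself carries the information a tag would otherwise encode, and boost maintains the type lookup once the class is exported:

#include <boost/mpi.hpp>
#include <boost/serialization/base_object.hpp>
#include <boost/serialization/export.hpp>
#include <boost/serialization/shared_ptr.hpp>
#include <boost/shared_ptr.hpp>

// "what to do" is encoded in the C++ type, not in an integer tag
struct command {
  virtual ~command() {}
  virtual void run() const {}
  template <class Archive>
  void serialize(Archive&, unsigned int) {}
};

struct do_hard_work : command {
  void run() const override { /* ... the actual work ... */ }
  template <class Archive>
  void serialize(Archive& ar, unsigned int) {
    ar & boost::serialization::base_object<command>(*this);
  }
};

// one macro per command type; boost manages the lookup table for us
BOOST_CLASS_EXPORT(do_hard_work)

// what every slave runs: receive a command of whatever concrete type the
// master broadcast and execute it; a null command shuts the loop down
void listen(boost::mpi::communicator& world) {
  while (true) {
    boost::shared_ptr<command> cmd;
    boost::mpi::broadcast(world, cmd, 0);
    if (!cmd)
      break;
    cmd->run();
  }
}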

So where does the current system have shortcomings, and how do you address them?

Design elements of what we have are from my view:

  • mpi_cluster

    • provides management of cluster resource
    • allows execution of any function object you like (so we can easily introduce other MPI facilities besides mpi_parallel_call)
    • no need to set up lookup tables (like tag defines); these are handled by boost serialization + boost mpi for us.
  • mpi_parallel_call

    • can call any functor you like with parameters and data
    • supports the concept of static data, which ensures that unchanging data is only ever transmitted once and then cached locally for future reuse.
    • the parameters passed into the functors can be var or double (this changes the type of the functor), but reuse of static data is still guaranteed.
    • ensures that communication is done in big chunks which are fast to transmit.
  • map_rect_mpi

    • supports nested calls to the facility, so you can nest map_rect calls inside each other if you wish.
  • map_rect_serial

    • computes things in exactly the same way as the MPI version, so we get exactly reproducible results no matter which version you use (a conceptual sketch of these semantics follows below).

All of the above is implemented with minimal impact on the overall stan-math architecture and with minimal changes required in the Stan parser.
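
To make the map_rect semantics concrete, here is a conceptual sketch of what the serial reference version does (placeholder names and plain doubles instead of the actual Eigen/var types): apply a user functor to the shared parameters plus each job’s parameters and data, and concatenate the per-job outputs in job order.

#include <vector>

// conceptual sketch only: the real map_rect works on Eigen types and
// var/double template parameters; plain doubles stand in for both here
template <typename F>
std::vector<double> map_rect_serial_sketch(
    const F& f,
    const std::vector<double>& shared_params,
    const std::vector<std::vector<double>>& job_params,
    const std::vector<std::vector<double>>& x_r,
    const std::vector<std::vector<int>>& x_i) {
  std::vector<double> out;
  for (std::size_t j = 0; j < job_params.size(); ++j) {
    // each job sees the shared parameters plus its own slice of parameters
    // and data; outputs are concatenated in job order, which is what makes
    // the serial and MPI results match exactly
    std::vector<double> out_j
        = f(shared_params, job_params[j], x_r[j], x_i[j]);
    out.insert(out.end(), out_j.begin(), out_j.end());
  }
  return out;
}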

I’m probably the person least familiar with Stan in this thread, and I definitely know less than @wds15 about the current design. For problems like the ODEs reported here, IMO the current design suits perfectly. My main question is what future uses of MPI will emerge and how the current design would adapt to them.

For example, it seems that we are not concerned about load balancing. Will this change in the future? What if the number of tasks is much larger than the number of slaves and we need to dispatch them in a round-robin fashion? Or, going further, what if the number of tasks is determined dynamically?
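
For the case where tasks vastly outnumber slaves, the usual pattern is a dynamic master loop along the following lines (a plain-MPI sketch only to illustrate the question, reusing the tags from the skeleton above; it is not part of the current design): prime every worker with one task, then feed a new task to whichever worker reports back first.

#include <mpi.h>

#define MPI_WORK_TAG 1  // same tags as in the skeleton above
#define MPI_EXIT_TAG 2

// Dynamic dispatch: hand one task id to every worker, then keep feeding
// the next task to whichever worker finishes first, until all are done.
void master_dynamic(int ntask) {
  int nproc;
  MPI_Comm_size(MPI_COMM_WORLD, &nproc);

  MPI_Status status;
  double result;
  int next_task = 0;

  // prime every worker with one task (if there are enough tasks)
  for (int rank = 1; rank < nproc && next_task < ntask; ++rank) {
    MPI_Send(&next_task, 1, MPI_INT, rank, MPI_WORK_TAG, MPI_COMM_WORLD);
    ++next_task;
  }

  // every completed task frees a worker for the next one
  for (int done = 0; done < ntask; ++done) {
    MPI_Recv(&result, 1, MPI_DOUBLE, MPI_ANY_SOURCE, MPI_ANY_TAG,
             MPI_COMM_WORLD, &status);
    if (next_task < ntask) {
      MPI_Send(&next_task, 1, MPI_INT, status.MPI_SOURCE, MPI_WORK_TAG,
               MPI_COMM_WORLD);
      ++next_task;
    }
  }

  // all tasks processed: release the workers from their listening loop
  for (int rank = 1; rank < nproc; ++rank)
    MPI_Send(nullptr, 0, MPI_INT, rank, MPI_EXIT_TAG, MPI_COMM_WORLD);
}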

Also, does the current design suit the needs of @anon75146577 and @avehtari for their sparse matrix project? (Or do we even care?) For example, parallel matrix operations often require inter-slave communication; how would we do that in the current setup?

Like I said, I don’t know if we have those concerns. If yes, I’d suggest keeping things simple by exposing MPI structures in master and slave in the common SPMD fashion. If not, then all is great, IMO.

I think the MPI specifics have so far only been designed by me. The mpi_cluster concept is a very lightweight and yet general implementation, as it allows you to execute whatever you like on the cluster in a way which integrates nicely with C++ idioms.

I think the biggest concern we have is to get this branch into Stan ASAP, since it performs darn well already. However, what you raise are valid points - so far I have chosen the most simplistic scheduling you can do, and more sophistication may speed things up further. Let me explain once more: if I have 1000 jobs and 10 cores, then I slice the work into 10 packets, each of size 100. Hence all workers receive an equal amount of work. The rationale is that, on average, each work packet should have the same computational cost, so this split should work reasonably well; another upside of this approach is that I can transmit all parameters block-wise a single time.
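
For illustration, the even slicing amounts to something like this (sketch only; the helper name is made up, and any remainder would be spread over the first chunks):

#include <vector>

// Even slicing: 1000 jobs over 10 workers gives ten chunks of 100 jobs;
// if the division is not exact, the remainder goes to the first chunks.
std::vector<int> chunk_sizes(int n_jobs, int n_workers) {
  std::vector<int> sizes(n_workers, n_jobs / n_workers);
  for (int i = 0; i < n_jobs % n_workers; ++i)
    ++sizes[i];
  return sizes;
}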

In the current setup you would create a command which we initiate once the sparse matrix code starts to execute. Then you have full control over the cluster and can do whatever you like: use the full cluster to compute the operation itself and the gradients, store things on the AD stack, and then let execution continue.

OK, I have already shown the good performance of MPI a few times, but so far mostly with synthetic examples. I just compiled a realistic example:

  • Hierarchical ODE based model
  • 1300 subjects
  • real-world data set

The run takes 2.6 days to finish on a single core. I set things up such that the 10- and 15-core runs were on a single machine, while the 20-, 40- and 80-core runs were distributed in blocks of 10 (so 2, 4 and 8 machines) across the cluster, which is networked using InfiniBand. The key question to me was how well the performance scales, and the result is stunning: the 64 h on a single core go down to about 1 h, which is a 62x speedup, but look for yourself.

… and all results match exactly.


My biggest concern now is to get the interfaces and installs sorted out. As long as the interface to users (here, map_rect) is OK, we can change the infrastructure later. I think we’re in good shape on the interface now, as it’s simple.

We’ll probably want to move to some kind of more mature queue-based load balancing. But then there’s the issue of getting the data transported. I’m not an expert, but I believe there will be a lot of work to do here.

This is going to be a matter of interleaving management of implicit operations called by matrix operations and user-defined parallelism.

I don’t think we can anticipate what they’re going to need yet. There are GPUs (probably not much use for sparse matrices), multi-process and multi-threading as contenders.

:-)