
Welcome to the jmq-collectives wiki!

This library implements collective communication algorithms used in HPC technologies (e.g., MPI, OpenSHMEM). The point of this library is to provide a pure Java implementation of the algorithms using the JeroMQ library. For most people, an OpenMPI installation and its Java bindings should suffice. This library is for people who do not have access to OpenMPI but would still like to learn, utilize, or work with an SPMD programming environment. JeroMQ is implemented in 100% Java and comes with its own set of caveats; users are encouraged to review the caveats on the JeroMQ wiki before using this library.

Usage Scenario

The use case for this library looks something like the following. You have a lot of data to process that has been loaded into a linear data structure (Java Array, Iterator, or Stream). The processing task is embarrassingly parallel, or it requires a lot of shared state. Typically you would like to exhaust the threads available on your machine, but you have hit the peak of what your processor can support. Note that the machine in question is 100% dedicated to your JVM program. At this point, you need to "scale out" to another machine connected by network hardware (for JeroMQ, typically Ethernet). This is where SPMD comes into play: you send a copy of the program to another machine, run both programs 'simultaneously', and the programs use SPMD collective communication algorithms to exchange data efficiently.
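
A rough sketch of that pattern is shown below. It assumes an already-initialized implementation of the Collectives interface listed at the bottom of this page; the variable name collectives, the notion of a root rank holding the full data set, and the scatter semantics of "root supplies all elements, every rank receives its partition as a Stream" are assumptions made for illustration, not documented guarantees.

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

public final class SpmdExample {

    // Each machine runs this same program; only the data partition it is
    // handed by scatter differs (the SPMD model).
    static double processPartition(Collectives collectives, List<Double> rootData)
            throws IOException, ClassNotFoundException {
        // Assumption: the root rank's iterator holds the full data set and
        // non-root ranks may pass an empty iterator.
        Iterator<Double> it = rootData.iterator();
        Stream<Double> myChunk = collectives.scatter(it, rootData.size());

        // Embarrassingly parallel local work: square every element this rank
        // received, then sum the local results.
        return myChunk.mapToDouble(x -> x * x).sum();
    }
}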

Where Hadoop and Friends Fit Into the Picture

Hadoop does a fair job of building around the SPMD model. Hadoop stacks distribute copies of the code via YARN across HDFS (Hadoop Distributed File System) storage nodes. In Hadoop, the HDFS distributed file system acts as 'memory' for the application (in the context of Hadoop or Spark, RAM serves as application/processor memory and block storage cache). Sophisticated caching techniques (what Spark calls 'in-memory' processing) are used to minimize network communication with the distributed file system. Hadoop places a premium on resilience and scaling. Users provide Hadoop a directory of files that their application code will process. Hadoop splits the list of files based on the number of processing nodes a user requested, the user's code conforms to the Map-Reduce model provided by Hadoop, and Hadoop then runs the user's application code.

This library implements SPMD with an emphasis on traditional scientific computing problems (stencils, meshes, etc.), data structures loaded into processor memory (RAM/application memory, not distributed disk!), and direct machine-to-machine communication. Users can implement their own checkpoint-restart systems (something Spark automates using Directed Acyclic Graphs) to handle errors; this library does not perform leader election or consistency checking. The problem being solved requires the full attention of the machine/JVM.
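
Since the library provides no fault tolerance of its own, a user-level checkpoint-restart can be as simple as periodically serializing application state to local disk and reloading it on startup. The sketch below is one minimal way to do that with plain Java serialization; the file layout and when to checkpoint are user choices, not library conventions.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

public final class Checkpoint {

    // Write the application state to a local file (one file per rank).
    static void save(Path file, Serializable state) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
            out.writeObject(state);
        }
    }

    // Reload state if a checkpoint exists, otherwise return the supplied default.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T loadOrDefault(Path file, T fallback)
            throws IOException, ClassNotFoundException {
        if (!Files.exists(file)) {
            return fallback;
        }
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(file))) {
            return (T) in.readObject();
        }
    }
}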

Most Apparent Use Cases

  • Scientific Computing (Stencils, Fluid Flows, Meshes, etc)
  • Data Frame processing
  • Ensemble Machine Learning training and inference
  • Distributed Memory Linear Algebra (SUMMA, Cannon, k-SVD, etc)
  • Distributed Sorting

Avoid Doing at All Costs

  • Do not call a collective operation from within application threads!
  • Collectives should be called from the main program/process.

Design Considerations and Suggestions

The C++ and Rust versions of this ZeroMQ library rely on pointers/references and lengths to manage the amount of information bundled into ZeroMQ messages. Java has a concept of references, but writing library code to handle container types, or to chunk and block data before passing it through a collective communication routine, seems like a task better suited to the user.

Situations will eventually arise where users could pass, or be tempted to pass, large user-defined data structures that have to be serialized and could cause performance or communication issues. Java's support for Iterators and Streams can serve as a reasonable proxy for the role that raw pointers and offsets traditionally play.
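
One way to play that role is to expose a large array as fixed-size blocks behind an Iterator, so a collective only ever sees one block at a time. The class below is a sketch of that idea; the block size is whatever the user chooses, and the name BlockIterator is arbitrary.

import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Presents a double[] as a sequence of serializable blocks, standing in for
// the pointer-plus-length bookkeeping used by the C++ and Rust versions.
final class BlockIterator implements Iterator<double[]> {
    private final double[] data;
    private final int blockSize;
    private int offset = 0;

    BlockIterator(double[] data, int blockSize) {
        this.data = data;
        this.blockSize = blockSize;
    }

    @Override
    public boolean hasNext() {
        return offset < data.length;
    }

    @Override
    public double[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        int end = Math.min(offset + blockSize, data.length);
        double[] block = Arrays.copyOfRange(data, offset, end);
        offset = end;
        return block;
    }
}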

If there is an occasion where an element or set of elements is too large to communicate in a single broadcast, consider using a pipelined-tree broadcast: serialize the data into a stream, read chunks off the stream, and pass them into multiple rounds of broadcast. On the receiving end of the communication, users will have to write the received bytes into a buffer in order to deserialize the data. This may not be the most efficient mechanism for handling the problem, but it does solve it.
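
A minimal sketch of that chunked broadcast, written against the Collectives interface at the bottom of this page. The chunk size, the idea of broadcasting the chunk count first, and the assumption that broadcast returns the root's value on every rank (ignoring the argument on non-root ranks) are all choices made here for illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;

public final class ChunkedBroadcast {

    // Root serializes the object, splits the bytes into fixed-size chunks and
    // broadcasts them in rounds; every rank reassembles and deserializes.
    static Object broadcastLarge(Collectives collectives, boolean isRoot, Serializable payload, int chunkSize)
            throws IOException, ClassNotFoundException {
        byte[] bytes = new byte[0];
        if (isRoot) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
                out.writeObject(payload);
            }
            bytes = buf.toByteArray();
        }

        // Round 0: tell every rank how many chunks to expect (assumes the
        // root's value is what broadcast returns on all ranks).
        int chunks = collectives.broadcast((bytes.length + chunkSize - 1) / chunkSize);

        // Rounds 1..n: broadcast one chunk at a time and append it on receipt.
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        for (int i = 0; i < chunks; i++) {
            byte[] chunk = isRoot
                    ? Arrays.copyOfRange(bytes, i * chunkSize, Math.min((i + 1) * chunkSize, bytes.length))
                    : new byte[0];
            received.write(collectives.broadcast(chunk));
        }

        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(received.toByteArray()))) {
            return in.readObject();
        }
    }
}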

As for gather and scatter, because streams are returned, a user can send non-symmetric chunks or blocks. Since gather/scatter accept variable-length data, there are opportunities for users to partition and organize elements prior to executing the gather/scatter collective.
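
For example, each rank could gather a locally-sized list of results without padding to a common length. The sketch below assumes that the root's returned stream carries every rank's elements and that non-root ranks may receive an empty stream; neither point is documented here.

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class VariableGather {

    // Each rank contributes however many results it produced locally; no
    // padding to a common length is required before calling gather.
    static List<String> gatherResults(Collectives collectives, List<String> localResults)
            throws IOException, ClassNotFoundException {
        Stream<String> all = collectives.gather(localResults.iterator(), localResults.size());
        // Assumption: only the root's stream contains every rank's elements.
        return all.collect(Collectors.toList());
    }
}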

Collectives Interface

import java.io.IOException;

public interface Collectives {

    // Root provides the value; every rank receives the root's copy.
    public <Data extends java.io.Serializable> Data broadcast(Data data)
        throws IOException, ClassNotFoundException;

    // Combines each rank's stream of values with the supplied binary operator,
    // starting from the initial value.
    public <Data extends java.io.Serializable> Data reduce(final Data init, java.util.function.BinaryOperator<Data> fn, java.util.stream.Stream<Data> data)
        throws IOException, ClassNotFoundException;

    // Blocks until all ranks have reached the barrier.
    public void barrier()
        throws IOException, ClassNotFoundException;

    // Distributes the root's elements across ranks; each rank receives its
    // partition as a stream.
    public <Data extends java.io.Serializable> java.util.stream.Stream<Data> scatter(java.util.Iterator<Data> data, final long data_size)
        throws IOException, ClassNotFoundException;

    // Collects each rank's elements; the root receives the combined stream.
    public <Data extends java.io.Serializable> java.util.stream.Stream<Data> gather(java.util.Iterator<Data> data, final long data_size)
        throws IOException, ClassNotFoundException;
}
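
A hedged usage sketch against this interface follows. Construction of the implementation, rank configuration, and the exact root-versus-non-root semantics are not covered on this page, so the points marked as assumptions in the comments are illustrative only.

import java.io.IOException;
import java.util.List;

public final class UsageSketch {

    // Runs the same steps on every rank; what non-root ranks pass to broadcast
    // and where the reduced value lands are assumptions, not documented behavior.
    static double run(Collectives collectives, boolean isRoot, List<Double> localValues)
            throws IOException, ClassNotFoundException {
        // Root supplies a configuration value; every rank receives it.
        Double scale = collectives.broadcast(isRoot ? 2.0 : null);

        // Reduce each rank's local stream of scaled values into a single sum.
        Double total = collectives.reduce(0.0, Double::sum, localValues.stream().map(x -> x * scale));

        // Wait until every rank has finished the step above.
        collectives.barrier();
        return total;
    }
}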