Skip to content

Commit

Permalink
Added NCCL
Browse files Browse the repository at this point in the history
  • Loading branch information
cypof committed Aug 9, 2016
1 parent 1ab3870 commit 801cc2e
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 10 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ ifeq ($(USE_CUDNN), 1)
COMMON_FLAGS += -DUSE_CUDNN
endif

# NCCL acceleration configuration
# When USE_NCCL := 1 is set in Makefile.config, link against libnccl and
# define USE_NCCL so the NCCL code paths (multi-GPU collectives) compile in.
ifeq ($(USE_NCCL), 1)
LIBRARIES += nccl
COMMON_FLAGS += -DUSE_NCCL
endif

# configure IO libraries
ifeq ($(USE_OPENCV), 1)
COMMON_FLAGS += -DUSE_OPENCV
Expand Down
4 changes: 4 additions & 0 deletions Makefile.config.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
# cuDNN acceleration switch (uncomment to build with cuDNN).
# USE_CUDNN := 1

# NCCL acceleration switch (uncomment to build with NCCL).
# See https://github.com/NVIDIA/nccl
# USE_NCCL := 1

# CPU-only switch (uncomment to build without GPU support).
# CPU_ONLY := 1

Expand Down
3 changes: 3 additions & 0 deletions include/caffe/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ class Caffe {
// Parallel training info
// Total number of solvers participating in data-parallel training
// (1 in the single-GPU case).
inline static int solver_count() { return Get().solver_count_; }
inline static void set_solver_count(int val) { Get().solver_count_ = val; }
// Rank of this solver within the parallel group; 0 is the root solver.
// Stored per-thread via the thread-local Caffe singleton (set in
// InternalThread::entry for worker threads).
inline static int solver_rank() { return Get().solver_rank_; }
inline static void set_solver_rank(int val) { Get().solver_rank_ = val; }
// True for the root solver, which typically owns logging/snapshotting.
inline static bool root_solver() { return Get().root_solver_; }
inline static void set_root_solver(bool val) { Get().root_solver_ = val; }

Expand All @@ -173,6 +175,7 @@ class Caffe {

Brew mode_;
int solver_count_;
int solver_rank_;
bool root_solver_;

private:
Expand Down
4 changes: 2 additions & 2 deletions include/caffe/internal_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class InternalThread {
bool must_stop();

private:
void entry(int device, Caffe::Brew mode, int rand_seed, int solver_count,
bool root_solver);
void entry(int device, Caffe::Brew mode, int rand_seed,
int solver_count, int solver_rank, bool root_solver);

shared_ptr<boost::thread> thread_;
};
Expand Down
33 changes: 32 additions & 1 deletion include/caffe/net.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,31 @@ class Net {
static bool StateMeetsRule(const NetState& state, const NetStateRule& rule,
const string& layer_name);

// Invoked at specific points during an iteration.
// Implementors receive the index of the layer about to be (or just)
// processed. NOTE(review): Net stores raw Callback pointers and never
// deletes them — callers retain ownership and must keep the callback
// alive for the lifetime of the Net; confirm against call sites.
class Callback {
protected:
// Called with the index of the current layer; see ForwardFromTo /
// BackwardFromTo for the exact invocation points.
virtual void run(int layer) = 0;

template <typename T>
friend class Net;
};
// Callbacks fired immediately before each layer's Forward pass.
const vector<Callback*>& before_forward() const { return before_forward_; }
void add_before_forward(Callback* value) {
before_forward_.push_back(value);
}
// Callbacks fired immediately after each layer's Forward pass.
const vector<Callback*>& after_forward() const { return after_forward_; }
void add_after_forward(Callback* value) {
after_forward_.push_back(value);
}
// Callbacks fired immediately before each layer's Backward pass.
const vector<Callback*>& before_backward() const { return before_backward_; }
void add_before_backward(Callback* value) {
before_backward_.push_back(value);
}
// Callbacks fired immediately after each layer's Backward pass.
const vector<Callback*>& after_backward() const { return after_backward_; }
void add_after_backward(Callback* value) {
after_backward_.push_back(value);
}

protected:
// Helpers for Init.
/// @brief Append a new top blob to the net.
Expand Down Expand Up @@ -308,7 +333,13 @@ class Net {
bool debug_info_;
/// The root net that actually holds the shared layers in data parallelism
const Net* const root_net_;
DISABLE_COPY_AND_ASSIGN(Net);
// Callbacks
vector<Callback*> before_forward_;
vector<Callback*> after_forward_;
vector<Callback*> before_backward_;
vector<Callback*> after_backward_;

DISABLE_COPY_AND_ASSIGN(Net);
};


Expand Down
49 changes: 49 additions & 0 deletions include/caffe/parallel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <boost/date_time/posix_time/posix_time.hpp>

#include <string>
#include <vector>

#include "caffe/blob.hpp"
Expand All @@ -14,6 +15,10 @@
#include "caffe/syncedmem.hpp"
#include "caffe/util/blocking_queue.hpp"

#ifdef USE_NCCL
#include "caffe/util/nccl.hpp"
#endif

namespace caffe {

// Represents a net parameters. Once a net is created, its parameter buffers can
Expand Down Expand Up @@ -80,6 +85,50 @@ class DevicePair {
int device_;
};

// Synchronizes gradients across solvers using NVIDIA's NCCL collectives.
// Hooks into both the solver (on_gradients_ready) and the net
// (per-layer run callback) to reduce gradients during training.
template<typename Dtype>
class NCCL : public GPUParams<Dtype>,
public Solver<Dtype>::Callback,
public Net<Dtype>::Callback {
public:
/**
* In multi-process settings, first create a NCCL id (new_uid), then
* pass it to each process to create connected instances.
*/
NCCL(shared_ptr<Solver<Dtype> > solver, const string& uid = "");
~NCCL();

/**
* In single-process settings, create instances without uids and
* call this once with all instances to connect them.
*/
static void init_single_process(vector<NCCL<Dtype>*>* nccls);

// Generates a fresh NCCL unique id, serialized as a string.
static string new_uid();

/**
* Broadcast weights from rank 0 to all other solvers.
*/
void bcast();

protected:
void on_start() {}
void on_gradients_ready();
void run(int layer);

#ifdef USE_NCCL
// NCCL communicator for this rank and the CUDA stream its
// collectives are issued on.
ncclComm_t comm_;
cudaStream_t stream_;
// One event per layer plus one for the whole solver step — presumably
// used to order layer-wise reductions against compute; confirm in
// parallel.cpp.
vector<cudaEvent_t> layer_events_;
cudaEvent_t solver_event_;
#endif

shared_ptr<Solver<Dtype> > solver_;
// Whether gradients are reduced per layer (during backward) rather
// than once per iteration.
bool layer_wise_;
using Params<Dtype>::size_;
using Params<Dtype>::data_;
using Params<Dtype>::diff_;
};

// Synchronous data parallelism using map-reduce between local GPUs.
template<typename Dtype>
class P2PSync : public GPUParams<Dtype>, public Solver<Dtype>::Callback,
Expand Down
5 changes: 5 additions & 0 deletions include/caffe/util/math_functions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ void caffe_gpu_add_scalar(const int N, const Dtype alpha, Dtype *X);
template <typename Dtype>
void caffe_gpu_scal(const int N, const Dtype alpha, Dtype *X);

#ifndef CPU_ONLY
// Stream-aware overload of caffe_gpu_scal: scales X in place by alpha,
// issuing the work on the given CUDA stream instead of the default one —
// presumably so callers (e.g. NCCL reductions) can overlap it with other
// device work; confirm against math_functions.cu.
template <typename Dtype>
void caffe_gpu_scal(const int N, const Dtype alpha, Dtype* X, cudaStream_t str);
#endif

template <typename Dtype>
void caffe_gpu_add(const int N, const Dtype* a, const Dtype* b, Dtype* y);

Expand Down
37 changes: 37 additions & 0 deletions include/caffe/util/nccl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#ifndef CAFFE_UTIL_NCCL_H_
#define CAFFE_UTIL_NCCL_H_
#ifdef USE_NCCL

#include <nccl.h>

#include "caffe/common.hpp"

// Checks that a NCCL call succeeds, logging the NCCL error string on
// failure. Wrapped in do { } while (0) so the macro expands to exactly one
// statement: the original bare `{ ... }` form left a stray `;` after the
// block, which breaks unbraced `if (...) NCCL_CHECK(x); else ...` usage.
#define NCCL_CHECK(condition) \
do { \
ncclResult_t result = condition; \
CHECK_EQ(result, ncclSuccess) << " " \
<< ncclGetErrorString(result); \
} while (0)

namespace caffe {

namespace nccl {

// Maps a C++ scalar type to the matching ncclDataType_t constant.
// Only float and double are specialized; other types fail to compile.
template <typename Dtype> class dataType;

template<> class dataType<float> {
public:
static const ncclDataType_t type = ncclFloat;
};
template<> class dataType<double> {
public:
static const ncclDataType_t type = ncclDouble;
};

}  // namespace nccl

}  // namespace caffe

#endif  // USE_NCCL

#endif  // CAFFE_UTIL_NCCL_H_
2 changes: 1 addition & 1 deletion src/caffe/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void* Caffe::RNG::generator() {

Caffe::Caffe()
: cublas_handle_(NULL), curand_generator_(NULL), random_generator_(),
mode_(Caffe::CPU), solver_count_(1), root_solver_(true) {
mode_(Caffe::CPU), solver_count_(1), solver_rank_(0), root_solver_(true) {
// Try to create a cublas handler, and report an error if failed (but we will
// keep the program running as one might just want to run CPU code).
if (cublasCreate(&cublas_handle_) != CUBLAS_STATUS_SUCCESS) {
Expand Down
6 changes: 4 additions & 2 deletions src/caffe/internal_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,26 @@ void InternalThread::StartInternalThread() {
Caffe::Brew mode = Caffe::mode();
int rand_seed = caffe_rng_rand();
int solver_count = Caffe::solver_count();
int solver_rank = Caffe::solver_rank();
bool root_solver = Caffe::root_solver();

try {
thread_.reset(new boost::thread(&InternalThread::entry, this, device, mode,
rand_seed, solver_count, root_solver));
rand_seed, solver_count, solver_rank, root_solver));
} catch (std::exception& e) {
LOG(FATAL) << "Thread exception: " << e.what();
}
}

void InternalThread::entry(int device, Caffe::Brew mode, int rand_seed,
int solver_count, bool root_solver) {
int solver_count, int solver_rank, bool root_solver) {
#ifndef CPU_ONLY
CUDA_CHECK(cudaSetDevice(device));
#endif
Caffe::set_mode(mode);
Caffe::set_random_seed(rand_seed);
Caffe::set_solver_count(solver_count);
Caffe::set_solver_rank(solver_rank);
Caffe::set_root_solver(root_solver);

InternalThreadEntry();
Expand Down
13 changes: 12 additions & 1 deletion src/caffe/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -546,10 +546,15 @@ Dtype Net<Dtype>::ForwardFromTo(int start, int end) {
CHECK_LT(end, layers_.size());
Dtype loss = 0;
for (int i = start; i <= end; ++i) {
// LOG(ERROR) << "Forwarding " << layer_names_[i];
for (int c = 0; c < before_forward_.size(); ++c) {
before_forward_[c]->run(i);
}
Dtype layer_loss = layers_[i]->Forward(bottom_vecs_[i], top_vecs_[i]);
loss += layer_loss;
if (debug_info_) { ForwardDebugInfo(i); }
for (int c = 0; c < after_forward_.size(); ++c) {
after_forward_[c]->run(i);
}
}
return loss;
}
Expand Down Expand Up @@ -591,11 +596,17 @@ void Net<Dtype>::BackwardFromTo(int start, int end) {
CHECK_GE(end, 0);
CHECK_LT(start, layers_.size());
for (int i = start; i >= end; --i) {
for (int c = 0; c < before_backward_.size(); ++c) {
before_backward_[c]->run(i);
}
if (layer_need_backward_[i]) {
layers_[i]->Backward(
top_vecs_[i], bottom_need_backward_[i], bottom_vecs_[i]);
if (debug_info_) { BackwardDebugInfo(i); }
}
for (int c = 0; c < after_backward_.size(); ++c) {
after_backward_[c]->run(i);
}
}
}

Expand Down
Loading

0 comments on commit 801cc2e

Please sign in to comment.