Skip to content

Commit

Permalink
Integrate libtuv thread pool into dcurl
Browse files Browse the repository at this point in the history
To reduce the overhead of creating and eliminating the threads repeatedly,
we integrate the thread pool of libtuv with git submodule.
The pthread-related functions and data types are replaced with the corresponding
ones of libtuv.
The compilation of libtuv library is written in the file mk/submodule.mk.

Experiment:
Call clock_gettime() right before and after the functions for getting the thread.
The functions are pthread_create() (without thread pool)
and uv_queue_work() (with thread pool).
Use test-multi-pow.py as the test case since it initializes and destroys dcurl only once and
does the PoW multiple times, like what IRI does.
The experiment result shows the time of getting each thread;
the number of threads in a PoW execution is 7.

Experiment result (unit: second):
Without thread pool
thread0: 0.000028384
thread1: 0.000025127
thread2: 0.000024748
thread3: 0.000023925
thread4: 0.000024126
thread5: 0.000025328
thread6: 0.000052900
thread0: 0.000049344
thread1: 0.000039575
thread2: 0.000036720
thread3: 0.000036249
thread4: 0.000034606
thread5: 0.000034676
thread6: 0.000033444

With thread pool
thread0: 0.000124327
thread1: 0.000002084
thread2: 0.000001052
thread3: 0.000000150
thread4: 0.000000121
thread5: 0.000000080
thread6: 0.000000090
thread0: 0.000000291
thread1: 0.000000080
thread2: 0.000000050
thread3: 0.000000050
thread4: 0.000000050
thread5: 0.000000060
thread6: 0.000000050

The first acquisition of a thread from the thread pool takes longer
since it is in charge of preallocating and initializing the threads.

Close #58.
  • Loading branch information
marktwtn committed Feb 11, 2019
1 parent ee0804a commit 97613e3
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 43 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "deps/libtuv"]
path = deps/libtuv
url = https://github.com/DLTcollab/libtuv.git
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ endif
# Check specific CPU features available on build host
include mk/cpu-features.mk

# Handle git submodule
include mk/submodule.mk

ifeq ("$(BUILD_AVX)","1")
CFLAGS += -mavx -DENABLE_AVX
ifeq ("$(call cpu_feature,AVX2)","1")
Expand Down Expand Up @@ -117,17 +120,17 @@ OBJS := $(addprefix $(OUT)/, $(OBJS))

$(OUT)/test-%.o: tests/test-%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) -I $(LIBTUV_INCLUDE_PATH) -c -MMD -MF $@.d $<

$(OUT)/%.o: $(SRC)/%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) -I $(LIBTUV_INCLUDE_PATH) -c -MMD -MF $@.d $<

$(OUT)/test-%: $(OUT)/test-%.o $(OBJS)
$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -o $@ $^ $(LDFLAGS)

$(OUT)/libdcurl.so: $(OBJS)
$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -shared -o $@ $^ $(LDFLAGS)

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ enabled in multi-threaded execution fashion, resulting in much faster proof-of-
Reference Implementation (IRI). Additionally, dcurl also supports the FPGA-accelerated solution further described in docs/FPGA-ACCEL.md

# Warning
* You need to configure paths and flags of OpenCL installation in ```mk/opencl.mk```
* You need to configure paths and flags of OpenCL installation in ```mk/opencl.mk```.
* dcurl will automatically configure all the GPU devices on your platform.
* Check JDK installation and set JAVA_HOME if you wish to specify.
* If your platform doesn't support Intel SSE, dcurl would be compiled with naive implementation.
Expand Down
1 change: 1 addition & 0 deletions deps/libtuv
Submodule libtuv added at 3177b5
9 changes: 9 additions & 0 deletions mk/submodule.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# libtuv related variables
LIBTUV_PATH = deps/libtuv
LIBTUV_INCLUDE_PATH := $(LIBTUV_PATH)/include
# PIC (Position-Independent-Code) library
LIBTUV_LIBRARY := $(LIBTUV_PATH)/build/x86_64-linux/release/lib/libtuv.o

$(LIBTUV_LIBRARY):
git submodule update --init $(LIBTUV_PATH)
$(MAKE) -C $(LIBTUV_PATH) TUV_BUILD_TYPE=release TUV_CREATE_PIC_LIB=yes
32 changes: 21 additions & 11 deletions src/pow_avx.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -415,9 +416,9 @@ long long int pwork256(int8_t mid[], int mwm, int8_t nonce[], int n,
}
#endif

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork256(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
Expand All @@ -429,7 +430,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -494,7 +494,8 @@ bool PowAVX(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -518,12 +519,14 @@ bool PowAVX(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -564,29 +567,33 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -596,7 +603,10 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
static void PoWAVX_Context_Destroy(ImplContext *impl_ctx)
{
PoW_AVX_Context *ctx = (PoW_AVX_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_avx.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -25,7 +26,9 @@ typedef struct _pow_avx_context PoW_AVX_Context;
struct _pow_avx_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
36 changes: 23 additions & 13 deletions src/pow_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -176,12 +177,12 @@ static int64_t pwork(int8_t mid[], int mwm, int8_t nonce[], int n,
return loop_cpu(lmid, hmid, mwm, nonce, stopPoW);
}

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);

pthread_mutex_lock(pworkInfo->lock);
if (pworkInfo->ret >= 0) {
Expand All @@ -190,7 +191,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -255,7 +255,8 @@ bool PowC(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -279,12 +280,14 @@ bool PowC(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -324,29 +327,33 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -356,7 +363,10 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
static void PoWC_Context_Destroy(ImplContext *impl_ctx)
{
PoW_C_Context *ctx = (PoW_C_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -26,7 +27,9 @@ typedef struct _pow_c_context PoW_C_Context;
struct _pow_c_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
Loading

0 comments on commit 97613e3

Please sign in to comment.