Skip to content

Commit

Permalink
Merge pull request #93 from marktwtn/libtuv-thread-pool-integration
Browse files Browse the repository at this point in the history
Integrate libtuv thread pool to eliminate thread creation overhead
  • Loading branch information
jserv committed Feb 12, 2019
2 parents 8853166 + 728aa2a commit c5147ab
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 42 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "deps/libtuv"]
path = deps/libtuv
url = https://github.com/DLTcollab/libtuv.git
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ endif
# Check specific CPU features available on build host
include mk/cpu-features.mk

# Handle git submodule
include mk/submodule.mk

ifeq ("$(BUILD_AVX)","1")
CFLAGS += -mavx -DENABLE_AVX
ifeq ("$(call cpu_feature,AVX2)","1")
Expand Down Expand Up @@ -117,17 +120,17 @@ OBJS := $(addprefix $(OUT)/, $(OBJS))

$(OUT)/test-%.o: tests/test-%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $<

$(OUT)/%.o: $(SRC)/%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $<

$(OUT)/test-%: $(OUT)/test-%.o $(OBJS)
$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -o $@ $^ $(LDFLAGS)

$(OUT)/libdcurl.so: $(OBJS)
$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -shared -o $@ $^ $(LDFLAGS)

Expand Down
1 change: 1 addition & 0 deletions deps/libtuv
Submodule libtuv added at 3177b5
21 changes: 21 additions & 0 deletions mk/submodule.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copied from the Makefile of libtuv to support different platforms.
# UNAME_M holds the output of `uname -m` and UNAME_S the output of `uname -s`.
UNAME_M := $(shell uname -m)
UNAME_S := $(shell uname -s)
# Lower-case the two system names handled here; any other value is kept as
# reported by `uname -s`.
ifeq ($(UNAME_S),Linux)
UNAME_S := linux
endif
ifeq ($(UNAME_S),Darwin)
# macOS
UNAME_S := darwin
endif

# libtuv related variables
# Location of the libtuv git submodule, relative to the directory make runs in.
LIBTUV_PATH = deps/libtuv
# Compiler flag that adds libtuv's header directory to the include search path.
LIBTUV_INCLUDE := -I $(LIBTUV_PATH)/include
# Platform string of the form <machine>-<system>, e.g. x86_64-linux.
LIBTUV_PLATFORM := $(UNAME_M)-$(UNAME_S)
# PIC (Position-Independent-Code) library
# NOTE(review): this path presumably mirrors libtuv's own build output layout
# for a release build - confirm against the libtuv Makefile.
LIBTUV_LIBRARY := $(LIBTUV_PATH)/build/$(LIBTUV_PLATFORM)/release/lib/libtuv.o

# Fetch the libtuv submodule, then build it as a release, position-independent
# library. The rule has no prerequisites, so make only runs it when the
# library file does not exist yet.
$(LIBTUV_LIBRARY):
git submodule update --init $(LIBTUV_PATH)
$(MAKE) -C $(LIBTUV_PATH) TUV_BUILD_TYPE=release TUV_CREATE_PIC_LIB=yes
32 changes: 21 additions & 11 deletions src/pow_avx.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -415,9 +416,9 @@ long long int pwork256(int8_t mid[], int mwm, int8_t nonce[], int n,
}
#endif

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork256(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
Expand All @@ -429,7 +430,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -494,7 +494,8 @@ bool PowAVX(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -518,12 +519,14 @@ bool PowAVX(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -564,29 +567,33 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -596,7 +603,10 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
static void PoWAVX_Context_Destroy(ImplContext *impl_ctx)
{
PoW_AVX_Context *ctx = (PoW_AVX_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_avx.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -25,7 +26,9 @@ typedef struct _pow_avx_context PoW_AVX_Context;
struct _pow_avx_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
36 changes: 23 additions & 13 deletions src/pow_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -176,12 +177,12 @@ static int64_t pwork(int8_t mid[], int mwm, int8_t nonce[], int n,
return loop_cpu(lmid, hmid, mwm, nonce, stopPoW);
}

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);

pthread_mutex_lock(pworkInfo->lock);
if (pworkInfo->ret >= 0) {
Expand All @@ -190,7 +191,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -255,7 +255,8 @@ bool PowC(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -279,12 +280,14 @@ bool PowC(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -324,29 +327,33 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -356,7 +363,10 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
static void PoWC_Context_Destroy(ImplContext *impl_ctx)
{
PoW_C_Context *ctx = (PoW_C_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -26,7 +27,9 @@ typedef struct _pow_c_context PoW_C_Context;
struct _pow_c_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
Loading

0 comments on commit c5147ab

Please sign in to comment.