Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate libtuv thread pool to eliminate thread creation overhead #93

Merged
merged 2 commits into from
Feb 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "deps/libtuv"]
path = deps/libtuv
url = https://github.com/DLTcollab/libtuv.git
11 changes: 7 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ endif
# Check specific CPU features available on build host
include mk/cpu-features.mk

# Handle git submodule
include mk/submodule.mk
marktwtn marked this conversation as resolved.
Show resolved Hide resolved

ifeq ("$(BUILD_AVX)","1")
CFLAGS += -mavx -DENABLE_AVX
ifeq ("$(call cpu_feature,AVX2)","1")
Expand Down Expand Up @@ -117,17 +120,17 @@ OBJS := $(addprefix $(OUT)/, $(OBJS))

$(OUT)/test-%.o: tests/test-%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $<

$(OUT)/%.o: $(SRC)/%.c
$(VECHO) " CC\t$@\n"
$(Q)$(CC) -o $@ $(CFLAGS) -c -MMD -MF $@.d $<
$(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $<

$(OUT)/test-%: $(OUT)/test-%.o $(OBJS)
$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -o $@ $^ $(LDFLAGS)

$(OUT)/libdcurl.so: $(OBJS)
$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY)
$(VECHO) " LD\t$@\n"
$(Q)$(CC) -shared -o $@ $^ $(LDFLAGS)

Expand Down
1 change: 1 addition & 0 deletions deps/libtuv
Submodule libtuv added at 3177b5
21 changes: 21 additions & 0 deletions mk/submodule.mk
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copied from the Makefile of libtuv to support different platforms
UNAME_M := $(shell uname -m)
UNAME_S := $(shell uname -s)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a FIXME noting that the list of supported operating systems here is limited.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm... I'm not sure what you expect to see.
Do you mean listing the operating systems that dcurl supports but libtuv does not, or vice versa?

ifeq ($(UNAME_S),Linux)
UNAME_S := linux
endif
ifeq ($(UNAME_S),Darwin)
# macOS
UNAME_S := darwin
endif

# libtuv related variables
LIBTUV_PATH = deps/libtuv
LIBTUV_INCLUDE := -I $(LIBTUV_PATH)/include
LIBTUV_PLATFORM := $(UNAME_M)-$(UNAME_S)
# PIC (Position-Independent-Code) library
LIBTUV_LIBRARY := $(LIBTUV_PATH)/build/$(LIBTUV_PLATFORM)/release/lib/libtuv.o

$(LIBTUV_LIBRARY):
git submodule update --init $(LIBTUV_PATH)
$(MAKE) -C $(LIBTUV_PATH) TUV_BUILD_TYPE=release TUV_CREATE_PIC_LIB=yes
32 changes: 21 additions & 11 deletions src/pow_avx.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -415,9 +416,9 @@ long long int pwork256(int8_t mid[], int mwm, int8_t nonce[], int n,
}
#endif

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork256(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
Expand All @@ -429,7 +430,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -494,7 +494,8 @@ bool PowAVX(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -518,12 +519,14 @@ bool PowAVX(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -564,29 +567,33 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -596,7 +603,10 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx)
static void PoWAVX_Context_Destroy(ImplContext *impl_ctx)
{
PoW_AVX_Context *ctx = (PoW_AVX_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_avx.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -25,7 +26,9 @@ typedef struct _pow_avx_context PoW_AVX_Context;
struct _pow_avx_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
36 changes: 23 additions & 13 deletions src/pow_c.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <uv.h>
#include "cpu-utils.h"
#include "curl.h"
#include "implcontext.h"
Expand Down Expand Up @@ -176,12 +177,12 @@ static int64_t pwork(int8_t mid[], int mwm, int8_t nonce[], int n,
return loop_cpu(lmid, hmid, mwm, nonce, stopPoW);
}

static void *pworkThread(void *pitem)
static void work_cb(uv_work_t *req)
{
Pwork_struct *pworkInfo = (Pwork_struct *) pitem;
Pwork_struct *pworkInfo = (Pwork_struct *) req->data;
pworkInfo->ret = pwork(pworkInfo->mid, pworkInfo->mwm,
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);
pworkInfo->nonce, pworkInfo->n,
pworkInfo->stopPoW);

pthread_mutex_lock(pworkInfo->lock);
if (pworkInfo->ret >= 0) {
Expand All @@ -190,7 +191,6 @@ static void *pworkThread(void *pitem)
pworkInfo->n = -1;
}
pthread_mutex_unlock(pworkInfo->lock);
pthread_exit(NULL);
}

static int8_t *tx_to_cstate(Trytes_t *tx)
Expand Down Expand Up @@ -255,7 +255,8 @@ bool PowC(void *pow_ctx)
ctx->pow_info.time = 0;
ctx->pow_info.hash_count = 0;
pthread_mutex_init(&ctx->lock, NULL);
pthread_t *threads = ctx->threads;
uv_loop_t *loop_ptr = &ctx->loop;
uv_work_t *work_req = ctx->work_req;
Pwork_struct *pitem = ctx->pitem;
int8_t **nonce_array = ctx->nonce_array;

Expand All @@ -279,12 +280,14 @@ bool PowC(void *pow_ctx)
pitem[i].lock = &ctx->lock;
pitem[i].stopPoW = &ctx->stopPoW;
pitem[i].ret = 0;
pthread_create(&threads[i], NULL, pworkThread, (void *) &pitem[i]);
work_req[i].data = &pitem[i];
uv_queue_work(loop_ptr, &work_req[i], work_cb, NULL);
}

uv_run(loop_ptr, UV_RUN_DEFAULT);

int completedIndex = -1;
for (int i = 0; i < ctx->num_threads; i++) {
pthread_join(threads[i], NULL);
if (pitem[i].n == -1)
completedIndex = i;
ctx->pow_info.hash_count += (uint64_t) (pitem[i].ret >= 0 ? pitem[i].ret : -pitem[i].ret + 1);
Expand Down Expand Up @@ -324,29 +327,33 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
if (!ctx) return false;

/* Pre-allocate Memory Chunk for each field */
void *threads_chunk = malloc(impl_ctx->num_max_thread * sizeof(pthread_t) * nproc);
void *work_req_chunk = malloc(impl_ctx->num_max_thread * sizeof(uv_work_t) * nproc);
void *pitem_chunk = malloc(impl_ctx->num_max_thread * sizeof(Pwork_struct) * nproc);
void *nonce_ptr_chunk = malloc(impl_ctx->num_max_thread * sizeof(int8_t *) * nproc);
void *nonce_chunk = malloc(impl_ctx->num_max_thread * NONCE_TRITS_LENGTH * nproc);
if (!threads_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;
if (!work_req_chunk || !pitem_chunk || !nonce_ptr_chunk || !nonce_chunk) goto fail;

for (int i = 0; i < impl_ctx->num_max_thread; i++) {
ctx[i].threads = (pthread_t *) (threads_chunk + i * sizeof(pthread_t) * nproc);
ctx[i].work_req = (uv_work_t *) (work_req_chunk + i * sizeof(uv_work_t) * nproc);
ctx[i].pitem = (Pwork_struct *) (pitem_chunk + i * sizeof(Pwork_struct) * nproc);
ctx[i].nonce_array = (int8_t **) (nonce_ptr_chunk + i * sizeof(int8_t *) * nproc);
for (int j = 0; j < nproc; j++)
ctx[i].nonce_array[j] = (int8_t *) (nonce_chunk + i * NONCE_TRITS_LENGTH * nproc +
j * NONCE_TRITS_LENGTH);
ctx[i].num_max_threads = nproc;
impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1;
uv_loop_init(&ctx[i].loop);
}
impl_ctx->context = ctx;
pthread_mutex_init(&impl_ctx->lock, NULL);
return true;

fail:
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx);
free(threads_chunk);
free(work_req_chunk);
free(pitem_chunk);
free(nonce_ptr_chunk);
free(nonce_chunk);
Expand All @@ -356,7 +363,10 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx)
static void PoWC_Context_Destroy(ImplContext *impl_ctx)
{
PoW_C_Context *ctx = (PoW_C_Context *) impl_ctx->context;
free(ctx[0].threads);
for (int i = 0; i < impl_ctx->num_max_thread; i++) {
uv_loop_close(&ctx[i].loop);
}
free(ctx[0].work_req);
free(ctx[0].pitem);
free(ctx[0].nonce_array[0]);
free(ctx[0].nonce_array);
Expand Down
5 changes: 4 additions & 1 deletion src/pow_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <pthread.h>
#include <uv.h>
#include "common.h"
#include "constants.h"

Expand All @@ -26,7 +27,9 @@ typedef struct _pow_c_context PoW_C_Context;
struct _pow_c_context {
/* Resource of computing */
pthread_mutex_t lock;
pthread_t *threads;
/* Data type of libtuv */
uv_loop_t loop;
uv_work_t *work_req;
Pwork_struct *pitem;
int8_t **nonce_array;
int stopPoW;
Expand Down
Loading