From 88a5cb63c0ba34d3ece00894058ce19fd0c75f6e Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Wed, 24 Apr 2019 15:56:08 -0700 Subject: [PATCH] feat: Remote interface interoperate distributed computing resources * make BUILD_REMOTE=1 to build libdcurl.so and remote-worker * make BUILD_REMOTE=1 check with RabbitMQ broker and remote-worker * RPC with exclusive callback queues with TTL property * AMQP connection management for multiple threads * Implement local fallback PoW when remote interface fails Related #137 --- .gitmodules | 3 + Makefile | 32 +++- deps/rabbitmq-c | 1 + mk/defs.mk | 3 + mk/remote.mk | 8 + mk/submodule.mk | 31 ++++ src/dcurl.c | 39 +++++ src/remote_common.c | 323 +++++++++++++++++++++++++++++++++++++++++ src/remote_common.h | 53 +++++++ src/remote_interface.c | 237 ++++++++++++++++++++++++++++++ src/remote_interface.h | 61 ++++++++ src/remote_worker.c | 77 ++++++++++ 12 files changed, 862 insertions(+), 6 deletions(-) create mode 160000 deps/rabbitmq-c create mode 100644 mk/remote.mk create mode 100644 src/remote_common.c create mode 100644 src/remote_common.h create mode 100644 src/remote_interface.c create mode 100644 src/remote_interface.h create mode 100644 src/remote_worker.c diff --git a/.gitmodules b/.gitmodules index db8e5f6..400b111 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "deps/libtuv"] path = deps/libtuv url = https://github.com/DLTcollab/libtuv.git +[submodule "deps/rabbitmq-c"] + path = deps/rabbitmq-c + url = https://github.com/alanxz/rabbitmq-c.git diff --git a/Makefile b/Makefile index bbc91b6..749d45e 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,10 @@ ifeq ("$(BUILD_FPGA_ACCEL)","1") CFLAGS += -DENABLE_FPGA_ACCEL endif +ifeq ("$(BUILD_REMOTE)","1") +CFLAGS += -DENABLE_REMOTE +endif + ifeq ("$(BUILD_JNI)","1") include mk/java.mk endif @@ -99,6 +103,9 @@ PREQ := config $(TESTS) $(LIBS) ifeq ("$(BUILD_JNI)","1") PREQ += $(JARS) endif +ifeq ("$(BUILD_REMOTE)", "1") +PREQ += $(OUT)/remote-worker +endif all: $(PREQ) .DEFAULT_GOAL := all @@ -140,23 +147,36 @@ OBJS += \ pow_fpga_accel.o endif +ifeq ("$(BUILD_REMOTE)", "1") +OBJS += \ + remote_common.o \ + remote_interface.o + +WORKER_OBJS := $(addprefix $(OUT)/worker-,$(filter-out remote_interface.o, $(OBJS))) +WORKER_CFLAGS := $(filter-out -DENABLE_REMOTE, $(CFLAGS)) +endif + OBJS := $(addprefix $(OUT)/, $(OBJS)) $(OUT)/test-%.o: tests/test-%.c $(LIBTUV_PATH)/include $(VECHO) " CC\t$@\n" $(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $< -$(OUT)/%.o: $(SRC)/%.c $(LIBTUV_PATH)/include +$(OUT)/%.o: $(SRC)/%.c $(LIBTUV_PATH)/include $(LIBRABBITMQ_PATH)/build/include $(VECHO) " CC\t$@\n" - $(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $< + $(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) $(LIBRABBITMQ_INCLUDE) -c -MMD -MF $@.d $< -$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY) +$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) $(VECHO) " LD\t$@\n" - $(Q)$(CC) -o $@ $^ $(LDFLAGS) + $(Q)$(CC) -o $@ $^ $(LDFLAGS) $(LIBRABBITMQ_LINK) -$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY) +$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) $(VECHO) " LD\t$@\n" - $(Q)$(CC) -shared -o $@ $^ $(LDFLAGS) + $(Q)$(CC) -shared -o $@ $^ $(LDFLAGS) $(LIBRABBITMQ_LINK) + +ifeq ("$(BUILD_REMOTE)", "1") +include mk/remote.mk +endif include mk/common.mk diff --git a/deps/rabbitmq-c b/deps/rabbitmq-c new file mode 160000 index 0000000..75a21e5 --- /dev/null +++ b/deps/rabbitmq-c @@ -0,0 +1 @@ +Subproject commit 75a21e51db5d70ea807473621141b4417d81b56f diff --git a/mk/defs.mk b/mk/defs.mk index 989ee08..deded44 100644 --- a/mk/defs.mk +++ b/mk/defs.mk @@ -19,6 +19,9 @@ BUILD_GPU ?= 0 # Build FPGA backend or not BUILD_FPGA_ACCEL ?= 0 +# Build facilities of remote procedure calls +BUILD_REMOTE ?= 0 + # Build JNI glue as the bridge between dcurl and IRI BUILD_JNI ?= 0 diff --git a/mk/remote.mk b/mk/remote.mk new file mode 100644 index 0000000..667f8e7 --- /dev/null +++ b/mk/remote.mk @@ -0,0 +1,8 @@ +# Build remote-worker +$(OUT)/worker-%.o: $(SRC)/%.c $(LIBTUV_PATH)/include $(LIBRABBITMQ_PATH)/build/include + $(VECHO) " CC\t$@\n" + $(Q)$(CC) -o $@ $(WORKER_CFLAGS) $(LIBTUV_INCLUDE) $(LIBRABBITMQ_INCLUDE) -c -MMD -MF $@.d $< + +$(OUT)/remote-worker: $(OUT)/remote_worker.o $(WORKER_OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) + $(VECHO) " LD\t$@\n" + $(Q)$(CC) -o $@ $^ $(LDFLAGS) $(LIBRABBITMQ_LINK) diff --git a/mk/submodule.mk b/mk/submodule.mk index 8be3d84..12c7164 100644 --- a/mk/submodule.mk +++ b/mk/submodule.mk @@ -26,3 +26,34 @@ $(LIBTUV_PATH)/include: $(LIBTUV_LIBRARY): $(MAKE) -C $(LIBTUV_PATH) TUV_BUILD_TYPE=release TUV_CREATE_PIC_LIB=yes TUV_PLATFORM=$(LIBTUV_PLATFORM) TUV_BOARD=$(LIBTUV_BOARD) + +# librabbitmq related variables +LIBRABBITMQ_PATH = deps/rabbitmq-c +LIBRABBITMQ_INCLUDE := -I $(LIBRABBITMQ_PATH)/build/include +LIBRABBITMQ_LIB_PATH := $(LIBRABBITMQ_PATH)/build/librabbitmq/ +ifeq ($(UNAME_S),darwin) + # macOS + LIBRABBITMQ_LINK := -Wl,-rpath,$(LIBRABBITMQ_LIB_PATH) -L$(LIBRABBITMQ_LIB_PATH) -lrabbitmq + LIBRABBITMQ_LIBRARY := $(LIBRABBITMQ_LIB_PATH)/librabbitmq.dylib +else + LIBRABBITMQ_LINK := -Wl,-rpath=$(LIBRABBITMQ_LIB_PATH) -L$(LIBRABBITMQ_LIB_PATH) -lrabbitmq + LIBRABBITMQ_LIBRARY := $(LIBRABBITMQ_LIB_PATH)/librabbitmq.so +endif + +$(LIBRABBITMQ_PATH)/build/include: + git submodule update --init $(LIBRABBITMQ_PATH) + mkdir $(LIBRABBITMQ_PATH)/build +ifeq ($(UNAME_S),darwin) + # macOS + cd $(LIBRABBITMQ_PATH)/build && \ + cmake -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl/ -DCMAKE_INSTALL_PREFIX=. .. && \ + cmake --build . --target install +else + cd $(LIBRABBITMQ_PATH)/build && \ + cmake -DCMAKE_INSTALL_PREFIX=. .. && \ + cmake --build . --target install +endif + +$(LIBRABBITMQ_LIBRARY): + cd $(LIBRABBITMQ_PATH)/build && \ + cmake --build . diff --git a/src/dcurl.c b/src/dcurl.c index aca0dab..88235da 100644 --- a/src/dcurl.c +++ b/src/dcurl.c @@ -15,6 +15,9 @@ #if defined(ENABLE_FPGA_ACCEL) #include "pow_fpga_accel.h" #endif +#if defined(ENABLE_REMOTE) +#include "remote_interface.h" +#endif #include "implcontext.h" #include "trinary.h" #include "uv.h" @@ -48,6 +51,11 @@ extern ImplContext PoWCL_Context; extern ImplContext PoWFPGAAccel_Context; #endif +#if defined(ENABLE_REMOTE) +extern RemoteImplContext Remote_Context; +static uv_sem_t notify_remote; +#endif + bool dcurl_init() { bool ret = true; @@ -68,6 +76,11 @@ bool dcurl_init() ret &= registerImplContext(&PoWFPGAAccel_Context); #endif +#if defined(ENABLE_REMOTE) + ret &= initializeRemoteContext(&Remote_Context); + uv_sem_init(¬ify_remote, 0); +#endif + uv_sem_init(¬ify, 0); return isInitialized = ret; } @@ -77,6 +90,10 @@ void dcurl_destroy() ImplContext *impl = NULL; struct list_head *p; +#if defined(ENABLE_REMOTE) + destroyRemoteContext(&Remote_Context); +#endif + list_for_each (p, &IMPL_LIST) { impl = list_entry(p, ImplContext, list); destroyImplContext(impl); @@ -96,6 +113,28 @@ int8_t *dcurl_entry(int8_t *trytes, int mwm, int threads) if (!isInitialized) return NULL; +#if defined(ENABLE_REMOTE) + do { + if (enterRemoteContext(&Remote_Context)) { + pow_ctx = getRemoteContext(&Remote_Context, trytes, mwm); + goto remote_pow; + } + uv_sem_wait(¬ify_remote); + } while (1); + +remote_pow: + if (!doRemoteContext(&Remote_Context, pow_ctx)) { + goto local_pow; + } else { + res = getRemoteResult(&Remote_Context, pow_ctx); + } + freeRemoteContext(&Remote_Context, pow_ctx); + exitRemoteContext(&Remote_Context); + uv_sem_post(¬ify_remote); + return res; + +local_pow: +#endif do { list_for_each (p, &IMPL_LIST) { impl = list_entry(p, ImplContext, list); diff --git a/src/remote_common.c b/src/remote_common.c new file mode 100644 index 0000000..d9dd902 --- /dev/null +++ b/src/remote_common.c @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. + */ + +#include "remote_common.h" +#include +#include +#include "common.h" + +bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) +{ + switch (x.reply_type) { + case AMQP_RESPONSE_NORMAL: + return true; + + case AMQP_RESPONSE_NONE: + ddprintf("%s: missing RPC reply type!\n", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + ddprintf("%s: %s\n", context, + amqp_error_string2(x.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = + (amqp_connection_close_t *) x.reply.decoded; + ddprintf("%s: server connection error %uh, message: %.*s\n", + context, m->reply_code, (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + ddprintf("%s: server channel error %uh, message: %.*s\n", + context, m->reply_code, (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + default: + ddprintf("%s: unknown server error, method id 0x%08X\n", + context, x.reply.id); + break; + } + break; + } + + return false; +} + +bool die_on_error(int x, char const *context) +{ + if (x < 0) { + ddprintf("%s: %s\n", context, amqp_error_string2(x)); + return false; + } + + return true; +} + +bool connect_broker(amqp_connection_state_t *conn) +{ + amqp_socket_t *socket = NULL; + + /* Connect to the rabbitmq broker */ + *conn = amqp_new_connection(); + socket = amqp_tcp_socket_new(*conn); + if (amqp_socket_open(socket, HOSTNAME, 5672) != AMQP_STATUS_OK) { + ddprintf("%s\n", "The rabbitmq broker is closed"); + goto destroy_connection; + } + + /* Login to the rabbitmq broker */ + if (!die_on_amqp_error(amqp_login(*conn, "/", AMQP_DEFAULT_MAX_CHANNELS, + AMQP_DEFAULT_FRAME_SIZE, 0, + AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in")) + goto connection_close; + + /* Open the channel in the rabbitmq broker */ + amqp_channel_open(*conn, 1); + if (!(die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Opennng the channel"))) + goto channel_close; + + return true; + +channel_close: + amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS); + +connection_close: + amqp_connection_close(*conn, AMQP_REPLY_SUCCESS); + +destroy_connection: + amqp_destroy_connection(*conn); + + return false; +} + +bool declare_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name) +{ + /* Declare a durable queue */ + amqp_queue_declare(*conn, channel, amqp_cstring_bytes(queue_name), 0, 1, 0, + 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Declaring the queue")) + return false; + + return true; +} + +bool declare_callback_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t *reply_to_queue) +{ + /* Declare a exclusive private queues with TTL = 10s */ + amqp_table_entry_t entries[1]; + amqp_table_t table; + entries[0].key = amqp_cstring_bytes("x-message-ttl"); + entries[0].value.kind = AMQP_FIELD_KIND_U32; + entries[0].value.value.u32 = 10 * 1000; // 10s + table.num_entries = 1; + table.entries = entries; + + amqp_queue_declare_ok_t *r = + amqp_queue_declare(*conn, channel, amqp_empty_bytes, 0, 0, 1, 0, table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Declaring the private queue with TTL = 10s")) + return false; + + *reply_to_queue = amqp_bytes_malloc_dup(r->queue); + return true; +} + +bool set_consuming_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name) +{ + amqp_basic_consume(*conn, channel, amqp_cstring_bytes(queue_name), + amqp_empty_bytes, 0, 0, 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Set the consuming queue")) + return false; + + return true; +} + +bool consume_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope) +{ + amqp_maybe_release_buffers(*conn); + /* Consume a message from the queue in the rabbitmq broker */ + if (!die_on_amqp_error(amqp_consume_message(*conn, envelope, NULL, 0), + "Consuming the message")) + return false; + + return true; +} + +bool wait_response_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t callback_queue, + char *frame_body, + int body_len) +{ + amqp_frame_t frame; +#if defined(ENABLE_DEBUG) + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; +#endif + size_t body_target; + size_t body_received; + + amqp_basic_consume(*conn, channel, callback_queue, amqp_empty_bytes, 0, 1, + 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Set the callback queue")) + return false; + + /*Wait for three frames: AMQP_FRAME_METHOD, AMQP_FRAME_HEADER, + * AMQP_FRAME_BODY*/ + for (;;) { + amqp_maybe_release_buffers(*conn); + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame")) + return false; + + ddprintf(MSG_PREFIX "Frame type: %u channel: %u\n", frame.frame_type, frame.channel); + + if (frame.frame_type != AMQP_FRAME_METHOD) + continue; + + ddprintf(MSG_PREFIX "Method: %s\n", amqp_method_name(frame.payload.method.id)); + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + +#if defined(ENABLE_DEBUG) + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + ddprintf(MSG_PREFIX "Delivery: %u exchange: %.*s routingkey: %.*s\n", + (unsigned) d->delivery_tag, (int) d->exchange.len, + (char *) d->exchange.bytes, (int) d->routing_key.len, + (char *) d->routing_key.bytes); +#endif + + amqp_maybe_release_buffers(*conn); + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame")) + return false; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + ddprintf("Unexpected header!"); + return false; + } + +#if defined(ENABLE_DEBUG) + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + ddprintf(MSG_PREFIX "Content-type: %.*s\n", (int) p->content_type.len, + (char *) p->content_type.bytes); + } +#endif + ddprintf("---\n"); + + body_target = (size_t) frame.payload.properties.body_size; + body_received = 0; + while (body_received < body_target) { + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Wait body frame")) + return false; + + if (frame.frame_type != AMQP_FRAME_BODY) { + ddprintf("Unexpected body"); + return false; + } + + body_received += frame.payload.body_fragment.len; + } + if (body_received != body_target) { + ddprintf("Received body is small than body target"); + return false; + } + + memcpy(frame_body, (char *) frame.payload.body_fragment.bytes, + body_len); + + ddprintf(MSG_PREFIX "PoW result: %.*s", (int) frame.payload.body_fragment.len, + (char *) frame.payload.body_fragment.bytes); + + /* everything was fine, we can quit now because we received the reply */ + return true; + } +} + +bool publish_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + char *message) +{ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = AMQP_DELIVERY_PERSISTENT; + + /* Publish messages by default exchange */ + if (!die_on_error(amqp_basic_publish(*conn, channel, amqp_cstring_bytes(""), + amqp_cstring_bytes(queue_name), 0, 0, + &props, amqp_cstring_bytes(message)), + "Publish the message")) + return false; + + return true; +} + +bool publish_message_with_reply_to(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + amqp_bytes_t reply_to_queue, + char *message) +{ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | + AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = AMQP_DELIVERY_PERSISTENT; + props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); + + if (!die_on_error(amqp_basic_publish(*conn, channel, amqp_cstring_bytes(""), + amqp_cstring_bytes(queue_name), 0, 0, + &props, amqp_cstring_bytes(message)), + "Publishing the message with reply_to")) + return false; + + ddprintf(MSG_PREFIX "callback queue %s \n", (char *) props.reply_to.bytes); + amqp_bytes_free(props.reply_to); + + return true; +} + +bool acknowledge_broker(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope) +{ + /* Make sure a message is never lost */ + if (!die_on_error(amqp_basic_ack(*conn, channel, envelope->delivery_tag, 0), + "Acknowledging the broker")) + return false; + + return true; +} + +void disconnect_broker(amqp_connection_state_t *conn) +{ + amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close(*conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(*conn); +} diff --git a/src/remote_common.h b/src/remote_common.h new file mode 100644 index 0000000..a063601 --- /dev/null +++ b/src/remote_common.h @@ -0,0 +1,53 @@ +#ifndef REMOTE_COMMON_H_ +#define REMOTE_COMMON_H_ + +#include +#include +#include "amqp.h" +#include "amqp_tcp_socket.h" + +#define HOSTNAME "localhost" +#define MSG_PREFIX "[dcurl-remote] " + +bool connect_broker(amqp_connection_state_t *conn); + +bool declare_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name); + +bool declare_callback_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t *reply_to_queue); + +bool set_consuming_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name); + +bool consume_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope); + +bool wait_response_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t callback_queue, + char *frame_body, + int body_len); + +bool publish_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + char *message); + +bool publish_message_with_reply_to(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + amqp_bytes_t reply_to_queue, + char *message); + +bool acknowledge_broker(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope); + +void disconnect_broker(amqp_connection_state_t *conn); + +#endif diff --git a/src/remote_interface.c b/src/remote_interface.c new file mode 100644 index 0000000..76f0c6f --- /dev/null +++ b/src/remote_interface.c @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. + */ + +#include "remote_interface.h" +#include +#include "trinary.h" + +bool initializeRemoteContext(RemoteImplContext *remote_ctx) +{ + bool res = remote_ctx->initialize(remote_ctx); + if (res) { + ddprintf(MSG_PREFIX "Implementation %s is initialized successfully\n", + remote_ctx->description); + } + return res; +} + +void destroyRemoteContext(RemoteImplContext *remote_ctx) +{ + return remote_ctx->destroy(remote_ctx); +} + +bool enterRemoteContext(RemoteImplContext *remote_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + if (remote_ctx->num_working_thread >= remote_ctx->num_max_thread) { + uv_mutex_unlock(&remote_ctx->lock); + return false; /* Access Failed */ + } + remote_ctx->num_working_thread++; + uv_mutex_unlock(&remote_ctx->lock); + return true; /* Access Success */ +} + +void *getRemoteContext(RemoteImplContext *remote_ctx, int8_t *trytes, int mwm) +{ + return remote_ctx->getPoWContext(remote_ctx, trytes, mwm); +} + +bool doRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->doThePoW(remote_ctx, pow_ctx); +} + +int8_t *getRemoteResult(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->getPoWResult(pow_ctx); +} + +bool freeRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->freePoWContext(remote_ctx, pow_ctx); +} + +void exitRemoteContext(RemoteImplContext *remote_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + remote_ctx->num_working_thread--; + uv_mutex_unlock(&remote_ctx->lock); +} + +bool PoWValidation(int8_t *output_trytes, int mwm) +{ + Trytes_t *trytes_t = initTrytes(output_trytes, TRANSACTION_TRYTES_LENGTH); + if (!trytes_t) { + ddprintf("PoW Validation: Initialization of Trytes fails\n"); + goto fail_to_inittrytes; + } + + Trytes_t *hash_trytes = hashTrytes(trytes_t); + if (!hash_trytes) { + ddprintf("PoW Validation: Hashing trytes fails\n"); + goto fail_to_hashtrytes; + } + + Trits_t *ret_trits = trits_from_trytes(hash_trytes); + for (int i = 243 - 1; i >= 243 - mwm; i--) { + if (ret_trits->data[i] != 0) { + ddprintf("PoW Validation fails\n"); + goto fail_to_validation; + } + } + + return true; + +fail_to_validation: + freeTrobject(ret_trits); + freeTrobject(hash_trytes); +fail_to_hashtrytes: + freeTrobject(trytes_t); +fail_to_inittrytes: + return false; +} + +static bool Remote_doPoW(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + char buf[4]; + char messagebody[TRANSACTION_TRYTES_LENGTH + 4]; + + amqp_bytes_t reply_to_queue; + + PoW_Remote_Context *ctx = (PoW_Remote_Context *) pow_ctx; + + /* Message body format: transacton | mwm */ + memcpy(messagebody, ctx->input_trytes, TRANSACTION_TRYTES_LENGTH); + sprintf(buf, "%d", ctx->mwm); + memcpy(messagebody + TRANSACTION_TRYTES_LENGTH, buf, 4); + + if (!declare_callback_queue(&remote_ctx->conn[ctx->indexOfContext], 1, &reply_to_queue)) + goto fail; + + if (!publish_message_with_reply_to(&remote_ctx->conn[ctx->indexOfContext], 1, + "incoming_queue", reply_to_queue, + messagebody)) + goto fail; + + if (!wait_response_message(&remote_ctx->conn[ctx->indexOfContext], 1, reply_to_queue, + (char *) (ctx->output_trytes), + TRANSACTION_TRYTES_LENGTH)) + goto fail; + + amqp_bytes_free(reply_to_queue); + + PoWValidation(ctx->output_trytes, ctx->mwm); + + return true; + +fail: + return false; +} + +static bool Remote_init(RemoteImplContext *remote_ctx) +{ + if (remote_ctx->num_max_thread <= 0) + goto fail_to_init; + + PoW_Remote_Context *ctx = (PoW_Remote_Context *) malloc( + sizeof(PoW_Remote_Context) * remote_ctx->num_max_thread); + + memset(remote_ctx->slots, 0, remote_ctx->num_max_thread * sizeof(bool)); + + for(int i = 0 ; i < CONN_MAX; i++) + { + if (!connect_broker(&remote_ctx->conn[i])) + goto fail_to_init; + } + if (!declare_queue(&remote_ctx->conn[0], 1, "incoming_queue")) + goto fail_to_init; + + remote_ctx->context = ctx; + + uv_mutex_init(&remote_ctx->lock); + + return true; + +fail_to_init: + return false; +} + +static void Remote_destroy(RemoteImplContext *remote_ctx) +{ + PoW_Remote_Context *ctx = (PoW_Remote_Context *) remote_ctx->context; + + for(int i = 0; i < CONN_MAX; i++) + disconnect_broker(&remote_ctx->conn[i]); + + free(ctx); +} + +static void *Remote_getPoWContext(RemoteImplContext *remote_ctx, + int8_t *trytes, + int mwm) +{ + uv_mutex_lock(&remote_ctx->lock); + + for (int i = 0; i < remote_ctx->num_max_thread; i++) { + if (!remote_ctx->slots[i]) { + remote_ctx->slots[i] = true; + + uv_mutex_unlock(&remote_ctx->lock); + PoW_Remote_Context *ctx = + remote_ctx->context + sizeof(PoW_Remote_Context) * i; + memcpy(ctx->input_trytes, trytes, TRANSACTION_TRYTES_LENGTH); + ctx->mwm = mwm; + ctx->indexOfContext = i; + + return ctx; + } + } + + uv_mutex_unlock(&remote_ctx->lock); + + return NULL; /* It should not happen */ +} + +static bool Remote_freePoWContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + + remote_ctx->slots[((PoW_Remote_Context *) pow_ctx)->indexOfContext] = false; + + uv_mutex_unlock(&remote_ctx->lock); + + return true; +} + +static int8_t *Remote_getPoWResult(void *pow_ctx) +{ + int8_t *ret = (int8_t *) malloc(sizeof(int8_t) * TRANSACTION_TRYTES_LENGTH); + if (!ret) + return NULL; + memcpy(ret, ((PoW_Remote_Context *) pow_ctx)->output_trytes, + TRANSACTION_TRYTES_LENGTH); + return ret; +} + +static PoW_Info Remote_getPoWInfo(void *pow_ctx) +{ + return ((PoW_Remote_Context *) pow_ctx)->pow_info; +} + +RemoteImplContext Remote_Context = { + .context = NULL, + .description = "Remote interface", + .num_max_thread = CONN_MAX, // 1 <= num_max_thread + .num_working_thread = 0, + .initialize = Remote_init, + .destroy = Remote_destroy, + .getPoWContext = Remote_getPoWContext, + .freePoWContext = Remote_freePoWContext, + .doThePoW = Remote_doPoW, + .getPoWResult = Remote_getPoWResult, + .getPoWInfo = Remote_getPoWInfo, +}; diff --git a/src/remote_interface.h b/src/remote_interface.h new file mode 100644 index 0000000..0851984 --- /dev/null +++ b/src/remote_interface.h @@ -0,0 +1,61 @@ +#ifndef REMOTE_INTERFACE_H_ +#define REMOTE_INTERFACE_H_ + +#include +#include +#include "common.h" +#include "constants.h" +#include "remote_common.h" +#include "uv.h" + +#define CONN_MAX 20 + +typedef struct _pow_remote_context PoW_Remote_Context; +typedef struct _remote_impl_context RemoteImplContext; + +struct _pow_remote_context { + /* Thread management */ + int indexOfContext; + /* Arguments of PoW */ + int8_t input_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ + int8_t output_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ + int mwm; + /* PoW-related information */ + PoW_Info pow_info; +}; + +struct _remote_impl_context { + void *context; + char *description; + /* Connection parameters */ + amqp_connection_state_t conn[CONN_MAX]; + /* Thread management */ + uv_mutex_t lock; + bool slots[CONN_MAX]; /* Used to tell which slot is + available */ + int num_max_thread; + int num_working_thread; + + /* Functions of Implementation Context */ + bool (*initialize)(RemoteImplContext *remote_ctx); + void (*destroy)(RemoteImplContext *remote_ctx); + /* Private PoW Context for each thread */ + void *(*getPoWContext)(RemoteImplContext *remote_ctx, + int8_t *trytes, + int mwm); + bool (*doThePoW)(RemoteImplContext *remote_ctx, void *pow_ctx); + int8_t *(*getPoWResult)(void *pow_ctx); + PoW_Info (*getPoWInfo)(void *pow_ctx); + bool (*freePoWContext)(RemoteImplContext *remote_ctx, void *pow_ctx); +}; + +bool initializeRemoteContext(RemoteImplContext *remote_ctx); +void destroyRemoteContext(RemoteImplContext *remote_ctx); +bool enterRemoteContext(RemoteImplContext *remote_ctx); +void *getRemoteContext(RemoteImplContext *remote_ctx, int8_t *trytes, int mwm); +bool doRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx); +int8_t *getRemoteResult(RemoteImplContext *remote_ctx, void *pow_ctx); +bool freeRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx); +void exitRemoteContext(RemoteImplContext *remote_ctx); + +#endif diff --git a/src/remote_worker.c b/src/remote_worker.c new file mode 100644 index 0000000..12c9136 --- /dev/null +++ b/src/remote_worker.c @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. + */ + +#include "constants.h" +#include "dcurl.h" +#include "remote_common.h" +#include "common.h" + +int main(int argc, char const *const *argv) +{ + char trytes[TRANSACTION_TRYTES_LENGTH]; + char buf[4]; + int mwm; + + amqp_connection_state_t conn; + amqp_envelope_t envelope; + + dcurl_init(); + + if (!connect_broker(&conn)) + goto fail; + + if (!set_consuming_queue(&conn, 1, "incoming_queue")) + goto fail; + + for (;;) { + if (!consume_message(&conn, 1, &envelope)) + goto fail; + + ddprintf(MSG_PREFIX "Delivery %u, exchange %.*s, routingkey %.*s, callback queue: %s " + "\n", + (unsigned) envelope.delivery_tag, (int) envelope.exchange.len, + (char *) envelope.exchange.bytes, (int) envelope.routing_key.len, + (char *) envelope.routing_key.bytes, + (char *) envelope.message.properties.reply_to.bytes); + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + ddprintf(MSG_PREFIX "Content-type: %.*s\n", + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); + } + + /* Message body format: transacton | mwm */ + memcpy(trytes, envelope.message.body.bytes, TRANSACTION_TRYTES_LENGTH); + memcpy(buf, envelope.message.body.bytes + TRANSACTION_TRYTES_LENGTH, 4); + mwm = strtol(buf, NULL, 10); + + ddprintf(MSG_PREFIX "Doing PoW with mwm = %d...\n", mwm); + + int8_t *ret_trytes = dcurl_entry((int8_t *) trytes, mwm, 0); + memset(buf, '0', sizeof(buf)); + ddprintf(MSG_PREFIX "PoW is done\n"); + + if (!acknowledge_broker(&conn, 1, &envelope)) + goto fail; + ddprintf(MSG_PREFIX "Sending an ack is done\n"); + + /* Publish a message of remote PoW result */ + if (!publish_message( + &conn, 1, + (char *) envelope.message.properties.reply_to.bytes, + (char *) ret_trytes)) + goto fail; + + free(ret_trytes); + amqp_destroy_envelope(&envelope); + ddprintf(MSG_PREFIX "Publishing PoW result to callback queue is done\n"); + ddprintf(MSG_PREFIX "---\n"); + } + +fail: + dcurl_destroy(); + + return -1; +}