From 5f4648747b3fd9be9f5965512695c9c0400658db Mon Sep 17 00:00:00 2001 From: marktwtn Date: Sat, 23 Mar 2019 03:05:09 +0800 Subject: [PATCH 01/19] perf: Optimize trytes from trits with x86 SIMD Without SIMD optimization: Input size(byte) - Average time(nsec) 81 - 406.3 243 - 444.6 With SIMD optimization: Input size(byte) - Average time(nsec) 81 - 261.5 243 - 162.7 Hardware information: architecture - x86_64 CPU - AMD Ryzen 5 2400G Related #92. --- src/trinary.c | 4 + src/trinary_sse42.h | 175 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+) diff --git a/src/trinary.c b/src/trinary.c index 9e2b8b2..d9866d0 100644 --- a/src/trinary.c +++ b/src/trinary.c @@ -133,6 +133,9 @@ Trobject_t *trytes_from_trits(Trobject_t *trits) return NULL; } +#if defined(__SSE4_2__) + return trytes_from_trits_sse42(trits); +#else Trobject_t *trytes = NULL; int8_t *src = (int8_t *) malloc(trits->len / 3); @@ -150,6 +153,7 @@ Trobject_t *trytes_from_trits(Trobject_t *trits) free(src); return trytes; +#endif } Trobject_t *trits_from_trytes(Trobject_t *trytes) diff --git a/src/trinary_sse42.h b/src/trinary_sse42.h index 8d47159..50d32a0 100644 --- a/src/trinary_sse42.h +++ b/src/trinary_sse42.h @@ -2,8 +2,28 @@ #define TRINARY_SSE42_H_ #include +#include "constants.h" #define BLOCK_8BIT(type) (sizeof(type) / sizeof(int8_t)) +#define COMMA0 +#define COMMA1 , +#define COMMA(x) COMMA##x +#define INDEX_3DIFF_0F 0x00, 0x03, 0x06, 0x09, 0x0C, 0x0F +#define INDEX_3DIFF_1D 0x01, 0x04, 0x07, 0x0A, 0x0D +#define INDEX_3DIFF_2E 0x02, 0x05, 0x08, 0x0B, 0x0E +#define REPEAT0(str) +#define REPEAT1(str) str +#define REPEAT2(str) REPEAT1(str), str +#define REPEAT3(str) REPEAT2(str), str +#define REPEAT4(str) REPEAT3(str), str +#define REPEAT5(str) REPEAT4(str), str +#define REPEAT6(str) REPEAT5(str), str +#define REPEAT7(str) REPEAT6(str), str +#define REPEAT8(str) REPEAT7(str), str +#define REPEAT9(str) REPEAT8(str), str +#define REPEAT10(str) REPEAT9(str), str +#define 
REPEAT11(str) REPEAT10(str), str +#define REPEAT(n, str) REPEAT##n(str) static inline bool validateTrits_sse42(Trobject_t *trits) { @@ -72,4 +92,159 @@ static inline bool validateTrytes_sse42(Trobject_t *trytes) return true; } +static inline Trobject_t *trytes_from_trits_sse42(Trobject_t *trits) +{ + Trobject_t *trytes = NULL; + int8_t *src = (int8_t *) malloc(trits->len / 3); + + const int block_8bit = BLOCK_8BIT(__m128i); + const int8_t setMSB = 0x80; + const __m128i tryteAlphabet[2] = { + _mm_setr_epi8(TryteAlphabet[0], TryteAlphabet[1], TryteAlphabet[2], + TryteAlphabet[3], TryteAlphabet[4], TryteAlphabet[5], + TryteAlphabet[6], TryteAlphabet[7], TryteAlphabet[8], + TryteAlphabet[9], TryteAlphabet[10], TryteAlphabet[11], + TryteAlphabet[12], TryteAlphabet[13], TryteAlphabet[14], + TryteAlphabet[15]), + _mm_setr_epi8(TryteAlphabet[16], TryteAlphabet[17], TryteAlphabet[18], + TryteAlphabet[19], TryteAlphabet[20], TryteAlphabet[21], + TryteAlphabet[22], TryteAlphabet[23], TryteAlphabet[24], + TryteAlphabet[25], TryteAlphabet[26], 0, 0, 0, 0, 0)}; + /* For shuffling the bytes of the input trits */ + const __m128i shuffleLow[3] = { + _mm_setr_epi8(REPEAT(0, setMSB) COMMA(0) INDEX_3DIFF_0F COMMA(1) + REPEAT(10, setMSB)), + _mm_setr_epi8(REPEAT(6, setMSB) COMMA(1) INDEX_3DIFF_2E COMMA(1) + REPEAT(5, setMSB)), + _mm_setr_epi8(REPEAT(11, setMSB) COMMA(1) INDEX_3DIFF_1D COMMA(0) + REPEAT(0, setMSB))}; + const __m128i shuffleMid[3] = { + _mm_setr_epi8(REPEAT(0, setMSB) COMMA(0) INDEX_3DIFF_1D COMMA(1) + REPEAT(11, setMSB)), + _mm_setr_epi8(REPEAT(5, setMSB) COMMA(1) INDEX_3DIFF_0F COMMA(1) + REPEAT(5, setMSB)), + _mm_setr_epi8(REPEAT(11, setMSB) COMMA(1) INDEX_3DIFF_2E COMMA(0) + REPEAT(0, setMSB))}; + const __m128i shuffleHigh[3] = { + _mm_setr_epi8(REPEAT(0, setMSB) COMMA(0) INDEX_3DIFF_2E COMMA(1) + REPEAT(11, setMSB)), + _mm_setr_epi8(REPEAT(5, setMSB) COMMA(1) INDEX_3DIFF_1D COMMA(1) + REPEAT(6, setMSB)), + _mm_setr_epi8(REPEAT(10, setMSB) COMMA(1) INDEX_3DIFF_0F 
COMMA(0) + REPEAT(0, setMSB))}; + /* The mask with interleaved bytes of 0xFF and 0x00 */ + const __m128i byteInterMask = + _mm_set_epi32(0xFF00FF00, 0xFF00FF00, 0xFF00FF00, 0xFF00FF00); + + /* Start converting */ + for (int i = 0; i < trits->len / 3 / block_8bit; i++) { + /* Get trit data */ + __m128i dataFirst = _mm_loadu_si128((__m128i *) (trits->data) + i * 3); + __m128i dataMid = + _mm_loadu_si128((__m128i *) (trits->data) + i * 3 + 1); + __m128i dataLast = + _mm_loadu_si128((__m128i *) (trits->data) + i * 3 + 2); + /* + * Each block represents a trit. + * shuffle + * ---------------- ------ ------ ------ ------ + * dataFirst = | a1 | a2 | a3 | ...... | f1 | lowTrit = | a1 | ... | f1 | ... | p1 | + * ---------------- ------ ------ ------ ------ + * ---------------- ------ ------ ------ ------ + * dataMid = | f2 | f3 | g1 | ...... | k2 | => midTrit = | a2 | ... | f2 | ... | p2 | + * ---------------- ------ ------ ------ ------ + * ---------------- ------ ------ ------ ------ + * dataLast = | k3 | l1 | l2 | ...... | p3 | highTrit = | a3 | ... | f3 | ... 
| p3 | + * ---------------- ------ ------ ------ ------ + */ + __m128i lowTrit = _mm_or_si128( + _mm_shuffle_epi8(dataFirst, shuffleLow[0]), + _mm_or_si128(_mm_shuffle_epi8(dataMid, shuffleLow[1]), + _mm_shuffle_epi8(dataLast, shuffleLow[2]))); + __m128i midTrit = _mm_or_si128( + _mm_shuffle_epi8(dataFirst, shuffleMid[0]), + _mm_or_si128(_mm_shuffle_epi8(dataMid, shuffleMid[1]), + _mm_shuffle_epi8(dataLast, shuffleMid[2]))); + __m128i highTrit = _mm_or_si128( + _mm_shuffle_epi8(dataFirst, shuffleHigh[0]), + _mm_or_si128(_mm_shuffle_epi8(dataMid, shuffleHigh[1]), + _mm_shuffle_epi8(dataLast, shuffleHigh[2]))); + /* lowResult = (lowTrit) */ + __m128i lowResult = lowTrit; + /* midResult = (midTrit * 3) */ + __m128i midResult = _mm_or_si128( + _mm_and_si128( + byteInterMask, + _mm_mullo_epi16(_mm_and_si128(midTrit, byteInterMask), + _mm_set_epi16(0x0003, 0x0003, 0x0003, 0x0003, + 0x0003, 0x0003, 0x0003, 0x0003))), + _mm_andnot_si128( + byteInterMask, + _mm_mullo_epi16( + _mm_and_si128(midTrit, ~byteInterMask), + _mm_set_epi16(0x0003, 0x0003, 0x0003, 0x0003, 0x0003, + 0x0003, 0x0003, 0x0003)))); + /* highResult = (highTrit * 9) */ + __m128i highResult = _mm_or_si128( + _mm_and_si128( + byteInterMask, + _mm_mullo_epi16(_mm_and_si128(highTrit, byteInterMask), + _mm_set_epi16(0x0009, 0x0009, 0x0009, 0x0009, + 0x0009, 0x0009, 0x0009, 0x0009))), + _mm_andnot_si128( + byteInterMask, + _mm_mullo_epi16( + _mm_and_si128(highTrit, ~byteInterMask), + _mm_set_epi16(0x0009, 0x0009, 0x0009, 0x0009, 0x0009, + 0x0009, 0x0009, 0x0009)))); + /* alphabetOffset = (lowResult + midResult + highResult) */ + __m128i alphabetOffset = + _mm_add_epi8(lowResult, _mm_add_epi8(midResult, highResult)); + /* Check whether the offset is < 0 */ + __m128i tmpMask = + _mm_cmplt_epi8(alphabetOffset, _mm_set_epi32(0, 0, 0, 0)); + /* If the offset is < 0, then offset += 27 */ + __m128i alphabetOffsetAdd = _mm_add_epi8( + alphabetOffset, + _mm_set_epi32(0x1B1B1B1B, 0x1B1B1B1B, 0x1B1B1B1B, 0x1B1B1B1B)); + 
alphabetOffset = + _mm_or_si128(_mm_and_si128(tmpMask, alphabetOffsetAdd), + _mm_andnot_si128(tmpMask, alphabetOffset)); + + /* Assign tryte alphabet */ + /* If the offset is >= 16 (> 15), then the compared result byte = 0xFF, + * else = 0x00 */ + __m128i cmpResult = _mm_cmpgt_epi8( + alphabetOffset, _mm_set_epi8(15, 15, 15, 15, 15, 15, 15, 15, 15, 15, + 15, 15, 15, 15, 15, 15)); + /* Use the offset to get the correct tryte alphabet from tryteAlphabet[] + */ + __m128i resultLt = _mm_shuffle_epi8(tryteAlphabet[0], alphabetOffset); + __m128i resultGe = _mm_shuffle_epi8( + tryteAlphabet[1], + /* alphabetOffset - 16 */ + _mm_sub_epi8(alphabetOffset, + _mm_set_epi8(16, 16, 16, 16, 16, 16, 16, 16, 16, 16, + 16, 16, 16, 16, 16, 16))); + __m128i result = _mm_or_si128(_mm_andnot_si128(cmpResult, resultLt), + _mm_and_si128(cmpResult, resultGe)); + /* Store the tryte result */ + _mm_store_si128((__m128i *) (src + i * block_8bit), result); + } + for (int i = ((trits->len) / 3 / block_8bit) * block_8bit; + i < trits->len / 3; i++) { + int j = trits->data[i * 3] + trits->data[i * 3 + 1] * 3 + + trits->data[i * 3 + 2] * 9; + + if (j < 0) + j += 27; + src[i] = TryteAlphabet[j]; + } + + trytes = initTrytes(src, trits->len / 3); + free(src); + + return trytes; +} + #endif From 08b7539b0d49bbbb550bc471b1bc9dd3bfd35af2 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Mon, 6 May 2019 13:35:46 +0800 Subject: [PATCH 02/19] refactor: Remove redundant conditional compilation --- src/trinary.c | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/trinary.c b/src/trinary.c index d9866d0..d972105 100644 --- a/src/trinary.c +++ b/src/trinary.c @@ -35,12 +35,11 @@ static bool validateTrits(Trobject_t *trits) #if defined(__SSE4_2__) return validateTrits_sse42(trits); -#else +#endif for (int i = 0; i < trits->len; i++) if (trits->data[i] < -1 || trits->data[i] > 1) return false; return true; -#endif } static bool validateTrytes(Trobject_t *trytes) @@ -50,13 +49,12 @@ static 
bool validateTrytes(Trobject_t *trytes) #if defined(__SSE4_2__) return validateTrytes_sse42(trytes); -#else +#endif for (int i = 0; i < trytes->len; i++) if ((trytes->data[i] < 'A' || trytes->data[i] > 'Z') && trytes->data[i] != '9') return false; return true; -#endif } Trobject_t *initTrits(int8_t *src, int len) @@ -135,7 +133,7 @@ Trobject_t *trytes_from_trits(Trobject_t *trits) #if defined(__SSE4_2__) return trytes_from_trits_sse42(trits); -#else +#endif Trobject_t *trytes = NULL; int8_t *src = (int8_t *) malloc(trits->len / 3); @@ -153,7 +151,6 @@ Trobject_t *trytes_from_trits(Trobject_t *trits) free(src); return trytes; -#endif } Trobject_t *trits_from_trytes(Trobject_t *trytes) From 0638cc9b301cbda136a4b7bb824a89fcc4fc095e Mon Sep 17 00:00:00 2001 From: marktwtn Date: Mon, 6 May 2019 14:28:20 +0800 Subject: [PATCH 03/19] docs: Revise description of IRI adaptation --- README.md | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index f4076b2..a2b9a2e 100644 --- a/README.md +++ b/README.md @@ -30,15 +30,10 @@ After integrating dcurl into IRI, performance of [attachToTangle](https://iota.r [Modified IRI accepting external PoW Library](https://github.com/DLTcollab/iri) Supported IRI version: 1.7.0 -* ```$ cd ~/iri && mvn compile && mvn package``` -* ```$ java -jar target/iri.jar -p --pearldiver-exlib dcurl``` - -or with the **deprecated** commands - -* ```$ cd ~/iri && mvn compile && mvn package``` -* ```$ cp ~/dcurl/build/libdcurl.so ~/iri``` -* ```$ cd ~/iri && java -Djava.library.path=./ -jar target/iri.jar -p --pearldiver-exlib dcurl``` +Load the external dcurl shared library from the installed JAR file +* ```$ cd ~/iri && make check``` +* ```$ java -jar target/iri.jar -p --pearldiver-exlib dcurl``` ## Adoptions Here is a partial list of open source projects that have adopted dcurl. 
If you From 9da2888d002600179938ffd612fe9cbf585c52f8 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Tue, 7 May 2019 15:02:33 +0800 Subject: [PATCH 04/19] docs: Specify the JDK version To generate the JNI-related files for IRI with javah, JDK 8 is required. Close #88. --- docs/build-n-test.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/build-n-test.md b/docs/build-n-test.md index fe9551c..0578313 100644 --- a/docs/build-n-test.md +++ b/docs/build-n-test.md @@ -1,7 +1,7 @@ # Building and Testing ## Prerequisites -* Check JDK installation and set environment variable `JAVA_HOME` if you wish to specify. +* Check JDK 8 installation and set environment variable `JAVA_HOME` if you wish to specify. * If target platform lacks of Intel SSE instructions, multi-threaded pure C implementation would be used as the fallback. * Install the OpenCL and GPU driver before calculating the PoW with GPU. * For FPGA-based hardware accelerator, [Lampa Lab's Cyclone V FPGA PoW](https://github.com/LampaLab/iota_fpga) is taken as the basis. From 07fa4524015b5c48eaf482c78303758433824fb0 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Fri, 10 May 2019 18:49:07 +0800 Subject: [PATCH 05/19] docs: Add CMake build dependency --- docs/build-n-test.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/build-n-test.md b/docs/build-n-test.md index 0578313..04982b8 100644 --- a/docs/build-n-test.md +++ b/docs/build-n-test.md @@ -8,7 +8,7 @@ - File `soc_system.rbf` is only valid for DE10-nano board, and you have to synthesize to get appropriate `soc_system.rbf` for Arrow SoCKit board. - [RBF file](https://github.com/DLTcollab/iota_fpga/releases/tag/v0.3-sockit) can be downloaded from our release. - Moreover, you need to download [Lampa Lab-provided Linux image](https://github.com/LampaLab/iota_fpga/releases/tag/v0.1) to flash into the micro-SD card. The root password is `123456`. - +* Install `CMake` for building git submodules. 
## Build Instructions * dcurl allows various combinations of build configurations to fit final use scenarios. From 3d26225836ccfee0597df138e7a7f028bc98b9d5 Mon Sep 17 00:00:00 2001 From: Jim Huang Date: Fri, 10 May 2019 22:01:22 +0800 Subject: [PATCH 06/19] Drop remotepow since it is inactive --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.md b/README.md index a2b9a2e..0857fec 100644 --- a/README.md +++ b/README.md @@ -63,8 +63,6 @@ pull requests to dcurl. it faciliates dcurl to perform hardware-accelerated PoW operations on edge devices. -5. [remotepow](https://github.com/tylerw1369/remotepow/tree/Dcurl): delegate PoW to remote servers - ## Licensing `dcurl` is freely redistributable under the MIT License. From a5283187a06f40fa16938d6a95f5a075af0acd59 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Mon, 13 May 2019 17:02:22 +0800 Subject: [PATCH 07/19] fix: Correct the variable name --- mk/submodule.mk | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mk/submodule.mk b/mk/submodule.mk index c867fcb..8be3d84 100644 --- a/mk/submodule.mk +++ b/mk/submodule.mk @@ -17,7 +17,7 @@ endif LIBTUV_PATH = deps/libtuv LIBTUV_INCLUDE := -I $(LIBTUV_PATH)/include LIBTUV_PLATFORM := $(UNAME_M)-$(UNAME_S) -LIBTUV_BOARD := $(BUILD_BOARD) +LIBTUV_BOARD := $(BOARD) # PIC (Position-Independent-Code) library LIBTUV_LIBRARY := $(LIBTUV_PATH)/build/$(LIBTUV_PLATFORM)/release/lib/libtuv.o From 791e93bba031f2f28f5b3c672568a9619ca88518 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Tue, 14 May 2019 14:33:38 +0800 Subject: [PATCH 08/19] fix: Select suitable compiler on DE10-nano board The compiler for the DE10-nano board in the arm-linux environment is different from the default compiler assigned by libtuv. This commit fixes the problem and allows libtuv to detect and use the suitable compiler. Close #150. 
--- deps/libtuv | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/libtuv b/deps/libtuv index e2f7bff..9cd1daa 160000 --- a/deps/libtuv +++ b/deps/libtuv @@ -1 +1 @@ -Subproject commit e2f7bffda59f70fcdccc9e5e8fa37ce2a8c3ffdb +Subproject commit 9cd1daaab0b1e491683a271aff582937bb376977 From cb2d3b296e65d12d729e4cebc4e0e6a0bab96de1 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Tue, 14 May 2019 15:36:06 +0800 Subject: [PATCH 09/19] perf: Optimize trits from trytes with x86 SIMD Without SIMD optimization Input size(byte) - Average time(nsec) 81 - 355.6 2592 - 5752.3 2673 - 6273.0 With SIMD optimization Input size(byte) - Average time(nsec) 81 - 167.1 2592 - 1751.8 2673 - 2098.8 Hardware information architecture - x86_64 CPU - AMD Ryzen 5 2400G Close #92. --- src/trinary.c | 3 + src/trinary_sse42.h | 197 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 200 insertions(+) diff --git a/src/trinary.c b/src/trinary.c index d972105..ac12e87 100644 --- a/src/trinary.c +++ b/src/trinary.c @@ -163,6 +163,9 @@ Trobject_t *trits_from_trytes(Trobject_t *trytes) return NULL; } +#if defined(__SSE4_2__) + return trits_from_trytes_sse42(trytes); +#endif Trobject_t *trits = NULL; int8_t *src = (int8_t *) malloc(trytes->len * 3); diff --git a/src/trinary_sse42.h b/src/trinary_sse42.h index 50d32a0..5c60c19 100644 --- a/src/trinary_sse42.h +++ b/src/trinary_sse42.h @@ -247,4 +247,201 @@ static inline Trobject_t *trytes_from_trits_sse42(Trobject_t *trits) return trytes; } +static inline Trobject_t *trits_from_trytes_sse42(Trobject_t *trytes) +{ + Trobject_t *trits = NULL; + int8_t *src = (int8_t *) malloc(trytes->len * 3); + + const int block_8bit = BLOCK_8BIT(__m128i); + /* For setting the most significant bit of a byte */ + const int8_t setMSB = 0x80; + static int8_t TrytesToTritsMappings[][3] = { + {0, 0, 0}, {1, 0, 0}, {-1, 1, 0}, {0, 1, 0}, {1, 1, 0}, + {-1, -1, 1}, {0, -1, 1}, {1, -1, 1}, {-1, 0, 1}, {0, 0, 1}, + {1, 0, 1}, {-1, 1, 1}, {0, 1, 1}, {1, 
1, 1}, {-1, -1, -1}, + {0, -1, -1}, {1, -1, -1}, {-1, 0, -1}, {0, 0, -1}, {1, 0, -1}, + {-1, 1, -1}, {0, 1, -1}, {1, 1, -1}, {-1, -1, 0}, {0, -1, 0}, + {1, -1, 0}, {-1, 0, 0}}; + /* The set and range for indicating the trits value (0, 1, -1) + * of the corresponding trytes */ + /* '9', 'C', 'F', 'I', 'L', 'O', 'R', 'U', 'X' */ + const char *setLowTrit0 = "9CFILORUX"; + /* 'A', 'D', 'G', 'J', 'M', 'P', 'S', 'V', 'Y' */ + const char *setLowTritP1 = "ADGJMPSVY"; + /* 'B', 'E', 'H', 'K', 'N', 'Q', 'T', 'W', 'Z' */ + const char *setLowTritN1 = "BEHKNQTWZ"; + /* '9', 'A', 'H', 'I', 'J', 'Q', 'R', 'S', 'Z' */ + const char *rangeMidTrit0 = "99AAHJQSZZ"; + /* 'B', 'C', 'D', 'K', 'L', 'M', 'T', 'U', 'V' */ + const char *rangeMidTritP1 = "BDKMTV"; + /* 'E', 'F', 'G', 'N', 'O', 'P', 'W', 'X', 'Y' */ + const char *rangeMidTritN1 = "EGNPWY"; + /* '9', 'A', 'B', 'C', 'D', 'W', 'X', 'Y', 'Z' */ + const char *rangeHighTrit0 = "99ADWZ"; + /* 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M' */ + const char *rangeHighTritP1 = "EM"; + /* 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V' */ + const char *rangeHighTritN1 = "NV"; + /* Convert the char array to the 128-bit data */ + const __m128i patternLowTrit0 = _mm_loadu_si128((__m128i *) (setLowTrit0)); + const __m128i patternLowTritP1 = + _mm_loadu_si128((__m128i *) (setLowTritP1)); + const __m128i patternLowTritN1 = + _mm_loadu_si128((__m128i *) (setLowTritN1)); + const __m128i patternMidTrit0 = + _mm_loadu_si128((__m128i *) (rangeMidTrit0)); + const __m128i patternMidTritP1 = + _mm_loadu_si128((__m128i *) (rangeMidTritP1)); + const __m128i patternMidTritN1 = + _mm_loadu_si128((__m128i *) (rangeMidTritN1)); + const __m128i patternHighTrit0 = + _mm_loadu_si128((__m128i *) (rangeHighTrit0)); + const __m128i patternHighTritP1 = + _mm_loadu_si128((__m128i *) (rangeHighTritP1)); + const __m128i patternHighTritN1 = + _mm_loadu_si128((__m128i *) (rangeHighTritN1)); + /* The 128-bit data with the repeated same bytes */ + const __m128i posOne = 
_mm_set1_epi8(1); + const __m128i negOne = _mm_set1_epi8(-1); + const __m128i zero = _mm_set1_epi8(0); + /* For shuffling the bytes of the trits transformed from the input trytes */ + const __m128i shuffleFirst[3] = { + _mm_setr_epi8(0x00, REPEAT2(setMSB), 0x01, REPEAT2(setMSB), 0x02, + REPEAT2(setMSB), 0x03, REPEAT2(setMSB), 0x04, + REPEAT2(setMSB), 0x05), + _mm_setr_epi8(REPEAT1(setMSB), 0x00, REPEAT2(setMSB), 0x01, + REPEAT2(setMSB), 0x02, REPEAT2(setMSB), 0x03, + REPEAT2(setMSB), 0x04, REPEAT2(setMSB)), + _mm_setr_epi8(REPEAT2(setMSB), 0x00, REPEAT2(setMSB), 0x01, + REPEAT2(setMSB), 0x02, REPEAT2(setMSB), 0x03, + REPEAT2(setMSB), 0x04, REPEAT1(setMSB))}; + const __m128i shuffleMid[3] = { + _mm_setr_epi8(REPEAT2(setMSB), 0x06, REPEAT2(setMSB), 0x07, + REPEAT2(setMSB), 0x08, REPEAT2(setMSB), 0x09, + REPEAT2(setMSB), 0x0A, REPEAT1(setMSB)), + _mm_setr_epi8(0x05, REPEAT2(setMSB), 0x06, REPEAT2(setMSB), 0x07, + REPEAT2(setMSB), 0x08, REPEAT2(setMSB), 0x09, + REPEAT2(setMSB), 0x0A), + _mm_setr_epi8(REPEAT1(setMSB), 0x05, REPEAT2(setMSB), 0x06, + REPEAT2(setMSB), 0x07, REPEAT2(setMSB), 0x08, + REPEAT2(setMSB), 0x09, REPEAT2(setMSB))}; + const __m128i shuffleLast[3] = { + _mm_setr_epi8(REPEAT1(setMSB), 0x0B, REPEAT2(setMSB), 0x0C, + REPEAT2(setMSB), 0x0D, REPEAT2(setMSB), 0x0E, + REPEAT2(setMSB), 0x0F, REPEAT2(setMSB)), + _mm_setr_epi8(REPEAT2(setMSB), 0x0B, REPEAT2(setMSB), 0x0C, + REPEAT2(setMSB), 0x0D, REPEAT2(setMSB), 0x0E, + REPEAT2(setMSB), 0x0F, REPEAT1(setMSB)), + _mm_setr_epi8(0x0A, REPEAT2(setMSB), 0x0B, REPEAT2(setMSB), 0x0C, + REPEAT2(setMSB), 0x0D, REPEAT2(setMSB), 0x0E, + REPEAT2(setMSB), 0x0F)}; + + /* Start converting */ + /* The for loop handles the group of the 128-bit characters without the + * end-of-string */ + for (int i = 0; i < (trytes->len) / block_8bit; i++) { + /* Get tryte data */ + __m128i data = _mm_loadu_si128((__m128i *) (trytes->data) + i); + + /* The masks for setting the corresponding trits */ + __m128i maskLowTrit0 = _mm_cmpistrm( + 
patternLowTrit0, data, + /* Signed byte comparison */ + _SIDD_SBYTE_OPS | + /* Compare with the character set */ + _SIDD_CMP_EQUAL_ANY | + /* Expand the corrsponding bit result to byte unit */ + _SIDD_UNIT_MASK); + __m128i maskLowTritP1 = _mm_cmpistrm( + patternLowTritP1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_UNIT_MASK); + __m128i maskLowTritN1 = _mm_cmpistrm( + patternLowTritN1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_EQUAL_ANY | _SIDD_UNIT_MASK); + __m128i maskMidTrit0 = _mm_cmpistrm( + patternMidTrit0, data, + /* Signed byte comparison */ + _SIDD_SBYTE_OPS | + /* Compare with the character range */ + _SIDD_CMP_RANGES | + /* Expand the corrsponding bit result to byte unit */ + _SIDD_UNIT_MASK); + __m128i maskMidTritP1 = + _mm_cmpistrm(patternMidTritP1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_RANGES | _SIDD_UNIT_MASK); + __m128i maskMidTritN1 = + _mm_cmpistrm(patternMidTritN1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_RANGES | _SIDD_UNIT_MASK); + __m128i maskHighTrit0 = + _mm_cmpistrm(patternHighTrit0, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_RANGES | _SIDD_UNIT_MASK); + __m128i maskHighTritP1 = + _mm_cmpistrm(patternHighTritP1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_RANGES | _SIDD_UNIT_MASK); + __m128i maskHighTritN1 = + _mm_cmpistrm(patternHighTritN1, data, + _SIDD_SBYTE_OPS | _SIDD_CMP_RANGES | _SIDD_UNIT_MASK); + + /* + * Each block represents a trit. + * shuffle + * ------ ------ ------ ---------------- ------ + * lowTrit = | a1 | ... | f1 | ... | p1 | dataFirst = | a1 | a2 | a3 | ...... | f1 | + * ------ ------ ------ ---------------- ------ + * ------ ------ ------ ---------------- ------ + * midTrit = | a2 | ... | f2 | ... | p2 | => dataMid = | f2 | f3 | g1 | ...... | k2 | + * ------ ------ ------ ---------------- ------ + * ------ ------ ------ ---------------- ------ + * highTrit = | a3 | ... | f3 | ... | p3 | dataLast = | k3 | l1 | l2 | ...... 
| p3 | + * ------ ------ ------ ---------------- ------ + */ + __m128i lowTrit = + _mm_or_si128(_mm_and_si128(maskLowTrit0, zero), + _mm_or_si128(_mm_and_si128(maskLowTritP1, posOne), + _mm_and_si128(maskLowTritN1, negOne))); + __m128i midTrit = + _mm_or_si128(_mm_and_si128(maskMidTrit0, zero), + _mm_or_si128(_mm_and_si128(maskMidTritP1, posOne), + _mm_and_si128(maskMidTritN1, negOne))); + __m128i highTrit = + _mm_or_si128(_mm_and_si128(maskHighTrit0, zero), + _mm_or_si128(_mm_and_si128(maskHighTritP1, posOne), + _mm_and_si128(maskHighTritN1, negOne))); + /* Initialize with 0 */ + __m128i dataFirst = _mm_set1_epi8(0); + __m128i dataMid = _mm_set1_epi8(0); + __m128i dataLast = _mm_set1_epi8(0); + dataFirst = _mm_or_si128( + _mm_shuffle_epi8(lowTrit, shuffleFirst[0]), + _mm_or_si128(_mm_shuffle_epi8(midTrit, shuffleFirst[1]), + _mm_shuffle_epi8(highTrit, shuffleFirst[2]))); + dataMid = _mm_or_si128( + _mm_shuffle_epi8(lowTrit, shuffleMid[0]), + _mm_or_si128(_mm_shuffle_epi8(midTrit, shuffleMid[1]), + _mm_shuffle_epi8(highTrit, shuffleMid[2]))); + dataLast = _mm_or_si128( + _mm_shuffle_epi8(lowTrit, shuffleLast[0]), + _mm_or_si128(_mm_shuffle_epi8(midTrit, shuffleLast[1]), + _mm_shuffle_epi8(highTrit, shuffleLast[2]))); + + /* Store the 3 * 128-bit trits converted from trytes */ + _mm_store_si128((__m128i *) (src + (3 * i) * block_8bit), dataFirst); + _mm_store_si128((__m128i *) (src + (3 * i + 1) * block_8bit), dataMid); + _mm_store_si128((__m128i *) (src + (3 * i + 2) * block_8bit), dataLast); + } + /* The rest of the trytes */ + for (int i = (trytes->len / block_8bit) * block_8bit; i < trytes->len; + i++) { + int idx = (trytes->data[i] == '9') ? 
0 : trytes->data[i] - 'A' + 1; + for (int j = 0; j < 3; j++) { + src[i * 3 + j] = TrytesToTritsMappings[idx][j]; + } + } + + trits = initTrits(src, trytes->len * 3); + free(src); + + return trits; +} + #endif From 4f0c94d72ebd7bc53a24358e01eda67f9810b305 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Mon, 13 May 2019 18:01:12 +0800 Subject: [PATCH 10/19] refactor: Simplification of output debug messages Close #153. --- src/common.h | 11 +++++++++++ src/implcontext.c | 6 ++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/common.h b/src/common.h index 87d5005..d064dc5 100644 --- a/src/common.h +++ b/src/common.h @@ -1,7 +1,9 @@ #ifndef COMMON_H_ #define COMMON_H_ +#include #include +#include #include #define __DCURL_MAJOR__ 0 @@ -10,6 +12,15 @@ double diff_in_second(struct timespec t1, struct timespec t2); +static inline void ddprintf(const char *format, ...) { +#if defined(ENABLE_DEBUG) + va_list ap; + va_start(ap, format); + vprintf(format, ap); + va_end(ap); +#endif +} + typedef struct _pow_info PoW_Info; struct _pow_info { diff --git a/src/implcontext.c b/src/implcontext.c index d09dbc3..711f884 100644 --- a/src/implcontext.c +++ b/src/implcontext.c @@ -14,12 +14,10 @@ bool registerImplContext(ImplContext *impl_ctx) bool initializeImplContext(ImplContext *impl_ctx) { bool res = impl_ctx->initialize(impl_ctx); -#if defined(ENABLE_DEBUG) if (res) { - printf(MSG_PREFIX "Implementation %s is initialized successfully\n", - impl_ctx->description); + ddprintf(MSG_PREFIX "Implementation %s is initialized successfully\n", + impl_ctx->description); } -#endif return res; } From c3dce38bd812c0c0c747c6c0e80c389505c9cf32 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Mon, 27 May 2019 11:27:07 +0800 Subject: [PATCH 11/19] feat: Make dcurl set the threadpool size of libtuv In the initialization phase of dcurl, the threadpool size is preset to the (maximum processor - 1) with the new libtuv API uv_set_threadpool_size(). 
Then the threadpool would be initialized in the first call of dcurl_entry() with the preset size. Close #149. --- deps/libtuv | 2 +- src/pow_avx.c | 1 + src/pow_c.c | 1 + src/pow_sse.c | 1 + 4 files changed, 4 insertions(+), 1 deletion(-) diff --git a/deps/libtuv b/deps/libtuv index 9cd1daa..4f51adb 160000 --- a/deps/libtuv +++ b/deps/libtuv @@ -1 +1 @@ -Subproject commit 9cd1daaab0b1e491683a271aff582937bb376977 +Subproject commit 4f51adbd57db2aa8a497462cf92f1d08fa8b52fc diff --git a/src/pow_avx.c b/src/pow_avx.c index 25a67f8..1feb72b 100644 --- a/src/pow_avx.c +++ b/src/pow_avx.c @@ -607,6 +607,7 @@ static bool PoWAVX_Context_Initialize(ImplContext *impl_ctx) impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1; uv_loop_init(&ctx[i].loop); } + uv_set_threadpool_size(nproc); impl_ctx->context = ctx; uv_mutex_init(&impl_ctx->lock); return true; diff --git a/src/pow_c.c b/src/pow_c.c index 5c44e26..640b659 100644 --- a/src/pow_c.c +++ b/src/pow_c.c @@ -371,6 +371,7 @@ static bool PoWC_Context_Initialize(ImplContext *impl_ctx) impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1; uv_loop_init(&ctx[i].loop); } + uv_set_threadpool_size(nproc); impl_ctx->context = ctx; uv_mutex_init(&impl_ctx->lock); return true; diff --git a/src/pow_sse.c b/src/pow_sse.c index b2e77e7..cec30d1 100644 --- a/src/pow_sse.c +++ b/src/pow_sse.c @@ -389,6 +389,7 @@ static bool PoWSSE_Context_Initialize(ImplContext *impl_ctx) impl_ctx->bitmap = impl_ctx->bitmap << 1 | 0x1; uv_loop_init(&ctx[i].loop); } + uv_set_threadpool_size(nproc); impl_ctx->context = ctx; uv_mutex_init(&impl_ctx->lock); return true; From 476d17af2b798c5fdaa0f60f541377d442ae0cd5 Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Tue, 21 May 2019 19:41:21 -0700 Subject: [PATCH 12/19] test: Add thread-safe test Resolved #151 --- Makefile | 3 +- tests/test-multi-pow.c | 108 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 tests/test-multi-pow.c diff --git a/Makefile b/Makefile 
index ea3d45f..bbc91b6 100644 --- a/Makefile +++ b/Makefile @@ -84,7 +84,8 @@ TESTS = \ trinary \ curl \ dcurl \ - pow + pow \ + multi-pow TESTS := $(addprefix $(OUT)/test-, $(TESTS)) diff --git a/tests/test-multi-pow.c b/tests/test-multi-pow.c new file mode 100644 index 0000000..f3d3efe --- /dev/null +++ b/tests/test-multi-pow.c @@ -0,0 +1,108 @@ +/* Test program for thread-safe dcurl */ +#include +#include "common.h" +#include "dcurl.h" + +#define THREAD_MAX 10 + +typedef struct _dcurl_item dcurl_item; +struct _dcurl_item { + int mwm; + int8_t input_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ + int8_t output_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ +}; + +void *dcurl_entry_cb(void *arg) +{ + dcurl_item *item = (dcurl_item *) arg; + /* test dcurl Implementation with mwm = 14 */ + int8_t *ret_trytes = + dcurl_entry(item->input_trytes, item->mwm, 0); + assert(ret_trytes && "dcurl_entry() failed"); + memcpy(item->output_trytes, ret_trytes, TRANSACTION_TRYTES_LENGTH); + free(ret_trytes); + + pthread_exit(NULL); +} + +int main() +{ + char *trytes = + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + 
"9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "99999999999999999A9RGRKVGWMWMKOLVMDFWJUHNUNYWZTJADGGPZGXNLERLXYWJE9WQH" + "WWBMCPZMVVMJUMWWBLZLNMLDCGDJ999999999999999999999999999999999999999999" + "999999999999YGYQIVD99999999999999999999TXEFLKNPJRBYZPORHZU9CEMFIFVVQBU" + "STDGSJCZMBTZCDTTJVUFPTCCVHHORPMGCURKTH9VGJIXUQJVHK99999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999999999999999999999999999999999999999999999999999999999999" + "9999999999999"; + + int mwm = 14; + pthread_t 
threads[THREAD_MAX]; + dcurl_item items[THREAD_MAX]; + + dcurl_init(); + + for (int i = 0; i < THREAD_MAX; i++) { + memcpy(items[i].input_trytes, trytes, TRANSACTION_TRYTES_LENGTH); + items[i].mwm = mwm; + pthread_create(&threads[i], NULL, dcurl_entry_cb, + (void *) &items[i]); + } + + for (int i = 0; i < THREAD_MAX; i++) + pthread_join(threads[i], NULL); + + for (int i = 0; i < THREAD_MAX; i++) { + Trytes_t *trytes_t = + initTrytes(items[i].output_trytes, TRANSACTION_TRYTES_LENGTH); + assert(trytes_t && "initTrytes() failed"); + Trytes_t *hash_trytes = hashTrytes(trytes_t); + assert(hash_trytes && "hashTrytes() failed"); + + /* Validation */ + Trits_t *ret_trits = trits_from_trytes(hash_trytes); + for (int j = 243 - 1; j >= 243 - items[i].mwm; j--) { + assert(ret_trits->data[j] == 0 && "Validation failed"); + } + + freeTrobject(trytes_t); + freeTrobject(hash_trytes); + freeTrobject(ret_trits); + } + + dcurl_destroy(); + + return 0; +} From 88a5cb63c0ba34d3ece00894058ce19fd0c75f6e Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Wed, 24 Apr 2019 15:56:08 -0700 Subject: [PATCH 13/19] feat: Remote interface interoperate distributed computing resources * make BUILD_REMOTE=1 to build libdcurl.so and remote-worker * make BUILD_REMOTE=1 check with RabbitMQ broker and remote-worker * RPC with exclusive callback queues with TTL property * AMQP connection management for multiple threads * Implement local fallback PoW when remote interface fails Related #137 --- .gitmodules | 3 + Makefile | 32 +++- deps/rabbitmq-c | 1 + mk/defs.mk | 3 + mk/remote.mk | 8 + mk/submodule.mk | 31 ++++ src/dcurl.c | 39 +++++ src/remote_common.c | 323 +++++++++++++++++++++++++++++++++++++++++ src/remote_common.h | 53 +++++++ src/remote_interface.c | 237 ++++++++++++++++++++++++++++++ src/remote_interface.h | 61 ++++++++ src/remote_worker.c | 77 ++++++++++ 12 files changed, 862 insertions(+), 6 deletions(-) create mode 160000 deps/rabbitmq-c create mode 100644 mk/remote.mk create mode 100644 
src/remote_common.c create mode 100644 src/remote_common.h create mode 100644 src/remote_interface.c create mode 100644 src/remote_interface.h create mode 100644 src/remote_worker.c diff --git a/.gitmodules b/.gitmodules index db8e5f6..400b111 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "deps/libtuv"] path = deps/libtuv url = https://github.com/DLTcollab/libtuv.git +[submodule "deps/rabbitmq-c"] + path = deps/rabbitmq-c + url = https://github.com/alanxz/rabbitmq-c.git diff --git a/Makefile b/Makefile index bbc91b6..749d45e 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,10 @@ ifeq ("$(BUILD_FPGA_ACCEL)","1") CFLAGS += -DENABLE_FPGA_ACCEL endif +ifeq ("$(BUILD_REMOTE)","1") +CFLAGS += -DENABLE_REMOTE +endif + ifeq ("$(BUILD_JNI)","1") include mk/java.mk endif @@ -99,6 +103,9 @@ PREQ := config $(TESTS) $(LIBS) ifeq ("$(BUILD_JNI)","1") PREQ += $(JARS) endif +ifeq ("$(BUILD_REMOTE)", "1") +PREQ += $(OUT)/remote-worker +endif all: $(PREQ) .DEFAULT_GOAL := all @@ -140,23 +147,36 @@ OBJS += \ pow_fpga_accel.o endif +ifeq ("$(BUILD_REMOTE)", "1") +OBJS += \ + remote_common.o \ + remote_interface.o + +WORKER_OBJS := $(addprefix $(OUT)/worker-,$(filter-out remote_interface.o, $(OBJS))) +WORKER_CFLAGS := $(filter-out -DENABLE_REMOTE, $(CFLAGS)) +endif + OBJS := $(addprefix $(OUT)/, $(OBJS)) $(OUT)/test-%.o: tests/test-%.c $(LIBTUV_PATH)/include $(VECHO) " CC\t$@\n" $(Q)$(CC) -o $@ $(CFLAGS) -I $(SRC) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $< -$(OUT)/%.o: $(SRC)/%.c $(LIBTUV_PATH)/include +$(OUT)/%.o: $(SRC)/%.c $(LIBTUV_PATH)/include $(LIBRABBITMQ_PATH)/build/include $(VECHO) " CC\t$@\n" - $(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) -c -MMD -MF $@.d $< + $(Q)$(CC) -o $@ $(CFLAGS) $(LIBTUV_INCLUDE) $(LIBRABBITMQ_INCLUDE) -c -MMD -MF $@.d $< -$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY) +$(OUT)/test-%: $(OUT)/test-%.o $(OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) $(VECHO) " LD\t$@\n" - $(Q)$(CC) -o $@ $^ $(LDFLAGS) + $(Q)$(CC) -o $@ $^ 
$(LDFLAGS) $(LIBRABBITMQ_LINK) -$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY) +$(OUT)/libdcurl.so: $(OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) $(VECHO) " LD\t$@\n" - $(Q)$(CC) -shared -o $@ $^ $(LDFLAGS) + $(Q)$(CC) -shared -o $@ $^ $(LDFLAGS) $(LIBRABBITMQ_LINK) + +ifeq ("$(BUILD_REMOTE)", "1") +include mk/remote.mk +endif include mk/common.mk diff --git a/deps/rabbitmq-c b/deps/rabbitmq-c new file mode 160000 index 0000000..75a21e5 --- /dev/null +++ b/deps/rabbitmq-c @@ -0,0 +1 @@ +Subproject commit 75a21e51db5d70ea807473621141b4417d81b56f diff --git a/mk/defs.mk b/mk/defs.mk index 989ee08..deded44 100644 --- a/mk/defs.mk +++ b/mk/defs.mk @@ -19,6 +19,9 @@ BUILD_GPU ?= 0 # Build FPGA backend or not BUILD_FPGA_ACCEL ?= 0 +# Build facilities of remote procedure calls +BUILD_REMOTE ?= 0 + # Build JNI glue as the bridge between dcurl and IRI BUILD_JNI ?= 0 diff --git a/mk/remote.mk b/mk/remote.mk new file mode 100644 index 0000000..667f8e7 --- /dev/null +++ b/mk/remote.mk @@ -0,0 +1,8 @@ +# Build remote-worker +$(OUT)/worker-%.o: $(SRC)/%.c $(LIBTUV_PATH)/include $(LIBRABBITMQ_PATH)/build/include + $(VECHO) " CC\t$@\n" + $(Q)$(CC) -o $@ $(WORKER_CFLAGS) $(LIBTUV_INCLUDE) $(LIBRABBITMQ_INCLUDE) -c -MMD -MF $@.d $< + +$(OUT)/remote-worker: $(OUT)/remote_worker.o $(WORKER_OBJS) $(LIBTUV_LIBRARY) $(LIBRABBITMQ_LIBRARY) + $(VECHO) " LD\t$@\n" + $(Q)$(CC) -o $@ $^ $(LDFLAGS) $(LIBRABBITMQ_LINK) diff --git a/mk/submodule.mk b/mk/submodule.mk index 8be3d84..12c7164 100644 --- a/mk/submodule.mk +++ b/mk/submodule.mk @@ -26,3 +26,34 @@ $(LIBTUV_PATH)/include: $(LIBTUV_LIBRARY): $(MAKE) -C $(LIBTUV_PATH) TUV_BUILD_TYPE=release TUV_CREATE_PIC_LIB=yes TUV_PLATFORM=$(LIBTUV_PLATFORM) TUV_BOARD=$(LIBTUV_BOARD) + +# librabbitmq related variables +LIBRABBITMQ_PATH = deps/rabbitmq-c +LIBRABBITMQ_INCLUDE := -I $(LIBRABBITMQ_PATH)/build/include +LIBRABBITMQ_LIB_PATH := $(LIBRABBITMQ_PATH)/build/librabbitmq/ +ifeq ($(UNAME_S),darwin) + # macOS + LIBRABBITMQ_LINK := 
-Wl,-rpath,$(LIBRABBITMQ_LIB_PATH) -L$(LIBRABBITMQ_LIB_PATH) -lrabbitmq + LIBRABBITMQ_LIBRARY := $(LIBRABBITMQ_LIB_PATH)/librabbitmq.dylib +else + LIBRABBITMQ_LINK := -Wl,-rpath=$(LIBRABBITMQ_LIB_PATH) -L$(LIBRABBITMQ_LIB_PATH) -lrabbitmq + LIBRABBITMQ_LIBRARY := $(LIBRABBITMQ_LIB_PATH)/librabbitmq.so +endif + +$(LIBRABBITMQ_PATH)/build/include: + git submodule update --init $(LIBRABBITMQ_PATH) + mkdir $(LIBRABBITMQ_PATH)/build +ifeq ($(UNAME_S),darwin) + # macOS + cd $(LIBRABBITMQ_PATH)/build && \ + cmake -DOPENSSL_ROOT_DIR=/usr/local/opt/openssl/ -DCMAKE_INSTALL_PREFIX=. .. && \ + cmake --build . --target install +else + cd $(LIBRABBITMQ_PATH)/build && \ + cmake -DCMAKE_INSTALL_PREFIX=. .. && \ + cmake --build . --target install +endif + +$(LIBRABBITMQ_LIBRARY): + cd $(LIBRABBITMQ_PATH)/build && \ + cmake --build . diff --git a/src/dcurl.c b/src/dcurl.c index aca0dab..88235da 100644 --- a/src/dcurl.c +++ b/src/dcurl.c @@ -15,6 +15,9 @@ #if defined(ENABLE_FPGA_ACCEL) #include "pow_fpga_accel.h" #endif +#if defined(ENABLE_REMOTE) +#include "remote_interface.h" +#endif #include "implcontext.h" #include "trinary.h" #include "uv.h" @@ -48,6 +51,11 @@ extern ImplContext PoWCL_Context; extern ImplContext PoWFPGAAccel_Context; #endif +#if defined(ENABLE_REMOTE) +extern RemoteImplContext Remote_Context; +static uv_sem_t notify_remote; +#endif + bool dcurl_init() { bool ret = true; @@ -68,6 +76,11 @@ bool dcurl_init() ret &= registerImplContext(&PoWFPGAAccel_Context); #endif +#if defined(ENABLE_REMOTE) + ret &= initializeRemoteContext(&Remote_Context); + uv_sem_init(¬ify_remote, 0); +#endif + uv_sem_init(¬ify, 0); return isInitialized = ret; } @@ -77,6 +90,10 @@ void dcurl_destroy() ImplContext *impl = NULL; struct list_head *p; +#if defined(ENABLE_REMOTE) + destroyRemoteContext(&Remote_Context); +#endif + list_for_each (p, &IMPL_LIST) { impl = list_entry(p, ImplContext, list); destroyImplContext(impl); @@ -96,6 +113,28 @@ int8_t *dcurl_entry(int8_t *trytes, int mwm, int 
threads) if (!isInitialized) return NULL; +#if defined(ENABLE_REMOTE) + do { + if (enterRemoteContext(&Remote_Context)) { + pow_ctx = getRemoteContext(&Remote_Context, trytes, mwm); + goto remote_pow; + } + uv_sem_wait(¬ify_remote); + } while (1); + +remote_pow: + if (!doRemoteContext(&Remote_Context, pow_ctx)) { + goto local_pow; + } else { + res = getRemoteResult(&Remote_Context, pow_ctx); + } + freeRemoteContext(&Remote_Context, pow_ctx); + exitRemoteContext(&Remote_Context); + uv_sem_post(¬ify_remote); + return res; + +local_pow: +#endif do { list_for_each (p, &IMPL_LIST) { impl = list_entry(p, ImplContext, list); diff --git a/src/remote_common.c b/src/remote_common.c new file mode 100644 index 0000000..d9dd902 --- /dev/null +++ b/src/remote_common.c @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. + */ + +#include "remote_common.h" +#include +#include +#include "common.h" + +bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) +{ + switch (x.reply_type) { + case AMQP_RESPONSE_NORMAL: + return true; + + case AMQP_RESPONSE_NONE: + ddprintf("%s: missing RPC reply type!\n", context); + break; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + ddprintf("%s: %s\n", context, + amqp_error_string2(x.library_error)); + break; + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (x.reply.id) { + case AMQP_CONNECTION_CLOSE_METHOD: { + amqp_connection_close_t *m = + (amqp_connection_close_t *) x.reply.decoded; + ddprintf("%s: server connection error %uh, message: %.*s\n", + context, m->reply_code, (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + case AMQP_CHANNEL_CLOSE_METHOD: { + amqp_channel_close_t *m = (amqp_channel_close_t *) x.reply.decoded; + ddprintf("%s: server channel error %uh, message: %.*s\n", + context, m->reply_code, (int) m->reply_text.len, + (char *) m->reply_text.bytes); + break; + } + default: + ddprintf("%s: unknown 
server error, method id 0x%08X\n", + context, x.reply.id); + break; + } + break; + } + + return false; +} + +bool die_on_error(int x, char const *context) +{ + if (x < 0) { + ddprintf("%s: %s\n", context, amqp_error_string2(x)); + return false; + } + + return true; +} + +bool connect_broker(amqp_connection_state_t *conn) +{ + amqp_socket_t *socket = NULL; + + /* Connect to the rabbitmq broker */ + *conn = amqp_new_connection(); + socket = amqp_tcp_socket_new(*conn); + if (amqp_socket_open(socket, HOSTNAME, 5672) != AMQP_STATUS_OK) { + ddprintf("%s\n", "The rabbitmq broker is closed"); + goto destroy_connection; + } + + /* Login to the rabbitmq broker */ + if (!die_on_amqp_error(amqp_login(*conn, "/", AMQP_DEFAULT_MAX_CHANNELS, + AMQP_DEFAULT_FRAME_SIZE, 0, + AMQP_SASL_METHOD_PLAIN, "guest", "guest"), + "Logging in")) + goto connection_close; + + /* Open the channel in the rabbitmq broker */ + amqp_channel_open(*conn, 1); + if (!(die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Opennng the channel"))) + goto channel_close; + + return true; + +channel_close: + amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS); + +connection_close: + amqp_connection_close(*conn, AMQP_REPLY_SUCCESS); + +destroy_connection: + amqp_destroy_connection(*conn); + + return false; +} + +bool declare_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name) +{ + /* Declare a durable queue */ + amqp_queue_declare(*conn, channel, amqp_cstring_bytes(queue_name), 0, 1, 0, + 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Declaring the queue")) + return false; + + return true; +} + +bool declare_callback_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t *reply_to_queue) +{ + /* Declare a exclusive private queues with TTL = 10s */ + amqp_table_entry_t entries[1]; + amqp_table_t table; + entries[0].key = amqp_cstring_bytes("x-message-ttl"); + entries[0].value.kind = AMQP_FIELD_KIND_U32; + entries[0].value.value.u32 = 
10 * 1000; // 10s + table.num_entries = 1; + table.entries = entries; + + amqp_queue_declare_ok_t *r = + amqp_queue_declare(*conn, channel, amqp_empty_bytes, 0, 0, 1, 0, table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Declaring the private queue with TTL = 10s")) + return false; + + *reply_to_queue = amqp_bytes_malloc_dup(r->queue); + return true; +} + +bool set_consuming_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name) +{ + amqp_basic_consume(*conn, channel, amqp_cstring_bytes(queue_name), + amqp_empty_bytes, 0, 0, 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Set the consuming queue")) + return false; + + return true; +} + +bool consume_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope) +{ + amqp_maybe_release_buffers(*conn); + /* Consume a message from the queue in the rabbitmq broker */ + if (!die_on_amqp_error(amqp_consume_message(*conn, envelope, NULL, 0), + "Consuming the message")) + return false; + + return true; +} + +bool wait_response_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t callback_queue, + char *frame_body, + int body_len) +{ + amqp_frame_t frame; +#if defined(ENABLE_DEBUG) + amqp_basic_deliver_t *d; + amqp_basic_properties_t *p; +#endif + size_t body_target; + size_t body_received; + + amqp_basic_consume(*conn, channel, callback_queue, amqp_empty_bytes, 0, 1, + 0, amqp_empty_table); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Set the callback queue")) + return false; + + /*Wait for three frames: AMQP_FRAME_METHOD, AMQP_FRAME_HEADER, + * AMQP_FRAME_BODY*/ + for (;;) { + amqp_maybe_release_buffers(*conn); + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame")) + return false; + + ddprintf(MSG_PREFIX "Frame type: %u channel: %u\n", frame.frame_type, frame.channel); + + if (frame.frame_type != AMQP_FRAME_METHOD) + 
continue; + + ddprintf(MSG_PREFIX "Method: %s\n", amqp_method_name(frame.payload.method.id)); + + if (frame.payload.method.id != AMQP_BASIC_DELIVER_METHOD) + continue; + +#if defined(ENABLE_DEBUG) + d = (amqp_basic_deliver_t *) frame.payload.method.decoded; + ddprintf(MSG_PREFIX "Delivery: %u exchange: %.*s routingkey: %.*s\n", + (unsigned) d->delivery_tag, (int) d->exchange.len, + (char *) d->exchange.bytes, (int) d->routing_key.len, + (char *) d->routing_key.bytes); +#endif + + amqp_maybe_release_buffers(*conn); + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame")) + return false; + + if (frame.frame_type != AMQP_FRAME_HEADER) { + ddprintf("Unexpected header!"); + return false; + } + +#if defined(ENABLE_DEBUG) + p = (amqp_basic_properties_t *) frame.payload.properties.decoded; + if (p->_flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + ddprintf(MSG_PREFIX "Content-type: %.*s\n", (int) p->content_type.len, + (char *) p->content_type.bytes); + } +#endif + ddprintf("---\n"); + + body_target = (size_t) frame.payload.properties.body_size; + body_received = 0; + while (body_received < body_target) { + amqp_simple_wait_frame(*conn, &frame); + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), + "Wait body frame")) + return false; + + if (frame.frame_type != AMQP_FRAME_BODY) { + ddprintf("Unexpected body"); + return false; + } + + body_received += frame.payload.body_fragment.len; + } + if (body_received != body_target) { + ddprintf("Received body is small than body target"); + return false; + } + + memcpy(frame_body, (char *) frame.payload.body_fragment.bytes, + body_len); + + ddprintf(MSG_PREFIX "PoW result: %.*s", (int) frame.payload.body_fragment.len, + (char *) frame.payload.body_fragment.bytes); + + /* everything was fine, we can quit now because we received the reply */ + return true; + } +} + +bool publish_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + char *message) +{ + 
amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = AMQP_DELIVERY_PERSISTENT; + + /* Publish messages by default exchange */ + if (!die_on_error(amqp_basic_publish(*conn, channel, amqp_cstring_bytes(""), + amqp_cstring_bytes(queue_name), 0, 0, + &props, amqp_cstring_bytes(message)), + "Publish the message")) + return false; + + return true; +} + +bool publish_message_with_reply_to(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + amqp_bytes_t reply_to_queue, + char *message) +{ + amqp_basic_properties_t props; + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | + AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_REPLY_TO_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = AMQP_DELIVERY_PERSISTENT; + props.reply_to = amqp_bytes_malloc_dup(reply_to_queue); + + if (!die_on_error(amqp_basic_publish(*conn, channel, amqp_cstring_bytes(""), + amqp_cstring_bytes(queue_name), 0, 0, + &props, amqp_cstring_bytes(message)), + "Publishing the message with reply_to")) + return false; + + ddprintf(MSG_PREFIX "callback queue %s \n", (char *) props.reply_to.bytes); + amqp_bytes_free(props.reply_to); + + return true; +} + +bool acknowledge_broker(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope) +{ + /* Make sure a message is never lost */ + if (!die_on_error(amqp_basic_ack(*conn, channel, envelope->delivery_tag, 0), + "Acknowledging the broker")) + return false; + + return true; +} + +void disconnect_broker(amqp_connection_state_t *conn) +{ + amqp_channel_close(*conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close(*conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(*conn); +} diff --git a/src/remote_common.h b/src/remote_common.h new file mode 100644 index 0000000..a063601 --- /dev/null +++ b/src/remote_common.h @@ -0,0 +1,53 @@ +#ifndef REMOTE_COMMON_H_ 
+#define REMOTE_COMMON_H_ + +#include +#include +#include "amqp.h" +#include "amqp_tcp_socket.h" + +#define HOSTNAME "localhost" +#define MSG_PREFIX "[dcurl-remote] " + +bool connect_broker(amqp_connection_state_t *conn); + +bool declare_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name); + +bool declare_callback_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t *reply_to_queue); + +bool set_consuming_queue(amqp_connection_state_t *conn, + amqp_channel_t channel, + char const *queue_name); + +bool consume_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope); + +bool wait_response_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_bytes_t callback_queue, + char *frame_body, + int body_len); + +bool publish_message(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + char *message); + +bool publish_message_with_reply_to(amqp_connection_state_t *conn, + amqp_channel_t channel, + char *queue_name, + amqp_bytes_t reply_to_queue, + char *message); + +bool acknowledge_broker(amqp_connection_state_t *conn, + amqp_channel_t channel, + amqp_envelope_t *envelope); + +void disconnect_broker(amqp_connection_state_t *conn); + +#endif diff --git a/src/remote_interface.c b/src/remote_interface.c new file mode 100644 index 0000000..76f0c6f --- /dev/null +++ b/src/remote_interface.c @@ -0,0 +1,237 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. 
+ */ + +#include "remote_interface.h" +#include +#include "trinary.h" + +bool initializeRemoteContext(RemoteImplContext *remote_ctx) +{ + bool res = remote_ctx->initialize(remote_ctx); + if (res) { + ddprintf(MSG_PREFIX "Implementation %s is initialized successfully\n", + remote_ctx->description); + } + return res; +} + +void destroyRemoteContext(RemoteImplContext *remote_ctx) +{ + return remote_ctx->destroy(remote_ctx); +} + +bool enterRemoteContext(RemoteImplContext *remote_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + if (remote_ctx->num_working_thread >= remote_ctx->num_max_thread) { + uv_mutex_unlock(&remote_ctx->lock); + return false; /* Access Failed */ + } + remote_ctx->num_working_thread++; + uv_mutex_unlock(&remote_ctx->lock); + return true; /* Access Success */ +} + +void *getRemoteContext(RemoteImplContext *remote_ctx, int8_t *trytes, int mwm) +{ + return remote_ctx->getPoWContext(remote_ctx, trytes, mwm); +} + +bool doRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->doThePoW(remote_ctx, pow_ctx); +} + +int8_t *getRemoteResult(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->getPoWResult(pow_ctx); +} + +bool freeRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + return remote_ctx->freePoWContext(remote_ctx, pow_ctx); +} + +void exitRemoteContext(RemoteImplContext *remote_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + remote_ctx->num_working_thread--; + uv_mutex_unlock(&remote_ctx->lock); +} + +bool PoWValidation(int8_t *output_trytes, int mwm) +{ + Trytes_t *trytes_t = initTrytes(output_trytes, TRANSACTION_TRYTES_LENGTH); + if (!trytes_t) { + ddprintf("PoW Validation: Initialization of Trytes fails\n"); + goto fail_to_inittrytes; + } + + Trytes_t *hash_trytes = hashTrytes(trytes_t); + if (!hash_trytes) { + ddprintf("PoW Validation: Hashing trytes fails\n"); + goto fail_to_hashtrytes; + } + + Trits_t *ret_trits = trits_from_trytes(hash_trytes); + for (int i = 243 - 1; i >= 243 - mwm; 
i--) { + if (ret_trits->data[i] != 0) { + ddprintf("PoW Validation fails\n"); + goto fail_to_validation; + } + } + + return true; + +fail_to_validation: + freeTrobject(ret_trits); + freeTrobject(hash_trytes); +fail_to_hashtrytes: + freeTrobject(trytes_t); +fail_to_inittrytes: + return false; +} + +static bool Remote_doPoW(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + char buf[4]; + char messagebody[TRANSACTION_TRYTES_LENGTH + 4]; + + amqp_bytes_t reply_to_queue; + + PoW_Remote_Context *ctx = (PoW_Remote_Context *) pow_ctx; + + /* Message body format: transacton | mwm */ + memcpy(messagebody, ctx->input_trytes, TRANSACTION_TRYTES_LENGTH); + sprintf(buf, "%d", ctx->mwm); + memcpy(messagebody + TRANSACTION_TRYTES_LENGTH, buf, 4); + + if (!declare_callback_queue(&remote_ctx->conn[ctx->indexOfContext], 1, &reply_to_queue)) + goto fail; + + if (!publish_message_with_reply_to(&remote_ctx->conn[ctx->indexOfContext], 1, + "incoming_queue", reply_to_queue, + messagebody)) + goto fail; + + if (!wait_response_message(&remote_ctx->conn[ctx->indexOfContext], 1, reply_to_queue, + (char *) (ctx->output_trytes), + TRANSACTION_TRYTES_LENGTH)) + goto fail; + + amqp_bytes_free(reply_to_queue); + + PoWValidation(ctx->output_trytes, ctx->mwm); + + return true; + +fail: + return false; +} + +static bool Remote_init(RemoteImplContext *remote_ctx) +{ + if (remote_ctx->num_max_thread <= 0) + goto fail_to_init; + + PoW_Remote_Context *ctx = (PoW_Remote_Context *) malloc( + sizeof(PoW_Remote_Context) * remote_ctx->num_max_thread); + + memset(remote_ctx->slots, 0, remote_ctx->num_max_thread * sizeof(bool)); + + for(int i = 0 ; i < CONN_MAX; i++) + { + if (!connect_broker(&remote_ctx->conn[i])) + goto fail_to_init; + } + if (!declare_queue(&remote_ctx->conn[0], 1, "incoming_queue")) + goto fail_to_init; + + remote_ctx->context = ctx; + + uv_mutex_init(&remote_ctx->lock); + + return true; + +fail_to_init: + return false; +} + +static void Remote_destroy(RemoteImplContext *remote_ctx) +{ + 
PoW_Remote_Context *ctx = (PoW_Remote_Context *) remote_ctx->context; + + for(int i = 0; i < CONN_MAX; i++) + disconnect_broker(&remote_ctx->conn[i]); + + free(ctx); +} + +static void *Remote_getPoWContext(RemoteImplContext *remote_ctx, + int8_t *trytes, + int mwm) +{ + uv_mutex_lock(&remote_ctx->lock); + + for (int i = 0; i < remote_ctx->num_max_thread; i++) { + if (!remote_ctx->slots[i]) { + remote_ctx->slots[i] = true; + + uv_mutex_unlock(&remote_ctx->lock); + PoW_Remote_Context *ctx = + remote_ctx->context + sizeof(PoW_Remote_Context) * i; + memcpy(ctx->input_trytes, trytes, TRANSACTION_TRYTES_LENGTH); + ctx->mwm = mwm; + ctx->indexOfContext = i; + + return ctx; + } + } + + uv_mutex_unlock(&remote_ctx->lock); + + return NULL; /* It should not happen */ +} + +static bool Remote_freePoWContext(RemoteImplContext *remote_ctx, void *pow_ctx) +{ + uv_mutex_lock(&remote_ctx->lock); + + remote_ctx->slots[((PoW_Remote_Context *) pow_ctx)->indexOfContext] = false; + + uv_mutex_unlock(&remote_ctx->lock); + + return true; +} + +static int8_t *Remote_getPoWResult(void *pow_ctx) +{ + int8_t *ret = (int8_t *) malloc(sizeof(int8_t) * TRANSACTION_TRYTES_LENGTH); + if (!ret) + return NULL; + memcpy(ret, ((PoW_Remote_Context *) pow_ctx)->output_trytes, + TRANSACTION_TRYTES_LENGTH); + return ret; +} + +static PoW_Info Remote_getPoWInfo(void *pow_ctx) +{ + return ((PoW_Remote_Context *) pow_ctx)->pow_info; +} + +RemoteImplContext Remote_Context = { + .context = NULL, + .description = "Remote interface", + .num_max_thread = CONN_MAX, // 1 <= num_max_thread + .num_working_thread = 0, + .initialize = Remote_init, + .destroy = Remote_destroy, + .getPoWContext = Remote_getPoWContext, + .freePoWContext = Remote_freePoWContext, + .doThePoW = Remote_doPoW, + .getPoWResult = Remote_getPoWResult, + .getPoWInfo = Remote_getPoWInfo, +}; diff --git a/src/remote_interface.h b/src/remote_interface.h new file mode 100644 index 0000000..0851984 --- /dev/null +++ b/src/remote_interface.h @@ -0,0 
+1,61 @@ +#ifndef REMOTE_INTERFACE_H_ +#define REMOTE_INTERFACE_H_ + +#include +#include +#include "common.h" +#include "constants.h" +#include "remote_common.h" +#include "uv.h" + +#define CONN_MAX 20 + +typedef struct _pow_remote_context PoW_Remote_Context; +typedef struct _remote_impl_context RemoteImplContext; + +struct _pow_remote_context { + /* Thread management */ + int indexOfContext; + /* Arguments of PoW */ + int8_t input_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ + int8_t output_trytes[TRANSACTION_TRYTES_LENGTH]; /* 2673 */ + int mwm; + /* PoW-related information */ + PoW_Info pow_info; +}; + +struct _remote_impl_context { + void *context; + char *description; + /* Connection parameters */ + amqp_connection_state_t conn[CONN_MAX]; + /* Thread management */ + uv_mutex_t lock; + bool slots[CONN_MAX]; /* Used to tell which slot is + available */ + int num_max_thread; + int num_working_thread; + + /* Functions of Implementation Context */ + bool (*initialize)(RemoteImplContext *remote_ctx); + void (*destroy)(RemoteImplContext *remote_ctx); + /* Private PoW Context for each thread */ + void *(*getPoWContext)(RemoteImplContext *remote_ctx, + int8_t *trytes, + int mwm); + bool (*doThePoW)(RemoteImplContext *remote_ctx, void *pow_ctx); + int8_t *(*getPoWResult)(void *pow_ctx); + PoW_Info (*getPoWInfo)(void *pow_ctx); + bool (*freePoWContext)(RemoteImplContext *remote_ctx, void *pow_ctx); +}; + +bool initializeRemoteContext(RemoteImplContext *remote_ctx); +void destroyRemoteContext(RemoteImplContext *remote_ctx); +bool enterRemoteContext(RemoteImplContext *remote_ctx); +void *getRemoteContext(RemoteImplContext *remote_ctx, int8_t *trytes, int mwm); +bool doRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx); +int8_t *getRemoteResult(RemoteImplContext *remote_ctx, void *pow_ctx); +bool freeRemoteContext(RemoteImplContext *remote_ctx, void *pow_ctx); +void exitRemoteContext(RemoteImplContext *remote_ctx); + +#endif diff --git a/src/remote_worker.c 
b/src/remote_worker.c new file mode 100644 index 0000000..12c9136 --- /dev/null +++ b/src/remote_worker.c @@ -0,0 +1,77 @@ +/* + * Copyright (C) 2019 dcurl Developers. + * Use of this source code is governed by MIT license that can be + * found in the LICENSE file. + */ + +#include "constants.h" +#include "dcurl.h" +#include "remote_common.h" +#include "common.h" + +int main(int argc, char const *const *argv) +{ + char trytes[TRANSACTION_TRYTES_LENGTH]; + char buf[4]; + int mwm; + + amqp_connection_state_t conn; + amqp_envelope_t envelope; + + dcurl_init(); + + if (!connect_broker(&conn)) + goto fail; + + if (!set_consuming_queue(&conn, 1, "incoming_queue")) + goto fail; + + for (;;) { + if (!consume_message(&conn, 1, &envelope)) + goto fail; + + ddprintf(MSG_PREFIX "Delivery %u, exchange %.*s, routingkey %.*s, callback queue: %s " + "\n", + (unsigned) envelope.delivery_tag, (int) envelope.exchange.len, + (char *) envelope.exchange.bytes, (int) envelope.routing_key.len, + (char *) envelope.routing_key.bytes, + (char *) envelope.message.properties.reply_to.bytes); + if (envelope.message.properties._flags & AMQP_BASIC_CONTENT_TYPE_FLAG) { + ddprintf(MSG_PREFIX "Content-type: %.*s\n", + (int) envelope.message.properties.content_type.len, + (char *) envelope.message.properties.content_type.bytes); + } + + /* Message body format: transacton | mwm */ + memcpy(trytes, envelope.message.body.bytes, TRANSACTION_TRYTES_LENGTH); + memcpy(buf, envelope.message.body.bytes + TRANSACTION_TRYTES_LENGTH, 4); + mwm = strtol(buf, NULL, 10); + + ddprintf(MSG_PREFIX "Doing PoW with mwm = %d...\n", mwm); + + int8_t *ret_trytes = dcurl_entry((int8_t *) trytes, mwm, 0); + memset(buf, '0', sizeof(buf)); + ddprintf(MSG_PREFIX "PoW is done\n"); + + if (!acknowledge_broker(&conn, 1, &envelope)) + goto fail; + ddprintf(MSG_PREFIX "Sending an ack is done\n"); + + /* Publish a message of remote PoW result */ + if (!publish_message( + &conn, 1, + (char *) 
envelope.message.properties.reply_to.bytes, + (char *) ret_trytes)) + goto fail; + + free(ret_trytes); + amqp_destroy_envelope(&envelope); + ddprintf(MSG_PREFIX "Publishing PoW result to callback queue is done\n"); + ddprintf(MSG_PREFIX "---\n"); + } + +fail: + dcurl_destroy(); + + return -1; +} From 08e1ceeda75bb3390c2c82f2f8810dce51b217ed Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Sun, 12 May 2019 16:45:48 -0700 Subject: [PATCH 14/19] docs: Describe implementation of remote interface --- docs/remote-interface.md | 67 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 docs/remote-interface.md diff --git a/docs/remote-interface.md b/docs/remote-interface.md new file mode 100644 index 0000000..26d6e22 --- /dev/null +++ b/docs/remote-interface.md @@ -0,0 +1,67 @@ +# Remote interface +## Introduction + +``` + +-----------------------------------------------+ + | remote interface | + | +----------------------------------------+ | + | +----------------------------------------+| | + | +----------------------------------------+|+ | + | | RabbitMQ-provided RPC |+ | + | +----------------------------------------+ | + | | ^ | + +------------|---------------------|------------+ + +------------|---------------------|------------+ + | | RabbitMQ broker | | + | | | | + | | +------------------+ | + | v +------------------+| | + | +------------------+ +------------------+|| | + | | incoming queue | | private queue ||| | + | | | | ||| | + | | - trytes | | - PoW result ||| | + | | - mwm | | ||+ | + | | | | |+ | + | +------------------+ +------------------+ | + | | ^ | + +------------|---------------------|------------+ + v | + +---------------------------------------------+ + +---------------------------------------------+| + +---------------------------------------------+|+ + | remote worker |+ + +---------------------------------------------+ +``` +To support asynchronous remote procedure call, remote interface in dcurl provides an 
interface named `Remote_ImplContext` to implement it. dcurl currently uses the RabbitMQ C client to implement asynchronous RPC in the remote interface. The remote interface provides thread management to support an asynchronous RPC per thread. + +The RabbitMQ-provided RPC pattern is implemented as follows: +* Asynchronous RPC requests are inserted into the message queue, `incoming_queue`, in the RabbitMQ broker +* Asynchronous RPCs use exclusive private queues (callback queues) with the property TTL = 10s +* Correlation ID is not used +* An asynchronous RPC uses a connection to the RabbitMQ broker +* Remote workers can obtain requests from `incoming_queue` through the default exchange of the RabbitMQ broker + +## How to test remote interface on localhost +You need to open three terminals. + +Terminal 1: Run the RabbitMQ broker. You can quickly use Docker to run the RabbitMQ broker image, `rabbitmq`: +``` +$ sudo docker run -d rabbitmq +``` + +Terminal 2: Run remote workers +``` +$ ./build/remote-worker +``` +How to build the remote worker on an FPGA board +``` +$ make BUILD_REMOTE=1 BUILD_FPGA_ACCEL=1 BOARD=de10nano +``` + +Terminal 3: Run check +``` +$ make BUILD_REMOTE=1 BUILD_DEBUG=1 check +``` + +## Requirements +The remote interface requires a RabbitMQ broker From ab94edb383181d80af1c8a452ec8139cae36844e Mon Sep 17 00:00:00 2001 From: ajubuntu Date: Tue, 4 Jun 2019 20:46:20 -0700 Subject: [PATCH 15/19] refactor: Change blocked RPC to non-blocked RPC When no remote worker is workable, blocked RPCs do not get RPC results until remote workers become workable. Therefore, the remote interface changes the blocked RPC to a non-blocked RPC with a 10s timeout. When the timeout occurs, the remote interface falls back to running local dcurl. Passes the test with a broker and remote interface but without a remote worker. 
Close #161 --- src/remote_common.c | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/src/remote_common.c b/src/remote_common.c index d9dd902..9a39669 100644 --- a/src/remote_common.c +++ b/src/remote_common.c @@ -7,6 +7,7 @@ #include "remote_common.h" #include #include +#include #include "common.h" bool die_on_amqp_error(amqp_rpc_reply_t x, char const *context) @@ -186,7 +187,12 @@ bool wait_response_message(amqp_connection_state_t *conn, * AMQP_FRAME_BODY*/ for (;;) { amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + struct timeval t = {10, 0}; /* RPC timeout: 10s*/ + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait method frame")) return false; @@ -209,7 +215,11 @@ bool wait_response_message(amqp_connection_state_t *conn, #endif amqp_maybe_release_buffers(*conn); - amqp_simple_wait_frame(*conn, &frame); + + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait header frame")) return false; @@ -230,7 +240,10 @@ bool wait_response_message(amqp_connection_state_t *conn, body_target = (size_t) frame.payload.properties.body_size; body_received = 0; while (body_received < body_target) { - amqp_simple_wait_frame(*conn, &frame); + if (!die_on_error(amqp_simple_wait_frame_noblock(*conn, &frame, &t), + "RPC timeout")) + return false; + if (!die_on_amqp_error(amqp_get_rpc_reply(*conn), "Wait body frame")) return false; From 07107ac1762b9205e053dbf00907ee4c0ddffcc0 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Wed, 5 Jun 2019 14:10:32 +0800 Subject: [PATCH 16/19] fix: Fix the error detected by undefined behavior Sanitizer Allocate enough space for loading the string as an 128-bit data. Close #157. 
--- src/trinary_sse42.h | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/trinary_sse42.h b/src/trinary_sse42.h index 5c60c19..33f3c3e 100644 --- a/src/trinary_sse42.h +++ b/src/trinary_sse42.h @@ -5,6 +5,7 @@ #include "constants.h" #define BLOCK_8BIT(type) (sizeof(type) / sizeof(int8_t)) +#define BYTE_OF_128BIT 16 #define COMMA0 #define COMMA1 , #define COMMA(x) COMMA##x @@ -61,7 +62,7 @@ static inline bool validateTrytes_sse42(Trobject_t *trytes) { const int block_8bit = BLOCK_8BIT(__m128i); /* Characters from 'A' to 'Z' and '9' to '9' */ - const char *range = "AZ99"; + const char range[BYTE_OF_128BIT] = "AZ99"; __m128i pattern = _mm_loadu_si128((__m128i *) (range)); /* The for loop handles the group of the 128-bit characters without the * end-of-string */ @@ -265,23 +266,23 @@ static inline Trobject_t *trits_from_trytes_sse42(Trobject_t *trytes) /* The set and range for indicating the trits value (0, 1, -1) * of the corresponding trytes */ /* '9', 'C', 'F', 'I', 'L', 'O', 'R', 'U', 'X' */ - const char *setLowTrit0 = "9CFILORUX"; + const char setLowTrit0[BYTE_OF_128BIT] = "9CFILORUX"; /* 'A', 'D', 'G', 'J', 'M', 'P', 'S', 'V', 'Y' */ - const char *setLowTritP1 = "ADGJMPSVY"; + const char setLowTritP1[BYTE_OF_128BIT] = "ADGJMPSVY"; /* 'B', 'E', 'H', 'K', 'N', 'Q', 'T', 'W', 'Z' */ - const char *setLowTritN1 = "BEHKNQTWZ"; + const char setLowTritN1[BYTE_OF_128BIT] = "BEHKNQTWZ"; /* '9', 'A', 'H', 'I', 'J', 'Q', 'R', 'S', 'Z' */ - const char *rangeMidTrit0 = "99AAHJQSZZ"; + const char rangeMidTrit0[BYTE_OF_128BIT] = "99AAHJQSZZ"; /* 'B', 'C', 'D', 'K', 'L', 'M', 'T', 'U', 'V' */ - const char *rangeMidTritP1 = "BDKMTV"; + const char rangeMidTritP1[BYTE_OF_128BIT] = "BDKMTV"; /* 'E', 'F', 'G', 'N', 'O', 'P', 'W', 'X', 'Y' */ - const char *rangeMidTritN1 = "EGNPWY"; + const char rangeMidTritN1[BYTE_OF_128BIT] = "EGNPWY"; /* '9', 'A', 'B', 'C', 'D', 'W', 'X', 'Y', 'Z' */ - const char *rangeHighTrit0 = "99ADWZ"; + const char 
rangeHighTrit0[BYTE_OF_128BIT] = "99ADWZ"; /* 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M' */ - const char *rangeHighTritP1 = "EM"; + const char rangeHighTritP1[BYTE_OF_128BIT] = "EM"; /* 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V' */ - const char *rangeHighTritN1 = "NV"; + const char rangeHighTritN1[BYTE_OF_128BIT] = "NV"; /* Convert the char array to the 128-bit data */ const __m128i patternLowTrit0 = _mm_loadu_si128((__m128i *) (setLowTrit0)); const __m128i patternLowTritP1 = From eef03bb12910ba4115680b3179d80e3419e62750 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Wed, 5 Jun 2019 22:29:25 +0800 Subject: [PATCH 17/19] CI: Allow CI to exit when detecting undefined behavior --- mk/sanitizers.mk | 3 +++ 1 file changed, 3 insertions(+) diff --git a/mk/sanitizers.mk b/mk/sanitizers.mk index 34f8ab8..b2f862d 100644 --- a/mk/sanitizers.mk +++ b/mk/sanitizers.mk @@ -17,5 +17,8 @@ ifeq ("$(LOWER_VER)","$(SANITIZER_MIN_GCC_VER)") ifneq ("$(SANITIZER)","") CFLAGS += -fsanitize=$(SANITIZER) LDFLAGS += -fsanitize=$(SANITIZER) + ifeq ("$(SANITIZER)","undefined") + CFLAGS += -fno-sanitize-recover + endif endif endif From 2867d2e31fbd989e3627ac8ee1a816059da4c0e7 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Thu, 6 Jun 2019 14:10:04 +0800 Subject: [PATCH 18/19] test: Modify the difficulty and resource of testcases Change the mwm from 9 (testnet) to 14 (mainnet). Use a fixed number of threads instead of the maximum available threads. The fixed number is the limit. If the available CPU cores are fewer than the limit value, dcurl uses the maximum available threads. Otherwise, the limit number of threads is used. The thread number is changed to avoid exhausting hardware resources. 
--- tests/test-dcurl.c | 6 +++--- tests/test-multi-pow.c | 2 +- tests/test-pow.c | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/test-dcurl.c b/tests/test-dcurl.c index 040cb2e..fcee3bd 100644 --- a/tests/test-dcurl.c +++ b/tests/test-dcurl.c @@ -47,12 +47,12 @@ int main() "9999999999999999999999999999999999999999999999999999999999999999999999" "9999999999999"; - int mwm = 9; + int mwm = 14; for (int loop_count = 0; loop_count < LOOP_MAX; loop_count++) { - /* test dcurl Implementation with mwm = 9 */ + /* test dcurl Implementation with mwm = 14 */ dcurl_init(); - int8_t *ret_trytes = dcurl_entry((int8_t *) trytes, mwm, 0); + int8_t *ret_trytes = dcurl_entry((int8_t *) trytes, mwm, 8); assert(ret_trytes); dcurl_destroy(); diff --git a/tests/test-multi-pow.c b/tests/test-multi-pow.c index f3d3efe..ac9a876 100644 --- a/tests/test-multi-pow.c +++ b/tests/test-multi-pow.c @@ -17,7 +17,7 @@ void *dcurl_entry_cb(void *arg) dcurl_item *item = (dcurl_item *) arg; /* test dcurl Implementation with mwm = 14 */ int8_t *ret_trytes = - dcurl_entry(item->input_trytes, item->mwm, 0); + dcurl_entry(item->input_trytes, item->mwm, 8); assert(ret_trytes && "dcurl_entry() failed"); memcpy(item->output_trytes, ret_trytes, TRANSACTION_TRYTES_LENGTH); free(ret_trytes); diff --git a/tests/test-pow.c b/tests/test-pow.c index a53310d..522a169 100644 --- a/tests/test-pow.c +++ b/tests/test-pow.c @@ -143,7 +143,7 @@ int main() /* test implementation with mwm = 14 */ initializeImplContext(PoW_Context_ptr); void *pow_ctx = - getPoWContext(PoW_Context_ptr, (int8_t *) trytes, mwm, 0); + getPoWContext(PoW_Context_ptr, (int8_t *) trytes, mwm, 8); assert(pow_ctx); for (int count = 0; count < pow_total; count++) { From 1624c2a7d3e97f84ac2d985099fdd81bc1bbb866 Mon Sep 17 00:00:00 2001 From: marktwtn Date: Wed, 19 Jun 2019 12:18:27 +0800 Subject: [PATCH 19/19] Release version 0.4.0 --- Makefile | 2 +- src/common.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff 
--git a/Makefile b/Makefile index 749d45e..6c4d100 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -VERSION = 0.3.0 +VERSION = 0.4.0 OUT ?= ./build SRC := src diff --git a/src/common.h b/src/common.h index d064dc5..c675118 100644 --- a/src/common.h +++ b/src/common.h @@ -7,7 +7,7 @@ #include #define __DCURL_MAJOR__ 0 -#define __DCURL_MINOR__ 1 +#define __DCURL_MINOR__ 4 #define __DCURL_PATCH__ 0 double diff_in_second(struct timespec t1, struct timespec t2);