Skip to content
This repository has been archived by the owner on Dec 26, 2022. It is now read-only.

Commit

Permalink
feat(db): Implement transaction reattach service
Browse files Browse the repository at this point in the history
Get all pending transactions from ScyllaDB every 5 mins. Update the status of transaction if it has been confirmed.  Preform reattachment if the transaction has been pended since 30 minutes ago.

Close #414
  • Loading branch information
YingHan-Chen authored and jserv committed Jan 16, 2020
1 parent 15db7fb commit df53b0b
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 15 deletions.
22 changes: 22 additions & 0 deletions docs/reattacher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Transaction reattacher

`Transaction reattacher` is a service that helps persistent pending transactions to be re-attached to the Tangle. A persistent transaction is a transaction that does not be confirmed more than 30 minutes.

When enabling the external database for transaction reattachment, `Tangle-Accelerator` will store transactions issued by API [Send Transfer Message](https://github.com/DLTcollab/tangle-accelerator/wiki/Send-Transfer-Message).

`Transaction reattacher` will periodically read pending transactions from a specific ScyllaDB cluster, and get the latest inclusion status of those transactions from an IOTA full node. `Reattacher` will update the newest inclusion status to the ScyllaDB cluster. For persistent transactions, `reattacher` performs reattachment, which will do tips selection and PoW for the original bundle, and reattach it to the Tangle. After reattachment, `reattacher` will update the new transaction hash to the ScyllaDB cluster.


See [docs/build.md] for more information about enabling transaction reattachment.

## Build Instructions

`bazel build //reattacher`

The reattacher support following options :

* `DB_HOST`: binding address of ScyllDB cluster
* `IRI_HOST`: binding address of IRI
* `IRI_PORT`: port of IRI

If you do not specify `DB_HOST` or `IRI_HOST`, the address will be set as `localhost`.
9 changes: 9 additions & 0 deletions reattacher/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
cc_binary(
name = "reattacher",
srcs = ["reattacher_main.c"],
deps = [
"//accelerator:ta_config",
"//storage",
"@entangled//cclient/api",
],
)
173 changes: 173 additions & 0 deletions reattacher/reattacher_main.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (C) 2020 BiiLabs Co., Ltd. and Contributors
* All Rights Reserved.
* This is free software; you can redistribute it and/or modify it under the
* terms of the MIT license. A copy of the license can be found in the file
* "LICENSE" at the root of this distribution.
*/
#include <getopt.h>
#include <sys/time.h>"
#include "accelerator/config.h"
#include "cclient/api/core/core_api.h"
#include "cclient/api/extended/extended_api.h"
#include "common/model/bundle.h"
#include "storage/ta_storage.h"

#define PRESISTENT_PENDING_SECOND 1800 /**< 30 mins */
#define DELAY_INTERVAL 300 /**< 5 mins */

#define logger_id scylladb_logger_id

static status_t init_iota_client_service(iota_client_service_t* const serv) {
if (serv == NULL) {
ta_log_error("Invalid NULL pointer\n");
return SC_TA_NULL;
}
serv->http.path = "/";
serv->http.content_type = "application/json";
serv->http.accept = "application/json";
serv->http.api_version = 1;
serv->http.ca_pem = NULL;
serv->serializer_type = SR_JSON;
if (iota_client_core_init(serv) != RC_OK) {
ta_log_error("Failed to connect to IRI.\n");
return SC_TA_OOM;
}
return SC_OK;
}

static status_t handle_pending_txn(iota_client_service_t* iota_service, db_client_service_t* db_service,
db_identity_t* obj) {
status_t ret = SC_OK;
hash243_queue_t req_txn = NULL;
get_inclusion_states_res_t* res = get_inclusion_states_res_new();
flex_trit_t trit_hash[NUM_TRITS_HASH];
char tryte_hash[NUM_TRYTES_HASH];
flex_trits_from_trytes(trit_hash, NUM_TRITS_HASH, (const tryte_t*)db_ret_identity_hash(obj), NUM_TRYTES_HASH,
NUM_TRYTES_HASH);

hash243_queue_push(&req_txn, trit_hash);

if (iota_client_get_latest_inclusion(iota_service, req_txn, res) != RC_OK ||
get_inclusion_states_res_states_count(res) != 1) {
ret = SC_CCLIENT_FAILED_RESPONSE;
ta_log_error("Failed to get inclustion status\n");
db_show_identity_info(obj);
goto exit;
}
if (get_inclusion_states_res_states_at(res, 0)) {
// confirmed transaction
ta_log_info("Find confirmed transaction\n");
db_set_identity_status(obj, CONFIRMED_TXN);
ret = db_insert_identity_table(db_service, obj);
if (ret != SC_OK) {
ta_log_error("Failed to insert identity table\n");
db_show_identity_info(obj);
goto exit;
}
} else if (db_ret_identity_time_elapsed(obj) > PRESISTENT_PENDING_SECOND) {
// reattach
ta_log_info("Reattach pending transaction\n");
db_show_identity_info(obj);
bundle_transactions_t* res_bundle_txn;
bundle_transactions_new(&res_bundle_txn);

if (iota_client_replay_bundle(iota_service, trit_hash, MILESTONE_DEPTH, MWM, NULL, res_bundle_txn) != RC_OK) {
ta_log_error("Failed to reattach to Tangle\n");
db_show_identity_info(obj);
ret = SC_CCLIENT_FAILED_RESPONSE;
goto reattach_done;
}

/**
* < get the second transaction in the bundle,
* the first transaction is the original transaction before reattachment
*/
iota_transaction_t* txn = bundle_at(res_bundle_txn, 1);
flex_trits_to_trytes((tryte_t*)tryte_hash, NUM_TRYTES_HASH, transaction_hash(txn), NUM_TRITS_HASH, NUM_TRITS_HASH);

db_set_identity_hash(obj, (cass_byte_t*)tryte_hash, NUM_TRYTES_HASH);
db_set_identity_timestamp(obj, time(NULL));

ret = db_insert_identity_table(db_service, obj);
if (ret != SC_OK) {
ta_log_error("Failed to insert identity table\n");
goto exit;
}

reattach_done:
bundle_transactions_free(&res_bundle_txn);
}

exit:
hash243_queue_free(&req_txn);
get_inclusion_states_res_free(&res);

return ret;
}

int main(int argc, char** argv) {
int optIdx;
db_client_service_t db_service;
iota_client_service_t iota_service;
db_service.host = strdup("localhost");
iota_service.http.host = "localhost";
iota_service.http.port = 14265;

const struct option longOpt[] = {{"iri_host", required_argument, NULL, 'h'},
{"iri_port", required_argument, NULL, 'p'},
{"db_host", required_argument, NULL, 'd'},
{NULL, 0, NULL, 0}};

/* Parse the command line options */
/* TODO: Support macOS since getopt_long() is GNU extension */
while (1) {
int cmdOpt = getopt_long(argc, argv, "b:", longOpt, &optIdx);
if (cmdOpt == -1) break;

/* Invalid option */
if (cmdOpt == '?') break;

if (cmdOpt == 'h') {
iota_service.http.host = optarg;
}
if (cmdOpt == 'p') {
iota_service.http.port = atoi(optarg);
}
if (cmdOpt == 'd') {
free(db_service.host);
db_service.host = strdup(optarg);
}
}
if (ta_logger_init() != SC_OK) {
ta_log_error("logger init fail\n");
return EXIT_FAILURE;
}
scylladb_logger_init();
if (db_client_service_init(&db_service, DB_USAGE_REATTACH) != SC_OK) {
ta_log_error("Failed to init db client service\n");
return EXIT_FAILURE;
}
if (init_iota_client_service(&iota_service) != SC_OK) {
ta_log_error("Failed to init iota client service\n");
return EXIT_FAILURE;
}
while (1) {
db_identity_array_t* id_array = db_identity_array_new();
db_get_identity_objs_by_status(&db_service, PENDING_TXN, id_array);
db_identity_t* itr;
IDENTITY_TABLE_ARRAY_FOREACH(id_array, itr) {
if (handle_pending_txn(&iota_service, &db_service, itr) != SC_OK) {
ta_log_warning("Failed to handle pending transaction\n");
db_show_identity_info(itr);
}
}
db_identity_array_free(&id_array);
sleep(DELAY_INTERVAL);
}

db_client_service_free(&db_service);
iota_client_core_destroy(&iota_service);
scylladb_logger_release();
return 0;
}
35 changes: 24 additions & 11 deletions storage/scylladb_identity.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,24 @@ struct db_identity_s {
CassUuid uuid;
cass_int64_t timestamp;
cass_int8_t status;
cass_byte_t hash[NUM_FLEX_TRITS_BUNDLE];
cass_byte_t hash[DB_NUM_TRYTES_HASH];
};

status_t db_show_identity_info(db_identity_t* obj) {
if (obj == NULL) {
ta_log_error("Invaild NULL pointer : obj\n");
return SC_TA_NULL;
}
char uuid_string[DB_UUID_STRING_LENGTH];
char hash[DB_NUM_TRYTES_HASH + 1];
db_get_identity_uuid_string(obj, uuid_string);
memcpy(hash, obj->hash, DB_NUM_TRYTES_HASH);
hash[DB_NUM_TRYTES_HASH] = 0;
ta_log_info("identity info\n uuid string : %s\nhash :%s\ntimestamp : %ld\ntime eclapsed : %ld\nstatus : %d\n",
uuid_string, hash, obj->timestamp, db_ret_identity_time_elapsed(obj), obj->status);
return SC_OK;
}

status_t db_identity_new(db_identity_t** obj) {
*obj = (db_identity_t*)malloc(sizeof(struct db_identity_s));
if (NULL == *obj) {
Expand Down Expand Up @@ -122,11 +137,11 @@ status_t db_set_identity_hash(db_identity_t* obj, const cass_byte_t* hash, size_
if (hash == NULL) {
ta_log_error("NULL pointer to hash to insert into identity table\n");
}
if (length != NUM_FLEX_TRITS_HASH) {
if (length != DB_NUM_TRYTES_HASH) {
ta_log_error("SC_STORAGE_INVAILD_INPUT\n");
return SC_STORAGE_INVAILD_INPUT;
}
memcpy(obj->hash, hash, NUM_FLEX_TRITS_HASH);
memcpy(obj->hash, hash, DB_NUM_TRYTES_HASH);
return SC_OK;
}

Expand Down Expand Up @@ -187,7 +202,7 @@ status_t db_insert_tx_into_identity(db_client_service_t* service, const char* ha
ta_log_error("db new identity failed\n");
goto exit;
}
if ((ret = db_set_identity_hash(identity, (cass_byte_t*)hash, NUM_FLEX_TRITS_HASH)) != SC_OK) {
if ((ret = db_set_identity_hash(identity, (cass_byte_t*)hash, DB_NUM_TRYTES_HASH)) != SC_OK) {
ta_log_error("db set identity hash failed\n");
goto exit;
}
Expand Down Expand Up @@ -251,7 +266,7 @@ status_t db_init_identity_keyspace(db_client_service_t* service, bool need_drop,
static CassStatement* ret_insert_identity_statement(const CassPrepared* prepared, const db_identity_t* obj) {
CassStatement* statement = NULL;
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes_by_name(statement, "hash", obj->hash, NUM_FLEX_TRITS_HASH);
cass_statement_bind_bytes_by_name(statement, "hash", obj->hash, DB_NUM_TRYTES_HASH);
cass_statement_bind_int8_by_name(statement, "status", obj->status);
cass_statement_bind_int64_by_name(statement, "ts", obj->timestamp);
cass_statement_bind_uuid_by_name(statement, "id", obj->uuid);
Expand All @@ -262,9 +277,7 @@ status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* o
status_t ret = SC_OK;
const CassPrepared* insert_prepared = NULL;
CassStatement* statement = NULL;
const char* insert_query =
"INSERT INTO identity (id, ts, status, hash)"
"VALUES (?, ?, ?, ?);";
const char* query = "UPDATE identity Set ts = ?, status = ?, hash =? Where id = ?";
if (service == NULL) {
ta_log_error("NULL pointer to ScyllaDB client service for connection endpoint(s)");
return SC_TA_NULL;
Expand All @@ -274,7 +287,7 @@ status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* o
return SC_TA_NULL;
}

if (prepare_query(service->session, insert_query, &insert_prepared) != CASS_OK) {
if (prepare_query(service->session, query, &insert_prepared) != CASS_OK) {
ta_log_error("%s\n", "prepare INSERT query fail");
return SC_STORAGE_CASSANDRA_QUREY_FAIL;
}
Expand Down Expand Up @@ -327,7 +340,7 @@ static status_t get_identity_array(CassSession* session, CassStatement* statemen
itr = (db_identity_t*)utarray_back(identity_array);

cass_value_get_bytes(cass_row_get_column_by_name(row, "hash"), &hash, &len);
db_set_identity_hash(itr, hash, NUM_FLEX_TRITS_HASH);
db_set_identity_hash(itr, hash, DB_NUM_TRYTES_HASH);
cass_value_get_int8(cass_row_get_column_by_name(row, "status"), &value);
db_set_identity_status(itr, value);
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
Expand Down Expand Up @@ -402,7 +415,7 @@ status_t db_get_identity_objs_by_hash(db_client_service_t* service, const cass_b
return SC_STORAGE_CASSANDRA_QUREY_FAIL;
}
statement = cass_prepared_bind(select_prepared);
cass_statement_bind_bytes_by_name(statement, "hash", hash, NUM_FLEX_TRITS_HASH);
cass_statement_bind_bytes_by_name(statement, "hash", hash, DB_NUM_TRYTES_HASH);
ret = get_identity_array(service->session, statement, identity_array);

cass_prepared_free(select_prepared);
Expand Down
12 changes: 12 additions & 0 deletions storage/scylladb_identity.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ typedef UT_array db_identity_array_t;

typedef enum { PENDING_TXN = 0, INSERTING_TXN, CONFIRMED_TXN, NUM_OF_TXN_STATUS } db_txn_status_t;

#define DB_NUM_TRYTES_HASH NUM_TRYTES_HASH

/**
* Allocate memory of db_identity_array_t
*/
Expand Down Expand Up @@ -253,6 +255,16 @@ status_t db_get_identity_objs_by_hash(db_client_service_t* service, const cass_b
*/
status_t db_insert_identity_table(db_client_service_t* service, db_identity_t* obj);

/**
* @brief show logger info for details of identity object
*
* @param[in] obj pointer to db_identity_t
* @return
* - SC_OK on success
* - SC_TA_NULL/SC_STORAGE_INVAILD_INPUT on error
*/
status_t db_show_identity_info(db_identity_t* obj);

#ifdef __cplusplus
}
#endif
Expand Down
8 changes: 4 additions & 4 deletions tests/test_scylladb.c
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ void test_db_get_identity_objs_by_status(db_client_service_t* db_client_service)
db_get_identity_uuid_string(itr, uuid_string);

TEST_ASSERT_EQUAL_STRING(uuid_string, identities[idx].uuid_string);
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (flex_trit_t*)identities[idx].hash,
sizeof(flex_trit_t) * NUM_FLEX_TRITS_HASH);
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (cass_byte_t*)identities[idx].hash,
sizeof(cass_byte_t) * DB_NUM_TRYTES_HASH);
idx++;
}
db_identity_array_free(&db_identity_array);
Expand All @@ -194,8 +194,8 @@ void test_db_get_identity_objs_by_uuid_string(db_client_service_t* db_client_ser
db_identity_t* itr;
int idx = 0;
IDENTITY_TABLE_ARRAY_FOREACH(db_identity_array, itr) {
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (flex_trit_t*)identities[idx].hash,
sizeof(flex_trit_t) * NUM_FLEX_TRITS_HASH);
TEST_ASSERT_EQUAL_MEMORY(db_ret_identity_hash(itr), (cass_byte_t*)identities[idx].hash,
sizeof(cass_byte_t) * DB_NUM_TRYTES_HASH);
TEST_ASSERT_EQUAL_INT(db_ret_identity_status(itr), identities[idx].status);
idx++;
}
Expand Down

0 comments on commit df53b0b

Please sign in to comment.