From 4672edf617cd2fcf2115b0156a1b4f4f3f40b1a5 Mon Sep 17 00:00:00 2001 From: Alexander Sedelnikov Date: Fri, 2 Feb 2018 17:46:46 +0700 Subject: [PATCH] Add ability to process all transactions from blockchain. Closes #16. --- src/entities/user.ts | 3 + src/helpers/helpers.ts | 28 +++ src/services/app/user/user.account.app.ts | 2 +- src/services/events/web3.events.ts | 224 ++++++++++++++++--- src/services/repositories/user.repository.ts | 13 ++ src/services/tokens/erc20token.service.ts | 4 +- 6 files changed, 236 insertions(+), 38 deletions(-) diff --git a/src/entities/user.ts b/src/entities/user.ts index 241a38d..63fb9d4 100644 --- a/src/entities/user.ts +++ b/src/entities/user.ts @@ -9,6 +9,9 @@ import { VerifyMethod } from './verify.action'; @Index('user_email', () => ({ email: 1 }), { unique: true }) +@Index('user_wallets', () => ({ + 'wallets.address': 1 +}), { unique: true }) export class User { @ObjectIdColumn() id: ObjectID; diff --git a/src/helpers/helpers.ts b/src/helpers/helpers.ts index 40d1a70..55b42fd 100644 --- a/src/helpers/helpers.ts +++ b/src/helpers/helpers.ts @@ -1,4 +1,5 @@ import * as LRU from 'lru-cache'; +import { number } from 'joi'; function escape(str: string): string { return str.replace(/\+/g, '-') @@ -32,6 +33,33 @@ export function chunkArray(srcArray: T[], size: number): T[][] { ); } +/** + * + * @param items + * @param chunkSize + * @param mapFunc + */ +export async function processAsyncIntRangeByChunks( + from: number, to: number, step: number, chunkSize: number, mapFunc: (item: number) => Promise +): Promise { + if (from > to) { + throw new Error('Invalid range'); + } + let data: R[] = []; + let numbers: number[]; + let j: number; + + while(from <= to) { + numbers = []; + for(j = 0; j < chunkSize && from <= to; ++j, from += step) { + numbers.push(from); + } + data = data.concat(await Promise.all(numbers.map(mapFunc))); + } + + return data; +} + /** * * @param items diff --git a/src/services/app/user/user.account.app.ts b/src/services/app/user/user.account.app.ts index ed8166a..3b3df25 100644 --- a/src/services/app/user/user.account.app.ts +++ b/src/services/app/user/user.account.app.ts @@ -84,7 +84,7 @@ export class UserAccountApplication { const existingUser = await getConnection().getMongoRepository(User).findOne({ email }); if (existingUser) { - if (!existingUser.isVerified) { + if (!existingUser.isVerified && bcrypt.compareSync(userData.password, existingUser.passwordHash)) { return this.initiateCreateAndReturnUser(existingUser, initiateVerification); } else { throw new UserExists('User already exists'); diff --git a/src/services/events/web3.events.ts b/src/services/events/web3.events.ts index 8aefb8f..4bbc8a1 100644 --- a/src/services/events/web3.events.ts +++ b/src/services/events/web3.events.ts @@ -1,19 +1,31 @@ import config from '../../config'; import { injectable, inject } from 'inversify'; const Web3 = require('web3'); +const Web3Utils = require('web3-utils'); +const Web3Abi = require('web3-eth-abi'); const net = require('net'); +import { getMongoRepository } from 'typeorm'; import { Transaction, ERC20_TRANSFER, TRANSACTION_STATUS_PENDING, TRANSACTION_STATUS_CONFIRMED, - TRANSACTION_STATUS_FAILED + TRANSACTION_STATUS_FAILED, + ETHEREUM_TRANSFER } from '../../entities/transaction'; -import { getMongoRepository } from 'typeorm'; import { TransactionRepositoryInterface, TransactionRepositoryType } from '../repositories/transaction.repository'; -import { chunkArray, processAsyncItemsByChunks } from '../../helpers/helpers'; +import { chunkArray, processAsyncItemsByChunks, processAsyncIntRangeByChunks } from '../../helpers/helpers'; import { Logger } from '../../logger'; +import { UserRepositoryType, UserRepositoryInterface } from '../repositories/user.repository'; +import { Wallet } from '../../entities/wallet'; +import { Transaction as EthTransaction, Block } from 'web3/types'; +import { toEthChecksumAddress } from '../crypto'; + +type WalletsMap = { [k: string]: Wallet[] }; +type ExtEthTransaction = EthTransaction & { + contractAddress: string; +}; export interface Web3EventInterface { } @@ -28,30 +40,54 @@ function getTxStatusByReceipt(receipt: any): string { return TRANSACTION_STATUS_FAILED; } -const CONCURRENT_PROCESS_PENDING_COUNT = 6; +const CONCURRENT_BLOCK_PROCESS_COUNT = 2; +const CONCURRENT_TRANSACTIONS_PROCESS_COUNT = 4; const TRANSACTION_CHECKING_INTERVAL_TIME: number = 15000; +// @TODO: Need to refacting /* istanbul ignore next */ @injectable() export class Web3Event implements Web3EventInterface { private logger = Logger.getInstance('WEB3_EVENT'); private web3: any; - private erc20Token: any; // @todo: remove or replace this solution by outside service or simple setTimeout/setInterval - private queueWrapper: any; private lastCheckingBlock: number = 0; + private erc20Abi: { + transfer: { + methodSignature: string; + abi: any; + }, + transferFrom: { + methodSignature: string; + abi: any; + } + }; + /** * * @param txRep */ constructor( - @inject(TransactionRepositoryType) private txRep: TransactionRepositoryInterface + @inject(TransactionRepositoryType) private txRep: TransactionRepositoryInterface, + @inject(UserRepositoryType) private userRep: UserRepositoryInterface ) { + this.erc20Abi = { + transfer: { + methodSignature: Web3Abi.encodeFunctionSignature('transfer(address,uint256)').slice(2), + abi: config.contracts.erc20Token.abi.filter(i => i.type === 'function' && i.name === 'transfer').pop() + }, + transferFrom: { + methodSignature: Web3Abi.encodeFunctionSignature('transferFrom(address,uint256,uint256)').slice(2), + abi: config.contracts.erc20Token.abi.filter(i => i.type === 'function' && i.name === 'transferFrom').pop() + } + }; + switch (config.rpc.type) { case 'ipc': this.web3 = new Web3(new Web3.providers.IpcProvider(config.rpc.address, net)); break; + case 'ws': const webSocketProvider = new Web3.providers.WebsocketProvider(config.rpc.address); @@ -62,9 +98,11 @@ export class Web3Event implements Web3EventInterface { this.web3 = new Web3(webSocketProvider); break; + case 'http': this.web3 = new Web3(config.rpc.address); break; + default: throw Error('Unknown Web3 RPC type!'); } @@ -84,8 +122,8 @@ export class Web3Event implements Web3EventInterface { const intervalExecuteMethod = () => { setTimeout(() => { - this.checkPendingTransactions() - .then(() => {}, (err) => { this.logger.error('Error was occurred', err); }) + this.checkTransactions() + .then(() => { }, (err) => { this.logger.error('Error was occurred', err); }) .then(() => { intervalExecuteMethod(); }); }, TRANSACTION_CHECKING_INTERVAL_TIME); }; @@ -95,9 +133,39 @@ export class Web3Event implements Web3EventInterface { /** * + * @param blockData */ - async checkPendingTransactions(): Promise { - this.logger.debug('Check pending transactions in blocks'); + private async getWalletMapInTransactions(transactions: ExtEthTransaction[]): Promise { + const txMaps = {}; + transactions.map(t => t.from).concat(transactions.map(t => t.to)).filter(t => t) + .forEach(t => { + txMaps[t] = 1; + }) + + const walletIds = {}; + (await this.userRep.getAllByWalletAddresses( + Object.keys(txMaps) + )).map(u => u.wallets) + .reduce((allWallets, wallets) => allWallets.concat(wallets), []) + .filter(w => txMaps[w.address]) + .forEach(w => { + walletIds[w.address] = (walletIds[w.address] || []) + walletIds[w.address].push(w); + }); + + return walletIds; + } + + private filterTransactionByWalletAddresses(walletsMap: WalletsMap, transactions: ExtEthTransaction[]): ExtEthTransaction[] { + return transactions + .filter(t => walletsMap[t.from] || walletsMap[t.to]); + } + + /** + * + */ + async checkTransactions(): Promise { + this.logger.debug('Check transactions in blocks'); if (!this.lastCheckingBlock) { this.logger.debug('Get the biggest block height value from local transactions'); @@ -120,26 +188,25 @@ export class Web3Event implements Web3EventInterface { this.lastCheckingBlock = currentBlock; } - this.logger.debug('Check blocks from', currentBlock, 'to', this.lastCheckingBlock); - // @TODO: Also should process blocks in concurrent mode - for (let i = this.lastCheckingBlock; i < currentBlock; i++) { - const blockData = await this.web3.eth.getBlock(i, true); + this.logger.debug('Check blocks from', this.lastCheckingBlock, 'to', currentBlock); + + await processAsyncIntRangeByChunks(this.lastCheckingBlock, currentBlock, 1, CONCURRENT_BLOCK_PROCESS_COUNT, async (i) => { + const blockData: Block = await this.web3.eth.getBlock(i, true); if (!(i % 10)) { this.logger.debug('Blocks processed:', i); } if (!blockData) { - continue; + return; } try { - await processAsyncItemsByChunks(blockData.transactions || [], CONCURRENT_PROCESS_PENDING_COUNT, - transaction => this.processPendingTransaction(transaction, blockData)); + await this.processTransactionsInBlock(blockData); } catch (err) { this.logger.error(err); } - } + }); this.lastCheckingBlock = currentBlock - 1; this.logger.debug('Change lastCheckingBlock to', this.lastCheckingBlock); @@ -159,40 +226,127 @@ export class Web3Event implements Web3EventInterface { this.logger.debug('Process new block headers'); - const blockData = await this.web3.eth.getBlock(data.hash, true); - const transactions = blockData.transactions; - for (let transaction of transactions) { - await this.processPendingTransaction(transaction, blockData); + this.processTransactionsInBlock(await this.web3.eth.getBlock(data.hash, true)); + } + + /** + * + * @param blockData + */ + private async processTransactionsInBlock(blockData: Block) { + if (!blockData || !blockData.transactions || !blockData.transactions.length) { + return {}; + } + + // extend transactions in block by parsing erc20 methods + const sourceTransactions: ExtEthTransaction[] = blockData.transactions.map(t => { + let contractAddress = undefined; + if (t.input.length === 2 + 8 + 64 + 64 && t.input.slice(2, 10) === this.erc20Abi.transfer.methodSignature) { + contractAddress = t.to; + const methodArgs = Web3Abi.decodeParameters(this.erc20Abi.transfer.abi.inputs, t.input.slice(10)); + t.from = t.from; + t.to = methodArgs[0]; + t.value = methodArgs[1]; + } else if (t.input.length === 2 + 8 + 64 + 64 + 64 && t.input.slice(2, 10) === this.erc20Abi.transfer.methodSignature) { + contractAddress = t.to; + const methodArgs = Web3Abi.decodeParameters(this.erc20Abi.transferFrom.abi.inputs, t.input.slice(10)); + t.from = methodArgs[0]; + t.to = methodArgs[1]; + t.value = methodArgs[2]; + } + return { + ...t, + contractAddress + } + }); + + const wallets = await this.getWalletMapInTransactions(sourceTransactions); + if (!Object.keys(wallets).length) { + return; + } + + const transactions = this.filterTransactionByWalletAddresses(wallets, sourceTransactions); + + this.logger.debug('Process transactions in block', transactions.length, 'wallets count', Object.keys(wallets).length); + + await processAsyncItemsByChunks(transactions || [], CONCURRENT_TRANSACTIONS_PROCESS_COUNT, + transaction => this.processTransaction(transaction, blockData, wallets)); + } + + private processNotRegisteredEthereumTransaction(tx: Transaction, ethTx: ExtEthTransaction) { + tx.type = ETHEREUM_TRANSFER; + delete tx.contractAddress; + tx.from = ethTx.from; + tx.to = ethTx.to; + tx.amount = Web3Utils.fromWei(ethTx.value); + } + + private processNotRegisteredContractTransaction(tx: Transaction, ethTx: ExtEthTransaction): boolean { + const methodSignature = ethTx.input.slice(2, 10); + + if (methodSignature === this.erc20Abi.transfer.methodSignature) { + tx.from = ethTx.from; + tx.to = ethTx.to; + tx.amount = ethTx.value; + } else if (methodSignature === this.erc20Abi.transfer.methodSignature) { + tx.from = ethTx.from; + tx.to = ethTx.to; + tx.amount = ethTx.value; + } else { + return false; } + + tx.type = ERC20_TRANSFER; + tx.contractAddress = ethTx.contractAddress; + + return true; } /** * * @param data */ - async processPendingTransaction(data: any, blockData: any): Promise { - const txHash = data.transactionHash || data.hash; - const tx = await this.txRep.getByHash(txHash); - if (!tx || tx.status !== TRANSACTION_STATUS_PENDING) { + async processTransaction(ethTx: ExtEthTransaction, blockData: Block, walletsMap: WalletsMap): Promise { + let tx = await this.txRep.getByHash(ethTx.hash); + // process for not registered tx-s + if (!tx) { + tx = Transaction.createTransaction({ + transactionHash: ethTx.hash, + details: JSON.stringify({ + gas: ethTx.gas, + gasPrice: Web3Utils.fromWei(ethTx.gasPrice, 'gwei') + }) + }); + if (ethTx.value && ethTx.input === '0x') { + this.logger.debug('Process a new ethereum transfer transaction', ethTx.hash); + this.processNotRegisteredEthereumTransaction(tx, ethTx); + } else { + this.logger.debug('Process a new contract transaction', ethTx.hash); + if (!this.processNotRegisteredContractTransaction(tx, ethTx)) { + this.logger.debug('Unknown contract action in transaction, skip this', ethTx.hash); + return; + } + } + } else if (tx.status !== TRANSACTION_STATUS_PENDING) { return; } - this.logger.debug('Check status of pending transaction', txHash); + this.logger.debug('Check status of transaction', ethTx.hash); - // is it need? - const transactionReceipt = await this.web3.eth.getTransactionReceipt(txHash); + const transactionReceipt = await this.web3.eth.getTransactionReceipt(ethTx.hash); if (!transactionReceipt) { return; } - blockData = blockData || await this.web3.eth.getBlock(data.blockNumber); - - this.logger.info('Process pending transaction', txHash); + blockData = blockData || await this.web3.eth.getBlock(ethTx.blockNumber); tx.status = getTxStatusByReceipt(transactionReceipt); + tx.timestamp = blockData.timestamp; tx.blockNumber = blockData.number; + this.logger.debug('Save processed transaction', tx); + await this.txRep.save(tx); } @@ -224,8 +378,8 @@ export class Web3Event implements Web3EventInterface { .on('data', (data) => this.processNewBlockHeaders(data)); // process pending transactions - this.web3.eth.subscribe('pendingTransactions') - .on('data', (txHash) => this.processPendingTransaction(txHash, null)); + // this.web3.eth.subscribe('pendingTransactions') + // .on('data', (txHash) => this.processTransaction(txHash, null, {})); } } diff --git a/src/services/repositories/user.repository.ts b/src/services/repositories/user.repository.ts index 2711de8..d8bc51e 100644 --- a/src/services/repositories/user.repository.ts +++ b/src/services/repositories/user.repository.ts @@ -4,6 +4,7 @@ import { getMongoManager } from 'typeorm'; import { User } from '../../entities/user'; export interface UserRepositoryInterface { + getAllByWalletAddresses(walletAddresses: string[]): Promise; save(u: User): Promise; } @@ -12,6 +13,18 @@ export interface UserRepositoryInterface { */ @injectable() export class UserRepository { + getAllByWalletAddresses(walletAddresses: string[]): Promise { + return getMongoManager().createEntityCursor(User, { + 'wallets.address': { + $in: walletAddresses + }, + }).project({ + 'wallets.address': 1, + 'wallets.ticker': 1 + }) + .toArray(); + } + /** * * @param u diff --git a/src/services/tokens/erc20token.service.ts b/src/services/tokens/erc20token.service.ts index 6bb4332..4a54ea7 100644 --- a/src/services/tokens/erc20token.service.ts +++ b/src/services/tokens/erc20token.service.ts @@ -27,14 +27,14 @@ export class Erc20TokenService { /** * */ - async getBalanceOf(address: string, decimals: number = 18): Promise { + async getBalanceOf(address: string, decimals: number = 0): Promise { return web3utils.fromWei( await this.erc20Token.queryMethod({ methodName: 'balanceOf', gasPrice: '0', arguments: [address] }), - decimalsToUnitMap(decimals) + decimals ? decimalsToUnitMap(decimals) : 'wei' ).toString(); }