diff --git a/config/config.json b/config/config.json
index 6f687077a..64d5fb3f0 100644
--- a/config/config.json
+++ b/config/config.json
@@ -77,7 +77,14 @@
                 "port": "3306",
                 "host": "localhost",
                 "dialect": "mysql",
-                "logging": false
+                "logging": false,
+                "pool": {
+                    "max": 120,
+                    "min": 0,
+                    "acquire": 60000,
+                    "idle": 10000,
+                    "evict": 1000
+                }
             }
         }
     }
@@ -260,7 +267,14 @@
                 "port": "3306",
                 "host": "localhost",
                 "dialect": "mysql",
-                "logging": false
+                "logging": false,
+                "pool": {
+                    "max": 120,
+                    "min": 0,
+                    "acquire": 60000,
+                    "idle": 10000,
+                    "evict": 1000
+                }
             }
         }
     }
@@ -413,7 +427,14 @@
                 "port": "3306",
                 "host": "localhost",
                 "dialect": "mysql",
-                "logging": false
+                "logging": false,
+                "pool": {
+                    "max": 120,
+                    "min": 0,
+                    "acquire": 60000,
+                    "idle": 10000,
+                    "evict": 1000
+                }
             }
         }
     }
@@ -597,7 +618,14 @@
                 "port": "3306",
                 "host": "localhost",
                 "dialect": "mysql",
-                "logging": false
+                "logging": false,
+                "pool": {
+                    "max": 120,
+                    "min": 0,
+                    "acquire": 60000,
+                    "idle": 10000,
+                    "evict": 1000
+                }
             }
         }
     }
@@ -781,7 +809,14 @@
                 "port": "3306",
                 "host": "localhost",
                 "dialect": "mysql",
-                "logging": false
+                "logging": false,
+                "pool": {
+                    "max": 120,
+                    "min": 0,
+                    "acquire": 60000,
+                    "idle": 10000,
+                    "evict": 1000
+                }
             }
         }
     }
diff --git a/ot-node.js b/ot-node.js
index 5614b42e3..4ce41e5af 100644
--- a/ot-node.js
+++ b/ot-node.js
@@ -89,6 +89,18 @@ class OTNode {
             this.logger,
             this.config,
         );
+
+        MigrationExecutor.executeServiceAgreementPruningMigration(
+            this.container,
+            this.logger,
+            this.config,
+        );
+
+        MigrationExecutor.executeRemoveDuplicateServiceAgreementMigration(
+            this.container,
+            this.logger,
+            this.config,
+        );
     }
 
     checkNodeVersion() {
diff --git a/package-lock.json b/package-lock.json
index 135deab96..bc8afa769 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "origintrail_node",
-  "version": "6.5.0",
+  "version": "6.5.1",
   "lockfileVersion": 2,
   "requires": true,
   "packages": {
     "": {
       "name": "origintrail_node",
-      "version": "6.5.0",
+      "version": "6.5.1",
       "license": "ISC",
       "dependencies": {
         "@comunica/query-sparql": "^2.4.3",
diff --git a/package.json b/package.json
index bb88a9c32..e7f54dafa 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
 {
   "name": "origintrail_node",
-  "version": "6.5.0",
+  "version": "6.5.1",
   "description": "OTNode V6",
   "main": "index.js",
   "type": "module",
diff --git a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js
index baea3a9ec..7932f684c 100644
--- a/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js
+++ b/src/commands/protocols/common/epoch-check/blockchain-epoch-check-command.js
@@ -51,10 +51,15 @@ class BlockchainEpochCheckCommand extends Command {
             command.period,
         );
 
+        const numberOfBlockchains = this.blockchainModuleManager.getImplementationNames().length;
+        // We don't expect to have this many transactions in one epoch check window.
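+        // The cap is divided across the configured blockchains so that a single chain
+        // cannot fill the whole command queue on its own.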
         // This is just to make sure we don't schedule too many commands and block the queue
         // TODO: find general solution for all commands scheduling blockchain transactions
-        totalTransactions = Math.min(totalTransactions, COMMAND_QUEUE_PARALLELISM * 0.3);
+        totalTransactions = Math.min(
+            totalTransactions,
+            Math.floor(COMMAND_QUEUE_PARALLELISM / numberOfBlockchains),
+        );
 
         const transactionQueueLength =
             this.blockchainModuleManager.getTotalTransactionQueueLength(blockchain);
diff --git a/src/constants/constants.js b/src/constants/constants.js
index ea771a96f..e06bf467a 100644
--- a/src/constants/constants.js
+++ b/src/constants/constants.js
@@ -555,7 +555,7 @@ export const ARCHIVE_UPDATE_RESPONSES_FOLDER = 'update_responses';
 
 /**
  * How many commands will run in parallel
  * @type {number}
  */
-export const COMMAND_QUEUE_PARALLELISM = 100;
+export const COMMAND_QUEUE_PARALLELISM = 150;
 
 export const GET_LATEST_SERVICE_AGREEMENT_BATCH_SIZE = 50;
diff --git a/src/migration/migration-executor.js b/src/migration/migration-executor.js
index 0c84318a6..77469b986 100644
--- a/src/migration/migration-executor.js
+++ b/src/migration/migration-executor.js
@@ -18,6 +18,8 @@ import MarkStakingEventsAsProcessedMigration from './mark-staking-events-as-proc
 import RemoveServiceAgreementsForChiadoMigration from './remove-service-agreements-for-chiado-migration.js';
 import MultipleOpWalletsUserConfigurationMigration from './multiple-op-wallets-user-configuration-migration.js';
 import GetOldServiceAgreementsMigration from './get-old-service-agreements-migration.js';
+import ServiceAgreementPruningMigration from './service-agreement-pruning-migration.js';
+import RemoveDuplicateServiceAgreementMigration from './remove-duplicate-service-agreement-migration.js';
 
 class MigrationExecutor {
     static async executePullShardingTableMigration(container, logger, config) {
@@ -442,6 +444,64 @@
         }
     }
 
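+    // Removes service agreements tied to asset storage contracts other than the first
+    // configured one for each blockchain; skipped in development and test environments.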
+    static async executeServiceAgreementPruningMigration(container, logger, config) {
+        if (
+            process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT ||
+            process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST
+        )
+            return;
+
+        const repositoryModuleManager = container.resolve('repositoryModuleManager');
+        const blockchainModuleManager = container.resolve('blockchainModuleManager');
+        const serviceAgreementService = container.resolve('serviceAgreementService');
+
+        const migration = new ServiceAgreementPruningMigration(
+            'serviceAgreementPruningMigration',
+            logger,
+            config,
+            repositoryModuleManager,
+            blockchainModuleManager,
+            serviceAgreementService,
+        );
+        if (!(await migration.migrationAlreadyExecuted())) {
+            try {
+                await migration.migrate();
+            } catch (error) {
+                logger.error(
+                    `Unable to execute service agreement pruning migration. Error: ${error.message}`,
+                );
+            }
+        }
+    }
+
+    static async executeRemoveDuplicateServiceAgreementMigration(container, logger, config) {
+        if (
+            process.env.NODE_ENV === NODE_ENVIRONMENTS.DEVELOPMENT ||
+            process.env.NODE_ENV === NODE_ENVIRONMENTS.TEST
+        )
+            return;
+
+        const repositoryModuleManager = container.resolve('repositoryModuleManager');
+        const blockchainModuleManager = container.resolve('blockchainModuleManager');
+
+        const migration = new RemoveDuplicateServiceAgreementMigration(
+            'removeDuplicateServiceAgreementMigration',
+            logger,
+            config,
+            repositoryModuleManager,
+            blockchainModuleManager,
+        );
+        if (!(await migration.migrationAlreadyExecuted())) {
+            try {
+                await migration.migrate();
+            } catch (error) {
+                logger.error(
+                    `Unable to execute remove duplicate service agreement migration. Error: ${error.message}`,
+                );
+            }
+        }
+    }
+
     static exitNode(code = 0) {
         process.exit(code);
     }
 }
diff --git a/src/migration/remove-duplicate-service-agreement-migration.js b/src/migration/remove-duplicate-service-agreement-migration.js
new file mode 100644
index 000000000..489596173
--- /dev/null
+++ b/src/migration/remove-duplicate-service-agreement-migration.js
@@ -0,0 +1,47 @@
+import BaseMigration from './base-migration.js';
+
+class RemoveDuplicateServiceAgreementMigration extends BaseMigration {
+    constructor(migrationName, logger, config, repositoryModuleManager, blockchainModuleManager) {
+        super(migrationName, logger, config);
+        this.repositoryModuleManager = repositoryModuleManager;
+        this.blockchainModuleManager = blockchainModuleManager;
+    }
+
+    async executeMigration() {
+        const blockchainIds = this.blockchainModuleManager.getImplementationNames();
+
+        for (const blockchainId of blockchainIds) {
+            const incorrectServiceAgreementId = [];
+            const duplicateTokenIdsResult =
+                // eslint-disable-next-line no-await-in-loop
+                await this.repositoryModuleManager.findDuplicateServiceAgreements(blockchainId);
+            const duplicateTokenIds = duplicateTokenIdsResult.map((t) => t.dataValues.token_id);
+            const findDuplicateServiceAgreements =
+                // eslint-disable-next-line no-await-in-loop
+                await this.repositoryModuleManager.findServiceAgreementsByTokenIds(
+                    duplicateTokenIds,
+                    blockchainId,
+                );
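+            // Agreements whose assertion id does not match the on-chain value for their state
+            // index (or whose on-chain lookup fails) are collected for removal.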
+            for (const serviceAgreement of findDuplicateServiceAgreements) {
+                try {
+                    const blockchainAssertionId =
+                        // eslint-disable-next-line no-await-in-loop
+                        await this.blockchainModuleManager.getAssertionIdByIndex(
+                            blockchainId,
+                            serviceAgreement.assetStorageContractAddress,
+                            serviceAgreement.tokenId,
+                            serviceAgreement.stateIndex,
+                        );
+                    if (serviceAgreement.assertionId !== blockchainAssertionId) {
+                        incorrectServiceAgreementId.push(serviceAgreement.agreementId);
+                    }
+                } catch (error) {
+                    incorrectServiceAgreementId.push(serviceAgreement.agreementId);
+                }
+            }
+            // eslint-disable-next-line no-await-in-loop
+            await this.repositoryModuleManager.removeServiceAgreements(incorrectServiceAgreementId);
+        }
+    }
+}
+export default RemoveDuplicateServiceAgreementMigration;
diff --git a/src/migration/service-agreement-pruning-migration.js b/src/migration/service-agreement-pruning-migration.js
new file mode 100644
index 000000000..c29ba4de1
--- /dev/null
+++ b/src/migration/service-agreement-pruning-migration.js
@@ -0,0 +1,46 @@
+import BaseMigration from './base-migration.js';
+
+class ServiceAgreementPruningMigration extends BaseMigration {
+    constructor(
+        migrationName,
+        logger,
+        config,
+        repositoryModuleManager,
+        blockchainModuleManager,
+        serviceAgreementService,
+    ) {
+        super(migrationName, logger, config);
+        this.repositoryModuleManager = repositoryModuleManager;
+        this.blockchainModuleManager = blockchainModuleManager;
+        this.serviceAgreementService = serviceAgreementService;
+    }
+
+    async executeMigration() {
+        const blockchainIds = this.blockchainModuleManager.getImplementationNames();
+
+        // eslint-disable-next-line no-await-in-loop
+        for (const blockchainId of blockchainIds) {
+            const assetStorageContractAddresses =
+                // eslint-disable-next-line no-await-in-loop
+                await this.blockchainModuleManager.getAssetStorageContractAddresses(blockchainId);
+
+            const countOfServiceAgreementsToBeRemoved =
+                // eslint-disable-next-line no-await-in-loop
+                await this.repositoryModuleManager.getCountOfServiceAgreementsByBlockchainAndContract(
+                    blockchainId,
+                    assetStorageContractAddresses[0],
+                );
+
+            // removeServiceAgreementsByBlockchainAndContract deletes in batches of 100_000
+            const numberOfIteration = Math.ceil(countOfServiceAgreementsToBeRemoved / 100_000);
+            for (let i = 0; i < numberOfIteration; i += 1) {
+                // eslint-disable-next-line no-await-in-loop
+                await this.repositoryModuleManager.removeServiceAgreementsByBlockchainAndContract(
+                    blockchainId,
+                    assetStorageContractAddresses[0],
+                );
+            }
+        }
+    }
+}
+export default ServiceAgreementPruningMigration;
diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js
index 6ea134353..19d1eeb9d 100644
--- a/src/modules/network/implementation/libp2p-service.js
+++ b/src/modules/network/implementation/libp2p-service.js
@@ -397,18 +397,17 @@ class Libp2pService {
         let readResponseStart;
         let readResponseEnd;
         let response;
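+        // Keep a named reference to the abort handler so the same function instance can
+        // later be passed to removeEventListener; an inline listener cannot be removed.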
+        const abortSignalEventListener = async () => {
+            stream.abort();
+            response = null;
+        };
         const timeoutController = new TimeoutController(timeout);
         try {
             readResponseStart = Date.now();
 
-            timeoutController.signal.addEventListener(
-                'abort',
-                async () => {
-                    stream.abort();
-                    response = null;
-                },
-                { once: true },
-            );
+            timeoutController.signal.addEventListener('abort', abortSignalEventListener, {
+                once: true,
+            });
 
             response = await this._readMessageFromStream(
                 stream,
@@ -420,12 +419,12 @@
                 throw Error('Message timed out!');
             }
 
-            timeoutController.signal.removeEventListener('abort');
+            timeoutController.signal.removeEventListener('abort', abortSignalEventListener);
             timeoutController.clear();
             readResponseEnd = Date.now();
         } catch (error) {
-            timeoutController.signal.removeEventListener('abort');
+            timeoutController.signal.removeEventListener('abort', abortSignalEventListener);
             timeoutController.clear();
             readResponseEnd = Date.now();
diff --git a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js
index 062a6876e..844a523dc 100644
--- a/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js
+++ b/src/modules/repository/implementation/sequelize/repositories/service-agreement-repository.js
@@ -190,7 +190,7 @@
                 ['scoreFunctionId', 'DESC'],
                 [Sequelize.col('timeLeftInSubmitCommitWindow'), 'ASC'],
             ],
-            limit: 100,
+            limit: 500,
             raw: true,
         });
     }
@@ -249,7 +249,7 @@
                 ['scoreFunctionId', 'DESC'],
                 [Sequelize.col('timeLeftInSubmitProofWindow'), 'ASC'],
             ],
-            limit: 100,
+            limit: 500,
             raw: true,
         });
     }
@@ -286,6 +286,51 @@
             },
         });
     }
+
+    async getCountOfServiceAgreementsByBlockchainAndContract(blockchainId, contract) {
+        return this.model.count({
+            where: {
+                blockchainId,
+                assetStorageContractAddress: {
+                    [Sequelize.Op.ne]: contract,
+                },
+            },
+        });
+    }
+
+    // Sequelize destroy method doesn't support limit
+    async removeServiceAgreementsByBlockchainAndContract(blockchainId, contract) {
+        const query = `
+            DELETE FROM service_agreement
+            WHERE blockchain_id = '${blockchainId}'
+            AND asset_storage_contract_address != '${contract}'
+            LIMIT 100000;
+        `;
+        await this.sequelize.query(query, {
+            type: Sequelize.QueryTypes.DELETE,
+        });
+    }
+
+    async findDuplicateServiceAgreements(blockchainId) {
+        return this.model.findAll({
+            attributes: ['token_id', [Sequelize.fn('COUNT', Sequelize.col('*')), 'count']],
+            where: {
+                blockchain_id: `${blockchainId}`,
+            },
+            group: ['token_id'],
+            having: Sequelize.literal('count > 1'),
+        });
+    }
+
+    async findServiceAgreementsByTokenIds(tokenIds, blockchainId) {
+        return this.model.findAll({
+            where: {
+                tokenId: { [Sequelize.Op.in]: tokenIds },
+                blockchainId,
+            },
+            order: [['token_id']],
+        });
+    }
 }
 
 export default ServiceAgreementRepository;
diff --git a/src/modules/repository/repository-module-manager.js b/src/modules/repository/repository-module-manager.js
index 30ad35a16..eea9d4983 100644
--- a/src/modules/repository/repository-module-manager.js
+++ b/src/modules/repository/repository-module-manager.js
@@ -403,6 +403,22 @@ class RepositoryModuleManager extends BaseModuleManager {
         }
     }
 
+    async getCountOfServiceAgreementsByBlockchainAndContract(blockchainId, contract) {
+        if (this.initialized) {
+            return this.getRepository(
+                'service_agreement',
+            ).getCountOfServiceAgreementsByBlockchainAndContract(blockchainId, contract);
+        }
+    }
+
+    async removeServiceAgreementsByBlockchainAndContract(blockchainId, contract) {
+        if (this.initialized) {
+            return this.getRepository(
+                'service_agreement',
+            ).removeServiceAgreementsByBlockchainAndContract(blockchainId, contract);
+        }
+    }
+
     async getEligibleAgreementsForSubmitCommit(
         timestampSeconds,
         blockchain,
@@ -473,6 +489,17 @@
         );
     }
 
+    async findDuplicateServiceAgreements(blockchainId) {
+        return this.getRepository('service_agreement').findDuplicateServiceAgreements(blockchainId);
+    }
+
+    async findServiceAgreementsByTokenIds(tokenIds, blockchainId) {
+        return this.getRepository('service_agreement').findServiceAgreementsByTokenIds(
+            tokenIds,
+            blockchainId,
+        );
+    }
+
     async createParanetRecord(name, description, paranetId, blockchainId) {
         this.getRepository('paranet').createParanetRecord(
             name,