diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index c265f19a2d9..c6ac4924434 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -53,6 +53,9 @@ lazy_static! { .expect("invalid GRAPH_ETHEREUM_TARGET_TRIGGERS_PER_BLOCK_RANGE"); } +/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 +const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320]; + pub struct Chain { logger_factory: LoggerFactory, name: String, @@ -161,7 +164,7 @@ impl Blockchain for Chain { Ok(Arc::new(adapter)) } - fn new_block_stream( + async fn new_block_stream( &self, deployment: DeploymentLocator, start_blocks: Vec, @@ -194,6 +197,16 @@ impl Blockchain for Chain { self.name, requirements )); + // Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used. + // This is ok because Celo blocks are always final. And we _need_ to do this because + // some events appear only in eth_getLogs but not in transaction receipts. + // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50. + let chain_id = self.eth_adapters.cheapest().unwrap().chain_id().await?; + let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) { + false => self.reorg_threshold, + true => 0, + }; + Ok(BlockStream::new( writable, chain_store, @@ -203,7 +216,7 @@ impl Blockchain for Chain { deployment.hash, filter, start_blocks, - self.reorg_threshold, + reorg_threshold, logger, metrics, *MAX_BLOCK_RANGE_SIZE, diff --git a/chain/ethereum/src/data_source.rs b/chain/ethereum/src/data_source.rs index 1e9a308b715..4a6feea5aad 100644 --- a/chain/ethereum/src/data_source.rs +++ b/chain/ethereum/src/data_source.rs @@ -8,7 +8,7 @@ use std::collections::BTreeMap; use std::str::FromStr; use std::{convert::TryFrom, sync::Arc}; use tiny_keccak::keccak256; -use web3::types::Log; +use web3::types::{Log, Transaction}; use graph::{ blockchain::{self, Blockchain, DataSource as _}, @@ -484,15 +484,28 @@ impl DataSource { ) ); - let transaction = Arc::new( + // Special case: In Celo, there are Epoch Rewards events, which do not have an + // associated transaction and instead have `transaction_hash == block.hash`, + // in which case we pass a dummy transaction to the mappings. + // See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50. + let transaction = if log.transaction_hash != block.hash { block .transaction_for_log(&log) - .context("Found no transaction for event")?, - ); + .context("Found no transaction for event")? + } else { + // Infer some fields from the log and fill the rest with zeros. + Transaction { + hash: log.transaction_hash.unwrap(), + block_hash: block.hash, + block_number: block.number, + transaction_index: log.transaction_index, + ..Transaction::default() + } + }; Ok(Some(MappingTrigger::Log { block, - transaction, + transaction: Arc::new(transaction), log: log.cheap_clone(), params, handler: event_handler, diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index a2e72fd627c..2442dad253e 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -790,6 +790,10 @@ impl EthereumAdapter { .collect(), ) } + + pub async fn chain_id(&self) -> Result { + Ok(u64::try_from(self.web3.eth().chain_id().compat().await?).unwrap()) + } } #[async_trait] diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 74899b7f6e9..cad88f3d68c 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -459,7 +459,8 @@ where ctx.state.filter.clone(), ctx.block_stream_metrics.clone(), ctx.inputs.unified_api_version.clone(), - )? + ) + .await? .map_err(CancelableError::Error) .cancelable(&block_stream_canceler, || CancelableError::Cancel) .compat(); diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index b86403b271c..6ffdc0ac9c2 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -97,7 +97,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + 'static { stopwatch_metrics: StopwatchMetrics, ) -> Result, Error>; - fn new_block_stream( + async fn new_block_stream( &self, deployment: DeploymentLocator, start_blocks: Vec,