Skip to content

Commit

Permalink
ethereum: Support Celo block reward events
Browse files Browse the repository at this point in the history
  • Loading branch information
leoyvens committed Jul 30, 2021
1 parent 0308f44 commit 1575f3a
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 9 deletions.
17 changes: 15 additions & 2 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ lazy_static! {
.expect("invalid GRAPH_ETHEREUM_TARGET_TRIGGERS_PER_BLOCK_RANGE");
}

/// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320
const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320];

pub struct Chain {
logger_factory: LoggerFactory,
name: String,
Expand Down Expand Up @@ -161,7 +164,7 @@ impl Blockchain for Chain {
Ok(Arc::new(adapter))
}

fn new_block_stream(
async fn new_block_stream(
&self,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
Expand Down Expand Up @@ -194,6 +197,16 @@ impl Blockchain for Chain {
self.name, requirements
));

// Special case: Detect Celo and set the threshold to 0, so that eth_getLogs is always used.
// This is ok because Celo blocks are always final. And we _need_ to do this because
// some events appear only in eth_getLogs but not in transaction receipts.
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
let chain_id = self.eth_adapters.cheapest().unwrap().chain_id().await?;
let reorg_threshold = match CELO_CHAIN_IDS.contains(&chain_id) {
false => self.reorg_threshold,
true => 0,
};

Ok(BlockStream::new(
writable,
chain_store,
Expand All @@ -203,7 +216,7 @@ impl Blockchain for Chain {
deployment.hash,
filter,
start_blocks,
self.reorg_threshold,
reorg_threshold,
logger,
metrics,
*MAX_BLOCK_RANGE_SIZE,
Expand Down
23 changes: 18 additions & 5 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::BTreeMap;
use std::str::FromStr;
use std::{convert::TryFrom, sync::Arc};
use tiny_keccak::keccak256;
use web3::types::Log;
use web3::types::{Log, Transaction};

use graph::{
blockchain::{self, Blockchain, DataSource as _},
Expand Down Expand Up @@ -484,15 +484,28 @@ impl DataSource {
)
);

let transaction = Arc::new(
// Special case: In Celo, there are Epoch Rewards events, which do not have an
// associated transaction and instead have `transaction_hash == block.hash`,
// in which case we pass a dummy transaction to the mappings.
// See also ca0edc58-0ec5-4c89-a7dd-2241797f5e50.
let transaction = if log.transaction_hash != block.hash {
block
.transaction_for_log(&log)
.context("Found no transaction for event")?,
);
.context("Found no transaction for event")?
} else {
// Infer some fields from the log and fill the rest with zeros.
Transaction {
hash: log.transaction_hash.unwrap(),
block_hash: block.hash,
block_number: block.number,
transaction_index: log.transaction_index,
..Transaction::default()
}
};

Ok(Some(MappingTrigger::Log {
block,
transaction,
transaction: Arc::new(transaction),
log: log.cheap_clone(),
params,
handler: event_handler,
Expand Down
4 changes: 4 additions & 0 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,10 @@ impl EthereumAdapter {
.collect(),
)
}

pub async fn chain_id(&self) -> Result<u64, Error> {
Ok(u64::try_from(self.web3.eth().chain_id().compat().await?).unwrap())
}
}

#[async_trait]
Expand Down
3 changes: 2 additions & 1 deletion core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,8 @@ where
ctx.state.filter.clone(),
ctx.block_stream_metrics.clone(),
ctx.inputs.unified_api_version.clone(),
)?
)
.await?
.map_err(CancelableError::Error)
.cancelable(&block_stream_canceler, || CancelableError::Cancel)
.compat();
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ pub trait Blockchain: Debug + Sized + Send + Sync + 'static {
stopwatch_metrics: StopwatchMetrics,
) -> Result<Arc<Self::TriggersAdapter>, Error>;

fn new_block_stream(
async fn new_block_stream(
&self,
deployment: DeploymentLocator,
start_blocks: Vec<BlockNumber>,
Expand Down

0 comments on commit 1575f3a

Please sign in to comment.