diff --git a/core/src/execution/errors.rs b/core/src/execution/errors.rs index 3f53a704..812206b8 100644 --- a/core/src/execution/errors.rs +++ b/core/src/execution/errors.rs @@ -31,6 +31,8 @@ pub enum ExecutionError { BlockReceiptsRootMismatch(BlockTag), #[error("filter not found: 0x{0:x}")] FilterNotFound(U256), + #[error("log does not match filter")] + LogFilterMismatch(), } /// Errors that can occur during evm.rs calls diff --git a/core/src/execution/mod.rs b/core/src/execution/mod.rs index 521a4cf3..53ac0c81 100644 --- a/core/src/execution/mod.rs +++ b/core/src/execution/mod.rs @@ -332,7 +332,7 @@ impl> ExecutionClient { ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(), ); } - + self.ensure_logs_match_filter(&logs, &filter).await?; self.verify_logs(&logs).await?; Ok(logs) } @@ -345,7 +345,7 @@ impl> ExecutionClient { // only concerned with filters created via helios return Err(ExecutionError::FilterNotFound(filter_id).into()); } - Some(FilterType::Logs) => { + Some(FilterType::Logs(filter)) => { // underlying RPC takes care of keeping track of changes let filter_changes = self.rpc.get_filter_changes(filter_id).await?; let logs = filter_changes.as_logs().unwrap_or(&[]); @@ -356,6 +356,7 @@ impl> ExecutionClient { ) .into()); } + self.ensure_logs_match_filter(logs, filter).await?; self.verify_logs(logs).await?; FilterChanges::Logs(logs.to_vec()) } @@ -387,14 +388,27 @@ impl> ExecutionClient { } pub async fn get_filter_logs(&self, filter_id: U256) -> Result> { - let logs = self.rpc.get_filter_logs(filter_id).await?; - if logs.len() > MAX_SUPPORTED_LOGS_NUMBER { - return Err( - ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(), - ); + let filter_type = self.state.get_filter(&filter_id).await; + + match &filter_type { + Some(FilterType::Logs(filter)) => { + let logs = self.rpc.get_filter_logs(filter_id).await?; + if logs.len() > MAX_SUPPORTED_LOGS_NUMBER { + return Err(ExecutionError::TooManyLogsToProve( + logs.len(), + MAX_SUPPORTED_LOGS_NUMBER, + ) + .into()); + } + self.ensure_logs_match_filter(&logs, filter).await?; + self.verify_logs(&logs).await?; + Ok(logs) + } + _ => { + // only concerned with filters created via helios + return Err(ExecutionError::FilterNotFound(filter_id).into()); + } } - self.verify_logs(&logs).await?; - Ok(logs) } pub async fn uninstall_filter(&self, filter_id: U256) -> Result { @@ -421,7 +435,9 @@ impl> ExecutionClient { let filter_id = self.rpc.new_filter(&filter).await?; // record the filter in the state - self.state.push_filter(filter_id, FilterType::Logs).await; + self.state + .push_filter(filter_id, FilterType::Logs(filter)) + .await; Ok(filter_id) } @@ -449,6 +465,50 @@ impl> ExecutionClient { Ok(filter_id) } + /// Ensure that each log entry in the given array of logs match the given filter. + async fn ensure_logs_match_filter(&self, logs: &[Log], filter: &Filter) -> Result<()> { + fn log_matches_filter(log: &Log, filter: &Filter) -> bool { + if let Some(block_hash) = filter.get_block_hash() { + if log.block_hash.unwrap() != block_hash { + return false; + } + } + if let Some(from_block) = filter.get_from_block() { + if log.block_number.unwrap() < from_block { + return false; + } + } + if let Some(to_block) = filter.get_to_block() { + if log.block_number.unwrap() > to_block { + return false; + } + } + if !filter.address.matches(&log.address()) { + return false; + } + for (i, topic) in filter.topics.iter().enumerate() { + if let Some(log_topic) = log.topics().get(i) { + if !topic.matches(log_topic) { + return false; + } + } else { + // if filter topic is not present in log, it's a mismatch + return false; + } + } + true + } + for log in logs { + if !log_matches_filter(log, filter) { + return Err(ExecutionError::LogFilterMismatch().into()); + } + } + Ok(()) + } + + /// Verify the integrity of each log entry in the given array of logs by + /// checking its inclusion in the corresponding transaction receipt + /// and verifying the transaction receipt itself against the block's receipt root. async fn verify_logs(&self, logs: &[Log]) -> Result<()> { // Collect all (unique) block numbers let block_nums = logs diff --git a/core/src/execution/state.rs b/core/src/execution/state.rs index c9f509c5..7802b462 100644 --- a/core/src/execution/state.rs +++ b/core/src/execution/state.rs @@ -7,7 +7,7 @@ use alloy::{ consensus::BlockHeader, network::{primitives::HeaderResponse, BlockResponse}, primitives::{Address, B256, U256}, - rpc::types::BlockTransactions, + rpc::types::{BlockTransactions, Filter}, }; use eyre::{eyre, Result}; use tokio::{ @@ -373,7 +373,8 @@ struct TransactionLocation { #[derive(Clone)] pub enum FilterType { - Logs, + // filter content + Logs(Filter), // block number when the filter was created or last queried NewBlock(u64), PendingTransactions,