Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure logs returned by RPC match filter #507

Merged
merged 6 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/execution/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub enum ExecutionError {
BlockReceiptsRootMismatch(BlockTag),
#[error("filter not found: 0x{0:x}")]
FilterNotFound(U256),
#[error("log does not match filter")]
LogFilterMismatch(),
}

/// Errors that can occur during evm.rs calls
Expand Down
80 changes: 70 additions & 10 deletions core/src/execution/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
}

self.ensure_logs_match_filter(&logs, &filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
Expand All @@ -345,7 +345,7 @@
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
Some(FilterType::Logs) => {
Some(FilterType::Logs(filter)) => {
// underlying RPC takes care of keeping track of changes
let filter_changes = self.rpc.get_filter_changes(filter_id).await?;
let logs = filter_changes.as_logs().unwrap_or(&[]);
Expand All @@ -356,6 +356,7 @@
)
.into());
}
self.ensure_logs_match_filter(logs, filter).await?;
self.verify_logs(logs).await?;
FilterChanges::Logs(logs.to_vec())
}
Expand All @@ -369,7 +370,7 @@
// so next call can filter starting from the prev call's (last block number + 1)
self.state
.push_filter(
filter_id,

Check failure on line 373 in core/src/execution/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

unneeded `return` statement
FilterType::NewBlock(blocks.last().unwrap().header().number()),
)
.await;
Expand All @@ -387,14 +388,27 @@
}

pub async fn get_filter_logs(&self, filter_id: U256) -> Result<Vec<Log>> {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(
ExecutionError::TooManyLogsToProve(logs.len(), MAX_SUPPORTED_LOGS_NUMBER).into(),
);
let filter_type = self.state.get_filter(&filter_id).await;

match &filter_type {
Some(FilterType::Logs(filter)) => {
let logs = self.rpc.get_filter_logs(filter_id).await?;
if logs.len() > MAX_SUPPORTED_LOGS_NUMBER {
return Err(ExecutionError::TooManyLogsToProve(
logs.len(),
MAX_SUPPORTED_LOGS_NUMBER,
)
.into());
}
self.ensure_logs_match_filter(&logs, filter).await?;
self.verify_logs(&logs).await?;
Ok(logs)
}
_ => {
// only concerned with filters created via helios
return Err(ExecutionError::FilterNotFound(filter_id).into());
}
}
self.verify_logs(&logs).await?;
Ok(logs)
}

pub async fn uninstall_filter(&self, filter_id: U256) -> Result<bool> {
Expand All @@ -421,7 +435,9 @@
let filter_id = self.rpc.new_filter(&filter).await?;

// record the filter in the state
self.state.push_filter(filter_id, FilterType::Logs).await;
self.state
.push_filter(filter_id, FilterType::Logs(filter))
.await;

Ok(filter_id)
}
Expand Down Expand Up @@ -449,6 +465,50 @@
Ok(filter_id)
}

/// Ensure that each log entry in the given array of logs match the given filter.
async fn ensure_logs_match_filter(&self, logs: &[Log], filter: &Filter) -> Result<()> {
fn log_matches_filter(log: &Log, filter: &Filter) -> bool {
if let Some(block_hash) = filter.get_block_hash() {
if log.block_hash.unwrap() != block_hash {
return false;
}
}
if let Some(from_block) = filter.get_from_block() {
if log.block_number.unwrap() < from_block {
return false;
}
}
if let Some(to_block) = filter.get_to_block() {
if log.block_number.unwrap() > to_block {
return false;
}
}
if !filter.address.matches(&log.address()) {
return false;
}
for (i, topic) in filter.topics.iter().enumerate() {
if let Some(log_topic) = log.topics().get(i) {
eshaan7 marked this conversation as resolved.
Show resolved Hide resolved
if !topic.matches(log_topic) {
return false;
}
} else {
// if filter topic is not present in log, it's a mismatch
return false;
}
}
true
}
for log in logs {
if !log_matches_filter(log, filter) {
return Err(ExecutionError::LogFilterMismatch().into());
}
}
Ok(())
}

/// Verify the integrity of each log entry in the given array of logs by
/// checking its inclusion in the corresponding transaction receipt
/// and verifying the transaction receipt itself against the block's receipt root.
async fn verify_logs(&self, logs: &[Log]) -> Result<()> {
// Collect all (unique) block numbers
let block_nums = logs
Expand All @@ -472,7 +532,7 @@
let receipts_logs_encoded = receipts
.into_iter()
.filter_map(|receipt| {
let logs = N::receipt_logs(&receipt);

Check failure on line 535 in core/src/execution/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

writing `&Vec` instead of `&[_]` involves a new object where a slice will do
if logs.is_empty() {
None
} else {
Expand Down
5 changes: 3 additions & 2 deletions core/src/execution/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
consensus::BlockHeader,
network::{primitives::HeaderResponse, BlockResponse},
primitives::{Address, B256, U256},
rpc::types::BlockTransactions,
rpc::types::{BlockTransactions, Filter},
};
use eyre::{eyre, Result};
use tokio::{
Expand Down Expand Up @@ -303,7 +303,7 @@
}

fn prune_before(&mut self, n: u64) {
loop {

Check failure on line 306 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

this loop could be written as a `while let` loop
if let Some((oldest, _)) = self.blocks.first_key_value() {
let oldest = *oldest;
if oldest < n {
Expand All @@ -324,7 +324,7 @@

if let Some(block) = self.blocks.get(&n) {
let prev = n - 1;
if self.blocks.get(&prev).is_none() {

Check failure on line 327 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary use of `get(&prev).is_none()`
let backfilled = self.rpc.get_block(block.header().parent_hash()).await?;

if N::is_hash_valid(&backfilled)
Expand Down Expand Up @@ -372,8 +372,9 @@
}

#[derive(Clone)]
pub enum FilterType {

Check failure on line 375 in core/src/execution/state.rs

View workflow job for this annotation

GitHub Actions / clippy

large size difference between variants
Logs,
// filter content
Logs(Filter),
// block number when the filter was created or last queried
NewBlock(u64),
PendingTransactions,
Expand Down
Loading