diff --git a/Cargo.lock b/Cargo.lock index 481707efdfdd3..6f614830f71cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2013,6 +2013,7 @@ dependencies = [ "aptos-metrics-core", "aptos-moving-average 0.1.0 (git+https://github.com/aptos-labs/aptos-indexer-processors.git?rev=4801acae7aea30d7e96bbfbe5ec5b04056dfa4cf)", "aptos-protos 1.3.0", + "aptos-transaction-filter", "async-trait", "clap 4.4.14", "futures", @@ -10189,9 +10190,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.7.1" +version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "memmap2" @@ -15431,18 +15432,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54378c645627613241d077a3a79db965db602882668f9136ac42af9ecb730ad" +checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.56" +version = "1.0.61" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" +checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml index c378e10000297..ae70ba89314fe 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/Cargo.toml @@ -19,6 +19,7 @@ aptos-indexer-grpc-utils = { workspace = true } aptos-metrics-core = { workspace = true } aptos-moving-average = { workspace = true 
} aptos-protos = { workspace = true } +aptos-transaction-filter = { workspace = true } async-trait = { workspace = true } clap = { workspace = true } futures = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs index 797154c86cf8b..c5f621fcde703 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/config.rs @@ -13,8 +13,9 @@ use aptos_protos::{ transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET, util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET, }; +use aptos_transaction_filter::BooleanTransactionFilter; use serde::{Deserialize, Serialize}; -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use tonic::{codec::CompressionEncoding, transport::Server}; pub const SERVER_NAME: &str = "idxdatasvc"; @@ -69,9 +70,18 @@ pub struct IndexerGrpcDataServiceConfig { pub enable_cache_compression: bool, #[serde(default)] pub in_memory_cache_config: InMemoryCacheConfig, - /// Sender addresses to ignore. Transactions from these addresses will not be indexed. - #[serde(default = "IndexerGrpcDataServiceConfig::default_sender_addresses_to_ignore")] - pub sender_addresses_to_ignore: Vec, + /// Any transaction that matches this filter will be stripped. This means we remove + /// the payload, signature, events, and writesets from it before sending it + /// downstream. This should only be used in an emergency situation, e.g. when txns + /// related to a certain module are too large and are causing issues for the data + /// service. 
Learn more here: + /// + /// https://www.notion.so/aptoslabs/Runbook-c006a37259394ac2ba904d6b54d180fa?pvs=4#171c210964ec42a89574fc80154f9e85 + /// + /// Generally you will want to start this filter with an OR, and then list out + /// separate filters that describe each type of txn we want to strip. + #[serde(default = "IndexerGrpcDataServiceConfig::default_txns_to_strip_filter")] + pub txns_to_strip_filter: BooleanTransactionFilter, } impl IndexerGrpcDataServiceConfig { @@ -84,7 +94,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address: RedisUrl, enable_cache_compression: bool, in_memory_cache_config: InMemoryCacheConfig, - sender_addresses_to_ignore: Vec, + txns_to_strip_filter: BooleanTransactionFilter, ) -> Self { Self { data_service_grpc_tls_config, @@ -97,7 +107,7 @@ impl IndexerGrpcDataServiceConfig { redis_read_replica_address, enable_cache_compression, in_memory_cache_config, - sender_addresses_to_ignore, + txns_to_strip_filter, } } @@ -109,8 +119,9 @@ impl IndexerGrpcDataServiceConfig { false } - pub const fn default_sender_addresses_to_ignore() -> Vec { - vec![] + pub fn default_txns_to_strip_filter() -> BooleanTransactionFilter { + // This filter matches no txns. 
+ BooleanTransactionFilter::new_or(vec![]) } } @@ -170,10 +181,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { self.redis_read_replica_address.clone(), self.file_store_config.clone(), self.data_service_response_channel_size, - self.sender_addresses_to_ignore - .clone() - .into_iter() - .collect::>(), + self.txns_to_strip_filter.clone(), cache_storage_format, Arc::new(in_memory_cache), )?; diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs index 908b42ed9a96d..4813efda1aed9 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/metrics.rs @@ -100,12 +100,58 @@ pub static SHORT_CONNECTION_COUNT: Lazy = Lazy::new(|| { .unwrap() }); -/// Count of bytes transfered to the client. This only represents the bytes prepared and ready -/// to send to the client. It does not represent the bytes actually sent to the client. +/// Count of bytes transferred to the client. This only represents the bytes prepared and +/// ready to send to the client. It does not represent the bytes actually +/// sent to the client. +/// +/// This is pre stripping, so it may include bytes for transactions that were later +/// stripped. See BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING for post +/// stripping. pub static BYTES_READY_TO_TRANSFER_FROM_SERVER: Lazy = Lazy::new(|| { register_int_counter_vec!( "indexer_grpc_data_service_bytes_ready_to_transfer_from_server", - "Count of bytes ready to transfer to the client", + "Count of bytes ready to transfer to the client (pre stripping)", + &[ + "identifier_type", + "identifier", + "email", + "application_name", + "processor" + ], + ) + .unwrap() +}); + +/// Count of bytes transferred to the client. This only represents the bytes prepared and +/// ready to send to the client. It does not represent the bytes actually +/// sent to the client. 
+/// +/// This is post stripping, meaning some transactions may have been stripped (removing +/// things such as events, writesets, payload, signature). Compare this with +/// BYTES_READY_TO_TRANSFER_FROM_SERVER to see how many bytes were stripped. +pub static BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING: Lazy = + Lazy::new(|| { + register_int_counter_vec!( + "indexer_grpc_data_service_bytes_ready_to_transfer_from_server_after_stripping", + "Count of bytes ready to transfer to the client (post stripping)", + &[ + "identifier_type", + "identifier", + "email", + "application_name", + "processor" + ], + ) + .unwrap() + }); + +/// The number of transactions that had data (such as events, writesets, payload, +/// signature) stripped from them due to the `txns_to_strip_filter`. See +/// `strip_transactions` for more. +pub static NUM_TRANSACTIONS_STRIPPED: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "indexer_grpc_data_service_num_transactions_stripped", + "Number of transactions that had data (such as events, writesets, payload, signature) stripped from them", &[ "identifier_type", "identifier", diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 60ee8045d36dd..752c642f59f72 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -2,8 +2,9 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::{ - BYTES_READY_TO_TRANSFER_FROM_SERVER, CONNECTION_COUNT, ERROR_COUNT, - LATEST_PROCESSED_VERSION_PER_PROCESSOR, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR, + BYTES_READY_TO_TRANSFER_FROM_SERVER, BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING, + CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION_PER_PROCESSOR, + NUM_TRANSACTIONS_STRIPPED, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR, PROCESSED_VERSIONS_COUNT_PER_PROCESSOR, SHORT_CONNECTION_COUNT, }; use 
anyhow::{Context, Result}; @@ -28,11 +29,12 @@ use aptos_protos::{ indexer::v1::{raw_data_server::RawData, GetTransactionsRequest, TransactionsResponse}, transaction::v1::{transaction::TxnData, Transaction}, }; +use aptos_transaction_filter::{BooleanTransactionFilter, Filterable}; use futures::Stream; use prost::Message; use redis::Client; use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, pin::Pin, str::FromStr, sync::Arc, @@ -77,7 +79,7 @@ pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, pub data_service_response_channel_size: usize, - pub sender_addresses_to_ignore: HashSet, + pub txns_to_strip_filter: BooleanTransactionFilter, pub cache_storage_format: StorageFormat, in_memory_cache: Arc, } @@ -92,10 +94,7 @@ impl std::fmt::Debug for RawDataServerWrapper { "data_service_response_channel_size", &self.data_service_response_channel_size, ) - .field( - "sender_addresses_to_ignore", - &self.sender_addresses_to_ignore, - ) + .field("txns_to_strip_filter", &self.txns_to_strip_filter) .field("cache_storage_format", &self.cache_storage_format) .finish() } @@ -106,7 +105,7 @@ impl RawDataServerWrapper { redis_address: RedisUrl, file_store_config: IndexerGrpcFileStoreConfig, data_service_response_channel_size: usize, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, cache_storage_format: StorageFormat, in_memory_cache: Arc, ) -> anyhow::Result { @@ -118,7 +117,7 @@ impl RawDataServerWrapper { ), file_store_config, data_service_response_channel_size, - sender_addresses_to_ignore, + txns_to_strip_filter, cache_storage_format, in_memory_cache, }) @@ -194,7 +193,7 @@ impl RawData for RawDataServerWrapper { let redis_client = self.redis_client.clone(); let cache_storage_format = self.cache_storage_format; let request_metadata = Arc::new(request_metadata); - let sender_addresses_to_ignore = self.sender_addresses_to_ignore.clone(); + let txns_to_strip_filter = 
self.txns_to_strip_filter.clone(); let in_memory_cache = self.in_memory_cache.clone(); tokio::spawn({ let request_metadata = request_metadata.clone(); @@ -206,7 +205,7 @@ impl RawData for RawDataServerWrapper { request_metadata, transactions_count, tx, - sender_addresses_to_ignore, + txns_to_strip_filter, current_version, in_memory_cache, ) @@ -394,7 +393,7 @@ async fn data_fetcher_task( request_metadata: Arc, transactions_count: Option, tx: tokio::sync::mpsc::Sender>, - sender_addresses_to_ignore: HashSet, + txns_to_strip_filter: BooleanTransactionFilter, mut current_version: u64, in_memory_cache: Arc, ) { @@ -529,11 +528,22 @@ async fn data_fetcher_task( // 2. Push the data to the response channel, i.e. stream the data to the client. let current_batch_size = transaction_data.as_slice().len(); let end_of_batch_version = transaction_data.as_slice().last().unwrap().version; - let resp_items = get_transactions_responses_builder( + let (resp_items, num_stripped) = get_transactions_responses_builder( transaction_data, chain_id as u32, - &sender_addresses_to_ignore, + &txns_to_strip_filter, ); + NUM_TRANSACTIONS_STRIPPED + .with_label_values(&request_metadata.get_label_values()) + .inc_by(num_stripped as u64); + let bytes_ready_to_transfer_after_stripping = resp_items + .iter() + .flat_map(|response| &response.transactions) + .map(|t| t.encoded_len()) + .sum::(); + BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING + .with_label_values(&request_metadata.get_label_values()) + .inc_by(bytes_ready_to_transfer_after_stripping as u64); let data_latency_in_secs = resp_items .last() .unwrap() @@ -548,7 +558,7 @@ async fn data_fetcher_task( .await { Ok(_) => { - // TODO: Reasses whether this metric useful + // TODO: Reassess whether this metric is useful. 
LATEST_PROCESSED_VERSION_PER_PROCESSOR .with_label_values(&request_metadata.get_label_values()) .set(end_of_batch_version as i64); @@ -672,22 +682,26 @@ fn ensure_sequential_transactions(mut batches: Vec>) -> Vec, chain_id: u32, - sender_addresses_to_ignore: &HashSet, -) -> Vec { - let filtered_transactions = - filter_transactions_for_sender_addresses(transactions, sender_addresses_to_ignore); - let chunks = chunk_transactions(filtered_transactions, MESSAGE_SIZE_LIMIT); - chunks + txns_to_strip_filter: &BooleanTransactionFilter, +) -> (Vec, usize) { + let (stripped_transactions, num_stripped) = + strip_transactions(transactions, txns_to_strip_filter); + let chunks = chunk_transactions(stripped_transactions, MESSAGE_SIZE_LIMIT); + let responses = chunks .into_iter() .map(|chunk| TransactionsResponse { chain_id: Some(chain_id as u64), transactions: chunk, }) - .collect() + .collect(); + (responses, num_stripped) } // This is a CPU bound operation, so we spawn_blocking @@ -956,37 +970,53 @@ async fn channel_send_multiple_with_timeout( Ok(()) } -fn filter_transactions_for_sender_addresses( +/// This function strips transactions that match the given filter. Stripping means we +/// remove the payload, signature, events, and writesets. Note, the filter can be +/// composed of many conditions, see `BooleanTransactionFilter` for more. +/// +/// This returns the mutated txns and the number of txns that were stripped. +fn strip_transactions( transactions: Vec, - sender_addresses_to_ignore: &HashSet, -) -> Vec { - transactions + txns_to_strip_filter: &BooleanTransactionFilter, +) -> (Vec, usize) { + let mut stripped_count = 0; + + let stripped_transactions: Vec = transactions .into_iter() .map(|mut txn| { - if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { - if let Some(utr) = user_transaction.request.as_mut() { - if sender_addresses_to_ignore.contains(&utr.sender) { + // Note: `is_allowed` means the txn matches the filter, in which case + // we strip it. 
+ if txns_to_strip_filter.is_allowed(&txn) { + stripped_count += 1; + if let Some(info) = txn.info.as_mut() { + info.changes = vec![]; + } + if let Some(TxnData::User(user_transaction)) = txn.txn_data.as_mut() { + user_transaction.events = vec![]; + if let Some(utr) = user_transaction.request.as_mut() { // Wipe the payload and signature. utr.payload = None; utr.signature = None; - user_transaction.events = vec![]; - txn.info.as_mut().unwrap().changes = vec![]; } } } txn }) - .collect() + .collect(); + + (stripped_transactions, stripped_count) } #[cfg(test)] mod tests { - use super::{ensure_sequential_transactions, filter_transactions_for_sender_addresses}; + use super::*; use aptos_protos::transaction::v1::{ transaction::TxnData, Event, Signature, Transaction, TransactionInfo, TransactionPayload, UserTransaction, UserTransactionRequest, WriteSetChange, }; - use std::collections::HashSet; + use aptos_transaction_filter::{ + boolean_transaction_filter::APIFilter, filters::UserTransactionFilterBuilder, + }; #[test] fn test_ensure_sequential_transactions_merges_and_sorts() { @@ -1034,7 +1064,7 @@ mod tests { } #[test] - fn test_transactions_are_filter_correctly() { + fn test_transactions_are_stripped_correctly_sender_addresses() { let sender_address = "0x1234".to_string(); // Create a transaction with a user transaction let txn = Transaction { @@ -1054,12 +1084,25 @@ mod tests { }), ..Default::default() }; - // create ignore list. - let ignore_hash_set: HashSet = vec![sender_address].into_iter().collect(); - let filtered_txn = filter_transactions_for_sender_addresses(vec![txn], &ignore_hash_set); - assert_eq!(filtered_txn.len(), 1); - let txn = filtered_txn.first().unwrap(); + // Create filter for senders to ignore. 
+ let sender_filters = vec![sender_address] + .into_iter() + .map(|address| { + BooleanTransactionFilter::from(APIFilter::UserTransactionFilter( + UserTransactionFilterBuilder::default() + .sender(address) + .build() + .unwrap(), + )) + }) + .collect(); + let filter = BooleanTransactionFilter::new_or(sender_filters); + + let (filtered_txns, num_stripped) = strip_transactions(vec![txn], &filter); + assert_eq!(num_stripped, 1); + assert_eq!(filtered_txns.len(), 1); + let txn = filtered_txns.first().unwrap(); let user_transaction = match &txn.txn_data { Some(TxnData::User(user_transaction)) => user_transaction, _ => panic!("Expected user transaction"),