Skip to content

Commit

Permalink
[fix] Multiple fixes:
Browse files Browse the repository at this point in the history
  - Fix error message when a cycle in dependencies is found
  - Fix tests when global module cache was shared between threads
  - Refactor and simplify tests
  • Loading branch information
georgemitenkov committed Oct 22, 2024
1 parent bc42dfd commit bb3e0de
Show file tree
Hide file tree
Showing 23 changed files with 342 additions and 313 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/aptos-vm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ serde = { workspace = true }

[dev-dependencies]
aptos-aggregator = { workspace = true, features = ["testing"] }
aptos-block-executor = { workspace = true, features = ["testing"] }
aptos-language-e2e-tests = { workspace = true }
aptos-types = { workspace = true, features = ["fuzzing"] }
claims = { workspace = true }
Expand Down
156 changes: 146 additions & 10 deletions aptos-move/aptos-vm/src/block_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use aptos_aggregator::{
delayed_change::DelayedChange, delta_change_set::DeltaOp, resolver::TAggregatorV1View,
};
use aptos_block_executor::{
cross_block_caches::get_environment_with_delayed_field_optimization_enabled,
errors::BlockExecutionError, executor::BlockExecutor,
code_cache_global::ImmutableModuleCache, errors::BlockExecutionError, executor::BlockExecutor,
task::TransactionOutput as BlockExecutorTransactionOutput,
txn_commit_hook::TransactionCommitHook, types::InputOutputKey,
};
Expand All @@ -29,27 +28,32 @@ use aptos_types::{
signature_verified_transaction::SignatureVerifiedTransaction, BlockOutput,
TransactionOutput, TransactionStatus,
},
vm::modules::AptosModuleExtension,
write_set::WriteOp,
};
use aptos_vm_environment::environment::AptosEnvironment;
use aptos_vm_logging::{flush_speculative_logs, init_speculative_logs};
use aptos_vm_types::{
abstract_write_op::AbstractResourceWriteOp, module_write_set::ModuleWrite, output::VMOutput,
resolver::ResourceGroupSize,
};
use move_binary_format::{errors::Location, CompiledModule};
use move_core_types::{
language_storage::StructTag,
language_storage::{ModuleId, StructTag},
value::MoveTypeLayout,
vm_status::{StatusCode, VMStatus},
};
use move_vm_runtime::{Module, WithRuntimeEnvironment};
use move_vm_types::delayed_values::delayed_field_id::DelayedFieldID;
use once_cell::sync::{Lazy, OnceCell};
use rayon::ThreadPool;
use std::{
collections::{BTreeMap, HashSet},
hash::Hash,
ops::Deref,
sync::Arc,
};

pub static RAYON_EXEC_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
static RAYON_EXEC_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
Arc::new(
rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus::get())
Expand All @@ -59,6 +63,60 @@ pub static RAYON_EXEC_POOL: Lazy<Arc<rayon::ThreadPool>> = Lazy::new(|| {
)
});

/// Immutable global module cache that can be shared across multiple block executions. The size of
/// the cache is fixed within a single block (modules are not inserted or removed) and it is only
/// mutated at the block boundaries. Do not use if multiple blocks are executed concurrently.
static GLOBAL_MODULE_CACHE: Lazy<
Arc<ImmutableModuleCache<ModuleId, CompiledModule, Module, AptosModuleExtension>>,
> = Lazy::new(|| Arc::new(ImmutableModuleCache::empty()));

/// The maximum size of struct name index map in runtime environment. Checked at block boundaries
/// only.
const MAX_STRUCT_NAME_INDEX_MAP_SIZE: usize = 100_000;

/// A cached environment that can be persisted globally across blocks.
static GLOBAL_ENVIRONMENT: Lazy<Mutex<Option<AptosEnvironment>>> = Lazy::new(|| Mutex::new(None));

/// Returns the cached environment if it exists and has the same configuration as if it was
/// created based on the current state, or creates a new one and caches it. Should only be
/// called at the block boundaries.
fn get_environment_with_delayed_field_optimization_enabled<K, DC, VC, E>(
state_view: &impl StateView,
global_module_cache: &ImmutableModuleCache<K, DC, VC, E>,
) -> Result<AptosEnvironment, VMStatus>
where
K: Hash + Eq + Clone,
VC: Deref<Target = Arc<DC>>,
{
// Create a new environment.
let current_env = AptosEnvironment::new_with_delayed_field_optimization_enabled(state_view);

// Lock the cache, and check if the environment is the same.
let mut global_environment = GLOBAL_ENVIRONMENT.lock();
if let Some(previous_env) = global_environment.as_ref() {
if &current_env == previous_env {
let runtime_env = previous_env.runtime_environment();
let struct_name_index_map_size = runtime_env
.struct_name_index_map_size()
.map_err(|e| e.finish(Location::Undefined).into_vm_status())?;
if struct_name_index_map_size > MAX_STRUCT_NAME_INDEX_MAP_SIZE {
// Cache is too large, flush it. Also flush the module cache.
runtime_env.flush_struct_name_and_info_caches();
global_module_cache.flush_unchecked();
}
return Ok(previous_env.clone());
}
}

// It is not cached or has changed, so we have to reset it. As a result, we need to flush
// the cross-block cache because we need to reload all modules with new configs.
*global_environment = Some(current_env.clone());
drop(global_environment);
global_module_cache.flush_unchecked();

Ok(current_env)
}

/// Output type wrapper used by block executor. VM output is stored first, then
/// transformed into TransactionOutput type that is returned.
#[derive(Debug)]
Expand Down Expand Up @@ -390,13 +448,16 @@ impl BlockExecutorTransactionOutput for AptosTransactionOutput {
pub struct BlockAptosVM;

impl BlockAptosVM {
pub fn execute_block_on_thread_pool<
fn execute_block_on_thread_pool<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
>(
executor_thread_pool: Arc<ThreadPool>,
executor_thread_pool: Arc<rayon::ThreadPool>,
signature_verified_block: &[SignatureVerifiedTransaction],
state_view: &S,
global_module_cache: Arc<
ImmutableModuleCache<ModuleId, CompiledModule, Module, AptosModuleExtension>,
>,
config: BlockExecutorConfig,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Expand All @@ -409,15 +470,25 @@ impl BlockAptosVM {
}

BLOCK_EXECUTOR_CONCURRENCY.set(config.local.concurrency_level as i64);

let environment = get_environment_with_delayed_field_optimization_enabled(
state_view,
global_module_cache.as_ref(),
)?;

let executor = BlockExecutor::<
SignatureVerifiedTransaction,
AptosExecutorTask,
S,
L,
ExecutableTestType,
>::new(config, executor_thread_pool, transaction_commit_listener);
>::new(
config,
executor_thread_pool,
global_module_cache,
transaction_commit_listener,
);

let environment = get_environment_with_delayed_field_optimization_enabled(state_view)?;
let ret = executor.execute_block(environment, signature_verified_block, state_view);
match ret {
Ok(block_output) => {
Expand Down Expand Up @@ -449,7 +520,27 @@ impl BlockAptosVM {
}
}

/// Uses shared thread pool to execute blocks.
pub fn execute_block_on_thread_pool_without_global_module_cache<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
>(
executor_thread_pool: Arc<rayon::ThreadPool>,
signature_verified_block: &[SignatureVerifiedTransaction],
state_view: &S,
config: BlockExecutorConfig,
transaction_commit_listener: Option<L>,
) -> Result<BlockOutput<TransactionOutput>, VMStatus> {
Self::execute_block_on_thread_pool::<S, L>(
executor_thread_pool,
signature_verified_block,
state_view,
Arc::new(ImmutableModuleCache::empty()),
config,
transaction_commit_listener,
)
}

/// Uses shared thread pool and shared global module cache to execute blocks.
pub fn execute_block<
S: StateView + Sync,
L: TransactionCommitHook<Output = AptosTransactionOutput>,
Expand All @@ -463,8 +554,53 @@ impl BlockAptosVM {
Arc::clone(&RAYON_EXEC_POOL),
signature_verified_block,
state_view,
Arc::clone(&GLOBAL_MODULE_CACHE),
config,
transaction_commit_listener,
)
}
}

#[cfg(test)]
mod test {
use super::*;
use aptos_block_executor::code_cache_global::ImmutableModuleCache;
use aptos_language_e2e_tests::data_store::FakeDataStore;
use aptos_types::on_chain_config::{FeatureFlag, Features};
use aptos_vm_environment::environment::AptosEnvironment;
use claims::assert_ok;
use move_vm_types::code::mock_verified_code;

#[test]
fn test_cross_block_module_cache_flush() {
let global_module_cache = ImmutableModuleCache::empty();

global_module_cache.insert(0, mock_verified_code(0, None));
assert_eq!(global_module_cache.size(), 1);

global_module_cache.flush_unchecked();
assert_eq!(global_module_cache.size(), 0);

// Now check that cache is flushed when the environment is flushed.
let mut state_view = FakeDataStore::default();
let env_old = AptosEnvironment::new_with_delayed_field_optimization_enabled(&state_view);

for i in 0..10 {
global_module_cache.insert(i, mock_verified_code(i, None));
}
assert_eq!(global_module_cache.size(), 10);

let mut features = Features::default();
features.disable(FeatureFlag::KEYLESS_ACCOUNTS);
state_view.set_features(features);

// New environment means we need to also flush global caches - to invalidate struct name
// indices.
let env_new = assert_ok!(get_environment_with_delayed_field_optimization_enabled(
&state_view,
&global_module_cache,
));
assert!(env_old != env_new);
assert_eq!(global_module_cache.size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl<S: StateView + Sync + Send + 'static> ShardedExecutorService<S> {
);
});
s.spawn(move |_| {
let ret = BlockAptosVM::execute_block_on_thread_pool(
let ret = BlockAptosVM::execute_block_on_thread_pool_without_global_module_cache(
executor_thread_pool,
&signature_verified_transactions,
aggr_overridden_state_view.as_ref(),
Expand Down
2 changes: 1 addition & 1 deletion aptos-move/block-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-mvhashmap = { workspace = true }
aptos-types = { workspace = true }
aptos-vm-environment = { workspace = true }
aptos-vm-logging = { workspace = true }
aptos-vm-types = { workspace = true }
arc-swap = { workspace = true }
Expand Down Expand Up @@ -62,6 +61,7 @@ rand = { workspace = true }
test-case = { workspace = true }

[features]
testing = []
fuzzing = ["criterion", "proptest", "proptest-derive"]

[[bench]]
Expand Down
12 changes: 6 additions & 6 deletions aptos-move/block-executor/src/captured_reads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
cross_block_caches::ImmutableModuleCache, types::InputOutputKey,
code_cache_global::ImmutableModuleCache, types::InputOutputKey,
value_exchange::filter_value_for_exchange,
};
use anyhow::bail;
Expand Down Expand Up @@ -873,17 +873,17 @@ where
#[cfg(test)]
mod test {
use super::*;
use crate::{
proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType},
types::test_types::{mock_deserialized_code, mock_verified_code},
};
use crate::proptest_types::types::{raw_metadata, KeyType, MockEvent, ValueType};
use aptos_mvhashmap::{types::StorageVersion, MVHashMap};
use aptos_types::executable::ExecutableTestType;
use claims::{
assert_err, assert_gt, assert_matches, assert_none, assert_ok, assert_ok_eq, assert_some_eq,
};
use move_vm_types::{
code::{MockDeserializedCode, MockVerifiedCode, ModuleCache},
code::{
mock_deserialized_code, mock_verified_code, MockDeserializedCode, MockVerifiedCode,
ModuleCache,
},
delayed_values::delayed_field_id::DelayedFieldID,
};
use test_case::test_case;
Expand Down
3 changes: 1 addition & 2 deletions aptos-move/block-executor/src/code_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use crate::{
captured_reads::CacheRead,
cross_block_caches::get_global_module_cache,
view::{LatestView, ViewState},
};
use ambassador::delegate_to_methods;
Expand Down Expand Up @@ -146,7 +145,7 @@ impl<'a, T: Transaction, S: TStateView<Key = T::Key>, X: Executable> ModuleCache
> {
// First, look up the module in the cross-block global module cache. Record the read for
// later validation in case the read module is republished.
if let Some(module) = get_global_module_cache().get(key) {
if let Some(module) = self.global_module_cache.get(key) {
match &self.latest_view {
ViewState::Sync(state) => state
.captured_reads
Expand Down
Loading

0 comments on commit bb3e0de

Please sign in to comment.