Skip to content

Commit

Permalink
Move parsing of subscribable events off critical path
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Oct 24, 2024
1 parent f6d1998 commit 3e96013
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 24 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions execution/executor-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-crypto = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-infallible = { workspace = true }
aptos-scratchpad = { workspace = true }
aptos-secure-net = { workspace = true }
aptos-storage-interface = { workspace = true }
Expand All @@ -24,6 +25,8 @@ bcs = { workspace = true }
criterion = { workspace = true }
derive_more = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }

Expand Down
12 changes: 6 additions & 6 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#![forbid(unsafe_code)]

use crate::transactions_with_output::TransactionsWithOutput;
use crate::{planned::Planned, transactions_with_output::TransactionsWithOutput};
use aptos_drop_helper::DropHelper;
use aptos_storage_interface::{cached_state_view::StateCache, state_delta::StateDelta};
use aptos_types::{
Expand Down Expand Up @@ -34,7 +34,7 @@ impl ExecutionOutput {
state_cache: StateCache,
block_end_info: Option<BlockEndInfo>,
next_epoch_state: Option<EpochState>,
subscribable_events: Vec<ContractEvent>,
subscribable_events: Planned<Vec<ContractEvent>>,
) -> Self {
if is_block {
// If it's a block, ensure it ends with state checkpoint.
Expand Down Expand Up @@ -73,7 +73,7 @@ impl ExecutionOutput {
state_cache: StateCache::new_empty(state.current.clone()),
block_end_info: None,
next_epoch_state: None,
subscribable_events: vec![],
subscribable_events: Planned::ready(vec![]),
})
}

Expand All @@ -90,7 +90,7 @@ impl ExecutionOutput {
state_cache: StateCache::new_dummy(),
block_end_info: None,
next_epoch_state: None,
subscribable_events: vec![],
subscribable_events: Planned::ready(vec![]),
})
}

Expand All @@ -109,7 +109,7 @@ impl ExecutionOutput {
state_cache: StateCache::new_dummy(),
block_end_info: None,
next_epoch_state: self.next_epoch_state.clone(),
subscribable_events: vec![],
subscribable_events: Planned::ready(vec![]),
})
}

Expand Down Expand Up @@ -155,7 +155,7 @@ pub struct Inner {
/// Only present if the block is the last block of an epoch, and is parsed output of the
/// state cache.
pub next_epoch_state: Option<EpochState>,
pub subscribable_events: Vec<ContractEvent>,
pub subscribable_events: Planned<Vec<ContractEvent>>,
}

impl Inner {
Expand Down
1 change: 1 addition & 0 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
mod error;
pub mod execution_output;
mod ledger_update_output;
pub mod planned;
pub mod state_checkpoint_output;
pub mod state_compute_result;
pub mod transactions_with_output;
Expand Down
75 changes: 75 additions & 0 deletions execution/executor-types/src/planned.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_infallible::Mutex;
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use std::{ops::Deref, sync::mpsc::Receiver};

#[derive(Debug)]
pub struct Planned<T> {
value: OnceCell<T>,
rx: OnceCell<Mutex<Receiver<T>>>,
}

impl<T> Planned<T> {
pub fn place_holder() -> Self {
Self {
value: OnceCell::new(),
rx: OnceCell::new(),
}
}

pub fn plan(&self, thread_pool: &ThreadPool, getter: impl FnOnce() -> T + Send + 'static)
where
T: Send + 'static,
{
let (tx, rx) = std::sync::mpsc::channel();

thread_pool.spawn(move || {
tx.send(getter()).ok();
});

self.rx.set(Mutex::new(rx)).expect("Already planned.");
}

pub fn ready(t: T) -> Self {
Self {
value: OnceCell::with_value(t),
rx: OnceCell::new(),
}
}

pub fn get(&self) -> &T {
if let Some(t) = self.value.get() {
t
} else {
let rx = self.rx.get().expect("Not planned").lock();
if self.value.get().is_none() {
let t = rx.recv().expect("Plan failed.");
self.value.set(t).map_err(|_| "").expect("Already set.");
}
self.value.get().expect("Must have been set.")
}
}
}

impl<T> Deref for Planned<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.get()
}
}

pub trait Plan {
fn plan<T: Send + 'static>(&self, getter: impl FnOnce() -> T + Send + 'static) -> Planned<T>;
}

impl Plan for ThreadPool {
fn plan<T: Send + 'static>(&self, getter: impl FnOnce() -> T + Send + 'static) -> Planned<T> {
let planned = Planned::<T>::place_holder();
planned.plan(self, getter);
planned
}
}
43 changes: 25 additions & 18 deletions execution/executor/src/workflow/do_get_execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use aptos_executor_service::{
remote_executor_client::{get_remote_addresses, REMOTE_SHARDED_BLOCK_EXECUTOR},
};
use aptos_executor_types::{
execution_output::ExecutionOutput, should_forward_to_subscription_service,
execution_output::ExecutionOutput, planned::Planned, should_forward_to_subscription_service,
transactions_with_output::TransactionsWithOutput,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
Expand All @@ -24,6 +24,7 @@ use aptos_types::{
config::BlockExecutorConfigFromOnchain,
partitioner::{ExecutableTransactions, PartitionedTransactions},
},
contract_event::ContractEvent,
epoch_state::EpochState,
on_chain_config::{ConfigurationResource, OnChainConfig, ValidatorSet},
state_store::{
Expand Down Expand Up @@ -311,21 +312,8 @@ impl Parser {
.then(|| Self::ensure_next_epoch_state(&to_commit))
.transpose()?
};
let subscribable_events = {
let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__subscribable_events"]);
to_commit
.transaction_outputs()
.iter()
.flat_map(|o| {
o.events()
.iter()
.filter(|e| should_forward_to_subscription_service(e))
})
.cloned()
.collect_vec()
};

Ok(ExecutionOutput::new(
let out = ExecutionOutput::new(
is_block,
first_version,
statuses_for_input_txns,
Expand All @@ -335,8 +323,24 @@ impl Parser {
state_cache,
block_end_info,
next_epoch_state,
subscribable_events,
))
Planned::place_holder(),
);
let ret = out.clone();
ret.subscribable_events
.plan(THREAD_MANAGER.get_non_exe_cpu_pool(), move || {
Self::get_subscribable_events(&out)
});
Ok(ret)
}

fn get_subscribable_events(out: &ExecutionOutput) -> Vec<ContractEvent> {
out.to_commit
.transaction_outputs
.iter()
.flat_map(TransactionOutput::events)
.filter(|e| should_forward_to_subscription_service(e))
.cloned()
.collect_vec()
}

fn extract_retries(
Expand Down Expand Up @@ -543,6 +547,9 @@ mod tests {
];
let execution_output =
Parser::parse(0, txns, txn_outs, StateCache::new_dummy(), None, None).unwrap();
assert_eq!(vec![event_0, event_2], execution_output.subscribable_events);
assert_eq!(
vec![event_0, event_2],
*execution_output.subscribable_events
);
}
}

0 comments on commit 3e96013

Please sign in to comment.