Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add version to BeaconEngineMessage FCU #12089

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions bin/reth/src/commands/debug_cmd/replay_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use reth_engine_util::engine_store::{EngineMessageStore, StoredEngineApiMessage}
use reth_fs_util as fs;
use reth_network::{BlockDownloaderProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_node_api::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_api::{
EngineApiMessageVersion, NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine,
};
use reth_node_ethereum::{EthEngineTypes, EthEvmConfig, EthExecutorProvider};
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_provider::{
Expand Down Expand Up @@ -166,8 +168,13 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
debug!(target: "reth::cli", filepath = %filepath.display(), ?message, "Forwarding Engine API message");
match message {
StoredEngineApiMessage::ForkchoiceUpdated { state, payload_attrs } => {
let response =
beacon_engine_handle.fork_choice_updated(state, payload_attrs).await?;
let response = beacon_engine_handle
.fork_choice_updated(
state,
payload_attrs,
EngineApiMessageVersion::default(),
)
.await?;
debug!(target: "reth::cli", ?response, "Received for forkchoice updated");
}
StoredEngineApiMessage::NewPayload { payload, sidecar } => {
Expand Down
3 changes: 2 additions & 1 deletion crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use alloy_rpc_types_engine::ForkchoiceState;
use futures_util::{future::BoxFuture, FutureExt};
use reth_beacon_consensus::{BeaconEngineMessage, ForkchoiceStatus};
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_evm::execute::BlockExecutorProvider;
use reth_provider::{CanonChainTracker, StateProviderFactory};
use reth_stages_api::PipelineEvent;
Expand Down Expand Up @@ -155,6 +155,7 @@ where
state,
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
});
debug!(target: "consensus::auto", ?state, "Sent fork choice update");

Expand Down
7 changes: 5 additions & 2 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::{
ExecutionPayload, ExecutionPayloadSidecar, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use futures::TryFutureExt;
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
Expand Down Expand Up @@ -60,9 +60,10 @@ where
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
Ok(self
.send_fork_choice_updated(state, payload_attrs)
.send_fork_choice_updated(state, payload_attrs, version)
.map_err(|_| BeaconForkChoiceUpdateError::EngineUnavailable)
.await??
.await?)
Expand All @@ -74,12 +75,14 @@ where
&self,
state: ForkchoiceState,
payload_attrs: Option<Engine::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> oneshot::Receiver<RethResult<OnForkChoiceUpdated>> {
let (tx, rx) = oneshot::channel();
let _ = self.to_engine.send(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
});
rx
}
Expand Down
4 changes: 3 additions & 1 deletion crates/consensus/beacon/src/engine/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy_rpc_types_engine::{
ForkchoiceUpdateError, ForkchoiceUpdated, PayloadId, PayloadStatus, PayloadStatusEnum,
};
use futures::{future::Either, FutureExt};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::RethResult;
use reth_payload_primitives::PayloadBuilderError;
use std::{
Expand Down Expand Up @@ -156,6 +156,8 @@ pub enum BeaconEngineMessage<Engine: EngineTypes> {
state: ForkchoiceState,
/// The payload attributes for block building.
payload_attrs: Option<Engine::PayloadAttributes>,
/// The Engine API Version.
version: EngineApiMessageVersion,
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<RethResult<OnForkChoiceUpdated>>,
},
Expand Down
17 changes: 14 additions & 3 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_blockchain_tree_api::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
};
use reth_engine_primitives::{EngineTypes, PayloadTypes};
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes, PayloadTypes};
use reth_errors::{BlockValidationError, ProviderResult, RethError, RethResult};
use reth_network_p2p::{
sync::{NetworkSyncUpdater, SyncState},
Expand Down Expand Up @@ -428,7 +428,12 @@ where
} else if let Some(attrs) = attrs {
// the CL requested to build a new payload on top of this new VALID head
let head = outcome.into_header().unseal();
self.process_payload_attributes(attrs, head, state)
self.process_payload_attributes(
attrs,
head,
state,
EngineApiMessageVersion::default(),
)
} else {
OnForkChoiceUpdated::valid(PayloadStatus::new(
PayloadStatusEnum::Valid,
Expand Down Expand Up @@ -1160,6 +1165,7 @@ where
attrs: <N::Engine as PayloadTypes>::PayloadAttributes,
head: Header,
state: ForkchoiceState,
_version: EngineApiMessageVersion,
) -> OnForkChoiceUpdated {
// 7. Client software MUST ensure that payloadAttributes.timestamp is greater than timestamp
// of a block referenced by forkchoiceState.headBlockHash. If this condition isn't held
Expand Down Expand Up @@ -1855,7 +1861,12 @@ where
// sensitive, hence they are polled first.
if let Poll::Ready(Some(msg)) = this.engine_message_stream.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version: _version,
} => {
this.on_forkchoice_updated(state, payload_attrs, tx);
}
BeaconEngineMessage::NewPayload { payload, sidecar, tx } => {
Expand Down
10 changes: 8 additions & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_engine_primitives::EngineApiMessageVersion;
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::{either::Either, test_utils::MockExecutorProvider};
use reth_evm_ethereum::execute::EthExecutorProvider;
Expand Down Expand Up @@ -93,7 +94,9 @@ impl<DB> TestEnv<DB> {
&self,
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
self.engine_handle.fork_choice_updated(state, None).await
self.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await
}

/// Sends the `ForkchoiceUpdated` message to the consensus engine and retries if the engine
Expand All @@ -103,7 +106,10 @@ impl<DB> TestEnv<DB> {
state: ForkchoiceState,
) -> Result<ForkchoiceUpdated, BeaconForkChoiceUpdateError> {
loop {
let result = self.engine_handle.fork_choice_updated(state, None).await?;
let result = self
.engine_handle
.fork_choice_updated(state, None, EngineApiMessageVersion::default())
.await?;
if !result.is_syncing() {
return Ok(result)
}
Expand Down
4 changes: 3 additions & 1 deletion crates/engine/local/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use eyre::OptionExt;
use futures_util::{stream::Fuse, StreamExt};
use reth_beacon_consensus::BeaconEngineMessage;
use reth_chainspec::EthereumHardforks;
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{
BuiltPayload, PayloadAttributesBuilder, PayloadBuilder, PayloadKind, PayloadTypes,
Expand Down Expand Up @@ -167,6 +167,7 @@ where
state: self.forkchoice_state(),
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
})?;

let res = rx.await??;
Expand All @@ -193,6 +194,7 @@ where
state: self.forkchoice_state(),
payload_attrs: Some(self.payload_attributes_builder.build(timestamp)),
tx,
version: EngineApiMessageVersion::default(),
})?;

let res = rx.await??.await?;
Expand Down
23 changes: 17 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use reth_chain_state::{
};
use reth_chainspec::EthereumHardforks;
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::BlockExecutorProvider;
use reth_payload_builder::PayloadBuilderHandle;
Expand Down Expand Up @@ -969,6 +969,7 @@ where
&mut self,
state: ForkchoiceState,
attrs: Option<T::PayloadAttributes>,
version: EngineApiMessageVersion,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine::tree", ?attrs, "invoked forkchoice update");
self.metrics.engine.forkchoice_updated_messages.increment(1);
Expand Down Expand Up @@ -1018,7 +1019,7 @@ where
// to return an error
ProviderError::HeaderNotFound(state.head_block_hash.into())
})?;
let updated = self.process_payload_attributes(attr, &tip, state);
let updated = self.process_payload_attributes(attr, &tip, state, version);
return Ok(TreeOutcome::new(updated))
}

Expand All @@ -1038,7 +1039,7 @@ where
}

if let Some(attr) = attrs {
let updated = self.process_payload_attributes(attr, &tip, state);
let updated = self.process_payload_attributes(attr, &tip, state, version);
return Ok(TreeOutcome::new(updated))
}

Expand All @@ -1054,7 +1055,8 @@ where
if self.engine_kind.is_opstack() {
if let Some(attr) = attrs {
debug!(target: "engine::tree", head = canonical_header.number, "handling payload attributes for canonical head");
let updated = self.process_payload_attributes(attr, &canonical_header, state);
let updated =
self.process_payload_attributes(attr, &canonical_header, state, version);
return Ok(TreeOutcome::new(updated))
}
}
Expand Down Expand Up @@ -1206,8 +1208,14 @@ where
}
EngineApiRequest::Beacon(request) => {
match request {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
let mut output = self.on_forkchoice_updated(state, payload_attrs);
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
} => {
let mut output =
self.on_forkchoice_updated(state, payload_attrs, version);

if let Ok(res) = &mut output {
// track last received forkchoice state
Expand Down Expand Up @@ -2484,6 +2492,7 @@ where
attrs: T::PayloadAttributes,
head: &Header,
state: ForkchoiceState,
_version: EngineApiMessageVersion,
) -> OnForkChoiceUpdated {
// 7. Client software MUST ensure that payloadAttributes.timestamp is greater than timestamp
// of a block referenced by forkchoiceState.headBlockHash. If this condition isn't held
Expand Down Expand Up @@ -2808,6 +2817,7 @@ mod tests {
state: fcu_state,
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
}
.into(),
))
Expand Down Expand Up @@ -3097,6 +3107,7 @@ mod tests {
},
payload_attrs: None,
tx,
version: EngineApiMessageVersion::default(),
}
.into(),
))
Expand Down
7 changes: 6 additions & 1 deletion crates/engine/util/src/engine_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,12 @@ impl EngineMessageStore {
fs::create_dir_all(&self.path)?; // ensure that store path had been created
let timestamp = received_at.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_millis();
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx: _tx } => {
BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx: _tx,
version: _version,
} => {
let filename = format!("{}-fcu-{}.json", timestamp, state.head_block_hash);
fs::write(
self.path.join(filename),
Expand Down
20 changes: 17 additions & 3 deletions crates/engine/util/src/reorg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use alloy_rpc_types_engine::{
use futures::{stream::FuturesUnordered, Stream, StreamExt, TryFutureExt};
use itertools::Either;
use reth_beacon_consensus::{BeaconEngineMessage, BeaconOnNewPayloadError, OnForkChoiceUpdated};
use reth_engine_primitives::EngineTypes;
use reth_engine_primitives::{EngineApiMessageVersion, EngineTypes};
use reth_errors::{BlockExecutionError, BlockValidationError, RethError, RethResult};
use reth_ethereum_forks::EthereumHardforks;
use reth_evm::{
Expand Down Expand Up @@ -211,18 +211,32 @@ where
state: reorg_forkchoice_state,
payload_attrs: None,
tx: reorg_fcu_tx,
version: EngineApiMessageVersion::default(),
},
]);
*this.state = EngineReorgState::Reorg { queue };
continue
}
(Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }), _) => {
(
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}),
_,
) => {
// Record last forkchoice state forwarded to the engine.
// We do not care if it's valid since engine should be able to handle
// reorgs that rely on invalid forkchoice state.
*this.last_forkchoice_state = Some(state);
*this.forkchoice_states_forwarded += 1;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
(item, _) => item,
};
Expand Down
14 changes: 12 additions & 2 deletions crates/engine/util/src/skip_fcu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,25 @@ where
loop {
let next = ready!(this.stream.poll_next_unpin(cx));
let item = match next {
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx }) => {
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
}) => {
if this.skipped < this.threshold {
*this.skipped += 1;
tracing::warn!(target: "engine::stream::skip_fcu", ?state, ?payload_attrs, threshold=this.threshold, skipped=this.skipped, "Skipping FCU");
let _ = tx.send(Ok(OnForkChoiceUpdated::syncing()));
continue
}
*this.skipped = 0;
Some(BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx })
Some(BeaconEngineMessage::ForkchoiceUpdated {
state,
payload_attrs,
tx,
version,
})
}
next => next,
};
Expand Down
11 changes: 6 additions & 5 deletions crates/payload/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,22 +324,23 @@ where
}

/// The version of Engine API message.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)]
pub enum EngineApiMessageVersion {
/// Version 1
V1,
V1 = 1,
/// Version 2
///
/// Added in the Shanghai hardfork.
V2,
V2 = 2,
/// Version 3
///
/// Added in the Cancun hardfork.
V3,
#[default]
V3 = 3,
/// Version 4
///
/// Added in the Prague hardfork.
V4,
V4 = 4,
Comment on lines 329 to +343
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep we can solve this like this

}

/// Determines how we should choose the payload to return.
Expand Down
Loading