Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Rewrite ans processor with SDK #570

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions rust/processor/src/processors/ans_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ async fn insert_to_db(
Ok(())
}

fn insert_current_ans_lookups_query(
pub fn insert_current_ans_lookups_query(
item_to_insert: Vec<CurrentAnsLookup>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -211,7 +211,7 @@ fn insert_current_ans_lookups_query(
)
}

fn insert_ans_lookups_query(
pub fn insert_ans_lookups_query(
item_to_insert: Vec<AnsLookup>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -228,7 +228,7 @@ fn insert_ans_lookups_query(
)
}

fn insert_current_ans_primary_names_query(
pub fn insert_current_ans_primary_names_query(
item_to_insert: Vec<CurrentAnsPrimaryName>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -253,7 +253,7 @@ fn insert_current_ans_primary_names_query(
)
}

fn insert_ans_primary_names_query(
pub fn insert_ans_primary_names_query(
item_to_insert: Vec<AnsPrimaryName>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -270,7 +270,7 @@ fn insert_ans_primary_names_query(
)
}

fn insert_current_ans_lookups_v2_query(
pub fn insert_current_ans_lookups_v2_query(
item_to_insert: Vec<CurrentAnsLookupV2>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -296,7 +296,7 @@ fn insert_current_ans_lookups_v2_query(
)
}

fn insert_ans_lookups_v2_query(
pub fn insert_ans_lookups_v2_query(
item_to_insert: Vec<AnsLookupV2>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -317,7 +317,7 @@ fn insert_ans_lookups_v2_query(
)
}

fn insert_current_ans_primary_names_v2_query(
pub fn insert_current_ans_primary_names_v2_query(
item_to_insert: Vec<CurrentAnsPrimaryNameV2>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand All @@ -342,7 +342,7 @@ fn insert_current_ans_primary_names_v2_query(
)
}

fn insert_ans_primary_names_v2_query(
pub fn insert_ans_primary_names_v2_query(
items_to_insert: Vec<AnsPrimaryNameV2>,
) -> (
impl QueryFragment<Pg> + diesel::query_builder::QueryId + Send,
Expand Down Expand Up @@ -470,7 +470,7 @@ impl ProcessorTrait for AnsProcessor {
}
}

fn parse_ans(
pub fn parse_ans(
transactions: &[Transaction],
ans_v1_primary_names_table_handle: String,
ans_v1_name_records_table_handle: String,
Expand Down
5 changes: 5 additions & 0 deletions rust/sdk-processor/src/config/indexer_processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ impl RunnableConfig for IndexerProcessorConfig {
let fungible_asset_processor = FungibleAssetProcessor::new(self.clone()).await?;
fungible_asset_processor.run_processor().await
},
ProcessorConfig::AnsProcessor(_) => {
// let ans_processor = AnsProcessor::new(self.clone()).await?;
// ans_processor.run_processor().await
Ok(())
},
}
}

Expand Down
3 changes: 3 additions & 0 deletions rust/sdk-processor/src/config/processor_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use ahash::AHashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;

use crate::processors::ans_processor::AnsProcessorConfig;

/// This enum captures the configs for all the different processors that are defined.
///
/// The configs for each processor should only contain configuration specific to that
Expand Down Expand Up @@ -37,6 +39,7 @@ use std::collections::HashSet;
pub enum ProcessorConfig {
EventsProcessor(DefaultProcessorConfig),
FungibleAssetProcessor(DefaultProcessorConfig),
AnsProcessor(AnsProcessorConfig),
}

impl ProcessorConfig {
Expand Down
146 changes: 146 additions & 0 deletions rust/sdk-processor/src/processors/ans_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use crate::{
config::{
db_config::DbConfig, indexer_processor_config::IndexerProcessorConfig,
processor_config::{DefaultProcessorConfig, ProcessorConfig},
},
steps::{
ans_processor::{AnsExtractor, AnsStorer},
common::get_processor_status_saver,
},
utils::{
chain_id::check_or_update_chain_id,
database::{new_db_pool, run_migrations, ArcDbPool},
starting_version::get_starting_version,
},
};
use serde::{Deserialize, Serialize};
use anyhow::Result;
use aptos_indexer_processor_sdk::{
aptos_indexer_transaction_stream::{TransactionStream, TransactionStreamConfig},
builder::ProcessorBuilder,
common_steps::{
TransactionStreamStep, VersionTrackerStep, DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
},
traits::{processor_trait::ProcessorTrait, IntoRunnableStep},
};
use tracing::{debug, info};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct AnsProcessorConfig {
#[serde(flatten)]
pub default: DefaultProcessorConfig,
pub ans_v1_primary_names_table_handle: String,
pub ans_v1_name_records_table_handle: String,
pub ans_v2_contract_address: String,
}
pub struct AnsProcessor {
pub config: IndexerProcessorConfig,
pub db_pool: ArcDbPool,
}

impl AnsProcessor {
pub async fn new(config: IndexerProcessorConfig) -> Result<Self> {
match config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
let conn_pool = new_db_pool(
&postgres_config.connection_string,
Some(postgres_config.db_pool_size),
)
.await
.map_err(|e| {
anyhow::anyhow!(
"Failed to create connection pool for PostgresConfig: {:?}",
e
)
})?;

Ok(Self {
config,
db_pool: conn_pool,
})
},
}
}
}

#[async_trait::async_trait]
impl ProcessorTrait for AnsProcessor {
fn name(&self) -> &'static str {
self.config.processor_config.name()
}

async fn run_processor(&self) -> Result<()> {
// Run migrations
match self.config.db_config {
DbConfig::PostgresConfig(ref postgres_config) => {
run_migrations(
postgres_config.connection_string.clone(),
self.db_pool.clone(),
)
.await;
},
}

// Merge the starting version from config and the latest processed version from the DB.
let starting_version = get_starting_version(&self.config, self.db_pool.clone()).await?;

// Check and update the ledger chain id to ensure we're indexing the correct chain.
let grpc_chain_id = TransactionStream::new(self.config.transaction_stream_config.clone())
.await?
.get_chain_id()
.await?;
check_or_update_chain_id(grpc_chain_id as i64, self.db_pool.clone()).await?;

let processor_config = match self.config.processor_config.clone() {
ProcessorConfig::AnsProcessor(processor_config) => processor_config,
_ => {
return Err(anyhow::anyhow!(
"Invalid processor config for Account Transactions Processor: {:?}",
self.config.processor_config
))
},
};
let channel_size = processor_config.channel_size;
let deprecated_table_flags = TableFlags::from_set(&processor_config.deprecated_tables);

// Define processor steps.
let transaction_stream = TransactionStreamStep::new(TransactionStreamConfig {
starting_version: Some(starting_version),
..self.config.transaction_stream_config.clone()
})
.await?;
let acc_txns_extractor = AnsExtractor {deprecated_table_flags, AnsProcessorConfig {

}};
let acc_txns_storer = AnsStorer::new(self.db_pool.clone(), processor_config);
let version_tracker = VersionTrackerStep::new(
get_processor_status_saver(self.db_pool.clone(), self.config.clone()),
DEFAULT_UPDATE_PROCESSOR_STATUS_SECS,
);

// Connect processor steps together.
let (_, buffer_receiver) = ProcessorBuilder::new_with_inputless_first_step(
transaction_stream.into_runnable_step(),
)
.connect_to(acc_txns_extractor.into_runnable_step(), channel_size)
.connect_to(acc_txns_storer.into_runnable_step(), channel_size)
.connect_to(version_tracker.into_runnable_step(), channel_size)
.end_and_return_output_receiver(channel_size);

loop {
match buffer_receiver.recv().await {
Ok(txn_context) => {
debug!(
"Finished processing transactions from versions [{:?}, {:?}]",
txn_context.metadata.start_version, txn_context.metadata.end_version,
);
},
Err(e) => {
info!("No more transactions in channel: {:?}", e);
break Ok(());
},
}
}
}
}
1 change: 1 addition & 0 deletions rust/sdk-processor/src/processors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod ans_processor;
pub mod events_processor;
pub mod fungible_asset_processor;
135 changes: 135 additions & 0 deletions rust/sdk-processor/src/steps/ans_processor/ans_extractor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use aptos_indexer_processor_sdk::{
aptos_protos::transaction::v1::Transaction,
traits::{async_step::AsyncRunType, AsyncStep, NamedStep, Processable},
types::transaction_context::TransactionContext,
utils::errors::ProcessorError,
};
use async_trait::async_trait;
use processor::{
db::common::models::ans_models::{
ans_lookup::{AnsLookup, AnsPrimaryName, CurrentAnsLookup, CurrentAnsPrimaryName},
ans_lookup_v2::{
AnsLookupV2, AnsPrimaryNameV2, CurrentAnsLookupV2, CurrentAnsPrimaryNameV2,
},
},
processors::ans_processor::{parse_ans, AnsProcessorConfig},
worker::TableFlags,
};

pub struct AnsExtractor
where
Self: Sized + Send + 'static,
{
deprecated_table_flags: TableFlags,
config: AnsProcessorConfig,
}

impl AnsExtractor {
pub fn new(deprecated_table_flags: TableFlags, config: AnsProcessorConfig) -> Self {
Self {
deprecated_table_flags,
config,
}
}
}

#[async_trait]
impl Processable for AnsExtractor {
type Input = Vec<Transaction>;
type Output = (
Vec<CurrentAnsLookup>,
Vec<AnsLookup>,
Vec<CurrentAnsPrimaryName>,
Vec<AnsPrimaryName>,
Vec<CurrentAnsLookupV2>,
Vec<AnsLookupV2>,
Vec<CurrentAnsPrimaryNameV2>,
Vec<AnsPrimaryNameV2>,
);
type RunType = AsyncRunType;

async fn process(
&mut self,
input: TransactionContext<Vec<Transaction>>,
) -> Result<
Option<
TransactionContext<(
Vec<CurrentAnsLookup>,
Vec<AnsLookup>,
Vec<CurrentAnsPrimaryName>,
Vec<AnsPrimaryName>,
Vec<CurrentAnsLookupV2>,
Vec<AnsLookupV2>,
Vec<CurrentAnsPrimaryNameV2>,
Vec<AnsPrimaryNameV2>,
)>,
>,
ProcessorError,
> {
let (
mut all_current_ans_lookups,
mut all_ans_lookups,
mut all_current_ans_primary_names,
mut all_ans_primary_names,
all_current_ans_lookups_v2,
all_ans_lookups_v2,
all_current_ans_primary_names_v2,
mut all_ans_primary_names_v2,
) = parse_ans(
&input.data,
self.config.ans_v1_primary_names_table_handle.clone(),
self.config.ans_v1_name_records_table_handle.clone(),
self.config.ans_v2_contract_address.clone(),
);

if self
.deprecated_table_flags
.contains(TableFlags::ANS_PRIMARY_NAME)
{
all_ans_primary_names.clear();
}
if self
.deprecated_table_flags
.contains(TableFlags::ANS_PRIMARY_NAME_V2)
{
all_ans_primary_names_v2.clear();
}
if self.deprecated_table_flags.contains(TableFlags::ANS_LOOKUP) {
all_ans_lookups.clear();
}
if self
.deprecated_table_flags
.contains(TableFlags::CURRENT_ANS_LOOKUP)
{
all_current_ans_lookups.clear();
}
if self
.deprecated_table_flags
.contains(TableFlags::CURRENT_ANS_PRIMARY_NAME)
{
all_current_ans_primary_names.clear();
}

Ok(Some(TransactionContext {
data: (
all_current_ans_lookups,
all_ans_lookups,
all_current_ans_primary_names,
all_ans_primary_names,
all_current_ans_lookups_v2,
all_ans_lookups_v2,
all_current_ans_primary_names_v2,
all_ans_primary_names_v2,
),
metadata: input.metadata,
}))
}
}

impl AsyncStep for AnsExtractor {}

impl NamedStep for AnsExtractor {
fn name(&self) -> String {
"AnsExtractor".to_string()
}
}
Loading
Loading