Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
refactor: operator struct replaces global mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed May 30, 2023
1 parent 0441de1 commit a603abe
Show file tree
Hide file tree
Showing 19 changed files with 1,382 additions and 1,102 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ boot_node_addr.conf
data
*.swp
.vscode
poi-radio-e2e-tests
*.json
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ chrono = "0.4"
serde = "1.0.163"
serde_json = "1.0.96"
sha3 = "0.10.8"
derive-getters = "0.2.1"
tokio = { version = "1.28.1", features = ["full", "rt"] }
# tokio = { version = "1.28", features = ["full", "tracing", "rt", "parking_lot"] }
anyhow = "1.0"
Expand Down
47 changes: 19 additions & 28 deletions benches/attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ extern crate criterion;

mod attestation {
use criterion::{black_box, criterion_group, Criterion};
use poi_radio::attestation::{
use poi_radio::operator::attestation::{
compare_attestations, local_comparison_point, update_blocks, Attestation,
};
use std::{collections::HashMap, sync::Arc};
use tokio::{runtime::Runtime, sync::Mutex as AsyncMutex, task};
use std::{
collections::HashMap,
sync::{Arc, Mutex as SyncMutex},
};

criterion_group!(
benches,
Expand Down Expand Up @@ -85,18 +87,15 @@ mod attestation {
black_box(remote_attestations.insert("my-awesome-hash".to_string(), remote_blocks));
black_box(local_attestations.insert("my-awesome-hash".to_string(), local_blocks));

let rt = Runtime::new().unwrap();
rt.block_on(async {
c.bench_function("compare_attestations", |b| {
b.iter(|| {
task::spawn(compare_attestations(
42,
black_box(remote_attestations.clone()),
black_box(Arc::new(AsyncMutex::new(local_attestations.clone()))),
"my-awesome-hash",
))
})
});
c.bench_function("compare_attestations", |b| {
b.iter(|| {
compare_attestations(
42,
black_box(remote_attestations.clone()),
black_box(Arc::new(SyncMutex::new(local_attestations.clone()))),
"my-awesome-hash",
)
})
});
}

Expand Down Expand Up @@ -131,19 +130,11 @@ mod attestation {
black_box(HashMap::new());
black_box(local_attestations.insert("hash".to_string(), local_blocks.clone()));
black_box(local_attestations.insert("hash2".to_string(), local_blocks));
let local = black_box(Arc::new(AsyncMutex::new(local_attestations)));

let rt = Runtime::new().unwrap();
rt.block_on(async {
c.bench_function("comparison_point", |b| {
b.iter(|| {
task::spawn(local_comparison_point(
black_box(local.clone()),
"hash".to_string(),
120,
))
})
});
let local: Arc<SyncMutex<HashMap<String, HashMap<u64, Attestation>>>> =
black_box(Arc::new(SyncMutex::new(local_attestations)));

c.bench_function("comparison_point", |b| {
b.iter(|| local_comparison_point(black_box(local.clone()), "hash".to_string(), 120))
});
}
}
Expand Down
35 changes: 16 additions & 19 deletions benches/gossips.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
use criterion::async_executor::FuturesExecutor;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use poi_radio::operator::RadioOperator;

use rand::{thread_rng, Rng};
use secp256k1::SecretKey;
use std::collections::HashMap;
use std::sync::{Arc, Mutex as SyncMutex};
use tokio::sync::Mutex as AsyncMutex;

use graphcast_sdk::networks::NetworkName;
use graphcast_sdk::{BlockPointer, NetworkPointer};
use poi_radio::attestation::LocalAttestationsMap;
use poi_radio::operation::gossip_poi;
use poi_radio::{config::Config, CONFIG};
use poi_radio::config::Config;

fn gossip_poi_bench(c: &mut Criterion) {
let identifiers = black_box(vec!["identifier1".to_string(), "identifier2".to_string()]);
let network_chainhead_blocks: Arc<AsyncMutex<HashMap<NetworkName, BlockPointer>>> =
black_box(Arc::new(AsyncMutex::new(Default::default())));
let network_chainhead_blocks: HashMap<NetworkName, BlockPointer> =
black_box(Default::default());
let subgraph_network_latest_blocks: HashMap<String, NetworkPointer> =
black_box(Default::default());
let local_attestations: Arc<AsyncMutex<LocalAttestationsMap>> =
black_box(Arc::new(AsyncMutex::new(Default::default())));
let pk = black_box(generate_random_private_key());

let config = black_box(Config {
radio_name: String::from("test"),
graph_node_endpoint: String::from("http://localhost:8030/graphql"),
private_key: Some(pk.display_secret().to_string()),
mnemonic: None,
Expand All @@ -48,24 +45,24 @@ fn gossip_poi_bench(c: &mut Criterion) {
discord_webhook: None,
telegram_token: None,
telegram_chat_id: None,
metrics_host: None,
metrics_host: String::from("0.0.0.0"),
metrics_port: None,
server_host: None,
server_host: String::from("0.0.0.0"),
server_port: None,
log_format: String::from("pretty"),
persistence_file_path: None,
});
_ = black_box(CONFIG.set(Arc::new(SyncMutex::new(config))));

c.bench_function("gossip_poi", move |b| {
b.to_async(FuturesExecutor).iter(|| async {
gossip_poi(
identifiers.clone(),
&network_chainhead_blocks,
&subgraph_network_latest_blocks,
local_attestations.clone(),
)
.await
RadioOperator::new(config.clone())
.await
.gossip_poi(
identifiers.clone(),
&network_chainhead_blocks,
&subgraph_network_latest_blocks,
)
.await
})
});
}
Expand Down
90 changes: 77 additions & 13 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::collections::HashSet;

use autometrics::autometrics;
use clap::Parser;
use derive_getters::Getters;
use ethers::signers::WalletError;
use graphcast_sdk::{
build_wallet,
callbook::CallBook,
graphcast_agent::{GraphcastAgent, GraphcastAgentConfig, GraphcastAgentError},
graphcast_id_address,
graphql::{
Expand All @@ -12,17 +17,18 @@ use graphcast_sdk::{
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

use crate::radio_name;
use crate::state::PersistedState;
use crate::{active_allocation_hashes, syncing_deployment_hashes};

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize)]
#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default)]
pub enum CoverageLevel {
Minimal,
#[default]
OnChain,
Comprehensive,
}

#[derive(Clone, Debug, Parser, Serialize, Deserialize)]
#[derive(Clone, Debug, Parser, Serialize, Deserialize, Getters, Default)]
#[clap(
name = "poi-radio",
about = "Cross-check POIs with other Indexer in real time",
Expand Down Expand Up @@ -198,10 +204,11 @@ pub struct Config {
#[clap(
long,
value_name = "METRICS_HOST",
help = "If set, the Radio will expose Prometheus metrics on the given host (off by default). This requires having a local Prometheus server running and scraping metrics on the given port.",
default_value = "0.0.0.0",
help = "If port is set, the Radio will expose Prometheus metrics on the given host. This requires having a local Prometheus server running and scraping metrics on the given port.",
env = "METRICS_HOST"
)]
pub metrics_host: Option<String>,
pub metrics_host: String,
#[clap(
long,
value_name = "METRICS_PORT",
Expand All @@ -212,10 +219,11 @@ pub struct Config {
#[clap(
long,
value_name = "SERVER_HOST",
help = "If set, the Radio will expose API service on the given host (off by default).",
default_value = "0.0.0.0",
help = "If port is set, the Radio will expose API service on the given host.",
env = "SERVER_HOST"
)]
pub server_host: Option<String>,
pub server_host: String,
#[clap(
long,
value_name = "SERVER_PORT",
Expand All @@ -240,6 +248,13 @@ pub struct Config {
default_value = "pretty"
)]
pub log_format: String,
#[clap(
long,
value_name = "RADIO_NAME",
env = "RADIO_NAME",
default_value = "poi-radio"
)]
pub radio_name: String,
}

impl Config {
Expand Down Expand Up @@ -275,14 +290,13 @@ impl Config {

pub async fn to_graphcast_agent_config(
&self,
radio_name: &'static str,
) -> Result<GraphcastAgentConfig, GraphcastAgentError> {
let wallet_key = self.wallet_input().unwrap().to_string();
let topics = self.topics.clone();

GraphcastAgentConfig::new(
wallet_key,
radio_name,
self.radio_name.clone(),
self.registry_subgraph.clone(),
self.network_subgraph.clone(),
self.graph_node_endpoint.clone(),
Expand All @@ -299,15 +313,18 @@ impl Config {

pub async fn basic_info(&self) -> Result<(String, f32), QueryError> {
// Using unwrap directly as the query has been ran in the set-up validation
let wallet = build_wallet(self.wallet_input().unwrap()).unwrap();
let wallet = build_wallet(
self.wallet_input()
.map_err(|e| QueryError::Other(e.into()))?,
)
.map_err(|e| QueryError::Other(e.into()))?;
// The query here must be Ok but so it is okay to panic here
// Alternatively, make validate_set_up return wallet, address, and stake
let my_address = query_registry_indexer(
self.registry_subgraph.to_string(),
graphcast_id_address(&wallet),
)
.await
.unwrap();
.await?;
let my_stake =
query_network_subgraph(self.network_subgraph.to_string(), my_address.clone())
.await
Expand Down Expand Up @@ -339,9 +356,56 @@ impl Config {
}

pub async fn create_graphcast_agent(&self) -> Result<GraphcastAgent, GraphcastAgentError> {
let config = self.to_graphcast_agent_config(radio_name()).await.unwrap();
let config = self.to_graphcast_agent_config().await.unwrap();
GraphcastAgent::new(config).await
}

pub fn callbook(&self) -> CallBook {
CallBook::new(
self.graph_node_endpoint.clone(),
self.registry_subgraph.clone(),
self.network_subgraph.clone(),
)
}

/// Generate a set of unique topics along with given static topics
#[autometrics]
pub async fn generate_topics(&self, indexer_address: String) -> Vec<String> {
let static_topics = HashSet::from_iter(self.topics().to_vec());
let topics = match self.coverage {
CoverageLevel::Minimal => static_topics,
CoverageLevel::OnChain => {
let mut topics: HashSet<String> = active_allocation_hashes(
self.callbook().graph_network(),
indexer_address.clone(),
)
.await
.into_iter()
.collect();
topics.extend(static_topics);
topics
}
CoverageLevel::Comprehensive => {
let active_topics: HashSet<String> = active_allocation_hashes(
self.callbook().graph_network(),
indexer_address.clone(),
)
.await
.into_iter()
.collect();
let mut additional_topics: HashSet<String> =
syncing_deployment_hashes(self.graph_node_endpoint())
.await
.into_iter()
.collect();

additional_topics.extend(active_topics);
additional_topics.extend(static_topics);
additional_topics
}
};
topics.into_iter().collect::<Vec<String>>()
}
}

#[derive(Debug, thiserror::Error)]
Expand Down
2 changes: 1 addition & 1 deletion src/graphql/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use graphcast_sdk::graphql::QueryError;
use graphql_client::{GraphQLQuery, Response};
use serde_derive::{Deserialize, Serialize};

// Maybe later on move graphql to SDK as the queries are pretty standarded
use graphcast_sdk::graphql::QueryError;

/// Derived GraphQL Query to Proof of Indexing
#[derive(GraphQLQuery, Serialize, Deserialize, Debug)]
Expand Down
Loading

0 comments on commit a603abe

Please sign in to comment.