Skip to content

Commit

Permalink
feat: Support TLS config
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine committed Oct 3, 2024
1 parent e1a12d4 commit 87b00a0
Show file tree
Hide file tree
Showing 19 changed files with 476 additions and 150 deletions.
103 changes: 103 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions examples/memstore/dynamic-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,17 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
Some(peer_addr) => {
log::info!("Running in Follower mode");

let ticket = Raft::request_id(options.raft_addr.clone(), peer_addr.clone())
let ticket = Raft::request_id(options.raft_addr.clone(), peer_addr.clone(), None)
.await
.unwrap();
let node_id = ticket.reserved_id;
let mut cfg = build_config(node_id);

cfg.initial_peers = Some(ticket.peers.clone().into());
let cfg = build_config(node_id, Some(ticket.peers.clone().into()));

#[cfg(feature = "inmemory_storage")]
let log_storage = MemStorage::create();

#[cfg(feature = "heed_storage")]
let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone())
let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone())
.expect("Failed to create storage");

#[cfg(feature = "rocksdb_storage")]
Expand Down Expand Up @@ -121,13 +119,13 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
}

let leader_node_id = 1;
let cfg = build_config(leader_node_id);
let cfg = build_config(leader_node_id, None);

#[cfg(feature = "inmemory_storage")]
let log_storage = MemStorage::create();

#[cfg(feature = "heed_storage")]
let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone())
let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone())
.expect("Failed to create storage");

#[cfg(feature = "rocksdb_storage")]
Expand Down
2 changes: 1 addition & 1 deletion examples/memstore/src/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use memstore_example_harness::state_machine::LogEntry;
#[actix_rt::main]
async fn main() {
println!("---Message propose---");
let mut leader_client = create_client(&"127.0.0.1:60061").await.unwrap();
let mut leader_client = create_client(&"127.0.0.1:60061", None).await.unwrap();

leader_client
.propose(raft_service::ProposeArgs {
Expand Down
5 changes: 2 additions & 3 deletions examples/memstore/static-members/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,13 @@ async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
.get_node_id_by_addr(options.raft_addr.clone())
.unwrap();

let mut cfg = build_config(node_id);
cfg.initial_peers = Some(initial_peers.clone());
let cfg = build_config(node_id, Some(initial_peers.clone()));

#[cfg(feature = "inmemory_storage")]
let log_storage = MemStorage::create();

#[cfg(feature = "heed_storage")]
let log_storage = HeedStorage::create(&cfg.log_dir, &cfg.clone(), logger.clone())
let log_storage = HeedStorage::create(cfg.get_log_dir(), &cfg.clone(), logger.clone())
.expect("Failed to create storage");

#[cfg(feature = "rocksdb_storage")]
Expand Down
37 changes: 25 additions & 12 deletions examples/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use raftify::{Config, RaftConfig};
use raftify::{Config, ConfigBuilder, Peers, RaftConfig, TlsConfig};

use crate::utils::{ensure_directory_exist, get_storage_path};

pub fn build_config(node_id: u64) -> Config {
pub fn build_config(node_id: u64, initial_peers: Option<Peers>) -> Config {
let raft_config = RaftConfig {
id: node_id,
election_tick: 10,
Expand All @@ -14,14 +14,27 @@ pub fn build_config(node_id: u64) -> Config {
let storage_pth = get_storage_path("./logs", node_id);
ensure_directory_exist(&storage_pth).expect("Failed to create storage directory");

Config {
bootstrap_from_snapshot: false,
tick_interval: 0.2,
log_dir: storage_pth.clone(),
save_compacted_logs: true,
compacted_log_dir: storage_pth,
compacted_log_size_threshold: 1024 * 1024 * 1024,
raft_config,
..Default::default()
}
let client_tls_config = TlsConfig {
cert_path: None,
key_path: None,
ca_cert_path: Some("./ca_cert.pem".to_string()),
domain_name: Some("localhost".to_string()),
};

let config_builder = ConfigBuilder::new()
.log_dir("./logs".to_owned())
.save_compacted_logs(true)
.compacted_log_dir("./logs".to_owned())
.compacted_log_size_threshold(1024 * 1024 * 1024)
.raft_config(raft_config)
.tick_interval(0.2)
.global_client_tls_config(client_tls_config);

let config_builder = if let Some(peers) = initial_peers {
config_builder.initial_peers(peers)
} else {
config_builder
};

config_builder.build()
}
27 changes: 17 additions & 10 deletions harness/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use raftify::{Config, RaftConfig};
use raftify::{Config, ConfigBuilder, Peers, RaftConfig};

use crate::utils::{ensure_directory_exist, get_storage_path};

pub fn build_config(node_id: u64) -> Config {
pub fn build_config(node_id: u64, initial_peers: Option<Peers>) -> Config {
let raft_config = RaftConfig {
id: node_id,
election_tick: 10,
Expand All @@ -14,12 +14,19 @@ pub fn build_config(node_id: u64) -> Config {
let storage_path = get_storage_path("./logs", node_id);
ensure_directory_exist(&storage_path).expect("Failed to create storage directory");

Config {
log_dir: storage_path.clone(),
save_compacted_logs: true,
compacted_log_dir: storage_path,
compacted_log_size_threshold: 1024 * 1024 * 1024,
raft_config,
..Default::default()
}
let config_builder = ConfigBuilder::new()
.log_dir("./logs".to_owned())
.save_compacted_logs(true)
.compacted_log_dir("./logs".to_owned())
.compacted_log_size_threshold(1024 * 1024 * 1024)
.raft_config(raft_config)
.tick_interval(0.2);

let config_builder = if let Some(peers) = initial_peers {
config_builder.initial_peers(peers)
} else {
config_builder
};

config_builder.build()
}
30 changes: 16 additions & 14 deletions harness/src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,18 @@ fn run_raft(
should_be_leader: bool,
) -> Result<JoinHandle<Result<()>>> {
let peer = peers.get(node_id).unwrap();
let mut cfg = build_config(*node_id);
cfg.initial_peers = if should_be_leader {
None
} else {
Some(peers.clone())
};
let cfg = build_config(
*node_id,
if should_be_leader {
None
} else {
Some(peers.clone())
},
);

let store = HashStore::new();
let logger = build_logger();
let storage_pth = get_storage_path(cfg.log_dir.as_str(), *node_id);
let storage_pth = get_storage_path(cfg.get_log_dir(), *node_id);
ensure_directory_exist(storage_pth.as_str())?;

let storage = HeedStorage::create(
Expand Down Expand Up @@ -141,9 +143,9 @@ pub async fn spawn_extra_node(
slog: build_logger(),
});

let cfg = build_config(node_id);
let cfg = build_config(node_id, None);
let store = HashStore::new();
let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
let storage_pth = get_storage_path(cfg.get_log_dir(), node_id);
ensure_directory_exist(storage_pth.as_str())?;

let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?;
Expand All @@ -167,16 +169,16 @@ pub async fn spawn_and_join_extra_node(
let logger = Arc::new(Slogger {
slog: build_logger(),
});
let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned())
let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned(), None)
.await
.unwrap();

let node_id = join_ticket.reserved_id;
let mut cfg = build_config(node_id);
cfg.initial_peers = Some(join_ticket.peers.clone().into());
let cfg = build_config(node_id, Some(join_ticket.peers.clone().into()));

let store = HashStore::new();

let storage_pth = get_storage_path(cfg.log_dir.as_str(), node_id);
let storage_pth = get_storage_path(cfg.get_log_dir(), node_id);
ensure_directory_exist(storage_pth.as_str())?;

let storage = HeedStorage::create(&storage_pth, &cfg, logger.clone())?;
Expand All @@ -202,7 +204,7 @@ pub async fn spawn_and_join_extra_node(
pub async fn join_nodes(rafts: Vec<&Raft>, raft_addrs: Vec<&str>, peer_addr: &str) {
let mut tickets = vec![];
for (raft, raft_addr) in rafts.iter().zip(raft_addrs.into_iter()) {
let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned())
let join_ticket = Raft::request_id(raft_addr.to_owned(), peer_addr.to_owned(), None)
.await
.unwrap();

Expand Down
3 changes: 2 additions & 1 deletion raftify-cli/src/commands/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ pub fn debug_persitsted_all<LogStorage: StableStorage>(path_str: &str, logger: s
}

pub async fn debug_node(addr: &str) -> Result<()> {
let mut client = create_client(&addr).await?;
// TODO: Support TLS configuration
let mut client = create_client(&addr, None).await?;
let response = client.debug_node(raft_service::Empty {}).await?;
let json = response.into_inner().result_json;
let parsed: HashMap<String, Value> = serde_json::from_str(&json).unwrap();
Expand Down
Loading

0 comments on commit 87b00a0

Please sign in to comment.