Skip to content

Commit

Permalink
Handoff to instance now works
Browse files Browse the repository at this point in the history
  • Loading branch information
TheButlah committed Mar 25, 2024
1 parent 25eedea commit f22bf5f
Show file tree
Hide file tree
Showing 13 changed files with 222 additions and 101 deletions.
7 changes: 3 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ members = [
"apps/social/networking",
"apps/social/server",
"crates/egui-picking",
"crates/nexus-voicechat",
"crates/picking-xr",
"crates/replicate/client",
"crates/replicate/common",
Expand Down Expand Up @@ -74,6 +73,7 @@ features = [
"deref",
"deref_mut",
"mul",
"from",
]

[workspace.dependencies.opus]
Expand Down
11 changes: 0 additions & 11 deletions crates/nexus-voicechat/Cargo.toml

This file was deleted.

14 changes: 0 additions & 14 deletions crates/nexus-voicechat/src/lib.rs

This file was deleted.

16 changes: 13 additions & 3 deletions crates/replicate/client/examples/example-client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use color_eyre::{eyre::WrapErr, Result};
use replicate_client::manager::Manager;
use replicate_client::{instance::Instance, manager::Manager};
use replicate_common::did::{AuthenticationAttestation, Did, DidPrivateKey};
use tracing::info;
use tracing_subscriber::{filter::LevelFilter, EnvFilter};
Expand Down Expand Up @@ -36,7 +36,7 @@ async fn main() -> Result<()> {

let auth_attest = AuthenticationAttestation::new(did, &did_private_key);

let mut manager = Manager::connect(args.url, auth_attest)
let mut manager = Manager::connect(args.url, &auth_attest)
.await
.wrap_err("failed to connect to manager")?;
info!("Connected to manager!");
Expand All @@ -45,7 +45,17 @@ async fn main() -> Result<()> {
.instance_create()
.await
.wrap_err("failed to create instance")?;
info!("Got instance: {instance_id}");

let instance_url = manager
.instance_url(instance_id)
.await
.wrap_err("failed to get instance url")?;
info!("Got instance {instance_id} at: {instance_url}");

let _instance = Instance::connect(instance_url, auth_attest)
.await
.wrap_err("failed to connect to instance")?;
info!("Connected to instance!");

Ok(())
}
99 changes: 50 additions & 49 deletions crates/replicate/client/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,34 @@
use std::sync::{atomic::AtomicU16, RwLock, RwLockReadGuard, RwLockWriteGuard};

use base64::prelude::{Engine, BASE64_URL_SAFE_NO_PAD};
use eyre::{bail, ensure, Result, WrapErr};
use futures::{SinkExt, StreamExt};
use replicate_common::{
data_model::{DataModel, Entity, State},
did::AuthenticationAttestation,
};
use tracing::warn;
use url::Url;
use wtransport::{endpoint::ConnectOptions, ClientConfig, Endpoint, RecvStream};
use wtransport::{endpoint::ConnectOptions, ClientConfig, Endpoint};

use crate::CertHashDecodeErr;

use replicate_common::messages::instance::{Clientbound as Cb, Serverbound as Sb};
type RpcFramed = replicate_common::Framed<wtransport::stream::BiStream, Cb, Sb>;

/// Client api for interacting with a particular instance on the server.
/// Instances manage persistent, realtime state updates for many concurrent clients.
#[derive(Debug)]
pub struct Instance {
_conn: wtransport::Connection,
_url: String,
_url: Url,
/// Used to reliably push state updates from server to client. This happens for all
/// entities when the client initially connects, as well as when the server is
/// marking an entity as "stable", meaning its state is no longer changing frame to
/// frame. This allows the server to reduce network bandwidth.
_stable_states: RecvStream,
// _stable_states: RecvStream,
/// Used for general RPC.
_rpc: RpcFramed,
/// Current sequence number.
// TODO: Figure out how sequence numbers work
_state_seq: StateSeq,
Expand All @@ -118,9 +125,42 @@ impl Instance {
pub async fn connect(
url: Url,
auth_attest: AuthenticationAttestation,
) -> Result<Self, ConnectErr> {
let _conn = connect_to_url(url, auth_attest).await?;
todo!()
) -> Result<Self> {
let conn = connect_to_url(&url, auth_attest)
.await
.wrap_err("failed to connect to server")?;

let bi = wtransport::stream::BiStream::join(
conn.open_bi()
.await
.wrap_err("could not initiate bi stream")?
.await
.wrap_err("could not finish opening bi stream")?,
);
let mut rpc = RpcFramed::new(bi);

// Do handshake before anything else
{
rpc.send(Sb::HandshakeRequest)
.await
.wrap_err("failed to send handshake request")?;
let Some(msg) = rpc.next().await else {
bail!("Server disconnected before completing handshake");
};
let msg = msg.wrap_err("error while receiving handshake response")?;
ensure!(
msg == Cb::HandshakeResponse,
"invalid message during handshake"
);
}

Ok(Self {
_conn: conn,
_url: url,
_state_seq: Default::default(),
dm: RwLock::new(DataModel::new()),
_rpc: rpc,
})
}

/// Asks the server to reserve for this client a list of entity ids and store them
Expand All @@ -130,7 +170,7 @@ impl Instance {
pub async fn reserve_entities(
&self,
#[allow(clippy::ptr_arg)] _entities: &mut Vec<Entity>,
) -> Result<(), ReserveErr> {
) -> Result<()> {
todo!()
}

Expand All @@ -154,52 +194,13 @@ pub enum RecvState<'a> {
}

/// Sequence number for state messages
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct StateSeq(AtomicU16);

mod error {
use crate::CertHashDecodeErr;
use wtransport::error::{ConnectingError, SendDatagramError, StreamWriteError};

#[derive(thiserror::Error, Debug)]
pub enum ReserveErr {}

#[derive(thiserror::Error, Debug)]
pub enum DeleteErr {}

#[derive(thiserror::Error, Debug)]
pub enum SendStateErr {
#[error("error while sending state across network: {0}")]
Dgram(#[from] SendDatagramError),
}

#[derive(thiserror::Error, Debug)]
pub enum SendReliableStateErr {
#[error("error while finalizing state to network: {0}")]
StreamWrite(#[from] StreamWriteError),
}

#[derive(thiserror::Error, Debug)]
pub enum RecvStateErr {}

#[derive(thiserror::Error, Debug)]
pub enum ConnectErr {
#[error("failed to create webtransport client: {0}")]
ClientCreate(#[from] std::io::Error),
#[error("failed to connect to webtransport endoint: {0}")]
WtConnectingError(#[from] ConnectingError),
#[error(transparent)]
InvalidCertHash(#[from] CertHashDecodeErr),
#[error(transparent)]
Other(#[from] Box<dyn std::error::Error>),
}
}
pub use self::error::*;

async fn connect_to_url(
url: Url,
url: &Url,
auth_attest: AuthenticationAttestation,
) -> Result<wtransport::Connection, ConnectErr> {
) -> Result<wtransport::Connection> {
let cert_hash = if let Some(frag) = url.fragment() {
let cert_hash = BASE64_URL_SAFE_NO_PAD
.decode(frag)
Expand Down
18 changes: 17 additions & 1 deletion crates/replicate/client/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl Manager {
/// our DID.
pub async fn connect(
url: Url,
auth_attest: AuthenticationAttestation,
auth_attest: &AuthenticationAttestation,
) -> Result<Self> {
let cert_hash = if let Some(frag) = url.fragment() {
let cert_hash = BASE64_URL_SAFE_NO_PAD
Expand Down Expand Up @@ -105,6 +105,7 @@ impl Manager {
"invalid message during handshake"
);
}

Ok(Self {
_conn: conn,
_url: url,
Expand All @@ -126,4 +127,19 @@ impl Manager {
Some(Ok(_)) => Err(eyre!("unexpected response")),
}
}

pub async fn instance_url(&mut self, id: InstanceId) -> Result<Url> {
self.framed
.send(Sb::InstanceUrlRequest { id })
.await
.wrap_err("failed to write message")?;
match self.framed.next().await {
None => Err(eyre!("server disconnected")),
Some(Err(err)) => {
Err(eyre::Report::new(err).wrap_err("failed to receive message"))
}
Some(Ok(Cb::InstanceUrlResponse { url })) => Ok(url),
Some(Ok(_)) => Err(eyre!("unexpected response")),
}
}
}
1 change: 1 addition & 0 deletions crates/replicate/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ thiserror.workspace = true
tokio = { workspace = true, default-features = false }
tokio-serde.workspace = true
tokio-util.workspace = true
url.workspace = true
uuid = { workspace = true, features = ["v4", "serde"] }
11 changes: 11 additions & 0 deletions crates/replicate/common/src/messages/instance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Serverbound {
HandshakeRequest,
}

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Clientbound {
HandshakeResponse,
}
3 changes: 3 additions & 0 deletions crates/replicate/common/src/messages/manager.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use serde::{Deserialize, Serialize};
use url::Url;

use crate::InstanceId;

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Serverbound {
InstanceUrlRequest { id: InstanceId },
InstanceCreateRequest,
HandshakeRequest,
}

#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub enum Clientbound {
InstanceUrlResponse { url: Url },
InstanceCreateResponse { id: InstanceId },
HandshakeResponse,
}
1 change: 1 addition & 0 deletions crates/replicate/common/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
//! We should switch to protobuf or capnproto as soon as we prove the networking
//! works.

pub mod instance;
pub mod manager;
2 changes: 2 additions & 0 deletions crates/replicate/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes.workspace = true
clap.workspace = true
color-eyre.workspace = true
dashmap = "5.5.3"
derive_more.workspace = true
eyre.workspace = true
futures.workspace = true
replicate-common.path = "../common"
Expand All @@ -21,6 +22,7 @@ tokio-serde = { workspace = true, features = ["json"] }
tokio-util = { workspace = true, features = ["codec"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing.workspace = true
url.workspace = true
uuid = { version = "1.6.1", features = ["v4", "serde"] }
wtransport.workspace = true

Expand Down
Loading

0 comments on commit f22bf5f

Please sign in to comment.