Skip to content

Commit

Permalink
replicate: make manager take &self (#113)
Browse files Browse the repository at this point in the history
Taking &self is important for being able to use an Arc<Manager> for
spawned async tasks.

Special thanks to @SafariMonkey for pair programming with me.
  • Loading branch information
TheButlah authored Jun 20, 2024
1 parent db97b23 commit 8950c6e
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 31 deletions.
2 changes: 1 addition & 1 deletion crates/replicate/client/examples/example-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> Result<()> {

let args = Args::parse();

let mut manager = Manager::connect(args.url, args.token.as_deref())
let manager = Manager::connect(args.url, args.token.as_deref())
.await
.wrap_err("failed to connect to manager")?;
info!("Connected to manager!");
Expand Down
91 changes: 63 additions & 28 deletions crates/replicate/client/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

use std::fmt::Debug;

use eyre::{bail, ensure, eyre, Context};
use eyre::Result;
use eyre::{bail, ensure, Context, OptionExt};
use futures::sink::SinkExt;
use futures::stream::StreamExt;
use replicate_common::{
messages::manager::{Clientbound as Cb, Serverbound as Sb},
InstanceId,
};
use tokio::sync::{mpsc, oneshot};
use url::Url;

use crate::connect_to_url;
use crate::Ascii;

type Result<T> = eyre::Result<T>;
/// The number of queued rpc calls allowed before we start erroring.
const RPC_CAPACITY: usize = 64;

type Framed = replicate_common::Framed<wtransport::stream::BiStream, Cb, Sb>;

/// Manages instances on the instance server. Under the hood, this is all done
Expand All @@ -27,7 +31,8 @@ type Framed = replicate_common::Framed<wtransport::stream::BiStream, Cb, Sb>;
pub struct Manager {
_conn: wtransport::Connection,
_url: Url,
framed: Framed,
task: tokio::task::JoinHandle<Result<()>>,
request_tx: mpsc::Sender<(Sb, oneshot::Sender<Cb>)>,
}

impl Manager {
Expand Down Expand Up @@ -70,40 +75,70 @@ impl Manager {
);
}

let (request_tx, request_rx) = mpsc::channel(RPC_CAPACITY);
let task = tokio::spawn(manager_task(framed, request_rx));

Ok(Self {
_conn: conn,
_url: url,
framed,
task,
request_tx,
})
}

pub async fn instance_create(&mut self) -> Result<InstanceId> {
self.framed
.send(Sb::InstanceCreateRequest)
pub async fn instance_create(&self) -> Result<InstanceId> {
let response = self.request(Sb::InstanceCreateRequest).await?;
let Cb::InstanceCreateResponse { id } = response else {
bail!("unexpected response: {response:?}");
};
Ok(id)
}

pub async fn instance_url(&self, id: InstanceId) -> Result<Url> {
let response = self.request(Sb::InstanceUrlRequest { id }).await?;
let Cb::InstanceUrlResponse { url } = response else {
bail!("unexpected response: {response:?}");
};
Ok(url)
}

/// Panics if the connection is already dead
async fn request(&self, request: Sb) -> Result<Cb> {
let (response_tx, response_rx) = oneshot::channel();
self.request_tx
.send((request, response_tx))
.await
.wrap_err("failed to write message")?;
match self.framed.next().await {
None => Err(eyre!("server disconnected")),
Some(Err(err)) => {
Err(eyre::Report::new(err).wrap_err("failed to receive message"))
}
Some(Ok(Cb::InstanceCreateResponse { id })) => Ok(id),
Some(Ok(_)) => Err(eyre!("unexpected response")),
}
.wrap_err("failed to send to manager task")?;
response_rx
.await
.wrap_err("failed to receive from manager task")
}

pub async fn instance_url(&mut self, id: InstanceId) -> Result<Url> {
self.framed
.send(Sb::InstanceUrlRequest { id })
/// Destroys the manager and reaps any errors from its networking task
pub async fn join(self) -> Result<()> {
self.task
.await
.wrap_err("failed to write message")?;
match self.framed.next().await {
None => Err(eyre!("server disconnected")),
Some(Err(err)) => {
Err(eyre::Report::new(err).wrap_err("failed to receive message"))
}
Some(Ok(Cb::InstanceUrlResponse { url })) => Ok(url),
Some(Ok(_)) => Err(eyre!("unexpected response")),
}
.wrap_err("panic in manager task, file a bug report on github uwu")?
.wrap_err("error in task")
}
}

async fn manager_task(
mut framed: Framed,
mut request_rx: mpsc::Receiver<(Sb, oneshot::Sender<Cb>)>,
) -> Result<()> {
while let Some((request, response_tx)) = request_rx.recv().await {
framed
.send(request)
.await
.wrap_err("error while sending request")?;
let response = framed
.next()
.await
.ok_or_eyre("expected a response from the server")?
.wrap_err("error while receiving response")?;
let _ = response_tx.send(response);
}
// We only return ok when the manager struct was dropped
Ok(())
}
4 changes: 2 additions & 2 deletions crates/replicate/common/src/messages/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use url::Url;

use crate::InstanceId;

#[derive(Serialize, Deserialize, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)]
pub enum Serverbound {
InstanceUrlRequest { id: InstanceId },
InstanceCreateRequest,
HandshakeRequest,
}

#[derive(Serialize, Deserialize, Eq, PartialEq)]
#[derive(Serialize, Deserialize, Eq, PartialEq, Debug)]
pub enum Clientbound {
InstanceUrlResponse { url: Url },
InstanceCreateResponse { id: InstanceId },
Expand Down

0 comments on commit 8950c6e

Please sign in to comment.