Skip to content

Commit

Permalink
rt: add LocalRuntime
Browse files Browse the repository at this point in the history
This change adds LocalRuntime, a new unstable runtime type which cannot be transferred across thread boundaries and supports spawn_local when called from the thread which owns the runtime.

The initial set of docs for this are iffy. Documentation is absent right now at the module level, with the docs for the LocalRuntime struct itself being somewhat duplicative of those for the `Runtime` type. This can probably be addressed later as stabilization nears.

This API has a few interesting implementation details:
- because it was considered beneficial to reuse the same Handle as the normal runtime, it is possible to call spawn_local from a runtime context while on a different thread from the one which drives the runtime and owns it. This forces us to check the thread ID before attempting a local spawn.
- An empty LocalOptions struct is passed into the build_local method in order to build the runtime. This will eventually have stuff in it like hooks.

Relates to #6739.
  • Loading branch information
Noah-Kennedy committed Oct 2, 2024
1 parent 2c14f88 commit e6e7aff
Show file tree
Hide file tree
Showing 10 changed files with 618 additions and 15 deletions.
72 changes: 64 additions & 8 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,16 @@
use crate::runtime::handle::Handle;
#[cfg(tokio_unstable)]
use crate::runtime::TaskMeta;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
use crate::runtime::{
blocking, driver, Callback, HistogramBuilder, LocalOptions, LocalRuntime, Runtime, TaskCallback,
};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use std::fmt;
use std::io;
use std::thread::ThreadId;
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
Expand Down Expand Up @@ -800,6 +805,29 @@ impl Builder {
}
}

/// Creates the configured `LocalRuntime`.
///
/// The returned `LocalRuntime` instance is ready to spawn tasks.
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
#[allow(unused_variables)]
pub fn build_local(&mut self, options: &mut LocalOptions) -> io::Result<LocalRuntime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_local_runtime(),
_ => panic!("Only current_thread is supported when building a local runtime"),
}
}

fn get_cfg(&self, workers: usize) -> driver::Cfg {
driver::Cfg {
enable_pause_time: match self.kind {
Expand Down Expand Up @@ -1191,8 +1219,39 @@ impl Builder {
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
use crate::runtime::runtime::Scheduler;

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(None)?;

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
use crate::runtime::local_runtime::LocalRuntimeScheduler;

let tid = std::thread::current().id();

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(Some(tid))?;

Ok(LocalRuntime::from_parts(
LocalRuntimeScheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

fn build_current_thread_runtime_components(
&mut self,
local_tid: Option<ThreadId>,
) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
use crate::runtime::scheduler;
use crate::runtime::Config;

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;

Expand Down Expand Up @@ -1227,17 +1286,14 @@ impl Builder {
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
local_tid,
);

let handle = Handle {
inner: scheduler::Handle::CurrentThread(handle),
};

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
Ok((scheduler, handle, blocking_pool))
}

fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
Expand Down
28 changes: 26 additions & 2 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ impl Handle {
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
/// asynchronous execution context, or if a timer future is executed on a runtime that has been
/// shut down.
///
/// # Examples
///
Expand Down Expand Up @@ -345,6 +345,30 @@ impl Handle {
self.inner.spawn(future, id)
}

#[track_caller]
pub(crate) unsafe fn spawn_local_named<F>(
&self,
future: F,
_name: Option<&str>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
self.inner.spawn_local(future, id)
}

/// Returns the flavor of the current `Runtime`.
///
/// # Examples
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/local_runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod runtime;

mod options;

pub use options::LocalOptions;
pub use runtime::LocalRuntime;
pub(super) use runtime::LocalRuntimeScheduler;
9 changes: 9 additions & 0 deletions tokio/src/runtime/local_runtime/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/// LocalRuntime-only config options
///
/// Currently, there are no such options, but in the future, things like `!Send + !Sync` hooks may
/// be added.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct LocalOptions {
// todo add local hooks at a later point
}
Loading

0 comments on commit e6e7aff

Please sign in to comment.