Skip to content

Commit

Permalink
rt: add LocalRuntime (#6808)
Browse files Browse the repository at this point in the history
This change adds LocalRuntime, a new unstable runtime type which cannot be transferred across thread boundaries and supports spawn_local when called from the thread which owns the runtime.

The initial set of docs for this are iffy. Documentation is absent right now at the module level, with the docs for the LocalRuntime struct itself being somewhat duplicative of those for the `Runtime` type. This can be addressed later as stabilization nears.

This API has a few interesting implementation details:
- because it was considered beneficial to reuse the same Handle as the normal runtime, it is possible to call spawn_local from a runtime context while on a different thread from the one which drives the runtime and owns it. This forces us to check the thread ID before attempting a local spawn.
- An empty LocalOptions struct is passed into the build_local method in order to build the runtime. This will eventually have stuff in it like hooks.

Relates to #6739.
  • Loading branch information
Noah-Kennedy authored Oct 12, 2024
1 parent 5ada511 commit 512e9de
Show file tree
Hide file tree
Showing 12 changed files with 760 additions and 16 deletions.
81 changes: 72 additions & 9 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
#![cfg_attr(loom, allow(unused_imports))]

use crate::runtime::handle::Handle;
#[cfg(tokio_unstable)]
use crate::runtime::TaskMeta;
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
#[cfg(tokio_unstable)]
use crate::runtime::{LocalOptions, LocalRuntime, TaskMeta};
use crate::util::rand::{RngSeed, RngSeedGenerator};

use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use std::fmt;
use std::io;
use std::thread::ThreadId;
use std::time::Duration;

/// Builds Tokio Runtime with custom configuration values.
Expand Down Expand Up @@ -800,6 +803,37 @@ impl Builder {
}
}

/// Creates the configured `LocalRuntime`.
///
/// The returned `LocalRuntime` instance is ready to spawn tasks.
///
/// # Panics
/// This will panic if `current_thread` is not the selected runtime flavor.
/// All other runtime flavors are unsupported by [`LocalRuntime`].
///
/// [`LocalRuntime`]: [crate::runtime::LocalRuntime]
///
/// # Examples
///
/// ```
/// use tokio::runtime::Builder;
///
/// let rt = Builder::new_current_thread().build_local(&mut Default::default()).unwrap();
///
/// rt.block_on(async {
/// println!("Hello from the Tokio runtime");
/// });
/// ```
#[allow(unused_variables, unreachable_patterns)]
#[cfg(tokio_unstable)]
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
pub fn build_local(&mut self, options: &LocalOptions) -> io::Result<LocalRuntime> {
match &self.kind {
Kind::CurrentThread => self.build_current_thread_local_runtime(),
_ => panic!("Only current_thread is supported when building a local runtime"),
}
}

fn get_cfg(&self, workers: usize) -> driver::Cfg {
driver::Cfg {
enable_pause_time: match self.kind {
Expand Down Expand Up @@ -1191,8 +1225,40 @@ impl Builder {
}

fn build_current_thread_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::scheduler::{self, CurrentThread};
use crate::runtime::{runtime::Scheduler, Config};
use crate::runtime::runtime::Scheduler;

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(None)?;

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

#[cfg(tokio_unstable)]
fn build_current_thread_local_runtime(&mut self) -> io::Result<LocalRuntime> {
use crate::runtime::local_runtime::LocalRuntimeScheduler;

let tid = std::thread::current().id();

let (scheduler, handle, blocking_pool) =
self.build_current_thread_runtime_components(Some(tid))?;

Ok(LocalRuntime::from_parts(
LocalRuntimeScheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
}

fn build_current_thread_runtime_components(
&mut self,
local_tid: Option<ThreadId>,
) -> io::Result<(CurrentThread, Handle, BlockingPool)> {
use crate::runtime::scheduler;
use crate::runtime::Config;

let (driver, driver_handle) = driver::Driver::new(self.get_cfg(1))?;

Expand Down Expand Up @@ -1227,17 +1293,14 @@ impl Builder {
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
local_tid,
);

let handle = Handle {
inner: scheduler::Handle::CurrentThread(handle),
};

Ok(Runtime::from_parts(
Scheduler::CurrentThread(scheduler),
handle,
blocking_pool,
))
Ok((scheduler, handle, blocking_pool))
}

fn metrics_poll_count_histogram_builder(&self) -> Option<HistogramBuilder> {
Expand Down
29 changes: 27 additions & 2 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@ impl Handle {
/// # Panics
///
/// This function panics if the provided future panics, if called within an
/// asynchronous execution context, or if a timer future is executed on a
/// runtime that has been shut down.
/// asynchronous execution context, or if a timer future is executed on a runtime that has been
/// shut down.
///
/// # Examples
///
Expand Down Expand Up @@ -348,6 +348,31 @@ impl Handle {
self.inner.spawn(future, id)
}

#[track_caller]
#[allow(dead_code)]
pub(crate) unsafe fn spawn_local_named<F>(
&self,
future: F,
_meta: SpawnMeta<'_>,
) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: 'static,
{
let id = crate::runtime::task::Id::next();
#[cfg(all(
tokio_unstable,
tokio_taskdump,
feature = "rt",
target_os = "linux",
any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64")
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn_local(future, id)
}

/// Returns the flavor of the current `Runtime`.
///
/// # Examples
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/local_runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod runtime;

mod options;

pub use options::LocalOptions;
pub use runtime::LocalRuntime;
pub(super) use runtime::LocalRuntimeScheduler;
12 changes: 12 additions & 0 deletions tokio/src/runtime/local_runtime/options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::marker::PhantomData;

/// `LocalRuntime`-only config options
///
/// Currently, there are no such options, but in the future, things like `!Send + !Sync` hooks may
/// be added.
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct LocalOptions {
/// Marker used to make this !Send and !Sync.
_phantom: PhantomData<*mut u8>,
}
Loading

0 comments on commit 512e9de

Please sign in to comment.