Skip to content

Commit

Permalink
Clean up things
Browse files Browse the repository at this point in the history
  • Loading branch information
yorickpeterse committed Aug 27, 2024
1 parent f5e9db5 commit 4256039
Show file tree
Hide file tree
Showing 30 changed files with 772 additions and 445 deletions.
132 changes: 112 additions & 20 deletions rt/src/scheduler/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ pub struct Thread {
/// A value of 0 indicates the thread isn't blocked.
blocked_at: u64,

/// The number of nested blocking calls we're in.
blocked_nesting: u64,

/// The ID of the network poller assigned to this thread.
///
/// Threads are each assigned a network poller in a round-robin fashion.
Expand Down Expand Up @@ -206,6 +209,7 @@ impl Thread {
work: pool.threads[id].queue.clone(),
backup: false,
blocked_at: NOT_BLOCKING,
blocked_nesting: 0,
network_poller,
rng: thread_rng(),
stacks: StackPool::new(pool.stack_size),
Expand All @@ -222,6 +226,7 @@ impl Thread {
work: pool.threads[0].queue.clone(),
backup: true,
blocked_at: NOT_BLOCKING,
blocked_nesting: 0,
network_poller,
rng: thread_rng(),
stacks: StackPool::new(pool.stack_size),
Expand Down Expand Up @@ -267,10 +272,20 @@ impl Thread {
return;
}

// It's possible a user signals the start of a blocking call while this
// was already done so. This ensures that we handle such cases
// gracefully instead of potentially leaving the thread in a weird
// state.
if self.blocked_nesting > 0 {
self.blocked_nesting += 1;
return;
}

let epoch = self.pool.current_epoch();
let shared = &self.pool.threads[self.id];

self.blocked_at = epoch;
self.blocked_nesting = 1;
shared.blocked_at.store(epoch, Ordering::Release);

// The monitor thread may be sleeping indefinitely if we're the first
Expand All @@ -296,30 +311,18 @@ impl Thread {
return;
}

let shared = &self.pool.threads[self.id];
let epoch = self.blocked_at;
self.blocked_nesting = self.blocked_nesting.saturating_sub(1);

if shared
.blocked_at
.compare_exchange(
epoch,
NOT_BLOCKING,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
// The monitor thread determined we took too long and we have to
// become a backup thread.
self.backup = true;
if self.blocked_nesting > 0 {
return;
}

self.blocked_at = NOT_BLOCKING;
self.reset_blocked_at();

// If the closure took too long to run (e.g. an IO operation took too
// long), we have to give up running the process. If we continue running
// we could mess up whatever thread has taken over our queue/work, and
// we'd be using the OS thread even longer than we already have.
// If the operation took too long to run, we have to give up running the
// process. If we continue running we could mess up whatever thread has
// taken over our queue/work, and we'd be using the OS thread even
// longer than we already have.
//
// We schedule onto the global queue because if another thread took over
// but found no other work, it may have gone to sleep. In that case
Expand All @@ -340,6 +343,28 @@ impl Thread {
}
}

fn reset_blocked_at(&mut self) {
let shared = &self.pool.threads[self.id];
let epoch = self.blocked_at;

if shared
.blocked_at
.compare_exchange(
epoch,
NOT_BLOCKING,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_err()
{
// The monitor thread determined we took too long and we have to
// become a backup thread.
self.backup = true;
}

self.blocked_at = NOT_BLOCKING;
}

fn run(&mut self, state: &State) {
while self.pool.is_alive() {
if self.backup {
Expand Down Expand Up @@ -555,6 +580,15 @@ impl Thread {
CURRENT_PROCESS.set(null_mut());
}

// It's possible that we finish work with an uneven number of calls to
// `start_blocking` and `stop_blocking`, such as when the developer
// didn't pay attention to the documentation telling them to do just
// that.
if self.blocked_nesting > 0 {
self.blocked_nesting = 0;
self.reset_blocked_at();
}

match self.action.take() {
Action::Terminate => {
// Process termination can't be safely done on the process'
Expand Down Expand Up @@ -1196,6 +1230,7 @@ mod tests {
thread.start_blocking();

assert_eq!(thread.blocked_at, 4);
assert_eq!(thread.blocked_nesting, 1);
assert_eq!(pool.threads[1].blocked_at.load(Ordering::Acquire), 4);
assert_eq!(pool.monitor.status.load(), MonitorStatus::Notified);
}
Expand All @@ -1213,6 +1248,7 @@ mod tests {

assert!(!thread.backup);
assert!(pool.global.lock().unwrap().is_empty());
assert_eq!(thread.blocked_nesting, 0);

thread.start_blocking();
pool.threads[1].blocked_at.store(NOT_BLOCKING, Ordering::Release);
Expand All @@ -1221,6 +1257,62 @@ mod tests {
assert!(thread.backup);
assert_eq!(thread.blocked_at, NOT_BLOCKING);
assert_eq!(pool.global.lock().unwrap().len(), 1);
assert_eq!(thread.blocked_nesting, 0);
}

#[test]
fn test_thread_start_blocking_nested() {
let state = setup();
let pool = &state.scheduler.pool;
let mut thread = Thread::new(1, 0, pool.clone());
let class = empty_process_class("A");
let process = new_process(*class).take_and_forget();

thread.start_blocking();
thread.start_blocking();
thread.start_blocking();
pool.threads[1].blocked_at.store(NOT_BLOCKING, Ordering::Release);

thread.stop_blocking(process);
assert!(!thread.backup);
assert!(pool.global.lock().unwrap().is_empty());
assert_eq!(thread.blocked_nesting, 2);

thread.stop_blocking(process);
assert!(!thread.backup);
assert!(pool.global.lock().unwrap().is_empty());
assert_eq!(thread.blocked_nesting, 1);

thread.stop_blocking(process);
assert!(thread.backup);
assert_eq!(pool.global.lock().unwrap().len(), 1);
assert_eq!(thread.blocked_nesting, 0);
assert_eq!(thread.blocked_at, NOT_BLOCKING);
}

#[test]
fn test_thread_start_blocking_without_stop_blocking() {
let class = empty_process_class("A");
let proc = new_process_with_message(*class, method).take_and_forget();
let state = setup();
let pool = &state.scheduler.pool;
let mut thread = Thread::new(1, 0, pool.clone());

pool.epoch.store(4, Ordering::Release);
pool.monitor.status.store(MonitorStatus::Sleeping);

thread.schedule(proc);
thread.start_blocking();
pool.threads[1].blocked_at.store(NOT_BLOCKING, Ordering::Release);
thread.run(&state);

assert!(thread.backup);
assert_eq!(thread.blocked_nesting, 0);
assert_eq!(
pool.threads[1].blocked_at.load(Ordering::Acquire),
NOT_BLOCKING
);
assert_eq!(pool.monitor.status.load(), MonitorStatus::Notified);
}

#[test]
Expand Down
22 changes: 22 additions & 0 deletions std/src/std/alloc.inko
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import std.libc.freebsd (self as libc) if freebsd
import std.libc.linux (self as libc) if linux
import std.libc.mac (self as libc) if mac

fn free(pointer: Pointer[UInt8]) {
libc.free(pointer)
}

fn resize[T](buffer: Pointer[T], size: Int) -> Pointer[T] {
let ptr = libc.realloc(buffer as Pointer[UInt8], size)

# In this case there's nothing we can do but abort.
if ptr as Int == 0 and size != 0 {
panic('std.libc.unix.alloc.resize() failed to allocate memory')
}

ptr as Pointer[T]
}

fn copy[T](from: Pointer[T], to: Pointer[T], size: Int) {
libc.memmove(to as Pointer[UInt8], from as Pointer[UInt8], size)
}
12 changes: 6 additions & 6 deletions std/src/std/array.inko
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# An ordered, integer-indexed generic collection of values.
import std.alloc
import std.clone (Clone)
import std.cmp (Compare, Contains, Equal, Ordering, max, min)
import std.drop (Drop)
import std.fmt (Format, Formatter)
import std.hash (Hash, Hasher)
import std.iter (Iter, Stream)
import std.libc.unix.alloc (self as alloc_imp) if unix
import std.option (Option)
import std.rand (Shuffle)

Expand Down Expand Up @@ -113,7 +113,7 @@ class builtin Array[T] {
if size < 0 { panic('The capacity must be greater than or equal to zero') }

let vsize = _INKO.size_of_type_parameter(T)
let buffer = alloc_imp.resize(0x0 as Pointer[T], size: size * vsize)
let buffer = alloc.resize(0x0 as Pointer[T], size: size * vsize)

Array(size: 0, capacity: size, buffer: buffer)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ class builtin Array[T] {
let vsize = _INKO.size_of_type_parameter(T)

@capacity = max(@capacity * 2, @capacity + size)
@buffer = alloc_imp.resize(@buffer, @capacity * vsize)
@buffer = alloc.resize(@buffer, @capacity * vsize)
}

# Removes all values in the Array.
Expand Down Expand Up @@ -250,7 +250,7 @@ class builtin Array[T] {
let val = addr.0
let vsize = _INKO.size_of_type_parameter(T)

alloc_imp.copy(
alloc.copy(
from: addr as Int + vsize as Pointer[T],
to: addr,
size: len - index - 1 * vsize,
Expand Down Expand Up @@ -598,7 +598,7 @@ class builtin Array[T] {
let to = address_of(index + 1)
let vsize = _INKO.size_of_type_parameter(T)

alloc_imp.copy(from, to, size: @size - index * vsize)
alloc.copy(from, to, size: @size - index * vsize)
}

write_to(index, value)
Expand Down Expand Up @@ -733,7 +733,7 @@ impl Array if T: mut {
impl Drop for Array {
fn mut drop {
clear
alloc_imp.free(@buffer as Pointer[UInt8])
alloc.free(@buffer as Pointer[UInt8])
}
}

Expand Down
23 changes: 9 additions & 14 deletions std/src/std/fs/unix/dir.inko
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,16 @@ import std.iter (Iter)
# different names for `readdir()` (e.g. `readdir$INODE64` on macOS). To handle
# this, platform specific code (e.g. calculating the offset of `dirent.d_name`)
# is pushed into specific modules.
import std.libc.freebsd.dir (self as dir_imp) if freebsd
import std.libc.linux.dir (self as dir_imp) if linux
import std.libc.mac.amd64.dir (self as dir_imp) if mac and amd64
import std.libc.mac.arm64.dir (self as dir_imp) if mac and arm64
import std.libc.freebsd (self as libc) if freebsd
import std.libc.linux (self as libc) if linux
import std.libc.mac (self as libc) if mac

# These constants are the same across the various Unix platforms we support, so
# we can just define them here.
let DT_DIR = 4
let DT_LNK = 10
let DT_REG = 8

fn extern closedir(stream: Pointer[UInt8]) -> Int32

fn extern strlen(pointer: Pointer[UInt8]) -> UInt64

fn extern inko_reset_error

# The byte of the "." character.
Expand All @@ -31,7 +26,7 @@ class ReadDirectory {
let @stream: Pointer[UInt8]

fn static new(path: String) -> Result[ReadDirectory, Error] {
let stream = dir_imp.opendir(path.to_pointer)
let stream = libc.opendir(path.to_pointer)

if stream as Int != 0 {
Result.Ok(ReadDirectory(stream))
Expand All @@ -48,7 +43,7 @@ impl Iter[Result[(String, FileType), Error]] for ReadDirectory {
# don't reschedule the process until after the call and the `errno` check.
inko_reset_error

let entry = dir_imp.readdir(@stream)
let entry = libc.readdir(@stream)

if entry as Int == 0 {
return match Error.last_os_error {
Expand All @@ -57,8 +52,8 @@ impl Iter[Result[(String, FileType), Error]] for ReadDirectory {
}
}

let name_ptr = dir_imp.dirent_name(entry)
let name_len = strlen(name_ptr) as Int
let name_ptr = libc.dirent_name(entry)
let name_len = libc.strlen(name_ptr) as Int
let first = name_ptr.0 as Int

# Skip "."
Expand All @@ -74,7 +69,7 @@ impl Iter[Result[(String, FileType), Error]] for ReadDirectory {
}

let name = String.from_pointer(name_ptr)
let type = match dir_imp.dirent_type(entry) {
let type = match libc.dirent_type(entry) {
case DT_DIR -> FileType.Directory
case DT_REG -> FileType.File
case DT_LNK -> FileType.SymbolicLink
Expand All @@ -90,6 +85,6 @@ impl Drop for ReadDirectory {
fn mut drop {
# closedir() only errors if the stream itself is invalid, which shouldn't be
# the case at this point.
closedir(@stream)
libc.closedir(@stream)
}
}
Loading

0 comments on commit 4256039

Please sign in to comment.