diff --git a/deno.json b/deno.json index 03923ed9f..9250d33d6 100644 --- a/deno.json +++ b/deno.json @@ -11,17 +11,13 @@ }, "exclude": [ "build", - "website", "www", - "packages" ] }, "fmt": { "exclude": [ "build", - "website", "www", - "packages", "CODE_OF_CONDUCT.md", "README.md" ] @@ -29,7 +25,6 @@ "test": { "exclude": [ "build", - "packages" ] }, "compilerOptions": { diff --git a/lib/abort-signal.ts b/lib/abort-signal.ts index fc94ed0fc..169d877f5 100644 --- a/lib/abort-signal.ts +++ b/lib/abort-signal.ts @@ -1,5 +1,5 @@ import type { Operation } from "./types.ts"; -import { resource } from "./instructions.ts"; +import { resource } from "./resource.ts"; /** * Create an @@ -29,7 +29,7 @@ import { resource } from "./instructions.ts"; * ``` */ export function useAbortSignal(): Operation { - return resource(function* AbortSignal(provide) { + return resource(function* (provide) { let controller = new AbortController(); try { yield* provide(controller.signal); diff --git a/lib/action.ts b/lib/action.ts new file mode 100644 index 000000000..bf57fd912 --- /dev/null +++ b/lib/action.ts @@ -0,0 +1,34 @@ +import { Err, Ok } from "./result.ts"; +import { Effect, Operation } from "./types.ts"; + +interface Resolver { + (resolve: (value: T) => void, reject: (error: Error) => void): () => void; +} + +export function action(resolver: Resolver, desc?: string): Operation { + return { + *[Symbol.iterator]() { + let action: Effect = { + description: desc ?? "action", + enter: (settle) => { + let resolve = (value: T) => { + settle(Ok(value)); + }; + let reject = (error: Error) => { + settle(Err(error)); + }; + let discard = resolver(resolve, reject); + return (discarded) => { + try { + discard(); + discarded(Ok()); + } catch (error) { + discarded(Err(error)); + } + }; + }, + }; + return (yield action) as T; + }, + }; +} diff --git a/lib/all.ts b/lib/all.ts index c8d32e0cc..d386587fe 100644 --- a/lib/all.ts +++ b/lib/all.ts @@ -1,6 +1,6 @@ import type { Operation, Task, Yielded } from "./types.ts"; -import { spawn } from "./instructions.ts"; -import { call } from "./call.ts"; +import { spawn } from "./spawn.ts"; +import { encapsulate, trap } from "./task.ts"; /** * Block and wait for all of the given operations to complete. Returns @@ -27,20 +27,23 @@ import { call } from "./call.ts"; * @param ops a list of operations to wait for * @returns the list of values that the operations evaluate to, in the order they were given */ -export function all[] | []>( +export function* all[] | []>( ops: T, ): Operation> { - return call(function* () { - let tasks: Task[] = []; - for (let operation of ops) { - tasks.push(yield* spawn(() => operation)); - } - let results = []; - for (let task of tasks) { - results.push(yield* task); - } - return results as All; - }); + return yield* trap(() => + encapsulate(function* (): Operation> { + let tasks: Task[] = []; + + for (let operation of ops) { + tasks.push(yield* spawn(() => operation)); + } + let results = []; + for (let task of tasks) { + results.push(yield* task); + } + return results as All; + }) + ); } /** diff --git a/lib/call.ts b/lib/call.ts index 86e0662aa..371553ffe 100644 --- a/lib/call.ts +++ b/lib/call.ts @@ -1,6 +1,6 @@ -import type { Instruction, Operation } from "./types.ts"; -import { action } from "./instructions.ts"; -import { pause } from "./pause.ts"; +import { constant } from "./constant.ts"; +import { action } from "./action.ts"; +import { Operation } from "./types.ts"; /** * A uniform integration type representing anything that can be evaluated @@ -24,30 +24,21 @@ import { pause } from "./pause.ts"; * await run(() => hello(function*() { return "world!" })); "hello world!"; * ``` */ -export type Callable = - | Operation - | Promise - | (() => Operation) - | (() => Promise) - | (() => T); +export interface Callable< + T extends Operation | Promise | unknown, + TArgs extends unknown[] = [], +> { + (...args: TArgs): T; +} /** - * Pause the current operation, then runs a promise, async function, plain function, - * or operation within a new scope. The calling operation will be resumed (or errored) - * once call is completed. + * Pause the current operation, async function, plain function, or operation function. + * The calling operation will be resumed (or errored) once call is completed. * * `call()` is a uniform integration point for calling async functions, - * evaluating promises, generator functions, operations, and plain - * functions. - * - * It can be used to treat a promise as an operation: - * - * @example - * ```javascript - * let response = yield* call(fetch('https://google.com')); - * ``` + * generator functions, and plain functions. * - * or an async function: + * To call an async function: * * @example * ```typescript @@ -59,128 +50,52 @@ export type Callable = * } * ``` * - * It can be used to run an operation in a separate scope to ensure that any - * resources allocated will be cleaned up: - * - * @example - * ```javascript - * yield* call(function*() { - * let socket = yield* useSocket(); - * return yield* socket.read(); - * }); // => socket is destroyed before returning - * ``` - * - * It can be used to run a plain function: + * or a plain function: * * @example * ```javascript * yield* call(() => "a string"); * ``` - * - * Because `call()` runs within its own {@link Scope}, it can also be used to - * establish [error boundaries](https://frontside.com/effection/docs/errors). - * - * @example - * ```javascript - * function* myop() { - * let task = yield* spawn(function*() { - * throw new Error("boom!"); - * }); - * yield* task; - * } - * - * function* runner() { - * try { - * yield* myop(); - * } catch (err) { - * // this will never get hit! - * } - * } - * - * function* runner() { - * try { - * yield* call(myop); - * } catch(err) { - * // properly catches `spawn` errors! - * } - * } - * ``` - * - * @param callable the operation, promise, async function, generator funnction, or plain function to call as part of this operation + * @param callable the operation, promise, async function, generator funnction, + * or plain function to call as part of this operation */ -export function call(callable: () => Operation): Operation; -export function call(callable: () => Promise): Operation; -export function call(callable: () => T): Operation; -export function call(callable: Operation): Operation; -export function call(callable: Promise): Operation; -export function call(callable: Callable): Operation { - return action(function* (resolve, reject) { - try { - if (typeof callable === "function") { - let fn = callable as () => Operation | Promise | T; - resolve(yield* toop(fn())); - } else { - resolve(yield* toop(callable)); - } - } catch (error) { - reject(error); - } - }); -} -function toop( - op: Operation | Promise | T, +export function call( + callable: Callable, + ...args: TArgs ): Operation { - if (isPromise(op)) { - return expect(op); - } else if (isIterable(op)) { - let iter = op[Symbol.iterator](); - if (isInstructionIterator(iter)) { - // operation - return op; - } else { - // We are assuming that if an iterator does *not* have `.throw` then - // it must be a built-in iterator and we should return the value as-is. - return bare(op as T); - } - } else { - return bare(op as T); - } -} - -function bare(val: T): Operation { return { [Symbol.iterator]() { - return { next: () => ({ done: true, value: val }) }; + let target = callable.call(void (0), ...args); + if ( + typeof target === "string" || Array.isArray(target) || + target instanceof Map || target instanceof Set + ) { + return constant(target)[Symbol.iterator](); + } else if (isPromise(target)) { + return action(function wait(resolve, reject) { + target.then(resolve, reject); + return () => {}; + }, `async call ${callable.name}()`)[Symbol.iterator](); + } else if (isOperation(target)) { + return target[Symbol.iterator](); + } else { + return constant(target)[Symbol.iterator](); + } }, }; } +1; -function expect(promise: Promise): Operation { - return pause((resolve, reject) => { - promise.then(resolve, reject); - return () => {}; - }); -} - -function isFunc(f: unknown): f is (...args: unknown[]) => unknown { - return typeof f === "function"; -} - -function isPromise(p: unknown): p is Promise { - if (!p) return false; - return isFunc((p as Promise).then); -} - -// iterator must implement both `.next` and `.throw` -// built-in iterators are not considered iterators to `call()` -function isInstructionIterator(it: unknown): it is Iterator { - if (!it) return false; - return isFunc((it as Iterator).next) && - isFunc((it as Iterator).throw); +function isPromise( + target: Operation | Promise | T, +): target is Promise { + return target && typeof (target as Promise).then === "function"; } -function isIterable(it: unknown): it is Iterable { - if (!it) return false; - return typeof (it as Iterable)[Symbol.iterator] === "function"; +function isOperation( + target: Operation | Promise | T, +): target is Operation { + return target && + typeof (target as Operation)[Symbol.iterator] === "function"; } diff --git a/lib/callcc.ts b/lib/callcc.ts new file mode 100644 index 000000000..881e8fab5 --- /dev/null +++ b/lib/callcc.ts @@ -0,0 +1,25 @@ +import { lift } from "./lift.ts"; +import { Err, Ok, Result, unbox } from "./result.ts"; +import { Operation } from "./types.ts"; +import { withResolvers } from "./with-resolvers.ts"; +import { spawn } from "./spawn.ts"; +import { encapsulate } from "./task.ts"; + +export function* callcc( + op: ( + resolve: (value: T) => Operation, + reject: (error: Error) => Operation, + ) => Operation, +): Operation { + let result = withResolvers>(); + + let resolve = lift((value: T) => result.resolve(Ok(value))); + + let reject = lift((error: Error) => result.resolve(Err(error))); + + return yield* encapsulate(function* () { + yield* spawn(() => op(resolve, reject)); + + return unbox(yield* result.operation); + }); +} diff --git a/lib/constant.ts b/lib/constant.ts new file mode 100644 index 000000000..bace3fed7 --- /dev/null +++ b/lib/constant.ts @@ -0,0 +1,7 @@ +import type { Operation } from "./types.ts"; + +export function constant(value: T): Operation { + return { + [Symbol.iterator]: () => ({ next: () => ({ done: true, value }) }), + }; +} diff --git a/lib/context.ts b/lib/context.ts index 39ac0612e..c457ab58f 100644 --- a/lib/context.ts +++ b/lib/context.ts @@ -1,31 +1,61 @@ -import type { Context } from "./types.ts"; -import { create } from "./run/create.ts"; -import { useScope } from "./run/scope.ts"; +import { Context, Effect, Operation, Scope } from "./types.ts"; +import { Ok } from "./result.ts"; +import { useScope } from "./scope.ts"; -export function createContext(key: string, defaultValue?: T): Context { - let context: Context = create>(`Context`, { key }, { +export function createContext(name: string, defaultValue?: T): Context { + let context: Context = { + name, defaultValue, - *get() { - let scope = yield* useScope(); - return scope.get(context); + *get(): Operation { + return (yield Get(context)) as T | undefined; }, - *set(value: T) { - let scope = yield* useScope(); - return scope.set(context, value); + *set(value: T): Operation { + return (yield Set(context, value)) as T; + }, + *expect(): Operation { + return (yield Expect(context)) as T; + }, + *delete(): Operation { + return (yield Delete(context)) as boolean; }, - *[Symbol.iterator]() { - let value = yield* context.get(); - if (typeof value === "undefined") { - throw new MissingContextError(`missing required context: '${key}'`); - } else { - return value; + *with(value: T, operation: (value: T) => Operation): Operation { + let scope = yield* useScope(); + let original = scope.hasOwn(context) ? scope.get(context) : undefined; + try { + return yield* operation(scope.set(context, value)); + } finally { + if (typeof original === "undefined") { + scope.delete(context); + } else { + scope.set(context, original); + } } }, - }); + }; return context; } -class MissingContextError extends Error { - name = "MissingContextError"; +const Get = (context: Context) => + UseScope((scope) => scope.get(context), `get(${context.name})`); +const Set = (context: Context, value: T) => + UseScope( + (scope) => scope.set(context, value), + `set(${context.name}, ${value})`, + ); +const Expect = (context: Context) => + UseScope((scope) => scope.expect(context), `expect(${context.name})`); +const Delete = (context: Context) => + UseScope((scope) => scope.expect(context), `delete(${context.name})`); + +function UseScope(fn: (scope: Scope) => T, description: string): Effect { + return { + description, + enter: (resolve, { scope }) => { + resolve(Ok(fn(scope))); + return (resolve) => { + resolve(Ok()); + }; + }, + }; } diff --git a/lib/contexts.ts b/lib/contexts.ts new file mode 100644 index 000000000..fdc3d3c5f --- /dev/null +++ b/lib/contexts.ts @@ -0,0 +1,15 @@ +import { createContext } from "./context.ts"; +import { Coroutine, Scope } from "./types.ts"; + +export const Routine = createContext>( + "@effection/coroutine", +); + +export const Generation = createContext( + "@effection/scope.generation", + 0, +); + +export const Children = createContext>( + "@effection/scope.children", +); diff --git a/lib/coroutine.ts b/lib/coroutine.ts new file mode 100644 index 000000000..9a2634927 --- /dev/null +++ b/lib/coroutine.ts @@ -0,0 +1,91 @@ +import { Generation } from "./contexts.ts"; +import { ReducerContext } from "./reducer.ts"; +import { Ok } from "./result.ts"; +import { Coroutine, Operation, Scope, Subscriber } from "./types.ts"; + +export interface CoroutineOptions { + scope: Scope; + operation(): Operation; +} + +export function createCoroutine( + { operation, scope }: CoroutineOptions, +): Coroutine { + let subscribers = new Set>(); + + let send: Subscriber = (item) => { + try { + for (let subscriber of subscribers) { + subscriber(item); + } + } finally { + if (item.done) { + subscribers.clear(); + } + } + }; + + let reducer = scope.expect(ReducerContext); + + let iterator: Coroutine["data"]["iterator"] | undefined = undefined; + + let routine = { + scope, + data: { + get iterator() { + if (!iterator) { + iterator = operation()[Symbol.iterator](); + } + return iterator; + }, + discard: (resolve) => resolve(Ok()), + }, + next(result, subscriber) { + if (subscriber) { + subscribers.add(subscriber); + } + routine.data.discard((exit) => { + routine.data.discard = (resolve) => resolve(Ok()); + reducer.reduce([ + scope.expect(Generation), + routine, + exit.ok ? result : exit, + send as Subscriber, + "next", + ]); + }); + + return () => subscriber && subscribers.delete(subscriber); + }, + return(result, subscriber?: Subscriber) { + if (subscriber) { + subscribers.add(subscriber as Subscriber); + } + routine.data.discard((exit) => { + routine.data.discard = (resolve) => resolve(Ok()); + reducer.reduce([ + scope.expect(Generation), + routine, + exit.ok ? result : exit, + send as Subscriber, + "return", + ]); + }); + + return () => + subscriber && subscribers.delete(subscriber as Subscriber); + }, + } as Coroutine; + + return routine; +} + +export function* useCoroutine(): Operation { + return (yield { + description: "useCoroutine()", + enter: (resolve, routine) => { + resolve(Ok(routine)); + return (uninstalled) => uninstalled(Ok()); + }, + }) as Coroutine; +} diff --git a/lib/deps.ts b/lib/deps.ts deleted file mode 100644 index d9aa97de8..000000000 --- a/lib/deps.ts +++ /dev/null @@ -1,2 +0,0 @@ -export { assert } from "https://deno.land/std@0.158.0/testing/asserts.ts"; -export * from "https://deno.land/x/continuation@0.1.5/mod.ts"; diff --git a/lib/drain.ts b/lib/drain.ts new file mode 100644 index 000000000..2f0be08e1 --- /dev/null +++ b/lib/drain.ts @@ -0,0 +1,10 @@ +import { Result } from "./result.ts"; +import { Subscriber } from "./types.ts"; + +export function drain(end: (result: Result) => void): Subscriber { + return (next) => { + if (next.done) { + end(next.value); + } + }; +} diff --git a/lib/each.ts b/lib/each.ts index cde01436d..a5127ec7c 100644 --- a/lib/each.ts +++ b/lib/each.ts @@ -1,5 +1,9 @@ import type { Operation, Stream, Subscription } from "./types.ts"; import { createContext } from "./context.ts"; +import { race } from "./race.ts"; +import { resource } from "./resource.ts"; +import { withResolvers } from "./with-resolvers.ts"; +import { useScope } from "./scope.ts"; /** * Consume an effection stream using a simple for-of loop. @@ -28,39 +32,56 @@ import { createContext } from "./context.ts"; export function each(stream: Stream): Operation> { return { *[Symbol.iterator]() { - let subscription = yield* stream; - let current = yield* subscription.next(); - let stack = yield* EachStack.get(); - if (!stack) { - stack = yield* EachStack.set([]); + let scope = yield* useScope(); + if (!scope.hasOwn(EachStack)) { + scope.set(EachStack, []); } + return yield* resource(function* (provide) { + let done = withResolvers(); - let context: EachLoop = { subscription, current }; + let subscription = yield* stream; + let current = yield* subscription.next(); - stack.push(context); + let stack = scope.expect(EachStack); - let iterator: Iterator = { - next() { - if (context.stale) { - let error = new Error( - `for each loop did not use each.next() operation before continuing`, - ); - error.name = "IterationError"; - throw error; - } else { - context.stale = true; - return context.current; - } - }, - return() { - stack!.pop(); - return { done: true, value: void 0 }; - }, - }; + let context: EachLoop = { + subscription, + current, + finish() { + context.finish = () => {}; + stack.pop(); + done.resolve(); + }, + }; - return { - [Symbol.iterator]: () => iterator, - }; + stack.push(context); + + let iterator: Iterator = { + next() { + if (context.stale) { + let error = new Error( + `for each loop did not use each.next() operation before continuing`, + ); + error.name = "IterationError"; + throw error; + } else { + context.stale = true; + return context.current; + } + }, + return() { + context.finish(); + return { done: true, value: void 0 }; + }, + }; + + yield* race([ + done.operation, + provide({ + [Symbol.iterator]: () => iterator, + }), + ]); + }); }, }; } @@ -69,7 +90,7 @@ each.next = function next() { return { name: "each.next()", *[Symbol.iterator]() { - let stack = yield* EachStack; + let stack = yield* EachStack.expect(); let context = stack[stack.length - 1]; if (!context) { let error = new Error(`cannot call next() outside of an iteration`); @@ -80,7 +101,7 @@ each.next = function next() { delete context.stale; context.current = current; if (current.done) { - stack.pop(); + context.finish(); } }, } as Operation; @@ -89,6 +110,7 @@ each.next = function next() { interface EachLoop { subscription: Subscription; current: IteratorResult; + finish: () => void; stale?: true; } diff --git a/lib/ensure.ts b/lib/ensure.ts index b10266102..4d331adbc 100644 --- a/lib/ensure.ts +++ b/lib/ensure.ts @@ -1,5 +1,5 @@ import { Operation } from "./types.ts"; -import { resource } from "./instructions.ts"; +import { resource } from "./resource.ts"; /** * Run the given function or operation when the current operation diff --git a/lib/events.ts b/lib/events.ts index 42ae18aec..b4e05f931 100644 --- a/lib/events.ts +++ b/lib/events.ts @@ -1,6 +1,6 @@ // deno-lint-ignore-file no-explicit-any ban-types import { createSignal } from "./signal.ts"; -import { resource } from "./instructions.ts"; +import { resource } from "./resource.ts"; import type { Operation, Stream, Subscription } from "./types.ts"; type FN = (...any: any[]) => any; diff --git a/lib/instructions.ts b/lib/instructions.ts deleted file mode 100644 index 043d34f5e..000000000 --- a/lib/instructions.ts +++ /dev/null @@ -1,303 +0,0 @@ -import type { - Frame, - Instruction, - Operation, - Provide, - Reject, - Resolve, - Result, - Task, -} from "./types.ts"; - -import { reset, shift } from "./deps.ts"; -import { shiftSync } from "./shift-sync.ts"; -import { Err, Ok } from "./result.ts"; - -/** - * Indefinitely pause execution of the current operation. It is typically - * used in conjunction with an {@link action} to mark the boundary - * between setup and teardown. - * - * ```js - * function onEvent(listener, name) { - * return action(function* (resolve) { - * try { - * listener.addEventListener(name, resolve); - * yield* suspend(); - * } finally { - * listener.removeEventListener(name, resolve); - * } - * }); - * } - * ``` - * - * An operation will remain suspended until its enclosing scope is destroyed, - * at which point it proceeds as though return had been called from the point - * of suspension. Once an operation suspends once, further suspend operations - * are ignored. - * - * @returns an operation that suspends the current operation - */ -export function suspend(): Operation { - return instruction(Suspend); -} - -function Suspend(frame: Frame) { - return shiftSync>((k) => { - if (frame.aborted) { - k.tail(Ok(void 0)); - } - }); -} - -/** - * Create an {@link Operation} that can be either resolved (or rejected) with - * a synchronous callback. This is the Effection equivalent of `new Promise()`. - * - * The action body is itself an operation that runs in a new scope that is - * destroyed completely before program execution returns to the point where the - * action was yielded to. - * - * For example: - * - * ```js - * let five = yield* action(function*(resolve, reject) { - * setTimeout(() => { - * if (Math.random() > 5) { - * resolve(5) - * } else { - * reject(new Error("bad luck!")); - * } - * }, 1000); - * }); - * - * ``` - * - * However, it is customary to explicitly {@link suspend} inside the body of the - * action so that whenever the action resolves, appropriate cleanup code can - * run. The preceeding example would be more correctly written as: - * - * ```js - * let five = yield* action(function*(resolve) { - * let timeoutId = setTimeout(() => { - * if (Math.random() > 5) { - * resolve(5) - * } else { - * reject(new Error("bad luck!")); - * } - * }, 1000); - * try { - * yield* suspend(); - * } finally { - * clearTimout(timeoutId); - * } - * }); - * ``` - * - * @typeParam T - type of the action's result. - * @param operation - body of the action - * @returns an operation producing the resolved value, or throwing the rejected error - */ -export function action( - operation: (resolve: Resolve, reject: Reject) => Operation, -): Operation { - return instruction(function Action(frame) { - return shift>(function* (k) { - let settle = yield* reset>>(function* () { - let result = yield* shiftSync>((k) => k.tail); - - let destruction = yield* child.destroy(); - - if (!destruction.ok) { - k.tail(destruction); - } else { - k.tail(result); - } - }); - - let resolve: Resolve = (value) => settle(Ok(value)); - let reject: Reject = (error) => settle(Err(error)); - - let child = frame.createChild(function* () { - yield* operation(resolve, reject); - yield* suspend(); - }); - - yield* reset(function* () { - let result = yield* child; - if (!result.ok) { - k.tail(result); - } - }); - - child.enter(); - }); - }); -} - -/** - * Run another operation concurrently as a child of the current one. - * - * The spawned operation will begin executing immediately and control will - * return to the caller when it reaches its first suspend point. - * - * ### Example - * - * ```typescript - * import { main, sleep, suspend, spawn } from 'effection'; - * - * await main(function*() { - * yield* spawn(function*() { - * yield* sleep(1000); - * console.log("hello"); - * }); - * yield* spawn(function*() { - * yield* sleep(2000); - * console.log("world"); - * }); - * yield* suspend(); - * }); - * ``` - * - * You should prefer using the spawn operation over calling - * {@link Scope.run} from within Effection code. The reason being that a - * synchronous failure in the spawned operation will not be caught - * until the next yield point when using `run`, which results in lines - * being executed that should not. - * - * ### Example - * - * ```typescript - * import { main, suspend, spawn, useScope } from 'effection'; - * - * await main(function*() { - * yield* useScope(); - * - * scope.run(function*() { - * throw new Error('boom!'); - * }); - * - * console.log('this code will run and probably should not'); - * - * yield* suspend(); // <- error is thrown after this. - * }); - * ``` - * @param operation the operation to run as a child of the current task - * @typeParam T the type that the spawned task evaluates to - * @returns a {@link Task} representing a handle to the running operation - */ -export function spawn(operation: () => Operation): Operation> { - return instruction(function Spawn(frame) { - return shift>>(function (k) { - let child = frame.createChild(operation); - - child.enter(); - - k.tail(Ok(child.getTask())); - - return reset(function* () { - let result = yield* child; - if (!result.ok) { - yield* frame.crash(result.error); - } - }); - }); - }); -} - -/** - * Define an Effection [resource](https://frontside.com/effection/docs/resources) - * - * Resources are a type of operation that passes a value back to its caller - * while still allowing that operation to run in the background. It does this - * by invoking the special `provide()` operation. The caller pauses until the - * resource operation invokes `provide()` at which point the caller resumes with - * passed value. - * - * `provide()` suspends the resource operation until the caller passes out of - * scope. - * - * @example - * ```javascript - * function useWebSocket(url) { - * return resource(function*(provide) { - * let socket = new WebSocket(url); - * yield* once(socket, 'open'); - * - * try { - * yield* provide(socket); - * } finally { - * socket.close(); - * yield* once(socket, 'close'); - * } - * }) - * } - * - * await main(function*() { - * let socket = yield* useWebSocket("wss://example.com"); - * socket.send("hello world"); - * }); - * ``` - * - * @param operation the operation defining the lifecycle of the resource - * @returns an operation yielding the resource - */ -export function resource( - operation: (provide: Provide) => Operation, -): Operation { - return instruction((frame) => - shift>(function (k) { - function provide(value: T) { - k.tail(Ok(value)); - return suspend(); - } - - let child = frame.createChild(() => operation(provide)); - - child.enter(); - - return reset(function* () { - let result = yield* child; - if (!result.ok) { - k.tail(result); - yield* frame.crash(result.error); - } - }); - }) - ); -} - -/** - * @ignore - */ -export function getframe(): Operation { - return instruction((frame) => - shiftSync>((k) => k.tail(Ok(frame))) - ); -} - -// An optimized iterator that yields the instruction on the first call -// to next, then returns its value on the second. Equivalent to: -// { -// *[Symbol.iterator]() { return yield instruction; } -// } -function instruction(i: Instruction): Operation { - return { - [Symbol.iterator]() { - let entered = false; - return { - next(value) { - if (!entered) { - entered = true; - return { done: false, value: i }; - } else { - return { done: true, value }; - } - }, - throw(error) { - throw error; - }, - }; - }, - }; -} diff --git a/lib/lazy-promise.ts b/lib/lazy-promise.ts new file mode 100644 index 000000000..6348ca57b --- /dev/null +++ b/lib/lazy-promise.ts @@ -0,0 +1,69 @@ +import { Err, Ok, Result } from "./result.ts"; + +export function lazyPromiseWithResolvers(): ReturnType> { + let result: Result | undefined = undefined; + + let settle = (outcome: Result) => { + if (!result) { + result = outcome; + } + }; + + let resolve = ((value: T) => settle(Ok(value))) as PromiseWithResolvers< + T + >["resolve"]; + let reject = (error: Error) => settle(Err(error)); + + let promise = lazyPromise((resolve, reject) => { + let record = (result: Result) => { + if (result.ok) { + resolve(result.value); + } else { + reject(result.error); + } + }; + + if (result) { + record(result); + } else { + settle = record; + } + }); + + return { promise, resolve, reject }; +} + +export function lazyPromise( + resolver: ( + resolve: (value: T) => void, + reject: (error: Error) => void, + ) => void, +): Promise { + let _promise: Promise | undefined = undefined; + + let reify = async () => { + if (!_promise) { + _promise = new Promise(resolver); + } + return await _promise; + }; + + let promise: Promise = Object.create(Promise.prototype, { + then: { + enumerable: false, + value: (...args: Parameters["then"]>) => reify().then(...args), + }, + catch: { + enumerable: false, + value: (...args: Parameters["catch"]>) => + reify().catch(...args), + }, + finally: { + enumerable: false, + value: (...args: Parameters["finally"]>) => + reify().finally(...args), + }, + }); + + return promise; +} diff --git a/lib/lazy.ts b/lib/lazy.ts deleted file mode 100644 index 355c1f4a2..000000000 --- a/lib/lazy.ts +++ /dev/null @@ -1,8 +0,0 @@ -export function lazy(create: () => T): () => T { - let thunk = () => { - let value = create(); - thunk = () => value; - return value; - }; - return () => thunk(); -} diff --git a/lib/lift.ts b/lib/lift.ts index 9bdab566a..19a51cc84 100644 --- a/lib/lift.ts +++ b/lib/lift.ts @@ -1,4 +1,4 @@ -import { shift } from "./deps.ts"; +import { action } from "./action.ts"; import { type Operation } from "./types.ts"; /** @@ -20,14 +20,13 @@ export function lift( fn: (...args: TArgs) => TReturn, ): (...args: TArgs) => Operation { return (...args: TArgs) => { - return ({ - *[Symbol.iterator]() { - return yield () => { - return shift(function* (k) { - k.tail({ ok: true, value: fn(...args) }); - }); - }; - }, + return action((resolve, reject) => { + try { + resolve(fn(...args)); + } catch (error) { + reject(error); + } + return () => {}; }); }; } diff --git a/lib/main.ts b/lib/main.ts index 68e366517..e7b84f1d6 100644 --- a/lib/main.ts +++ b/lib/main.ts @@ -1,7 +1,8 @@ import { createContext } from "./context.ts"; import { type Operation } from "./types.ts"; -import { action } from "./instructions.ts"; +import { callcc } from "./callcc.ts"; import { run } from "./run.ts"; +import { useScope } from "./scope.ts"; import process from "node:process"; /** @@ -18,8 +19,8 @@ import process from "node:process"; * @param returns an operation that exits the program */ export function* exit(status: number, message?: string): Operation { - let escape = yield* ExitContext; - escape({ status, message }); + let escape = yield* ExitContext.expect(); + yield* escape({ status, message }); } /** @@ -63,7 +64,7 @@ export async function main( let hardexit = (_status: number) => {}; let result = await run(() => - action(function* (resolve) { + callcc(function* (resolve) { // action will return shutdown immediately upon resolve, so stash // this function in the exit context so it can be called anywhere. yield* ExitContext.set(resolve); @@ -72,10 +73,14 @@ export async function main( // Node and Deno from exiting prematurely. let interval = setInterval(() => {}, Math.pow(2, 30)); + let scope = yield* useScope(); + try { let interrupt = { - SIGINT: () => resolve({ status: 130, signal: "SIGINT" }), - SIGTERM: () => resolve({ status: 143, signal: "SIGTERM" }), + SIGINT: () => + scope.run(() => resolve({ status: 130, signal: "SIGINT" })), + SIGTERM: () => + scope.run(() => resolve({ status: 143, signal: "SIGTERM" })), }; yield* withHost({ @@ -113,7 +118,7 @@ export async function main( yield* exit(0); } catch (error) { - resolve({ status: 1, error }); + yield* resolve({ status: 1, error }); } finally { clearInterval(interval); } @@ -135,7 +140,7 @@ export async function main( hardexit(result.status); } -const ExitContext = createContext<(exit: Exit) => void>("exit"); +const ExitContext = createContext<(exit: Exit) => Operation>("exit"); interface Exit { status: number; diff --git a/lib/maybe.ts b/lib/maybe.ts new file mode 100644 index 000000000..5a3f86e01 --- /dev/null +++ b/lib/maybe.ts @@ -0,0 +1,20 @@ +export type Maybe = { + readonly exists: false; +} | { + readonly exists: true; + readonly value: T; +}; + +export function Just(): Maybe; +export function Just(value: T): Maybe; +export function Just(value?: T): Maybe { + if (typeof value === "undefined") { + return { exists: true } as Maybe; + } else { + return { exists: true, value }; + } +} + +export function Nothing(): Maybe { + return { exists: false }; +} diff --git a/lib/mod.ts b/lib/mod.ts index 6f3b41899..9fa806ba9 100644 --- a/lib/mod.ts +++ b/lib/mod.ts @@ -1,19 +1,22 @@ export * from "./types.ts"; -export * from "./channel.ts"; +export * from "./action.ts"; export * from "./context.ts"; -export * from "./instructions.ts"; -export * from "./call.ts"; -export * from "./run.ts"; +export * from "./scope.ts"; +export * from "./suspend.ts"; export * from "./sleep.ts"; -export * from "./async.ts"; -export * from "./abort-signal.ts"; -export * from "./result.ts"; -export * from "./lift.ts"; -export * from "./events.ts"; -export * from "./main.ts"; +export * from "./run.ts"; +export * from "./spawn.ts"; +export * from "./resource.ts"; +export * from "./call.ts"; +export * from "./race.ts"; export * from "./all.ts"; -export * from "./each.ts"; +export * from "./lift.ts"; export * from "./queue.ts"; export * from "./signal.ts"; +export * from "./channel.ts"; +export * from "./each.ts"; export * from "./ensure.ts"; -export * from "./race.ts"; +export * from "./events.ts"; +export * from "./abort-signal.ts"; +export * from "./main.ts"; +export * from "./with-resolvers.ts"; diff --git a/lib/pause.ts b/lib/pause.ts deleted file mode 100644 index bebac56c4..000000000 --- a/lib/pause.ts +++ /dev/null @@ -1,22 +0,0 @@ -import type { Operation, Reject, Resolve, Result } from "./types.ts"; -import { Err, Ok } from "./result.ts"; -import { shift } from "./deps.ts"; - -export function* pause( - install: (resolve: Resolve, reject: Reject) => Resolve, -): Operation { - let uninstall = () => {}; - try { - return yield function pause_i() { - return shift>(function* (k) { - let resolve = (value: T) => k.tail(Ok(value)); - let reject = (error: Error) => k.tail(Err(error)); - uninstall = install(resolve, reject); - }); - }; - } finally { - if (uninstall) { - uninstall(); - } - } -} diff --git a/lib/queue.ts b/lib/queue.ts index d4989c6fb..152044652 100644 --- a/lib/queue.ts +++ b/lib/queue.ts @@ -1,5 +1,5 @@ import type { Resolve, Subscription } from "./types.ts"; -import { pause } from "./pause.ts"; +import { action } from "./action.ts"; /** * A FIFO queue which can be used to implement the {@link Subscription} @@ -74,7 +74,7 @@ export function createQueue(): Queue { if (item) { return item; } else { - return yield* pause((resolve) => { + return yield* action((resolve) => { consumers.add(resolve); return () => consumers.delete(resolve); }); diff --git a/lib/race.ts b/lib/race.ts index d4e835634..564a93e5a 100644 --- a/lib/race.ts +++ b/lib/race.ts @@ -1,5 +1,10 @@ -import type { Operation, Yielded } from "./types.ts"; -import { action, spawn } from "./instructions.ts"; +import { spawn } from "./spawn.ts"; +import { encapsulate, trap } from "./task.ts"; +import type { Operation, Task, Yielded } from "./types.ts"; +import { withResolvers } from "./with-resolvers.ts"; +import { Err, Ok, Result } from "./result.ts"; +//import { useScope } from "./scope.ts"; +//import { transfer } from "./scope.ts"; /** * Race the given operations against each other and return the value of @@ -24,18 +29,51 @@ import { action, spawn } from "./instructions.ts"; * @param operations a list of operations to race against each other * @returns the value of the fastest operation */ -export function race>( + +export function* race>( operations: readonly T[], ): Operation> { - return action>(function* (resolve, reject) { - for (let operation of operations) { - yield* spawn(function* () { - try { - resolve((yield* operation) as Yielded); - } catch (error) { - reject(error); - } - }); - } - }); + // let caller = yield* useScope(); + let winner = withResolvers>>("await winner"); + + let tasks: Task[] = []; + + // encapsulate the race in a hermetic scope. + let result = yield* trap(() => + encapsulate(function* () { + for (let operation of operations.toReversed()) { + tasks.push( + yield* spawn(function* () { + // let contestant = yield* useScope(); + try { + let value = yield* operation; + + // Transfer the winner to the contestant + // transfer({ from: contestant, to: caller }); + winner.resolve(Ok(value as Yielded)); + } catch (error) { + winner.resolve(Err(error)); + } + }), + ); + } + return yield* winner.operation; + }) + ); + + let shutdown: Task[] = []; + + for (let task of tasks) { + shutdown.push(yield* spawn(task.halt)); + } + + for (let task of shutdown) { + yield* task; + } + + if (result.ok) { + return result.value; + } else { + throw result.error; + } } diff --git a/lib/reducer.ts b/lib/reducer.ts new file mode 100644 index 000000000..f29e16b5b --- /dev/null +++ b/lib/reducer.ts @@ -0,0 +1,102 @@ +import { createContext } from "./context.ts"; +import { Err, Ok, Result } from "./result.ts"; +import { Coroutine, Subscriber } from "./types.ts"; + +export class Reducer { + reducing = false; + readonly queue = createPriorityQueue(); + + reduce = ( + thunk: Thunk, + ) => { + let { queue } = this; + + queue.enqueue(thunk); + + if (this.reducing) return; + + try { + this.reducing = true; + + let item = queue.dequeue(); + while (item) { + let [, routine, result, notify, method = "next" as const] = item; + try { + notify({ done: false, value: result }); + const iterator = routine.data.iterator; + if (result.ok) { + if (method === "next") { + let next = iterator.next(result.value); + if (next.done) { + notify({ done: true, value: Ok(next.value) }); + } else { + let action = next.value; + routine.data.discard = action.enter(routine.next, routine); + } + } else if (iterator.return) { + let next = iterator.return(result.value); + if (next.done) { + notify({ done: true, value: Ok(result) }); + } else { + let action = next.value; + routine.data.discard = action.enter(routine.next, routine); + } + } else { + notify({ done: true, value: result }); + } + } else if (iterator.throw) { + let next = iterator.throw(result.error); + if (next.done) { + notify({ done: true, value: Ok(next.value) }); + } else { + let action = next.value; + routine.data.discard = action.enter(routine.next, routine); + } + } else { + throw result.error; + } + } catch (error) { + notify({ done: true, value: Err(error) }); + } + item = queue.dequeue(); + } + } finally { + this.reducing = false; + } + }; +} + +export const ReducerContext = createContext( + "@effection/reducer", + new Reducer(), +); + +type Thunk = [ + number, + Coroutine, + Result, + Subscriber, + "return" | "next", +]; + +// This is a pretty hokey priority queue that uses an array for storage +// so enqueue is O(n). However, `n` is generally small. revisit. +function createPriorityQueue() { + let thunks: Thunk[] = []; + + return { + enqueue(thunk: Thunk): void { + let [priority] = thunk; + let index = thunks.findIndex(([p]) => p >= priority); + if (index === -1) { + thunks.push(thunk); + } else { + thunks.splice(index, 0, thunk); + } + }, + + dequeue(): Thunk | undefined { + return thunks.shift(); + }, + }; +} diff --git a/lib/resource.ts b/lib/resource.ts new file mode 100644 index 000000000..562272c08 --- /dev/null +++ b/lib/resource.ts @@ -0,0 +1,35 @@ +import { suspend } from "./suspend.ts"; +import { spawn } from "./spawn.ts"; +import { Operation } from "./types.ts"; +import { trap } from "./task.ts"; +import { Ok } from "./result.ts"; +import { useCoroutine } from "./coroutine.ts"; + +export interface Provide { + (value: T): Operation; +} + +export function resource( + op: (provide: Provide) => Operation, +): Operation { + return { + *[Symbol.iterator]() { + let caller = yield* useCoroutine(); + + function* provide(value: T): Operation { + caller.next(Ok(value)); + yield* suspend(); + } + + // establishing a control boundary lets us catch errors in + // resource initializer + return yield* trap(function* () { + yield* spawn(() => op(provide)); + return (yield { + description: "await resource", + enter: () => (uninstalled) => uninstalled(Ok()), + }) as T; + }); + }, + }; +} diff --git a/lib/result.ts b/lib/result.ts index 77750ae8b..8649a2468 100644 --- a/lib/result.ts +++ b/lib/result.ts @@ -1,11 +1,38 @@ -import type { Result } from "./types.ts"; +/** + * @ignore + */ +export type Result = { + readonly ok: true; + value: T; +} | { + readonly ok: false; + error: Error; +}; /** * @ignore */ -export const Ok = (value: T): Result => ({ ok: true, value }); +export function Ok(): Result; +export function Ok(value: T): Result; +export function Ok(value?: T): Result { + if (typeof value === "undefined") { + return { ok: true } as Result; + } + return ({ ok: true, value }); +} /** * @ignore */ export const Err = (error: Error): Result => ({ ok: false, error }); + +/** + * @ignore + */ +export function unbox(result: Result): T { + if (result.ok) { + return result.value; + } else { + throw result.error; + } +} diff --git a/lib/run.ts b/lib/run.ts index 23a91a6f5..8db08c6a4 100644 --- a/lib/run.ts +++ b/lib/run.ts @@ -1,6 +1,6 @@ -import type { Operation, Task } from "./types.ts"; -import { createFrame } from "./run/frame.ts"; -export * from "./run/scope.ts"; +import { Operation, Task } from "./types.ts"; + +import { global } from "./scope.ts"; /** * Execute an operation. @@ -29,7 +29,5 @@ export * from "./run/scope.ts"; * @returns a task representing the running operation. */ export function run(operation: () => Operation): Task { - let frame = createFrame({ operation }); - frame.enter(); - return frame.getTask(); + return global.run(operation); } diff --git a/lib/run/create.ts b/lib/run/create.ts deleted file mode 100644 index 6cc36f11c..000000000 --- a/lib/run/create.ts +++ /dev/null @@ -1,14 +0,0 @@ -export function create( - tag: string, - attrs: Partial, - prototype: Partial, -): T { - let properties: Record = {}; - for (let [key, value] of Object.entries(attrs)) { - properties[key] = { enumerable: true, value }; - } - return Object.create({ - ...prototype, - [Symbol.toStringTag]: tag, - }, properties); -} diff --git a/lib/run/frame.ts b/lib/run/frame.ts deleted file mode 100644 index 0e15967e6..000000000 --- a/lib/run/frame.ts +++ /dev/null @@ -1,204 +0,0 @@ -import type { Frame, Instruction, Operation, Result } from "../types.ts"; - -import { evaluate, shift } from "../deps.ts"; -import { shiftSync } from "../shift-sync.ts"; -import { lazy } from "../lazy.ts"; -import { Err, Ok } from "../result.ts"; - -import type { Exit, FrameResult } from "./types.ts"; -import { createValue } from "./value.ts"; -import { createTask } from "./task.ts"; -import { create } from "./create.ts"; - -let ids = 0; - -export interface FrameOptions { - operation(): Operation; - parent?: Frame["context"]; -} - -export function createFrame(options: FrameOptions): Frame { - return evaluate>(function* () { - let { operation, parent } = options; - let children = new Set(); - let context = Object.create(parent ?? {}); - let thunks: IteratorResult>[] = [{ - done: false, - value: $next(void 0), - }]; - - let crash: Error | undefined = void 0; - - let interrupt = () => {}; - - let [setResults, results] = yield* createValue>(); - - let frame = yield* shiftSync>((k) => { - let self: Frame = create>("Frame", { id: ids++, context }, { - createChild(operation: () => Operation) { - let child = createFrame({ operation, parent: self.context }); - children.add(child); - evaluate(function* () { - yield* child; - children.delete(child); - }); - return child; - }, - getTask() { - let task = createTask(self); - self.getTask = () => task; - return task; - }, - enter() { - k.tail(self); - }, - crash(error: Error) { - abort(error); - return results; - }, - destroy() { - abort(); - return results; - }, - [Symbol.iterator]: results[Symbol.iterator], - }); - - let abort = (reason?: Error) => { - if (!self.aborted) { - self.aborted = true; - crash = reason; - thunks.unshift({ done: false, value: $abort() }); - interrupt(); - } - }; - - return self; - }); - - let iterator = lazy(() => operation()[Symbol.iterator]()); - - let thunk = thunks.pop()!; - - while (!thunk.done) { - let getNext = thunk.value; - try { - let next: IteratorResult = getNext(iterator()); - - if (next.done) { - thunks.unshift({ done: true, value: Ok(next.value) }); - } else { - let instruction = next.value; - - let outcome = yield* shift(function* (k) { - interrupt = () => k.tail({ type: "interrupted" }); - - try { - k.tail({ - type: "settled", - result: yield* instruction(frame), - }); - } catch (error) { - k.tail({ type: "settled", result: Err(error) }); - } - }); - - if (outcome.type === "settled") { - if (outcome.result.ok) { - thunks.unshift({ - done: false, - value: $next(outcome.result.value), - }); - } else { - thunks.unshift({ - done: false, - value: $throw(outcome.result.error), - }); - } - } - } - } catch (error) { - thunks.unshift({ done: true, value: Err(error) }); - } - thunk = thunks.pop()!; - } - - frame.exited = true; - - let result = thunk.value; - - let exit: Exit; - - if (!result.ok) { - exit = { type: "result", result }; - } else if (crash) { - exit = { type: "crashed", error: crash }; - } else if (frame.aborted) { - exit = { type: "aborted" }; - } else { - exit = { type: "result", result }; - } - - let destruction = Ok(void 0); - - while (children.size !== 0) { - for (let child of [...children].reverse()) { - let teardown = yield* child.destroy(); - if (!teardown.ok) { - destruction = teardown; - } - } - } - - if (!destruction.ok) { - setResults({ ok: false, error: destruction.error, exit, destruction }); - } else { - if (exit.type === "aborted") { - setResults({ ok: true, value: void 0, exit, destruction }); - } else if (exit.type === "result") { - let { result } = exit; - if (result.ok) { - setResults({ ok: true, value: void 0, exit, destruction }); - } else { - setResults({ ok: false, error: result.error, exit, destruction }); - } - } else { - setResults({ ok: false, error: exit.error, exit, destruction }); - } - } - }); -} - -type Thunk = ReturnType; - -// deno-lint-ignore no-explicit-any -const $next = (value: any) => - function $next(i: Iterator) { - return i.next(value); - }; - -const $throw = (error: Error) => - function $throw(i: Iterator) { - if (i.throw) { - return i.throw(error); - } else { - throw error; - } - }; - -const $abort = (value?: unknown) => - function $abort(i: Iterator) { - if (i.return) { - return i.return(value as unknown as T); - } else { - return { done: true, value } as IteratorResult; - } - }; - -type InstructionResult = - | { - type: "settled"; - result: Result; - } - | { - type: "interrupted"; - }; diff --git a/lib/run/scope.ts b/lib/run/scope.ts deleted file mode 100644 index ffb230563..000000000 --- a/lib/run/scope.ts +++ /dev/null @@ -1,78 +0,0 @@ -import type { Context, Frame, Future, Operation, Scope } from "../types.ts"; -import { evaluate } from "../deps.ts"; -import { create } from "./create.ts"; -import { createFrame } from "./frame.ts"; -import { getframe, suspend } from "../instructions.ts"; - -/** - * Get the scope of the currently running {@link Operation}. - * - * @returns an operation yielding the current scope - */ -export function* useScope(): Operation { - let frame = yield* getframe(); - let [scope] = createScope(frame); - return scope; -} - -/** - * Create a new scope to serve as an entry point from normal - * JavaScript execution into Effection. - * - * When creating a fresh scope (as opposed to capturing a reference to - * an existing one via {@link useScope}) it is the responsibility of - * the creator of the new scope to destroy it when it is no longer needed. - * - * @example - * ```javascript - * let [scope, destroy] = createScope(); - * let task = scope.run(function*() { - * //do some long running work - * }); - * - * //... later - * await destroy(); - * - * ``` - * @returns a tuple containing the new scope, and a function to destroy it. - */ -export function createScope(frame?: Frame): [Scope, () => Future] { - let parent = frame ?? createFrame({ operation: suspend }); - - let scope = create("Scope", {}, { - run(operation: () => Operation) { - if (parent.exited) { - let error = new Error( - `cannot call run() on a scope that has already been exited`, - ); - error.name = "InactiveScopeError"; - throw error; - } - - let frame = parent.createChild(operation); - frame.enter(); - - evaluate(function* () { - let result = yield* frame; - if (!result.ok) { - yield* parent.crash(result.error); - } - }); - - return frame.getTask(); - }, - get(context: Context) { - let { key, defaultValue } = context; - return (parent.context[key] ?? defaultValue) as T | undefined; - }, - set(context: Context, value: T) { - let { key } = context; - parent.context[key] = value; - return value; - }, - }); - - parent.enter(); - - return [scope, parent.getTask().halt]; -} diff --git a/lib/run/task.ts b/lib/run/task.ts deleted file mode 100644 index 46d4ff7b4..000000000 --- a/lib/run/task.ts +++ /dev/null @@ -1,108 +0,0 @@ -import type { Frame, Future, Reject, Resolve, Result, Task } from "../types.ts"; - -import { evaluate } from "../deps.ts"; -import { Err } from "../result.ts"; -import { action } from "../instructions.ts"; - -import type { FrameResult } from "./types.ts"; -import { create } from "./create.ts"; - -export function createTask( - frame: Frame, -): Task { - let promise: Promise; - - let awaitResult = (resolve: Resolve, reject: Reject) => { - evaluate(function* () { - let result = getResult(yield* frame); - - if (result.ok) { - resolve(result.value); - } else { - reject(result.error); - } - }); - }; - - let getPromise = () => { - promise = new Promise((resolve, reject) => { - awaitResult(resolve, reject); - }); - getPromise = () => promise; - return promise; - }; - - let task = create>("Task", {}, { - *[Symbol.iterator]() { - let frameResult = evaluate | void>(() => frame); - if (frameResult) { - let result = getResult(frameResult); - if (result.ok) { - return result.value; - } else { - throw result.error; - } - } else { - return yield* action(function* (resolve, reject) { - awaitResult(resolve, reject); - }); - } - }, - then: (...args) => getPromise().then(...args), - catch: (...args) => getPromise().catch(...args), - finally: (...args) => getPromise().finally(...args), - halt() { - let haltPromise: Promise; - let getHaltPromise = () => { - haltPromise = new Promise((resolve, reject) => { - awaitHaltResult(resolve, reject); - }); - getHaltPromise = () => haltPromise; - frame.destroy(); - return haltPromise; - }; - let awaitHaltResult = (resolve: Resolve, reject: Reject) => { - evaluate(function* () { - let { destruction } = yield* frame; - if (destruction.ok) { - resolve(); - } else { - reject(destruction.error); - } - }); - }; - return create>("Future", {}, { - *[Symbol.iterator]() { - let result = evaluate | void>(() => frame); - - if (result) { - if (!result.ok) { - throw result.error; - } - } else { - yield* action(function* (resolve, reject) { - awaitHaltResult(resolve, reject); - frame.destroy(); - }); - } - }, - then: (...args) => getHaltPromise().then(...args), - catch: (...args) => getHaltPromise().catch(...args), - finally: (...args) => getHaltPromise().finally(...args), - }); - }, - }); - return task; -} - -function getResult(result: FrameResult): Result { - if (!result.ok) { - return result; - } else if (result.exit.type === "aborted") { - return Err(Error("halted")); - } else if (result.exit.type === "crashed") { - return Err(result.exit.error); - } else { - return result.exit.result; - } -} diff --git a/lib/run/types.ts b/lib/run/types.ts deleted file mode 100644 index 87b1bf2ca..000000000 --- a/lib/run/types.ts +++ /dev/null @@ -1,19 +0,0 @@ -import type { Result } from "../types.ts"; - -export type Exit = { - type: "aborted"; -} | { - type: "crashed"; - error: Error; -} | { - type: "result"; - result: Result; -}; - -/** - * @ignore - */ -export type FrameResult = Result & { - exit: Exit; - destruction: Result; -}; diff --git a/lib/run/value.ts b/lib/run/value.ts deleted file mode 100644 index a493fbbf3..000000000 --- a/lib/run/value.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { type Computation, reset } from "../deps.ts"; -import { type Resolve } from "../types.ts"; -import { shiftSync } from "../shift-sync.ts"; - -export function* createValue(): Computation<[Resolve, Computation]> { - let result: { value: T } | void = void 0; - let listeners = new Set>(); - - let resolve = yield* reset>(function* () { - let value = yield* shiftSync((k) => k.tail); - - result = { value }; - - for (let listener of listeners) { - listeners.delete(listener); - listener(value); - } - }); - - let event: Computation = { - [Symbol.iterator]() { - if (result) { - return sync(result.value); - } else { - return shiftSync((k) => { - listeners.add(k.tail); - })[Symbol.iterator](); - } - }, - }; - return [resolve, event]; -} - -export interface Queue { - add(item: T): void; - next(): Computation; -} - -export function sync(value: T) { - return { - next() { - return { done: true, value } as const; - }, - }; -} diff --git a/lib/scope.ts b/lib/scope.ts new file mode 100644 index 000000000..24efc3bbd --- /dev/null +++ b/lib/scope.ts @@ -0,0 +1,105 @@ +import { Children, Generation } from "./contexts.ts"; +import { Context, Effect, Future, Operation, Scope, Task } from "./types.ts"; +import { Err, Ok, unbox } from "./result.ts"; +import { createTask } from "./task.ts"; + +export const [global] = createScopeInternal(); + +export function createScope( + parent: Scope = global, +): [Scope, () => Future] { + let [scope, destroy] = createScopeInternal(parent); + return [scope, () => parent.run(destroy)]; +} + +export function createScopeInternal( + parent?: Scope, +): [ScopeInternal, () => Operation] { + let destructors = new Set<() => Operation>(); + + let contexts: Record = Object.create( + parent ? (parent as ScopeInternal).contexts : null, + ); + let scope: ScopeInternal = Object.create({ + [Symbol.toStringTag]: "Scope", + contexts, + get(context: Context): T | undefined { + return (contexts[context.name] ?? context.defaultValue) as T | undefined; + }, + set(context: Context, value: T): T { + return contexts[context.name] = value; + }, + expect(context: Context): T { + let value = scope.get(context); + if (typeof value === "undefined") { + let error = new Error(context.name); + error.name = `MissingContextError`; + throw error; + } + return value; + }, + delete(context: Context): boolean { + return delete contexts[context.name]; + }, + hasOwn(context: Context): boolean { + return !!Reflect.getOwnPropertyDescriptor(contexts, context.name); + }, + run(operation: () => Operation): Task { + let { task, start } = createTask({ operation, owner: scope }); + start(); + return task; + }, + spawn(operation: () => Operation): Operation> { + return { + *[Symbol.iterator]() { + let { task, start } = createTask({ operation, owner: scope }); + start(); + return task; + }, + }; + }, + + ensure(op: () => Operation): () => void { + destructors.add(op); + return () => destructors.delete(op); + }, + }); + + scope.set(Generation, scope.expect(Generation) + 1); + scope.set(Children, new Set()); + parent?.expect(Children).add(scope); + + let unbind = parent ? (parent as ScopeInternal).ensure(destroy) : () => {}; + + function* destroy(): Operation { + parent?.expect(Children).delete(scope); + unbind(); + let outcome = Ok(); + for (let destructor of [...destructors].reverse()) { + try { + destructors.delete(destructor); + yield* destructor(); + } catch (error) { + outcome = Err(error); + } + } + unbox(outcome); + } + + return [scope, destroy]; +} + +export interface ScopeInternal extends Scope { + contexts: Record; + ensure(op: () => Operation): () => void; +} + +export function* useScope(): Operation { + return (yield { + description: `useScope()`, + enter(resolve, { scope }) { + resolve(Ok(scope)); + return (resolve) => resolve(Ok()); + }, + } as Effect) as Scope; +} diff --git a/lib/shift-sync.ts b/lib/shift-sync.ts deleted file mode 100644 index f88e79098..000000000 --- a/lib/shift-sync.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { type Computation, type Continuation, shift } from "./deps.ts"; - -/** - * Create a shift computation where the body of the shift can be resolved - * in a single step. - * - * before: - * ```ts - * yield* shift(function*(k) { return k; }); - * ``` - * after: - * yield* shiftSync(k => k); - */ -export function shiftSync( - block: (resolve: Continuation, reject: Continuation) => void, -): Computation { - return shift((resolve, reject) => { - return { - [Symbol.iterator]: () => ({ - next() { - let value = block(resolve, reject); - return { done: true, value }; - }, - }), - }; - }); -} diff --git a/lib/signal.ts b/lib/signal.ts index 9c41739f9..621caf9a2 100644 --- a/lib/signal.ts +++ b/lib/signal.ts @@ -1,7 +1,7 @@ import type { Stream, Subscription } from "./types.ts"; import { createQueue, type Queue } from "./queue.ts"; -import { resource } from "./instructions.ts"; +import { resource } from "./resource.ts"; import { createContext } from "./context.ts"; /** @@ -117,7 +117,7 @@ export function createSignal(): Signal { let subscribers = new Set>(); let subscribe = resource>(function* (provide) { - let newQueue = yield* SignalQueueFactory; + let newQueue = yield* SignalQueueFactory.expect(); let queue = newQueue(); subscribers.add(queue); diff --git a/lib/sleep.ts b/lib/sleep.ts index 0ccb96485..5a914fda1 100644 --- a/lib/sleep.ts +++ b/lib/sleep.ts @@ -1,5 +1,5 @@ -import type { Operation } from "./types.ts"; -import { action, suspend } from "./instructions.ts"; +import { Operation } from "./types.ts"; +import { action } from "./action.ts"; /** * Sleep for the given amount of milliseconds. @@ -17,12 +17,8 @@ import { action, suspend } from "./instructions.ts"; * @param duration - the number of milliseconds to sleep */ export function sleep(duration: number): Operation { - return action(function* sleep(resolve) { + return action((resolve) => { let timeoutId = setTimeout(resolve, duration); - try { - yield* suspend(); - } finally { - clearTimeout(timeoutId); - } - }); + return () => clearTimeout(timeoutId); + }, `sleep(${duration})`); } diff --git a/lib/spawn.ts b/lib/spawn.ts new file mode 100644 index 000000000..286f5c993 --- /dev/null +++ b/lib/spawn.ts @@ -0,0 +1,22 @@ +import { Ok } from "./result.ts"; +import { ScopeInternal } from "./scope.ts"; +import { createTask, NewTask } from "./task.ts"; +import type { Effect, Operation, Task } from "./types.ts"; + +export function* spawn(op: () => Operation): Operation> { + let { task, start } = (yield Spawn(op)) as NewTask; + start(); + return task; +} + +function Spawn(operation: () => Operation): Effect> { + return { + description: `spawn(${operation.name})`, + enter: (resolve, { scope }) => { + resolve(Ok(createTask({ owner: scope as ScopeInternal, operation }))); + return (done) => { + done(Ok()); + }; + }, + }; +} diff --git a/lib/suspend.ts b/lib/suspend.ts new file mode 100644 index 000000000..f21918be0 --- /dev/null +++ b/lib/suspend.ts @@ -0,0 +1,6 @@ +import { action } from "./action.ts"; +import { Operation } from "./types.ts"; + +export function suspend(): Operation { + return action(() => () => {}, "suspend"); +} diff --git a/lib/task.ts b/lib/task.ts new file mode 100644 index 000000000..6769c0db0 --- /dev/null +++ b/lib/task.ts @@ -0,0 +1,242 @@ +import { createContext } from "./context.ts"; +import { Routine } from "./contexts.ts"; +import { createCoroutine } from "./coroutine.ts"; +import { drain } from "./drain.ts"; +import { lazyPromise, lazyPromiseWithResolvers } from "./lazy-promise.ts"; +import { Just, Maybe, Nothing } from "./maybe.ts"; +import { Err, Ok, Result, unbox } from "./result.ts"; +import { createScopeInternal, type ScopeInternal } from "./scope.ts"; +import type { + Coroutine, + Future, + Operation, + Resolve, + Scope, + Task, +} from "./types.ts"; + +export interface TaskOptions { + owner: ScopeInternal; + operation(): Operation; +} + +export interface NewTask { + scope: Scope; + routine: Coroutine>; + task: Task; + start(): void; +} + +export function createTask(options: TaskOptions): NewTask { + let { owner, operation } = options; + let [scope, destroy] = createScopeInternal(owner); + + TaskGroup.ensureOwn(scope); + + let routine = createCoroutine({ + scope, + operation: () => trapset(() => after(operation, destroy)), + }); + + let { promise, resolve, reject } = lazyPromiseWithResolvers(); + + let initiateHalt = (resolve: Resolve>) => { + if (scope.hasOwn(TrapContext)) { + let trap = scope.expect(TrapContext); + let current = routine.data.discard; + routine.data.discard = (exit) => + current((result) => { + if (!result.ok) { + trap.result = result; + } + exit(result); + }); + return routine.return( + trap.result = Ok(Nothing()), + drain((result) => resolve(result.ok ? Ok() : result)), + ); + } else { + return routine.return( + Ok(Nothing()), + drain((result) => resolve(result.ok ? Ok() : result)), + ); + } + }; + + let halt = lazyPromise((resolve, reject) => { + initiateHalt((result) => result.ok ? resolve() : reject(result.error)); + }); + + Object.defineProperty(halt, Symbol.iterator, { + enumerable: false, + *value(): Operation { + yield ({ + description: "halt", + enter: (resolve) => { + let unsubscribe = initiateHalt(resolve); + + return (done) => { + unsubscribe(); + done(Ok()); + }; + }, + }); + }, + }); + + let task = Object.defineProperty(promise, "halt", { + enumerable: false, + value: () => halt as Future, + }) as Task; + + let group = TaskGroup.ensureOwn(owner); + + let link = group.link(owner, task); + + scope.set(Routine, routine); + + let start = () => + routine.next( + Ok(), + drain((result) => { + link.close(result); + if (result.ok) { + if (result.value.exists) { + resolve(result.value.value); + } else { + reject(new Error("halted")); + } + } else { + reject(result.error); + } + }), + ); + + return { task, scope, routine, start }; +} + +export const TaskGroupContext = createContext("@effection/tasks"); + +export function encapsulate(operation: () => Operation): Operation { + return TaskGroupContext.with(new TaskGroup(), function* (group) { + try { + return yield* operation(); + } finally { + yield* group.halt(); + } + }); +} + +class TaskGroup { + static ensureOwn(scope: ScopeInternal): TaskGroup { + if (!scope.hasOwn(TaskGroupContext)) { + let group = scope.set(TaskGroupContext, new TaskGroup()); + scope.ensure(() => group.halt()); + } + return scope.expect(TaskGroupContext); + } + + links = new Set>(); + + link(owner: Scope, task: Task) { + return new TaskLink(owner, task, this.links); + } + + *halt() { + let links = [...this.links].reverse(); + links.forEach((link) => link.sever()); + let outcome = Ok(); + for (let link of links) { + let result = yield* box(link.task.halt); + if (!result.ok) { + outcome = result; + } + } + return unbox(outcome); + } +} + +class TaskLink { + constructor( + public owner: Scope, + public task: Task, + public links: Set>, + ) { + this.links.add(this); + } + + close(result: Result>) { + this.links.delete(this); + if (!result.ok) { + let trap = this.owner.get(TrapContext); + if (trap) { + trap.result = result; + this.owner.get(Routine)?.return(trap.result); + } + } + } + + sever() { + this.links.delete(this); + this.close = () => {}; + } +} + +function* box(op: () => Operation): Operation> { + try { + return Ok(yield* op()); + } catch (error) { + return Err(error); + } +} + +const TrapContext = createContext<{ result: Result> }>( + "@effection/trap", +); + +function trapset(op: () => Operation): Operation> { + let result = Ok(Nothing()); + return TrapContext.with({ result }, function* (trap) { + try { + let value = yield* op(); + if (trap.result === result) { + trap.result = Ok(Just(value)); + } + } catch (error) { + trap.result = Err(error); + } finally { + return unbox(trap.result) as Maybe; + } + }); +} + +export function* trap(op: () => Operation): Operation { + let outcome = yield* trapset(op); + if (outcome.exists) { + return outcome.value; + } else { + return (yield { + description: "propagate halt", + enter: (resolve, routine) => { + let trap = routine.scope.expect(TrapContext); + trap.result = Ok(Nothing()); + routine.return(trap.result); + resolve(Ok()); + return (resolve) => { + resolve(Ok()); + }; + }, + }) as T; + } +} + +function* after( + op: () => Operation, + epilogue: () => Operation, +): Operation { + try { + return yield* op(); + } finally { + yield* epilogue(); + } +} diff --git a/lib/types.ts b/lib/types.ts index be841d9f3..4774baf5f 100644 --- a/lib/types.ts +++ b/lib/types.ts @@ -1,5 +1,4 @@ -// deno-lint-ignore-file no-explicit-any -import type { Computation } from "./deps.ts"; +import type { Result } from "./result.ts"; /** * An `Operation` in Effection describes an abstract computation. An operation @@ -41,7 +40,7 @@ import type { Computation } from "./deps.ts"; * */ export interface Operation { - [Symbol.iterator](): Iterator; + [Symbol.iterator](): Iterator, T, unknown>; } /** @@ -53,7 +52,7 @@ export interface Operation { * things, if the operation resolves synchronously, it will continue within the * same tick of the run loop. */ -export interface Future extends Promise, Operation {} +export interface Future extends Operation, Promise {} /** * A handle to a concurrently running operation that lets you either use the @@ -148,11 +147,95 @@ export interface Task extends Future { halt(): Future; } -export type Resolve = (value: T) => void; +/** + * The Effection equivalent of an [`AsyncIterator`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator) + * + * A subscription acts like a stateful queue that provides a sequence of values + * via the next() method. Normally a subscription is created via a + * {@link Stream}. + * + * @see https://effection.deno.dev/docs/collections#subscription + */ +export interface Subscription { + next(): Operation>; +} -export type Reject = (error: Error) => void; +/** + * The Effection equivalent of an [`AsyncIterable`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols). + * + * Like async iterables, streams do not actually have state themselves, but + * contain the recipe for how to create a {@link Subscription} + * + * @see https://frontside.com/effection/docs/collections#stream + */ +export type Stream = Operation>; -export type Provide = (value: T) => Operation; +/** + * `Context` defines a value which is in effect for a given scope which is an + * (action, resource, call, or spawn). + * + * Unless a context value is defined for a particular scope, it will inherit + * its value from its parent scope. + */ +export interface Context { + /** + * A unique identifier for this context. + */ + name: string; + /** + * The value returned by this context when it is not present on a scop.e + */ + defaultValue?: T; + + /** + * Read the current value of this context if it exists. + * + * @returns an operation that yields the current value if it exists, or undefined otherwise. + * @see {@link Scope#get} for reading a context value outside of a running operation + */ + get(): Operation; + + /** + * Set the value of a context on the current scope. It will not effect the value of its + * containing scope and will only be visible by this scope and its children. + * + * @returns an operation yielding the value being set + * @see {@link Scope#set} for setting a context value outside of a running operation + */ + set(value: T): Operation; + + /** + * Read the current value of the context or fail if it does not exist + * + * @returns an operation that yields the context value + * @see {@link Scope#expect} for reading a required context value outside of a running operation + */ + expect(): Operation; + + /** + * Remove a context value from the current scope. This will only effect the current scope and + * not its parent value. + * + * @returns true if the value existed uniquely on this scope. + */ + delete(): Operation; + + /** + * Evaluate an operation using `value` for the context. Once the operation is completed, the context + * will be reverted to its original value, or removed if it was not present originally. + * + * @example + * ```ts + * let user = yield* login(); + * yield* UserContext.with(user, function*() { + * //do stuff + * }) + * ``` + * + * @returns the result of evaluating the operation. + */ + with(value: T, operation: (value: T) => Operation): Operation; +} /** * A programatic API to interact with an Effection scope from outside of an @@ -185,17 +268,29 @@ export type Provide = (value: T) => Operation; */ export interface Scope { /** - * Spawn an {@link Operation} within `Scope`. + * Run an {@link Operation} within `Scope`. * * This is used to create concurrent tasks from _outside_ of a running - * operation. + * operation. To create concurrent tasks from _within_ an already + * running operation, use {@link Scope#spawn} * * @param operation - the operation to run - * @returns a task rep + * @returns a task of the running operation */ run(operation: () => Operation): Task; + + /** + * Spawn an {@link Operation} within `Scope`. + * + * This is used to create concurrent tasks from _within_ a running + * operation. To create concurrent from outside of Effection, use + * {@link Scope#run} + */ + spawn(operation: () => Operation): Operation>; + /** * Get a {@link Context} value from outside of an operation. + * * @param context - the context to get * @returns the value of that context in this scope if it exists */ @@ -203,68 +298,35 @@ export interface Scope { /** * Set the value of a {@link Context} from outside of an operation + * * @param context - the context to set * @param value - the value to set for this context * @returns - the value that was set */ set(context: Context, value: T): T; -} - -/** - * The Effection equivalent of an [`AsyncIterable`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Iteration_protocols#the_async_iterator_and_async_iterable_protocols). - * - * Like async iterables, streams do not actually have state themselves, but - * contain the recipe for how to create a {@link Subscription} - * - * @see https://frontside.com/effection/docs/collections#stream - */ -export type Stream = Operation>; - -/** - * The Effection equivalent of an [`AsyncIterator`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncIterator) - * - * A subscription acts like a stateful queue that provides a sequence of values - * via the next() method. Normally a subscription is created via a - * {@link Stream}. - * - * @see https://effection.deno.dev/docs/collections#subscription - */ -export interface Subscription { - next(): Operation>; -} - -/** - * `Context`` defines a value which is in effect for a given scope which is an - * (action, resource, call, or spawn). - * - * When used as an operation, it gets the value of the Context from - * the current scope. If it has not been set, and there is no default - * value, then a `MissingContextError` will be raised. - * - * Unless a context value is defined for a particular scope, it will inherit - * its value from its parent scope. - */ -export interface Context extends Operation { - /** - * A unique identifier for this context. - */ - readonly key: string; /** - * The value of the context if has not been defined for the current operation. + * Get a {@link Context} value from outside of an operation, and throw + * a `MissingContextError` if this context is not specified for this scope. + * + * @param context - the context to get + * @returns the value of that context in this scope if it exists */ - readonly defaultValue?: T; + expect(context: Context): T; /** - * Set the value of the Context for the current scope. + * Remove a {@link Context} value from this scope. + * + * @param context - the context to delete */ - set(value: T): Operation; + delete(context: Context): boolean; /** - * Get the value of the Context from the current scope. If it has not been - * set, and there is no default value, then this will return `undefined`. + * Check if scope has its own unique value for `context`. + * + * @returns `true` if scope has its own context, `false` if context is not present, or inherited from its parent. */ - get(): Operation; + hasOwn(context: Context): boolean; } /** @@ -276,39 +338,42 @@ export type Yielded> = T extends Operation ? TYield : never; -/* low-level interface Which you probably will not need */ +// low-level private apis. /** * @ignore */ -export type Result = { - readonly ok: true; - value: T; -} | { - readonly ok: false; - error: Error; -}; +export interface Effect { + description: string; + enter( + resolve: Resolve>, + routine: Coroutine, + ): (resolve: Resolve>) => void; +} /** * @ignore */ -export interface Instruction { - (frame: Frame): Computation>; +export interface Coroutine { + scope: Scope; + data: { + discard(resolve: Resolve>): void; + iterator: Iterator, T, unknown>; + }; + next(result: Result, subscriber?: Subscriber): () => void; + return(result: Result, subcriber?: Subscriber): () => void; } -import type { FrameResult } from "./run/types.ts"; +/** + * @ignore + */ +export interface Subscriber { + (result: IteratorResult, Result>): void; +} /** * @ignore */ -export interface Frame extends Computation> { - id: number; - context: Record; - exited?: true; - aborted?: boolean; - getTask(): Task; - createChild(operation: () => Operation): Frame; - enter(): void; - crash(error: Error): Computation>; - destroy(): Computation>; +export interface Resolve { + (value: T): void; } diff --git a/lib/with-resolvers.ts b/lib/with-resolvers.ts new file mode 100644 index 000000000..ac504f203 --- /dev/null +++ b/lib/with-resolvers.ts @@ -0,0 +1,49 @@ +import { Err, Ok, Result } from "./result.ts"; +import { action } from "./action.ts"; +import type { Operation } from "./types.ts"; + +export interface WithResolvers { + operation: Operation; + resolve(value: T): void; + reject(error: Error): void; +} + +export function withResolvers(description?: string): WithResolvers { + let continuations = new Set<(result: Result) => void>(); + let result: Result | undefined = undefined; + + let operation: Operation = action( + function (resolve, reject) { + let settle = (outcome: Result) => { + if (outcome.ok) { + resolve(outcome.value); + } else { + reject(outcome.error); + } + }; + + if (result) { + settle(result); + return () => {}; + } else { + continuations.add(settle); + return () => continuations.delete(settle); + } + }, + description, + ); + + let settle = (outcome: Result) => { + if (!result) { + result = outcome; + } + for (let continuation of continuations) { + continuation(result); + } + }; + + let resolve = (value: T) => settle(Ok(value)); + let reject = (error: Error) => settle(Err(error)); + + return { operation, resolve, reject }; +} diff --git a/test/abort-signal.test.ts b/test/abort-signal.test.ts index f4d0f2379..ddaf35227 100644 --- a/test/abort-signal.test.ts +++ b/test/abort-signal.test.ts @@ -1,10 +1,12 @@ -import { describe, expect, it, mock } from "./suite.ts"; +import { describe, expect, it } from "./suite.ts"; import { run, useAbortSignal } from "../mod.ts"; describe("useAbortSignal()", () => { it("aborts whenever it passes out of scope", async () => { - let abort = mock.fn(); - + let aborted = false; + let abort = () => { + aborted = true; + }; let signal = await run(function* () { let signal = yield* useAbortSignal(); signal.addEventListener("abort", abort); @@ -12,6 +14,6 @@ describe("useAbortSignal()", () => { return signal; }); expect(signal.aborted).toBe(true); - expect(abort).toHaveBeenCalled(); + expect(aborted).toEqual(true); }); }); diff --git a/test/action.test.ts b/test/action.test.ts deleted file mode 100644 index f2287e37d..000000000 --- a/test/action.test.ts +++ /dev/null @@ -1,122 +0,0 @@ -import { describe, expect, it } from "./suite.ts"; - -import { action, run, suspend } from "../mod.ts"; - -describe("action", () => { - it("can resolve", async () => { - let didClear = false; - let task = run(() => - action(function* (resolve) { - let timeout = setTimeout(() => resolve(42), 5); - try { - yield* suspend(); - } finally { - didClear = true; - clearTimeout(timeout); - } - }) - ); - - await expect(task).resolves.toEqual(42); - expect(didClear).toEqual(true); - }); - - it("can reject", async () => { - let didClear = false; - let error = new Error("boom"); - let task = run(() => - action(function* (_, reject) { - let timeout = setTimeout(() => reject(error), 5); - try { - yield* suspend(); - } finally { - didClear = true; - clearTimeout(timeout); - } - }) - ); - - await expect(task).rejects.toEqual(error); - expect(didClear).toEqual(true); - }); - - it("can resolve without ever suspending", async () => { - let result = await run(() => - action(function* (resolve) { - resolve("hello"); - }) - ); - - expect(result).toEqual("hello"); - }); - - it("can reject without ever suspending", async () => { - let error = new Error("boom"); - let task = run(() => - action(function* (_, reject) { - reject(error); - }) - ); - await expect(task).rejects.toEqual(error); - }); - - it("can resolve before it suspends", async () => { - expect( - await run(() => - action(function* (resolve) { - resolve("hello"); - yield* suspend(); - }) - ), - ).toEqual("hello"); - }); - - it("can reject before it suspends", async () => { - let error = new Error("boom"); - let task = run(() => - action(function* (_, reject) { - reject(error); - yield* suspend(); - }) - ); - await expect(task).rejects.toEqual(error); - }); - - it("fails if the operation fails", async () => { - let task = run(() => - action(function* () { - throw new Error("boom"); - }) - ); - await expect(task).rejects.toHaveProperty("message", "boom"); - }); - - it("fails if the shutdown fails", async () => { - let error = new Error("boom"); - let task = run(() => - action(function* (resolve) { - let timeout = setTimeout(resolve, 5); - try { - yield* suspend(); - } finally { - clearTimeout(timeout); - // deno-lint-ignore no-unsafe-finally - throw error; - } - }) - ); - await expect(task).rejects.toEqual(error); - }); - - it("does not reach code that should be aborted", async () => { - let didReach = false; - await run(function Main() { - return action(function* MyAction(resolve) { - resolve(10); - yield* suspend(); - didReach = true; - }); - }); - expect(didReach).toEqual(false); - }); -}); diff --git a/test/call.test.ts b/test/call.test.ts index bf02a3b48..19229097a 100644 --- a/test/call.test.ts +++ b/test/call.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "./suite.ts"; -import { call, run, spawn, suspend } from "../mod.ts"; +import { call, run } from "../mod.ts"; describe("call", () => { it("evaluates an operation function", async () => { @@ -13,16 +13,6 @@ describe("call", () => { }); }); - it("evaluates an operation directly", async () => { - await expect(run(() => - call({ - *[Symbol.iterator]() { - return 42; - }, - }) - )).resolves.toEqual(42); - }); - it("evaluates an async function", async () => { await expect(run(() => call(async function () { @@ -38,27 +28,6 @@ describe("call", () => { ); }); - it("evaluates a promise directly", async () => { - await expect(run(() => call(Promise.resolve(42)))).resolves.toEqual(42); - }); - - it("can be used as an error boundary", async () => { - let error = new Error("boom!"); - let result = await run(function* () { - try { - yield* call(function* () { - yield* spawn(function* () { - throw error; - }); - yield* suspend(); - }); - } catch (error) { - return error; - } - }); - expect(result).toEqual(error); - }); - it("evaluates a vanilla function", async () => { await expect(run(() => call(() => 42))).resolves.toEqual(42); }); diff --git a/test/context.test.ts b/test/context.test.ts index 379e3a0ce..7a8eec4dc 100644 --- a/test/context.test.ts +++ b/test/context.test.ts @@ -1,6 +1,6 @@ import { describe, expect, it } from "./suite.ts"; -import { call, createContext, run } from "../mod.ts"; +import { createContext, run, sleep, spawn } from "../mod.ts"; const numbers = createContext("number", 3); @@ -8,19 +8,20 @@ describe("context", () => { it("has the initial value available at all times", async () => { expect( await run(function* () { - return yield* numbers; + return yield* numbers.get(); }), ).toEqual(3); }); it("can be set within a given scope, but reverts after", async () => { let values = await run(function* () { - let before = yield* numbers; - let within = yield* call(function* () { - yield* numbers.set(22); - return yield* numbers; + let before = yield* numbers.get(); + + let within = yield* numbers.with(22, function*() { + return yield* numbers.get(); }); - let after = yield* numbers; + + let after = yield* numbers.get(); return [before, within, after]; }); @@ -36,7 +37,32 @@ describe("context", () => { it("is an error to expect() when context is missing", async () => { await expect(run(function* () { - yield* createContext("missing"); + yield* createContext("missing").expect(); })).rejects.toHaveProperty("name", "MissingContextError"); }); + + it("inherits values from parent tasks", async () => { + let context = createContext("just-a-string"); + await run(function* () { + yield* context.set("hello"); + + let task = yield* spawn(function* () { + return yield* context.get(); + }); + + expect(yield* task).toEqual("hello"); + }); + }); + + it("does see values that are set by child tasks", async () => { + let context = createContext("just-a-string"); + await run(function* () { + yield* context.set("hello"); + yield* spawn(function* () { + yield* context.set("goodbye"); + }); + yield* sleep(1); + expect(yield* context.get()).toEqual("hello"); + }); + }); }); diff --git a/test/each.test.ts b/test/each.test.ts index 57015323d..b794d31f2 100644 --- a/test/each.test.ts +++ b/test/each.test.ts @@ -1,21 +1,19 @@ import { describe, expect, it } from "./suite.ts"; -import { createChannel, each, run, spawn, suspend } from "../mod.ts"; +import { createQueue, each, resource, run, spawn, Stream } from "../mod.ts"; describe("each", () => { it("can be used to iterate a stream", async () => { await run(function* () { - let channel = createChannel(); let actual = [] as string[]; - yield* spawn(function* () { + let channel = sequence("one", "two", "three"); + let consumer = yield* spawn(function* () { for (let value of yield* each(channel)) { actual.push(value); yield* each.next(); } }); - yield* channel.send("one"); - yield* channel.send("two"); - yield* channel.send("three"); + yield* consumer; expect(actual).toEqual(["one", "two", "three"]); }); @@ -24,10 +22,10 @@ describe("each", () => { it("can be used to iterate nested streams", async () => { await run(function* () { let actual = [] as string[]; - let outer = createChannel(); - let inner = createChannel(); + let outer = sequence("one", "two"); + let inner = sequence("three", "four", "five"); - yield* spawn(function* () { + let consumer = yield* spawn(function* () { for (let value of yield* each(outer)) { actual.push(value); for (let value of yield* each(inner)) { @@ -38,46 +36,41 @@ describe("each", () => { } }); - yield* outer.send("one"); - yield* inner.send("two"); - yield* inner.send("two and a half"); - yield* inner.close(); - yield* outer.send("three"); - yield* inner.send("four"); - yield* inner.close(); - yield* outer.close(); + yield* consumer; - expect(actual).toEqual(["one", "two", "two and a half", "three", "four"]); + expect(actual).toEqual([ + "one", + "three", + "four", + "five", + "two", + "three", + "four", + "five", + ]); }); }); it("handles context correctly if you break out of a loop", async () => { await expect(run(function* () { - let channel = createChannel(); + let seq = sequence("hello world"); - yield* spawn(function* () { - for (let _ of yield* each(channel)) { - break; - } - // we're out of the loop, each.next() should be invalid. - yield* each.next(); - }); + for (let _ of yield* each(seq)) { + break; + } - yield* channel.send("hello"); - yield* suspend(); + // we're out of the loop, each.next() should be invalid. + yield* each.next(); })).rejects.toHaveProperty("name", "IterationError"); }); it("throws an error if you forget to invoke each.next()", async () => { await expect(run(function* () { - let channel = createChannel(); - yield* spawn(function* () { - for (let _ of yield* each(channel)) { - _; - } - }); - yield* channel.send("hello"); - yield* suspend(); + let seq = sequence("hello"); + + for (let _ of yield* each(seq)) { + _; + } })).rejects.toHaveProperty("name", "IterationError"); }); @@ -87,4 +80,38 @@ describe("each", () => { "MissingContextError", ); }); + + it("closes the stream after exiting from the loop", async () => { + let state = { status: "pending" }; + let stream: Stream = resource(function* (provide) { + try { + state.status = "active"; + yield* provide(yield* sequence("one", "two")); + } finally { + state.status = "closed"; + } + }); + + await run(function* () { + yield* spawn(function* () { + for (let _ of yield* each(stream)) { + expect(state.status).toEqual("active"); + yield* each.next(); + } + + expect(state.status).toEqual("closed"); + }); + }); + }); }); + +function sequence(...values: string[]): Stream { + return resource(function* (provide) { + let q = createQueue(); + for (let value of values) { + q.add(value); + } + q.close(); + yield* provide(q); + }); +} diff --git a/test/lift.test.ts b/test/lift.test.ts index d22402962..80494236e 100644 --- a/test/lift.test.ts +++ b/test/lift.test.ts @@ -1,23 +1,20 @@ -import { createSignal, each, lift, run, sleep, spawn } from "../mod.ts"; +import { lift, run, withResolvers, spawn } from "../mod.ts"; import { describe, expect, it } from "./suite.ts"; describe("lift", () => { it("safely does not continue if the call stops the operation", async () => { let reached = false; - await run(function* () { - let signal = createSignal(); - - yield* spawn(function* () { - yield* sleep(0); - yield* lift(signal.close)(); - + await run(function* main() { + let resolvers = withResolvers(); + yield* spawn(function* lifter() { + yield* lift(resolvers.resolve)("resolve it!"); reached = true; }); - for (let _ of yield* each(signal)); + yield* resolvers.operation; }); - expect(reached).toBe(false); + expect(reached).toEqual(false); }); }); diff --git a/test/main.test.ts b/test/main.test.ts index f31124f2c..66a4a7392 100644 --- a/test/main.test.ts +++ b/test/main.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it, useCommand } from "./suite.ts"; -import { run, sleep } from "../mod.ts"; +import { run, sleep, spawn, resource, type Operation } from "../mod.ts"; describe("main", () => { it("gracefully shuts down on SIGINT", async () => { @@ -123,9 +123,6 @@ describe("main", () => { }); }); -import type { Operation } from "../lib/types.ts"; -import { resource, spawn } from "../lib/instructions.ts"; - interface Buffer { content: string; } @@ -165,5 +162,5 @@ function* detect( } yield* sleep(10); } - expect(buffer.content).toMatch(text); + expect(buffer.content).toContain(text); } diff --git a/test/main/fail.exit.ts b/test/main/fail.exit.ts index fc890fe4b..30053a039 100644 --- a/test/main/fail.exit.ts +++ b/test/main/fail.exit.ts @@ -1,6 +1,4 @@ -import { sleep } from "../../lib/sleep.ts"; -import { exit, main } from "../../lib/main.ts"; -import { spawn, suspend } from "../../lib/instructions.ts"; +import { exit, main, sleep, spawn, suspend } from "../../mod.ts"; await main(function* () { yield* spawn(function* () { diff --git a/test/main/fail.unexpected.ts b/test/main/fail.unexpected.ts index ea0fb0f7b..d2edbaed3 100644 --- a/test/main/fail.unexpected.ts +++ b/test/main/fail.unexpected.ts @@ -1,6 +1,4 @@ -import { sleep } from "../../lib/sleep.ts"; -import { main } from "../../lib/main.ts"; -import { spawn, suspend } from "../../lib/instructions.ts"; +import { main, sleep, spawn, suspend } from "../../mod.ts"; await main(function* () { yield* spawn(function* () { diff --git a/test/main/just.suspend.ts b/test/main/just.suspend.ts index 0013ad646..e584014b6 100644 --- a/test/main/just.suspend.ts +++ b/test/main/just.suspend.ts @@ -1,5 +1,4 @@ -import { suspend } from "../../lib/instructions.ts"; -import { main } from "../../lib/main.ts"; +import { main, suspend } from "../../mod.ts"; await main(function* () { console.log("started"); diff --git a/test/main/ok.daemon.ts b/test/main/ok.daemon.ts index df96dea92..28b0cee70 100644 --- a/test/main/ok.daemon.ts +++ b/test/main/ok.daemon.ts @@ -1,5 +1,4 @@ -import { main } from "../../lib/main.ts"; -import { sleep } from "../../lib/sleep.ts"; +import { main, sleep } from "../../mod.ts"; await main(function* () { console.log(`started: ${Deno.pid}`); diff --git a/test/main/ok.exit.ts b/test/main/ok.exit.ts index c553beffa..392c12fc5 100644 --- a/test/main/ok.exit.ts +++ b/test/main/ok.exit.ts @@ -1,6 +1,4 @@ -import { sleep } from "../../lib/sleep.ts"; -import { spawn, suspend } from "../../lib/instructions.ts"; -import { exit, main } from "../../lib/main.ts"; +import { exit, main, sleep, spawn, suspend } from "../../mod.ts"; await main(function* () { yield* spawn(function* () { diff --git a/test/main/ok.implicit.ts b/test/main/ok.implicit.ts index ef3ed8373..2b4ae0403 100644 --- a/test/main/ok.implicit.ts +++ b/test/main/ok.implicit.ts @@ -1,6 +1,4 @@ -import { sleep } from "../../lib/sleep.ts"; -import { spawn, suspend } from "../../lib/instructions.ts"; -import { main } from "../../lib/main.ts"; +import { main, sleep, spawn, suspend } from "../../mod.ts"; await main(function* () { yield* spawn(function* () { diff --git a/test/queue.test.ts b/test/queue.test.ts index 44282ed5d..905e78803 100644 --- a/test/queue.test.ts +++ b/test/queue.test.ts @@ -1,12 +1,5 @@ import { describe, expect, it } from "./suite.ts"; -import { - action, - createQueue, - type Operation, - run, - sleep, - spawn, -} from "../mod.ts"; +import { createQueue, type Operation, race, run, sleep, spawn } from "../mod.ts"; describe("Queue", () => { it("adds value to an already waiting listener", async () => { @@ -53,15 +46,5 @@ describe("Queue", () => { }); function abortAfter(op: Operation, ms: number): Operation { - return action(function* (resolve, reject) { - yield* spawn(function* () { - try { - resolve(yield* op); - } catch (error) { - reject(error); - } - }); - yield* sleep(ms); - resolve(); - }); + return race([op, sleep(ms)]); } diff --git a/test/race.test.ts b/test/race.test.ts index a3a6c3b6b..07486eade 100644 --- a/test/race.test.ts +++ b/test/race.test.ts @@ -9,7 +9,15 @@ import { syncResolve, } from "./suite.ts"; -import { call, type Operation, race, run } from "../mod.ts"; +import { + call, + type Operation, + race, + run, + sleep, + spawn, + suspend, +} from "../mod.ts"; describe("race()", () => { it("resolves when one of the given operations resolves asynchronously first", async () => { @@ -73,4 +81,30 @@ describe("race()", () => { race([resolve("hello"), resolve(42), resolve("world"), resolve(true)]), ); }); + + it.skip("transfers tasks created in a contestant into the parent", async () => { + type State = { name: string; status: string }; + + function* op(name: string, delay: number): Operation { + let state = { name, status: "pending" }; + yield* spawn(function* () { + try { + state.status = `open`; + yield* suspend(); + } finally { + state.status = `closed`; + } + }); + yield* sleep(delay); + return state; + } + + await run(function* () { + let state = yield* race([op("winner", 0)]); + expect(state).toEqual({ name: "winner", status: "open" }); + }); + }); + + it.skip("doesn't adopt childless scopes", async () => { + }); }); diff --git a/test/resource.test.ts b/test/resource.test.ts index a3b48f3db..9475854a1 100644 --- a/test/resource.test.ts +++ b/test/resource.test.ts @@ -3,23 +3,6 @@ import { Operation, resource, run, sleep, spawn, suspend } from "../mod.ts"; type State = { status: string }; -function createResource(container: State): Operation { - return resource(function* (provide) { - yield* spawn(function* () { - yield* sleep(5); - container.status = "active"; - }); - - yield* sleep(1); - - try { - yield* provide(container); - } finally { - container.status = "finalized"; - } - }); -} - describe("resource", () => { it("runs resource in task scope", async () => { let state = { status: "pending" }; @@ -44,11 +27,28 @@ describe("resource", () => { await expect(task).rejects.toHaveProperty("message", "moo"); }); + it("can catch an error in init", async () => { + let task = run(function* () { + try { + yield* resource(function* () { + throw new Error("moo"); + }); + } catch (error) { + return error; + } + }); + + await expect(task).resolves.toMatchObject({ message: "moo" }); + }); + it("raises an error if an error occurs after init", async () => { let task = run(function* () { - yield* spawn(function* () { - yield* sleep(5); - throw new Error("moo"); + yield* resource(function* (provide) { + yield* spawn(function* () { + yield* sleep(5); + throw new Error("moo"); + }); + yield* provide(); }); try { yield* sleep(10); @@ -72,8 +72,26 @@ describe("resource", () => { yield* createResource(state); yield* suspend(); }); + await task.halt(); expect(state.status).toEqual("pending"); }); }); + +function createResource(container: State): Operation { + return resource(function* (provide) { + yield* spawn(function* () { + yield* sleep(5); + container.status = "active"; + }); + + yield* sleep(1); + + try { + yield* provide(container); + } finally { + container.status = "finalized"; + } + }); +} diff --git a/test/run.test.ts b/test/run.test.ts index 52c803a95..e321c1540 100644 --- a/test/run.test.ts +++ b/test/run.test.ts @@ -1,6 +1,5 @@ import { blowUp, createNumber, describe, expect, it } from "./suite.ts"; -import { run, sleep, spawn, suspend } from "../mod.ts"; -import type { Task } from "../mod.ts"; +import { action, run, sleep, spawn, suspend, type Task } from "../mod.ts"; describe("run()", () => { it("can run an operation", async () => { @@ -34,6 +33,7 @@ describe("run()", () => { let two = yield* blowUp(); return one + two; }); + await expect(task).rejects.toEqual(error); }); @@ -41,7 +41,8 @@ describe("run()", () => { let task = run(function () { throw new Error("boom"); }); - await expect(task).rejects.toHaveProperty("message", "boom"); + + await expect(task).rejects.toMatchObject({ message: "boom" }); }); it("can recover from errors in promise", async () => { @@ -90,7 +91,21 @@ describe("run()", () => { }); await task.halt(); + await expect(task).rejects.toHaveProperty("message", "halted"); + expect(halted).toEqual(true); + }); + it("can halt a task as an operation", async () => { + let halted = false; + let task = run(function* () { + try { + yield* suspend(); + } finally { + halted = true; + } + }); + + await run(task.halt); await expect(task).rejects.toHaveProperty("message", "halted"); expect(halted).toEqual(true); }); @@ -126,30 +141,31 @@ describe("run()", () => { try { yield* suspend(); } finally { - yield* sleep(10); + yield* sleep(1); completed = true; } }); await task.halt(); + await expect(task).rejects.toMatchObject({ message: "halted" }); expect(completed).toEqual(true); }); - it("cannot explicitly suspend in a finally block", async () => { - let done = false; - let task = run(function* () { - try { - yield* suspend(); - } finally { - yield* suspend(); - done = true; - } - }); - - await task.halt(); - expect(done).toEqual(true); - }); + // // it("cannot explicitly suspend in a finally block", async () => { + // // let done = false; + // // let task = run(function* () { + // // try { + // // yield* suspend(); + // // } finally { + // // yield* suspend(); + // // done = true; + // // } + // // }); + + // // await run(task.halt); + // // expect(done).toEqual(true); + // // }); it("can suspend in yielded finally block", async () => { let things: string[] = []; @@ -170,7 +186,6 @@ describe("run()", () => { }); await task.halt(); - await expect(task).rejects.toHaveProperty("message", "halted"); expect(things).toEqual(["first", "second"]); @@ -179,44 +194,42 @@ describe("run()", () => { it("can be halted while in the generator", async () => { let task = run(function* Main() { yield* spawn(function* Boomer() { - yield* sleep(2); throw new Error("boom"); }); yield* suspend(); }); - await expect(task).rejects.toHaveProperty("message", "boom"); + await expect(task).rejects.toMatchObject({ message: "boom" }); }); it("can halt itself", async () => { let task: Task = run(function* () { - yield* sleep(3); + yield* sleep(0); yield* task.halt(); }); - await expect(task).rejects.toHaveProperty("message", "halted"); + await expect(task).rejects.toMatchObject({ message: "halted" }); }); it("can halt itself between yield points", async () => { - let task: Task = run(function* () { - yield* sleep(1); + let task: Task = run(function* root() { + yield* sleep(0); - yield* spawn(function* () { + yield* spawn(function* child() { yield* task.halt(); }); yield* suspend(); }); - await expect(task).rejects.toHaveProperty("message", "halted"); + await expect(task).rejects.toMatchObject({ message: "halted" }); }); it("can delay halt if child fails", async () => { let didRun = false; - let task = run(function* () { + let task = run(function* Main() { yield* spawn(function* willBoom() { - yield* sleep(5); throw new Error("boom"); }); try { @@ -227,22 +240,40 @@ describe("run()", () => { } }); - await run(() => sleep(10)); - await expect(task).rejects.toHaveProperty("message", "boom"); expect(didRun).toEqual(true); }); + it("handles error in entering suspend point", async () => { + let error = new Error("boom!"); + let task = run(function* () { + yield* action(() => { + throw error; + }); + }); + + await expect(task).rejects.toEqual(error); + }); + + it("handles errors in exiting suspend points", async () => { + let error = new Error("boom!"); + let task = run(function* () { + yield* action(() => () => { + throw error; + }); + }); + + await expect(task.halt()).rejects.toEqual(error); + }); + it("can throw error when child blows up", async () => { let task = run(function* Main() { yield* spawn(function* Boomer() { - yield* sleep(5); throw new Error("boom"); }); try { yield* suspend(); } finally { - // deno-lint-ignore no-unsafe-finally throw new Error("bang"); } }); @@ -250,6 +281,18 @@ describe("run()", () => { await expect(task).rejects.toHaveProperty("message", "bang"); }); + it("throws an error in halt() if its finally block blows up", async () => { + let task = run(function* main() { + try { + yield* suspend(); + } finally { + throw new Error("moo"); + } + }); + + await expect(task.halt()).rejects.toMatchObject({ message: "moo" }); + }); + it("propagates errors", async () => { try { await run(function* () { diff --git a/test/scope.test.ts b/test/scope.test.ts index 588202a95..42fdf9f3c 100644 --- a/test/scope.test.ts +++ b/test/scope.test.ts @@ -4,19 +4,24 @@ import { createScope, resource, run, + sleep, suspend, useScope, } from "../mod.ts"; describe("Scope", () => { - it("can be used to run actions", async () => { + it("can be used to run operations", async () => { let [scope] = createScope(); - let t1 = scope.run(function* () { - return 1; - }); - let t2 = scope.run(function* () { - return 2; - }); + let t1 = run(() => + scope.spawn(function* () { + return 1; + }) + ); + let t2 = run(() => + scope.spawn(function* () { + return 2; + }) + ); expect(await t1).toEqual(1); expect(await t2).toEqual(2); }); @@ -24,9 +29,11 @@ describe("Scope", () => { it("succeeds on close if the frame has errored", async () => { let error = new Error("boom!"); let [scope, close] = createScope(); - let bomb = scope.run(function* () { - throw error; - }); + let bomb = run(() => + scope.spawn(function* () { + throw error; + }) + ); await expect(bomb).rejects.toEqual(error); await expect(close()).resolves.toBeUndefined(); }); @@ -34,14 +41,16 @@ describe("Scope", () => { it("errors on close if there is an problem in teardown", async () => { let error = new Error("boom!"); let [scope, close] = createScope(); - scope.run(function* () { - try { - yield* suspend(); - } finally { - // deno-lint-ignore no-unsafe-finally - throw error; - } - }); + run(() => + scope.spawn(function* () { + try { + yield* suspend(); + } finally { + // deno-lint-ignore no-unsafe-finally + throw error; + } + }) + ); await expect(close()).rejects.toEqual(error); }); @@ -50,14 +59,18 @@ describe("Scope", () => { let [scope, close] = createScope(); let tester: Tester = {}; - scope.run(function* () { - yield* useTester(tester); - yield* suspend(); - }); + run(() => + scope.spawn(function* () { + yield* useTester(tester); + yield* suspend(); + }) + ); - scope.run(function* () { - throw error; - }); + run(() => + scope.spawn(function* () { + throw error; + }) + ); await expect(close()).resolves.toBeUndefined(); expect(tester.status).toEqual("closed"); }); @@ -66,10 +79,11 @@ describe("Scope", () => { let tester: Tester = {}; await run(function* () { let scope = yield* useScope(); - scope.run(function* () { + yield* scope.spawn(function* () { yield* useTester(tester); yield* suspend(); }); + yield* sleep(1); expect(tester.status).toEqual("open"); }); expect(tester.status).toEqual("closed"); @@ -79,7 +93,7 @@ describe("Scope", () => { let cxt = createContext("number"); function* incr() { - let value = yield* cxt; + let value = yield* cxt.expect(); return yield* cxt.set(value + 1); } @@ -87,14 +101,15 @@ describe("Scope", () => { let scope = yield* useScope(); yield* cxt.set(1); - let first = yield* scope.run(incr); - let second = yield* scope.run(incr); - let third = yield* scope.run(incr); + let first = yield* scope.spawn(incr); + let second = yield* scope.spawn(incr); + let third = yield* scope.spawn(incr); + + expect(yield* first).toEqual(2); + expect(yield* second).toEqual(2); + expect(yield* third).toEqual(2); - expect(yield* cxt).toEqual(1); - expect(first).toEqual(2); - expect(second).toEqual(2); - expect(third).toEqual(2); + expect(yield* cxt.expect()).toEqual(1); }); }); @@ -104,14 +119,15 @@ describe("Scope", () => { expect(scope.get(context)).toEqual(void 0); expect(scope.set(context, "Hello World!")).toEqual("Hello World!"); expect(scope.get(context)).toEqual("Hello World!"); - await expect(scope.run(() => context)).resolves.toEqual("Hello World!"); + await expect(run(() => scope.spawn(() => context.expect()))).resolves + .toEqual("Hello World!"); }); it("propagates uncaught errors within a scope", async () => { let error = new Error("boom"); let result = run(function* () { let scope = yield* useScope(); - scope.run(function* () { + yield* scope.spawn(function* () { throw error; }); yield* suspend(); @@ -119,10 +135,22 @@ describe("Scope", () => { await expect(result).rejects.toBe(error); }); - it("throws an error if you try to run() with a dead scope", async () => { - let scope = await run(useScope); + it("destroys derived scopes when a scope is destroyed", async () => { + let [parent, destroy] = createScope(); + let [child] = createScope(parent); + + let halted = false; + + child.run(function* () { + try { + yield* suspend(); + } finally { + halted = true; + } + }); - expect(() => scope.run(function* () {})).toThrow("cannot call"); + await destroy(); + expect(halted).toEqual(true); }); }); diff --git a/test/signal.test.ts b/test/signal.test.ts index 110e90313..4c353f923 100644 --- a/test/signal.test.ts +++ b/test/signal.test.ts @@ -3,7 +3,6 @@ import { createQueue, createScope, createSignal, - each, run, SignalQueueFactory, spawn, @@ -14,10 +13,12 @@ describe("createSignal", () => { let msgs: string[] = []; let signal = createSignal(); let root = run(function* () { + let subscription = yield* signal; let task = yield* spawn(function* () { - for (let msg of yield* each(signal)) { - msgs.push(msg); - yield* each.next(); + let next = yield* subscription.next(); + while (!next.done) { + msgs.push(next.value); + next = yield* subscription.next(); } }); @@ -56,10 +57,12 @@ describe("createSignal", () => { let signal = createSignal(); let root = scope.run(function* () { + let subscription = yield* signal; let task = yield* spawn(function* () { - for (let msg of yield* each(signal)) { - msgs.push(msg); - yield* each.next(); + let next = yield* subscription.next(); + while (!next.done) { + msgs.push(next.value); + next = yield* subscription.next(); } }); diff --git a/test/spawn.test.ts b/test/spawn.test.ts index 2c5db170b..6191aba9b 100644 --- a/test/spawn.test.ts +++ b/test/spawn.test.ts @@ -1,16 +1,14 @@ import { describe, expect, it } from "./suite.ts"; -import { action, run, sleep, spawn, suspend } from "../mod.ts"; +import { run, sleep, spawn, suspend } from "../mod.ts"; describe("spawn", () => { it("can spawn a new child task", async () => { - let root = run(function* () { - let child = yield* spawn(function* () { + let root = run(function* root() { + let child = yield* spawn(function* child() { let one = yield* Promise.resolve(12); let two = yield* Promise.resolve(55); - return one + two; }); - return yield* child; }); await expect(root).resolves.toEqual(67); @@ -18,8 +16,8 @@ describe("spawn", () => { it("halts child when halted", async () => { let child; - let root = run(function* () { - child = yield* spawn(function* () { + let root = run(function* root() { + child = yield* spawn(function* child() { yield* suspend(); }); @@ -33,7 +31,7 @@ describe("spawn", () => { it("halts child when finishing normally", async () => { let child; - let result = run(function* () { + let result = run(function* parent() { child = yield* spawn(function* () { yield* suspend(); }); @@ -64,7 +62,6 @@ describe("spawn", () => { let error = new Error("moo"); let root = run(function* () { child = yield* spawn(function* () { - yield* sleep(1); throw error; }); @@ -90,20 +87,20 @@ describe("spawn", () => { it("rejects when child errors during completing", async () => { let child; - let root = run(function* () { - child = yield* spawn(function* () { + let root = run(function* root() { + child = yield* spawn(function* child() { try { yield* suspend(); } finally { - // deno-lint-ignore no-unsafe-finally throw new Error("moo"); } }); + yield* sleep(0); return "foo"; }); + await expect(child).rejects.toMatchObject({ message: "moo" }); await expect(root).rejects.toHaveProperty("message", "moo"); - await expect(child).rejects.toHaveProperty("message", "moo"); }); it("rejects when child errors during halting", async () => { @@ -113,7 +110,6 @@ describe("spawn", () => { try { yield* suspend(); } finally { - // deno-lint-ignore no-unsafe-finally throw new Error("moo"); } }); @@ -167,6 +163,7 @@ describe("spawn", () => { result.push("second done"); } }); + yield* sleep(0); }); expect(result).toEqual([ @@ -177,29 +174,11 @@ describe("spawn", () => { ]); }); - it("can catch an error spawned inside of an action", async () => { - let error = new Error("boom!"); - let value = await run(function* () { - try { - yield* action(function* TheAction() { - yield* spawn(function* TheBomb() { - yield* sleep(1); - throw error; - }); - yield* sleep(5000); - }); - } catch (err) { - return err; - } - }); - expect(value).toBe(error); - }); - it("halts children on explicit halt", async () => { let child; let root = run(function* () { child = yield* spawn(function* () { - yield* sleep(20); + yield* sleep(2); return "foo"; }); @@ -210,4 +189,19 @@ describe("spawn", () => { await expect(child).rejects.toHaveProperty("message", "halted"); }); + + it("raises an uncatchable error if a spawned child fails", async () => { + let task = run(function* () { + yield* spawn(function* () { + yield* sleep(5); + throw new Error("moo"); + }); + try { + yield* sleep(10); + } catch (error) { + return error; + } + }); + await expect(task).rejects.toHaveProperty("message", "moo"); + }); }); diff --git a/test/suite.ts b/test/suite.ts index 2e9f292c6..6ecd8913f 100644 --- a/test/suite.ts +++ b/test/suite.ts @@ -1,31 +1,15 @@ -export * from "https://deno.land/std@0.163.0/testing/bdd.ts"; -export { expect, mock } from "https://deno.land/x/expect@v0.3.0/mod.ts"; -export { expectType } from "https://esm.sh/ts-expect@1.3.0?pin=v123"; +import { action, call, sleep, resource, spawn } from "../mod.ts"; -import { - action, - call, - type Operation, - resource, - sleep, - spawn, -} from "../mod.ts"; +import { Operation } from "../lib/types.ts"; -declare global { - interface Promise extends Operation {} -} - -Object.defineProperty(Promise.prototype, Symbol.iterator, { - get(this: Promise) { - return expect(this)[Symbol.iterator]; - }, -}); - -function expect(promise: Promise): Operation { - return action(function* (resolve, reject) { - promise.then(resolve, reject); - }); -} +export { + afterEach, + beforeEach, + describe, + it, +} from "https://deno.land/std@0.223.0/testing/bdd.ts"; +export { expect } from "jsr:@std/expect"; +export { expectType } from "npm:ts-expect@1.3.0"; export function* createNumber(value: number): Operation { yield* sleep(1); @@ -37,6 +21,21 @@ export function* blowUp(): Operation { throw new Error("boom"); } +declare global { + interface Promise extends Operation {} +} + +Object.defineProperty(Promise.prototype, Symbol.iterator, { + get(this: Promise) { + let then = this.then.bind(this); + let suspense = action(function wait(resolve, reject) { + then(resolve, reject); + return () => {}; + }); + return suspense[Symbol.iterator]; + }, +}); + export function* asyncResolve( duration: number, value: string, @@ -53,14 +52,6 @@ export function* asyncReject( throw new Error(`boom: ${value}`); } -export function* syncResolve(value: string): Operation { - return value; -} - -export function* syncReject(value: string): Operation { - throw new Error(`boom: ${value}`); -} - export function asyncResource( duration: number, value: string, @@ -76,6 +67,14 @@ export function asyncResource( }); } +export function* syncResolve(value: string): Operation { + return value; +} + +export function* syncReject(value: string): Operation { + throw new Error(`boom: ${value}`); +} + export function useCommand( cmd: string, options?: Deno.CommandOptions, @@ -88,7 +87,7 @@ export function useCommand( } finally { try { process.kill("SIGINT"); - yield* call(process.status); + yield* call(() => process.status); } catch (error) { // if the process already quit, then this error is expected. // unfortunately there is no way (I know of) to check this