Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #14: add windowPut--i.e. put with strategy based on num pending puts #17

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion src/Effect/AVar.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ var AVar = function () {
return cell;
}

function putHead (queue, value) {
var cell = new MutableCell(queue, value);
switch (queue.size) {
case 0:
queue.head = cell;
break;
case 1:
queue.last = queue.head;
/* fallthrough */
default:
cell.next = queue.head;
queue.head.prev = cell;
queue.head = cell;
}
queue.size++;
return cell;
}

function takeLast (queue) {
var cell;
switch (queue.size) {
Expand Down Expand Up @@ -203,6 +221,7 @@ var AVar = function () {

AVar.EMPTY = EMPTY;
AVar.putLast = putLast;
AVar.putHead = putHead;
AVar.takeLast = takeLast;
AVar.takeHead = takeHead;
AVar.deleteCell = deleteCell;
Expand All @@ -221,6 +240,10 @@ exports._newVar = function (value) {
};
};

exports._pendingPuts = function (avar) {
return avar.puts.size;
};

exports._killVar = function (util, error, avar) {
return function () {
if (avar.error === null) {
Expand All @@ -231,7 +254,31 @@ exports._killVar = function (util, error, avar) {
};
};

exports._putVar = function (util, value, avar, cb) {
exports._initPuts = function (util, error, avar) {
return function () {
var p = AVar.takeLast(avar.puts);
if (p !== null) {
p.cb(util.left(error))();
return util.just(p.value);
} else {
return util.nothing;
}
};
};

exports._tailPuts = function (util, error, avar) {
return function () {
var p = AVar.takeHead(avar.puts);
if (p !== null) {
p.cb(util.left(error))();
return util.just(p.value);
} else {
return util.nothing;
}
};
};

exports._snocVar = function (util, value, avar, cb) {
return function () {
var cell = AVar.putLast(avar.puts, { cb: cb, value: value });
AVar.drainVar(util, avar);
Expand All @@ -241,6 +288,16 @@ exports._putVar = function (util, value, avar, cb) {
};
};

exports._consVar = function (util, value, avar, cb) {
return function () {
var cell = AVar.putHead(avar.puts, { cb: cb, value: value });
AVar.drainVar(util, avar);
return function () {
AVar.deleteCell(cell);
};
};
};

exports._takeVar = function (util, avar, cb) {
return function () {
var cell = AVar.putLast(avar.takes, cb);
Expand Down
65 changes: 63 additions & 2 deletions src/Effect/AVar.purs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ module Effect.AVar
, take
, tryTake
, put
, Operation(..)
, windowPut
, tryPut
, read
, tryRead
Expand All @@ -33,6 +35,17 @@ data AVarStatus a
| Filled a
| Empty

data Operation
= Ignore -- Do nothing with the queue
| Fail Error -- Propagate an exception to the callback
| Halt Error -- Kill the internal queue and propagate the exception
| PushHead -- Push callback onto the head
| PushTail -- Push callback onto the tail
| DropHead Error -- Drop the head, and push onto the tail
| DropTail Error -- Drop the tail, and push onto the head
| SwapHead Error -- Replace the head
| SwapTail Error -- Replace the tail
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is having an actual data type necessary? The upside is that you can pattern match and change the operations later (I don't have a use case for this). The downside is that there is a linear dispatching overhead in the windowPut interpreter. Can you think of a use case that takes advantage of a primitive ADT rather than just bundling the logic up in a closure for each strategy?


-- | Creates a new empty AVar.
foreign import empty ∷ ∀ a. Effect (AVar a)

Expand All @@ -50,7 +63,51 @@ kill err avar = Fn.runFn3 _killVar ffiUtil err avar
-- | the AVar becomes available. Returns an effect which will remove the
-- | callback from the pending queue.
put ∷ ∀ a. a → AVar a → AVarCallback Unit → Effect (Effect Unit)
put value avar cb = Fn.runFn4 _putVar ffiUtil value avar cb
put = windowPut (const PushTail)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is by far the most common, maybe it makes sense to avoid the dispatch overhead and call snocVar directly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be less concerned about this if we didn't have an ADT.


-- | Puts a value into an AVar using a strategy determined
-- | dynamically on the basis of the number of pending puts
-- | (i.e. not including the current value of the AVar, if non-EMPTY).
windowPut
:: ∀ a
. (Int -> Operation)
-> a
-> AVar a
-> AVarCallback Unit
-> Effect (Effect Unit)
windowPut strategy value avar cb =
case strategy (_pendingPuts avar) of
-- Do nothing with the queue
Ignore -> pure $ pure unit
-- Propagate an exception to the callback
Fail e -> do
cb $ Left e
pure $ pure unit
-- Kill the internal queue and propagate the exception
Halt e -> do
kill e avar
cb $ Left e
pure $ pure unit
-- Push callback onto the head
PushHead -> Fn.runFn4 _consVar ffiUtil value avar cb
-- Push callback onto the tail
PushTail -> Fn.runFn4 _snocVar ffiUtil value avar cb
-- Drop the head, and push onto the tail
DropHead e -> do
void $ Fn.runFn3 _tailPuts ffiUtil e avar
Fn.runFn4 _snocVar ffiUtil value avar cb
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these operations be atomic? Right now tailPuts/initPuts invokes a callback, which may end up doing something that affects the internal AVar state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, perhaps there is a need to create atomic versions of these combinations where the callbacks corresponding to dropped entries are invoked after the avar has been updated. I'll submit an update.

It is still possible for the callbacks to invalidate the property being enforced by the strategy. Strategies can still be coded to recover from such deviations.

-- Drop the tail, and push onto the head
DropTail e -> do
void $ Fn.runFn3 _initPuts ffiUtil e avar
Fn.runFn4 _consVar ffiUtil value avar cb
-- Replace the head
SwapHead e -> do
void $ Fn.runFn3 _tailPuts ffiUtil e avar
Fn.runFn4 _consVar ffiUtil value avar cb
-- Replace the tail
SwapTail e -> do
void $ Fn.runFn3 _initPuts ffiUtil e avar
Fn.runFn4 _snocVar ffiUtil value avar cb

-- | Attempts to synchronously fill an AVar. If the AVar is already filled,
-- | this will do nothing. Returns true or false depending on if it succeeded.
Expand Down Expand Up @@ -101,13 +158,17 @@ isKilled = case _ of

foreign import _newVar ∷ ∀ a. a → Effect (AVar a)
foreign import _killVar ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect Unit)
foreign import _putVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
foreign import _snocVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
foreign import _consVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit))
foreign import _initPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a))
foreign import _tailPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a))
foreign import _tryPutVar ∷ ∀ a. Fn.Fn3 FFIUtil a (AVar a) (Effect Boolean)
foreign import _takeVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit))
foreign import _tryTakeVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a))
foreign import _readVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit))
foreign import _tryReadVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a))
foreign import _status ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (AVarStatus a))
foreign import _pendingPuts :: ∀ a. AVar a -> Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a pure operation since an avar size is mutable. This can potentially cause issues depending on where and how the actual effects are interleaved.


type FFIUtil =
{ left ∷ ∀ a b. a → Either a b
Expand Down
128 changes: 127 additions & 1 deletion test/Main.purs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import Prelude
import Effect (Effect)
import Effect.AVar as AVar
import Effect.Console (log)
import Effect.Exception (error, message)
import Effect.Exception (Error, error, message)
import Effect.Ref as Ref
import Data.Either (Either(..))
import Data.Foldable (traverse_)
Expand Down Expand Up @@ -189,6 +189,126 @@ test_cancel = test "cancel" do
_ ← AVar.tryPut "a" v3
eq "cdfg" <$> Ref.read ref

putMax
:: ∀ a
. Int
-> Error
-> a
-> AVar.AVar a
-> AVar.AVarCallback Unit
-> Effect (Effect Unit)
putMax max err = AVar.windowPut go
where
go n
| n < max = AVar.PushTail
| otherwise = AVar.Fail err

test_max1_put_take ∷ Effect Unit
test_max1_put_take = test "max1: put/take" do
ref ← Ref.new ""
var ← AVar.empty
let
err =
error "max puts exceeded"
_ ← putMax 1 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "barfoo" <$> Ref.read ref

test_max2_put_take ∷ Effect Unit
test_max2_put_take = test "max2: put/take" do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the motivation for this test that's different from max1?

ref ← Ref.new ""
var ← AVar.empty
let
err =
error "max puts exceeded"
_ ← putMax 2 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "barfoo" <$> Ref.read ref

test_max1_put_put_put_take ∷ Effect Unit
test_max1_put_put_put_take = test "max1: put/put/put/take" do
ref ← Ref.new ""
var ← AVar.empty
let
err =
error "max puts exceeded"
_ ← putMax 1 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← putMax 1 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← putMax 1 err "foo" var $
case _ of
Left _ -> void $ Ref.modify (_ <> "fail") ref
otherwise -> void $ Ref.modify (_ <> "bar") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "barfailfoobar" <$> Ref.read ref

putSliding
:: ∀ a
. Int
-> Error
-> a
-> AVar.AVar a
-> AVar.AVarCallback Unit
-> Effect (Effect Unit)
putSliding max err = AVar.windowPut go
where
go n
| n < max = AVar.PushTail
| otherwise = AVar.DropHead err

test_window1_put_take ∷ Effect Unit
test_window1_put_take = test "win1: put/take" do
ref ← Ref.new ""
var ← AVar.empty
let
err =
error "sliding window exceeded"
_ ← putSliding 1 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "barfoo" <$> Ref.read ref

test_window2_put_take ∷ Effect Unit
test_window2_put_take = test "win2: put/take" do
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise with max2, it's not clear to me what this is testing vs the first one.

ref ← Ref.new ""
var ← AVar.empty
let
err =
error "sliding window exceeded"
_ ← putSliding 2 err "foo" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "barfoo" <$> Ref.read ref

test_window1_put_put_put_take_take ∷ Effect Unit
test_window1_put_put_put_take_take = test "win1: put/put/put/take/take" do
ref ← Ref.new ""
var ← AVar.empty
let
err =
error "sliding window exceeded"
_ ← putSliding 1 err "foo1" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar1") ref
_ ← putSliding 1 err "foo2" var $
case _ of
Left _ -> void $ Ref.modify (_ <> "fail") ref
otherwise -> void $ Ref.modify (_ <> "bar2") ref
_ ← putSliding 1 err "foo3" var $ traverse_ \_ →
void $ Ref.modify (_ <> "bar3") ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
_ ← AVar.take var $ traverse_ \val →
void $ Ref.modify (_ <> val) ref
eq "bar1failfoo1bar3foo3" <$> Ref.read ref

main ∷ Effect Unit
main = do
test_tryRead_full
Expand All @@ -206,3 +326,9 @@ main = do
test_kill_empty
test_kill_pending
test_cancel
test_max1_put_take
test_max2_put_take
test_max1_put_put_put_take
test_window1_put_take
test_window2_put_take
test_window1_put_put_put_take_take
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, it seems like we should test all the strategies on some level.