From cb1bc9b5856d067179a14cd61ed44986aa3cd686 Mon Sep 17 00:00:00 2001 From: Rohit Grover Date: Sun, 26 May 2019 00:02:19 +1200 Subject: [PATCH 1/3] fix #14: add windowPut--i.e. put with strategy based on num pending puts --- src/Effect/AVar.js | 58 +++++++++++++++++++- src/Effect/AVar.purs | 65 +++++++++++++++++++++- test/Main.purs | 128 ++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 247 insertions(+), 4 deletions(-) diff --git a/src/Effect/AVar.js b/src/Effect/AVar.js index f0a2c95..c39d8ce 100644 --- a/src/Effect/AVar.js +++ b/src/Effect/AVar.js @@ -57,6 +57,24 @@ var AVar = function () { return cell; } + function putHead (queue, value) { + var cell = new MutableCell(queue, value); + switch (queue.size) { + case 0: + queue.head = cell; + break; + case 1: + queue.last = queue.head; + /* fallthrough */ + default: + cell.next = queue.head; + queue.head.prev = cell; + queue.head = cell; + } + queue.size++; + return cell; + } + function takeLast (queue) { var cell; switch (queue.size) { @@ -221,6 +239,10 @@ exports._newVar = function (value) { }; }; +exports._pendingPuts = function (avar) { + return avar.puts.size; +}; + exports._killVar = function (util, error, avar) { return function () { if (avar.error === null) { @@ -231,7 +253,31 @@ exports._killVar = function (util, error, avar) { }; }; -exports._putVar = function (util, value, avar, cb) { +exports._initPuts = function (util, error, avar) { + return function () { + var p = AVar.takeLast(avar.puts); + if (p !== null) { + p.cb(util.left(error))(); + return util.just(p.value); + } else { + return util.nothing; + } + }; +}; + +exports._tailPuts = function (util, error, avar) { + return function () { + var p = AVar.takeHead(avar.puts); + if (p !== null) { + p.cb(util.left(error))(); + return util.just(p.value); + } else { + return util.nothing; + } + }; +}; + +exports._snocVar = function (util, value, avar, cb) { return function () { var cell = AVar.putLast(avar.puts, { cb: cb, value: value }); AVar.drainVar(util, avar); @@ -241,6 +287,16 @@ exports._putVar = function (util, value, avar, cb) { }; }; +exports._consVar = function (util, value, avar, cb) { + return function () { + var cell = AVar.putHead(avar.puts, { cb: cb, value: value }); + AVar.drainVar(util, avar); + return function () { + AVar.deleteCell(cell); + }; + }; +}; + exports._takeVar = function (util, avar, cb) { return function () { var cell = AVar.putLast(avar.takes, cb); diff --git a/src/Effect/AVar.purs b/src/Effect/AVar.purs index 6084110..da088c2 100644 --- a/src/Effect/AVar.purs +++ b/src/Effect/AVar.purs @@ -7,6 +7,8 @@ module Effect.AVar , take , tryTake , put + , Operation(..) + , windowPut , tryPut , read , tryRead @@ -33,6 +35,17 @@ data AVarStatus a | Filled a | Empty +data Operation + = Ignore -- Do nothing with the queue + | Fail Error -- Propagate an exception to the callback + | Halt Error -- Kill the internal queue and propagate the exception + | PushHead -- Push callback onto the head + | PushTail -- Push callback onto the tail + | DropHead Error -- Drop the head, and push onto the tail + | DropTail Error -- Drop the tail, and push onto the head + | SwapHead Error -- Replace the head + | SwapTail Error -- Replace the tail + -- | Creates a new empty AVar. foreign import empty ∷ ∀ a. Effect (AVar a) @@ -50,7 +63,51 @@ kill err avar = Fn.runFn3 _killVar ffiUtil err avar -- | the AVar becomes available. Returns an effect which will remove the -- | callback from the pending queue. put ∷ ∀ a. a → AVar a → AVarCallback Unit → Effect (Effect Unit) -put value avar cb = Fn.runFn4 _putVar ffiUtil value avar cb +put = windowPut (const PushTail) + +-- | Puts a value into an AVar using a strategy determined +-- | dynamically on the basis of the number of pending puts +-- | (i.e. not including the current value of the AVar, if non-EMPTY). +windowPut + :: ∀ a + . (Int -> Operation) + -> a + -> AVar a + -> AVarCallback Unit + -> Effect (Effect Unit) +windowPut strategy value avar cb = + case strategy (_pendingPuts avar) of + -- Do nothing with the queue + Ignore -> pure $ pure unit + -- Propagate an exception to the callback + Fail e -> do + cb $ Left e + pure $ pure unit + -- Kill the internal queue and propagate the exception + Halt e -> do + kill e avar + cb $ Left e + pure $ pure unit + -- Push callback onto the head + PushHead -> Fn.runFn4 _consVar ffiUtil value avar cb + -- Push callback onto the tail + PushTail -> Fn.runFn4 _snocVar ffiUtil value avar cb + -- Drop the head, and push onto the tail + DropHead e -> do + void $ Fn.runFn3 _tailPuts ffiUtil e avar + Fn.runFn4 _snocVar ffiUtil value avar cb + -- Drop the tail, and push onto the head + DropTail e -> do + void $ Fn.runFn3 _initPuts ffiUtil e avar + Fn.runFn4 _consVar ffiUtil value avar cb + -- Replace the head + SwapHead e -> do + void $ Fn.runFn3 _tailPuts ffiUtil e avar + Fn.runFn4 _consVar ffiUtil value avar cb + -- Replace the tail + SwapTail e -> do + void $ Fn.runFn3 _initPuts ffiUtil e avar + Fn.runFn4 _snocVar ffiUtil value avar cb -- | Attempts to synchronously fill an AVar. If the AVar is already filled, -- | this will do nothing. Returns true or false depending on if it succeeded. @@ -101,13 +158,17 @@ isKilled = case _ of foreign import _newVar ∷ ∀ a. a → Effect (AVar a) foreign import _killVar ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect Unit) -foreign import _putVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) +foreign import _snocVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) +foreign import _consVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) +foreign import _initPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a)) +foreign import _tailPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a)) foreign import _tryPutVar ∷ ∀ a. Fn.Fn3 FFIUtil a (AVar a) (Effect Boolean) foreign import _takeVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit)) foreign import _tryTakeVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a)) foreign import _readVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit)) foreign import _tryReadVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a)) foreign import _status ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (AVarStatus a)) +foreign import _pendingPuts :: ∀ a. AVar a -> Int type FFIUtil = { left ∷ ∀ a b. a → Either a b diff --git a/test/Main.purs b/test/Main.purs index 32d1e05..ef7ab1c 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -4,7 +4,7 @@ import Prelude import Effect (Effect) import Effect.AVar as AVar import Effect.Console (log) -import Effect.Exception (error, message) +import Effect.Exception (Error, error, message) import Effect.Ref as Ref import Data.Either (Either(..)) import Data.Foldable (traverse_) @@ -189,6 +189,126 @@ test_cancel = test "cancel" do _ ← AVar.tryPut "a" v3 eq "cdfg" <$> Ref.read ref +putMax + :: ∀ a + . Int + -> Error + -> a + -> AVar.AVar a + -> AVar.AVarCallback Unit + -> Effect (Effect Unit) +putMax max err = AVar.windowPut go + where + go n + | n < max = AVar.PushTail + | otherwise = AVar.Fail err + +test_max1_put_take ∷ Effect Unit +test_max1_put_take = test "max1: put/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "max puts exceeded" + _ ← putMax 1 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "barfoo" <$> Ref.read ref + +test_max2_put_take ∷ Effect Unit +test_max2_put_take = test "max2: put/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "max puts exceeded" + _ ← putMax 2 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "barfoo" <$> Ref.read ref + +test_max1_put_put_put_take ∷ Effect Unit +test_max1_put_put_put_take = test "max1: put/put/put/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "max puts exceeded" + _ ← putMax 1 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← putMax 1 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← putMax 1 err "foo" var $ + case _ of + Left _ -> void $ Ref.modify (_ <> "fail") ref + otherwise -> void $ Ref.modify (_ <> "bar") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "barfailfoobar" <$> Ref.read ref + +putSliding + :: ∀ a + . Int + -> Error + -> a + -> AVar.AVar a + -> AVar.AVarCallback Unit + -> Effect (Effect Unit) +putSliding max err = AVar.windowPut go + where + go n + | n < max = AVar.PushTail + | otherwise = AVar.DropHead err + +test_window1_put_take ∷ Effect Unit +test_window1_put_take = test "win1: put/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "sliding window exceeded" + _ ← putSliding 1 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "barfoo" <$> Ref.read ref + +test_window2_put_take ∷ Effect Unit +test_window2_put_take = test "win2: put/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "sliding window exceeded" + _ ← putSliding 2 err "foo" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "barfoo" <$> Ref.read ref + +test_window1_put_put_put_take_take ∷ Effect Unit +test_window1_put_put_put_take_take = test "win1: put/put/put/take/take" do + ref ← Ref.new "" + var ← AVar.empty + let + err = + error "sliding window exceeded" + _ ← putSliding 1 err "foo1" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar1") ref + _ ← putSliding 1 err "foo2" var $ + case _ of + Left _ -> void $ Ref.modify (_ <> "fail") ref + otherwise -> void $ Ref.modify (_ <> "bar2") ref + _ ← putSliding 1 err "foo3" var $ traverse_ \_ → + void $ Ref.modify (_ <> "bar3") ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + _ ← AVar.take var $ traverse_ \val → + void $ Ref.modify (_ <> val) ref + eq "bar1failfoo1bar3foo3" <$> Ref.read ref + main ∷ Effect Unit main = do test_tryRead_full @@ -206,3 +326,9 @@ main = do test_kill_empty test_kill_pending test_cancel + test_max1_put_take + test_max2_put_take + test_max1_put_put_put_take + test_window1_put_take + test_window2_put_take + test_window1_put_put_put_take_take From 271d9848c6efb80637aa2c9808e4220b172e641f Mon Sep 17 00:00:00 2001 From: Rohit Grover Date: Sun, 26 May 2019 00:40:04 +1200 Subject: [PATCH 2/3] fix build error reported by CI --- src/Effect/AVar.js | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Effect/AVar.js b/src/Effect/AVar.js index c39d8ce..a286a4e 100644 --- a/src/Effect/AVar.js +++ b/src/Effect/AVar.js @@ -221,6 +221,7 @@ var AVar = function () { AVar.EMPTY = EMPTY; AVar.putLast = putLast; + AVar.putHead = putHead; AVar.takeLast = takeLast; AVar.takeHead = takeHead; AVar.deleteCell = deleteCell; From c2ea814c49acaeeb4f2a5d588b2d07f42f7d7ee8 Mon Sep 17 00:00:00 2001 From: Rohit Grover Date: Sat, 8 Jun 2019 10:13:55 +1200 Subject: [PATCH 3/3] windowPut: invoke callbacks arising from deletes after queue management --- src/Effect/AVar.js | 10 ++++------ src/Effect/AVar.purs | 29 +++++++++++++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/Effect/AVar.js b/src/Effect/AVar.js index a286a4e..d127316 100644 --- a/src/Effect/AVar.js +++ b/src/Effect/AVar.js @@ -254,24 +254,22 @@ exports._killVar = function (util, error, avar) { }; }; -exports._initPuts = function (util, error, avar) { +exports._initPuts = function (util, avar) { return function () { var p = AVar.takeLast(avar.puts); if (p !== null) { - p.cb(util.left(error))(); - return util.just(p.value); + return util.just(p); } else { return util.nothing; } }; }; -exports._tailPuts = function (util, error, avar) { +exports._tailPuts = function (util, avar) { return function () { var p = AVar.takeHead(avar.puts); if (p !== null) { - p.cb(util.left(error))(); - return util.just(p.value); + return util.just(p); } else { return util.nothing; } diff --git a/src/Effect/AVar.purs b/src/Effect/AVar.purs index da088c2..4f3eefa 100644 --- a/src/Effect/AVar.purs +++ b/src/Effect/AVar.purs @@ -21,6 +21,7 @@ module Effect.AVar import Prelude import Data.Either (Either(..)) +import Data.Foldable (traverse_) import Data.Function.Uncurried as Fn import Data.Maybe (Maybe(..)) import Effect (Effect) @@ -94,20 +95,28 @@ windowPut strategy value avar cb = PushTail -> Fn.runFn4 _snocVar ffiUtil value avar cb -- Drop the head, and push onto the tail DropHead e -> do - void $ Fn.runFn3 _tailPuts ffiUtil e avar - Fn.runFn4 _snocVar ffiUtil value avar cb + mPut <- Fn.runFn2 _tailPuts ffiUtil avar + canceller <- Fn.runFn4 _snocVar ffiUtil value avar cb + canceller <$ do + traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) -- Drop the tail, and push onto the head DropTail e -> do - void $ Fn.runFn3 _initPuts ffiUtil e avar - Fn.runFn4 _consVar ffiUtil value avar cb + mPut <- Fn.runFn2 _initPuts ffiUtil avar + canceller <- Fn.runFn4 _consVar ffiUtil value avar cb + canceller <$ do + traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) -- Replace the head SwapHead e -> do - void $ Fn.runFn3 _tailPuts ffiUtil e avar - Fn.runFn4 _consVar ffiUtil value avar cb + mPut <- Fn.runFn2 _tailPuts ffiUtil avar + canceller <- Fn.runFn4 _consVar ffiUtil value avar cb + canceller <$ do + traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) -- Replace the tail SwapTail e -> do - void $ Fn.runFn3 _initPuts ffiUtil e avar - Fn.runFn4 _snocVar ffiUtil value avar cb + mPut <- Fn.runFn2 _initPuts ffiUtil avar + canceller <- Fn.runFn4 _snocVar ffiUtil value avar cb + canceller <$ do + traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) -- | Attempts to synchronously fill an AVar. If the AVar is already filled, -- | this will do nothing. Returns true or false depending on if it succeeded. @@ -160,8 +169,8 @@ foreign import _newVar ∷ ∀ a. a → Effect (AVar a) foreign import _killVar ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect Unit) foreign import _snocVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) foreign import _consVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) -foreign import _initPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a)) -foreign import _tailPuts ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect (Maybe a)) +foreign import _initPuts ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe {cb :: AVarCallback a, value :: a})) +foreign import _tailPuts ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe {cb :: AVarCallback a, value :: a})) foreign import _tryPutVar ∷ ∀ a. Fn.Fn3 FFIUtil a (AVar a) (Effect Boolean) foreign import _takeVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit)) foreign import _tryTakeVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a))