-
Notifications
You must be signed in to change notification settings - Fork 8
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix #14: add windowPut--i.e. put with strategy based on num pending puts #17
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,8 @@ module Effect.AVar | |
, take | ||
, tryTake | ||
, put | ||
, Operation(..) | ||
, windowPut | ||
, tryPut | ||
, read | ||
, tryRead | ||
|
@@ -19,6 +21,7 @@ module Effect.AVar | |
|
||
import Prelude | ||
import Data.Either (Either(..)) | ||
import Data.Foldable (traverse_) | ||
import Data.Function.Uncurried as Fn | ||
import Data.Maybe (Maybe(..)) | ||
import Effect (Effect) | ||
|
@@ -33,6 +36,17 @@ data AVarStatus a | |
| Filled a | ||
| Empty | ||
|
||
data Operation | ||
= Ignore -- Do nothing with the queue | ||
| Fail Error -- Propagate an exception to the callback | ||
| Halt Error -- Kill the internal queue and propagate the exception | ||
| PushHead -- Push callback onto the head | ||
| PushTail -- Push callback onto the tail | ||
| DropHead Error -- Drop the head, and push onto the tail | ||
| DropTail Error -- Drop the tail, and push onto the head | ||
| SwapHead Error -- Replace the head | ||
| SwapTail Error -- Replace the tail | ||
|
||
-- | Creates a new empty AVar. | ||
foreign import empty ∷ ∀ a. Effect (AVar a) | ||
|
||
|
@@ -50,7 +64,59 @@ kill err avar = Fn.runFn3 _killVar ffiUtil err avar | |
-- | the AVar becomes available. Returns an effect which will remove the | ||
-- | callback from the pending queue. | ||
put ∷ ∀ a. a → AVar a → AVarCallback Unit → Effect (Effect Unit) | ||
put value avar cb = Fn.runFn4 _putVar ffiUtil value avar cb | ||
put = windowPut (const PushTail) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this is by far the most common, maybe it makes sense to avoid the dispatch overhead and call snocVar directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would be less concerned about this if we didn't have an ADT. |
||
|
||
-- | Puts a value into an AVar using a strategy determined | ||
-- | dynamically on the basis of the number of pending puts | ||
-- | (i.e. not including the current value of the AVar, if non-EMPTY). | ||
windowPut | ||
:: ∀ a | ||
. (Int -> Operation) | ||
-> a | ||
-> AVar a | ||
-> AVarCallback Unit | ||
-> Effect (Effect Unit) | ||
windowPut strategy value avar cb = | ||
case strategy (_pendingPuts avar) of | ||
-- Do nothing with the queue | ||
Ignore -> pure $ pure unit | ||
-- Propagate an exception to the callback | ||
Fail e -> do | ||
cb $ Left e | ||
pure $ pure unit | ||
-- Kill the internal queue and propagate the exception | ||
Halt e -> do | ||
kill e avar | ||
cb $ Left e | ||
pure $ pure unit | ||
-- Push callback onto the head | ||
PushHead -> Fn.runFn4 _consVar ffiUtil value avar cb | ||
-- Push callback onto the tail | ||
PushTail -> Fn.runFn4 _snocVar ffiUtil value avar cb | ||
-- Drop the head, and push onto the tail | ||
DropHead e -> do | ||
mPut <- Fn.runFn2 _tailPuts ffiUtil avar | ||
canceller <- Fn.runFn4 _snocVar ffiUtil value avar cb | ||
canceller <$ do | ||
traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) | ||
-- Drop the tail, and push onto the head | ||
DropTail e -> do | ||
mPut <- Fn.runFn2 _initPuts ffiUtil avar | ||
canceller <- Fn.runFn4 _consVar ffiUtil value avar cb | ||
canceller <$ do | ||
traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) | ||
-- Replace the head | ||
SwapHead e -> do | ||
mPut <- Fn.runFn2 _tailPuts ffiUtil avar | ||
canceller <- Fn.runFn4 _consVar ffiUtil value avar cb | ||
canceller <$ do | ||
traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) | ||
-- Replace the tail | ||
SwapTail e -> do | ||
mPut <- Fn.runFn2 _initPuts ffiUtil avar | ||
canceller <- Fn.runFn4 _snocVar ffiUtil value avar cb | ||
canceller <$ do | ||
traverse_ <@> mPut $ \{cb: cb'} -> cb' (Left e) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we didn't have an ADT, then I think these could easily be implemented in the FFI and avoid the effect thunk and I'm not a big fan of the for_ mPut \{ cb: cb'} -> cb' (Left e)
pure canceller This will optimize better. |
||
|
||
-- | Attempts to synchronously fill an AVar. If the AVar is already filled, | ||
-- | this will do nothing. Returns true or false depending on if it succeeded. | ||
|
@@ -101,13 +167,17 @@ isKilled = case _ of | |
|
||
foreign import _newVar ∷ ∀ a. a → Effect (AVar a) | ||
foreign import _killVar ∷ ∀ a. Fn.Fn3 FFIUtil Error (AVar a) (Effect Unit) | ||
foreign import _putVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) | ||
foreign import _snocVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) | ||
foreign import _consVar ∷ ∀ a. Fn.Fn4 FFIUtil a (AVar a) (AVarCallback Unit) (Effect (Effect Unit)) | ||
foreign import _initPuts ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe {cb :: AVarCallback a, value :: a})) | ||
foreign import _tailPuts ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe {cb :: AVarCallback a, value :: a})) | ||
foreign import _tryPutVar ∷ ∀ a. Fn.Fn3 FFIUtil a (AVar a) (Effect Boolean) | ||
foreign import _takeVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit)) | ||
foreign import _tryTakeVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a)) | ||
foreign import _readVar ∷ ∀ a. Fn.Fn3 FFIUtil (AVar a) (AVarCallback a) (Effect (Effect Unit)) | ||
foreign import _tryReadVar ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (Maybe a)) | ||
foreign import _status ∷ ∀ a. Fn.Fn2 FFIUtil (AVar a) (Effect (AVarStatus a)) | ||
foreign import _pendingPuts :: ∀ a. AVar a -> Int | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't a pure operation since an avar size is mutable. This can potentially cause issues depending on where and how the actual effects are interleaved. |
||
|
||
type FFIUtil = | ||
{ left ∷ ∀ a b. a → Either a b | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,7 +4,7 @@ import Prelude | |
import Effect (Effect) | ||
import Effect.AVar as AVar | ||
import Effect.Console (log) | ||
import Effect.Exception (error, message) | ||
import Effect.Exception (Error, error, message) | ||
import Effect.Ref as Ref | ||
import Data.Either (Either(..)) | ||
import Data.Foldable (traverse_) | ||
|
@@ -189,6 +189,126 @@ test_cancel = test "cancel" do | |
_ ← AVar.tryPut "a" v3 | ||
eq "cdfg" <$> Ref.read ref | ||
|
||
putMax | ||
:: ∀ a | ||
. Int | ||
-> Error | ||
-> a | ||
-> AVar.AVar a | ||
-> AVar.AVarCallback Unit | ||
-> Effect (Effect Unit) | ||
putMax max err = AVar.windowPut go | ||
where | ||
go n | ||
| n < max = AVar.PushTail | ||
| otherwise = AVar.Fail err | ||
|
||
test_max1_put_take ∷ Effect Unit | ||
test_max1_put_take = test "max1: put/take" do | ||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "max puts exceeded" | ||
_ ← putMax 1 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "barfoo" <$> Ref.read ref | ||
|
||
test_max2_put_take ∷ Effect Unit | ||
test_max2_put_take = test "max2: put/take" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the motivation for this test that's different from |
||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "max puts exceeded" | ||
_ ← putMax 2 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "barfoo" <$> Ref.read ref | ||
|
||
test_max1_put_put_put_take ∷ Effect Unit | ||
test_max1_put_put_put_take = test "max1: put/put/put/take" do | ||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "max puts exceeded" | ||
_ ← putMax 1 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← putMax 1 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← putMax 1 err "foo" var $ | ||
case _ of | ||
Left _ -> void $ Ref.modify (_ <> "fail") ref | ||
otherwise -> void $ Ref.modify (_ <> "bar") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "barfailfoobar" <$> Ref.read ref | ||
|
||
putSliding | ||
:: ∀ a | ||
. Int | ||
-> Error | ||
-> a | ||
-> AVar.AVar a | ||
-> AVar.AVarCallback Unit | ||
-> Effect (Effect Unit) | ||
putSliding max err = AVar.windowPut go | ||
where | ||
go n | ||
| n < max = AVar.PushTail | ||
| otherwise = AVar.DropHead err | ||
|
||
test_window1_put_take ∷ Effect Unit | ||
test_window1_put_take = test "win1: put/take" do | ||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "sliding window exceeded" | ||
_ ← putSliding 1 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "barfoo" <$> Ref.read ref | ||
|
||
test_window2_put_take ∷ Effect Unit | ||
test_window2_put_take = test "win2: put/take" do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise with max2, it's not clear to me what this is testing vs the first one. |
||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "sliding window exceeded" | ||
_ ← putSliding 2 err "foo" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "barfoo" <$> Ref.read ref | ||
|
||
test_window1_put_put_put_take_take ∷ Effect Unit | ||
test_window1_put_put_put_take_take = test "win1: put/put/put/take/take" do | ||
ref ← Ref.new "" | ||
var ← AVar.empty | ||
let | ||
err = | ||
error "sliding window exceeded" | ||
_ ← putSliding 1 err "foo1" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar1") ref | ||
_ ← putSliding 1 err "foo2" var $ | ||
case _ of | ||
Left _ -> void $ Ref.modify (_ <> "fail") ref | ||
otherwise -> void $ Ref.modify (_ <> "bar2") ref | ||
_ ← putSliding 1 err "foo3" var $ traverse_ \_ → | ||
void $ Ref.modify (_ <> "bar3") ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
_ ← AVar.take var $ traverse_ \val → | ||
void $ Ref.modify (_ <> val) ref | ||
eq "bar1failfoo1bar3foo3" <$> Ref.read ref | ||
|
||
main ∷ Effect Unit | ||
main = do | ||
test_tryRead_full | ||
|
@@ -206,3 +326,9 @@ main = do | |
test_kill_empty | ||
test_kill_pending | ||
test_cancel | ||
test_max1_put_take | ||
test_max2_put_take | ||
test_max1_put_put_put_take | ||
test_window1_put_take | ||
test_window2_put_take | ||
test_window1_put_put_put_take_take | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In general, it seems like we should test all the strategies on some level. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is having an actual data type necessary? The upside is that you can pattern match and change the operations later (I don't have a use case for this). The downside is that there is a linear dispatching overhead in the
windowPut
interpreter. Can you think of a use case that takes advantage of a primitive ADT rather than just bundling the logic up in a closure for each strategy?