Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[minor] Add pool.onQuiescent(cell) #93

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 36 additions & 9 deletions core/src/main/scala/com/phaller/rasync/HandlerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ import lattice.{ DefaultKey, Key, Updater }
import org.opalj.graphs._

import scala.collection.immutable.Queue
import scala.util.{ Success, Try }

/* Need to have reference equality for CAS.
*
* quiescenceCellHandlers use NEXTCallbackRunnable, because (a) pool might reach quiescence
* repeatedly and (b) cell might not be completed, when quiescence is reached.
*/
private class PoolState(val handlers: List[() => Unit] = List(), val submittedTasks: Int = 0) {
private class PoolState(
val quiescenceHandlers: List[() => Unit] = List(),
val quiescenceCellHandlers: List[NextCallbackRunnable[_, _]] = List(),
val submittedTasks: Int = 0) {
def isQuiescent(): Boolean =
submittedTasks == 0
}
Expand Down Expand Up @@ -63,13 +70,27 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
if (state.isQuiescent) {
execute(new Runnable { def run(): Unit = handler() })
} else {
val newState = new PoolState(handler :: state.handlers, state.submittedTasks)
val newState = new PoolState(handler :: state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks)
val success = poolState.compareAndSet(state, newState)
if (!success)
onQuiescent(handler)
}
}

@tailrec
final def onQuiescent[K <: Key[V], V](cell: Cell[K, V])(handler: Try[V] => Unit): Unit = {
val state = poolState.get()
if (state.isQuiescent) {
execute(new Runnable { def run(): Unit = handler(Success(cell.getResult())) })
} else {
val runnable = new NextConcurrentCallbackRunnable(this, null, cell, handler)
val newState = new PoolState(state.quiescenceHandlers, runnable :: state.quiescenceCellHandlers, state.submittedTasks)
val success = poolState.compareAndSet(state, newState)
if (!success)
onQuiescent(cell)(handler)
}
}

/**
* Register a cell with this HandlerPool.
*
Expand Down Expand Up @@ -265,7 +286,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
var submitSuccess = false
while (!submitSuccess) {
val state = poolState.get()
val newState = new PoolState(state.handlers, state.submittedTasks + 1)
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks + 1)
submitSuccess = poolState.compareAndSet(state, newState)
}
}
Expand All @@ -276,28 +297,34 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
*/
private def decSubmittedTasks(i: Int = 1): Unit = {
var success = false
var handlersToRun: Option[List[() => Unit]] = None
var quiescenceHandlersToRun: Option[List[() => Unit]] = None
var quiescenceCellHandlersToRun: Option[List[NextCallbackRunnable[_, _]]] = None
while (!success) {
val state = poolState.get()
if (state.submittedTasks > i) {
handlersToRun = None
val newState = new PoolState(state.handlers, state.submittedTasks - i)
quiescenceHandlersToRun = None
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks - i)
success = poolState.compareAndSet(state, newState)
} else if (state.submittedTasks == 1) {
handlersToRun = Some(state.handlers)
quiescenceHandlersToRun = Some(state.quiescenceHandlers)
quiescenceCellHandlersToRun = Some(state.quiescenceCellHandlers)
val newState = new PoolState()
success = poolState.compareAndSet(state, newState)
} else {
throw new Exception("BOOM")
}
}
if (handlersToRun.nonEmpty) {
handlersToRun.get.foreach { handler =>

if (quiescenceHandlersToRun.nonEmpty) {
quiescenceHandlersToRun.get.foreach { handler =>
execute(new Runnable {
def run(): Unit = handler()
})
}
}
if (quiescenceCellHandlersToRun.nonEmpty)
quiescenceCellHandlersToRun.get.foreach(_.execute())

}

// Shouldn't we use:
Expand Down
136 changes: 133 additions & 3 deletions core/src/test/scala/com/phaller/rasync/test/pool.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.phaller.rasync
package test

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch }

import org.scalatest.FunSuite

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import lattice.{ Lattice, StringIntKey, StringIntUpdater, Updater }

import scala.util.{ Failure, Success }
import lattice.{ StringIntKey, StringIntUpdater, Updater }

class PoolSuite extends FunSuite {
test("onQuiescent") {
Expand All @@ -17,7 +19,7 @@ class PoolSuite extends FunSuite {
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true) }: Unit }
pool.execute { () => { p1.success(true); () } }
pool.onQuiescent { () => p2.success(true) }
try {
Await.result(p2.future, 1.seconds)
Expand Down Expand Up @@ -69,4 +71,132 @@ class PoolSuite extends FunSuite {
assert(regCells.size === 1000)
}

test("onQuiescent(cell): incomplete cell") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter[StringIntKey, Int]("somekey")(new StringIntUpdater, pool)

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
pool.onQuiescent { () => p2.success(true) }
try {
Await.result(p2.future, 1.seconds)
} catch {
case t: Throwable =>
assert(false, s"failure after $i iterations")
}
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 0)
latch.countDown()
case Failure(_) => assert(false)
}

latch.await()

pool.shutdown()
}

test("onQuiescent(cell): completed cell") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter.completed[Int](10)(new StringIntUpdater, pool)

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
pool.onQuiescent { () => p2.success(true) }
try {
Await.result(p2.future, 1.seconds)
} catch {
case t: Throwable =>
assert(false, s"failure after $i iterations")
}
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 10)
latch.countDown()
case Failure(_) => assert(false)
}

latch.await()

pool.shutdown()
}

test("onQuiescent(cell): incomplete cell, added in non-quiescent state") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter[StringIntKey, Int]("somekey")(new StringIntUpdater, pool)

pool.execute(() => {
// Add all tasks and handler in this thread to ensure
// that the pool is not quiescent while adding.

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 0)
latch.countDown()
case Failure(_) => assert(false)
}
})

latch.await()

pool.shutdown()
}

test("onQuiescent(cell): completed cell, added in non-quiescent state") {
val latch = new CountDownLatch(1)

val pool = new HandlerPool
val completer1 = CellCompleter.completed[Int](10)(new StringIntUpdater, pool)

pool.execute(() => {
// Add all tasks and handler in this thread to ensure
// that the pool is not quiescent while adding.

var i = 0
while (i < 10000) {
val p1 = Promise[Boolean]()
val p2 = Promise[Boolean]()
pool.execute { () => { p1.success(true); () } }
i += 1
}

pool.onQuiescent(completer1.cell) {
case Success(x) =>
assert(x === 10)
latch.countDown()
case Failure(_) => assert(false)
}
})

latch.await()

pool.shutdown()
}

}