-
Notifications
You must be signed in to change notification settings - Fork 22
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* ZIO support * Build process fixes --------- Co-authored-by: Fristi <mark@vectos.net>
- Loading branch information
Showing
11 changed files
with
398 additions
and
22 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
...la/org/polyvariant/ocadotechnology/sttp/oauth2/cache/zio/CachingAccessTokenProvider.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
package org.polyvariant.sttp.oauth2.cache.zio | ||
|
||
import org.polyvariant.sttp.oauth2.AccessTokenProvider | ||
import org.polyvariant.sttp.oauth2.ClientCredentialsToken | ||
import org.polyvariant.sttp.oauth2.Secret | ||
import org.polyvariant.sttp.oauth2.cache.ExpiringCache | ||
import org.polyvariant.sttp.oauth2.cache.zio.CachingAccessTokenProvider.TokenWithExpirationTime | ||
import org.polyvariant.sttp.oauth2.common.Scope | ||
import zio.Clock | ||
import zio.Semaphore | ||
import zio._ | ||
|
||
import java.time.Instant | ||
import scala.concurrent.duration.Duration | ||
|
||
final class CachingAccessTokenProvider[R]( | ||
delegate: AccessTokenProvider[RIO[R, _]], | ||
semaphore: Semaphore, | ||
tokenCache: ExpiringCache[RIO[R, _], Option[Scope], TokenWithExpirationTime] | ||
) extends AccessTokenProvider[RIO[R, _]] { | ||
|
||
override def requestToken(scope: Option[Scope]): RIO[R, ClientCredentialsToken.AccessTokenResponse] = | ||
getFromCache(scope).flatMap { | ||
case Some(value) => ZIO.succeed(value) | ||
case None => semaphore.withPermit(acquireToken(scope)) | ||
} | ||
|
||
private def acquireToken(scope: Option[Scope]): ZIO[R, Throwable, ClientCredentialsToken.AccessTokenResponse] = | ||
getFromCache(scope).flatMap { | ||
case Some(value) => ZIO.succeed(value) | ||
case None => fetchAndSaveToken(scope) | ||
} | ||
|
||
private def getFromCache(scope: Option[Scope]) = | ||
tokenCache.get(scope).flatMap { entry => | ||
Clock.instant.map { now => | ||
entry match { | ||
case Some(value) => Some(value.toAccessTokenResponse(now)) | ||
case None => None | ||
} | ||
} | ||
} | ||
|
||
private def fetchAndSaveToken(scope: Option[Scope]) = | ||
for { | ||
token <- delegate.requestToken(scope) | ||
tokenWithExpiry <- calculateExpiryInstant(token) | ||
_ <- tokenCache.put(scope, tokenWithExpiry, tokenWithExpiry.expirationTime) | ||
} yield token | ||
|
||
private def calculateExpiryInstant(response: ClientCredentialsToken.AccessTokenResponse) = | ||
Clock.instant.map(TokenWithExpirationTime.from(response, _)) | ||
|
||
} | ||
|
||
object CachingAccessTokenProvider { | ||
|
||
def apply[R]( | ||
delegate: AccessTokenProvider[RIO[R, _]], | ||
tokenCache: ExpiringCache[RIO[R, _], Option[Scope], TokenWithExpirationTime] | ||
): RIO[R, CachingAccessTokenProvider[R]] = Semaphore.make(permits = 1).map(new CachingAccessTokenProvider(delegate, _, tokenCache)) | ||
|
||
def refCacheInstance(delegate: AccessTokenProvider[Task]): Task[CachingAccessTokenProvider[Any]] = | ||
ZioRefExpiringCache[Option[Scope], TokenWithExpirationTime].flatMap(CachingAccessTokenProvider(delegate, _)) | ||
|
||
final case class TokenWithExpirationTime( | ||
accessToken: Secret[String], | ||
domain: Option[String], | ||
expirationTime: Instant, | ||
scope: Option[Scope] | ||
) { | ||
|
||
def toAccessTokenResponse(now: Instant): ClientCredentialsToken.AccessTokenResponse = { | ||
val newExpiresIn = Duration.fromNanos(java.time.Duration.between(now, expirationTime).toNanos) | ||
ClientCredentialsToken.AccessTokenResponse(accessToken, domain, newExpiresIn, scope) | ||
} | ||
|
||
} | ||
|
||
object TokenWithExpirationTime { | ||
|
||
def from(token: ClientCredentialsToken.AccessTokenResponse, now: Instant): TokenWithExpirationTime = { | ||
val expirationTime = now.plusNanos(token.expiresIn.toNanos) | ||
TokenWithExpirationTime(token.accessToken, token.domain, expirationTime, token.scope) | ||
} | ||
|
||
} | ||
|
||
} |
35 changes: 35 additions & 0 deletions
35
...ain/scala/org/polyvariant/ocadotechnology/sttp/oauth2/cache/zio/ZioRefExpiringCache.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package org.polyvariant.sttp.oauth2.cache.zio | ||
|
||
import org.polyvariant.sttp.oauth2.cache.ExpiringCache | ||
import org.polyvariant.sttp.oauth2.cache.zio.ZioRefExpiringCache.Entry | ||
import zio.Clock | ||
import zio.Ref | ||
import zio.Task | ||
import zio.ZIO | ||
|
||
import java.time.Instant | ||
|
||
final class ZioRefExpiringCache[K, V] private (ref: Ref[Map[K, Entry[V]]]) extends ExpiringCache[Task, K, V] { | ||
|
||
override def get(key: K): Task[Option[V]] = | ||
ref.get.map(_.get(key)).flatMap { entry => | ||
Clock.instant.flatMap { now => | ||
(entry, now) match { | ||
case (Some(Entry(value, expiryInstant)), now) => | ||
if (now.isBefore(expiryInstant)) ZIO.succeed(Some(value)) else remove(key).as(None) | ||
case _ => | ||
ZIO.none | ||
} | ||
} | ||
} | ||
|
||
override def put(key: K, value: V, expirationTime: Instant): Task[Unit] = ref.update(_ + (key -> Entry(value, expirationTime))) | ||
|
||
override def remove(key: K): Task[Unit] = ref.update(_ - key) | ||
} | ||
|
||
object ZioRefExpiringCache { | ||
private final case class Entry[V](value: V, expirationTime: Instant) | ||
|
||
def apply[K, V]: Task[ExpiringCache[Task, K, V]] = Ref.make(Map.empty[K, Entry[V]]).map(new ZioRefExpiringCache(_)) | ||
} |
72 changes: 72 additions & 0 deletions
72
...ariant/ocadotechnology/sttp/oauth2/cache/zio/CachingAccessTokenProviderParallelSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
package org.polyvariant.sttp.oauth2.cache.zio | ||
|
||
import org.polyvariant.sttp.oauth2.ClientCredentialsToken.AccessTokenResponse | ||
import org.polyvariant.sttp.oauth2.Secret | ||
import org.polyvariant.sttp.oauth2.cache.ExpiringCache | ||
import org.polyvariant.sttp.oauth2.cache.zio.CachingAccessTokenProvider.TokenWithExpirationTime | ||
import org.polyvariant.sttp.oauth2.common.Scope | ||
import zio.test._ | ||
import zio.{Duration => ZDuration} | ||
import zio.Ref | ||
import zio.Task | ||
import zio.ZIO | ||
|
||
import java.time.Instant | ||
import scala.concurrent.duration._ | ||
|
||
object CachingAccessTokenProviderParallelSpec extends ZIOSpecDefault { | ||
|
||
private val testScope: Option[Scope] = Scope.of("test-scope") | ||
private val token = AccessTokenResponse(Secret("secret"), None, 10.seconds, testScope) | ||
|
||
private val sleepDuration: FiniteDuration = 1.second | ||
|
||
def spec = suite("CachingAccessTokenProvider")( | ||
test("block multiple parallel") { | ||
prepareTest.flatMap { case (delegate, cachingProvider) => | ||
delegate.setToken(testScope, token) *> | ||
(cachingProvider.requestToken(testScope) zipPar cachingProvider.requestToken(testScope)).map { case (result1, result2) => | ||
assert(result1)(Assertion.equalTo(token.copy(expiresIn = result1.expiresIn))) && | ||
assert(result2)(Assertion.equalTo(token.copy(expiresIn = result2.expiresIn))) && | ||
// if both calls would be made in parallel, both would get the same expiresIn from TestAccessTokenProvider. | ||
// When blocking is in place, the second call would be delayed by sleepDuration and would hit the cache, | ||
// which has Instant on top of which new expiresIn would be calculated | ||
assert(diffInExpirations(result1, result2))(Assertion.isGreaterThanEqualTo(sleepDuration)) | ||
} | ||
} | ||
}, | ||
test("not block multiple parallel access if its already in cache") { | ||
prepareTest.flatMap { case (delegate, cachingProvider) => | ||
delegate.setToken(testScope, token) *> cachingProvider.requestToken(testScope) *> | ||
(cachingProvider.requestToken(testScope) zipPar cachingProvider.requestToken(testScope)) map { case (result1, result2) => | ||
assert(result1)(Assertion.equalTo(token.copy(expiresIn = result1.expiresIn))) && | ||
assert(result2)(Assertion.equalTo(token.copy(expiresIn = result2.expiresIn))) && | ||
// second call should not be forced to wait sleepDuration, because some active token is already in cache | ||
assert(diffInExpirations(result1, result2))(Assertion.isLessThan(sleepDuration)) | ||
} | ||
} | ||
} | ||
) @@ TestAspect.withLiveEnvironment | ||
|
||
private def diffInExpirations(result1: AccessTokenResponse, result2: AccessTokenResponse) = | ||
if (result1.expiresIn > result2.expiresIn) result1.expiresIn - result2.expiresIn else result2.expiresIn - result1.expiresIn | ||
|
||
class DelayingCache[K, V](delegate: ExpiringCache[Task, K, V]) extends ExpiringCache[Task, K, V] { | ||
override def get(key: K): Task[Option[V]] = delegate.get(key) | ||
|
||
override def put(key: K, value: V, expirationTime: Instant): Task[Unit] = | ||
ZIO.sleep(ZDuration.fromScala(sleepDuration)) *> delegate.put(key, value, expirationTime) | ||
|
||
override def remove(key: K): Task[Unit] = delegate.remove(key) | ||
} | ||
|
||
private def prepareTest = | ||
for { | ||
state <- Ref.make[TestAccessTokenProvider.State](TestAccessTokenProvider.State.empty) | ||
delegate = TestAccessTokenProvider(state) | ||
cache <- ZioRefExpiringCache[Option[Scope], TokenWithExpirationTime] | ||
delayingCache = new DelayingCache(cache) | ||
cachingProvider <- CachingAccessTokenProvider(delegate, delayingCache) | ||
} yield (delegate, cachingProvider) | ||
|
||
} |
Oops, something went wrong.