Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace AsyncHttpClient with Ember #121

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions azure/src/main/scala/quasar/blobstore/azure/Azure.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.concurrent.duration.MILLISECONDS

import cats._
import cats.implicits._
import cats.effect.{Async, ConcurrentEffect, ContextShift, Sync, Timer}
import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer}
import cats.effect.concurrent.Ref
import com.azure.core.credential.{AccessToken, TokenCredential, TokenRequestContext}
import com.azure.identity.ClientSecretCredentialBuilder
Expand All @@ -40,7 +40,7 @@ object Azure extends Logging {
def mkStdStorageUrl(name: AccountName): StorageUrl =
StorageUrl(s"https://${name.value}.blob.core.windows.net/")

def getAccessToken[F[_]: Async: ContextShift](ad: ActiveDirectory): F[Expires[AccessToken]] = {
def getAccessToken[F[_]: ConcurrentEffect: ContextShift](ad: ActiveDirectory): F[Expires[AccessToken]] = {
val mkBuilder =
Sync[F].delay {
(new ClientSecretCredentialBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import quasar.blobstore.paths.BlobPath
import quasar.blobstore.services.DeleteService

import cats.data.Kleisli
import cats.effect.{Async, ContextShift}
import cats.effect.{ConcurrentEffect, ContextShift}
import cats.syntax.functor._
import com.azure.storage.blob.BlobContainerAsyncClient

object AzureDeleteService {

def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient)
def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient)
: DeleteService[F] = {

val res = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import quasar.blobstore.azure.requests.BlobPropsArgs
import quasar.blobstore.services.PropsService

import cats.data.Kleisli
import cats.effect.{Async, ContextShift}
import cats.effect.{ConcurrentEffect, ContextShift}
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.azure.storage.blob.{BlobAsyncClient, BlobContainerAsyncClient}
Expand All @@ -30,12 +30,12 @@ import scala.Option

object AzurePropsService {

def fromBlobPropsArgs[F[_]: Async: ContextShift]
def fromBlobPropsArgs[F[_]: ConcurrentEffect: ContextShift]
: Kleisli[F, BlobPropsArgs, Option[BlobProperties]] =
handlers.recoverToNone(
requests.blobPropsRequestK.map(_.getValue()))

def apply[F[_]: Async: ContextShift](
def apply[F[_]: ConcurrentEffect: ContextShift](
containerClient: BlobContainerAsyncClient,
mkArgs: BlobAsyncClient => BlobPropsArgs)
: PropsService[F, BlobProperties] =
Expand All @@ -46,7 +46,7 @@ object AzurePropsService {
} yield res
}

def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient)
def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient)
: PropsService[F, BlobProperties] =
AzurePropsService[F](
containerClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ package quasar.blobstore.azure
import quasar.blobstore.azure.requests.ContainerPropsArgs
import quasar.blobstore.services.StatusService

import cats.effect.{Async, ContextShift}
import cats.effect.{ConcurrentEffect, ContextShift}
import com.azure.storage.blob.BlobContainerAsyncClient
import com.azure.storage.blob.models.BlobContainerProperties

object AzureStatusService {
def apply[F[_]: Async: ContextShift](args: ContainerPropsArgs): StatusService[F] =
def apply[F[_]: ConcurrentEffect: ContextShift](args: ContainerPropsArgs): StatusService[F] =
(requests.containerPropsRequestK[F] andThen
converters.responseToBlobstoreStatusK[F, BlobContainerProperties] mapF
handlers.recoverToBlobstoreStatus[F]
).apply(args)

def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient): StatusService[F] =
def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient): StatusService[F] =
AzureStatusService[F](ContainerPropsArgs(containerClient, null))
}
8 changes: 2 additions & 6 deletions azure/src/main/scala/quasar/blobstore/azure/reactive.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ import cats.effect._
import fs2.Stream
import fs2.interop.reactivestreams._
import reactor.core.publisher.{Flux, Mono}
import reactor.core.scala.publisher.ScalaConverters._

object reactive {

/** Exposes an fs2 `Stream` as a Reactor `Flux`.
  *
  * The stream is first wrapped as a unicast publisher, which is then
  * handed to `Flux.from`.
  */
def streamToFlux[F[_]: ConcurrentEffect, A](s: Stream[F, A]): Flux[A] = {
  val publisher = s.toUnicastPublisher
  Flux.from(publisher)
}

def monoToAsync[F[_]: ContextShift, A](
mono: Mono[A])(
implicit F: Async[F])
: F[A] =
Async.fromFuture(F.delay(mono.asScala.toFuture))
/** Converts a Reactor `Mono` into an `F[A]`.
  *
  * The `Mono` is consumed as a publisher through `fromPublisher`, and the
  * last element of the resulting stream is returned.
  *
  * NOTE(review): `lastOrError` presumably fails the effect when the `Mono`
  * completes without emitting a value (e.g. an empty `Mono`) — confirm
  * against the fs2 documentation and the callers' expectations.
  */
def monoToAsync[F[_]: ConcurrentEffect, A](mono: Mono[A]): F[A] =
fromPublisher[F, A](mono).compile.lastOrError
Copy link
Contributor Author

@jsantos17 jsantos17 Mar 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably the most significant change. We used to use reactor-scala-extensions to do this conversion, but it looks obsolete given that Scala has supported Java lambdas since 2016. It was also bringing in ancient versions of reactor-netty (the source of this stack trace).

This appeared to be the most straightforward way to convert Monos to IO, given Monos are also publishers.


def fluxToStream[F[_]: ConcurrentEffect: ContextShift, A](
flux: Flux[A])
Expand Down
14 changes: 7 additions & 7 deletions azure/src/main/scala/quasar/blobstore/azure/requests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import scala.{Boolean, Byte}
import scala.Predef._

import cats.data.Kleisli
import cats.effect.{Async, ConcurrentEffect, ContextShift, Sync}
import cats.effect.{ConcurrentEffect, ContextShift, Sync}
import cats.syntax.flatMap._
import cats.syntax.functor._
import com.azure.core.http.rest.Response
Expand Down Expand Up @@ -54,13 +54,13 @@ object requests {
blobClient: BlobAsyncClient,
blobRequestConditions: BlobRequestConditions)

def blobPropsRequest[F[_]: Async: ContextShift](
def blobPropsRequest[F[_]: ConcurrentEffect: ContextShift](
args: BlobPropsArgs)
: F[Response[BlobProperties]] =
Sync[F].delay(args.blobClient.getPropertiesWithResponse(args.blobRequestConditions)) >>=
reactive.monoToAsync[F, Response[BlobProperties]]

def blobPropsRequestK[F[_]: Async: ContextShift]
def blobPropsRequestK[F[_]: ConcurrentEffect: ContextShift]
: Kleisli[F, BlobPropsArgs, Response[BlobProperties]] =
Kleisli(blobPropsRequest[F])

Expand All @@ -80,12 +80,12 @@ object requests {
containerClient: BlobContainerAsyncClient,
leaseId: String)

def containerPropsRequest[F[_]: Async: ContextShift](
def containerPropsRequest[F[_]: ConcurrentEffect: ContextShift](
args: ContainerPropsArgs): F[Response[BlobContainerProperties]] =
Sync[F].delay(args.containerClient.getPropertiesWithResponse(args.leaseId)) >>=
reactive.monoToAsync[F, Response[BlobContainerProperties]]

def containerPropsRequestK[F[_]: Async: ContextShift]
def containerPropsRequestK[F[_]: ConcurrentEffect: ContextShift]
: Kleisli[F, ContainerPropsArgs, Response[BlobContainerProperties]] =
Kleisli(containerPropsRequest[F])

Expand All @@ -95,13 +95,13 @@ object requests {
parallelTransferOptions: ParallelTransferOptions,
overwrite: Boolean)

def uploadRequest[F[_]: Async: ContextShift](args: UploadRequestArgs)
def uploadRequest[F[_]: ConcurrentEffect: ContextShift](args: UploadRequestArgs)
: F[BlockBlobItem] =
Sync[F].delay(
args.blobClient.upload(args.bytes, args.parallelTransferOptions, args.overwrite)
) >>= reactive.monoToAsync[F, BlockBlobItem]

def uploadRequestK[F[_]: Async: ContextShift]
def uploadRequestK[F[_]: ConcurrentEffect: ContextShift]
: Kleisli[F, UploadRequestArgs, BlockBlobItem] =
Kleisli(uploadRequest[F])
}
8 changes: 3 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ThisBuild / githubWorkflowBuildPreamble +=
List("decryptSecret gcs/src/test/resources/bad-auth-file.json.enc"),
name = Some("Decrypt bad gcp service account json key"))

val AwsSdkVersion = "2.15.34"
val AwsSdkVersion = "2.16.21"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be bumped because of netty version change, right? Needs to be done in other libs using the aws sdk as well.

And I think something similar needs to happen for azure as well because they also use netty.

It can be quite a pain to get all of the netty versions to line up across all libs. Depending on how far along we are with this, it may be easier to roll back https://github.com/precog/quasar-datasource-url/pull/763 et al. and publish our own fix directly on top of http4s 0.21.25 so that nothing related to netty changes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we came to the same conclusion and are giving that a go.

val Fs2Version = "2.5.6"
val MonixVersion = "3.4.0"
val SpecsVersion = "4.10.6"
Expand Down Expand Up @@ -72,7 +72,6 @@ lazy val azure = project
libraryDependencies ++= Seq(
"com.azure" % "azure-storage-blob" % "12.9.0",
"com.azure" % "azure-identity" % "1.2.0",
"io.projectreactor" %% "reactor-scala-extensions" % "0.6.0",
"com.codecommit" %% "cats-effect-testing-specs2" % "0.4.1" % Test))

lazy val gcs = project
Expand All @@ -89,6 +88,5 @@ lazy val gcs = project
"io.argonaut" %% "argonaut" % ArgonautVersion,
"org.apache.logging.log4j" % "log4j-core" % Log4jVersion % Test,
"org.apache.logging.log4j" % "log4j-slf4j-impl" % Log4jVersion % Test,
"org.http4s" %% "http4s-async-http-client" % Http4sVersion,
"org.http4s" %% "http4s-argonaut" % Http4sVersion,
"com.github.markusbernhardt" % "proxy-vole" % "1.0.5"))
"org.http4s" %% "http4s-ember-client" % Http4sVersion,
"org.http4s" %% "http4s-argonaut" % Http4sVersion))
114 changes: 0 additions & 114 deletions gcs/src/main/scala/quasar/blobstore/gcs/AsyncHttpClientBuilder.scala

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2020 Precog Data
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package quasar.blobstore.gcs

import scala.concurrent.duration.Duration

import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer}

import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder

import org.slf4s.Logging

/** Factory for the http4s Ember client used by the GCS blobstore.
  *
  * All tuning values are collected as named constants so the client
  * configuration can be read in one place.
  */
object EmberHttpClientBuilder extends Logging {

  // Limit on the total number of pooled connections.
  private val MaxTotalConnections: Int = 400

  // The underlying pool is keyed by (scheme, host), i.e. this is the
  // connection limit per host.
  private val MaxConnectionsPerKey: Int = 200

  // Maximum accepted size of the response headers (256 KiB).
  private val MaxResponseHeaderBytes: Int = 262144

  // Passed as both the request timeout and the idle-connection time.
  private val Unbounded: Duration = Duration.Inf

  /** Builds the configured Ember client as a `Resource`. */
  def apply[F[_]: ConcurrentEffect: ContextShift: Timer]: Resource[F, Client[F]] = {
    val configured =
      EmberClientBuilder
        .default[F]
        .withMaxTotal(MaxTotalConnections)
        .withMaxPerKey(_ => MaxConnectionsPerKey)
        .withTimeout(Unbounded)
        .withMaxResponseHeaderSize(MaxResponseHeaderBytes)
        .withIdleConnectionTime(Unbounded)

    configured.build
  }
}
6 changes: 3 additions & 3 deletions gcs/src/main/scala/quasar/blobstore/gcs/GCSClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package quasar.blobstore.gcs
import scala._
import scala.Predef._

import cats.effect.{Concurrent, ConcurrentEffect, Resource, Sync}
import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
import cats.implicits._
import org.http4s.client.Client
import org.http4s.client.middleware.{RequestLogger, ResponseLogger}
Expand Down Expand Up @@ -60,9 +60,9 @@ object GCSClient {
Client(signAndSubmit)
}

def apply[F[_]: ConcurrentEffect](cfg: ServiceAccountConfig)
def apply[F[_]: ConcurrentEffect: ContextShift: Timer](cfg: ServiceAccountConfig)
: Resource[F, Client[F]] =
AsyncHttpClientBuilder[F]
EmberHttpClientBuilder[F]
.map[F, Client[F]](sign(cfg))
.map[F, Client[F]](http4sLogger)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import com.google.auth.oauth2.AccessToken
import org.http4s.Uri
import org.http4s.client.Client
import org.slf4s.Logging
import cats.effect.ConcurrentEffect
import cats.effect.{ConcurrentEffect, ContextShift, Timer}

object GoogleCloudStorage extends Logging {

def mkContainerClient[F[_]: ConcurrentEffect](cfg: ServiceAccountConfig): Resource[F, Client[F]] =
def mkContainerClient[F[_]: ConcurrentEffect: ContextShift: Timer](cfg: ServiceAccountConfig): Resource[F, Client[F]] =
GCSClient(cfg)

private def bucketUrl(bucket: Bucket) =
Expand Down