diff --git a/azure/src/main/scala/quasar/blobstore/azure/Azure.scala b/azure/src/main/scala/quasar/blobstore/azure/Azure.scala index 69a1635..aee6bb3 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/Azure.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/Azure.scala @@ -26,7 +26,7 @@ import scala.concurrent.duration.MILLISECONDS import cats._ import cats.implicits._ -import cats.effect.{Async, ConcurrentEffect, ContextShift, Sync, Timer} +import cats.effect.{ConcurrentEffect, ContextShift, Sync, Timer} import cats.effect.concurrent.Ref import com.azure.core.credential.{AccessToken, TokenCredential, TokenRequestContext} import com.azure.identity.ClientSecretCredentialBuilder @@ -40,7 +40,7 @@ object Azure extends Logging { def mkStdStorageUrl(name: AccountName): StorageUrl = StorageUrl(s"https://${name.value}.blob.core.windows.net/") - def getAccessToken[F[_]: Async: ContextShift](ad: ActiveDirectory): F[Expires[AccessToken]] = { + def getAccessToken[F[_]: ConcurrentEffect: ContextShift](ad: ActiveDirectory): F[Expires[AccessToken]] = { val mkBuilder = Sync[F].delay { (new ClientSecretCredentialBuilder) diff --git a/azure/src/main/scala/quasar/blobstore/azure/AzureDeleteService.scala b/azure/src/main/scala/quasar/blobstore/azure/AzureDeleteService.scala index a742e3b..d640ef6 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/AzureDeleteService.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/AzureDeleteService.scala @@ -21,13 +21,13 @@ import quasar.blobstore.paths.BlobPath import quasar.blobstore.services.DeleteService import cats.data.Kleisli -import cats.effect.{Async, ContextShift} +import cats.effect.{ConcurrentEffect, ContextShift} import cats.syntax.functor._ import com.azure.storage.blob.BlobContainerAsyncClient object AzureDeleteService { - def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient) + def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient) : DeleteService[F] = { val res = for { diff --git a/azure/src/main/scala/quasar/blobstore/azure/AzurePropsService.scala b/azure/src/main/scala/quasar/blobstore/azure/AzurePropsService.scala index 5c08283..2e6421c 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/AzurePropsService.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/AzurePropsService.scala @@ -20,7 +20,7 @@ import quasar.blobstore.azure.requests.BlobPropsArgs import quasar.blobstore.services.PropsService import cats.data.Kleisli -import cats.effect.{Async, ContextShift} +import cats.effect.{ConcurrentEffect, ContextShift} import cats.syntax.flatMap._ import cats.syntax.functor._ import com.azure.storage.blob.{BlobAsyncClient, BlobContainerAsyncClient} @@ -30,12 +30,12 @@ import scala.Option object AzurePropsService { - def fromBlobPropsArgs[F[_]: Async: ContextShift] + def fromBlobPropsArgs[F[_]: ConcurrentEffect: ContextShift] : Kleisli[F, BlobPropsArgs, Option[BlobProperties]] = handlers.recoverToNone( requests.blobPropsRequestK.map(_.getValue())) - def apply[F[_]: Async: ContextShift]( + def apply[F[_]: ConcurrentEffect: ContextShift]( containerClient: BlobContainerAsyncClient, mkArgs: BlobAsyncClient => BlobPropsArgs) : PropsService[F, BlobProperties] = @@ -46,7 +46,7 @@ object AzurePropsService { } yield res } - def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient) + def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient) : PropsService[F, BlobProperties] = AzurePropsService[F]( containerClient, diff --git a/azure/src/main/scala/quasar/blobstore/azure/AzureStatusService.scala b/azure/src/main/scala/quasar/blobstore/azure/AzureStatusService.scala index ce836f1..ee8986c 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/AzureStatusService.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/AzureStatusService.scala @@ -19,17 +19,17 @@ package quasar.blobstore.azure import quasar.blobstore.azure.requests.ContainerPropsArgs import quasar.blobstore.services.StatusService -import cats.effect.{Async, ContextShift} +import cats.effect.{ConcurrentEffect, ContextShift} import com.azure.storage.blob.BlobContainerAsyncClient import com.azure.storage.blob.models.BlobContainerProperties object AzureStatusService { - def apply[F[_]: Async: ContextShift](args: ContainerPropsArgs): StatusService[F] = + def apply[F[_]: ConcurrentEffect: ContextShift](args: ContainerPropsArgs): StatusService[F] = (requests.containerPropsRequestK[F] andThen converters.responseToBlobstoreStatusK[F, BlobContainerProperties] mapF handlers.recoverToBlobstoreStatus[F] ).apply(args) - def mk[F[_]: Async: ContextShift](containerClient: BlobContainerAsyncClient): StatusService[F] = + def mk[F[_]: ConcurrentEffect: ContextShift](containerClient: BlobContainerAsyncClient): StatusService[F] = AzureStatusService[F](ContainerPropsArgs(containerClient, null)) } diff --git a/azure/src/main/scala/quasar/blobstore/azure/reactive.scala b/azure/src/main/scala/quasar/blobstore/azure/reactive.scala index 66b6d5a..cefb717 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/reactive.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/reactive.scala @@ -20,18 +20,14 @@ import cats.effect._ import fs2.Stream import fs2.interop.reactivestreams._ import reactor.core.publisher.{Flux, Mono} -import reactor.core.scala.publisher.ScalaConverters._ object reactive { def streamToFlux[F[_]: ConcurrentEffect, A](s: Stream[F, A]): Flux[A] = Flux.from(s.toUnicastPublisher) - def monoToAsync[F[_]: ContextShift, A]( - mono: Mono[A])( - implicit F: Async[F]) - : F[A] = - Async.fromFuture(F.delay(mono.asScala.toFuture)) + def monoToAsync[F[_]: ConcurrentEffect, A](mono: Mono[A]): F[A] = + fromPublisher[F, A](mono).compile.lastOrError def fluxToStream[F[_]: ConcurrentEffect: ContextShift, A]( flux: Flux[A]) diff --git a/azure/src/main/scala/quasar/blobstore/azure/requests.scala b/azure/src/main/scala/quasar/blobstore/azure/requests.scala index b5d4ba5..9ead129 100644 --- a/azure/src/main/scala/quasar/blobstore/azure/requests.scala +++ b/azure/src/main/scala/quasar/blobstore/azure/requests.scala @@ -21,7 +21,7 @@ import scala.{Boolean, Byte} import scala.Predef._ import cats.data.Kleisli -import cats.effect.{Async, ConcurrentEffect, ContextShift, Sync} +import cats.effect.{ConcurrentEffect, ContextShift, Sync} import cats.syntax.flatMap._ import cats.syntax.functor._ import com.azure.core.http.rest.Response @@ -54,13 +54,13 @@ object requests { blobClient: BlobAsyncClient, blobRequestConditions: BlobRequestConditions) - def blobPropsRequest[F[_]: Async: ContextShift]( + def blobPropsRequest[F[_]: ConcurrentEffect: ContextShift]( args: BlobPropsArgs) : F[Response[BlobProperties]] = Sync[F].delay(args.blobClient.getPropertiesWithResponse(args.blobRequestConditions)) >>= reactive.monoToAsync[F, Response[BlobProperties]] - def blobPropsRequestK[F[_]: Async: ContextShift] + def blobPropsRequestK[F[_]: ConcurrentEffect: ContextShift] : Kleisli[F, BlobPropsArgs, Response[BlobProperties]] = Kleisli(blobPropsRequest[F]) @@ -80,12 +80,12 @@ object requests { containerClient: BlobContainerAsyncClient, leaseId: String) - def containerPropsRequest[F[_]: Async: ContextShift]( + def containerPropsRequest[F[_]: ConcurrentEffect: ContextShift]( args: ContainerPropsArgs): F[Response[BlobContainerProperties]] = Sync[F].delay(args.containerClient.getPropertiesWithResponse(args.leaseId)) >>= reactive.monoToAsync[F, Response[BlobContainerProperties]] - def containerPropsRequestK[F[_]: Async: ContextShift] + def containerPropsRequestK[F[_]: ConcurrentEffect: ContextShift] : Kleisli[F, ContainerPropsArgs, Response[BlobContainerProperties]] = Kleisli(containerPropsRequest[F]) @@ -95,13 +95,13 @@ object requests { parallelTransferOptions: ParallelTransferOptions, overwrite: Boolean) - def uploadRequest[F[_]: Async: ContextShift](args: UploadRequestArgs) + def uploadRequest[F[_]: ConcurrentEffect: ContextShift](args: UploadRequestArgs) : F[BlockBlobItem] = Sync[F].delay( args.blobClient.upload(args.bytes, args.parallelTransferOptions, args.overwrite) ) >>= reactive.monoToAsync[F, BlockBlobItem] - def uploadRequestK[F[_]: Async: ContextShift] + def uploadRequestK[F[_]: ConcurrentEffect: ContextShift] : Kleisli[F, UploadRequestArgs, BlockBlobItem] = Kleisli(uploadRequest[F]) } diff --git a/build.sbt b/build.sbt index 86664cf..cbcb944 100644 --- a/build.sbt +++ b/build.sbt @@ -23,7 +23,7 @@ ThisBuild / githubWorkflowBuildPreamble += List("decryptSecret gcs/src/test/resources/bad-auth-file.json.enc"), name = Some("Decrypt bad gcp service account json key")) -val AwsSdkVersion = "2.15.34" +val AwsSdkVersion = "2.16.21" val Fs2Version = "2.5.6" val MonixVersion = "3.4.0" val SpecsVersion = "4.10.6" @@ -72,7 +72,6 @@ lazy val azure = project libraryDependencies ++= Seq( "com.azure" % "azure-storage-blob" % "12.9.0", "com.azure" % "azure-identity" % "1.2.0", - "io.projectreactor" %% "reactor-scala-extensions" % "0.6.0", "com.codecommit" %% "cats-effect-testing-specs2" % "0.4.1" % Test)) lazy val gcs = project @@ -89,6 +88,5 @@ lazy val gcs = project "io.argonaut" %% "argonaut" % ArgonautVersion, "org.apache.logging.log4j" % "log4j-core" % Log4jVersion % Test, "org.apache.logging.log4j" % "log4j-slf4j-impl" % Log4jVersion % Test, - "org.http4s" %% "http4s-async-http-client" % Http4sVersion, - "org.http4s" %% "http4s-argonaut" % Http4sVersion, - "com.github.markusbernhardt" % "proxy-vole" % "1.0.5")) + "org.http4s" %% "http4s-ember-client" % Http4sVersion, + "org.http4s" %% "http4s-argonaut" % Http4sVersion)) diff --git a/gcs/src/main/scala/quasar/blobstore/gcs/AsyncHttpClientBuilder.scala b/gcs/src/main/scala/quasar/blobstore/gcs/AsyncHttpClientBuilder.scala deleted file mode 100644 index c02dd08..0000000 --- a/gcs/src/main/scala/quasar/blobstore/gcs/AsyncHttpClientBuilder.scala +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright 2020 Precog Data - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package quasar.blobstore.gcs - -import scala.Predef._ - -import cats.effect.{ConcurrentEffect, Resource, Sync} - -import com.github.markusbernhardt.proxy.ProxySearch - -import org.asynchttpclient.proxy.{ProxyServer, ProxyServerSelector} -import org.asynchttpclient.uri.Uri -import org.asynchttpclient.{AsyncHttpClientConfig, DefaultAsyncHttpClientConfig} - -import org.http4s.client.Client -import org.http4s.client.asynchttpclient.AsyncHttpClient - -import org.slf4s.Logging - -import scala.{ - StringContext, - Int, - List, - Option -} -import quasar.blobstore.CompatConverters.All._ - -import java.net.{InetSocketAddress, ProxySelector} -import java.net.Proxy -import java.net.Proxy.{Type => ProxyType} -import java.util.concurrent.{Executors, ThreadFactory} -import java.util.concurrent.atomic.AtomicInteger - - -object AsyncHttpClientBuilder extends Logging { - def apply[F[_]: ConcurrentEffect]: Resource[F, Client[F]] = - Resource.eval(Search[F]).flatMap(selector => - AsyncHttpClient.resource[F](mkConfig(selector))) - - def mkConfig[F[_]](proxySelector: ProxySelector): AsyncHttpClientConfig = - new DefaultAsyncHttpClientConfig.Builder() - .setMaxConnectionsPerHost(200) - .setMaxConnections(400) - .setRequestTimeout(Int.MaxValue) - .setShutdownTimeout(Int.MaxValue) - .setReadTimeout(Int.MaxValue) - .setConnectTimeout(Int.MaxValue) - .setProxyServerSelector(ProxyVoleProxyServerSelector(proxySelector)) - .setThreadFactory(NamedDaemonThreadFactory("http4s-async-http-client-worker")) - .build() - - private def sortProxies(proxies: List[Proxy]): List[Proxy] = - proxies.sortWith((l, r) => (l.`type`, r.`type`) match { - case (ProxyType.HTTP, ProxyType.DIRECT) => true - case (ProxyType.SOCKS, ProxyType.DIRECT) => true - case _ => false - }) - - private case class ProxyVoleProxyServerSelector(selector: ProxySelector) extends ProxyServerSelector { - def select(uri: Uri): ProxyServer = { - ProxySelector.setDefault(selector) // NB: I don't think this is necessary - - Option(selector) - .flatMap(s => Option(s.select(uri.toJavaNetURI))) - .flatMap(proxies0 => { - val proxies = proxies0.asScala.toList - log.debug(s"Found proxies: $proxies") - - val sortedProxies = sortProxies(proxies) - log.debug(s"Prioritized proxies as: $sortedProxies") - - sortedProxies.headOption - }) - .flatMap(server => Option(server.address)) - .map(_.asInstanceOf[InetSocketAddress]) // because Java - .map(uriToProxyServer) - .orNull // because Java x2 - } - - private def uriToProxyServer(u: InetSocketAddress): ProxyServer = - (new ProxyServer.Builder(u.getHostName, u.getPort)).build - } -} - -final case class NamedDaemonThreadFactory(name: String) extends ThreadFactory { - val threadNo = new AtomicInteger(0) - val backingThreadFactory = Executors.defaultThreadFactory() - - def newThread(r: java.lang.Runnable) = { - val thread = backingThreadFactory.newThread(r) - thread.setName(name + "-" + threadNo.incrementAndGet().toString) - thread.setDaemon(true) - thread - } -} - -object Search { - def apply[F[_]: Sync]: F[ProxySelector] = - Sync[F].delay(ProxySearch.getDefaultProxySearch.getProxySelector) -} diff --git a/gcs/src/main/scala/quasar/blobstore/gcs/EmberHttpClientBuilder.scala b/gcs/src/main/scala/quasar/blobstore/gcs/EmberHttpClientBuilder.scala new file mode 100644 index 0000000..6ebc919 --- /dev/null +++ b/gcs/src/main/scala/quasar/blobstore/gcs/EmberHttpClientBuilder.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Precog Data + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package quasar.blobstore.gcs + +import scala.concurrent.duration.Duration + +import cats.effect.{ConcurrentEffect, ContextShift, Resource, Timer} + +import org.http4s.client.Client +import org.http4s.ember.client.EmberClientBuilder + +import org.slf4s.Logging + +object EmberHttpClientBuilder extends Logging { + def apply[F[_]: ConcurrentEffect: ContextShift: Timer]: Resource[F, Client[F]] = + EmberClientBuilder + .default[F] + .withMaxTotal(400) + .withMaxPerKey(_ => 200) // the underlying pool is keyed by (scheme, host). i.e connection limit per host + .withTimeout(Duration.Inf) + .withMaxResponseHeaderSize(262144) + .withIdleConnectionTime(Duration.Inf) + .build +} diff --git a/gcs/src/main/scala/quasar/blobstore/gcs/GCSClient.scala b/gcs/src/main/scala/quasar/blobstore/gcs/GCSClient.scala index b316b96..f47bb6f 100644 --- a/gcs/src/main/scala/quasar/blobstore/gcs/GCSClient.scala +++ b/gcs/src/main/scala/quasar/blobstore/gcs/GCSClient.scala @@ -19,7 +19,7 @@ package quasar.blobstore.gcs import scala._ import scala.Predef._ -import cats.effect.{Concurrent, ConcurrentEffect, Resource, Sync} +import cats.effect.{Concurrent, ConcurrentEffect, ContextShift, Resource, Sync, Timer} import cats.implicits._ import org.http4s.client.Client import org.http4s.client.middleware.{RequestLogger, ResponseLogger} @@ -60,9 +60,9 @@ object GCSClient { Client(signAndSubmit) } - def apply[F[_]: ConcurrentEffect](cfg: ServiceAccountConfig) + def apply[F[_]: ConcurrentEffect: ContextShift: Timer](cfg: ServiceAccountConfig) : Resource[F, Client[F]] = - AsyncHttpClientBuilder[F] + EmberHttpClientBuilder[F] .map[F, Client[F]](sign(cfg)) .map[F, Client[F]](http4sLogger) } diff --git a/gcs/src/main/scala/quasar/blobstore/gcs/GoogleCloudStorage.scala b/gcs/src/main/scala/quasar/blobstore/gcs/GoogleCloudStorage.scala index 6cee1d8..6cf0132 100644 --- a/gcs/src/main/scala/quasar/blobstore/gcs/GoogleCloudStorage.scala +++ b/gcs/src/main/scala/quasar/blobstore/gcs/GoogleCloudStorage.scala @@ -24,11 +24,11 @@ import com.google.auth.oauth2.AccessToken import org.http4s.Uri import org.http4s.client.Client import org.slf4s.Logging -import cats.effect.ConcurrentEffect +import cats.effect.{ConcurrentEffect, ContextShift, Timer} object GoogleCloudStorage extends Logging { - def mkContainerClient[F[_]: ConcurrentEffect](cfg: ServiceAccountConfig): Resource[F, Client[F]] = + def mkContainerClient[F[_]: ConcurrentEffect: ContextShift: Timer](cfg: ServiceAccountConfig): Resource[F, Client[F]] = GCSClient(cfg) private def bucketUrl(bucket: Bucket) =