Skip to content

Commit

Permalink
Implemented CurlMultiSocket
Browse files Browse the repository at this point in the history
This implementation uses curl multi socket drive, and uses
FileDescriptorPoller from cats effect.
It allows using this library alongside other cats effect libraries.
  • Loading branch information
hnaderi committed Jul 4, 2023
1 parent 235d969 commit 829ba96
Show file tree
Hide file tree
Showing 11 changed files with 681 additions and 10 deletions.
3 changes: 3 additions & 0 deletions curl/src/main/scala/org/http4s/curl/http/CurlClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@ package org.http4s.curl.http
import cats.effect._
import org.http4s.client.Client
import org.http4s.curl.unsafe.CurlExecutorScheduler
import org.http4s.curl.unsafe.CurlMultiSocket

private[curl] object CurlClient {
def apply(ec: CurlExecutorScheduler): Client[IO] = Client(CurlRequest(ec, _))

def multiSocket(ms: CurlMultiSocket): Client[IO] = Client(CurlRequest.applyMultiSocket(ms, _))

def get: IO[Client[IO]] = IO.executionContext.flatMap {
case ec: CurlExecutorScheduler => IO.pure(apply(ec))
case _ => IO.raiseError(new RuntimeException("Not running on CurlExecutorScheduler"))
Expand Down
65 changes: 65 additions & 0 deletions curl/src/main/scala/org/http4s/curl/http/CurlRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.http4s.Response
import org.http4s.curl.internal.Utils
import org.http4s.curl.internal._
import org.http4s.curl.unsafe.CurlExecutorScheduler
import org.http4s.curl.unsafe.CurlMultiSocket

private[curl] object CurlRequest {
private def setup(
Expand Down Expand Up @@ -78,6 +79,57 @@ private[curl] object CurlRequest {
)
)

private def setup(
handle: CurlEasy,
send: RequestSend,
recv: RequestRecv,
req: Request[IO],
): Resource[IO, Unit] =
Utils.newZone.flatMap(implicit zone =>
CurlSList().evalMap(headers =>
IO {
// TODO add in options
// handle.setVerbose(true)

import org.http4s.curl.unsafe.libcurl_const
import scala.scalanative.unsafe._
import org.http4s.Header
import org.http4s.HttpVersion
import org.typelevel.ci._

handle.setCustomRequest(toCString(req.method.renderString))

handle.setUpload(true)

handle.setUrl(toCString(req.uri.renderString))

val httpVersion = req.httpVersion match {
case HttpVersion.`HTTP/1.0` => libcurl_const.CURL_HTTP_VERSION_1_0
case HttpVersion.`HTTP/1.1` => libcurl_const.CURL_HTTP_VERSION_1_1
case HttpVersion.`HTTP/2` => libcurl_const.CURL_HTTP_VERSION_2
case HttpVersion.`HTTP/3` => libcurl_const.CURL_HTTP_VERSION_3
case _ => libcurl_const.CURL_HTTP_VERSION_NONE
}
handle.setHttpVersion(httpVersion)

req.headers // curl adds these headers automatically, so we explicitly disable them
.transform(Header.Raw(ci"Expect", "") :: Header.Raw(ci"Transfer-Encoding", "") :: _)
.foreach(header => headers.append(header.toString))

handle.setHttpHeader(headers.toPtr)

handle.setReadData(Utils.toPtr(send))
handle.setReadFunction(RequestSend.readCallback(_, _, _, _))

handle.setHeaderData(Utils.toPtr(recv))
handle.setHeaderFunction(RequestRecv.headerCallback(_, _, _, _))

handle.setWriteData(Utils.toPtr(recv))
handle.setWriteFunction(RequestRecv.writeCallback(_, _, _, _))
}
)
)

def apply(ec: CurlExecutorScheduler, req: Request[IO]): Resource[IO, Response[IO]] = for {
gc <- GCRoot()
handle <- CurlEasy()
Expand All @@ -89,4 +141,17 @@ private[curl] object CurlRequest {
_ <- req.body.through(send.pipe).compile.drain.background
resp <- recv.response()
} yield resp

def applyMultiSocket(ms: CurlMultiSocket, req: Request[IO]): Resource[IO, Response[IO]] = for {
gc <- GCRoot()
handle <- CurlEasy()
flow <- FlowControl(handle)
send <- RequestSend(flow)
recv <- RequestRecv(flow)
_ <- gc.add(send, recv)
_ <- setup(handle, send, recv, req)
_ <- ms.addHandlerTerminating(handle, recv.onTerminated).toResource
_ <- req.body.through(send.pipe).compile.drain.background
resp <- recv.response()
} yield resp
}
30 changes: 30 additions & 0 deletions curl/src/main/scala/org/http4s/curl/unsafe/CURLMcode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.curl.unsafe

import org.http4s.curl.CurlError

import scala.scalanative.unsafe._

final private[curl] case class CURLMcode(value: CInt) extends AnyVal {
@inline def isOk: Boolean = value == 0
@inline def isError: Boolean = value != 0
@inline def throwOnError: Unit =
if (isError) {
throw CurlError.fromMCode(this)
}
}
30 changes: 30 additions & 0 deletions curl/src/main/scala/org/http4s/curl/unsafe/CURLcode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.curl.unsafe

import org.http4s.curl.CurlError

import scala.scalanative.unsafe._

final private[curl] case class CURLcode(value: CInt) extends AnyVal {
@inline def isOk: Boolean = value == 0
@inline def isError: Boolean = value != 0
@inline def throwOnError: Unit =
if (isError) {
throw CurlError.fromCode(this)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import cats.effect.kernel.Resource
import cats.effect.unsafe.PollingExecutorScheduler
import org.http4s.curl.CurlError

import scala.annotation.nowarn
import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.scalanative.unsafe._
import scala.scalanative.unsigned._

@nowarn
final private[curl] class CurlExecutorScheduler(multiHandle: Ptr[libcurl.CURLM], pollEvery: Int)
extends PollingExecutorScheduler(pollEvery) {

Expand Down
Loading

0 comments on commit 829ba96

Please sign in to comment.