Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #280 #281

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
sudo: false
language: scala
scala:
- 2.11.8
jdk:
- oraclejdk8
- 2.12.8
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to submit the various build updates as a separate PR, unless those updates are required by the other changes?
(I find small, cohesive PRs generally are best)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The .travis.yml file is currently broken; see: https://travis-ci.community/t/install-of-oracle-jdk-8-failing/3038

Do you want to fix the .travis.yml file as part of a separate issue? If so, this PR will fail until the travis ci issue is resolved.

cache:
directories:
- '$HOME/.ivy2/cache'
12 changes: 7 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases/"

val akkaVersion = "2.5.23"

val scalaCheck = "org.scalacheck" %% "scalacheck" % "1.14.0"
val specs2 = "org.specs2" %% "specs2-core" % "4.3.2"
val scalaTest = "org.scalatest" %% "scalatest" % "3.0.5"
val mockito = "org.mockito" % "mockito-core" % "2.21.0"
val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.14"
val akkaStreamTestKit = "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion

val snakeYaml = "org.yaml" % "snakeyaml" % "1.21"
val commonsIO = "commons-io" % "commons-io" % "2.6"
Expand All @@ -14,11 +16,11 @@ val bouncyCastle = "org.bouncycastle" % "bcpkix-jdk15on" % "1.60"

// the client API request/response handing uses Akka Http
val akkaHttp = "com.typesafe.akka" %% "akka-http" % "10.1.3"
val akkaStream = "com.typesafe.akka" %% "akka-stream" % "2.5.14"
val akka = "com.typesafe.akka" %% "akka-actor" % "2.5.14"
val akkaStream = "com.typesafe.akka" %% "akka-stream" % akkaVersion
val akka = "com.typesafe.akka" %% "akka-actor" % akkaVersion

// Skuber uses akka logging, so the examples config uses the akka slf4j logger with logback backend
val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % "2.5.14"
val akkaSlf4j = "com.typesafe.akka" %% "akka-slf4j" % akkaVersion
val logback = "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

// the Json formatters are based on Play Json
Expand All @@ -29,7 +31,7 @@ scalacOptions += "-target:jvm-1.8"

scalacOptions in Test ++= Seq("-Yrangepos")

version in ThisBuild := "2.2.0"
version in ThisBuild := "2.3.0"

sonatypeProfileName := "io.skuber"

Expand Down
73 changes: 61 additions & 12 deletions client/src/main/scala/skuber/api/client/KubernetesClient.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package skuber.api.client

import akka.stream.scaladsl.{Sink, Source}
import akka.http.scaladsl.Http
import akka.util.ByteString
import play.api.libs.json.{Writes,Format}
import play.api.libs.json.{Format, Writes}
import skuber.{DeleteOptions, HasStatusSubresource, LabelSelector, ListOptions, ListResource, ObjectResource, Pod, ResourceDefinition, Scale}
import skuber.api.patch.Patch
import skuber.api.watch.WatchSource
import skuber.batch.Job

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.{Future, Promise}

/**
Expand Down Expand Up @@ -88,6 +92,15 @@ trait KubernetesClient {
*/
def deleteWithOptions[O <: ObjectResource](name: String, options: DeleteOptions)(implicit rd: ResourceDefinition[O], lc: LoggingContext): Future[Unit]

/**
* Monitor a resource existence until no longer available
* @param name the name of the resource to monitor its existence
* @param monitorRepeatDelay delay for repeating the monitoring as long as the resource is available by name
* @tparam O the specific object resource type e.g. Pod, Deployment
* @return A future that will be set to success when Kubernetes confirm the resource is no longer available by name, otherwise failure
*/
def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)(implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit]

/**
* Delete all resources of specified type in current namespace
* @tparam L list resource type of resources to delete e.g. PodList, DeploymentList
Expand Down Expand Up @@ -216,10 +229,12 @@ trait KubernetesClient {
* Watch a specific object resource continuously. This returns a source that will continue to produce
* events on any updates to the object even if the server times out, by transparently restarting the watch as needed.
* @param obj the object resource to watch
* @param pool reuse a skuber pool for querying the server if any or create a new one
* @tparam O the type of the resource e.g Pod
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized
* value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any.
*/
def watchContinuously[O <: ObjectResource](obj: O)(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]
def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])(implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm reluctant to modify the existing watch methods API signatures here, it affects all clients. In fact, because there are other watch related enhancements that I am working on, I plan to add a single new method to the existing API which will return a separate API object for some new/enhanced watch/list related functionality. That new API might be the right place to consider this new pool parameter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The current API completely hides the fact that there's a pool involved and therefore prevents doing anything with it:

  • reuse the same across multiple calls (see the PiJobsSequential example)
  • create an independent pool for each call (see the PiJobsParallel example)
  • shutting down a pool when it is no longer needed

If you're thinking of refactoring this API, it would be good to keep the above use cases in mind; the distinction between sequential & parallel invocations of long-duration operations is very important for us.

I don't know how to support these use cases without changing the API in some incompatible way. Please advise how you want to proceed. I'm OK in deferring action until you refactor the API as you mentioned.


/**
* Watch a specific object resource continuously. This returns a source that will continue to produce
Expand All @@ -232,11 +247,13 @@ trait KubernetesClient {
* applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. If no resource version is specified,
* a single ADDED event will be produced for an already existing object followed by events for any future changes.
* @param bufSize optional buffer size for received object updates, normally the default is more than enough
* @param pool reuse a skuber pool for querying the server if any or create a new one
* @tparam O the type of the resource
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized
* value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any.
*/
def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]
def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])]

/**
* Watch all object resources of a specified type continuously. This returns a source that will continue to produce
Expand All @@ -248,24 +265,29 @@ trait KubernetesClient {
* applicable type (e.g. PodList, DeploymentList) and then supplies that to this method to receive any future updates. If no resource version is specified,
* a single ADDED event will be produced for an already existing object followed by events for any future changes.
* @param bufSize optional buffer size for received object updates, normally the default is more than enough
* @param pool reuse a skuber pool for querying the server if any or create a new one
* @tparam O the type pf the resource
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized
* value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any.
*/
def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]
def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])]

/**
* Watch all object resources of a specified type continuously, passing the specified options to the API server with the watch request.
* This returns a source that will continue to produce events even if the server times out, by transparently restarting the watch as needed.
*
* @param options a set of list options to pass to the server. See https://godoc.org/k8s.io/apimachinery/pkg/apis/meta/v1#ListOptions
* for the meaning of the options. Note that the `watch` flag in the options will be ignored / overridden by the client, which
* ensures a watch is always requested on the server.
* @param bufsize optional buffer size for received object updates, normally the default is more than enough
* @param pool reuse a skuber pool for querying the server if any or create a new one
* @tparam O the resource type to watch
* @return A future containing an Akka streams Source of WatchEvents that will be emitted
* @return A future containing an Akka streams Source of WatchEvents that will be emitted where the materialized
* value is a pair of the skuber pool used and the underlying Akka host connection pool used, if any.
*/
def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _]
def watchWithOptions[O <: ObjectResource](options: ListOptions, bufsize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])]

/**
* Get the scale subresource of the named object resource
Expand Down Expand Up @@ -350,6 +372,33 @@ trait KubernetesClient {
tty: Boolean = false,
maybeClose: Option[Promise[Unit]] = None)(implicit lc: LoggingContext): Future[Unit]

/**
* Execute a job, monitoring the progress of its pod until completion and monitor its deletion until complete
* @param job the Kubernetes job to execute
* @param labelSelector the label selector for monitoring the job's pod status
* @param podProgress the predicate for monitoring the pod status while satisfied before deleting the job
* @param podCompletion a callback invoked at the completion of the job's pod (successful or not),
* after which the job will be deleted if and only if the podCompletion result is true
* @param watchContinuouslyRequestTimeout the delay for continuously monitoring the pod progress
* @param deletionMonitorRepeatDelay the delay for continuously monitoring the job deletion
* @param pool a skuber pool to reuse, if any, or to create otherwise
* @param bufSize optional buffer size for received object updates, normally the default is more than enough
* @return A future consisting of a triple of the following:
* - the skuber pool suitable for subsequently executing other jobs on the same server
* - the akka host connection pool that can be shutdown when no further jobs need to be executed on the same server
* - the last pod status received when the pod progress predicate became unsatisfied
*/
def executeJobAndWaitUntilDeleted(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general I try to keep the API generic in regards to resource types, which reflects the actual Kubernetes REST API design largely (exception of some specific methods on pods such as exec). This is specific to Job resources, so again my thoughts are that it could be implemented in the examples sub-project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

job: Job,
labelSelector: LabelSelector,
podProgress: WatchEvent[Pod] => Boolean,
podCompletion: WatchEvent[Pod] => Future[Boolean],
watchContinuouslyRequestTimeout: Duration,
deletionMonitorRepeatDelay: FiniteDuration,
pool: Option[Pool[WatchSource.Start[Pod]]],
bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod]):
Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])]

/**
* Return list of API versions supported by the server
* @param lc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import akka.http.scaladsl.model._
import akka.http.scaladsl.settings.{ClientConnectionSettings, ConnectionPoolSettings}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.{ConnectionContext, Http, HttpsConnectionContext}
import akka.pattern.after
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.util.ByteString
import com.typesafe.config.{Config, ConfigFactory}
import javax.net.ssl.SSLContext
import play.api.libs.json.{Format, Writes, Reads}
import play.api.libs.json.{Format, Reads, Writes}
import skuber._
import skuber.api.client.exec.PodExecImpl
import skuber.api.client.{K8SException => _, _}
Expand All @@ -22,6 +23,7 @@ import skuber.json.PlayJsonSupportForAkkaHttp._
import skuber.json.format.apiobj.statusReads
import skuber.json.format.{apiVersionsFormat, deleteOptionsFmt, namespaceListFmt}
import skuber.api.patch._
import skuber.batch.Job

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -403,6 +405,17 @@ class KubernetesClientImpl private[client] (
} yield ()
}

override def monitorResourceUntilUnavailable[O <: ObjectResource](name: String, monitorRepeatDelay: FiniteDuration)(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use watch here instead of polling?

implicit fmt: Format[O], rd: ResourceDefinition[O]): Future[Unit] =
getOption[O](name).flatMap {
case None =>
Future.successful(())
case Some(_) =>
after(monitorRepeatDelay, actorSystem.scheduler)(
monitorResourceUntilUnavailable[O](name, monitorRepeatDelay)
)
}

override def deleteAll[L <: ListResource[_]]()(
implicit fmt: Format[L], rd: ResourceDefinition[L], lc: LoggingContext): Future[L] =
{
Expand Down Expand Up @@ -481,30 +494,30 @@ class KubernetesClientImpl private[client] (
Watch.eventsOnKind[O](this, sinceResourceVersion, bufSize)
}

override def watchContinuously[O <: ObjectResource](obj: O)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] =
override def watchContinuously[O <: ObjectResource](obj: O, pool: Option[Pool[WatchSource.Start[O]]])(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] =
{
watchContinuously(obj.name)
watchContinuously(obj.name, pool = pool)
}

override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] =
override def watchContinuously[O <: ObjectResource](name: String, sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] =
{
val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds) )
WatchSource(this, buildLongPollingPool(), Some(name), options, bufSize)
WatchSource(this, pool.getOrElse(buildLongPollingPool()), Some(name), options, bufSize)
}

override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] =
override def watchAllContinuously[O <: ObjectResource](sinceResourceVersion: Option[String] = None, bufSize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] =
{
val options=ListOptions(resourceVersion = sinceResourceVersion, timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds))
WatchSource(this, buildLongPollingPool(), None, options, bufSize)
WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufSize)
}

override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int = 10000)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], _] =
override def watchWithOptions[O <: skuber.ObjectResource](options: ListOptions, bufsize: Int = 10000, pool: Option[Pool[WatchSource.Start[O]]] = None)(
implicit fmt: Format[O], rd: ResourceDefinition[O], lc: LoggingContext): Source[WatchEvent[O], (Pool[WatchSource.Start[O]], Option[Http.HostConnectionPool])] =
{
WatchSource(this, buildLongPollingPool(), None, options, bufsize)
WatchSource(this, pool.getOrElse(buildLongPollingPool()), None, options, bufsize)
}

private def buildLongPollingPool[O <: ObjectResource]() = {
Expand Down Expand Up @@ -622,6 +635,45 @@ class KubernetesClientImpl private[client] (
PodExecImpl.exec(this, podName, command, maybeContainerName, maybeStdin, maybeStdout, maybeStderr, tty, maybeClose)
}

override def executeJobAndWaitUntilDeleted(
job: Job,
labelSelector: LabelSelector,
podProgress: WatchEvent[Pod] => Boolean,
podCompletion: WatchEvent[Pod] => Future[Boolean],
watchContinuouslyRequestTimeout: Duration,
deletionMonitorRepeatDelay: FiniteDuration,
pool: Option[Pool[WatchSource.Start[Pod]]],
bufSize: Int = 10000)(implicit jfmt: Format[Job], pfmt: Format[Pod], jrd: ResourceDefinition[Job], prd: ResourceDefinition[Pod])
: Future[(Pool[WatchSource.Start[Pod]], Option[Http.HostConnectionPool], WatchEvent[Pod])] =
for {
j <- create(job)
(p, hcp, lastPodEvent) <- {
watchWithOptions[Pod](
options = ListOptions(
labelSelector = Some(labelSelector),
timeoutSeconds = Some(watchContinuouslyRequestTimeout.toSeconds)
),
bufsize = bufSize,
pool = pool
)
.takeWhile(podProgress, inclusive = true)
.toMat(Sink.last)(Keep.both)
.run() match {
case ((pool: Pool[WatchSource.Start[Pod]],
hostConnectionPool: Option[Http.HostConnectionPool]),
f: Future[WatchEvent[Pod]]) =>
f.map { ev =>
(pool, hostConnectionPool, ev)
}
}
}
delete <- podCompletion(lastPodEvent)
_ <- if (delete)
deleteWithOptions[Job](name = j.metadata.name, options = DeleteOptions(propagationPolicy = Some(DeletePropagation.Foreground)))
.flatMap(_ => monitorResourceUntilUnavailable[Job](j.metadata.name, deletionMonitorRepeatDelay))
else Future.successful(())
} yield (p, hcp, lastPodEvent)

override def close: Unit =
{
isClosed = true
Expand Down
10 changes: 8 additions & 2 deletions client/src/main/scala/skuber/api/client/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package skuber.api
import java.time.Instant
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl.Flow
Expand All @@ -23,7 +23,13 @@ import skuber.api.client.impl.KubernetesClientImpl
*/
package object client {

type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), NotUsed]
/**
* The materialized value is an optional host connection pool.
* For testing, allows mocking without creating a host connection pool.
* For development and production, provides access to the host connection pool created (if none was provided).
* @tparam T The type of elements flowing in and out.
*/
type Pool[T] = Flow[(HttpRequest, T), (Try[HttpResponse], T), Option[Http.HostConnectionPool]]

final val sysProps = new SystemProperties

Expand Down
Loading