Skip to content

Commit

Permalink
go/runtime: refactor controlPlane out of controlPlaneAuthorizer
Browse files Browse the repository at this point in the history
`controlPlane` encapsulates commonalities in calling control plane APIs
on behalf of a data-plane task context.
  • Loading branch information
jgraettinger committed Sep 30, 2024
1 parent 12d582b commit fd93dc4
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 63 deletions.
88 changes: 28 additions & 60 deletions go/runtime/authorizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@ package runtime

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"runtime/pprof"
"strings"
"sync"
"time"

pf "github.com/estuary/flow/go/protocols/flow"
"github.com/golang-jwt/jwt/v5"
"go.gazette.dev/core/auth"
pb "go.gazette.dev/core/broker/protocol"
"google.golang.org/grpc/metadata"
)

// ControlPlaneAuthorizer is a pb.Authorizer which obtains tokens and
// controlPlaneAuthorizer is a pb.Authorizer which obtains tokens and
// data-plane endpoints through the Estuary Authorization API.
//
// Specifically it:
Expand All @@ -30,26 +25,20 @@ import (
// and evaluation of authorization rules.
// 4. Caches and re-uses the authorization result (success or failure)
// until its expiration.
type ControlPlaneAuthorizer struct {
controlAPI pb.Endpoint
dataplaneFQDN string
delegate *auth.KeyedAuth
type controlPlaneAuthorizer struct {
controlPlane *controlPlane

cache struct {
m map[authCacheKey]authCacheValue
mu sync.Mutex
}
}

// NewControlPlaneAuthorizer returns a ControlPlaneAuthorizer which uses the
// given `controlAPI` endpoint to obtain authorizations, in the context of
// this `dataplaneFQDN` and the `delegate` KeyedAuth which is capable of
// signing tokens for `dataplaneFQDN`.
func NewControlPlaneAuthorizer(delegate *auth.KeyedAuth, dataplaneFQDN string, controlAPI pb.Endpoint) *ControlPlaneAuthorizer {
var a = &ControlPlaneAuthorizer{
controlAPI: controlAPI,
dataplaneFQDN: dataplaneFQDN,
delegate: delegate,
// newControlPlaneAuthorizer returns a controlPlaneAuthorizer which uses the
// given `controlAPI` endpoint to obtain authorizations.
func newControlPlaneAuthorizer(cp *controlPlane) *controlPlaneAuthorizer {
var a = &controlPlaneAuthorizer{
controlPlane: cp,
}
a.cache.m = make(map[authCacheKey]authCacheValue)

Expand Down Expand Up @@ -87,12 +76,12 @@ func (v *authCacheValue) apply(ctx context.Context) (context.Context, error) {
return metadata.AppendToOutgoingContext(ctx, "authorization", v.token), nil
}

func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) {
func (a *controlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) {
var name = claims.Selector.Include.ValueOf("name")

// Authorizations to shard recovery logs are self-signed.
if strings.HasPrefix(name, "recovery/") {
return a.delegate.Authorize(ctx, claims, exp)
return a.controlPlane.keyedAuth.Authorize(ctx, claims, exp)
}

var shardID, ok = pprof.Label(ctx, "shard")
Expand Down Expand Up @@ -127,7 +116,6 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims
// We must issue a new request to the authorization server.
// Begin by self-signing our request as a JWT.

claims.Issuer = a.dataplaneFQDN
claims.Subject = shardID
claims.Capability |= pf.Capability_AUTHORIZE // Required for delegated authorization.
claims.IssuedAt = &jwt.NumericDate{Time: now}
Expand All @@ -145,7 +133,7 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims

// Attempt to fetch an authorization token from the control plane.
// Cache errors for a period of time to prevent thundering herds on errors.
if token, address, expiresAt, err := doAuthFetch(a.controlAPI, claims, a.delegate.Keys[0]); err != nil {
if token, address, expiresAt, err := doAuthFetch(a.controlPlane, claims); err != nil {
value = authCacheValue{
address: "",
err: err,
Expand All @@ -168,50 +156,30 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims
return value.apply(ctx)
}

func doAuthFetch(controlAPI pb.Endpoint, claims pb.Claims, key jwt.VerificationKey) (string, pb.Endpoint, time.Time, error) {
var token, err = jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(key)
func doAuthFetch(cp *controlPlane, claims pb.Claims) (string, pb.Endpoint, time.Time, error) {
reqToken, err := cp.signClaims(claims)
if err != nil {
return "", "", time.Time{}, fmt.Errorf("failed to self-sign authorization request: %w", err)
}
token = `{"token":"` + token + `"}`

var brokerAddress pb.Endpoint
var url = controlAPI.URL()
url.Path = path.Join(url.Path, "/authorize/task")
var request = struct {
Token string `json:"token"`
}{reqToken}

// Invoke the authorization API, perhaps multiple times if asked to retry.
for {
httpResp, err := http.Post(url.String(), "application/json", strings.NewReader(token))
if err != nil {
return "", "", time.Time{}, fmt.Errorf("failed to POST to authorization API: %w", err)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return "", "", time.Time{}, fmt.Errorf("failed to read authorization API response: %w", err)
}
if httpResp.StatusCode != 200 {
return "", "", time.Time{}, fmt.Errorf("authorization failed (%s): %s %s", httpResp.Status, string(respBody), token)
}

var response struct {
Token string
BrokerAddress pb.Endpoint
RetryMillis uint64
}
if err = json.Unmarshal(respBody, &response); err != nil {
return "", "", time.Time{}, fmt.Errorf("failed to decode authorization response: %w", err)
}
var response struct {
Token string
BrokerAddress pb.Endpoint
RetryMillis uint64
}

if response.RetryMillis != 0 {
time.Sleep(time.Millisecond * time.Duration(response.RetryMillis))
} else {
token, brokerAddress = response.Token, response.BrokerAddress
break
}
// We intentionally use context.Background and not the request context
// because we cache authorizations.
if err = callControlAPI(context.Background(), cp, "/authorize/task", &request, &response); err != nil {
return "", "", time.Time{}, err
}

claims = pb.Claims{}
if _, _, err = jwt.NewParser().ParseUnverified(token, &claims); err != nil {
if _, _, err = jwt.NewParser().ParseUnverified(response.Token, &claims); err != nil {
return "", "", time.Time{}, fmt.Errorf("authorization server returned invalid token: %w", err)
}

Expand All @@ -221,7 +189,7 @@ func doAuthFetch(controlAPI pb.Endpoint, claims pb.Claims, key jwt.VerificationK
return "", "", time.Time{}, fmt.Errorf("authorization server did not include an expires-at claim")
}

return token, brokerAddress, claims.ExpiresAt.Time, nil
return response.Token, response.BrokerAddress, claims.ExpiresAt.Time, nil
}

var _ pb.Authorizer = &ControlPlaneAuthorizer{}
var _ pb.Authorizer = &controlPlaneAuthorizer{}
94 changes: 94 additions & 0 deletions go/runtime/control_plane.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package runtime

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math/rand/v2"
"net/http"
"path"
"time"

"github.com/golang-jwt/jwt/v5"
"go.gazette.dev/core/auth"
pb "go.gazette.dev/core/broker/protocol"
)

type controlPlane struct {
endpoint pb.Endpoint
dataplaneFQDN string
keyedAuth *auth.KeyedAuth
}

func newControlPlane(keyedAuth *auth.KeyedAuth, dataplaneFQDN string, controlAPI pb.Endpoint) *controlPlane {
var cp = &controlPlane{
endpoint: controlAPI,
dataplaneFQDN: dataplaneFQDN,
keyedAuth: keyedAuth,
}
return cp
}

func (cp *controlPlane) signClaims(claims pb.Claims) (string, error) {
claims.Issuer = cp.dataplaneFQDN

// Go's `json` encoding is incorrect with respect to canonical
// protobuf JSON encoding (explicit `null` is not allowed).
// This patches the encoding so it's conformant.
if claims.Selector.Include.Labels == nil {
claims.Selector.Include.Labels = []pb.Label{}
}
if claims.Selector.Exclude.Labels == nil {
claims.Selector.Exclude.Labels = []pb.Label{}
}

return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(cp.keyedAuth.Keys[0])
}

func callControlAPI[Request any, Response any](ctx context.Context, cp *controlPlane, resource string, request *Request, response *Response) error {
var url = cp.endpoint.URL()
url.Path = path.Join(url.Path, resource)

var reqBytes, err = json.Marshal(request)
if err != nil {
return fmt.Errorf("failed to encode %s API request body: %w", resource, err)
}

for {
httpReq, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(reqBytes))
if err != nil {
return fmt.Errorf("failed to build POST request to %s: %w", resource, err)
}
httpReq.Header.Add("content-type", "application/json")

httpResp, err := http.DefaultClient.Do(httpReq)
if err != nil {
return fmt.Errorf("failed to POST to %s API: %w", resource, err)
}
respBody, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read %s API response: %w", resource, err)
}

// Parse the response for an indication of whether we should retry.
var skim struct {
RetryMillis uint64
}

if sc := httpResp.StatusCode; sc >= 500 && sc < 600 {
skim.RetryMillis = rand.Uint64N(4_750) + 250 // Random backoff in range [0.250s, 5s].
} else if sc != 200 {
return fmt.Errorf("%s: %s", httpResp.Status, string(respBody))
} else if err = json.Unmarshal(respBody, &skim); err != nil {
return fmt.Errorf("failed to decode %s API response: %w", resource, err)
} else if skim.RetryMillis != 0 {
time.Sleep(time.Millisecond * time.Duration(skim.RetryMillis))
} else if err := json.Unmarshal(respBody, response); err != nil {
return fmt.Errorf("failed to decode %s API response: %w", resource, err)
} else {
return nil
}
}
}
8 changes: 5 additions & 3 deletions go/runtime/flow_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,20 @@ func (f *FlowConsumer) InitApplication(args runconsumer.InitArgs) error {
return fmt.Errorf("catalog builds service: %w", err)
}

var controlPlane *controlPlane
var localAuthorizer = args.Service.Authorizer

if keyedAuth, ok := localAuthorizer.(*auth.KeyedAuth); ok && !config.Flow.TestAPIs {
// Wrap the underlying KeyedAuth Authorizer to use the control-plane's Authorize API.
args.Service.Authorizer = NewControlPlaneAuthorizer(
controlPlane = newControlPlane(
keyedAuth,
config.Flow.DataPlaneFQDN,
config.Flow.ControlAPI,
)

// Unwrap the raw JournalClient from its current AuthJournalClient,
// Wrap the underlying KeyedAuth Authorizer to use the control-plane's Authorize API.
// Next unwrap the raw JournalClient from its current AuthJournalClient,
// and then replace it with one built using our wrapped Authorizer.
args.Service.Authorizer = newControlPlaneAuthorizer(controlPlane)
var rawClient = args.Service.Journals.(*pb.ComposedRoutedJournalClient).JournalClient.(*pb.AuthJournalClient).Inner
args.Service.Journals.(*pb.ComposedRoutedJournalClient).JournalClient = pb.NewAuthJournalClient(rawClient, args.Service.Authorizer)
}
Expand Down

0 comments on commit fd93dc4

Please sign in to comment.