From fd93dc4876b1af91f40e8d5641b66ede1b737376 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Sun, 29 Sep 2024 16:10:46 -0500 Subject: [PATCH] go/runtime: refactor `controlPlane` out of `controlPlaneAuthorizer` `controlPlane` encapsulates commonalities in calling control plane APIs on behalf of a data-plane task context. --- go/runtime/authorizer.go | 88 +++++++++++----------------------- go/runtime/control_plane.go | 94 +++++++++++++++++++++++++++++++++++++ go/runtime/flow_consumer.go | 8 ++-- 3 files changed, 127 insertions(+), 63 deletions(-) create mode 100644 go/runtime/control_plane.go diff --git a/go/runtime/authorizer.go b/go/runtime/authorizer.go index 56324df81a..29ac505a4d 100644 --- a/go/runtime/authorizer.go +++ b/go/runtime/authorizer.go @@ -2,11 +2,7 @@ package runtime import ( "context" - "encoding/json" "fmt" - "io" - "net/http" - "path" "runtime/pprof" "strings" "sync" @@ -14,12 +10,11 @@ import ( pf "github.com/estuary/flow/go/protocols/flow" "github.com/golang-jwt/jwt/v5" - "go.gazette.dev/core/auth" pb "go.gazette.dev/core/broker/protocol" "google.golang.org/grpc/metadata" ) -// ControlPlaneAuthorizer is a pb.Authorizer which obtains tokens and +// controlPlaneAuthorizer is a pb.Authorizer which obtains tokens and // data-plane endpoints through the Estuary Authorization API. // // Specifically it: @@ -30,10 +25,8 @@ import ( // and evaluation of authorization rules. // 4. Caches and re-uses the authorization result (success or failure) // until its expiration. -type ControlPlaneAuthorizer struct { - controlAPI pb.Endpoint - dataplaneFQDN string - delegate *auth.KeyedAuth +type controlPlaneAuthorizer struct { + controlPlane *controlPlane cache struct { m map[authCacheKey]authCacheValue @@ -41,15 +34,11 @@ type ControlPlaneAuthorizer struct { } } -// NewControlPlaneAuthorizer returns a ControlPlaneAuthorizer which uses the -// given `controlAPI` endpoint to obtain authorizations, in the context of -// this `dataplaneFQDN` and the `delegate` KeyedAuth which is capable of -// signing tokens for `dataplaneFQDN`. -func NewControlPlaneAuthorizer(delegate *auth.KeyedAuth, dataplaneFQDN string, controlAPI pb.Endpoint) *ControlPlaneAuthorizer { - var a = &ControlPlaneAuthorizer{ - controlAPI: controlAPI, - dataplaneFQDN: dataplaneFQDN, - delegate: delegate, +// newControlPlaneAuthorizer returns a controlPlaneAuthorizer which uses the +// given `controlAPI` endpoint to obtain authorizations. +func newControlPlaneAuthorizer(cp *controlPlane) *controlPlaneAuthorizer { + var a = &controlPlaneAuthorizer{ + controlPlane: cp, } a.cache.m = make(map[authCacheKey]authCacheValue) @@ -87,12 +76,12 @@ func (v *authCacheValue) apply(ctx context.Context) (context.Context, error) { return metadata.AppendToOutgoingContext(ctx, "authorization", v.token), nil } -func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) { +func (a *controlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims, exp time.Duration) (context.Context, error) { var name = claims.Selector.Include.ValueOf("name") // Authorizations to shard recovery logs are self-signed. if strings.HasPrefix(name, "recovery/") { - return a.delegate.Authorize(ctx, claims, exp) + return a.controlPlane.keyedAuth.Authorize(ctx, claims, exp) } var shardID, ok = pprof.Label(ctx, "shard") @@ -127,7 +116,6 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims // We must issue a new request to the authorization server. // Begin by self-signing our request as a JWT. - claims.Issuer = a.dataplaneFQDN claims.Subject = shardID claims.Capability |= pf.Capability_AUTHORIZE // Required for delegated authorization. claims.IssuedAt = &jwt.NumericDate{Time: now} @@ -145,7 +133,7 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims // Attempt to fetch an authorization token from the control plane. // Cache errors for a period of time to prevent thundering herds on errors. - if token, address, expiresAt, err := doAuthFetch(a.controlAPI, claims, a.delegate.Keys[0]); err != nil { + if token, address, expiresAt, err := doAuthFetch(a.controlPlane, claims); err != nil { value = authCacheValue{ address: "", err: err, @@ -168,50 +156,30 @@ func (a *ControlPlaneAuthorizer) Authorize(ctx context.Context, claims pb.Claims return value.apply(ctx) } -func doAuthFetch(controlAPI pb.Endpoint, claims pb.Claims, key jwt.VerificationKey) (string, pb.Endpoint, time.Time, error) { - var token, err = jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(key) +func doAuthFetch(cp *controlPlane, claims pb.Claims) (string, pb.Endpoint, time.Time, error) { + reqToken, err := cp.signClaims(claims) if err != nil { return "", "", time.Time{}, fmt.Errorf("failed to self-sign authorization request: %w", err) } - token = `{"token":"` + token + `"}` - var brokerAddress pb.Endpoint - var url = controlAPI.URL() - url.Path = path.Join(url.Path, "/authorize/task") + var request = struct { + Token string `json:"token"` + }{reqToken} - // Invoke the authorization API, perhaps multiple times if asked to retry. - for { - httpResp, err := http.Post(url.String(), "application/json", strings.NewReader(token)) - if err != nil { - return "", "", time.Time{}, fmt.Errorf("failed to POST to authorization API: %w", err) - } - respBody, err := io.ReadAll(httpResp.Body) - if err != nil { - return "", "", time.Time{}, fmt.Errorf("failed to read authorization API response: %w", err) - } - if httpResp.StatusCode != 200 { - return "", "", time.Time{}, fmt.Errorf("authorization failed (%s): %s %s", httpResp.Status, string(respBody), token) - } - - var response struct { - Token string - BrokerAddress pb.Endpoint - RetryMillis uint64 - } - if err = json.Unmarshal(respBody, &response); err != nil { - return "", "", time.Time{}, fmt.Errorf("failed to decode authorization response: %w", err) - } + var response struct { + Token string + BrokerAddress pb.Endpoint + RetryMillis uint64 + } - if response.RetryMillis != 0 { - time.Sleep(time.Millisecond * time.Duration(response.RetryMillis)) - } else { - token, brokerAddress = response.Token, response.BrokerAddress - break - } + // We intentionally use context.Background and not the request context + // because we cache authorizations. + if err = callControlAPI(context.Background(), cp, "/authorize/task", &request, &response); err != nil { + return "", "", time.Time{}, err } claims = pb.Claims{} - if _, _, err = jwt.NewParser().ParseUnverified(token, &claims); err != nil { + if _, _, err = jwt.NewParser().ParseUnverified(response.Token, &claims); err != nil { return "", "", time.Time{}, fmt.Errorf("authorization server returned invalid token: %w", err) } @@ -221,7 +189,7 @@ func doAuthFetch(controlAPI pb.Endpoint, claims pb.Claims, key jwt.VerificationK return "", "", time.Time{}, fmt.Errorf("authorization server did not include an expires-at claim") } - return token, brokerAddress, claims.ExpiresAt.Time, nil + return response.Token, response.BrokerAddress, claims.ExpiresAt.Time, nil } -var _ pb.Authorizer = &ControlPlaneAuthorizer{} +var _ pb.Authorizer = &controlPlaneAuthorizer{} diff --git a/go/runtime/control_plane.go b/go/runtime/control_plane.go new file mode 100644 index 0000000000..e915e05eb5 --- /dev/null +++ b/go/runtime/control_plane.go @@ -0,0 +1,94 @@ +package runtime + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand/v2" + "net/http" + "path" + "time" + + "github.com/golang-jwt/jwt/v5" + "go.gazette.dev/core/auth" + pb "go.gazette.dev/core/broker/protocol" +) + +type controlPlane struct { + endpoint pb.Endpoint + dataplaneFQDN string + keyedAuth *auth.KeyedAuth +} + +func newControlPlane(keyedAuth *auth.KeyedAuth, dataplaneFQDN string, controlAPI pb.Endpoint) *controlPlane { + var cp = &controlPlane{ + endpoint: controlAPI, + dataplaneFQDN: dataplaneFQDN, + keyedAuth: keyedAuth, + } + return cp +} + +func (cp *controlPlane) signClaims(claims pb.Claims) (string, error) { + claims.Issuer = cp.dataplaneFQDN + + // Go's `json` encoding is incorrect with respect to canonical + // protobuf JSON encoding (explicit `null` is not allowed). + // This patches the encoding so it's conformant. + if claims.Selector.Include.Labels == nil { + claims.Selector.Include.Labels = []pb.Label{} + } + if claims.Selector.Exclude.Labels == nil { + claims.Selector.Exclude.Labels = []pb.Label{} + } + + return jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(cp.keyedAuth.Keys[0]) +} + +func callControlAPI[Request any, Response any](ctx context.Context, cp *controlPlane, resource string, request *Request, response *Response) error { + var url = cp.endpoint.URL() + url.Path = path.Join(url.Path, resource) + + var reqBytes, err = json.Marshal(request) + if err != nil { + return fmt.Errorf("failed to encode %s API request body: %w", resource, err) + } + + for { + httpReq, err := http.NewRequestWithContext(ctx, "POST", url.String(), bytes.NewReader(reqBytes)) + if err != nil { + return fmt.Errorf("failed to build POST request to %s: %w", resource, err) + } + httpReq.Header.Add("content-type", "application/json") + + httpResp, err := http.DefaultClient.Do(httpReq) + if err != nil { + return fmt.Errorf("failed to POST to %s API: %w", resource, err) + } + respBody, err := io.ReadAll(httpResp.Body) + if err != nil { + return fmt.Errorf("failed to read %s API response: %w", resource, err) + } + + // Parse the response for an indication of whether we should retry. + var skim struct { + RetryMillis uint64 + } + + if sc := httpResp.StatusCode; sc >= 500 && sc < 600 { + skim.RetryMillis = rand.Uint64N(4_750) + 250 // Random backoff in range [0.250s, 5s]. + } else if sc != 200 { + return fmt.Errorf("%s: %s", httpResp.Status, string(respBody)) + } else if err = json.Unmarshal(respBody, &skim); err != nil { + return fmt.Errorf("failed to decode %s API response: %w", resource, err) + } else if skim.RetryMillis != 0 { + time.Sleep(time.Millisecond * time.Duration(skim.RetryMillis)) + } else if err := json.Unmarshal(respBody, response); err != nil { + return fmt.Errorf("failed to decode %s API response: %w", resource, err) + } else { + return nil + } + } +} diff --git a/go/runtime/flow_consumer.go b/go/runtime/flow_consumer.go index e857dc4cf3..cb25325320 100644 --- a/go/runtime/flow_consumer.go +++ b/go/runtime/flow_consumer.go @@ -208,18 +208,20 @@ func (f *FlowConsumer) InitApplication(args runconsumer.InitArgs) error { return fmt.Errorf("catalog builds service: %w", err) } + var controlPlane *controlPlane var localAuthorizer = args.Service.Authorizer if keyedAuth, ok := localAuthorizer.(*auth.KeyedAuth); ok && !config.Flow.TestAPIs { - // Wrap the underlying KeyedAuth Authorizer to use the control-plane's Authorize API. - args.Service.Authorizer = NewControlPlaneAuthorizer( + controlPlane = newControlPlane( keyedAuth, config.Flow.DataPlaneFQDN, config.Flow.ControlAPI, ) - // Unwrap the raw JournalClient from its current AuthJournalClient, + // Wrap the underlying KeyedAuth Authorizer to use the control-plane's Authorize API. + // Next unwrap the raw JournalClient from its current AuthJournalClient, // and then replace it with one built using our wrapped Authorizer. + args.Service.Authorizer = newControlPlaneAuthorizer(controlPlane) var rawClient = args.Service.Journals.(*pb.ComposedRoutedJournalClient).JournalClient.(*pb.AuthJournalClient).Inner args.Service.Journals.(*pb.ComposedRoutedJournalClient).JournalClient = pb.NewAuthJournalClient(rawClient, args.Service.Authorizer) }