From 98561693febb322a9ff937ed9574ac62b7294a3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Carlos=20Ch=C3=A1vez?= Date: Sun, 20 Sep 2020 10:47:47 +0200 Subject: [PATCH] feat(grpc): adds support for parsing incoming and outcoming payload, header and trailer. --- middleware/grpc/client.go | 18 ++++- middleware/grpc/client_parser_test.go | 105 ++++++++++++++++++++++++++ middleware/grpc/grpc_suite_test.go | 5 ++ middleware/grpc/server.go | 34 ++++++--- middleware/grpc/server_parser_test.go | 45 ++++++++--- middleware/grpc/shared.go | 21 +++++- 6 files changed, 203 insertions(+), 25 deletions(-) create mode 100644 middleware/grpc/client_parser_test.go diff --git a/middleware/grpc/client.go b/middleware/grpc/client.go index ba44d6ca..902c92a6 100644 --- a/middleware/grpc/client.go +++ b/middleware/grpc/client.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -66,6 +66,22 @@ func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientO } } +// WithClientOutPayloadParser adds a parser for the stats.OutPayload to be able to access +// the response payload +func WithClientOutPayloadParser(parser func(*stats.OutPayload, zipkin.Span)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.outPayload = parser + } +} + +// WithClientOutHeaderParser adds a parser for the stats.OutHeader to be able to access +// the response payload +func WithClientOutHeaderParser(parser func(*stats.OutHeader, zipkin.Span)) ClientOption { + return func(h *clientHandler) { + h.handleRPCParser.outHeader = parser + } +} + // NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add // tracing to a gRPC client. The gRPC method name is used as the span name and by default the only // tags are the gRPC status code if the call fails. diff --git a/middleware/grpc/client_parser_test.go b/middleware/grpc/client_parser_test.go new file mode 100644 index 00000000..9898a438 --- /dev/null +++ b/middleware/grpc/client_parser_test.go @@ -0,0 +1,105 @@ +// Copyright 2019 The OpenZipkin Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc_test + +import ( + "context" + "testing" + + "github.com/openzipkin/zipkin-go" + zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc" + service "github.com/openzipkin/zipkin-go/proto/testing" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/stats" +) + +func TestGRPCClientCanAccessToPayloadAndMetadata(t *testing.T) { + tracer, flusher := createTracer(false) + + s := grpc.NewServer() + defer s.Stop() + + service.RegisterHelloServiceServer(s, &TestHelloService{ + responseHeader: metadata.Pairs("test_key", "test_value_1"), + responseTrailer: metadata.Pairs("test_key", "test_value_2"), + }) + + dialer := initListener(s) + + ctx := context.Background() + conn, err := grpc.DialContext( + ctx, + "bufnet", + grpc.WithContextDialer(dialer), + grpc.WithInsecure(), + grpc.WithStatsHandler(zipkingrpc.NewClientHandler( + tracer, + zipkingrpc.WithClientOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.Span) { + m, ok := outPayload.Payload.(*service.HelloRequest) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloResponse") + } + if want, have := "Hello", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.Span) { + if want, have := "test_value", outHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInPayloadParser(func(inPayload *stats.InPayload, span zipkin.Span) { + m, ok := inPayload.Payload.(*service.HelloResponse) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloRequest") + } + if want, have := "World", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) { + if want, have := "test_value_1", inHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithClientInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) { + if want, have := "test_value_2", inTrailer.Trailer.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + )), + ) + + if err != nil { + t.Fatalf("Failed to dial bufnet: %v", err) + } + defer conn.Close() + + client := service.NewHelloServiceClient(conn) + + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value")) + _, err = client.Hello(ctx, &service.HelloRequest{ + Payload: "Hello", + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + spans := flusher() + if want, have := 1, len(spans); want != have { + t.Errorf("unexpected number of spans, want %d, have %d", want, have) + } +} diff --git a/middleware/grpc/grpc_suite_test.go b/middleware/grpc/grpc_suite_test.go index e8497350..5f763bad 100644 --- a/middleware/grpc/grpc_suite_test.go +++ b/middleware/grpc/grpc_suite_test.go @@ -125,6 +125,8 @@ func (g *sequentialIdGenerator) reset() { type TestHelloService struct { service.UnimplementedHelloServiceServer + responseHeader metadata.MD + responseTrailer metadata.MD } func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) { @@ -158,6 +160,9 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) } } + grpc.SetTrailer(ctx, s.responseTrailer) + grpc.SendHeader(ctx, s.responseHeader) + return resp, nil } diff --git a/middleware/grpc/server.go b/middleware/grpc/server.go index f229420e..afd31ce7 100644 --- a/middleware/grpc/server.go +++ b/middleware/grpc/server.go @@ -1,4 +1,4 @@ -// Copyright 2019 The OpenZipkin Authors +// Copyright 2020 The OpenZipkin Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -48,19 +48,35 @@ func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) Serve } } -// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access -// the request trailer -func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption { +// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access +// the request outgoing metadata +func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption { return func(h *serverHandler) { - h.handleRPCParser.inTrailer = parser + h.handleRPCParser.inHeader = parser } } -// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access -// the request payload -func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption { +// WithServerOutPayloadParser adds a parser for the stats.OutPayload to be able to access +// the response payload +func WithServerOutPayloadParser(parser func(*stats.OutPayload, zipkin.Span)) ServerOption { return func(h *serverHandler) { - h.handleRPCParser.inHeader = parser + h.handleRPCParser.outPayload = parser + } +} + +// WithServerOutTrailerParser adds a parser for the stats.OutTrailer to be able to access +// the response trailer +func WithServerOutTrailerParser(parser func(*stats.OutTrailer, zipkin.Span)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.outTrailer = parser + } +} + +// WithServerOutHeaderParser adds a parser for the stats.OutHeader to be able to access +// the response payload +func WithServerOutHeaderParser(parser func(*stats.OutHeader, zipkin.Span)) ServerOption { + return func(h *serverHandler) { + h.handleRPCParser.outHeader = parser } } diff --git a/middleware/grpc/server_parser_test.go b/middleware/grpc/server_parser_test.go index 979e3224..32663b5c 100644 --- a/middleware/grpc/server_parser_test.go +++ b/middleware/grpc/server_parser_test.go @@ -77,7 +77,7 @@ func TestGRPCServerCreatesASpanAndContext(t *testing.T) { } } -func TestGRPCServerCanAccessToHeaders(t *testing.T) { +func TestGRPCServerCanAccessToPayloadAndMetadata(t *testing.T) { tracer, flusher := createTracer(false) s := grpc.NewServer( @@ -85,14 +85,37 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { zipkingrpc.NewServerHandler( tracer, zipkingrpc.ServerTags(map[string]string{"default": "tag"}), + zipkingrpc.WithServerInPayloadParser(func(inPayload *stats.InPayload, span zipkin.Span) { + m, ok := inPayload.Payload.(*service.HelloRequest) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloRequest") + } + if want, have := "Hello", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) { if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have { - t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + t.Errorf("incorrect header value, want %q, have %q", want, have) + } + }), + zipkingrpc.WithServerOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.Span) { + m, ok := outPayload.Payload.(*service.HelloResponse) + if !ok { + t.Fatal("failed to cast the payload as a service.HelloResponse") + } + if want, have := "World", m.Payload; want != have { + t.Errorf("incorrect payload: want %q, have %q", want, have) + } + }), + zipkingrpc.WithServerOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.Span) { + if want, have := "test_value_1", outHeader.Header.Get("test_key")[0]; want != have { + t.Errorf("incorrect header value, want %q, have %q", want, have) } }), - zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) { - if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have { - t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have) + zipkingrpc.WithServerOutTrailerParser(func(outTrailer *stats.OutTrailer, span zipkin.Span) { + if want, have := "test_value_2", outTrailer.Trailer.Get("test_key")[0]; want != have { + t.Errorf("incorrect trailer value, want %q, have %q", want, have) } }), ), @@ -100,7 +123,10 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { ) defer s.Stop() - service.RegisterHelloServiceServer(s, &TestHelloService{}) + service.RegisterHelloServiceServer(s, &TestHelloService{ + responseHeader: metadata.Pairs("test_key", "test_value_1"), + responseTrailer: metadata.Pairs("test_key", "test_value_2"), + }) dialer := initListener(s) @@ -118,7 +144,7 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { client := service.NewHelloServiceClient(conn) - ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value") + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value")) _, err = client.Hello(ctx, &service.HelloRequest{ Payload: "Hello", }) @@ -130,9 +156,4 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) { if want, have := 1, len(spans); want != have { t.Errorf("unexpected number of spans, want %d, have %d", want, have) } - - span := spans[0] - if want, have := model.Server, span.Kind; want != have { - t.Errorf("unexpected kind, want %q, have %q", want, have) - } } diff --git a/middleware/grpc/shared.go b/middleware/grpc/shared.go index 8cc14ca0..622657b2 100644 --- a/middleware/grpc/shared.go +++ b/middleware/grpc/shared.go @@ -28,9 +28,12 @@ import ( ) type handleRPCParser struct { - inPayload func(*stats.InPayload, zipkin.Span) - inTrailer func(*stats.InTrailer, zipkin.Span) - inHeader func(*stats.InHeader, zipkin.Span) + inPayload func(*stats.InPayload, zipkin.Span) + inTrailer func(*stats.InTrailer, zipkin.Span) + inHeader func(*stats.InHeader, zipkin.Span) + outPayload func(*stats.OutPayload, zipkin.Span) + outTrailer func(*stats.OutTrailer, zipkin.Span) + outHeader func(*stats.OutHeader, zipkin.Span) } // A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of @@ -59,6 +62,18 @@ func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) { if h.inTrailer != nil { h.inTrailer(rs, span) } + case *stats.OutPayload: + if h.outPayload != nil { + h.outPayload(rs, span) + } + case *stats.OutHeader: + if h.outHeader != nil { + h.outHeader(rs, span) + } + case *stats.OutTrailer: + if h.outTrailer != nil { + h.outTrailer(rs, span) + } case *stats.End: s, ok := status.FromError(rs.Error) // rs.Error should always be convertable to a status, this is just a defensive check.