Skip to content

Commit

Permalink
feat(grpc): adds support for parsing incoming and outcoming payload, …
Browse files Browse the repository at this point in the history
…header and trailer.
  • Loading branch information
jcchavezs committed Sep 20, 2020
1 parent 1c762e0 commit 9856169
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 25 deletions.
18 changes: 17 additions & 1 deletion middleware/grpc/client.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,22 @@ func WithClientInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ClientO
}
}

// WithClientOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the response payload
func WithClientOutPayloadParser(parser func(*stats.OutPayload, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outPayload = parser
}
}

// WithClientOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the response payload
func WithClientOutHeaderParser(parser func(*stats.OutHeader, zipkin.Span)) ClientOption {
return func(h *clientHandler) {
h.handleRPCParser.outHeader = parser
}
}

// NewClientHandler returns a stats.Handler which can be used with grpc.WithStatsHandler to add
// tracing to a gRPC client. The gRPC method name is used as the span name and by default the only
// tags are the gRPC status code if the call fails.
Expand Down
105 changes: 105 additions & 0 deletions middleware/grpc/client_parser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2019 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc_test

import (
"context"
"testing"

"github.com/openzipkin/zipkin-go"
zipkingrpc "github.com/openzipkin/zipkin-go/middleware/grpc"
service "github.com/openzipkin/zipkin-go/proto/testing"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
)

func TestGRPCClientCanAccessToPayloadAndMetadata(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer()
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{
responseHeader: metadata.Pairs("test_key", "test_value_1"),
responseTrailer: metadata.Pairs("test_key", "test_value_2"),
})

dialer := initListener(s)

ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"bufnet",
grpc.WithContextDialer(dialer),
grpc.WithInsecure(),
grpc.WithStatsHandler(zipkingrpc.NewClientHandler(
tracer,
zipkingrpc.WithClientOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.Span) {
m, ok := outPayload.Payload.(*service.HelloRequest)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloResponse")
}
if want, have := "Hello", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.Span) {
if want, have := "test_value", outHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInPayloadParser(func(inPayload *stats.InPayload, span zipkin.Span) {
m, ok := inPayload.Payload.(*service.HelloResponse)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloRequest")
}
if want, have := "World", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
if want, have := "test_value_1", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithClientInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
if want, have := "test_value_2", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
)),
)

if err != nil {
t.Fatalf("Failed to dial bufnet: %v", err)
}
defer conn.Close()

client := service.NewHelloServiceClient(conn)

ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

spans := flusher()
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}
}
5 changes: 5 additions & 0 deletions middleware/grpc/grpc_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (g *sequentialIdGenerator) reset() {

type TestHelloService struct {
service.UnimplementedHelloServiceServer
responseHeader metadata.MD
responseTrailer metadata.MD
}

func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest) (*service.HelloResponse, error) {
Expand Down Expand Up @@ -158,6 +160,9 @@ func (s *TestHelloService) Hello(ctx context.Context, req *service.HelloRequest)
}
}

grpc.SetTrailer(ctx, s.responseTrailer)
grpc.SendHeader(ctx, s.responseHeader)

return resp, nil
}

Expand Down
34 changes: 25 additions & 9 deletions middleware/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 The OpenZipkin Authors
// Copyright 2020 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,19 +48,35 @@ func WithServerInPayloadParser(parser func(*stats.InPayload, zipkin.Span)) Serve
}
}

// WithserverInTrailerParser adds a parser for the stats.InTrailer to be able to access
// the request trailer
func WithserverInTrailerParser(parser func(*stats.InTrailer, zipkin.Span)) ServerOption {
// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request outgoing metadata
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inTrailer = parser
h.handleRPCParser.inHeader = parser
}
}

// WithServerInHeaderParser adds a parser for the stats.InHeader to be able to access
// the request payload
func WithServerInHeaderParser(parser func(*stats.InHeader, zipkin.Span)) ServerOption {
// WithServerOutPayloadParser adds a parser for the stats.OutPayload to be able to access
// the response payload
func WithServerOutPayloadParser(parser func(*stats.OutPayload, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.inHeader = parser
h.handleRPCParser.outPayload = parser
}
}

// WithServerOutTrailerParser adds a parser for the stats.OutTrailer to be able to access
// the response trailer
func WithServerOutTrailerParser(parser func(*stats.OutTrailer, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outTrailer = parser
}
}

// WithServerOutHeaderParser adds a parser for the stats.OutHeader to be able to access
// the response payload
func WithServerOutHeaderParser(parser func(*stats.OutHeader, zipkin.Span)) ServerOption {
return func(h *serverHandler) {
h.handleRPCParser.outHeader = parser
}
}

Expand Down
45 changes: 33 additions & 12 deletions middleware/grpc/server_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,30 +77,56 @@ func TestGRPCServerCreatesASpanAndContext(t *testing.T) {
}
}

func TestGRPCServerCanAccessToHeaders(t *testing.T) {
func TestGRPCServerCanAccessToPayloadAndMetadata(t *testing.T) {
tracer, flusher := createTracer(false)

s := grpc.NewServer(
grpc.StatsHandler(
zipkingrpc.NewServerHandler(
tracer,
zipkingrpc.ServerTags(map[string]string{"default": "tag"}),
zipkingrpc.WithServerInPayloadParser(func(inPayload *stats.InPayload, span zipkin.Span) {
m, ok := inPayload.Payload.(*service.HelloRequest)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloRequest")
}
if want, have := "Hello", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInHeaderParser(func(inHeader *stats.InHeader, span zipkin.Span) {
if want, have := "test_value", inHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerOutPayloadParser(func(outPayload *stats.OutPayload, span zipkin.Span) {
m, ok := outPayload.Payload.(*service.HelloResponse)
if !ok {
t.Fatal("failed to cast the payload as a service.HelloResponse")
}
if want, have := "World", m.Payload; want != have {
t.Errorf("incorrect payload: want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerOutHeaderParser(func(outHeader *stats.OutHeader, span zipkin.Span) {
if want, have := "test_value_1", outHeader.Header.Get("test_key")[0]; want != have {
t.Errorf("incorrect header value, want %q, have %q", want, have)
}
}),
zipkingrpc.WithServerInTrailerParser(func(inTrailer *stats.InTrailer, span zipkin.Span) {
if want, have := "test_value", inTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("unexpected metadata value in header, want: %q, have %q", want, have)
zipkingrpc.WithServerOutTrailerParser(func(outTrailer *stats.OutTrailer, span zipkin.Span) {
if want, have := "test_value_2", outTrailer.Trailer.Get("test_key")[0]; want != have {
t.Errorf("incorrect trailer value, want %q, have %q", want, have)
}
}),
),
),
)
defer s.Stop()

service.RegisterHelloServiceServer(s, &TestHelloService{})
service.RegisterHelloServiceServer(s, &TestHelloService{
responseHeader: metadata.Pairs("test_key", "test_value_1"),
responseTrailer: metadata.Pairs("test_key", "test_value_2"),
})

dialer := initListener(s)

Expand All @@ -118,7 +144,7 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) {

client := service.NewHelloServiceClient(conn)

ctx = metadata.AppendToOutgoingContext(ctx, "test_key", "test_value")
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("test_key", "test_value"))
_, err = client.Hello(ctx, &service.HelloRequest{
Payload: "Hello",
})
Expand All @@ -130,9 +156,4 @@ func TestGRPCServerCanAccessToHeaders(t *testing.T) {
if want, have := 1, len(spans); want != have {
t.Errorf("unexpected number of spans, want %d, have %d", want, have)
}

span := spans[0]
if want, have := model.Server, span.Kind; want != have {
t.Errorf("unexpected kind, want %q, have %q", want, have)
}
}
21 changes: 18 additions & 3 deletions middleware/grpc/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ import (
)

type handleRPCParser struct {
inPayload func(*stats.InPayload, zipkin.Span)
inTrailer func(*stats.InTrailer, zipkin.Span)
inHeader func(*stats.InHeader, zipkin.Span)
inPayload func(*stats.InPayload, zipkin.Span)
inTrailer func(*stats.InTrailer, zipkin.Span)
inHeader func(*stats.InHeader, zipkin.Span)
outPayload func(*stats.OutPayload, zipkin.Span)
outTrailer func(*stats.OutTrailer, zipkin.Span)
outHeader func(*stats.OutHeader, zipkin.Span)
}

// A RPCHandler can be registered using WithClientRPCHandler or WithServerRPCHandler to intercept calls to HandleRPC of
Expand Down Expand Up @@ -59,6 +62,18 @@ func handleRPC(ctx context.Context, rs stats.RPCStats, h handleRPCParser) {
if h.inTrailer != nil {
h.inTrailer(rs, span)
}
case *stats.OutPayload:
if h.outPayload != nil {
h.outPayload(rs, span)
}
case *stats.OutHeader:
if h.outHeader != nil {
h.outHeader(rs, span)
}
case *stats.OutTrailer:
if h.outTrailer != nil {
h.outTrailer(rs, span)
}
case *stats.End:
s, ok := status.FromError(rs.Error)
// rs.Error should always be convertable to a status, this is just a defensive check.
Expand Down

0 comments on commit 9856169

Please sign in to comment.