[v2] Implement sanitizers to operate on OTLP data #5551

Open: wants to merge 22 commits into base: main. Changes from 3 commits.

36 changes: 36 additions & 0 deletions cmd/collector/app/sanitizer_v2/empty_service_name_sanitizer.go
@@ -0,0 +1,36 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2

import (
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// Constants for the replacement names
const (
	serviceNameReplacement = "empty-service-name"
	nullProcessServiceName = "null-process-and-service-name"
)

// NewEmptyServiceNameSanitizer returns a function that replaces empty service names
// with a predefined string.
func NewEmptyServiceNameSanitizer() SanitizeSpan {
	return sanitizeEmptyServiceName
}

// sanitizeEmptyServiceName sanitizes the service names in the span attributes.
func sanitizeEmptyServiceName(span ptrace.Span) ptrace.Span {
	attributes := span.Attributes()
	serviceNameAttr, ok := attributes.Get("service.name")

Member

The service name is stored in the Resource attributes, not in the span.

Contributor Author

@varshith257, Jun 10, 2024

@yurishkuro The compiler reports: span.Resource undefined (type ptrace.Span has no field or method Resource) (compilerMissingFieldOrMethod).

This means the resource has to be accessed through ptrace.ResourceSpans. I think we need to work with ptrace.Traces instead of an individual ptrace.Span: access each ResourceSpans, which contains the Resource and its associated ScopeSpans, and then iterate through each ScopeSpans and its Spans. Am I heading in the right direction?

Member

> I think we need to work with ptrace.Traces

@james-ryans this is an interesting situation. In the v1 pipeline, all non-span data (like batch.Process) was first denormalized into each span, so the unit of work in the pipeline was a single span. In the v2 storage API we want to support batching for better efficiency. That means that unless a storage backend can store normalized data (for example, storing resource and scope records separately, which is unlikely in the k/v store world), each storage implementation would need to do its own denormalization. It also creates challenges for shared capabilities like the sanitizers here, since they need to operate on the data uniformly before the storage is called.

We could provide a shared lib to flatten / denormalize resources[]/scopes[]/spans[] into just spans[] (essentially what otlp2jaeger already does today), but I don't think it's a good solution, because a flattened OTLP span cannot distinguish between resource, scope, and span attributes without some magic prefixes or naming conventions. In the v1 model we had a clear slot, Span.Process, to maintain that distinction. I am thinking that in v2 storage implementations the internal data model should also be Span{ Resource=..., Scope=..., rest of span }, to avoid unnecessary flattening.

Coming back to sanitizers, I think Sanitize(pdata.Traces) is a good starting-point signature. Some sanitizers would only look at the Resources in that input, while others might look at everything. The alternative would be to have distinct sanitizers for Resource, Scope, and Span, but I don't think we need that complexity at this point.

@varshith257 does this answer your question?
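
For illustration, the Span{ Resource=..., Scope=..., rest of span } shape mentioned above might look roughly like the sketch below. It deliberately uses small stand-in structs instead of the real pdata types, and every name in it (Resource, Scope, FlatSpan, flatten) is hypothetical. The point is only that each flattened entry keeps separate slots for resource and scope, so attribute origin is not lost and no key prefixes are needed.

```go
package main

// Hypothetical stand-in types that mirror the OTLP nesting
// (Traces -> ResourceSpans -> ScopeSpans -> Span).
// They are NOT the real pdata API.
type Resource struct{ Attributes map[string]string }

type Scope struct{ Name string }

type Span struct {
	Name       string
	Attributes map[string]string
}

type ScopeSpans struct {
	Scope Scope
	Spans []Span
}

type ResourceSpans struct {
	Resource   Resource
	ScopeSpans []ScopeSpans
}

type Traces struct{ ResourceSpans []ResourceSpans }

// FlatSpan is a denormalized view of one span. Resource and Scope stay in
// their own slots, similar in spirit to Span.Process in the v1 model.
type FlatSpan struct {
	Resource *Resource
	Scope    *Scope
	Span     *Span
}

// flatten walks resources, then scopes, then spans, and emits one FlatSpan
// per span. Pointers refer back into the batch, so nothing is copied.
func flatten(td Traces) []FlatSpan {
	var out []FlatSpan
	for i := range td.ResourceSpans {
		rs := &td.ResourceSpans[i]
		for j := range rs.ScopeSpans {
			ss := &rs.ScopeSpans[j]
			for k := range ss.Spans {
				out = append(out, FlatSpan{
					Resource: &rs.Resource,
					Scope:    &ss.Scope,
					Span:     &ss.Spans[k],
				})
			}
		}
	}
	return out
}
```

Spans that belong to the same resource share one Resource pointer, so a storage implementation could still read resource attributes without merging them into the span's own attribute map.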

Contributor Author

> does this answer your question?

Yes, thank you.
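
A batch-level sanitizer along the lines agreed in this thread could be sketched as follows. This is a minimal sketch under stated assumptions, not the PR's final code: Resource, ResourceSpans, Traces and the Sanitize type are simplified stand-ins for the pdata types, and only the two replacement constants are taken from the diff. It applies the same missing/empty rules as the span-based version, but to the resource attributes.

```go
package main

// Hypothetical stand-ins for the pdata types; scope spans are omitted
// because this sanitizer only looks at resources.
type Resource struct{ Attributes map[string]string }

type ResourceSpans struct{ Resource Resource }

type Traces struct{ ResourceSpans []ResourceSpans }

// Sanitize is an assumed batch-level signature: it receives the whole
// batch instead of a single span.
type Sanitize func(td Traces) Traces

const (
	serviceNameKey         = "service.name"
	serviceNameReplacement = "empty-service-name"
	nullProcessServiceName = "null-process-and-service-name"
)

// sanitizeEmptyServiceName fixes a missing or empty service.name on every
// resource in the batch and returns the same batch.
func sanitizeEmptyServiceName(td Traces) Traces {
	for i := range td.ResourceSpans {
		res := &td.ResourceSpans[i].Resource
		if res.Attributes == nil {
			res.Attributes = map[string]string{}
		}
		name, ok := res.Attributes[serviceNameKey]
		switch {
		case !ok:
			// service.name is missing entirely.
			res.Attributes[serviceNameKey] = nullProcessServiceName
		case name == "":
			// service.name is present but empty.
			res.Attributes[serviceNameKey] = serviceNameReplacement
		}
	}
	return td
}

// Compile-time check that the function matches the assumed signature.
var _ Sanitize = sanitizeEmptyServiceName
```

Because the service name lives on the resource, the rule runs once per resource rather than once per span.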


	if !ok {
		// If service.name is missing, set it to nullProcessServiceName
		attributes.PutStr("service.name", nullProcessServiceName)
	} else if serviceNameAttr.Str() == "" {
		// If service.name is empty, replace it with serviceNameReplacement
		attributes.PutStr("service.name", serviceNameReplacement)
	}

	return span
}
@@ -0,0 +1,4 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2
14 changes: 14 additions & 0 deletions cmd/collector/app/sanitizer_v2/package_test.go
@@ -0,0 +1,14 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2

import (
	"testing"

	"github.com/jaegertracing/jaeger/pkg/testutils"
)

func TestMain(m *testing.M) {
	testutils.VerifyGoLeaks(m)
}
31 changes: 31 additions & 0 deletions cmd/collector/app/sanitizer_v2/sanitizer.go
@@ -0,0 +1,31 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2

import "go.opentelemetry.io/collector/pdata/ptrace"

// SanitizeSpan sanitizes/normalizes spans. Any business logic that needs to be applied to normalize the contents of a
// span should implement this interface.
type SanitizeSpan func(span ptrace.Span) ptrace.Span

// NewStandardSanitizers are automatically applied by SpanProcessor.
func NewStandardSanitizers() []SanitizeSpan {
	return []SanitizeSpan{
		NewEmptyServiceNameSanitizer(),
	}
}

// NewChainedSanitizer creates a Sanitizer from the variadic list of passed Sanitizers.
// If the list only has one element, it is returned directly to minimize indirection.
func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan {
	if len(sanitizers) == 1 {
		return sanitizers[0]
	}
	return func(span ptrace.Span) ptrace.Span {
		for _, s := range sanitizers {
			span = s(span)
		}
		return span
	}
}
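
To show how the chaining behaves, here is a small self-contained sketch. It copies the NewChainedSanitizer logic from the diff but swaps ptrace.Span for a hypothetical one-field Span struct so it runs without the collector dependency. The two sanitizers, trimName and defaultName, are invented for the example.

```go
package main

import "strings"

// Span is a hypothetical stand-in for ptrace.Span.
type Span struct{ Name string }

// SanitizeSpan mirrors the function type from the diff, over the stand-in.
type SanitizeSpan func(span Span) Span

// NewChainedSanitizer applies the sanitizers in the order given.
// A single sanitizer is returned as-is to avoid an extra closure.
func NewChainedSanitizer(sanitizers ...SanitizeSpan) SanitizeSpan {
	if len(sanitizers) == 1 {
		return sanitizers[0]
	}
	return func(span Span) Span {
		for _, s := range sanitizers {
			span = s(span)
		}
		return span
	}
}

// trimName removes surrounding whitespace from the span name.
func trimName(span Span) Span {
	span.Name = strings.TrimSpace(span.Name)
	return span
}

// defaultName replaces an empty span name with a placeholder.
func defaultName(span Span) Span {
	if span.Name == "" {
		span.Name = "unknown"
	}
	return span
}
```

Order matters: NewChainedSanitizer(trimName, defaultName) turns a whitespace-only name into "unknown", while the reverse order leaves it empty. With no sanitizers at all, the chain returns the span unchanged.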
47 changes: 47 additions & 0 deletions cmd/collector/app/sanitizer_v2/service_name_sanitizer.go
@@ -0,0 +1,47 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2

import (
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
)

// Cache interface similar to the one in V1
type Cache interface {
	Get(alias string) string
	IsEmpty() bool
}

// NewServiceNameSanitizer creates a service name sanitizer with a given cache.
func NewServiceNameSanitizer(cache Cache) SanitizeSpan {
	sanitizer := serviceNameSanitizer{cache: cache}
	return sanitizer.Sanitize
}

// serviceNameSanitizer sanitizes the service names in span annotations given a source of truth alias to service cache.
type serviceNameSanitizer struct {
	cache Cache
}

// Sanitize sanitizes the service names in the span annotations.
func (s serviceNameSanitizer) Sanitize(span ptrace.Span) ptrace.Span {
	if s.cache.IsEmpty() {
		return span
	}

	attributes := span.Attributes()
	serviceNameAttr, exists := attributes.Get("service.name")
	if !exists || serviceNameAttr.Type() != pcommon.ValueTypeStr {
		return span
	}

	alias := serviceNameAttr.Str()
	serviceName := s.cache.Get(alias)
	if serviceName != "" {
		attributes.PutStr("service.name", serviceName)
	}

	return span
}
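
The alias lookup above depends only on the two-method Cache interface. As a rough illustration, a map-backed cache would satisfy it. In the sketch below, mapCache and resolveServiceName are hypothetical names that are not part of the PR, and the lookup is shown on plain strings rather than on span attributes.

```go
package main

// Cache matches the interface declared in the diff.
type Cache interface {
	Get(alias string) string
	IsEmpty() bool
}

// mapCache is a hypothetical in-memory alias -> service name table.
type mapCache map[string]string

// Get returns the canonical service name, or "" when the alias is unknown.
func (c mapCache) Get(alias string) string { return c[alias] }

// IsEmpty reports whether the table has no entries.
func (c mapCache) IsEmpty() bool { return len(c) == 0 }

// resolveServiceName follows the same decision order as Sanitize:
// skip on an empty cache, and keep the original name when no mapping exists.
func resolveServiceName(cache Cache, name string) string {
	if cache.IsEmpty() {
		return name
	}
	if canonical := cache.Get(name); canonical != "" {
		return canonical
	}
	return name
}
```

An unknown alias falls through unchanged, which matches the `serviceName != ""` guard in the diff.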
57 changes: 57 additions & 0 deletions cmd/collector/app/sanitizer_v2/utf8_sanitizer.go
@@ -0,0 +1,57 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package sanitizer_v2

import (
	"fmt"
	"unicode/utf8"

	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.uber.org/zap"
)

const (
	invalidOperation = "InvalidOperationName"
	invalidService   = "InvalidServiceName"
	invalidTagKey    = "InvalidTagKey"
)

// UTF8Sanitizer sanitizes all strings in spans.
type UTF8Sanitizer struct {
	logger *zap.Logger
}

// NewUTF8Sanitizer creates a UTF8 sanitizer with logging functionality.
func NewUTF8Sanitizer(logger *zap.Logger) SanitizeSpan {
	return UTF8Sanitizer{logger: logger}.Sanitize
}

// Sanitize sanitizes the UTF8 in the spans.
func (s UTF8Sanitizer) Sanitize(span ptrace.Span) ptrace.Span {
	if !utf8.ValidString(span.Name()) {
		s.logger.Info("Invalid utf8 operation name", zap.String("operation_name", span.Name()))
		span.SetName(invalidOperation)
	}

	attributes := span.Attributes()
	serviceNameAttr, ok := attributes.Get("service.name")
	if ok && !utf8.ValidString(serviceNameAttr.Str()) {
		s.logger.Info("Invalid utf8 service name", zap.String("service_name", serviceNameAttr.Str()))
		attributes.PutStr("service.name", invalidService)
	}

	sanitizeAttributes(attributes)
	return span
}

// sanitizeAttributes sanitizes attributes to ensure UTF8 validity.
func sanitizeAttributes(attributes pcommon.Map) {
	attributes.Range(func(k string, v pcommon.Value) bool {
		if v.Type() == pcommon.ValueTypeStr && !utf8.ValidString(v.Str()) {
			attributes.PutStr(k, fmt.Sprintf("%s:%s", k, v.Str()))
		}
		return true
	})
}
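
One detail in sanitizeAttributes may be worth checking: prefixing an invalid string with its key does not make it valid UTF-8, because fmt.Sprintf with %s copies the bytes through unchanged. The standard-library-only sketch below demonstrates this and shows one possible alternative using strings.ToValidUTF8. The helper names stillInvalidAfterPrefix and sanitizeValue are hypothetical, and replacing bad bytes with U+FFFD is an assumption for illustration, not something the PR or the reviewers proposed.

```go
package main

import (
	"fmt"
	"strings"
	"unicode/utf8"
)

// stillInvalidAfterPrefix reports whether "key:value" is still invalid
// UTF-8, which is the case whenever value itself is invalid.
func stillInvalidAfterPrefix(key, value string) bool {
	return !utf8.ValidString(fmt.Sprintf("%s:%s", key, value))
}

// sanitizeValue returns value unchanged when it is valid UTF-8; otherwise
// each run of invalid bytes is replaced with the Unicode replacement
// character U+FFFD.
func sanitizeValue(value string) string {
	if utf8.ValidString(value) {
		return value
	}
	return strings.ToValidUTF8(value, "\uFFFD")
}
```

For the input "caf\xff", stillInvalidAfterPrefix("tag", "caf\xff") is true, while sanitizeValue yields a valid string ending in U+FFFD.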