Skip to content

Commit

Permalink
Feat: signozlogspipelineprocessor: address json body fields in expr s…
Browse files Browse the repository at this point in the history
…tatements in router and add operator (#421)

fixes SigNoz/signoz#5689
  • Loading branch information
raj-k-singh authored Oct 12, 2024
1 parent a975e4d commit 05799f2
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 14 deletions.
42 changes: 42 additions & 0 deletions processor/signozlogspipelineprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,48 @@ func TestBodyFieldReferencesWhenBodyIsJson(t *testing.T) {

// Collect test cases for each op that supports reading fields from JSON body

// router op should be able to specify expressions referring to fields inside JSON body
testConfWithRouter := `
operators:
- id: router_signoz
type: router
routes:
- expr: body.request.id == "test"
output: test-add
- type: add
id: test-add
field: attributes.test
value: test-value
`
testCases = append(testCases, testCase{
"router/happy_case", testConfWithRouter,
makePlog(`{"request": {"id": "test"}}`, map[string]any{}),
makePlog(`{"request": {"id": "test"}}`, map[string]any{"test": "test-value"}),
})
testCases = append(testCases, testCase{
"router/body_not_json", testConfWithRouter,
makePlog(`test`, map[string]any{}),
makePlog(`test`, map[string]any{}),
})

// Add op should be able to specify expressions referring to body JSON field
testAddOpConf := `
operators:
- type: add
field: attributes.request_id
value: EXPR(body.request.id)
`
testCases = append(testCases, testCase{
"add/happy_case", testAddOpConf,
makePlog(`{"request": {"id": "test"}}`, map[string]any{}),
makePlog(`{"request": {"id": "test"}}`, map[string]any{"request_id": "test"}),
})
testCases = append(testCases, testCase{
"add/body_not_json", testAddOpConf,
makePlog(`test`, map[string]any{}),
makePlog(`test`, map[string]any{}),
})

// copy
testCopyOpConf := `
operators:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f BodyField) String() string {
}

// returns nil if the body was not JSON
func parseBodyJson(entry *entry.Entry) map[string]any {
func ParseBodyJson(entry *entry.Entry) map[string]any {
// If body is already a map, return it as-is
bodyMap, ok := entry.Body.(map[string]any)
if ok {
Expand Down Expand Up @@ -105,7 +105,7 @@ func (f BodyField) Get(entry *entry.Entry) (any, bool) {
// If path inside body field has been specified, try to
// interpret body as a map[string]any - parsing JSON if needed
if len(f.Keys) > 0 {
bodyJson := parseBodyJson(entry)
bodyJson := ParseBodyJson(entry)
if bodyJson != nil {
currentValue = bodyJson
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"os"
"sync"

signozstanzaentry "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/entry"
"github.com/expr-lang/expr"
"github.com/expr-lang/expr/ast"
"github.com/expr-lang/expr/vm"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
)

Expand All @@ -19,7 +23,7 @@ var envPool = sync.Pool{
}

// GetExprEnv returns a map of key/value pairs that can be be used to evaluate an expression
func GetExprEnv(e *entry.Entry) map[string]any {
func GetExprEnv(e *entry.Entry, forExprWithBodyFieldRef bool) map[string]any {
env := envPool.Get().(map[string]any)
env["$"] = e.Body
env["body"] = e.Body
Expand All @@ -29,10 +33,88 @@ func GetExprEnv(e *entry.Entry) map[string]any {
env["severity_text"] = e.SeverityText
env["severity_number"] = int(e.Severity)

if forExprWithBodyFieldRef {
env["body_map"] = signozstanzaentry.ParseBodyJson(e)
}

return env
}

// PutExprEnv adds a key/value pair that will can be used to evaluate an expression
func PutExprEnv(e map[string]any) {
envPool.Put(e)
}

func ExprCompile(input string) (
program *vm.Program, hasBodyFieldRef bool, err error,
) {
patcher := &signozExprPatcher{}
program, err = expr.Compile(
input, expr.AllowUndefinedVariables(), expr.Patch(patcher),
)
if err != nil {
return nil, false, err
}

return program, patcher.foundBodyFieldRef, err
}

func ExprCompileBool(input string) (
program *vm.Program, hasBodyFieldRef bool, err error,
) {
patcher := &signozExprPatcher{}
program, err = expr.Compile(
input, expr.AllowUndefinedVariables(), expr.Patch(patcher), expr.AsBool(),
)
if err != nil {
return nil, false, err
}

return program, patcher.foundBodyFieldRef, err
}

type signozExprPatcher struct {
// set to true if the patcher encounters a reference to a body field
// (like `body.request.id`) while compiling an expression
foundBodyFieldRef bool
}

func (p *signozExprPatcher) Visit(node *ast.Node) {
// Change all references to fields inside body (eg: body.request.id)
// to refer inside body_map instead (eg: body_map.request.id)
//
// `body_map` is supplied in expr env's by JSON parsing the log body
// when it contains serialized JSON
memberAccessNode, isMemberNode := (*node).(*ast.MemberNode)
if isMemberNode {
// MemberNode represents a member access in expr
// It can be a field access, a method call, or an array element access.
// Example: `foo.bar` or `foo["bar"]` or `foo.bar()` or `array[0]`
//
// `memberNode.Node` is the node whose property/member is being accessed.
// Eg: the node for `body` in `body.request.id`
//
// `memberNode.Property` is the property being accessed on `memberNode.Node`
// Eg: the AST for `request.id` in `body.request.id`
//
// Change all `MemberNode`s where the target (`memberNode.Node`)
// is `body` to target `body_map` instead.
identifierNode, isIdentifierNode := (memberAccessNode.Node).(*ast.IdentifierNode)
if isIdentifierNode && identifierNode.Value == "body" {
identifierNode.Value = "body_map"
p.foundBodyFieldRef = true
}
}

n, ok := (*node).(*ast.CallNode)
if !ok {
return
}
c, ok := (n.Callee).(*ast.IdentifierNode)
if !ok {
return
}
if c.Value == "env" {
c.Value = "os_env_func"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"go.opentelemetry.io/collector/component"

signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
Expand Down Expand Up @@ -58,11 +59,12 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
exprStr := strings.TrimPrefix(strVal, "EXPR(")
exprStr = strings.TrimSuffix(exprStr, ")")

compiled, err := helper.ExprCompile(exprStr)
compiled, hasBodyFieldRef, err := signozstanzahelper.ExprCompile(exprStr)
if err != nil {
return nil, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, err)
}

addOperator.program = compiled
addOperator.valueExprHasBodyFieldRef = hasBodyFieldRef
return addOperator, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
type Transformer struct {
helper.TransformerOperator

Field entry.Field
Value any
program *vm.Program
Field entry.Field
Value any
program *vm.Program
valueExprHasBodyFieldRef bool
}

// Process will process an entry with a add transformation.
Expand All @@ -34,7 +35,7 @@ func (t *Transformer) Transform(e *entry.Entry) error {
return e.Set(t.Field, t.Value)
}
if t.program != nil {
env := signozstanzahelper.GetExprEnv(e)
env := signozstanzahelper.GetExprEnv(e, t.valueExprHasBodyFieldRef)
defer signozstanzahelper.PutExprEnv(env)

result, err := vm.Run(t.program, env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"go.opentelemetry.io/collector/component"

signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator"
signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
)
Expand Down Expand Up @@ -61,7 +62,7 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error

routes := make([]*Route, 0, len(c.Routes))
for _, routeConfig := range c.Routes {
compiled, err := helper.ExprCompileBool(routeConfig.Expression)
compiled, hasBodyFieldRef, err := signozstanzahelper.ExprCompileBool(routeConfig.Expression)
if err != nil {
return nil, fmt.Errorf("failed to compile expression '%s': %w", routeConfig.Expression, err)
}
Expand All @@ -72,9 +73,10 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
}

route := Route{
Attributer: attributer,
Expression: compiled,
OutputIDs: routeConfig.OutputIDs,
Attributer: attributer,
Expression: compiled,
exprHasBodyFieldRef: hasBodyFieldRef,
OutputIDs: routeConfig.OutputIDs,
}
routes = append(routes, &route)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package router
import (
"context"
"fmt"
"slices"

signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper"
"github.com/expr-lang/expr/vm"
Expand All @@ -24,7 +25,10 @@ type Transformer struct {
// Route is a route on a router operator
type Route struct {
helper.Attributer
Expression *vm.Program

Expression *vm.Program
exprHasBodyFieldRef bool

OutputIDs []string
OutputOperators []operator.Operator
}
Expand All @@ -36,7 +40,10 @@ func (t *Transformer) CanProcess() bool {

// Process will route incoming entries based on matching expressions
func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error {
env := signozstanzahelper.GetExprEnv(entry)
routesHaveBodyFieldRef := slices.ContainsFunc(
t.routes, func(r *Route) bool { return r.exprHasBodyFieldRef },
)
env := signozstanzahelper.GetExprEnv(entry, routesHaveBodyFieldRef)
defer signozstanzahelper.PutExprEnv(env)

for _, route := range t.routes {
Expand Down

0 comments on commit 05799f2

Please sign in to comment.