From 05799f238df5f808f5dbe7ae7102b53061534545 Mon Sep 17 00:00:00 2001 From: Raj Kamal Singh <1133322+raj-k-singh@users.noreply.github.com> Date: Sat, 12 Oct 2024 13:25:52 +0530 Subject: [PATCH] Feat: signozlogspipelineprocessor: address json body fields in expr statements in router and add operator (#421) fixes https://github.com/SigNoz/signoz/issues/5689 --- .../processor_test.go | 42 ++++++++++ .../stanza/entry/body_field.go | 4 +- .../stanza/operator/helper/expr.go | 84 ++++++++++++++++++- .../stanza/operator/operators/add/config.go | 4 +- .../operator/operators/add/transformer.go | 9 +- .../operator/operators/router/config.go | 10 ++- .../operator/operators/router/transformer.go | 11 ++- 7 files changed, 150 insertions(+), 14 deletions(-) diff --git a/processor/signozlogspipelineprocessor/processor_test.go b/processor/signozlogspipelineprocessor/processor_test.go index abfbc50b..39517562 100644 --- a/processor/signozlogspipelineprocessor/processor_test.go +++ b/processor/signozlogspipelineprocessor/processor_test.go @@ -454,6 +454,48 @@ func TestBodyFieldReferencesWhenBodyIsJson(t *testing.T) { // Collect test cases for each op that supports reading fields from JSON body + // router op should be able to specify expressions referring to fields inside JSON body + testConfWithRouter := ` + operators: + - id: router_signoz + type: router + routes: + - expr: body.request.id == "test" + output: test-add + - type: add + id: test-add + field: attributes.test + value: test-value + ` + testCases = append(testCases, testCase{ + "router/happy_case", testConfWithRouter, + makePlog(`{"request": {"id": "test"}}`, map[string]any{}), + makePlog(`{"request": {"id": "test"}}`, map[string]any{"test": "test-value"}), + }) + testCases = append(testCases, testCase{ + "router/body_not_json", testConfWithRouter, + makePlog(`test`, map[string]any{}), + makePlog(`test`, map[string]any{}), + }) + + // Add op should be able to specify expressions referring to body JSON field + testAddOpConf := ` + operators: + - type: add + field: attributes.request_id + value: EXPR(body.request.id) + ` + testCases = append(testCases, testCase{ + "add/happy_case", testAddOpConf, + makePlog(`{"request": {"id": "test"}}`, map[string]any{}), + makePlog(`{"request": {"id": "test"}}`, map[string]any{"request_id": "test"}), + }) + testCases = append(testCases, testCase{ + "add/body_not_json", testAddOpConf, + makePlog(`test`, map[string]any{}), + makePlog(`test`, map[string]any{}), + }) + // copy testCopyOpConf := ` operators: diff --git a/processor/signozlogspipelineprocessor/stanza/entry/body_field.go b/processor/signozlogspipelineprocessor/stanza/entry/body_field.go index 43001fd9..6fc551aa 100644 --- a/processor/signozlogspipelineprocessor/stanza/entry/body_field.go +++ b/processor/signozlogspipelineprocessor/stanza/entry/body_field.go @@ -55,7 +55,7 @@ func (f BodyField) String() string { } // returns nil if the body was not JSON -func parseBodyJson(entry *entry.Entry) map[string]any { +func ParseBodyJson(entry *entry.Entry) map[string]any { // If body is already a map, return it as-is bodyMap, ok := entry.Body.(map[string]any) if ok { @@ -105,7 +105,7 @@ func (f BodyField) Get(entry *entry.Entry) (any, bool) { // If path inside body field has been specified, try to // interpret body as a map[string]any - parsing JSON if needed if len(f.Keys) > 0 { - bodyJson := parseBodyJson(entry) + bodyJson := ParseBodyJson(entry) if bodyJson != nil { currentValue = bodyJson } diff --git a/processor/signozlogspipelineprocessor/stanza/operator/helper/expr.go b/processor/signozlogspipelineprocessor/stanza/operator/helper/expr.go index b6c4d42a..85de873e 100644 --- a/processor/signozlogspipelineprocessor/stanza/operator/helper/expr.go +++ b/processor/signozlogspipelineprocessor/stanza/operator/helper/expr.go @@ -7,6 +7,10 @@ import ( "os" "sync" + signozstanzaentry "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/entry" + "github.com/expr-lang/expr" + "github.com/expr-lang/expr/ast" + "github.com/expr-lang/expr/vm" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" ) @@ -19,7 +23,7 @@ var envPool = sync.Pool{ } // GetExprEnv returns a map of key/value pairs that can be be used to evaluate an expression -func GetExprEnv(e *entry.Entry) map[string]any { +func GetExprEnv(e *entry.Entry, forExprWithBodyFieldRef bool) map[string]any { env := envPool.Get().(map[string]any) env["$"] = e.Body env["body"] = e.Body @@ -29,6 +33,10 @@ func GetExprEnv(e *entry.Entry) map[string]any { env["severity_text"] = e.SeverityText env["severity_number"] = int(e.Severity) + if forExprWithBodyFieldRef { + env["body_map"] = signozstanzaentry.ParseBodyJson(e) + } + return env } @@ -36,3 +44,77 @@ func GetExprEnv(e *entry.Entry) map[string]any { func PutExprEnv(e map[string]any) { envPool.Put(e) } + +func ExprCompile(input string) ( + program *vm.Program, hasBodyFieldRef bool, err error, +) { + patcher := &signozExprPatcher{} + program, err = expr.Compile( + input, expr.AllowUndefinedVariables(), expr.Patch(patcher), + ) + if err != nil { + return nil, false, err + } + + return program, patcher.foundBodyFieldRef, err +} + +func ExprCompileBool(input string) ( + program *vm.Program, hasBodyFieldRef bool, err error, +) { + patcher := &signozExprPatcher{} + program, err = expr.Compile( + input, expr.AllowUndefinedVariables(), expr.Patch(patcher), expr.AsBool(), + ) + if err != nil { + return nil, false, err + } + + return program, patcher.foundBodyFieldRef, err +} + +type signozExprPatcher struct { + // set to true if the patcher encounters a reference to a body field + // (like `body.request.id`) while compiling an expression + foundBodyFieldRef bool +} + +func (p *signozExprPatcher) Visit(node *ast.Node) { + // Change all references to fields inside body (eg: body.request.id) + // to refer inside body_map instead (eg: body_map.request.id) + // + // `body_map` is supplied in expr env's by JSON parsing the log body + // when it contains serialized JSON + memberAccessNode, isMemberNode := (*node).(*ast.MemberNode) + if isMemberNode { + // MemberNode represents a member access in expr + // It can be a field access, a method call, or an array element access. + // Example: `foo.bar` or `foo["bar"]` or `foo.bar()` or `array[0]` + // + // `memberNode.Node` is the node whose property/member is being accessed. + // Eg: the node for `body` in `body.request.id` + // + // `memberNode.Property` is the property being accessed on `memberNode.Node` + // Eg: the AST for `request.id` in `body.request.id` + // + // Change all `MemberNode`s where the target (`memberNode.Node`) + // is `body` to target `body_map` instead. + identifierNode, isIdentifierNode := (memberAccessNode.Node).(*ast.IdentifierNode) + if isIdentifierNode && identifierNode.Value == "body" { + identifierNode.Value = "body_map" + p.foundBodyFieldRef = true + } + } + + n, ok := (*node).(*ast.CallNode) + if !ok { + return + } + c, ok := (n.Callee).(*ast.IdentifierNode) + if !ok { + return + } + if c.Value == "env" { + c.Value = "os_env_func" + } +} diff --git a/processor/signozlogspipelineprocessor/stanza/operator/operators/add/config.go b/processor/signozlogspipelineprocessor/stanza/operator/operators/add/config.go index f4fe6310..06e573f6 100644 --- a/processor/signozlogspipelineprocessor/stanza/operator/operators/add/config.go +++ b/processor/signozlogspipelineprocessor/stanza/operator/operators/add/config.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/component" signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator" + signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" @@ -58,11 +59,12 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error exprStr := strings.TrimPrefix(strVal, "EXPR(") exprStr = strings.TrimSuffix(exprStr, ")") - compiled, err := helper.ExprCompile(exprStr) + compiled, hasBodyFieldRef, err := signozstanzahelper.ExprCompile(exprStr) if err != nil { return nil, fmt.Errorf("failed to compile expression '%s': %w", c.IfExpr, err) } addOperator.program = compiled + addOperator.valueExprHasBodyFieldRef = hasBodyFieldRef return addOperator, nil } diff --git a/processor/signozlogspipelineprocessor/stanza/operator/operators/add/transformer.go b/processor/signozlogspipelineprocessor/stanza/operator/operators/add/transformer.go index 99f63553..7b94ef84 100644 --- a/processor/signozlogspipelineprocessor/stanza/operator/operators/add/transformer.go +++ b/processor/signozlogspipelineprocessor/stanza/operator/operators/add/transformer.go @@ -18,9 +18,10 @@ import ( type Transformer struct { helper.TransformerOperator - Field entry.Field - Value any - program *vm.Program + Field entry.Field + Value any + program *vm.Program + valueExprHasBodyFieldRef bool } // Process will process an entry with a add transformation. @@ -34,7 +35,7 @@ func (t *Transformer) Transform(e *entry.Entry) error { return e.Set(t.Field, t.Value) } if t.program != nil { - env := signozstanzahelper.GetExprEnv(e) + env := signozstanzahelper.GetExprEnv(e, t.valueExprHasBodyFieldRef) defer signozstanzahelper.PutExprEnv(env) result, err := vm.Run(t.program, env) diff --git a/processor/signozlogspipelineprocessor/stanza/operator/operators/router/config.go b/processor/signozlogspipelineprocessor/stanza/operator/operators/router/config.go index 0be31c30..c6e0fe4e 100644 --- a/processor/signozlogspipelineprocessor/stanza/operator/operators/router/config.go +++ b/processor/signozlogspipelineprocessor/stanza/operator/operators/router/config.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/component" signozlogspipelinestanzaoperator "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator" + signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" ) @@ -61,7 +62,7 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error routes := make([]*Route, 0, len(c.Routes)) for _, routeConfig := range c.Routes { - compiled, err := helper.ExprCompileBool(routeConfig.Expression) + compiled, hasBodyFieldRef, err := signozstanzahelper.ExprCompileBool(routeConfig.Expression) if err != nil { return nil, fmt.Errorf("failed to compile expression '%s': %w", routeConfig.Expression, err) } @@ -72,9 +73,10 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error } route := Route{ - Attributer: attributer, - Expression: compiled, - OutputIDs: routeConfig.OutputIDs, + Attributer: attributer, + Expression: compiled, + exprHasBodyFieldRef: hasBodyFieldRef, + OutputIDs: routeConfig.OutputIDs, } routes = append(routes, &route) } diff --git a/processor/signozlogspipelineprocessor/stanza/operator/operators/router/transformer.go b/processor/signozlogspipelineprocessor/stanza/operator/operators/router/transformer.go index cbf10ab3..79d07593 100644 --- a/processor/signozlogspipelineprocessor/stanza/operator/operators/router/transformer.go +++ b/processor/signozlogspipelineprocessor/stanza/operator/operators/router/transformer.go @@ -5,6 +5,7 @@ package router import ( "context" "fmt" + "slices" signozstanzahelper "github.com/SigNoz/signoz-otel-collector/processor/signozlogspipelineprocessor/stanza/operator/helper" "github.com/expr-lang/expr/vm" @@ -24,7 +25,10 @@ type Transformer struct { // Route is a route on a router operator type Route struct { helper.Attributer - Expression *vm.Program + + Expression *vm.Program + exprHasBodyFieldRef bool + OutputIDs []string OutputOperators []operator.Operator } @@ -36,7 +40,10 @@ func (t *Transformer) CanProcess() bool { // Process will route incoming entries based on matching expressions func (t *Transformer) Process(ctx context.Context, entry *entry.Entry) error { - env := signozstanzahelper.GetExprEnv(entry) + routesHaveBodyFieldRef := slices.ContainsFunc( + t.routes, func(r *Route) bool { return r.exprHasBodyFieldRef }, + ) + env := signozstanzahelper.GetExprEnv(entry, routesHaveBodyFieldRef) defer signozstanzahelper.PutExprEnv(env) for _, route := range t.routes {