Skip to content

Commit

Permalink
fix: set correct clickhouse aggregation functions when using consolid…
Browse files Browse the repository at this point in the history
…ateBy (go-graphite#281)
  • Loading branch information
mchrome authored Jul 5, 2024
1 parent e7c0053 commit 3d3a35a
Show file tree
Hide file tree
Showing 10 changed files with 508 additions and 35 deletions.
34 changes: 32 additions & 2 deletions cmd/e2e-test/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"strings"
"time"

"github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/helper/client"
"github.com/lomik/graphite-clickhouse/helper/datetime"
"github.com/lomik/graphite-clickhouse/helper/tests/compare"
Expand Down Expand Up @@ -271,6 +272,24 @@ func compareRender(errors *[]string, name, url string, actual, expected []client
}
}

func parseFilteringFunctions(strFilteringFuncs []string) ([]*carbonapi_v3_pb.FilteringFunction, error) {
res := make([]*carbonapi_v3_pb.FilteringFunction, 0, len(strFilteringFuncs))
for _, strFF := range strFilteringFuncs {
strFFSplit := strings.Split(strFF, "(")
if len(strFFSplit) != 2 {
return nil, fmt.Errorf("could not parse filtering function: %s", strFF)
}
name := strFFSplit[0]
args := strings.Split(strFFSplit[1], ",")
for i := range args {
args[i] = strings.TrimSpace(args[i])
args[i] = strings.Trim(args[i], ")'")
}
res = append(res, &carbonapi_v3_pb.FilteringFunction{Name: name, Arguments: args})
}
return res, nil
}

func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, defaultPreision time.Duration) []string {
var errors []string
httpClient := http.Client{
Expand All @@ -280,7 +299,18 @@ func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, d
from := datetime.TimestampTruncate(check.from, defaultPreision)
until := datetime.TimestampTruncate(check.until, defaultPreision)
for _, format := range check.Formats {
if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {

var filteringFunctions []*carbonapi_v3_pb.FilteringFunction
if format == client.FormatPb_v3 {
var err error
filteringFunctions, err = parseFilteringFunctions(check.FilteringFunctions)
if err != nil {
errors = append(errors, err.Error())
continue
}
}

if url, result, respHeader, err := client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
id := requestId(respHeader)
name := ""
if check.ErrorRegexp != "" {
Expand All @@ -303,7 +333,7 @@ func verifyRender(ch *Clickhouse, gch *GraphiteClickhouse, check *RenderCheck, d
if check.CacheTTL > 0 && check.ErrorRegexp == "" {
// second query must be find-cached
name = "cache"
if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, from, until); err == nil {
if url, result, respHeader, err = client.Render(&httpClient, address, format, check.Targets, filteringFunctions, check.MaxDataPoints, from, until); err == nil {
compareRender(&errors, name, url, result, check.result, true, respHeader, check.CacheTTL)
} else {
errStr := strings.TrimRight(err.Error(), "\n")
Expand Down
19 changes: 12 additions & 7 deletions cmd/e2e-test/e2etesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@ type Metric struct {
}

type RenderCheck struct {
Name string `toml:"name"`
Formats []client.FormatType `toml:"formats"`
From string `toml:"from"`
Until string `toml:"until"`
Targets []string `toml:"targets"`
Timeout time.Duration `toml:"timeout"`
DumpIfEmpty []string `toml:"dump_if_empty"`
Name string `toml:"name"`
Formats []client.FormatType `toml:"formats"`
From string `toml:"from"`
Until string `toml:"until"`
Targets []string `toml:"targets"`
MaxDataPoints int64 `toml:"max_data_points"`
FilteringFunctions []string `toml:"filtering_functions"`
Timeout time.Duration `toml:"timeout"`
DumpIfEmpty []string `toml:"dump_if_empty"`

Optimize []string `toml:"optimize"` // optimize tables before run tests

Expand Down Expand Up @@ -338,6 +340,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
Expand All @@ -361,6 +364,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
Expand All @@ -377,6 +381,7 @@ func verifyGraphiteClickhouse(test *TestSchema, gch *GraphiteClickhouse, clickho
zap.String("clickhouse config", clickhouseDir),
zap.String("graphite-clickhouse config", gch.ConfigTpl),
zap.Strings("targets", check.Targets),
zap.Strings("filtering_functions", check.FilteringFunctions),
zap.String("from_raw", check.From),
zap.String("until_raw", check.Until),
zap.Int64("from", check.from),
Expand Down
10 changes: 9 additions & 1 deletion cmd/graphite-clickhouse-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"

"github.com/go-graphite/protocol/carbonapi_v3_pb"
"github.com/lomik/graphite-clickhouse/helper/client"
"github.com/lomik/graphite-clickhouse/helper/datetime"
)
Expand All @@ -31,6 +33,7 @@ func main() {
address := flag.String("address", "http://127.0.0.1:9090", "Address of graphite-clickhouse server")
fromStr := flag.String("from", "0", "from")
untilStr := flag.String("until", "", "until")
maxDataPointsStr := flag.String("maxDataPoints", "1048576", "Maximum amount of datapoints in response")

metricsFind := flag.String("find", "", "Query for /metrics/find/ , valid formats are carbonapi_v3_pb. protobuf, pickle")

Expand Down Expand Up @@ -71,6 +74,11 @@ func main() {
fmt.Printf("invalid until: %s\n", *untilStr)
os.Exit(1)
}
maxDataPoints, err := strconv.ParseInt(*maxDataPointsStr, 10, 64)
if err != nil {
fmt.Printf("invalid maxDataPoints: %s\n", *maxDataPointsStr)
os.Exit(1)
}

httpClient := http.Client{
Timeout: *timeout,
Expand Down Expand Up @@ -182,7 +190,7 @@ func main() {
if formatRender == client.FormatDefault {
formatRender = client.FormatPb_v3
}
queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, int64(from), int64(until))
queryRaw, r, respHeader, err := client.Render(&httpClient, *address, formatRender, targets, []*carbonapi_v3_pb.FilteringFunction{}, maxDataPoints, int64(from), int64(until))
if respHeader != nil {
fmt.Printf("Responce header: %+v\n", respHeader)
}
Expand Down
22 changes: 13 additions & 9 deletions helper/client/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Metric struct {

// Render do /metrics/find/ request
// Valid formats are carbonapi_v3_pb. protobuf, pickle, json
func Render(client *http.Client, address string, format FormatType, targets []string, from, until int64) (string, []Metric, http.Header, error) {
func Render(client *http.Client, address string, format FormatType, targets []string, filteringFunctions []*protov3.FilteringFunction, maxDataPoints, from, until int64) (string, []Metric, http.Header, error) {
rUrl := "/render/"
if format == FormatDefault {
format = FormatPb_v3
Expand All @@ -56,6 +56,7 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
fromStr := strconv.FormatInt(from, 10)
untilStr := strconv.FormatInt(until, 10)
maxDataPointsStr := strconv.FormatInt(maxDataPoints, 10)

u, err := url.Parse(address + rUrl)
if err != nil {
Expand All @@ -77,10 +78,12 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
for i, target := range targets {
r.Metrics[i] = protov3.FetchRequest{
Name: target,
StartTime: from,
StopTime: until,
PathExpression: target,
Name: target,
StartTime: from,
StopTime: until,
PathExpression: target,
FilterFunctions: filteringFunctions,
MaxDataPoints: maxDataPoints,
}
}

Expand All @@ -93,10 +96,11 @@ func Render(client *http.Client, address string, format FormatType, targets []st
}
case FormatPb_v2, FormatProtobuf, FormatPickle, FormatJSON:
v := url.Values{
"format": []string{format.String()},
"from": []string{fromStr},
"until": []string{untilStr},
"target": targets,
"format": []string{format.String()},
"from": []string{fromStr},
"until": []string{untilStr},
"target": targets,
"maxDataPoints": []string{maxDataPointsStr},
}
u.RawQuery = v.Encode()
default:
Expand Down
8 changes: 6 additions & 2 deletions render/data/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ func (d *Data) GetAggregation(id uint32) (string, error) {
if err != nil {
return function, err
}
if function == "any" || function == "anyLast" {
switch function {
case "any":
return "first", nil
case "anyLast":
return "last", nil
default:
return function, nil
}
return function, nil
}

// data wraps Data and adds asynchronous processing of data
Expand Down
17 changes: 14 additions & 3 deletions render/data/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"os"
"strings"
"sync"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/lomik/graphite-clickhouse/config"
"github.com/lomik/graphite-clickhouse/helper/clickhouse"
"github.com/lomik/graphite-clickhouse/helper/errs"
"github.com/lomik/graphite-clickhouse/helper/rollup"
"github.com/lomik/graphite-clickhouse/metrics"
"github.com/lomik/graphite-clickhouse/pkg/dry"
Expand Down Expand Up @@ -145,7 +147,11 @@ func (q *query) getDataPoints(ctx context.Context, cond *conditions) error {
// carbonlink request
carbonlinkResponseRead := queryCarbonlink(ctx, carbonlink, cond.metricsUnreverse)

cond.prepareLookup()
err = cond.prepareLookup()
if err != nil {
logger.Error("prepare_lookup", zap.Error(err))
return errs.NewErrorWithCode(err.Error(), http.StatusBadRequest)
}
cond.setStep(q.cStep)
if cond.step < 1 {
return ErrSetStepTimeout
Expand Down Expand Up @@ -279,7 +285,7 @@ func (c *conditions) prepareMetricsLists() {
}
}

func (c *conditions) prepareLookup() {
func (c *conditions) prepareLookup() error {
age := uint32(dry.Max(0, time.Now().Unix()-c.From))
c.aggregations = make(map[string][]string)
c.appliedFunctions = make(map[string][]string)
Expand All @@ -295,7 +301,11 @@ func (c *conditions) prepareLookup() {
// Currently it just finds the first target matching the metric
// to avoid making multiple request for every type of aggregation for a given metric.
for _, alias := range c.AM.Get(c.metricsUnreverse[i]) {
if requestedAgg := c.GetRequestedAggregation(alias.Target); requestedAgg != "" {
requestedAgg, err := c.GetRequestedAggregation(alias.Target)
if err != nil {
return fmt.Errorf("failed to choose appropriate aggregation for '%s': %s", alias.Target, err.Error())
}
if requestedAgg != "" {
agg = rollup.AggrMap[requestedAgg]
c.appliedFunctions[alias.Target] = []string{graphiteConsolidationFunction}
break
Expand Down Expand Up @@ -330,6 +340,7 @@ func (c *conditions) prepareLookup() {
mm.WriteString(c.metricsRequested[i] + "\n")
}
}
return nil
}

var ErrSetStepTimeout = errors.New("unexpected error, setStep timeout")
Expand Down
46 changes: 35 additions & 11 deletions render/data/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,23 +131,47 @@ TableLoop:
return fmt.Errorf("data tables is not specified for %v", tt.List[0])
}

func (tt *Targets) GetRequestedAggregation(target string) string {
func (tt *Targets) GetRequestedAggregation(target string) (string, error) {
if ffs, ok := tt.filteringFunctionsByTarget[target]; !ok {
return ""
return "", nil
} else {
for _, filteringFunc := range ffs {
ffName := filteringFunc.GetName()
ffArgs := filteringFunc.GetArguments()
if ffName == graphiteConsolidationFunction && len(ffArgs) > 0 {
// Graphite standard supports both average and avg.
// It is the only aggregation that has two aliases.
// https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.consolidateBy
if ffArgs[0] == "average" {
return "avg"
}
return ffArgs[0]

if ffName != graphiteConsolidationFunction {
continue
}

if len(ffArgs) < 1 {
return "", fmt.Errorf("no argumets were provided to consolidateBy function")
}

switch ffArgs[0] {
// 'last' in graphite == clickhouse aggregate function 'anyLast'
case "last":
return "anyLast", nil
// 'first' in graphite == clickhouse aggregate function 'any'
case "first":
return "any", nil
// Graphite standard supports both average and avg.
// It is the only aggregation that has two aliases.
// https://graphite.readthedocs.io/en/latest/functions.html#graphite.render.functions.consolidateBy
case "average":
return "avg", nil
// avg, sum, max, min have the same name in clickhouse
case "avg", "sum", "max", "min":
return ffArgs[0], nil
default:
return "",
fmt.Errorf(
"unknown \"%s\" argument function (allowed argumets are: 'avg', 'average', 'sum', 'max', 'min', 'last', 'first'): recieved %s",
graphiteConsolidationFunction,
ffArgs[0],
)

}
}
}
return ""
return "", nil
}
45 changes: 45 additions & 0 deletions tests/consolidateBy/carbon-clickhouse.conf.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[common]

[data]
path = "/etc/carbon-clickhouse/data"
chunk-interval = "1s"
chunk-auto-interval = ""

[upload.graphite_index]
type = "index"
table = "graphite_index"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
cache-ttl = "1h"

[upload.graphite_tags]
type = "tagged"
table = "graphite_tags"
threads = 3
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
cache-ttl = "1h"

[upload.graphite_reverse]
type = "points-reverse"
table = "graphite_reverse"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
zero-timestamp = false

[upload.graphite]
type = "points"
table = "graphite"
url = "{{ .CLICKHOUSE_URL }}/"
timeout = "2m30s"
zero-timestamp = false

[tcp]
listen = ":2003"
enabled = true
drop-future = "0s"
drop-past = "0s"

[logging]
file = "/etc/carbon-clickhouse/carbon-clickhouse.log"
level = "debug"
Loading

0 comments on commit 3d3a35a

Please sign in to comment.