From 6814c743c7726b9041864680c54dc27dc6c7150a Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 26 Sep 2024 01:45:11 +0530 Subject: [PATCH 1/3] Add initial version of clickhousemetricsexporterv2 --- .../clickhousemetricsexporterv2/config.go | 47 ++ .../config_test.go | 1 + .../clickhousemetricsexporterv2/exporter.go | 783 ++++++++++++++++++ .../exporter_test.go | 374 +++++++++ .../clickhousemetricsexporterv2/factory.go | 106 +++ .../factory_test.go | 1 + .../fingerprint.go | 64 ++ .../clickhousemetricsexporterv2/helper.go | 109 +++ .../clickhousemetricsexporterv2/test_data.go | 267 ++++++ 9 files changed, 1752 insertions(+) create mode 100644 exporter/clickhousemetricsexporterv2/config.go create mode 100644 exporter/clickhousemetricsexporterv2/config_test.go create mode 100644 exporter/clickhousemetricsexporterv2/exporter.go create mode 100644 exporter/clickhousemetricsexporterv2/exporter_test.go create mode 100644 exporter/clickhousemetricsexporterv2/factory.go create mode 100644 exporter/clickhousemetricsexporterv2/factory_test.go create mode 100644 exporter/clickhousemetricsexporterv2/fingerprint.go create mode 100644 exporter/clickhousemetricsexporterv2/helper.go create mode 100644 exporter/clickhousemetricsexporterv2/test_data.go diff --git a/exporter/clickhousemetricsexporterv2/config.go b/exporter/clickhousemetricsexporterv2/config.go new file mode 100644 index 00000000..1d744b1c --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/config.go @@ -0,0 +1,47 @@ +package clickhousemetricsexporterv2 + +import ( + "errors" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter/exporterhelper" +) + +// Config defines configuration for ClickHouse Metrics exporter. +type Config struct { + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. 
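+	// The embedded timeout, retry, and queue settings below come from the
+	// collector's exporterhelper/configretry packages; all three are
+	// validated in Validate.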
+ configretry.BackOffConfig `mapstructure:"retry_on_failure"` + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + + DSN string `mapstructure:"dsn"` + + EnableExpHist bool `mapstructure:"enable_exp_hist"` + + Database string + SamplesTable string + TimeSeriesTable string + ExpHistTable string +} + +var _ component.Config = (*Config)(nil) + +// Validate checks if the exporter configuration is valid +func (cfg *Config) Validate() error { + if cfg.DSN == "" { + return errors.New("dsn must be specified") + } + if err := cfg.QueueSettings.Validate(); err != nil { + return err + } + + if err := cfg.TimeoutSettings.Validate(); err != nil { + return err + } + + if err := cfg.BackOffConfig.Validate(); err != nil { + return err + } + + return nil +} diff --git a/exporter/clickhousemetricsexporterv2/config_test.go b/exporter/clickhousemetricsexporterv2/config_test.go new file mode 100644 index 00000000..ffa2c564 --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/config_test.go @@ -0,0 +1 @@ +package clickhousemetricsexporterv2 diff --git a/exporter/clickhousemetricsexporterv2/exporter.go b/exporter/clickhousemetricsexporterv2/exporter.go new file mode 100644 index 00000000..53503fad --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/exporter.go @@ -0,0 +1,783 @@ +package clickhousemetricsexporterv2 + +import ( + "context" + "fmt" + "math" + "strconv" + "sync" + "time" + + chproto "github.com/ClickHouse/ch-go/proto" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/jellydator/ttlcache/v3" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + semconv "go.opentelemetry.io/collector/semconv/v1.5.0" + "go.uber.org/zap" +) + +var ( + countSuffix = ".count" + sumSuffix = ".sum" + minSuffix = ".min" + maxSuffix = ".max" + bucketSuffix = ".bucket" + quantilesSuffix = ".quantile" + samplesSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)" + timeSeriesSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, description, unit, type, is_monotonic, fingerprint, unix_milli, labels, attrs, scope_attrs, resource_attrs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + expHistSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, count, sum, min, max, sketch) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +) + +type clickhouseMetricsExporter struct { + cfg *Config + logger *zap.Logger + cache *ttlcache.Cache[string, bool] + conn clickhouse.Conn + wg sync.WaitGroup + enableExpHist bool + + samplesSQL string + timeSeriesSQL string + expHistSQL string +} + +// sample represents a single metric sample +// directly mapped to the table `samples_v4` schema +type sample struct { + env string + temporality pmetric.AggregationTemporality + metricName string + fingerprint uint64 + unixMilli int64 + value float64 +} + +// exponentialHistogramSample represents a single exponential histogram sample +// directly mapped to the table `exp_hist` schema +type exponentialHistogramSample struct { + env string + temporality pmetric.AggregationTemporality + metricName string + fingerprint uint64 + unixMilli int64 + sketch chproto.DD + count float64 + sum float64 + min float64 + max float64 +} + +// ts represents a single time series +// directly mapped to the table `time_series_v4` schema +type ts struct { + env string + temporality pmetric.AggregationTemporality + metricName 
string + description string + unit string + typ pmetric.MetricType + isMonotonic bool + fingerprint uint64 + unixMilli int64 + labels string + attrs map[string]string + scopeAttrs map[string]string + resourceAttrs map[string]string +} + +type writeBatch struct { + samples []sample + expHist []exponentialHistogramSample + ts []ts +} + +type ExporterOption func(e *clickhouseMetricsExporter) error + +func WithLogger(logger *zap.Logger) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.logger = logger + return nil + } +} + +func WithEnableExpHist(enableExpHist bool) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.enableExpHist = enableExpHist + return nil + } +} + +func WithCache(cache *ttlcache.Cache[string, bool]) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.cache = cache + return nil + } +} + +func WithConn(conn clickhouse.Conn) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.conn = conn + return nil + } +} + +func WithConfig(cfg *Config) ExporterOption { + return func(e *clickhouseMetricsExporter) error { + e.cfg = cfg + return nil + } +} + +func defaultOptions() []ExporterOption { + + cache := ttlcache.New[string, bool]( + ttlcache.WithTTL[string, bool](45*time.Minute), + ttlcache.WithDisableTouchOnHit[string, bool](), + ) + go cache.Start() + + return []ExporterOption{ + WithCache(cache), + WithLogger(zap.NewNop()), + WithEnableExpHist(false), + } +} + +func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter, error) { + + chExporter := &clickhouseMetricsExporter{} + + newOptions := append(defaultOptions(), opts...) + + for _, opt := range newOptions { + if err := opt(chExporter); err != nil { + return nil, err + } + } + + chExporter.samplesSQL = fmt.Sprintf(samplesSQLTmpl, chExporter.cfg.Database, chExporter.cfg.SamplesTable) + chExporter.timeSeriesSQL = fmt.Sprintf(timeSeriesSQLTmpl, chExporter.cfg.Database, chExporter.cfg.TimeSeriesTable) + chExporter.expHistSQL = fmt.Sprintf(expHistSQLTmpl, chExporter.cfg.Database, chExporter.cfg.ExpHistTable) + + return chExporter, nil +} + +func (c *clickhouseMetricsExporter) Start(ctx context.Context, host component.Host) error { + return nil +} + +func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error { + c.wg.Wait() + return c.conn.Close() +} + +// processGauge processes gauge metrics +func (c *clickhouseMetricsExporter) processGauge(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) { + name := metric.Name() + desc := metric.Description() + unit := metric.Unit() + typ := metric.Type() + // gauge metrics do not have a temporality + temporality := pmetric.AggregationTemporalityUnspecified + env := "" + if de, ok := resAttrs.Get(semconv.AttributeDeploymentEnvironment); ok { + env = de.AsString() + } + // there is no monotonicity for gauge metrics + isMonotonic := false + + for i := 0; i < metric.Gauge().DataPoints().Len(); i++ { + dp := metric.Gauge().DataPoints().At(i) + var value float64 + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + value = float64(dp.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + value = dp.DoubleValue() + } + unixMilli := dp.Timestamp().AsTime().UnixMilli() + pointAttrs := dp.Attributes() + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name), + unixMilli: unixMilli, + value: value, + }) + + 
batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } +} + +// processSum processes sum metrics +func (c *clickhouseMetricsExporter) processSum(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) { + + name := metric.Name() + desc := metric.Description() + unit := metric.Unit() + typ := metric.Type() + // sum metrics have a temporality + temporality := metric.Sum().AggregationTemporality() + env := "" + if de, ok := resAttrs.Get(semconv.AttributeDeploymentEnvironment); ok { + env = de.AsString() + } + isMonotonic := metric.Sum().IsMonotonic() + + for i := 0; i < metric.Sum().DataPoints().Len(); i++ { + dp := metric.Sum().DataPoints().At(i) + var value float64 + switch dp.ValueType() { + case pmetric.NumberDataPointValueTypeInt: + value = float64(dp.IntValue()) + case pmetric.NumberDataPointValueTypeDouble: + value = dp.DoubleValue() + } + unixMilli := dp.Timestamp().AsTime().UnixMilli() + pointAttrs := dp.Attributes() + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name), + unixMilli: unixMilli, + value: value, + }) + + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } +} + +// processHistogram processes histogram metrics +func (c *clickhouseMetricsExporter) processHistogram(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) { + + name := metric.Name() + desc := metric.Description() + unit := metric.Unit() + typ := metric.Type() + temporality := metric.Histogram().AggregationTemporality() + env := "" + if de, ok := resAttrs.Get(semconv.AttributeDeploymentEnvironment); ok { + env = de.AsString() + } + // monotonicity is assumed for histograms + isMonotonic := true + + addSample := func(batch *writeBatch, dp pmetric.HistogramDataPoint, suffix string) { + unixMilli := dp.Timestamp().AsTime().UnixMilli() + var value float64 + switch suffix { + case countSuffix: + value = float64(dp.Count()) + case sumSuffix: + value = dp.Sum() + case minSuffix: + value = dp.Min() + case maxSuffix: + value = dp.Max() + } + pointAttrs := dp.Attributes() + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: value, + }) + + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, 
resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + + addBucketSample := func(batch *writeBatch, dp pmetric.HistogramDataPoint, suffix string) { + var cumulativeCount uint64 + unixMilli := dp.Timestamp().AsTime().UnixMilli() + pointAttrs := dp.Attributes() + + for i := 0; i < dp.ExplicitBounds().Len() && i < dp.BucketCounts().Len(); i++ { + bound := dp.ExplicitBounds().At(i) + cumulativeCount += dp.BucketCounts().At(i) + boundStr := strconv.FormatFloat(bound, 'f', -1, 64) + pointAttrs.PutStr("le", boundStr) + + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: float64(cumulativeCount), + }) + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + // add le=+Inf sample + pointAttrs.PutStr("le", "+Inf") + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: float64(dp.Count()), + }) + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + + for i := 0; i < metric.Histogram().DataPoints().Len(); i++ { + dp := metric.Histogram().DataPoints().At(i) + // we need to create five samples for each histogram dp + // 1. count + // 2. sum + // 3. min + // 4. max + // 5. 
bucket counts + addSample(batch, dp, countSuffix) + addSample(batch, dp, sumSuffix) + addSample(batch, dp, minSuffix) + addSample(batch, dp, maxSuffix) + addBucketSample(batch, dp, bucketSuffix) + } +} + +func (c *clickhouseMetricsExporter) processSummary(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) { + name := metric.Name() + desc := metric.Description() + unit := metric.Unit() + typ := metric.Type() + temporality := pmetric.AggregationTemporalityUnspecified + env := "" + if de, ok := resAttrs.Get(semconv.AttributeDeploymentEnvironment); ok { + env = de.AsString() + } + // monotonicity is assumed for summaries + isMonotonic := true + + addSample := func(batch *writeBatch, dp pmetric.SummaryDataPoint, suffix string) { + unixMilli := dp.Timestamp().AsTime().UnixMilli() + var value float64 + switch suffix { + case countSuffix: + value = float64(dp.Count()) + case sumSuffix: + value = dp.Sum() + } + pointAttrs := dp.Attributes() + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: value, + }) + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + + addQuantileSample := func(batch *writeBatch, dp pmetric.SummaryDataPoint, suffix string) { + unixMilli := dp.Timestamp().AsTime().UnixMilli() + pointAttrs := dp.Attributes() + for i := 0; i < dp.QuantileValues().Len(); i++ { + quantile := dp.QuantileValues().At(i) + quantileStr := strconv.FormatFloat(quantile.Quantile(), 'f', -1, 64) + quantileValue := quantile.Value() + pointAttrs.PutStr("quantile", quantileStr) + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: quantileValue, + }) + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + } + + for i := 0; i < metric.Summary().DataPoints().Len(); i++ { + dp := metric.Summary().DataPoints().At(i) + // for summary metrics, we need to create three samples + // 1. count + // 2. sum + // 3. 
quantiles + addSample(batch, dp, countSuffix) + addSample(batch, dp, sumSuffix) + addQuantileSample(batch, dp, quantilesSuffix) + } +} + +func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) { + if !c.enableExpHist { + c.logger.Debug("exponential histogram is not enabled") + return + } + + if metric.ExponentialHistogram().AggregationTemporality() != pmetric.AggregationTemporalityDelta { + c.logger.Warn("exponential histogram temporality is not delta", zap.String("metric_name", metric.Name()), zap.String("temporality", metric.ExponentialHistogram().AggregationTemporality().String())) + return + } + + name := metric.Name() + desc := metric.Description() + unit := metric.Unit() + typ := metric.Type() + temporality := metric.ExponentialHistogram().AggregationTemporality() + + env := "" + if de, ok := resAttrs.Get(semconv.AttributeDeploymentEnvironment); ok { + env = de.AsString() + } + + isMonotonic := true + + addSample := func(batch *writeBatch, dp pmetric.ExponentialHistogramDataPoint, suffix string) { + unixMilli := dp.Timestamp().AsTime().UnixMilli() + var value float64 + switch suffix { + case countSuffix: + value = float64(dp.Count()) + case sumSuffix: + value = dp.Sum() + case minSuffix: + value = dp.Min() + case maxSuffix: + value = dp.Max() + } + pointAttrs := dp.Attributes() + batch.samples = append(batch.samples, sample{ + env: env, + temporality: temporality, + metricName: name + suffix, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + value: value, + }) + + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name + suffix, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name+suffix), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name+suffix)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + + toStore := func(buckets pmetric.ExponentialHistogramDataPointBuckets) *chproto.Store { + bincounts := make([]float64, 0, buckets.BucketCounts().Len()) + for _, bucket := range buckets.BucketCounts().AsRaw() { + bincounts = append(bincounts, float64(bucket)) + } + + store := &chproto.Store{ + ContiguousBinIndexOffset: int32(buckets.Offset()), + ContiguousBinCounts: bincounts, + } + return store + } + + addDDSketchSample := func(batch *writeBatch, dp pmetric.ExponentialHistogramDataPoint) { + unixMilli := dp.Timestamp().AsTime().UnixMilli() + pointAttrs := dp.Attributes() + positive := toStore(dp.Positive()) + negative := toStore(dp.Negative()) + gamma := math.Pow(2, math.Pow(2, float64(-dp.Scale()))) + dd := chproto.DD{ + Mapping: &chproto.IndexMapping{Gamma: gamma}, + PositiveValues: positive, + NegativeValues: negative, + ZeroCount: float64(dp.ZeroCount()), + } + batch.expHist = append(batch.expHist, exponentialHistogramSample{ + env: env, + temporality: temporality, + metricName: name, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, name), + unixMilli: unixMilli, + sketch: dd, + count: float64(dp.Count()), + sum: dp.Sum(), + min: dp.Min(), + max: dp.Max(), + }) + + batch.ts = append(batch.ts, ts{ + env: env, + temporality: temporality, + metricName: name, + description: desc, + unit: unit, + typ: typ, + isMonotonic: isMonotonic, + fingerprint: Fingerprint(pointAttrs, scopeAttrs, resAttrs, 
name), + unixMilli: unixMilli, + labels: getJSONString(getAllLabels(pointAttrs, scopeAttrs, resAttrs, name)), + attrs: getAttrMap(pointAttrs), + scopeAttrs: getAttrMap(scopeAttrs), + resourceAttrs: getAttrMap(resAttrs), + }) + } + + for i := 0; i < metric.ExponentialHistogram().DataPoints().Len(); i++ { + dp := metric.ExponentialHistogram().DataPoints().At(i) + // we need to create five samples for each exponential histogram dp + // 1. count + // 2. sum + // 3. min + // 4. max + // 5. ddsketch + addSample(batch, dp, countSuffix) + addSample(batch, dp, sumSuffix) + addSample(batch, dp, minSuffix) + addSample(batch, dp, maxSuffix) + addDDSketchSample(batch, dp) + } + +} + +func (c *clickhouseMetricsExporter) prepareBatch(md pmetric.Metrics) *writeBatch { + batch := &writeBatch{} + for i := 0; i < md.ResourceMetrics().Len(); i++ { + rm := md.ResourceMetrics().At(i) + resAttrs := rm.Resource().Attributes() + resAttrs.PutStr("__resource.schema_url__", rm.SchemaUrl()) + for j := 0; j < rm.ScopeMetrics().Len(); j++ { + sm := rm.ScopeMetrics().At(j) + scopeAttrs := sm.Scope().Attributes() + scopeAttrs.PutStr("__scope.name__", sm.Scope().Name()) + scopeAttrs.PutStr("__scope.version__", sm.Scope().Version()) + scopeAttrs.PutStr("__scope.schema_url__", sm.SchemaUrl()) + for k := 0; k < sm.Metrics().Len(); k++ { + metric := sm.Metrics().At(k) + switch metric.Type() { + case pmetric.MetricTypeGauge: + c.processGauge(batch, metric, resAttrs, scopeAttrs) + case pmetric.MetricTypeSum: + c.processSum(batch, metric, resAttrs, scopeAttrs) + case pmetric.MetricTypeHistogram: + c.processHistogram(batch, metric, resAttrs, scopeAttrs) + case pmetric.MetricTypeSummary: + c.processSummary(batch, metric, resAttrs, scopeAttrs) + case pmetric.MetricTypeExponentialHistogram: + c.processExponentialHistogram(batch, metric, resAttrs, scopeAttrs) + case pmetric.MetricTypeEmpty: + c.logger.Warn("metric type is set to empty", zap.String("metric_name", metric.Name()), zap.String("metric_type", metric.Type().String())) + default: + c.logger.Warn("unknown metric type", zap.String("metric_name", metric.Name()), zap.String("metric_type", metric.Type().String())) + } + } + } + } + return batch +} + +func (c *clickhouseMetricsExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) error { + c.wg.Add(1) + defer c.wg.Done() + return c.writeBatch(ctx, c.prepareBatch(md)) +} + +func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *writeBatch) error { + + writeTimeSeries := func(ctx context.Context, timeSeries []ts) error { + statement, err := c.conn.PrepareBatch(ctx, c.timeSeriesSQL, driver.WithReleaseConnection()) + if err != nil { + return err + } + for _, ts := range timeSeries { + roundedUnixMilli := ts.unixMilli / 3600000 * 3600000 + cacheKey := makeCacheKey(ts.fingerprint, uint64(roundedUnixMilli)) + if c.cache.Get(cacheKey) != nil && c.cache.Get(cacheKey).Value() { + continue + } + err = statement.Append( + ts.env, + ts.temporality.String(), + ts.metricName, + ts.description, + ts.unit, + ts.typ.String(), + ts.isMonotonic, + ts.fingerprint, + roundedUnixMilli, + ts.labels, + ts.attrs, + ts.scopeAttrs, + ts.resourceAttrs, + ) + if err != nil { + return err + } + c.cache.Set(cacheKey, true, ttlcache.DefaultTTL) + } + return statement.Send() + } + + if err := writeTimeSeries(ctx, batch.ts); err != nil { + return err + } + + writeSamples := func(ctx context.Context, samples []sample) error { + statement, err := c.conn.PrepareBatch(ctx, c.samplesSQL, driver.WithReleaseConnection()) + if err != nil { + 
return err + } + for _, sample := range samples { + err = statement.Append( + sample.env, + sample.temporality.String(), + sample.metricName, + sample.fingerprint, + sample.unixMilli, + sample.value, + ) + if err != nil { + return err + } + } + return statement.Send() + } + + if err := writeSamples(ctx, batch.samples); err != nil { + return err + } + + writeExpHist := func(ctx context.Context, expHist []exponentialHistogramSample) error { + statement, err := c.conn.PrepareBatch(ctx, c.expHistSQL, driver.WithReleaseConnection()) + if err != nil { + return err + } + for _, expHist := range expHist { + err = statement.Append( + expHist.env, + expHist.temporality.String(), + expHist.metricName, + expHist.fingerprint, + expHist.unixMilli, + expHist.count, + expHist.sum, + expHist.min, + expHist.max, + expHist.sketch, + ) + if err != nil { + return err + } + } + return statement.Send() + } + + if err := writeExpHist(ctx, batch.expHist); err != nil { + return err + } + + return nil +} diff --git a/exporter/clickhousemetricsexporterv2/exporter_test.go b/exporter/clickhousemetricsexporterv2/exporter_test.go new file mode 100644 index 00000000..c8089439 --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/exporter_test.go @@ -0,0 +1,374 @@ +package clickhousemetricsexporterv2 + +import ( + "fmt" + "strconv" + "testing" + + "github.com/zeebo/assert" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +func Test_prepareBatchGauge(t *testing.T) { + metrics := generateGaugeMetrics(1, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{} + batch := exp.prepareBatch(metrics) + assert.NotNil(t, batch) + expectedSamples := []sample{ + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "system.memory.usage0", + unixMilli: 1727286182000, + value: 0, + }, + } + assert.Equal(t, len(expectedSamples), len(batch.samples)) + + for idx, sample := range expectedSamples { + curSample := batch.samples[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.value, curSample.value) + } + + expectedTs := []ts{ + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "system.memory.usage0", + description: "memory usage of the host", + unit: "bytes", + typ: pmetric.MetricTypeGauge, + isMonotonic: false, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"system.memory.usage0\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"gauge.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"gauge.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + } + assert.Equal(t, len(expectedTs), len(batch.ts)) + + for idx, ts := range expectedTs { + currentTs := batch.ts[idx] + + assert.Equal(t, ts.env, currentTs.env) + assert.Equal(t, ts.temporality, currentTs.temporality) + assert.Equal(t, ts.metricName, currentTs.metricName) + assert.Equal(t, ts.description, currentTs.description) + assert.Equal(t, ts.unit, currentTs.unit) + assert.Equal(t, 
ts.typ, currentTs.typ) + assert.Equal(t, ts.isMonotonic, currentTs.isMonotonic) + assert.Equal(t, ts.unixMilli, currentTs.unixMilli) + assert.Equal(t, ts.labels, currentTs.labels) + assert.Equal(t, ts.attrs, currentTs.attrs) + assert.Equal(t, ts.scopeAttrs, currentTs.scopeAttrs) + assert.Equal(t, ts.resourceAttrs, currentTs.resourceAttrs) + } +} + +func Test_prepareBatchSum(t *testing.T) { + metrics := generateSumMetrics(1, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{} + batch := exp.prepareBatch(metrics) + assert.NotNil(t, batch) + expectedSamples := []sample{ + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "system.cpu.time0", + unixMilli: 1727286182000, + value: 0, + }, + } + assert.Equal(t, len(expectedSamples), len(batch.samples)) + + for idx, sample := range expectedSamples { + curSample := batch.samples[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.value, curSample.value) + } + + expectedTs := []ts{ + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "system.cpu.time0", + description: "cpu time of the host", + unit: "s", + typ: pmetric.MetricTypeSum, + isMonotonic: true, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"system.cpu.time0\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\",\"sum.attr_0\":\"1\"}", + attrs: map[string]string{"sum.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + } + assert.Equal(t, len(expectedTs), len(batch.ts)) + + for idx, ts := range expectedTs { + currentTs := batch.ts[idx] + + assert.Equal(t, ts.env, currentTs.env) + assert.Equal(t, ts.temporality, currentTs.temporality) + assert.Equal(t, ts.metricName, currentTs.metricName) + assert.Equal(t, ts.description, currentTs.description) + assert.Equal(t, ts.unit, currentTs.unit) + assert.Equal(t, ts.typ, currentTs.typ) + assert.Equal(t, ts.isMonotonic, currentTs.isMonotonic) + assert.Equal(t, ts.unixMilli, currentTs.unixMilli) + assert.Equal(t, ts.labels, currentTs.labels) + assert.Equal(t, ts.attrs, currentTs.attrs) + assert.Equal(t, ts.scopeAttrs, currentTs.scopeAttrs) + assert.Equal(t, ts.resourceAttrs, currentTs.resourceAttrs) + } +} + +func Test_prepareBatchHistogram(t *testing.T) { + metrics := generateHistogramMetrics(1, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{} + batch := exp.prepareBatch(metrics) + assert.NotNil(t, batch) + // there should be 4 (count, sum, min, max) + 20 (for each bucket) + 1 (for the inf bucket) = 25 samples + expectedSamples := []sample{ + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.count", + unixMilli: 1727286182000, + value: 30, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.sum", + unixMilli: 1727286182000, + value: 35, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: 
"http.server.duration0.min", + unixMilli: 1727286182000, + value: 0, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.max", + unixMilli: 1727286182000, + value: 12, + }, + } + cumulativeCount := 0 + // 20 buckets + for i := 0; i < 20; i++ { + cumulativeCount += 1 + if i == 5 || i == 12 { + cumulativeCount += i - 1 + } + expectedSamples = append(expectedSamples, sample{ + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.bucket", + unixMilli: 1727286182000, + value: float64(cumulativeCount), + }) + } + + // 1 for the inf bucket + expectedSamples = append(expectedSamples, sample{ + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.bucket", + unixMilli: 1727286182000, + value: 30, + }) + + assert.Equal(t, len(expectedSamples), len(batch.samples)) + + for idx, sample := range expectedSamples { + curSample := batch.samples[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.value, curSample.value) + } + + // 4 ts for count, sum, min, max + expectedTs := []ts{ + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.count", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"http.server.duration0.count\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"histogram.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.sum", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"http.server.duration0.sum\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"histogram.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.min", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: 
"{\"__name__\":\"http.server.duration0.min\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"histogram.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.max", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"http.server.duration0.max\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"histogram.attr_0": "1"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }, + } + + // 20 buckets, one separate ts for each bucket + for i := 0; i < 20; i++ { + expectedTs = append(expectedTs, ts{ + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.bucket", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: fmt.Sprintf("{\"__name__\":\"http.server.duration0.bucket\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"le\":\"%d\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", i), + attrs: map[string]string{"histogram.attr_0": "1", "le": strconv.Itoa(i)}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", "__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }) + } + + // add le=+Inf sample + expectedTs = append(expectedTs, ts{ + env: "", + temporality: pmetric.AggregationTemporalityCumulative, + metricName: "http.server.duration0.bucket", + description: "server duration of the http server", + unit: "ms", + typ: pmetric.MetricTypeHistogram, + isMonotonic: false, + unixMilli: 1727286182000, + labels: "{\"__name__\":\"http.server.duration0.bucket\",\"__resource.schema_url__\":\"resource.schema_url\",\"__scope.name__\":\"go.signoz.io/app/reader\",\"__scope.schema_url__\":\"scope.schema_url\",\"__scope.version__\":\"1.0.0\",\"le\":\"+Inf\",\"histogram.attr_0\":\"1\",\"resource.attr_0\":\"value0\",\"scope.attr_0\":\"value0\"}", + attrs: map[string]string{"histogram.attr_0": "1", "le": "+Inf"}, + scopeAttrs: map[string]string{"__scope.name__": "go.signoz.io/app/reader", 
"__scope.schema_url__": "scope.schema_url", "__scope.version__": "1.0.0", "scope.attr_0": "value0"}, + resourceAttrs: map[string]string{"__resource.schema_url__": "resource.schema_url", "resource.attr_0": "value0"}, + }) + + for idx, ts := range expectedTs { + currentTs := batch.ts[idx] + + assert.Equal(t, ts.env, currentTs.env) + assert.Equal(t, ts.temporality, currentTs.temporality) + assert.Equal(t, ts.metricName, currentTs.metricName) + assert.Equal(t, ts.description, currentTs.description) + assert.Equal(t, ts.unit, currentTs.unit) + assert.Equal(t, ts.typ, currentTs.typ) + } +} + +func Test_prepareBatchExponentialHistogram(t *testing.T) { + metrics := generateExponentialHistogramMetrics(1, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{} + batch := exp.prepareBatch(metrics) + assert.NotNil(t, batch) +} + +func Test_prepareBatchSummary(t *testing.T) { + metrics := generateSummaryMetrics(1, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{} + batch := exp.prepareBatch(metrics) + assert.NotNil(t, batch) +} + +func Benchmark_prepareBatchGauge(b *testing.B) { + metrics := generateGaugeMetrics(100000, 10, 10, 10, 10) + b.ResetTimer() + b.ReportAllocs() + exp := &clickhouseMetricsExporter{} + for i := 0; i < b.N; i++ { + exp.prepareBatch(metrics) + } +} + +func Benchmark_prepareBatchSum(b *testing.B) { + metrics := generateSumMetrics(100000, 10, 10, 10, 10) + b.ResetTimer() + b.ReportAllocs() + exp := &clickhouseMetricsExporter{} + for i := 0; i < b.N; i++ { + exp.prepareBatch(metrics) + } +} + +func Benchmark_prepareBatchHistogram(b *testing.B) { + metrics := generateHistogramMetrics(10000, 10, 10, 10, 10) + b.ResetTimer() + b.ReportAllocs() + exp := &clickhouseMetricsExporter{} + for i := 0; i < b.N; i++ { + exp.prepareBatch(metrics) + } +} + +func Benchmark_prepareBatchExponentialHistogram(b *testing.B) { + metrics := generateExponentialHistogramMetrics(10000, 10, 10, 10, 10) + b.ResetTimer() + b.ReportAllocs() + exp := &clickhouseMetricsExporter{} + for i := 0; i < b.N; i++ { + exp.prepareBatch(metrics) + } +} + +func Benchmark_prepareBatchSummary(b *testing.B) { + metrics := generateSummaryMetrics(10000, 10, 10, 10, 10) + b.ResetTimer() + b.ReportAllocs() + exp := &clickhouseMetricsExporter{} + for i := 0; i < b.N; i++ { + exp.prepareBatch(metrics) + } +} diff --git a/exporter/clickhousemetricsexporterv2/factory.go b/exporter/clickhousemetricsexporterv2/factory.go new file mode 100644 index 00000000..b351c0bb --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/factory.go @@ -0,0 +1,106 @@ +package clickhousemetricsexporterv2 + +import ( + "context" + "errors" + + "github.com/ClickHouse/clickhouse-go/v2" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configretry" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" +) + +const ( + // The value of "type" key in configuration. + typeStr = "clickhousemetricswritev2" +) + +var ( + writeLatencyMillis = stats.Int64("exporter_db_write_latency", "Time taken (in millis) for exporter to write batch", "ms") + exporterKey = tag.MustNewKey("exporter") + tableKey = tag.MustNewKey("table") +) + +// NewFactory creates a new ClickHouse Metrics exporter. 
+func NewFactory() exporter.Factory { + + writeLatencyDistribution := view.Distribution(100, 500, 750, 1000, 1500, 2000, 2500, 3000, 4000, 8000, 16000, 32000, 64000) + writeLatencyView := &view.View{ + Name: "exporter_db_write_latency", + Measure: writeLatencyMillis, + Description: writeLatencyMillis.Description(), + TagKeys: []tag.Key{exporterKey, tableKey}, + Aggregation: writeLatencyDistribution, + } + + view.Register(writeLatencyView) + return exporter.NewFactory( + component.MustNewType(typeStr), + createDefaultConfig, + exporter.WithMetrics(createMetricsExporter, component.StabilityLevelUndefined)) +} + +func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, + cfg component.Config) (exporter.Metrics, error) { + + chCfg, ok := cfg.(*Config) + if !ok { + return nil, errors.New("invalid configuration") + } + + connOptions, err := clickhouse.ParseDSN(chCfg.DSN) + if err != nil { + return nil, err + } + + conn, err := clickhouse.Open(connOptions) + if err != nil { + return nil, err + } + + chExporter, err := NewClickHouseExporter( + WithConfig(chCfg), + WithConn(conn), + WithLogger(set.Logger), + WithEnableExpHist(chCfg.EnableExpHist), + ) + if err != nil { + return nil, err + } + + exporter, err := exporterhelper.NewMetricsExporter( + ctx, + set, + cfg, + chExporter.PushMetrics, + exporterhelper.WithTimeout(chCfg.TimeoutSettings), + exporterhelper.WithQueue(chCfg.QueueSettings), + exporterhelper.WithRetry(chCfg.BackOffConfig), + exporterhelper.WithStart(chExporter.Start), + exporterhelper.WithShutdown(chExporter.Shutdown), + ) + + if err != nil { + return nil, err + } + + return exporter, nil +} + +func createDefaultConfig() component.Config { + return &Config{ + TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + BackOffConfig: configretry.NewDefaultBackOffConfig(), + QueueSettings: exporterhelper.NewDefaultQueueSettings(), + DSN: "tcp://localhost:9000", + EnableExpHist: false, + Database: "signoz_metrics", + SamplesTable: "distributed_samples_v4", + TimeSeriesTable: "distributed_time_series_v4", + ExpHistTable: "distributed_exp_hist_v4", + } +} diff --git a/exporter/clickhousemetricsexporterv2/factory_test.go b/exporter/clickhousemetricsexporterv2/factory_test.go new file mode 100644 index 00000000..ffa2c564 --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/factory_test.go @@ -0,0 +1 @@ +package clickhousemetricsexporterv2 diff --git a/exporter/clickhousemetricsexporterv2/fingerprint.go b/exporter/clickhousemetricsexporterv2/fingerprint.go new file mode 100644 index 00000000..da622922 --- /dev/null +++ b/exporter/clickhousemetricsexporterv2/fingerprint.go @@ -0,0 +1,64 @@ +package clickhousemetricsexporterv2 + +import "go.opentelemetry.io/collector/pdata/pcommon" + +const ( + offset64 uint64 = 14695981039346656037 + prime64 uint64 = 1099511628211 + separatorByte byte = 255 +) + +// hashAdd adds a string to a fnv64a hash value, returning the updated hash. +func hashAdd(h uint64, s string) uint64 { + for i := 0; i < len(s); i++ { + h ^= uint64(s[i]) + h *= prime64 + } + return h +} + +// hashAddByte adds a byte to a fnv64a hash value, returning the updated hash. 
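+// Together with hashAdd and the offset64/prime64 constants above, this is
+// the FNV-1a hash that Fingerprint folds every label key and value into,
+// separated by separatorByte.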
+func hashAddByte(h uint64, b byte) uint64 {
+	h ^= uint64(b)
+	h *= prime64
+	return h
+}
+
+// Fingerprint computes a hash over all resource, scope, and point attributes
+// plus the special __name__ pseudo-label. Note that when every attribute map
+// is empty, the name is not folded in and all such series collapse to
+// offset64; in practice prepareBatch always injects scope and resource
+// metadata attributes, so that case does not occur for real data.
+func Fingerprint(attrs pcommon.Map, scopeAttrs pcommon.Map, resAttrs pcommon.Map, name string) uint64 {
+	if attrs.Len() == 0 && resAttrs.Len() == 0 && scopeAttrs.Len() == 0 {
+		return offset64
+	}
+
+	sum := offset64
+
+	resAttrs.Range(func(k string, v pcommon.Value) bool {
+		sum = hashAdd(sum, k)
+		sum = hashAddByte(sum, separatorByte)
+		sum = hashAdd(sum, v.AsString())
+		sum = hashAddByte(sum, separatorByte)
+		return true
+	})
+
+	scopeAttrs.Range(func(k string, v pcommon.Value) bool {
+		sum = hashAdd(sum, k)
+		sum = hashAddByte(sum, separatorByte)
+		sum = hashAdd(sum, v.AsString())
+		sum = hashAddByte(sum, separatorByte)
+		return true
+	})
+
+	attrs.Range(func(k string, v pcommon.Value) bool {
+		sum = hashAdd(sum, k)
+		sum = hashAddByte(sum, separatorByte)
+		sum = hashAdd(sum, v.AsString())
+		sum = hashAddByte(sum, separatorByte)
+		return true
+	})
+
+	// add special __name__=name
+	sum = hashAdd(sum, "__name__")
+	sum = hashAddByte(sum, separatorByte)
+	sum = hashAdd(sum, name)
+
+	return sum
+}
diff --git a/exporter/clickhousemetricsexporterv2/helper.go b/exporter/clickhousemetricsexporterv2/helper.go
new file mode 100644
index 00000000..fe528a92
--- /dev/null
+++ b/exporter/clickhousemetricsexporterv2/helper.go
@@ -0,0 +1,109 @@
+package clickhousemetricsexporterv2
+
+import (
+	"sort"
+	"strconv"
+	"strings"
+
+	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter/utils/gofuzz"
+	"go.opentelemetry.io/collector/pdata/pcommon"
+)
+
+func getAllLabels(attrs pcommon.Map, scopeAttrs pcommon.Map, resAttrs pcommon.Map, name string) map[string]string {
+	labels := make(map[string]string)
+	attrs.Range(func(k string, v pcommon.Value) bool {
+		labels[k] = v.AsString()
+		return true
+	})
+
+	scopeAttrs.Range(func(k string, v pcommon.Value) bool {
+		labels[k] = v.AsString()
+		return true
+	})
+
+	resAttrs.Range(func(k string, v pcommon.Value) bool {
+		labels[k] = v.AsString()
+		return true
+	})
+	labels["__name__"] = name
+	return labels
+}
+
+func getJSONString(attrs map[string]string) string {
+	return string(marshalLabels(attrs, make([]byte, 0, 128)))
+}
+
+func getAttrMap(attrs pcommon.Map) map[string]string {
+	attrMap := make(map[string]string)
+	attrs.Range(func(k string, v pcommon.Value) bool {
+		attrMap[k] = v.AsString()
+		return true
+	})
+	return attrMap
+}
+
+// marshalLabels marshals attributes into JSON, appending it to b.
+// It emits keys in sorted order and is significantly faster than json.Marshal.
+// It is compatible with ClickHouse JSON functions: https://clickhouse.yandex/docs/en/functions/json_functions.html
+func marshalLabels(attrs map[string]string, b []byte) []byte {
+	// make sure that the attrs are sorted
+	keys := make([]string, 0, len(attrs))
+	for k := range attrs {
+		keys = append(keys, k)
+	}
+	sort.SliceStable(keys, func(i, j int) bool {
+		return keys[i] < keys[j]
+	})
+
+	if len(attrs) == 0 {
+		return append(b, '{', '}')
+	}
+
+	b = append(b, '{')
+	for _, name := range keys {
+		value := attrs[name]
+		// add the label name, which cannot contain runes that need escaping
+		b = append(b, '"')
+		b = append(b, name...)
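+		// close the quoted name, emit the colon, and open the quoted value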
+		b = append(b, '"', ':', '"')
+
+		// add label value while escaping some runes
+		for _, c := range []byte(value) {
+			switch c {
+			case '\\', '"':
+				b = append(b, '\\', c)
+			case '\n':
+				b = append(b, '\\', 'n')
+			case '\r':
+				b = append(b, '\\', 'r')
+			case '\t':
+				b = append(b, '\\', 't')
+			default:
+				b = append(b, c)
+			}
+		}
+
+		b = append(b, '"', ',')
+	}
+	b[len(b)-1] = '}' // replace last comma
+
+	gofuzz.AddToCorpus("json", b)
+	return b
+}
+
+func makeCacheKey(a, b uint64) string {
+	// Pre-allocate a builder with an estimated capacity
+	var builder strings.Builder
+	builder.Grow(41) // Max length: 20 digits for each uint64 + 1 for the colon
+
+	// Convert and write the first uint64
+	builder.WriteString(strconv.FormatUint(a, 10))
+
+	// Write the separator
+	builder.WriteByte(':')
+
+	// Convert and write the second uint64
+	builder.WriteString(strconv.FormatUint(b, 10))
+
+	return builder.String()
+}
diff --git a/exporter/clickhousemetricsexporterv2/test_data.go b/exporter/clickhousemetricsexporterv2/test_data.go
new file mode 100644
index 00000000..1466def3
--- /dev/null
+++ b/exporter/clickhousemetricsexporterv2/test_data.go
@@ -0,0 +1,267 @@
+package clickhousemetricsexporterv2
+
+import (
+	"strconv"
+	"time"
+
+	"go.opentelemetry.io/collector/pdata/pcommon"
+	"go.opentelemetry.io/collector/pdata/pmetric"
+)
+
+// generateGaugeMetrics generates a set of gauge metrics with
+// the given number of metrics, data points, point attributes,
+// scope attributes, and resource attributes.
+// the metrics will be named as system.memory.usage0, system.memory.usage1, etc.
+// the data points will have the metric index as their value
+// the point attributes will be named as gauge.attr_0, gauge.attr_1, etc.
+// the scope attributes will be named as scope.attr_0, scope.attr_1, etc.
+// the resource attributes will be named as resource.attr_0, resource.attr_1, etc.
+func generateGaugeMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeAttributes, numResourceAttributes int) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	rm := metrics.ResourceMetrics().AppendEmpty()
+	for i := 0; i < numResourceAttributes; i++ {
+		rm.Resource().Attributes().PutStr("resource.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	rm.SetSchemaUrl("resource.schema_url")
+	sm := rm.ScopeMetrics().AppendEmpty()
+	sm.SetSchemaUrl("scope.schema_url")
+	for i := 0; i < numScopeAttributes; i++ {
+		sm.Scope().Attributes().PutStr("scope.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	sm.Scope().SetName("go.signoz.io/app/reader")
+	sm.Scope().SetVersion("1.0.0")
+	for i := 0; i < numMetrics; i++ {
+		m := sm.Metrics().AppendEmpty()
+		m.SetName("system.memory.usage" + strconv.Itoa(i))
+		m.SetUnit("bytes")
+		m.SetDescription("memory usage of the host")
+		m.SetEmptyGauge()
+		for j := 0; j < numDataPoints; j++ {
+			// append a fresh data point per iteration so that numDataPoints
+			// points are actually produced
+			dp := m.Gauge().DataPoints().AppendEmpty()
+			dp.SetIntValue(int64(i))
+			dp.SetFlags(pmetric.DefaultDataPointFlags)
+			for k := 0; k < numPointAttributes; k++ {
+				dp.Attributes().PutStr("gauge.attr_"+strconv.Itoa(k), "1")
+			}
+			dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182, 0)))
+			dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182, 0)))
+		}
+	}
+	return metrics
+}
+
+// generateSumMetrics generates a set of sum metrics with
+// the given number of metrics, data points, point attributes,
+// scope attributes, and resource attributes.
+// the metrics will be named as system.cpu.time0, system.cpu.time1, etc.
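+// all data points are stamped with the fixed timestamp 1727286182 (unix
+// seconds); that is where the unixMilli value 1727286182000 in the expected
+// test results comes from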
+// the data points will have the metric index as their value
+// the point attributes will be named as sum.attr_0, sum.attr_1, etc.
+// the scope attributes will be named as scope.attr_0, scope.attr_1, etc.
+// the resource attributes will be named as resource.attr_0, resource.attr_1, etc.
+// the sum metrics will be cumulative or delta based on the index of the metric
+// for even metrics i.e system.cpu.time0, system.cpu.time2, etc will be cumulative
+// for odd metrics i.e system.cpu.time1, system.cpu.time3, etc will be delta
+// the sum metrics will be monotonic when the metric index is divisible by 3
+// i.e system.cpu.time0, system.cpu.time3, system.cpu.time6, etc will be monotonic
+// and all other metrics i.e system.cpu.time1, system.cpu.time2, etc will not be monotonic
+func generateSumMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeAttributes, numResourceAttributes int) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	rm := metrics.ResourceMetrics().AppendEmpty()
+	for i := 0; i < numResourceAttributes; i++ {
+		rm.Resource().Attributes().PutStr("resource.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	rm.SetSchemaUrl("resource.schema_url")
+	sm := rm.ScopeMetrics().AppendEmpty()
+	sm.SetSchemaUrl("scope.schema_url")
+	for i := 0; i < numScopeAttributes; i++ {
+		sm.Scope().Attributes().PutStr("scope.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	sm.Scope().SetName("go.signoz.io/app/reader")
+	sm.Scope().SetVersion("1.0.0")
+	timestamp := time.Unix(1727286182, 0)
+	for i := 0; i < numMetrics; i++ {
+		m := sm.Metrics().AppendEmpty()
+		m.SetName("system.cpu.time" + strconv.Itoa(i))
+		m.SetUnit("s")
+		m.SetDescription("cpu time of the host")
+		m.SetEmptySum()
+		if i%2 == 0 {
+			m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		} else {
+			m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+		}
+		if i%3 == 0 {
+			m.Sum().SetIsMonotonic(true)
+		} else {
+			m.Sum().SetIsMonotonic(false)
+		}
+		for j := 0; j < numDataPoints; j++ {
+			// append a fresh data point per iteration so that numDataPoints
+			// points are actually produced
+			dp := m.Sum().DataPoints().AppendEmpty()
+			dp.SetIntValue(int64(i))
+			dp.SetFlags(pmetric.DefaultDataPointFlags)
+			for k := 0; k < numPointAttributes; k++ {
+				dp.Attributes().PutStr("sum.attr_"+strconv.Itoa(k), "1")
+			}
+			dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp))
+			dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+		}
+	}
+	return metrics
+}
+
+// generateHistogramMetrics generates a set of histogram metrics with
+// the given number of metrics, data points, point attributes,
+// scope attributes, and resource attributes.
+// the metrics will be named as http.server.duration0, http.server.duration1, etc.
+// the point attributes will be named as histogram.attr_0, histogram.attr_1, etc.
+// the scope attributes will be named as scope.attr_0, scope.attr_1, etc.
+// the resource attributes will be named as resource.attr_0, resource.attr_1, etc.
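+// each data point carries count=30, sum=35, min=0 and max=12; these are the
+// values the expected samples in exporter_test.go are built from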
+// the histogram metrics will be cumulative or delta based on the index of the metric
+// for even metrics i.e http.server.duration0, http.server.duration2, etc will be cumulative
+// for odd metrics i.e http.server.duration1, http.server.duration3, etc will be delta
+// the default number of buckets is 20
+// the default bucket counts are `1, 1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, 12, 1, 1, 1, 1, 1, 1, 1`
+// the default explicit bounds are `0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19`
+func generateHistogramMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeAttributes, numResourceAttributes int) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	rm := metrics.ResourceMetrics().AppendEmpty()
+	for i := 0; i < numResourceAttributes; i++ {
+		rm.Resource().Attributes().PutStr("resource.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	rm.SetSchemaUrl("resource.schema_url")
+	sm := rm.ScopeMetrics().AppendEmpty()
+	sm.SetSchemaUrl("scope.schema_url")
+	for i := 0; i < numScopeAttributes; i++ {
+		sm.Scope().Attributes().PutStr("scope.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	sm.Scope().SetName("go.signoz.io/app/reader")
+	sm.Scope().SetVersion("1.0.0")
+	timestamp := time.Unix(1727286182, 0)
+	for i := 0; i < numMetrics; i++ {
+		m := sm.Metrics().AppendEmpty()
+		m.SetName("http.server.duration" + strconv.Itoa(i))
+		m.SetUnit("ms")
+		m.SetDescription("server duration of the http server")
+		m.SetEmptyHistogram()
+		if i%2 == 0 {
+			m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		} else {
+			m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+		}
+		for j := 0; j < numDataPoints; j++ {
+			// append a fresh data point per iteration so that numDataPoints
+			// points are actually produced
+			dp := m.Histogram().DataPoints().AppendEmpty()
+			dp.SetCount(30)
+			dp.SetSum(35)
+			for k := 0; k < numPointAttributes; k++ {
+				dp.Attributes().PutStr("histogram.attr_"+strconv.Itoa(k), "1")
+			}
+			dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp))
+			dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+			dp.ExplicitBounds().FromRaw([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19})
+			dp.BucketCounts().FromRaw([]uint64{1, 1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, 12, 1, 1, 1, 1, 1, 1, 1})
+			dp.SetMin(0)
+			dp.SetMax(12)
+		}
+	}
+	return metrics
+}
+
+// generateExponentialHistogramMetrics generates a set of exponential histogram metrics with
+// the given number of metrics, data points, point attributes,
+// scope attributes, and resource attributes.
+// the metrics will be named as http.server.duration0, http.server.duration1, etc.
+// the point attributes will be named as exponential.histogram.attr_0, exponential.histogram.attr_1, etc.
+// the scope attributes will be named as scope.attr_0, scope.attr_1, etc.
+// the resource attributes will be named as resource.attr_0, resource.attr_1, etc.
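+// each data point carries count=1, sum=1, min=0, max=1, a zero count of 0,
+// and identical positive and negative bucket layouts at offset 1, which feed
+// the DD sketch conversion in processExponentialHistogram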
+// the exponential histogram metrics will be cumulative or delta based on the index of the metric
+// even-indexed metrics, i.e. http.server.duration0, http.server.duration2, etc., will be cumulative
+// odd-indexed metrics, i.e. http.server.duration1, http.server.duration3, etc., will be delta
+func generateExponentialHistogramMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeAttributes, numResourceAttributes int) pmetric.Metrics {
+	metrics := pmetric.NewMetrics()
+	rm := metrics.ResourceMetrics().AppendEmpty()
+	for i := 0; i < numResourceAttributes; i++ {
+		rm.Resource().Attributes().PutStr("resource.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	rm.SetSchemaUrl("resource.schema_url")
+	sm := rm.ScopeMetrics().AppendEmpty()
+	sm.SetSchemaUrl("scope.schema_url")
+	for i := 0; i < numScopeAttributes; i++ {
+		sm.Scope().Attributes().PutStr("scope.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i))
+	}
+	sm.Scope().SetName("go.signoz.io/app/reader")
+	sm.Scope().SetVersion("1.0.0")
+	timestamp := time.Unix(1727286182, 0)
+	for i := 0; i < numMetrics; i++ {
+		m := sm.Metrics().AppendEmpty()
+		m.SetName("http.server.duration" + strconv.Itoa(i))
+		m.SetUnit("ms")
+		m.SetDescription("server duration of the http server but in exponential histogram format")
+		dp := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty()
+		if i%2 == 0 {
+			m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+		} else {
+			m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
+		}
+		for j := 0; j < numDataPoints; j++ {
+			dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp))
+			dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp))
+			dp.SetSum(1)
+			dp.SetMin(0)
+			dp.SetMax(1)
+			dp.SetZeroCount(0)
+			dp.SetCount(1)
+			for k := 0; k < numPointAttributes; k++ {
+				dp.Attributes().PutStr("exponential.histogram.attr_"+strconv.Itoa(k), "1")
+			}
+			dp.Negative().SetOffset(1)
+			dp.Negative().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 11, 1, 1, 1, 1, 10})
+			dp.Positive().SetOffset(1)
+			dp.Positive().BucketCounts().FromRaw([]uint64{0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 11, 1, 1, 1, 1, 10})
+		}
+	}
+	return metrics
+}
+
+// generateSummaryMetrics generates a set of summary metrics with
+// the given number of metrics, data points, point attributes,
+// scope attributes, and resource attributes.
+// the metrics will be named as zk.duration0, zk.duration1, etc.
+// the data points will have count, sum, and a single quantile entry
+// the point attributes will be named as summary.attr_0, summary.attr_1, etc.
+// the scope attributes will be named as scope.attr_0, scope.attr_1, etc.
+// the resource attributes will be named as resource.attr_0, resource.attr_1, etc.
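+// summary metrics carry no aggregation temporality, so it is exported as unspecified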
+func generateSummaryMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeAttributes, numResourceAttributes int) pmetric.Metrics { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + for i := 0; i < numResourceAttributes; i++ { + rm.Resource().Attributes().PutStr("resource.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i)) + } + rm.SetSchemaUrl("resource.schema_url") + sm := rm.ScopeMetrics().AppendEmpty() + sm.SetSchemaUrl("scope.schema_url") + for i := 0; i < numScopeAttributes; i++ { + sm.Scope().Attributes().PutStr("scope.attr_"+strconv.Itoa(i), "value"+strconv.Itoa(i)) + } + sm.Scope().SetName("go.signoz.io/app/reader") + sm.Scope().SetVersion("1.0.0") + timestamp := time.Unix(1727286182, 0) + for i := 0; i < numMetrics; i++ { + m := sm.Metrics().AppendEmpty() + m.SetName("zk.duration" + strconv.Itoa(i)) + m.SetUnit("ms") + m.SetDescription("This is a summary metrics") + dp := m.SetEmptySummary().DataPoints().AppendEmpty() + for j := 0; j < numDataPoints; j++ { + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) + dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + dp.SetCount(1) + dp.SetSum(1) + for k := 0; k < numPointAttributes; k++ { + dp.Attributes().PutStr("summary.attr_"+strconv.Itoa(k), "1") + } + quantileValues := dp.QuantileValues().AppendEmpty() + quantileValues.SetValue(1) + quantileValues.SetQuantile(1) + } + } + return metrics +} From dbb660db02a39d30193d1e6acc976225502c7711 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Fri, 27 Sep 2024 15:26:16 +0530 Subject: [PATCH 2/3] Handle remaining types and add more metrics --- components/components.go | 2 + .../clickhousemetricsexporterv2/config.go | 1 + .../config_test.go | 20 +++ .../clickhousemetricsexporterv2/exporter.go | 132 ++++++++++++++- .../exporter_test.go | 151 +++++++++++++++++- .../clickhousemetricsexporterv2/factory.go | 3 +- .../factory_test.go | 12 ++ .../clickhousemetricsexporterv2/test_data.go | 49 +++--- go.mod | 4 +- 9 files changed, 337 insertions(+), 37 deletions(-) diff --git a/components/components.go b/components/components.go index e301afe3..045f3850 100644 --- a/components/components.go +++ b/components/components.go @@ -115,6 +115,7 @@ import ( "github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter" + "github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporterv2" "github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter" "github.com/SigNoz/signoz-otel-collector/exporter/signozkafkaexporter" signozhealthcheckextension "github.com/SigNoz/signoz-otel-collector/extension/healthcheckextension" @@ -219,6 +220,7 @@ func Components() (otelcol.Factories, error) { awss3exporter.NewFactory(), carbonexporter.NewFactory(), clickhousemetricsexporter.NewFactory(), + clickhousemetricsexporterv2.NewFactory(), clickhousetracesexporter.NewFactory(), clickhouselogsexporter.NewFactory(), fileexporter.NewFactory(), diff --git a/exporter/clickhousemetricsexporterv2/config.go b/exporter/clickhousemetricsexporterv2/config.go index 1d744b1c..8dcedab7 100644 --- a/exporter/clickhousemetricsexporterv2/config.go +++ b/exporter/clickhousemetricsexporterv2/config.go @@ -22,6 +22,7 @@ type Config struct { SamplesTable string TimeSeriesTable string ExpHistTable string + MetadataTable string } var _ component.Config = (*Config)(nil) diff --git a/exporter/clickhousemetricsexporterv2/config_test.go 
b/exporter/clickhousemetricsexporterv2/config_test.go index ffa2c564..ffe43835 100644 --- a/exporter/clickhousemetricsexporterv2/config_test.go +++ b/exporter/clickhousemetricsexporterv2/config_test.go @@ -1 +1,21 @@ package clickhousemetricsexporterv2 + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestConfig_Validate(t *testing.T) { + cfg := &Config{} + err := cfg.Validate() + require.Error(t, err) +} + +func TestConfig_Validate_Valid(t *testing.T) { + cfg := &Config{ + DSN: "tcp://localhost:9000?database=default", + } + err := cfg.Validate() + require.NoError(t, err) +} diff --git a/exporter/clickhousemetricsexporterv2/exporter.go b/exporter/clickhousemetricsexporterv2/exporter.go index 53503fad..2bbda77e 100644 --- a/exporter/clickhousemetricsexporterv2/exporter.go +++ b/exporter/clickhousemetricsexporterv2/exporter.go @@ -20,6 +20,9 @@ import ( ) var ( + resourceAttrType = "resource" + scopeAttrType = "scope" + pointAttrType = "point" countSuffix = ".count" sumSuffix = ".sum" minSuffix = ".min" @@ -29,6 +32,7 @@ var ( samplesSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, value) VALUES (?, ?, ?, ?, ?, ?)" timeSeriesSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, description, unit, type, is_monotonic, fingerprint, unix_milli, labels, attrs, scope_attrs, resource_attrs) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" expHistSQLTmpl = "INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, unix_milli, count, sum, min, max, sketch) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + metadataSQLTmpl = "INSERT INTO %s.%s (temporality, metric_name, description, unit, type, is_monotonic, attr_name, attr_type, attr_datatype, attr_string_value, first_reported_unix_milli, last_reported_unix_milli) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" ) type clickhouseMetricsExporter struct { @@ -42,6 +46,7 @@ type clickhouseMetricsExporter struct { samplesSQL string timeSeriesSQL string expHistSQL string + metadataSQL string } // sample represents a single metric sample @@ -88,10 +93,29 @@ type ts struct { resourceAttrs map[string]string } +// metadata represents a single metric metadata +// directly mapped to the table `metadata` schema +type metadata struct { + metricName string + temporality pmetric.AggregationTemporality + description string + unit string + typ pmetric.MetricType + isMonotonic bool + attrName string + attrType string + attrDatatype pcommon.ValueType + attrStringValue string +} + +// writeBatch is a batch of data to be written to the database type writeBatch struct { - samples []sample - expHist []exponentialHistogramSample - ts []ts + samples []sample + expHist []exponentialHistogramSample + ts []ts + metadata []metadata + + metaSeen map[string]struct{} } type ExporterOption func(e *clickhouseMetricsExporter) error @@ -161,6 +185,7 @@ func NewClickHouseExporter(opts ...ExporterOption) (*clickhouseMetricsExporter, chExporter.samplesSQL = fmt.Sprintf(samplesSQLTmpl, chExporter.cfg.Database, chExporter.cfg.SamplesTable) chExporter.timeSeriesSQL = fmt.Sprintf(timeSeriesSQLTmpl, chExporter.cfg.Database, chExporter.cfg.TimeSeriesTable) chExporter.expHistSQL = fmt.Sprintf(expHistSQLTmpl, chExporter.cfg.Database, chExporter.cfg.ExpHistTable) + chExporter.metadataSQL = fmt.Sprintf(metadataSQLTmpl, chExporter.cfg.Database, chExporter.cfg.MetadataTable) return chExporter, nil } @@ -174,6 +199,33 @@ func (c *clickhouseMetricsExporter) Shutdown(ctx context.Context) error { return c.conn.Close() } +func (c 
*clickhouseMetricsExporter) processMetadata(
+	batch *writeBatch, name, desc, unit string, typ pmetric.MetricType, temporality pmetric.AggregationTemporality, isMonotonic bool, attrs pcommon.Map, attrType string) {
+	attrs.Range(func(key string, value pcommon.Value) bool {
+		// there should never be a conflicting key (across resource, scope, or point attributes) in metrics;
+		// a conflict would break the fingerprinting, so we assume it never happens
+		// even if it does, we do not handle it on our end (because we can't reliably tell which value should
+		// take precedence); the user is responsible for ensuring there are no conflicting keys in their metrics
+		if _, ok := batch.metaSeen[key]; ok {
+			return true
+		}
+		batch.metaSeen[key] = struct{}{}
+		batch.metadata = append(batch.metadata, metadata{
+			metricName:      name,
+			temporality:     temporality,
+			description:     desc,
+			unit:            unit,
+			typ:             typ,
+			isMonotonic:     isMonotonic,
+			attrName:        key,
+			attrType:        attrType,
+			attrDatatype:    value.Type(),
+			attrStringValue: value.AsString(),
+		})
+		return true
+	})
+}
+
 // processGauge processes gauge metrics
 func (c *clickhouseMetricsExporter) processGauge(batch *writeBatch, metric pmetric.Metric, resAttrs pcommon.Map, scopeAttrs pcommon.Map) {
 	name := metric.Name()
@@ -189,6 +241,9 @@ func (c *clickhouseMetricsExporter) processGauge(batch *writeBatch, metric pmetr
 	// there is no monotonicity for gauge metrics
 	isMonotonic := false
 
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, resAttrs, resourceAttrType)
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, scopeAttrs, scopeAttrType)
+
 	for i := 0; i < metric.Gauge().DataPoints().Len(); i++ {
 		dp := metric.Gauge().DataPoints().At(i)
 		var value float64
@@ -208,6 +263,7 @@
 			unixMilli:   unixMilli,
 			value:       value,
 		})
+		c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType)
 
 		batch.ts = append(batch.ts, ts{
 			env:         env,
@@ -242,6 +298,9 @@ func (c *clickhouseMetricsExporter) processSum(batch *writeBatch, metric pmetric
 	}
 	isMonotonic := metric.Sum().IsMonotonic()
 
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, resAttrs, resourceAttrType)
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, scopeAttrs, scopeAttrType)
+
 	for i := 0; i < metric.Sum().DataPoints().Len(); i++ {
 		dp := metric.Sum().DataPoints().At(i)
 		var value float64
@@ -261,6 +320,7 @@
 			unixMilli:   unixMilli,
 			value:       value,
 		})
+		c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType)
 
 		batch.ts = append(batch.ts, ts{
 			env:         env,
@@ -295,6 +355,9 @@ func (c *clickhouseMetricsExporter) processHistogram(batch *writeBatch, metric p
 	// monotonicity is assumed for histograms
 	isMonotonic := true
 
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, resAttrs, resourceAttrType)
+	c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, scopeAttrs, scopeAttrType)
+
 	addSample := func(batch *writeBatch, dp pmetric.HistogramDataPoint, suffix string) {
 		unixMilli := dp.Timestamp().AsTime().UnixMilli()
 		var value float64
@@ -317,6 +380,7 @@
 			unixMilli:   unixMilli,
 			value:       value,
 		})
+		c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType)
 		batch.ts
= append(batch.ts, ts{ env: env, @@ -354,6 +418,8 @@ func (c *clickhouseMetricsExporter) processHistogram(batch *writeBatch, metric p unixMilli: unixMilli, value: float64(cumulativeCount), }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) + batch.ts = append(batch.ts, ts{ env: env, temporality: temporality, @@ -380,6 +446,7 @@ func (c *clickhouseMetricsExporter) processHistogram(batch *writeBatch, metric p unixMilli: unixMilli, value: float64(dp.Count()), }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) batch.ts = append(batch.ts, ts{ env: env, temporality: temporality, @@ -426,6 +493,9 @@ func (c *clickhouseMetricsExporter) processSummary(batch *writeBatch, metric pme // monotonicity is assumed for summaries isMonotonic := true + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, resAttrs, resourceAttrType) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, scopeAttrs, scopeAttrType) + addSample := func(batch *writeBatch, dp pmetric.SummaryDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() var value float64 @@ -444,6 +514,8 @@ func (c *clickhouseMetricsExporter) processSummary(batch *writeBatch, metric pme unixMilli: unixMilli, value: value, }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) + batch.ts = append(batch.ts, ts{ env: env, temporality: temporality, @@ -477,6 +549,7 @@ func (c *clickhouseMetricsExporter) processSummary(batch *writeBatch, metric pme unixMilli: unixMilli, value: quantileValue, }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) batch.ts = append(batch.ts, ts{ env: env, temporality: temporality, @@ -531,6 +604,9 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatc isMonotonic := true + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, resAttrs, resourceAttrType) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, scopeAttrs, scopeAttrType) + addSample := func(batch *writeBatch, dp pmetric.ExponentialHistogramDataPoint, suffix string) { unixMilli := dp.Timestamp().AsTime().UnixMilli() var value float64 @@ -553,6 +629,7 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatc unixMilli: unixMilli, value: value, }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) batch.ts = append(batch.ts, ts{ env: env, @@ -608,6 +685,7 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatc min: dp.Min(), max: dp.Max(), }) + c.processMetadata(batch, name, desc, unit, typ, temporality, isMonotonic, pointAttrs, pointAttrType) batch.ts = append(batch.ts, ts{ env: env, @@ -644,7 +722,7 @@ func (c *clickhouseMetricsExporter) processExponentialHistogram(batch *writeBatc } func (c *clickhouseMetricsExporter) prepareBatch(md pmetric.Metrics) *writeBatch { - batch := &writeBatch{} + batch := &writeBatch{metaSeen: make(map[string]struct{})} for i := 0; i < md.ResourceMetrics().Len(); i++ { rm := md.ResourceMetrics().At(i) resAttrs := rm.Resource().Attributes() @@ -688,6 +766,9 @@ func (c *clickhouseMetricsExporter) PushMetrics(ctx context.Context, md pmetric. 
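+// writeBatch writes the prepared batch to ClickHouse; each table gets its own
+// prepared statement, and empty inputs are skipped so that no empty INSERT is issued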
func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *writeBatch) error { writeTimeSeries := func(ctx context.Context, timeSeries []ts) error { + if len(timeSeries) == 0 { + return nil + } statement, err := c.conn.PrepareBatch(ctx, c.timeSeriesSQL, driver.WithReleaseConnection()) if err != nil { return err @@ -726,6 +807,9 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write } writeSamples := func(ctx context.Context, samples []sample) error { + if len(samples) == 0 { + return nil + } statement, err := c.conn.PrepareBatch(ctx, c.samplesSQL, driver.WithReleaseConnection()) if err != nil { return err @@ -751,6 +835,9 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write } writeExpHist := func(ctx context.Context, expHist []exponentialHistogramSample) error { + if len(expHist) == 0 { + return nil + } statement, err := c.conn.PrepareBatch(ctx, c.expHistSQL, driver.WithReleaseConnection()) if err != nil { return err @@ -779,5 +866,42 @@ func (c *clickhouseMetricsExporter) writeBatch(ctx context.Context, batch *write return err } + writeMetadata := func(ctx context.Context, metadata []metadata) error { + if len(metadata) == 0 { + return nil + } + statement, err := c.conn.PrepareBatch(ctx, c.metadataSQL, driver.WithReleaseConnection()) + if err != nil { + return err + } + for _, meta := range metadata { + err = statement.Append( + meta.metricName, + meta.temporality.String(), + meta.description, + meta.unit, + meta.typ.String(), + meta.isMonotonic, + meta.attrName, + meta.attrType, + meta.attrDatatype, + meta.attrStringValue, + time.Now().UnixMilli(), + time.Now().UnixMilli(), + ) + if err != nil { + return err + } + } + return statement.Send() + } + + if err := writeMetadata(ctx, batch.metadata); err != nil { + // we don't need to return an error here because the metadata is not critical to the operation of the exporter + // and we don't want to cause the exporter to fail if it is not able to write metadata for some reason + // if there were a generic error, it would have been returned in the other write functions + c.logger.Error("error writing metadata", zap.Error(err)) + } + return nil } diff --git a/exporter/clickhousemetricsexporterv2/exporter_test.go b/exporter/clickhousemetricsexporterv2/exporter_test.go index c8089439..895704b2 100644 --- a/exporter/clickhousemetricsexporterv2/exporter_test.go +++ b/exporter/clickhousemetricsexporterv2/exporter_test.go @@ -2,11 +2,14 @@ package clickhousemetricsexporterv2 import ( "fmt" + "math" "strconv" "testing" + chproto "github.com/ClickHouse/ch-go/proto" "github.com/zeebo/assert" "go.opentelemetry.io/collector/pdata/pmetric" + "go.uber.org/zap" ) func Test_prepareBatchGauge(t *testing.T) { @@ -310,21 +313,147 @@ func Test_prepareBatchHistogram(t *testing.T) { } func Test_prepareBatchExponentialHistogram(t *testing.T) { - metrics := generateExponentialHistogramMetrics(1, 1, 1, 1, 1) - exp := &clickhouseMetricsExporter{} + metrics := generateExponentialHistogramMetrics(2, 1, 1, 1, 1) + exp := &clickhouseMetricsExporter{enableExpHist: true, logger: zap.NewNop()} batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) + + expectedSamples := []sample{ + { + env: "", + temporality: pmetric.AggregationTemporalityDelta, + metricName: "http.server.duration1.count", + unixMilli: 1727286182000, + value: 1, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityDelta, + metricName: "http.server.duration1.sum", + unixMilli: 1727286182000, + value: 1, + }, + { + env: 
"", + temporality: pmetric.AggregationTemporalityDelta, + metricName: "http.server.duration1.min", + unixMilli: 1727286182000, + value: 0, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityDelta, + metricName: "http.server.duration1.max", + unixMilli: 1727286182000, + value: 1, + }, + } + + for idx, sample := range expectedSamples { + curSample := batch.samples[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.value, curSample.value) + } + + expectedExpHistSamples := []exponentialHistogramSample{ + { + env: "", + temporality: pmetric.AggregationTemporalityDelta, + metricName: "http.server.duration1", + unixMilli: 1727286182000, + sketch: chproto.DD{ + Mapping: &chproto.IndexMapping{Gamma: math.Pow(2, math.Pow(2, float64(-2)))}, + PositiveValues: &chproto.Store{ + ContiguousBinIndexOffset: 1, + ContiguousBinCounts: []float64{0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 11, 1, 1, 1, 1, 10}, + }, + NegativeValues: &chproto.Store{ + ContiguousBinIndexOffset: 1, + ContiguousBinCounts: []float64{0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 11, 1, 1, 1, 1, 10}, + }, + ZeroCount: 0, + }, + }, + } + + for idx, sample := range expectedExpHistSamples { + curSample := batch.expHist[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.sketch, curSample.sketch) + } } func Test_prepareBatchSummary(t *testing.T) { - metrics := generateSummaryMetrics(1, 1, 1, 1, 1) + metrics := generateSummaryMetrics(1, 2, 1, 1, 1) exp := &clickhouseMetricsExporter{} batch := exp.prepareBatch(metrics) assert.NotNil(t, batch) + + expectedSamples := []sample{ + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.count", + unixMilli: 1727286182000, + value: 0, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.sum", + unixMilli: 1727286182000, + value: 0, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.quantile", + unixMilli: 1727286182000, + value: 0, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.count", + unixMilli: 1727286183000, + value: 1, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.sum", + unixMilli: 1727286183000, + value: 1, + }, + { + env: "", + temporality: pmetric.AggregationTemporalityUnspecified, + metricName: "zk.duration0.quantile", + unixMilli: 1727286183000, + value: 1, + }, + } + + for idx, sample := range expectedSamples { + curSample := batch.samples[idx] + assert.Equal(t, sample.env, curSample.env) + assert.Equal(t, sample.temporality, curSample.temporality) + assert.Equal(t, sample.metricName, curSample.metricName) + assert.Equal(t, sample.unixMilli, curSample.unixMilli) + assert.Equal(t, sample.value, curSample.value) + } } func Benchmark_prepareBatchGauge(b *testing.B) { - metrics := generateGaugeMetrics(100000, 10, 10, 10, 10) + // 10k gauge metrics * 10 data points = 100k data point in total + // each with 30 total attributes + metrics := generateGaugeMetrics(10000, 10, 10, 10, 10) b.ResetTimer() 
b.ReportAllocs()
 	exp := &clickhouseMetricsExporter{}
@@ -334,7 +463,9 @@ func Benchmark_prepareBatchGauge(b *testing.B) {
 }
 
 func Benchmark_prepareBatchSum(b *testing.B) {
-	metrics := generateSumMetrics(100000, 10, 10, 10, 10)
+	// 10k sum metrics * 10 data points = 100k data points in total
+	// each with 30 total attributes
+	metrics := generateSumMetrics(10000, 10, 10, 10, 10)
 	b.ResetTimer()
 	b.ReportAllocs()
 	exp := &clickhouseMetricsExporter{}
@@ -344,7 +475,9 @@ func Benchmark_prepareBatchSum(b *testing.B) {
 }
 
 func Benchmark_prepareBatchHistogram(b *testing.B) {
-	metrics := generateHistogramMetrics(10000, 10, 10, 10, 10)
+	// 1k histogram metrics * 10 data points * 20 buckets = 200k samples in total
+	// each with 30 total attributes
+	metrics := generateHistogramMetrics(1000, 10, 10, 10, 10)
 	b.ResetTimer()
 	b.ReportAllocs()
 	exp := &clickhouseMetricsExporter{}
@@ -354,16 +487,20 @@ func Benchmark_prepareBatchHistogram(b *testing.B) {
 }
 
 func Benchmark_prepareBatchExponentialHistogram(b *testing.B) {
+	// 10k exponential histogram metrics * 10 data points = 100k data points in total,
+	// each with 30 total attributes and 22 positive and 22 negative bucket counts
 	metrics := generateExponentialHistogramMetrics(10000, 10, 10, 10, 10)
 	b.ResetTimer()
 	b.ReportAllocs()
-	exp := &clickhouseMetricsExporter{}
+	exp := &clickhouseMetricsExporter{enableExpHist: true, logger: zap.NewNop()}
 	for i := 0; i < b.N; i++ {
 		exp.prepareBatch(metrics)
 	}
 }
 
 func Benchmark_prepareBatchSummary(b *testing.B) {
+	// 10k summary metrics * 10 data points = 100k+ samples in total
+	// each with 30 total attributes
 	metrics := generateSummaryMetrics(10000, 10, 10, 10, 10)
 	b.ResetTimer()
 	b.ReportAllocs()
diff --git a/exporter/clickhousemetricsexporterv2/factory.go b/exporter/clickhousemetricsexporterv2/factory.go
index b351c0bb..6adccf74 100644
--- a/exporter/clickhousemetricsexporterv2/factory.go
+++ b/exporter/clickhousemetricsexporterv2/factory.go
@@ -101,6 +101,7 @@ func createDefaultConfig() component.Config {
 		Database:        "signoz_metrics",
 		SamplesTable:    "distributed_samples_v4",
 		TimeSeriesTable: "distributed_time_series_v4",
-		ExpHistTable:    "distributed_exp_hist_v4",
+		ExpHistTable:    "distributed_exp_hist",
+		MetadataTable:   "distributed_metadata",
 	}
 }
diff --git a/exporter/clickhousemetricsexporterv2/factory_test.go b/exporter/clickhousemetricsexporterv2/factory_test.go
index ffa2c564..21aa28ca 100644
--- a/exporter/clickhousemetricsexporterv2/factory_test.go
+++ b/exporter/clickhousemetricsexporterv2/factory_test.go
@@ -1 +1,13 @@
 package clickhousemetricsexporterv2
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCreateDefaultConfig(t *testing.T) {
+	factory := NewFactory()
+	cfg := factory.CreateDefaultConfig()
+	require.NotNil(t, cfg)
+}
diff --git a/exporter/clickhousemetricsexporterv2/test_data.go b/exporter/clickhousemetricsexporterv2/test_data.go
index 1466def3..a9577433 100644
--- a/exporter/clickhousemetricsexporterv2/test_data.go
+++ b/exporter/clickhousemetricsexporterv2/test_data.go
@@ -35,15 +35,16 @@ func generateGaugeMetrics(numMetrics, numDataPoints, numPointAttributes, numScop
 		m.SetName("system.memory.usage" + strconv.Itoa(i))
 		m.SetUnit("bytes")
 		m.SetDescription("memory usage of the host")
-		dp := m.SetEmptyGauge().DataPoints().AppendEmpty()
+		dpSlice := m.SetEmptyGauge().DataPoints()
 		for j := 0; j < numDataPoints; j++ {
+			dp := dpSlice.AppendEmpty()
 			dp.SetIntValue(int64(i))
 			dp.SetFlags(pmetric.DefaultDataPointFlags)
 			for k := 0; k < numPointAttributes; k++ {
 				dp.Attributes().PutStr("gauge.attr_"+strconv.Itoa(k), "1")
 			}
-
dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182, 0))) - dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182, 0))) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) } } return metrics @@ -77,13 +78,12 @@ func generateSumMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeA } sm.Scope().SetName("go.signoz.io/app/reader") sm.Scope().SetVersion("1.0.0") - timestamp := time.Unix(1727286182, 0) for i := 0; i < numMetrics; i++ { m := sm.Metrics().AppendEmpty() m.SetName("system.cpu.time" + strconv.Itoa(i)) m.SetUnit("s") m.SetDescription("cpu time of the host") - dp := m.SetEmptySum().DataPoints().AppendEmpty() + dpSlice := m.SetEmptySum().DataPoints() if i%2 == 0 { m.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } else { @@ -95,13 +95,14 @@ func generateSumMetrics(numMetrics, numDataPoints, numPointAttributes, numScopeA m.Sum().SetIsMonotonic(false) } for j := 0; j < numDataPoints; j++ { + dp := dpSlice.AppendEmpty() dp.SetIntValue(int64(i)) dp.SetFlags(pmetric.DefaultDataPointFlags) for k := 0; k < numPointAttributes; k++ { dp.Attributes().PutStr("sum.attr_"+strconv.Itoa(k), "1") } - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) - dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) } } return metrics @@ -135,26 +136,26 @@ func generateHistogramMetrics(numMetrics, numDataPoints, numPointAttributes, num } sm.Scope().SetName("go.signoz.io/app/reader") sm.Scope().SetVersion("1.0.0") - timestamp := time.Unix(1727286182, 0) for i := 0; i < numMetrics; i++ { m := sm.Metrics().AppendEmpty() m.SetName("http.server.duration" + strconv.Itoa(i)) m.SetUnit("ms") m.SetDescription("server duration of the http server") - dp := m.SetEmptyHistogram().DataPoints().AppendEmpty() + dpSlice := m.SetEmptyHistogram().DataPoints() if i%2 == 0 { m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } else { m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) } for j := 0; j < numDataPoints; j++ { + dp := dpSlice.AppendEmpty() dp.SetCount(30) dp.SetSum(35) for k := 0; k < numPointAttributes; k++ { dp.Attributes().PutStr("histogram.attr_"+strconv.Itoa(k), "1") } - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) - dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) dp.ExplicitBounds().FromRaw([]float64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19}) dp.BucketCounts().FromRaw([]uint64{1, 1, 1, 1, 1, 5, 1, 1, 1, 1, 1, 1, 12, 1, 1, 1, 1, 1, 1, 1}) dp.SetMin(0) @@ -189,21 +190,22 @@ func generateExponentialHistogramMetrics(numMetrics, numDataPoints, numPointAttr } sm.Scope().SetName("go.signoz.io/app/reader") sm.Scope().SetVersion("1.0.0") - timestamp := time.Unix(1727286182, 0) for i := 0; i < numMetrics; i++ { m := sm.Metrics().AppendEmpty() m.SetName("http.server.duration" + strconv.Itoa(i)) m.SetUnit("ms") m.SetDescription("server duration of the http server but in exponential histogram format") - dp := m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty() + dpSlice := 
m.SetEmptyExponentialHistogram().DataPoints() if i%2 == 0 { m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) } else { m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityDelta) } for j := 0; j < numDataPoints; j++ { - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) - dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + dp := dpSlice.AppendEmpty() + dp.SetScale(2) + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) dp.SetSum(1) dp.SetMin(0) dp.SetMax(1) @@ -243,24 +245,25 @@ func generateSummaryMetrics(numMetrics, numDataPoints, numPointAttributes, numSc } sm.Scope().SetName("go.signoz.io/app/reader") sm.Scope().SetVersion("1.0.0") - timestamp := time.Unix(1727286182, 0) + for i := 0; i < numMetrics; i++ { m := sm.Metrics().AppendEmpty() m.SetName("zk.duration" + strconv.Itoa(i)) m.SetUnit("ms") m.SetDescription("This is a summary metrics") - dp := m.SetEmptySummary().DataPoints().AppendEmpty() + slice := m.SetEmptySummary().DataPoints() for j := 0; j < numDataPoints; j++ { - dp.SetStartTimestamp(pcommon.NewTimestampFromTime(timestamp)) - dp.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) - dp.SetCount(1) - dp.SetSum(1) + dp := slice.AppendEmpty() + dp.SetStartTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1727286182+int64(j), 0))) + dp.SetCount(uint64(j)) + dp.SetSum(float64(j)) for k := 0; k < numPointAttributes; k++ { dp.Attributes().PutStr("summary.attr_"+strconv.Itoa(k), "1") } quantileValues := dp.QuantileValues().AppendEmpty() - quantileValues.SetValue(1) - quantileValues.SetQuantile(1) + quantileValues.SetValue(float64(j)) + quantileValues.SetQuantile(float64(j)) } } return metrics diff --git a/go.mod b/go.mod index dc35703d..82053347 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/apache/thrift v0.20.0 github.com/aws/aws-sdk-go v1.53.11 github.com/cenkalti/backoff/v4 v4.3.0 + github.com/expr-lang/expr v1.16.9 github.com/gogo/protobuf v1.3.2 github.com/golang-migrate/migrate/v4 v4.15.1 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da @@ -273,7 +274,6 @@ require ( github.com/envoyproxy/go-control-plane v0.12.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.0.4 // indirect github.com/euank/go-kmsg-parser v2.0.0+incompatible // indirect - github.com/expr-lang/expr v1.16.9 // indirect github.com/fatih/color v1.16.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect github.com/go-faster/city v1.0.1 // indirect @@ -496,7 +496,7 @@ require ( github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/json-iterator/go v1.1.12 // indirect + github.com/json-iterator/go v1.1.12 github.com/klauspost/compress v1.17.8 // indirect github.com/leodido/ragel-machinery v0.0.0-20190525184631-5f46317e436b // indirect github.com/leoluk/perflib_exporter v0.2.1 // indirect From f2f6c95b67960438d55d1faa19f4ef36edec3246 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Mon, 14 Oct 2024 19:07:34 +0530 Subject: [PATCH 3/3] Fix build --- exporter/clickhousemetricsexporterv2/config.go | 10 +++++----- exporter/clickhousemetricsexporterv2/factory.go | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git 
a/exporter/clickhousemetricsexporterv2/config.go b/exporter/clickhousemetricsexporterv2/config.go index 8dcedab7..228c21b6 100644 --- a/exporter/clickhousemetricsexporterv2/config.go +++ b/exporter/clickhousemetricsexporterv2/config.go @@ -10,9 +10,9 @@ import ( // Config defines configuration for ClickHouse Metrics exporter. type Config struct { - exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. - configretry.BackOffConfig `mapstructure:"retry_on_failure"` - exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.TimeoutConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + configretry.BackOffConfig `mapstructure:"retry_on_failure"` + exporterhelper.QueueConfig `mapstructure:"sending_queue"` DSN string `mapstructure:"dsn"` @@ -32,11 +32,11 @@ func (cfg *Config) Validate() error { if cfg.DSN == "" { return errors.New("dsn must be specified") } - if err := cfg.QueueSettings.Validate(); err != nil { + if err := cfg.QueueConfig.Validate(); err != nil { return err } - if err := cfg.TimeoutSettings.Validate(); err != nil { + if err := cfg.TimeoutConfig.Validate(); err != nil { return err } diff --git a/exporter/clickhousemetricsexporterv2/factory.go b/exporter/clickhousemetricsexporterv2/factory.go index 6adccf74..dbc3306c 100644 --- a/exporter/clickhousemetricsexporterv2/factory.go +++ b/exporter/clickhousemetricsexporterv2/factory.go @@ -44,7 +44,7 @@ func NewFactory() exporter.Factory { exporter.WithMetrics(createMetricsExporter, component.StabilityLevelUndefined)) } -func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, +func createMetricsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Metrics, error) { chCfg, ok := cfg.(*Config) @@ -77,8 +77,8 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, set, cfg, chExporter.PushMetrics, - exporterhelper.WithTimeout(chCfg.TimeoutSettings), - exporterhelper.WithQueue(chCfg.QueueSettings), + exporterhelper.WithTimeout(chCfg.TimeoutConfig), + exporterhelper.WithQueue(chCfg.QueueConfig), exporterhelper.WithRetry(chCfg.BackOffConfig), exporterhelper.WithStart(chExporter.Start), exporterhelper.WithShutdown(chExporter.Shutdown), @@ -93,9 +93,9 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, func createDefaultConfig() component.Config { return &Config{ - TimeoutSettings: exporterhelper.NewDefaultTimeoutSettings(), + TimeoutConfig: exporterhelper.NewDefaultTimeoutConfig(), BackOffConfig: configretry.NewDefaultBackOffConfig(), - QueueSettings: exporterhelper.NewDefaultQueueSettings(), + QueueConfig: exporterhelper.NewDefaultQueueConfig(), DSN: "tcp://localhost:9000", EnableExpHist: false, Database: "signoz_metrics",