diff --git a/cmd/cli/config.go b/cmd/cli/config.go index 7dd121a29..0870ecaf1 100644 --- a/cmd/cli/config.go +++ b/cmd/cli/config.go @@ -13,10 +13,11 @@ type config struct { } type cleanupConfig struct { - Whitelist []string `yaml:"whitelist"` - Delete bool `yaml:"delete"` - AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"` - CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"` + Whitelist []string `yaml:"whitelist"` + Delete bool `yaml:"delete"` + AddAnonymousToWhitelist bool `json:"add_anonymous_to_whitelist"` + CleanupMetricsDuration string `yaml:"cleanup_metrics_duration"` + CleanupFutureMetricsDuration string `yaml:"cleanup_future_metrics_duration"` } func getDefault() config { @@ -30,8 +31,9 @@ func getDefault() config { DialTimeout: "500ms", }, Cleanup: cleanupConfig{ - Whitelist: []string{}, - CleanupMetricsDuration: "-168h", + Whitelist: []string{}, + CleanupMetricsDuration: "-168h", + CleanupFutureMetricsDuration: "60m", }, } } diff --git a/cmd/cli/main.go b/cmd/cli/main.go index f6a01fb9e..f981e8044 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -46,14 +46,15 @@ var plotting = flag.Bool("plotting", false, "enable images in all notifications" var removeSubscriptions = flag.String("remove-subscriptions", "", "Remove given subscriptions separated by semicolons.") var ( - cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users") - cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.") - cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.") - cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.") - cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.") - userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user") - fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.") - toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.") + cleanupUsers = flag.Bool("cleanup-users", false, "Disable/delete contacts and subscriptions of missing users") + cleanupLastChecks = flag.Bool("cleanup-last-checks", false, "Delete abandoned triggers last checks.") + cleanupTags = flag.Bool("cleanup-tags", false, "Delete abandoned tags.") + cleanupMetrics = flag.Bool("cleanup-metrics", false, "Delete outdated metrics.") + cleanupFutureMetrics = flag.Bool("cleanup-future-metrics", false, "Delete metrics with future timestamps.") + cleanupRetentions = flag.Bool("cleanup-retentions", false, "Delete abandoned retentions.") + userDel = flag.String("user-del", "", "Delete all contacts and subscriptions for a user") + fromUser = flag.String("from-user", "", "Transfer subscriptions and contacts from user.") + toUser = flag.String("to-user", "", "Transfer subscriptions and contacts to user.") ) var ( @@ -231,8 +232,8 @@ func main() { //nolint log := logger.String(moira.LogFieldNameContext, "cleanup-metrics") log.Info().Msg("Cleanup of outdated metrics started") - err := handleCleanUpOutdatedMetrics(confCleanup, database) - if err != nil { + + if err := handleCleanUpOutdatedMetrics(confCleanup, database); err != nil { log.Error(). Error(err). Msg("Failed to cleanup outdated metrics") @@ -252,6 +253,20 @@ func main() { //nolint log.Info().Msg("Cleanup of outdated metrics finished") } + if *cleanupFutureMetrics { + log := logger.String(moira.LogFieldNameContext, "cleanup-future-metrics") + + log.Info().Msg("Cleanup of future metrics started") + + if err := handleCleanUpFutureMetrics(confCleanup, database); err != nil { + log.Error(). + Error(err). + Msg("Failed to cleanup future metrics") + } + + log.Info().Msg("Cleanup of future metrics finished") + } + if *cleanupLastChecks { log := logger.String(moira.LogFieldNameContext, "cleanup-last-checks") diff --git a/cmd/cli/metrics.go b/cmd/cli/metrics.go index 8138a1110..b4b0d61ff 100644 --- a/cmd/cli/metrics.go +++ b/cmd/cli/metrics.go @@ -12,11 +12,23 @@ func handleCleanUpOutdatedMetrics(config cleanupConfig, database moira.Database) return err } - err = database.CleanUpOutdatedMetrics(duration) + if err = database.CleanUpOutdatedMetrics(duration); err != nil { + return err + } + + return nil +} + +func handleCleanUpFutureMetrics(config cleanupConfig, database moira.Database) error { + duration, err := time.ParseDuration(config.CleanupFutureMetricsDuration) if err != nil { return err } + if err = database.CleanUpFutureMetrics(duration); err != nil { + return err + } + return nil } diff --git a/cmd/cli/metrics_test.go b/cmd/cli/metrics_test.go index 012547b6b..1e268f46d 100644 --- a/cmd/cli/metrics_test.go +++ b/cmd/cli/metrics_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/moira-alert/moira/database/redis" mocks "github.com/moira-alert/moira/mock/moira-alert" "github.com/golang/mock/gomock" @@ -16,9 +17,40 @@ func TestCleanUpOutdatedMetrics(t *testing.T) { defer mockCtrl.Finish() db := mocks.NewMockDatabase(mockCtrl) - Convey("Test cleanup", t, func() { - db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil) - err := handleCleanUpOutdatedMetrics(conf.Cleanup, db) - So(err, ShouldBeNil) + Convey("Test cleanup outdated metrics", t, func() { + Convey("With valid duration", func() { + db.EXPECT().CleanUpOutdatedMetrics(-168 * time.Hour).Return(nil) + err := handleCleanUpOutdatedMetrics(conf.Cleanup, db) + So(err, ShouldBeNil) + }) + + Convey("With invalid duration", func() { + conf.Cleanup.CleanupMetricsDuration = "168h" + db.EXPECT().CleanUpOutdatedMetrics(168 * time.Hour).Return(redis.ErrCleanUpDurationGreaterThanZero) + err := handleCleanUpOutdatedMetrics(conf.Cleanup, db) + So(err, ShouldEqual, redis.ErrCleanUpDurationGreaterThanZero) + }) + }) +} + +func TestCleanUpFutureMetrics(t *testing.T) { + conf := getDefault() + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + db := mocks.NewMockDatabase(mockCtrl) + + Convey("Test cleanup future metrics", t, func() { + Convey("With valid duration", func() { + db.EXPECT().CleanUpFutureMetrics(60 * time.Minute).Return(nil) + err := handleCleanUpFutureMetrics(conf.Cleanup, db) + So(err, ShouldBeNil) + }) + + Convey("With invalid duration", func() { + conf.Cleanup.CleanupFutureMetricsDuration = "-60m" + db.EXPECT().CleanUpFutureMetrics(-60 * time.Minute).Return(redis.ErrCleanUpDurationLessThanZero) + err := handleCleanUpFutureMetrics(conf.Cleanup, db) + So(err, ShouldEqual, redis.ErrCleanUpDurationLessThanZero) + }) }) } diff --git a/database/redis/metric.go b/database/redis/metric.go index a47eac8b4..0e12806a6 100644 --- a/database/redis/metric.go +++ b/database/redis/metric.go @@ -18,6 +18,11 @@ import ( "gopkg.in/tomb.v2" ) +var ( + ErrCleanUpDurationLessThanZero = errors.New("clean up duration value must be greater than zero, otherwise the current metrics may be deleted") + ErrCleanUpDurationGreaterThanZero = errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed") +) + func (connector *DbConnector) addPatterns(patterns ...string) error { ctx := connector.context client := *connector.client @@ -321,15 +326,17 @@ func (connector *DbConnector) RemoveMetricRetention(metric string) error { return nil } -// RemoveMetricValues remove metric timestamps values from 0 to given time. -func (connector *DbConnector) RemoveMetricValues(metric string, toTime int64) (int64, error) { +// RemoveMetricValues remove values by metrics from the interval of passed parameters, if they are not in the metricsCache. +// In from and to, expect either -inf, +inf, or timestamps as strings. +func (connector *DbConnector) RemoveMetricValues(metric string, from, to string) (int64, error) { if !connector.needRemoveMetrics(metric) { return 0, nil } + c := *connector.client - result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), "-inf", strconv.FormatInt(toTime, 10)).Result() + result, err := c.ZRemRangeByScore(connector.context, metricDataKey(metric), from, to).Result() if err != nil { - return 0, fmt.Errorf("failed to remove metrics from -inf to %v, error: %w", toTime, err) + return 0, fmt.Errorf("failed to remove metrics from %s to %s, error: %w", from, to, err) } return result, nil @@ -359,23 +366,25 @@ func (connector *DbConnector) needRemoveMetrics(metric string) bool { return err == nil } -func cleanUpOutdatedMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, duration time.Duration) error { +func cleanUpMetricsOnRedisNode(connector *DbConnector, client redis.UniversalClient, from, to string) error { metricsIterator := client.ScanType(connector.context, 0, metricDataKey("*"), 0, "zset").Iterator() var count int64 for metricsIterator.Next(connector.context) { key := metricsIterator.Val() metric := strings.TrimPrefix(key, metricDataKey("")) - deletedCount, err := flushMetric(connector, metric, duration) + + deletedCount, err := connector.RemoveMetricValues(metric, from, to) if err != nil { return err } + count += deletedCount } connector.logger.Info(). Int64("count deleted metrics", count). - Msg("Cleaned up usefully metrics for trigger") + Msg("Cleaned up metrics") return nil } @@ -403,11 +412,29 @@ func cleanUpAbandonedRetentionsOnRedisNode(connector *DbConnector, client redis. func (connector *DbConnector) CleanUpOutdatedMetrics(duration time.Duration) error { if duration >= 0 { - return errors.New("clean up duration value must be less than zero, otherwise all metrics will be removed") + return ErrCleanUpDurationGreaterThanZero } + from := "-inf" + toTs := time.Now().UTC().Add(duration).Unix() + to := strconv.FormatInt(toTs, 10) + return connector.callFunc(func(connector *DbConnector, client redis.UniversalClient) error { - return cleanUpOutdatedMetricsOnRedisNode(connector, client, duration) + return cleanUpMetricsOnRedisNode(connector, client, from, to) + }) +} + +func (connector *DbConnector) CleanUpFutureMetrics(duration time.Duration) error { + if duration <= 0 { + return ErrCleanUpDurationLessThanZero + } + + fromTs := connector.clock.Now().Add(duration).Unix() + from := strconv.FormatInt(fromTs, 10) + to := "+inf" + + return connector.callFunc(func(connector *DbConnector, client redis.UniversalClient) error { + return cleanUpMetricsOnRedisNode(connector, client, from, to) }) } @@ -564,16 +591,6 @@ func (connector *DbConnector) RemoveAllMetrics() error { return connector.callFunc(removeAllMetricsOnRedisNode) } -func flushMetric(database moira.Database, metric string, duration time.Duration) (int64, error) { - lastTs := time.Now().UTC() - toTs := lastTs.Add(duration).Unix() - deletedCount, err := database.RemoveMetricValues(metric, toTs) - if err != nil { - return deletedCount, err - } - return deletedCount, nil -} - var patternsListKey = "moira-pattern-list" var metricEventsChannels = []string{ diff --git a/database/redis/metric_test.go b/database/redis/metric_test.go index 90dc25e63..a6e96e1cb 100644 --- a/database/redis/metric_test.go +++ b/database/redis/metric_test.go @@ -3,17 +3,25 @@ package redis import ( "errors" "fmt" + "strconv" "testing" "time" + "github.com/golang/mock/gomock" "github.com/moira-alert/moira" logging "github.com/moira-alert/moira/logging/zerolog_adapter" + mock_clock "github.com/moira-alert/moira/mock/clock" "github.com/patrickmn/go-cache" . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" "gopkg.in/tomb.v2" ) +const ( + toInf = "+inf" + fromInf = "-inf" +) + func TestMetricsStoring(t *testing.T) { logger, _ := logging.GetLogger("dataBase") dataBase := NewTestDatabase(logger) @@ -237,42 +245,45 @@ func TestRemoveMetricValues(t *testing.T) { dataBase.metricsCache = cache.New(time.Second*2, time.Minute*60) dataBase.Flush() defer dataBase.Flush() - metric1 := "my.test.super.metric" - pattern := "my.test.*.metric*" - met1 := &moira.MatchedMetric{ - Patterns: []string{pattern}, - Metric: metric1, - Retention: 10, - RetentionTimestamp: 10, - Timestamp: 15, - Value: 1, - } - met2 := &moira.MatchedMetric{ - Patterns: []string{pattern}, - Metric: metric1, - Retention: 10, - RetentionTimestamp: 20, - Timestamp: 24, - Value: 2, - } - met3 := &moira.MatchedMetric{ - Patterns: []string{pattern}, - Metric: metric1, - Retention: 10, - RetentionTimestamp: 30, - Timestamp: 34, - Value: 3, - } - met4 := &moira.MatchedMetric{ - Patterns: []string{pattern}, - Metric: metric1, - Retention: 10, - RetentionTimestamp: 40, - Timestamp: 46, - Value: 4, - } - - Convey("Test", t, func() { + + Convey("Test that old metrics will be deleted", t, func() { + metric1 := "my.test.super.metric" + pattern := "my.test.*.metric*" + met1 := &moira.MatchedMetric{ + Patterns: []string{pattern}, + Metric: metric1, + Retention: 10, + RetentionTimestamp: 10, + Timestamp: 15, + Value: 1, + } + met2 := &moira.MatchedMetric{ + Patterns: []string{pattern}, + Metric: metric1, + Retention: 10, + RetentionTimestamp: 20, + Timestamp: 24, + Value: 2, + } + met3 := &moira.MatchedMetric{ + Patterns: []string{pattern}, + Metric: metric1, + Retention: 10, + RetentionTimestamp: 30, + Timestamp: 34, + Value: 3, + } + met4 := &moira.MatchedMetric{ + Patterns: []string{pattern}, + Metric: metric1, + Retention: 10, + RetentionTimestamp: 40, + Timestamp: 46, + Value: 4, + } + + from := fromInf + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric1: met1}) So(err, ShouldBeNil) // Save metric with changed retention err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric1: met2}) @@ -293,7 +304,9 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - deletedCount, err := dataBase.RemoveMetricValues(metric1, 11) + var toTs int64 = 11 + to := strconv.FormatInt(toTs, 10) + deletedCount, err := dataBase.RemoveMetricValues(metric1, from, to) So(err, ShouldBeNil) So(deletedCount, ShouldResemble, int64(1)) @@ -307,7 +320,9 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - deletedCount, err = dataBase.RemoveMetricValues(metric1, 22) + toTs = 22 + to = strconv.FormatInt(toTs, 10) + deletedCount, err = dataBase.RemoveMetricValues(metric1, from, to) So(err, ShouldBeNil) So(deletedCount, ShouldResemble, int64(0)) @@ -334,7 +349,7 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - time.Sleep(time.Second * 2) + dataBase.metricsCache.Flush() err = dataBase.RemoveMetricsValues([]string{metric1}, 22) So(err, ShouldBeNil) @@ -348,9 +363,11 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - time.Sleep(time.Second * 2) + dataBase.metricsCache.Flush() - deletedCount, err = dataBase.RemoveMetricValues(metric1, 30) + toTs = 30 + to = strconv.FormatInt(toTs, 10) + deletedCount, err = dataBase.RemoveMetricValues(metric1, from, to) So(err, ShouldBeNil) So(deletedCount, ShouldResemble, int64(1)) @@ -362,9 +379,11 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - time.Sleep(time.Second * 2) + dataBase.metricsCache.Flush() - deletedCount, err = dataBase.RemoveMetricValues(metric1, 39) + toTs = 39 + to = strconv.FormatInt(toTs, 10) + deletedCount, err = dataBase.RemoveMetricValues(metric1, from, to) So(err, ShouldBeNil) So(deletedCount, ShouldResemble, int64(0)) @@ -376,9 +395,11 @@ func TestRemoveMetricValues(t *testing.T) { }, }) - time.Sleep(time.Second * 2) + dataBase.metricsCache.Flush() - deletedCount, err = dataBase.RemoveMetricValues(metric1, 49) + toTs = 49 + to = strconv.FormatInt(toTs, 10) + deletedCount, err = dataBase.RemoveMetricValues(metric1, from, to) So(err, ShouldBeNil) So(deletedCount, ShouldResemble, int64(1)) @@ -386,6 +407,215 @@ func TestRemoveMetricValues(t *testing.T) { So(err, ShouldBeNil) So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{metric1: {}}) }) + + Convey("Test remove metric values", t, func() { + metric := "metric1" + + metric1 := &moira.MatchedMetric{ + Metric: metric, + RetentionTimestamp: 10, + Timestamp: 10, + Retention: 10, + Value: 1, + } + + metric2 := &moira.MatchedMetric{ + Metric: metric, + RetentionTimestamp: 20, + Timestamp: 20, + Retention: 10, + Value: 2, + } + + metric3 := &moira.MatchedMetric{ + Metric: metric, + RetentionTimestamp: 30, + Timestamp: 30, + Retention: 10, + Value: 3, + } + + metric4 := &moira.MatchedMetric{ + Metric: metric, + RetentionTimestamp: 40, + Timestamp: 40, + Retention: 10, + Value: 4, + } + + metric5 := &moira.MatchedMetric{ + Metric: metric, + RetentionTimestamp: 50, + Timestamp: 50, + Retention: 10, + Value: 5, + } + + Convey("Deleting metrics over the entire interval (from -inf to +inf)", func() { + defer func() { + dataBase.metricsCache.Flush() + }() + + from := fromInf + to := toInf + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric1}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric2}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric3}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric4}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric5}) + So(err, ShouldBeNil) + + dataBase.metricsCache.Flush() + + actualValues, err := dataBase.GetMetricsValues([]string{metric}, 1, 99) + So(err, ShouldBeNil) + So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{ + metric: { + &moira.MetricValue{Timestamp: 10, RetentionTimestamp: 10, Value: 1}, + &moira.MetricValue{Timestamp: 20, RetentionTimestamp: 20, Value: 2}, + &moira.MetricValue{Timestamp: 30, RetentionTimestamp: 30, Value: 3}, + &moira.MetricValue{Timestamp: 40, RetentionTimestamp: 40, Value: 4}, + &moira.MetricValue{Timestamp: 50, RetentionTimestamp: 50, Value: 5}, + }, + }) + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, from, to) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 5) + + actualValues, err = dataBase.GetMetricsValues([]string{metric}, 1, 99) + So(err, ShouldBeNil) + So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{metric: {}}) + }) + + Convey("Deletion of metrics on the interval up to the first metric (from -inf to 5)", func() { + defer func() { + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, fromInf, toInf) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 5) + }() + + from := fromInf + to := "5" + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric1}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric2}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric3}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric4}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric5}) + So(err, ShouldBeNil) + + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, from, to) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 0) + + actualValues, err := dataBase.GetMetricsValues([]string{metric}, 1, 99) + So(err, ShouldBeNil) + So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{ + metric: { + &moira.MetricValue{Timestamp: 10, RetentionTimestamp: 10, Value: 1}, + &moira.MetricValue{Timestamp: 20, RetentionTimestamp: 20, Value: 2}, + &moira.MetricValue{Timestamp: 30, RetentionTimestamp: 30, Value: 3}, + &moira.MetricValue{Timestamp: 40, RetentionTimestamp: 40, Value: 4}, + &moira.MetricValue{Timestamp: 50, RetentionTimestamp: 50, Value: 5}, + }, + }) + }) + + Convey("Deleting metrics on the interval after the last metric (from 60 to +inf)", func() { + defer func() { + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, fromInf, toInf) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 5) + }() + + from := "60" + to := toInf + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric1}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric2}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric3}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric4}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric5}) + So(err, ShouldBeNil) + + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, from, to) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 0) + + actualValues, err := dataBase.GetMetricsValues([]string{metric}, 1, 99) + So(err, ShouldBeNil) + So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{ + metric: { + &moira.MetricValue{Timestamp: 10, RetentionTimestamp: 10, Value: 1}, + &moira.MetricValue{Timestamp: 20, RetentionTimestamp: 20, Value: 2}, + &moira.MetricValue{Timestamp: 30, RetentionTimestamp: 30, Value: 3}, + &moira.MetricValue{Timestamp: 40, RetentionTimestamp: 40, Value: 4}, + &moira.MetricValue{Timestamp: 50, RetentionTimestamp: 50, Value: 5}, + }, + }) + }) + + Convey("Deleting metrics inside the metric interval (from 20 to 40)", func() { + defer func() { + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, fromInf, toInf) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 2) + }() + + from := "20" + to := "40" + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric1}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric2}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric3}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric4}) + So(err, ShouldBeNil) + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{metric: metric5}) + So(err, ShouldBeNil) + + dataBase.metricsCache.Flush() + + deletedMetrics, err := dataBase.RemoveMetricValues(metric, from, to) + So(err, ShouldBeNil) + So(deletedMetrics, ShouldEqual, 3) + + actualValues, err := dataBase.GetMetricsValues([]string{metric}, 1, 99) + So(err, ShouldBeNil) + So(actualValues, ShouldResemble, map[string][]*moira.MetricValue{ + metric: { + &moira.MetricValue{Timestamp: 10, RetentionTimestamp: 10, Value: 1}, + &moira.MetricValue{Timestamp: 50, RetentionTimestamp: 50, Value: 5}, + }, + }) + }) + }) } func TestMetricSubscription(t *testing.T) { @@ -496,7 +726,10 @@ func TestMetricsStoringErrorConnection(t *testing.T) { err = dataBase.RemovePatternWithMetrics("123") So(err, ShouldNotBeNil) - deletedCount, err := dataBase.RemoveMetricValues("123", 1) + from := fromInf + var toTs int64 = 1 + to := strconv.FormatInt(toTs, 10) + deletedCount, err := dataBase.RemoveMetricValues("123", from, to) So(err, ShouldNotBeNil) So(deletedCount, ShouldResemble, int64(0)) @@ -658,6 +891,182 @@ func TestCleanupOutdatedMetrics(t *testing.T) { }) } +func TestCleanupFutureMetrics(t *testing.T) { + logger, _ := logging.ConfigureLog("stdout", "warn", "test", true) + dataBase := NewTestDatabase(logger) + dataBase.Flush() + defer dataBase.Flush() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + mockClock := mock_clock.NewMockClock(mockCtrl) + + dataBase.clock = mockClock + + testTime := time.Date(2022, time.June, 6, 10, 0, 0, 0, time.UTC) + testTimeUnix := testTime.Unix() + + retention := 10 + + Convey("Test clean up future metrics", t, func() { + const ( + metric1 = "metric1" + metric2 = "metric2" + metric3 = "metric3" + ) + + matchedMetric1 := &moira.MatchedMetric{ + Metric: metric1, + Value: 1, + Timestamp: testTimeUnix, + RetentionTimestamp: testTimeUnix, + Retention: retention, + } + + matchedMetric2 := &moira.MatchedMetric{ + Metric: metric1, + Value: 2, + Timestamp: testTimeUnix + int64(retention), + RetentionTimestamp: testTimeUnix + int64(retention), + Retention: retention, + } + + matchedMetric3 := &moira.MatchedMetric{ + Metric: metric2, + Value: 3, + Timestamp: testTimeUnix, + RetentionTimestamp: testTimeUnix, + Retention: retention, + } + + matchedMetric4 := &moira.MatchedMetric{ + Metric: metric2, + Value: 4, + Timestamp: testTimeUnix + int64(retention), + RetentionTimestamp: testTimeUnix + int64(retention), + Retention: retention, + } + + matchedMetric5 := &moira.MatchedMetric{ + Metric: metric3, + Value: 5, + Timestamp: testTimeUnix, + RetentionTimestamp: testTimeUnix, + Retention: retention, + } + + Convey("Without future metrics", func() { + defer func() { + dataBase.metricsCache.Flush() + }() + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{ + metric1: matchedMetric1, + metric2: matchedMetric3, + metric3: matchedMetric5, + }) + So(err, ShouldBeNil) + + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{ + metric1: matchedMetric2, + metric2: matchedMetric4, + }) + So(err, ShouldBeNil) + + mockClock.EXPECT().Now().Return(testTime).Times(1) + + err = dataBase.CleanUpFutureMetrics(time.Hour) + So(err, ShouldBeNil) + + actualMetrics, err := dataBase.GetMetricsValues([]string{metric1, metric2, metric3}, testTimeUnix, testTimeUnix+int64(retention)) + So(actualMetrics, ShouldResemble, map[string][]*moira.MetricValue{ + metric1: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 1, + }, + { + RetentionTimestamp: testTimeUnix + int64(retention), + Timestamp: testTimeUnix + int64(retention), + Value: 2, + }, + }, + metric2: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 3, + }, + { + RetentionTimestamp: testTimeUnix + int64(retention), + Timestamp: testTimeUnix + int64(retention), + Value: 4, + }, + }, + metric3: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 5, + }, + }, + }) + So(err, ShouldBeNil) + }) + + Convey("With future metrics", func() { + defer func() { + dataBase.metricsCache.Flush() + }() + + err := dataBase.SaveMetrics(map[string]*moira.MatchedMetric{ + metric1: matchedMetric1, + metric2: matchedMetric3, + metric3: matchedMetric5, + }) + So(err, ShouldBeNil) + + err = dataBase.SaveMetrics(map[string]*moira.MatchedMetric{ + metric1: matchedMetric2, + metric2: matchedMetric4, + }) + So(err, ShouldBeNil) + + mockClock.EXPECT().Now().Return(testTime).Times(1) + + err = dataBase.CleanUpFutureMetrics(5 * time.Second) + So(err, ShouldBeNil) + + actualMetrics, err := dataBase.GetMetricsValues([]string{metric1, metric2, metric3}, testTimeUnix, testTimeUnix+int64(retention)) + So(actualMetrics, ShouldResemble, map[string][]*moira.MetricValue{ + metric1: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 1, + }, + }, + metric2: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 3, + }, + }, + metric3: { + { + RetentionTimestamp: testTimeUnix, + Timestamp: testTimeUnix, + Value: 5, + }, + }, + }) + So(err, ShouldBeNil) + }) + }) +} + func TestCleanupOutdatedPatternMetrics(t *testing.T) { logger, _ := logging.ConfigureLog("stdout", "warn", "test", true) dataBase := NewTestDatabase(logger) diff --git a/filter/metrics_parser.go b/filter/metrics_parser.go index 2a0fa5922..ad403abf4 100644 --- a/filter/metrics_parser.go +++ b/filter/metrics_parser.go @@ -100,9 +100,9 @@ func (metric ParsedMetric) IsTagged() bool { return len(metric.Labels) > 0 } -// IsTooOld checks that metric is old to parse it. -func (metric ParsedMetric) IsTooOld(maxTTL time.Duration, now time.Time) bool { - return moira.Int64ToTime(metric.Timestamp).Add(maxTTL).Before(now) +// IsExpired checks if the metric is in the window from maxTTL. +func (metric ParsedMetric) IsExpired(maxTTL time.Duration, now time.Time) bool { + return moira.Int64ToTime(metric.Timestamp).Add(maxTTL).Before(now) || now.Add(maxTTL).Before(moira.Int64ToTime(metric.Timestamp)) } func parseNameAndLabels(metricBytes []byte) (string, map[string]string, error) { diff --git a/filter/metrics_parser_test.go b/filter/metrics_parser_test.go index fc01b17fa..abfd1bd10 100644 --- a/filter/metrics_parser_test.go +++ b/filter/metrics_parser_test.go @@ -213,22 +213,36 @@ func TestRestoreMetricStringByNameAndLabels(t *testing.T) { }) } -func TestParsedMetric_IsTooOld(t *testing.T) { +func TestParsedMetric_IsExpired(t *testing.T) { now := time.Date(2022, 6, 16, 10, 0, 0, 0, time.UTC) maxTTL := time.Hour - Convey("When metric is old, return true", t, func() { - metric := ParsedMetric{ - Name: "too old metric", - Timestamp: time.Date(2022, 6, 16, 8, 59, 0, 0, time.UTC).Unix(), - } - So(metric.IsTooOld(maxTTL, now), ShouldBeTrue) - }) - Convey("When metric is young, return false", t, func() { - metric := ParsedMetric{ - Name: "too old metric", - Timestamp: time.Date(2022, 6, 16, 9, 0o0, 0, 0, time.UTC).Unix(), - } - So(metric.IsTooOld(maxTTL, now), ShouldBeFalse) + Convey("Test isExpired metric", t, func() { + Convey("Metric too old", func() { + metric := ParsedMetric{ + Name: "metric1", + Timestamp: now.Add(-2 * maxTTL).Unix(), + } + + So(metric.IsExpired(maxTTL, now), ShouldBeTrue) + }) + + Convey("Metric far into the future", func() { + metric := ParsedMetric{ + Name: "metric2", + Timestamp: now.Add(2 * maxTTL).Unix(), + } + + So(metric.IsExpired(maxTTL, now), ShouldBeTrue) + }) + + Convey("Metric in the maxTTL window", func() { + metric := ParsedMetric{ + Name: "metric3", + Timestamp: now.Unix(), + } + + So(metric.IsExpired(maxTTL, now), ShouldBeFalse) + }) }) } diff --git a/filter/patterns_storage.go b/filter/patterns_storage.go index 10b8437af..175fd7ba6 100644 --- a/filter/patterns_storage.go +++ b/filter/patterns_storage.go @@ -78,11 +78,11 @@ func (storage *PatternStorage) ProcessIncomingMetric(lineBytes []byte, maxTTL ti return nil } - if parsedMetric.IsTooOld(maxTTL, storage.clock.Now()) { + if parsedMetric.IsExpired(maxTTL, storage.clock.Now()) { storage.logger.Debug(). String(moira.LogFieldNameMetricName, parsedMetric.Name). String(moira.LogFieldNameMetricTimestamp, fmt.Sprint(parsedMetric.Timestamp)). - Msg("Metric is too old") + Msg("Metric is not in the window from maxTTL") return nil } diff --git a/interfaces.go b/interfaces.go index f30e482b5..15f294e64 100644 --- a/interfaces.go +++ b/interfaces.go @@ -108,7 +108,7 @@ type Database interface { GetMetricRetention(metric string) (int64, error) GetMetricsValues(metrics []string, from int64, until int64) (map[string][]*MetricValue, error) RemoveMetricRetention(metric string) error - RemoveMetricValues(metric string, toTime int64) (int64, error) + RemoveMetricValues(metric string, from, to string) (int64, error) RemoveMetricsValues(metrics []string, toTime int64) error GetMetricsTTLSeconds() int64 @@ -150,6 +150,7 @@ type Database interface { // Metrics management CleanUpOutdatedMetrics(duration time.Duration) error + CleanUpFutureMetrics(duration time.Duration) error CleanupOutdatedPatternMetrics() (int64, error) CleanUpAbandonedRetentions() error RemoveMetricsByPrefix(pattern string) error diff --git a/local/cli.yml b/local/cli.yml index f41e92256..21ef388c8 100644 --- a/local/cli.yml +++ b/local/cli.yml @@ -7,3 +7,6 @@ log_pretty_format: true cleanup: # Default cleanup duration according to max TTL for metrics = 7 days cleanup_metrics_duration: "-168h" + # Specifies the time from which metrics written to the future will be deleted + # Defaults to 1 hour + cleanup_future_metrics_duration: "60m" diff --git a/mock/moira-alert/database.go b/mock/moira-alert/database.go index 5e42492c7..0267dd269 100644 --- a/mock/moira-alert/database.go +++ b/mock/moira-alert/database.go @@ -149,6 +149,20 @@ func (mr *MockDatabaseMockRecorder) CleanUpAbandonedTriggerLastCheck() *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanUpAbandonedTriggerLastCheck", reflect.TypeOf((*MockDatabase)(nil).CleanUpAbandonedTriggerLastCheck)) } +// CleanUpFutureMetrics mocks base method. +func (m *MockDatabase) CleanUpFutureMetrics(arg0 time.Duration) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CleanUpFutureMetrics", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// CleanUpFutureMetrics indicates an expected call of CleanUpFutureMetrics. +func (mr *MockDatabaseMockRecorder) CleanUpFutureMetrics(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanUpFutureMetrics", reflect.TypeOf((*MockDatabase)(nil).CleanUpFutureMetrics), arg0) +} + // CleanUpOutdatedMetrics mocks base method. func (m *MockDatabase) CleanUpOutdatedMetrics(arg0 time.Duration) error { m.ctrl.T.Helper() @@ -1144,18 +1158,18 @@ func (mr *MockDatabaseMockRecorder) RemoveMetricRetention(arg0 interface{}) *gom } // RemoveMetricValues mocks base method. -func (m *MockDatabase) RemoveMetricValues(arg0 string, arg1 int64) (int64, error) { +func (m *MockDatabase) RemoveMetricValues(arg0, arg1, arg2 string) (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RemoveMetricValues", arg0, arg1) + ret := m.ctrl.Call(m, "RemoveMetricValues", arg0, arg1, arg2) ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } // RemoveMetricValues indicates an expected call of RemoveMetricValues. -func (mr *MockDatabaseMockRecorder) RemoveMetricValues(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockDatabaseMockRecorder) RemoveMetricValues(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveMetricValues", reflect.TypeOf((*MockDatabase)(nil).RemoveMetricValues), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveMetricValues", reflect.TypeOf((*MockDatabase)(nil).RemoveMetricValues), arg0, arg1, arg2) } // RemoveMetricsByPrefix mocks base method.