From 11f619c898a14be0d1c8b160e73bf4c6f723421e Mon Sep 17 00:00:00 2001
From: Anton Kolesnikov
Date: Mon, 21 Oct 2024 22:09:53 +0800
Subject: [PATCH] feat(v2): revive grpc health checks

---
 pkg/experiment/ingester/client/client.go   |  19 +--
 pkg/experiment/ingester/segment.go         |  51 ++++----
 pkg/experiment/ingester/segment_test.go    |  40 +++----
 pkg/experiment/ingester/service.go         | 111 +++++++++++++++---
 pkg/experiment/metastore/client/client.go  |  16 ++-
 pkg/experiment/metastore/client/methods.go |  14 ++-
 pkg/experiment/metastore/metastore.go      |  70 ++++++-----
 .../metastore/metastore_read_index.go      |  16 ---
 pkg/experiment/metastore/test/create.go    |   6 +-
 pkg/phlare/modules.go                      |   8 +-
 pkg/phlare/modules_experimental.go         |  44 +++----
 pkg/phlare/phlare.go                       |  12 +-
 pkg/util/health/health.go                  |  41 +++++--
 pkg/util/recovery.go                       |   2 +-
 14 files changed, 267 insertions(+), 183 deletions(-)

diff --git a/pkg/experiment/ingester/client/client.go b/pkg/experiment/ingester/client/client.go
index 158ddec9f1..b125c932be 100644
--- a/pkg/experiment/ingester/client/client.go
+++ b/pkg/experiment/ingester/client/client.go
@@ -137,7 +137,10 @@ const grpcServiceConfig = `{
 	"methodConfig": [{
 		"name": [{"service": ""}],
 		"retryPolicy": {}
-	}]
+	}],
+	"healthCheckConfig": {
+		"serviceName": "pyroscope.segment-writer"
+	}
 }`

 type Client struct {
@@ -255,14 +258,16 @@ func (c *Client) pushToInstance(
 	ctx context.Context,
 	req *segmentwriterv1.PushRequest,
 	addr string,
-) (*segmentwriterv1.PushResponse, error) {
+) (resp *segmentwriterv1.PushResponse, err error) {
 	conn, err := c.pool.GetConnFor(addr)
 	if err != nil {
 		return nil, err
 	}
-	c.metrics.sentBytes.
-		WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
-		Observe(float64(len(req.Profile)))
-	return segmentwriterv1.NewSegmentWriterServiceClient(conn).Push(ctx, req)
+	if resp, err = segmentwriterv1.NewSegmentWriterServiceClient(conn).Push(ctx, req); err == nil {
+		c.metrics.sentBytes.
+			WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
+			Observe(float64(len(req.Profile)))
+	}
+	return resp, err
 }

@@ -294,8 +299,8 @@ func newConnPool(
 		"segment-writer",
 		ring_client.PoolConfig{
 			CheckInterval: poolCleanupPeriod,
-			// Note that no health checks are performed: it's caller
-			// responsibility to pick a healthy instance.
+			// Note that the pool health checks are not used:
+			// gRPC health-checking is done at the connection level.
HealthCheckEnabled: false, HealthCheckTimeout: 0, MaxConcurrentHealthChecks: 0, diff --git a/pkg/experiment/ingester/segment.go b/pkg/experiment/ingester/segment.go index b2daa102fe..bfdcba7fa9 100644 --- a/pkg/experiment/ingester/segment.go +++ b/pkg/experiment/ingester/segment.go @@ -34,17 +34,12 @@ var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ") type shardKey uint32 -type segmentWriterConfig struct { - segmentDuration time.Duration -} - type segmentsWriter struct { - segmentDuration time.Duration - limits Limits - - l log.Logger - bucket objstore.Bucket - metastoreClient metastorev1.MetastoreServiceClient + config Config + limits Limits + logger log.Logger + bucket objstore.Bucket + metastore metastorev1.MetastoreServiceClient shards map[shardKey]*shard shardsLock sync.RWMutex @@ -76,7 +71,7 @@ func (sh *shard) ingest(fn func(head segmentIngest)) segmentWaitFlushed { } func (sh *shard) loop(ctx context.Context) { - ticker := time.NewTicker(sh.sw.segmentDuration) + ticker := time.NewTicker(sh.sw.config.SegmentDuration) defer ticker.Stop() for { select { @@ -102,7 +97,7 @@ func (sh *shard) flushSegment(ctx context.Context) { err := s.flush(ctx) if err != nil { - _ = level.Error(sh.sw.l).Log("msg", "failed to flush segment", "err", err) + _ = level.Error(sh.sw.logger).Log("msg", "failed to flush segment", "err", err) } if s.debuginfo.movedHeads > 0 { _ = level.Debug(s.l).Log("msg", @@ -118,19 +113,19 @@ func (sh *shard) flushSegment(ctx context.Context) { }() } -func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, cfg segmentWriterConfig, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter { +func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, config Config, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter { ctx, cancelFunc := context.WithCancel(context.Background()) sw := &segmentsWriter{ - limits: limits, - metrics: metrics, - headMetrics: hm, - segmentDuration: cfg.segmentDuration, - l: l, - bucket: bucket, - shards: make(map[shardKey]*shard), - metastoreClient: metastoreClient, - cancel: cancelFunc, - cancelCtx: ctx, + limits: limits, + metrics: metrics, + headMetrics: hm, + config: config, + logger: l, + bucket: bucket, + shards: make(map[shardKey]*shard), + metastore: metastoreClient, + cancel: cancelFunc, + cancelCtx: ctx, } return sw @@ -158,20 +153,20 @@ func (sw *segmentsWriter) ingest(shard shardKey, fn func(head segmentIngest)) (a } func (sw *segmentsWriter) Stop() error { - sw.l.Log("msg", "stopping segments writer") + sw.logger.Log("msg", "stopping segments writer") sw.cancel() sw.shardsLock.Lock() defer sw.shardsLock.Unlock() for _, s := range sw.shards { s.wg.Wait() } - sw.l.Log("msg", "segments writer stopped") + sw.logger.Log("msg", "segments writer stopped") return nil } func (sw *segmentsWriter) newShard(sk shardKey) *shard { - sl := log.With(sw.l, "shard", fmt.Sprintf("%d", sk)) + sl := log.With(sw.logger, "shard", fmt.Sprintf("%d", sk)) sh := &shard{ sw: sw, l: sl, @@ -523,7 +518,7 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil { return err } - sw.l.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1)) + sw.logger.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1)) return nil } @@ -533,7 
+528,7 @@ func (sw *segmentsWriter) storeMeta(ctx context.Context, meta *metastorev1.Block sw.metrics.storeMetaDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds()) s.debuginfo.storeMetaDuration = time.Since(t1) }() - _, err := sw.metastoreClient.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta}) + _, err := sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta}) if err != nil { sw.metrics.storeMetaErrors.WithLabelValues(s.sshard).Inc() } diff --git a/pkg/experiment/ingester/segment_test.go b/pkg/experiment/ingester/segment_test.go index fc6ee55a1f..97dd5e5e30 100644 --- a/pkg/experiment/ingester/segment_test.go +++ b/pkg/experiment/ingester/segment_test.go @@ -82,7 +82,7 @@ func TestSegmentIngest(t *testing.T) { } func ingestWithMetastoreAvailable(t *testing.T, chunks []inputChunk) { - sw := newTestSegmentWriter(t, defaultTestSegmentWriterConfig()) + sw := newTestSegmentWriter(t, defaultTestConfig()) defer sw.Stop() blocks := make(chan *metastorev1.BlockMeta, 128) @@ -106,7 +106,7 @@ func ingestWithMetastoreAvailable(t *testing.T, chunks []inputChunk) { } func ingestWithDLQ(t *testing.T, chunks []inputChunk) { - sw := newTestSegmentWriter(t, defaultTestSegmentWriterConfig()) + sw := newTestSegmentWriter(t, defaultTestConfig()) defer sw.Stop() sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("metastore unavailable")) @@ -124,8 +124,8 @@ func ingestWithDLQ(t *testing.T, chunks []inputChunk) { } func TestIngestWait(t *testing.T) { - sw := newTestSegmentWriter(t, segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + sw := newTestSegmentWriter(t, Config{ + SegmentDuration: 100 * time.Millisecond, }) defer sw.Stop() @@ -146,8 +146,8 @@ func TestIngestWait(t *testing.T) { func TestBusyIngestLoop(t *testing.T) { - sw := newTestSegmentWriter(t, segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + sw := newTestSegmentWriter(t, Config{ + SegmentDuration: 100 * time.Millisecond, }) defer sw.Stop() @@ -247,8 +247,8 @@ func TestDLQFail(t *testing.T) { l, newSegmentMetrics(nil), memdb.NewHeadMetricsWithPrefix(nil, ""), - segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + Config{ + SegmentDuration: 100 * time.Millisecond, }, validation.MockDefaultOverrides(), bucket, @@ -292,8 +292,8 @@ func TestDatasetMinMaxTime(t *testing.T) { l, newSegmentMetrics(nil), memdb.NewHeadMetricsWithPrefix(nil, ""), - segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + Config{ + SegmentDuration: 100 * time.Millisecond, }, validation.MockDefaultOverrides(), bucket, @@ -333,8 +333,8 @@ func TestDatasetMinMaxTime(t *testing.T) { func TestQueryMultipleSeriesSingleTenant(t *testing.T) { metas := make(chan *metastorev1.BlockMeta, 1) - sw := newTestSegmentWriter(t, segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + sw := newTestSegmentWriter(t, Config{ + SegmentDuration: 100 * time.Millisecond, }) defer sw.Stop() sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). @@ -382,8 +382,8 @@ func TestDLQRecoveryMock(t *testing.T) { {shard: 1, tenant: "tb", profile: cpuProfile(42, 239, "svc1", "kek", "foo", "bar")}, }) - sw := newTestSegmentWriter(t, segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + sw := newTestSegmentWriter(t, Config{ + SegmentDuration: 100 * time.Millisecond, }) sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). 
Return(nil, fmt.Errorf("mock metastore unavailable")) @@ -422,8 +422,8 @@ func TestDLQRecovery(t *testing.T) { {shard: 1, tenant: tenant, profile: cpuProfile(42, int(ts), "svc1", "kek", "foo", "bar")}, }) - sw := newTestSegmentWriter(t, segmentWriterConfig{ - segmentDuration: 100 * time.Millisecond, + sw := newTestSegmentWriter(t, Config{ + SegmentDuration: 100 * time.Millisecond, }) sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("mock metastore unavailable")) @@ -471,7 +471,7 @@ type sw struct { queryNo int } -func newTestSegmentWriter(t *testing.T, cfg segmentWriterConfig) sw { +func newTestSegmentWriter(t *testing.T, cfg Config) sw { l := testutil.NewLogger(t) bucket := memory.NewInMemBucket() client := mockmetastorev1.NewMockMetastoreServiceClient(t) @@ -492,9 +492,9 @@ func newTestSegmentWriter(t *testing.T, cfg segmentWriterConfig) sw { } } -func defaultTestSegmentWriterConfig() segmentWriterConfig { - return segmentWriterConfig{ - segmentDuration: 1 * time.Second, +func defaultTestConfig() Config { + return Config{ + SegmentDuration: 1 * time.Second, } } diff --git a/pkg/experiment/ingester/service.go b/pkg/experiment/ingester/service.go index 659c17d520..debbcd3fdc 100644 --- a/pkg/experiment/ingester/service.go +++ b/pkg/experiment/ingester/service.go @@ -5,6 +5,7 @@ import ( "errors" "flag" "fmt" + "sync" "time" "github.com/go-kit/log" @@ -27,6 +28,7 @@ import ( "github.com/grafana/pyroscope/pkg/pprof" "github.com/grafana/pyroscope/pkg/tenant" "github.com/grafana/pyroscope/pkg/util" + "github.com/grafana/pyroscope/pkg/util/health" "github.com/grafana/pyroscope/pkg/validation" ) @@ -38,8 +40,7 @@ const ( type Config struct { GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the segment writer."` LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` - SegmentDuration time.Duration `yaml:"segmentDuration,omitempty"` - Async bool `yaml:"async,omitempty"` //todo make it pertenant + SegmentDuration time.Duration `yaml:"segment_duration,omitempty"` } // RegisterFlags registers the flags. 
@@ -47,8 +48,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { const prefix = "segment-writer" cfg.GRPCClientConfig.RegisterFlagsWithPrefix(prefix, f) cfg.LifecyclerConfig.RegisterFlagsWithPrefix(prefix+".", f, util.Logger) - f.DurationVar(&cfg.SegmentDuration, prefix+".segment.duration", 500*time.Millisecond, "Timeout when flushing segments to bucket.") - f.BoolVar(&cfg.Async, prefix+".async", false, "Enable async mode for segment writer.") + f.DurationVar(&cfg.SegmentDuration, prefix+".segment-duration", 500*time.Millisecond, "Timeout when flushing segments to bucket.") } func (cfg *Config) Validate() error { @@ -68,11 +68,13 @@ type SegmentWriterService struct { services.Service segmentwriterv1.UnimplementedSegmentWriterServiceServer - cfg Config + config Config dbConfig phlaredb.Config logger log.Logger reg prometheus.Registerer + health health.Service + requests requests lifecycler *ring.Lifecycler subservices *services.Manager subservicesWatcher *services.FailureWatcher @@ -94,22 +96,24 @@ func (i *ingesterFlusherCompat) Flush() { func New( reg prometheus.Registerer, - log log.Logger, - cfg Config, - lim Limits, + logger log.Logger, + config Config, + limits Limits, + health health.Service, storageBucket phlareobj.Bucket, metastoreClient *metastoreclient.Client, ) (*SegmentWriterService, error) { i := &SegmentWriterService{ - cfg: cfg, - logger: log, + config: config, + logger: logger, reg: reg, + health: health, storageBucket: storageBucket, } var err error i.lifecycler, err = ring.NewLifecycler( - cfg.LifecyclerConfig, + config.LifecyclerConfig, &ingesterFlusherCompat{i}, RingName, RingKey, @@ -129,10 +133,9 @@ func New( if metastoreClient == nil { return nil, errors.New("metastore client is required for segment writer") } - segmentMetrics := newSegmentMetrics(i.reg) + metrics := newSegmentMetrics(i.reg) headMetrics := memdb.NewHeadMetricsWithPrefix(reg, "pyroscope_segment_writer") - config := segmentWriterConfig{segmentDuration: cfg.SegmentDuration} - i.segmentWriter = newSegmentWriter(i.logger, segmentMetrics, headMetrics, config, lim, storageBucket, metastoreClient) + i.segmentWriter = newSegmentWriter(i.logger, metrics, headMetrics, config, limits, storageBucket, metastoreClient) i.subservicesWatcher = services.NewFailureWatcher() i.subservicesWatcher.WatchManager(i.subservices) i.Service = services.NewBasicService(i.starting, i.running, i.stopping) @@ -140,7 +143,17 @@ func New( } func (i *SegmentWriterService) starting(ctx context.Context) error { - return services.StartManagerAndAwaitHealthy(ctx, i.subservices) + if err := services.StartManagerAndAwaitHealthy(ctx, i.subservices); err != nil { + return err + } + // The instance is ready to handle incoming requests. + // We do not have to wait for the lifecycler: its readiness check + // is used to rate limit the number of instances that can be coming + // or going at any one time, by only returning true if all instances + // are active. 
+	i.requests.open()
+	i.health.SetServing()
+	return nil
 }

 func (i *SegmentWriterService) running(ctx context.Context) error {
@@ -153,6 +166,8 @@ func (i *SegmentWriterService) running(ctx context.Context) error {
 }

 func (i *SegmentWriterService) stopping(_ error) error {
+	i.health.SetNotServing()
+	i.requests.drain()
 	errs := multierror.New()
 	errs.Add(services.StopManagerAndAwaitStopped(context.Background(), i.subservices))
 	errs.Add(i.segmentWriter.Stop())
@@ -160,6 +175,11 @@ func (i *SegmentWriterService) stopping(_ error) error {
 }

 func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.PushRequest) (*segmentwriterv1.PushResponse, error) {
+	if !i.requests.add() {
+		return nil, status.Error(codes.Unavailable, "service is unavailable")
+	} else {
+		defer i.requests.done()
+	}
 	if req.TenantId == "" {
 		return nil, status.Error(codes.InvalidArgument, tenant.ErrNoTenantID.Error())
 	}
@@ -175,9 +195,6 @@ func (i *SegmentWriterService) Push(ctx context.Context, req *segmentwriterv1.Pu
 	wait := i.segmentWriter.ingest(shardKey(req.Shard), func(segment segmentIngest) {
 		segment.ingest(ctx, req.TenantId, p.Profile, id, req.Labels)
 	})
-	if i.cfg.Async {
-		return &segmentwriterv1.PushResponse{}, nil
-	}

 	flushStarted := time.Now()
 	defer func() {
@@ -217,7 +234,7 @@ func (i *SegmentWriterService) TransferOut(ctx context.Context) error {
 	return ring.ErrTransferDisabled
 }

-// CheckReady is used to indicate to k8s when the ingesters are ready for
+// CheckReady is used to indicate when the ingesters are ready for
 // the addition removal of another ingester. Returns 204 when the ingester is
 // ready, 500 otherwise.
 func (i *SegmentWriterService) CheckReady(ctx context.Context) error {
@@ -226,3 +243,59 @@ func (i *SegmentWriterService) CheckReady(ctx context.Context) error {
 	}
 	return i.lifecycler.CheckReady(ctx)
 }
+
+// The requests utility emerged due to the need to handle request draining at
+// the service level.
+//
+// Ideally, this should be the responsibility of the server using the service.
+// However, since the server is a dependency of the service and is only
+// shut down after the service is stopped, requests may still arrive
+// after the Stop call. This issue arises from how we initialize modules.
+//
+// In other scenarios, request draining could be managed at a higher level,
+// such as in a load balancer or service discovery mechanism. The goal would
+// be to stop routing requests to an instance that is about to shut down.
+//
+// In our case, segment writer service instances are not directly exposed to
+// the outside world but are discoverable via the ring (see lifecycler).
+// There's no _reliable_ mechanism to ensure that all the ring members are
+// aware of the fact that the instance is leaving, so requests may continue to
+// arrive within a short period of time, until the membership state converges.
+type requests struct {
+	mu      sync.RWMutex
+	wg      sync.WaitGroup
+	allowed bool // Indicates if new requests are allowed
+}
+
+// open allows new requests to be accepted.
+func (r *requests) open() {
+	r.mu.Lock()
+	r.allowed = true
+	r.mu.Unlock()
+}
+
+// add increments the WaitGroup if new requests are allowed.
+// Returns true if the request was accepted, false otherwise.
+func (r *requests) add() bool {
+	r.mu.RLock()
+	defer r.mu.RUnlock()
+	if !r.allowed {
+		return false
+	}
+	r.wg.Add(1)
+	return true
+}
+
+// done decrements the WaitGroup, indicating a request has completed.
+func (r *requests) done() { + r.wg.Done() +} + +// drain prevents new requests from being accepted and waits for +// all ongoing requests to complete. +func (r *requests) drain() { + r.mu.Lock() + r.allowed = false + r.mu.Unlock() + r.wg.Wait() +} diff --git a/pkg/experiment/metastore/client/client.go b/pkg/experiment/metastore/client/client.go index 8a30fbc008..1ee80a961e 100644 --- a/pkg/experiment/metastore/client/client.go +++ b/pkg/experiment/metastore/client/client.go @@ -3,12 +3,14 @@ package metastoreclient import ( "context" "fmt" + "io" + "sync" + "github.com/go-kit/log" - "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" - "io" - "sync" + + "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/services" @@ -162,8 +164,14 @@ func dial(address string, grpcClientConfig grpcclient.Config, _ log.Logger) (*gr } // TODO: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto options = append(options, - //grpc.WithDefaultServiceConfig(grpcServiceConfig), + grpc.WithDefaultServiceConfig(grpcServiceConfig), grpc.WithUnaryInterceptor(otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())), ) return grpc.Dial(address, options...) } + +const grpcServiceConfig = `{ + "healthCheckConfig": { + "serviceName": "pyroscope.metastore" + } +}` diff --git a/pkg/experiment/metastore/client/methods.go b/pkg/experiment/metastore/client/methods.go index c644d5d461..29d5b9df8f 100644 --- a/pkg/experiment/metastore/client/methods.go +++ b/pkg/experiment/metastore/client/methods.go @@ -3,15 +3,17 @@ package metastoreclient import ( "context" "fmt" - compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" - metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" - typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" + "math/rand" + "time" + "github.com/hashicorp/raft" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "math/rand" - "time" + + compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1" + metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" + typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1" ) func invoke[R any](ctx context.Context, cl *Client, @@ -19,7 +21,7 @@ func invoke[R any](ctx context.Context, cl *Client, ) (*R, error) { const ( n = 50 - backoff = 11 * time.Millisecond + backoff = 51 * time.Millisecond deadline = 500000000 * time.Millisecond ) diff --git a/pkg/experiment/metastore/metastore.go b/pkg/experiment/metastore/metastore.go index 8936c28021..8d7f6d28bc 100644 --- a/pkg/experiment/metastore/metastore.go +++ b/pkg/experiment/metastore/metastore.go @@ -8,7 +8,6 @@ import ( "os" "path/filepath" "strings" - "sync" "time" "github.com/go-kit/log" @@ -27,10 +26,10 @@ import ( metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" adaptiveplacement "github.com/grafana/pyroscope/pkg/experiment/distributor/placement/adaptive_placement" "github.com/grafana/pyroscope/pkg/experiment/metastore/blockcleaner" - metastoreclient "github.com/grafana/pyroscope/pkg/experiment/metastore/client" "github.com/grafana/pyroscope/pkg/experiment/metastore/dlq" "github.com/grafana/pyroscope/pkg/experiment/metastore/index" "github.com/grafana/pyroscope/pkg/experiment/metastore/raftleader" + "github.com/grafana/pyroscope/pkg/util/health" ) const ( @@ -113,10 +112,11 
@@ type Metastore struct {
 	metastorev1.OperatorServiceServer
 	compactorv1.CompactionPlannerServer

-	config Config
-	logger log.Logger
-	reg    prometheus.Registerer
-	bucket objstore.Bucket
+	config  Config
+	logger  log.Logger
+	reg     prometheus.Registerer
+	metrics *metastoreMetrics
+	health  health.Service

 	// In-memory state.
 	state *metastoreState
@@ -137,12 +137,8 @@ type Metastore struct {

 	walDir string

-	metrics *metastoreMetrics
-	client  *metastoreclient.Client
-
-	readyOnce  sync.Once
-	readySince time.Time
-
+	bucket       objstore.Bucket
+	client       metastorev1.MetastoreServiceClient
 	placementMgr *adaptiveplacement.Manager
 	dnsProvider  *dns.Provider
 	dlq          *dlq.Recovery
@@ -153,7 +149,8 @@ func New(
 	config Config,
 	logger log.Logger,
 	reg prometheus.Registerer,
-	client *metastoreclient.Client,
+	healthService health.Service,
+	client metastorev1.MetastoreServiceClient,
 	bucket objstore.Bucket,
 	placementMgr *adaptiveplacement.Manager,
 ) (*Metastore, error) {
@@ -162,9 +159,10 @@ func New(
 		config:       config,
 		logger:       logger,
 		reg:          reg,
+		metrics:      metrics,
+		health:       healthService,
 		bucket:       bucket,
 		db:           newDB(config, logger, metrics),
-		metrics:      metrics,
 		client:       client,
 		placementMgr: placementMgr,
 	}
@@ -211,6 +209,7 @@ func (m *Metastore) stopping(_ error) error {
 }

 func (m *Metastore) running(ctx context.Context) error {
+	m.health.SetServing()
 	<-ctx.Done()
 	return nil
 }
@@ -308,22 +307,22 @@ func (m *Metastore) createRaftDirs() (err error) {

 func (m *Metastore) shutdownRaft() {
 	if m.raft != nil {
-		// If raft has been initialized, try to transfer leadership.
-		// Only after this we remove the leader health observer and
-		// shutdown the raft.
-		// There is a chance that client will still be trying to connect
-		// to this instance, therefore retrying is still required.
-		if err := m.raft.LeadershipTransfer().Error(); err != nil {
-			switch {
-			case errors.Is(err, raft.ErrNotLeader):
-				// Not a leader, nothing to do.
-			case strings.Contains(err.Error(), "cannot find peer"):
-				// It's likely that there's just one node in the cluster.
-			default:
-				_ = level.Error(m.logger).Log("msg", "failed to transfer leadership", "err", err)
-			}
-		}
 		m.leaderhealth.Deregister()
+		// We let clients observe the leadership transfer: it's their
+		// responsibility to connect to the new leader. We only need to
+		// make sure that any error returned to clients includes details
+		// about the raft leader, if applicable.
+		if err := m.TransferLeadership(); err == nil {
+			// We were the leader and managed to transfer leadership.
+			// Wait a bit to let the new leader settle.
+			_ = level.Info(m.logger).Log("msg", "waiting for leadership transfer to complete")
+			// TODO(kolesnikovae): Wait until ReadIndex of
+			// the new leader catches up with the local CommitIndex.
+			time.Sleep(m.config.MinReadyDuration)
+		}
+		// Tell clients to stop sending requests to this node.
+		// There are no guarantees that clients will see or obey this.
+		m.health.SetNotServing()
 		if err := m.raft.Shutdown().Error(); err != nil {
 			_ = level.Error(m.logger).Log("msg", "failed to shutdown raft", "err", err)
 		}
@@ -339,3 +338,16 @@
 		}
 	}
 }
+
+func (m *Metastore) TransferLeadership() (err error) {
+	switch err = m.raft.LeadershipTransfer().Error(); {
+	case err == nil:
+	case errors.Is(err, raft.ErrNotLeader):
+		// Not a leader, nothing to do.
+	case strings.Contains(err.Error(), "cannot find peer"):
+		// No peers, nothing to do.
+ default: + _ = level.Error(m.logger).Log("msg", "failed to transfer leadership", "err", err) + } + return err +} diff --git a/pkg/experiment/metastore/metastore_read_index.go b/pkg/experiment/metastore/metastore_read_index.go index f3189f1ea4..35d83b177d 100644 --- a/pkg/experiment/metastore/metastore_read_index.go +++ b/pkg/experiment/metastore/metastore_read_index.go @@ -2,7 +2,6 @@ package metastore import ( "context" - "fmt" "time" metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1" @@ -45,18 +44,3 @@ func (m *Metastore) waitLeaderCommitIndexAppliedLocally(ctx context.Context) err } } } - -// CheckReady verifies if the metastore is ready to serve requests by -// ensuring the node is up-to-date with the leader's commit index. -func (m *Metastore) CheckReady(ctx context.Context) error { - if err := m.waitLeaderCommitIndexAppliedLocally(ctx); err != nil { - return err - } - m.readyOnce.Do(func() { - m.readySince = time.Now() - }) - if w := m.config.MinReadyDuration - time.Since(m.readySince); w > 0 { - return fmt.Errorf("%v before reporting readiness", w) - } - return nil -} diff --git a/pkg/experiment/metastore/test/create.go b/pkg/experiment/metastore/test/create.go index f91da7caab..016df9f4bb 100644 --- a/pkg/experiment/metastore/test/create.go +++ b/pkg/experiment/metastore/test/create.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" "github.com/grafana/pyroscope/pkg/test" "github.com/grafana/pyroscope/pkg/test/mocks/mockdiscovery" + "github.com/grafana/pyroscope/pkg/util/health" "github.com/grafana/pyroscope/pkg/validation" "github.com/hashicorp/raft" @@ -96,7 +97,7 @@ func NewMetastoreSet(t *testing.T, cfg *metastore.Config, n int, bucket objstore validation.MockDefaultOverrides(), adaptive_placement.NewStore(bucket), ) - m, err := metastore.New(configs[i], logger, registry, client, bucket, placementManager) + m, err := metastore.New(configs[i], logger, registry, health.NoOpService, client, bucket, placementManager) require.NoError(t, err) metastorev1.RegisterMetastoreServiceServer(server, m) compactorv1.RegisterCompactionPlannerServer(server, m) @@ -123,9 +124,6 @@ func NewMetastoreSet(t *testing.T, cfg *metastore.Config, n int, bucket objstore if res.Instances[i].Metastore.Service().State() != services.Running { return false } - if res.Instances[i].Metastore.CheckReady(context.Background()) != nil { - return false - } } return true }, 10*time.Second, 100*time.Millisecond) diff --git a/pkg/phlare/modules.go b/pkg/phlare/modules.go index 32f31c2178..581f52b9fa 100644 --- a/pkg/phlare/modules.go +++ b/pkg/phlare/modules.go @@ -30,6 +30,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/h2c" "google.golang.org/genproto/googleapis/api/httpbody" + "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/protobuf/encoding/protojson" "gopkg.in/yaml.v3" @@ -99,7 +100,7 @@ const ( CompactionWorker string = "compaction-worker" PlacementAgent string = "placement-agent" PlacementManager string = "placement-manager" - ShutdownHelper string = "shutdown-helper" + HealthServer string = "health-server" ) var objectStoreTypeStats = usagestats.NewString("store_object_type") @@ -503,7 +504,7 @@ func (f *Phlare) initServer() (services.Service, error) { // see https://github.com/grafana/pyroscope/issues/231 f.Cfg.Server.DoNotAddDefaultHTTPMiddleware = true f.Cfg.Server.ExcludeRequestInLog = true // gRPC-specific. 
- f.Cfg.Server.GRPCMiddleware = append(f.Cfg.Server.GRPCMiddleware, util.GRPCRecoveryInterceptor) + f.Cfg.Server.GRPCMiddleware = append(f.Cfg.Server.GRPCMiddleware, util.RecoveryInterceptorGRPC) f.setupWorkerTimeout() if f.isModuleActive(QueryScheduler) { @@ -517,6 +518,9 @@ func (f *Phlare) initServer() (services.Service, error) { } f.Server = serv + if f.Cfg.v2Experiment { + grpc_health_v1.RegisterHealthServer(f.Server.GRPC, f.healthServer) + } servicesToWaitFor := func() []services.Service { svs := []services.Service(nil) diff --git a/pkg/phlare/modules_experimental.go b/pkg/phlare/modules_experimental.go index b2fdf9a538..db48b28488 100644 --- a/pkg/phlare/modules_experimental.go +++ b/pkg/phlare/modules_experimental.go @@ -5,10 +5,10 @@ import ( "slices" "github.com/go-kit/log" - "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + grpchealth "google.golang.org/grpc/health" compactionworker "github.com/grafana/pyroscope/pkg/experiment/compactor" adaptiveplacement "github.com/grafana/pyroscope/pkg/experiment/distributor/placement/adaptive_placement" @@ -19,6 +19,7 @@ import ( "github.com/grafana/pyroscope/pkg/experiment/metastore/discovery" querybackend "github.com/grafana/pyroscope/pkg/experiment/query_backend" querybackendclient "github.com/grafana/pyroscope/pkg/experiment/query_backend/client" + "github.com/grafana/pyroscope/pkg/util/health" ) func (f *Phlare) initSegmentWriterRing() (_ services.Service, err error) { @@ -45,17 +46,22 @@ func (f *Phlare) initSegmentWriter() (services.Service, error) { if err := f.Cfg.SegmentWriter.Validate(); err != nil { return nil, err } + + logger := log.With(f.logger, "component", "segment-writer") + healthService := health.NewGRPCHealthService(f.healthServer, logger, "pyroscope.segment-writer") segmentWriter, err := segmentwriter.New( f.reg, - f.logger, + logger, f.Cfg.SegmentWriter, f.Overrides, + healthService, f.storageBucket, f.metastoreClient, ) if err != nil { return nil, err } + f.segmentWriter = segmentWriter f.API.RegisterSegmentWriter(segmentWriter) return f.segmentWriter, nil @@ -101,11 +107,14 @@ func (f *Phlare) initMetastore() (services.Service, error) { if err := f.Cfg.Metastore.Validate(); err != nil { return nil, err } + logger := log.With(f.logger, "component", "metastore") + healthService := health.NewGRPCHealthService(f.healthServer, logger, "pyroscope.metastore") m, err := metastore.New( f.Cfg.Metastore, logger, f.reg, + healthService, f.metastoreClient, f.storageBucket, f.placementManager, @@ -113,6 +122,7 @@ func (f *Phlare) initMetastore() (services.Service, error) { if err != nil { return nil, err } + f.API.RegisterMetastore(m) f.metastore = m return m.Service(), nil @@ -200,31 +210,7 @@ func (f *Phlare) adaptivePlacementStore() adaptiveplacement.Store { return adaptiveplacement.NewStore(f.storageBucket) } -// The shutdown helper utility emerged due to the need to handle request -// draining at the server level. -// -// Since the server is a dependency of many services that handle requests -// and is only shut down after the services have stopped, there's a possibility -// that a de-initialized component may receive requests, which causes undefined -// behaviour. -// -// In other scenarios, request draining could be managed at a higher level, -// such as in a load balancer or the service discovery mechanism. 
However, -// there's no _reliable_ mechanism to ensure that all the clients are informed -// of the server's shutdown and confirmed that they have stopped sending -// requests to this specific instance. -// -// The helper should be de-initialized first in the dependency chain; -// immediately, it drains the gRPC server, thereby preventing any further -// requests from being processed. THe helper does not affect the HTTP -// server that serves metrics and profiles. -func (f *Phlare) initShutdownHelper() (services.Service, error) { - shutdownServer := func(error) error { - if f.Server.GRPC != nil { - level.Info(f.logger).Log("msg", "shutting down gRPC server") - f.Server.GRPC.GracefulStop() - } - return nil - } - return services.NewIdleService(nil, shutdownServer), nil +func (f *Phlare) initHealthServer() (services.Service, error) { + f.healthServer = grpchealth.NewServer() + return nil, nil } diff --git a/pkg/phlare/phlare.go b/pkg/phlare/phlare.go index 3b5736612e..c558b4f34b 100644 --- a/pkg/phlare/phlare.go +++ b/pkg/phlare/phlare.go @@ -37,6 +37,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" "github.com/samber/lo" + "google.golang.org/grpc/health" "github.com/grafana/pyroscope/pkg/api" apiversion "github.com/grafana/pyroscope/pkg/api/version" @@ -306,6 +307,7 @@ type Phlare struct { metastoreClient *metastoreclient.Client queryBackendClient *querybackendclient.Client compactionWorker *compactionworker.Worker + healthServer *health.Server } func New(cfg Config) (*Phlare, error) { @@ -409,21 +411,21 @@ func (f *Phlare) setupModuleManager() error { experimentalModules := map[string][]string{ SegmentWriter: {Overrides, API, MemberlistKV, Storage, UsageReport, MetastoreClient}, Metastore: {Overrides, API, MetastoreClient, Storage, PlacementManager}, - CompactionWorker: {Overrides, API, Storage, Overrides, MetastoreClient}, - QueryBackend: {Overrides, API, Storage, Overrides, QueryBackendClient}, + CompactionWorker: {Overrides, API, Storage, MetastoreClient}, + QueryBackend: {Overrides, API, Storage, QueryBackendClient}, SegmentWriterRing: {Overrides, API, MemberlistKV}, SegmentWriterClient: {Overrides, API, SegmentWriterRing, PlacementAgent}, PlacementAgent: {Overrides, API, Storage}, PlacementManager: {Overrides, API, Storage}, - ShutdownHelper: {Distributor, SegmentWriter, Metastore, QueryBackend}, } for k, v := range experimentalModules { deps[k] = v } - deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend, ShutdownHelper) + deps[All] = append(deps[All], SegmentWriter, Metastore, CompactionWorker, QueryBackend) deps[QueryFrontend] = append(deps[QueryFrontend], MetastoreClient, QueryBackendClient) deps[Distributor] = append(deps[Distributor], SegmentWriterClient) + deps[Server] = append(deps[Server], HealthServer) mm.RegisterModule(SegmentWriter, f.initSegmentWriter) mm.RegisterModule(Metastore, f.initMetastore) @@ -436,7 +438,7 @@ func (f *Phlare) setupModuleManager() error { mm.RegisterModule(QueryBackendClient, f.initQueryBackendClient, modules.UserInvisibleModule) mm.RegisterModule(PlacementAgent, f.initPlacementAgent, modules.UserInvisibleModule) mm.RegisterModule(PlacementManager, f.initPlacementManager, modules.UserInvisibleModule) - mm.RegisterModule(ShutdownHelper, f.initShutdownHelper, modules.UserInvisibleModule) + mm.RegisterModule(HealthServer, f.initHealthServer, modules.UserInvisibleModule) } for mod, targets := range deps { diff --git a/pkg/util/health/health.go 
b/pkg/util/health/health.go index da9d7572c6..c8bd47d903 100644 --- a/pkg/util/health/health.go +++ b/pkg/util/health/health.go @@ -3,7 +3,8 @@ package health import ( "context" - "github.com/grafana/dskit/services" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -11,29 +12,43 @@ import ( ) type Service interface { - SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus) + SetServing() + SetNotServing() } type noopService struct{} var NoOpService = noopService{} -func (noopService) SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus) {} +func (noopService) SetServing() {} -func NewGRPCHealthService() *GRPCHealthService { - s := health.NewServer() +func (noopService) SetNotServing() {} + +type GRPCHealthService struct { + logger log.Logger + name string + server *health.Server +} + +func NewGRPCHealthService(server *health.Server, logger log.Logger, name string) *GRPCHealthService { return &GRPCHealthService{ - Server: s, - Service: services.NewIdleService(nil, func(error) error { - s.Shutdown() - return nil - }), + logger: logger, + name: name, + server: server, } } -type GRPCHealthService struct { - services.Service - *health.Server +func (s *GRPCHealthService) setStatus(x grpc_health_v1.HealthCheckResponse_ServingStatus) { + level.Info(s.logger).Log("msg", "setting health status", "status", x) + s.server.SetServingStatus(s.name, x) +} + +func (s *GRPCHealthService) SetServing() { + s.setStatus(grpc_health_v1.HealthCheckResponse_SERVING) +} + +func (s *GRPCHealthService) SetNotServing() { + s.setStatus(grpc_health_v1.HealthCheckResponse_NOT_SERVING) } var NoOpClient = noOpClient{} diff --git a/pkg/util/recovery.go b/pkg/util/recovery.go index 3ea7840b6a..80288fd1ee 100644 --- a/pkg/util/recovery.go +++ b/pkg/util/recovery.go @@ -38,7 +38,7 @@ var ( }) RecoveryInterceptor recoveryInterceptor - GRPCRecoveryInterceptor = grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(PanicError)) + RecoveryInterceptorGRPC = grpc_recovery.UnaryServerInterceptor(grpc_recovery.WithRecoveryHandler(PanicError)) ) func PanicError(p interface{}) error {
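For reference, the client and server halves of the gRPC health-checking protocol that this patch relies on fit together as in the sketch below. It is illustrative only and not part of the patch: the service name "example.service" and the listener address are placeholders (the patch registers "pyroscope.segment-writer" and "pyroscope.metastore"), and it assumes stock grpc-go behaviour, namely that client-side health checking is enabled through "healthCheckConfig" in the default service config, requires a load-balancing policy other than pick_first, and only takes effect when the google.golang.org/grpc/health package is linked into the client binary.

package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health"
	"google.golang.org/grpc/health/grpc_health_v1"
)

// Placeholder service name; the patch uses "pyroscope.segment-writer"
// and "pyroscope.metastore".
const serviceName = "example.service"

// startServer registers the gRPC health service and marks serviceName as
// SERVING once the server is ready to accept requests.
func startServer(lis net.Listener) *health.Server {
	srv := grpc.NewServer()
	hs := health.NewServer()
	grpc_health_v1.RegisterHealthServer(srv, hs)
	hs.SetServingStatus(serviceName, grpc_health_v1.HealthCheckResponse_SERVING)
	go func() {
		if err := srv.Serve(lis); err != nil {
			log.Println("serve:", err)
		}
	}()
	return hs
}

// Client-side service config: name the service to health-check and use a
// policy other than pick_first, otherwise grpc-go skips health checking.
const clientServiceConfig = `{
	"loadBalancingConfig": [{"round_robin": {}}],
	"healthCheckConfig": {"serviceName": "example.service"}
}`

func dialWithHealthChecks(addr string) (*grpc.ClientConn, error) {
	// Importing google.golang.org/grpc/health (see imports above) registers
	// the client health-checking function; without it the config is ignored.
	return grpc.Dial(addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(clientServiceConfig),
	)
}

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		log.Fatal(err)
	}
	hs := startServer(lis)

	conn, err := dialWithHealthChecks(lis.Addr().String())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Before draining, the server flips the status so that clients stop
	// picking this address; RPCs then fail fast with codes.Unavailable
	// instead of landing on a node that is shutting down.
	hs.SetServingStatus(serviceName, grpc_health_v1.HealthCheckResponse_NOT_SERVING)
}

This is the protocol the patch's health.NewGRPCHealthService wraps: SetServing and SetNotServing flip the per-service status on the shared health server, while requests.drain in the segment writer then waits for in-flight RPCs before the gRPC server itself is torn down.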