feat(v2): revive grpc health checks
kolesnikovae committed Oct 22, 2024
1 parent ec5e5ba commit 11f619c
Showing 14 changed files with 267 additions and 183 deletions.
19 changes: 12 additions & 7 deletions pkg/experiment/ingester/client/client.go
@@ -137,7 +137,10 @@ const grpcServiceConfig = `{
"methodConfig": [{
"name": [{"service": ""}],
"retryPolicy": {}
-  }]
+  }],
+  "healthCheckConfig": {
+    "serviceName": "pyroscope.segment-writer"
+  }
}`

type Client struct {
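
The healthCheckConfig added above tells grpc-go to probe each connection against the standard gRPC health service, keyed by the service name pyroscope.segment-writer. For those probes to succeed, the segment-writer server must register the health service under the same name. A minimal server-side sketch, not taken from this commit (port and lifecycle handling are placeholders):

package main

import (
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health"
	healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

func main() {
	server := grpc.NewServer()
	hs := health.NewServer()
	// Report the segment-writer service as healthy; switching this to
	// NOT_SERVING during shutdown takes the connection out of rotation
	// on clients before the listener goes away.
	hs.SetServingStatus("pyroscope.segment-writer", healthpb.HealthCheckResponse_SERVING)
	healthpb.RegisterHealthServer(server, hs)

	ln, err := net.Listen("tcp", ":9095")
	if err != nil {
		panic(err)
	}
	_ = server.Serve(ln)
}
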
@@ -255,14 +258,16 @@ func (c *Client) pushToInstance(
ctx context.Context,
req *segmentwriterv1.PushRequest,
addr string,
-) (*segmentwriterv1.PushResponse, error) {
+) (resp *segmentwriterv1.PushResponse, err error) {
conn, err := c.pool.GetConnFor(addr)
if err != nil {
return nil, err
}
-	c.metrics.sentBytes.
-		WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
-		Observe(float64(len(req.Profile)))
+	if err != nil {
+		c.metrics.sentBytes.
+			WithLabelValues(strconv.Itoa(int(req.Shard)), req.TenantId, addr).
+			Observe(float64(len(req.Profile)))
+	}
return segmentwriterv1.NewSegmentWriterServiceClient(conn).Push(ctx, req)
}
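
The hunk above also switches pushToInstance to named return values, which is what allows a byte-size metric to be tied to the outcome of the push. A minimal sketch of that pattern, reusing the file's existing imports and assuming a plain prometheus.Observer in place of the labeled histogram; the exact guard wiring in the commit may differ:

// Sketch: tie the sent-bytes observation to the push outcome. The
// deferred closure reads the named return err, so it runs after the
// result is known and records only successful pushes.
func pushObserved(
	ctx context.Context,
	client segmentwriterv1.SegmentWriterServiceClient,
	req *segmentwriterv1.PushRequest,
	sentBytes prometheus.Observer,
) (resp *segmentwriterv1.PushResponse, err error) {
	defer func() {
		if err == nil {
			sentBytes.Observe(float64(len(req.Profile)))
		}
	}()
	return client.Push(ctx, req)
}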

@@ -294,8 +299,8 @@ func newConnPool(
"segment-writer",
ring_client.PoolConfig{
CheckInterval: poolCleanupPeriod,
-		// Note that no health checks are performed: it's the caller's
-		// responsibility to pick a healthy instance.
+		// Note that pool-level health checks are not used:
+		// gRPC health checking is done at the connection level.
HealthCheckEnabled: false,
HealthCheckTimeout: 0,
MaxConcurrentHealthChecks: 0,
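One easy-to-miss prerequisite on the client: grpc-go only activates connection-level health checking when the health package's client support is linked in via a blank import. A minimal dial sketch under that assumption (address and credentials are placeholders, not from this commit):

package main

import (
	"log"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	// The blank import registers the client-side health checking function;
	// without it, healthCheckConfig in the service config is ignored.
	_ "google.golang.org/grpc/health"
)

func main() {
	conn, err := grpc.Dial("segment-writer:9095",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(
			`{"healthCheckConfig": {"serviceName": "pyroscope.segment-writer"}}`),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
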
51 changes: 23 additions & 28 deletions pkg/experiment/ingester/segment.go
@@ -34,17 +29,12 @@ var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ")

type shardKey uint32

-type segmentWriterConfig struct {
-	segmentDuration time.Duration
-}

type segmentsWriter struct {
-	segmentDuration time.Duration
-	limits          Limits
-	l               log.Logger
-	bucket          objstore.Bucket
-	metastoreClient metastorev1.MetastoreServiceClient
+	config    Config
+	limits    Limits
+	logger    log.Logger
+	bucket    objstore.Bucket
+	metastore metastorev1.MetastoreServiceClient

shards map[shardKey]*shard
shardsLock sync.RWMutex
@@ -76,7 +71,7 @@ func (sh *shard) ingest(fn func(head segmentIngest)) segmentWaitFlushed {
}

func (sh *shard) loop(ctx context.Context) {
-	ticker := time.NewTicker(sh.sw.segmentDuration)
+	ticker := time.NewTicker(sh.sw.config.SegmentDuration)
defer ticker.Stop()
for {
select {
@@ -102,7 +97,7 @@ func (sh *shard) flushSegment(ctx context.Context) {

err := s.flush(ctx)
if err != nil {
-		_ = level.Error(sh.sw.l).Log("msg", "failed to flush segment", "err", err)
+		_ = level.Error(sh.sw.logger).Log("msg", "failed to flush segment", "err", err)
}
if s.debuginfo.movedHeads > 0 {
_ = level.Debug(s.l).Log("msg",
@@ -118,19 +113,19 @@ func (sh *shard) flushSegment(ctx context.Context) {
}()
}

-func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, cfg segmentWriterConfig, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
+func newSegmentWriter(l log.Logger, metrics *segmentMetrics, hm *memdb.HeadMetrics, config Config, limits Limits, bucket objstore.Bucket, metastoreClient metastorev1.MetastoreServiceClient) *segmentsWriter {
ctx, cancelFunc := context.WithCancel(context.Background())
sw := &segmentsWriter{
-		limits:          limits,
-		metrics:         metrics,
-		headMetrics:     hm,
-		segmentDuration: cfg.segmentDuration,
-		l:               l,
-		bucket:          bucket,
-		shards:          make(map[shardKey]*shard),
-		metastoreClient: metastoreClient,
-		cancel:          cancelFunc,
-		cancelCtx:       ctx,
+		limits:      limits,
+		metrics:     metrics,
+		headMetrics: hm,
+		config:      config,
+		logger:      l,
+		bucket:      bucket,
+		shards:      make(map[shardKey]*shard),
+		metastore:   metastoreClient,
+		cancel:      cancelFunc,
+		cancelCtx:   ctx,
}

return sw
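
segmentWriterConfig is replaced by a package-level Config whose definition is not among the files shown here. A hypothetical shape consistent with the call sites above; the SegmentDuration field is confirmed by the diff, while the yaml key, flag name, and default are assumptions:

// Hypothetical Config shape; only SegmentDuration is confirmed by the diff.
type Config struct {
	SegmentDuration time.Duration `yaml:"segment_duration"`
}

func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.DurationVar(&c.SegmentDuration, "segment-writer.segment-duration",
		500*time.Millisecond, "How long profiles are buffered before a segment is flushed.")
}
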
@@ -158,20 +153,20 @@ func (sw *segmentsWriter) ingest(shard shardKey, fn func(head segmentIngest)) (a
}

func (sw *segmentsWriter) Stop() error {
sw.l.Log("msg", "stopping segments writer")
sw.logger.Log("msg", "stopping segments writer")
sw.cancel()
sw.shardsLock.Lock()
defer sw.shardsLock.Unlock()
for _, s := range sw.shards {
s.wg.Wait()
}
sw.l.Log("msg", "segments writer stopped")
sw.logger.Log("msg", "segments writer stopped")

return nil
}

func (sw *segmentsWriter) newShard(sk shardKey) *shard {
-	sl := log.With(sw.l, "shard", fmt.Sprintf("%d", sk))
+	sl := log.With(sw.logger, "shard", fmt.Sprintf("%d", sk))
sh := &shard{
sw: sw,
l: sl,
@@ -523,7 +518,7 @@ func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, met
if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil {
return err
}
sw.l.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))
sw.logger.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))
return nil
}

@@ -533,7 +528,7 @@ func (sw *segmentsWriter) storeMeta(ctx context.Context, meta *metastorev1.Block
sw.metrics.storeMetaDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds())
s.debuginfo.storeMetaDuration = time.Since(t1)
}()
-	_, err := sw.metastoreClient.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta})
+	_, err := sw.metastore.AddBlock(ctx, &metastorev1.AddBlockRequest{Block: meta})
if err != nil {
sw.metrics.storeMetaErrors.WithLabelValues(s.sshard).Inc()
}
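Taken together, the segment.go changes keep the same lifecycle: each shard runs a ticker-driven loop that flushes every Config.SegmentDuration until the writer's context is cancelled. A stripped-down sketch of that loop; the flush-on-shutdown branch is an assumption, since the visible hunks cut off inside the select:

// Sketch of the per-shard flush loop driven by Config.SegmentDuration.
func runShard(ctx context.Context, period time.Duration, flush func(context.Context)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			flush(ctx)
		case <-ctx.Done():
			// Assumed behavior: flush buffered data before exiting so
			// Stop() does not drop the tail of the current segment.
			flush(context.Background())
			return
		}
	}
}
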
40 changes: 20 additions & 20 deletions pkg/experiment/ingester/segment_test.go
@@ -82,7 +82,7 @@ func TestSegmentIngest(t *testing.T) {
}

func ingestWithMetastoreAvailable(t *testing.T, chunks []inputChunk) {
-	sw := newTestSegmentWriter(t, defaultTestSegmentWriterConfig())
+	sw := newTestSegmentWriter(t, defaultTestConfig())
defer sw.Stop()
blocks := make(chan *metastorev1.BlockMeta, 128)

@@ -106,7 +106,7 @@ func ingestWithMetastoreAvailable(t *testing.T, chunks []inputChunk) {
}

func ingestWithDLQ(t *testing.T, chunks []inputChunk) {
-	sw := newTestSegmentWriter(t, defaultTestSegmentWriterConfig())
+	sw := newTestSegmentWriter(t, defaultTestConfig())
defer sw.Stop()
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("metastore unavailable"))
@@ -124,8 +124,8 @@ func ingestWithDLQ(t *testing.T, chunks []inputChunk) {
}

func TestIngestWait(t *testing.T) {
-	sw := newTestSegmentWriter(t, segmentWriterConfig{
-		segmentDuration: 100 * time.Millisecond,
+	sw := newTestSegmentWriter(t, Config{
+		SegmentDuration: 100 * time.Millisecond,
})

defer sw.Stop()
@@ -146,8 +146,8 @@

func TestBusyIngestLoop(t *testing.T) {

-	sw := newTestSegmentWriter(t, segmentWriterConfig{
-		segmentDuration: 100 * time.Millisecond,
+	sw := newTestSegmentWriter(t, Config{
+		SegmentDuration: 100 * time.Millisecond,
})
defer sw.Stop()

@@ -247,8 +247,8 @@ func TestDLQFail(t *testing.T) {
l,
newSegmentMetrics(nil),
memdb.NewHeadMetricsWithPrefix(nil, ""),
-		segmentWriterConfig{
-			segmentDuration: 100 * time.Millisecond,
+		Config{
+			SegmentDuration: 100 * time.Millisecond,
},
validation.MockDefaultOverrides(),
bucket,
@@ -292,8 +292,8 @@ func TestDatasetMinMaxTime(t *testing.T) {
l,
newSegmentMetrics(nil),
memdb.NewHeadMetricsWithPrefix(nil, ""),
-		segmentWriterConfig{
-			segmentDuration: 100 * time.Millisecond,
+		Config{
+			SegmentDuration: 100 * time.Millisecond,
},
validation.MockDefaultOverrides(),
bucket,
@@ -333,8 +333,8 @@
func TestQueryMultipleSeriesSingleTenant(t *testing.T) {
metas := make(chan *metastorev1.BlockMeta, 1)

-	sw := newTestSegmentWriter(t, segmentWriterConfig{
-		segmentDuration: 100 * time.Millisecond,
+	sw := newTestSegmentWriter(t, Config{
+		SegmentDuration: 100 * time.Millisecond,
})
defer sw.Stop()
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
@@ -382,8 +382,8 @@ func TestDLQRecoveryMock(t *testing.T) {
{shard: 1, tenant: "tb", profile: cpuProfile(42, 239, "svc1", "kek", "foo", "bar")},
})

-	sw := newTestSegmentWriter(t, segmentWriterConfig{
-		segmentDuration: 100 * time.Millisecond,
+	sw := newTestSegmentWriter(t, Config{
+		SegmentDuration: 100 * time.Millisecond,
})
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("mock metastore unavailable"))
@@ -422,8 +422,8 @@ func TestDLQRecovery(t *testing.T) {
{shard: 1, tenant: tenant, profile: cpuProfile(42, int(ts), "svc1", "kek", "foo", "bar")},
})

-	sw := newTestSegmentWriter(t, segmentWriterConfig{
-		segmentDuration: 100 * time.Millisecond,
+	sw := newTestSegmentWriter(t, Config{
+		SegmentDuration: 100 * time.Millisecond,
})
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("mock metastore unavailable"))
@@ -471,7 +471,7 @@ type sw struct {
queryNo int
}

-func newTestSegmentWriter(t *testing.T, cfg segmentWriterConfig) sw {
+func newTestSegmentWriter(t *testing.T, cfg Config) sw {
l := testutil.NewLogger(t)
bucket := memory.NewInMemBucket()
client := mockmetastorev1.NewMockMetastoreServiceClient(t)
@@ -492,9 +492,9 @@ func newTestSegmentWriter(t *testing.T, cfg segmentWriterConfig) sw {
}
}

-func defaultTestSegmentWriterConfig() segmentWriterConfig {
-	return segmentWriterConfig{
-		segmentDuration: 1 * time.Second,
+func defaultTestConfig() Config {
+	return Config{
+		SegmentDuration: 1 * time.Second,
}
}

