Skip to content

Commit

Permalink
Add per workload TxThrottler metrics (#110)
Browse files Browse the repository at this point in the history
This is a backport of upstreamed vitessio#13526

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau authored Aug 1, 2023
1 parent bd682ad commit 7203c35
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 22 deletions.
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand All @@ -215,7 +215,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options), qre.options.GetWorkloadName()) {
return nil, errTxThrottled
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,6 @@ func (m mockTxThrottler) Open() (err error) {
func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
func (m mockTxThrottler) Throttle(priority int, workload string) (result bool) {
return m.throttle
}
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options), options.GetWorkloadName()) {
return errTxThrottled
}
var beginSQL string
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int) (result bool)
Throttle(priority int, workload string) (result bool)
}

func init() {
Expand Down Expand Up @@ -142,8 +142,8 @@ type txThrottler struct {

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
requestsTotal *stats.CountersWithSingleLabel
requestsThrottled *stats.CountersWithSingleLabel
}

// txThrottlerConfig holds the parameters that need to be
Expand Down Expand Up @@ -237,8 +237,8 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
requestsTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerRequests", "transaction throttler requests", "workload"),
requestsThrottled: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerThrottled", "transaction throttler requests throttled", "workload"),
}, nil
}

Expand Down Expand Up @@ -276,7 +276,7 @@ func (t *txThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *txThrottler) Throttle(priority int) (result bool) {
func (t *txThrottler) Throttle(priority int, workload string) (result bool) {
if !t.config.enabled {
return false
}
Expand All @@ -289,9 +289,9 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority

t.requestsTotal.Add(1)
t.requestsTotal.Add(workload, 1)
if result {
t.requestsThrottled.Add(1)
t.requestsThrottled.Add(workload, 1)
}

return result
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDisabledThrottler(t *testing.T) {
Shard: "shard",
})
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle(0))
assert.False(t, throttler.Throttle(0, "some-workload"))
throttlerImpl, _ := throttler.(*txThrottler)
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
throttler.Close()
Expand Down Expand Up @@ -126,9 +126,9 @@ func TestEnabledThrottler(t *testing.T) {
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

assert.False(t, throttler.Throttle(100))
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])

throttler.state.StatsUpdate(tabletStats)
rdonlyTabletStats := &discovery.LegacyTabletStats{
Expand All @@ -139,14 +139,14 @@ func TestEnabledThrottler(t *testing.T) {
// This call should not be forwarded to the go/vt/throttler.Throttler object.
hcListener.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.
assert.True(t, throttler.Throttle(100))
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
assert.True(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
assert.False(t, throttler.Throttle(0))
assert.Equal(t, int64(3), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down

0 comments on commit 7203c35

Please sign in to comment.