Skip to content

Commit

Permalink
query should be able to filter by whole family and cherry pickable ta…
Browse files Browse the repository at this point in the history
…bles
  • Loading branch information
Hongyu Zhou committed Feb 26, 2024
1 parent 5dffcef commit 3d90f7f
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 115 deletions.
78 changes: 39 additions & 39 deletions pkg/cmd/ctlstore/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,29 @@ type sidecarConfig struct {
}

type reflectorCliConfig struct {
LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"`
ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"`
ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"`
UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"`
UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"`
UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"`
UpstreamShardingFamily string `conf:"upstream-sharding-family" help:"Sharding family(s) reflector is targeting"`
UpstreamShardingTable string `conf:"upstream-sharding-table" help:"Sharding tables(s) reflector is targeting"`
BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"`
BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"`
PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"`
PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"`
PollTimeout time.Duration `conf:"poll-timeout" help:"How long to poll from the source before canceling"`
QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"`
Debug bool `conf:"debug" help:"Turns on debug logging"`
LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"`
Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"`
MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"`
WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"`
WALCheckpointThresholdSize int `conf:"wal-checkpoint-threshold-size" help:"Performs a checkpoint after the WAL file exceeds this size in bytes"`
WALCheckpointType ldbwriter.CheckpointType `conf:"wal-checkpoint-type" help:"what type of checkpoint to manually perform once the wal size is exceeded"`
BusyTimeoutMS int `conf:"busy-timeout-ms" help:"Set a busy timeout on the connection string for sqlite in milliseconds"`
MultiReflector multiReflectorConfig `conf:"multi-reflector" help:"Configuration for running multiple reflectors at once"`
LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"`
ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"`
ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"`
UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"`
UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"`
UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"`
UpstreamShardingWholeFamily string `conf:"upstream-sharding-full-family" help:"Whole sharding family(s) reflector is targeting"`
UpstreamShardingFamilyTable string `conf:"upstream-sharding-family-table" help:"Fully-qualified sharding tables(s) reflector is targeting"`
BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"`
BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"`
PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"`
PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"`
PollTimeout time.Duration `conf:"poll-timeout" help:"How long to poll from the source before canceling"`
QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"`
Debug bool `conf:"debug" help:"Turns on debug logging"`
LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"`
Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"`
MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"`
WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"`
WALCheckpointThresholdSize int `conf:"wal-checkpoint-threshold-size" help:"Performs a checkpoint after the WAL file exceeds this size in bytes"`
WALCheckpointType ldbwriter.CheckpointType `conf:"wal-checkpoint-type" help:"what type of checkpoint to manually perform once the wal size is exceeded"`
BusyTimeoutMS int `conf:"busy-timeout-ms" help:"Set a busy timeout on the connection string for sqlite in milliseconds"`
MultiReflector multiReflectorConfig `conf:"multi-reflector" help:"Configuration for running multiple reflectors at once"`
}

type multiReflectorConfig struct {
Expand Down Expand Up @@ -575,20 +575,20 @@ func multiReflector(ctx context.Context, args []string) {

func defaultReflectorCLIConfig(isSupervisor bool) reflectorCliConfig {
config := reflectorCliConfig{
LDBPath: "",
ChangelogPath: "",
ChangelogSize: 1 * 1024 * 1024,
UpstreamDriver: "",
UpstreamDSN: "",
UpstreamLedgerTable: "ctlstore_dml_ledger",
UpstreamShardingFamily: "flagon2,cob",
UpstreamShardingTable: "flagon2___flags,cob___kvs",
BootstrapURL: "",
PollInterval: 1 * time.Second,
PollJitterCoefficient: 0.25,
QueryBlockSize: 100,
Dogstatsd: defaultDogstatsdConfig(),
PollTimeout: 5 * time.Second,
LDBPath: "",
ChangelogPath: "",
ChangelogSize: 1 * 1024 * 1024,
UpstreamDriver: "",
UpstreamDSN: "",
UpstreamLedgerTable: "ctlstore_dml_ledger",
UpstreamShardingWholeFamily: "flagon2,cob",
UpstreamShardingFamilyTable: "flagon2___flags,cob___kvs",

This comment has been minimized.

Copy link
@janderson-seg

janderson-seg Feb 26, 2024

Contributor

Having tables for families already specified might confuse future users.

BootstrapURL: "",
PollInterval: 1 * time.Second,
PollJitterCoefficient: 0.25,
QueryBlockSize: 100,
Dogstatsd: defaultDogstatsdConfig(),
PollTimeout: 5 * time.Second,
LedgerHealth: ledgerHealthConfig{
Disable: false,
MaxHealthyLatency: time.Minute,
Expand Down Expand Up @@ -658,8 +658,8 @@ func newReflector(cliCfg reflectorCliConfig, isSupervisor bool, i int) (*reflect
Driver: cliCfg.UpstreamDriver,
DSN: cliCfg.UpstreamDSN,
LedgerTable: cliCfg.UpstreamLedgerTable,
ShardingFamily: cliCfg.UpstreamShardingFamily,
ShardingTable: cliCfg.UpstreamShardingTable,
ShardingFamily: cliCfg.UpstreamShardingWholeFamily,
ShardingTable: cliCfg.UpstreamShardingFamilyTable,
PollInterval: cliCfg.PollInterval,
PollJitterCoefficient: cliCfg.PollJitterCoefficient,
QueryBlockSize: cliCfg.QueryBlockSize,
Expand Down
34 changes: 23 additions & 11 deletions pkg/reflector/dml_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,34 @@ func (source *sqlDmlSource) Next(ctx context.Context) (statement schema.DMLState

// Helper function to generate the SQL query
func generateSQLQuery(ledgerTableName, shardingFamily, shardingTable string, blocksize int) string {
baseQuery := "SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE "
whereClause := "seq > ?"
limitClause := " ORDER BY seq LIMIT $4"

This comment has been minimized.

Copy link
@janderson-seg

janderson-seg Feb 26, 2024

Contributor

Feels like it might be cleaner to use a single if / else if chain:

if shardingFamily != "" && shardingTable != "" {
		whereClause += " AND (family_name IN $2 OR CONCAT(family_name,'___',table_name) IN $3)"
	} else if shardingFamily != "" {
		whereClause += " AND family_name IN $2"
	} else if shardingTable != "" {
		whereClause += " AND CONCAT(family_name,'___',table_name) IN $3"
	}
if shardingFamily != "" {
familiesStr := prepareString(shardingFamily)
tablesStr := prepareString(shardingTable)
return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? AND family_name IN $2 AND CONCAT(family_name,'___',table_name) IN $3 ORDER BY seq LIMIT $4",
ledgerTableName,
familiesStr,
tablesStr,
fmt.Sprintf("%d", blocksize))
} else {
return sqlgen.SqlSprintf("SELECT seq, leader_ts, statement, family_name, table_name FROM $1 WHERE seq > ? ORDER BY seq LIMIT $2",
ledgerTableName,
fmt.Sprintf("%d", blocksize))
whereClause += fmt.Sprintf(" AND family_name IN $2")
}

if shardingTable != "" {
whereClause += fmt.Sprintf(" AND CONCAT(family_name,'___',table_name) IN $3")
}

if shardingFamily != "" && shardingTable != "" {
whereClause = fmt.Sprintf("seq > ? AND (family_name IN $2 OR CONCAT(family_name,'___',table_name) IN $3)")
}

return sqlgen.SqlSprintf(baseQuery+whereClause+limitClause,
ledgerTableName,
prepareString(shardingFamily),
prepareString(shardingTable),
fmt.Sprintf("%d", blocksize))
}

// prepareString turns a comma-separated list such as "a,b" into a
// parenthesized list of double-quoted items, ("a", "b"), which is the form
// the query builder substitutes after an IN keyword.
//
// An empty input returns an empty string so callers can tell that no filter
// was requested. Items are emitted verbatim: they are neither trimmed nor
// escaped.
func prepareString(str string) string {
	if len(str) == 0 {
		return ""
	}

	items := strings.Split(str, ",")
	return `("` + strings.Join(items, `", "`) + `")`
}
123 changes: 58 additions & 65 deletions pkg/reflector/dml_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,79 +138,73 @@ func TestSqlDmlSourceWithSharding(t *testing.T) {
}{
{statement: foobar, family: "foo", table: "bar"},
{statement: foobar1, family: "foo", table: "bar1"},
{statement: foo1bar1, family: "foo1", table: "bar1"},
{statement: foo1bar, family: "foo1", table: "bar"},
{statement: foo1bar1, family: "foo1", table: "bar1"},
}

testCases := []struct {
name string
shardingFamily string
shardingTable string
stContains []string
stNotContains []string
seqModContains []int64
seqModNotContains []int64
expectedErr error
name string
shardingFamily string
shardingTable string
stContains []string
seqModContains []int64
expectedErr error
}{
{
name: "Single family single table",
shardingFamily: "foo",
shardingTable: "foo___bar",
stContains: []string{foobar},
stNotContains: []string{foobar1, foo1bar1, foo1bar},
seqModContains: []int64{0},
seqModNotContains: []int64{1, 2, 3},
expectedErr: nil,
name: "Single whole family",
shardingFamily: "foo",
shardingTable: "",
stContains: []string{foobar, foobar1},
seqModContains: []int64{0, 1},
expectedErr: nil,
},
{
name: "Single family multiple tables",
shardingFamily: "foo",
shardingTable: "foo___bar,foo___bar1",
stContains: []string{foobar, foobar1},
stNotContains: []string{foo1bar1, foo1bar},
seqModContains: []int64{0, 1},
seqModNotContains: []int64{2, 3},
expectedErr: nil,
name: "Single qualified table",
shardingFamily: "",
shardingTable: "foo___bar",
stContains: []string{foobar},
seqModContains: []int64{0},
expectedErr: nil,
},
{
name: "Multiple families multiple tables",
shardingFamily: "foo,foo1",
shardingTable: "foo___bar,foo1___bar1",
stContains: []string{foobar, foo1bar1},
stNotContains: []string{foo1bar, foobar1},
seqModContains: []int64{0, 2},
seqModNotContains: []int64{1, 3},
expectedErr: nil,
name: "Multiple whole families",
shardingFamily: "foo,foo1",
shardingTable: "",
stContains: []string{foobar, foobar1, foo1bar, foo1bar1},
seqModContains: []int64{0, 1, 2, 3},
expectedErr: nil,
},
{
name: "All families all tables",
shardingFamily: "foo,foo1",
shardingTable: "foo___bar,foo___bar1,foo1___bar1,foo1___bar",
stContains: []string{foobar, foobar1, foo1bar1, foo1bar},
stNotContains: []string{},
seqModContains: []int64{0, 1, 2, 3},
seqModNotContains: []int64{},
expectedErr: nil,
name: "Multiple qualified tables",
shardingFamily: "",
shardingTable: "foo___bar,foo___bar1,foo1___bar,foo1___bar1",
stContains: []string{foobar, foobar1, foo1bar, foo1bar1},
seqModContains: []int64{0, 1, 2, 3},
expectedErr: nil,
},
{
name: "No family no table",
shardingFamily: "",
shardingTable: "",
stContains: []string{foobar, foobar1, foo1bar1, foo1bar},
stNotContains: []string{},
seqModContains: []int64{0, 1, 2, 3},
seqModNotContains: []int64{},
expectedErr: nil,
name: "Whole family and qualified table",
shardingFamily: "foo",
shardingTable: "foo1___bar1",
stContains: []string{foobar, foobar1, foo1bar1},
seqModContains: []int64{0, 1, 3},
expectedErr: nil,
},
{
name: "Single family no table",
shardingFamily: "foo",
shardingTable: "",
stContains: []string{},
stNotContains: []string{foobar, foobar1, foo1bar1, foo1bar},
seqModContains: []int64{},
seqModNotContains: []int64{0, 1, 2, 3},
expectedErr: nil,
name: "Whole family override qualified table",
shardingFamily: "foo",
shardingTable: "foo___bar",
stContains: []string{foobar, foobar1},
seqModContains: []int64{0, 1},
expectedErr: nil,
},
{
name: "No sharding",
shardingFamily: "",
shardingTable: "",
stContains: []string{foobar, foobar1, foo1bar1, foo1bar},
seqModContains: []int64{0, 1, 2, 3},
expectedErr: nil,
},
}

Expand Down Expand Up @@ -245,11 +239,10 @@ func TestSqlDmlSourceWithSharding(t *testing.T) {
st, err := src.Next(ctx)
require.NoError(t, err)
require.Contains(t, tt.stContains, st.Statement)
require.NotContains(t, tt.stNotContains, st.Statement)
require.True(t, st.Sequence.Int() > lastSeq)
lastSeq = st.Sequence.Int()
require.Contains(t, tt.seqModContains, (lastSeq-2)%int64(len(statements)))
require.NotContains(t, tt.seqModNotContains, (lastSeq-2)%int64(len(statements)))
mod := (lastSeq - 2) % int64(len(statements))
require.Contains(t, tt.seqModContains, mod)
}

_, err = src.Next(ctx)
Expand Down Expand Up @@ -297,6 +290,11 @@ func TestPrepareString(t *testing.T) {
input string
expected string
}{
{
name: "Empty string",
input: "",
expected: "",
},
{
name: "Single family",
input: "family1",
Expand All @@ -307,11 +305,6 @@ func TestPrepareString(t *testing.T) {
input: "family1,family2,family3",
expected: "(\"family1\", \"family2\", \"family3\")",
},
{
name: "No families",
input: "",
expected: "(\"\")",
},
{
name: "Sharding table",
input: "foo___bar",
Expand Down

0 comments on commit 3d90f7f

Please sign in to comment.