Skip to content

Commit

Permalink
add sql input source config and relevant IngestionSpecOptions (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
vzayts authored Nov 30, 2023
1 parent b0fa8ee commit c00829b
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 36 deletions.
44 changes: 44 additions & 0 deletions supervisor_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,19 @@ type HttpInputSourceConfig struct {
AllowedProtocols []string `json:" allowedProtocols,omitempty"`
}

// ConnectorConfig is connection configuration for Database.
type ConnectorConfig struct {
ConnectURI string `json:"connectURI"`
User string `json:"user"`
Password string `json:"password"`
}

// Database configuration for InputSource "sql".
type Database struct {
Type string `json:"type"`
ConnectorConfig *ConnectorConfig `json:"connectorConfig"`
}

// InputSource is a specification of the storage system where input data is stored.
type InputSource struct {
Type string `json:"type"`
Expand All @@ -322,6 +335,10 @@ type InputSource struct {

// CombiningInputSource fields
Delegates []InputSource `json:"delegates,omitempty"`

// SqlInputSource
SQLs []string `json:"sqls,omitempty"`
Database *Database `json:"database,omitempty"`
}

// TransformSpec is responsible for transforming druid input data
Expand Down Expand Up @@ -428,6 +445,15 @@ func SetType(stype string) IngestionSpecOptions {
}
}

// SetIOConfigType sets the type of the supervisor IOConfig.
func SetIOConfigType(ioctype string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
if ioctype != "" {
spec.IOConfig.Type = ioctype
}
}
}

// SetTopic sets the Kafka topic to consume data from.
func SetTopic(topic string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
Expand Down Expand Up @@ -519,3 +545,21 @@ func SetGranularitySpec(segmentGranularity, queryGranularity string, rollup bool
}
}
}

// SetSQLInputSource configures sql input source.
func SetSQLInputSource(dbType, connectURI, user, password string, sqls []string) IngestionSpecOptions {
return func(spec *InputIngestionSpec) {
spec.IOConfig.InputSource = &InputSource{
Type: "sql",
SQLs: sqls,
Database: &Database{
Type: dbType,
ConnectorConfig: &ConnectorConfig{
ConnectURI: connectURI,
User: user,
Password: password,
},
},
}
}
}
151 changes: 115 additions & 36 deletions supervisor_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,27 +96,25 @@ var jsonBasic = `{
}`

func TestKafkaIngestionSpec_MarshalJSON(t *testing.T) {
t.Run("jsonBasic", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{"ts", "user_name", "payload"}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonBasic)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
})
var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

var jsonWithTypedDimensions = `{
Expand Down Expand Up @@ -165,21 +163,102 @@ var jsonWithTypedDimensions = `{
}`

func TestIngestionSpecWithTypedDimensions_MarshalJSON(t *testing.T) {
t.Run("jsonWithTypedDimensions", func(t *testing.T) {
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
spec := NewIngestionSpec(
SetDataSource("test_datasource"),
SetTopic("test_topic"),
SetBrokers("test_brokers"),
SetDimensions([]any{
Dimension{Type: "string", Name: "ts"},
Dimension{Type: "json", Name: "payload"},
}),
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
}

var jsonWithSqlInputSource = `{
"type": "index_parallel",
"dataSchema": {
"dataSource": "test_datasource",
"timestampSpec": {
"column": "ts",
"format": "auto"
},
"transformSpec": {
"transforms": []
},
"dimensionsSpec": {
"dimensions": [
"ts",
"user_name",
"payload"
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "none"
}
},
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "sql",
"sqls": [
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'"
],
"database": {
"type": "mysql",
"connectorConfig": {
"connectURI": "jdbc:mysql://host:port/schema",
"user": "username",
"password": "password"
}
}
},
"consumerProperties": {},
"taskDuration": "PT1H",
"useEarliestOffset": false,
"flattenSpec": {
"fields": []
},
"inputFormat": {
"type": "json"
}
}
}`

func TestIngestionSpecWithSqlInputSource_MarshalJSON(t *testing.T) {
spec := NewIngestionSpec(
SetType("index_parallel"),
SetIOConfigType("index_parallel"),
SetDataSource("test_datasource"),
SetDimensions([]any{"ts", "user_name", "payload"}),
SetSQLInputSource("mysql",
"jdbc:mysql://host:port/schema",
"username",
"password",
[]string{
"SELECT * FROM table1 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
"SELECT * FROM table2 WHERE timestamp BETWEEN '2013-01-01 00:00:00' AND '2013-01-01 11:59:59'",
}),
)
actual, err := json.MarshalIndent(spec, "", " ")
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithTypedDimensions)
require.Equal(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))
})
)
actual, err := json.Marshal(spec)
if err != nil {
t.Fatalf("unexpected error while marshalling: %v", err)
}
expected := []byte(jsonWithSqlInputSource)
require.JSONEq(t, string(expected), string(actual), fmt.Sprintf("expected: %s\nactual: %s", string(expected), string(actual)))

var checkSpec *InputIngestionSpec
err = json.Unmarshal(actual, &checkSpec)
if err != nil {
t.Fatalf("unexpected error while unmarshalling: %v", err)
}
require.Equal(t, spec, checkSpec)
}

0 comments on commit c00829b

Please sign in to comment.