Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] support reader doris using arrow flight driver #465

Merged
merged 6 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ under the License.
<flink.sql.cdc.version>3.1.1</flink.sql.cdc.version>
<flink.python.id>flink-python</flink.python.id>
<libthrift.version>0.16.0</libthrift.version>
<arrow.version>13.0.0</arrow.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
Expand All @@ -95,6 +94,8 @@ under the License.
<jsqlparser.version>4.9</jsqlparser.version>
<mysql.driver.version>8.0.26</mysql.driver.version>
<ojdbc.version>19.3.0.0</ojdbc.version>
<arrow.version>15.0.2</arrow.version>
<adbc.version>0.12.0</adbc.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -179,13 +180,16 @@ under the License.
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow.adbc</groupId>
<artifactId>adbc-driver-flight-sql</artifactId>
<version>${adbc.version}</version>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
<version>${arrow.version}</version>
</dependency>

<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
Expand All @@ -206,7 +210,6 @@ under the License.
</exclusion>
</exclusions>
</dependency>

<!-- jackson -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -410,13 +413,13 @@ under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<version>3.4.1</version>
<configuration>
<relocations>
<relocation>
<pattern>org.apache.arrow</pattern>
<shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>
</relocation>
<!-- <relocation>-->
<!-- <pattern>org.apache.arrow</pattern>-->
<!-- <shadedPattern>org.apache.doris.shaded.org.apache.arrow</shadedPattern>-->
<!-- </relocation>-->
<relocation>
<pattern>io.netty</pattern>
<shadedPattern>org.apache.doris.shaded.io.netty</shadedPattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,10 @@ public interface ConfigurationOptions {
Boolean DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT = false;
String DORIS_DESERIALIZE_QUEUE_SIZE = "doris.deserialize.queue.size";
Integer DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT = 64;

String USE_FLIGHT_SQL = "source.use-flight-sql";
Boolean USE_FLIGHT_SQL_DEFAULT = false;

String FLIGHT_SQL_PORT = "source.flight-sql-port";
Integer FLIGHT_SQL_PORT_DEFAULT = 9040;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class DorisReadOptions implements Serializable {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private boolean useOldApi;
private boolean useFlightSql;
private Integer flightSqlPort;

public DorisReadOptions(
String readFields,
Expand All @@ -50,7 +52,9 @@ public DorisReadOptions(
Long execMemLimit,
Integer deserializeQueueSize,
Boolean deserializeArrowAsync,
boolean useOldApi) {
boolean useOldApi,
boolean useFlightSql,
Integer flightSqlPort) {
this.readFields = readFields;
this.filterQuery = filterQuery;
this.requestTabletSize = requestTabletSize;
Expand All @@ -63,6 +67,8 @@ public DorisReadOptions(
this.deserializeQueueSize = deserializeQueueSize;
this.deserializeArrowAsync = deserializeArrowAsync;
this.useOldApi = useOldApi;
this.useFlightSql = useFlightSql;
this.flightSqlPort = flightSqlPort;
}

public String getReadFields() {
Expand Down Expand Up @@ -121,6 +127,14 @@ public void setFilterQuery(String filterQuery) {
this.filterQuery = filterQuery;
}

public boolean getUseFlightSql() {
return useFlightSql;
}

public Integer getFlightSqlPort() {
return flightSqlPort;
}

public static Builder builder() {
return new Builder();
}
Expand Down Expand Up @@ -149,7 +163,9 @@ public boolean equals(Object o) {
&& Objects.equals(requestBatchSize, that.requestBatchSize)
&& Objects.equals(execMemLimit, that.execMemLimit)
&& Objects.equals(deserializeQueueSize, that.deserializeQueueSize)
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync);
&& Objects.equals(deserializeArrowAsync, that.deserializeArrowAsync)
&& Objects.equals(useFlightSql, that.useFlightSql)
&& Objects.equals(flightSqlPort, that.flightSqlPort);
}

@Override
Expand All @@ -166,7 +182,9 @@ public int hashCode() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
useFlightSql,
flightSqlPort);
}

/** Builder of {@link DorisReadOptions}. */
Expand All @@ -184,6 +202,8 @@ public static class Builder {
private Integer deserializeQueueSize;
private Boolean deserializeArrowAsync;
private Boolean useOldApi = false;
private Boolean useFlightSql = false;
private Integer flightSqlPort;

public Builder setReadFields(String readFields) {
this.readFields = readFields;
Expand Down Expand Up @@ -240,11 +260,21 @@ public Builder setDeserializeArrowAsync(Boolean deserializeArrowAsync) {
return this;
}

public Builder setUseOldApi(boolean useOldApi) {
public Builder setUseFlightSql(Boolean useFlightSql) {
this.useFlightSql = useFlightSql;
return this;
}

public Builder setUseOldApi(Boolean useOldApi) {
this.useOldApi = useOldApi;
return this;
}

public Builder setFlightSqlPort(Integer flightSqlPort) {
this.flightSqlPort = flightSqlPort;
return this;
}

public DorisReadOptions build() {
return new DorisReadOptions(
readFields,
Expand All @@ -258,7 +288,9 @@ public DorisReadOptions build() {
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi);
useOldApi,
useFlightSql,
flightSqlPort);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ private void init() {
prop.getProperty(
ConfigurationOptions.DORIS_TABLET_SIZE,
ConfigurationOptions.DORIS_TABLET_SIZE_DEFAULT
.toString())))
.setUseFlightSql(
Boolean.valueOf(
prop.getProperty(
ConfigurationOptions.USE_FLIGHT_SQL,
ConfigurationOptions.USE_FLIGHT_SQL_DEFAULT
.toString())))
.setFlightSqlPort(
Integer.valueOf(
prop.getProperty(
ConfigurationOptions.FLIGHT_SQL_PORT,
ConfigurationOptions.FLIGHT_SQL_PORT_DEFAULT
.toString())));

this.options = optionsBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,38 @@ public static String parseResponse(HttpURLConnection connection, Logger logger)
}
}

@VisibleForTesting
public static String parseFlightSql(
DorisReadOptions readOptions,
DorisOptions options,
PartitionDefinition partition,
Logger logger)
throws IllegalArgumentException {
String[] tableIdentifiers = parseIdentifier(options.getTableIdentifier(), logger);
String readFields =
StringUtils.isBlank(readOptions.getReadFields())
? "*"
: readOptions.getReadFields();
String sql =
"select "
+ readFields
+ " from `"
+ tableIdentifiers[0]
+ "`.`"
+ tableIdentifiers[1]
+ "`";
String tablet =
partition.getTabletIds().stream()
.map(Object::toString)
.collect(Collectors.joining(","));
sql += " TABLET(" + tablet + ") ";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does the tablet need to be spliced ​​in here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In DorisSource.createEnumerator() method, readers split it by node. Currently, doris does not support specifying be node to read data for the time being, so you need to specify a tablet to query

if (!StringUtils.isEmpty(readOptions.getFilterQuery())) {
sql += " where " + readOptions.getFilterQuery();
}
logger.info("Query SQL Sending to Doris FE is: '{}'.", sql);
return sql;
}

/**
* parse table identifier to array.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.doris.sdk.thrift.TScanColumnDesc;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class SchemaUtils {

Expand All @@ -46,4 +49,25 @@ public static Schema convertToSchema(List<TScanColumnDesc> tscanColumnDescs) {
"")));
return schema;
}

public static Schema convertToSchema(
JNSimba marked this conversation as resolved.
Show resolved Hide resolved
Schema tableSchema, org.apache.arrow.vector.types.pojo.Schema tscanColumnDescs) {
Schema schema = new Schema(tscanColumnDescs.getFields().size());
Map<String, Field> collect =
tableSchema.getProperties().stream()
.collect(Collectors.toMap(Field::getName, Function.identity()));
tscanColumnDescs
.getFields()
.forEach(
desc ->
schema.put(
new Field(
desc.getName(),
collect.get(desc.getName()).getType(),
"",
0,
0,
"")));
return schema;
}
}
Loading
Loading