Skip to content

Commit

Permalink
[Fix](source) fix IndexOutOfBoundsException for union all (#495)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Sep 27, 2024
1 parent c61342f commit 8efaa43
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,24 @@ public int hashCode() {
flightSqlPort);
}

public DorisReadOptions copy() {
return new DorisReadOptions(
readFields,
filterQuery,
requestTabletSize,
requestConnectTimeoutMs,
requestReadTimeoutMs,
requestQueryTimeoutS,
requestRetries,
requestBatchSize,
execMemLimit,
deserializeQueueSize,
deserializeArrowAsync,
useOldApi,
useFlightSql,
flightSqlPort);
}

/** Builder of {@link DorisReadOptions}. */
public static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,14 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {

@Override
public DynamicTableSource copy() {
// filterQuery/readFields of readOption may be overwritten in union all sql
DorisDynamicTableSource newSource =
new DorisDynamicTableSource(
options, readOptions, lookupOptions, physicalSchema, physicalRowDataType);
options,
readOptions.copy(),
lookupOptions,
physicalSchema,
physicalRowDataType);
newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery);
return newSource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class DorisSourceITCase extends AbstractITCaseService {
"tbl_read_tbl_push_down_with_union_all";
static final String TABLE_CSV_JM = "tbl_csv_jm_source";
static final String TABLE_CSV_TM = "tbl_csv_tm_source";
private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER =
"tbl_read_tbl_push_down_with_union_all_not_eq_filter";

@Rule
public final MiniClusterWithClientResource miniClusterResource =
Expand Down Expand Up @@ -353,6 +355,51 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray());
}

@Test
public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception {
LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

String sourceDDL =
String.format(
"CREATE TABLE doris_source_filter_with_union_all ("
+ " name STRING,"
+ " age INT"
+ ") WITH ("
+ " 'connector' = '"
+ DorisConfigOptions.IDENTIFIER
+ "',"
+ " 'fenodes' = '%s',"
+ " 'table.identifier' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER,
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sourceDDL);
String querySql =
" SELECT * FROM doris_source_filter_with_union_all where name = 'doris'"
+ " UNION ALL "
+ "SELECT * FROM doris_source_filter_with_union_all where name in ('error','flink')";
TableResult tableResult = tEnv.executeSql(querySql);

List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
}

String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
checkResultInAnyOrder(
"testTableSourceFilterWithUnionAllNotEqualFilter", expected, actual.toArray());
}

@Test
public void testJobManagerFailoverSource() throws Exception {
LOG.info("start to test JobManagerFailoverSource.");
Expand Down

0 comments on commit 8efaa43

Please sign in to comment.