diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java index 2f6cd8a86..937d32866 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java @@ -187,6 +187,24 @@ public int hashCode() { flightSqlPort); } + public DorisReadOptions copy() { + return new DorisReadOptions( + readFields, + filterQuery, + requestTabletSize, + requestConnectTimeoutMs, + requestReadTimeoutMs, + requestQueryTimeoutS, + requestRetries, + requestBatchSize, + execMemLimit, + deserializeQueueSize, + deserializeArrowAsync, + useOldApi, + useFlightSql, + flightSqlPort); + } + /** Builder of {@link DorisReadOptions}. */ public static class Builder { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index e55d3631b..03d77bc36 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -166,9 +166,14 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { @Override public DynamicTableSource copy() { + // filterQuery/readFields of readOption may be overwritten in union all sql DorisDynamicTableSource newSource = new DorisDynamicTableSource( - options, readOptions, lookupOptions, physicalSchema, physicalRowDataType); + options, + readOptions.copy(), + lookupOptions, + physicalSchema, + physicalRowDataType); newSource.resolvedFilterQuery = new ArrayList<>(this.resolvedFilterQuery); return newSource; } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java index 4fb6fba8f..6f1483018 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/source/DorisSourceITCase.java @@ -63,6 +63,8 @@ public class DorisSourceITCase extends AbstractITCaseService { "tbl_read_tbl_push_down_with_union_all"; static final String TABLE_CSV_JM = "tbl_csv_jm_source"; static final String TABLE_CSV_TM = "tbl_csv_tm_source"; + private static final String TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER = + "tbl_read_tbl_push_down_with_union_all_not_eq_filter"; @Rule public final MiniClusterWithClientResource miniClusterResource = @@ -353,6 +355,51 @@ public void testTableSourceFilterWithUnionAll() throws Exception { checkResultInAnyOrder("testTableSourceFilterWithUnionAll", expected, actual.toArray()); } + @Test + public void testTableSourceFilterWithUnionAllNotEqualFilter() throws Exception { + LOG.info("starting to execute testTableSourceFilterWithUnionAllNotEqualFilter case."); + initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + String sourceDDL = + String.format( + "CREATE TABLE doris_source_filter_with_union_all (" + + " name STRING," + + " age INT" + + ") WITH (" + + " 'connector' = '" + + DorisConfigOptions.IDENTIFIER + + "'," + + " 'fenodes' = '%s'," + + " 'table.identifier' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'" + + ")", + getFenodes(), + DATABASE + "." + TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL_NOT_EQ_FILTER, + getDorisUsername(), + getDorisPassword()); + tEnv.executeSql(sourceDDL); + String querySql = + " SELECT * FROM doris_source_filter_with_union_all where name = 'doris'" + + " UNION ALL " + + "SELECT * FROM doris_source_filter_with_union_all where name in ('error','flink')"; + TableResult tableResult = tEnv.executeSql(querySql); + + List actual = new ArrayList<>(); + try (CloseableIterator iterator = tableResult.collect()) { + while (iterator.hasNext()) { + actual.add(iterator.next().toString()); + } + } + + String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"}; + checkResultInAnyOrder( + "testTableSourceFilterWithUnionAllNotEqualFilter", expected, actual.toArray()); + } + @Test public void testJobManagerFailoverSource() throws Exception { LOG.info("start to test JobManagerFailoverSource.");