diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java index 1b05453ad..1f09041c4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/DorisSource.java @@ -27,7 +27,6 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.core.io.SimpleVersionedSerializer; -import org.apache.flink.util.StringUtils; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; @@ -65,18 +64,14 @@ public class DorisSource private final Boundedness boundedness; private final DorisDeserializationSchema deserializer; - private final List resolvedFilterQuery; - public DorisSource( DorisOptions options, DorisReadOptions readOptions, Boundedness boundedness, - List resolvedFilterQuery, DorisDeserializationSchema deserializer) { this.options = options; this.readOptions = readOptions; this.boundedness = boundedness; - this.resolvedFilterQuery = resolvedFilterQuery; this.deserializer = deserializer; } @@ -100,15 +95,6 @@ public SourceReader createReader(SourceReaderContext read public SplitEnumerator createEnumerator( SplitEnumeratorContext context) throws Exception { List dorisSourceSplits = new ArrayList<>(); - if (!resolvedFilterQuery.isEmpty()) { - String filterQuery = String.join(" AND ", resolvedFilterQuery); - if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { - readOptions.setFilterQuery(filterQuery); - } else { - readOptions.setFilterQuery( - String.join(" AND ", readOptions.getFilterQuery(), filterQuery)); - } - } List partitions = RestService.findPartitions(options, readOptions, LOG); for (int index = 0; index < partitions.size(); index++) { @@ -162,7 +148,6 @@ public static class DorisSourceBuilder { // Boundedness private Boundedness boundedness; private DorisDeserializationSchema deserializer; - private List resolvedFilterQuery = new ArrayList<>(); DorisSourceBuilder() { boundedness = Boundedness.BOUNDED; @@ -189,17 +174,11 @@ public DorisSourceBuilder setDeserializer( return this; } - public DorisSourceBuilder setResolvedFilterQuery(List resolvedFilterQuery) { - this.resolvedFilterQuery = resolvedFilterQuery; - return this; - } - public DorisSource build() { if (readOptions == null) { readOptions = DorisReadOptions.builder().build(); } - return new DorisSource<>( - options, readOptions, boundedness, resolvedFilterQuery, deserializer); + return new DorisSource<>(options, readOptions, boundedness, deserializer); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java index 03d77bc36..9763a888a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSource.java @@ -90,6 +90,11 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) { + String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND ")); + readOptions.setFilterQuery(filterQuery); + } + if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) { String[] selectFields = DataType.getFieldNames(physicalRowDataType).toArray(new String[0]); @@ -123,7 +128,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon DorisSource.builder() .setDorisReadOptions(readOptions) .setDorisOptions(options) - .setResolvedFilterQuery(resolvedFilterQuery) .setDeserializer( new RowDataDeserializationSchema( (RowType) physicalRowDataType.getLogicalType()))