Skip to content

Commit

Permalink
revert 479
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba committed Oct 11, 2024
1 parent 8efaa43 commit f1c2895
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.StringUtils;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
Expand Down Expand Up @@ -65,18 +64,14 @@ public class DorisSource<OUT>
private final Boundedness boundedness;
private final DorisDeserializationSchema<OUT> deserializer;

private final List<String> resolvedFilterQuery;

public DorisSource(
DorisOptions options,
DorisReadOptions readOptions,
Boundedness boundedness,
List<String> resolvedFilterQuery,
DorisDeserializationSchema<OUT> deserializer) {
this.options = options;
this.readOptions = readOptions;
this.boundedness = boundedness;
this.resolvedFilterQuery = resolvedFilterQuery;
this.deserializer = deserializer;
}

Expand All @@ -100,15 +95,6 @@ public SourceReader<OUT, DorisSourceSplit> createReader(SourceReaderContext read
public SplitEnumerator<DorisSourceSplit, PendingSplitsCheckpoint> createEnumerator(
SplitEnumeratorContext<DorisSourceSplit> context) throws Exception {
List<DorisSourceSplit> dorisSourceSplits = new ArrayList<>();
if (!resolvedFilterQuery.isEmpty()) {
String filterQuery = String.join(" AND ", resolvedFilterQuery);
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
readOptions.setFilterQuery(filterQuery);
} else {
readOptions.setFilterQuery(
String.join(" AND ", readOptions.getFilterQuery(), filterQuery));
}
}
List<PartitionDefinition> partitions =
RestService.findPartitions(options, readOptions, LOG);
for (int index = 0; index < partitions.size(); index++) {
Expand Down Expand Up @@ -162,7 +148,6 @@ public static class DorisSourceBuilder<OUT> {
// Boundedness
private Boundedness boundedness;
private DorisDeserializationSchema<OUT> deserializer;
private List<String> resolvedFilterQuery = new ArrayList<>();

DorisSourceBuilder() {
boundedness = Boundedness.BOUNDED;
Expand All @@ -189,17 +174,11 @@ public DorisSourceBuilder<OUT> setDeserializer(
return this;
}

public DorisSourceBuilder<OUT> setResolvedFilterQuery(List<String> resolvedFilterQuery) {
this.resolvedFilterQuery = resolvedFilterQuery;
return this;
}

public DorisSource<OUT> build() {
if (readOptions == null) {
readOptions = DorisReadOptions.builder().build();
}
return new DorisSource<>(
options, readOptions, boundedness, resolvedFilterQuery, deserializer);
return new DorisSource<>(options, readOptions, boundedness, deserializer);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
if (StringUtils.isNullOrWhitespaceOnly(readOptions.getFilterQuery())) {
String filterQuery = resolvedFilterQuery.stream().collect(Collectors.joining(" AND "));
readOptions.setFilterQuery(filterQuery);
}

if (StringUtils.isNullOrWhitespaceOnly(readOptions.getReadFields())) {
String[] selectFields =
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
Expand Down Expand Up @@ -123,7 +128,6 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
DorisSource.<RowData>builder()
.setDorisReadOptions(readOptions)
.setDorisOptions(options)
.setResolvedFilterQuery(resolvedFilterQuery)
.setDeserializer(
new RowDataDeserializationSchema(
(RowType) physicalRowDataType.getLogicalType()))
Expand Down

0 comments on commit f1c2895

Please sign in to comment.