Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement] add option ignore commit error #327

Merged
merged 2 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public class DorisExecutionOptions implements Serializable {
private final boolean enableBatchMode;
private final boolean ignoreUpdateBefore;
private final WriteMode writeMode;
private final boolean ignoreCommitError;

public DorisExecutionOptions(
int checkInterval,
Expand All @@ -81,7 +82,8 @@ public DorisExecutionOptions(
long bufferFlushIntervalMs,
boolean ignoreUpdateBefore,
boolean force2PC,
WriteMode writeMode) {
WriteMode writeMode,
boolean ignoreCommitError) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
Expand All @@ -102,6 +104,7 @@ public DorisExecutionOptions(

this.ignoreUpdateBefore = ignoreUpdateBefore;
this.writeMode = writeMode;
this.ignoreCommitError = ignoreCommitError;
}

public static Builder builder() {
Expand Down Expand Up @@ -205,6 +208,10 @@ public WriteMode getWriteMode() {
return writeMode;
}

public boolean ignoreCommitError() {
return ignoreCommitError;
}

/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
Expand All @@ -229,6 +236,7 @@ public static class Builder {

private boolean ignoreUpdateBefore = true;
private WriteMode writeMode = WriteMode.STREAM_LOAD;
private boolean ignoreCommitError = false;

public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
Expand Down Expand Up @@ -320,6 +328,11 @@ public Builder setWriteMode(WriteMode writeMode) {
return this;
}

public Builder setIgnoreCommitError(boolean ignoreCommitError) {
this.ignoreCommitError = ignoreCommitError;
return this;
}

public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
Expand All @@ -344,7 +357,8 @@ public DorisExecutionOptions build() {
bufferFlushIntervalMs,
ignoreUpdateBefore,
force2PC,
writeMode);
writeMode,
ignoreCommitError);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public DorisAbstractWriter createWriter(InitContext initContext) throws IOExcept
public Committer createCommitter() throws IOException {
if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())
|| WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCommitter(
dorisOptions, dorisReadOptions, dorisExecutionOptions.getMaxRetries());
return new DorisCommitter(dorisOptions, dorisReadOptions, dorisExecutionOptions);
} else if (WriteMode.COPY.equals(dorisExecutionOptions.getWriteMode())) {
return new DorisCopyCommitter(dorisOptions, dorisExecutionOptions.getMaxRetries());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.doris.flink.sink.committer;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.util.Preconditions;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand Down Expand Up @@ -58,20 +60,25 @@ public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
private final BackendUtil backendUtil;

int maxRetry;
final boolean ignoreCommitError;

public DorisCommitter(
DorisOptions dorisOptions, DorisReadOptions dorisReadOptions, int maxRetry) {
this(dorisOptions, dorisReadOptions, maxRetry, new HttpUtil().getHttpClient());
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions) {
this(dorisOptions, dorisReadOptions, executionOptions, new HttpUtil().getHttpClient());
}

public DorisCommitter(
DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
int maxRetry,
DorisExecutionOptions executionOptions,
CloseableHttpClient client) {
this.dorisOptions = dorisOptions;
this.dorisReadOptions = dorisReadOptions;
this.maxRetry = maxRetry;
Preconditions.checkArgument(maxRetry >= 0);
this.maxRetry = executionOptions.getMaxRetries();
this.ignoreCommitError = executionOptions.ignoreCommitError();
this.httpClient = client;
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
Expand Down Expand Up @@ -99,8 +106,8 @@ private void commitTransaction(DorisCommittable committable) throws IOException

// hostPort
String hostPort = committable.getHostPort();

LOG.info("commit txn {} to host {}", committable.getTxnID(), hostPort);
Throwable ex = new Throwable();
int retry = 0;
while (retry <= maxRetry) {
// get latest-url
Expand Down Expand Up @@ -130,17 +137,31 @@ private void commitTransaction(DorisCommittable committable) throws IOException
String reasonPhrase = statusLine.getReasonPhrase();
LOG.error("commit failed with {}, reason {}", hostPort, reasonPhrase);
if (retry == maxRetry) {
throw new DorisRuntimeException("commit transaction error: " + reasonPhrase);
ex = new DorisRuntimeException("commit transaction error: " + reasonPhrase);
}
hostPort = backendUtil.getAvailableBackend();
} catch (Exception e) {
LOG.error("commit transaction failed, to retry, {}", e.getMessage());
if (retry == maxRetry) {
throw new DorisRuntimeException("commit transaction error, ", e);
}
ex = e;
hostPort = backendUtil.getAvailableBackend();
}
retry++;

if (retry++ >= maxRetry) {
if (ignoreCommitError) {
// Generally used when txn(stored in checkpoint) expires and unexpected
// errors occur in commit.

// It should be noted that you must manually ensure that the txn has been
// successfully submitted to doris, otherwise there may be a risk of data
// loss.
LOG.error(
"Unable to commit transaction {} and data has been potentially lost ",
committable,
ex);
} else {
throw new DorisRuntimeException("commit transaction error, ", ex);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ public class DorisConfigOptions {
.defaultValue(WriteMode.STREAM_LOAD.name())
.withDescription("Write mode, supports stream_load, stream_load_batch");

public static final ConfigOption<Boolean> SINK_IGNORE_COMMIT_ERROR =
ConfigOptions.key("sink.ignore.commit-error")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to ignore commit errors. Usually used when the checkpoint cannot be restored to skip the commit of txn. The default is false.");

public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_BATCH_MODE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_ENABLE_DELETE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_FLUSH_QUEUE_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_IGNORE_UPDATE_BEFORE;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
Expand Down Expand Up @@ -156,6 +157,7 @@ public Set<ConfigOption<?>> optionalOptions() {

options.add(SOURCE_USE_OLD_API);
options.add(SINK_WRITE_MODE);
options.add(SINK_IGNORE_COMMIT_ERROR);
return options;
}

Expand Down Expand Up @@ -226,6 +228,7 @@ private DorisExecutionOptions getDorisExecutionOptions(
builder.setStreamLoadProp(streamLoadProp);
builder.setDeletable(readableConfig.get(SINK_ENABLE_DELETE));
builder.setIgnoreUpdateBefore(readableConfig.get(SINK_IGNORE_UPDATE_BEFORE));
builder.setIgnoreCommitError(readableConfig.get(SINK_IGNORE_COMMIT_ERROR));

if (!readableConfig.get(SINK_ENABLE_2PC)) {
builder.disable2PC();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ public DorisSink<String> buildDorisSink(String table) {
sinkConfig
.getOptional(DorisConfigOptions.SINK_WRITE_MODE)
.ifPresent(v -> executionBuilder.setWriteMode(WriteMode.of(v)));
sinkConfig
.getOptional(DorisConfigOptions.SINK_IGNORE_COMMIT_ERROR)
.ifPresent(executionBuilder::setIgnoreCommitError);

DorisExecutionOptions executionOptions = executionBuilder.build();
builder.setDorisReadOptions(DorisReadOptions.builder().build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.committer;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class TestDorisCommitter {
public void setUp() throws Exception {
DorisOptions dorisOptions = OptionUtils.buildDorisOptions();
DorisReadOptions readOptions = OptionUtils.buildDorisReadOptions();
DorisExecutionOptions executionOptions = OptionUtils.buildExecutionOptional();
dorisCommittable = new DorisCommittable("127.0.0.1:8710", "test", 0);
CloseableHttpClient httpClient = mock(CloseableHttpClient.class);
entityMock = new HttpEntityMock();
Expand All @@ -78,7 +80,8 @@ public void setUp() throws Exception {
BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)));
backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true);

dorisCommitter = new DorisCommitter(dorisOptions, readOptions, 3, httpClient);
dorisCommitter =
new DorisCommitter(dorisOptions, readOptions, executionOptions, httpClient);
}

@Test
Expand Down
Loading