Closes #4529
- Reference the Elasticsearch documentation in the bulk chunk size limit config option
- Clean up the logic that populates the ES bulk request path
- Submit bulk request items that are below the configured limit, then throw for overly large items

Signed-off-by: Allan Clements <criminosis@gmail.com>
criminosis committed Jun 25, 2024
1 parent d255396 commit da5c1cf
Showing 4 changed files with 83 additions and 19 deletions.
2 changes: 1 addition & 1 deletion docs/configs/janusgraph-cfg.md
@@ -150,7 +150,7 @@ Elasticsearch index configuration

| Name | Description | Datatype | Default Value | Mutability |
| ---- | ---- | ---- | ---- | ---- |
- | index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
+ | index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
| index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
| index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
| index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
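For reference, the option can be set like any other per-index Elasticsearch option when opening the graph. A minimal sketch, assuming an index named `search`, a local Elasticsearch instance, and an illustrative 50 MB limit (none of these values come from the commit itself):

```java
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

public class BulkChunkLimitExample {
    public static void main(String[] args) {
        JanusGraph graph = JanusGraphFactory.build()
            .set("storage.backend", "inmemory")
            .set("index.search.backend", "elasticsearch")
            .set("index.search.hostname", "127.0.0.1")
            //Keep this at or below http.max_content_length on the Elasticsearch
            //servers (e.g. http.max_content_length: 200mb in elasticsearch.yml)
            .set("index.search.elasticsearch.bulk-chunk-size-limit-bytes", 50_000_000)
            .open();
        graph.close();
    }
}
```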
@@ -327,7 +327,10 @@ public class ElasticSearchIndex implements IndexProvider {
public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
"The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
"chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
"chunked to this size. Ensure that this limit is always less than or equal to the configured limit of " +
"`http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
"[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

public static final int HOST_PORT_DEFAULT = 9200;

@@ -14,6 +14,7 @@

package org.janusgraph.diskstorage.es.rest;

+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
}
}

- private class RequestBytes {
+ @VisibleForTesting
+ class RequestBytes {
final byte [] requestBytes;
final byte [] requestSource;

- private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
+ @VisibleForTesting
+ RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
Map<String, Object> requestData = new HashMap<>();
if (useMappingTypes) {
requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
}
}

- private int getSerializedSize() {
+ @VisibleForTesting
+ int getSerializedSize() {
int serializedSize = this.requestBytes.length;
serializedSize+= 1; //For follow-up NEW_LINE_BYTES
if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
request.writeTo(outputStream);
}

- final StringBuilder builder = new StringBuilder();
+ final StringBuilder bulkRequestQueryParameters = new StringBuilder();
if (ingestPipeline != null) {
- APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
+ APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
}
if (bulkRefreshEnabled) {
- APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
+ APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
}
- builder.insert(0, REQUEST_SEPARATOR + "_bulk");
- return Pair.with(builder.toString(), outputStream.toByteArray());
+ final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
+ return Pair.with(bulkRequestPath, outputStream.toByteArray());
}
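//Illustration with assumed values: if ingestPipeline = "mypipe" and bulkRefresh = "wait_for", and assuming
//REQUEST_SEPARATOR is "/" and APPEND_OP emits '?' before the first query parameter and '&' thereafter,
//buildBulkRequestInput would return the path "/_bulk?pipeline=mypipe&refresh=wait_for"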

private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Triplet<Object, Integer, RequestBytes>> pairErrorsWithSubmittedMutation(
return errors;
}

- private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+ @VisibleForTesting
+ class BulkRequestChunker implements Iterator<List<RequestBytes>> {
//By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
//the specified limit

@@ -485,18 +491,32 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
// settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
// size of a HTTP request to 100mb by default
private final PeekingIterator<RequestBytes> requestIterator;
+ private final int[] exceptionallyLargeRequests;

- private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+ @VisibleForTesting
+ BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+ List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
for (ElasticSearchMutation request : requests) {
- serializedRequests.add(new RequestBytes(request));
+ RequestBytes requestBytes = new RequestBytes(request);
+ int requestSerializedSize = requestBytes.getSerializedSize();
+ if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
+ //Only keep items that we can actually send in memory
+ serializedRequests.add(requestBytes);
+ } else {
+ requestSizesThatWereTooLarge.add(requestSerializedSize);
+ }
}
this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+ //Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
+ this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
}

@Override
public boolean hasNext() {
- return requestIterator.hasNext();
+ //Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
+ //This allows next() to throw after all well sized requests have been chunked for submission
+ return requestIterator.hasNext() || exceptionallyLargeRequests.length > 0;
}

@Override
@@ -509,16 +529,18 @@ public List<RequestBytes> next() {
if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
chunkSerializedTotal += requestSerializedSize;
serializedRequests.add(requestIterator.next());
- } else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
- //we've encountered an element we cannot send to Elasticsearch given the configured limit
- throw new IllegalArgumentException(String.format(
- "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
- bulkChunkSerializedLimitBytes, requestSerializedSize));
} else {
//Adding this element would exceed the limit, so return the chunk
return serializedRequests;
}
}
+ //Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
+ //This is only done after all items that could be sent have been sent
+ if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests.length > 0) {
+ throw new IllegalArgumentException(String.format(
+ "Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
+ bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
+ }
//All remaining requests fit in this chunk
return serializedRequests;
}
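Taken together, the chunker's contract is: set aside items that can never fit at construction time, hand out every sendable chunk first, and only then throw for the oversized leftovers. A minimal standalone sketch of that ordering, using plain ints as stand-ins for serialized request sizes (the `SizeChunker` class and all values are hypothetical, not JanusGraph API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SizeChunker implements Iterator<List<Integer>> {
    private final int limit;
    private final List<Integer> pending = new ArrayList<>();
    private final List<Integer> tooLarge = new ArrayList<>();
    private int cursor = 0;

    SizeChunker(int limit, List<Integer> sizes) {
        this.limit = limit;
        for (int size : sizes) {
            //Mirror the constructor above: keep only items that can actually be sent
            if (size <= limit) {
                pending.add(size);
            } else {
                tooLarge.add(size);
            }
        }
    }

    @Override
    public boolean hasNext() {
        //Stay true while oversized items remain, so next() gets a chance to throw
        return cursor < pending.size() || !tooLarge.isEmpty();
    }

    @Override
    public List<Integer> next() {
        List<Integer> chunk = new ArrayList<>();
        int total = 0;
        while (cursor < pending.size() && total + pending.get(cursor) <= limit) {
            total += pending.get(cursor);
            chunk.add(pending.get(cursor));
            cursor++;
        }
        //Throw only after every sendable chunk has been handed out
        if (chunk.isEmpty() && !tooLarge.isEmpty()) {
            throw new IllegalArgumentException("Items exceed chunk limit: " + tooLarge);
        }
        return chunk;
    }

    public static void main(String[] args) {
        SizeChunker chunker = new SizeChunker(100, Arrays.asList(60, 250, 30, 50));
        System.out.println(chunker.next()); //[60, 30]
        System.out.println(chunker.next()); //[50]
        try {
            chunker.next();
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); //Items exceed chunk limit: [250]
        }
    }
}
```

Deferring the throw this way means a single oversized mutation no longer prevents the deliverable portion of the batch from reaching Elasticsearch.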
@@ -34,6 +34,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
+ import java.util.List;
import java.util.stream.IntStream;

import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
}
}

+ @Test
+ public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
+ int bulkLimit = 1_000_000;
+ StringBuilder overlyLargePayloadBuilder = new StringBuilder();
+ IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
+ String overlyLargePayload = overlyLargePayloadBuilder.toString();
+ ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+ Collections.singletonMap("someKey", overlyLargePayload));
+ List<ElasticSearchMutation> bulkItems = Arrays.asList(
+ ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+ Collections.singletonMap("someKey", "small_payload1")),
+ overlyLargeMutation,
+ ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
+ Collections.singletonMap("someKey", "small_payload2"))
+ );
+
+ try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+ RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
+ int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();
+
+ //The chunker should chunk this request first as a list of the 2 smaller items
+ List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
+ Assertions.assertEquals(2, smallItemsChunk.size());
+
+ //Then the chunker should still return true for hasNext()
+ Assertions.assertTrue(chunkerUnderTest.hasNext());
+
+ //Then the next call for next() should throw to report the exceptionally large item
+ IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
+ "Should have thrown due to bulk request item being too large");
+
+ String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
+ bulkLimit, overlyLargeRequestExpectedSize);
+
+ Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
+ }
+ }

@Test
public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
int bulkLimit = 800;
