diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md
index f5fd722f77..c44e5a96cc 100644
--- a/docs/configs/janusgraph-cfg.md
+++ b/docs/configs/janusgraph-cfg.md
@@ -150,7 +150,7 @@ Elasticsearch index configuration

 | Name | Description | Datatype | Default Value | Mutability |
 | ---- | ---- | ---- | ---- | ---- |
-| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. | Integer | 100000000 | LOCAL |
+| index.[X].elasticsearch.bulk-chunk-size-limit-bytes | The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be chunked to this size. Ensure that this limit is always less than or equal to the configured limit of `http.max_content_length` on the Elasticsearch servers. For more information, refer to the [Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html). | Integer | 100000000 | LOCAL |
 | index.[X].elasticsearch.bulk-refresh | Elasticsearch bulk API refresh setting used to control when changes made by this request are made visible to search | String | false | MASKABLE |
 | index.[X].elasticsearch.client-keep-alive | Set a keep-alive timeout (in milliseconds) | Long | (no default value) | GLOBAL_OFFLINE |
 | index.[X].elasticsearch.connect-timeout | Sets the maximum connection timeout (in milliseconds). | Integer | 1000 | MASKABLE |
diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
index e357fd20ad..4587a3cb06 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/ElasticSearchIndex.java
@@ -327,7 +327,10 @@ public class ElasticSearchIndex implements IndexProvider {
     public static final ConfigOption<Integer> BULK_CHUNK_SIZE_LIMIT_BYTES =
         new ConfigOption<>(ELASTICSEARCH_NS, "bulk-chunk-size-limit-bytes",
         "The total size limit in bytes of a bulk request. Mutation batches in excess of this limit will be " +
-            "chunked to this size.", ConfigOption.Type.LOCAL, Integer.class, 100_000_000);
+            "chunked to this size. Ensure that this limit is always less than or equal to the configured limit of " +
+            "`http.max_content_length` on the Elasticsearch servers. For more information, refer to the " +
+            "[Elasticsearch documentation](https://www.elastic.co/guide/en/elasticsearch/reference/master/modules-network.html).",
+            ConfigOption.Type.LOCAL, Integer.class, 100_000_000);

     public static final int HOST_PORT_DEFAULT = 9200;

diff --git a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
index 59fa0af314..6b4e739a98 100644
--- a/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
+++ b/janusgraph-es/src/main/java/org/janusgraph/diskstorage/es/rest/RestElasticSearchClient.java
@@ -14,6 +14,7 @@

 package org.janusgraph.diskstorage.es.rest;

+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
@@ -54,6 +55,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -392,11 +394,13 @@ public void clearStore(String indexName, String storeName) throws IOException {
         }
     }

-    private class RequestBytes {
+    @VisibleForTesting
+    class RequestBytes {
         final byte [] requestBytes;
         final byte [] requestSource;

-        private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
+        @VisibleForTesting
+        RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
            Map<String, Object> requestData = new HashMap<>();
            if (useMappingTypes) {
                requestData.put("_index", request.getIndex());
@@ -419,7 +423,8 @@ private RequestBytes(final ElasticSearchMutation request) throws JsonProcessingException {
             }
         }

-        private int getSerializedSize() {
+        @VisibleForTesting
+        int getSerializedSize() {
             int serializedSize = this.requestBytes.length;
             serializedSize+= 1; //For follow-up NEW_LINE_BYTES
             if (this.requestSource != null) {
@@ -445,15 +450,15 @@ private Pair<String, byte[]> buildBulkRequestInput(List<RequestBytes> requests,
             request.writeTo(outputStream);
         }

-        final StringBuilder builder = new StringBuilder();
+        final StringBuilder bulkRequestQueryParameters = new StringBuilder();
         if (ingestPipeline != null) {
-            APPEND_OP.apply(builder).append("pipeline=").append(ingestPipeline);
+            APPEND_OP.apply(bulkRequestQueryParameters).append("pipeline=").append(ingestPipeline);
         }
         if (bulkRefreshEnabled) {
-            APPEND_OP.apply(builder).append("refresh=").append(bulkRefresh);
+            APPEND_OP.apply(bulkRequestQueryParameters).append("refresh=").append(bulkRefresh);
         }
-        builder.insert(0, REQUEST_SEPARATOR + "_bulk");
-        return Pair.with(builder.toString(), outputStream.toByteArray());
+        final String bulkRequestPath = REQUEST_SEPARATOR + "_bulk" + bulkRequestQueryParameters;
+        return Pair.with(bulkRequestPath, outputStream.toByteArray());
     }

     private List<Pair<ElasticSearchMutation, Object>> pairErrorsWithSubmittedMutation(
@@ -476,7 +481,8 @@ private List<Pair<ElasticSearchMutation, Object>> pairErrorsWithSubmittedMutation(
         return errors;
     }

-    private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
+    @VisibleForTesting
+    class BulkRequestChunker implements Iterator<List<RequestBytes>> {
         //By default, Elasticsearch writes are limited to 100mb, so chunk a given batch of requests so they stay under
         //the specified limit

@@ -485,18 +491,32 @@ private class BulkRequestChunker implements Iterator<List<RequestBytes>> {
         // settings to find the optimal size for your particular workload. Note that Elasticsearch limits the maximum
         // size of a HTTP request to 100mb by default
         private final PeekingIterator<RequestBytes> requestIterator;
+        private final int[] exceptionallyLargeRequests;

-        private BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
+        @VisibleForTesting
+        BulkRequestChunker(List<ElasticSearchMutation> requests) throws JsonProcessingException {
             List<RequestBytes> serializedRequests = new ArrayList<>(requests.size());
+            List<Integer> requestSizesThatWereTooLarge = new ArrayList<>();
             for (ElasticSearchMutation request : requests) {
-                serializedRequests.add(new RequestBytes(request));
+                RequestBytes requestBytes = new RequestBytes(request);
+                int requestSerializedSize = requestBytes.getSerializedSize();
+                if (requestSerializedSize <= bulkChunkSerializedLimitBytes) {
+                    //Only keep items that we can actually send in memory
+                    serializedRequests.add(requestBytes);
+                } else {
+                    requestSizesThatWereTooLarge.add(requestSerializedSize);
+                }
             }
             this.requestIterator = Iterators.peekingIterator(serializedRequests.iterator());
+            //Condense request sizes that are too large into an int array to remove Boxed & List memory overhead
+            this.exceptionallyLargeRequests = requestSizesThatWereTooLarge.stream().mapToInt(Integer::intValue).toArray();
         }

         @Override
         public boolean hasNext() {
-            return requestIterator.hasNext();
+            //Make sure hasNext() still returns true if exceptionally large requests were attempted to be submitted
+            //This allows next() to throw after all well sized requests have been chunked for submission
+            return requestIterator.hasNext() || exceptionallyLargeRequests.length > 0;
         }

         @Override
@@ -509,16 +529,18 @@ public List<RequestBytes> next() {
                 if (requestSerializedSize + chunkSerializedTotal <= bulkChunkSerializedLimitBytes) {
                     chunkSerializedTotal += requestSerializedSize;
                     serializedRequests.add(requestIterator.next());
-                } else if (requestSerializedSize > bulkChunkSerializedLimitBytes) {
-                    //we've encountered an element we cannot send to Elasticsearch given the configured limit
-                    throw new IllegalArgumentException(String.format(
-                        "Bulk request item is larger than permitted chunk limit. Limit is %s. Serialized item size was %s",
-                        bulkChunkSerializedLimitBytes, requestSerializedSize));
                 } else {
                     //Adding this element would exceed the limit, so return the chunk
                     return serializedRequests;
                 }
             }
+            //Check if we should throw an exception for items that were exceptionally large and therefore undeliverable.
+            //This is only done after all items that could be sent have been sent
+            if (serializedRequests.isEmpty() && this.exceptionallyLargeRequests.length > 0) {
+                throw new IllegalArgumentException(String.format(
+                    "Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) %s",
+                    bulkChunkSerializedLimitBytes, Arrays.toString(this.exceptionallyLargeRequests)));
+            }
             //All remaining requests fit in this chunk
             return serializedRequests;
         }
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
index 87f4cc1c3f..dae07870d5 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/rest/RestClientBulkRequestsTest.java
@@ -34,6 +34,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.stream.IntStream;

 import static org.mockito.ArgumentMatchers.any;
@@ -105,6 +106,44 @@ public void testSplittingOfLargeBulkItems() throws IOException {
         }
     }

+    @Test
+    public void testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked() throws IOException {
+        int bulkLimit = 1_000_000;
+        StringBuilder overlyLargePayloadBuilder = new StringBuilder();
+        IntStream.range(0, bulkLimit * 10).forEach(value -> overlyLargePayloadBuilder.append("a"));
+        String overlyLargePayload = overlyLargePayloadBuilder.toString();
+        ElasticSearchMutation overlyLargeMutation = ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id2",
+            Collections.singletonMap("someKey", overlyLargePayload));
+        List<ElasticSearchMutation> bulkItems = Arrays.asList(
+            ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id1",
+                Collections.singletonMap("someKey", "small_payload1")),
+            overlyLargeMutation,
+            ElasticSearchMutation.createIndexRequest("some_index", "some_type", "some_doc_id3",
+                Collections.singletonMap("someKey", "small_payload2"))
+        );
+
+        try (RestElasticSearchClient restClientUnderTest = createClient(bulkLimit)) {
+            RestElasticSearchClient.BulkRequestChunker chunkerUnderTest = restClientUnderTest.new BulkRequestChunker(bulkItems);
+            int overlyLargeRequestExpectedSize = restClientUnderTest.new RequestBytes(overlyLargeMutation).getSerializedSize();
+
+            //The chunker should chunk this request first as a list of the 2 smaller items
+            List<RestElasticSearchClient.RequestBytes> smallItemsChunk = chunkerUnderTest.next();
+            Assertions.assertEquals(2, smallItemsChunk.size());
+
+            //Then the chunker should still return true for hasNext()
+            Assertions.assertTrue(chunkerUnderTest.hasNext());
+
+            //Then the next call for next() should throw to report the exceptionally large item
+            IllegalArgumentException thrownException = Assertions.assertThrows(IllegalArgumentException.class, chunkerUnderTest::next,
+                "Should have thrown due to bulk request item being too large");
+
+            String expectedExceptionMessage = String.format("Bulk request item(s) larger than permitted chunk limit. Limit is %s. Serialized item size(s) [%s]",
+                bulkLimit, overlyLargeRequestExpectedSize);
+
+            Assertions.assertEquals(expectedExceptionMessage, thrownException.getMessage());
+        }
+    }
+
     @Test
     public void testThrowingIfSingleBulkItemIsLargerThanLimit() throws IOException {
         int bulkLimit = 800;
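
Note on coordinating the two limits: the default of `bulk-chunk-size-limit-bytes` (100000000 bytes) sits right at Elasticsearch's default `http.max_content_length` of `100mb`, so deployments that lower the server-side cap need to lower the JanusGraph limit with it. A minimal sketch of keeping the two in agreement, assuming an index named `search` (the index name and the 50 MB figure are illustrative, not part of this patch):

```properties
# janusgraph.properties: chunk bulk requests at or below the server's cap
index.search.backend=elasticsearch
index.search.elasticsearch.bulk-chunk-size-limit-bytes=52428800
```

```yaml
# elasticsearch.yml: server-side limit on a single HTTP request body (50 * 1024 * 1024 bytes)
http.max_content_length: 50mb
```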
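On the `buildBulkRequestInput` change: instead of splicing the path in front of already-appended query parameters with `StringBuilder.insert(0, ...)`, the path and its parameters are now composed in reading order, which is what the renamed `bulkRequestQueryParameters` builder holds. The resulting request path looks like this (the pipeline name is invented for illustration; `wait_for` is one of the refresh values the bulk API accepts):

```
POST /_bulk?pipeline=my_ingest_pipeline&refresh=wait_for
```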
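For background on `RequestBytes.getSerializedSize`: the `_bulk` endpoint consumes newline-delimited JSON, where each mutation contributes a newline-terminated action line plus, for operations that carry a document, a newline-terminated source line. That is why the method always adds one byte for the action line's newline and adds the source length plus a second newline only when `requestSource` is non-null. A hypothetical two-mutation body (index, ids, and fields invented for illustration):

```
{"index":{"_index":"some_index","_id":"some_doc_id1"}}
{"someKey":"small_payload1"}
{"delete":{"_index":"some_index","_id":"some_doc_id2"}}
```

The delete action carries no source line, which corresponds to the null `requestSource` case in the size accounting.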
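The deferred-failure behavior of `BulkRequestChunker` is easiest to see in isolation. Below is a self-contained sketch of the same idea; the class name `SizeLimitedChunker` and the raw `byte[]` payloads standing in for `RequestBytes` are simplifications for illustration, not code from this patch. Oversized items are set aside at construction, well-sized items are greedily packed into chunks under the limit, and only after every deliverable chunk has been handed out does `next()` throw, reporting every undeliverable size at once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * Sketch of size-limited chunking with deferred failure: hasNext() stays true
 * while oversized payloads are pending so the final next() call can throw.
 */
public class SizeLimitedChunker implements Iterator<List<byte[]>> {

    private final int limitBytes;
    private final List<byte[]> deliverable = new ArrayList<>();
    private final List<Integer> oversizedSizes = new ArrayList<>();
    private int cursor = 0;

    public SizeLimitedChunker(List<byte[]> payloads, int limitBytes) {
        this.limitBytes = limitBytes;
        for (byte[] payload : payloads) {
            if (payload.length <= limitBytes) {
                deliverable.add(payload);           // small enough to send
            } else {
                oversizedSizes.add(payload.length); // undeliverable; remember only its size
            }
        }
    }

    @Override
    public boolean hasNext() {
        return cursor < deliverable.size() || !oversizedSizes.isEmpty();
    }

    @Override
    public List<byte[]> next() {
        List<byte[]> chunk = new ArrayList<>();
        int chunkTotal = 0;
        while (cursor < deliverable.size()) {
            byte[] candidate = deliverable.get(cursor);
            if (chunkTotal + candidate.length > limitBytes) {
                return chunk; // candidate would push this chunk over the limit
            }
            chunkTotal += candidate.length;
            chunk.add(candidate);
            cursor++;
        }
        // All deliverable payloads are chunked; now report anything undeliverable
        if (chunk.isEmpty() && !oversizedSizes.isEmpty()) {
            throw new IllegalArgumentException(
                "Payload(s) larger than permitted chunk limit " + limitBytes + ": " + oversizedSizes);
        }
        return chunk;
    }

    public static void main(String[] args) {
        // The 40- and 50-byte payloads fit one 100-byte chunk; the 200-byte one never can
        SizeLimitedChunker chunker = new SizeLimitedChunker(
            Arrays.asList(new byte[40], new byte[200], new byte[50]), 100);
        System.out.println(chunker.next().size()); // 2
        System.out.println(chunker.hasNext());     // true: an oversized payload is still pending
        try {
            chunker.next();
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());    // ...chunk limit 100: [200]
        }
    }
}
```

Deferring the throw means a single oversized mutation no longer aborts the whole batch up front: everything that can be delivered is delivered first, which is exactly what `testThrowingForOverlyLargeBulkItemOnlyAfterSmallerItemsAreChunked` asserts.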