From f3c20611e42e0eb597d54d0047baa33da4f3ac60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Spo=CC=88rri?= Date: Thu, 14 Sep 2023 11:32:39 +0200 Subject: [PATCH] Increase maxConcurrencyTask to 10. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Pascal SpoĢˆrri --- README.md | 2 +- .../org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ca52dd4..06bb3b1 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ Changing these values might have an impact on performance. - `spark.shuffle.s3.bufferSize`: Default buffer size when writing (default: `8388608`) - `spark.shuffle.s3.maxBufferSizeTask`: Maximum size of the buffered output streams per task (default: `134217728`) -- `spark.shuffle.s3.maxConcurrencyTask`: Maximum per task concurrency. Computed by analysing the IO latencies (default: `25`). +- `spark.shuffle.s3.maxConcurrencyTask`: Maximum per task concurrency. Computed by analysing the IO latencies (default: `10`). - `spark.shuffle.s3.cachePartitionLengths`: Cache partition lengths in memory (default: `true`) - `spark.shuffle.s3.cacheChecksums`: Cache checksums in memory (default: `true`) - `spark.shuffle.s3.cleanup`: Cleanup the shuffle files (default: `true`) diff --git a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala index f216e13..38824d9 100644 --- a/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala +++ b/src/main/scala/org/apache/spark/shuffle/helper/S3ShuffleDispatcher.scala @@ -50,7 +50,7 @@ class S3ShuffleDispatcher extends Logging { // Optional val bufferSize: Int = conf.getInt("spark.shuffle.s3.bufferSize", defaultValue = 8 * 1024 * 1024) val maxBufferSizeTask: Int = conf.getInt("spark.shuffle.s3.maxBufferSizeTask", defaultValue = 128 * 1024 * 1024) - val maxConcurrencyTask: Int = conf.getInt("spark.shuffle.s3.maxConcurrencyTask", defaultValue = 5) + val maxConcurrencyTask: Int = conf.getInt("spark.shuffle.s3.maxConcurrencyTask", defaultValue = 10) val cachePartitionLengths: Boolean = conf.getBoolean("spark.shuffle.s3.cachePartitionLengths", defaultValue = true) val cacheChecksums: Boolean = conf.getBoolean("spark.shuffle.s3.cacheChecksums", defaultValue = true) val cleanupShuffleFiles: Boolean = conf.getBoolean("spark.shuffle.s3.cleanup", defaultValue = true)