diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index 8f64c74e..bdc7c485 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -59,7 +59,8 @@ public interface ConfigurationOptions {
     int DORIS_TABLET_SIZE_MIN = 1;
 
     String DORIS_BATCH_SIZE = "doris.batch.size";
-    int DORIS_BATCH_SIZE_DEFAULT = 1024;
+    int DORIS_BATCH_SIZE_DEFAULT = 4064;
+    int DORIS_BATCH_SIZE_MAX = 65535;
 
     String DORIS_EXEC_MEM_LIMIT = "doris.exec.mem.limit";
     long DORIS_EXEC_MEM_LIMIT_DEFAULT = 8L * 1024 * 1024 * 1024;
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
index f9124a65..3a3e477d 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -96,7 +96,8 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) exten
 
   // max row number of one read batch
   val batchSize = Try {
-    settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
+    // cap the configured batch size at DORIS_BATCH_SIZE_MAX
+    Math.min(settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt, DORIS_BATCH_SIZE_MAX)
   } getOrElse {
     logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)))
    DORIS_BATCH_SIZE_DEFAULT
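For context, a minimal Scala usage sketch (not part of the patch) showing where doris.batch.size enters from the Spark side. The table identifier, FE address, and credentials are placeholders, and the short format name "doris" is assumed from the connector's data source registration; only the option keys come from ConfigurationOptions.

    import org.apache.spark.sql.SparkSession

    // Hypothetical read against a Doris table; all connection values are placeholders.
    val spark = SparkSession.builder().appName("doris-batch-size-demo").getOrCreate()

    val df = spark.read
      .format("doris")                                             // assumed short name of the connector's data source
      .option("doris.table.identifier", "example_db.example_tbl")  // placeholder table
      .option("doris.fenodes", "fe_host:8030")                     // placeholder FE address
      .option("doris.request.auth.user", "root")                   // placeholder credentials
      .option("doris.request.auth.password", "")
      .option("doris.batch.size", "100000")                        // values above 65535 are now clamped to DORIS_BATCH_SIZE_MAX
      .load()

    df.show()

With this change an over-large doris.batch.size no longer reaches the backend as-is; the reader caps it at DORIS_BATCH_SIZE_MAX, while an unparsable value still falls back to the new 4064 default.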