From c1419510e41648c416c6896e8b1901de35b06102 Mon Sep 17 00:00:00 2001 From: "jiachuan.zhu" Date: Mon, 12 Dec 2022 20:29:13 +0800 Subject: [PATCH] - Support Spark 3.3.0 - Close BufferedReader in DorisStreamLoad - Change spark.minor.version to spark.major.version - source jar to include scala code --- spark-doris-connector/pom.xml | 5 +++-- .../apache/doris/spark/DorisStreamLoad.java | 20 ++++++++++++----- .../doris/spark/rdd/ScalaValueReader.scala | 22 +++++++++---------- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/spark-doris-connector/pom.xml b/spark-doris-connector/pom.xml index 023607af..4c8644c5 100644 --- a/spark-doris-connector/pom.xml +++ b/spark-doris-connector/pom.xml @@ -26,7 +26,7 @@ 23 org.apache.doris - spark-doris-connector-${spark.minor.version}_${scala.version} + spark-doris-connector-${spark.major.version}_${scala.version} 1.0.0-SNAPSHOT Spark Doris Connector https://doris.apache.org/ @@ -64,7 +64,7 @@ ${env.scala.version} ${env.spark.version} - ${env.spark.minor.version} + ${env.spark.major.version} 0.13.0 5.0.0 3.8.1 @@ -285,6 +285,7 @@ process-resources compile + add-source diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index f375c769..67684aaf 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -47,6 +47,8 @@ import java.util.Calendar; import java.util.Properties; +import static java.nio.charset.StandardCharsets.UTF_8; + /** * DorisStreamLoad @@ -79,7 +81,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri this.user = user; this.passwd = passwd; this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); - this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8)); } public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException { @@ -91,7 +93,7 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER); this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD); this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); - this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8)); this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS); this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO); @@ -112,7 +114,7 @@ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOExce this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl); - this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8)); + this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8)); this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS); this.dfColumns = dfColumns; @@ -257,20 +259,21 @@ private LoadResponse loadBatch(String value) { HttpURLConnection feConn = null; HttpURLConnection beConn = null; + BufferedReader br = null; int status = -1; try { // build request and send to new be location beConn = getConnection(loadUrlStr, label); // send data to be BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream()); - bos.write(value.getBytes("UTF-8")); + bos.write(value.getBytes(UTF_8)); bos.close(); // get respond status = beConn.getResponseCode(); String respMsg = beConn.getResponseMessage(); InputStream stream = (InputStream) beConn.getContent(); - BufferedReader br = new BufferedReader(new InputStreamReader(stream)); + br = new BufferedReader(new InputStreamReader(stream, UTF_8)); StringBuilder response = new StringBuilder(); String line; while ((line = br.readLine()) != null) { @@ -290,6 +293,13 @@ private LoadResponse loadBatch(String value) { if (beConn != null) { beConn.disconnect(); } + if( br != null) { + try { + br.close(); + } catch (IOException e) { + LOG.warn("Exception occurred during closing BufferedReader, {}", e.getMessage()); + } + } } } diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 03643b2e..cd262f6c 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -31,10 +31,9 @@ import org.apache.doris.spark.rest.PartitionDefinition import org.apache.doris.spark.rest.models.Schema import org.apache.doris.spark.serialization.{Routing, RowBatch} import org.apache.doris.spark.sql.SchemaUtils -import org.apache.doris.spark.util.ErrorMessages import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} -import org.apache.log4j.Logger +import org.apache.spark.internal.Logging import scala.util.control.Breaks @@ -43,8 +42,7 @@ import scala.util.control.Breaks * @param partition Doris RDD partition * @param settings request configuration */ -class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { - private val logger = Logger.getLogger(classOf[ScalaValueReader]) +class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging { protected val client = new BackendClient(new Routing(partition.getBeAddress), settings) protected val clientLock = @@ -57,7 +55,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try { settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean } getOrElse { - logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)) + logWarning(s"Parse '${DORIS_DESERIALIZE_ARROW_ASYNC}' to boolean failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)}'." ) DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT } @@ -65,7 +63,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { val blockingQueueSize = Try { settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)) + logWarning(s"Parse '${DORIS_DESERIALIZE_QUEUE_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)}'.") DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT } @@ -89,21 +87,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { val batchSize = Try { settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)) + logWarning(s"Parse '${DORIS_BATCH_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_BATCH_SIZE)}'." ) DORIS_BATCH_SIZE_DEFAULT } val queryDorisTimeout = Try { settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)) + logWarning(s"Parse '${DORIS_REQUEST_QUERY_TIMEOUT_S}' to number failed. Original string is '${settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)}'.") DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT } val execMemLimit = Try { settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)) + logWarning(s"Parse '${DORIS_EXEC_MEM_LIMIT}' to number failed. Original string is '${settings.getProperty(DORIS_EXEC_MEM_LIMIT)}'.") DORIS_EXEC_MEM_LIMIT_DEFAULT } @@ -113,7 +111,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) - logger.debug(s"Open scan params is, " + + logDebug(s"Open scan params is, " + s"cluster: ${params.getCluster}, " + s"database: ${params.getDatabase}, " + s"table: ${params.getTable}, " + @@ -159,7 +157,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { started } - logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.") + logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.") /** * read data and cached in rowBatch. @@ -213,7 +211,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { */ def next: AnyRef = { if (!hasNext) { - logger.error(SHOULD_NOT_HAPPEN_MESSAGE) + logError(SHOULD_NOT_HAPPEN_MESSAGE) throw new ShouldNeverHappenException } rowBatch.next