From 40a130bb974d1d30e67b82e2e7b937d1df7c1e55 Mon Sep 17 00:00:00 2001 From: liangbowen Date: Wed, 29 Mar 2023 13:21:24 +0800 Subject: [PATCH 1/2] replace org.apache.log4j.Logger by using spark's Logging --- .../doris/spark/rdd/ScalaValueReader.scala | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 03643b2e..174276c6 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -20,8 +20,6 @@ package org.apache.doris.spark.rdd import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent._ import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} - -import scala.collection.JavaConversions._ import scala.util.Try import org.apache.doris.spark.backend.BackendClient import org.apache.doris.spark.cfg.ConfigurationOptions._ @@ -34,8 +32,9 @@ import org.apache.doris.spark.sql.SchemaUtils import org.apache.doris.spark.util.ErrorMessages import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} -import org.apache.log4j.Logger +import org.apache.spark.internal.Logging +import scala.collection.convert.ImplicitConversions._ import scala.util.control.Breaks /** @@ -43,8 +42,7 @@ import scala.util.control.Breaks * @param partition Doris RDD partition * @param settings request configuration */ -class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { - private val logger = Logger.getLogger(classOf[ScalaValueReader]) +class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging{ protected val client = new BackendClient(new Routing(partition.getBeAddress), settings) protected val clientLock = @@ -57,7 +55,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try { settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean } getOrElse { - logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)) + logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))) DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT } @@ -65,7 +63,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { val blockingQueueSize = Try { settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)) + logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))) DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT } @@ -89,21 +87,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { val batchSize = Try { settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)) - DORIS_BATCH_SIZE_DEFAULT + logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))) + DORIS_BATCH_SIZE_DEFAULT } val queryDorisTimeout = Try { settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)) + logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))) DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT } val execMemLimit = Try { settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)) + logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))) DORIS_EXEC_MEM_LIMIT_DEFAULT } @@ -113,7 +111,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, "")) params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, "")) - logger.debug(s"Open scan params is, " + + logDebug(s"Open scan params is, " + s"cluster: ${params.getCluster}, " + s"database: ${params.getDatabase}, " + s"table: ${params.getTable}, " + @@ -159,7 +157,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { started } - logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.") + logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.") /** * read data and cached in rowBatch. @@ -213,7 +211,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) { */ def next: AnyRef = { if (!hasNext) { - logger.error(SHOULD_NOT_HAPPEN_MESSAGE) + logError(SHOULD_NOT_HAPPEN_MESSAGE) throw new ShouldNeverHappenException } rowBatch.next From 4ed623b484dfbca6166c5de9a6e6e61a0f75692d Mon Sep 17 00:00:00 2001 From: liangbowen Date: Wed, 29 Mar 2023 13:26:38 +0800 Subject: [PATCH 2/2] fix JavaConversions --- .../scala/org/apache/doris/spark/rdd/ScalaValueReader.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala index 174276c6..b196355a 100644 --- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala +++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala @@ -20,6 +20,8 @@ package org.apache.doris.spark.rdd import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent._ import java.util.concurrent.locks.{Condition, Lock, ReentrantLock} + +import scala.collection.JavaConversions._ import scala.util.Try import org.apache.doris.spark.backend.BackendClient import org.apache.doris.spark.cfg.ConfigurationOptions._ @@ -34,7 +36,6 @@ import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} import org.apache.spark.internal.Logging -import scala.collection.convert.ImplicitConversions._ import scala.util.control.Breaks /**