[Improve] Use Spark's Logging instead of explicit usage of log4j #84

Merged (2 commits) on Mar 31, 2023
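This PR drops the per-class log4j `Logger` in `ScalaValueReader` and instead mixes in Spark's `org.apache.spark.internal.Logging` trait, which supplies protected `logDebug`, `logWarning`, and `logError` helpers wired to Spark's own logger for the class. A minimal sketch of the pattern, assuming spark-core is on the classpath; the class name and messages below are illustrative, not taken from the connector:

```scala
import org.apache.spark.internal.Logging

// Hypothetical reader class: mixing in Logging provides log* helpers whose
// messages are passed by name, so the string is only built when the
// corresponding log level is actually enabled.
class ExampleReader(name: String) extends Logging {

  def open(): Unit = {
    logDebug(s"Opening reader for $name")
  }

  def fail(): Nothing = {
    logError("Should never happen")
    throw new IllegalStateException("Should never happen")
  }
}
```

The diff below applies the same switch to `ScalaValueReader`.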
@@ -34,7 +34,7 @@ import org.apache.doris.spark.sql.SchemaUtils
 import org.apache.doris.spark.util.ErrorMessages
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging

 import scala.util.control.Breaks

@@ -43,8 +43,7 @@ import scala.util.control.Breaks
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
-  private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {

   protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
   protected val clientLock =
@@ -57,15 +56,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
     settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+    logWarning(String.format(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)))
     DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
   }

   protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
     val blockingQueueSize = Try {
       settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
     } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+      logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)))
       DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
     }

@@ -89,21 +88,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     val batchSize = Try {
       settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
     } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
-      DORIS_BATCH_SIZE_DEFAULT
+      logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE)))
+      DORIS_BATCH_SIZE_DEFAULT
     }

     val queryDorisTimeout = Try {
       settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
     } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+      logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)))
       DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
     }

     val execMemLimit = Try {
       settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
     } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+      logWarning(String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT)))
       DORIS_EXEC_MEM_LIMIT_DEFAULT
     }

@@ -113,7 +112,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
     params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))

-    logger.debug(s"Open scan params is, " +
+    logDebug(s"Open scan params is, " +
       s"cluster: ${params.getCluster}, " +
       s"database: ${params.getDatabase}, " +
       s"table: ${params.getTable}, " +
@@ -159,7 +158,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     started
   }

-  logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+  logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")

   /**
    * read data and cached in rowBatch.
@@ -213,7 +212,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
    */
   def next: AnyRef = {
     if (!hasNext) {
-      logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+      logError(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException
     }
     rowBatch.next
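Unlike the replaced `logger.warn(message, arg1, arg2)` calls, the trait's helpers take a single message string, so the parameterized error messages are now built up front with `String.format`. A hedged sketch of the two call shapes, using an illustrative `%s`-style template rather than the connector's actual `ErrorMessages` constant:

```scala
import org.apache.spark.internal.Logging

object FormatSketch extends Logging {
  // Illustrative template only; the real ErrorMessages constant may differ.
  private val ParseNumberFailed = "Parse '%s' to number failed, original string is '%s'"

  def warnBadValue(key: String, raw: String): Unit = {
    // Spark's logWarning takes one by-name String, so format it first ...
    logWarning(String.format(ParseNumberFailed, key, raw))
    // ... or build the message directly with string interpolation.
    logWarning(s"Parse '$key' to number failed, original string is '$raw'")
  }
}
```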