
Commit 02880a2
- Support Spark 3.3.0
- Close BufferedReader in DorisStreamLoad
- Change spark.minor.version to spark.major.version
- Source jar to include Scala code
chncaesar committed Dec 13, 2022
1 parent d933102 commit 02880a2
Showing 4 changed files with 32 additions and 23 deletions.
8 changes: 4 additions & 4 deletions spark-doris-connector/build.sh
@@ -95,18 +95,18 @@ done

# extract minor version:
# eg: 3.1.2 -> 3
-SPARK_MINOR_VERSION=0
+SPARK_MAJOR_VERSION=0
if [ ${SPARK_VERSION} != 0 ]; then
-SPARK_MINOR_VERSION=${SPARK_VERSION%.*}
-echo "SPARK_MINOR_VERSION: ${SPARK_MINOR_VERSION}"
+SPARK_MAJOR_VERSION=${SPARK_VERSION%.*}
+echo "SPARK_MAJOR_VERSION: ${SPARK_MAJOR_VERSION}"
fi

if [[ ${BUILD_FROM_TAG} -eq 1 ]]; then
rm -rf ${ROOT}/output/
${MVN_BIN} clean package
else
rm -rf ${ROOT}/output/
-${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.minor.version=${SPARK_MINOR_VERSION} $MVN_ARGS
+${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.major.version=${SPARK_MAJOR_VERSION} $MVN_ARGS
fi

mkdir ${ROOT}/output/
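For readers of this hunk: the shell expansion ${SPARK_VERSION%.*} strips only the shortest trailing ".<segment>", so the renamed SPARK_MAJOR_VERSION actually holds a major.minor pair ("3.3.0" becomes "3.3", not a bare "3"), which the pom then interpolates into the artifact ID. A minimal Scala sketch of the same computation; the helper name sparkMajorVersion is hypothetical:

// Sketch only (not part of the repo): mirrors ${SPARK_VERSION%.*},
// dropping the last dot-separated segment of the version string.
def sparkMajorVersion(sparkVersion: String): String = {
  val lastDot = sparkVersion.lastIndexOf('.')
  if (lastDot < 0) sparkVersion else sparkVersion.substring(0, lastDot)
}

assert(sparkMajorVersion("3.3.0") == "3.3") // artifact: spark-doris-connector-3.3_<scala.version>
assert(sparkMajorVersion("3.1.2") == "3.1")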
5 changes: 3 additions & 2 deletions spark-doris-connector/pom.xml
@@ -26,7 +26,7 @@
<version>23</version>
</parent>
<groupId>org.apache.doris</groupId>
-<artifactId>spark-doris-connector-${spark.minor.version}_${scala.version}</artifactId>
+<artifactId>spark-doris-connector-${spark.major.version}_${scala.version}</artifactId>
<version>1.0.0-SNAPSHOT</version>
<name>Spark Doris Connector</name>
<url>https://doris.apache.org/</url>
@@ -64,7 +64,7 @@
<properties>
<scala.version>${env.scala.version}</scala.version>
<spark.version>${env.spark.version}</spark.version>
-<spark.minor.version>${env.spark.minor.version}</spark.minor.version>
+<spark.major.version>${env.spark.major.version}</spark.major.version>
<libthrift.version>0.13.0</libthrift.version>
<arrow.version>5.0.0</arrow.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
@@ -285,6 +285,7 @@
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
+<goal>add-source</goal>
</goals>
</execution>
<execution>
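The added add-source goal backs the "Source jar to include Scala code" item: scala-maven-plugin registers the Scala source directories with the Maven project, so the sources jar produced by the build packages .scala files alongside .java ones. A throwaway Scala check, sketched against a hypothetical local artifact path:

import java.util.jar.JarFile

// Sketch only: list Scala entries in a locally built sources jar
// (the path below is an assumption, not taken from the repo).
val sourcesJar = new JarFile(
  "spark-doris-connector/target/spark-doris-connector-3.3_2.12-1.0.0-SNAPSHOT-sources.jar")
sourcesJar.stream()
  .filter(_.getName.endsWith(".scala"))
  .forEach(e => println(e.getName)) // non-empty once add-source is bound
sourcesJar.close()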
20 changes: 15 additions & 5 deletions spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -47,6 +47,8 @@
import java.util.Calendar;
import java.util.Properties;

+import static java.nio.charset.StandardCharsets.UTF_8;


/**
* DorisStreamLoad
@@ -79,7 +81,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
this.user = user;
this.passwd = passwd;
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
}

public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
@@ -91,7 +93,7 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio
this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);

this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
@@ -112,7 +114,7 @@ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOExce


this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
this.dfColumns = dfColumns;

@@ -257,20 +259,21 @@ private LoadResponse loadBatch(String value) {

HttpURLConnection feConn = null;
HttpURLConnection beConn = null;
+BufferedReader br = null;
int status = -1;
try {
// build request and send to new be location
beConn = getConnection(loadUrlStr, label);
// send data to be
BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
-bos.write(value.getBytes("UTF-8"));
+bos.write(value.getBytes(UTF_8));
bos.close();

// get respond
status = beConn.getResponseCode();
String respMsg = beConn.getResponseMessage();
InputStream stream = (InputStream) beConn.getContent();
-BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+br = new BufferedReader(new InputStreamReader(stream, UTF_8));
StringBuilder response = new StringBuilder();
String line;
while ((line = br.readLine()) != null) {
@@ -290,6 +293,13 @@ private LoadResponse loadBatch(String value) {
if (beConn != null) {
beConn.disconnect();
}
+if( br != null) {
+    try {
+        br.close();
+    } catch (IOException e) {
+        LOG.warn("Exception occurred during closing BufferedReader, {}", e.getMessage());
+    }
+}
}
}

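The BufferedReader changes above follow the standard close-in-finally pattern: declare the reader outside try, close it in finally, and log (rather than rethrow) a failure from close() so cleanup cannot mask the original error. A condensed Scala sketch of the same shape; readResponse and open are illustrative names, not connector API:

import java.io.{BufferedReader, IOException}

// Sketch only: read a response fully and always close the reader,
// mirroring the patch's finally-block cleanup.
def readResponse(open: () => BufferedReader): String = {
  var br: BufferedReader = null
  try {
    br = open()
    val response = new StringBuilder
    var line = br.readLine()
    while (line != null) {
      response.append(line)
      line = br.readLine()
    }
    response.toString
  } finally {
    if (br != null) {
      try br.close()
      catch {
        case e: IOException =>
          println(s"Exception occurred during closing BufferedReader, ${e.getMessage}")
      }
    }
  }
}

In Java, try-with-resources would give the same guarantee; the patch keeps an explicit finally so the reader cleanup sits next to the existing feConn/beConn disconnect calls.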
22 changes: 10 additions & 12 deletions spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -31,10 +31,9 @@ import org.apache.doris.spark.rest.PartitionDefinition
import org.apache.doris.spark.rest.models.Schema
import org.apache.doris.spark.serialization.{Routing, RowBatch}
import org.apache.doris.spark.sql.SchemaUtils
-import org.apache.doris.spark.util.ErrorMessages
import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging

import scala.util.control.Breaks

@@ -43,8 +42,7 @@
* @param partition Doris RDD partition
* @param settings request configuration
*/
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
-private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {

protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
protected val clientLock =
@@ -57,15 +55,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
} getOrElse {
-logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+logWarning(s"Parse '${DORIS_DESERIALIZE_ARROW_ASYNC}' to boolean failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)}'." )
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
}

protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
val blockingQueueSize = Try {
settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
} getOrElse {
-logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+logWarning(s"Parse '${DORIS_DESERIALIZE_QUEUE_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)}'.")
DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
}

@@ -89,21 +87,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val batchSize = Try {
settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
} getOrElse {
-logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
+logWarning(s"Parse '${DORIS_BATCH_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_BATCH_SIZE)}'." )
DORIS_BATCH_SIZE_DEFAULT
}

val queryDorisTimeout = Try {
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
} getOrElse {
-logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
+logWarning(s"Parse '${DORIS_REQUEST_QUERY_TIMEOUT_S}' to number failed. Original string is '${settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)}'.")
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
}

val execMemLimit = Try {
settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
} getOrElse {
-logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
+logWarning(s"Parse '${DORIS_EXEC_MEM_LIMIT}' to number failed. Original string is '${settings.getProperty(DORIS_EXEC_MEM_LIMIT)}'.")
DORIS_EXEC_MEM_LIMIT_DEFAULT
}

@@ -113,7 +111,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))

-logger.debug(s"Open scan params is, " +
+logDebug(s"Open scan params is, " +
s"cluster: ${params.getCluster}, " +
s"database: ${params.getDatabase}, " +
s"table: ${params.getTable}, " +
@@ -159,7 +157,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
started
}

-logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")

/**
* read data and cached in rowBatch.
@@ -213,7 +211,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
*/
def next: AnyRef = {
if (!hasNext) {
-logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+logError(SHOULD_NOT_HAPPEN_MESSAGE)
throw new ShouldNeverHappenException
}
rowBatch.next
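The Scala file swaps its log4j Logger field for Spark's org.apache.spark.internal.Logging trait, so the parameterized ErrorMessages.PARSE_* templates (whose import is dropped above) give way to interpolated strings passed to logWarning/logDebug/logError. A condensed sketch of the post-change pattern; the class name, setting key, and default below are stand-ins, not the connector's real constants:

import org.apache.spark.internal.Logging

import scala.util.Try

// Sketch only: parse an optional numeric setting, warning through the
// mixed-in Logging trait and falling back to a default on bad input.
class SettingReader(settings: Map[String, String]) extends Logging {
  private val QueueSizeKey = "doris.deserialize.queue.size" // hypothetical key
  private val QueueSizeDefault = 64 // hypothetical default

  val queueSize: Int = Try {
    settings.getOrElse(QueueSizeKey, QueueSizeDefault.toString).toInt
  } getOrElse {
    logWarning(s"Parse '$QueueSizeKey' to number failed. " +
      s"Original string is '${settings.get(QueueSizeKey)}'.")
    QueueSizeDefault
  }
}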
