Spark 3.3.0 support #57

Open · wants to merge 1 commit into master
8 changes: 4 additions & 4 deletions spark-doris-connector/build.sh
@@ -95,18 +95,18 @@ done

 # extract major version:
 # eg: 3.1.2 -> 3.1
-SPARK_MINOR_VERSION=0
+SPARK_MAJOR_VERSION=0
 if [ ${SPARK_VERSION} != 0 ]; then
-    SPARK_MINOR_VERSION=${SPARK_VERSION%.*}
-    echo "SPARK_MINOR_VERSION: ${SPARK_MINOR_VERSION}"
+    SPARK_MAJOR_VERSION=${SPARK_VERSION%.*}
+    echo "SPARK_MAJOR_VERSION: ${SPARK_MAJOR_VERSION}"
 fi

 if [[ ${BUILD_FROM_TAG} -eq 1 ]]; then
     rm -rf ${ROOT}/output/
     ${MVN_BIN} clean package
 else
     rm -rf ${ROOT}/output/
-    ${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.minor.version=${SPARK_MINOR_VERSION} $MVN_ARGS
+    ${MVN_BIN} clean package -Dspark.version=${SPARK_VERSION} -Dscala.version=${SCALA_VERSION} -Dspark.major.version=${SPARK_MAJOR_VERSION} $MVN_ARGS
 fi

 mkdir ${ROOT}/output/
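For context, bash's ${SPARK_VERSION%.*} strips the shortest suffix matching `.*`, so 3.1.2 becomes 3.1 and 3.3.0 becomes 3.3; despite the old "minor" variable name, the result is the major.minor pair that ends up in the artifact name. A minimal Scala sketch of the same extraction, for illustration only (the object name is not part of the connector):

// Mirrors bash's ${SPARK_VERSION%.*}: drop the last dot-separated component.
object SparkMajorVersion {
  def apply(version: String): String = {
    val lastDot = version.lastIndexOf('.')
    if (lastDot < 0) version else version.substring(0, lastDot)
  }

  def main(args: Array[String]): Unit = {
    println(SparkMajorVersion("3.1.2")) // 3.1
    println(SparkMajorVersion("3.3.0")) // 3.3
  }
}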
5 changes: 3 additions & 2 deletions spark-doris-connector/pom.xml
@@ -26,7 +26,7 @@
         <version>23</version>
     </parent>
     <groupId>org.apache.doris</groupId>
-    <artifactId>spark-doris-connector-${spark.minor.version}_${scala.version}</artifactId>
+    <artifactId>spark-doris-connector-${spark.major.version}_${scala.version}</artifactId>
     <version>1.0.0-SNAPSHOT</version>
     <name>Spark Doris Connector</name>
     <url>https://doris.apache.org/</url>
@@ -64,7 +64,7 @@
     <properties>
         <scala.version>${env.scala.version}</scala.version>
         <spark.version>${env.spark.version}</spark.version>
-        <spark.minor.version>${env.spark.minor.version}</spark.minor.version>
+        <spark.major.version>${env.spark.major.version}</spark.major.version>
         <libthrift.version>0.13.0</libthrift.version>
         <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
@@ -285,6 +285,7 @@
                         <phase>process-resources</phase>
                         <goals>
                             <goal>compile</goal>
+                            <goal>add-source</goal>
                         </goals>
                     </execution>
                     <execution>
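A note on the new goal: scala-maven-plugin's add-source registers src/main/scala (and src/test/scala) as additional source roots, which is the usual way to make sure a mixed Java/Scala module actually compiles its Scala sources; this reading is inferred from the plugin's documented behavior, not stated in the PR. With spark.major.version set to 3.3 and scala.version to 2.12, the artifactId above would resolve to, e.g., spark-doris-connector-3.3_2.12.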
spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java
@@ -47,6 +47,8 @@
 import java.util.Calendar;
 import java.util.Properties;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+
 
 /**
  * DorisStreamLoad
@@ -79,7 +81,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
         this.user = user;
         this.passwd = passwd;
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
     }
 
     public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
@@ -91,7 +93,7 @@ public DorisStreamLoad(SparkSettings settings) throws IOException, DorisExceptio
         this.user = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_USER);
         this.passwd = settings.getProperty(ConfigurationOptions.DORIS_REQUEST_AUTH_PASSWORD);
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
 
         this.maxFilterRatio = settings.getProperty(ConfigurationOptions.DORIS_MAX_FILTER_RATIO);
@@ -112,7 +114,7 @@ public DorisStreamLoad(SparkSettings settings, String[] dfColumns) throws IOExce
 
 
         this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
-        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(UTF_8));
         this.columns = settings.getProperty(ConfigurationOptions.DORIS_WRITE_FIELDS);
         this.dfColumns = dfColumns;
@@ -257,20 +259,21 @@ private LoadResponse loadBatch(String value) {
 
         HttpURLConnection feConn = null;
         HttpURLConnection beConn = null;
+        BufferedReader br = null;
         int status = -1;
         try {
             // build request and send to new be location
             beConn = getConnection(loadUrlStr, label);
             // send data to be
             BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
-            bos.write(value.getBytes("UTF-8"));
+            bos.write(value.getBytes(UTF_8));
             bos.close();
 
             // get response
             status = beConn.getResponseCode();
             String respMsg = beConn.getResponseMessage();
             InputStream stream = (InputStream) beConn.getContent();
-            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
+            br = new BufferedReader(new InputStreamReader(stream, UTF_8));
             StringBuilder response = new StringBuilder();
             String line;
             while ((line = br.readLine()) != null) {
@@ -290,6 +293,13 @@ private LoadResponse loadBatch(String value) {
             if (beConn != null) {
                 beConn.disconnect();
             }
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    LOG.warn("Exception occurred during closing BufferedReader, {}", e.getMessage());
+                }
+            }
         }
     }

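The substantive changes in DorisStreamLoad: getBytes now uses the Charset overload via the statically imported UTF_8 (the getBytes("UTF-8") string overload throws a checked UnsupportedEncodingException), the response is decoded with an explicit charset instead of the platform default, and the BufferedReader is hoisted out of the try block so the finally clause can close it on every path. A minimal Scala sketch of the same read-and-close discipline, with illustrative names (readAll is not the connector's API):

import java.io.{BufferedReader, InputStreamReader}
import java.net.HttpURLConnection
import java.nio.charset.StandardCharsets.UTF_8

object ResponseReader {
  // Decode the response with an explicit charset and close the reader on every path.
  def readAll(conn: HttpURLConnection): String = {
    val br = new BufferedReader(new InputStreamReader(conn.getInputStream, UTF_8))
    try Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
    finally br.close()
  }
}

In Java the same effect is usually spelled with try-with-resources; the PR's explicit finally block is the equivalent for code that predates that idiom.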
spark-doris-connector/src/main/scala/org/apache/doris/spark/rdd/ScalaValueReader.scala
@@ -31,10 +31,9 @@ import org.apache.doris.spark.rest.PartitionDefinition
 import org.apache.doris.spark.rest.models.Schema
 import org.apache.doris.spark.serialization.{Routing, RowBatch}
 import org.apache.doris.spark.sql.SchemaUtils
-import org.apache.doris.spark.util.ErrorMessages
 import org.apache.doris.spark.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE
 import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult}
-import org.apache.log4j.Logger
+import org.apache.spark.internal.Logging
 
 import scala.util.control.Breaks
@@ -43,8 +42,7 @@ import scala.util.control.Breaks
  * @param partition Doris RDD partition
  * @param settings request configuration
  */
-class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
-  private val logger = Logger.getLogger(classOf[ScalaValueReader])
+class ScalaValueReader(partition: PartitionDefinition, settings: Settings) extends Logging {
 
   protected val client = new BackendClient(new Routing(partition.getBeAddress), settings)
   protected val clientLock =
@@ -57,15 +55,15 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
   protected lazy val deserializeArrowToRowBatchAsync: Boolean = Try {
     settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC, DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT.toString).toBoolean
   } getOrElse {
-    logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC))
+    logWarning(s"Parse '${DORIS_DESERIALIZE_ARROW_ASYNC}' to boolean failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_ARROW_ASYNC)}'.")
     DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT
   }
 
   protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = {
     val blockingQueueSize = Try {
       settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE, DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT.toString).toInt
     } getOrElse {
-      logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE))
+      logWarning(s"Parse '${DORIS_DESERIALIZE_QUEUE_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_DESERIALIZE_QUEUE_SIZE)}'.")
       DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT
     }
Expand All @@ -89,21 +87,21 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
val batchSize = Try {
settings.getProperty(DORIS_BATCH_SIZE, DORIS_BATCH_SIZE_DEFAULT.toString).toInt
} getOrElse {
logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, settings.getProperty(DORIS_BATCH_SIZE))
logWarning(s"Parse '${DORIS_BATCH_SIZE}' to number failed. Original string is '${settings.getProperty(DORIS_BATCH_SIZE)}'." )
DORIS_BATCH_SIZE_DEFAULT
}

val queryDorisTimeout = Try {
settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S, DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT.toString).toInt
} getOrElse {
logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S))
logWarning(s"Parse '${DORIS_REQUEST_QUERY_TIMEOUT_S}' to number failed. Original string is '${settings.getProperty(DORIS_REQUEST_QUERY_TIMEOUT_S)}'.")
DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT
}

val execMemLimit = Try {
settings.getProperty(DORIS_EXEC_MEM_LIMIT, DORIS_EXEC_MEM_LIMIT_DEFAULT.toString).toLong
} getOrElse {
logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, settings.getProperty(DORIS_EXEC_MEM_LIMIT))
logWarning(s"Parse '${DORIS_EXEC_MEM_LIMIT}' to number failed. Original string is '${settings.getProperty(DORIS_EXEC_MEM_LIMIT)}'.")
DORIS_EXEC_MEM_LIMIT_DEFAULT
}

@@ -113,7 +111,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     params.setUser(settings.getProperty(DORIS_REQUEST_AUTH_USER, ""))
     params.setPasswd(settings.getProperty(DORIS_REQUEST_AUTH_PASSWORD, ""))
 
-    logger.debug(s"Open scan params is, " +
+    logDebug(s"Open scan params is, " +
       s"cluster: ${params.getCluster}, " +
       s"database: ${params.getDatabase}, " +
       s"table: ${params.getTable}, " +
@@ -159,7 +157,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
     started
   }
 
-  logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.")
+  logDebug(s"Open scan result is, contextId: $contextId, schema: $schema.")
 
   /**
    * read data and cache it in rowBatch.
@@ -213,7 +211,7 @@ class ScalaValueReader(partition: PartitionDefinition, settings: Settings) {
    */
   def next: AnyRef = {
     if (!hasNext) {
-      logger.error(SHOULD_NOT_HAPPEN_MESSAGE)
+      logError(SHOULD_NOT_HAPPEN_MESSAGE)
       throw new ShouldNeverHappenException
     }
     rowBatch.next
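The reader changes are mechanical: the per-class log4j Logger is replaced by Spark's org.apache.spark.internal.Logging trait (logWarning, logDebug, logError), and the ErrorMessages placeholder constants are inlined as interpolated strings. One property of the trait worth knowing: its log methods take the message by name, so the interpolation only runs when the corresponding level is enabled. Since the same Try/getOrElse parse-with-default pattern now appears five times, it could be factored into a helper; a hedged sketch mirroring the PR's use of the Logging trait (ConfReader and intConf are illustrative, not the connector's API):

import java.util.Properties
import scala.util.Try
import org.apache.spark.internal.Logging

// Parse-with-default helper mirroring the pattern above: on a malformed
// value, log a warning and fall back to the supplied default.
class ConfReader(props: Properties) extends Logging {
  def intConf(key: String, default: Int): Int = Try {
    props.getProperty(key, default.toString).toInt
  } getOrElse {
    logWarning(s"Parse '$key' to number failed. Original string is '${props.getProperty(key)}'.")
    default
  }
}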