[Fix] fix package issue in arrow format (#195)
gnehil authored Mar 28, 2024
1 parent 6855414 commit 4fa7066
Showing 7 changed files with 533 additions and 11 deletions.
```diff
@@ -21,8 +21,8 @@
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.doris.spark.ArrowSchemaUtils;
 import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.ArrowUtils;
 
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -75,7 +75,7 @@ private RecordBatch(Iterator<InternalRow> iterator, DataFormat format, String se
         this.schema = schema;
         this.addDoubleQuotes = addDoubleQuotes;
         if (format.equals(DataFormat.ARROW)) {
-            Schema arrowSchema = ArrowSchemaUtils.toArrowSchema(schema, "UTC");
+            Schema arrowSchema = ArrowUtils.toArrowSchema(schema, "UTC");
             this.arrowRoot = VectorSchemaRoot.create(arrowSchema, new RootAllocator(Integer.MAX_VALUE));
         }
     }
```
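The RecordBatch constructor now builds its Arrow schema through Spark's own org.apache.spark.sql.util.ArrowUtils instead of the connector's ArrowSchemaUtils shim. ArrowUtils is declared private[sql], which Java callers can reach anyway (the modifier erases to public in bytecode) but plain Scala callers cannot — hence the old shim lived under org.apache.spark.sql. A minimal sketch of the conversion under that constraint; the demo package name and schema are made up, and the two-argument toArrowSchema mirrors the call in the hunk above:

```scala
// Hypothetical demo, not connector code. Package placement matters:
// ArrowUtils is private[sql], so Scala sources must sit under
// org.apache.spark.sql to call it directly; Java sources need not.
package org.apache.spark.sql.demo

import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.util.ArrowUtils

object ToArrowSchemaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSchema = new StructType()
      .add("id", IntegerType)
      .add("name", StringType)
    // Same call the patched RecordBatch constructor makes:
    // Spark schema -> Arrow schema, with an explicit time zone.
    val arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, "UTC")
    println(arrowSchema)
  }
}
```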
```diff
@@ -145,17 +145,17 @@ private void readNext(Iterator<InternalRow> iterator) throws DorisException {
 
         if (recordBatch.getFormat().equals(DataFormat.ARROW)) {
             ArrowWriter arrowWriter = ArrowWriter.create(recordBatch.getVectorSchemaRoot());
-        while (iterator.hasNext() && readCount < recordBatch.getArrowBatchSize()) {
+            while (iterator.hasNext() && readCount < recordBatch.getArrowBatchSize()) {
                 arrowWriter.write(iterator.next());
                 readCount++;
             }
             arrowWriter.finish();
 
             ByteArrayOutputStream out = new ByteArrayOutputStream();
             ArrowStreamWriter writer = new ArrowStreamWriter(
-                recordBatch.getVectorSchemaRoot(),
-                new DictionaryProvider.MapDictionaryProvider(),
-                out);
+                    recordBatch.getVectorSchemaRoot(),
+                    new DictionaryProvider.MapDictionaryProvider(),
+                    out);
 
             try {
                 writer.writeBatch();
@@ -174,7 +174,7 @@ private void readNext(Iterator<InternalRow> iterator) throws DorisException {
                     lineBuf = ByteBuffer.wrap(rowBytes);
                     isFirst = false;
                 } else {
-                delimBuf = ByteBuffer.wrap(delim);
+                    delimBuf = ByteBuffer.wrap(delim);
                     lineBuf = ByteBuffer.wrap(rowBytes);
                 }
             }
```
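Both hunks in this file appear to be indentation-only; the Arrow write path itself is unchanged: rows are appended to the shared VectorSchemaRoot, then the whole batch is serialized with ArrowStreamWriter into an in-memory Arrow IPC stream. A self-contained sketch of that serialization pattern using plain Arrow APIs (hypothetical one-column schema; the connector fills the root from InternalRows via Spark's ArrowWriter instead):

```scala
import java.io.ByteArrayOutputStream
import java.util.Collections

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.arrow.vector.ipc.ArrowStreamWriter
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}

object ArrowStreamDemo {
  def main(args: Array[String]): Unit = {
    val allocator = new RootAllocator(Long.MaxValue)
    val idField = new Field("id", FieldType.nullable(new ArrowType.Int(32, true)), null)
    val root = VectorSchemaRoot.create(new Schema(Collections.singletonList(idField)), allocator)

    // Fill one batch; the connector does this with ArrowWriter.write(InternalRow).
    val ids = root.getVector("id").asInstanceOf[IntVector]
    (0 until 3).foreach(i => ids.setSafe(i, i * 10))
    root.setRowCount(3)

    // Serialize the batch as an Arrow IPC stream, as readNext does.
    val out = new ByteArrayOutputStream()
    val writer = new ArrowStreamWriter(root, new DictionaryProvider.MapDictionaryProvider(), out)
    writer.start()
    writer.writeBatch()
    writer.end()
    writer.close()
    root.close()
    allocator.close()

    println(s"Arrow IPC stream: ${out.size()} bytes")
  }
}
```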
```diff
@@ -489,7 +489,7 @@ public static List<BackendV2.BackendRowV2> getBeNodes(SparkSettings sparkSetting
      */
     @VisibleForTesting
     public static List<BackendV2.BackendRowV2> getBackendRows(SparkSettings sparkSettings, Logger logger) throws DorisException {
-        if (StringUtils.isNoneBlank(sparkSettings.getProperty(sparkSettings.getProperty(DORIS_BENODES)))) {
+        if (StringUtils.isNoneBlank(sparkSettings.getProperty(DORIS_BENODES))) {
             return getBeNodes(sparkSettings, logger);
         } else { // If the specified BE does not exist, the FE mode is used
             String response = queryAllFrontends(sparkSettings, (frontend, enableHttps) ->
```
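The getBackendRows fix removes a nested lookup: the old code fed the value of DORIS_BENODES back into getProperty as if it were a key, so the check almost always saw a blank result and fell through to FE discovery even when BE nodes were configured. A tiny sketch of why, using java.util.Properties as a stand-in for SparkSettings (the literal key name is illustrative):

```scala
import java.util.Properties

object DoubleLookupDemo {
  def main(args: Array[String]): Unit = {
    val settings = new Properties()
    settings.setProperty("doris.benodes", "192.0.2.10:8040") // key name illustrative

    // Buggy form: the configured value is reused as a key for a
    // second lookup, which finds nothing, so the BE branch is skipped.
    val buggy = settings.getProperty(settings.getProperty("doris.benodes"))
    // Fixed form: one lookup with the real key.
    val fixed = settings.getProperty("doris.benodes")

    println(s"buggy = $buggy, fixed = $fixed") // buggy = null, fixed = 192.0.2.10:8040
  }
}
```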
```diff
@@ -286,7 +286,7 @@ class StreamLoader(settings: SparkSettings, isStreaming: Boolean) extends Loader
 
       if (autoRedirect) {
         val feNodes = settings.getProperty(ConfigurationOptions.DORIS_FENODES)
-        address = Some(RestService.randomEndpoint(feNodes, LOG))
+        Some(RestService.randomEndpoint(feNodes, LOG))
       } else {
         val backends = RestService.getBackendRows(settings, LOG)
         val iter = backends.iterator()
@@ -298,11 +298,11 @@
         if (backends.isEmpty) throw new StreamLoadException("no backend alive")
         Collections.shuffle(backends)
         val backend = backends.get(0)
-        address = Some(backend.getIp + ":" + backend.getHttpPort)
+        Some(backend.getIp + ":" + backend.getHttpPort)
       }
 
     } match {
-      case Success(_) => // do nothing
+      case Success(node) => address = node
       case Failure(e: ExecutionException) => throw new StreamLoadException("get backends info fail", e)
       case Failure(e: IllegalArgumentException) => throw new StreamLoadException("get frontend info fail", e)
     }
```
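In StreamLoader, the Try block previously assigned address as a side effect while the Success arm discarded its result; the rewrite makes the block return the chosen endpoint and binds it in the match. The pattern in isolation, with a hypothetical endpoint standing in for the FE/BE discovery calls:

```scala
import scala.util.{Failure, Success, Try}

object TryResultDemo {
  def main(args: Array[String]): Unit = {
    var address: Option[String] = None

    Try {
      // Stand-in for RestService.randomEndpoint / backend discovery.
      val endpoint = "192.0.2.10:8040"
      Some(endpoint)
    } match {
      // Fixed pattern: bind the Try's result instead of discarding it.
      case Success(node) => address = node
      case Failure(e)    => throw new IllegalStateException("get backends info fail", e)
    }

    println(address) // Some(192.0.2.10:8040)
  }
}
```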
```diff
@@ -20,6 +20,7 @@ package org.apache.spark.sql.doris.spark
 import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.sql.types.StructType
 
+@deprecated(since = "1.4.0")
 object ArrowSchemaUtils {
   var classArrowUtils: Option[Class[_]] = None: Option[Class[_]]
```
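ArrowSchemaUtils is now marked @deprecated: with RecordBatch calling ArrowUtils directly, the reflective lookup this object appears to wrap (it caches a Class[_]) is no longer needed. A rough reconstruction of that lookup pattern; everything beyond the classArrowUtils field is assumed, since the diff shows no more of the object:

```scala
// Reconstruction for illustration only; the real object's internals
// are not visible in this diff beyond the classArrowUtils field.
object ReflectiveArrowSchemaUtils {
  var classArrowUtils: Option[Class[_]] = None

  def arrowUtilsClass(): Class[_] = {
    if (classArrowUtils.isEmpty) {
      // Resolve Spark's private[sql] ArrowUtils by name at runtime,
      // avoiding a compile-time reference from outside org.apache.spark.sql.
      classArrowUtils = Some(Class.forName("org.apache.spark.sql.util.ArrowUtils"))
    }
    classArrowUtils.get
  }
}
```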
(Diffs for the two remaining changed files were not loaded on this page.)