Skip to content

Commit

Permalink
[optimize][refractor] Optimizing memory usage for writing data (#140)
Browse files Browse the repository at this point in the history
1.Optimize data transmission
2.Optimize partitioned data iteration
  • Loading branch information
gnehil authored Sep 13, 2023
1 parent 11f4976 commit 0daf6c4
Show file tree
Hide file tree
Showing 14 changed files with 637 additions and 319 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.DorisInternalException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.serialization.Routing;
import org.apache.doris.spark.util.ErrorMessages;

import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.util.IOUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.load;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
* Wrapper Object for batch loading
*/
public class RecordBatch {

private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;

/**
* Spark row data iterator
*/
private final Iterator<InternalRow> iterator;

/**
* batch size for single load
*/
private final int batchSize;

/**
* stream load format
*/
private final String format;

/**
* column separator, only used when the format is csv
*/
private final String sep;

/**
* line delimiter
*/
private final byte[] delim;

/**
* schema of row
*/
private final StructType schema;

private RecordBatch(Iterator<InternalRow> iterator, int batchSize, String format, String sep, byte[] delim,
StructType schema) {
this.iterator = iterator;
this.batchSize = batchSize;
this.format = format;
this.sep = sep;
this.delim = delim;
this.schema = schema;
}

public Iterator<InternalRow> getIterator() {
return iterator;
}

public int getBatchSize() {
return batchSize;
}

public String getFormat() {
return format;
}

public String getSep() {
return sep;
}

public byte[] getDelim() {
return delim;
}

public StructType getSchema() {
return schema;
}
public static Builder newBuilder(Iterator<InternalRow> iterator) {
return new Builder(iterator);
}

/**
* RecordBatch Builder
*/
public static class Builder {

private final Iterator<InternalRow> iterator;

private int batchSize;

private String format;

private String sep;

private byte[] delim;

private StructType schema;

public Builder(Iterator<InternalRow> iterator) {
this.iterator = iterator;
}

public Builder batchSize(int batchSize) {
this.batchSize = batchSize;
return this;
}

public Builder format(String format) {
this.format = format;
return this;
}

public Builder sep(String sep) {
this.sep = sep;
return this;
}

public Builder delim(String delim) {
this.delim = delim.getBytes(DEFAULT_CHARSET);
return this;
}

public Builder schema(StructType schema) {
this.schema = schema;
return this;
}

public RecordBatch build() {
return new RecordBatch(iterator, batchSize, format, sep, delim, schema);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.load;

import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.util.DataUtil;

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.spark.sql.catalyst.InternalRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
* InputStream for batch load
*/
public class RecordBatchInputStream extends InputStream {

public static final Logger LOG = LoggerFactory.getLogger(RecordBatchInputStream.class);

private static final int DEFAULT_BUF_SIZE = 4096;

/**
* Load record batch
*/
private final RecordBatch recordBatch;

/**
* first line flag
*/
private boolean isFirst = true;

/**
* record buffer
*/
private ByteBuffer buffer = ByteBuffer.allocate(0);

/**
* record count has been read
*/
private int readCount = 0;

/**
* streaming mode pass through data without process
*/
private final boolean passThrough;

public RecordBatchInputStream(RecordBatch recordBatch, boolean passThrough) {
this.recordBatch = recordBatch;
this.passThrough = passThrough;
}

@Override
public int read() throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
}
} catch (DorisException e) {
throw new IOException(e);
}
return buffer.get() & 0xFF;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
try {
if (buffer.remaining() == 0 && endOfBatch()) {
return -1; // End of stream
}
} catch (DorisException e) {
throw new IOException(e);
}
int bytesRead = Math.min(len, buffer.remaining());
buffer.get(b, off, bytesRead);
return bytesRead;
}

/**
* Check if the current batch read is over.
* If the number of reads is greater than or equal to the batch size or there is no next record, return false,
* otherwise return true.
*
* @return Whether the current batch read is over
* @throws DorisException
*/
public boolean endOfBatch() throws DorisException {
Iterator<InternalRow> iterator = recordBatch.getIterator();
if (readCount >= recordBatch.getBatchSize() || !iterator.hasNext()) {
return true;
}
readNext(iterator);
return false;
}

/**
* read next record into buffer
*
* @param iterator row iterator
* @throws DorisException
*/
private void readNext(Iterator<InternalRow> iterator) throws DorisException {
if (!iterator.hasNext()) {
throw new ShouldNeverHappenException();
}
byte[] delim = recordBatch.getDelim();
byte[] rowBytes = rowToByte(iterator.next());
if (isFirst) {
ensureCapacity(rowBytes.length);
buffer.put(rowBytes);
buffer.flip();
isFirst = false;
} else {
ensureCapacity(delim.length + rowBytes.length);
buffer.put(delim);
buffer.put(rowBytes);
buffer.flip();
}
readCount++;
}

/**
* Check if the buffer has enough capacity.
*
* @param need required buffer space
*/
private void ensureCapacity(int need) {

int capacity = buffer.capacity();

if (need <= capacity) {
buffer.clear();
return;
}

// need to extend
int newCapacity = calculateNewCapacity(capacity, need);
LOG.info("expand buffer, min cap: {}, now cap: {}, new cap: {}", need, capacity, newCapacity);
buffer = ByteBuffer.allocate(newCapacity);

}

/**
* Calculate new capacity for buffer expansion.
*
* @param capacity current buffer capacity
* @param minCapacity required min buffer space
* @return new capacity
*/
private int calculateNewCapacity(int capacity, int minCapacity) {
int newCapacity;
if (capacity == 0) {
newCapacity = DEFAULT_BUF_SIZE;
while (newCapacity < minCapacity) {
newCapacity = newCapacity << 1;
}
} else {
newCapacity = capacity << 1;
}
return newCapacity;
}

/**
* Convert Spark row data to byte array
*
* @param row row data
* @return byte array
* @throws DorisException
*/
private byte[] rowToByte(InternalRow row) throws DorisException {

byte[] bytes;

if (passThrough) {
bytes = row.getString(0).getBytes(StandardCharsets.UTF_8);
return bytes;
}

switch (recordBatch.getFormat().toLowerCase()) {
case "csv":
bytes = DataUtil.rowToCsvBytes(row, recordBatch.getSchema(), recordBatch.getSep());
break;
case "json":
try {
bytes = DataUtil.rowToJsonBytes(row, recordBatch.getSchema());
} catch (JsonProcessingException e) {
throw new DorisException("parse row to json bytes failed", e);
}
break;
default:
throw new IllegalArgumentException("format", recordBatch.getFormat());
}

return bytes;

}


}
Loading

0 comments on commit 0daf6c4

Please sign in to comment.