Skip to content

Commit

Permalink
Merge pull request #2 from fluozhiye/V20240117
Browse files Browse the repository at this point in the history
compress data before stream_load
  • Loading branch information
fluozhiye authored Jan 19, 2024
2 parents 248eb7a + c7a2c15 commit 19bc637
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,18 @@ public long load(Iterator<InternalRow> rows, StructType schema)
HttpPut httpPut = getHttpPut(label, loadUrlStr, enable2PC, schema);

if(StringUtils.isNotEmpty(compressType)){
if("gz".equals(compressType.toLowerCase())){
if(dataFormat.equals(DataFormat.CSV)){
RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
String content = recordBatchString.getContent();
byte[] compressedData = compressByGZ(content);
httpPut.setEntity(new ByteArrayEntity(compressedData));
}else{
throw new StreamLoadException("compress data of JSON format is not supported");
}
if("gz".equals(compressType.toLowerCase()) && dataFormat.equals(DataFormat.CSV) ){
RecordBatchString recordBatchString = new RecordBatchString(RecordBatch.newBuilder(rows)
.format(dataFormat)
.sep(FIELD_DELIMITER)
.delim(LINE_DELIMITER)
.schema(schema)
.addDoubleQuotes(addDoubleQuotes).build(), streamingPassthrough);
String content = recordBatchString.getContent();
byte[] compressedData = compressByGZ(content);
httpPut.setEntity(new ByteArrayEntity(compressedData));
}else{
throw new StreamLoadException("not support the compress type: " + compressType);
throw new StreamLoadException("Not support the compress type [" + compressType + "] for the dataformat [" + dataFormat + "]");
}
}else{
RecordBatchInputStream recodeBatchInputStream = new RecordBatchInputStream(RecordBatch.newBuilder(rows)
Expand Down Expand Up @@ -533,7 +529,7 @@ private void handleStreamPassThrough() {
/**
* compress data by gz compression algorithm
*/
private byte[] compressByGZ(String content) throws IOException{
public byte[] compressByGZ(String content) throws IOException{
byte[] compressedData;
try(ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.apache.doris.spark.load;

import org.apache.doris.spark.cfg.SparkSettings;
import org.apache.spark.SparkConf;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class TestDorisStreamLoad {

@Test
public void compressByGZ() throws IOException {
String content = "1,aa,1\n" +
"2,aa,2\n" +
"3,aa,3\n" +
"4,aa,4\n" +
"5,aa,5\n" +
"6,aa,6\n" +
"7,aa,7\n" +
"8,aa,8\n" +
"9,aa,9\n" +
"10,aa,10\n" +
"11,aa,11\n" +
"12,aa,12\n" +
"13,aa,13\n" +
"14,aa,14\n" +
"15,aa,15\n" +
"16,aa,16\n" +
"17,aa,17\n" +
"18,aa,18\n" +
"19,aa,19\n" +
"20,aa,20\n" +
"21,aa,21\n" +
"22,aa,22\n" +
"23,aa,23\n" +
"24,aa,24\n" +
"25,aa,25\n" +
"26,aa,26\n" +
"27,aa,27\n" +
"28,aa,28\n" +
"29,aa,29\n" +
"30,aa,30\n" +
"31,aa,31\n" +
"32,aa,32\n" +
"33,aa,33\n" +
"34,aa,34\n" +
"35,aa,35\n" +
"36,aa,36\n" +
"37,aa,37\n" +
"38,aa,38\n" +
"39,aa,39";
byte[] compressByte = new DorisStreamLoad(new SparkSettings(new SparkConf().set("doris.table.identifier", "aa.bb"))).compressByGZ(content);

int contentByteLength = content.getBytes("utf-8").length;
int compressByteLength = compressByte.length;
System.out.println(contentByteLength);
System.out.println(compressByteLength);
Assert.assertTrue(contentByteLength > compressByteLength);

java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
java.io.ByteArrayInputStream in = new java.io.ByteArrayInputStream(compressByte);
java.util.zip.GZIPInputStream ungzip = new java.util.zip.GZIPInputStream(in);
byte[] buffer = new byte[1024];
int n;
while ((n = ungzip.read(buffer)) >= 0) out.write(buffer, 0, n);
byte[] unGzipByte = out.toByteArray();

String unGzipStr = new String(unGzipByte);
Assert.assertArrayEquals(unGzipStr.getBytes("utf-8"), content.getBytes("utf-8"));
Assert.assertEquals(unGzipStr, content);
}
}

0 comments on commit 19bc637

Please sign in to comment.