Skip to content

Commit

Permalink
Optimize the serialize method of intermediate result of grouped aggrega…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
Wei-hao-Li authored Oct 23, 2024
1 parent 576b6d5 commit 9e48f9c
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.tsfile.utils.BytesUtils.doubleToBytes;
import static org.apache.tsfile.utils.BytesUtils.longToBytes;

public class GroupedAvgAccumulator implements GroupedAccumulator {
private static final long INSTANCE_SIZE =
Expand Down Expand Up @@ -118,16 +116,10 @@ private void deserialize(int groupId, byte[] bytes) {
}

/**
 * Serializes the intermediate average state of one group into a fixed-size byte array.
 *
 * <p>Layout: the {@code long} count from {@code countValues} at offset 0, followed by the
 * {@code double} sum from {@code sumValues} at offset {@code Long.BYTES}. The array is allocated
 * once at its exact size ({@code Long.BYTES + Double.BYTES}), so no stream objects are created
 * and no checked {@code IOException} has to be handled.
 *
 * <p>NOTE(review): the byte order produced by {@code BytesUtils} must match what
 * {@code deserialize(int, byte[])} reads - confirm against that method.
 *
 * @param groupId index of the group whose count and sum are serialized
 * @return a newly allocated array holding the count followed by the sum
 */
private byte[] serializeState(int groupId) {
  byte[] bytes = new byte[Long.BYTES + Double.BYTES];
  longToBytes(countValues.get(groupId), bytes, 0);
  doubleToBytes(sumValues.get(groupId), bytes, Long.BYTES);
  return bytes;
}

private void addIntInput(int[] groupIds, Column column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.tsfile.utils.BytesUtils.boolToBytes;
import static org.apache.tsfile.utils.BytesUtils.doubleToBytes;
import static org.apache.tsfile.utils.BytesUtils.floatToBytes;
import static org.apache.tsfile.utils.BytesUtils.intToBytes;
import static org.apache.tsfile.utils.BytesUtils.longToBytes;

public class GroupedFirstAccumulator implements GroupedAccumulator {
private static final long INSTANCE_SIZE =
Expand Down Expand Up @@ -288,43 +289,54 @@ public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
// Intentionally empty: GroupedFirstAccumulator performs no work in prepareFinal().
// NOTE(review): presumably there is no state to finalise before evaluateFinal - confirm
// against the GroupedAccumulator contract.
public void prepareFinal() {}

private byte[] serializeTimeWithValue(int groupId) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeLong(minTimes.get(groupId));
switch (seriesDataType) {
case INT32:
case DATE:
dataOutputStream.writeInt(intValues.get(groupId));
break;
case INT64:
case TIMESTAMP:
dataOutputStream.writeLong(longValues.get(groupId));
break;
case FLOAT:
dataOutputStream.writeFloat(floatValues.get(groupId));
break;
case DOUBLE:
dataOutputStream.writeDouble(doubleValues.get(groupId));
break;
case TEXT:
case BLOB:
case STRING:
byte[] values = binaryValues.get(groupId).getValues();
dataOutputStream.writeInt(values.length);
dataOutputStream.write(values);
break;
case BOOLEAN:
dataOutputStream.writeBoolean(booleanValues.get(groupId));
break;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type: %s", seriesDataType));
}
} catch (IOException e) {
throw new UnsupportedOperationException("Failed to serialize intermediate result.", e);
byte[] bytes;
int length = Long.BYTES;
switch (seriesDataType) {
case INT32:
case DATE:
length += Integer.BYTES;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
intToBytes(intValues.get(groupId), bytes, Long.BYTES);
return bytes;
case INT64:
case TIMESTAMP:
length += Long.BYTES;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
longToBytes(longValues.get(groupId), bytes, Long.BYTES);
return bytes;
case FLOAT:
length += Float.BYTES;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
floatToBytes(floatValues.get(groupId), bytes, Long.BYTES);
return bytes;
case DOUBLE:
length += Double.BYTES;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
doubleToBytes(doubleValues.get(groupId), bytes, Long.BYTES);
return bytes;
case TEXT:
case BLOB:
case STRING:
byte[] values = binaryValues.get(groupId).getValues();
length += Integer.BYTES + values.length;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
System.arraycopy(values, 0, bytes, length - values.length, values.length);
return bytes;
case BOOLEAN:
length++;
bytes = new byte[length];
longToBytes(minTimes.get(groupId), bytes, 0);
boolToBytes(booleanValues.get(groupId), bytes, Long.BYTES);
return bytes;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type: %s", seriesDataType));
}
return byteArrayOutputStream.toByteArray();
}

private void addIntInput(int[] groupIds, Column valueColumn, Column timeColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.iotdb.db.utils.constant.SqlConstant.LAST_BY_AGGREGATION;
import static org.apache.tsfile.utils.BytesUtils.boolToBytes;
import static org.apache.tsfile.utils.BytesUtils.doubleToBytes;
import static org.apache.tsfile.utils.BytesUtils.floatToBytes;
import static org.apache.tsfile.utils.BytesUtils.intToBytes;
import static org.apache.tsfile.utils.BytesUtils.longToBytes;

public class GroupedFirstByAccumulator implements GroupedAccumulator {

Expand Down Expand Up @@ -198,7 +198,7 @@ public void addInput(int[] groupIds, Column[] arguments) {
return;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in LastBy: %s", yDataType));
String.format("Unsupported data type in LastBy: %s", xDataType));
}
}

Expand Down Expand Up @@ -262,7 +262,7 @@ public void addIntermediate(int[] groupIds, Column argument) {
break;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type in First Aggregation: %s", yDataType));
String.format("Unsupported data type in Aggregation: %s", xDataType));
}
}
}
Expand All @@ -281,49 +281,68 @@ public void evaluateIntermediate(int groupId, ColumnBuilder columnBuilder) {
}

private byte[] serializeTimeWithValue(int groupId) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeLong(yFirstTimes.get(groupId));
dataOutputStream.writeBoolean(xNulls.get(groupId));
if (!xNulls.get(groupId)) {
switch (xDataType) {
case INT32:
case DATE:
dataOutputStream.writeInt(xIntValues.get(groupId));
break;
case INT64:
case TIMESTAMP:
dataOutputStream.writeLong(xLongValues.get(groupId));
break;
case FLOAT:
dataOutputStream.writeFloat(xFloatValues.get(groupId));
break;
case DOUBLE:
dataOutputStream.writeDouble(xDoubleValues.get(groupId));
break;
case TEXT:
case BLOB:
case STRING:
dataOutputStream.writeInt(xBinaryValues.get(groupId).getValues().length);
dataOutputStream.write(xBinaryValues.get(groupId).getValues());
break;
case BOOLEAN:
dataOutputStream.writeBoolean(xBooleanValues.get(groupId));
break;
default:
throw new UnSupportedDataTypeException(
String.format(
"Unsupported data type: %s in aggregation %s", xDataType, LAST_BY_AGGREGATION));
}
boolean xNull = xNulls.get(groupId);
int length = Long.BYTES + 1 + (xNull ? 0 : calculateValueLength(groupId));
byte[] bytes = new byte[length];

longToBytes(yFirstTimes.get(groupId), bytes, 0);
boolToBytes(xNulls.get(groupId), bytes, Long.BYTES);
if (!xNull) {
switch (xDataType) {
case INT32:
case DATE:
intToBytes(xIntValues.get(groupId), bytes, Long.BYTES + 1);
return bytes;
case INT64:
case TIMESTAMP:
longToBytes(xLongValues.get(groupId), bytes, Long.BYTES + 1);
return bytes;
case FLOAT:
floatToBytes(xFloatValues.get(groupId), bytes, Long.BYTES + 1);
return bytes;
case DOUBLE:
doubleToBytes(xDoubleValues.get(groupId), bytes, Long.BYTES + 1);
return bytes;
case TEXT:
case BLOB:
case STRING:
byte[] values = xBinaryValues.get(groupId).getValues();
intToBytes(values.length, bytes, Long.BYTES + 1);
System.arraycopy(values, 0, bytes, length - values.length, values.length);
return bytes;
case BOOLEAN:
boolToBytes(xBooleanValues.get(groupId), bytes, Long.BYTES + 1);
return bytes;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type: %s", yDataType));
}
} catch (IOException e) {
throw new UnsupportedOperationException(
String.format(
"Failed to serialize intermediate result for Accumulator %s, errorMsg: %s.",
LAST_BY_AGGREGATION, e.getMessage()));
}
return byteArrayOutputStream.toByteArray();
return bytes;
}

/**
 * Computes the number of bytes needed to encode the x value of the given group.
 *
 * <p>Fixed-width types use their primitive size, BOOLEAN uses one byte, and binary types
 * (TEXT / BLOB / STRING) need an {@code int} length prefix plus the length of the stored bytes.
 *
 * @param groupId index of the group; only consulted for binary types
 * @return the encoded size of the x value in bytes
 * @throws UnSupportedDataTypeException if {@code xDataType} is not a supported type
 */
private int calculateValueLength(int groupId) {
  final int valueLength;
  switch (xDataType) {
    case BOOLEAN:
      valueLength = 1;
      break;
    case INT32:
    case DATE:
      valueLength = Integer.BYTES;
      break;
    case FLOAT:
      valueLength = Float.BYTES;
      break;
    case INT64:
    case TIMESTAMP:
      valueLength = Long.BYTES;
      break;
    case DOUBLE:
      valueLength = Double.BYTES;
      break;
    case TEXT:
    case BLOB:
    case STRING:
      // Length prefix plus payload.
      byte[] payload = xBinaryValues.get(groupId).getValues();
      valueLength = Integer.BYTES + payload.length;
      break;
    default:
      throw new UnSupportedDataTypeException(
          String.format("Unsupported data type: %s", xDataType));
  }
  return valueLength;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.write.UnSupportedDataTypeException;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.tsfile.utils.BytesUtils.boolToBytes;
import static org.apache.tsfile.utils.BytesUtils.doubleToBytes;
import static org.apache.tsfile.utils.BytesUtils.floatToBytes;
import static org.apache.tsfile.utils.BytesUtils.intToBytes;
import static org.apache.tsfile.utils.BytesUtils.longToBytes;

public class GroupedLastAccumulator implements GroupedAccumulator {
private static final long INSTANCE_SIZE =
Expand Down Expand Up @@ -288,43 +289,54 @@ public void evaluateFinal(int groupId, ColumnBuilder columnBuilder) {
// Intentionally empty: GroupedLastAccumulator performs no work in prepareFinal().
// NOTE(review): presumably there is no state to finalise before evaluateFinal - confirm
// against the GroupedAccumulator contract.
public void prepareFinal() {}

private byte[] serializeTimeWithValue(int groupId) {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
try {
dataOutputStream.writeLong(maxTimes.get(groupId));
switch (seriesDataType) {
case INT32:
case DATE:
dataOutputStream.writeInt(intValues.get(groupId));
break;
case INT64:
case TIMESTAMP:
dataOutputStream.writeLong(longValues.get(groupId));
break;
case FLOAT:
dataOutputStream.writeFloat(floatValues.get(groupId));
break;
case DOUBLE:
dataOutputStream.writeDouble(doubleValues.get(groupId));
break;
case TEXT:
case BLOB:
case STRING:
byte[] values = binaryValues.get(groupId).getValues();
dataOutputStream.writeInt(values.length);
dataOutputStream.write(values);
break;
case BOOLEAN:
dataOutputStream.writeBoolean(booleanValues.get(groupId));
break;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type: %s", seriesDataType));
}
} catch (IOException e) {
throw new UnsupportedOperationException("Failed to serialize intermediate result.", e);
byte[] bytes;
int length = Long.BYTES;
switch (seriesDataType) {
case INT32:
case DATE:
length += Integer.BYTES;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
intToBytes(intValues.get(groupId), bytes, Long.BYTES);
return bytes;
case INT64:
case TIMESTAMP:
length += Long.BYTES;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
longToBytes(longValues.get(groupId), bytes, Long.BYTES);
return bytes;
case FLOAT:
length += Float.BYTES;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
floatToBytes(floatValues.get(groupId), bytes, Long.BYTES);
return bytes;
case DOUBLE:
length += Double.BYTES;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
doubleToBytes(doubleValues.get(groupId), bytes, Long.BYTES);
return bytes;
case TEXT:
case BLOB:
case STRING:
byte[] values = binaryValues.get(groupId).getValues();
length += Integer.BYTES + values.length;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
System.arraycopy(values, 0, bytes, length - values.length, values.length);
return bytes;
case BOOLEAN:
length++;
bytes = new byte[length];
longToBytes(maxTimes.get(groupId), bytes, 0);
boolToBytes(booleanValues.get(groupId), bytes, Long.BYTES);
return bytes;
default:
throw new UnSupportedDataTypeException(
String.format("Unsupported data type: %s", seriesDataType));
}
return byteArrayOutputStream.toByteArray();
}

private void addIntInput(int[] groupIds, Column valueColumn, Column timeColumn) {
Expand Down
Loading

0 comments on commit 9e48f9c

Please sign in to comment.