Skip to content

Commit

Permalink
Convert an Arrow integer to an IPv4 address
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 committed Apr 15, 2024
1 parent fc3b2f3 commit 01c2cce
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.common.base.Preconditions;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BaseIntVector;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
Expand Down Expand Up @@ -251,9 +252,14 @@ public void convertArrowToRowBatch() throws DorisException {
}
break;
case "IPV4":
Preconditions.checkArgument(mt.equals(Types.MinorType.UINT4),
Preconditions.checkArgument(mt.equals(Types.MinorType.UINT4) || mt.equals(Types.MinorType.INT),
typeMismatchMessage(currentType, mt));
UInt4Vector ipv4Vector = (UInt4Vector) curFieldVector;
BaseIntVector ipv4Vector;
if (mt.equals(Types.MinorType.INT)) {
ipv4Vector = (IntVector) curFieldVector;
} else {
ipv4Vector = (UInt4Vector) curFieldVector;
}
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
Object fieldValue = ipv4Vector.isNull(rowIndex) ? null : convertLongToIPv4Address(ipv4Vector.getValueAsLong(rowIndex));
addValueToRow(rowIndex, fieldValue);
Expand Down Expand Up @@ -327,7 +333,7 @@ public void convertArrowToRowBatch() throws DorisException {
case "DATE":
case "DATEV2":
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR)
|| mt.equals(Types.MinorType.DATEDAY), typeMismatchMessage(currentType, mt));
|| mt.equals(Types.MinorType.DATEDAY), typeMismatchMessage(currentType, mt));
if (mt.equals(Types.MinorType.VARCHAR)) {
VarCharVector date = (VarCharVector) curFieldVector;
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ public void testDate() throws DorisException, IOException {
root.setRowCount(1);

FieldVector vector = root.getVector("k1");
VarCharVector dateVector = (VarCharVector)vector;
VarCharVector dateVector = (VarCharVector) vector;
dateVector.setInitialCapacity(1);
dateVector.allocateNew();
dateVector.setIndexDefined(0);
Expand All @@ -491,7 +491,7 @@ public void testDate() throws DorisException, IOException {


vector = root.getVector("k2");
VarCharVector dateV2Vector = (VarCharVector)vector;
VarCharVector dateV2Vector = (VarCharVector) vector;
dateV2Vector.setInitialCapacity(1);
dateV2Vector.allocateNew();
dateV2Vector.setIndexDefined(0);
Expand All @@ -500,7 +500,7 @@ public void testDate() throws DorisException, IOException {
vector.setValueCount(1);

vector = root.getVector("k3");
DateDayVector dateNewVector = (DateDayVector)vector;
DateDayVector dateNewVector = (DateDayVector) vector;
dateNewVector.setInitialCapacity(1);
dateNewVector.allocateNew();
dateNewVector.setIndexDefined(0);
Expand Down Expand Up @@ -563,7 +563,7 @@ public void testLargeInt() throws DorisException, IOException {
root.setRowCount(1);

FieldVector vector = root.getVector("k1");
VarCharVector lageIntVector = (VarCharVector)vector;
VarCharVector lageIntVector = (VarCharVector) vector;
lageIntVector.setInitialCapacity(1);
lageIntVector.allocateNew();
lageIntVector.setIndexDefined(0);
Expand All @@ -573,7 +573,7 @@ public void testLargeInt() throws DorisException, IOException {


vector = root.getVector("k2");
FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector)vector;
FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector;
lageIntVector1.setInitialCapacity(1);
lageIntVector1.allocateNew();
lageIntVector1.setIndexDefined(0);
Expand Down Expand Up @@ -777,7 +777,7 @@ public void testDateTime() throws IOException, DorisException {
root.setRowCount(3);

FieldVector vector = root.getVector("k1");
VarCharVector datetimeVector = (VarCharVector)vector;
VarCharVector datetimeVector = (VarCharVector) vector;
datetimeVector.setInitialCapacity(3);
datetimeVector.allocateNew();
datetimeVector.setIndexDefined(0);
Expand Down Expand Up @@ -869,7 +869,7 @@ public void testVariant() throws DorisException, IOException {
root.setRowCount(3);

FieldVector vector = root.getVector("k1");
VarCharVector datetimeVector = (VarCharVector)vector;
VarCharVector datetimeVector = (VarCharVector) vector;
datetimeVector.setInitialCapacity(3);
datetimeVector.allocateNew();
datetimeVector.setIndexDefined(0);
Expand Down Expand Up @@ -926,7 +926,8 @@ public void testIPv4() throws DorisException, IOException {

ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
childrenBuilder.add(
new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null));
new Field("k1", FieldType.nullable(new ArrowType.Int(32, false)), null),
new Field("k2", FieldType.nullable(new ArrowType.Int(32, true)), null));

VectorSchemaRoot root =
VectorSchemaRoot.create(
Expand All @@ -944,7 +945,7 @@ public void testIPv4() throws DorisException, IOException {
FieldVector vector = root.getVector("k1");
UInt4Vector uInt4Vector = (UInt4Vector) vector;
uInt4Vector.setInitialCapacity(5);
uInt4Vector.allocateNew(4);
uInt4Vector.allocateNew();
uInt4Vector.setIndexDefined(0);
uInt4Vector.setSafe(0, 0);
uInt4Vector.setIndexDefined(1);
Expand All @@ -955,7 +956,24 @@ public void testIPv4() throws DorisException, IOException {
uInt4Vector.setSafe(3, 16777215);
uInt4Vector.setIndexDefined(4);
uInt4Vector.setWithPossibleTruncate(4, 4294967295L);

FieldVector vector1 = root.getVector("k2");
IntVector int4Vector = (IntVector) vector1;
int4Vector.setInitialCapacity(5);
int4Vector.allocateNew();
int4Vector.setIndexDefined(0);
int4Vector.setSafe(0, 0);
int4Vector.setIndexDefined(1);
int4Vector.setSafe(1, 255);
int4Vector.setIndexDefined(2);
int4Vector.setSafe(2, 65535);
int4Vector.setIndexDefined(3);
int4Vector.setSafe(3, 16777215);
int4Vector.setIndexDefined(4);
int4Vector.setWithPossibleTruncate(4, 4294967295L);

vector.setValueCount(5);
vector1.setValueCount(5);
arrowStreamWriter.writeBatch();

arrowStreamWriter.end();
Expand All @@ -970,7 +988,8 @@ public void testIPv4() throws DorisException, IOException {

String schemaStr =
"{\"properties\":["
+ "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}"
+ "{\"type\":\"IPV4\",\"name\":\"k1\",\"comment\":\"\"}, "
+ "{\"type\":\"IPV4\",\"name\":\"k2\",\"comment\":\"\"}"
+ "], \"status\":200}";

Schema schema = RestService.parseSchema(schemaStr, logger);
Expand All @@ -979,16 +998,21 @@ public void testIPv4() throws DorisException, IOException {
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
assertEquals("0.0.0.0", actualRow0.get(0));
assertEquals("0.0.0.0", actualRow0.get(1));
List<Object> actualRow1 = rowBatch.next();
assertEquals("0.0.0.255", actualRow1.get(0));
assertEquals("0.0.0.255", actualRow1.get(1));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow2 = rowBatch.next();
assertEquals("0.0.255.255", actualRow2.get(0));
assertEquals("0.0.255.255", actualRow2.get(1));
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow3 = rowBatch.next();
assertEquals("0.255.255.255", actualRow3.get(0));
assertEquals("0.255.255.255", actualRow3.get(1));
List<Object> actualRow4 = rowBatch.next();
assertEquals("255.255.255.255", actualRow4.get(0));
assertEquals("255.255.255.255", actualRow4.get(1));
Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
thrown.expectMessage(startsWith("Get row offset:"));
Expand Down

0 comments on commit 01c2cce

Please sign in to comment.