Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Fix map type mapping to doris type error #267

Merged
merged 24 commits into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c7ff9bd
modify the invalid uri
vinlee19 Jul 6, 2023
fdacdbb
Merge branch 'apache:master' into master
vinlee19 Sep 21, 2023
6611669
Merge branch 'apache:master' into master
vinlee19 Oct 7, 2023
d0a0642
update pom.xml
vinlee19 Oct 16, 2023
e084a18
Merge remote-tracking branch 'origin/master'
vinlee19 Oct 16, 2023
469f74f
Merge remote-tracking branch 'upstream/master'
vinlee19 Oct 27, 2023
ab38293
Merge remote-tracking branch 'upstream/master'
vinlee19 Oct 31, 2023
2f6c09f
Merge remote-tracking branch 'origin/master'
vinlee19 Nov 13, 2023
99ed96f
Merge remote-tracking branch 'upstream/master'
vinlee19 Dec 7, 2023
8266d83
add unit test for map
vinlee19 Dec 8, 2023
02e8fd7
add map test
vinlee19 Dec 11, 2023
f5bda17
improve code for MAP conversion
vinlee19 Dec 13, 2023
8ad5c02
Merge remote-tracking branch 'upstream/master' into map_test
vinlee19 Dec 15, 2023
ee05116
add unit test for MAP in DorisRowConverter
vinlee19 Dec 15, 2023
89f8e69
[Fix] fix Load Doris data failed, schema size of fetch data is wrong …
caoliang-web Dec 11, 2023
d989ac8
[fix] improve char length problem (#262)
JNSimba Dec 12, 2023
300ebaf
add unit test for MAP in DorisRowConverter
vinlee19 Dec 15, 2023
36c8f8a
Merge remote-tracking branch 'origin/map_test' into map_test
vinlee19 Dec 15, 2023
7bf5287
reformat code
vinlee19 Dec 15, 2023
cb16007
Merge branch 'master' into map_test
vinlee19 Dec 15, 2023
6fce59d
reformat code according to checkstyle
vinlee19 Dec 18, 2023
2f4f136
reformat code according to checkstyle
vinlee19 Dec 18, 2023
add11da
add timestamp() test case for MAP
vinlee19 Dec 20, 2023
1d149ab
add timestamp test case for MAP
vinlee19 Dec 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -360,39 +360,50 @@ private static List<Object> convertArrayData(ArrayData array, LogicalType type)

/**
 * Converts Flink map data into a {@code java.util.Map} stored in a {@code HashMap}.
 *
 * <p>The key and value logical types are read from {@code type}, which is cast to {@code
 * MapType}. {@code GenericMapData} and {@code BinaryMapData} inputs are handled; keys and values
 * are passed through {@code convertMapEntry} together with their logical type before being put
 * into the result.
 *
 * @param map the map data to convert
 * @param type the logical type describing the map; must be a {@code MapType}
 * @return the map holding the converted entries
 * @throws UnsupportedOperationException if {@code map} is neither {@code GenericMapData} nor
 *     {@code BinaryMapData}
 */
private static Object convertMapData(MapData map, LogicalType type) {
Map<Object, Object> result = new HashMap<>();
LogicalType valueType = ((MapType) type).getValueType();
LogicalType keyType = ((MapType) type).getKeyType();
if (map instanceof GenericMapData) {
GenericMapData gMap = (GenericMapData) map;
for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) {
result.put(key, gMap.get(key));

Object convertedKey = convertMapEntry(key, keyType);
Object convertedValue = convertMapEntry(gMap.get(key), valueType);
result.put(convertedKey, convertedValue);
}
return result;
}
if (map instanceof BinaryMapData) {
} else if (map instanceof BinaryMapData) {
BinaryMapData bMap = (BinaryMapData) map;
LogicalType valueType = ((MapType) type).getValueType();
Map<?, ?> javaMap = bMap.toJavaMap(((MapType) type).getKeyType(), valueType);
for (Map.Entry<?, ?> entry : javaMap.entrySet()) {
String key = entry.getKey().toString();
if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) {
result.put(key, convertMapData((MapData) entry.getValue(), valueType));
} else if (LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) {
result.put(
key,
Date.valueOf(LocalDate.ofEpochDay((Integer) entry.getValue()))
.toString());
} else if (LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) {
result.put(key, convertArrayData((ArrayData) entry.getValue(), valueType));
} else if (entry.getValue() instanceof TimestampData) {
result.put(key, ((TimestampData) entry.getValue()).toTimestamp().toString());
} else {
result.put(key, entry.getValue().toString());
}
Object convertedKey = convertMapEntry(entry.getKey(), keyType);
Object convertedValue = convertMapEntry(entry.getValue(), valueType);
result.put(convertedKey, convertedValue);
}
return result;
}
throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
}

/**
 * Converts a single key or value of a MAP to the representation used in the converted map.
 *
 * <p>Nested MAP and ARRAY values are converted recursively, DATE values (held as epoch days) are
 * rendered through {@code Date#toString()}, {@code TimestampData} values are rendered through
 * their {@code java.sql.Timestamp} string form, and every other value falls back to {@code
 * toString()}.
 *
 * @param originValue the original key or value of the entry; may be {@code null}
 * @param logicalType key or value logical type
 * @return the converted value, or {@code null} when {@code originValue} is {@code null}
 */
private static Object convertMapEntry(Object originValue, LogicalType logicalType) {
    // A null key/value would otherwise fail with a NullPointerException, either when the DATE
    // branch unboxes it or when the fallback branch calls toString(); keep it as null instead.
    if (originValue == null) {
        return null;
    }
    LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
    if (LogicalTypeRoot.MAP.equals(typeRoot)) {
        return convertMapData((MapData) originValue, logicalType);
    } else if (LogicalTypeRoot.DATE.equals(typeRoot)) {
        return Date.valueOf(LocalDate.ofEpochDay((Integer) originValue)).toString();
    } else if (LogicalTypeRoot.ARRAY.equals(typeRoot)) {
        return convertArrayData((ArrayData) originValue, logicalType);
    } else if (originValue instanceof TimestampData) {
        // Timestamps are recognised by runtime class rather than by type root.
        return ((TimestampData) originValue).toTimestamp().toString();
    } else {
        return originValue.toString();
    }
}

private static Object convertRowData(RowData val, int index, LogicalType type) {
RowType rowType = (RowType) type;
Map<String, Object> value = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
Expand All @@ -40,10 +41,11 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DorisRowConverterTest implements Serializable {

@Test
public void testConvert() throws IOException {
ResolvedSchema schema =
Expand All @@ -67,11 +69,11 @@ public void testConvert() throws IOException {

DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());

LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
// Doris DatetimeV2 supports up to 6 decimal places (microseconds).
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDate date1 = LocalDate.of(2021, 1, 1);
List record =
List<Object> record =
Arrays.asList(
null,
true,
Expand Down Expand Up @@ -104,7 +106,7 @@ public void testConvert() throws IOException {
.build();
String s = new String(serializer.serialize(rowData).getRow());
Assert.assertEquals(
"\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris",
"\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris",
s);
}

Expand All @@ -130,8 +132,9 @@ public void testExternalConvert() {
Column.physical("f16", DataTypes.VARCHAR(256)));
DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
// Doris DatetimeV2 supports up to 6 decimal places (microseconds).
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDate date1 = LocalDate.of(2021, 1, 1);
GenericRowData rowData =
GenericRowData.of(
Expand All @@ -151,12 +154,198 @@ public void testExternalConvert() {
(int) date1.toEpochDay(),
StringData.fromString("a"),
StringData.fromString("doris"));
List row = new ArrayList();
List<Object> row = new ArrayList<>();
for (int i = 0; i < rowData.getArity(); i++) {
row.add(converter.convertExternal(rowData, i));
}
Assert.assertEquals(
"[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]",
"[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]",
row.toString());
}

/**
 * Feeds one single-entry map per column of {@code getRowMapSchema()} through {@code
 * convertInternal} and checks the CSV line produced by the row serializer.
 */
@Test
public void testMapInternalConvert() throws IOException {
    ResolvedSchema mapSchema = getRowMapSchema();
    RowType physicalRowType = (RowType) mapSchema.toPhysicalRowDataType().getLogicalType();
    DorisRowConverter rowConverter = new DorisRowConverter(physicalRowType);

    // Doris DatetimeV2 supports up to 6 decimal places (microseconds); 1000 ns == 1 microsecond.
    LocalDateTime timestamp = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
    LocalDate date = LocalDate.of(2021, 1, 1);
    BigDecimal decimal = BigDecimal.valueOf(10.123);

    // One map per schema column, in column order f1..f16.
    List<Object> record = new ArrayList<>();
    record.add(createMapAndPut(new HashMap<>(), true, false)); // boolean
    record.add(createMapAndPut(new HashMap<>(), 1.2f, 1.3f)); // float
    record.add(createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d)); // double
    record.add(createMapAndPut(new HashMap<>(), 24, 24)); // interval year
    record.add(createMapAndPut(new HashMap<>(), 10, 10)); // interval day
    record.add(createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1)); // tinyint
    record.add(createMapAndPut(new HashMap<>(), (short) 32, (short) 32)); // smallint
    record.add(createMapAndPut(new HashMap<>(), 64, 64)); // int
    record.add(createMapAndPut(new HashMap<>(), 128L, 128L)); // bigint
    record.add(createMapAndPut(new HashMap<>(), decimal, decimal)); // decimal
    record.add(createMapAndPut(new HashMap<>(), timestamp, timestamp)); // timestamp with zone
    record.add(createMapAndPut(new HashMap<>(), timestamp, timestamp)); // timestamp with LTZ
    record.add(createMapAndPut(new HashMap<>(), timestamp, timestamp)); // timestamp
    record.add(createMapAndPut(new HashMap<>(), date, date)); // date
    record.add(createMapAndPut(new HashMap<>(), 'a', 'a')); // char
    record.add(createMapAndPut(new HashMap<>(), "doris", "doris")); // varchar

    GenericRowData internalRow = rowConverter.convertInternal(record);

    String[] fieldNames = {
        "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10",
        "f11", "f12", "f13", "f14", "f15", "f16"
    };
    DataType[] fieldTypes = mapSchema.getColumnDataTypes().toArray(new DataType[0]);
    RowDataSerializer csvSerializer =
            new Builder()
                    .setFieldType(fieldTypes)
                    .setType("csv")
                    .setFieldDelimiter("|")
                    .setFieldNames(fieldNames)
                    .build();

    String csvLine = new String(csvSerializer.serialize(internalRow).getRow());
    Assert.assertEquals(
            "{\"true\":\"false\"}|{\"1.2\":\"1.3\"}|{\"1.2345\":\"1.2345\"}|{\"24\":\"24\"}|{\"10\":\"10\"}|{\"1\":\"1\"}|{\"32\":\"32\"}|{\"64\":\"64\"}|{\"128\":\"128\"}|{\"10.12\":\"10.12\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01\":\"2021-01-01\"}|{\"a\":\"a\"}|{\"doris\":\"doris\"}",
            csvLine);
}

/**
 * Builds a row of {@code GenericMapData} fields matching {@code getRowMapSchema()}, converts
 * every field with {@code convertExternal} and checks the string form of the collected results.
 */
@Test
public void testMapExternalConvert() {
    ResolvedSchema mapSchema = getRowMapSchema();
    RowType physicalRowType = (RowType) mapSchema.toPhysicalRowDataType().getLogicalType();
    DorisRowConverter rowConverter = new DorisRowConverter(physicalRowType);

    // Doris DatetimeV2 supports up to 6 decimal places (microseconds); 1000 ns == 1 microsecond.
    TimestampData timestamp =
            TimestampData.fromLocalDateTime(LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000));
    // DATE is supplied in its internal form: days since the epoch as an int.
    int epochDay = (int) LocalDate.of(2021, 1, 1).toEpochDay();
    BigDecimal decimal = BigDecimal.valueOf(10.123);

    // One single-entry map per schema column, in column order f1..f16.
    List<Object> fields = new ArrayList<>();
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), true, false)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 1.2f, 1.3f)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 24, 24)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 10, 10)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), (short) 32, (short) 32)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 64, 64)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 128L, 128L)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), decimal, decimal)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), timestamp, timestamp)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), timestamp, timestamp)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), timestamp, timestamp)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), epochDay, epochDay)));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), 'a', 'a')));
    fields.add(new GenericMapData(createMapAndPut(new HashMap<>(), "doris", "doris")));
    GenericRowData externalRow = GenericRowData.of(fields.toArray());

    List<Object> converted = new ArrayList<>();
    int arity = externalRow.getArity();
    for (int pos = 0; pos < arity; pos++) {
        converted.add(rowConverter.convertExternal(externalRow, pos));
    }
    Assert.assertEquals(
            "[{\"true\":\"false\"}, {\"1.2\":\"1.3\"}, {\"1.2345\":\"1.2345\"}, {\"24\":\"24\"}, {\"10\":\"10\"}, {\"1\":\"1\"}, {\"32\":\"32\"}, {\"64\":\"64\"}, {\"128\":\"128\"}, {\"10.123\":\"10.123\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01\":\"2021-01-01\"}, {\"a\":\"a\"}, {\"doris\":\"doris\"}]",
            converted.toString());
}

/**
 * Stores a single key/value pair in the supplied map and hands that same map back, so a
 * one-entry map can be built in a single expression.
 *
 * @param map the map to populate; it is modified in place
 * @param key the key to insert
 * @param value the value to associate with {@code key}
 * @return the {@code map} argument itself, now containing the new entry
 */
public static <K, V> Map<K, V> createMapAndPut(Map<K, V> map, K key, V value) {
    final Map<K, V> target = map;
    target.put(key, value);
    return target;
}

/**
 * Builds the schema shared by the MAP tests: sixteen physical columns {@code f1}..{@code f16},
 * each a MAP whose key type and value type are the same.
 */
public static ResolvedSchema getRowMapSchema() {
    return ResolvedSchema.of(
            sameTypeMapColumn("f1", DataTypes.BOOLEAN()),
            sameTypeMapColumn("f2", DataTypes.FLOAT()),
            sameTypeMapColumn("f3", DataTypes.DOUBLE()),
            sameTypeMapColumn("f4", DataTypes.INTERVAL(DataTypes.YEAR())),
            sameTypeMapColumn("f5", DataTypes.INTERVAL(DataTypes.DAY())),
            sameTypeMapColumn("f6", DataTypes.TINYINT()),
            sameTypeMapColumn("f7", DataTypes.SMALLINT()),
            sameTypeMapColumn("f8", DataTypes.INT()),
            sameTypeMapColumn("f9", DataTypes.BIGINT()),
            sameTypeMapColumn("f10", DataTypes.DECIMAL(10, 2)),
            sameTypeMapColumn("f11", DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
            sameTypeMapColumn("f12", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
            sameTypeMapColumn("f13", DataTypes.TIMESTAMP()),
            sameTypeMapColumn("f14", DataTypes.DATE()),
            sameTypeMapColumn("f15", DataTypes.CHAR(1)),
            sameTypeMapColumn("f16", DataTypes.VARCHAR(256)));
}

/** Creates a physical column of type MAP&lt;T, T&gt; for the given element type. */
private static Column sameTypeMapColumn(String name, DataType elementType) {
    return Column.physical(name, DataTypes.MAP(elementType, elementType));
}
}
Loading