Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Convert data types when data sync sink metadata does not match integration test #13208

Merged
merged 29 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7ffb89a
Pipe: Convert data types when data sync sink metadata does not match …
luoluoyuyu Aug 16, 2024
76974fc
Merge branch 'apache:master' into type-conversion-it
luoluoyuyu Aug 19, 2024
346f308
fix type conversion
luoluoyuyu Aug 20, 2024
0336659
add integration tests
luoluoyuyu Aug 20, 2024
3383e15
fix
luoluoyuyu Aug 21, 2024
dddc1df
fix
luoluoyuyu Aug 21, 2024
3dceb45
Add Session interface integration test
luoluoyuyu Aug 22, 2024
874b92a
fix
luoluoyuyu Aug 22, 2024
8a570a0
fix
luoluoyuyu Aug 22, 2024
15cbf97
fix
luoluoyuyu Aug 22, 2024
7814c35
Merge branch 'apache:master' into type-conversion-it
luoluoyuyu Aug 23, 2024
4814ecb
fix code
luoluoyuyu Aug 23, 2024
3f8cae9
Add TSFile receive test
luoluoyuyu Aug 23, 2024
ffa1ed5
fix metadata synchronization Drop Pipe failure
luoluoyuyu Aug 23, 2024
3daed60
Add verification retry
luoluoyuyu Aug 25, 2024
d6b8dbb
fix generate bitMaps
luoluoyuyu Aug 27, 2024
bc8c404
execute flush memTable
luoluoyuyu Aug 27, 2024
9ef3744
fix NPE for validateResultSet
luoluoyuyu Aug 27, 2024
534ac86
modify code format
luoluoyuyu Aug 27, 2024
bd92ccd
Merge branch 'apache:master' into type-conversion-it
luoluoyuyu Aug 27, 2024
de94962
add comment
luoluoyuyu Aug 28, 2024
76d28af
Update pipe-it-2cluster.yml
SteveYurongSu Aug 28, 2024
ffef18a
Update IoTDBPipeTypeConversionISessionIT.java
SteveYurongSu Aug 28, 2024
b418803
test
SteveYurongSu Aug 28, 2024
df7e2da
Update pipe-it-2cluster.yml
SteveYurongSu Aug 28, 2024
e9f9df7
modify the default time zone for type conversion
luoluoyuyu Aug 28, 2024
84fe185
Revert "Update pipe-it-2cluster.yml"
SteveYurongSu Aug 28, 2024
8b42da7
Revert "test"
SteveYurongSu Aug 28, 2024
8ec973b
Revert "Update pipe-it-2cluster.yml"
SteveYurongSu Aug 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,9 @@ public synchronized void clearRetryEventsReferenceCount() {

@Override
public synchronized void discardEventsOfPipe(final String pipeNameToDrop) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
if (isTabletBatchModeEnabled) {
tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop);
}
retryEventQueue.removeIf(
event -> {
if (event instanceof EnrichedEvent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.DateUtils;

import java.time.ZoneId;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

public class ValueConverter {

Expand Down Expand Up @@ -299,6 +303,11 @@ public static Object convert(

private static final Binary BINARY_TRUE = parseString(Boolean.TRUE.toString());
private static final Binary BINARY_FALSE = parseString(Boolean.FALSE.toString());
private static final int TRUE_DATE = DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 2));
private static final int FALSE_DATE =
DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1));
private static final int DEFAULT_DATE =
DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1));

public static int convertBooleanToInt32(final boolean value) {
return value ? 1 : 0;
Expand All @@ -325,7 +334,7 @@ public static long convertBooleanToTimestamp(final boolean value) {
}

public static int convertBooleanToDate(final boolean value) {
return value ? 1 : 0;
return value ? TRUE_DATE : FALSE_DATE;
}

public static Binary convertBooleanToBlob(final boolean value) {
Expand Down Expand Up @@ -363,7 +372,12 @@ public static long convertInt32ToTimestamp(final int value) {
}

public static int convertInt32ToDate(final int value) {
return value;
try {
DateUtils.parseIntToLocalDate(value);
return value;
} catch (Exception e) {
return DEFAULT_DATE;
}
}

public static Binary convertInt32ToBlob(final int value) {
Expand Down Expand Up @@ -401,7 +415,13 @@ public static long convertInt64ToTimestamp(final long value) {
}

public static int convertInt64ToDate(final long value) {
return (int) value;
try {
int data = (int) value;
DateUtils.parseIntToLocalDate(data);
return data;
} catch (Exception e) {
return DEFAULT_DATE;
}
}

public static Binary convertInt64ToBlob(final long value) {
Expand Down Expand Up @@ -439,7 +459,13 @@ public static long convertFloatToTimestamp(final float value) {
}

public static int convertFloatToDate(final float value) {
return (int) value;
try {
int data = (int) value;
DateUtils.parseIntToLocalDate(data);
return data;
} catch (Exception e) {
return DEFAULT_DATE;
}
}

public static Binary convertFloatToBlob(final float value) {
Expand Down Expand Up @@ -477,7 +503,13 @@ public static long convertDoubleToTimestamp(final double value) {
}

public static int convertDoubleToDate(final double value) {
return (int) value;
try {
int data = (int) value;
DateUtils.parseIntToLocalDate(data);
return data;
} catch (Exception e) {
return DEFAULT_DATE;
}
}

public static Binary convertDoubleToBlob(final double value) {
Expand Down Expand Up @@ -553,7 +585,12 @@ public static Binary convertTimestampToText(final long value) {
}

public static int convertTimestampToDate(final long value) {
return (int) value;
try {
Instant instant = Instant.ofEpochMilli(value);
return DateUtils.parseDateExpressionToInt(instant.atZone(ZoneOffset.UTC).toLocalDate());
} catch (Exception e) {
return DEFAULT_DATE;
}
}

public static Binary convertTimestampToBlob(final long value) {
Expand All @@ -567,7 +604,7 @@ public static Binary convertTimestampToString(final long value) {
///////////// DATE //////////////

public static boolean convertDateToBoolean(final int value) {
return value != 0;
return value != FALSE_DATE;
}

public static int convertDateToInt32(final int value) {
Expand All @@ -591,7 +628,14 @@ public static Binary convertDateToText(final int value) {
}

public static long convertDateToTimestamp(final int value) {
return value;
try {
LocalDate date = DateUtils.parseIntToLocalDate(value);
ZonedDateTime dateTime = date.atStartOfDay(ZoneOffset.UTC);
Instant instant = dateTime.toInstant();
return instant.toEpochMilli();
} catch (Exception e) {
return 0L;
}
}

public static Binary convertDateToBlob(final int value) {
Expand Down Expand Up @@ -753,23 +797,25 @@ private static long parseTimestamp(final String value) {
try {
return TypeInferenceUtils.isNumber(value)
? Long.parseLong(value)
: DateTimeUtils.parseDateTimeExpressionToLong(
StringUtils.trim(value), ZoneId.systemDefault());
: DateTimeUtils.parseDateTimeExpressionToLong(StringUtils.trim(value), ZoneOffset.UTC);
} catch (final Exception e) {
return 0L;
}
}

private static int parseDate(final String value) {
if (value == null || value.isEmpty()) {
return 0;
return DEFAULT_DATE;
}
try {
return TypeInferenceUtils.isNumber(value)
? Integer.parseInt(value)
: DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
if (TypeInferenceUtils.isNumber(value)) {
int date = Integer.parseInt(value);
DateUtils.parseIntToLocalDate(date);
return date;
}
return DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value));
} catch (final Exception e) {
return 0;
return DEFAULT_DATE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStateme
// Statement
isDebug = insertRowStatement.isDebug();
// InsertBaseStatement
insertRowStatement.removeAllFailedMeasurementMarks();
devicePath = insertRowStatement.getDevicePath();
isAligned = insertRowStatement.isAligned();
measurementSchemas = insertRowStatement.getMeasurementSchemas();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabl
// Statement
isDebug = insertTabletStatement.isDebug();
// InsertBaseStatement
insertTabletStatement.removeAllFailedMeasurementMarks();
devicePath = insertTabletStatement.getDevicePath();
isAligned = insertTabletStatement.isAligned();
measurementSchemas = insertTabletStatement.getMeasurementSchemas();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ public void markFailedMeasurement(int index, Exception cause) {
throw new UnsupportedOperationException();
}

/** * Resets the state of all measurements marked as failed, clearing the failure records. */
public void removeAllFailedMeasurementMarks() {
throw new UnsupportedOperationException();
}

public boolean hasValidMeasurements() {
for (Object o : measurements) {
if (o != null) {
Expand Down Expand Up @@ -372,6 +377,18 @@ public FailedMeasurementInfo(
this.value = value;
this.cause = cause;
}

public String getMeasurement() {
return measurement;
}

public TSDataType getDataType() {
return dataType;
}

public Object getValue() {
return value;
}
}

// endregion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,20 @@ public void markFailedMeasurement(int index, Exception cause) {
values[index] = null;
}

@Override
public void removeAllFailedMeasurementMarks() {
if (failedMeasurementIndex2Info == null) {
return;
}
failedMeasurementIndex2Info.forEach(
(index, info) -> {
measurements[index] = info.getMeasurement();
dataTypes[index] = info.getDataType();
values[index] = info.getValue();
});
failedMeasurementIndex2Info.clear();
}

@Override
public void semanticCheck() {
super.semanticCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,20 @@ public void markFailedMeasurement(int index, Exception cause) {
columns[index] = null;
}

@Override
public void removeAllFailedMeasurementMarks() {
if (failedMeasurementIndex2Info == null) {
return;
}
failedMeasurementIndex2Info.forEach(
(index, info) -> {
measurements[index] = info.getMeasurement();
dataTypes[index] = info.getDataType();
columns[index] = info.getValue();
});
failedMeasurementIndex2Info.clear();
}

@Override
public void semanticCheck() {
super.semanticCheck();
Expand Down
Loading