diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java new file mode 100644 index 000000000000..6093d048c5c1 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionISessionIT.java @@ -0,0 +1,807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.manual; + +import org.apache.iotdb.commons.utils.function.CheckedTriConsumer; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.StatementExecutionException; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.read.common.Field; +import org.apache.tsfile.read.common.RowRecord; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2ManualCreateSchema.class}) +public class IoTDBPipeTypeConversionISessionIT extends AbstractPipeDualManualIT { + private static final int generateDataSize = 100; + + @Test + public void insertTablet() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + senderSession.insertTablet(tablet); + }, + false); + } + + @Test + public void insertTabletReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + senderSession.insertTablet(tablet); + }, + true); + } + + @Test + public void insertAlignedTablet() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + senderSession.insertAlignedTablet(tablet); + }, + false); + } + + @Test + public void insertAlignedTabletReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + senderSession.insertAlignedTablet(tablet); + }, + true); + } + + @Test + public void insertRecordsReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + senderSession.insertRecords( + getDeviceID(tablet), timestamps, pair.left, pair.right, values); + }, + true); + } + + @Test + public void insertRecord() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + for (int i = 0; i < values.size(); i++) { + senderSession.insertRecord( + tablet.getDeviceId(), + timestamps.get(i), + pair.left.get(i), + pair.right.get(i), + values.get(i).toArray()); + } + }, + false); + } + + @Test + public void insertRecordReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + for (int i = 0; i < values.size(); i++) { + senderSession.insertRecord( + tablet.getDeviceId(), + timestamps.get(i), + pair.left.get(i), + pair.right.get(i), + values.get(i).toArray()); + } + }, + true); + } + + @Test + public void insertAlignedRecord() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + for (int i = 0; i < values.size(); i++) { + senderSession.insertAlignedRecord( + tablet.getDeviceId(), + timestamps.get(i), + pair.left.get(i), + pair.right.get(i), + values.get(i)); + } + }, + false); + } + + @Test + public void insertAlignedRecordReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + for (int i = 0; i < values.size(); i++) { + senderSession.insertAlignedRecord( + tablet.getDeviceId(), + timestamps.get(i), + pair.left.get(i), + pair.right.get(i), + values.get(i)); + } + }, + true); + } + + @Test + public void insertRecords() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + senderSession.insertRecords( + getDeviceID(tablet), timestamps, pair.left, pair.right, values); + }, + false); + } + + @Test + public void insertAlignedRecords() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + senderSession.insertAlignedRecords( + getDeviceID(tablet), timestamps, pair.left, pair.right, values); + }, + false); + } + + @Test + public void insertAlignedRecordsReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertRecordForTable(tablet); + senderSession.insertAlignedRecords( + getDeviceID(tablet), timestamps, pair.left, pair.right, values); + }, + true); + } + + @Test + public void insertStringRecordsOfOneDevice() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertStrRecordForTable(tablet); + senderSession.insertStringRecordsOfOneDevice( + tablet.getDeviceId(), timestamps, pair.left, values); + }, + false); + } + + @Test + public void insertStringRecordsOfOneDeviceReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertStrRecordForTable(tablet); + senderSession.insertStringRecordsOfOneDevice( + tablet.getDeviceId(), timestamps, pair.left, values); + }, + true); + } + + @Test + public void insertAlignedStringRecordsOfOneDevice() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertStrRecordForTable(tablet); + senderSession.insertAlignedStringRecordsOfOneDevice( + tablet.getDeviceId(), timestamps, pair.left, values); + }, + false); + } + + @Test + public void insertAlignedStringRecordsOfOneDeviceReceiveByTsFile() { + prepareTypeConversionTest( + (ISession senderSession, ISession receiverSession, Tablet tablet) -> { + List timestamps = getTimestampList(tablet); + Pair>, List>> pair = + getMeasurementSchemasAndType(tablet); + List> values = generateTabletInsertStrRecordForTable(tablet); + senderSession.insertAlignedStringRecordsOfOneDevice( + tablet.getDeviceId(), timestamps, pair.left, values); + }, + true); + } + + private SessionDataSet query( + ISession session, List measurementSchemas, String deviceId) + throws IoTDBConnectionException, StatementExecutionException { + String sql = "select "; + StringBuffer param = new StringBuffer(); + for (IMeasurementSchema schema : measurementSchemas) { + param.append(schema.getMeasurementId()); + param.append(','); + } + sql = sql + param.substring(0, param.length() - 1); + sql = sql + " from " + deviceId + " ORDER BY time ASC"; + return session.executeQueryStatement(sql); + } + + private void prepareTypeConversionTest( + CheckedTriConsumer consumer, boolean isTsFile) { + List> measurementSchemas = + generateMeasurementSchemas(); + + // Generate createTimeSeries in sender and receiver + String uuid = RandomStringUtils.random(8, true, false); + for (Pair pair : measurementSchemas) { + createTimeSeries( + uuid.toString(), pair.left.getMeasurementId(), pair.left.getType().name(), senderEnv); + createTimeSeries( + uuid.toString(), pair.right.getMeasurementId(), pair.right.getType().name(), receiverEnv); + } + + try (ISession senderSession = senderEnv.getSessionConnection(); + ISession receiverSession = receiverEnv.getSessionConnection()) { + Tablet tablet = generateTabletAndMeasurementSchema(measurementSchemas, "root.test." + uuid); + if (isTsFile) { + // Send TsFile data to receiver + consumer.accept(senderSession, receiverSession, tablet); + Thread.sleep(2000); + createDataPipe(uuid, true); + senderSession.executeNonQueryStatement("flush"); + } else { + // Send Tablet data to receiver + createDataPipe(uuid, false); + Thread.sleep(2000); + // The actual implementation logic of inserting data + consumer.accept(senderSession, receiverSession, tablet); + senderSession.executeNonQueryStatement("flush"); + } + + // Verify receiver data + long timeoutSeconds = 600; + List> expectedValues = + generateTabletResultSetForTable(tablet, measurementSchemas); + await() + .pollInSameThread() + .pollDelay(1L, TimeUnit.SECONDS) + .pollInterval(1L, TimeUnit.SECONDS) + .atMost(timeoutSeconds, TimeUnit.SECONDS) + .untilAsserted( + () -> { + try { + validateResultSet( + query(receiverSession, tablet.getSchemas(), tablet.getDeviceId()), + expectedValues, + tablet.timestamps); + } catch (Exception e) { + fail(); + } + }); + senderSession.close(); + receiverSession.close(); + tablet.reset(); + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + private void createTimeSeries(String diff, String measurementID, String dataType, BaseEnv env) { + String timeSeriesCreation = + String.format( + "create timeseries root.test.%s.%s with datatype=%s,encoding=PLAIN", + diff, measurementID, dataType); + TestUtils.tryExecuteNonQueriesWithRetry(env, Collections.singletonList(timeSeriesCreation)); + } + + private void createDataPipe(String diff, boolean isTSFile) { + String sql = + String.format( + "create pipe test%s" + + " with source ('source'='iotdb-source','source.path'='root.test.**','realtime.mode'='%s','realtime.enable'='%s','history.enable'='%s')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')", + diff, + isTSFile ? "file" : "forced-log", + !isTSFile, + isTSFile, + receiverEnv.getIP(), + receiverEnv.getPort(), + isTSFile ? "tsfile" : "tablet"); + TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList(sql)); + } + + private void validateResultSet( + SessionDataSet dataSet, List> values, long[] timestamps) + throws IoTDBConnectionException, StatementExecutionException { + int index = 0; + while (dataSet.hasNext()) { + RowRecord record = dataSet.next(); + List fields = record.getFields(); + + assertEquals(record.getTimestamp(), timestamps[index]); + List rowValues = values.get(index++); + for (int i = 0; i < fields.size(); i++) { + Field field = fields.get(i); + switch (field.getDataType()) { + case INT64: + case TIMESTAMP: + assertEquals(field.getLongV(), (long) rowValues.get(i)); + break; + case DATE: + assertEquals(field.getDateV(), rowValues.get(i)); + break; + case BLOB: + assertEquals(field.getBinaryV(), rowValues.get(i)); + break; + case TEXT: + case STRING: + assertEquals(field.getStringValue(), rowValues.get(i)); + break; + case INT32: + assertEquals(field.getIntV(), (int) rowValues.get(i)); + break; + case DOUBLE: + assertEquals(0, Double.compare(field.getDoubleV(), (double) rowValues.get(i))); + break; + case FLOAT: + assertEquals(0, Float.compare(field.getFloatV(), (float) rowValues.get(i))); + break; + } + } + } + assertEquals(values.size(), index); + } + + private boolean[] createTestDataForBoolean() { + boolean[] data = new boolean[generateDataSize]; + Random random = new Random(); + for (int i = 0; i < data.length; i++) { + data[i] = random.nextBoolean(); + } + return data; + } + + private int[] createTestDataForInt32() { + int[] data = new int[generateDataSize]; + Random random = new Random(); + for (int i = 0; i < data.length; i++) { + data[i] = random.nextInt(); + } + return data; + } + + private long[] createTestDataForInt64() { + long[] data = new long[generateDataSize]; + Random random = new Random(); + for (int i = 0; i < data.length; i++) { + data[i] = random.nextLong(); + } + return data; + } + + private float[] createTestDataForFloat() { + float[] data = new float[generateDataSize]; + Random random = new Random(); + for (int i = 0; i < data.length; i++) { + data[i] = random.nextFloat(); + } + return data; + } + + private double[] createTestDataForDouble() { + double[] data = new double[generateDataSize]; + Random random = new Random(); + for (int i = 0; i < data.length; i++) { + data[i] = random.nextDouble(); + } + return data; + } + + private long[] createTestDataForTimestamp() { + long[] data = new long[generateDataSize]; + long time = new Date().getTime(); + for (int i = 0; i < data.length; i++) { + data[i] = time + i; + } + return data; + } + + private LocalDate[] createTestDataForDate() { + LocalDate[] data = new LocalDate[generateDataSize]; + int year = 2023; + int month = 1; + int day = 1; + for (int i = 0; i < data.length; i++) { + data[i] = DateUtils.parseIntToLocalDate(year * 10000 + (month * 100) + day); + // update + day++; + if (day > 28) { + day = 1; + month++; + if (month > 12) { + month = 1; + year++; + } + } + } + return data; + } + + private Binary[] createTestDataForString() { + String[] stringData = { + "Hello", + "Hello World!", + "This is a test.", + "IoTDB Hello World!!!!", + "IoTDB is an excellent time series database!!!!!!!!!", + "12345678910!!!!!!!!", + "123456", + "1234567.123213", + "21232131.21", + "enable = true", + "true", + "false", + "12345678910", + "123231232132131233213123123123123123131312", + "123231232132131233213123123123123123131312.212312321312312", + }; + Binary[] data = new Binary[generateDataSize]; + for (int i = 0; i < data.length; i++) { + data[i] = + new Binary(stringData[(i % stringData.length)].getBytes(TSFileConfig.STRING_CHARSET)); + } + return data; + } + + private List getTimestampList(Tablet tablet) { + long[] timestamps = tablet.timestamps; + List data = new ArrayList<>(timestamps.length); + for (long timestamp : timestamps) { + data.add(timestamp); + } + return data; + } + + private Pair>, List>> getMeasurementSchemasAndType( + Tablet tablet) { + List> schemaData = new ArrayList<>(tablet.rowSize); + List> typeData = new ArrayList<>(tablet.rowSize); + List measurementSchemas = new ArrayList<>(tablet.getSchemas().size()); + List types = new ArrayList<>(tablet.rowSize); + for (IMeasurementSchema measurementSchema : tablet.getSchemas()) { + measurementSchemas.add(measurementSchema.getMeasurementId()); + types.add(measurementSchema.getType()); + } + + for (int i = 0; i < tablet.rowSize; i++) { + schemaData.add(measurementSchemas); + typeData.add(types); + } + + return new Pair<>(schemaData, typeData); + } + + private List getDeviceID(Tablet tablet) { + List data = new ArrayList<>(tablet.rowSize); + for (int i = 0; i < tablet.rowSize; i++) { + data.add(tablet.getDeviceId()); + } + return data; + } + + private List> generateTabletResultSetForTable( + final Tablet tablet, List> pairs) { + List> insertRecords = new ArrayList<>(tablet.rowSize); + final List schemas = tablet.getSchemas(); + final Object[] values = tablet.values; + for (int i = 0; i < tablet.rowSize; i++) { + List insertRecord = new ArrayList<>(); + for (int j = 0; j < schemas.size(); j++) { + TSDataType sourceType = pairs.get(j).left.getType(); + TSDataType targetType = pairs.get(j).right.getType(); + Object value = null; + switch (sourceType) { + case INT64: + case TIMESTAMP: + value = ValueConverter.convert(sourceType, targetType, ((long[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case INT32: + value = ValueConverter.convert(sourceType, targetType, ((int[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case DOUBLE: + value = ValueConverter.convert(sourceType, targetType, ((double[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case FLOAT: + value = ValueConverter.convert(sourceType, targetType, ((float[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case DATE: + value = + ValueConverter.convert( + sourceType, + targetType, + DateUtils.parseDateExpressionToInt(((LocalDate[]) values[j])[i])); + insertRecord.add(convert(value, targetType)); + break; + case TEXT: + case STRING: + value = ValueConverter.convert(sourceType, targetType, ((Binary[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case BLOB: + value = ValueConverter.convert(sourceType, targetType, ((Binary[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + case BOOLEAN: + value = ValueConverter.convert(sourceType, targetType, ((boolean[]) values[j])[i]); + insertRecord.add(convert(value, targetType)); + break; + } + } + insertRecords.add(insertRecord); + } + + return insertRecords; + } + + private Object convert(Object value, TSDataType targetType) { + switch (targetType) { + case DATE: + return DateUtils.parseIntToLocalDate((Integer) value); + case TEXT: + case STRING: + return new String(((Binary) value).getValues(), TSFileConfig.STRING_CHARSET); + } + return value; + } + + private List> generateTabletInsertRecordForTable(final Tablet tablet) { + List> insertRecords = new ArrayList<>(tablet.rowSize); + final List schemas = tablet.getSchemas(); + final Object[] values = tablet.values; + for (int i = 0; i < tablet.rowSize; i++) { + List insertRecord = new ArrayList<>(); + for (int j = 0; j < schemas.size(); j++) { + switch (schemas.get(j).getType()) { + case INT64: + case TIMESTAMP: + insertRecord.add(((long[]) values[j])[i]); + break; + case INT32: + insertRecord.add(((int[]) values[j])[i]); + break; + case DOUBLE: + insertRecord.add(((double[]) values[j])[i]); + break; + case FLOAT: + insertRecord.add(((float[]) values[j])[i]); + break; + case DATE: + insertRecord.add(((LocalDate[]) values[j])[i]); + break; + case TEXT: + case STRING: + insertRecord.add( + new String(((Binary[]) values[j])[i].getValues(), TSFileConfig.STRING_CHARSET)); + break; + case BLOB: + insertRecord.add(((Binary[]) values[j])[i]); + break; + case BOOLEAN: + insertRecord.add(((boolean[]) values[j])[i]); + break; + } + } + insertRecords.add(insertRecord); + } + + return insertRecords; + } + + private List> generateTabletInsertStrRecordForTable(Tablet tablet) { + List> insertRecords = new ArrayList<>(tablet.rowSize); + final List schemas = tablet.getSchemas(); + final Object[] values = tablet.values; + for (int i = 0; i < tablet.rowSize; i++) { + List insertRecord = new ArrayList<>(); + for (int j = 0; j < schemas.size(); j++) { + switch (schemas.get(j).getType()) { + case INT64: + insertRecord.add(String.valueOf(((long[]) values[j])[i])); + break; + case TIMESTAMP: + insertRecord.add( + RpcUtils.formatDatetime("default", "ms", ((long[]) values[j])[i], ZoneOffset.UTC)); + break; + case INT32: + insertRecord.add(String.valueOf(((int[]) values[j])[i])); + break; + case DOUBLE: + insertRecord.add(String.valueOf(((double[]) values[j])[i])); + break; + case FLOAT: + insertRecord.add(String.valueOf(((float[]) values[j])[i])); + break; + case DATE: + insertRecord.add(((LocalDate[]) values[j])[i].toString()); + break; + case TEXT: + case STRING: + insertRecord.add( + new String(((Binary[]) values[j])[i].getValues(), TSFileConfig.STRING_CHARSET)); + break; + case BLOB: + String value = + BytesUtils.parseBlobByteArrayToString(((Binary[]) values[j])[i].getValues()) + .substring(2); + insertRecord.add(String.format("X'%s'", value)); + break; + case BOOLEAN: + insertRecord.add(String.valueOf(((boolean[]) values[j])[i])); + break; + } + } + insertRecords.add(insertRecord); + } + + return insertRecords; + } + + private Tablet generateTabletAndMeasurementSchema( + List> pairs, String deviceId) { + long[] timestamp = createTestDataForTimestamp(); + Object[] objects = new Object[pairs.size()]; + List measurementSchemas = new ArrayList<>(pairs.size()); + BitMap[] bitMaps = new BitMap[pairs.size()]; + for (int i = 0; i < bitMaps.length; i++) { + bitMaps[i] = new BitMap(generateDataSize); + } + List columnTypes = new ArrayList<>(pairs.size()); + for (int i = 0; i < objects.length; i++) { + MeasurementSchema schema = pairs.get(i).left; + measurementSchemas.add(schema); + columnTypes.add(Tablet.ColumnType.MEASUREMENT); + switch (schema.getType()) { + case INT64: + objects[i] = createTestDataForInt64(); + break; + case INT32: + objects[i] = createTestDataForInt32(); + break; + case TIMESTAMP: + objects[i] = createTestDataForTimestamp(); + break; + case DOUBLE: + objects[i] = createTestDataForDouble(); + break; + case FLOAT: + objects[i] = createTestDataForFloat(); + break; + case DATE: + objects[i] = createTestDataForDate(); + break; + case STRING: + case BLOB: + case TEXT: + objects[i] = createTestDataForString(); + break; + case BOOLEAN: + objects[i] = createTestDataForBoolean(); + break; + } + } + return new Tablet( + deviceId, measurementSchemas, columnTypes, timestamp, objects, bitMaps, generateDataSize); + } + + private List> generateMeasurementSchemas() { + TSDataType[] dataTypes = { + TSDataType.STRING, + TSDataType.TEXT, + TSDataType.BLOB, + TSDataType.TIMESTAMP, + TSDataType.BOOLEAN, + TSDataType.DATE, + TSDataType.DOUBLE, + TSDataType.FLOAT, + TSDataType.INT32, + TSDataType.INT64 + }; + List> pairs = new ArrayList<>(); + + for (TSDataType type : dataTypes) { + for (TSDataType dataType : dataTypes) { + String id = String.format("%s2%s", type.name(), dataType.name()); + pairs.add(new Pair<>(new MeasurementSchema(id, type), new MeasurementSchema(id, dataType))); + } + } + return pairs; + } +} diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java new file mode 100644 index 000000000000..e22a49ed279d --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeTypeConversionIT.java @@ -0,0 +1,616 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.manual; + +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.db.pipe.receiver.transform.converter.ValueConverter; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.rpc.RpcUtils; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.DateUtils; +import org.apache.tsfile.utils.Pair; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.nio.charset.StandardCharsets; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2ManualCreateSchema.class}) +public class IoTDBPipeTypeConversionIT extends AbstractPipeDualManualIT { + + private static final int generateDataSize = 100; + + // Test for converting BOOLEAN to OtherType + @Test + public void testBooleanToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.BOOLEAN, TSDataType.DATE); + } + + // Test for converting INT32 to OtherType + @Test + public void testInt32ToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.INT32, TSDataType.DATE); + } + + // Test for converting INT64 to OtherType + @Test + public void testInt64ToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.INT64, TSDataType.DATE); + } + + // Test for converting FLOAT to OtherType + @Test + public void testFloatToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.FLOAT, TSDataType.DATE); + } + + // Test for converting DOUBLE to OtherType + @Test + public void testDoubleToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.DOUBLE, TSDataType.DATE); + } + + // Test for converting TEXT to OtherType + @Test + public void testTextToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.TEXT, TSDataType.DATE); + } + + // Test for converting TIMESTAMP to OtherType + @Test + public void testTimestampToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.TIMESTAMP, TSDataType.DATE); + } + + // Test for converting DATE to OtherType + @Test + public void testDateToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.DATE, TSDataType.TIMESTAMP); + } + + // Test for converting BLOB to OtherType + @Test + public void testBlobToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.STRING); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.TIMESTAMP); + executeAndVerifyTypeConversion(TSDataType.BLOB, TSDataType.DATE); + } + + // Test for converting STRING to OtherType + @Test + public void testStringToOtherTypeConversion() { + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.TEXT); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.BLOB); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.BOOLEAN); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.INT32); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.INT64); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.FLOAT); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.DOUBLE); + executeAndVerifyTypeConversion(TSDataType.STRING, TSDataType.TIMESTAMP); + } + + private void executeAndVerifyTypeConversion(TSDataType source, TSDataType target) { + List pairs = prepareTypeConversionTest(source, target); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + String.format("select * from root.%s2%s.**", source.name(), target.name()), + String.format("Time,root.%s2%s.test.status,", source.name(), target.name()), + createExpectedResultSet(pairs, source, target), + 30); + } + + private List prepareTypeConversionTest(TSDataType sourceType, TSDataType targetType) { + String sourceTypeName = sourceType.name(); + String targetTypeName = targetType.name(); + + createTimeSeries(sourceTypeName, targetTypeName, sourceTypeName, senderEnv); + createTimeSeries(sourceTypeName, targetTypeName, targetTypeName, receiverEnv); + + createDataPipe(sourceTypeName, targetTypeName); + + List pairs = createTestDataForType(sourceTypeName); + + // wait pipe start + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + executeDataInsertions(pairs, sourceType, targetType); + return pairs; + } + + private void createTimeSeries( + String sourceTypeName, String targetTypeName, String dataType, BaseEnv env) { + String timeSeriesCreationQuery = + String.format( + "create timeseries root.%s2%s.test.status with datatype=%s,encoding=PLAIN", + sourceTypeName, targetTypeName, dataType); + TestUtils.tryExecuteNonQueriesWithRetry( + env, Collections.singletonList(timeSeriesCreationQuery)); + } + + private void createDataPipe(String sourceTypeName, String targetTypeName) { + String sql = + String.format( + "create pipe %s2%s" + + " with source ('source'='iotdb-source','source.path'='root.%s2%s.**','realtime.mode'='forced-log','realtime.enable'='true','history.enable'='false')" + + " with processor ('processor'='do-nothing-processor')" + + " with sink ('node-urls'='%s:%s','batch.enable'='false','sink.format'='tablet')", + sourceTypeName, + targetTypeName, + sourceTypeName, + targetTypeName, + receiverEnv.getIP(), + receiverEnv.getPort()); + TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, Collections.singletonList(sql)); + } + + private List createTestDataForType(String sourceType) { + switch (sourceType) { + case "BOOLEAN": + return createTestDataForBoolean(); + case "INT32": + return createTestDataForInt32(); + case "INT64": + return createTestDataForInt64(); + case "FLOAT": + return createTestDataForFloat(); + case "DOUBLE": + return createTestDataForDouble(); + case "TEXT": + return createTestDataForText(); + case "TIMESTAMP": + return createTestDataForTimestamp(); + case "DATE": + return createTestDataForDate(); + case "BLOB": + return createTestDataForBlob(); + case "STRING": + return createTestDataForString(); + default: + throw new UnsupportedOperationException("Unsupported data type: " + sourceType); + } + } + + private void executeDataInsertions( + List testData, TSDataType sourceType, TSDataType targetType) { + switch (sourceType) { + case STRING: + case TEXT: + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + createInsertStatementsForString(testData, sourceType.name(), targetType.name())); + return; + case TIMESTAMP: + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + createInsertStatementsForTimestamp(testData, sourceType.name(), targetType.name())); + return; + case DATE: + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + createInsertStatementsForLocalDate(testData, sourceType.name(), targetType.name())); + return; + case BLOB: + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + createInsertStatementsForBlob(testData, sourceType.name(), targetType.name())); + return; + default: + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + createInsertStatementsForNumeric(testData, sourceType.name(), targetType.name())); + } + } + + private List createInsertStatementsForString( + List testData, String sourceType, String targetType) { + List executes = new ArrayList<>(); + for (Pair pair : testData) { + executes.add( + String.format( + "insert into root.%s2%s.test(timestamp,status) values (%s,'%s');", + sourceType, + targetType, + pair.left, + new String(((Binary) (pair.right)).getValues(), StandardCharsets.UTF_8))); + } + executes.add("flush"); + return executes; + } + + private List createInsertStatementsForNumeric( + List testData, String sourceType, String targetType) { + List executes = new ArrayList<>(); + for (Pair pair : testData) { + executes.add( + String.format( + "insert into root.%s2%s.test(timestamp,status) values (%s,%s);", + sourceType, targetType, pair.left, pair.right)); + } + executes.add("flush"); + return executes; + } + + private List createInsertStatementsForTimestamp( + List testData, String sourceType, String targetType) { + List executes = new ArrayList<>(); + for (Pair pair : testData) { + executes.add( + String.format( + "insert into root.%s2%s.test(timestamp,status) values (%s,%s);", + sourceType, targetType, pair.left, pair.right)); + } + executes.add("flush"); + return executes; + } + + private List createInsertStatementsForLocalDate( + List testData, String sourceType, String targetType) { + List executes = new ArrayList<>(); + for (Pair pair : testData) { + executes.add( + String.format( + "insert into root.%s2%s.test(timestamp,status) values (%s,'%s');", + sourceType, targetType, pair.left, DateUtils.formatDate((Integer) pair.right))); + } + executes.add("flush"); + return executes; + } + + private List createInsertStatementsForBlob( + List testData, String sourceType, String targetType) { + List executes = new ArrayList<>(); + for (Pair pair : testData) { + String value = BytesUtils.parseBlobByteArrayToString(((Binary) pair.right).getValues()); + executes.add( + String.format( + "insert into root.%s2%s.test(timestamp,status) values (%s,X'%s');", + sourceType, targetType, pair.left, value.substring(2))); + } + executes.add("flush"); + return executes; + } + + private Set createExpectedResultSet( + List pairs, TSDataType sourceType, TSDataType targetType) { + switch (targetType) { + case TIMESTAMP: + return generateTimestampResultSet(pairs, sourceType, targetType); + case DATE: + return generateLocalDateResultSet(pairs, sourceType, targetType); + case BLOB: + return generateBlobResultSet(pairs, sourceType, targetType); + case TEXT: + case STRING: + return generateStringResultSet(pairs, sourceType, targetType); + default: + HashSet resultSet = new HashSet<>(); + for (Pair pair : pairs) { + resultSet.add( + String.format( + "%s,%s,", pair.left, ValueConverter.convert(sourceType, targetType, pair.right))); + } + return resultSet; + } + } + + private Set generateTimestampResultSet( + List pairs, TSDataType sourceType, TSDataType targetType) { + HashSet resultSet = new HashSet<>(); + for (Pair pair : pairs) { + resultSet.add( + String.format( + "%s,%s,", + pair.left, + RpcUtils.formatDatetime( + "default", + "ms", + (long) ValueConverter.convert(sourceType, targetType, pair.right), + ZoneOffset.UTC))); + } + return resultSet; + } + + private Set generateLocalDateResultSet( + List pairs, TSDataType sourceType, TSDataType targetType) { + HashSet resultSet = new HashSet<>(); + for (Pair pair : pairs) { + resultSet.add( + String.format( + "%s,%s,", + pair.left, + DateUtils.formatDate( + (Integer) ValueConverter.convert(sourceType, targetType, pair.right)))); + } + return resultSet; + } + + private Set generateBlobResultSet( + List pairs, TSDataType sourceType, TSDataType targetType) { + HashSet resultSet = new HashSet<>(); + for (Pair pair : pairs) { + resultSet.add( + String.format( + "%s,%s,", + pair.left, + BytesUtils.parseBlobByteArrayToString( + ((Binary) ValueConverter.convert(sourceType, targetType, pair.right)) + .getValues()))); + } + return resultSet; + } + + private Set generateStringResultSet( + List pairs, TSDataType sourceType, TSDataType targetType) { + HashSet resultSet = new HashSet<>(); + for (Pair pair : pairs) { + resultSet.add( + String.format( + "%s,%s,", + pair.left, + new String( + ((Binary) ValueConverter.convert(sourceType, targetType, pair.right)).getValues(), + StandardCharsets.UTF_8))); + } + return resultSet; + } + + private List createTestDataForBoolean() { + List pairs = new java.util.ArrayList<>(); + Random random = new Random(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, random.nextBoolean())); + } + return pairs; + } + + private List createTestDataForInt32() { + List pairs = new ArrayList<>(); + Random random = new Random(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, random.nextInt())); + } + pairs.add(new Pair<>(generateDataSize + 1, -1)); + pairs.add(new Pair<>(generateDataSize + 2, -2)); + pairs.add(new Pair<>(generateDataSize + 3, -3)); + return pairs; + } + + private List createTestDataForInt64() { + List pairs = new ArrayList<>(); + Random random = new Random(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, random.nextLong())); + } + pairs.add(new Pair<>(generateDataSize + 1, -1L)); + pairs.add(new Pair<>(generateDataSize + 2, -2L)); + pairs.add(new Pair<>(generateDataSize + 3, -3L)); + return pairs; + } + + private List createTestDataForFloat() { + List pairs = new ArrayList<>(); + Random random = new Random(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, random.nextFloat())); + } + return pairs; + } + + private List createTestDataForDouble() { + List pairs = new ArrayList<>(); + Random random = new Random(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, random.nextDouble())); + } + return pairs; + } + + private List createTestDataForText() { + List pairs = new ArrayList<>(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, new Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8)))); + } + pairs.add(new Pair(generateDataSize + 1, new Binary("Hello".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 2, new Binary("Hello World!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 3, new Binary("This is a test.".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 4, + new Binary("IoTDB Hello World!!!!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 5, + new Binary( + "IoTDB is an excellent time series database!!!!!!!!!" + .getBytes(StandardCharsets.UTF_8)))); + return pairs; + } + + private List createTestDataForTimestamp() { + List pairs = new ArrayList<>(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, new Date().getTime() + i)); + } + return pairs; + } + + private List createTestDataForDate() { + List pairs = new ArrayList<>(); + int year = 2023; + int month = 1; + int day = 1; + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, year * 10000 + (month * 100) + day)); + + // update + day++; + if (day > 28) { + day = 1; + month++; + if (month > 12) { + month = 1; + year++; + } + } + } + return pairs; + } + + private List createTestDataForBlob() { + List pairs = new ArrayList<>(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, new Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8)))); + } + pairs.add(new Pair(generateDataSize + 1, new Binary("Hello".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 2, new Binary("Hello World!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 3, new Binary("This is a test.".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 4, + new Binary("IoTDB Hello World!!!!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 5, + new Binary( + "IoTDB is an excellent time series database!!!!!!!!!" + .getBytes(StandardCharsets.UTF_8)))); + return pairs; + } + + private List createTestDataForString() { + List pairs = new ArrayList<>(); + for (long i = 0; i < generateDataSize; i++) { + pairs.add(new Pair<>(i, new Binary((String.valueOf(i)).getBytes(StandardCharsets.UTF_8)))); + } + pairs.add(new Pair(generateDataSize + 1, new Binary("Hello".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 2, new Binary("Hello World!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 3, new Binary("This is a test.".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 4, + new Binary("IoTDB Hello World!!!!".getBytes(StandardCharsets.UTF_8)))); + pairs.add( + new Pair( + generateDataSize + 5, + new Binary( + "IoTDB is an excellent time series database!!!!!!!!!" + .getBytes(StandardCharsets.UTF_8)))); + return pairs; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 394e2e95d2bc..083f12485ba4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -518,7 +518,9 @@ public synchronized void clearRetryEventsReferenceCount() { @Override public synchronized void discardEventsOfPipe(final String pipeNameToDrop) { - tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); + if (isTabletBatchModeEnabled) { + tabletBatchBuilder.discardEventsOfPipe(pipeNameToDrop); + } retryEventQueue.removeIf( event -> { if (event instanceof EnrichedEvent diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java index 098792fd0b3a..db38102ca9d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/converter/ValueConverter.java @@ -26,8 +26,12 @@ import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.DateUtils; -import java.time.ZoneId; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; public class ValueConverter { @@ -299,6 +303,11 @@ public static Object convert( private static final Binary BINARY_TRUE = parseString(Boolean.TRUE.toString()); private static final Binary BINARY_FALSE = parseString(Boolean.FALSE.toString()); + private static final int TRUE_DATE = DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 2)); + private static final int FALSE_DATE = + DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1)); + private static final int DEFAULT_DATE = + DateUtils.parseDateExpressionToInt(LocalDate.of(1970, 1, 1)); public static int convertBooleanToInt32(final boolean value) { return value ? 1 : 0; @@ -325,7 +334,7 @@ public static long convertBooleanToTimestamp(final boolean value) { } public static int convertBooleanToDate(final boolean value) { - return value ? 1 : 0; + return value ? TRUE_DATE : FALSE_DATE; } public static Binary convertBooleanToBlob(final boolean value) { @@ -363,7 +372,12 @@ public static long convertInt32ToTimestamp(final int value) { } public static int convertInt32ToDate(final int value) { - return value; + try { + DateUtils.parseIntToLocalDate(value); + return value; + } catch (Exception e) { + return DEFAULT_DATE; + } } public static Binary convertInt32ToBlob(final int value) { @@ -401,7 +415,13 @@ public static long convertInt64ToTimestamp(final long value) { } public static int convertInt64ToDate(final long value) { - return (int) value; + try { + int data = (int) value; + DateUtils.parseIntToLocalDate(data); + return data; + } catch (Exception e) { + return DEFAULT_DATE; + } } public static Binary convertInt64ToBlob(final long value) { @@ -439,7 +459,13 @@ public static long convertFloatToTimestamp(final float value) { } public static int convertFloatToDate(final float value) { - return (int) value; + try { + int data = (int) value; + DateUtils.parseIntToLocalDate(data); + return data; + } catch (Exception e) { + return DEFAULT_DATE; + } } public static Binary convertFloatToBlob(final float value) { @@ -477,7 +503,13 @@ public static long convertDoubleToTimestamp(final double value) { } public static int convertDoubleToDate(final double value) { - return (int) value; + try { + int data = (int) value; + DateUtils.parseIntToLocalDate(data); + return data; + } catch (Exception e) { + return DEFAULT_DATE; + } } public static Binary convertDoubleToBlob(final double value) { @@ -553,7 +585,12 @@ public static Binary convertTimestampToText(final long value) { } public static int convertTimestampToDate(final long value) { - return (int) value; + try { + Instant instant = Instant.ofEpochMilli(value); + return DateUtils.parseDateExpressionToInt(instant.atZone(ZoneOffset.UTC).toLocalDate()); + } catch (Exception e) { + return DEFAULT_DATE; + } } public static Binary convertTimestampToBlob(final long value) { @@ -567,7 +604,7 @@ public static Binary convertTimestampToString(final long value) { ///////////// DATE ////////////// public static boolean convertDateToBoolean(final int value) { - return value != 0; + return value != FALSE_DATE; } public static int convertDateToInt32(final int value) { @@ -591,7 +628,14 @@ public static Binary convertDateToText(final int value) { } public static long convertDateToTimestamp(final int value) { - return value; + try { + LocalDate date = DateUtils.parseIntToLocalDate(value); + ZonedDateTime dateTime = date.atStartOfDay(ZoneOffset.UTC); + Instant instant = dateTime.toInstant(); + return instant.toEpochMilli(); + } catch (Exception e) { + return 0L; + } } public static Binary convertDateToBlob(final int value) { @@ -753,8 +797,7 @@ private static long parseTimestamp(final String value) { try { return TypeInferenceUtils.isNumber(value) ? Long.parseLong(value) - : DateTimeUtils.parseDateTimeExpressionToLong( - StringUtils.trim(value), ZoneId.systemDefault()); + : DateTimeUtils.parseDateTimeExpressionToLong(StringUtils.trim(value), ZoneOffset.UTC); } catch (final Exception e) { return 0L; } @@ -762,14 +805,17 @@ private static long parseTimestamp(final String value) { private static int parseDate(final String value) { if (value == null || value.isEmpty()) { - return 0; + return DEFAULT_DATE; } try { - return TypeInferenceUtils.isNumber(value) - ? Integer.parseInt(value) - : DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value)); + if (TypeInferenceUtils.isNumber(value)) { + int date = Integer.parseInt(value); + DateUtils.parseIntToLocalDate(date); + return date; + } + return DateTimeUtils.parseDateExpressionToInt(StringUtils.trim(value)); } catch (final Exception e) { - return 0; + return DEFAULT_DATE; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java index 3df76b79ce45..6ae0e2d9fa45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java @@ -42,6 +42,7 @@ public PipeConvertedInsertRowStatement(final InsertRowStatement insertRowStateme // Statement isDebug = insertRowStatement.isDebug(); // InsertBaseStatement + insertRowStatement.removeAllFailedMeasurementMarks(); devicePath = insertRowStatement.getDevicePath(); isAligned = insertRowStatement.isAligned(); measurementSchemas = insertRowStatement.getMeasurementSchemas(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java index 2481034cbae1..a5013725061b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertTabletStatement.java @@ -36,6 +36,7 @@ public PipeConvertedInsertTabletStatement(final InsertTabletStatement insertTabl // Statement isDebug = insertTabletStatement.isDebug(); // InsertBaseStatement + insertTabletStatement.removeAllFailedMeasurementMarks(); devicePath = insertTabletStatement.getDevicePath(); isAligned = insertTabletStatement.isAligned(); measurementSchemas = insertTabletStatement.getMeasurementSchemas(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java index 11434865d688..8c5681c914ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertBaseStatement.java @@ -264,6 +264,11 @@ public void markFailedMeasurement(int index, Exception cause) { throw new UnsupportedOperationException(); } + /** * Resets the state of all measurements marked as failed, clearing the failure records. */ + public void removeAllFailedMeasurementMarks() { + throw new UnsupportedOperationException(); + } + public boolean hasValidMeasurements() { for (Object o : measurements) { if (o != null) { @@ -372,6 +377,18 @@ public FailedMeasurementInfo( this.value = value; this.cause = cause; } + + public String getMeasurement() { + return measurement; + } + + public TSDataType getDataType() { + return dataType; + } + + public Object getValue() { + return value; + } } // endregion diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java index a0b67ba997d2..f7b4f5f32a81 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertRowStatement.java @@ -276,6 +276,20 @@ public void markFailedMeasurement(int index, Exception cause) { values[index] = null; } + @Override + public void removeAllFailedMeasurementMarks() { + if (failedMeasurementIndex2Info == null) { + return; + } + failedMeasurementIndex2Info.forEach( + (index, info) -> { + measurements[index] = info.getMeasurement(); + dataTypes[index] = info.getDataType(); + values[index] = info.getValue(); + }); + failedMeasurementIndex2Info.clear(); + } + @Override public void semanticCheck() { super.semanticCheck(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java index 8cfabf92a856..cb844ef3175c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java @@ -228,6 +228,20 @@ public void markFailedMeasurement(int index, Exception cause) { columns[index] = null; } + @Override + public void removeAllFailedMeasurementMarks() { + if (failedMeasurementIndex2Info == null) { + return; + } + failedMeasurementIndex2Info.forEach( + (index, info) -> { + measurements[index] = info.getMeasurement(); + dataTypes[index] = info.getDataType(); + columns[index] = info.getValue(); + }); + failedMeasurementIndex2Info.clear(); + } + @Override public void semanticCheck() { super.semanticCheck();