diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IBenchmarkSession.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IBenchmarkSession.java index 3c0b15b2d..735e39035 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IBenchmarkSession.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IBenchmarkSession.java @@ -2,22 +2,58 @@ import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.session.Session; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.write.record.Tablet; import java.util.List; public interface IBenchmarkSession { - void open() throws IoTDBConnectionException; - void open(boolean enableRPCCompression) throws IoTDBConnectionException; - void insertRecord(String deviceId, long time, List measurements, List types, List values) throws IoTDBConnectionException, StatementExecutionException; - void insertAlignedRecord(String multiSeriesId, long time, List multiMeasurementComponents, List types, List values) throws IoTDBConnectionException, StatementExecutionException; - void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException; - void insertAlignedRecords(List multiSeriesIds, List times, List> multiMeasurementComponentsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException; - void insertTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException; - void insertAlignedTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException; - ISessionDataSet executeQueryStatement(String sql) throws IoTDBConnectionException, StatementExecutionException; - void executeNonQueryStatement(String deleteSeriesSql) throws IoTDBConnectionException, StatementExecutionException; - void close() throws IoTDBConnectionException; + void open() throws IoTDBConnectionException; + + void open(boolean enableRPCCompression) throws IoTDBConnectionException; + + void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException; + + void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException; + + void insertTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException; + + void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException; + + ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException; + + void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException; + + void close() throws IoTDBConnectionException; } diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/ISessionDataSet.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/ISessionDataSet.java index 0be105152..6310de5c6 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/ISessionDataSet.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/ISessionDataSet.java @@ -1,14 +1,13 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; -import org.apache.iotdb.isession.SessionDataSet; -import org.apache.iotdb.isession.pool.SessionDataSetWrapper; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.tsfile.read.common.RowRecord; public interface ISessionDataSet { - RowRecord next() throws IoTDBConnectionException, StatementExecutionException; - boolean hasNext() throws IoTDBConnectionException, StatementExecutionException; - void close() throws IoTDBConnectionException, StatementExecutionException; -} + RowRecord next() throws IoTDBConnectionException, StatementExecutionException; + + boolean hasNext() throws IoTDBConnectionException, StatementExecutionException; + void close() throws IoTDBConnectionException, StatementExecutionException; +} diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBClusterSession.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBClusterSession.java index b43be9ef6..a0b6fac12 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBClusterSession.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBClusterSession.java @@ -19,8 +19,6 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; -import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; -import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; import org.apache.iotdb.isession.pool.SessionDataSetWrapper; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; @@ -28,6 +26,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.write.record.Tablet; + +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; import org.slf4j.LoggerFactory; import java.util.ArrayList; @@ -37,59 +38,99 @@ public class IoTDBClusterSession extends IoTDBSessionBase { private class BenchmarkSessionPool implements IBenchmarkSession { private final SessionPool sessionPool; + public BenchmarkSessionPool(SessionPool sessionPool) { this.sessionPool = sessionPool; } + @Override - public void open() { - - } + public void open() {} + @Override - public void open(boolean enableRPCCompression) { - - } + public void open(boolean enableRPCCompression) {} + @Override - public void insertRecord(String deviceId, long time, List measurements, List types, List values) throws IoTDBConnectionException, StatementExecutionException { + public void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { sessionPool.insertRecord(deviceId, time, measurements, types, values); } + @Override - public void insertAlignedRecord(String multiSeriesId, long time, List multiMeasurementComponents, List types, List values) throws IoTDBConnectionException, StatementExecutionException { - sessionPool.insertAlignedRecord(multiSeriesId, time, multiMeasurementComponents, types, values); + public void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertAlignedRecord( + multiSeriesId, time, multiMeasurementComponents, types, values); } + @Override - public void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException { + public void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { sessionPool.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); } + @Override - public void insertAlignedRecords(List multiSeriesIds, List times, List> multiMeasurementComponentsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException { - sessionPool.insertAlignedRecords(multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); + public void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + sessionPool.insertAlignedRecords( + multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); } + @Override - public void insertTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException { + public void insertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { sessionPool.insertTablet(tablet); } + @Override - public void insertAlignedTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException { + public void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { sessionPool.insertAlignedTablet(tablet); } + @Override - public ISessionDataSet executeQueryStatement(String sql) throws IoTDBConnectionException, StatementExecutionException { + public ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { return new SessionDataSet2(sessionPool.executeQueryStatement(sql)); } + @Override public void close() { sessionPool.close(); } + @Override - public void executeNonQueryStatement(String deleteSeriesSql) throws IoTDBConnectionException, StatementExecutionException { + public void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException { sessionPool.executeNonQueryStatement(deleteSeriesSql); } + private class SessionDataSet2 implements ISessionDataSet { SessionDataSetWrapper sessionDataSet; public SessionDataSet2(SessionDataSetWrapper sessionDataSetWrapper) { this.sessionDataSet = sessionDataSetWrapper; } + @Override public RowRecord next() throws IoTDBConnectionException, StatementExecutionException { return sessionDataSet.next(); @@ -106,6 +147,7 @@ public void close() throws IoTDBConnectionException, StatementExecutionException } } } + private static final int MAX_SESSION_CONNECTION_PER_CLIENT = 3; public IoTDBClusterSession(DBConfig dbConfig) { @@ -115,15 +157,15 @@ public IoTDBClusterSession(DBConfig dbConfig) { for (int i = 0; i < dbConfig.getHOST().size(); i++) { hostUrls.add(dbConfig.getHOST().get(i) + ":" + dbConfig.getPORT().get(i)); } - sessionWrapper = new BenchmarkSessionPool( - new SessionPool( - hostUrls, - dbConfig.getUSERNAME(), - dbConfig.getPASSWORD(), - MAX_SESSION_CONNECTION_PER_CLIENT, - config.isENABLE_THRIFT_COMPRESSION(), - true) - ); + sessionWrapper = + new BenchmarkSessionPool( + new SessionPool( + hostUrls, + dbConfig.getUSERNAME(), + dbConfig.getPASSWORD(), + MAX_SESSION_CONNECTION_PER_CLIENT, + config.isENABLE_THRIFT_COMPRESSION(), + true)); } @Override diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSession.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSession.java index faee26e18..5dee11b76 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSession.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSession.java @@ -19,8 +19,6 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; -import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; -import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; @@ -29,6 +27,9 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.RowRecord; import org.apache.iotdb.tsfile.write.record.Tablet; + +import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig; +import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException; import org.slf4j.LoggerFactory; import java.util.List; @@ -37,53 +38,95 @@ public class IoTDBSession extends IoTDBSessionBase { private class BenchmarkSession implements IBenchmarkSession { private final Session session; + public BenchmarkSession(Session session) { this.session = session; } + @Override public void open() throws IoTDBConnectionException { session.open(); } + @Override public void open(boolean enableRPCCompression) throws IoTDBConnectionException { session.open(enableRPCCompression); } + @Override - public void insertRecord(String deviceId, long time, List measurements, List types, List values) throws IoTDBConnectionException, StatementExecutionException { + public void insertRecord( + String deviceId, + long time, + List measurements, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { session.insertRecord(deviceId, time, measurements, types, values); } + @Override - public void insertAlignedRecord(String multiSeriesId, long time, List multiMeasurementComponents, List types, List values) throws IoTDBConnectionException, StatementExecutionException { + public void insertAlignedRecord( + String multiSeriesId, + long time, + List multiMeasurementComponents, + List types, + List values) + throws IoTDBConnectionException, StatementExecutionException { session.insertAlignedRecord(multiSeriesId, time, multiMeasurementComponents, types, values); } + @Override - public void insertRecords(List deviceIds, List times, List> measurementsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException { + public void insertRecords( + List deviceIds, + List times, + List> measurementsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); } + @Override - public void insertAlignedRecords(List multiSeriesIds, List times, List> multiMeasurementComponentsList, List> typesList, List> valuesList) throws IoTDBConnectionException, StatementExecutionException { - session.insertAlignedRecords(multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); + public void insertAlignedRecords( + List multiSeriesIds, + List times, + List> multiMeasurementComponentsList, + List> typesList, + List> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + session.insertAlignedRecords( + multiSeriesIds, times, multiMeasurementComponentsList, typesList, valuesList); } + @Override - public void insertTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException { + public void insertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { session.insertTablet(tablet); } + @Override - public void insertAlignedTablet(Tablet tablet) throws IoTDBConnectionException, StatementExecutionException { + public void insertAlignedTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { session.insertAlignedTablet(tablet); } + @Override - public ISessionDataSet executeQueryStatement(String sql) throws IoTDBConnectionException, StatementExecutionException { + public ISessionDataSet executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { return new SessionDataSet1(session.executeQueryStatement(sql)); } + @Override public void close() throws IoTDBConnectionException { session.close(); } + @Override - public void executeNonQueryStatement(String deleteSeriesSql) throws IoTDBConnectionException, StatementExecutionException { + public void executeNonQueryStatement(String deleteSeriesSql) + throws IoTDBConnectionException, StatementExecutionException { session.executeNonQueryStatement(deleteSeriesSql); } + private class SessionDataSet1 implements ISessionDataSet { SessionDataSet sessionDataSet; @@ -111,16 +154,16 @@ public void close() throws IoTDBConnectionException, StatementExecutionException public IoTDBSession(DBConfig dbConfig) { super(dbConfig); LOGGER = LoggerFactory.getLogger(IoTDBSession.class); - sessionWrapper = new BenchmarkSession( + sessionWrapper = + new BenchmarkSession( new Session.Builder() - .host(dbConfig.getHOST().get(0)) - .port(Integer.parseInt(dbConfig.getPORT().get(0))) - .username(dbConfig.getUSERNAME()) - .password(dbConfig.getPASSWORD()) - .enableRedirection(true) - .version(Version.V_1_0) - .build() - ); + .host(dbConfig.getHOST().get(0)) + .port(Integer.parseInt(dbConfig.getPORT().get(0))) + .username(dbConfig.getUSERNAME()) + .password(dbConfig.getPASSWORD()) + .enableRedirection(true) + .version(Version.V_1_0) + .build()); } @Override diff --git a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java index 774f8462b..7caf7e3d7 100644 --- a/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java +++ b/iotdb-1.1/src/main/java/cn/edu/tsinghua/iot/benchmark/iotdb110/IoTDBSessionBase.java @@ -19,6 +19,16 @@ package cn.edu.tsinghua.iot.benchmark.iotdb110; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; + import cn.edu.tsinghua.iot.benchmark.client.operation.Operation; import cn.edu.tsinghua.iot.benchmark.conf.Config; import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor; @@ -34,15 +44,6 @@ import cn.edu.tsinghua.iot.benchmark.tsdb.enums.DBInsertMode; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.DeviceQuery; import cn.edu.tsinghua.iot.benchmark.workload.query.impl.VerificationQuery; -import org.apache.iotdb.rpc.IoTDBConnectionException; -import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.common.Field; -import org.apache.iotdb.tsfile.read.common.RowRecord; -import org.apache.iotdb.tsfile.utils.Binary; -import org.apache.iotdb.tsfile.write.record.Tablet; -import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.slf4j.Logger; import java.sql.SQLException; @@ -69,18 +70,19 @@ public IoTDBSessionBase(DBConfig dbConfig) { public Status insertOneBatchByTablet(Batch batch) { Tablet tablet = genTablet(batch); - future = service.submit( - () -> { - try { - if (config.isVECTOR()) { - sessionWrapper.insertAlignedTablet(tablet); - } else { - sessionWrapper.insertTablet(tablet); - } - } catch (IoTDBConnectionException | StatementExecutionException e) { - LOGGER.error("insert tablet failed", e); - } - }); + future = + service.submit( + () -> { + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedTablet(tablet); + } else { + sessionWrapper.insertTablet(tablet); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("insert tablet failed", e); + } + }); return waitFuture(); } @@ -136,18 +138,21 @@ public Status insertOneBatchByRecords(Batch batch) { constructDataTypes( batch.getDeviceSchema().getSensors(), record.getRecordDataValue().size())); } - future = service.submit( - () -> { - try { - if (config.isVECTOR()) { - sessionWrapper.insertAlignedRecords(deviceIds, times, measurementsList, typesList, valuesList); - } else { - sessionWrapper.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); - } - } catch (IoTDBConnectionException | StatementExecutionException e) { - LOGGER.error("insert records failed", e); - } - }); + future = + service.submit( + () -> { + try { + if (config.isVECTOR()) { + sessionWrapper.insertAlignedRecords( + deviceIds, times, measurementsList, typesList, valuesList); + } else { + sessionWrapper.insertRecords( + deviceIds, times, measurementsList, typesList, valuesList); + } + } catch (IoTDBConnectionException | StatementExecutionException e) { + LOGGER.error("insert records failed", e); + } + }); return waitFuture(); }