Skip to content

Commit

Permalink
Add parameters to control the number of schema clients. (#454)
Browse files Browse the repository at this point in the history
* SCHEMA_CLIENT_NUMBER controls the number of connections that register the schema.

* Modify the getTableIdFromDeviceName method

* Modify the printed log

* Compatible parameter client_number

* Compatible parameter client_number

* Compatible parameter client_number

* Replace the magic String with a constant.

* Clear SCHEMA_CLIENT_DATA_SCHEMA after registration is complete

* Modify the judgment conditions for loading client

* Modify schema client comments to align with data client.
  • Loading branch information
YangYumings authored Oct 13, 2024
1 parent 6f78dfd commit c24ee74
Show file tree
Hide file tree
Showing 29 changed files with 208 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class CnosConnection {

CnosConnection(String urlString, String cnosDbName) throws MalformedURLException {
ConnectionPool connectionPool =
new ConnectionPool(config.getCLIENT_NUMBER(), 5, TimeUnit.MINUTES);
new ConnectionPool(config.getDATA_CLIENT_NUMBER(), 5, TimeUnit.MINUTES);
client = new OkHttpClient().newBuilder().connectionPool(connectionPool).build();
url = urlString + "/api/v1/sql?db=" + cnosDbName;
}
Expand Down
7 changes: 5 additions & 2 deletions configuration/conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,12 @@
# 是否将设备绑定给客户端,如果绑定,则客户端数小于等于设备数,否则可以大于
# IS_CLIENT_BIND=true

# 客户端总数
# schema 客户端总数,schema_client 负责注册元数据
# SCHEMA_CLIENT_NUMBER=20

# data 客户端总数, data_client 负责读写数据
# iotdb表模型下,client数为table数的整数倍
# CLIENT_NUMBER=20
# DATA_CLIENT_NUMBER=20

############## 被测系统为IoTDB时扩展参数 ##################
# 是否使用thrift压缩,需要在iotdb的配置文件iotdb-datanode.properties中设置dn_rpc_thrift_compression_enable=true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier)
this.queryWorkLoad = QueryWorkLoad.getInstance(id);
this.clientThreadId = id;
this.clientDeviceSchemas =
MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
MetaDataSchema.getInstance().getDeviceSchemaByDataClientId(clientThreadId);
this.service =
Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("ShowWorkProgress-" + clientThreadId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public SchemaClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier
this.countDownLatch = countDownLatch;
this.barrier = barrier;
this.clientThreadId = id;
this.deviceSchemas = MetaDataSchema.getInstance().getDeviceSchemaByClientId(clientThreadId);
this.deviceSchemas =
MetaDataSchema.getInstance().getDeviceSchemaBySchemaClientId(clientThreadId);
this.deviceSchemasSize = deviceSchemas.size();
initDBWrappers();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public GenerateDataDeviceClient(int id, CountDownLatch countDownLatch, CyclicBar
@Override
protected void doTest() {
try {
for (int i = 0; i < config.getDEVICE_NUMBER() / config.getCLIENT_NUMBER() + 1; i++) {
for (int i = 0; i < config.getDEVICE_NUMBER() / config.getDATA_CLIENT_NUMBER() + 1; i++) {
DeviceQuery deviceQuery = queryWorkLoad.getDeviceQuery();
if (deviceQuery == null) {
break;
Expand Down
30 changes: 22 additions & 8 deletions core/src/main/java/cn/edu/tsinghua/iot/benchmark/conf/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,15 @@ public class Config {
*/
private boolean IS_CLIENT_BIND = true;
/**
* The number of client if IS_CLIENT_BIND = true: this number must be less than or equal to the
* number of devices.
* The number of schema client if IS_CLIENT_BIND = true: this number must be less than or equal to
* the number of devices.
*/
private int CLIENT_NUMBER = 20;
private int SCHEMA_CLIENT_NUMBER = 20;
/**
* The number of data client if IS_CLIENT_BIND = true: this number must be less than or equal to
* the number of devices.
*/
private int DATA_CLIENT_NUMBER = 20;

/** name prefix of table */
private String IoTDB_TABLE_NAME_PREFIX = "table_";
Expand Down Expand Up @@ -905,12 +910,20 @@ public void setIS_CLIENT_BIND(boolean IS_CLIENT_BIND) {
this.IS_CLIENT_BIND = IS_CLIENT_BIND;
}

public int getCLIENT_NUMBER() {
return CLIENT_NUMBER;
public int getSCHEMA_CLIENT_NUMBER() {
return SCHEMA_CLIENT_NUMBER;
}

public void setSCHEMA_CLIENT_NUMBER(int SCHEMA_CLIENT_NUMBER) {
this.SCHEMA_CLIENT_NUMBER = SCHEMA_CLIENT_NUMBER;
}

public int getDATA_CLIENT_NUMBER() {
return DATA_CLIENT_NUMBER;
}

public void setCLIENT_NUMBER(int CLIENT_NUMBER) {
this.CLIENT_NUMBER = CLIENT_NUMBER;
public void setDATA_CLIENT_NUMBER(int DATA_CLIENT_NUMBER) {
this.DATA_CLIENT_NUMBER = DATA_CLIENT_NUMBER;
}

public int getTAG_NUMBER() {
Expand Down Expand Up @@ -1813,7 +1826,8 @@ public ConfigProperties getShowConfigProperties() {
configProperties.addProperty("Data Mode", "IS_OUT_OF_ORDER", this.IS_OUT_OF_ORDER);
configProperties.addProperty("Data Mode", "OUT_OF_ORDER_RATIO", this.OUT_OF_ORDER_RATIO);
configProperties.addProperty("Data Amount", "OPERATION_PROPORTION", this.OPERATION_PROPORTION);
configProperties.addProperty("Data Amount", "CLIENT_NUMBER", this.CLIENT_NUMBER);
configProperties.addProperty("Data Amount", "SCHEMA_CLIENT_NUMBER", this.SCHEMA_CLIENT_NUMBER);
configProperties.addProperty("Data Amount", "DATA_CLIENT_NUMBER", this.DATA_CLIENT_NUMBER);
configProperties.addProperty("Data Amount", "LOOP", this.LOOP);
configProperties.addProperty("Data Amount", "BATCH_SIZE_PER_WRITE", this.BATCH_SIZE_PER_WRITE);
configProperties.addProperty("Data Amount", "DEVICE_NUM_PER_WRITE", this.DEVICE_NUM_PER_WRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,21 @@ private void loadProps() {
config.setIS_CLIENT_BIND(
Boolean.parseBoolean(
properties.getProperty("IS_CLIENT_BIND", config.isIS_CLIENT_BIND() + "")));
config.setCLIENT_NUMBER(
Integer.parseInt(
properties.getProperty("CLIENT_NUMBER", config.getCLIENT_NUMBER() + "")));

String schemaClientNumber = properties.getProperty("SCHEMA_CLIENT_NUMBER");
if (schemaClientNumber == null || schemaClientNumber.isEmpty()) {
schemaClientNumber =
properties.getProperty("CLIENT_NUMBER", config.getSCHEMA_CLIENT_NUMBER() + "");
}
config.setSCHEMA_CLIENT_NUMBER(Integer.parseInt(schemaClientNumber));

String dataClientNumber = properties.getProperty("DATA_CLIENT_NUMBER");
if (dataClientNumber == null || dataClientNumber.isEmpty()) {
dataClientNumber =
properties.getProperty("CLIENT_NUMBER", config.getDATA_CLIENT_NUMBER() + "");
}
config.setDATA_CLIENT_NUMBER(Integer.parseInt(dataClientNumber));

config.setIoTDB_TABLE_NAME_PREFIX(
properties.getProperty("IoTDB_TABLE_NAME_PREFIX", config.getIoTDB_TABLE_NAME_PREFIX()));
config.setGROUP_NAME_PREFIX(
Expand Down Expand Up @@ -546,9 +558,11 @@ private boolean checkConfig() {
// Checking config according to mode
switch (config.getBENCHMARK_WORK_MODE()) {
case TEST_WITH_DEFAULT_PATH:
if (config.isIS_CLIENT_BIND() && config.getDEVICE_NUMBER() < config.getCLIENT_NUMBER()) {
if (config.isIS_CLIENT_BIND()
&& config.getDEVICE_NUMBER() < config.getSCHEMA_CLIENT_NUMBER()
&& config.getDEVICE_NUMBER() < config.getDATA_CLIENT_NUMBER()) {
LOGGER.error(
"In client bind way, the number of client should be less than the number of device");
"In client bind way, the number of schema client and data client should be less than the number of device");
result = false;
}
if (!config.hasWrite()) {
Expand Down Expand Up @@ -584,7 +598,8 @@ private boolean checkConfig() {
}
}
if (config.isIS_POINT_COMPARISON()) {
if (config.getDEVICE_NUMBER() < config.getCLIENT_NUMBER()) {
if (config.getDEVICE_NUMBER() < config.getSCHEMA_CLIENT_NUMBER()
|| config.getDEVICE_NUMBER() < config.getDATA_CLIENT_NUMBER()) {
LOGGER.warn("There are too many client ( > device number)");
}
}
Expand Down Expand Up @@ -620,7 +635,7 @@ private boolean checkConfig() {
}
result &= checkInsertDataTypeProportion();
result &= checkOperationProportion();
if (config.getCLIENT_NUMBER() == 0) {
if (config.getSCHEMA_CLIENT_NUMBER() == 0 || config.getDATA_CLIENT_NUMBER() == 0) {
LOGGER.error("Client number can't be zero");
result = false;
}
Expand Down Expand Up @@ -694,10 +709,10 @@ private boolean checkDeviceNumPerWrite() {
}
// tableMode
if (config.getIoTDB_DIALECT_MODE() == SQLDialect.TABLE
&& config.getCLIENT_NUMBER() % config.getIoTDB_TABLE_NUMBER() != 0) {
&& config.getDATA_CLIENT_NUMBER() % config.getIoTDB_TABLE_NUMBER() != 0) {
LOGGER.error(
"TableMode must ensure that a client only writes to one table. Therefore, a client only switches database once.\n"
+ "please make CLIENT_NUMBER % IoTDB_TABLE_NUMBER == 0");
+ "please make DATA_CLIENT_NUMBER % IoTDB_TABLE_NUMBER == 0");
return false;
}
if (dnw == 1) {
Expand All @@ -714,12 +729,12 @@ private boolean checkDeviceNumPerWrite() {
}
for (int deviceNumPerClient :
CommonAlgorithms.distributeDevicesToClients(
config.getDEVICE_NUMBER(), config.getCLIENT_NUMBER())
config.getDEVICE_NUMBER(), config.getDATA_CLIENT_NUMBER())
.values()) {
if (deviceNumPerClient % dnw != 0) {
LOGGER.error(
"Some clients will be allocated {} devices, which are not divisible by parameter DEVICE_NUM_PER_WRITE {}.\n"
+ "To solve this problem, please make DEVICE_NUMBER % CLIENTS_NUMBER == 0, and (DEVICE_NUMBER / CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.",
+ "To solve this problem, please make DEVICE_NUMBER % DATA_CLIENT_NUMBER == 0, and (DEVICE_NUMBER / DATA_CLIENT_NUMBER) % DEVICE_NUM_PER_WRITE == 0.",
deviceNumPerClient, dnw);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,8 +372,8 @@ public void saveTestConfig() {
String.format(
SAVE_CONFIG,
"'" + PROJECT_ID + "'",
"'getCLIENT_NUMBER()'",
"'" + config.getCLIENT_NUMBER() + "'");
"'getDATA_CLIENT_NUMBER()'",
"'" + config.getDATA_CLIENT_NUMBER() + "'");
statement.addBatch(sql);
sql =
String.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import cn.edu.tsinghua.iot.benchmark.conf.Config;
import cn.edu.tsinghua.iot.benchmark.conf.ConfigDescriptor;
import cn.edu.tsinghua.iot.benchmark.measurement.Measurement;
import cn.edu.tsinghua.iot.benchmark.schema.MetaDataSchema;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper;
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
Expand Down Expand Up @@ -54,13 +55,14 @@ public abstract class BaseMode {

protected ExecutorService schemaExecutorService =
Executors.newFixedThreadPool(
config.getCLIENT_NUMBER(), new NamedThreadFactory("SchemaClient"));
config.getSCHEMA_CLIENT_NUMBER(), new NamedThreadFactory("SchemaClient"));
protected ExecutorService executorService =
Executors.newFixedThreadPool(config.getCLIENT_NUMBER(), new NamedThreadFactory("DataClient"));
protected CountDownLatch schemaDownLatch = new CountDownLatch(config.getCLIENT_NUMBER());
protected CyclicBarrier schemaBarrier = new CyclicBarrier(config.getCLIENT_NUMBER());
protected CountDownLatch dataDownLatch = new CountDownLatch(config.getCLIENT_NUMBER());
protected CyclicBarrier dataBarrier = new CyclicBarrier(config.getCLIENT_NUMBER());
Executors.newFixedThreadPool(
config.getDATA_CLIENT_NUMBER(), new NamedThreadFactory("DataClient"));
protected CountDownLatch schemaDownLatch = new CountDownLatch(config.getSCHEMA_CLIENT_NUMBER());
protected CyclicBarrier schemaBarrier = new CyclicBarrier(config.getSCHEMA_CLIENT_NUMBER());
protected CountDownLatch dataDownLatch = new CountDownLatch(config.getDATA_CLIENT_NUMBER());
protected CyclicBarrier dataBarrier = new CyclicBarrier(config.getDATA_CLIENT_NUMBER());
protected List<DataClient> dataClients = new ArrayList<>();
protected List<SchemaClient> schemaClients = new ArrayList<>();
protected Measurement baseModeMeasurement = new Measurement();
Expand All @@ -73,7 +75,7 @@ public void run() {
if (!preCheck()) {
return;
}
for (int i = 0; i < config.getCLIENT_NUMBER(); i++) {
for (int i = 0; i < config.getDATA_CLIENT_NUMBER(); i++) {
DataClient client = DataClient.getInstance(i, dataDownLatch, dataBarrier);
if (client == null) {
return;
Expand Down Expand Up @@ -169,7 +171,7 @@ protected boolean cleanUpData(List<DBConfig> dbConfigs) {

/** Register schema */
protected boolean registerSchema() {
for (int i = 0; i < config.getCLIENT_NUMBER(); i++) {
for (int i = 0; i < config.getSCHEMA_CLIENT_NUMBER(); i++) {
SchemaClient schemaClient = new SchemaClient(i, schemaDownLatch, schemaBarrier);
schemaClients.add(schemaClient);
}
Expand All @@ -189,6 +191,7 @@ protected boolean registerSchema() {
Thread.currentThread().interrupt();
}
LOGGER.info("Registering schema successful!");
MetaDataSchema.clearSchemaClientDataSchema();
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ public abstract class MetaDataSchema {
private static final String UNKNOWN_DEVICE = "Unknown device: %s";

protected static final Config config = ConfigDescriptor.getInstance().getConfig();
/** DeviceSchema for each client */
protected static final Map<Integer, List<DeviceSchema>> CLIENT_DATA_SCHEMA =
/** DeviceSchema for each schema client */
protected static final Map<Integer, List<DeviceSchema>> SCHEMA_CLIENT_DATA_SCHEMA =
new ConcurrentHashMap<>();
/** DeviceSchema for each data client */
protected static final Map<Integer, List<DeviceSchema>> DATA_CLIENT_DATA_SCHEMA =
new ConcurrentHashMap<>();
/** Name mapping of DeviceSchema */
protected static final Map<String, DeviceSchema> NAME_DATA_SCHEMA = new ConcurrentHashMap<>();
Expand All @@ -52,6 +55,11 @@ protected MetaDataSchema() {
}
}

public static void clearSchemaClientDataSchema() {
SCHEMA_CLIENT_DATA_SCHEMA.clear();
LOGGER.info("SCHEMA_CLIENT_DATA_SCHEMA has been cleared!");
}

/** init data schema for each device */
protected abstract boolean createMetaDataSchema();

Expand All @@ -65,14 +73,26 @@ public DeviceSchema getDeviceSchemaByName(String deviceName) {
}
}

/** Get DeviceSchema by clientId */
public List<DeviceSchema> getDeviceSchemaByClientId(int clientId) {
return CLIENT_DATA_SCHEMA.get(clientId);
/** Get DeviceSchema by schema clientId */
public List<DeviceSchema> getDeviceSchemaBySchemaClientId(int clientId) {
return SCHEMA_CLIENT_DATA_SCHEMA.get(clientId);
}

/** Get DeviceSchema by data clientId */
public List<DeviceSchema> getDeviceSchemaByDataClientId(int clientId) {
return DATA_CLIENT_DATA_SCHEMA.get(clientId);
}

/** Get All Device Schema */
public List<DeviceSchema> getAllDeviceSchemas() {
return new ArrayList<>(NAME_DATA_SCHEMA.values());
List<Integer> deviceIds = MetaUtil.sortDeviceId();
List<DeviceSchema> deviceSchemaList = new ArrayList<>();
for (int deviceId : deviceIds) {
if (NAME_DATA_SCHEMA.containsKey(config.getDEVICE_NAME_PREFIX() + deviceId)) {
deviceSchemaList.add(NAME_DATA_SCHEMA.get(config.getDEVICE_NAME_PREFIX() + deviceId));
}
}
return new ArrayList<>(deviceSchemaList);
}

/** Get All Group */
Expand Down
Loading

0 comments on commit c24ee74

Please sign in to comment.