Skip to content

Commit

Permalink
Support distinguish of DB model && Fixed the table detection bug when…
Browse files Browse the repository at this point in the history
… indexOutOfRange && Fixed the bug that conflict databases may be saved in cache
  • Loading branch information
Caideyipi authored Oct 25, 2024
1 parent ad97746 commit 77a2dd7
Show file tree
Hide file tree
Showing 35 changed files with 602 additions and 402 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1041,25 +1041,23 @@ public void testInvalidMaxPointNumber() {

@Test
public void testStorageGroupWithHyphenInName() {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
statement.setFetchSize(5);
statement.execute("CREATE DATABASE root.group_with_hyphen");
} catch (SQLException e) {
} catch (final SQLException e) {
fail();
}

try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement()) {
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
while (resultSet.next()) {
StringBuilder builder = new StringBuilder();
builder.append(resultSet.getString(1));
Assert.assertEquals(builder.toString(), "root.group_with_hyphen");
Assert.assertEquals("root.group_with_hyphen", resultSet.getString(1));
Assert.assertEquals("TREE", resultSet.getString(12));
}
}
} catch (SQLException e) {
} catch (final SQLException e) {
fail();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Collections;

import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showDBColumnHeaders;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.showDBDetailsColumnHeaders;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -83,6 +84,7 @@ public void testManageDatabase() {
int[] schemaReplicaFactors = new int[] {1};
int[] dataReplicaFactors = new int[] {1};
int[] timePartitionInterval = new int[] {604800000};
String[] model = new String[] {"TABLE"};

// show
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES")) {
Expand All @@ -102,6 +104,26 @@ public void testManageDatabase() {
assertEquals(databaseNames.length, cnt);
}

// show
try (final ResultSet resultSet = statement.executeQuery("SHOW DATABASES DETAILS")) {
int cnt = 0;
final ResultSetMetaData metaData = resultSet.getMetaData();
assertEquals(showDBDetailsColumnHeaders.size(), metaData.getColumnCount());
for (int i = 0; i < showDBDetailsColumnHeaders.size(); i++) {
assertEquals(
showDBDetailsColumnHeaders.get(i).getColumnName(), metaData.getColumnName(i + 1));
}
while (resultSet.next()) {
assertEquals(databaseNames[cnt], resultSet.getString(1));
assertEquals(schemaReplicaFactors[cnt], resultSet.getInt(2));
assertEquals(dataReplicaFactors[cnt], resultSet.getInt(3));
assertEquals(timePartitionInterval[cnt], resultSet.getLong(4));
assertEquals(model[cnt], resultSet.getString(5));
cnt++;
}
assertEquals(databaseNames.length, cnt);
}

// use
statement.execute("use test");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public enum TSStatusCode {
MEASUREMENT_ALREADY_EXISTS_IN_TEMPLATE(527),
TYPE_NOT_FOUND(528),
DATABASE_CONFLICT(529),
DATABASE_MODEL(530),

TABLE_NOT_EXISTS(550),
TABLE_ALREADY_EXISTS(551),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,8 +726,8 @@ public TSStatus setDatabase(final DatabaseSchemaPlan databaseSchemaPlan) {
}

@Override
public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
TSStatus status = confirmLeader();
public TSStatus alterDatabase(final DatabaseSchemaPlan databaseSchemaPlan) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return clusterSchemaManager.alterDatabase(databaseSchemaPlan, false);
} else {
Expand All @@ -736,19 +736,19 @@ public TSStatus alterDatabase(DatabaseSchemaPlan databaseSchemaPlan) {
}

@Override
public synchronized TSStatus deleteDatabases(TDeleteDatabasesReq tDeleteReq) {
TSStatus status = confirmLeader();
public synchronized TSStatus deleteDatabases(final TDeleteDatabasesReq tDeleteReq) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
List<String> deletedPaths = tDeleteReq.getPrefixPathList();
final List<String> deletedPaths = tDeleteReq.getPrefixPathList();
// remove wild
Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
final Map<String, TDatabaseSchema> deleteDatabaseSchemaMap =
getClusterSchemaManager().getMatchedDatabaseSchemasByName(deletedPaths);
if (deleteDatabaseSchemaMap.isEmpty()) {
return RpcUtils.getStatus(
TSStatusCode.PATH_NOT_EXIST.getStatusCode(),
String.format("Path %s does not exist", Arrays.toString(deletedPaths.toArray())));
}
ArrayList<TDatabaseSchema> parsedDeleteDatabases =
final ArrayList<TDatabaseSchema> parsedDeleteDatabases =
new ArrayList<>(deleteDatabaseSchemaMap.values());
return procedureManager.deleteDatabases(
parsedDeleteDatabases,
Expand Down Expand Up @@ -1834,14 +1834,15 @@ public TShowConfigNodesResp showConfigNodes() {
}

@Override
public TShowDatabaseResp showDatabase(TGetDatabaseReq req) {
TSStatus status = confirmLeader();
public TShowDatabaseResp showDatabase(final TGetDatabaseReq req) {
final TSStatus status = confirmLeader();
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
PathPatternTree scope =
final PathPatternTree scope =
req.getScopePatternTree() == null
? SchemaConstant.ALL_MATCH_SCOPE
: PathPatternTree.deserialize(ByteBuffer.wrap(req.getScopePatternTree()));
GetDatabasePlan getDatabasePlan = new GetDatabasePlan(req.getDatabasePathPattern(), scope);
final GetDatabasePlan getDatabasePlan =
new GetDatabasePlan(req.getDatabasePathPattern(), scope);
return getClusterSchemaManager().showDatabase(getDatabasePlan);
} else {
return new TShowDatabaseResp().setStatus(status);
Expand Down Expand Up @@ -1943,17 +1944,18 @@ public TGetPathsSetTemplatesResp getPathsSetTemplate(TGetPathsSetTemplatesReq re
}

@Override
public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
TSStatus status = confirmLeader();
public TSStatus deactivateSchemaTemplate(final TDeactivateSchemaTemplateReq req) {
final TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return status;
}

PathPatternTree patternTree =
final PathPatternTree patternTree =
PathPatternTree.deserialize(ByteBuffer.wrap(req.getPathPatternTree()));

List<PartialPath> patternList = patternTree.getAllPathPatterns();
TemplateSetInfoResp templateSetInfoResp = clusterSchemaManager.getTemplateSetInfo(patternList);
final List<PartialPath> patternList = patternTree.getAllPathPatterns();
final TemplateSetInfoResp templateSetInfoResp =
clusterSchemaManager.getTemplateSetInfo(patternList);
if (templateSetInfoResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return templateSetInfoResp.getStatus();
}
Expand All @@ -1968,9 +1970,9 @@ public TSStatus deactivateSchemaTemplate(TDeactivateSchemaTemplateReq req) {
}

if (!req.getTemplateName().equals(ONE_LEVEL_PATH_WILDCARD)) {
Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
for (Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
for (Template template : entry.getValue()) {
final Map<PartialPath, List<Template>> filteredTemplateSetInfo = new HashMap<>();
for (final Map.Entry<PartialPath, List<Template>> entry : templateSetInfo.entrySet()) {
for (final Template template : entry.getValue()) {
if (template.getName().equals(req.getTemplateName())) {
filteredTemplateSetInfo.put(entry.getKey(), Collections.singletonList(template));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private TSStatus executePlan(final ConfigPhysicalPlan plan) throws ConsensusExce
case CreateDatabase:
// Here we only reserve database name and substitute the sender's local information
// with the receiver's default configurations
TDatabaseSchema schema = ((DatabaseSchemaPlan) plan).getSchema();
final TDatabaseSchema schema = ((DatabaseSchemaPlan) plan).getSchema();
schema.setSchemaReplicationFactor(
ConfigNodeDescriptor.getInstance().getConf().getSchemaReplicationFactor());
schema.setDataReplicationFactor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,11 +336,11 @@ public DatabaseSchemaResp getMatchedDatabaseSchema(GetDatabasePlan getDatabasePl
}

/** Only used in cluster tool show Databases. */
public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
public TShowDatabaseResp showDatabase(final GetDatabasePlan getDatabasePlan) {
DatabaseSchemaResp databaseSchemaResp;
try {
databaseSchemaResp = (DatabaseSchemaResp) getConsensusManager().read(getDatabasePlan);
} catch (ConsensusException e) {
} catch (final ConsensusException e) {
LOGGER.warn(CONSENSUS_READ_ERROR, e);
TSStatus res = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
res.setMessage(e.getMessage());
Expand All @@ -352,10 +352,10 @@ public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
return new TShowDatabaseResp().setStatus(databaseSchemaResp.getStatus());
}

Map<String, TDatabaseInfo> infoMap = new ConcurrentHashMap<>();
for (TDatabaseSchema databaseSchema : databaseSchemaResp.getSchemaMap().values()) {
String database = databaseSchema.getName();
TDatabaseInfo databaseInfo = new TDatabaseInfo();
final Map<String, TDatabaseInfo> infoMap = new ConcurrentHashMap<>();
for (final TDatabaseSchema databaseSchema : databaseSchemaResp.getSchemaMap().values()) {
final String database = databaseSchema.getName();
final TDatabaseInfo databaseInfo = new TDatabaseInfo();
databaseInfo.setName(database);
databaseInfo.setSchemaReplicationFactor(databaseSchema.getSchemaReplicationFactor());
databaseInfo.setDataReplicationFactor(databaseSchema.getDataReplicationFactor());
Expand All @@ -369,13 +369,14 @@ public TShowDatabaseResp showDatabase(GetDatabasePlan getDatabasePlan) {
getMinRegionGroupNum(database, TConsensusGroupType.DataRegion));
databaseInfo.setMaxDataRegionNum(
getMaxRegionGroupNum(database, TConsensusGroupType.DataRegion));
databaseInfo.setIsTableModel(databaseSchema.isIsTableModel());

try {
databaseInfo.setSchemaRegionNum(
getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.SchemaRegion));
databaseInfo.setDataRegionNum(
getPartitionManager().getRegionGroupCount(database, TConsensusGroupType.DataRegion));
} catch (DatabaseNotExistsException e) {
} catch (final DatabaseNotExistsException e) {
// Skip pre-deleted Database
LOGGER.warn(
"The Database: {} doesn't exist. Maybe it has been pre-deleted.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,21 +991,21 @@ public boolean processTakeSnapshot(File snapshotDir) throws TException, IOExcept
}
}

public void processLoadSnapshot(File snapshotDir) throws TException, IOException {
public void processLoadSnapshot(final File snapshotDir) throws TException, IOException {

File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
final File snapshotFile = new File(snapshotDir, SNAPSHOT_FILENAME);
if (!snapshotFile.exists() || !snapshotFile.isFile()) {
LOGGER.error(
"Failed to load snapshot,snapshot file [{}] is not exist.",
snapshotFile.getAbsolutePath());
return;
}

try (BufferedInputStream fileInputStream =
try (final BufferedInputStream fileInputStream =
new BufferedInputStream(
Files.newInputStream(snapshotFile.toPath()), PARTITION_TABLE_BUFFER_SIZE);
TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
final TIOStreamTransport tioStreamTransport = new TIOStreamTransport(fileInputStream)) {
final TProtocol protocol = new TBinaryProtocol(tioStreamTransport);
// before restoring a snapshot, clear all old data
clear();

Expand All @@ -1015,19 +1015,21 @@ public void processLoadSnapshot(File snapshotDir) throws TException, IOException
// restore StorageGroupPartitionTable
int length = ReadWriteIOUtils.readInt(fileInputStream);
for (int i = 0; i < length; i++) {
String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
final String storageGroup = ReadWriteIOUtils.readString(fileInputStream);
if (storageGroup == null) {
throw new IOException("Failed to load snapshot because get null StorageGroup name");
}
DatabasePartitionTable databasePartitionTable = new DatabasePartitionTable(storageGroup);
final DatabasePartitionTable databasePartitionTable =
new DatabasePartitionTable(storageGroup);
databasePartitionTable.deserialize(fileInputStream, protocol);
databasePartitionTables.put(storageGroup, databasePartitionTable);
}

// restore deletedRegionSet
length = ReadWriteIOUtils.readInt(fileInputStream);
for (int i = 0; i < length; i++) {
RegionMaintainTask task = RegionMaintainTask.Factory.create(fileInputStream, protocol);
final RegionMaintainTask task =
RegionMaintainTask.Factory.create(fileInputStream, protocol);
regionMaintainTaskList.add(task);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public ClusterSchemaInfo() throws IOException {
/**
* Cache DatabaseSchema.
*
* @param plan DatabaseSchemaPlan
* @param plan {@link DatabaseSchemaPlan}
* @return {@link TSStatusCode#SUCCESS_STATUS} if the Database is set successfully.
*/
public TSStatus createDatabase(final DatabaseSchemaPlan plan) {
Expand Down Expand Up @@ -195,7 +195,7 @@ public TSStatus alterDatabase(final DatabaseSchemaPlan plan) {
final TDatabaseSchema alterSchema = plan.getSchema();
final PartialPath partialPathName = new PartialPath(alterSchema.getName());

TDatabaseSchema currentSchema =
final TDatabaseSchema currentSchema =
mTree.getDatabaseNodeByDatabasePath(partialPathName).getAsMNode().getDatabaseSchema();
// TODO: Support alter other fields
if (alterSchema.isSetMinSchemaRegionGroupNum()) {
Expand Down Expand Up @@ -234,7 +234,7 @@ public TSStatus alterDatabase(final DatabaseSchemaPlan plan) {
.getAsMNode()
.setDatabaseSchema(currentSchema);
result.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
} catch (MetadataException e) {
} catch (final MetadataException e) {
LOGGER.error(ERROR_NAME, e);
result.setCode(e.getErrorCode()).setMessage(e.getMessage());
} finally {
Expand Down Expand Up @@ -455,7 +455,7 @@ public TSStatus adjustMaxRegionGroupCount(final AdjustMaxRegionGroupNumPlan plan
public List<String> getDatabaseNames() {
databaseReadWriteLock.readLock().lock();
try {
return mTree.getAllDatabasePaths().stream()
return mTree.getAllDatabasePaths(false).stream()
.map(PartialPath::getFullPath)
.collect(Collectors.toList());
} finally {
Expand Down Expand Up @@ -759,12 +759,12 @@ public Template getTemplate(int id) throws MetadataException {
}

public synchronized TemplateInfoResp checkTemplateSettable(
CheckTemplateSettablePlan checkTemplateSettablePlan) {
TemplateInfoResp resp = new TemplateInfoResp();
PartialPath path;
final CheckTemplateSettablePlan checkTemplateSettablePlan) {
final TemplateInfoResp resp = new TemplateInfoResp();
final PartialPath path;
try {
path = new PartialPath(checkTemplateSettablePlan.getPath());
} catch (IllegalPathException e) {
} catch (final IllegalPathException e) {
LOGGER.error(e.getMessage());
resp.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
return resp;
Expand All @@ -776,7 +776,7 @@ public synchronized TemplateInfoResp checkTemplateSettable(
resp.setTemplateList(
Collections.singletonList(
templateTable.getTemplate(checkTemplateSettablePlan.getName())));
} catch (MetadataException e) {
} catch (final MetadataException e) {
LOGGER.error(e.getMessage(), e);
resp.setStatus(RpcUtils.getStatus(e.getErrorCode(), e.getMessage()));
}
Expand Down Expand Up @@ -933,13 +933,12 @@ public AllTemplateSetInfoResp getAllTemplateSetInfo() {
* into specified path pattern start with template set path. The result set is organized as
* specified path pattern -> template id
*/
public TemplateSetInfoResp getTemplateSetInfo(GetTemplateSetInfoPlan plan) {
TemplateSetInfoResp resp = new TemplateSetInfoResp();
public TemplateSetInfoResp getTemplateSetInfo(final GetTemplateSetInfoPlan plan) {
final TemplateSetInfoResp resp = new TemplateSetInfoResp();
try {

Map<PartialPath, Set<Integer>> allTemplateSetInfo = new HashMap<>();
for (PartialPath pattern : plan.getPatternList()) {
Map<Integer, Set<PartialPath>> templateSetInfo = mTree.getTemplateSetInfo(pattern);
final Map<PartialPath, Set<Integer>> allTemplateSetInfo = new HashMap<>();
for (final PartialPath pattern : plan.getPatternList()) {
final Map<Integer, Set<PartialPath>> templateSetInfo = mTree.getTemplateSetInfo(pattern);
if (templateSetInfo.isEmpty()) {
continue;
}
Expand Down
Loading

0 comments on commit 77a2dd7

Please sign in to comment.