Skip to content

Commit

Permalink
[Improve]Support modify column type without default when column exist…
Browse files Browse the repository at this point in the history
…s default value (#490)
  • Loading branch information
qg-lin authored Sep 20, 2024
1 parent d7323aa commit c4ae051
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ public IllegalArgumentException(String msg, Throwable cause) {
public IllegalArgumentException(String arg, String value) {
super("argument '" + arg + "' is illegal, value is '" + value + "'.");
}

public IllegalArgumentException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class SchemaChangeHelper {
private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s";
private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";
private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'";
private static final String SHOW_FULL_COLUMN_DDL = "SHOW FULL COLUMNS FROM `%s`.`%s`";

public static void compareSchema(
Map<String, FieldSchema> updateFiledSchemaMap,
Expand Down Expand Up @@ -166,13 +167,19 @@ public static String buildModifyColumnDataTypeDDL(
String columnName = fieldSchema.getName();
String dataType = fieldSchema.getTypeString();
String comment = fieldSchema.getComment();
String defaultValue = fieldSchema.getDefaultValue();
StringBuilder modifyDDL =
new StringBuilder(
String.format(
MODIFY_TYPE_DDL,
DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
DorisSchemaFactory.identifier(columnName),
dataType));
if (StringUtils.isNotBlank(defaultValue)) {
modifyDDL
.append(" DEFAULT ")
.append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
}
commentColumn(modifyDDL, comment);
return modifyDDL.toString();
}
Expand All @@ -183,6 +190,10 @@ private static void commentColumn(StringBuilder ddl, String comment) {
}
}

public static String buildShowFullColumnDDL(String database, String table) {
return String.format(SHOW_FULL_COLUMN_DDL, database, table);
}

public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
Expand Down Expand Up @@ -123,11 +124,30 @@ public boolean modifyColumnDataType(String database, String table, FieldSchema f
throws IOException, IllegalArgumentException {
if (!checkColumnExists(database, table, field.getName())) {
LOG.warn(
"The column {} is not exists in table {}, can not modify it type",
"The column {} is not exists in table {}, can not modify it's type",
field.getName(),
table);
return false;
}

String ddl = SchemaChangeHelper.buildShowFullColumnDDL(database, table);
String defaultValue = getDefaultValue(ddl, database, field.getName());
if (!StringUtils.isNullOrWhitespaceOnly(field.getDefaultValue())) {
// Can not change default value
if (!field.getDefaultValue().equals(defaultValue)) {
LOG.warn(
"Column:{} can not change default value from {} to {}, fallback it",
field.getName(),
defaultValue,
field.getDefaultValue());
field.setDefaultValue(defaultValue);
}
} else {
// If user does not give a default value, need fill it from
// original table schema to avoid change type failed if default value exists
field.setDefaultValue(defaultValue);
}

// If user does not give a comment, need fill it from
// original table schema to avoid miss comment
if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) {
Expand Down Expand Up @@ -214,15 +234,42 @@ private boolean handleSchemaChange(String responseEntity) throws JsonProcessingE
}
}

private String getDefaultValue(String ddl, String database, String column)
throws IOException, IllegalArgumentException {
String responseEntity = executeThenReturnResponse(ddl, database);
JsonNode responseNode = objectMapper.readTree(responseEntity);
String code = responseNode.get("code").asText("-1");
if (code.equals("0")) {
JsonNode data = responseNode.get("data").get("data");
for (JsonNode node : data) {
if (node.get(0).asText().equals(column)) {
JsonNode defaultValueNode = node.get(5);
return (defaultValueNode instanceof NullNode)
? null
: defaultValueNode.asText();
}
}
return null;
} else {
throw new DorisSchemaChangeException(
"Failed to get default value, response: " + responseEntity);
}
}

/** execute sql in doris. */
public boolean execute(String ddl, String database)
private String executeThenReturnResponse(String ddl, String database)
throws IOException, IllegalArgumentException {
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
return false;
throw new IllegalArgumentException("ddl can not be null or empty string!");
}
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
String responseEntity = handleResponse(httpPost);
return handleResponse(httpPost);
}

public boolean execute(String ddl, String database)
throws IOException, IllegalArgumentException {
String responseEntity = executeThenReturnResponse(ddl, database);
return handleSchemaChange(responseEntity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.doris.flink.sink.schema;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void setUp() throws Exception {
schemaChangeManager = new SchemaChangeManager(options);
}

private void initDorisSchemaChangeTable(String table) {
private void initDorisSchemaChangeTable(String table, String defaultValue) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
Expand All @@ -67,17 +69,22 @@ private void initDorisSchemaChangeTable(String table) {
"CREATE TABLE %s.%s ( \n"
+ "`id` varchar(32),\n"
+ "`age` int\n"
+ (StringUtils.isNotBlank(defaultValue)
? " DEFAULT "
+ DorisSchemaFactory.quoteDefaultValue(defaultValue)
: "")
+ ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE, table));
DATABASE,
table));
}

@Test
public void testAddColumn() throws IOException, IllegalArgumentException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
initDorisSchemaChangeTable(addColumnTbls, null);
FieldSchema field = new FieldSchema("c1", "int", "");
schemaChangeManager.addColumn(DATABASE, addColumnTbls, field);
boolean exists = schemaChangeManager.addColumn(DATABASE, addColumnTbls, field);
Expand All @@ -91,7 +98,7 @@ public void testAddColumn() throws IOException, IllegalArgumentException {
public void testAddColumnWithChineseComment()
throws IOException, IllegalArgumentException, InterruptedException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
initDorisSchemaChangeTable(addColumnTbls, null);

// add a column by UTF-8 encoding
String addColumnName = "col_with_comment1";
Expand Down Expand Up @@ -147,7 +154,7 @@ private String getColumnType(String table, String columnName) {
@Test
public void testDropColumn() throws IOException, IllegalArgumentException {
String dropColumnTbls = "drop_column";
initDorisSchemaChangeTable(dropColumnTbls);
initDorisSchemaChangeTable(dropColumnTbls, null);
schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
boolean success = schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
Assert.assertTrue(success);
Expand All @@ -159,7 +166,7 @@ public void testDropColumn() throws IOException, IllegalArgumentException {
@Test
public void testRenameColumn() throws IOException, IllegalArgumentException {
String renameColumnTbls = "rename_column";
initDorisSchemaChangeTable(renameColumnTbls);
initDorisSchemaChangeTable(renameColumnTbls, null);
schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1");
boolean exists = schemaChangeManager.checkColumnExists(DATABASE, renameColumnTbls, "age1");
Assert.assertTrue(exists);
Expand All @@ -171,7 +178,7 @@ public void testRenameColumn() throws IOException, IllegalArgumentException {
@Test
public void testModifyColumnComment() throws IOException, IllegalArgumentException {
String modifyColumnCommentTbls = "modify_column_comment";
initDorisSchemaChangeTable(modifyColumnCommentTbls);
initDorisSchemaChangeTable(modifyColumnCommentTbls, null);
String columnName = "age";
String newComment = "new comment of age";
schemaChangeManager.modifyColumnComment(
Expand All @@ -187,7 +194,7 @@ public void testOnlyModifyColumnType()
String modifyColumnTbls = "modify_column_type";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls);
initDorisSchemaChangeTable(modifyColumnTbls, null);
FieldSchema field = new FieldSchema(columnName, newColumnType, "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Expand All @@ -200,7 +207,7 @@ public void testOnlyModifyColumnType()
public void testModifyColumnTypeAndComment()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_and_comment";
initDorisSchemaChangeTable(modifyColumnTbls);
initDorisSchemaChangeTable(modifyColumnTbls, null);
String columnName = "age";
String newColumnType = "bigint";
String newComment = "new comment of age";
Expand Down Expand Up @@ -238,4 +245,49 @@ public void testCreateTableWhenDatabaseNotExists()
Thread.sleep(3_000);
Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, tableName));
}

@Test
public void testModifyColumnTypeWithoutDefault()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_without_default_value";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, null, "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}

@Test
public void testModifyColumnTypeWithDefault()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_with_default_value";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, "18", "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}

@Test
public void testModifyColumnTypeWithDefaultAndChange()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_with_default_value_and_change";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, "19", "new comment");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}
}

0 comments on commit c4ae051

Please sign in to comment.