Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve]Support modify column type without default when column exists default value #490

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ public IllegalArgumentException(String msg, Throwable cause) {
public IllegalArgumentException(String arg, String value) {
super("argument '" + arg + "' is illegal, value is '" + value + "'.");
}

public IllegalArgumentException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class SchemaChangeHelper {
private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s";
private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s";
private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'";
private static final String SHOW_FULL_COLUMN_DDL = "SHOW FULL COLUMNS FROM `%s`.`%s`";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this approach not universal?
This sql seems to be the unique syntax of MySQL. If the upstream is Oracle or PostgreSQL, it will not be supported.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It only executes within Doris.


public static void compareSchema(
Map<String, FieldSchema> updateFiledSchemaMap,
Expand Down Expand Up @@ -166,13 +167,19 @@ public static String buildModifyColumnDataTypeDDL(
String columnName = fieldSchema.getName();
String dataType = fieldSchema.getTypeString();
String comment = fieldSchema.getComment();
String defaultValue = fieldSchema.getDefaultValue();
StringBuilder modifyDDL =
new StringBuilder(
String.format(
MODIFY_TYPE_DDL,
DorisSchemaFactory.quoteTableIdentifier(tableIdentifier),
DorisSchemaFactory.identifier(columnName),
dataType));
if (StringUtils.isNotBlank(defaultValue)) {
modifyDDL
.append(" DEFAULT ")
.append(DorisSchemaFactory.quoteDefaultValue(defaultValue));
}
commentColumn(modifyDDL, comment);
return modifyDDL.toString();
}
Expand All @@ -183,6 +190,10 @@ private static void commentColumn(StringBuilder ddl, String comment) {
}
}

public static String buildShowFullColumnDDL(String database, String table) {
return String.format(SHOW_FULL_COLUMN_DDL, database, table);
}

public static List<DDLSchema> getDdlSchemas() {
return ddlSchemas;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.commons.codec.binary.Base64;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
Expand Down Expand Up @@ -123,11 +124,30 @@ public boolean modifyColumnDataType(String database, String table, FieldSchema f
throws IOException, IllegalArgumentException {
if (!checkColumnExists(database, table, field.getName())) {
LOG.warn(
"The column {} is not exists in table {}, can not modify it type",
"The column {} is not exists in table {}, can not modify it's type",
field.getName(),
table);
return false;
}

String ddl = SchemaChangeHelper.buildShowFullColumnDDL(database, table);
String defaultValue = getDefaultValue(ddl, database, field.getName());
if (!StringUtils.isNullOrWhitespaceOnly(field.getDefaultValue())) {
// Can not change default value
if (!field.getDefaultValue().equals(defaultValue)) {
LOG.warn(
"Column:{} can not change default value from {} to {}, fallback it",
field.getName(),
defaultValue,
field.getDefaultValue());
field.setDefaultValue(defaultValue);
}
} else {
// If user does not give a default value, need fill it from
// original table schema to avoid change type failed if default value exists
field.setDefaultValue(defaultValue);
}

// If user does not give a comment, need fill it from
// original table schema to avoid miss comment
if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) {
Expand Down Expand Up @@ -214,15 +234,42 @@ private boolean handleSchemaChange(String responseEntity) throws JsonProcessingE
}
}

private String getDefaultValue(String ddl, String database, String column)
throws IOException, IllegalArgumentException {
String responseEntity = executeThenReturnResponse(ddl, database);
JsonNode responseNode = objectMapper.readTree(responseEntity);
String code = responseNode.get("code").asText("-1");
if (code.equals("0")) {
JsonNode data = responseNode.get("data").get("data");
for (JsonNode node : data) {
if (node.get(0).asText().equals(column)) {
JsonNode defaultValueNode = node.get(5);
return (defaultValueNode instanceof NullNode)
? null
: defaultValueNode.asText();
}
}
return null;
} else {
throw new DorisSchemaChangeException(
"Failed to get default value, response: " + responseEntity);
}
}

/** execute sql in doris. */
public boolean execute(String ddl, String database)
private String executeThenReturnResponse(String ddl, String database)
throws IOException, IllegalArgumentException {
if (StringUtils.isNullOrWhitespaceOnly(ddl)) {
return false;
throw new IllegalArgumentException("ddl can not be null or empty string!");
}
LOG.info("Execute SQL: {}", ddl);
HttpPost httpPost = buildHttpPost(ddl, database);
String responseEntity = handleResponse(httpPost);
return handleResponse(httpPost);
}

public boolean execute(String ddl, String database)
throws IOException, IllegalArgumentException {
String responseEntity = executeThenReturnResponse(ddl, database);
return handleSchemaChange(responseEntity);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

package org.apache.doris.flink.sink.schema;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSchemaFactory;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisOptions;
Expand Down Expand Up @@ -57,7 +59,7 @@ public void setUp() throws Exception {
schemaChangeManager = new SchemaChangeManager(options);
}

private void initDorisSchemaChangeTable(String table) {
private void initDorisSchemaChangeTable(String table, String defaultValue) {
ContainerUtils.executeSQLStatement(
getDorisQueryConnection(),
LOG,
Expand All @@ -67,17 +69,22 @@ private void initDorisSchemaChangeTable(String table) {
"CREATE TABLE %s.%s ( \n"
+ "`id` varchar(32),\n"
+ "`age` int\n"
+ (StringUtils.isNotBlank(defaultValue)
? " DEFAULT "
+ DorisSchemaFactory.quoteDefaultValue(defaultValue)
: "")
+ ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n"
+ "PROPERTIES (\n"
+ "\"replication_num\" = \"1\"\n"
+ ")\n",
DATABASE, table));
DATABASE,
table));
}

@Test
public void testAddColumn() throws IOException, IllegalArgumentException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
initDorisSchemaChangeTable(addColumnTbls, null);
FieldSchema field = new FieldSchema("c1", "int", "");
schemaChangeManager.addColumn(DATABASE, addColumnTbls, field);
boolean exists = schemaChangeManager.addColumn(DATABASE, addColumnTbls, field);
Expand All @@ -91,7 +98,7 @@ public void testAddColumn() throws IOException, IllegalArgumentException {
public void testAddColumnWithChineseComment()
throws IOException, IllegalArgumentException, InterruptedException {
String addColumnTbls = "add_column";
initDorisSchemaChangeTable(addColumnTbls);
initDorisSchemaChangeTable(addColumnTbls, null);

// add a column by UTF-8 encoding
String addColumnName = "col_with_comment1";
Expand Down Expand Up @@ -147,7 +154,7 @@ private String getColumnType(String table, String columnName) {
@Test
public void testDropColumn() throws IOException, IllegalArgumentException {
String dropColumnTbls = "drop_column";
initDorisSchemaChangeTable(dropColumnTbls);
initDorisSchemaChangeTable(dropColumnTbls, null);
schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
boolean success = schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age");
Assert.assertTrue(success);
Expand All @@ -159,7 +166,7 @@ public void testDropColumn() throws IOException, IllegalArgumentException {
@Test
public void testRenameColumn() throws IOException, IllegalArgumentException {
String renameColumnTbls = "rename_column";
initDorisSchemaChangeTable(renameColumnTbls);
initDorisSchemaChangeTable(renameColumnTbls, null);
schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1");
boolean exists = schemaChangeManager.checkColumnExists(DATABASE, renameColumnTbls, "age1");
Assert.assertTrue(exists);
Expand All @@ -171,7 +178,7 @@ public void testRenameColumn() throws IOException, IllegalArgumentException {
@Test
public void testModifyColumnComment() throws IOException, IllegalArgumentException {
String modifyColumnCommentTbls = "modify_column_comment";
initDorisSchemaChangeTable(modifyColumnCommentTbls);
initDorisSchemaChangeTable(modifyColumnCommentTbls, null);
String columnName = "age";
String newComment = "new comment of age";
schemaChangeManager.modifyColumnComment(
Expand All @@ -187,7 +194,7 @@ public void testOnlyModifyColumnType()
String modifyColumnTbls = "modify_column_type";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls);
initDorisSchemaChangeTable(modifyColumnTbls, null);
FieldSchema field = new FieldSchema(columnName, newColumnType, "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Expand All @@ -200,7 +207,7 @@ public void testOnlyModifyColumnType()
public void testModifyColumnTypeAndComment()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_and_comment";
initDorisSchemaChangeTable(modifyColumnTbls);
initDorisSchemaChangeTable(modifyColumnTbls, null);
String columnName = "age";
String newColumnType = "bigint";
String newComment = "new comment of age";
Expand Down Expand Up @@ -238,4 +245,49 @@ public void testCreateTableWhenDatabaseNotExists()
Thread.sleep(3_000);
Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, tableName));
}

@Test
public void testModifyColumnTypeWithoutDefault()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_without_default_value";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, null, "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}

@Test
public void testModifyColumnTypeWithDefault()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_with_default_value";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, "18", "");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}

@Test
public void testModifyColumnTypeWithDefaultAndChange()
throws IOException, IllegalArgumentException, InterruptedException {
String modifyColumnTbls = "modify_column_type_with_default_value_and_change";
String columnName = "age";
String newColumnType = "bigint";
initDorisSchemaChangeTable(modifyColumnTbls, "18");
FieldSchema field = new FieldSchema(columnName, newColumnType, "19", "new comment");
schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field);

Thread.sleep(3_000);
String columnType = getColumnType(modifyColumnTbls, columnName);
Assert.assertEquals(newColumnType, columnType.toLowerCase());
}
}
Loading