From c7f3c55b14e075332d35a9484c1f00d44f5db551 Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Sun, 15 Sep 2024 21:52:04 +0800 Subject: [PATCH 1/2] [Improve]Support modify column type without default when column exists default value --- .../flink/sink/schema/SchemaChangeHelper.java | 11 +++ .../sink/schema/SchemaChangeManager.java | 55 +++++++++++++-- .../sink/schema/SchemaManagerITCase.java | 70 ++++++++++++++++--- 3 files changed, 123 insertions(+), 13 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java index 74b574177..d0630b03e 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeHelper.java @@ -44,6 +44,7 @@ public class SchemaChangeHelper { private static final String CREATE_DATABASE_DDL = "CREATE DATABASE IF NOT EXISTS %s"; private static final String MODIFY_TYPE_DDL = "ALTER TABLE %s MODIFY COLUMN %s %s"; private static final String MODIFY_COMMENT_DDL = "ALTER TABLE %s MODIFY COLUMN %s COMMENT '%s'"; + private static final String SHOW_FULL_COLUMN_DDL = "SHOW FULL COLUMNS FROM `%s`.`%s`"; public static void compareSchema( Map updateFiledSchemaMap, @@ -166,6 +167,7 @@ public static String buildModifyColumnDataTypeDDL( String columnName = fieldSchema.getName(); String dataType = fieldSchema.getTypeString(); String comment = fieldSchema.getComment(); + String defaultValue = fieldSchema.getDefaultValue(); StringBuilder modifyDDL = new StringBuilder( String.format( @@ -173,6 +175,11 @@ public static String buildModifyColumnDataTypeDDL( DorisSchemaFactory.quoteTableIdentifier(tableIdentifier), DorisSchemaFactory.identifier(columnName), dataType)); + if (StringUtils.isNotBlank(defaultValue)) { + modifyDDL + .append(" DEFAULT ") + .append(DorisSchemaFactory.quoteDefaultValue(defaultValue)); + } commentColumn(modifyDDL, comment); return modifyDDL.toString(); } @@ -183,6 +190,10 @@ private static void commentColumn(StringBuilder ddl, String comment) { } } + public static String buildShowFullColumnDDL(String database, String table) { + return String.format(SHOW_FULL_COLUMN_DDL, database, table); + } + public static List getDdlSchemas() { return ddlSchemas; } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index c946bee70..bd336e125 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NullNode; import org.apache.commons.codec.binary.Base64; import org.apache.doris.flink.catalog.doris.DorisSystem; import org.apache.doris.flink.catalog.doris.FieldSchema; @@ -123,11 +124,30 @@ public boolean modifyColumnDataType(String database, String table, FieldSchema f throws IOException, IllegalArgumentException { if (!checkColumnExists(database, table, field.getName())) { LOG.warn( - "The column {} is not exists in table {}, can not modify it type", + "The column {} is not exists in table {}, can not modify it's type", field.getName(), table); return false; } + + String ddl = SchemaChangeHelper.buildShowFullColumnDDL(database, table); + String defaultValue = getDefaultValue(ddl, database, field.getName()); + if (!StringUtils.isNullOrWhitespaceOnly(field.getDefaultValue())) { + // Can not change default value + if (!field.getDefaultValue().equals(defaultValue)) { + LOG.warn( + "Column:{} can not change default value from {} to {}, fallback it", + field.getName(), + defaultValue, + field.getDefaultValue()); + field.setDefaultValue(defaultValue); + } + } else { + // If user does not give a default value, need fill it from + // original table schema to avoid change type failed if default value exists + field.setDefaultValue(defaultValue); + } + // If user does not give a comment, need fill it from // original table schema to avoid miss comment if (StringUtils.isNullOrWhitespaceOnly(field.getComment())) { @@ -214,15 +234,42 @@ private boolean handleSchemaChange(String responseEntity) throws JsonProcessingE } } + private String getDefaultValue(String ddl, String database, String column) + throws IOException, IllegalArgumentException { + String responseEntity = executeThenReturnResponse(ddl, database); + JsonNode responseNode = objectMapper.readTree(responseEntity); + String code = responseNode.get("code").asText("-1"); + if (code.equals("0")) { + JsonNode data = responseNode.get("data").get("data"); + for (JsonNode node : data) { + if (node.get(0).asText().equals(column)) { + JsonNode defaultValueNode = node.get(5); + return (defaultValueNode instanceof NullNode) + ? null + : defaultValueNode.asText(); + } + } + return null; + } else { + throw new DorisSchemaChangeException( + "Failed to get default value, response: " + responseEntity); + } + } + /** execute sql in doris. */ - public boolean execute(String ddl, String database) + public String executeThenReturnResponse(String ddl, String database) throws IOException, IllegalArgumentException { if (StringUtils.isNullOrWhitespaceOnly(ddl)) { - return false; + return null; } LOG.info("Execute SQL: {}", ddl); HttpPost httpPost = buildHttpPost(ddl, database); - String responseEntity = handleResponse(httpPost); + return handleResponse(httpPost); + } + + public boolean execute(String ddl, String database) + throws IOException, IllegalArgumentException { + String responseEntity = executeThenReturnResponse(ddl, database); return handleSchemaChange(responseEntity); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java index 37ca3a2d2..c4f7f282e 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/schema/SchemaManagerITCase.java @@ -17,7 +17,9 @@ package org.apache.doris.flink.sink.schema; +import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.catalog.doris.DataModel; +import org.apache.doris.flink.catalog.doris.DorisSchemaFactory; import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.catalog.doris.TableSchema; import org.apache.doris.flink.cfg.DorisOptions; @@ -57,7 +59,7 @@ public void setUp() throws Exception { schemaChangeManager = new SchemaChangeManager(options); } - private void initDorisSchemaChangeTable(String table) { + private void initDorisSchemaChangeTable(String table, String defaultValue) { ContainerUtils.executeSQLStatement( getDorisQueryConnection(), LOG, @@ -67,17 +69,22 @@ private void initDorisSchemaChangeTable(String table) { "CREATE TABLE %s.%s ( \n" + "`id` varchar(32),\n" + "`age` int\n" + + (StringUtils.isNotBlank(defaultValue) + ? " DEFAULT " + + DorisSchemaFactory.quoteDefaultValue(defaultValue) + : "") + ") DISTRIBUTED BY HASH(`id`) BUCKETS 1\n" + "PROPERTIES (\n" + "\"replication_num\" = \"1\"\n" + ")\n", - DATABASE, table)); + DATABASE, + table)); } @Test public void testAddColumn() throws IOException, IllegalArgumentException { String addColumnTbls = "add_column"; - initDorisSchemaChangeTable(addColumnTbls); + initDorisSchemaChangeTable(addColumnTbls, null); FieldSchema field = new FieldSchema("c1", "int", ""); schemaChangeManager.addColumn(DATABASE, addColumnTbls, field); boolean exists = schemaChangeManager.addColumn(DATABASE, addColumnTbls, field); @@ -91,7 +98,7 @@ public void testAddColumn() throws IOException, IllegalArgumentException { public void testAddColumnWithChineseComment() throws IOException, IllegalArgumentException, InterruptedException { String addColumnTbls = "add_column"; - initDorisSchemaChangeTable(addColumnTbls); + initDorisSchemaChangeTable(addColumnTbls, null); // add a column by UTF-8 encoding String addColumnName = "col_with_comment1"; @@ -147,7 +154,7 @@ private String getColumnType(String table, String columnName) { @Test public void testDropColumn() throws IOException, IllegalArgumentException { String dropColumnTbls = "drop_column"; - initDorisSchemaChangeTable(dropColumnTbls); + initDorisSchemaChangeTable(dropColumnTbls, null); schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age"); boolean success = schemaChangeManager.dropColumn(DATABASE, dropColumnTbls, "age"); Assert.assertTrue(success); @@ -159,7 +166,7 @@ public void testDropColumn() throws IOException, IllegalArgumentException { @Test public void testRenameColumn() throws IOException, IllegalArgumentException { String renameColumnTbls = "rename_column"; - initDorisSchemaChangeTable(renameColumnTbls); + initDorisSchemaChangeTable(renameColumnTbls, null); schemaChangeManager.renameColumn(DATABASE, renameColumnTbls, "age", "age1"); boolean exists = schemaChangeManager.checkColumnExists(DATABASE, renameColumnTbls, "age1"); Assert.assertTrue(exists); @@ -171,7 +178,7 @@ public void testRenameColumn() throws IOException, IllegalArgumentException { @Test public void testModifyColumnComment() throws IOException, IllegalArgumentException { String modifyColumnCommentTbls = "modify_column_comment"; - initDorisSchemaChangeTable(modifyColumnCommentTbls); + initDorisSchemaChangeTable(modifyColumnCommentTbls, null); String columnName = "age"; String newComment = "new comment of age"; schemaChangeManager.modifyColumnComment( @@ -187,7 +194,7 @@ public void testOnlyModifyColumnType() String modifyColumnTbls = "modify_column_type"; String columnName = "age"; String newColumnType = "bigint"; - initDorisSchemaChangeTable(modifyColumnTbls); + initDorisSchemaChangeTable(modifyColumnTbls, null); FieldSchema field = new FieldSchema(columnName, newColumnType, ""); schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); @@ -200,7 +207,7 @@ public void testOnlyModifyColumnType() public void testModifyColumnTypeAndComment() throws IOException, IllegalArgumentException, InterruptedException { String modifyColumnTbls = "modify_column_type_and_comment"; - initDorisSchemaChangeTable(modifyColumnTbls); + initDorisSchemaChangeTable(modifyColumnTbls, null); String columnName = "age"; String newColumnType = "bigint"; String newComment = "new comment of age"; @@ -238,4 +245,49 @@ public void testCreateTableWhenDatabaseNotExists() Thread.sleep(3_000); Assert.assertNotNull(schemaChangeManager.getTableSchema(databaseName, tableName)); } + + @Test + public void testModifyColumnTypeWithoutDefault() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_without_default_value"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, null, ""); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testModifyColumnTypeWithDefault() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_with_default_value"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, "18", ""); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } + + @Test + public void testModifyColumnTypeWithDefaultAndChange() + throws IOException, IllegalArgumentException, InterruptedException { + String modifyColumnTbls = "modify_column_type_with_default_value_and_change"; + String columnName = "age"; + String newColumnType = "bigint"; + initDorisSchemaChangeTable(modifyColumnTbls, "18"); + FieldSchema field = new FieldSchema(columnName, newColumnType, "19", "new comment"); + schemaChangeManager.modifyColumnDataType(DATABASE, modifyColumnTbls, field); + + Thread.sleep(3_000); + String columnType = getColumnType(modifyColumnTbls, columnName); + Assert.assertEquals(newColumnType, columnType.toLowerCase()); + } } From 46200914bb5939d5b0c22a5e27d7c425eb9f3bfa Mon Sep 17 00:00:00 2001 From: "north.lin" <37775475+qg-lin@users.noreply.github.com> Date: Thu, 19 Sep 2024 09:45:13 +0800 Subject: [PATCH 2/2] throw IllegalArgumentException when ddl is null or empty --- .../doris/flink/exception/IllegalArgumentException.java | 4 ++++ .../apache/doris/flink/sink/schema/SchemaChangeManager.java | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java index 4c0ae0939..7b2428916 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/IllegalArgumentException.java @@ -25,4 +25,8 @@ public IllegalArgumentException(String msg, Throwable cause) { public IllegalArgumentException(String arg, String value) { super("argument '" + arg + "' is illegal, value is '" + value + "'."); } + + public IllegalArgumentException(String msg) { + super(msg); + } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java index bd336e125..50ec1d34a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java @@ -257,10 +257,10 @@ private String getDefaultValue(String ddl, String database, String column) } /** execute sql in doris. */ - public String executeThenReturnResponse(String ddl, String database) + private String executeThenReturnResponse(String ddl, String database) throws IOException, IllegalArgumentException { if (StringUtils.isNullOrWhitespaceOnly(ddl)) { - return null; + throw new IllegalArgumentException("ddl can not be null or empty string!"); } LOG.info("Execute SQL: {}", ddl); HttpPost httpPost = buildHttpPost(ddl, database);