From 994fcb19acc8c940441fe99c2ef4c17168a8d36e Mon Sep 17 00:00:00 2001 From: Petrichor <31833513+vinlee19@users.noreply.github.com> Date: Tue, 6 Aug 2024 19:18:51 +0800 Subject: [PATCH] [cdc] fix create doris table failed when load data from DB2 (#451) --- .../doris/flink/tools/cdc/db2/Db2Schema.java | 10 +++++ .../tools/cdc/CdcDb2SyncDatabaseCase.java | 42 ++++++++----------- 2 files changed, 27 insertions(+), 25 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java index 5aaf8cea5..c36777f36 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/db2/Db2Schema.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc.db2; +import org.apache.doris.flink.catalog.doris.FieldSchema; import org.apache.doris.flink.tools.cdc.JdbcSourceSchema; import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.LinkedHashMap; public class Db2Schema extends JdbcSourceSchema { public Db2Schema( @@ -41,4 +44,11 @@ public String convertToDorisType(String fieldType, Integer precision, Integer sc public String getCdcTableName() { return schemaName + "\\." + tableName; } + + @Override + public LinkedHashMap getColumnInfo( + DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) + throws SQLException { + return super.getColumnInfo(metaData, null, schemaName, tableName); + } } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java index 77b8931d9..0327079a0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/tools/cdc/CdcDb2SyncDatabaseCase.java @@ -17,9 +17,12 @@ package org.apache.doris.flink.tools.cdc; +import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions; +import org.apache.flink.cdc.connectors.base.options.SourceOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.doris.flink.table.DorisConfigOptions; import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync; import java.util.HashMap; @@ -35,42 +38,31 @@ public static void main(String[] args) throws Exception { env.disableOperatorChaining(); env.enableCheckpointing(10000); - // Map flinkMap = new HashMap<>(); - // flinkMap.put("execution.checkpointing.interval","10s"); - // flinkMap.put("pipeline.operator-chaining","false"); - // flinkMap.put("parallelism.default","1"); - - // Configuration configuration = Configuration.fromMap(flinkMap); - // env.configure(configuration); - String database = "db2_test"; String tablePrefix = ""; String tableSuffix = ""; Map sourceConfig = new HashMap<>(); - sourceConfig.put("database-name", "testdb"); - sourceConfig.put("schema-name", "DB2INST1"); - sourceConfig.put("hostname", "127.0.0.1"); - sourceConfig.put("port", "50000"); - sourceConfig.put("username", "db2inst1"); - sourceConfig.put("password", "=doris123456"); - // sourceConfig.put("debezium.database.tablename.case.insensitive","false"); - sourceConfig.put("scan.incremental.snapshot.enabled", "true"); - // sourceConfig.put("debezium.include.schema.changes","false"); + sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb"); + sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1"); + sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1"); + sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000"); + sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1"); + sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456"); + sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true"); Configuration config = Configuration.fromMap(sourceConfig); Map sinkConfig = new HashMap<>(); - sinkConfig.put("fenodes", "127.0.0.1:8030"); - // sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040"); - sinkConfig.put("username", "root"); - sinkConfig.put("password", "123456"); - sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030"); - sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString()); + sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030"); + sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root"); + sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456"); + sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030"); + sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString()); Configuration sinkConf = Configuration.fromMap(sinkConfig); Map tableConfig = new HashMap<>(); - tableConfig.put("replication_num", "1"); - // tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); + tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1"); + tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"); String includingTables = "FULL_TYPES"; String excludingTables = null; String multiToOneOrigin = null;