Skip to content

Commit

Permalink
[cdc] fix: creating a Doris table failed when loading data from DB2 (#451)
Browse files Browse the repository at this point in the history
  • Loading branch information
vinlee19 authored Aug 6, 2024
1 parent 784a6a1 commit 994fcb1
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.doris.flink.tools.cdc.db2;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.tools.cdc.JdbcSourceSchema;

import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.LinkedHashMap;

public class Db2Schema extends JdbcSourceSchema {
public Db2Schema(
Expand All @@ -41,4 +44,11 @@ public String convertToDorisType(String fieldType, Integer precision, Integer sc
/**
 * Returns the table identifier used by the CDC source, as a regex-safe
 * "schema\.table" pattern (the dot is escaped so it matches literally).
 */
public String getCdcTableName() {
    // Equivalent to schemaName + "\\." + tableName.
    return String.join("\\.", schemaName, tableName);
}

/**
 * Reads the column metadata for the given table.
 *
 * <p>The {@code databaseName} argument is deliberately ignored: {@code null}
 * is passed as the catalog when delegating to the superclass.
 * NOTE(review): presumably DB2's JDBC metadata lookup keys on schema rather
 * than catalog, so a non-null catalog would find no columns — confirm against
 * the DB2 driver's {@code DatabaseMetaData.getColumns} behavior.
 *
 * @param metaData JDBC metadata handle for the source connection
 * @param databaseName ignored for DB2 (null catalog is used instead)
 * @param schemaName schema containing the table
 * @param tableName table whose columns are fetched
 * @return column name to field schema, in result-set order
 * @throws SQLException if the metadata query fails
 */
@Override
public LinkedHashMap<String, FieldSchema> getColumnInfo(
        DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
        throws SQLException {
    // Null catalog: let the generic JDBC path resolve columns by schema only.
    LinkedHashMap<String, FieldSchema> columns =
            super.getColumnInfo(metaData, null, schemaName, tableName);
    return columns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.doris.flink.tools.cdc;

import org.apache.flink.cdc.connectors.base.options.JdbcSourceOptions;
import org.apache.flink.cdc.connectors.base.options.SourceOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.doris.flink.table.DorisConfigOptions;
import org.apache.doris.flink.tools.cdc.db2.Db2DatabaseSync;

import java.util.HashMap;
Expand All @@ -35,42 +38,31 @@ public static void main(String[] args) throws Exception {
env.disableOperatorChaining();
env.enableCheckpointing(10000);

// Map<String,String> flinkMap = new HashMap<>();
// flinkMap.put("execution.checkpointing.interval","10s");
// flinkMap.put("pipeline.operator-chaining","false");
// flinkMap.put("parallelism.default","1");

// Configuration configuration = Configuration.fromMap(flinkMap);
// env.configure(configuration);

String database = "db2_test";
String tablePrefix = "";
String tableSuffix = "";
Map<String, String> sourceConfig = new HashMap<>();
sourceConfig.put("database-name", "testdb");
sourceConfig.put("schema-name", "DB2INST1");
sourceConfig.put("hostname", "127.0.0.1");
sourceConfig.put("port", "50000");
sourceConfig.put("username", "db2inst1");
sourceConfig.put("password", "=doris123456");
// sourceConfig.put("debezium.database.tablename.case.insensitive","false");
sourceConfig.put("scan.incremental.snapshot.enabled", "true");
// sourceConfig.put("debezium.include.schema.changes","false");
sourceConfig.put(JdbcSourceOptions.DATABASE_NAME.key(), "testdb");
sourceConfig.put(JdbcSourceOptions.SCHEMA_NAME.key(), "DB2INST1");
sourceConfig.put(JdbcSourceOptions.HOSTNAME.key(), "127.0.0.1");
sourceConfig.put(Db2DatabaseSync.PORT.key(), "50000");
sourceConfig.put(JdbcSourceOptions.USERNAME.key(), "db2inst1");
sourceConfig.put(JdbcSourceOptions.PASSWORD.key(), "=doris123456");
sourceConfig.put(SourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key(), "true");

Configuration config = Configuration.fromMap(sourceConfig);

Map<String, String> sinkConfig = new HashMap<>();
sinkConfig.put("fenodes", "127.0.0.1:8030");
// sinkConfig.put("benodes","10.20.30.1:8040, 10.20.30.2:8040, 10.20.30.3:8040");
sinkConfig.put("username", "root");
sinkConfig.put("password", "123456");
sinkConfig.put("jdbc-url", "jdbc:mysql://127.0.0.1:9030");
sinkConfig.put("sink.label-prefix", UUID.randomUUID().toString());
sinkConfig.put(DorisConfigOptions.FENODES.key(), "127.0.0.1:8030");
sinkConfig.put(DorisConfigOptions.USERNAME.key(), "root");
sinkConfig.put(DorisConfigOptions.PASSWORD.key(), "123456");
sinkConfig.put(DorisConfigOptions.JDBC_URL.key(), "jdbc:mysql://127.0.0.1:9030");
sinkConfig.put(DorisConfigOptions.SINK_LABEL_PREFIX.key(), UUID.randomUUID().toString());
Configuration sinkConf = Configuration.fromMap(sinkConfig);

Map<String, String> tableConfig = new HashMap<>();
tableConfig.put("replication_num", "1");
// tableConfig.put("table-buckets", "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
tableConfig.put(DatabaseSyncConfig.REPLICATION_NUM, "1");
tableConfig.put(DatabaseSyncConfig.TABLE_BUCKETS, "tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50");
String includingTables = "FULL_TYPES";
String excludingTables = null;
String multiToOneOrigin = null;
Expand Down

0 comments on commit 994fcb1

Please sign in to comment.