Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve]Improve the hard code in CDC whole database synchronization #443

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,31 +38,27 @@

/** cdc sync tools. */
public class CdcTools {
private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
private static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
private static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
private static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
private static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";
private static final List<String> EMPTY_KEYS = Collections.singletonList("password");
private static final List<String> EMPTY_KEYS =
Collections.singletonList(DatabaseSyncConfig.PASSWORD);

public static void main(String[] args) throws Exception {
System.out.println("Input args: " + Arrays.asList(args) + ".\n");
String operation = args[0].toLowerCase();
String[] opArgs = Arrays.copyOfRange(args, 1, args.length);
switch (operation) {
case MYSQL_SYNC_DATABASE:
case DatabaseSyncConfig.MYSQL_SYNC_DATABASE:
createMySQLSyncDatabase(opArgs);
break;
case ORACLE_SYNC_DATABASE:
case DatabaseSyncConfig.ORACLE_SYNC_DATABASE:
createOracleSyncDatabase(opArgs);
break;
case POSTGRES_SYNC_DATABASE:
case DatabaseSyncConfig.POSTGRES_SYNC_DATABASE:
createPostgresSyncDatabase(opArgs);
break;
case SQLSERVER_SYNC_DATABASE:
case DatabaseSyncConfig.SQLSERVER_SYNC_DATABASE:
createSqlServerSyncDatabase(opArgs);
break;
case MONGODB_SYNC_DATABASE:
case DatabaseSyncConfig.MONGODB_SYNC_DATABASE:
createMongoDBSyncDatabase(opArgs);
break;
default:
Expand All @@ -73,72 +69,72 @@ public static void main(String[] args) throws Exception {

/**
 * Handles the {@code mysql-sync-database} operation: parses {@code opArgs}, requires the
 * {@code mysql-conf} option to be present, wraps its entries in a {@code Configuration}, and
 * passes a {@code MysqlDatabaseSync} to {@code syncDatabase}.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back. The lines that reference {@code DatabaseSyncConfig} / {@code SourceConnector} are the
 * post-change versions; the string-literal lines directly above them are the removed ones.
 *
 * @param opArgs command-line arguments that follow the operation name
 * @throws Exception propagated from {@code syncDatabase}
 */
private static void createMySQLSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("mysql-conf"));
Map<String, String> mysqlMap = getConfigMap(params, "mysql-conf");
Preconditions.checkArgument(params.has(DatabaseSyncConfig.MYSQL_CONF));
Map<String, String> mysqlMap = getConfigMap(params, DatabaseSyncConfig.MYSQL_CONF);
Configuration mysqlConfig = Configuration.fromMap(mysqlMap);
DatabaseSync databaseSync = new MysqlDatabaseSync();
syncDatabase(params, databaseSync, mysqlConfig, "MySQL");
syncDatabase(params, databaseSync, mysqlConfig, SourceConnector.MYSQL);
}

/**
 * Handles the {@code oracle-sync-database} operation: parses {@code opArgs}, requires the
 * {@code oracle-conf} option to be present, wraps its entries in a {@code Configuration}, and
 * passes an {@code OracleDatabaseSync} to {@code syncDatabase}.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back. The lines that reference {@code DatabaseSyncConfig} / {@code SourceConnector} are the
 * post-change versions; the string-literal lines directly above them are the removed ones.
 *
 * @param opArgs command-line arguments that follow the operation name
 * @throws Exception propagated from {@code syncDatabase}
 */
private static void createOracleSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("oracle-conf"));
Map<String, String> oracleMap = getConfigMap(params, "oracle-conf");
Preconditions.checkArgument(params.has(DatabaseSyncConfig.ORACLE_CONF));
Map<String, String> oracleMap = getConfigMap(params, DatabaseSyncConfig.ORACLE_CONF);
Configuration oracleConfig = Configuration.fromMap(oracleMap);
DatabaseSync databaseSync = new OracleDatabaseSync();
syncDatabase(params, databaseSync, oracleConfig, "Oracle");
syncDatabase(params, databaseSync, oracleConfig, SourceConnector.ORACLE);
}

/**
 * Handles the {@code postgres-sync-database} operation: parses {@code opArgs}, requires the
 * {@code postgres-conf} option to be present, wraps its entries in a {@code Configuration}, and
 * passes a {@code PostgresDatabaseSync} to {@code syncDatabase}.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back. The lines that reference {@code DatabaseSyncConfig} / {@code SourceConnector} are the
 * post-change versions; the string-literal lines directly above them are the removed ones.
 *
 * @param opArgs command-line arguments that follow the operation name
 * @throws Exception propagated from {@code syncDatabase}
 */
private static void createPostgresSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("postgres-conf"));
Map<String, String> postgresMap = getConfigMap(params, "postgres-conf");
Preconditions.checkArgument(params.has(DatabaseSyncConfig.POSTGRES_CONF));
Map<String, String> postgresMap = getConfigMap(params, DatabaseSyncConfig.POSTGRES_CONF);
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new PostgresDatabaseSync();
syncDatabase(params, databaseSync, postgresConfig, "Postgres");
syncDatabase(params, databaseSync, postgresConfig, SourceConnector.POSTGRES);
}

/**
 * Handles the {@code sqlserver-sync-database} operation: parses {@code opArgs}, requires the
 * {@code sqlserver-conf} option to be present, wraps its entries in a {@code Configuration}, and
 * passes a {@code SqlServerDatabaseSync} to {@code syncDatabase}.
 *
 * <p>NOTE(review): the locals are named {@code postgresMap} / {@code postgresConfig} even though
 * they hold the {@code sqlserver-conf} settings; this looks like copy-paste naming and is worth
 * renaming in a follow-up.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back. The lines that reference {@code DatabaseSyncConfig} / {@code SourceConnector} are the
 * post-change versions; the string-literal lines directly above them are the removed ones.
 *
 * @param opArgs command-line arguments that follow the operation name
 * @throws Exception propagated from {@code syncDatabase}
 */
private static void createSqlServerSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("sqlserver-conf"));
Map<String, String> postgresMap = getConfigMap(params, "sqlserver-conf");
Preconditions.checkArgument(params.has(DatabaseSyncConfig.SQLSERVER_CONF));
Map<String, String> postgresMap = getConfigMap(params, DatabaseSyncConfig.SQLSERVER_CONF);
Configuration postgresConfig = Configuration.fromMap(postgresMap);
DatabaseSync databaseSync = new SqlServerDatabaseSync();
syncDatabase(params, databaseSync, postgresConfig, "SqlServer");
syncDatabase(params, databaseSync, postgresConfig, SourceConnector.SQLSERVER);
}

/**
 * Handles the {@code mongodb-sync-database} operation: parses {@code opArgs}, requires the
 * {@code mongodb-conf} option to be present, wraps its entries in a {@code Configuration}, and
 * passes a {@code MongoDBDatabaseSync} to {@code syncDatabase}.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back. The lines that reference {@code DatabaseSyncConfig} / {@code SourceConnector} are the
 * post-change versions; the string-literal lines directly above them are the removed ones.
 *
 * @param opArgs command-line arguments that follow the operation name
 * @throws Exception propagated from {@code syncDatabase}
 */
private static void createMongoDBSyncDatabase(String[] opArgs) throws Exception {
MultipleParameterTool params = MultipleParameterTool.fromArgs(opArgs);
Preconditions.checkArgument(params.has("mongodb-conf"));
Map<String, String> mongoMap = getConfigMap(params, "mongodb-conf");
Preconditions.checkArgument(params.has(DatabaseSyncConfig.MONGODB_CONF));
Map<String, String> mongoMap = getConfigMap(params, DatabaseSyncConfig.MONGODB_CONF);
Configuration mongoConfig = Configuration.fromMap(mongoMap);
DatabaseSync databaseSync = new MongoDBDatabaseSync();
syncDatabase(params, databaseSync, mongoConfig, "mongodb");
syncDatabase(params, databaseSync, mongoConfig, SourceConnector.MONGODB);
}

private static void syncDatabase(
MultipleParameterTool params,
DatabaseSync databaseSync,
Configuration config,
String type)
SourceConnector sourceConnector)
throws Exception {
String jobName = params.get("job-name");
String database = params.get("database");
String tablePrefix = params.get("table-prefix");
String tableSuffix = params.get("table-suffix");
String includingTables = params.get("including-tables");
String excludingTables = params.get("excluding-tables");
String multiToOneOrigin = params.get("multi-to-one-origin");
String multiToOneTarget = params.get("multi-to-one-target");
String schemaChangeMode = params.get("schema-change-mode");
boolean createTableOnly = params.has("create-table-only");
boolean ignoreDefaultValue = params.has("ignore-default-value");
boolean ignoreIncompatible = params.has("ignore-incompatible");
boolean singleSink = params.has("single-sink");

Preconditions.checkArgument(params.has("sink-conf"));
Map<String, String> sinkMap = getConfigMap(params, "sink-conf");
Map<String, String> tableMap = getConfigMap(params, "table-conf");
String jobName = params.get(DatabaseSyncConfig.JOB_NAME);
String database = params.get(DatabaseSyncConfig.DATABASE);
String tablePrefix = params.get(DatabaseSyncConfig.TABLE_PREFIX);
String tableSuffix = params.get(DatabaseSyncConfig.TABLE_SUFFIX);
String includingTables = params.get(DatabaseSyncConfig.INCLUDING_TABLES);
String excludingTables = params.get(DatabaseSyncConfig.EXCLUDING_TABLES);
String multiToOneOrigin = params.get(DatabaseSyncConfig.MULTI_TO_ONE_ORIGIN);
String multiToOneTarget = params.get(DatabaseSyncConfig.MULTI_TO_ONE_TARGET);
String schemaChangeMode = params.get(DatabaseSyncConfig.SCHEMA_CHANGE_MODE);
boolean createTableOnly = params.has(DatabaseSyncConfig.CREATE_TABLE_ONLY);
boolean ignoreDefaultValue = params.has(DatabaseSyncConfig.IGNORE_DEFAULT_VALUE);
boolean ignoreIncompatible = params.has(DatabaseSyncConfig.IGNORE_INCOMPATIBLE);
boolean singleSink = params.has(DatabaseSyncConfig.SINGLE_SINK);

Preconditions.checkArgument(params.has(DatabaseSyncConfig.SINK_CONF));
Map<String, String> sinkMap = getConfigMap(params, DatabaseSyncConfig.SINK_CONF);
Map<String, String> tableMap = getConfigMap(params, DatabaseSyncConfig.TABLE_CONF);
Configuration sinkConfig = Configuration.fromMap(sinkMap);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Expand All @@ -165,7 +161,9 @@ private static void syncDatabase(
jobName =
String.format(
"%s-Doris Sync Database: %s",
type, config.getString("database-name", "db"));
sourceConnector.getConnectorName(),
config.getString(
DatabaseSyncConfig.DATABASE_NAME, DatabaseSyncConfig.DB));
}
env.execute(jobName);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.tools.cdc;

/**
 * Central holder for the string constants used by the CDC whole-database synchronization tools
 * (operation names, command-line option keys, configuration keys and date/time formats), so
 * that callers reference one definition instead of repeating literals.
 *
 * <p>This class only carries constants; it is {@code final} and cannot be instantiated.
 */
public final class DatabaseSyncConfig {

    // ---------- operation names (first command-line argument) ----------
    public static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
    public static final String ORACLE_SYNC_DATABASE = "oracle-sync-database";
    public static final String POSTGRES_SYNC_DATABASE = "postgres-sync-database";
    public static final String SQLSERVER_SYNC_DATABASE = "sqlserver-sync-database";
    public static final String MONGODB_SYNC_DATABASE = "mongodb-sync-database";

    // ---------- per-source configuration option keys ----------
    public static final String MYSQL_CONF = "mysql-conf";
    public static final String ORACLE_CONF = "oracle-conf";
    public static final String POSTGRES_CONF = "postgres-conf";
    public static final String SQLSERVER_CONF = "sqlserver-conf";
    public static final String MONGODB_CONF = "mongodb-conf";

    // ---------- source-conf ----------
    public static final String DATABASE_NAME = "database-name";
    // Fallback used when no database name is configured (e.g. when building the default job name).
    public static final String DB = "db";
    public static final String PORT = "port";
    public static final String USER = "user";
    public static final String PASSWORD = "password";
    // NOTE(review): upper-case values; presumably JDBC metadata column labels - confirm at the
    // call sites.
    public static final String TABLE_NAME = "TABLE_NAME";
    public static final String REMARKS = "REMARKS";

    // ---------- cdc-conf ----------
    // Values accepted by the Flink CDC option
    // org.apache.flink.cdc.connectors.base.options.SourceOptions#SCAN_STARTUP_MODE.
    public static final String SCAN_STARTUP_MODE_VALUE_INITIAL = "initial";
    public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST_OFFSET = "earliest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSET = "specific-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET = "latest-offset";
    public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
    public static final String DECIMAL_HANDLING_MODE = "decimal.handling.mode";

    // ---------- sink-conf and general synchronization options ----------
    public static final String SINK_CONF = "sink-conf";
    public static final String JOB_NAME = "job-name";
    public static final String DATABASE = "database";
    public static final String TABLE_PREFIX = "table-prefix";
    public static final String TABLE_SUFFIX = "table-suffix";
    public static final String INCLUDING_TABLES = "including-tables";
    public static final String EXCLUDING_TABLES = "excluding-tables";
    public static final String MULTI_TO_ONE_ORIGIN = "multi-to-one-origin";
    public static final String MULTI_TO_ONE_TARGET = "multi-to-one-target";
    public static final String SCHEMA_CHANGE_MODE = "schema-change-mode";
    public static final String CREATE_TABLE_ONLY = "create-table-only";
    public static final String IGNORE_DEFAULT_VALUE = "ignore-default-value";
    public static final String IGNORE_INCOMPATIBLE = "ignore-incompatible";
    public static final String SINGLE_SINK = "single-sink";

    // ---------- doris-table-conf ----------
    public static final String TABLE_CONF = "table-conf";
    public static final String REPLICATION_NUM = "replication_num";
    public static final String TABLE_BUCKETS = "table-buckets";

    // ---------- date-converter-conf ----------
    public static final String CONVERTERS = "converters";
    public static final String DATE = "date";
    public static final String DATE_TYPE = "date.type";
    public static final String DATE_FORMAT_DATE = "date.format.date";
    public static final String DATE_FORMAT_DATETIME = "date.format.datetime";
    public static final String DATE_FORMAT_TIMESTAMP = "date.format.timestamp";
    public static final String DATE_FORMAT_TIMESTAMP_ZONE = "date.format.timestamp.zone";
    // Date/time patterns in java.time.format.DateTimeFormatter syntax.
    public static final String YEAR_MONTH_DAY_FORMAT = "yyyy-MM-dd";
    public static final String DATE_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
    public static final String DATETIME_MICRO_FORMAT = "yyyy-MM-dd HH:mm:ss.SSSSSS";
    public static final String TIME_ZONE_SHANGHAI = "Asia/Shanghai";
    public static final String TIME_ZONE_UTC_8 = "UTC+8";
    public static final String FORMAT_DATE = "format.date";
    public static final String FORMAT_TIME = "format.time";
    public static final String FORMAT_DATETIME = "format.datetime";
    public static final String FORMAT_TIMESTAMP = "format.timestamp";
    public static final String FORMAT_TIMESTAMP_ZONE = "format.timestamp.zone";
    // Upper-case type names; UPPERCASE_DATE avoids a clash with the lower-case DATE key above.
    public static final String UPPERCASE_DATE = "DATE";
    public static final String TIME = "TIME";
    public static final String DATETIME = "DATETIME";
    public static final String TIMESTAMP = "TIMESTAMP";
    public static final String SMALLDATETIME = "SMALLDATETIME";
    public static final String DATETIME2 = "DATETIME2";

    /** Constants holder; not meant to be instantiated. */
    private DatabaseSyncConfig() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ public enum SourceConnector {
MYSQL("mysql"),
ORACLE("oracle"),
POSTGRES("postgres"),
SQLSERVER("sqlserver");
SQLSERVER("sqlserver"),
MONGODB("mongodb");

public final String connectorName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.apache.doris.flink.tools.cdc.DatabaseSync;
import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;
import org.apache.doris.flink.tools.cdc.ParsingProcessFunction;
import org.apache.doris.flink.tools.cdc.SourceSchema;
import org.apache.doris.flink.tools.cdc.mongodb.serializer.MongoDBJsonDebeziumSchemaSerializer;
Expand All @@ -61,10 +62,6 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

public class MongoDBDatabaseSync extends DatabaseSync {

private static final String INITIAL_MODE = "initial";
private static final String LATEST_OFFSET_MODE = "latest-offset";
private static final String TIMESTAMP_MODE = "timestamp";
public static final ConfigOption<Double> MONGO_CDC_CREATE_SAMPLE_PERCENT =
ConfigOptions.key("schema.sample-percent")
.doubleType()
Expand Down Expand Up @@ -161,7 +158,7 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {
String password = config.get(MongoDBSourceOptions.PASSWORD);
String database = config.get(MongoDBSourceOptions.DATABASE);
// note: just to unify job name, no other use.
config.setString("database-name", database);
config.setString(DatabaseSyncConfig.DATABASE_NAME, database);
String collection = config.get(MongoDBSourceOptions.COLLECTION);
if (StringUtils.isBlank(collection)) {
collection = config.get(TABLE_NAME);
Expand All @@ -181,13 +178,13 @@ public DataStreamSource<String> buildCdcSource(StreamExecutionEnvironment env) {

String startupMode = config.get(SourceOptions.SCAN_STARTUP_MODE);
switch (startupMode.toLowerCase()) {
case INITIAL_MODE:
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_INITIAL:
mongoDBSourceBuilder.startupOptions(StartupOptions.initial());
break;
case LATEST_OFFSET_MODE:
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_LATEST_OFFSET:
mongoDBSourceBuilder.startupOptions(StartupOptions.latest());
break;
case TIMESTAMP_MODE:
case DatabaseSyncConfig.SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
Comment on lines +181 to +187
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use cdc variables directly here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These parameters of flink-cdc cannot be directly referenced because they are also privately defined.

mongoDBSourceBuilder.startupOptions(
StartupOptions.timestamp(
config.get(SourceOptions.SCAN_STARTUP_TIMESTAMP_MILLIS)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.flink.tools.cdc.mongodb;

import org.apache.doris.flink.tools.cdc.DatabaseSyncConfig;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
Expand All @@ -25,11 +27,12 @@
/**
 * Formats epoch-millisecond timestamps as {@code yyyy-MM-dd HH:mm:ss.SSSSSS} strings.
 *
 * <p>NOTE(review): this listing is a rendered diff, so removed and added lines appear back to
 * back; the lines that reference {@code DatabaseSyncConfig} are the post-change versions.
 */
public class MongoDateConverter {
// One formatter per thread, created lazily on first use.
// NOTE(review): DateTimeFormatter is documented as thread-safe, so a single shared constant
// would probably suffice - confirm before simplifying.
private static final ThreadLocal<DateTimeFormatter> dateFormatterThreadLocal =
ThreadLocal.withInitial(
() -> DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS"));
() -> DateTimeFormatter.ofPattern(DatabaseSyncConfig.DATETIME_MICRO_FORMAT));

/**
 * Converts a timestamp to its formatted local date-time string.
 *
 * @param timestamp milliseconds since the epoch (passed to {@code Instant.ofEpochMilli})
 * @return the instant rendered in the {@code Asia/Shanghai} zone using the pattern above
 */
public static String convertTimestampToString(long timestamp) {
Instant instant = Instant.ofEpochMilli(timestamp);
// The zone is fixed to Asia/Shanghai; it is not taken from configuration or the JVM default.
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("Asia/Shanghai"));
LocalDateTime localDateTime =
LocalDateTime.ofInstant(instant, ZoneId.of(DatabaseSyncConfig.TIME_ZONE_SHANGHAI));
return dateFormatterThreadLocal.get().format(localDateTime);
}
}
Loading
Loading