From 093a91f1bb134aa653a5bea146b7cb4045a7dc9e Mon Sep 17 00:00:00 2001 From: Petrichor <31833513+vinlee19@users.noreply.github.com> Date: Wed, 14 Aug 2024 10:18:50 +0800 Subject: [PATCH] [improve]Improve the clarity and detail of the database synchronization logs (#467) (cherry picked from commit 63b1290cec9d4f60f4e1490e8c00afde9de3bea7) --- .../doris/flink/tools/cdc/DatabaseSync.java | 4 +++- .../doris/flink/tools/cdc/JdbcSourceSchema.java | 16 ++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java index 2f672adb2..5ae44da66 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/DatabaseSync.java @@ -118,7 +118,9 @@ public void build() throws Exception { DorisSystem dorisSystem = new DorisSystem(options); List schemaList = getSchemaList(); - Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized."); + Preconditions.checkState( + !schemaList.isEmpty(), + "No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema."); if (!StringUtils.isNullOrWhitespaceOnly(database) && !dorisSystem.databaseExists(database)) { diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java index b421affbf..31cfd1cbf 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/tools/cdc/JdbcSourceSchema.java @@ -17,7 +17,11 @@ package org.apache.doris.flink.tools.cdc; +import org.apache.flink.util.Preconditions; + import org.apache.doris.flink.catalog.doris.FieldSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.DatabaseMetaData; import java.sql.ResultSet; @@ -31,6 +35,7 @@ * databases. */ public abstract class JdbcSourceSchema extends SourceSchema { + private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSchema.class); public JdbcSourceSchema( DatabaseMetaData metaData, @@ -48,7 +53,7 @@ public LinkedHashMap getColumnInfo( DatabaseMetaData metaData, String databaseName, String schemaName, String tableName) throws SQLException { LinkedHashMap fields = new LinkedHashMap<>(); - // + LOG.debug("Starting to get column info for table: {}", tableName); try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) { while (rs.next()) { String fieldName = rs.getString("COLUMN_NAME"); @@ -63,10 +68,17 @@ public LinkedHashMap getColumnInfo( if (rs.wasNull()) { scale = null; } - String dorisTypeStr = convertToDorisType(fieldType, precision, scale); + String dorisTypeStr = null; + try { + dorisTypeStr = convertToDorisType(fieldType, precision, scale); + } catch (UnsupportedOperationException e) { + throw new UnsupportedOperationException(e + " in table: " + tableName); + } fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment)); } } + Preconditions.checkArgument(!fields.isEmpty(), "The column info of {} is empty", tableName); + LOG.debug("Successfully retrieved column info for table: {}", tableName); return fields; }