Skip to content

Commit

Permalink
[improve]Improve the clarity and detail of the database synchronizati…
Browse files Browse the repository at this point in the history
…on logs (#467)
  • Loading branch information
vinlee19 authored Aug 14, 2024
1 parent eb90905 commit 63b1290
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ public void build() throws Exception {
DorisSystem dorisSystem = new DorisSystem(options);

List<SourceSchema> schemaList = getSchemaList();
Preconditions.checkState(!schemaList.isEmpty(), "No tables to be synchronized.");
Preconditions.checkState(
!schemaList.isEmpty(),
"No tables to be synchronized. Please make sure whether the tables that need to be synchronized exist in the corresponding database or schema.");

if (!StringUtils.isNullOrWhitespaceOnly(database)
&& !dorisSystem.databaseExists(database)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.doris.flink.tools.cdc;

import org.apache.flink.util.Preconditions;

import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
Expand All @@ -31,6 +35,7 @@
* databases.
*/
public abstract class JdbcSourceSchema extends SourceSchema {
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceSchema.class);

public JdbcSourceSchema(
DatabaseMetaData metaData,
Expand All @@ -48,7 +53,7 @@ public LinkedHashMap<String, FieldSchema> getColumnInfo(
DatabaseMetaData metaData, String databaseName, String schemaName, String tableName)
throws SQLException {
LinkedHashMap<String, FieldSchema> fields = new LinkedHashMap<>();
//
LOG.debug("Starting to get column info for table: {}", tableName);
try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
while (rs.next()) {
String fieldName = rs.getString("COLUMN_NAME");
Expand All @@ -63,10 +68,17 @@ public LinkedHashMap<String, FieldSchema> getColumnInfo(
if (rs.wasNull()) {
scale = null;
}
String dorisTypeStr = convertToDorisType(fieldType, precision, scale);
String dorisTypeStr = null;
try {
dorisTypeStr = convertToDorisType(fieldType, precision, scale);
} catch (UnsupportedOperationException e) {
throw new UnsupportedOperationException(e + " in table: " + tableName);
}
fields.put(fieldName, new FieldSchema(fieldName, dorisTypeStr, comment));
}
}
Preconditions.checkArgument(!fields.isEmpty(), "The column info of {} is empty", tableName);
LOG.debug("Successfully retrieved column info for table: {}", tableName);
return fields;
}

Expand Down

0 comments on commit 63b1290

Please sign in to comment.