Skip to content

Commit

Permalink
[step2][Improve]Use flink’s checkstyle to format Sink Connector (#272)
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 authored Dec 15, 2023
1 parent f8b2f53 commit f60bebf
Show file tree
Hide file tree
Showing 95 changed files with 2,350 additions and 1,789 deletions.
75 changes: 30 additions & 45 deletions flink-doris-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ under the License.
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
Expand Down Expand Up @@ -460,38 +459,36 @@ under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>

<!-- TODO Wait until all code is formatted before introducing it.-->
<!-- <plugin>-->
<!-- <groupId>com.diffplug.spotless</groupId>-->
<!-- <artifactId>spotless-maven-plugin</artifactId>-->
<!-- <version>${spotless.version}</version>-->
<!-- <configuration>-->
<!-- <java>-->
<!-- <googleJavaFormat>-->
<!-- <version>1.7</version>-->
<!-- <style>AOSP</style>-->
<!-- </googleJavaFormat>-->

<!-- &lt;!&ndash; \# refers to the static imports &ndash;&gt;-->
<!-- <importOrder>-->
<!-- <order>org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\#-->
<!-- </order>-->
<!-- </importOrder>-->

<!-- <removeUnusedImports/>-->
<!-- </java>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <id>spotless-check</id>-->
<!-- <phase>validate</phase>-->
<!-- <goals>-->
<!-- <goal>check</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotless.version}</version>
<configuration>
<java>
<googleJavaFormat>
<version>1.7</version>
<style>AOSP</style>
</googleJavaFormat>

<!-- \# refers to the static imports -->
<importOrder>
<order>org.apache.flink,org.apache.flink.shaded,,javax,java,scala,\#
</order>
</importOrder>

<removeUnusedImports/>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
Expand Down Expand Up @@ -519,18 +516,6 @@ under the License.
<configLocation>../tools/maven/checkstyle.xml</configLocation>
<logViolationsToConsole>true</logViolationsToConsole>
<failOnViolation>true</failOnViolation>
<!-- TODO It needs to be deleted after all code formatting is completed.-->
<sourceDirectories>
<directory>src/main/java/org/apache/doris/flink/source/</directory>
<directory>src/main/java/org/apache/doris/flink/rest</directory>
<directory>src/main/java/org/apache/doris/flink/serialization</directory>
<directory>src/main/java/org/apache/doris/flink/cfg</directory>
<directory>src/main/java/org/apache/doris/flink/datastream</directory>
<directory>src/main/java/org/apache/doris/flink/backend</directory>
<directory>src/main/java/org/apache/doris/flink/exception</directory>
<directory>src/main/java/org/apache/doris/flink/rest</directory>
<directory>src/main/java/org/apache/doris/flink/serialization</directory>
</sourceDirectories>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.catalog;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.table.DorisDynamicTableFactory;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.AbstractCatalog;
Expand Down Expand Up @@ -54,6 +48,14 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;

import org.apache.commons.compress.utils.Lists;
import org.apache.doris.flink.catalog.doris.DataModel;
import org.apache.doris.flink.catalog.doris.DorisSystem;
import org.apache.doris.flink.catalog.doris.FieldSchema;
import org.apache.doris.flink.catalog.doris.TableSchema;
import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.doris.flink.table.DorisDynamicTableFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -82,9 +84,7 @@
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* catalog for flink
*/
/** catalog for flink. */
public class DorisCatalog extends AbstractCatalog {

private static final Logger LOG = LoggerFactory.getLogger(DorisCatalog.class);
Expand Down Expand Up @@ -147,28 +147,28 @@ public boolean databaseExists(String databaseName) throws CatalogException {
@Override
public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
if(databaseExists(name)){
if(ignoreIfExists){
if (databaseExists(name)) {
if (ignoreIfExists) {
return;
}
throw new DatabaseAlreadyExistException(getName(), name);
}else {
} else {
dorisSystem.createDatabase(name);
}
}

@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
throws DatabaseNotEmptyException, CatalogException, DatabaseNotExistException {
if(!databaseExists(name)){
if (!databaseExists(name)) {
if (ignoreIfNotExists) {
return;
}
throw new DatabaseNotExistException(getName(), name);
}

if (!cascade && listTables(name).size() > 0) {
throw new DatabaseNotEmptyException(getName(),name);
throw new DatabaseNotEmptyException(getName(), name);
}
dorisSystem.dropDatabase(name);
}
Expand All @@ -185,7 +185,8 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
public List<String> listTables(String databaseName)
throws DatabaseNotExistException, CatalogException {
Preconditions.checkState(
org.apache.commons.lang3.StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
org.apache.commons.lang3.StringUtils.isNotBlank(databaseName),
"Database name must not be blank.");
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(getName(), databaseName);
}
Expand Down Expand Up @@ -220,19 +221,21 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
props.put(PASSWORD.key(), connectionOptions.getPassword());
props.put(TABLE_IDENTIFIER.key(), databaseName + "." + tableName);

String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(),"");
props.put(SINK_LABEL_PREFIX.key(), String.join("_",labelPrefix,databaseName,tableName));
//remove catalog option
String labelPrefix = props.getOrDefault(SINK_LABEL_PREFIX.key(), "");
props.put(SINK_LABEL_PREFIX.key(), String.join("_", labelPrefix, databaseName, tableName));
// remove catalog option
props.remove(DEFAULT_DATABASE.key());
return CatalogTable.of(createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props);

return CatalogTable.of(
createTableSchema(databaseName, tableName), null, Lists.newArrayList(), props);
}

@VisibleForTesting
protected String queryFenodes() {
try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
connectionOptions.getUsername(),
connectionOptions.getPassword())) {
try (Connection conn =
DriverManager.getConnection(
connectionOptions.getJdbcUrl(),
connectionOptions.getUsername(),
connectionOptions.getPassword())) {
StringJoiner fenodes = new StringJoiner(",");
PreparedStatement ps = conn.prepareStatement("SHOW FRONTENDS");
ResultSet resultSet = ps.executeQuery();
Expand All @@ -248,12 +251,16 @@ protected String queryFenodes() {
}

private Schema createTableSchema(String databaseName, String tableName) {
try (Connection conn = DriverManager.getConnection(connectionOptions.getJdbcUrl(),
connectionOptions.getUsername(),
connectionOptions.getPassword())) {
try (Connection conn =
DriverManager.getConnection(
connectionOptions.getJdbcUrl(),
connectionOptions.getUsername(),
connectionOptions.getPassword())) {
PreparedStatement ps =
conn.prepareStatement(
String.format("SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'", databaseName, tableName));
String.format(
"SELECT COLUMN_NAME,DATA_TYPE,COLUMN_SIZE,DECIMAL_DIGITS FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA`= '%s' AND `TABLE_NAME`= '%s'",
databaseName, tableName));

List<String> columnNames = new ArrayList<>();
List<DataType> columnTypes = new ArrayList<>();
Expand All @@ -264,7 +271,9 @@ private Schema createTableSchema(String databaseName, String tableName) {
long columnSize = resultSet.getLong("COLUMN_SIZE");
long columnDigit = resultSet.getLong("DECIMAL_DIGITS");

DataType flinkType = DorisTypeMapper.toFlinkType(columnName, columnType, (int) columnSize, (int) columnDigit);
DataType flinkType =
DorisTypeMapper.toFlinkType(
columnName, columnType, (int) columnSize, (int) columnDigit);
columnNames.add(columnName);
columnTypes.add(flinkType);
}
Expand All @@ -273,7 +282,10 @@ private Schema createTableSchema(String databaseName, String tableName) {
return tableSchema;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting catalog %s database %s table %s", getName(), databaseName, tableName), e);
String.format(
"Failed getting catalog %s database %s table %s",
getName(), databaseName, tableName),
e);
}
}

Expand All @@ -290,14 +302,15 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException {
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
if(!tableExists(tablePath)){
if(ignoreIfNotExists){
if (!tableExists(tablePath)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(getName(), tablePath);
}

dorisSystem.dropTable(String.format("%s.%s", tablePath.getDatabaseName(), tablePath.getObjectName()));
dorisSystem.dropTable(
String.format("%s.%s", tablePath.getDatabaseName(), tablePath.getObjectName()));
}

@Override
Expand All @@ -312,11 +325,11 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
checkNotNull(tablePath, "tablePath cannot be null");
checkNotNull(table, "table cannot be null");

if(!databaseExists(tablePath.getDatabaseName())) {
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}
if(tableExists(tablePath)){
if(ignoreIfExists){
if (tableExists(tablePath)) {
if (ignoreIfExists) {
return;
}
throw new TableAlreadyExistException(getName(), tablePath);
Expand All @@ -341,22 +354,22 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
dorisSystem.createTable(schema);
}

public List<String> getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema){
Preconditions.checkState(schema.getPrimaryKey().isPresent(),"primary key cannot be null");
public List<String> getCreateDorisKeys(org.apache.flink.table.api.TableSchema schema) {
Preconditions.checkState(schema.getPrimaryKey().isPresent(), "primary key cannot be null");
return schema.getPrimaryKey().get().getColumns();
}

public Map<String, FieldSchema> getCreateDorisColumns(org.apache.flink.table.api.TableSchema schema){
public Map<String, FieldSchema> getCreateDorisColumns(
org.apache.flink.table.api.TableSchema schema) {
String[] fieldNames = schema.getFieldNames();
DataType[] fieldTypes = schema.getFieldDataTypes();

Map<String, FieldSchema> fields = new LinkedHashMap<>();
for (int i = 0; i < fieldNames.length; i++) {
fields.put(fieldNames[i],
fields.put(
fieldNames[i],
new FieldSchema(
fieldNames[i],
DorisTypeMapper.toDorisType(fieldTypes[i]),
null));
fieldNames[i], DorisTypeMapper.toDorisType(fieldTypes[i]), null));
}
return fields;
}
Expand All @@ -380,7 +393,7 @@ public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
public List<CatalogPartitionSpec> listPartitions(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
PartitionSpecInvalidException, CatalogException {
return Collections.emptyList();
}

Expand Down Expand Up @@ -410,8 +423,8 @@ public void createPartition(
CatalogPartition partition,
boolean ignoreIfExists)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, PartitionAlreadyExistsException,
CatalogException {
PartitionSpecInvalidException, PartitionAlreadyExistsException,
CatalogException {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.catalog;

import org.apache.doris.flink.cfg.DorisConnectionOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;

import org.apache.doris.flink.cfg.DorisConnectionOptions;

import java.util.HashSet;
import java.util.Set;

import static org.apache.doris.flink.catalog.DorisCatalogOptions.DEFAULT_DATABASE;
import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_BATCH_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
Expand All @@ -41,6 +42,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.DORIS_TABLET_SIZE;
import static org.apache.doris.flink.table.DorisConfigOptions.FENODES;
import static org.apache.doris.flink.table.DorisConfigOptions.IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.JDBC_URL;
import static org.apache.doris.flink.table.DorisConfigOptions.PASSWORD;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_COUNT;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_BUFFER_SIZE;
Expand All @@ -56,9 +58,7 @@
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
import static org.apache.doris.flink.table.DorisConfigOptions.USERNAME;

/**
* Factory for {@link DorisCatalog}.
*/
/** Factory for {@link DorisCatalog}. */
public class DorisCatalogFactory implements CatalogFactory {

@Override
Expand Down
Loading

0 comments on commit f60bebf

Please sign in to comment.