Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[2.1][improvement](jdbc catalog) Add catalog property to enable jdbc connection pool" #42481

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions be/src/runtime/descriptors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,21 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc)
_connection_pool_max_size(tdesc.jdbcTable.connection_pool_max_size),
_connection_pool_max_wait_time(tdesc.jdbcTable.connection_pool_max_wait_time),
_connection_pool_max_life_time(tdesc.jdbcTable.connection_pool_max_life_time),
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive),
_enable_connection_pool(tdesc.jdbcTable.enable_connection_pool) {}
_connection_pool_keep_alive(tdesc.jdbcTable.connection_pool_keep_alive) {}

std::string JdbcTableDescriptor::debug_string() const {
fmt::memory_buffer buf;
fmt::format_to(
buf,
"JDBCTable({} ,_jdbc_catalog_id = {}, _jdbc_resource_name={} ,_jdbc_driver_url={} "
",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} "
",_enable_connection_pool={},_connection_pool_min_size={} "
",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_connection_pool_min_size={} "
",_connection_pool_max_size={} ,_connection_pool_max_wait_time={} "
",_connection_pool_max_life_time={} ,_connection_pool_keep_alive={})",
TableDescriptor::debug_string(), _jdbc_catalog_id, _jdbc_resource_name,
_jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url,
_jdbc_table_name, _jdbc_user, _jdbc_passwd, _enable_connection_pool,
_connection_pool_min_size, _connection_pool_max_size, _connection_pool_max_wait_time,
_jdbc_table_name, _jdbc_user, _jdbc_passwd, _connection_pool_min_size,
_connection_pool_max_size, _connection_pool_max_wait_time,
_connection_pool_max_life_time, _connection_pool_keep_alive);
return fmt::to_string(buf);
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,6 @@ class JdbcTableDescriptor : public TableDescriptor {
int32_t connection_pool_max_wait_time() const { return _connection_pool_max_wait_time; }
int32_t connection_pool_max_life_time() const { return _connection_pool_max_life_time; }
bool connection_pool_keep_alive() const { return _connection_pool_keep_alive; }
bool enable_connection_pool() const { return _enable_connection_pool; }

private:
int64_t _jdbc_catalog_id;
Expand All @@ -322,7 +321,6 @@ class JdbcTableDescriptor : public TableDescriptor {
int32_t _connection_pool_max_wait_time;
int32_t _connection_pool_max_life_time;
bool _connection_pool_keep_alive;
bool _enable_connection_pool;
};

class TupleDescriptor {
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/scan/new_jdbc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con
_jdbc_param.connection_pool_max_life_time = jdbc_table->connection_pool_max_life_time();
_jdbc_param.connection_pool_max_wait_time = jdbc_table->connection_pool_max_wait_time();
_jdbc_param.connection_pool_keep_alive = jdbc_table->connection_pool_keep_alive();
_jdbc_param.enable_connection_pool = jdbc_table->enable_connection_pool();

if (get_parent() != nullptr) {
get_parent()->_scanner_profile->add_info_string("JdbcDriverClass",
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/vjdbc_connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
}
ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE);
ctor_params.__set_table_type(_conn_param.table_type);
ctor_params.__set_enable_connection_pool(_conn_param.enable_connection_pool);
ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size);
ctor_params.__set_connection_pool_max_size(_conn_param.connection_pool_max_size);
ctor_params.__set_connection_pool_max_wait_time(_conn_param.connection_pool_max_wait_time);
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/exec/vjdbc_connector.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ struct JdbcConnectorParam {
int32_t connection_pool_max_wait_time = -1;
int32_t connection_pool_max_life_time = -1;
bool connection_pool_keep_alive = false;
bool enable_connection_pool;

const TupleDescriptor* tuple_desc = nullptr;
};
Expand Down
1 change: 0 additions & 1 deletion be/src/vec/sink/writer/vjdbc_table_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink
jdbc_param.connection_pool_max_wait_time = t_jdbc_sink.jdbc_table.connection_pool_max_wait_time;
jdbc_param.connection_pool_max_life_time = t_jdbc_sink.jdbc_table.connection_pool_max_life_time;
jdbc_param.connection_pool_keep_alive = t_jdbc_sink.jdbc_table.connection_pool_keep_alive;
jdbc_param.enable_connection_pool = t_jdbc_sink.jdbc_table.enable_connection_pool;

return jdbc_param;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Date;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -51,7 +50,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

public abstract class BaseJdbcExecutor implements JdbcExecutor {
Expand Down Expand Up @@ -93,8 +91,7 @@ public BaseJdbcExecutor(byte[] thriftParams) throws Exception {
.setConnectionPoolMaxSize(request.connection_pool_max_size)
.setConnectionPoolMaxWaitTime(request.connection_pool_max_wait_time)
.setConnectionPoolMaxLifeTime(request.connection_pool_max_life_time)
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive)
.setEnableConnectionPool(request.enable_connection_pool);
.setConnectionPoolKeepAlive(request.connection_pool_keep_alive);
JdbcDataSource.getDataSource().setCleanupInterval(request.connection_pool_cache_clear_time);
System.setProperty("com.zaxxer.hikari.useWeakReferences", "true");
init(config, request.statement);
Expand All @@ -119,12 +116,10 @@ public void close() throws Exception {
}
} finally {
closeResources(resultSet, stmt, conn);
if (config.isEnableConnectionPool()) {
if (config.getConnectionPoolMinSize() == 0 && hikariDataSource != null) {
hikariDataSource.close();
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
hikariDataSource = null;
}
if (config.getConnectionPoolMinSize() == 0 && hikariDataSource != null) {
hikariDataSource.close();
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
hikariDataSource = null;
}
}
}
Expand All @@ -146,12 +141,10 @@ protected void abortReadConnection(Connection connection, ResultSet resultSet)
}

public void cleanDataSource() {
if (config.isEnableConnectionPool()) {
if (hikariDataSource != null) {
hikariDataSource.close();
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
hikariDataSource = null;
}
if (hikariDataSource != null) {
hikariDataSource.close();
JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey());
hikariDataSource = null;
}
}

Expand Down Expand Up @@ -293,64 +286,51 @@ public boolean hasNext() throws JdbcExecutorException {

private void init(JdbcDataSourceConfig config, String sql) throws JdbcExecutorException {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
String hikariDataSourceKey = config.createCacheKey();
try {
ClassLoader parent = getClass().getClassLoader();
ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent);
Thread.currentThread().setContextClassLoader(classLoader);
if (config.isEnableConnectionPool()) {
String hikariDataSourceKey = config.createCacheKey();
hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
if (hikariDataSource == null) {
synchronized (hikariDataSourceLock) {
hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
if (hikariDataSource == null) {
long start = System.currentTimeMillis();
HikariDataSource ds = new HikariDataSource();
ds.setDriverClassName(config.getJdbcDriverClass());
ds.setJdbcUrl(config.getJdbcUrl());
ds.setUsername(config.getJdbcUser());
ds.setPassword(config.getJdbcPassword());
ds.setMinimumIdle(config.getConnectionPoolMinSize()); // default 1
ds.setMaximumPoolSize(config.getConnectionPoolMaxSize()); // default 10
ds.setConnectionTimeout(config.getConnectionPoolMaxWaitTime()); // default 5000
ds.setMaxLifetime(config.getConnectionPoolMaxLifeTime()); // default 30 min
ds.setIdleTimeout(config.getConnectionPoolMaxLifeTime() / 2L); // default 15 min
setValidationQuery(ds);
if (config.isConnectionPoolKeepAlive()) {
ds.setKeepaliveTime(config.getConnectionPoolMaxLifeTime() / 5L); // default 6 min
}
hikariDataSource = ds;
JdbcDataSource.getDataSource().putSource(hikariDataSourceKey, hikariDataSource);
LOG.info("JdbcClient set"
+ " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize()
+ ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize()
+ ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime()
+ ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime()
+ ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive());
LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
System.currentTimeMillis() - start) + " ms");
hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
if (hikariDataSource == null) {
synchronized (hikariDataSourceLock) {
hikariDataSource = JdbcDataSource.getDataSource().getSource(hikariDataSourceKey);
if (hikariDataSource == null) {
long start = System.currentTimeMillis();
HikariDataSource ds = new HikariDataSource();
ds.setDriverClassName(config.getJdbcDriverClass());
ds.setJdbcUrl(config.getJdbcUrl());
ds.setUsername(config.getJdbcUser());
ds.setPassword(config.getJdbcPassword());
ds.setMinimumIdle(config.getConnectionPoolMinSize()); // default 1
ds.setMaximumPoolSize(config.getConnectionPoolMaxSize()); // default 10
ds.setConnectionTimeout(config.getConnectionPoolMaxWaitTime()); // default 5000
ds.setMaxLifetime(config.getConnectionPoolMaxLifeTime()); // default 30 min
ds.setIdleTimeout(config.getConnectionPoolMaxLifeTime() / 2L); // default 15 min
setValidationQuery(ds);
if (config.isConnectionPoolKeepAlive()) {
ds.setKeepaliveTime(config.getConnectionPoolMaxLifeTime() / 5L); // default 6 min
}
hikariDataSource = ds;
JdbcDataSource.getDataSource().putSource(hikariDataSourceKey, hikariDataSource);
LOG.info("JdbcClient set"
+ " ConnectionPoolMinSize = " + config.getConnectionPoolMinSize()
+ ", ConnectionPoolMaxSize = " + config.getConnectionPoolMaxSize()
+ ", ConnectionPoolMaxWaitTime = " + config.getConnectionPoolMaxWaitTime()
+ ", ConnectionPoolMaxLifeTime = " + config.getConnectionPoolMaxLifeTime()
+ ", ConnectionPoolKeepAlive = " + config.isConnectionPoolKeepAlive());
LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
System.currentTimeMillis() - start) + " ms");
}
}
conn = hikariDataSource.getConnection();
} else {
Class<?> driverClass = Class.forName(config.getJdbcDriverClass(), true, classLoader);
Driver driverInstance = (Driver) driverClass.getDeclaredConstructor().newInstance();

Properties info = new Properties();
info.put("user", config.getJdbcUser());
info.put("password", config.getJdbcPassword());

conn = driverInstance.connect(config.getJdbcUrl(), info);
if (conn == null) {
throw new SQLException("Failed to establish a connection. The JDBC driver returned null. "
+ "Please check if the JDBC URL is correct: "
+ config.getJdbcUrl()
+ ". Ensure that the URL format and parameters are valid for the driver: "
+ driverInstance.getClass().getName());
}
}

long start = System.currentTimeMillis();
conn = hikariDataSource.getConnection();
LOG.info("get connection [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + (
System.currentTimeMillis() - start)
+ " ms");

initializeStatement(conn, config, sql);

} catch (MalformedURLException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class JdbcDataSourceConfig {
private int connectionPoolMaxWaitTime = 5000;
private int connectionPoolMaxLifeTime = 1800000;
private boolean connectionPoolKeepAlive = false;
private boolean enableConnectionPool = false;

public String createCacheKey() {
return catalogId + jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass
Expand Down Expand Up @@ -168,13 +167,4 @@ public JdbcDataSourceConfig setConnectionPoolKeepAlive(boolean connectionPoolKee
this.connectionPoolKeepAlive = connectionPoolKeepAlive;
return this;
}

public boolean isEnableConnectionPool() {
return enableConnectionPool;
}

public JdbcDataSourceConfig setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ public class JdbcResource extends Resource {
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";
public static final String ENABLE_CONNECTION_POOL = "enable_connection_pool";

private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
Expand All @@ -128,8 +127,7 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION,
ExternalCatalog.USE_META_CACHE,
ENABLE_CONNECTION_POOL
ExternalCatalog.USE_META_CACHE
).build();

// The default value of optional properties
Expand All @@ -150,7 +148,6 @@ public class JdbcResource extends Resource {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ExternalCatalog.USE_META_CACHE,
String.valueOf(ExternalCatalog.DEFAULT_USE_META_CACHE));
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ENABLE_CONNECTION_POOL, "false");
}

// timeout for both connection and read. 10 seconds is long enough.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public class JdbcTable extends Table {

private long catalogId = -1;

private boolean enableConnectionPool;
private int connectionPoolMinSize;
private int connectionPoolMaxSize;
private int connectionPoolMaxWaitTime;
Expand Down Expand Up @@ -178,11 +177,6 @@ public long getCatalogId() {
return catalogId;
}

public boolean isEnableConnectionPool() {
return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.ENABLE_CONNECTION_POOL,
String.valueOf(enableConnectionPool)));
}

public int getConnectionPoolMinSize() {
return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.CONNECTION_POOL_MIN_SIZE,
String.valueOf(connectionPoolMinSize)));
Expand Down Expand Up @@ -231,7 +225,6 @@ public TTableDescriptor toThrift() {
tJdbcTable.setJdbcDriverUrl(getDriverUrl());
tJdbcTable.setJdbcResourceName(resourceName);
tJdbcTable.setJdbcDriverChecksum(checkSum);
tJdbcTable.setEnableConnectionPool(isEnableConnectionPool());
tJdbcTable.setConnectionPoolMinSize(getConnectionPoolMinSize());
tJdbcTable.setConnectionPoolMaxSize(getConnectionPoolMaxSize());
tJdbcTable.setConnectionPoolMaxWaitTime(getConnectionPoolMaxWaitTime());
Expand Down Expand Up @@ -418,7 +411,6 @@ private void validate(Map<String, String> properties) throws DdlException {
driverClass = jdbcResource.getProperty(DRIVER_CLASS);
driverUrl = jdbcResource.getProperty(DRIVER_URL);
checkSum = jdbcResource.getProperty(CHECK_SUM);
enableConnectionPool = Boolean.parseBoolean(jdbcResource.getProperty(JdbcResource.ENABLE_CONNECTION_POOL));
connectionPoolMinSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MIN_SIZE));
connectionPoolMaxSize = Integer.parseInt(jdbcResource.getProperty(JdbcResource.CONNECTION_POOL_MAX_SIZE));
connectionPoolMaxWaitTime = Integer.parseInt(
Expand Down
Loading
Loading