diff --git a/flink-connector-jdbc-core/pom.xml b/flink-connector-jdbc-core/pom.xml
index a7579845d..851c1e07f 100644
--- a/flink-connector-jdbc-core/pom.xml
+++ b/flink-connector-jdbc-core/pom.xml
@@ -158,6 +158,14 @@ under the License.
test
+
+
+ org.testcontainers
+ jdbc
+ test
+
+
+
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
index b8a7439d0..5c9e75cec 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/derby/testutils/DerbyDatabase.java
@@ -20,6 +20,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
import org.apache.flink.util.FlinkRuntimeException;
import java.io.OutputStream;
@@ -36,39 +38,44 @@ public class DerbyDatabase extends DatabaseExtension {
public void write(int b) {}
};
- private static DerbyMetadata metadata;
+ private static final DerbyMetadata metadata = new DerbyMetadata("test");
public static DerbyMetadata getMetadata() {
- if (metadata == null) {
- metadata = new DerbyMetadata("test");
- }
return metadata;
}
@Override
- public DatabaseMetadata startDatabase() throws Exception {
- DatabaseMetadata metadata = getMetadata();
- try {
- System.setProperty(
- "derby.stream.error.field",
- DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
- Class.forName(metadata.getDriverClass());
- DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl()))
- .close();
- } catch (Exception e) {
- throw new FlinkRuntimeException(e);
- }
- return metadata;
+ protected DatabaseMetadata getMetadataDB() {
+ return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- try {
- DriverManager.getConnection(String.format("%s;shutdown=true", metadata.getJdbcUrl()))
- .close();
- } catch (SQLException ignored) {
- } finally {
- metadata = null;
- }
+ protected DatabaseResource getResource() {
+ return new MemoryResource() {
+ @Override
+ public void start() {
+ try {
+ System.setProperty(
+ "derby.stream.error.field",
+ DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
+ Class.forName(metadata.getDriverClass());
+ DriverManager.getConnection(
+ String.format("%s;create=true", metadata.getJdbcUrl()))
+ .close();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ DriverManager.getConnection(
+ String.format("%s;shutdown=true", metadata.getJdbcUrl()))
+ .close();
+ } catch (SQLException ignored) {
+ }
+ }
+ };
}
}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
index edca8ad09..6c37f69e7 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/h2/testutils/H2XaDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
import org.apache.flink.util.FlinkRuntimeException;
import java.sql.DriverManager;
@@ -26,31 +28,36 @@
/** H2 database for testing. */
public class H2XaDatabase extends DatabaseExtension {
- private static H2Metadata metadata;
+ private static final H2Metadata metadata = new H2Metadata("test");
public static H2Metadata getMetadata() {
- if (metadata == null) {
- metadata = new H2Metadata("test");
- }
return metadata;
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- DatabaseMetadata metadata = getMetadata();
- try {
- Class.forName(metadata.getDriverClass());
- DriverManager.getConnection(
- String.format(
- "%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
- metadata.getJdbcUrl(), "test", "test"))
- .close();
- } catch (Exception e) {
- throw new FlinkRuntimeException(e);
- }
- return metadata;
+ protected DatabaseMetadata getMetadataDB() {
+ return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {}
+ protected DatabaseResource getResource() {
+ return new MemoryResource() {
+ @Override
+ public void start() {
+ try {
+ Class.forName(metadata.getDriverClass());
+ DriverManager.getConnection(
+ String.format(
+ "%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
+ metadata.getJdbcUrl(), "test", "test"))
+ .close();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {}
+ };
+ }
}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
index 3cc73559f..718b4f4f6 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseExtension.java
@@ -35,42 +35,21 @@
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static org.junit.platform.commons.support.AnnotationSupport.findRepeatableAnnotations;
/** Database extension for testing. */
public abstract class DatabaseExtension
- implements BeforeAllCallback,
- AfterAllCallback,
- BeforeEachCallback,
- AfterEachCallback,
- ExtensionContext.Store.CloseableResource {
-
- /**
- * Database Lifecycle for testing. The goal it's that all database containers are create only
- * one time.
- */
- public enum Lifecycle {
- /** Database will be instantiated only one time. */
- PER_EXECUTION,
- /** Database will be instantiated by class. */
- PER_CLASS
- }
+ implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {
- protected abstract DatabaseMetadata startDatabase() throws Exception;
+ protected abstract DatabaseMetadata getMetadataDB();
- protected abstract void stopDatabase() throws Exception;
+ protected abstract DatabaseResource getResource();
private final String uniqueKey = this.getClass().getSimpleName();
-
- protected Lifecycle getLifecycle() {
- return Lifecycle.PER_EXECUTION;
- }
-
- private ExtensionContext.Store getStore(ExtensionContext context) {
- return context.getRoot().getStore(Namespace.GLOBAL);
- }
+ private final String uniqueResource = String.format("%sResource", uniqueKey);
private DatabaseTest getDatabaseBaseTest(Class> clazz) throws Exception {
DatabaseTest dbClazz = null;
@@ -95,8 +74,7 @@ private void getManagedTables(
.filter(DatabaseTest.class::isAssignableFrom)
.ifPresent(
clazz -> {
- DatabaseMetadata metadata =
- getStore(context).get(uniqueKey, DatabaseMetadata.class);
+ DatabaseMetadata metadata = getMetadataDB();
if (metadata != null) {
try (Connection conn = metadata.getConnection()) {
for (TableManaged table :
@@ -132,15 +110,19 @@ private boolean ignoreTestDatabase(ExtensionContext context) {
return false;
}
+ private DatabaseResource getResource(ExtensionContext context) {
+ return context.getRoot()
+ .getStore(Namespace.GLOBAL)
+ .getOrComputeIfAbsent(uniqueResource, startResource(), DatabaseResource.class);
+ }
+
@Override
public final void beforeAll(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}
- if (getStore(context).get(uniqueKey) == null) {
- getStore(context).put(uniqueKey, startDatabase());
- }
+ getResource(context);
getManagedTables(context, TableManaged::createTable);
}
@@ -153,6 +135,7 @@ public final void afterEach(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}
+
getManagedTables(context, TableManaged::deleteTable);
}
@@ -161,18 +144,9 @@ public final void afterAll(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}
- getManagedTables(context, TableManaged::dropTable);
- if (Lifecycle.PER_CLASS == getLifecycle()) {
- stopDatabase();
- getStore(context).remove(uniqueKey, DatabaseMetadata.class);
- }
- }
- @Override
- public final void close() throws Throwable {
- if (Lifecycle.PER_EXECUTION == getLifecycle()) {
- stopDatabase();
- }
+ getManagedTables(context, TableManaged::dropTable);
+ getResource(context);
}
private Set retrieveDatabaseExtensions(final ExtensionContext context) {
@@ -199,4 +173,12 @@ public Set apply(ExtensionContext context, Set acc) {
return retrieveExtensions.apply(context, new HashSet<>());
}
+
+ private Function startResource() {
+ return s -> {
+ DatabaseResource resource = getResource();
+ resource.start();
+ return resource;
+ };
+ }
}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java
new file mode 100644
index 000000000..07e95617f
--- /dev/null
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/DatabaseResource.java
@@ -0,0 +1,15 @@
+package org.apache.flink.connector.jdbc.testutils;
+
+import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;
+
+/** Database resource for testing. */
+public interface DatabaseResource extends CloseableResource {
+
+ void start();
+
+ void stop();
+
+ default void close() throws Throwable {
+ stop();
+ }
+}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
new file mode 100644
index 000000000..06f50d37a
--- /dev/null
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
@@ -0,0 +1,58 @@
+package org.apache.flink.connector.jdbc.testutils.resources;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+
+import com.github.dockerjava.api.DockerClient;
+import org.testcontainers.DockerClientFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+
+import java.util.Arrays;
+
+/** Docker based database resource. */
+public class DockerResource implements DatabaseResource {
+
+ private final JdbcDatabaseContainer> container;
+
+ public DockerResource(JdbcDatabaseContainer> container) {
+ this.container = container;
+ }
+
+ @Override
+ public void start() {
+ this.container.start();
+ }
+
+ @Override
+ public void stop() {
+ this.container.stop();
+ }
+
+ @Override
+ public void close() throws Throwable {
+ stop();
+ cleanContainers(container);
+ }
+
+ public static void cleanContainers(GenericContainer> container) {
+ try {
+ DockerClient client = DockerClientFactory.instance().client();
+ // client.removeImageCmd(container.getDockerImageName()).exec();
+ client.listImagesCmd().exec().stream()
+ .filter(
+ image ->
+ Arrays.stream(image.getRepoTags())
+ .anyMatch(
+ tag ->
+ !tag.contains("testcontainers/ryuk")
+ && !tag.contains(
+ container
+ .getDockerImageName())))
+ .forEach(image -> client.removeImageCmd(image.getId()).exec());
+
+ } catch (Exception ignore) {
+ System.out.println("ERROR:");
+ ignore.printStackTrace();
+ }
+ }
+}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java
new file mode 100644
index 000000000..2404bf445
--- /dev/null
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/MemoryResource.java
@@ -0,0 +1,6 @@
+package org.apache.flink.connector.jdbc.testutils.resources;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+
+/** Memory based database resource. */
+public interface MemoryResource extends DatabaseResource {}
diff --git a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
index 50d3ae4c0..08bf113f4 100644
--- a/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
+++ b/flink-connector-jdbc-cratedb/src/test/java/org/apache/flink/connector/jdbc/cratedb/testutils/CrateDBDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.JdbcDatabaseContainer;
@@ -66,15 +68,13 @@ public static CrateDBMetadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
/**
diff --git a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
index 24e758586..930dbdb11 100644
--- a/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
+++ b/flink-connector-jdbc-db2/src/test/java/org/apache/flink/connector/jdbc/db2/testutils/Db2Database.java
@@ -20,6 +20,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
@@ -56,14 +58,12 @@ public static Db2Metadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
}
diff --git a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
index 99195063f..0b5e7908f 100644
--- a/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
+++ b/flink-connector-jdbc-mysql/src/test/java/org/apache/flink/connector/jdbc/mysql/testutils/MySqlDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.MySQLContainer;
@@ -46,14 +48,12 @@ public static MySqlMetadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
}
diff --git a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
index 62279748a..c07ad5c2d 100644
--- a/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
+++ b/flink-connector-jdbc-oceanbase/src/test/java/org/apache/flink/connector/jdbc/oceanbase/testutils/OceanBaseDatabase.java
@@ -20,6 +20,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
@@ -28,6 +30,7 @@
import org.testcontainers.oceanbase.OceanBaseCEContainer;
import java.sql.Connection;
+import java.sql.SQLException;
import java.sql.Statement;
/** OceanBase database for testing. */
@@ -56,18 +59,23 @@ public static OceanBaseMetadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
- try (Connection connection = getMetadata().getConnection();
- Statement statement = connection.createStatement()) {
- statement.execute("SET GLOBAL time_zone = '+00:00'");
- }
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER) {
+ @Override
+ public void start() {
+ super.start();
+ try (Connection connection = getMetadata().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("SET GLOBAL time_zone = '+00:00'");
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+ };
}
}
diff --git a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
index 371972921..c1a4e4f94 100644
--- a/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
+++ b/flink-connector-jdbc-oracle/src/test/java/org/apache/flink/connector/jdbc/oracle/testutils/OracleDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.OracleContainer;
@@ -45,14 +47,12 @@ public static OracleMetadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
}
diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
index 4282d0b6c..8b88ffb8c 100644
--- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
+++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/table/PostgresDynamicTableSinkITCase.java
@@ -24,4 +24,16 @@
/** The Table Sink ITCase for {@link PostgresDialect}. */
public class PostgresDynamicTableSinkITCase extends JdbcDynamicTableSinkITCase
- implements PostgresTestBase {}
+ implements PostgresTestBase {
+
+ // @Test
+ // void testReal1(DatabaseResource resource) throws Exception {
+ //
+ // assertThat(true).isTrue();
+ // }
+ //
+ // @Test
+ // void testReal2(DatabaseResource resource) throws Exception {
+ // assertThat(true).isTrue();
+ // }
+}
diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
index 5b1700bdd..0b871ec41 100644
--- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
+++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.PostgreSQLContainer;
@@ -44,16 +46,13 @@ public static PostgresMetadata getMetadata() {
return metadata;
}
- @Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
/** {@link PostgreSQLContainer} with XA enabled (by setting max_prepared_transactions). */
diff --git a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
index 219f49611..2d083e051 100644
--- a/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
+++ b/flink-connector-jdbc-postgres/src/test/java/org/apache/flink/connector/jdbc/postgres/testutils/PostgresMetadata.java
@@ -20,6 +20,7 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.postgresql.xa.PGXADataSource;
+import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.PostgreSQLContainer;
import javax.sql.XADataSource;
@@ -38,7 +39,7 @@ public PostgresMetadata(PostgreSQLContainer> container) {
this(container, false);
}
- public PostgresMetadata(PostgreSQLContainer> container, boolean hasXaEnabled) {
+ public PostgresMetadata(JdbcDatabaseContainer> container, boolean hasXaEnabled) {
this.username = container.getUsername();
this.password = container.getPassword();
this.url = container.getJdbcUrl();
diff --git a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
index 7629fba53..ddb9fbd18 100644
--- a/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
+++ b/flink-connector-jdbc-sqlserver/src/test/java/org/apache/flink/connector/jdbc/sqlserver/testutils/SqlServerDatabase.java
@@ -19,6 +19,8 @@
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
@@ -48,15 +50,13 @@ public static SqlServerMetadata getMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
}
/** {@link MSSQLServerContainer} with Xa. */
diff --git a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
index ba65a189d..f56d3d487 100644
--- a/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
+++ b/flink-connector-jdbc-trino/src/test/java/org/apache/flink/connector/jdbc/trino/testutils/TrinoDatabase.java
@@ -22,6 +22,8 @@
import org.apache.flink.connector.jdbc.postgres.testutils.PostgresMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.testcontainers.containers.BindMode;
@@ -33,6 +35,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
/** A Trino database for testing. */
public class TrinoDatabase extends DatabaseExtension implements TrinoImages, PostgresImages {
@@ -75,34 +78,52 @@ public static PostgresMetadata getDatabaseMetadata() {
}
@Override
- protected DatabaseMetadata startDatabase() throws Exception {
- CONTAINER_DB.start();
-
- Path tempFile = Files.createTempFile(null, null);
- String postgresContent =
- "connector.name=postgresql\n"
- + String.format(
- "connection-url=jdbc:postgresql://%s:%s/test\n",
- CONTAINER_DB_ALIAS, CONTAINER_DB_PORT)
- + String.format("connection-user=%s\n", CONTAINER_DB.getUsername())
- + String.format("connection-password=%s\n", CONTAINER_DB.getPassword());
- Files.write(tempFile, postgresContent.getBytes(StandardCharsets.UTF_8));
-
- CONTAINER
- .withDatabaseName("postgres/public")
- .withFileSystemBind(
- tempFile.toFile().getAbsolutePath(),
- "/etc/trino/catalog/postgres.properties",
- BindMode.READ_WRITE)
- .waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200));
- CONTAINER.start();
+ protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}
@Override
- protected void stopDatabase() throws Exception {
- CONTAINER.stop();
- CONTAINER_DB.stop();
- metadata = null;
+ protected DatabaseResource getResource() {
+ return new DatabaseResource() {
+ @Override
+ public void start() {
+ try {
+ CONTAINER_DB.start();
+
+ Path tempFile = Files.createTempFile(null, null);
+ String postgresContent =
+ "connector.name=postgresql\n"
+ + String.format(
+ "connection-url=jdbc:postgresql://%s:%s/test\n",
+ CONTAINER_DB_ALIAS, CONTAINER_DB_PORT)
+ + String.format(
+ "connection-user=%s\n", CONTAINER_DB.getUsername())
+ + String.format(
+ "connection-password=%s\n", CONTAINER_DB.getPassword());
+ Files.write(tempFile, postgresContent.getBytes(StandardCharsets.UTF_8));
+
+ CONTAINER
+ .withDatabaseName("postgres/public")
+ .withFileSystemBind(
+ tempFile.toFile().getAbsolutePath(),
+ "/etc/trino/catalog/postgres.properties",
+ BindMode.READ_WRITE)
+ .waitingFor(Wait.forHttp("/ui/login.html").forStatusCode(200));
+ CONTAINER.start();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ Arrays.asList(CONTAINER, CONTAINER_DB)
+ .forEach(
+ container -> {
+ container.stop();
+ DockerResource.cleanContainers(container);
+ });
+ }
+ };
}
}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
index 56c04f0ce..c7e680907 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkMigrationTest.java
@@ -22,7 +22,7 @@
import org.apache.flink.connector.jdbc.JdbcTestBase;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
-import org.apache.flink.connector.jdbc.derby.testutils.DerbyDatabase;
+import org.apache.flink.connector.jdbc.derby.DerbyTestBase;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -52,7 +52,7 @@
import static org.apache.flink.streaming.util.OperatorSnapshotUtil.writeStateHandle;
/** Tests state migration for {@link JdbcXaSinkFunction}. */
-public class JdbcXaSinkMigrationTest extends JdbcTestBase {
+public class JdbcXaSinkMigrationTest extends JdbcTestBase implements DerbyTestBase {
// write a snapshot:
// java
@@ -60,7 +60,7 @@ public class JdbcXaSinkMigrationTest extends JdbcTestBase {
// mvn exec:java -Dexec.mainClass="" -Dexec.args=''
// -Dexec.classpathScope=test -Dexec.cleanupDaemonThreads=false
public static void main(String[] args) throws Exception {
- new DerbyDatabase().startDatabase();
+ // new DerbyDatabase();
JdbcXaSinkMigrationTest test = new JdbcXaSinkMigrationTest();
test.writeSnapshot(parseVersionArg(args));
}
diff --git a/pom.xml b/pom.xml
index fda0c8423..fa8d48376 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,6 @@ under the License.
1.7.36
2.17.2
-
flink-connector-jdbc-parent
-XX:+UseG1GC -Xms256m -XX:+IgnoreUnrecognizedVMOptions ${surefire.module.config}