Skip to content

Commit

Permalink
Test cleanup of docker images
Browse files Browse the repository at this point in the history
  • Loading branch information
eskabetxe committed Jul 12, 2024
1 parent c242b83 commit 86fad05
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 156 deletions.
8 changes: 8 additions & 0 deletions flink-connector-jdbc-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ under the License.
<scope>test</scope>
</dependency>

<!-- TestContainer dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>jdbc</artifactId>
<scope>test</scope>
</dependency>


</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
import org.apache.flink.util.FlinkRuntimeException;

import java.io.OutputStream;
Expand All @@ -36,39 +38,44 @@ public class DerbyDatabase extends DatabaseExtension {
public void write(int b) {}
};

private static DerbyMetadata metadata;
private static final DerbyMetadata metadata = new DerbyMetadata("test");

public static DerbyMetadata getMetadata() {
if (metadata == null) {
metadata = new DerbyMetadata("test");
}
return metadata;
}

@Override
public DatabaseMetadata startDatabase() throws Exception {
DatabaseMetadata metadata = getMetadata();
try {
System.setProperty(
"derby.stream.error.field",
DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
Class.forName(metadata.getDriverClass());
DriverManager.getConnection(String.format("%s;create=true", metadata.getJdbcUrl()))
.close();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return metadata;
protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}

@Override
protected void stopDatabase() throws Exception {
try {
DriverManager.getConnection(String.format("%s;shutdown=true", metadata.getJdbcUrl()))
.close();
} catch (SQLException ignored) {
} finally {
metadata = null;
}
protected DatabaseResource getResource() {
return new MemoryResource() {
@Override
public void start() {
try {
System.setProperty(
"derby.stream.error.field",
DerbyDatabase.class.getCanonicalName() + ".DEV_NULL");
Class.forName(metadata.getDriverClass());
DriverManager.getConnection(
String.format("%s;create=true", metadata.getJdbcUrl()))
.close();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}

@Override
public void stop() {
try {
DriverManager.getConnection(
String.format("%s;shutdown=true", metadata.getJdbcUrl()))
.close();
} catch (SQLException ignored) {
}
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,38 +19,45 @@

import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
import org.apache.flink.connector.jdbc.testutils.resources.MemoryResource;
import org.apache.flink.util.FlinkRuntimeException;

import java.sql.DriverManager;

/** H2 database for testing. */
public class H2XaDatabase extends DatabaseExtension {

private static H2Metadata metadata;
private static final H2Metadata metadata = new H2Metadata("test");

public static H2Metadata getMetadata() {
if (metadata == null) {
metadata = new H2Metadata("test");
}
return metadata;
}

@Override
protected DatabaseMetadata startDatabase() throws Exception {
DatabaseMetadata metadata = getMetadata();
try {
Class.forName(metadata.getDriverClass());
DriverManager.getConnection(
String.format(
"%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
metadata.getJdbcUrl(), "test", "test"))
.close();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
return metadata;
protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}

@Override
protected void stopDatabase() throws Exception {}
protected DatabaseResource getResource() {
return new MemoryResource() {
@Override
public void start() {
try {
Class.forName(metadata.getDriverClass());
DriverManager.getConnection(
String.format(
"%s;DB_CLOSE_DELAY=-1;INIT=CREATE SCHEMA IF NOT EXISTS %s\\;SET SCHEMA %s",
metadata.getJdbcUrl(), "test", "test"))
.close();
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}

@Override
public void stop() {}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,42 +35,21 @@
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.junit.platform.commons.support.AnnotationSupport.findRepeatableAnnotations;

/** Database extension for testing. */
public abstract class DatabaseExtension
implements BeforeAllCallback,
AfterAllCallback,
BeforeEachCallback,
AfterEachCallback,
ExtensionContext.Store.CloseableResource {

/**
* Database Lifecycle for testing. The goal it's that all database containers are create only
* one time.
*/
public enum Lifecycle {
/** Database will be instantiated only one time. */
PER_EXECUTION,
/** Database will be instantiated by class. */
PER_CLASS
}
implements BeforeAllCallback, AfterAllCallback, BeforeEachCallback, AfterEachCallback {

protected abstract DatabaseMetadata startDatabase() throws Exception;
protected abstract DatabaseMetadata getMetadataDB();

protected abstract void stopDatabase() throws Exception;
protected abstract DatabaseResource getResource();

private final String uniqueKey = this.getClass().getSimpleName();

protected Lifecycle getLifecycle() {
return Lifecycle.PER_EXECUTION;
}

private ExtensionContext.Store getStore(ExtensionContext context) {
return context.getRoot().getStore(Namespace.GLOBAL);
}
private final String uniqueResource = String.format("%sResource", uniqueKey);

private DatabaseTest getDatabaseBaseTest(Class<?> clazz) throws Exception {
DatabaseTest dbClazz = null;
Expand All @@ -95,8 +74,7 @@ private void getManagedTables(
.filter(DatabaseTest.class::isAssignableFrom)
.ifPresent(
clazz -> {
DatabaseMetadata metadata =
getStore(context).get(uniqueKey, DatabaseMetadata.class);
DatabaseMetadata metadata = getMetadataDB();
if (metadata != null) {
try (Connection conn = metadata.getConnection()) {
for (TableManaged table :
Expand Down Expand Up @@ -132,15 +110,19 @@ private boolean ignoreTestDatabase(ExtensionContext context) {
return false;
}

private DatabaseResource getResource(ExtensionContext context) {
return context.getRoot()
.getStore(Namespace.GLOBAL)
.getOrComputeIfAbsent(uniqueResource, startResource(), DatabaseResource.class);
}

@Override
public final void beforeAll(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}

if (getStore(context).get(uniqueKey) == null) {
getStore(context).put(uniqueKey, startDatabase());
}
getResource(context);

getManagedTables(context, TableManaged::createTable);
}
Expand All @@ -153,6 +135,7 @@ public final void afterEach(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}

getManagedTables(context, TableManaged::deleteTable);
}

Expand All @@ -161,18 +144,9 @@ public final void afterAll(ExtensionContext context) throws Exception {
if (ignoreTestDatabase(context)) {
return;
}
getManagedTables(context, TableManaged::dropTable);
if (Lifecycle.PER_CLASS == getLifecycle()) {
stopDatabase();
getStore(context).remove(uniqueKey, DatabaseMetadata.class);
}
}

@Override
public final void close() throws Throwable {
if (Lifecycle.PER_EXECUTION == getLifecycle()) {
stopDatabase();
}
getManagedTables(context, TableManaged::dropTable);
getResource(context);
}

private Set<String> retrieveDatabaseExtensions(final ExtensionContext context) {
Expand All @@ -199,4 +173,12 @@ public Set<String> apply(ExtensionContext context, Set<String> acc) {

return retrieveExtensions.apply(context, new HashSet<>());
}

private Function<String, DatabaseResource> startResource() {
return s -> {
DatabaseResource resource = getResource();
resource.start();
return resource;
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.apache.flink.connector.jdbc.testutils;

import org.junit.jupiter.api.extension.ExtensionContext.Store.CloseableResource;

/** Database resource for testing. */
public interface DatabaseResource extends CloseableResource {

void start();

void stop();

default void close() throws Throwable {
stop();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.flink.connector.jdbc.testutils.resources;

import org.apache.flink.connector.jdbc.testutils.DatabaseResource;

import com.github.dockerjava.api.DockerClient;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.JdbcDatabaseContainer;

import java.util.Arrays;

/** Docker based database resource. */
public class DockerResource implements DatabaseResource {

private final JdbcDatabaseContainer<?> container;

public DockerResource(JdbcDatabaseContainer<?> container) {
this.container = container;
}

@Override
public void start() {
this.container.start();
}

@Override
public void stop() {
this.container.stop();
}

@Override
public void close() throws Throwable {
stop();
cleanContainers(container);
}

public static void cleanContainers(GenericContainer<?> container) {
try {
DockerClient client = DockerClientFactory.instance().client();
// client.removeImageCmd(container.getDockerImageName()).exec();
client.listImagesCmd().exec().stream()
.filter(
image ->
Arrays.stream(image.getRepoTags())
.anyMatch(
tag ->
!tag.contains("testcontainers/ryuk")
&& !tag.contains(
container
.getDockerImageName())))
.forEach(image -> client.removeImageCmd(image.getId()).exec());

} catch (Exception ignore) {
System.out.println("ERROR:");
ignore.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.apache.flink.connector.jdbc.testutils.resources;

import org.apache.flink.connector.jdbc.testutils.DatabaseResource;

/** Memory based database resource. */
public interface MemoryResource extends DatabaseResource {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
import org.apache.flink.util.FlinkRuntimeException;

import org.testcontainers.containers.JdbcDatabaseContainer;
Expand Down Expand Up @@ -66,15 +68,13 @@ public static CrateDBMetadata getMetadata() {
}

@Override
protected DatabaseMetadata startDatabase() throws Exception {
CONTAINER.start();
protected DatabaseMetadata getMetadataDB() {
return getMetadata();
}

@Override
protected void stopDatabase() throws Exception {
CONTAINER.stop();
metadata = null;
protected DatabaseResource getResource() {
return new DockerResource(CONTAINER);
}

/**
Expand Down
Loading

0 comments on commit 86fad05

Please sign in to comment.