Refactor chunk cache #443

Draft: wants to merge 1 commit into base: main
checkstyle/suppressions.xml (3 changes: 2 additions & 1 deletion)
@@ -22,7 +22,8 @@
     <suppress checks="ClassDataAbstractionCoupling" files=".*Test\.java"/>
     <suppress checks="ClassFanOutComplexity" files=".*Test\.java"/>
     <suppress checks="ClassFanOutComplexity" files="RemoteStorageManager.java"/>
-    <suppress checks="ClassFanOutComplexity" files="ChunkCache.java"/>
+    <suppress checks="ClassFanOutComplexity" files="AbstractChunkCache.java"/>
+    <suppress checks="ClassFanOutComplexity" files="DefaultChunkManager.java"/>
     <suppress checks="ClassFanOutComplexity" files="AzureBlobStorage.java"/>
     <suppress checks="ClassDataAbstractionCoupling" files="CaffeineStatsCounter.java"/>
     <suppress checks="ClassDataAbstractionCoupling" files="S3StorageConfig.java"/>
ChunkManagerFactory.java
@@ -34,20 +34,13 @@ public void configure(final Map<String, ?> configs) {
 
     public ChunkManager initChunkManager(final ObjectFetcher fileFetcher,
                                          final AesEncryptionProvider aesEncryptionProvider) {
-        final DefaultChunkManager defaultChunkManager = new DefaultChunkManager(fileFetcher, aesEncryptionProvider);
-        if (config.cacheClass() != null) {
-            try {
-                final ChunkCache<?> chunkCache = config
-                        .cacheClass()
-                        .getDeclaredConstructor(ChunkManager.class)
-                        .newInstance(defaultChunkManager);
-                chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX));
-                return chunkCache;
-            } catch (final ReflectiveOperationException e) {
-                throw new RuntimeException(e);
-            }
-        } else {
-            return defaultChunkManager;
+        final ChunkCache<?> chunkCache;
+        try {
+            chunkCache = config.cacheClass().getDeclaredConstructor().newInstance();
+        } catch (final ReflectiveOperationException e) {
+            throw new RuntimeException(e);
         }
+        chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX));
+        return new DefaultChunkManager(fileFetcher, aesEncryptionProvider, chunkCache, config.cachePrefetchingSize());
     }
 }
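
Note on the wiring change above: the cache is now created through a public no-arg constructor and passed into DefaultChunkManager, instead of wrapping the manager as before. A minimal usage sketch; the resolved "chunk.cache." prefix is an assumption (CHUNK_CACHE_PREFIX is defined outside this hunk), and fetcher / aesEncryptionProvider are stand-ins constructed elsewhere:

    // Sketch only, not taken verbatim from this PR.
    final ChunkManagerFactory factory = new ChunkManagerFactory();
    factory.configure(Map.of("chunk.cache.class", NoOpChunkCache.class.getName()));
    // The configured class must expose a public no-arg constructor, since it
    // is instantiated via getDeclaredConstructor().newInstance().
    final ChunkManager chunkManager = factory.initChunkManager(fetcher, aesEncryptionProvider);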
ChunkManagerFactoryConfig.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.common.config.ConfigDef;
 
 import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
+import io.aiven.kafka.tieredstorage.chunkmanager.cache.NoOpChunkCache;
 import io.aiven.kafka.tieredstorage.config.validators.Subclass;
 
 public class ChunkManagerFactoryConfig extends AbstractConfig {
@@ -30,6 +31,11 @@ public class ChunkManagerFactoryConfig extends AbstractConfig {
     public static final String CHUNK_CACHE_CONFIG = CHUNK_CACHE_PREFIX + "class";
     private static final String CHUNK_CACHE_DOC = "The chunk cache implementation";
 
+    private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = CHUNK_CACHE_PREFIX + "prefetch.max.size";
+    private static final String CACHE_PREFETCH_MAX_SIZE_DOC =
+        "The amount of data that should be eagerly prefetched and cached";
+    private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; // TODO find out what it should be
+
     private static final ConfigDef CONFIG;
 
     static {
@@ -38,11 +44,19 @@ public class ChunkManagerFactoryConfig extends AbstractConfig {
         CONFIG.define(
                 CHUNK_CACHE_CONFIG,
                 ConfigDef.Type.CLASS,
-                null,
+                NoOpChunkCache.class,
                 Subclass.of(ChunkCache.class),
                 ConfigDef.Importance.MEDIUM,
                 CHUNK_CACHE_DOC
         );
+        CONFIG.define(
+                CACHE_PREFETCH_MAX_SIZE_CONFIG,
+                ConfigDef.Type.INT,
+                CACHE_PREFETCHING_SIZE_DEFAULT,
+                ConfigDef.Range.between(0, Integer.MAX_VALUE),
+                ConfigDef.Importance.MEDIUM,
+                CACHE_PREFETCH_MAX_SIZE_DOC
+        );
     }
 
     public ChunkManagerFactoryConfig(final Map<?, ?> originals) {
@@ -53,4 +67,8 @@ public ChunkManagerFactoryConfig(final Map<?, ?> originals) {
     public Class<ChunkCache<?>> cacheClass() {
         return (Class<ChunkCache<?>>) getClass(CHUNK_CACHE_CONFIG);
     }
+
+    public int cachePrefetchingSize() {
+        return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG);
+    }
 }
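
Note: the default for the cache class changes from null to NoOpChunkCache, so a (presumably pass-through) cache instance is always present. A hedged sketch of the new prefetch option, again assuming CHUNK_CACHE_PREFIX resolves to "chunk.cache.":

    // Values are parsed by Kafka's AbstractConfig; strings are accepted for INT types.
    final ChunkManagerFactoryConfig config = new ChunkManagerFactoryConfig(
            Map.of("chunk.cache.prefetch.max.size", "4194304"));  // 4 MiB
    final int prefetchSize = config.cachePrefetchingSize();       // 4194304
    // The default of 0 disables prefetching in DefaultChunkManager; negative
    // values are rejected by ConfigDef.Range.between(0, Integer.MAX_VALUE).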
DefaultChunkManager.java
@@ -16,14 +16,22 @@
 
 package io.aiven.kafka.tieredstorage.chunkmanager;
 
+import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Supplier;
 
 import io.aiven.kafka.tieredstorage.Chunk;
+import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache;
 import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata;
 import io.aiven.kafka.tieredstorage.manifest.SegmentManifest;
 import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider;
+import io.aiven.kafka.tieredstorage.storage.BytesRange;
 import io.aiven.kafka.tieredstorage.storage.ObjectFetcher;
 import io.aiven.kafka.tieredstorage.storage.ObjectKey;
 import io.aiven.kafka.tieredstorage.storage.StorageBackendException;
@@ -36,10 +44,19 @@
 public class DefaultChunkManager implements ChunkManager {
     private final ObjectFetcher fetcher;
     private final AesEncryptionProvider aesEncryptionProvider;
+    final ChunkCache<?> chunkCache;
+    private final int prefetchingSize;
 
-    public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
+    private final Executor executor = new ForkJoinPool();
+
+    public DefaultChunkManager(final ObjectFetcher fetcher,
+                               final AesEncryptionProvider aesEncryptionProvider,
+                               final ChunkCache<?> chunkCache,
+                               final int prefetchingSize) {
         this.fetcher = fetcher;
         this.aesEncryptionProvider = aesEncryptionProvider;
+        this.chunkCache = chunkCache;
+        this.prefetchingSize = prefetchingSize;
     }
 
     /**
@@ -48,24 +65,60 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) {
      * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed).
      */
     public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest,
-                                final int chunkId) throws StorageBackendException {
-        final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
-
-        final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range());
-
-        DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
-        final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
-        if (encryptionMetadata.isPresent()) {
-            detransformEnum = new DecryptionChunkEnumeration(
-                detransformEnum,
-                encryptionMetadata.get().ivSize(),
-                encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
-            );
-        }
-        if (manifest.compression()) {
-            detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
-        }
-        final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
-        return detransformFinisher.toInputStream();
+                                final int chunkId) throws StorageBackendException, IOException {
+        final var currentChunk = manifest.chunkIndex().chunks().get(chunkId);
+        startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize);
+
+        final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId);
+        return chunkCache.getChunk(chunkKey, createChunkSupplier(objectKey, manifest, chunkId));
+    }
+
+    private void startPrefetching(final ObjectKey segmentKey,
+                                  final SegmentManifest segmentManifest,
+                                  final int startPosition) {
+        if (prefetchingSize > 0) {
+            final BytesRange prefetchingRange;
+            if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
+                prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE);
+            } else {
+                prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize);
+            }
+            final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange);
+            chunks.forEach(chunk -> {
+                final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id);
+                chunkCache.supplyIfAbsent(chunkKey, createChunkSupplier(segmentKey, segmentManifest, chunk.id));
+            });
+        }
+    }
+
+    private Supplier<CompletableFuture<InputStream>> createChunkSupplier(final ObjectKey objectKey,
+                                                                         final SegmentManifest manifest,
+                                                                         final int chunkId) {
+        return () -> CompletableFuture.supplyAsync(() -> {
+            final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId);
+
+            final InputStream chunkContent;
+            try {
+                chunkContent = fetcher.fetch(objectKey, chunk.range());
+            } catch (final StorageBackendException e) {
+                throw new CompletionException(e);
+            }
+
+            DetransformChunkEnumeration detransformEnum =
+                new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk));
+            final Optional<SegmentEncryptionMetadata> encryptionMetadata = manifest.encryption();
+            if (encryptionMetadata.isPresent()) {
+                detransformEnum = new DecryptionChunkEnumeration(
+                    detransformEnum,
+                    encryptionMetadata.get().ivSize(),
+                    encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get())
+                );
+            }
+            if (manifest.compression()) {
+                detransformEnum = new DecompressionChunkEnumeration(detransformEnum);
+            }
+            final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum);
+            return detransformFinisher.toInputStream();
+        }, executor);
     }
 }
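
Note on startPrefetching: prefetching begins at the first byte after the current chunk, and the upper bound is clamped to avoid int overflow. A standalone sketch of the arithmetic; BytesRange is stubbed as a record here (the real class lives in io.aiven.kafka.tieredstorage.storage), and ofFromPositionAndSize is assumed to build an inclusive range of the given size:

    public class PrefetchRangeSketch {
        // Stub standing in for io.aiven.kafka.tieredstorage.storage.BytesRange.
        record BytesRange(int from, int to) { }

        static BytesRange prefetchRange(final int startPosition, final int prefetchingSize) {
            // startPosition + prefetchingSize could overflow int, so compare
            // against the remaining headroom instead of adding directly.
            if (Integer.MAX_VALUE - startPosition < prefetchingSize) {
                return new BytesRange(startPosition, Integer.MAX_VALUE);
            }
            // Assumption: an inclusive range covering prefetchingSize bytes.
            return new BytesRange(startPosition, startPosition + prefetchingSize - 1);
        }

        public static void main(final String[] args) {
            System.out.println(prefetchRange(1_000, 512));                  // BytesRange[from=1000, to=1511]
            System.out.println(prefetchRange(Integer.MAX_VALUE - 10, 512)); // clamped to Integer.MAX_VALUE
        }
    }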
AbstractChunkCache.java (new file)
@@ -0,0 +1,131 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.chunkmanager.cache;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;
import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.github.benmanes.caffeine.cache.Weigher;

public abstract class AbstractChunkCache<T> extends BaseChunkCache<T> {
private static final long GET_TIMEOUT_SEC = 10;
private static final String METRIC_GROUP = "chunk-cache";

private final Executor executor = new ForkJoinPool();

private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP);

protected AsyncCache<ChunkKey, T> cache;

/**
* Get a chunk from the cache.
*
* <p>If the chunk is not present in the cache, use {@literal chunkSupplier} to get it and cache.
*
 * <p>Since it is not possible to cache an opened InputStream, the actual data is cached, and every time
 * the cache is called a fresh InputStream is recreated from the cached data and stored in a local
 * variable. This also resolves the race condition between eviction and fetching: because the InputStream
 * is opened at the moment the value is fetched from the cache, it still delivers the data even if the
 * value is evicted immediately afterwards.
*/
@Override
protected InputStream getChunk0(final ChunkKey chunkKey,
final Supplier<CompletableFuture<InputStream>> chunkSupplier)
throws ExecutionException, InterruptedException, TimeoutException {
final AtomicReference<InputStream> result = new AtomicReference<>();
return cache.asMap()
.compute(chunkKey, (key, val) -> {
if (val == null) {
statsCounter.recordMiss();
// TODO do not put a failed future into the cache
[Review comment, Contributor Author] This is a pre-existing problem, decided not to handle it in this PR.
return chunkSupplier.get().thenApplyAsync(chunk -> {
try {
final T t = this.cacheChunk(chunkKey, chunk);
result.getAndSet(cachedChunkToInputStream(t));
return t;
} catch (final IOException e) {
throw new CompletionException(e);
}
}, executor);
} else {
statsCounter.recordHit();
return CompletableFuture.supplyAsync(() -> {
try {
final T cachedChunk = val.get();
result.getAndSet(cachedChunkToInputStream(cachedChunk));
return cachedChunk;
} catch (final InterruptedException | ExecutionException e) {
throw new CompletionException(e);
}
}, executor);
}
})
.thenApplyAsync(t -> result.get())
.get(GET_TIMEOUT_SEC, TimeUnit.SECONDS);
}

public void supplyIfAbsent(final ChunkKey chunkKey,
final Supplier<CompletableFuture<InputStream>> chunkSupplier) {
// TODO do some logging if error
// TODO do not put a failed future into the cache
[Review comment on lines +99 to +100, Contributor Author] These are pre-existing problems, decided not to handle them in this PR.
cache.asMap().computeIfAbsent(chunkKey,
key -> chunkSupplier.get().thenApplyAsync(chunk -> {
try {
return this.cacheChunk(chunkKey, chunk);
} catch (final IOException e) {
throw new CompletionException(e);
}
}, executor));
}

public abstract InputStream cachedChunkToInputStream(final T cachedChunk);

public abstract T cacheChunk(final ChunkKey chunkKey, final InputStream chunk) throws IOException;

public abstract RemovalListener<ChunkKey, T> removalListener();

public abstract Weigher<ChunkKey, T> weigher();

protected AsyncCache<ChunkKey, T> buildCache(final ChunkCacheConfig config) {
final Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher()));
config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess);
final var cache = cacheBuilder.evictionListener(removalListener())
.scheduler(Scheduler.systemScheduler())
.executor(executor)
.recordStats(() -> statsCounter)
.buildAsync();
statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize);
return cache;
}
}
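
To ground the four template methods, a minimal hypothetical subclass that keeps chunks on the heap as byte arrays. The class name, the configure() wiring, and the ChunkCacheConfig constructor shape are assumptions; only the AbstractChunkCache API it implements comes from the diff above:

    package io.aiven.kafka.tieredstorage.chunkmanager.cache;

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Map;

    import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey;

    import com.github.benmanes.caffeine.cache.RemovalListener;
    import com.github.benmanes.caffeine.cache.Weigher;

    public class HeapChunkCache extends AbstractChunkCache<byte[]> {

        @Override
        public InputStream cachedChunkToInputStream(final byte[] cachedChunk) {
            // A fresh stream per call: readers keep their data even if the
            // entry is evicted from the cache while they are still reading.
            return new ByteArrayInputStream(cachedChunk);
        }

        @Override
        public byte[] cacheChunk(final ChunkKey chunkKey, final InputStream chunk) throws IOException {
            // Materialize the fetched chunk so it can be cached and re-read.
            return chunk.readAllBytes();
        }

        @Override
        public RemovalListener<ChunkKey, byte[]> removalListener() {
            return (key, value, cause) -> { /* heap entries need no cleanup */ };
        }

        @Override
        public Weigher<ChunkKey, byte[]> weigher() {
            return (key, value) -> value.length;  // weigh entries by chunk size in bytes
        }

        public void configure(final Map<String, ?> configs) {
            // Assumed wiring: ChunkCacheConfig's actual constructor may differ.
            this.cache = buildCache(new ChunkCacheConfig(configs));
        }
    }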