From b1ae9a4e2b042aa522c44e07c9a0b849af5897eb Mon Sep 17 00:00:00 2001 From: Ivan Yurchenko Date: Fri, 10 Nov 2023 13:02:56 +0200 Subject: [PATCH] Refactor chunk cache Before, the chunk cache(s) was a wrapper over `DefaultChunkManager`. In some situations, this is not very convenient and also the cache has more responsibilities than it should (like prefetching). This commit refactors the cache to be a pluggable mechanism in the chunk manager. --- checkstyle/suppressions.xml | 3 +- .../chunkmanager/ChunkManagerFactory.java | 21 +- .../ChunkManagerFactoryConfig.java | 20 +- .../chunkmanager/DefaultChunkManager.java | 83 ++++- .../cache/AbstractChunkCache.java | 131 +++++++ .../chunkmanager/cache/BaseChunkCache.java | 61 ++++ .../chunkmanager/cache/ChunkCache.java | 153 +------- .../chunkmanager/cache/ChunkCacheConfig.java | 18 - .../cache/DiskBasedChunkCache.java | 7 +- .../cache/InMemoryChunkCache.java | 7 +- .../chunkmanager/cache/NoOpChunkCache.java | 45 +++ .../ChunkManagerFactoryConfigTest.java | 15 +- .../chunkmanager/ChunkManagerFactoryTest.java | 6 +- .../chunkmanager/DefaultChunkManagerTest.java | 119 +++++- ...ava => AbstractChunkCacheMetricsTest.java} | 47 +-- .../cache/AbstractChunkCacheTest.java | 260 ++++++++++++++ .../cache/ChunkCacheConfigTest.java | 13 - .../chunkmanager/cache/ChunkCacheTest.java | 340 ------------------ .../cache/DiskBasedChunkCacheTest.java | 11 +- .../transform/FetchChunkEnumerationTest.java | 5 +- 20 files changed, 756 insertions(+), 609 deletions(-) create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCache.java create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/BaseChunkCache.java create mode 100644 core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/NoOpChunkCache.java rename core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/{ChunkCacheMetricsTest.java => AbstractChunkCacheMetricsTest.java} (72%) create mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheTest.java delete mode 100644 core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e4415b5e5..74bde833e 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -22,7 +22,8 @@ - + + diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java index cc8c2c4da..947c76859 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactory.java @@ -34,20 +34,13 @@ public void configure(final Map configs) { public ChunkManager initChunkManager(final ObjectFetcher fileFetcher, final AesEncryptionProvider aesEncryptionProvider) { - final DefaultChunkManager defaultChunkManager = new DefaultChunkManager(fileFetcher, aesEncryptionProvider); - if (config.cacheClass() != null) { - try { - final ChunkCache chunkCache = config - .cacheClass() - .getDeclaredConstructor(ChunkManager.class) - .newInstance(defaultChunkManager); - chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX)); - return chunkCache; - } catch (final ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } else { - return defaultChunkManager; + final ChunkCache chunkCache; + try { + chunkCache = config.cacheClass().getDeclaredConstructor().newInstance(); + } catch (final ReflectiveOperationException e) { + throw new RuntimeException(e); } + chunkCache.configure(config.originalsWithPrefix(ChunkManagerFactoryConfig.CHUNK_CACHE_PREFIX)); + return new DefaultChunkManager(fileFetcher, aesEncryptionProvider, chunkCache, config.cachePrefetchingSize()); } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfig.java index 10a820080..a8b25b035 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfig.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.config.ConfigDef; import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache; +import io.aiven.kafka.tieredstorage.chunkmanager.cache.NoOpChunkCache; import io.aiven.kafka.tieredstorage.config.validators.Subclass; public class ChunkManagerFactoryConfig extends AbstractConfig { @@ -30,6 +31,11 @@ public class ChunkManagerFactoryConfig extends AbstractConfig { public static final String CHUNK_CACHE_CONFIG = CHUNK_CACHE_PREFIX + "class"; private static final String CHUNK_CACHE_DOC = "The chunk cache implementation"; + private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = CHUNK_CACHE_PREFIX + "prefetch.max.size"; + private static final String CACHE_PREFETCH_MAX_SIZE_DOC = + "The amount of data that should be eagerly prefetched and cached"; + private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be + private static final ConfigDef CONFIG; static { @@ -38,11 +44,19 @@ public class ChunkManagerFactoryConfig extends AbstractConfig { CONFIG.define( CHUNK_CACHE_CONFIG, ConfigDef.Type.CLASS, - null, + NoOpChunkCache.class, Subclass.of(ChunkCache.class), ConfigDef.Importance.MEDIUM, CHUNK_CACHE_DOC ); + CONFIG.define( + CACHE_PREFETCH_MAX_SIZE_CONFIG, + ConfigDef.Type.INT, + CACHE_PREFETCHING_SIZE_DEFAULT, + ConfigDef.Range.between(0, Integer.MAX_VALUE), + ConfigDef.Importance.MEDIUM, + CACHE_PREFETCH_MAX_SIZE_DOC + ); } public ChunkManagerFactoryConfig(final Map originals) { @@ -53,4 +67,8 @@ public ChunkManagerFactoryConfig(final Map originals) { public Class> cacheClass() { return (Class>) getClass(CHUNK_CACHE_CONFIG); } + + public int cachePrefetchingSize() { + return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG); + } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java index ac219eee5..b6e147a7e 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManager.java @@ -16,14 +16,22 @@ package io.aiven.kafka.tieredstorage.chunkmanager; +import java.io.IOException; import java.io.InputStream; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.function.Supplier; import io.aiven.kafka.tieredstorage.Chunk; +import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadata; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; import io.aiven.kafka.tieredstorage.security.AesEncryptionProvider; +import io.aiven.kafka.tieredstorage.storage.BytesRange; import io.aiven.kafka.tieredstorage.storage.ObjectFetcher; import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; @@ -36,10 +44,19 @@ public class DefaultChunkManager implements ChunkManager { private final ObjectFetcher fetcher; private final AesEncryptionProvider aesEncryptionProvider; + final ChunkCache chunkCache; + private final int prefetchingSize; - public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvider aesEncryptionProvider) { + private final Executor executor = new ForkJoinPool(); + + public DefaultChunkManager(final ObjectFetcher fetcher, + final AesEncryptionProvider aesEncryptionProvider, + final ChunkCache chunkCache, + final int prefetchingSize) { this.fetcher = fetcher; this.aesEncryptionProvider = aesEncryptionProvider; + this.chunkCache = chunkCache; + this.prefetchingSize = prefetchingSize; } /** @@ -48,24 +65,60 @@ public DefaultChunkManager(final ObjectFetcher fetcher, final AesEncryptionProvi * @return an {@link InputStream} of the chunk, plain text (i.e., decrypted and decompressed). */ public InputStream getChunk(final ObjectKey objectKey, final SegmentManifest manifest, - final int chunkId) throws StorageBackendException { - final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); + final int chunkId) throws StorageBackendException, IOException { + final var currentChunk = manifest.chunkIndex().chunks().get(chunkId); + startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize); + + final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId); + return chunkCache.getChunk(chunkKey, createChunkSupplier(objectKey, manifest, chunkId)); + } - final InputStream chunkContent = fetcher.fetch(objectKey, chunk.range()); + private void startPrefetching(final ObjectKey segmentKey, + final SegmentManifest segmentManifest, + final int startPosition) { + if (prefetchingSize > 0) { + final BytesRange prefetchingRange; + if (Integer.MAX_VALUE - startPosition < prefetchingSize) { + prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE); + } else { + prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize); + } + final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange); + chunks.forEach(chunk -> { + final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id); + chunkCache.supplyIfAbsent(chunkKey, createChunkSupplier(segmentKey, segmentManifest, chunk.id)); + }); + } + } - DetransformChunkEnumeration detransformEnum = new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk)); - final Optional encryptionMetadata = manifest.encryption(); - if (encryptionMetadata.isPresent()) { - detransformEnum = new DecryptionChunkEnumeration( + private Supplier> createChunkSupplier(final ObjectKey objectKey, + final SegmentManifest manifest, + final int chunkId) { + return () -> CompletableFuture.supplyAsync(() -> { + final Chunk chunk = manifest.chunkIndex().chunks().get(chunkId); + + final InputStream chunkContent; + try { + chunkContent = fetcher.fetch(objectKey, chunk.range()); + } catch (final StorageBackendException e) { + throw new CompletionException(e); + } + + DetransformChunkEnumeration detransformEnum = + new BaseDetransformChunkEnumeration(chunkContent, List.of(chunk)); + final Optional encryptionMetadata = manifest.encryption(); + if (encryptionMetadata.isPresent()) { + detransformEnum = new DecryptionChunkEnumeration( detransformEnum, encryptionMetadata.get().ivSize(), encryptedChunk -> aesEncryptionProvider.decryptionCipher(encryptedChunk, encryptionMetadata.get()) - ); - } - if (manifest.compression()) { - detransformEnum = new DecompressionChunkEnumeration(detransformEnum); - } - final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum); - return detransformFinisher.toInputStream(); + ); + } + if (manifest.compression()) { + detransformEnum = new DecompressionChunkEnumeration(detransformEnum); + } + final DetransformFinisher detransformFinisher = new DetransformFinisher(detransformEnum); + return detransformFinisher.toInputStream(); + }, executor); } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCache.java new file mode 100644 index 000000000..711aa2d48 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCache.java @@ -0,0 +1,131 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.chunkmanager.cache; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; +import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; + +import com.github.benmanes.caffeine.cache.AsyncCache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Scheduler; +import com.github.benmanes.caffeine.cache.Weigher; + +public abstract class AbstractChunkCache extends BaseChunkCache { + private static final long GET_TIMEOUT_SEC = 10; + private static final String METRIC_GROUP = "chunk-cache"; + + private final Executor executor = new ForkJoinPool(); + + private final CaffeineStatsCounter statsCounter = new CaffeineStatsCounter(METRIC_GROUP); + + protected AsyncCache cache; + + /** + * Get a chunk from the cache. + * + *

If the chunk is not present in the cache, use {@literal chunkSupplier} to get it and cache. + * + *

Since it's not possible to cache an opened InputStream, the actual data is cached, and everytime + * there is a call to cache the InputStream is recreated from the data stored in cache and stored into local + * variable. This also allows solving the race condition between eviction and fetching. Since the InputStream is + * opened right when fetching from cache happens even if the actual value is removed from the cache, + * the InputStream will still contain the data. + */ + @Override + protected InputStream getChunk0(final ChunkKey chunkKey, + final Supplier> chunkSupplier) + throws ExecutionException, InterruptedException, TimeoutException { + final AtomicReference result = new AtomicReference<>(); + return cache.asMap() + .compute(chunkKey, (key, val) -> { + if (val == null) { + statsCounter.recordMiss(); + // TODO do not put a failed future into the cache + return chunkSupplier.get().thenApplyAsync(chunk -> { + try { + final T t = this.cacheChunk(chunkKey, chunk); + result.getAndSet(cachedChunkToInputStream(t)); + return t; + } catch (final IOException e) { + throw new CompletionException(e); + } + }, executor); + } else { + statsCounter.recordHit(); + return CompletableFuture.supplyAsync(() -> { + try { + final T cachedChunk = val.get(); + result.getAndSet(cachedChunkToInputStream(cachedChunk)); + return cachedChunk; + } catch (final InterruptedException | ExecutionException e) { + throw new CompletionException(e); + } + }, executor); + } + }) + .thenApplyAsync(t -> result.get()) + .get(GET_TIMEOUT_SEC, TimeUnit.SECONDS); + } + + public void supplyIfAbsent(final ChunkKey chunkKey, + final Supplier> chunkSupplier) { + // TODO do some logging if error + // TODO do not put a failed future into the cache + cache.asMap().computeIfAbsent(chunkKey, + key -> chunkSupplier.get().thenApplyAsync(chunk -> { + try { + return this.cacheChunk(chunkKey, chunk); + } catch (final IOException e) { + throw new CompletionException(e); + } + }, executor)); + } + + public abstract InputStream cachedChunkToInputStream(final T cachedChunk); + + public abstract T cacheChunk(final ChunkKey chunkKey, final InputStream chunk) throws IOException; + + public abstract RemovalListener removalListener(); + + public abstract Weigher weigher(); + + protected AsyncCache buildCache(final ChunkCacheConfig config) { + final Caffeine cacheBuilder = Caffeine.newBuilder(); + config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher())); + config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess); + final var cache = cacheBuilder.evictionListener(removalListener()) + .scheduler(Scheduler.systemScheduler()) + .executor(executor) + .recordStats(() -> statsCounter) + .buildAsync(); + statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize); + return cache; + } +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/BaseChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/BaseChunkCache.java new file mode 100644 index 000000000..1f4ffb2ea --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/BaseChunkCache.java @@ -0,0 +1,61 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.chunkmanager.cache; + +import java.io.IOException; +import java.io.InputStream; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +abstract class BaseChunkCache implements ChunkCache { + @Override + public InputStream getChunk(final ChunkKey chunkKey, + final Supplier> chunkSupplier) + throws StorageBackendException, IOException { + try { + return getChunk0(chunkKey, chunkSupplier); + } catch (final ExecutionException e) { + // Unwrap previously wrapped exceptions if possible. + final Throwable cause = e.getCause(); + + // We don't really expect this case, but handle it nevertheless. + if (cause == null) { + throw new RuntimeException(e); + } + if (e.getCause() instanceof StorageBackendException) { + throw (StorageBackendException) e.getCause(); + } + if (e.getCause() instanceof IOException) { + throw (IOException) e.getCause(); + } + + throw new RuntimeException(e); + } catch (final InterruptedException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + protected abstract InputStream getChunk0( + final ChunkKey chunkKey, + final Supplier> chunkSupplier + ) throws ExecutionException, InterruptedException, TimeoutException; +} diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java index 35c5ce302..517419473 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCache.java @@ -19,157 +19,18 @@ import java.io.IOException; import java.io.InputStream; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import org.apache.kafka.common.Configurable; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; -import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; -import io.aiven.kafka.tieredstorage.metrics.CaffeineStatsCounter; -import io.aiven.kafka.tieredstorage.storage.BytesRange; -import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackendException; -import com.github.benmanes.caffeine.cache.AsyncCache; -import com.github.benmanes.caffeine.cache.Caffeine; -import com.github.benmanes.caffeine.cache.RemovalListener; -import com.github.benmanes.caffeine.cache.Scheduler; -import com.github.benmanes.caffeine.cache.Weigher; +public interface ChunkCache extends Configurable { + InputStream getChunk(final ChunkKey chunkKey, + final Supplier> chunkSupplier) + throws StorageBackendException, IOException; -public abstract class ChunkCache implements ChunkManager, Configurable { - private static final long GET_TIMEOUT_SEC = 10; - private static final String METRIC_GROUP = "chunk-cache"; - - private final ChunkManager chunkManager; - private final Executor executor = new ForkJoinPool(); - - final CaffeineStatsCounter statsCounter; - - protected AsyncCache cache; - - private int prefetchingSize; - - protected ChunkCache(final ChunkManager chunkManager) { - this.chunkManager = chunkManager; - this.statsCounter = new CaffeineStatsCounter(METRIC_GROUP); - } - - /** - * Fetches a specific chunk from remote storage and stores into the cache. - * Since it's not possible to cache an opened InputStream, the actual data is cached, and everytime - * there is a call to cache the InputStream is recreated from the data stored in cache and stored into local - * variable. This also allows solving the race condition between eviction and fetching. Since the InputStream is - * opened right when fetching from cache happens even if the actual value is removed from the cache, - * the InputStream will still contain the data. - */ - public InputStream getChunk(final ObjectKey objectKey, - final SegmentManifest manifest, - final int chunkId) throws StorageBackendException, IOException { - final var currentChunk = manifest.chunkIndex().chunks().get(chunkId); - startPrefetching(objectKey, manifest, currentChunk.originalPosition + currentChunk.originalSize); - final ChunkKey chunkKey = new ChunkKey(objectKey.value(), chunkId); - final AtomicReference result = new AtomicReference<>(); - try { - return cache.asMap() - .compute(chunkKey, (key, val) -> CompletableFuture.supplyAsync(() -> { - if (val == null) { - statsCounter.recordMiss(); - try { - final InputStream chunk = - chunkManager.getChunk(objectKey, manifest, chunkId); - final T t = this.cacheChunk(chunkKey, chunk); - result.getAndSet(cachedChunkToInputStream(t)); - return t; - } catch (final StorageBackendException | IOException e) { - throw new CompletionException(e); - } - } else { - statsCounter.recordHit(); - try { - final T cachedChunk = val.get(); - result.getAndSet(cachedChunkToInputStream(cachedChunk)); - return cachedChunk; - } catch (final InterruptedException | ExecutionException e) { - throw new CompletionException(e); - } - } - }, executor)) - .thenApplyAsync(t -> result.get()) - .get(GET_TIMEOUT_SEC, TimeUnit.SECONDS); - } catch (final ExecutionException e) { - // Unwrap previously wrapped exceptions if possible. - final Throwable cause = e.getCause(); - - // We don't really expect this case, but handle it nevertheless. - if (cause == null) { - throw new RuntimeException(e); - } - if (e.getCause() instanceof StorageBackendException) { - throw (StorageBackendException) e.getCause(); - } - if (e.getCause() instanceof IOException) { - throw (IOException) e.getCause(); - } - - throw new RuntimeException(e); - } catch (final InterruptedException | TimeoutException e) { - throw new RuntimeException(e); - } - } - - public abstract InputStream cachedChunkToInputStream(final T cachedChunk); - - public abstract T cacheChunk(final ChunkKey chunkKey, final InputStream chunk) throws IOException; - - public abstract RemovalListener removalListener(); - - public abstract Weigher weigher(); - - protected AsyncCache buildCache(final ChunkCacheConfig config) { - this.prefetchingSize = config.cachePrefetchingSize(); - final Caffeine cacheBuilder = Caffeine.newBuilder(); - config.cacheSize().ifPresent(maximumWeight -> cacheBuilder.maximumWeight(maximumWeight).weigher(weigher())); - config.cacheRetention().ifPresent(cacheBuilder::expireAfterAccess); - final var cache = cacheBuilder.evictionListener(removalListener()) - .scheduler(Scheduler.systemScheduler()) - .executor(executor) - .recordStats(() -> statsCounter) - .buildAsync(); - statsCounter.registerSizeMetric(cache.synchronous()::estimatedSize); - return cache; - } - - private void startPrefetching(final ObjectKey segmentKey, - final SegmentManifest segmentManifest, - final int startPosition) { - if (prefetchingSize > 0) { - final BytesRange prefetchingRange; - if (Integer.MAX_VALUE - startPosition < prefetchingSize) { - prefetchingRange = BytesRange.of(startPosition, Integer.MAX_VALUE); - } else { - prefetchingRange = BytesRange.ofFromPositionAndSize(startPosition, prefetchingSize); - } - final var chunks = segmentManifest.chunkIndex().chunksForRange(prefetchingRange); - chunks.forEach(chunk -> { - final ChunkKey chunkKey = new ChunkKey(segmentKey.value(), chunk.id); - cache.asMap() - .computeIfAbsent(chunkKey, key -> CompletableFuture.supplyAsync(() -> { - try { - final InputStream chunkStream = - chunkManager.getChunk(segmentKey, segmentManifest, chunk.id); - return this.cacheChunk(chunkKey, chunkStream); - } catch (final StorageBackendException | IOException e) { - throw new CompletionException(e); - } - }, executor)); - }); - } - } + void supplyIfAbsent(final ChunkKey chunkKey, + final Supplier> chunkSupplier); } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java index 1b5e097f2..0462ce024 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfig.java @@ -33,12 +33,6 @@ public class ChunkCacheConfig extends AbstractConfig { + "where \"-1\" represents infinite retention"; private static final long DEFAULT_CACHE_RETENTION_MS = 600_000; - private static final String CACHE_PREFETCH_MAX_SIZE_CONFIG = "prefetch.max.size"; - private static final String CACHE_PREFETCH_MAX_SIZE_DOC = - "The amount of data that should be eagerly prefetched and cached"; - - private static final int CACHE_PREFETCHING_SIZE_DEFAULT = 0; //TODO find out what it should be - private static ConfigDef addCacheConfigs(final ConfigDef configDef) { configDef.define( CACHE_SIZE_CONFIG, @@ -56,14 +50,6 @@ private static ConfigDef addCacheConfigs(final ConfigDef configDef) { ConfigDef.Importance.MEDIUM, CACHE_RETENTION_DOC ); - configDef.define( - CACHE_PREFETCH_MAX_SIZE_CONFIG, - ConfigDef.Type.INT, - CACHE_PREFETCHING_SIZE_DEFAULT, - ConfigDef.Range.between(0, Integer.MAX_VALUE), - ConfigDef.Importance.MEDIUM, - CACHE_PREFETCH_MAX_SIZE_DOC - ); return configDef; } @@ -86,8 +72,4 @@ public Optional cacheRetention() { } return Optional.of(Duration.ofMillis(rawValue)); } - - public int cachePrefetchingSize() { - return getInt(CACHE_PREFETCH_MAX_SIZE_CONFIG); - } } diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCache.java index 34fe38d90..00505c2b8 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCache.java @@ -23,7 +23,6 @@ import java.util.Map; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Weigher; @@ -32,16 +31,12 @@ import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; -public class DiskBasedChunkCache extends ChunkCache { +public class DiskBasedChunkCache extends AbstractChunkCache { private static final Logger log = LoggerFactory.getLogger(DiskBasedChunkCache.class); private DiskBasedChunkCacheConfig config; - public DiskBasedChunkCache(final ChunkManager chunkManager) { - super(chunkManager); - } - @Override public InputStream cachedChunkToInputStream(final Path cachedChunk) { try { diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/InMemoryChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/InMemoryChunkCache.java index b8285a730..cc54e1235 100644 --- a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/InMemoryChunkCache.java +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/InMemoryChunkCache.java @@ -24,20 +24,15 @@ import org.apache.kafka.common.config.ConfigDef; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Weigher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class InMemoryChunkCache extends ChunkCache { +public class InMemoryChunkCache extends AbstractChunkCache { private static final Logger log = LoggerFactory.getLogger(InMemoryChunkCache.class); - public InMemoryChunkCache(final ChunkManager chunkManager) { - super(chunkManager); - } - @Override public InputStream cachedChunkToInputStream(final byte[] cachedChunk) { return new ByteArrayInputStream(cachedChunk); diff --git a/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/NoOpChunkCache.java b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/NoOpChunkCache.java new file mode 100644 index 000000000..974833539 --- /dev/null +++ b/core/src/main/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/NoOpChunkCache.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.chunkmanager.cache; + +import java.io.InputStream; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.function.Supplier; + +import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; + +public class NoOpChunkCache extends BaseChunkCache { + @Override + public void configure(final Map configs) { + // no-op + } + + @Override + protected InputStream getChunk0(final ChunkKey chunkKey, + final Supplier> chunkSupplier + ) throws ExecutionException, InterruptedException, TimeoutException { + return chunkSupplier.get().get(); + } + + @Override + public void supplyIfAbsent(final ChunkKey chunkKey, final Supplier> chunkSupplier) { + // no-op + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfigTest.java index b24850acb..4e95b0eb5 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryConfigTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.ConfigException; import io.aiven.kafka.tieredstorage.chunkmanager.cache.ChunkCache; +import io.aiven.kafka.tieredstorage.chunkmanager.cache.NoOpChunkCache; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -53,6 +54,18 @@ void validCacheClass(final String cacheClass) { @Test void defaultConfig() { final ChunkManagerFactoryConfig config = new ChunkManagerFactoryConfig(Map.of()); - assertThat(config.cacheClass()).isNull(); + assertThat(config.cacheClass()).isSameAs(NoOpChunkCache.class); + assertThat(config.cachePrefetchingSize()).isEqualTo(0); + } + + @Test + void invalidPrefetchingSize() { + assertThatThrownBy(() -> new ChunkManagerFactoryConfig( + Map.of( + "chunk.cache.class", "io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache", + "chunk.cache.prefetch.max.size", "-1" + ) + )).isInstanceOf(ConfigException.class) + .hasMessage("Invalid value -1 for configuration chunk.cache.prefetch.max.size: Value must be at least 0"); } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java index 99108780d..1a12979a2 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/ChunkManagerFactoryTest.java @@ -68,9 +68,9 @@ void cachingChunkManagers(final Class> cls) { ) ); try (final MockedConstruction ignored = mockConstruction(cls)) { - final ChunkManager chunkManager = chunkManagerFactory.initChunkManager(null, null); - assertThat(chunkManager).isInstanceOf(cls); - verify((ChunkCache) chunkManager).configure(Map.of( + final var chunkManager = (DefaultChunkManager) + chunkManagerFactory.initChunkManager(null, null); + verify(cls.cast(chunkManager.chunkCache)).configure(Map.of( "class", cls, "size", 10, "retention.ms", 10 diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java index fbf702929..29f584c2d 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/DefaultChunkManagerTest.java @@ -19,10 +19,14 @@ import javax.crypto.Cipher; import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Map; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; import io.aiven.kafka.tieredstorage.AesKeyAwareTest; +import io.aiven.kafka.tieredstorage.chunkmanager.cache.InMemoryChunkCache; +import io.aiven.kafka.tieredstorage.chunkmanager.cache.NoOpChunkCache; import io.aiven.kafka.tieredstorage.manifest.SegmentEncryptionMetadataV1; import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1; import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; @@ -32,15 +36,25 @@ import io.aiven.kafka.tieredstorage.security.DataKeyAndAAD; import io.aiven.kafka.tieredstorage.storage.ObjectKey; import io.aiven.kafka.tieredstorage.storage.StorageBackend; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; import com.github.luben.zstd.ZstdCompressCtx; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.description; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) @@ -63,7 +77,7 @@ void testGetChunk() throws Exception { final FixedSizeChunkIndex chunkIndex = new FixedSizeChunkIndex(10, 10, 10, 10); final SegmentManifest manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, null); + final ChunkManager chunkManager = new DefaultChunkManager(storage, null, new NoOpChunkCache(), 0); when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) .thenReturn(new ByteArrayInputStream("0123456789".getBytes())); @@ -89,7 +103,8 @@ void testGetChunkWithEncryption() throws Exception { final var encryption = new SegmentEncryptionMetadataV1(dataKeyAndAAD.dataKey, dataKeyAndAAD.aad); final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, encryption, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, aesEncryptionProvider); + final ChunkManager chunkManager = new DefaultChunkManager( + storage, aesEncryptionProvider, new NoOpChunkCache(), 0); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); @@ -109,9 +124,107 @@ void testGetChunkWithCompression() throws Exception { .thenReturn(new ByteArrayInputStream(compressed)); final var manifest = new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, true, null, null); - final ChunkManager chunkManager = new DefaultChunkManager(storage, null); + final ChunkManager chunkManager = new DefaultChunkManager(storage, null, new NoOpChunkCache(), 0); assertThat(chunkManager.getChunk(OBJECT_KEY, manifest, 0)).hasBinaryContent(TEST_CHUNK_CONTENT); verify(storage).fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); } + + @Nested + class PrefetchTests { + private final byte[] chunk0 = "0000000000".getBytes(); + private final byte[] chunk1 = "1111111111".getBytes(); + private final byte[] chunk2 = "2222222222".getBytes(); + private final int chunkSize = chunk0.length; + private final int fileSize = chunkSize * 3; + + private final FixedSizeChunkIndex chunkIndex = + new FixedSizeChunkIndex(chunkSize, fileSize, chunkSize, chunkSize); + private final SegmentManifest manifest = + new SegmentManifestV1(chunkIndex, SEGMENT_INDEXES, false, null, null); + + InMemoryChunkCache chunkCache; + + @BeforeEach + void setUp() throws StorageBackendException { + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range())) + .thenAnswer(i -> new ByteArrayInputStream(chunk0)); + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(1).range())) + .thenAnswer(i -> new ByteArrayInputStream(chunk1)); + when(storage.fetch(OBJECT_KEY, chunkIndex.chunks().get(2).range())) + .thenAnswer(i -> new ByteArrayInputStream(chunk2)); + + chunkCache = spy(new InMemoryChunkCache()); + chunkCache.configure(Map.of("size", "1000")); + } + + @Test + void nextChunk() throws Exception { + final var chunkManager = new DefaultChunkManager(storage, null, chunkCache, chunkSize); + + chunkManager.getChunk(OBJECT_KEY, manifest, 0); + + verify(chunkCache, timeout(10000).times(1)) + .supplyIfAbsent(eq(new ChunkKey(OBJECT_KEY.value(), 1)), any()); + + verify(storage, description("first chunk was fetched from remote")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); + verify(storage, description("second chunk was prefetched")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(1).range()); + verify(storage, never().description("third chunk was not prefetched ")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(2).range()); + verifyNoMoreInteractions(storage); + + final InputStream cachedChunk0 = chunkManager.getChunk(OBJECT_KEY, manifest, 0); + assertThat(cachedChunk0).hasBinaryContent(chunk0); + verifyNoMoreInteractions(storage); + + // checking that third chunk is prefetch when fetching chunk 1 + final InputStream cachedChunk1 = chunkManager.getChunk(OBJECT_KEY, manifest, 1); + assertThat(cachedChunk1).hasBinaryContent(chunk1); + + verify(chunkCache, timeout(10000).times(1)) + .supplyIfAbsent(eq(new ChunkKey(OBJECT_KEY.value(), 2)), any()); + + verify(storage, description("third chunk was prefetched")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(2).range()); + verifyNoMoreInteractions(storage); + } + + @Test + void prefetchingWholeSegment() throws Exception { + final var chunkManager = + new DefaultChunkManager(storage, null, chunkCache, fileSize - chunkSize); + + chunkManager.getChunk(OBJECT_KEY, manifest, 0); + + verify(chunkCache, timeout(10000).times(1)) + .supplyIfAbsent(eq(new ChunkKey(OBJECT_KEY.value(), 1)), any()); + verify(chunkCache, timeout(10000).times(1)) + .supplyIfAbsent(eq(new ChunkKey(OBJECT_KEY.value(), 2)), any()); + + verify(storage, description("first chunk was fetched from remote")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(0).range()); + verify(storage, description("second chunk was prefetched")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(1).range()); + verify(storage, description("third chunk was prefetched")) + .fetch(OBJECT_KEY, chunkIndex.chunks().get(2).range()); + verifyNoMoreInteractions(storage); + + // no fetching from remote since chunk 0 is cached + final InputStream cachedChunk0 = chunkManager.getChunk(OBJECT_KEY, manifest, 0); + assertThat(cachedChunk0).hasBinaryContent(chunk0); + verifyNoMoreInteractions(storage); + + // no fetching from remote since chunk 1 is cached + final InputStream cachedChunk1 = chunkManager.getChunk(OBJECT_KEY, manifest, 1); + assertThat(cachedChunk1).hasBinaryContent(chunk1); + verifyNoMoreInteractions(storage); + + // no fetching from remote since chunk 2 is cached + final InputStream cachedChunk2 = chunkManager.getChunk(OBJECT_KEY, manifest, 2); + assertThat(cachedChunk2).hasBinaryContent(chunk2); + verifyNoMoreInteractions(storage); + } + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheMetricsTest.java similarity index 72% rename from core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java rename to core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheMetricsTest.java index aa818fb42..17e5a60f5 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheMetricsTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheMetricsTest.java @@ -20,50 +20,37 @@ import javax.management.ObjectName; import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.lang.management.ManagementFactory; import java.nio.file.Path; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.stream.Stream; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; -import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; -import io.aiven.kafka.tieredstorage.manifest.index.ChunkIndex; -import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; -import io.aiven.kafka.tieredstorage.storage.ObjectKey; +import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.InstanceOfAssertFactories.DOUBLE; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.Mockito.when; /** * Tests metrics gathering on Chunk Cache implementations */ @ExtendWith(MockitoExtension.class) -class ChunkCacheMetricsTest { - private static final ChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex(10, 10, 10, 10); +class AbstractChunkCacheMetricsTest { static final MBeanServer MBEAN_SERVER = ManagementFactory.getPlatformMBeanServer(); - public static final ObjectKey OBJECT_KEY_PATH = () -> "topic/segment"; + static final ChunkKey CHUNK_KEY = new ChunkKey("topic/segment", 0); @TempDir static Path baseCachePath; - @Mock - ChunkManager chunkManager; - @Mock - SegmentManifest segmentManifest; - private static Stream caches() { return Stream.of( Arguments.of( @@ -83,32 +70,24 @@ private static Stream caches() { )); } - @BeforeEach - void setUp() { - when(segmentManifest.chunkIndex()).thenReturn(FIXED_SIZE_CHUNK_INDEX); - } - @ParameterizedTest(name = "Cache {0}") @MethodSource("caches") - void shouldRecordMetrics(final Class> chunkCacheClass, final Map config) + void shouldRecordMetrics(final Class> chunkCacheClass, final Map config) throws Exception { - // Given a chunk cache implementation - when(chunkManager.getChunk(any(), any(), anyInt())) - .thenReturn(new ByteArrayInputStream("test".getBytes())); - - final var chunkCache = chunkCacheClass.getDeclaredConstructor(ChunkManager.class).newInstance(chunkManager); + final var chunkCache = chunkCacheClass.getDeclaredConstructor().newInstance(); chunkCache.configure(config); + final var objectName = new ObjectName("aiven.kafka.server.tieredstorage.cache:type=chunk-cache"); // When getting a existing chunk from cache - chunkCache.getChunk(OBJECT_KEY_PATH, segmentManifest, 0); + chunkCache.getChunk(CHUNK_KEY, this::testChunkSupplier); // check cache size increases after first miss assertThat(MBEAN_SERVER.getAttribute(objectName, "cache-size-total")) .isEqualTo(1.0); - chunkCache.getChunk(OBJECT_KEY_PATH, segmentManifest, 0); + chunkCache.getChunk(CHUNK_KEY, this::testChunkSupplier); // Then the following metrics should be available assertThat(MBEAN_SERVER.getAttribute(objectName, "cache-hits-total")) @@ -133,4 +112,10 @@ void shouldRecordMetrics(final Class> chunkCacheClass, final Map testChunkSupplier() { + return CompletableFuture.completedFuture( + new ByteArrayInputStream("test".getBytes()) + ); + } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheTest.java new file mode 100644 index 000000000..2adfac1d1 --- /dev/null +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/AbstractChunkCacheTest.java @@ -0,0 +1,260 @@ +/* + * Copyright 2023 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.tieredstorage.chunkmanager.cache; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; +import io.aiven.kafka.tieredstorage.storage.StorageBackendException; + +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mockingDetails; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class AbstractChunkCacheTest { + + private static final byte[] CHUNK_0 = "0123456789".getBytes(); + private static final byte[] CHUNK_1 = "1011121314".getBytes(); + + private static final String TEST_EXCEPTION_MESSAGE = "test_message"; + + private static final ChunkKey CHUNK_KEY_0 = new ChunkKey("topic/segment", 0); + private static final ChunkKey CHUNK_KEY_1 = new ChunkKey("topic/segment", 1); + + @Mock + private Supplier> chunk0Supplier; + @Mock + private Supplier> chunk1Supplier; + private AbstractChunkCache chunkCache; + + @BeforeEach + void setUp() { + chunkCache = spy(new InMemoryChunkCache()); + } + + @AfterEach + void tearDown() { + reset(chunk0Supplier); + reset(chunk1Supplier); + } + + @Nested + class CacheTests { + @Mock + RemovalListener removalListener; + + @BeforeEach + void setUp() { + doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); + + // The second invocation is needed for some tests. + when(chunk0Supplier.get()) + .thenAnswer(i -> CompletableFuture.completedFuture(new ByteArrayInputStream(CHUNK_0))); + when(chunk1Supplier.get()) + .thenAnswer(i -> CompletableFuture.completedFuture(new ByteArrayInputStream(CHUNK_1))); + } + + @Test + void noEviction() throws IOException, StorageBackendException { + chunkCache.configure(Map.of( + "retention.ms", "-1", + "size", "-1" + )); + + final InputStream chunk0 = chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier); + assertThat(chunk0).hasBinaryContent(CHUNK_0); + verify(chunk0Supplier).get(); + final InputStream cachedChunk0 = chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier); + assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(chunk0Supplier); + + final InputStream chunk1 = chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier); + assertThat(chunk1).hasBinaryContent(CHUNK_1); + verify(chunk1Supplier).get(); + final InputStream cachedChunk1 = chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier); + assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); + verifyNoMoreInteractions(chunk1Supplier); + + verifyNoInteractions(removalListener); + } + + @Test + void timeBasedEviction() throws IOException, StorageBackendException, InterruptedException { + chunkCache.configure(Map.of( + "retention.ms", "100", + "size", "-1" + )); + + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + verify(chunk0Supplier).get(); + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(chunk0Supplier); + + Thread.sleep(100); + + assertThat(chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier)) + .hasBinaryContent(CHUNK_1); + verify(chunk1Supplier).get(); + assertThat(chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier)) + .hasBinaryContent(CHUNK_1); + verifyNoMoreInteractions(chunk1Supplier); + + await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100)) + .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); + + verify(removalListener) + .onRemoval( + argThat(argument -> argument.chunkId == 0), + any(), + eq(RemovalCause.EXPIRED)); + + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + verify(chunk0Supplier, times(2)).get(); + } + + @Test + void sizeBasedEviction() throws IOException, StorageBackendException { + chunkCache.configure(Map.of( + "retention.ms", "-1", + "size", "18" + )); + + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + verify(chunk0Supplier).get(); + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + verifyNoMoreInteractions(chunk0Supplier); + + assertThat(chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier)) + .hasBinaryContent(CHUNK_1); + verify(chunk1Supplier).get(); + + await().atMost(Duration.ofMillis(5000)) + .pollDelay(Duration.ofSeconds(2)) + .pollInterval(Duration.ofMillis(10)) + .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); + + verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); + + assertThat(chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier)) + .hasBinaryContent(CHUNK_0); + assertThat(chunkCache.getChunk(CHUNK_KEY_1, chunk1Supplier)) + .hasBinaryContent(CHUNK_1); + verify(chunk0Supplier, times(1)).get(); + verify(chunk1Supplier, times(2)).get(); + } + } + + @Nested + class ErrorHandlingTests { + private final Map configs = Map.of( + "retention.ms", "-1", + "size", "-1" + ); + + @BeforeEach + void setUp() { + chunkCache.configure(configs); + } + + @Test + void failedSupply() { + when(chunk0Supplier.get()) + .thenReturn(CompletableFuture.failedFuture(new StorageBackendException(TEST_EXCEPTION_MESSAGE))) + .thenReturn(CompletableFuture.failedFuture(new IOException(TEST_EXCEPTION_MESSAGE))); + + assertThatThrownBy(() -> chunkCache + .getChunk(CHUNK_KEY_0, chunk0Supplier)) + .isInstanceOf(StorageBackendException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); + assertThatThrownBy(() -> chunkCache + .getChunk(CHUNK_KEY_0, chunk0Supplier)) + .isInstanceOf(IOException.class) + .hasMessage(TEST_EXCEPTION_MESSAGE); + } + + @Test + void failedReadingCachedValueWithInterruptedException() throws Exception { + when(chunk0Supplier.get()) + .thenReturn(CompletableFuture.completedFuture(new ByteArrayInputStream(CHUNK_0))); + + doCallRealMethod().doAnswer(invocation -> { + throw new InterruptedException(TEST_EXCEPTION_MESSAGE); + }).when(chunkCache).cachedChunkToInputStream(any()); + + chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier); + assertThatThrownBy(() -> chunkCache + .getChunk(CHUNK_KEY_0, chunk0Supplier)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(InterruptedException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + } + + @Test + void failedReadingCachedValueWithExecutionException() throws Exception { + when(chunk0Supplier.get()) + .thenReturn(CompletableFuture.completedFuture(new ByteArrayInputStream(CHUNK_0))); + doCallRealMethod().doAnswer(invocation -> { + throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); + }).when(chunkCache).cachedChunkToInputStream(any()); + + chunkCache.getChunk(CHUNK_KEY_0, chunk0Supplier); + assertThatThrownBy(() -> chunkCache + .getChunk(CHUNK_KEY_0, chunk0Supplier)) + .isInstanceOf(RuntimeException.class) + .hasCauseInstanceOf(ExecutionException.class) + .hasRootCauseInstanceOf(RuntimeException.class) + .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); + } + } +} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java index d87bfb4be..89db4345c 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheConfigTest.java @@ -37,7 +37,6 @@ void defaults() { ); assertThat(config.cacheRetention()).hasValue(Duration.ofMinutes(10)); - assertThat(config.cachePrefetchingSize()).isEqualTo(0); } @Test @@ -111,16 +110,4 @@ void invalidRetention() { )).isInstanceOf(ConfigException.class) .hasMessage("Invalid value -2 for configuration retention.ms: Value must be at least -1"); } - - @Test - void invalidPrefetchingSize() { - assertThatThrownBy(() -> new ChunkCacheConfig( - new ConfigDef(), - Map.of( - "size", "-1", - "prefetch.max.size", "-1" - ) - )).isInstanceOf(ConfigException.class) - .hasMessage("Invalid value -1 for configuration prefetch.max.size: Value must be at least 0"); - } } diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java deleted file mode 100644 index 981aed359..000000000 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/ChunkCacheTest.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Copyright 2023 Aiven Oy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.aiven.kafka.tieredstorage.chunkmanager.cache; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; - -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; -import io.aiven.kafka.tieredstorage.manifest.SegmentIndexesV1; -import io.aiven.kafka.tieredstorage.manifest.SegmentManifest; -import io.aiven.kafka.tieredstorage.manifest.SegmentManifestV1; -import io.aiven.kafka.tieredstorage.manifest.index.FixedSizeChunkIndex; -import io.aiven.kafka.tieredstorage.storage.ObjectKey; -import io.aiven.kafka.tieredstorage.storage.StorageBackendException; - -import com.github.benmanes.caffeine.cache.RemovalCause; -import com.github.benmanes.caffeine.cache.RemovalListener; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.awaitility.Awaitility.await; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyInt; -import static org.mockito.Mockito.argThat; -import static org.mockito.Mockito.description; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doCallRealMethod; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mockingDetails; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class ChunkCacheTest { - - private static final byte[] CHUNK_0 = "0123456789".getBytes(); - private static final byte[] CHUNK_1 = "1011121314".getBytes(); - private static final byte[] CHUNK_2 = "1011121314".getBytes(); - private static final int ORIGINAL_CHUNK_SIZE = 10; - private static final int ORIGINAL_FILE_SIZE = 30; - private static final FixedSizeChunkIndex FIXED_SIZE_CHUNK_INDEX = new FixedSizeChunkIndex( - ORIGINAL_CHUNK_SIZE, - ORIGINAL_FILE_SIZE, - 10, - 10 - ); - private static final SegmentIndexesV1 SEGMENT_INDEXES = SegmentIndexesV1.builder() - .add(IndexType.OFFSET, 1) - .add(IndexType.TIMESTAMP, 1) - .add(IndexType.PRODUCER_SNAPSHOT, 1) - .add(IndexType.LEADER_EPOCH, 1) - .add(IndexType.TRANSACTION, 1) - .build(); - - private static final SegmentManifest SEGMENT_MANIFEST = - new SegmentManifestV1(FIXED_SIZE_CHUNK_INDEX, SEGMENT_INDEXES, false, null, null); - private static final String TEST_EXCEPTION_MESSAGE = "test_message"; - private static final String SEGMENT_KEY = "topic/segment"; - private static final ObjectKey SEGMENT_OBJECT_KEY = () -> SEGMENT_KEY; - - @Mock - private ChunkManager chunkManager; - private ChunkCache chunkCache; - - @BeforeEach - void setUp() { - chunkCache = spy(new InMemoryChunkCache(chunkManager)); - } - - @AfterEach - void tearDown() { - reset(chunkManager); - } - - @Nested - class CacheTests { - @Mock - RemovalListener removalListener; - - @BeforeEach - void setUp() throws Exception { - doAnswer(invocation -> removalListener).when(chunkCache).removalListener(); - when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_0)); - when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_1)); - when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2)) - .thenAnswer(invocation -> new ByteArrayInputStream(CHUNK_2)); - } - - @Test - void noEviction() throws IOException, StorageBackendException { - chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "-1" - )); - - final InputStream chunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunk0).hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); - verifyNoMoreInteractions(chunkManager); - - final InputStream chunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunk1).hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); - verifyNoMoreInteractions(chunkManager); - - verifyNoInteractions(removalListener); - } - - @Test - void timeBasedEviction() throws IOException, StorageBackendException, InterruptedException { - chunkCache.configure(Map.of( - "retention.ms", "100", - "size", "-1" - )); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verifyNoMoreInteractions(chunkManager); - - Thread.sleep(100); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verifyNoMoreInteractions(chunkManager); - - await().atMost(Duration.ofMillis(5000)).pollInterval(Duration.ofMillis(100)) - .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); - - verify(removalListener) - .onRemoval( - argThat(argument -> argument.chunkId == 0), - any(), - eq(RemovalCause.EXPIRED)); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager, times(2)).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - } - - @Test - void sizeBasedEviction() throws IOException, StorageBackendException { - chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "18" - )); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - verifyNoMoreInteractions(chunkManager); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager).getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - - await().atMost(Duration.ofMillis(5000)) - .pollDelay(Duration.ofSeconds(2)) - .pollInterval(Duration.ofMillis(10)) - .until(() -> !mockingDetails(removalListener).getInvocations().isEmpty()); - - verify(removalListener).onRemoval(any(ChunkKey.class), any(), eq(RemovalCause.SIZE)); - - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .hasBinaryContent(CHUNK_0); - assertThat(chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .hasBinaryContent(CHUNK_1); - verify(chunkManager, times(3)).getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt()); - } - - @Test - void prefetchingNextChunk() throws Exception { - chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "-1", - "prefetch.max.size", ORIGINAL_CHUNK_SIZE - )); - chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 2); - verify(chunkManager, description("first chunk was fetched from remote")) - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - verify(chunkManager, description("second chunk was prefetched")) - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - verify(chunkManager, never().description("third chunk was not prefetched ")) - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2); - - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); - verifyNoMoreInteractions(chunkManager); - - // checking that third chunk is prefetch when fetching chunk 1 from cache - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); - await("waiting for prefetching to finish").pollInterval(Duration.ofMillis(5)) - .until(() -> chunkCache.statsCounter.snapshot().loadCount() == 5); - verify(chunkManager, description("third chunk was prefetched")) - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2); - verifyNoMoreInteractions(chunkManager); - } - - @Test - void prefetchingWholeSegment() throws Exception { - chunkCache.configure(Map.of( - "retention.ms", "-1", - "size", "-1", - "prefetch.max.size", ORIGINAL_FILE_SIZE - ORIGINAL_CHUNK_SIZE - )); - chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - await().pollInterval(Duration.ofMillis(5)).until(() -> chunkCache.statsCounter.snapshot().loadCount() == 3); - // verifying fetching for all 3 chunks(2 prefetched) - verify(chunkManager, times(3)).getChunk(any(), any(), anyInt()); - - // no fetching from remote since chunk 0 is cached - final InputStream cachedChunk0 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThat(cachedChunk0).hasBinaryContent(CHUNK_0); - verifyNoMoreInteractions(chunkManager); - - // no fetching from remote since chunk 1 is cached - final InputStream cachedChunk1 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1); - assertThat(cachedChunk1).hasBinaryContent(CHUNK_1); - verifyNoMoreInteractions(chunkManager); - - // no fetching from remote since chunk 2 is cached - final InputStream cachedChunk2 = chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 2); - assertThat(cachedChunk2).hasBinaryContent(CHUNK_2); - verifyNoMoreInteractions(chunkManager); - } - } - - @Nested - class ErrorHandlingTests { - private final Map configs = Map.of( - "retention.ms", "-1", - "size", "-1" - ); - - @BeforeEach - void setUp() { - chunkCache.configure(configs); - } - - @Test - void failedFetching() throws Exception { - when(chunkManager.getChunk(eq(SEGMENT_OBJECT_KEY), eq(SEGMENT_MANIFEST), anyInt())) - .thenThrow(new StorageBackendException(TEST_EXCEPTION_MESSAGE)) - .thenThrow(new IOException(TEST_EXCEPTION_MESSAGE)); - - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(StorageBackendException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 1)) - .isInstanceOf(IOException.class) - .hasMessage(TEST_EXCEPTION_MESSAGE); - } - - @Test - void failedReadingCachedValueWithInterruptedException() throws Exception { - when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .thenReturn(new ByteArrayInputStream(CHUNK_0)); - - doCallRealMethod().doAnswer(invocation -> { - throw new InterruptedException(TEST_EXCEPTION_MESSAGE); - }).when(chunkCache).cachedChunkToInputStream(any()); - - chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(InterruptedException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); - } - - @Test - void failedReadingCachedValueWithExecutionException() throws Exception { - when(chunkManager.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)).thenReturn( - new ByteArrayInputStream(CHUNK_0)); - doCallRealMethod().doAnswer(invocation -> { - throw new ExecutionException(new RuntimeException(TEST_EXCEPTION_MESSAGE)); - }).when(chunkCache).cachedChunkToInputStream(any()); - - chunkCache.getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0); - assertThatThrownBy(() -> chunkCache - .getChunk(SEGMENT_OBJECT_KEY, SEGMENT_MANIFEST, 0)) - .isInstanceOf(RuntimeException.class) - .hasCauseInstanceOf(ExecutionException.class) - .hasRootCauseInstanceOf(RuntimeException.class) - .hasRootCauseMessage(TEST_EXCEPTION_MESSAGE); - } - } -} diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java index 96c0a0e56..15e4e50fc 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/chunkmanager/cache/DiskBasedChunkCacheTest.java @@ -23,7 +23,6 @@ import java.util.Map; import io.aiven.kafka.tieredstorage.chunkmanager.ChunkKey; -import io.aiven.kafka.tieredstorage.chunkmanager.ChunkManager; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; @@ -32,7 +31,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; @@ -43,7 +41,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.CALLS_REAL_METHODS; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -54,12 +51,10 @@ class DiskBasedChunkCacheTest { private static final byte[] CHUNK_0 = "0123456789".getBytes(); private static final byte[] CHUNK_1 = "1011121314".getBytes(); private static final String TEST_EXCEPTION_MESSAGE = "test_message"; - @Mock - ChunkManager chunkManager; @TempDir Path baseCachePath; - DiskBasedChunkCache diskBasedChunkCache = new DiskBasedChunkCache(chunkManager); + DiskBasedChunkCache diskBasedChunkCache = new DiskBasedChunkCache(); private Path cachePath; private Path tempCachePath; @@ -212,9 +207,7 @@ void removalFails() throws IOException { @Test void cacheInitialized() { - final DiskBasedChunkCache spy = spy( - new DiskBasedChunkCache(mock(ChunkManager.class)) - ); + final DiskBasedChunkCache spy = spy(new DiskBasedChunkCache()); final Map configs = Map.of( "retention.ms", "-1", "size", "-1", diff --git a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java index ff9fb16e3..d9a786f69 100644 --- a/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java +++ b/core/src/test/java/io/aiven/kafka/tieredstorage/transform/FetchChunkEnumerationTest.java @@ -17,6 +17,7 @@ package io.aiven.kafka.tieredstorage.transform; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.NoSuchElementException; import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType; @@ -105,7 +106,7 @@ void endPositionIsOutsideIndex() { // - Single chunk @Test - void shouldReturnRangeFromSingleChunk() throws StorageBackendException { + void shouldReturnRangeFromSingleChunk() throws StorageBackendException, IOException { // Given a set of 10 chunks with 10 bytes each // When final int from = 32; @@ -123,7 +124,7 @@ void shouldReturnRangeFromSingleChunk() throws StorageBackendException { // - Multiple chunks @Test - void shouldReturnRangeFromMultipleChunks() throws StorageBackendException { + void shouldReturnRangeFromMultipleChunks() throws StorageBackendException, IOException { // Given a set of 10 chunks with 10 bytes each // When final int from = 15;