Skip to content

Commit

Permalink
Refactor memory modules for JPMS support
Browse files Browse the repository at this point in the history
Based on apache#13072
- Avoid having multiple memory modules contribute to the same package
- Introduce memory-netty-buffer-patch module for patching classes
into Netty modules
- Avoid using BaseAllocator in tests and use RootAllocator instead
- Move TestBaseAllocator#testMemoryUsage() to a new test in
memory-netty TestNettyAllocator because TestBaseAllocator is now in
memory-core, but that specific test has Netty dependencies.
  • Loading branch information
jduo committed Nov 30, 2023
1 parent 0b49cfa commit 32c5840
Show file tree
Hide file tree
Showing 30 changed files with 314 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,35 @@
*/
final class CheckAllocator {
private static final Logger logger = LoggerFactory.getLogger(CheckAllocator.class);
private static final String ALLOCATOR_PATH = "org/apache/arrow/memory/DefaultAllocationManagerFactory.class";
// unique package names needed by JPMS module naming
private static final String ALLOCATOR_PATH_CORE =
"org/apache/arrow/memory/DefaultAllocationManagerFactory.class";
private static final String ALLOCATOR_PATH_UNSAFE =
"org/apache/arrow/memory/unsafe/DefaultAllocationManagerFactory.class";
private static final String ALLOCATOR_PATH_NETTY =
"org/apache/arrow/memory/netty/DefaultAllocationManagerFactory.class";

private CheckAllocator() {

}

static String check() {
Set<URL> urls = scanClasspath();
URL rootAllocator = assertOnlyOne(urls);
reportResult(rootAllocator);
return "org.apache.arrow.memory.DefaultAllocationManagerFactory";
if (rootAllocator.getPath().contains("memory-core") ||
rootAllocator.getPath().contains("/org/apache/arrow/memory/core/")) {
return "org.apache.arrow.memory.DefaultAllocationManagerFactory";
} else if (rootAllocator.getPath().contains("memory-unsafe") ||
rootAllocator.getPath().contains("/org/apache/arrow/memory/unsafe/")) {
return "org.apache.arrow.memory.unsafe.DefaultAllocationManagerFactory";
} else if (rootAllocator.getPath().contains("memory-netty") ||
rootAllocator.getPath().contains("/org/apache/arrow/memory/netty/")) {
return "org.apache.arrow.memory.netty.DefaultAllocationManagerFactory";
} else {
throw new IllegalStateException("Unknown allocation manager type to infer. Current: " + rootAllocator.getPath());
}
}


private static Set<URL> scanClasspath() {
// LinkedHashSet appropriate here because it preserves insertion order
// during iteration
Expand All @@ -53,9 +68,21 @@ private static Set<URL> scanClasspath() {
ClassLoader allocatorClassLoader = CheckAllocator.class.getClassLoader();
Enumeration<URL> paths;
if (allocatorClassLoader == null) {
paths = ClassLoader.getSystemResources(ALLOCATOR_PATH);
paths = ClassLoader.getSystemResources(ALLOCATOR_PATH_CORE);
if (!paths.hasMoreElements()) {
paths = ClassLoader.getSystemResources(ALLOCATOR_PATH_UNSAFE);
}
if (!paths.hasMoreElements()) {
paths = ClassLoader.getSystemResources(ALLOCATOR_PATH_NETTY);
}
} else {
paths = allocatorClassLoader.getResources(ALLOCATOR_PATH);
paths = allocatorClassLoader.getResources(ALLOCATOR_PATH_CORE);
if (!paths.hasMoreElements()) {
paths = allocatorClassLoader.getResources(ALLOCATOR_PATH_UNSAFE);
}
if (!paths.hasMoreElements()) {
paths = allocatorClassLoader.getResources(ALLOCATOR_PATH_NETTY);
}
}
while (paths.hasMoreElements()) {
URL path = paths.nextElement();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.lang.reflect.Field;

import org.apache.arrow.util.VisibleForTesting;

/**
* A class for choosing the default allocation manager.
*/
Expand Down Expand Up @@ -61,7 +63,12 @@ public enum AllocationManagerType {
Unknown,
}

static AllocationManagerType getDefaultAllocationManagerType() {
/**
* Returns the default allocation manager type.
* @return the default allocation manager type.
*/
@VisibleForTesting
public static AllocationManagerType getDefaultAllocationManagerType() {
AllocationManagerType ret = AllocationManagerType.Unknown;

try {
Expand Down Expand Up @@ -115,7 +122,7 @@ private static AllocationManager.Factory getFactory(String clazzName) {

private static AllocationManager.Factory getUnsafeFactory() {
try {
return getFactory("org.apache.arrow.memory.UnsafeAllocationManager");
return getFactory("org.apache.arrow.memory.unsafe.UnsafeAllocationManager");
} catch (RuntimeException e) {
throw new RuntimeException("Please add arrow-memory-unsafe to your classpath," +
" No DefaultAllocationManager found to instantiate an UnsafeAllocationManager", e);
Expand All @@ -124,7 +131,7 @@ private static AllocationManager.Factory getUnsafeFactory() {

private static AllocationManager.Factory getNettyFactory() {
try {
return getFactory("org.apache.arrow.memory.NettyAllocationManager");
return getFactory("org.apache.arrow.memory.netty.NettyAllocationManager");
} catch (RuntimeException e) {
throw new RuntimeException("Please add arrow-memory-netty to your classpath," +
" No DefaultAllocationManager found to instantiate an NettyAllocationManager", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package org.apache.arrow.memory;

import org.apache.arrow.memory.AllocationListener;
import org.apache.arrow.memory.AllocationOutcome;
import org.apache.arrow.memory.BufferAllocator;

/**
* Counting allocation listener.
* It counts the number of times it has been invoked, and how much memory allocation it has seen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,16 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.stream.Collectors;

import org.apache.arrow.memory.AllocationOutcomeDetails.Entry;
import org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.arrow.memory.rounding.SegmentRoundingPolicy;
import org.apache.arrow.memory.util.AssertionUtil;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import io.netty.buffer.PooledByteBufAllocatorL;
import sun.misc.Unsafe;

public class TestBaseAllocator {
Expand Down Expand Up @@ -448,73 +442,73 @@ public ArrowBuf empty() {
@Test
public void testRootAllocator_listeners() throws Exception {
CountingAllocationListener l1 = new CountingAllocationListener();
assertEquals(0, l1.getNumPreCalls());
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getNumReleaseCalls());
assertEquals(0, l1.getNumChildren());
assertEquals(0, l1.getTotalMem());
Assert.assertEquals(0, l1.getNumPreCalls());
Assert.assertEquals(0, l1.getNumCalls());
Assert.assertEquals(0, l1.getNumReleaseCalls());
Assert.assertEquals(0, l1.getNumChildren());
Assert.assertEquals(0, l1.getTotalMem());
CountingAllocationListener l2 = new CountingAllocationListener();
assertEquals(0, l2.getNumPreCalls());
assertEquals(0, l2.getNumCalls());
assertEquals(0, l2.getNumReleaseCalls());
assertEquals(0, l2.getNumChildren());
assertEquals(0, l2.getTotalMem());
Assert.assertEquals(0, l2.getNumPreCalls());
Assert.assertEquals(0, l2.getNumCalls());
Assert.assertEquals(0, l2.getNumReleaseCalls());
Assert.assertEquals(0, l2.getNumChildren());
Assert.assertEquals(0, l2.getTotalMem());
// root and first-level child share the first listener
// second-level and third-level child share the second listener
try (final RootAllocator rootAllocator = new RootAllocator(l1, MAX_ALLOCATION)) {
try (final BufferAllocator c1 = rootAllocator.newChildAllocator("c1", 0, MAX_ALLOCATION)) {
assertEquals(1, l1.getNumChildren());
Assert.assertEquals(1, l1.getNumChildren());
final ArrowBuf buf1 = c1.buffer(16);
assertNotNull("allocation failed", buf1);
assertEquals(1, l1.getNumPreCalls());
assertEquals(1, l1.getNumCalls());
assertEquals(0, l1.getNumReleaseCalls());
assertEquals(16, l1.getTotalMem());
Assert.assertEquals(1, l1.getNumPreCalls());
Assert.assertEquals(1, l1.getNumCalls());
Assert.assertEquals(0, l1.getNumReleaseCalls());
Assert.assertEquals(16, l1.getTotalMem());
buf1.getReferenceManager().release();
try (final BufferAllocator c2 = c1.newChildAllocator("c2", l2, 0, MAX_ALLOCATION)) {
assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified
assertEquals(0, l2.getNumChildren());
Assert.assertEquals(2, l1.getNumChildren()); // c1 got a new child, so c1's listener (l1) is notified
Assert.assertEquals(0, l2.getNumChildren());
final ArrowBuf buf2 = c2.buffer(32);
assertNotNull("allocation failed", buf2);
assertEquals(1, l1.getNumCalls());
assertEquals(16, l1.getTotalMem());
assertEquals(1, l2.getNumPreCalls());
assertEquals(1, l2.getNumCalls());
assertEquals(0, l2.getNumReleaseCalls());
assertEquals(32, l2.getTotalMem());
Assert.assertEquals(1, l1.getNumCalls());
Assert.assertEquals(16, l1.getTotalMem());
Assert.assertEquals(1, l2.getNumPreCalls());
Assert.assertEquals(1, l2.getNumCalls());
Assert.assertEquals(0, l2.getNumReleaseCalls());
Assert.assertEquals(32, l2.getTotalMem());
buf2.getReferenceManager().release();
try (final BufferAllocator c3 = c2.newChildAllocator("c3", 0, MAX_ALLOCATION)) {
assertEquals(2, l1.getNumChildren());
assertEquals(1, l2.getNumChildren());
Assert.assertEquals(2, l1.getNumChildren());
Assert.assertEquals(1, l2.getNumChildren());
final ArrowBuf buf3 = c3.buffer(64);
assertNotNull("allocation failed", buf3);
assertEquals(1, l1.getNumPreCalls());
assertEquals(1, l1.getNumCalls());
assertEquals(1, l1.getNumReleaseCalls());
assertEquals(16, l1.getTotalMem());
assertEquals(2, l2.getNumPreCalls());
assertEquals(2, l2.getNumCalls());
assertEquals(1, l2.getNumReleaseCalls());
assertEquals(32 + 64, l2.getTotalMem());
Assert.assertEquals(1, l1.getNumPreCalls());
Assert.assertEquals(1, l1.getNumCalls());
Assert.assertEquals(1, l1.getNumReleaseCalls());
Assert.assertEquals(16, l1.getTotalMem());
Assert.assertEquals(2, l2.getNumPreCalls());
Assert.assertEquals(2, l2.getNumCalls());
Assert.assertEquals(1, l2.getNumReleaseCalls());
Assert.assertEquals(32 + 64, l2.getTotalMem());
buf3.getReferenceManager().release();
}
assertEquals(2, l1.getNumChildren());
assertEquals(0, l2.getNumChildren()); // third-level child removed
Assert.assertEquals(2, l1.getNumChildren());
Assert.assertEquals(0, l2.getNumChildren()); // third-level child removed
}
assertEquals(1, l1.getNumChildren()); // second-level child removed
assertEquals(0, l2.getNumChildren());
Assert.assertEquals(1, l1.getNumChildren()); // second-level child removed
Assert.assertEquals(0, l2.getNumChildren());
}
assertEquals(0, l1.getNumChildren()); // first-level child removed
Assert.assertEquals(0, l1.getNumChildren()); // first-level child removed

assertEquals(2, l2.getNumReleaseCalls());
Assert.assertEquals(2, l2.getNumReleaseCalls());
}
}

@Test
public void testRootAllocator_listenerAllocationFail() throws Exception {
CountingAllocationListener l1 = new CountingAllocationListener();
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());
Assert.assertEquals(0, l1.getNumCalls());
Assert.assertEquals(0, l1.getTotalMem());
// Test attempts to allocate too much from a child whose limit is set to half of the max
// allocation. The listener's callback triggers, expanding the child allocator's limit, so then
// the allocation succeeds.
Expand All @@ -527,14 +521,14 @@ public void testRootAllocator_listenerAllocationFail() throws Exception {
} catch (OutOfMemoryException e) {
// expected
}
assertEquals(0, l1.getNumCalls());
assertEquals(0, l1.getTotalMem());
Assert.assertEquals(0, l1.getNumCalls());
Assert.assertEquals(0, l1.getTotalMem());

l1.setExpandOnFail(c1, MAX_ALLOCATION);
ArrowBuf arrowBuf = c1.buffer(MAX_ALLOCATION);
assertNotNull("allocation failed", arrowBuf);
assertEquals(1, l1.getNumCalls());
assertEquals(MAX_ALLOCATION, l1.getTotalMem());
Assert.assertEquals(1, l1.getNumCalls());
Assert.assertEquals(MAX_ALLOCATION, l1.getTotalMem());
arrowBuf.getReferenceManager().release();
}
}
Expand Down Expand Up @@ -1098,42 +1092,6 @@ public void testMemoryLeakWithReservation() throws Exception {
}
}

@Test
public void testMemoryUsage() {
ListAppender<ILoggingEvent> memoryLogsAppender = new ListAppender<>();
Logger logger = (Logger) LoggerFactory.getLogger("arrow.allocator");
try {
logger.setLevel(Level.TRACE);
logger.addAppender(memoryLogsAppender);
memoryLogsAppender.start();
try (ArrowBuf buf = new ArrowBuf(ReferenceManager.NO_OP, null,
1024, new PooledByteBufAllocatorL().empty.memoryAddress())) {
buf.memoryAddress();
}
boolean result = false;
long startTime = System.currentTimeMillis();
while ((System.currentTimeMillis() - startTime) < 10000) { // 10 seconds maximum for time to read logs
result = memoryLogsAppender.list.stream()
.anyMatch(
log -> log.toString().contains("Memory Usage: \n") &&
log.toString().contains("Large buffers outstanding: ") &&
log.toString().contains("Normal buffers outstanding: ") &&
log.getLevel().equals(Level.TRACE)
);
if (result) {
break;
}
}
assertTrue("Log messages are:\n" +
memoryLogsAppender.list.stream().map(ILoggingEvent::toString).collect(Collectors.joining("\n")),
result);
} finally {
memoryLogsAppender.stop();
logger.detachAppender(memoryLogsAppender);
logger.setLevel(null);
}
}

@Test
public void testOverlimit() {
try (BufferAllocator allocator = new RootAllocator(1024)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.AllocationListener;
import org.apache.arrow.memory.AllocationOutcome;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.ForeignAllocation;
import org.apache.arrow.memory.OutOfMemoryException;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.memory.util.MemoryUtil;
import org.junit.After;
import org.junit.Before;
Expand Down
44 changes: 44 additions & 0 deletions java/memory/memory-netty-buffer-patch/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arrow-memory</artifactId>
<groupId>org.apache.arrow</groupId>
<version>15.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>arrow-memory-netty-buffer-patch</artifactId>
<name>Arrow Memory - Netty Buffer</name>
<description>Netty Buffer needed to patch that is consumed by Arrow Memory Netty</description>

<dependencies>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-core</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit 32c5840

Please sign in to comment.