From aaeacb54d214470a1689c15e77a6bd9438620257 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Sat, 26 Oct 2024 21:39:38 +0800 Subject: [PATCH] release ByteBuf off heap memory block --- .../memory/allocator/MemoryAllocator.java | 4 +++- .../allocator/NettyMemoryAllocator.java | 20 ++++++++++++++++++- .../allocator/OnHeapMemoryStrategy.java | 7 ++++++- .../memory/consumer/MemoryConsumer.java | 6 ++++++ .../consumer/impl/id/BinaryIdOffHeap.java | 8 ++++++++ .../consumer/impl/id/EdgeIdOffHeap.java | 8 ++++++++ .../consumer/impl/id/LongIdOffHeap.java | 7 +++++++ .../consumer/impl/id/ObjectIdOffHeap.java | 7 +++++++ .../consumer/impl/id/QueryIdOffHeap.java | 7 +++++++ .../consumer/impl/id/StringIdOffHeap.java | 7 +++++++ .../consumer/impl/id/UuidIdOffHeap.java | 7 +++++++ .../property/HugeEdgePropertyOffHeap.java | 7 +++++++ .../property/HugeVertexPropertyOffHeap.java | 7 +++++++ .../memory/pool/AbstractMemoryPool.java | 6 ++++++ .../hugegraph/memory/pool/MemoryPool.java | 3 +++ .../memory/pool/impl/OperatorMemoryPool.java | 17 +++++++++++++++- 16 files changed, 124 insertions(+), 4 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/MemoryAllocator.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/MemoryAllocator.java index 17ed167c17..9e8f443429 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/MemoryAllocator.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/MemoryAllocator.java @@ -23,5 +23,7 @@ public interface MemoryAllocator { Object forceAllocate(long size); - void releaseMemory(long size); + void returnMemoryToManager(long size); + + void releaseMemoryBlock(Object memoryBlock); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/NettyMemoryAllocator.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/NettyMemoryAllocator.java index a6e4958e85..579bb68829 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/NettyMemoryAllocator.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/NettyMemoryAllocator.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.ReferenceCountUtil; /** * This class makes fully use of Netty's efficient memory management strategy. @@ -50,7 +51,24 @@ public ByteBuf tryToAllocate(long size) { } @Override - public void releaseMemory(long size) { + public void returnMemoryToManager(long size) { memoryManager.getCurrentOffHeapAllocatedMemory().addAndGet(-size); } + + @Override + public void releaseMemoryBlock(Object memoryBlock) { + if (!(memoryBlock instanceof ByteBuf)) { + throw new IllegalArgumentException("memoryBlock must be ByteBuf"); + } + ByteBuf buf = (ByteBuf) memoryBlock; + ReferenceCountUtil.safeRelease(buf, ReferenceCountUtil.refCnt(buf)); + } + + public static void main(String[] args) { + MemoryAllocator netty = new NettyMemoryAllocator(null); + ByteBuf buf = (ByteBuf) netty.forceAllocate(1024); + System.out.println(ReferenceCountUtil.refCnt(buf)); + netty.releaseMemoryBlock(buf); + System.out.println(ReferenceCountUtil.refCnt(buf)); + } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java index 401283254f..cf5d57a9cd 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/allocator/OnHeapMemoryStrategy.java @@ -48,7 +48,12 @@ public byte[] forceAllocate(long size) { } @Override - public void releaseMemory(long size) { + public void returnMemoryToManager(long size) { memoryManager.getCurrentOnHeapAllocatedMemory().addAndGet(-size); } + + @Override + public void releaseMemoryBlock(Object memoryBlock) { + memoryBlock = null; + } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java index 8eb4cc0489..bb860750e0 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/MemoryConsumer.java @@ -17,8 +17,12 @@ package org.apache.hugegraph.memory.consumer; +import java.util.List; + import org.apache.hugegraph.memory.pool.MemoryPool; +import io.netty.buffer.ByteBuf; + /** * This interface is used by immutable, memory-heavy objects which will be stored in off heap. */ @@ -44,4 +48,6 @@ public interface MemoryConsumer { void releaseOriginalOnHeapVars(); MemoryPool getOperatorMemoryPool(); + + List getAllOffHeapByteBuf(); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java index 8215f98749..334257e988 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/BinaryIdOffHeap.java @@ -17,6 +17,9 @@ package org.apache.hugegraph.memory.consumer.impl.id; +import java.util.Collections; +import java.util.List; + import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.serializer.BinaryBackendEntry; import org.apache.hugegraph.memory.consumer.MemoryConsumer; @@ -59,6 +62,11 @@ public MemoryPool getOperatorMemoryPool() { return this.memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(bytesOffHeap); + } + @Override public void releaseOriginalOnHeapVars() { this.bytes = null; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java index 98d30f3dd6..a63c8c5c38 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/EdgeIdOffHeap.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.memory.consumer.impl.id; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Objects; import org.apache.hugegraph.backend.id.EdgeId; @@ -29,6 +30,8 @@ import org.apache.hugegraph.structure.HugeVertex; import org.apache.hugegraph.type.define.Directions; +import com.google.common.collect.Lists; + import io.netty.buffer.ByteBuf; // TODO: rewrite static method in EdgeId @@ -140,6 +143,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Lists.newArrayList(this.sortValuesOffHeap, this.cacheOffHeap); + } + @Override public EdgeId switchDirection() { Directions newDirection = this.direction.opposite(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java index 6c7bf65d66..c0f1345460 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/LongIdOffHeap.java @@ -19,6 +19,8 @@ import static org.apache.hugegraph.backend.id.IdGenerator.compareType; +import java.util.Collections; +import java.util.List; import java.util.Objects; import org.apache.hugegraph.backend.id.Id; @@ -75,6 +77,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(idOffHeap); + } + @Override public long asLong() { try { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java index 93c8506033..e6ff53c9fe 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/ObjectIdOffHeap.java @@ -17,6 +17,8 @@ package org.apache.hugegraph.memory.consumer.impl.id; +import java.util.Collections; +import java.util.List; import java.util.Objects; import org.apache.hugegraph.backend.id.IdGenerator; @@ -63,6 +65,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(objectOffHeap); + } + @Override public Object asObject() { return FurySerializationUtils.FURY.deserialize(ByteBufUtil.getBytes(objectOffHeap)); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java index b2852992a9..9f62e0c67c 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/QueryIdOffHeap.java @@ -18,6 +18,8 @@ package org.apache.hugegraph.memory.consumer.impl.id; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; import org.apache.hugegraph.backend.cache.CachedBackendStore; import org.apache.hugegraph.backend.id.Id; @@ -69,6 +71,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(queryOffHeap); + } + @Override public boolean equals(Object other) { if (!(other instanceof QueryIdOffHeap)) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java index e924a17a61..3501026963 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/StringIdOffHeap.java @@ -20,6 +20,8 @@ import static org.apache.hugegraph.backend.id.IdGenerator.compareType; import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; @@ -75,6 +77,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(idOffHeap); + } + @Override public Object asObject() { return this.asString(); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java index b21b72ad63..3b2ba7e372 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/id/UuidIdOffHeap.java @@ -20,6 +20,8 @@ import static org.apache.hugegraph.backend.id.IdGenerator.compareType; import java.io.IOException; +import java.util.Collections; +import java.util.List; import java.util.UUID; import org.apache.hugegraph.backend.id.Id; @@ -87,6 +89,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(idOffHeap); + } + @Override public Object asObject() { return FurySerializationUtils.FURY.deserialize( diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeEdgePropertyOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeEdgePropertyOffHeap.java index c2cf91a713..9ed058fef3 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeEdgePropertyOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeEdgePropertyOffHeap.java @@ -17,6 +17,8 @@ package org.apache.hugegraph.memory.consumer.impl.property; +import java.util.Collections; +import java.util.List; import java.util.NoSuchElementException; import org.apache.hugegraph.memory.consumer.MemoryConsumer; @@ -68,6 +70,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(valueOffHeap); + } + @Override public HugeEdgeProperty switchEdgeOwner() { assert this.owner instanceof HugeEdge; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeVertexPropertyOffHeap.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeVertexPropertyOffHeap.java index 194da3ec15..7d6bd3a107 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeVertexPropertyOffHeap.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/consumer/impl/property/HugeVertexPropertyOffHeap.java @@ -17,6 +17,8 @@ package org.apache.hugegraph.memory.consumer.impl.property; +import java.util.Collections; +import java.util.List; import java.util.NoSuchElementException; import org.apache.hugegraph.memory.consumer.MemoryConsumer; @@ -67,6 +69,11 @@ public MemoryPool getOperatorMemoryPool() { return memoryPool; } + @Override + public List getAllOffHeapByteBuf() { + return Collections.singletonList(valueOffHeap); + } + @Override public Object serialValue(boolean encodeNumber) { return this.pkey.serialValue(this.value(), encodeNumber); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/AbstractMemoryPool.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/AbstractMemoryPool.java index 639874d639..9c193702dc 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/AbstractMemoryPool.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/AbstractMemoryPool.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.ReentrantLock; import org.apache.hugegraph.memory.MemoryManager; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; import org.apache.hugegraph.memory.pool.impl.MemoryPoolStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +140,11 @@ public Object tryToAcquireMemoryInternal(long bytes) { return null; } + @Override + public void bindMemoryConsumer(MemoryConsumer memoryConsumer) { + // default do nothing + } + @Override public Object requireMemory(long bytes) { return null; diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/MemoryPool.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/MemoryPool.java index 7c396ede91..c928008941 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/MemoryPool.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/MemoryPool.java @@ -17,6 +17,7 @@ package org.apache.hugegraph.memory.pool; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; import org.apache.hugegraph.memory.pool.impl.MemoryPoolStats; import org.apache.hugegraph.memory.util.QueryOutOfMemoryException; @@ -49,4 +50,6 @@ public interface MemoryPool { MemoryPool getParentPool(); MemoryPool findRootQueryPool(); + + void bindMemoryConsumer(MemoryConsumer memoryConsumer); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/OperatorMemoryPool.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/OperatorMemoryPool.java index eb57cd78d8..506e725c4e 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/OperatorMemoryPool.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/memory/pool/impl/OperatorMemoryPool.java @@ -17,8 +17,12 @@ package org.apache.hugegraph.memory.pool.impl; +import java.util.HashSet; +import java.util.Set; + import org.apache.hugegraph.memory.MemoryManager; import org.apache.hugegraph.memory.allocator.MemoryAllocator; +import org.apache.hugegraph.memory.consumer.MemoryConsumer; import org.apache.hugegraph.memory.pool.AbstractMemoryPool; import org.apache.hugegraph.memory.pool.MemoryPool; import org.apache.hugegraph.memory.util.MemoryManageUtils; @@ -30,16 +34,27 @@ public class OperatorMemoryPool extends AbstractMemoryPool { private static final Logger LOG = LoggerFactory.getLogger(OperatorMemoryPool.class); private final MemoryAllocator memoryAllocator; + private final Set memoryConsumers; public OperatorMemoryPool(MemoryPool parent, String poolName, MemoryAllocator memoryAllocator, MemoryManager memoryManager) { super(parent, poolName, memoryManager); this.memoryAllocator = memoryAllocator; + this.memoryConsumers = new HashSet<>(); + } + + @Override + public void bindMemoryConsumer(MemoryConsumer memoryConsumer) { + this.memoryConsumers.add(memoryConsumer); } @Override public synchronized void releaseSelf(String reason) { - this.memoryAllocator.releaseMemory(getAllocatedBytes()); + this.memoryAllocator.returnMemoryToManager(getAllocatedBytes()); + this.memoryConsumers.forEach(memoryConsumer -> { + memoryConsumer.getAllOffHeapByteBuf().forEach(memoryAllocator::releaseMemoryBlock); + }); + this.memoryConsumers.clear(); super.releaseSelf(reason); // TODO: release memory consumer, release byte buffer. }