Skip to content

Commit

Permalink
complete property adoption
Browse files Browse the repository at this point in the history
  • Loading branch information
Pengzna committed Oct 26, 2024
1 parent 8344443 commit d9cf408
Show file tree
Hide file tree
Showing 5 changed files with 179 additions and 16 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,6 @@ public void serializeSelfToByteBuf() {
this.objectOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.objectOffHeap.markReaderIndex();
this.objectOffHeap.writeBytes(bytes);
//try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
// ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
// // serialize attributes to outputStream
// outputStream.writeObject(this.object);
// objectOffHeap = (ByteBuf) memoryPool.requireMemory(byteArrayOutputStream.size());
// objectOffHeap.writeBytes(byteArrayOutputStream.toByteArray());
//} catch (IOException e) {
// LOG.error("Unexpected error occurs when serializing ObjectId.", e);
// throw new SerializationRuntimeException(e);
//}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.consumer.impl.property;

import java.util.NoSuchElementException;

import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.util.FurySerializationUtils;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.structure.HugeEdge;
import org.apache.hugegraph.structure.HugeEdgeProperty;
import org.apache.hugegraph.structure.HugeElement;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

public class HugeEdgePropertyOffHeap<V> extends HugeEdgeProperty<V> implements MemoryConsumer {

private final MemoryPool memoryPool;
private ByteBuf valueOffHeap;

public HugeEdgePropertyOffHeap(MemoryPool memoryPool, HugeElement owner, PropertyKey key,
V value) {
super(owner, key, value);
this.memoryPool = memoryPool;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
}

@Override
public Object zeroCopyReadFromByteBuf() {
return new HugeEdgeProperty<>(this.owner, this.pkey,
FurySerializationUtils.FURY.deserialize(
ByteBufUtil.getBytes(this.valueOffHeap)));
}

@Override
public void serializeSelfToByteBuf() {
byte[] bytes = FurySerializationUtils.FURY.serialize(this.value);
this.valueOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.valueOffHeap.markReaderIndex();
this.valueOffHeap.writeBytes(bytes);
}

@Override
public void releaseOriginalOnHeapVars() {
this.value = null;
}

@Override
public MemoryPool getOperatorMemoryPool() {
return memoryPool;
}

@Override
public HugeEdgeProperty<V> switchEdgeOwner() {
assert this.owner instanceof HugeEdge;
return new HugeEdgeProperty<>(((HugeEdge) this.owner).switchOwner(),
this.pkey, this.value());
}

@Override
public Object serialValue(boolean encodeNumber) {
return this.pkey.serialValue(this.value(), encodeNumber);
}

@Override
public boolean isPresent() {
return this.value() != null;
}

@Override
public V value() throws NoSuchElementException {
return (V) FurySerializationUtils.FURY.deserialize(
ByteBufUtil.getBytes(this.valueOffHeap));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hugegraph.memory.consumer.impl.property;

import java.util.NoSuchElementException;

import org.apache.hugegraph.memory.consumer.MemoryConsumer;
import org.apache.hugegraph.memory.pool.MemoryPool;
import org.apache.hugegraph.memory.util.FurySerializationUtils;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.structure.HugeElement;
import org.apache.hugegraph.structure.HugeVertexProperty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;

public class HugeVertexPropertyOffHeap<V> extends HugeVertexProperty<V> implements MemoryConsumer {

private final MemoryPool memoryPool;
private ByteBuf valueOffHeap;

public HugeVertexPropertyOffHeap(MemoryPool memoryPool, HugeElement owner, PropertyKey key,
V value) {
super(owner, key, value);
this.memoryPool = memoryPool;
serializeSelfToByteBuf();
releaseOriginalOnHeapVars();
}

@Override
public Object zeroCopyReadFromByteBuf() {
return new HugeVertexProperty<>(this.owner, this.pkey,
FurySerializationUtils.FURY.deserialize(
ByteBufUtil.getBytes(this.valueOffHeap)));
}

@Override
public void serializeSelfToByteBuf() {
byte[] bytes = FurySerializationUtils.FURY.serialize(this.value);
this.valueOffHeap = (ByteBuf) memoryPool.requireMemory(bytes.length);
this.valueOffHeap.markReaderIndex();
this.valueOffHeap.writeBytes(bytes);
}

@Override
public void releaseOriginalOnHeapVars() {
this.value = null;
}

@Override
public MemoryPool getOperatorMemoryPool() {
return memoryPool;
}

@Override
public Object serialValue(boolean encodeNumber) {
return this.pkey.serialValue(this.value(), encodeNumber);
}

@Override
public boolean isPresent() {
return this.value() != null;
}

@Override
public V value() throws NoSuchElementException {
return (V) FurySerializationUtils.FURY.deserialize(
ByteBufUtil.getBytes(this.valueOffHeap));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class HugeProperty<V> implements Property<V>, GraphType {

protected final HugeElement owner;
protected final PropertyKey pkey;
protected final V value;
protected V value;

public HugeProperty(HugeElement owner, PropertyKey pkey, V value) {
E.checkArgument(owner != null, "Property owner can't be null");
Expand Down

0 comments on commit d9cf408

Please sign in to comment.