Skip to content

Commit

Permalink
fix(interactive): Fix memory leak when reopening rocksdb instance (#4250
Browse files Browse the repository at this point in the history
)

Fixes memory leak when reopening rocksdb instance.
  • Loading branch information
siyuan0322 authored Sep 24, 2024
1 parent 27dc402 commit 4cf11cf
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 246 deletions.
2 changes: 1 addition & 1 deletion analytical_engine/core/launcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <memory>
#include <string>

#include "boost/process/detail/child_decl.hpp"
#include "boost/process.hpp"
#include "core/flags.h"
#include "grape/worker/comm_spec.h"

Expand Down
69 changes: 69 additions & 0 deletions analytical_engine/core/object/fragment_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "boost/leaf/error.hpp"
#include "boost/leaf/result.hpp"
#include "grape/fragment/immutable_edgecut_fragment.h"
// #include "grape/fragment/immutable_vertexcut_fragment.h"
#include "grape/serialization/in_archive.h"
#include "grape/worker/comm_spec.h"
#include "vineyard/client/client.h"
Expand Down Expand Up @@ -1094,6 +1095,74 @@ class FragmentWrapper<
std::shared_ptr<fragment_t> fragment_;
};

/*
template <typename OID_T, typename VID_T, typename VDATA_T, typename EDATA_T>
class FragmentWrapper<
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>>
: public IFragmentWrapper {
using fragment_t =
grape::ImmutableVertexcutFragment<OID_T, VID_T, VDATA_T, EDATA_T>;
public:
FragmentWrapper(const std::string& id, rpc::graph::GraphDefPb graph_def,
std::shared_ptr<fragment_t> fragment)
: IFragmentWrapper(id),
graph_def_(std::move(graph_def)),
fragment_(std::move(fragment)) {
CHECK_EQ(graph_def_.graph_type(), rpc::graph::IMMUTABLE_EDGECUT);
}
std::shared_ptr<void> fragment() const override {
return std::static_pointer_cast<void>(fragment_);
}
const rpc::graph::GraphDefPb& graph_def() const override {
return graph_def_;
}
rpc::graph::GraphDefPb& mutable_graph_def() override { return graph_def_; }
bl::result<std::shared_ptr<IFragmentWrapper>> CopyGraph(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot copy the ArrowProjectedFragment");
}
bl::result<std::unique_ptr<grape::InArchive>> ReportGraph(
const grape::CommSpec& comm_spec, const rpc::GSParams& params) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Not implemented.");
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToDirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the directed DynamicProjectedFragment");
}
bl::result<std::shared_ptr<IFragmentWrapper>> ToUndirected(
const grape::CommSpec& comm_spec,
const std::string& dst_graph_name) override {
RETURN_GS_ERROR(
vineyard::ErrorCode::kInvalidOperationError,
"Cannot convert to the undirected DynamicProjectedFragment");
}
bl::result<std::shared_ptr<IFragmentWrapper>> CreateGraphView(
const grape::CommSpec& comm_spec, const std::string& dst_graph_name,
const std::string& copy_type) override {
RETURN_GS_ERROR(vineyard::ErrorCode::kInvalidOperationError,
"Cannot generate a view over the ArrowProjectedFragment");
}
private:
rpc::graph::GraphDefPb graph_def_;
std::shared_ptr<fragment_t> fragment_;
};
*/

#ifdef NETWORKX
/**
* @brief A specialized FragmentWrapper for DynamicFragment.
Expand Down
1 change: 1 addition & 0 deletions charts/graphscope-store/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ data:
secondary.instance.enabled={{ .Values.secondary.enabled }}
store.data.secondary.path={{ .Values.secondary.storeDataPath }}
store.gc.interval.ms={{ .Values.storeGcIntervalMs }}
store.catchup.interval.ms={{ .Values.storeCatchupIntervalMs }}

write.ha.enabled={{ .Values.backup.enabled }}

Expand Down
5 changes: 3 additions & 2 deletions charts/graphscope-store/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ discoveryMode: "file"

## Coordinator Config
snapshotIncreaseIntervalMs: 1000
offsetsPersistIntervalMs: 3000
offsetsPersistIntervalMs: 1000
fileMetaStorePath: "/etc/groot/my.meta"
logRecycleEnable: true
logRecycleOffsetReserve: 86400
Expand All @@ -538,9 +538,10 @@ storeDataPath: "/var/lib/graphscope-store"
storeDataDownloadPath: "/var/lib/graphscope-store/download"
storeDataSecondaryPath: "/home/graphscope/secondary"
storeWriteThreadCount: 1
storeQueueBufferSize: "1024000"
storeQueueBufferSize: "102400"

storeGcIntervalMs: 5000
storeCatchupIntervalMs: 5000

## Kafka Config
##
Expand Down
1 change: 1 addition & 0 deletions interactive_engine/assembly/src/conf/groot/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
<logger name="org.apache.zookeeper" level="ERROR" />
<logger name="org.apache.kafka" level="ERROR" />
<logger name="kafka" level="ERROR" />
<logger name="io.grpc.netty" level="ERROR" />
<Logger name="MetricLog" level="INFO" additivity="false">
<appender-ref ref="Metric"/>
</Logger>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class CoordinatorConfig {
Config.longConfig("snapshot.increase.interval.ms", 1000L);

public static final Config<Long> OFFSETS_PERSIST_INTERVAL_MS =
Config.longConfig("offsets.persist.interval.ms", 3000L);
Config.longConfig("offsets.persist.interval.ms", 1000L);

public static final Config<Boolean> LOG_RECYCLE_ENABLE =
Config.boolConfig("log.recycle.enable", false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class StoreConfig {
Config.intConfig("store.write.thread.count", 1);

public static final Config<Integer> STORE_QUEUE_BUFFER_SIZE =
Config.intConfig("store.queue.buffer.size", 1024000);
Config.intConfig("store.queue.buffer.size", 102400);

public static final Config<Long> STORE_QUEUE_WAIT_MS =
Config.longConfig("store.queue.wait.ms", 3000L);
Expand Down
40 changes: 20 additions & 20 deletions interactive_engine/executor/assembly/groot/src/store/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ static INIT: Once = Once::new();

#[no_mangle]
pub extern "C" fn openGraphStore(config_bytes: *const u8, len: usize) -> GraphHandle {
trace!("openGraphStore");
debug!("openGraphStore");
let buf = unsafe { ::std::slice::from_raw_parts(config_bytes, len) };
let proto = parse_pb::<ConfigPb>(buf).expect("parse config pb failed");
let mut config_builder = GraphConfigBuilder::new();
Expand Down Expand Up @@ -77,7 +77,7 @@ pub extern "C" fn closeGraphStore(handle: GraphHandle) -> bool {

#[no_mangle]
pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box<JnaResponse> {
trace!("getGraphDefBlob");
debug!("getGraphDefBlob");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.get_graph_def_blob() {
Expand All @@ -100,7 +100,7 @@ pub extern "C" fn getGraphDefBlob(ptr: GraphHandle) -> Box<JnaResponse> {

#[no_mangle]
pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box<JnaResponse> {
trace!("ingestData");
debug!("ingestData");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
let slice = CStr::from_ptr(path).to_bytes();
Expand All @@ -119,7 +119,7 @@ pub extern "C" fn ingestData(ptr: GraphHandle, path: *const c_char) -> Box<JnaRe
pub extern "C" fn writeBatch(
ptr: GraphHandle, snapshot_id: i64, data: *const u8, len: usize,
) -> Box<JnaResponse> {
trace!("writeBatch");
debug!("writeBatch");

let graph_store_ptr = unsafe { &*(ptr as *const GraphStore) };
let buf = unsafe { ::std::slice::from_raw_parts(data, len) };
Expand All @@ -139,7 +139,7 @@ pub extern "C" fn writeBatch(

#[no_mangle]
pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box<JnaResponse> {
trace!("getGraphStatistics");
debug!("getGraphStatistics");
unsafe {
let graph_store_ptr = &*(ptr as *const GraphStore);
match graph_store_ptr.get_graph_statistics_blob(snapshot_id) {
Expand All @@ -163,7 +163,7 @@ pub extern "C" fn getGraphStatistics(ptr: GraphHandle, snapshot_id: i64) -> Box<
fn do_write_batch<G: MultiVersionGraph>(
graph: &G, snapshot_id: SnapshotId, buf: &[u8],
) -> GraphResult<bool> {
trace!("do_write_batch");
debug!("do_write_batch");
let proto = parse_pb::<OperationBatchPb>(buf)?;
let mut has_ddl = false;
let operations = proto.get_operations();
Expand Down Expand Up @@ -241,7 +241,7 @@ fn do_write_batch<G: MultiVersionGraph>(
fn commit_data_load<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("commit_data_load");
info!("commit_data_load");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let commit_data_load_pb = parse_pb::<CommitDataLoadPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -256,7 +256,7 @@ fn commit_data_load<G: MultiVersionGraph>(
fn prepare_data_load<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("prepare_data_load");
info!("prepare_data_load");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let prepare_data_load_pb = parse_pb::<PrepareDataLoadPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -269,7 +269,7 @@ fn prepare_data_load<G: MultiVersionGraph>(
fn create_vertex_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("create_vertex_type");
info!("create_vertex_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let create_vertex_type_pb = parse_pb::<CreateVertexTypePb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -283,7 +283,7 @@ fn create_vertex_type<G: MultiVersionGraph>(
fn add_vertex_type_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("add_vertex_type_properties");
info!("add_vertex_type_properties");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let add_vertex_type_properties_pb =
Expand All @@ -298,7 +298,7 @@ fn add_vertex_type_properties<G: MultiVersionGraph>(
fn drop_vertex_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("drop_vertex_type");
info!("drop_vertex_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let label_id_pb = parse_pb::<LabelIdPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -309,7 +309,7 @@ fn drop_vertex_type<G: MultiVersionGraph>(
fn create_edge_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("create_edge_type");
info!("create_edge_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let typedef_pb = parse_pb::<TypeDefPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -321,7 +321,7 @@ fn create_edge_type<G: MultiVersionGraph>(
fn add_edge_type_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("add_edge_type_properties");
info!("add_edge_type_properties");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let typedef_pb = parse_pb::<TypeDefPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -333,7 +333,7 @@ fn add_edge_type_properties<G: MultiVersionGraph>(
fn drop_edge_type<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("drop_edge_type");
info!("drop_edge_type");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let label_id_pb = parse_pb::<LabelIdPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -342,7 +342,7 @@ fn drop_edge_type<G: MultiVersionGraph>(
}

fn add_edge_kind<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<bool> {
trace!("add_edge_kind");
info!("add_edge_kind");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let add_edge_kind_pb = parse_pb::<AddEdgeKindPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -355,7 +355,7 @@ fn add_edge_kind<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operati
fn remove_edge_kind<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<bool> {
trace!("remove_edge_kind");
info!("remove_edge_kind");
let ddl_operation_pb = parse_pb::<DdlOperationPb>(op.get_dataBytes())?;
let schema_version = ddl_operation_pb.get_schemaVersion();
let edge_kind_pb = parse_pb::<EdgeKindPb>(ddl_operation_pb.get_ddlBlob())?;
Expand All @@ -366,7 +366,7 @@ fn remove_edge_kind<G: MultiVersionGraph>(
fn overwrite_vertex<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<()> {
trace!("overwrite_vertex");
debug!("overwrite_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -380,7 +380,7 @@ fn overwrite_vertex<G: MultiVersionGraph>(
}

fn update_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("update_vertex");
debug!("update_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -396,7 +396,7 @@ fn update_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &Operati
fn clear_vertex_properties<G: MultiVersionGraph>(
graph: &G, snapshot_id: i64, op: &OperationPb,
) -> GraphResult<()> {
trace!("clear vertex properties");
debug!("clear vertex properties");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand All @@ -409,7 +409,7 @@ fn clear_vertex_properties<G: MultiVersionGraph>(
}

fn delete_vertex<G: MultiVersionGraph>(graph: &G, snapshot_id: i64, op: &OperationPb) -> GraphResult<()> {
trace!("delete_vertex");
debug!("delete_vertex");
let data_operation_pb = parse_pb::<DataOperationPb>(op.get_dataBytes())?;

let vertex_id_pb = parse_pb::<VertexIdPb>(data_operation_pb.get_keyBlob())?;
Expand Down
2 changes: 2 additions & 0 deletions interactive_engine/executor/store/groot/src/db/graph/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ impl Meta {
}

pub fn recover(&self) -> GraphResult<(VertexTypeManager, EdgeTypeManager)> {
debug!("meta graph recover start");
{
let mut graph_def_val = self.graph_def_lock.lock()?;
*graph_def_val = GraphDef::new(
Expand Down Expand Up @@ -237,6 +238,7 @@ impl Meta {
}
}
}
debug!("meta graph recovered");
Ok((vertex_manager_builder.build(), edge_manager_builder.build()))
}

Expand Down
Loading

0 comments on commit 4cf11cf

Please sign in to comment.