Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Introduce kafka as another wal storage #4301

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@
path = learning_engine/graphlearn-for-pytorch
url = https://github.com/alibaba/graphlearn-for-pytorch.git

[submodule "flex/third_party/cppkafka"]
path = flex/third_party/cppkafka
url = https://github.com/mfontanini/cppkafka.git
22 changes: 22 additions & 0 deletions docs/flex/interactive/development/dev_and_test.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,28 @@ mvn clean package -DskipTests -Pexperimental
- `USE_PTHASH`: Indicates whether to use a perfect hash when building the vertex map.
- `OPTIMIZE_FOR_HOST`: Determines if Flex should be optimized for performance on the current machine. Note that enabling this option may result in a binary that does not run on different platforms or CPU architectures.

### Wal Writer

Interactive uses WAL (Write-Ahead Logging) to ensure data integrity. Two different WAL writers are provided for the two storage backends: `LocalWalWriter` for writing WALs to local disk and `KafkaWalWriter` for persisting WALs on Kafka.

You could switch the wal writer type in the configuration. See [Configuration](./../configuration.md#service-configuration).

#### Local Wal Writer

This is the default WAL writer; no extra setup is required — just make sure your disk has enough space.

#### Kafka Wal Writer

You need to deploy a Kafka cluster first. For details, please refer to the [Kafka Documentation](https://kafka.apache.org/documentation/); follow the [Quickstart](https://kafka.apache.org/quickstart) to bring up a service.

#### Performance

##### Settings

##### Producing Wals

##### Consuming Wals

## Testing

Numerous test cases have been created for Interactive; see the GitHub workflow [interactive.yml](https://github.com/alibaba/GraphScope/blob/main/.github/workflows/interactive.yml).
Expand Down
21 changes: 21 additions & 0 deletions flex/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" OFF)
option(USE_PTHASH "Whether to use pthash" OFF)
option(OPTIMIZE_FOR_HOST "Whether to optimize on host" ON) # Whether to build optimized code on host
option(USE_STATIC_ARROW "Whether to use static arrow" ON) # Whether to link arrow statically, default is ON
option(BUILD_KAFKA_WAL_WRITER "Whether to build kafka wal writer" ON)

#print options
message(STATUS "Build test: ${BUILD_TEST}")
Expand Down Expand Up @@ -57,6 +58,26 @@ if(USE_PTHASH)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/third_party/murmurhash)
endif()

if (BUILD_KAFKA_WAL_WRITER)
  find_package(CppKafka)
  if (NOT CppKafka_FOUND)
    message(STATUS "cppkafka not found, try to build with third_party/cppkafka")
    # Verify the submodule has actually been fetched BEFORE calling
    # add_subdirectory(); otherwise CMake aborts with a confusing
    # "add_subdirectory given source ... which is not an existing directory"
    # error and the actionable hint below is never shown.
    if (NOT EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/CMakeLists.txt)
      message(FATAL_ERROR "cppkafka not found, please run git submodule update --init --recursive")
    endif ()
    add_subdirectory(third_party/cppkafka)
    include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/include)
    set (CppKafka_INSTALL_DIR ${CMAKE_CURRENT_SOURCE_DIR}/third_party/cppkafka/build)
    list (APPEND CMAKE_PREFIX_PATH ${CppKafka_INSTALL_DIR})
    # In-tree build exposes the plain target name.
    set(CppKafka_LIBRARIES cppkafka)
  else()
    include_directories(SYSTEM ${CppKafka_INCLUDE_DIRS})
    # System install exposes the namespaced imported target.
    set(CppKafka_LIBRARIES CppKafka::cppkafka)
  endif ()
  # Gate the Kafka WAL writer code paths in C++ sources.
  add_definitions(-DBUILD_KAFKA_WAL_WRITER)
endif()

set(DEFAULT_BUILD_TYPE "Release")
if (NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(STATUS "Setting build type to '${DEFAULT_BUILD_TYPE}' as none was specified.")
Expand Down
8 changes: 7 additions & 1 deletion flex/bin/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ install_without_export_flex_target(bulk_loader)

add_executable(stored_procedure_runner stored_procedure_runner.cc)
target_link_libraries(stored_procedure_runner flex_rt_mutable_graph flex_utils flex_graph_db ${GLOG_LIBRARIES} ${GFLAGS_LIBRARIES} ${Boost_LIBRARIES})
install_without_export_flex_target(stored_procedure_runner)
install_without_export_flex_target(stored_procedure_runner)

# Standalone consumer that replays WALs from Kafka back to the engine;
# only meaningful (and buildable) when Kafka WAL support is enabled.
if (BUILD_KAFKA_WAL_WRITER)
add_executable(wal_consumer wal_consumer.cc)
target_link_libraries(wal_consumer PUBLIC ${CppKafka_LIBRARIES} ${Boost_LIBRARIES} ${GLOG_LIBRARIES} flex_graph_db)
install_without_export_flex_target(wal_consumer)
endif()
31 changes: 28 additions & 3 deletions flex/bin/interactive_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,143 +117,168 @@
/**
* The main entrance for InteractiveServer.
*/
int main(int argc, char** argv) {
  // Block SIGINT/SIGTERM in the main thread; seastar installs its own
  // handlers and deals with shutdown itself.
  gs::blockSignal(SIGINT);
  gs::blockSignal(SIGTERM);

  bpo::options_description desc("Usage:");
  desc.add_options()("help,h", "Display help messages")(
      "enable-admin-service,e", bpo::value<bool>()->default_value(false),
      "whether or not to start admin service")("server-config,c",
                                               bpo::value<std::string>(),
                                               "path to server config yaml")(
      "codegen-dir,d",
      bpo::value<std::string>()->default_value("/tmp/codegen/"),
      "codegen working directory")(
      "workspace,w",
      bpo::value<std::string>()->default_value("/tmp/workspace/"),
      "directory to interactive workspace")(
      "graph-config,g", bpo::value<std::string>(), "graph schema config file")(
      "data-path,a", bpo::value<std::string>(), "data directory path")(
      "open-thread-resource-pool", bpo::value<bool>()->default_value(true),
      "open thread resource pool")("worker-thread-number",
                                   bpo::value<unsigned>()->default_value(2),
                                   "worker thread number")(
      "enable-trace", bpo::value<bool>()->default_value(false),
      "whether to enable opentelemetry tracing")(
      "start-compiler", bpo::value<bool>()->default_value(false),
      "whether or not to start compiler")(
      "memory-level,m", bpo::value<unsigned>()->default_value(1),
      "memory allocation strategy")("enable-adhoc-handler",
                                    bpo::value<bool>()->default_value(false),
                                    "whether to enable adhoc handler")(
      "kafka-brokers,k", bpo::value<std::string>()->default_value(""),
      "kafka endpoint")("kafka-topic",
                        bpo::value<std::string>()->default_value(""),
                        "kafka topic")(
      "wal-type,t", bpo::value<std::string>()->default_value("local"),
      "wal writer type");

  setenv("TZ", "Asia/Shanghai", 1);
  tzset();

  bpo::variables_map vm;
  bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
  bpo::notify(vm);

  if (vm.count("help")) {
    std::cout << desc << std::endl;
    return 0;
  }

  //// declare vars
  std::string workspace, engine_config_file;
  if (vm.count("workspace")) {
    workspace = vm["workspace"].as<std::string>();
  }
  server::WorkDirManipulator::SetWorkspace(workspace);

  if (!vm.count("server-config")) {
    LOG(FATAL) << "server-config is needed";
  }
  engine_config_file = vm["server-config"].as<std::string>();

  // Parse service config from the YAML file, then let command-line flags
  // override the corresponding fields.
  YAML::Node node = YAML::LoadFile(engine_config_file);
  server::ServiceConfig service_config = node.as<server::ServiceConfig>();
  service_config.engine_config_path = engine_config_file;
  service_config.start_admin_service = vm["enable-admin-service"].as<bool>();
  service_config.start_compiler = vm["start-compiler"].as<bool>();
  service_config.memory_level = vm["memory-level"].as<unsigned>();
  service_config.enable_adhoc_handler = vm["enable-adhoc-handler"].as<bool>();
  service_config.kafka_brokers = vm["kafka-brokers"].as<std::string>();
  service_config.wal_writer_type =
      gs::IWalWriter::parseWalWriterType(vm["wal-type"].as<std::string>());

  auto kafka_topic = vm["kafka-topic"].as<std::string>();

  // Config log level
  gs::config_log_level(service_config.log_level, service_config.verbose_level);

  auto& db = gs::GraphDB::get();

  if (vm["enable-trace"].as<bool>()) {
#ifdef HAVE_OPENTELEMETRY_CPP
    LOG(INFO) << "Initialize opentelemetry...";
    otel::initTracer();
    otel::initMeter();
    otel::initLogger();
#else
    LOG(WARNING) << "OpenTelemetry is not enabled in this build";
#endif
  }

  if (service_config.start_admin_service) {
    // When starting the admin service, we need a workspace to put all the
    // meta data and graph indices. We will initiate the query service with
    // the default graph.
    if (vm.count("graph-config") || vm.count("data-path")) {
      LOG(FATAL) << "To start admin service, graph-config and "
                    "data-path should NOT be specified";
    }

    if (!kafka_topic.empty()) {
      // When starting with admin service, we don't need a kafka topic from the
      // command line.
      LOG(WARNING) << "kafka-topic is discarded when starting with admin "
                      "service";
    }

    // Suppose the default_graph is already loaded.
    LOG(INFO) << "Finish init workspace";
    if (service_config.enable_adhoc_handler) {
      gs::init_codegen_proxy(vm, engine_config_file);
    }
  } else {
    LOG(INFO) << "Start query service only";
    std::string graph_schema_path, data_path;

    // init graph: both graph-config and data-path are mandatory here.
    if (!vm.count("graph-config")) {
      LOG(ERROR) << "graph-config is required";
      return -1;
    }
    graph_schema_path = vm["graph-config"].as<std::string>();
    if (!vm.count("data-path")) {
      LOG(ERROR) << "data-path is required";
      return -1;
    }
    data_path = vm["data-path"].as<std::string>();

    auto schema_res = gs::Schema::LoadFromYaml(graph_schema_path);
    if (!schema_res.ok()) {
      LOG(FATAL) << "Fail to load graph schema from yaml file: "
                 << graph_schema_path;
    }

    // The schema is loaded just to get the plugin dir and plugin list.
    if (service_config.enable_adhoc_handler) {
      gs::init_codegen_proxy(vm, engine_config_file, graph_schema_path);
    }
    db.Close();
    // Open the graph with the configured WAL writer. Kafka settings are only
    // consulted when the Kafka WAL writer is selected.
    gs::GraphDBConfig config(schema_res.value(), data_path,
                             service_config.shard_num);
    config.set_wal_writer_type(vm["wal-type"].as<std::string>());
    if (config.wal_writer_type == gs::IWalWriter::WalWriterType::kKafka) {
      config.kafka_brokers =
          vm["kafka-brokers"].as<std::string>();  // default empty
      config.kafka_topic = kafka_topic;
    }
    auto load_res = db.Open(config);
    if (!load_res.ok()) {
      LOG(FATAL) << "Failed to load graph from data directory: "
                 << load_res.status().error_message();
    }
  }

  server::GraphDBService::get().init(service_config);
  server::GraphDBService::get().run_and_wait_for_exit();

#ifdef HAVE_OPENTELEMETRY_CPP
  otel::cleanUpTracer();
#endif

  return 0;
}
137 changes: 137 additions & 0 deletions flex/bin/wal_consumer.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifdef BUILD_KAFKA_WAL_WRITER

#include <csignal>
#include <filesystem>
#include <iostream>
#include <queue>

#include <glog/logging.h>

#include <boost/program_options.hpp>
#include "cppkafka/cppkafka.h"
#include "flex/engines/graph_db/database/wal.h"
#include "flex/third_party/httplib.h"

namespace gs {

// Given a WAL (in serialized string form), forward it over HTTP to the
// Interactive Engine, which should be on the same machine; the engine is
// responsible for writing the WAL to disk.
class WalSender {
 public:
  // Timeouts applied to the underlying HTTP client, in seconds
  // (httplib's set_*_timeout(time_t sec) overloads).
  static constexpr int32_t CONNECTION_TIMEOUT = 10;
  static constexpr int32_t READ_TIMEOUT = 60;
  static constexpr int32_t WRITE_TIMEOUT = 60;

  // host/port locate the engine's HTTP endpoint; dst_url is the request
  // path the WAL payload is POSTed to.
  WalSender(const std::string& host, int port, const std::string& dst_url)
      : host_(host), port_(port), client_(host_, port_), req_url_(dst_url) {
    client_.set_connection_timeout(CONNECTION_TIMEOUT);
    client_.set_read_timeout(READ_TIMEOUT);
    client_.set_write_timeout(WRITE_TIMEOUT);
  }

  // POSTs the payload as application/octet-stream. Returns true when the
  // engine produced a response (any HTTP status, which is logged), false
  // when the request itself failed (connect/timeout/transport error), so
  // callers can react instead of the failure being silently swallowed.
  bool send(const std::string& payload) {
    httplib::Headers headers = {{"Content-Type", "application/octet-stream"}};
    auto res =
        client_.Post(req_url_, headers, payload, "application/octet-stream");
    if (res) {
      LOG(INFO) << "Send to engine: " << res->status << ", " << res->body;
      return true;
    }
    // res.error() is valid on a failed Result and names the transport error.
    LOG(ERROR) << "Send to engine failed: " << res.error();
    return false;
  }

 private:
  std::string host_;
  int port_;
  httplib::Client client_;
  std::string req_url_;
};

}  // namespace gs

namespace bpo = boost::program_options;

// Consumes WAL entries from a Kafka topic and forwards each one to the
// Interactive Engine over HTTP. Runs until killed.
int main(int argc, char** argv) {
  std::string brokers;
  std::string topic_name;
  std::string group_id;
  std::string engine_host;
  std::string req_url;
  int32_t engine_port;

  bpo::options_description desc("Usage:");
  desc.add_options()("help", "Display help message")(
      "kafka-brokers,b", bpo::value<std::string>(&brokers)->required(),
      "Kafka brokers list")(
      "url,u", bpo::value<std::string>(&req_url)->required(), "req_url")(
      "group-id,g",
      bpo::value<std::string>(&group_id)->default_value("interactive_group"),
      "Kafka group id")("engine-host,h",
                        bpo::value<std::string>(&engine_host)->required(),
                        "Engine URL")(
      "engine-port,p", bpo::value<int32_t>(&engine_port)->required(),
      "Engine port")("topic,t",
                     bpo::value<std::string>(&topic_name)->required(),
                     "Kafka topic name");

  google::InitGoogleLogging(argv[0]);
  FLAGS_logtostderr = true;

  bpo::variables_map vm;
  try {
    bpo::store(bpo::command_line_parser(argc, argv).options(desc).run(), vm);
    // --help must be handled BEFORE notify(); otherwise notify() throws
    // required_option for the missing required flags and help never prints.
    if (vm.count("help")) {
      std::cout << desc << std::endl;
      return 0;
    }
    bpo::notify(vm);
  } catch (const std::exception& e) {
    std::cerr << "Error parsing command line: " << e.what() << std::endl;
    std::cout << desc << std::endl;
    return -1;
  }

  // Use the variables bound via bpo::value(&var); the previous
  // vm["engine-url"] / vm["req-url"] lookups referenced option names that
  // were never declared and failed at runtime.
  LOG(INFO) << "Kafka brokers: " << brokers;
  LOG(INFO) << "engine endpoint: " << engine_host << ":" << engine_port;

  // Construct the consumer configuration; auto-commit is disabled so the
  // consumer controls when offsets are committed.
  cppkafka::Configuration config = {{"metadata.broker.list", brokers},
                                    {"group.id", group_id},
                                    {"enable.auto.commit", false}};

  gs::WalSender sender(engine_host, engine_port, req_url);
  gs::KafkaWalConsumer consumer(config, topic_name, 1);

  LOG(INFO) << "Consuming messages from topic " << topic_name;
  while (true) {
    auto msg = consumer.poll();
    LOG(INFO) << "Received message: <" << msg << ">";
    sender.send(msg);
  }

  return 0;
}

#endif
3 changes: 3 additions & 0 deletions flex/engines/graph_db/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ target_include_directories(flex_graph_db PUBLIC $<BUILD_INTERFACE:${CMAKE_CURREN
target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT})
target_link_libraries(flex_graph_db flex_plan_proto runtime_adhoc)
install_flex_target(flex_graph_db)
if (BUILD_KAFKA_WAL_WRITER)
  # Link cppkafka so the Kafka-backed WAL writer compiled into
  # flex_graph_db can resolve its symbols.
target_link_libraries(flex_graph_db ${CppKafka_LIBRARIES})
endif()

install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h
${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db_session.h
Expand Down
2 changes: 1 addition & 1 deletion flex/engines/graph_db/database/compact_transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace gs {

CompactTransaction::CompactTransaction(MutablePropertyFragment& graph,
WalWriter& logger, VersionManager& vm,
IWalWriter& logger, VersionManager& vm,
timestamp_t timestamp)
: graph_(graph), logger_(logger), vm_(vm), timestamp_(timestamp) {
arc_.Resize(sizeof(WalHeader));
Expand Down
6 changes: 3 additions & 3 deletions flex/engines/graph_db/database/compact_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
namespace gs {

class MutablePropertyFragment;
class WalWriter;
class IWalWriter;
class VersionManager;

class CompactTransaction {
public:
CompactTransaction(MutablePropertyFragment& graph, WalWriter& logger,
CompactTransaction(MutablePropertyFragment& graph, IWalWriter& logger,
VersionManager& vm, timestamp_t timestamp);
~CompactTransaction();

Expand All @@ -39,7 +39,7 @@ class CompactTransaction {

private:
MutablePropertyFragment& graph_;
WalWriter& logger_;
IWalWriter& logger_;
VersionManager& vm_;
timestamp_t timestamp_;

Expand Down
Loading
Loading