Skip to content

Commit

Permalink
support: invalid cache through rpc (#1357)
Browse files Browse the repository at this point in the history
* support: invalid cache through rpc
* add RpcServiceConfig* interfaces
* add FanoutCluster to broadcast reuqest
* fix GraphManager.close() is not called by HttpServer.DESTROY_FINISHED
* notify cache with Id[] arg to avoid too many events
* improve perf of raft notify-cache
* only enable cache-rpc service for shared storage
* check raft mode can only be enabled if non-shared store

Change-Id: I3fe18cd8c893a84a56fb66df651b78c5e0fe5f4b
  • Loading branch information
javeme authored Mar 11, 2021
1 parent 4bf094d commit 314fd8a
Show file tree
Hide file tree
Showing 38 changed files with 753 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.baidu.hugegraph.api.API;
import com.baidu.hugegraph.backend.query.QueryResults;
import com.baidu.hugegraph.core.GraphManager;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.server.RestServer;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser.SimilarsMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ public ConfigAuthenticator() {

@Override
public void setup(HugeConfig config) {
this.tokens.put(USER_ADMIN, config.get(ServerOptions.ADMIN_TOKEN));
this.tokens.putAll(config.getMap(ServerOptions.USER_TOKENS));
this.tokens.putAll(config.getMap(ServerOptions.AUTH_USER_TOKENS));
assert !this.tokens.containsKey(USER_ADMIN);
this.tokens.put(USER_ADMIN, config.get(ServerOptions.AUTH_ADMIN_TOKEN));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.exception.NotSupportException;
import com.baidu.hugegraph.iterator.FilterIterator;
import com.baidu.hugegraph.rpc.RpcServiceConfig4Client;
import com.baidu.hugegraph.rpc.RpcServiceConfig4Server;
import com.baidu.hugegraph.schema.EdgeLabel;
import com.baidu.hugegraph.schema.IndexLabel;
import com.baidu.hugegraph.schema.PropertyKey;
Expand Down Expand Up @@ -113,7 +115,7 @@ public HugeGraphAuthProxy(HugeGraph hugegraph) {

@Override
public HugeGraph hugegraph() {
verifyAdminPermission();
this.verifyAdminPermission();
return this.hugegraph;
}

Expand Down Expand Up @@ -502,7 +504,7 @@ public Transaction tx() {

@Override
public void close() throws Exception {
verifyAdminPermission();
this.verifyAdminPermission();
this.hugegraph.close();
}

Expand Down Expand Up @@ -575,7 +577,7 @@ public String backendVersion() {

@Override
public BackendStoreSystemInfo backendStoreSystemInfo() {
verifyAdminPermission();
this.verifyAdminPermission();
return this.hugegraph.backendStoreSystemInfo();
}

Expand Down Expand Up @@ -617,19 +619,19 @@ public void waitStarted() {

@Override
public void serverStarted(Id serverId, NodeRole serverRole) {
verifyAdminPermission();
this.verifyAdminPermission();
this.hugegraph.serverStarted(serverId, serverRole);
}

@Override
public boolean started() {
verifyAdminPermission();
this.verifyAdminPermission();
return this.hugegraph.started();
}

@Override
public boolean closed() {
verifyAdminPermission();
this.verifyAdminPermission();
return this.hugegraph.closed();
}

Expand Down Expand Up @@ -664,21 +666,28 @@ public RaftGroupManager raftGroupManager(String group) {
return this.hugegraph.raftGroupManager(group);
}

@Override
public void registerRpcServices(RpcServiceConfig4Server serverConfig,
RpcServiceConfig4Client clientConfig) {
this.verifyAdminPermission();
this.hugegraph.registerRpcServices(serverConfig, clientConfig);
}

@Override
public void initBackend() {
verifyAdminPermission();
this.verifyAdminPermission();
this.hugegraph.initBackend();
}

@Override
public void clearBackend() {
verifyAdminPermission();
this.verifyAdminPermission();
this.hugegraph.clearBackend();
}

@Override
public void truncateBackend() {
verifyAdminPermission();
this.verifyAdminPermission();
AuthManager userManager = this.hugegraph.authManager();
HugeUser admin = userManager.findUser(HugeAuthenticator.USER_ADMIN);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void setup(HugeConfig config) {

String remoteUrl = config.get(ServerOptions.AUTH_REMOTE_URL);
if (StringUtils.isNotEmpty(remoteUrl)) {
RpcClientProvider provider = new RpcClientProvider(config);
this.graph.switchAuthManager(provider.authManager());
RpcClientProvider clientProvider = new RpcClientProvider(config);
this.graph.switchAuthManager(clientProvider.authManager());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public static synchronized ServerOptions instance() {
"hugegraph"
);

public static final ConfigOption<String> ADMIN_TOKEN =
public static final ConfigOption<String> AUTH_ADMIN_TOKEN =
new ConfigOption<>(
"auth.admin_token",
"Token for administrator operations, " +
Expand All @@ -213,7 +213,7 @@ public static synchronized ServerOptions instance() {
"162f7848-0b6d-4faf-b557-3a0797869c55"
);

public static final ConfigListOption<String> USER_TOKENS =
public static final ConfigListOption<String> AUTH_USER_TOKENS =
new ConfigListOption<>(
"auth.user_tokens",
"The map of user tokens with name and password, " +
Expand All @@ -228,7 +228,7 @@ public static synchronized ServerOptions instance() {
"If the address is empty, it provide auth service, " +
"otherwise it is auth client and also provide auth service " +
"through rpc forwarding. The remote url can be set to " +
"multiple addresses, which are linked by ','.",
"multiple addresses, which are concat by ','.",
null,
""
);
Expand All @@ -238,7 +238,7 @@ public static synchronized ServerOptions instance() {
"rpc.server_port",
"The port bound by rpc server to provide services.",
rangeInt(1, Integer.MAX_VALUE),
8099
8090
);

public static final ConfigOption<String> RPC_SERVER_HOST =
Expand All @@ -258,6 +258,15 @@ public static synchronized ServerOptions instance() {
30
);

public static final ConfigOption<String> RPC_REMOTE_URL =
new ConfigOption<>(
"rpc.remote_url",
"The remote urls of rpc peers, it can be set to " +
"multiple addresses, which are concat by ','.",
disallowEmpty(),
"127.0.0.1:8090"
);

public static final ConfigOption<Integer> RPC_CLIENT_CONNECT_TIMEOUT =
new ConfigOption<>(
"rpc.client_connect_timeout",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@
import com.baidu.hugegraph.license.LicenseVerifier;
import com.baidu.hugegraph.metrics.MetricsUtil;
import com.baidu.hugegraph.metrics.ServerReporter;
import com.baidu.hugegraph.rpc.RpcClientProvider;
import com.baidu.hugegraph.rpc.RpcConsumerConfig;
import com.baidu.hugegraph.rpc.RpcProviderConfig;
import com.baidu.hugegraph.rpc.SofaRpcServer;
import com.baidu.hugegraph.rpc.RpcServer;
import com.baidu.hugegraph.serializer.JsonSerializer;
import com.baidu.hugegraph.serializer.Serializer;
import com.baidu.hugegraph.server.RestServer;
Expand All @@ -67,12 +69,14 @@ public final class GraphManager {

private final Map<String, Graph> graphs;
private final HugeAuthenticator authenticator;
private final SofaRpcServer rpcServer;
private final RpcServer rpcServer;
private final RpcClientProvider rpcClient;

public GraphManager(HugeConfig conf) {
this.graphs = new ConcurrentHashMap<>();
this.authenticator = HugeAuthenticator.loadAuthenticator(conf);
this.rpcServer = new SofaRpcServer(conf);
this.rpcServer = new RpcServer(conf);
this.rpcClient = new RpcClientProvider(conf);

this.loadGraphs(conf.getMap(ServerOptions.GRAPHS));
// this.installLicense(conf, "");
Expand Down Expand Up @@ -165,19 +169,35 @@ public AuthManager authManager() {
}

public void close() {
this.destoryRpcServer();
this.destroyRpcServer();
}

private void startRpcServer() {
RpcProviderConfig config = this.rpcServer.config();
RpcProviderConfig serverConfig = this.rpcServer.config();

if (this.authenticator != null) {
config.addService(AuthManager.class,
this.authenticator.authManager());
serverConfig.addService(AuthManager.class,
this.authenticator.authManager());
}

if (this.rpcClient.enabled()) {
RpcConsumerConfig clientConfig = this.rpcClient.config();

for (Graph graph : this.graphs.values()) {
HugeGraph hugegraph = (HugeGraph) graph;
hugegraph.registerRpcServices(serverConfig, clientConfig);
}
}

try {
this.rpcServer.exportAll();
} catch (Throwable e) {
this.rpcServer.destroy();
throw e;
}
this.rpcServer.exportAll();
}

private void destoryRpcServer() {
private void destroyRpcServer() {
this.rpcServer.destroy();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,58 @@

package com.baidu.hugegraph.rpc;

import com.alipay.sofa.rpc.config.ConsumerConfig;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.baidu.hugegraph.auth.AuthManager;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.config.ServerOptions;
import com.baidu.hugegraph.util.E;

public class RpcClientProvider {

public final RpcConsumerConfig rpcConsumerConfig;
public final RpcConsumerConfig consumerConfig;
public final RpcConsumerConfig authConsumerConfig;

public RpcClientProvider(HugeConfig conf) {
RpcCommonConfig.initRpcConfigs(conf);
this.rpcConsumerConfig = new RpcConsumerConfig();
this.rpcConsumerConfig.addConsumerConfig(AuthManager.class, conf);
// TODO: fetch from registry server
String rpcUrl = conf.get(ServerOptions.RPC_REMOTE_URL);
String selfUrl = conf.get(ServerOptions.RPC_SERVER_HOST) + ":" +
conf.get(ServerOptions.RPC_SERVER_PORT);
rpcUrl = excludeSelfUrl(rpcUrl, selfUrl);
this.consumerConfig = StringUtils.isNotBlank(rpcUrl) ?
new RpcConsumerConfig(conf, rpcUrl) : null;

String authUrl = conf.get(ServerOptions.AUTH_REMOTE_URL);
this.authConsumerConfig = StringUtils.isNotBlank(authUrl) ?
new RpcConsumerConfig(conf, authUrl) : null;
}

public boolean enabled() {
return this.consumerConfig != null;
}

public RpcConsumerConfig config() {
E.checkArgument(this.consumerConfig != null,
"RpcClient is not enabled, please config option '%s'",
ServerOptions.RPC_REMOTE_URL.name());
return this.consumerConfig;
}

public AuthManager authManager() {
return (AuthManager) this.serviceProxy(AuthManager.class.getName());
E.checkArgument(this.authConsumerConfig != null,
"RpcClient is not enabled, please config option '%s'",
ServerOptions.AUTH_REMOTE_URL.name());
return this.authConsumerConfig.serviceProxy(AuthManager.class);
}

public Object serviceProxy(String serviceName) {
ConsumerConfig config = this.rpcConsumerConfig.consumerConfig(serviceName);
return config.refer();
private static String excludeSelfUrl(String rpcUrl, String selfUrl) {
String[] urls = StringUtils.splitWithCommaOrSemicolon(rpcUrl);
// Keep urls order via LinkedHashSet
Set<String> urlSet = new LinkedHashSet<>(Arrays.asList(urls));
urlSet.remove(selfUrl);
return String.join(",", urlSet);
}
}
Loading

0 comments on commit 314fd8a

Please sign in to comment.