improve code style
wuchaojing committed Mar 3, 2023
1 parent 43a4535 commit 0314b3f
Showing 6 changed files with 197 additions and 90 deletions.
Changed file: RaftContext
@@ -30,22 +30,6 @@
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
@@ -55,11 +39,14 @@
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
@@ -69,6 +56,19 @@
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;

public final class RaftContext {

@@ -233,13 +233,13 @@ public NodeOptions nodeOptions() throws IOException {
NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setEnableMetrics(false);
nodeOptions.setRpcProcessorThreadPoolSize(
config.get(CoreOptions.RAFT_RPC_THREADS));
config.get(CoreOptions.RAFT_RPC_THREADS));
nodeOptions.setRpcConnectTimeoutMs(
config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT));
config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT));
nodeOptions.setRpcDefaultTimeout(
1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT));
1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT));
nodeOptions.setRpcInstallSnapshotTimeout(
1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT));
1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT));

int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT);
nodeOptions.setElectionTimeoutMs(electionTimeout);
@@ -269,15 +269,15 @@ public NodeOptions nodeOptions() throws IOException {
*/
raftOptions.setApplyBatch(config.get(CoreOptions.RAFT_APPLY_BATCH));
raftOptions.setDisruptorBufferSize(
config.get(CoreOptions.RAFT_QUEUE_SIZE));
config.get(CoreOptions.RAFT_QUEUE_SIZE));
raftOptions.setDisruptorPublishEventWaitTimeoutSecs(
config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT));
config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT));
raftOptions.setReplicatorPipeline(
config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE));
config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE));
raftOptions.setOpenStatistics(false);
raftOptions.setReadOnlyOptions(
ReadOnlyOption.valueOf(
config.get(CoreOptions.RAFT_READ_STRATEGY)));
ReadOnlyOption.valueOf(
config.get(CoreOptions.RAFT_READ_STRATEGY)));

return nodeOptions;
}
@@ -373,30 +373,30 @@ private HugeConfig config() {
@SuppressWarnings("unused")
private RpcServer initAndStartRpcServer() {
Integer lowWaterMark = this.config().get(
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
System.setProperty("bolt.channel_write_buf_low_water_mark",
String.valueOf(lowWaterMark));
Integer highWaterMark = this.config().get(
CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
System.setProperty("bolt.channel_write_buf_high_water_mark",
String.valueOf(highWaterMark));

PeerId endpoint = this.endpoint();
NodeManager.getInstance().addAddress(endpoint.getEndpoint());
RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
endpoint.getEndpoint());
endpoint.getEndpoint());
LOG.info("Raft-RPC server is started successfully");
return rpcServer;
}

private RpcServer wrapRpcServer(com.alipay.remoting.rpc.RpcServer rpcServer) {
// TODO: pass ServerOptions instead of CoreOptions, to share by graphs
Integer lowWaterMark = this.config().get(
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK);
System.setProperty("bolt.channel_write_buf_low_water_mark",
String.valueOf(lowWaterMark));
Integer highWaterMark = this.config().get(
CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK);
System.setProperty("bolt.channel_write_buf_high_water_mark",
String.valueOf(highWaterMark));

@@ -439,7 +439,7 @@ private ExecutorService createSnapshotExecutor(int coreThreads) {
private ExecutorService createBackendExecutor(int threads) {
String name = "store-backend-executor";
RejectedExecutionHandler handler =
new ThreadPoolExecutor.CallerRunsPolicy();
new ThreadPoolExecutor.CallerRunsPolicy();
return newPool(threads, threads, name, handler);
}
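
Note: the createBackendExecutor() hunk above only re-wraps the CallerRunsPolicy argument; the newPool(...) helper it calls is not shown in this diff. Below is a sketch of how such a helper could be assembled from the jraft utilities imported in this file (ThreadPoolUtil, NamedThreadFactory); the queue type and keep-alive value are assumptions, and the project's actual helper may differ.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;

import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;

public final class RaftPoolSketch {

    // Sketch only: mirrors the call shape used above, not the real RaftContext code
    public static ExecutorService newPool(int coreThreads, int maxThreads, String name,
                                          RejectedExecutionHandler handler) {
        // Assumed unbounded work queue
        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
        return ThreadPoolUtil.newBuilder()
                             .poolName(name)
                             .enableMetric(false)    // metrics are also disabled in nodeOptions above
                             .coreThreads(coreThreads)
                             .maximumThreads(maxThreads)
                             .keepAliveSeconds(300L) // assumed value
                             .workQueue(workQueue)
                             .threadFactory(new NamedThreadFactory(name, true))
                             .rejectedHandler(handler)
                             .build();
    }
}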

Changed file: StoreSnapshotFile
@@ -31,6 +31,10 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
@@ -40,10 +44,6 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import com.google.protobuf.ByteString;

public class StoreSnapshotFile {
@@ -174,14 +174,15 @@ private void compressSnapshotDir(SnapshotWriter writer,
long begin = System.currentTimeMillis();
String rootDir = Paths.get(snapshotDir).getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault().compressZip(rootDir, sourceDir, outputFile, checksum);
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}

LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
@@ -200,7 +201,7 @@ private void compressSnapshotDir(SnapshotWriter writer,

private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar)
throws IOException {
throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" +
Expand Down Expand Up @@ -231,7 +232,7 @@ private String decompressSnapshot(SnapshotReader reader,
archiveFile, parentPath, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to decompress snapshot, zip=%s", e, archiveFile);
"Failed to decompress snapshot, zip=%s", e, archiveFile);
}

if (meta.hasChecksum()) {
Changed file: CompressStrategy
@@ -1,11 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;

import java.util.zip.Checksum;

public interface CompressStrategy {

void compressZip(final String rootDir, final String sourceDir, final String outputZipFile, final Checksum checksum)
void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
final Checksum checksum)
throws Throwable;

void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum) throws Throwable;
void decompressZip(final String sourceZipFile, final String outputDir,
final Checksum checksum) throws Throwable;
}
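
The hunk above only re-wraps the two method signatures of CompressStrategy. As a reading aid, here is a minimal serial implementation sketch built on java.util.zip; the class name and every implementation detail are assumptions for illustration, not the project's actual SerialCompressStrategy or ParallelCompressStrategy.

// Illustrative only: a minimal serial CompressStrategy sketch using java.util.zip.
package org.apache.hugegraph.backend.store.raft.compress;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Paths;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class SimpleSerialCompressStrategy implements CompressStrategy {

    @Override
    public void compressZip(String rootDir, String sourceDir, String outputZipFile,
                            Checksum checksum) throws Throwable {
        File source = Paths.get(rootDir, sourceDir).toFile();
        // The checksum is updated as the zip bytes are written out
        try (CheckedOutputStream checked = new CheckedOutputStream(
                 new FileOutputStream(outputZipFile), checksum);
             ZipOutputStream zip = new ZipOutputStream(new BufferedOutputStream(checked))) {
            this.addEntry(zip, source, sourceDir);
        }
    }

    @Override
    public void decompressZip(String sourceZipFile, String outputDir,
                              Checksum checksum) throws Throwable {
        try (CheckedInputStream checked = new CheckedInputStream(
                 new FileInputStream(sourceZipFile), checksum);
             ZipInputStream zip = new ZipInputStream(new BufferedInputStream(checked))) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                // Real code should also reject entry names that escape outputDir
                File target = new File(outputDir, entry.getName());
                if (entry.isDirectory()) {
                    target.mkdirs();
                    continue;
                }
                target.getParentFile().mkdirs();
                try (OutputStream out = new BufferedOutputStream(new FileOutputStream(target))) {
                    copy(zip, out);
                }
            }
        }
    }

    private void addEntry(ZipOutputStream zip, File file, String name) throws Exception {
        if (file.isDirectory()) {
            File[] children = file.listFiles();
            if (children == null) {
                return;
            }
            for (File child : children) {
                this.addEntry(zip, child, name + "/" + child.getName());
            }
            return;
        }
        zip.putNextEntry(new ZipEntry(name));
        try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
            copy(in, zip);
        }
        zip.closeEntry();
    }

    private static void copy(InputStream in, OutputStream out) throws Exception {
        byte[] buffer = new byte[8192];
        for (int len; (len = in.read(buffer)) > 0; ) {
            out.write(buffer, 0, len);
        }
    }
}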
Changed file: CompressStrategyManager
@@ -1,13 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;

import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;

public class CompressStrategyManager {
private static CompressStrategy[] compressStrategies = new CompressStrategy[5];

private static byte DEFAULT_STRATEGY = 1;
public static final byte SERIAL_STRATEGY = 1;
public static final byte PARALLEL_STRATEGY = 2;
public static final byte MAX_STRATEGY = 5;
private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];

static {
addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
@@ -19,7 +38,8 @@ private CompressStrategyManager() {
public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
if (compressStrategies.length <= idx) {
final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
System.arraycopy(compressStrategies, 0, newCompressStrategies, 0, compressStrategies.length);
System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
compressStrategies.length);
compressStrategies = newCompressStrategies;
}
compressStrategies[idx] = compressStrategy;
@@ -36,7 +56,8 @@ public static void init(final HugeConfig config) {
final CompressStrategy compressStrategy = new ParallelCompressStrategy(
config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
CompressStrategyManager.addCompressStrategy(CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
CompressStrategyManager.addCompressStrategy(
CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}
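
For context, a hypothetical caller of CompressStrategyManager might look like the sketch below. It uses only the members visible in this diff (init, getDefault, compressZip, decompressZip); the directory names and the CRC32 checksum are made up, and the real snapshot path in StoreSnapshotFile uses jraft's CRC64.

import java.util.zip.CRC32;
import java.util.zip.Checksum;

import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.config.HugeConfig;

public final class SnapshotCompressExample {

    // Sketch only: 'config' would come from the running graph
    public static void roundTrip(HugeConfig config) throws Throwable {
        // Registers the serial strategy and, when configured, the parallel one
        CompressStrategyManager.init(config);

        Checksum checksum = new CRC32();
        CompressStrategyManager.getDefault()
                               .compressZip("/tmp/raft", "snapshot_1",
                                            "/tmp/snapshot_1.zip", checksum);
        CompressStrategyManager.getDefault()
                               .decompressZip("/tmp/snapshot_1.zip",
                                              "/tmp/restore", checksum);
    }
}

Keeping the strategy lookup behind getDefault() lets the snapshot code stay unchanged whether the serial or the parallel strategy was registered at init time.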
(Diffs for the remaining changed files are not shown here.)

0 comments on commit 0314b3f
