Skip to content

Commit

Permalink
feat: support parallel compress snapshot (#2136)
Browse files Browse the repository at this point in the history
* add raft default config

* delete useless final

* fix import style

---------

Co-authored-by: imbajin <jin@apache.org>
  • Loading branch information
wuchaojing and imbajin authored Mar 14, 2023
1 parent 901da45 commit 82c7e78
Show file tree
Hide file tree
Showing 8 changed files with 469 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,6 @@
import java.util.concurrent.ThreadPoolExecutor;

import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import org.apache.hugegraph.HugeException;
import org.apache.hugegraph.HugeGraphParams;
import org.apache.hugegraph.backend.cache.Cache;
Expand All @@ -54,11 +39,14 @@
import org.apache.hugegraph.backend.store.BackendMutation;
import org.apache.hugegraph.backend.store.BackendStore;
import org.apache.hugegraph.backend.store.BackendStoreProvider;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.event.EventHub;
Expand All @@ -68,6 +56,19 @@
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.Events;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReadOnlyOption;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;

public final class RaftContext {

Expand Down Expand Up @@ -113,7 +114,7 @@ public RaftContext(HugeGraphParams params) {
HugeConfig config = params.configuration();

/*
* NOTE: `raft.group_peers` option is transfered from ServerConfig
* NOTE: `raft.group_peers` option is transferred from ServerConfig
* to CoreConfig, since it's shared by all graphs.
*/
String groupPeersString = this.config().getString("raft.group_peers");
Expand All @@ -140,6 +141,8 @@ public RaftContext(HugeGraphParams params) {
threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
this.backendExecutor = this.createBackendExecutor(threads);

CompressStrategyManager.init(config);

this.raftRpcServer = null;
this.endpoint = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
Expand All @@ -39,11 +44,6 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.CompressUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
import com.google.protobuf.ByteString;

public class StoreSnapshotFile {
Expand Down Expand Up @@ -83,8 +83,7 @@ public void save(SnapshotWriter writer, Closure done,
if (!this.compressing.compareAndSet(false, true)) {
LOG.info("Last compress task doesn't finish, skipped it");
done.run(new Status(RaftError.EBUSY,
"Last compress task doesn't finish, " +
"skipped it"));
"Last compress task doesn't finish, skipped it"));
return;
}

Expand All @@ -104,8 +103,7 @@ public void save(SnapshotWriter writer, Closure done,
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to save snapshot, error is %s",
e.getMessage()));
"Failed to save snapshot, error is %s", e.getMessage()));
}
}

Expand All @@ -120,8 +118,7 @@ public boolean load(SnapshotReader reader) {

try {
for (String snapshotDirTar : snapshotDirTars) {
String snapshotDir = this.decompressSnapshot(reader,
snapshotDirTar);
String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar);
snapshotDirs.add(snapshotDir);
}
} catch (Throwable e) {
Expand All @@ -144,8 +141,7 @@ public boolean load(SnapshotReader reader) {
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
snapshotDirMaps.putAll(store.originStore()
.createSnapshot(SNAPSHOT_DIR));
snapshotDirMaps.putAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
Expand All @@ -157,29 +153,27 @@ private void doSnapshotLoad() {
}
}

private void compressSnapshotDir(SnapshotWriter writer,
Map<String, String> snapshotDirMaps) {
private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
String snapshotDirTar = Paths.get(snapshotDir).getFileName()
.toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar)
.toString();
String snapshotDirTar = Paths.get(snapshotDir).getFileName().toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar).toString();
Checksum checksum = new CRC64();
try {
LOG.info("Prepare to compress dir '{}' to '{}'",
snapshotDir, outputFile);
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
CompressUtil.compressZip(snapshotDir, outputFile, checksum);
String rootDir = Paths.get(snapshotDir).getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
throw new RaftException("Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}

LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
Expand All @@ -190,48 +184,48 @@ private void compressSnapshotDir(SnapshotWriter writer,
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
throw new RaftException("Failed to add snapshot file: '%s'",
snapshotDirTar);
throw new RaftException("Failed to add snapshot file: '%s'", snapshotDirTar);
}
}
}

private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar)
throws IOException {
String snapshotDirTar) throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" +
snapshotDirTar);
throw new IOException("Can't find snapshot archive file, path=" + snapshotDirTar);
}

String diskTableKey = meta.getUserMeta().toStringUtf8();
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String snapshotDir = Paths.get(parentPath,
StringUtils.removeEnd(snapshotDirTar, TAR))
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);

Checksum checksum = new CRC64();
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
.toString();
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressUtil.decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar).toString();
try {
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to decompress snapshot, zip=%s", e, archiveFile);
}

if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
"Snapshot checksum error: '%s' != '%s'",
actual, expected);
"Snapshot checksum error: '%s' != '%s'", actual, expected);
}
return snapshotDir;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;

import java.util.zip.Checksum;

public interface CompressStrategy {

void compressZip(String rootDir, String sourceDir, String outputZipFile,
Checksum checksum) throws Throwable;

void decompressZip(String sourceZipFile, String outputDir, Checksum checksum) throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;

import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;

public class CompressStrategyManager {

private static byte DEFAULT_STRATEGY = 1;
public static final byte SERIAL_STRATEGY = 1;
public static final byte PARALLEL_STRATEGY = 2;
public static final byte MAX_STRATEGY = 5;
private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];

static {
addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
}

private CompressStrategyManager() {
}

public static void addCompressStrategy(int index, CompressStrategy compressStrategy) {
if (compressStrategies.length <= index) {
CompressStrategy[] newCompressStrategies = new CompressStrategy[index + MAX_STRATEGY];
System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
compressStrategies.length);
compressStrategies = newCompressStrategies;
}
compressStrategies[index] = compressStrategy;
}

public static CompressStrategy getDefault() {
return compressStrategies[DEFAULT_STRATEGY];
}

public static void init(final HugeConfig config) {
if (!config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
return;
}
// add parallel compress strategy
if (compressStrategies[PARALLEL_STRATEGY] == null) {
CompressStrategy compressStrategy = new ParallelCompressStrategy(
config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
CompressStrategyManager.addCompressStrategy(
CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}
}
Loading

0 comments on commit 82c7e78

Please sign in to comment.