Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support parallel compress snapshot #2136

Merged
merged 11 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
Expand Down Expand Up @@ -140,6 +141,8 @@ public RaftContext(HugeGraphParams params) {
threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
this.backendExecutor = this.createBackendExecutor(threads);

CompressStrategyManager.init(config);

this.raftRpcServer = null;
this.endpoint = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
Expand All @@ -39,11 +40,12 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;

import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.CompressUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;

import com.google.protobuf.ByteString;

public class StoreSnapshotFile {
Expand Down Expand Up @@ -83,8 +85,7 @@ public void save(SnapshotWriter writer, Closure done,
if (!this.compressing.compareAndSet(false, true)) {
LOG.info("Last compress task doesn't finish, skipped it");
done.run(new Status(RaftError.EBUSY,
"Last compress task doesn't finish, " +
"skipped it"));
"Last compress task doesn't finish, skipped it"));
return;
}

Expand All @@ -104,8 +105,7 @@ public void save(SnapshotWriter writer, Closure done,
} catch (Throwable e) {
LOG.error("Failed to save snapshot", e);
done.run(new Status(RaftError.EIO,
"Failed to save snapshot, error is %s",
e.getMessage()));
"Failed to save snapshot, error is %s", e.getMessage()));
}
}

Expand All @@ -120,8 +120,7 @@ public boolean load(SnapshotReader reader) {

try {
for (String snapshotDirTar : snapshotDirTars) {
String snapshotDir = this.decompressSnapshot(reader,
snapshotDirTar);
String snapshotDir = this.decompressSnapshot(reader, snapshotDirTar);
snapshotDirs.add(snapshotDir);
}
} catch (Throwable e) {
Expand All @@ -144,8 +143,7 @@ public boolean load(SnapshotReader reader) {
private Map<String, String> doSnapshotSave() {
Map<String, String> snapshotDirMaps = InsertionOrderUtil.newMap();
for (RaftBackendStore store : this.stores) {
snapshotDirMaps.putAll(store.originStore()
.createSnapshot(SNAPSHOT_DIR));
snapshotDirMaps.putAll(store.originStore().createSnapshot(SNAPSHOT_DIR));
}
LOG.info("Saved all snapshots: {}", snapshotDirMaps);
return snapshotDirMaps;
Expand All @@ -157,29 +155,27 @@ private void doSnapshotLoad() {
}
}

private void compressSnapshotDir(SnapshotWriter writer,
Map<String, String> snapshotDirMaps) {
private void compressSnapshotDir(SnapshotWriter writer, Map<String, String> snapshotDirMaps) {
String writerPath = writer.getPath();
for (Map.Entry<String, String> entry : snapshotDirMaps.entrySet()) {
String snapshotDir = entry.getKey();
String diskTableKey = entry.getValue();
String snapshotDirTar = Paths.get(snapshotDir).getFileName()
.toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar)
.toString();
String snapshotDirTar = Paths.get(snapshotDir).getFileName().toString() + TAR;
String outputFile = Paths.get(writerPath, snapshotDirTar).toString();
Checksum checksum = new CRC64();
try {
LOG.info("Prepare to compress dir '{}' to '{}'",
snapshotDir, outputFile);
LOG.info("Prepare to compress dir '{}' to '{}'", snapshotDir, outputFile);
long begin = System.currentTimeMillis();
CompressUtil.compressZip(snapshotDir, outputFile, checksum);
String rootDir = Paths.get(snapshotDir).getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault()
.compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
throw new RaftException("Failed to compress snapshot, path=%s, files=%s",
e, writerPath, snapshotDirMaps.keySet());
}

LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder();
Expand All @@ -190,48 +186,48 @@ private void compressSnapshotDir(SnapshotWriter writer,
*/
metaBuilder.setUserMeta(ByteString.copyFromUtf8(diskTableKey));
if (!writer.addFile(snapshotDirTar, metaBuilder.build())) {
throw new RaftException("Failed to add snapshot file: '%s'",
snapshotDirTar);
throw new RaftException("Failed to add snapshot file: '%s'", snapshotDirTar);
}
}
}

private String decompressSnapshot(SnapshotReader reader,
String snapshotDirTar)
throws IOException {
String snapshotDirTar) throws IOException {
LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar);
if (meta == null) {
throw new IOException("Can't find snapshot archive file, path=" +
snapshotDirTar);
throw new IOException("Can't find snapshot archive file, path=" + snapshotDirTar);
}

String diskTableKey = meta.getUserMeta().toStringUtf8();
E.checkArgument(this.dataDisks.containsKey(diskTableKey),
"The data path for '%s' should be exist", diskTableKey);
String dataPath = this.dataDisks.get(diskTableKey);
String parentPath = Paths.get(dataPath).getParent().toString();
String snapshotDir = Paths.get(parentPath,
StringUtils.removeEnd(snapshotDirTar, TAR))
String snapshotDir = Paths.get(parentPath, StringUtils.removeEnd(snapshotDirTar, TAR))
.toString();
FileUtils.deleteDirectory(new File(snapshotDir));
LOG.info("Delete stale snapshot dir {}", snapshotDir);

Checksum checksum = new CRC64();
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
.toString();
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressUtil.decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar).toString();
try {
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to decompress snapshot, zip=%s", e, archiveFile);
}

if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
E.checkArgument(expected.equals(actual),
"Snapshot checksum error: '%s' != '%s'",
actual, expected);
"Snapshot checksum error: '%s' != '%s'", actual, expected);
}
return snapshotDir;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved

import java.util.zip.Checksum;

public interface CompressStrategy {

void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
final Checksum checksum) throws Throwable;

void decompressZip(final String sourceZipFile, final String outputDir,
final Checksum checksum) throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package org.apache.hugegraph.backend.store.raft.compress;

import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;

public class CompressStrategyManager {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved

private static byte DEFAULT_STRATEGY = 1;
public static final byte SERIAL_STRATEGY = 1;
public static final byte PARALLEL_STRATEGY = 2;
public static final byte MAX_STRATEGY = 5;
private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY];

static {
addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
}

private CompressStrategyManager() {
}

public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
if (compressStrategies.length <= idx) {
final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
System.arraycopy(compressStrategies, 0, newCompressStrategies, 0,
compressStrategies.length);
compressStrategies = newCompressStrategies;
}
compressStrategies[idx] = compressStrategy;
}

public static CompressStrategy getDefault() {
return compressStrategies[DEFAULT_STRATEGY];
}

public static void init(final HugeConfig config) {
if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
// add parallel compress strategy
if (compressStrategies[PARALLEL_STRATEGY] == null) {
final CompressStrategy compressStrategy = new ParallelCompressStrategy(
config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can align with "CompressStrategy" or "compressStrategy"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can align with "CompressStrategy" or "compressStrategy"

It's better not to use the manually align, it will break the auto-format style and greatly raise the threshold of contributing code (Otherwise we could define a automatic rule for it)

image

config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
CompressStrategyManager.addCompressStrategy(
CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

align

DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}
}
}
Loading