Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support parallel compress snapshot #2136

Merged
merged 11 commits into from
Mar 14, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor;
import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.NodeManager;
Expand Down Expand Up @@ -140,6 +141,8 @@ public RaftContext(HugeGraphParams params) {
threads = config.get(CoreOptions.RAFT_BACKEND_THREADS);
this.backendExecutor = this.createBackendExecutor(threads);

CompressStrategyManager.init(config);

this.raftRpcServer = null;
this.endpoint = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager;
import org.slf4j.Logger;

import com.alipay.sofa.jraft.Closure;
Expand All @@ -40,7 +41,6 @@
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
import com.alipay.sofa.jraft.util.CRC64;
import org.apache.hugegraph.testutil.Whitebox;
import org.apache.hugegraph.util.CompressUtil;
import org.apache.hugegraph.util.E;
import org.apache.hugegraph.util.InsertionOrderUtil;
import org.apache.hugegraph.util.Log;
Expand Down Expand Up @@ -172,7 +172,9 @@ private void compressSnapshotDir(SnapshotWriter writer,
LOG.info("Prepare to compress dir '{}' to '{}'",
snapshotDir, outputFile);
long begin = System.currentTimeMillis();
CompressUtil.compressZip(snapshotDir, outputFile, checksum);
String rootDir = Paths.get(snapshotDir).getParent().toString();
String sourceDir = Paths.get(snapshotDir).getFileName().toString();
CompressStrategyManager.getDefault().compressZip(rootDir, sourceDir, outputFile, checksum);
long end = System.currentTimeMillis();
LOG.info("Compressed dir '{}' to '{}', took {} seconds",
snapshotDir, outputFile, (end - begin) / 1000.0F);
Expand Down Expand Up @@ -219,13 +221,19 @@ private String decompressSnapshot(SnapshotReader reader,
Checksum checksum = new CRC64();
String archiveFile = Paths.get(reader.getPath(), snapshotDirTar)
.toString();
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressUtil.decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
try {
LOG.info("Prepare to decompress snapshot zip '{}' to '{}'",
archiveFile, parentPath);
long begin = System.currentTimeMillis();
CompressStrategyManager.getDefault().decompressZip(archiveFile, parentPath, checksum);
long end = System.currentTimeMillis();
LOG.info("Decompress snapshot zip '{}' to '{}', took {} seconds",
archiveFile, parentPath, (end - begin) / 1000.0F);
} catch (Throwable e) {
throw new RaftException(
"Failed to decompress snapshot, zip=%s", e, archiveFile);
}

if (meta.hasChecksum()) {
String expected = meta.getChecksum();
String actual = Long.toHexString(checksum.getValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.hugegraph.backend.store.raft.compress;
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved

import java.util.zip.Checksum;

/**
 * Strategy for archiving and restoring raft snapshot directories.
 * Implementations may do the work serially or with a thread pool; both
 * variants fold the bytes they process into the supplied {@link Checksum}.
 */
public interface CompressStrategy {

    /**
     * Compress the directory {@code rootDir/sourceDir} into the zip file
     * {@code outputZipFile}, updating {@code checksum} with the bytes written.
     */
    void compressZip(String rootDir, String sourceDir, String outputZipFile,
                     Checksum checksum) throws Throwable;

    /**
     * Extract {@code sourceZipFile} into {@code outputDir}, updating
     * {@code checksum} with the bytes read.
     */
    void decompressZip(String sourceZipFile, String outputDir,
                       Checksum checksum) throws Throwable;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.apache.hugegraph.backend.store.raft.compress;

import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;

public class CompressStrategyManager {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
private static CompressStrategy[] compressStrategies = new CompressStrategy[5];
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
private static byte DEFAULT_STRATEGY = 1;
public static final byte SERIAL_STRATEGY = 1;
public static final byte PARALLEL_STRATEGY = 2;

static {
addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy());
}

private CompressStrategyManager() {
}

public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
if (compressStrategies.length <= idx) {
final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5];
System.arraycopy(compressStrategies, 0, newCompressStrategies, 0, compressStrategies.length);
compressStrategies = newCompressStrategies;
}
compressStrategies[idx] = compressStrategy;
}

public static CompressStrategy getDefault() {
return compressStrategies[DEFAULT_STRATEGY];
}

public static void init(final HugeConfig config) {
if (config.get(CoreOptions.RAFT_SNAPSHOT_PARALLEL_COMPRESS)) {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
// add parallel compress strategy
if (compressStrategies[PARALLEL_STRATEGY] == null) {
final CompressStrategy compressStrategy = new ParallelCompressStrategy(
config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can align with "CompressStrategy" or "compressStrategy"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can align with "CompressStrategy" or "compressStrategy"

It's better not to use the manually align, it will break the auto-format style and greatly raise the threshold of contributing code (Otherwise we could define a automatic rule for it)

image

config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS));
CompressStrategyManager.addCompressStrategy(CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy);
DEFAULT_STRATEGY = PARALLEL_STRATEGY;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package org.apache.hugegraph.backend.store.raft.compress;

import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.google.common.collect.Lists;
import org.apache.commons.compress.archivers.zip.*;
import org.apache.commons.compress.parallel.InputStreamSupplier;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.NullInputStream;
import org.apache.hugegraph.config.CoreOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Paths;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.*;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import java.util.zip.ZipEntry;

public class ParallelCompressStrategy implements CompressStrategy {
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class);

public static final int QUEUE_SIZE = CoreOptions.CPUS;
public static final long KEEP_ALIVE_SECOND = 300L;

private final int compressThreads;
private final int decompressThreads;

public ParallelCompressStrategy(final int compressThreads, final int decompressThreads) {
this.compressThreads = compressThreads;
this.decompressThreads = decompressThreads;
}

/**
* Parallel output streams controller
*/
private static class ZipArchiveScatterOutputStream {

private final ParallelScatterZipCreator creator;

public ZipArchiveScatterOutputStream(final ExecutorService executorService) {
this.creator = new ParallelScatterZipCreator(executorService);
}

public void addEntry(final ZipArchiveEntry entry, final InputStreamSupplier supplier) {
creator.addArchiveEntry(entry, supplier);
}

public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception {
creator.writeTo(archiveOutput);
}

}

@Override
public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile,
final Checksum checksum) throws Throwable {
final File rootFile = new File(Paths.get(rootDir, sourceDir).toString());
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
final File zipFile = new File(outputZipFile);
LOG.info("Start to compress snapshot in parallel mode");
FileUtils.forceMkdir(zipFile.getParentFile());

final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads,
"raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor);
compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED);

try (final FileOutputStream fos = new FileOutputStream(zipFile);
final BufferedOutputStream bos = new BufferedOutputStream(fos);
final CheckedOutputStream cos = new CheckedOutputStream(bos, checksum);
final ZipArchiveOutputStream archiveOutputStream = new ZipArchiveOutputStream(cos)) {
scatterOutput.writeTo(archiveOutputStream);
archiveOutputStream.flush();
fos.getFD().sync();
}

ExecutorServiceHelper.shutdownAndAwaitTermination(compressExecutor);
}

@Override
public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum)
throws Throwable {
LOG.info("Start to decompress snapshot in parallel mode");
final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads,
"raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy());
// compute the checksum in a single thread
final Future<Boolean> checksumFuture = decompressExecutor.submit(() -> {
computeZipFileChecksumValue(sourceZipFile, checksum);
return true;
});

try (final ZipFile zipFile = new ZipFile(sourceZipFile)) {
final List<Future<Boolean>> futures = Lists.newArrayList();
for (final Enumeration<ZipArchiveEntry> e = zipFile.getEntries(); e.hasMoreElements(); ) {
final ZipArchiveEntry zipEntry = e.nextElement();
final Future<Boolean> future = decompressExecutor.submit(() -> {
unZipFile(zipFile, zipEntry, outputDir);
return true;
});
futures.add(future);
}
// blocking and caching exception
for (final Future<Boolean> future : futures) {
future.get();
}
}
// wait for checksum to be calculated
checksumFuture.get();
ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor);
}

private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput,
final String sourceDir, final int method) {
if (dir == null) {
return;
}
if (dir.isFile()) {
addEntry(sourceDir, dir, scatterOutput, method);
return;
}
final File[] files = Requires.requireNonNull(dir.listFiles(), "files");
for (final File file : files) {
final String child = Paths.get(sourceDir, file.getName()).toString();
if (file.isDirectory()) {
compressDirectoryToZipFile(file, scatterOutput, child, method);
} else {
addEntry(child, file, scatterOutput, method);
}
}
}

/**
* Add archive entry to the scatterOutputStream
*/
private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream,
final int method) {
final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath);
archiveEntry.setMethod(method);
scatterOutputStream.addEntry(archiveEntry, () -> {
try {
return file.isDirectory() ? new NullInputStream(0) :
new BufferedInputStream(new FileInputStream(file));
wuchaojing marked this conversation as resolved.
Show resolved Hide resolved
} catch (final FileNotFoundException e) {
LOG.error("Can't find file, path={}, {}", file.getPath(), e);
}
return new NullInputStream(0);
});
}

/**
* Unzip the archive entry to targetDir
*/
private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir)
throws Exception {
final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString());
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Show resolved Hide resolved
FileUtils.forceMkdir(targetFile.getParentFile());
try (final InputStream is = zipFile.getInputStream(entry);
final BufferedInputStream fis = new BufferedInputStream(is);
final BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))) {
IOUtils.copy(fis, bos);
}
}

/**
* Compute the value of checksum
*/
private void computeZipFileChecksumValue(final String zipPath, final Checksum checksum) throws Exception {
try (final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(zipPath));
final CheckedInputStream cis = new CheckedInputStream(bis, checksum);
final ZipArchiveInputStream zis = new ZipArchiveInputStream(cis)) {
// checksum is calculated in the process
while ((zis.getNextZipEntry()) != null)
;
}
}

private static ExecutorService newFixedPool(int coreThreads, int maxThreads,
String name,
RejectedExecutionHandler handler) {
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(false)
.coreThreads(coreThreads)
.maximumThreads(maxThreads)
.keepAliveSeconds(KEEP_ALIVE_SECOND)
.workQueue(queue)
.threadFactory(new NamedThreadFactory(name, true))
.rejectedHandler(handler)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.hugegraph.backend.store.raft.compress;

import org.apache.hugegraph.util.CompressUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.zip.Checksum;

/**
 * Single-threaded snapshot (de)compression that simply delegates to
 * {@link CompressUtil}; used unless parallel compression is enabled.
 */
public class SerialCompressStrategy implements CompressStrategy {

    private static final Logger LOG = LoggerFactory.getLogger(SerialCompressStrategy.class);

    @Override
    public void decompressZip(String sourceZipFile, String outputDir,
                              Checksum checksum) throws Throwable {
        LOG.info("Start to decompress snapshot in serial strategy");
        CompressUtil.decompressZip(sourceZipFile, outputDir, checksum);
    }

    @Override
    public void compressZip(String rootDir, String sourceDir,
                            String outputZipFile, Checksum checksum)
                            throws Throwable {
        LOG.info("Start to compress snapshot in serial strategy");
        CompressUtil.compressZip(rootDir, sourceDir, outputZipFile, checksum);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,30 @@ public static synchronized CoreOptions instance() {
4
);

public static final ConfigOption<Boolean> RAFT_SNAPSHOT_PARALLEL_COMPRESS =
new ConfigOption<>(
"raft.snapshot_parallel_compress",
"Whether to enable parallel compress.",
disallowEmpty(),
false
);

public static final ConfigOption<Integer> RAFT_SNAPSHOT_COMPRESS_THREADS =
new ConfigOption<>(
"raft.snapshot_compress_threads",
"The thread number used to do snapshot compress.",
rangeInt(0, Integer.MAX_VALUE),
4
);

public static final ConfigOption<Integer> RAFT_SNAPSHOT_DECOMPRESS_THREADS =
new ConfigOption<>(
"raft.snapshot_decompress_threads",
"The thread number used to do snapshot decompress.",
rangeInt(0, Integer.MAX_VALUE),
4
);

public static final ConfigOption<Integer> RAFT_BACKEND_THREADS =
new ConfigOption<>(
"raft.backend_threads",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ raft.snapshot_interval=3600
raft.backend_threads=48
raft.read_index_threads=8
raft.snapshot_threads=4
raft.snapshot_parallel_compress=false
raft.snapshot_compress_threads=4
raft.snapshot_decompress_threads=4
raft.read_strategy=ReadOnlyLeaseBased
raft.queue_size=16384
raft.queue_publish_timeout=60
Expand Down