From 0314b3fd1ff2fb91963fefbfebf97b2919149e01 Mon Sep 17 00:00:00 2001 From: wuchaojing Date: Fri, 3 Mar 2023 14:36:48 +0800 Subject: [PATCH] improve code style --- .../backend/store/raft/RaftContext.java | 66 +++++----- .../backend/store/raft/StoreSnapshotFile.java | 19 +-- .../store/raft/compress/CompressStrategy.java | 23 +++- .../compress/CompressStrategyManager.java | 27 +++- .../compress/ParallelCompressStrategy.java | 124 ++++++++++++------ .../raft/compress/SerialCompressStrategy.java | 28 +++- 6 files changed, 197 insertions(+), 90 deletions(-) diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java index 5564021d61..f0a2e9eef1 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/RaftContext.java @@ -30,22 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.io.FileUtils; -import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor; -import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder; -import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager; -import org.slf4j.Logger; - -import com.alipay.sofa.jraft.NodeManager; -import com.alipay.sofa.jraft.conf.Configuration; -import com.alipay.sofa.jraft.entity.PeerId; -import com.alipay.sofa.jraft.option.NodeOptions; -import com.alipay.sofa.jraft.option.RaftOptions; -import com.alipay.sofa.jraft.option.ReadOnlyOption; -import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; -import com.alipay.sofa.jraft.rpc.RpcServer; -import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; -import com.alipay.sofa.jraft.util.NamedThreadFactory; -import com.alipay.sofa.jraft.util.ThreadPoolUtil; import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.backend.cache.Cache; @@ -55,11 +39,14 @@ import org.apache.hugegraph.backend.store.BackendMutation; import org.apache.hugegraph.backend.store.BackendStore; import org.apache.hugegraph.backend.store.BackendStoreProvider; +import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager; +import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor; +import org.apache.hugegraph.backend.store.raft.rpc.ListPeersProcessor; import org.apache.hugegraph.backend.store.raft.rpc.RaftRequests.StoreType; +import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor; +import org.apache.hugegraph.backend.store.raft.rpc.RpcForwarder; import org.apache.hugegraph.backend.store.raft.rpc.SetLeaderProcessor; import org.apache.hugegraph.backend.store.raft.rpc.StoreCommandProcessor; -import org.apache.hugegraph.backend.store.raft.rpc.AddPeerProcessor; -import org.apache.hugegraph.backend.store.raft.rpc.RemovePeerProcessor; import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.event.EventHub; @@ -69,6 +56,19 @@ import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.Events; import org.apache.hugegraph.util.Log; +import org.slf4j.Logger; + +import com.alipay.sofa.jraft.NodeManager; +import com.alipay.sofa.jraft.conf.Configuration; +import com.alipay.sofa.jraft.entity.PeerId; +import com.alipay.sofa.jraft.option.NodeOptions; +import com.alipay.sofa.jraft.option.RaftOptions; +import com.alipay.sofa.jraft.option.ReadOnlyOption; +import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory; +import com.alipay.sofa.jraft.rpc.RpcServer; +import com.alipay.sofa.jraft.rpc.impl.BoltRpcServer; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; public final class RaftContext { @@ -233,13 +233,13 @@ public NodeOptions nodeOptions() throws IOException { NodeOptions nodeOptions = new NodeOptions(); nodeOptions.setEnableMetrics(false); nodeOptions.setRpcProcessorThreadPoolSize( - config.get(CoreOptions.RAFT_RPC_THREADS)); + config.get(CoreOptions.RAFT_RPC_THREADS)); nodeOptions.setRpcConnectTimeoutMs( - config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT)); + config.get(CoreOptions.RAFT_RPC_CONNECT_TIMEOUT)); nodeOptions.setRpcDefaultTimeout( - 1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT)); + 1000 * config.get(CoreOptions.RAFT_RPC_TIMEOUT)); nodeOptions.setRpcInstallSnapshotTimeout( - 1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT)); + 1000 * config.get(CoreOptions.RAFT_INSTALL_SNAPSHOT_TIMEOUT)); int electionTimeout = config.get(CoreOptions.RAFT_ELECTION_TIMEOUT); nodeOptions.setElectionTimeoutMs(electionTimeout); @@ -269,15 +269,15 @@ public NodeOptions nodeOptions() throws IOException { */ raftOptions.setApplyBatch(config.get(CoreOptions.RAFT_APPLY_BATCH)); raftOptions.setDisruptorBufferSize( - config.get(CoreOptions.RAFT_QUEUE_SIZE)); + config.get(CoreOptions.RAFT_QUEUE_SIZE)); raftOptions.setDisruptorPublishEventWaitTimeoutSecs( - config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT)); + config.get(CoreOptions.RAFT_QUEUE_PUBLISH_TIMEOUT)); raftOptions.setReplicatorPipeline( - config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE)); + config.get(CoreOptions.RAFT_REPLICATOR_PIPELINE)); raftOptions.setOpenStatistics(false); raftOptions.setReadOnlyOptions( - ReadOnlyOption.valueOf( - config.get(CoreOptions.RAFT_READ_STRATEGY))); + ReadOnlyOption.valueOf( + config.get(CoreOptions.RAFT_READ_STRATEGY))); return nodeOptions; } @@ -373,18 +373,18 @@ private HugeConfig config() { @SuppressWarnings("unused") private RpcServer initAndStartRpcServer() { Integer lowWaterMark = this.config().get( - CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); + CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); System.setProperty("bolt.channel_write_buf_low_water_mark", String.valueOf(lowWaterMark)); Integer highWaterMark = this.config().get( - CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK); + CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK); System.setProperty("bolt.channel_write_buf_high_water_mark", String.valueOf(highWaterMark)); PeerId endpoint = this.endpoint(); NodeManager.getInstance().addAddress(endpoint.getEndpoint()); RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer( - endpoint.getEndpoint()); + endpoint.getEndpoint()); LOG.info("Raft-RPC server is started successfully"); return rpcServer; } @@ -392,11 +392,11 @@ private RpcServer initAndStartRpcServer() { private RpcServer wrapRpcServer(com.alipay.remoting.rpc.RpcServer rpcServer) { // TODO: pass ServerOptions instead of CoreOptions, to share by graphs Integer lowWaterMark = this.config().get( - CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); + CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK); System.setProperty("bolt.channel_write_buf_low_water_mark", String.valueOf(lowWaterMark)); Integer highWaterMark = this.config().get( - CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK); + CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK); System.setProperty("bolt.channel_write_buf_high_water_mark", String.valueOf(highWaterMark)); @@ -439,7 +439,7 @@ private ExecutorService createSnapshotExecutor(int coreThreads) { private ExecutorService createBackendExecutor(int threads) { String name = "store-backend-executor"; RejectedExecutionHandler handler = - new ThreadPoolExecutor.CallerRunsPolicy(); + new ThreadPoolExecutor.CallerRunsPolicy(); return newPool(threads, threads, name, handler); } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java index 8a691b4560..6d113715e7 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/StoreSnapshotFile.java @@ -31,6 +31,10 @@ import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hugegraph.backend.store.raft.compress.CompressStrategyManager; +import org.apache.hugegraph.testutil.Whitebox; +import org.apache.hugegraph.util.E; +import org.apache.hugegraph.util.InsertionOrderUtil; +import org.apache.hugegraph.util.Log; import org.slf4j.Logger; import com.alipay.sofa.jraft.Closure; @@ -40,10 +44,6 @@ import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader; import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter; import com.alipay.sofa.jraft.util.CRC64; -import org.apache.hugegraph.testutil.Whitebox; -import org.apache.hugegraph.util.E; -import org.apache.hugegraph.util.InsertionOrderUtil; -import org.apache.hugegraph.util.Log; import com.google.protobuf.ByteString; public class StoreSnapshotFile { @@ -174,14 +174,15 @@ private void compressSnapshotDir(SnapshotWriter writer, long begin = System.currentTimeMillis(); String rootDir = Paths.get(snapshotDir).getParent().toString(); String sourceDir = Paths.get(snapshotDir).getFileName().toString(); - CompressStrategyManager.getDefault().compressZip(rootDir, sourceDir, outputFile, checksum); + CompressStrategyManager.getDefault() + .compressZip(rootDir, sourceDir, outputFile, checksum); long end = System.currentTimeMillis(); LOG.info("Compressed dir '{}' to '{}', took {} seconds", snapshotDir, outputFile, (end - begin) / 1000.0F); } catch (Throwable e) { throw new RaftException( - "Failed to compress snapshot, path=%s, files=%s", - e, writerPath, snapshotDirMaps.keySet()); + "Failed to compress snapshot, path=%s, files=%s", + e, writerPath, snapshotDirMaps.keySet()); } LocalFileMeta.Builder metaBuilder = LocalFileMeta.newBuilder(); @@ -200,7 +201,7 @@ private void compressSnapshotDir(SnapshotWriter writer, private String decompressSnapshot(SnapshotReader reader, String snapshotDirTar) - throws IOException { + throws IOException { LocalFileMeta meta = (LocalFileMeta) reader.getFileMeta(snapshotDirTar); if (meta == null) { throw new IOException("Can't find snapshot archive file, path=" + @@ -231,7 +232,7 @@ private String decompressSnapshot(SnapshotReader reader, archiveFile, parentPath, (end - begin) / 1000.0F); } catch (Throwable e) { throw new RaftException( - "Failed to decompress snapshot, zip=%s", e, archiveFile); + "Failed to decompress snapshot, zip=%s", e, archiveFile); } if (meta.hasChecksum()) { diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java index c075b13cec..abc99e9d71 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategy.java @@ -1,11 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package org.apache.hugegraph.backend.store.raft.compress; import java.util.zip.Checksum; public interface CompressStrategy { - void compressZip(final String rootDir, final String sourceDir, final String outputZipFile, final Checksum checksum) + void compressZip(final String rootDir, final String sourceDir, final String outputZipFile, + final Checksum checksum) throws Throwable; - void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum) throws Throwable; + void decompressZip(final String sourceZipFile, final String outputDir, + final Checksum checksum) throws Throwable; } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java index 1803b132f0..437ed065df 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/CompressStrategyManager.java @@ -1,13 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package org.apache.hugegraph.backend.store.raft.compress; import org.apache.hugegraph.config.CoreOptions; import org.apache.hugegraph.config.HugeConfig; public class CompressStrategyManager { - private static CompressStrategy[] compressStrategies = new CompressStrategy[5]; + private static byte DEFAULT_STRATEGY = 1; public static final byte SERIAL_STRATEGY = 1; public static final byte PARALLEL_STRATEGY = 2; + public static final byte MAX_STRATEGY = 5; + private static CompressStrategy[] compressStrategies = new CompressStrategy[MAX_STRATEGY]; static { addCompressStrategy(SERIAL_STRATEGY, new SerialCompressStrategy()); @@ -19,7 +38,8 @@ private CompressStrategyManager() { public static void addCompressStrategy(final int idx, final CompressStrategy compressStrategy) { if (compressStrategies.length <= idx) { final CompressStrategy[] newCompressStrategies = new CompressStrategy[idx + 5]; - System.arraycopy(compressStrategies, 0, newCompressStrategies, 0, compressStrategies.length); + System.arraycopy(compressStrategies, 0, newCompressStrategies, 0, + compressStrategies.length); compressStrategies = newCompressStrategies; } compressStrategies[idx] = compressStrategy; @@ -36,7 +56,8 @@ public static void init(final HugeConfig config) { final CompressStrategy compressStrategy = new ParallelCompressStrategy( config.get(CoreOptions.RAFT_SNAPSHOT_COMPRESS_THREADS), config.get(CoreOptions.RAFT_SNAPSHOT_DECOMPRESS_THREADS)); - CompressStrategyManager.addCompressStrategy(CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy); + CompressStrategyManager.addCompressStrategy( + CompressStrategyManager.PARALLEL_STRATEGY, compressStrategy); DEFAULT_STRATEGY = PARALLEL_STRATEGY; } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java index 4cfc76e083..d423c86289 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/ParallelCompressStrategy.java @@ -1,11 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package org.apache.hugegraph.backend.store.raft.compress; -import com.alipay.sofa.jraft.util.ExecutorServiceHelper; -import com.alipay.sofa.jraft.util.NamedThreadFactory; -import com.alipay.sofa.jraft.util.Requires; -import com.alipay.sofa.jraft.util.ThreadPoolUtil; -import com.google.common.collect.Lists; -import org.apache.commons.compress.archivers.zip.*; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.file.Paths; +import java.util.Enumeration; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; +import java.util.zip.Checksum; +import java.util.zip.ZipEntry; + +import org.apache.commons.compress.archivers.zip.ParallelScatterZipCreator; +import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; +import org.apache.commons.compress.archivers.zip.ZipArchiveInputStream; +import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; +import org.apache.commons.compress.archivers.zip.ZipFile; import org.apache.commons.compress.parallel.InputStreamSupplier; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; @@ -14,17 +51,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.*; -import java.nio.file.Paths; -import java.util.Enumeration; -import java.util.List; -import java.util.concurrent.*; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; -import java.util.zip.Checksum; -import java.util.zip.ZipEntry; +import com.alipay.sofa.jraft.util.ExecutorServiceHelper; +import com.alipay.sofa.jraft.util.NamedThreadFactory; +import com.alipay.sofa.jraft.util.Requires; +import com.alipay.sofa.jraft.util.ThreadPoolUtil; +import com.google.common.collect.Lists; public class ParallelCompressStrategy implements CompressStrategy { + private static final Logger LOG = LoggerFactory.getLogger(ParallelCompressStrategy.class); public static final int QUEUE_SIZE = CoreOptions.CPUS; @@ -60,7 +94,8 @@ public void writeTo(final ZipArchiveOutputStream archiveOutput) throws Exception } @Override - public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile, + public void compressZip(final String rootDir, final String sourceDir, + final String outputZipFile, final Checksum checksum) throws Throwable { final File rootFile = new File(Paths.get(rootDir, sourceDir).toString()); final File zipFile = new File(outputZipFile); @@ -68,8 +103,10 @@ public void compressZip(final String rootDir, final String sourceDir, final Stri FileUtils.forceMkdir(zipFile.getParentFile()); final ExecutorService compressExecutor = newFixedPool(compressThreads, compressThreads, - "raft-snapshot-compress-executor", new ThreadPoolExecutor.CallerRunsPolicy()); - final ZipArchiveScatterOutputStream scatterOutput = new ZipArchiveScatterOutputStream(compressExecutor); + "raft-snapshot-compress-executor", + new ThreadPoolExecutor.CallerRunsPolicy()); + final ZipArchiveScatterOutputStream scatterOutput = + new ZipArchiveScatterOutputStream(compressExecutor); compressDirectoryToZipFile(rootFile, scatterOutput, sourceDir, ZipEntry.DEFLATED); try (final FileOutputStream fos = new FileOutputStream(zipFile); @@ -85,11 +122,14 @@ public void compressZip(final String rootDir, final String sourceDir, final Stri } @Override - public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum) + public void decompressZip(final String sourceZipFile, final String outputDir, + final Checksum checksum) throws Throwable { LOG.info("Start to decompress snapshot in parallel mode"); - final ExecutorService decompressExecutor = newFixedPool(decompressThreads, decompressThreads, - "raft-snapshot-decompress-executor", new ThreadPoolExecutor.CallerRunsPolicy()); + final ExecutorService decompressExecutor = + newFixedPool(decompressThreads, decompressThreads, + "raft-snapshot-decompress-executor", + new ThreadPoolExecutor.CallerRunsPolicy()); // compute the checksum in a single thread final Future checksumFuture = decompressExecutor.submit(() -> { computeZipFileChecksumValue(sourceZipFile, checksum); @@ -98,7 +138,8 @@ public void decompressZip(final String sourceZipFile, final String outputDir, fi try (final ZipFile zipFile = new ZipFile(sourceZipFile)) { final List> futures = Lists.newArrayList(); - for (final Enumeration e = zipFile.getEntries(); e.hasMoreElements(); ) { + for (final Enumeration e = zipFile.getEntries(); + e.hasMoreElements(); ) { final ZipArchiveEntry zipEntry = e.nextElement(); final Future future = decompressExecutor.submit(() -> { unZipFile(zipFile, zipEntry, outputDir); @@ -116,7 +157,8 @@ public void decompressZip(final String sourceZipFile, final String outputDir, fi ExecutorServiceHelper.shutdownAndAwaitTermination(decompressExecutor); } - private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterOutputStream scatterOutput, + private void compressDirectoryToZipFile(final File dir, + final ZipArchiveScatterOutputStream scatterOutput, final String sourceDir, final int method) { if (dir == null) { return; @@ -139,14 +181,15 @@ private void compressDirectoryToZipFile(final File dir, final ZipArchiveScatterO /** * Add archive entry to the scatterOutputStream */ - private void addEntry(final String filePath, final File file, final ZipArchiveScatterOutputStream scatterOutputStream, + private void addEntry(final String filePath, final File file, + final ZipArchiveScatterOutputStream scatterOutputStream, final int method) { final ZipArchiveEntry archiveEntry = new ZipArchiveEntry(filePath); archiveEntry.setMethod(method); scatterOutputStream.addEntry(archiveEntry, () -> { try { return file.isDirectory() ? new NullInputStream(0) : - new BufferedInputStream(new FileInputStream(file)); + new BufferedInputStream(new FileInputStream(file)); } catch (final FileNotFoundException e) { LOG.error("Can't find file, path={}, {}", file.getPath(), e); } @@ -157,13 +200,15 @@ private void addEntry(final String filePath, final File file, final ZipArchiveSc /** * Unzip the archive entry to targetDir */ - private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final String targetDir) + private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, + final String targetDir) throws Exception { final File targetFile = new File(Paths.get(targetDir, entry.getName()).toString()); FileUtils.forceMkdir(targetFile.getParentFile()); try (final InputStream is = zipFile.getInputStream(entry); final BufferedInputStream fis = new BufferedInputStream(is); - final BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(targetFile))) { + final BufferedOutputStream bos = new BufferedOutputStream( + new FileOutputStream(targetFile))) { IOUtils.copy(fis, bos); } } @@ -171,7 +216,8 @@ private void unZipFile(final ZipFile zipFile, final ZipArchiveEntry entry, final /** * Compute the value of checksum */ - private void computeZipFileChecksumValue(final String zipPath, final Checksum checksum) throws Exception { + private void computeZipFileChecksumValue(final String zipPath, final Checksum checksum) throws + Exception { try (final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(zipPath)); final CheckedInputStream cis = new CheckedInputStream(bis, checksum); final ZipArchiveInputStream zis = new ZipArchiveInputStream(cis)) { @@ -182,18 +228,18 @@ private void computeZipFileChecksumValue(final String zipPath, final Checksum ch } private static ExecutorService newFixedPool(int coreThreads, int maxThreads, - String name, - RejectedExecutionHandler handler) { + String name, + RejectedExecutionHandler handler) { BlockingQueue queue = new ArrayBlockingQueue<>(QUEUE_SIZE); return ThreadPoolUtil.newBuilder() - .poolName(name) - .enableMetric(false) - .coreThreads(coreThreads) - .maximumThreads(maxThreads) - .keepAliveSeconds(KEEP_ALIVE_SECOND) - .workQueue(queue) - .threadFactory(new NamedThreadFactory(name, true)) - .rejectedHandler(handler) - .build(); + .poolName(name) + .enableMetric(false) + .coreThreads(coreThreads) + .maximumThreads(maxThreads) + .keepAliveSeconds(KEEP_ALIVE_SECOND) + .workQueue(queue) + .threadFactory(new NamedThreadFactory(name, true)) + .rejectedHandler(handler) + .build(); } } diff --git a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java index 233f3ced0c..fb0c283209 100644 --- a/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java +++ b/hugegraph-core/src/main/java/org/apache/hugegraph/backend/store/raft/compress/SerialCompressStrategy.java @@ -1,23 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + package org.apache.hugegraph.backend.store.raft.compress; +import java.util.zip.Checksum; + import org.apache.hugegraph.util.CompressUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.zip.Checksum; - public class SerialCompressStrategy implements CompressStrategy { + private static final Logger LOG = LoggerFactory.getLogger(SerialCompressStrategy.class); @Override - public void compressZip(final String rootDir, final String sourceDir, final String outputZipFile, + public void compressZip(final String rootDir, final String sourceDir, + final String outputZipFile, final Checksum checksum) throws Throwable { LOG.info("Start to compress snapshot in serial strategy"); CompressUtil.compressZip(rootDir, sourceDir, outputZipFile, checksum); } @Override - public void decompressZip(final String sourceZipFile, final String outputDir, final Checksum checksum) + public void decompressZip(final String sourceZipFile, final String outputDir, + final Checksum checksum) throws Throwable { LOG.info("Start to decompress snapshot in serial strategy"); CompressUtil.decompressZip(sourceZipFile, outputDir, checksum);