Suppress DEADLINE_EXCEEDED on download progress
Prevent DEADLINE_EXCEEDED from contributing to the retry counter when
the download is making progress, and preserve progress between retry
attempts on a single file.

Closes #5230.

PiperOrigin-RevId: 239363567
George Gensure authored and copybara-github committed Mar 20, 2019
1 parent 973542e commit 9813c58
Showing 9 changed files with 203 additions and 103 deletions.
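In essence: each file download now tracks the byte offset it has already received, a retried gRPC Read resumes at that offset, and the new ProgressiveBackoff clears its retry count whenever data arrives, so DEADLINE_EXCEEDED on a stream that is still moving does not exhaust the retry budget. A minimal standalone sketch of that pattern follows (ChunkSource, the plain IOException, and the explicit attempt budget are illustrative stand-ins, not code from this commit):

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Simplified model of the resumable download introduced here: the byte offset
 * survives retries, and the no-progress counter is cleared whenever a chunk
 * arrives, so timeouts on a stream that is still moving do not use up retries.
 */
class ResumableDownloadSketch {

  /** Stand-in for one read attempt starting at {@code offset}; may time out. */
  interface ChunkSource {
    byte[] read(long offset) throws IOException; // null signals end of stream
  }

  static void download(ChunkSource source, OutputStream out, int maxAttemptsWithoutProgress)
      throws IOException {
    AtomicLong offset = new AtomicLong(0); // preserved across retry attempts
    int attemptsWithoutProgress = 0;
    while (true) {
      try {
        byte[] chunk = source.read(offset.get()); // resume where the last attempt stopped
        if (chunk == null) {
          return; // download complete
        }
        out.write(chunk);
        offset.addAndGet(chunk.length);
        attemptsWithoutProgress = 0; // progress made: this attempt does not count against the budget
      } catch (IOException e) { // stands in for DEADLINE_EXCEEDED and similar transient errors
        if (++attemptsWithoutProgress > maxAttemptsWithoutProgress) {
          throw e; // repeated failures with no forward progress: give up
        }
        // otherwise retry; the real code delegates delays to ProgressiveBackoff
      }
    }
  }
}
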
@@ -28,7 +28,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.hash.HashingOutputStream;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -47,7 +46,6 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.Symlinks;
import io.grpc.Context;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -74,12 +72,10 @@ public abstract class AbstractRemoteActionCache implements AutoCloseable {

protected final RemoteOptions options;
protected final DigestUtil digestUtil;
private final Retrier retrier;

public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil, Retrier retrier) {
public AbstractRemoteActionCache(RemoteOptions options, DigestUtil digestUtil) {
this.options = options;
this.digestUtil = digestUtil;
this.retrier = retrier;
}

/**
@@ -154,23 +150,19 @@ public void onFailure(Throwable t) {
*/
public void download(ActionResult result, Path execRoot, FileOutErr outErr)
throws ExecException, IOException, InterruptedException {
Context ctx = Context.current();
List<FuturePathBooleanTuple> fileDownloads =
Collections.synchronizedList(
new ArrayList<>(result.getOutputFilesCount() + result.getOutputDirectoriesCount()));
for (OutputFile file : result.getOutputFilesList()) {
Path path = execRoot.getRelative(file.getPath());
ListenableFuture<Void> download =
retrier.executeAsync(
() -> ctx.call(() -> downloadFile(path, file.getDigest())));
ListenableFuture<Void> download = downloadFile(path, file.getDigest());
fileDownloads.add(new FuturePathBooleanTuple(download, path, file.getIsExecutable()));
}

List<ListenableFuture<Void>> dirDownloads = new ArrayList<>(result.getOutputDirectoriesCount());
for (OutputDirectory dir : result.getOutputDirectoriesList()) {
SettableFuture<Void> dirDownload = SettableFuture.create();
ListenableFuture<byte[]> protoDownload =
retrier.executeAsync(() -> ctx.call(() -> downloadBlob(dir.getTreeDigest())));
ListenableFuture<byte[]> protoDownload = downloadBlob(dir.getTreeDigest());
Futures.addCallback(
protoDownload,
new FutureCallback<byte[]>() {
@@ -183,7 +175,7 @@ public void onSuccess(byte[] b) {
childrenMap.put(digestUtil.compute(child), child);
}
Path path = execRoot.getRelative(dir.getPath());
fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap, ctx));
fileDownloads.addAll(downloadDirectory(path, tree.getRoot(), childrenMap));
dirDownload.set(null);
} catch (IOException e) {
dirDownload.setException(e);
@@ -206,7 +198,7 @@ public void onFailure(Throwable t) {

IOException downloadException = null;
try {
fileDownloads.addAll(downloadOutErr(result, outErr, ctx));
fileDownloads.addAll(downloadOutErr(result, outErr));
} catch (IOException e) {
downloadException = e;
}
@@ -333,8 +325,7 @@ public boolean isExecutable() {
* digest.
*/
private List<FuturePathBooleanTuple> downloadDirectory(
Path path, Directory dir, Map<Digest, Directory> childrenMap, Context ctx)
throws IOException {
Path path, Directory dir, Map<Digest, Directory> childrenMap) throws IOException {
// Ensure that the directory is created here even though the directory might be empty
path.createDirectoryAndParents();

@@ -347,10 +338,7 @@ private List<FuturePathBooleanTuple> downloadDirectory(
Path childPath = path.getRelative(child.getName());
downloads.add(
new FuturePathBooleanTuple(
retrier.executeAsync(
() -> ctx.call(() -> downloadFile(childPath, child.getDigest()))),
childPath,
child.getIsExecutable()));
downloadFile(childPath, child.getDigest()), childPath, child.getIsExecutable()));
}

for (DirectoryNode child : dir.getDirectoriesList()) {
@@ -367,7 +355,7 @@ private List<FuturePathBooleanTuple> downloadDirectory(
+ childDigest
+ "not found");
}
downloads.addAll(downloadDirectory(childPath, childDir, childrenMap, ctx));
downloads.addAll(downloadDirectory(childPath, childDir, childrenMap));
}

return downloads;
@@ -414,34 +402,24 @@ public void onFailure(Throwable t) {
return outerF;
}

private List<FuturePathBooleanTuple> downloadOutErr(
ActionResult result, FileOutErr outErr, Context ctx) throws IOException {
private List<FuturePathBooleanTuple> downloadOutErr(ActionResult result, FileOutErr outErr)
throws IOException {
List<FuturePathBooleanTuple> downloads = new ArrayList<>();
if (!result.getStdoutRaw().isEmpty()) {
result.getStdoutRaw().writeTo(outErr.getOutputStream());
outErr.getOutputStream().flush();
} else if (result.hasStdoutDigest()) {
downloads.add(
new FuturePathBooleanTuple(
retrier.executeAsync(
() ->
ctx.call(
() -> downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()))),
null,
false));
downloadBlob(result.getStdoutDigest(), outErr.getOutputStream()), null, false));
}
if (!result.getStderrRaw().isEmpty()) {
result.getStderrRaw().writeTo(outErr.getErrorStream());
outErr.getErrorStream().flush();
} else if (result.hasStderrDigest()) {
downloads.add(
new FuturePathBooleanTuple(
retrier.executeAsync(
() ->
ctx.call(
() -> downloadBlob(result.getStderrDigest(), outErr.getErrorStream()))),
null,
false));
downloadBlob(result.getStderrDigest(), outErr.getErrorStream()), null, false));
}
return downloads;
}
@@ -675,13 +653,11 @@ private void illegalOutput(Path what) throws ExecException, IOException {
}
}

protected void verifyContents(Digest expected, HashingOutputStream actual) throws IOException {
String expectedHash = expected.getHash();
String actualHash = DigestUtil.hashCodeToString(actual.hash());
protected void verifyContents(String expectedHash, String actualHash) throws IOException {
if (!expectedHash.equals(actualHash)) {
String msg =
String.format(
"Download an output failed, because the expected hash"
"An output download failed, because the expected hash"
+ "'%s' did not match the received hash '%s'.",
expectedHash, actualHash);
throw new IOException(msg);
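The last hunk above also narrows verifyContents to a comparison of two hash strings; computing the actual hash becomes the caller's job. Roughly, the verification wiring looks like the following standalone sketch (Guava's HashingOutputStream as in the imports above; SHA-256 and the method name writeAndVerify are illustrative assumptions):

import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import com.google.common.hash.HashingOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Supplier;

class HashVerificationSketch {

  /** Writes {@code data} through a hashing wrapper and checks the digest afterwards. */
  static void writeAndVerify(byte[] data, OutputStream dest, String expectedHash)
      throws IOException {
    HashingOutputStream hashOut = new HashingOutputStream(Hashing.sha256(), dest);
    Supplier<HashCode> hashSupplier = hashOut::hash; // captured up front, queried on completion
    hashOut.write(data);
    hashOut.flush();
    String actualHash = hashSupplier.get().toString(); // lowercase hex, like a digest hash
    if (!expectedHash.equals(actualHash)) {
      throw new IOException(
          String.format(
              "Expected hash '%s' did not match the received hash '%s'.",
              expectedHash, actualHash));
    }
  }
}
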
137 changes: 120 additions & 17 deletions src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteCache.java
@@ -39,19 +39,24 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.hash.HashCode;
import com.google.common.hash.HashingOutputStream;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.Retrier.Backoff;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.DigestUtil.ActionKey;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
import com.google.devtools.build.lib.util.io.FileOutErr;
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import io.grpc.CallCredentials;
import io.grpc.Context;
@@ -67,6 +72,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/** A RemoteActionCache implementation that uses gRPC calls to a remote cache server. */
@@ -88,7 +95,7 @@ public GrpcRemoteCache(
RemoteRetrier retrier,
DigestUtil digestUtil,
ByteStreamUploader uploader) {
super(options, digestUtil, retrier);
super(options, digestUtil);
this.credentials = credentials;
this.channel = channel;
this.retrier = retrier;
@@ -248,50 +255,146 @@ protected ListenableFuture<Void> downloadBlob(Digest digest, OutputStream out) {
}
resourceName += "blobs/" + digestUtil.toString(digest);

@Nullable
HashingOutputStream hashOut =
options.remoteVerifyDownloads ? digestUtil.newHashingOutputStream(out) : null;
@Nullable Supplier<HashCode> hashSupplier = null;
if (options.remoteVerifyDownloads) {
HashingOutputStream hashOut = digestUtil.newHashingOutputStream(out);
hashSupplier = hashOut::hash;
out = hashOut;
}

SettableFuture<Void> outerF = SettableFuture.create();
Futures.addCallback(
downloadBlob(resourceName, digest, out, hashSupplier),
new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
outerF.set(null);
}

@Override
public void onFailure(Throwable t) {
outerF.setException(t);
}
},
Context.current().fixedContextExecutor(MoreExecutors.directExecutor()));
return outerF;
}

private ListenableFuture<Void> downloadBlob(
String resourceName,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
Context ctx = Context.current();
AtomicLong offset = new AtomicLong(0);
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
return retrier.executeAsync(
() ->
ctx.call(
() ->
requestRead(
resourceName, offset, progressiveBackoff, digest, out, hashSupplier)));
}

static class ProgressiveBackoff implements Backoff {
private final Supplier<Backoff> backoffSupplier;
private Backoff currentBackoff = null;
private int retries = 0;

/**
* Creates a resettable Backoff for progressive reads. After a reset, the nextDelay returned
* indicates an immediate retry. Initially and after indicating an immediate retry, a delegate
* is generated to provide nextDelay until reset.
*
* @param backoffSupplier Delegate Backoff generator
*/
ProgressiveBackoff(Supplier<Backoff> backoffSupplier) {
this.backoffSupplier = backoffSupplier;
currentBackoff = backoffSupplier.get();
}

public void reset() {
if (currentBackoff != null) {
retries += currentBackoff.getRetryAttempts();
}
currentBackoff = null;
}

@Override
public long nextDelayMillis() {
if (currentBackoff == null) {
currentBackoff = backoffSupplier.get();
retries++;
return 0;
}
return currentBackoff.nextDelayMillis();
}

@Override
public int getRetryAttempts() {
int retryAttempts = retries;
if (currentBackoff != null) {
retryAttempts += currentBackoff.getRetryAttempts();
}
return retryAttempts;
}
}

private ListenableFuture<Void> requestRead(
String resourceName,
AtomicLong offset,
ProgressiveBackoff progressiveBackoff,
Digest digest,
OutputStream out,
@Nullable Supplier<HashCode> hashSupplier) {
SettableFuture<Void> future = SettableFuture.create();
bsAsyncStub()
.read(
ReadRequest.newBuilder().setResourceName(resourceName).build(),
ReadRequest.newBuilder()
.setResourceName(resourceName)
.setReadOffset(offset.get())
.build(),
new StreamObserver<ReadResponse>() {
@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
readResponse.getData().writeTo(hashOut != null ? hashOut : out);
data.writeTo(out);
offset.addAndGet(data.size());
} catch (IOException e) {
outerF.setException(e);
future.setException(e);
// Cancel the call.
throw new RuntimeException(e);
}
// reset the stall backoff because we've made progress or been kept alive
progressiveBackoff.reset();
}

@Override
public void onError(Throwable t) {
if (t instanceof StatusRuntimeException
&& ((StatusRuntimeException) t).getStatus().getCode()
== Status.NOT_FOUND.getCode()) {
outerF.setException(new CacheNotFoundException(digest, digestUtil));
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
future.setException(new CacheNotFoundException(digest, digestUtil));
} else {
outerF.setException(t);
future.setException(t);
}
}

@Override
public void onCompleted() {
try {
if (hashOut != null) {
verifyContents(digest, hashOut);
if (hashSupplier != null) {
verifyContents(
digest.getHash(), DigestUtil.hashCodeToString(hashSupplier.get()));
}
out.flush();
outerF.set(null);
future.set(null);
} catch (IOException e) {
outerF.setException(e);
future.setException(e);
}
}
});
return outerF;
return future;
}

@Override
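To make the ProgressiveBackoff contract above concrete, here is a hypothetical walkthrough with a constant-delay delegate. It assumes test code living in the same Java package, since the class is package-private; FixedBackoff and the printed value are illustrative, not part of the commit:

import com.google.devtools.build.lib.remote.Retrier.Backoff;
import java.util.function.Supplier;

class ProgressiveBackoffDemo {

  /** Illustrative delegate: constant 100 ms delay, counts its own attempts. */
  static class FixedBackoff implements Backoff {
    private int attempts = 0;

    @Override
    public long nextDelayMillis() {
      attempts++;
      return 100;
    }

    @Override
    public int getRetryAttempts() {
      return attempts;
    }
  }

  public static void main(String[] args) {
    Supplier<Backoff> delegates = FixedBackoff::new;
    GrpcRemoteCache.ProgressiveBackoff backoff =
        new GrpcRemoteCache.ProgressiveBackoff(delegates);

    backoff.nextDelayMillis(); // 100 ms, from the first delegate
    backoff.reset();           // data arrived on the stream: absorb the delegate's attempts
    backoff.nextDelayMillis(); // 0 ms: the retry right after a reset is immediate
    backoff.nextDelayMillis(); // 100 ms, from a freshly created delegate
    System.out.println(backoff.getRetryAttempts()); // 3: attempts accumulate across resets
  }
}
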
@@ -243,12 +243,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
}

if (enableBlobStoreCache) {
Retrier retrier =
new Retrier(
() -> Retrier.RETRIES_DISABLED,
(e) -> false,
retryScheduler,
Retrier.ALLOW_ALL_CALLS);
executeRetrier = null;
cache =
new SimpleBlobStoreActionCache(
Expand All @@ -257,7 +251,6 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
remoteOptions,
GoogleAuthUtils.newCredentials(authAndTlsOptions),
env.getWorkingDirectory()),
retrier,
digestUtil);
}

@@ -204,6 +204,10 @@ public Retrier(
this.sleeper = sleeper;
}

ListeningScheduledExecutorService getRetryService() {
return retryService;
}

/**
* Execute a {@link Callable}, retrying execution in case of failure and returning the result in
* case of success.
