Skip to content

Commit

Permalink
commit for manage resumeable signedURL uploads googleapis#2462
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinav-qlogic committed Mar 29, 2019
1 parent b05fadc commit 9fa7323
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,38 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.cloud.BaseWriteChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.spi.v1.StorageRpc;

import java.net.URL;
import java.util.Map;
import java.util.concurrent.Callable;

import static com.google.cloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

/** Write channel implementation to upload Google Cloud Storage blobs. */
class BlobWriteChannel extends BaseWriteChannel<StorageOptions, BlobInfo> {

BlobWriteChannel(StorageOptions options, BlobInfo blob, Map<StorageRpc.Option, ?> optionsMap) {
this(options, blob, open(options, blob, optionsMap));
}

BlobWriteChannel(StorageOptions options,URL signURL) {
this(options,getUploadId(signURL.toString()));
}

BlobWriteChannel(StorageOptions options, BlobInfo blobInfo, String uploadId) {
super(options, blobInfo, uploadId);
}

BlobWriteChannel(StorageOptions options,String uploadId) {
super(options,uploadId);
}

@Override
protected void flushBuffer(final int length, final boolean last) {
try {
Expand Down Expand Up @@ -83,6 +93,25 @@ public String call() {
}
}

private static String getUploadId(
final String signURL) {
try {
final StorageOptions options = StorageOptions.newBuilder().build();
return runWithRetries(
new Callable<String>() {
@Override
public String call() {
return options.getStorageRpcV1().getUploadId(signURL);
}
},
options.getRetrySettings(),
StorageImpl.EXCEPTION_HANDLER,
options.getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
}

static class StateImpl extends BaseWriteChannel.BaseState<StorageOptions, BlobInfo> {

private static final long serialVersionUID = -9028324143780151286L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,28 @@

package com.google.cloud.storage;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.paging.Page;
import com.google.auth.ServiceAccountSigner;
import com.google.auth.ServiceAccountSigner.SigningException;
import com.google.cloud.FieldSelector;
import com.google.cloud.*;
import com.google.cloud.FieldSelector.Helper;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.Service;
import com.google.cloud.Tuple;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;

import java.io.InputStream;
import java.io.Serializable;
import java.net.URL;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* An interface for Google Cloud Storage.
*
Expand Down Expand Up @@ -2068,6 +2057,18 @@ Blob create(
*/
WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options);


/**
* Creates a blob and return a channel for writing its content. By default any md5 and crc32c
* values in the given {@code blobInfo} are ignored unless requested via the {@code
* BlobWriteOption.md5Match} and {@code BlobWriteOption.crc32cMatch} options.
*
*
* @throws StorageException upon failure
*/

WriteChannel writer(URL signURL);

/**
* Generates a signed URL for a blob. If you have a blob that you want to allow access to for a
* fixed amount of time, you can use this method to generate a URL that is only valid within a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,68 +16,46 @@

package com.google.cloud.storage;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.PolicyHelper.convertFromApiPolicy;
import static com.google.cloud.storage.PolicyHelper.convertToApiPolicy;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.DELIMITER;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_GENERATION_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_GENERATION_NOT_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_METAGENERATION_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_METAGENERATION_NOT_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_SOURCE_GENERATION_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_SOURCE_GENERATION_NOT_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_SOURCE_METAGENERATION_MATCH;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.IF_SOURCE_METAGENERATION_NOT_MATCH;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.api.gax.paging.Page;
import com.google.api.services.storage.model.BucketAccessControl;
import com.google.api.services.storage.model.ObjectAccessControl;
import com.google.api.services.storage.model.StorageObject;
import com.google.api.services.storage.model.TestIamPermissionsResponse;
import com.google.auth.ServiceAccountSigner;
import com.google.cloud.BaseService;
import com.google.cloud.BatchResult;
import com.google.cloud.PageImpl;
import com.google.cloud.*;
import com.google.cloud.PageImpl.NextPageFetcher;
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.Tuple;
import com.google.cloud.storage.Acl.Entity;
import com.google.cloud.storage.spi.v1.StorageRpc;
import com.google.cloud.storage.spi.v1.StorageRpc.RewriteResponse;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.*;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.common.net.UrlEscapers;
import com.google.common.primitives.Ints;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static com.google.cloud.RetryHelper.runWithRetries;
import static com.google.cloud.storage.PolicyHelper.convertFromApiPolicy;
import static com.google.cloud.storage.PolicyHelper.convertToApiPolicy;
import static com.google.cloud.storage.spi.v1.StorageRpc.Option.*;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;

final class StorageImpl extends BaseService<StorageOptions> implements Storage {

private static final byte[] EMPTY_BYTE_ARRAY = {};
Expand All @@ -87,6 +65,8 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {
/** Signed URLs are only supported through the GCS XML API endpoint. */
private static final String STORAGE_XML_HOST_NAME = "https://storage.googleapis.com";

private static final Map<StorageRpc.Option, ?> EMPTY_RPC_OPTIONS = ImmutableMap.of();

private static final Function<Tuple<Storage, Boolean>, Boolean> DELETE_FUNCTION =
new Function<Tuple<Storage, Boolean>, Boolean>() {
@Override
Expand Down Expand Up @@ -598,11 +578,18 @@ public BlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
return writer(targetOptions.x(), targetOptions.y());
}

@Override
public BlobWriteChannel writer(URL signURL) {
return new BlobWriteChannel(getOptions(),signURL);
}

private BlobWriteChannel writer(BlobInfo blobInfo, BlobTargetOption... options) {
final Map<StorageRpc.Option, ?> optionsMap = optionMap(blobInfo, options);
return new BlobWriteChannel(getOptions(), blobInfo, optionsMap);
}



@Override
public URL signUrl(BlobInfo blobInfo, long duration, TimeUnit unit, SignUrlOption... options) {
EnumMap<SignUrlOption.Option, Object> optionMap = Maps.newEnumMap(SignUrlOption.Option.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,19 @@

package com.google.cloud.storage.spi.v1;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpHeaders;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.InputStreamContent;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.http.*;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.util.IOUtils;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.api.services.storage.Storage.Objects.Insert;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.BucketAccessControl;
import com.google.api.services.storage.model.Buckets;
import com.google.api.services.storage.model.ComposeRequest;
import com.google.api.services.storage.model.*;
import com.google.api.services.storage.model.ComposeRequest.SourceObjects.ObjectPreconditions;
import com.google.api.services.storage.model.Notification;
import com.google.api.services.storage.model.ObjectAccessControl;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.Policy;
import com.google.api.services.storage.model.ServiceAccount;
import com.google.api.services.storage.model.StorageObject;
import com.google.api.services.storage.model.TestIamPermissionsResponse;
import com.google.cloud.BaseServiceException;
import com.google.cloud.Tuple;
import com.google.cloud.http.CensusHttpModule;
Expand All @@ -67,11 +43,9 @@
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import io.opencensus.common.Scope;
import io.opencensus.trace.AttributeValue;
import io.opencensus.trace.Span;
import io.opencensus.trace.Status;
import io.opencensus.trace.Tracer;
import io.opencensus.trace.Tracing;
import io.opencensus.trace.*;
import org.apache.http.HttpStatus;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -81,7 +55,10 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpStatus;

import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkArgument;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;

public class HttpStorageRpc implements StorageRpc {
public static final String DEFAULT_PROJECTION = "full";
Expand Down Expand Up @@ -822,6 +799,46 @@ public String open(StorageObject object, Map<Option, ?> options) {
}
}

@Override
public String getUploadId(String signURL) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN);
Scope scope = tracer.withSpan(span);
try {
GenericUrl url = new GenericUrl(signURL);
String bytesArrayParameters = "";
byte[] bytesArray = new byte[bytesArrayParameters.length()];
String scheme = url.getScheme();
String host = url.getHost();
int port = url.getPort();
port = port > 0 ? port : url.toURL().getDefaultPort();
String path = "/upload" + url.getRawPath();
url = new GenericUrl(signURL);
url.set("uploadType", "resumable");
HttpRequestFactory requestFactory = storage.getRequestFactory();
HttpRequest httpRequest =
requestFactory.buildPostRequest(url, new ByteArrayContent("",bytesArray, 0, bytesArray.length));
HttpHeaders requestHeaders = httpRequest.getHeaders();
requestHeaders.set(
"X-Upload-Content-Type",
firstNonNull("", "application/octet-stream"));
requestHeaders.set("x-goog-resumable", "start");
HttpResponse response = httpRequest.execute();
if (response.getStatusCode() != 201) {
GoogleJsonError error = new GoogleJsonError();
error.setCode(response.getStatusCode());
error.setMessage(response.getStatusMessage());
throw translate(error);
}
return response.getHeaders().getLocation();
} catch (IOException ex) {
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
throw translate(ex);
} finally {
scope.close();
span.end();
}
}

@Override
public RewriteResponse openRewrite(RewriteRequest rewriteRequest) {
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_OPEN_REWRITE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,13 @@ StorageObject compose(
*/
String open(StorageObject object, Map<Option, ?> options);

/**
* Opens a resumable upload channel for a given storage object.
*
* @throws StorageException upon failure
*/
String getUploadId(String signURL);

/**
* Writes the provided bytes to a storage object at the provided location.
*
Expand Down

0 comments on commit 9fa7323

Please sign in to comment.