Skip to content

Commit

Permalink
merge from upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
aozarov committed May 20, 2015
2 parents 86bf292 + 7738e54 commit aed7a71
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 273 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/google/gcloud/ServiceOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public AuthCredentials authCredentials() {
}

public RetryParams retryParams() {
return retryParams;
return retryParams != null ? retryParams : RetryParams.noRetries();
}

public ServiceRpcFactory<R, O> serviceRpcFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class DatastoreServiceOptions extends ServiceOptions<DatastoreRpc, Datast
private final String namespace;
private final boolean force;
private final boolean normalizeDataset;
private transient DatastoreRpc datastoreRpc;

public static class Builder extends
ServiceOptions.Builder<DatastoreRpc, DatastoreServiceOptions, Builder> {
Expand Down Expand Up @@ -178,10 +179,15 @@ public boolean equals(Object obj) {
}

DatastoreRpc datastoreRpc() {
if (datastoreRpc != null) {
return datastoreRpc;
}
if (serviceRpcFactory() != null) {
return serviceRpcFactory().create(this);
datastoreRpc = serviceRpcFactory().create(this);
} else {
datastoreRpc = ServiceRpcProvider.datastore(this);
}
return ServiceRpcProvider.datastore(this);
return datastoreRpc;
}

public static DatastoreServiceOptions defaultInstance() {
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/google/gcloud/examples/StorageExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.gcloud.examples;

import com.google.gcloud.RetryParams;
import com.google.gcloud.spi.StorageRpc.Tuple;
import com.google.gcloud.storage.BatchRequest;
import com.google.gcloud.storage.BatchResponse;
Expand Down Expand Up @@ -498,7 +499,8 @@ public static void main(String... args) throws Exception {
printUsage();
return;
}
StorageServiceOptions.Builder optionsBuilder = StorageServiceOptions.builder();
StorageServiceOptions.Builder optionsBuilder =
StorageServiceOptions.builder().retryParams(RetryParams.getDefaultInstance());
StorageAction action;
if (args.length >= 2 && !ACTIONS.containsKey(args[0])) {
optionsBuilder.projectId(args[0]);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/google/gcloud/storage/BatchRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ public boolean equals(Object obj) {
&& Objects.equals(toGet, other.toGet);
}

Map<Blob, Iterable<BlobSourceOption>> toDelete() {
public Map<Blob, Iterable<BlobSourceOption>> toDelete() {
return toDelete;
}

Map<Blob, Iterable<BlobTargetOption>> toUpdate() {
public Map<Blob, Iterable<BlobTargetOption>> toUpdate() {
return toUpdate;
}

Map<Blob, Iterable<BlobSourceOption>> toGet() {
public Map<Blob, Iterable<BlobSourceOption>> toGet() {
return toGet;
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/google/gcloud/storage/BatchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ public static class Result<T extends Serializable> implements Serializable {
private final StorageServiceException exception;


Result(T value) {
public Result(T value) {
this.value = value;
this.exception = null;
}

Result(StorageServiceException exception) {
public Result(StorageServiceException exception) {
this.exception = exception;
this.value = null;
}
Expand Down Expand Up @@ -112,7 +112,7 @@ static <T extends Serializable> Result<T> empty() {
}
}

BatchResponse(List<Result<Boolean>> deleteResult, List<Result<Blob>> updateResult,
public BatchResponse(List<Result<Boolean>> deleteResult, List<Result<Blob>> updateResult,
List<Result<Blob>> getResult) {
this.deleteResult = ImmutableList.copyOf(deleteResult);
this.updateResult = ImmutableList.copyOf(updateResult);
Expand Down
140 changes: 140 additions & 0 deletions src/main/java/com/google/gcloud/storage/BlobReadChannelImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud.storage;

import static com.google.gcloud.RetryHelper.runWithRetries;

import com.google.api.services.storage.model.StorageObject;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.Callable;

/**
* Default implementation for BlobReadChannel.
*/
class BlobReadChannelImpl implements BlobReadChannel {

private static final int MIN_BUFFER_SIZE = 2 * 1024 * 1024;
private static final long serialVersionUID = 4821762590742862669L;

private final StorageServiceOptions serviceOptions;
private final Blob blob;
private final Map<StorageRpc.Option, ?> requestOptions;
private int position;
private boolean isOpen;
private boolean endOfStream;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;
private transient int bufferPos;
private transient byte[] buffer;

BlobReadChannelImpl(StorageServiceOptions serviceOptions, Blob blob,
Map<StorageRpc.Option, ?> requestOptions) {
this.serviceOptions = serviceOptions;
this.blob = blob;
this.requestOptions = requestOptions;
isOpen = true;
initTransients();
}

private void writeObject(ObjectOutputStream out) throws IOException {
if (buffer != null) {
position += bufferPos;
buffer = null;
bufferPos = 0;
endOfStream = false;
}
out.defaultWriteObject();
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
initTransients();
}

private void initTransients() {
storageRpc = serviceOptions.storageRpc();
storageObject = blob.toPb();
}

@Override
public boolean isOpen() {
return isOpen;
}

@Override
public void close() {
if (isOpen) {
buffer = null;
isOpen = false;
}
}

private void validateOpen() throws IOException {
if (!isOpen) {
throw new IOException("stream is closed");
}
}

@Override
public void seek(int position) throws IOException {
validateOpen();
this.position = position;
buffer = null;
bufferPos = 0;
endOfStream = false;
}

@Override
public int read(ByteBuffer byteBuffer) throws IOException {
validateOpen();
if (buffer == null) {
if (endOfStream) {
return -1;
}
final int toRead = Math.max(byteBuffer.remaining(), MIN_BUFFER_SIZE);
buffer = runWithRetries(new Callable<byte[]>() {
@Override
public byte[] call() {
return storageRpc.read(storageObject, requestOptions, position, toRead);
}
}, serviceOptions.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
if (toRead > buffer.length) {
endOfStream = true;
if (buffer.length == 0) {
buffer = null;
return -1;
}
}
}
int toWrite = Math.min(buffer.length - bufferPos, byteBuffer.remaining());
byteBuffer.put(buffer, bufferPos, toWrite);
bufferPos += toWrite;
if (bufferPos >= buffer.length) {
position += buffer.length;
buffer = null;
bufferPos = 0;
}
return toWrite;
}
}
138 changes: 138 additions & 0 deletions src/main/java/com/google/gcloud/storage/BlobWriterChannelImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2015 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.gcloud.storage;

import static com.google.gcloud.RetryHelper.runWithRetries;
import static java.util.concurrent.Executors.callable;

import com.google.api.services.storage.model.StorageObject;
import com.google.gcloud.spi.StorageRpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;

/**
* Default implementation for BlobWriteChannel.
*/
class BlobWriterChannelImpl implements BlobWriteChannel {

private static final long serialVersionUID = 8675286882724938737L;
private static final int CHUNK_SIZE = 256 * 1024;
private static final int MIN_BUFFER_SIZE = 8 * CHUNK_SIZE;

private final StorageServiceOptions options;
private final Blob blob;
private final String uploadId;
private int position;
private byte[] buffer = new byte[0];
private int limit;
private boolean isOpen = true;

private transient StorageRpc storageRpc;
private transient StorageObject storageObject;

public BlobWriterChannelImpl(StorageServiceOptions options, Blob blob,
Map<StorageRpc.Option, ?> optionsMap) {
this.options = options;
this.blob = blob;
initTransients();
uploadId = options.storageRpc().open(storageObject, optionsMap);
}

private void writeObject(ObjectOutputStream out) throws IOException {
if (isOpen) {
flush(true);
}
out.defaultWriteObject();
}

private void flush(boolean compact) {
if (limit >= MIN_BUFFER_SIZE || compact && limit >= CHUNK_SIZE) {
final int length = limit - limit % CHUNK_SIZE;
runWithRetries(callable(new Runnable() {
@Override
public void run() {
storageRpc.write(uploadId, buffer, 0, storageObject, position, length, false);
}
}), options.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
position += length;
limit -= length;
byte[] temp = new byte[compact ? limit : MIN_BUFFER_SIZE];
System.arraycopy(buffer, length, temp, 0, limit);
buffer = temp;
}
}

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
if (isOpen) {
initTransients();
}
}

private void initTransients() {
storageRpc = options.storageRpc();
storageObject = blob.toPb();
}

private void validateOpen() throws IOException {
if (!isOpen) {
throw new IOException("stream is closed");
}
}

@Override
public int write(ByteBuffer byteBuffer) throws IOException {
validateOpen();
int toWrite = byteBuffer.remaining();
int spaceInBuffer = buffer.length - limit;
if (spaceInBuffer >= toWrite) {
byteBuffer.get(buffer, limit, toWrite);
} else {
buffer = Arrays.copyOf(buffer,
Math.max(MIN_BUFFER_SIZE, buffer.length + toWrite - spaceInBuffer));
byteBuffer.get(buffer, limit, toWrite);
}
limit += toWrite;
flush(false);
return toWrite;
}

@Override
public boolean isOpen() {
return isOpen;
}

@Override
public void close() throws IOException {
if (isOpen) {
runWithRetries(callable(new Runnable() {
@Override
public void run() {
storageRpc.write(uploadId, buffer, 0, storageObject, position, limit, true);
}
}), options.retryParams(), StorageServiceImpl.EXCEPTION_HANDLER);
position += buffer.length;
isOpen = false;
buffer = null;
}
}
}
Loading

0 comments on commit aed7a71

Please sign in to comment.