Skip to content

Commit

Permalink
Minor fixes to bigquery
Browse files Browse the repository at this point in the history
- Add defaultDataset to QueryJobInfo and QueryRequest that takes a string
- Rename jobComplete to jobCompleted in QueryResult
- Use FileChannel.transferTo in bigquery example to upload file
  • Loading branch information
mziccard committed Jan 21, 2016
1 parent 2e8363f commit e27b5b3
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ public static QueryResultsOption startIndex(long startIndex) {
/**
* Returns an option that sets how long to wait for the query to complete, in milliseconds,
* before returning. Default is 10 seconds. If the timeout passes before the job completes,
* {@link QueryResponse#jobComplete()} will be {@code false}.
* {@link QueryResponse#jobCompleted()} will be {@code false}.
*/
public static QueryResultsOption maxWaitTime(long maxWaitTime) {
checkArgument(maxWaitTime >= 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,10 @@ public com.google.api.services.bigquery.model.QueryResponse call() {
QueryResponse.Builder builder = QueryResponse.builder();
JobId completeJobId = JobId.fromPb(results.getJobReference());
builder.jobId(completeJobId);
builder.jobComplete(results.getJobComplete());
builder.jobCompleted(results.getJobComplete());
List<TableRow> rowsPb = results.getRows();
if (results.getJobComplete()) {
builder.jobComplete(true);
builder.jobCompleted(true);
QueryResult.Builder resultBuilder = transformQueryResults(completeJobId, rowsPb,
results.getPageToken(), options(), ImmutableMap.<BigQueryRpc.Option, Object>of());
resultBuilder.totalBytesProcessed(results.getTotalBytesProcessed());
Expand Down Expand Up @@ -561,7 +561,7 @@ public GetQueryResultsResponse call() {
JobId completeJobId = JobId.fromPb(results.getJobReference());
builder.jobId(completeJobId);
builder.etag(results.getEtag());
builder.jobComplete(results.getJobComplete());
builder.jobCompleted(results.getJobComplete());
List<TableRow> rowsPb = results.getRows();
if (results.getJobComplete()) {
QueryResult.Builder resultBuilder = transformQueryResults(completeJobId, rowsPb,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,14 @@ public Builder defaultDataset(DatasetId defaultDataset) {
return self();
}

/**
* Sets the default dataset. This dataset is used for all unqualified table names used in the
* query.
*/
public Builder defaultDataset(String defaultDataset) {
return defaultDataset(DatasetId.of(defaultDataset));
}

/**
* Sets a priority for the query. If not specified the priority is assumed to be
* {@link Priority#INTERACTIVE}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
* a temporary table that is deleted approximately 24 hours after the query is run. The query is run
* through a BigQuery Job whose identity can be accessed via {@link QueryResponse#jobId()}. If the
* query does not complete within the provided {@link Builder#maxWaitTime(Long)}, the response
* returned by {@link BigQuery#query(QueryRequest)} will have {@link QueryResponse#jobComplete()}
* returned by {@link BigQuery#query(QueryRequest)} will have {@link QueryResponse#jobCompleted()}
* set to {@code false} and {@link QueryResponse#result()} set to {@code null}. To obtain query
* results you can use {@link BigQuery#getQueryResults(JobId, BigQuery.QueryResultsOption...)} until
* {@link QueryResponse#jobComplete()} returns {@code true}.
* {@link QueryResponse#jobCompleted()} returns {@code true}.
*
* <p>Example usage of a query request:
* <pre> {@code
Expand All @@ -43,7 +43,7 @@
* .maxResults(1000L)
* .build();
* QueryResponse response = bigquery.query(request);
* while (!response.jobComplete()) {
* while (!response.jobCompleted()) {
* Thread.sleep(1000);
* response = bigquery.getQueryResults(response.jobId());
* }
Expand Down Expand Up @@ -109,11 +109,18 @@ public Builder defaultDataset(DatasetId defaultDataset) {
return this;
}

/**
* Sets the default dataset to assume for any unqualified table names in the query.
*/
public Builder defaultDataset(String defaultDataset) {
return defaultDataset(DatasetId.of(defaultDataset));
}

/**
* Sets how long to wait for the query to complete, in milliseconds, before the request times
* out and returns. Note that this is only a timeout for the request, not the query. If the
* query takes longer to run than the timeout value, the call returns without any results and
* with the {@link QueryResponse#jobComplete()} set to {@code false}. If not set, a wait time of
* with the {@link QueryResponse#jobCompleted()} set to {@code false}. If not set, a wait time of
* 10000 milliseconds (10 seconds) is used.
*/
public Builder maxWaitTime(Long maxWaitTime) {
Expand Down Expand Up @@ -182,7 +189,7 @@ public DatasetId defaultDataset() {
* Returns how long to wait for the query to complete, in milliseconds, before the request times
* out and returns. Note that this is only a timeout for the request, not the query. If the
* query takes longer to run than the timeout value, the call returns without any results and
* with the {@link QueryResponse#jobComplete()} set to {@code false}. You can call
* with the {@link QueryResponse#jobCompleted()} set to {@code false}. You can call
* {@link BigQuery#getQueryResults(JobId, BigQuery.QueryResultsOption...)} to wait for the query
* to complete and read the results. If not set, a wait time of 10000 milliseconds (10 seconds)
* is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* <p>Example usage of a query response:
* <pre> {@code
* QueryResponse response = bigquery.query(request);
* while (!response.jobComplete()) {
* while (!response.jobCompleted()) {
* Thread.sleep(1000);
* response = bigquery.getQueryResults(response.jobId());
* }
Expand All @@ -56,15 +56,15 @@ public class QueryResponse implements Serializable {
private final QueryResult result;
private final String etag;
private final JobId jobId;
private final boolean jobComplete;
private final boolean jobCompleted;
private final List<BigQueryError> executionErrors;

static final class Builder {

private QueryResult result;
private String etag;
private JobId jobId;
private boolean jobComplete;
private boolean jobCompleted;
private List<BigQueryError> executionErrors;

private Builder() {}
Expand All @@ -84,8 +84,8 @@ Builder jobId(JobId jobId) {
return this;
}

Builder jobComplete(boolean jobComplete) {
this.jobComplete = jobComplete;
Builder jobCompleted(boolean jobCompleted) {
this.jobCompleted = jobCompleted;
return this;
}

Expand All @@ -103,13 +103,13 @@ private QueryResponse(Builder builder) {
this.result = builder.result;
this.etag = builder.etag;
this.jobId = builder.jobId;
this.jobComplete = builder.jobComplete;
this.jobCompleted = builder.jobCompleted;
this.executionErrors = builder.executionErrors != null ? builder.executionErrors
: ImmutableList.<BigQueryError>of();
}

/**
* Returns the result of the query. Returns {@code null} if {@link #jobComplete()} is {@code
* Returns the result of the query. Returns {@code null} if {@link #jobCompleted()} is {@code
* false}.
*/
public QueryResult result() {
Expand Down Expand Up @@ -137,8 +137,8 @@ public JobId jobId() {
* {@link #result()} returns {@code null}. This method can be used to check if query execution
* completed and results are available.
*/
public boolean jobComplete() {
return jobComplete;
public boolean jobCompleted() {
return jobCompleted;
}

/**
Expand All @@ -164,7 +164,7 @@ public String toString() {
.add("result", result)
.add("etag", etag)
.add("jobId", jobId)
.add("jobComplete", jobComplete)
.add("jobCompleted", jobCompleted)
.add("executionErrors", executionErrors)
.toString();
}
Expand All @@ -183,7 +183,7 @@ public boolean equals(Object obj) {
return false;
}
QueryResponse response = (QueryResponse) obj;
return jobComplete == response.jobComplete
return jobCompleted == response.jobCompleted
&& Objects.equals(etag, response.etag)
&& Objects.equals(result, response.result)
&& Objects.equals(jobId, response.jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ public void testQueryRequest() {
assertNull(response.etag());
assertNull(response.result());
assertEquals(queryJob, response.jobId());
assertEquals(false, response.jobComplete());
assertEquals(false, response.jobCompleted());
assertEquals(ImmutableList.<BigQueryError>of(), response.executionErrors());
assertFalse(response.hasErrors());
assertEquals(null, response.result());
Expand All @@ -926,7 +926,7 @@ public void testQueryRequestCompleted() {
QueryResponse response = bigquery.query(QUERY_REQUEST);
assertNull(response.etag());
assertEquals(queryJob, response.jobId());
assertEquals(true, response.jobComplete());
assertEquals(true, response.jobCompleted());
assertEquals(false, response.result().cacheHit());
assertEquals(ImmutableList.<BigQueryError>of(), response.executionErrors());
assertFalse(response.hasErrors());
Expand Down Expand Up @@ -959,7 +959,7 @@ public void testGetQueryResults() {
QueryResponse response = bigquery.getQueryResults(queryJob);
assertEquals("etag", response.etag());
assertEquals(queryJob, response.jobId());
assertEquals(true, response.jobComplete());
assertEquals(true, response.jobCompleted());
assertEquals(false, response.result().cacheHit());
assertEquals(ImmutableList.<BigQueryError>of(), response.executionErrors());
assertFalse(response.hasErrors());
Expand Down Expand Up @@ -993,7 +993,7 @@ public void testGetQueryResultsWithOptions() {
QUERY_RESULTS_OPTION_INDEX, QUERY_RESULTS_OPTION_MAX_RESULTS,
QUERY_RESULTS_OPTION_PAGE_TOKEN);
assertEquals(queryJob, response.jobId());
assertEquals(true, response.jobComplete());
assertEquals(true, response.jobCompleted());
assertEquals(false, response.result().cacheHit());
assertEquals(ImmutableList.<BigQueryError>of(), response.executionErrors());
assertFalse(response.hasErrors());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ public void testCreateExternalTable() throws InterruptedException {
.maxResults(1000L)
.build();
QueryResponse response = bigquery.query(request);
while (!response.jobComplete()) {
while (!response.jobCompleted()) {
response = bigquery.getQueryResults(response.jobId());
Thread.sleep(1000);
}
Expand Down Expand Up @@ -382,7 +382,7 @@ public void testCreateViewTable() throws InterruptedException {
.maxResults(1000L)
.build();
QueryResponse response = bigquery.query(request);
while (!response.jobComplete()) {
while (!response.jobCompleted()) {
response = bigquery.getQueryResults(response.jobId());
Thread.sleep(1000);
}
Expand Down Expand Up @@ -627,7 +627,7 @@ public void testQuery() throws InterruptedException {
.maxResults(1000L)
.build();
QueryResponse response = bigquery.query(request);
while (!response.jobComplete()) {
while (!response.jobCompleted()) {
Thread.sleep(1000);
response = bigquery.getQueryResults(response.jobId());
}
Expand Down Expand Up @@ -786,7 +786,7 @@ public void testQueryJob() throws InterruptedException {
assertNull(remoteJob.status().error());

QueryResponse response = bigquery.getQueryResults(remoteJob.jobId());
while (!response.jobComplete()) {
while (!response.jobCompleted()) {
Thread.sleep(1000);
response = bigquery.getQueryResults(response.jobId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public QueryResult nextPage() {
private static final QueryResponse QUERY_RESPONSE = QueryResponse.builder()
.etag(ETAG)
.jobId(JOB_ID)
.jobComplete(JOB_COMPLETE)
.jobCompleted(JOB_COMPLETE)
.executionErrors(ERRORS)
.result(QUERY_RESULT)
.build();
Expand All @@ -74,18 +74,18 @@ public void testBuilder() {
assertEquals(ETAG, QUERY_RESPONSE.etag());
assertEquals(QUERY_RESULT, QUERY_RESPONSE.result());
assertEquals(JOB_ID, QUERY_RESPONSE.jobId());
assertEquals(JOB_COMPLETE, QUERY_RESPONSE.jobComplete());
assertEquals(JOB_COMPLETE, QUERY_RESPONSE.jobCompleted());
assertEquals(ERRORS, QUERY_RESPONSE.executionErrors());
assertTrue(QUERY_RESPONSE.hasErrors());
}

@Test
public void testBuilderIncomplete() {
QueryResponse queryResponse = QueryResponse.builder().jobComplete(false).build();
QueryResponse queryResponse = QueryResponse.builder().jobCompleted(false).build();
assertNull(queryResponse.etag());
assertNull(queryResponse.result());
assertNull(queryResponse.jobId());
assertFalse(queryResponse.jobComplete());
assertFalse(queryResponse.jobCompleted());
assertEquals(ImmutableList.<BigQueryError>of(), queryResponse.executionErrors());
assertFalse(queryResponse.hasErrors());
}
Expand All @@ -100,7 +100,7 @@ private void compareQueryResponse(QueryResponse expected, QueryResponse value) {
assertEquals(expected.etag(), value.etag());
assertEquals(expected.result(), value.result());
assertEquals(expected.jobId(), value.jobId());
assertEquals(expected.jobComplete(), value.jobComplete());
assertEquals(expected.jobCompleted(), value.jobCompleted());
assertEquals(expected.executionErrors(), value.executionErrors());
assertEquals(expected.hasErrors(), value.hasErrors());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public class SerializationTest {
private static final QueryResponse QUERY_RESPONSE = QueryResponse.builder()
.etag(ETAG)
.jobId(JOB_ID)
.jobComplete(true)
.jobCompleted(true)
.result(QUERY_RESULT)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.google.gcloud.bigquery.ViewInfo;
import com.google.gcloud.spi.BigQueryRpc.Tuple;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.Arrays;
Expand Down Expand Up @@ -99,6 +98,7 @@
*/
public class BigQueryExample {

private static final int CHUNK_SIZE = 8 * 256 * 1024;
private static final Map<String, BigQueryAction> CREATE_ACTIONS = new HashMap<>();
private static final Map<String, BigQueryAction> INFO_ACTIONS = new HashMap<>();
private static final Map<String, BigQueryAction> LIST_ACTIONS = new HashMap<>();
Expand Down Expand Up @@ -627,7 +627,7 @@ private static class QueryAction extends BigQueryAction<QueryRequest> {
void run(BigQuery bigquery, QueryRequest queryRequest) throws Exception {
System.out.println("Running query");
QueryResponse queryResponse = bigquery.query(queryRequest);
while (!queryResponse.jobComplete()) {
while (!queryResponse.jobCompleted()) {
System.out.println("Waiting for query job " + queryResponse.jobId() + " to complete");
Thread.sleep(1000L);
queryResponse = bigquery.getQueryResults(queryResponse.jobId());
Expand Down Expand Up @@ -676,12 +676,12 @@ private static class LoadFileAction extends BigQueryAction<Tuple<LoadConfigurati
void run(BigQuery bigquery, Tuple<LoadConfiguration, String> configuration) throws Exception {
System.out.println("Running insert");
try (FileChannel fileChannel = FileChannel.open(Paths.get(configuration.y()))) {
ByteBuffer buffer = ByteBuffer.allocate(256 * 1024);
WriteChannel writeChannel = bigquery.writer(configuration.x());
while (fileChannel.read(buffer) > 0) {
buffer.flip();
writeChannel.write(buffer);
buffer.clear();
long position = 0;
long written = fileChannel.transferTo(position, CHUNK_SIZE, writeChannel);
while (written > 0) {
position += written;
written = fileChannel.transferTo(position, CHUNK_SIZE, writeChannel);
}
writeChannel.close();
}
Expand Down

0 comments on commit e27b5b3

Please sign in to comment.