Skip to content

Commit

Permalink
Enforce 140 char line lengths for packages action.bulk/delete/explain…
Browse files Browse the repository at this point in the history
…/get/index

part of elastic#34884
  • Loading branch information
jakelandis committed Oct 25, 2018
1 parent 06d7431 commit aebec5b
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 52 deletions.
13 changes: 0 additions & 13 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -130,19 +130,6 @@
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]upgrade[/\\]post[/\\]UpgradeSettingsRequestBuilder.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]validate[/\\]query[/\\]TransportValidateQueryAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]indices[/\\]validate[/\\]query[/\\]ValidateQueryRequestBuilder.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]BackoffPolicy.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]BulkProcessor.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]BulkRequest.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]BulkResponse.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportBulkAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]bulk[/\\]TransportShardBulkAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]delete[/\\]DeleteRequest.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]explain[/\\]TransportExplainAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]get[/\\]GetRequest.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]get[/\\]MultiGetRequest.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]get[/\\]TransportGetAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]get[/\\]TransportShardMultiGetAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]index[/\\]IndexRequest.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]DeletePipelineTransportAction.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]GetPipelineRequestBuilder.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]ingest[/\\]GetPipelineTransportAction.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
* semantics:
*
* <ul>
* <li><code>#hasNext()</code> determines whether the progression has more elements. Return <code>true</code> for infinite progressions</li>
* <li><code>#hasNext()</code> determines whether the progression has more elements. Return <code>true</code> for infinite progressions
* </li>
* <li><code>#next()</code> determines the next element in the progression, i.e. the next wait time period</li>
* </ul>
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,13 @@ public void close() {
* Closes the processor. If flushing by time is enabled, then it's shutdown. Any remaining bulk actions are flushed.
* <p>
* If concurrent requests are not enabled, returns {@code true} immediately.
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true},
* If concurrent requests are enabled, waits for up to the specified timeout for all bulk requests to complete then returns {@code true}
* If the specified waiting time elapses before all bulk requests complete, {@code false} is returned.
*
* @param timeout The maximum time to wait for the bulk requests to complete
* @param unit The time unit of the {@code timeout} argument
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests completed
* @return {@code true} if all bulk requests completed and {@code false} if the waiting time elapsed before all the bulk requests
* completed
* @throws InterruptedException If the current thread is interrupted
*/
public synchronized boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
Expand Down Expand Up @@ -298,7 +299,8 @@ public BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nu
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
@Nullable String defaultPipeline, @Nullable Object payload,
XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, defaultPipeline, payload, true, xContentType);
executeIfNeeded();
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,20 +369,23 @@ public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Null
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else {
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter [" + currentFieldName + "]");
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
+ currentFieldName + "]");
}
} else if (token == XContentParser.Token.START_ARRAY) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line +
"], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
} else if (token == XContentParser.Token.START_OBJECT && SOURCE.match(currentFieldName,
parser.getDeprecationHandler())) {
fetchSourceContext = FetchSourceContext.fromXContent(parser);
} else if (token != XContentParser.Token.VALUE_NULL) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
throw new IllegalArgumentException("Malformed action/metadata line [" + line
+ "], expected a simple value for field [" + currentFieldName + "] but found [" + token + "]");
}
}
} else if (token != XContentParser.Token.END_OBJECT) {
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected " + XContentParser.Token.START_OBJECT
+ " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]");
throw new IllegalArgumentException("Malformed action/metadata line [" + line + "], expected "
+ XContentParser.Token.START_OBJECT + " or " + XContentParser.Token.END_OBJECT + " but found [" + token + "]");
}

if ("delete".equals(action)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public String buildFailureMessage() {
BulkItemResponse response = responses[i];
if (response.isFailed()) {
sb.append("\n[").append(i)
.append("]: index [").append(response.getIndex()).append("], type [").append(response.getType()).append("], id [").append(response.getId())
.append("]: index [").append(response.getIndex()).append("], type [")
.append(response.getType()).append("], id [").append(response.getId())
.append("], message [").append(response.getFailureMessage()).append("]");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,11 @@ void createIndex(String index, TimeValue timeout, ActionListener<CreateIndexResp
client.admin().indices().create(createIndexRequest, listener);
}

private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request, String index, Exception e) {
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,
String index, Exception e) {
if (index.equals(request.index())) {
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(), request.id(), e)));
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(),
request.id(), e)));
return true;
}
return false;
Expand Down Expand Up @@ -327,19 +329,22 @@ protected void doRun() throws Exception {
indexRequest.process(indexCreated, mappingMd, concreteIndex.getName());
break;
case UPDATE:
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(), (UpdateRequest) docWriteRequest);
TransportUpdateAction.resolveAndValidateRouting(metaData, concreteIndex.getName(),
(UpdateRequest) docWriteRequest);
break;
case DELETE:
docWriteRequest.routing(metaData.resolveWriteIndexRouting(docWriteRequest.routing(), docWriteRequest.index()));
// check if routing is required, if so, throw error if routing wasn't specified
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(), docWriteRequest.type())) {
if (docWriteRequest.routing() == null && metaData.routingRequired(concreteIndex.getName(),
docWriteRequest.type())) {
throw new RoutingMissingException(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id());
}
break;
default: throw new AssertionError("request type not supported: [" + docWriteRequest.opType() + "]");
}
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(), docWriteRequest.id(), e);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
Expand All @@ -355,13 +360,15 @@ protected void doRun() throws Exception {
continue;
}
String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(),
request.routing()).shardId();
List<BulkItemRequest> shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
shardRequests.add(new BulkItemRequest(i, request));
}

if (requestsByShard.isEmpty()) {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
return;
}

Expand Down Expand Up @@ -407,7 +414,8 @@ public void onFailure(Exception e) {
}

private void finishHim() {
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]),
buildTookInMillis(startTimeNanos)));
}
});
}
Expand Down Expand Up @@ -535,7 +543,8 @@ void processBulkIndexIngestRequest(Task task, BulkRequest original, ActionListen
} else {
long ingestTookInMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - ingestStartTimeInNanos);
BulkRequest bulkRequest = bulkRequestModifier.getBulkRequest();
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis, listener);
ActionListener<BulkResponse> actionListener = bulkRequestModifier.wrapActionListenerIfNeeded(ingestTookInMillis,
listener);
if (bulkRequest.requests().isEmpty()) {
// at this stage, the transport bulk action can't deal with a bulk request with no requests,
// so we stop and send an empty response back to the client.
Expand Down Expand Up @@ -628,7 +637,8 @@ void markCurrentItemAsFailed(Exception e) {
// 2) Add a bulk item failure for this request
// 3) Continue with the next request in the bulk.
failedSlots.set(currentSlot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), e);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(currentSlot, indexRequest.opType(), failure));
}

Expand All @@ -641,7 +651,8 @@ static final class IngestBulkResponseListener implements ActionListener<BulkResp
private final List<BulkItemResponse> itemResponses;
private final ActionListener<BulkResponse> actionListener;

IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses, ActionListener<BulkResponse> actionListener) {
IngestBulkResponseListener(long ingestTookInMillis, int[] originalSlots, List<BulkItemResponse> itemResponses,
ActionListener<BulkResponse> actionListener) {
this.ingestTookInMillis = ingestTookInMillis;
this.itemResponses = itemResponses;
this.actionListener = actionListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@
* @see org.elasticsearch.client.Client#delete(DeleteRequest)
* @see org.elasticsearch.client.Requests#deleteRequest(String)
*/
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocWriteRequest<DeleteRequest>, CompositeIndicesRequest {
public class DeleteRequest extends ReplicatedWriteRequest<DeleteRequest> implements DocWriteRequest<DeleteRequest>,
CompositeIndicesRequest {

private String type;
private String id;
Expand Down Expand Up @@ -89,7 +90,8 @@ public ActionRequestValidationException validate() {
validationException = addValidationError("id is missing", validationException);
}
if (!versionType.validateVersionForWrites(version)) {
validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
validationException = addValidationError("illegal version value [" + version + "] for version type ["
+ versionType.name() + "]", validationException);
}
if (versionType == VersionType.FORCE) {
validationException = addValidationError("version type [force] may no longer be used", validationException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
}

@Override
protected void asyncShardOperation(ExplainRequest request, ShardId shardId, ActionListener<ExplainResponse> listener) throws IOException {
protected void asyncShardOperation(ExplainRequest request, ShardId shardId,
ActionListener<ExplainResponse> listener) throws IOException {
IndexService indexService = searchService.getIndicesService().indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
indexShard.awaitShardSearchActive(b -> {
Expand Down Expand Up @@ -129,7 +130,8 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
// Advantage is that we're not opening a second searcher to retrieve the _source. Also
// because we are working in the same searcher in engineGetResult we can be sure that a
// doc isn't deleted between the initial get and this call.
GetResult getResult = context.indexShard().getService().get(result, request.id(), request.type(), request.storedFields(), request.fetchSourceContext());
GetResult getResult = context.indexShard().getService().get(result, request.id(), request.type(), request.storedFields(),
request.fetchSourceContext());
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), true, explanation, getResult);
} else {
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), true, explanation);
Expand All @@ -149,7 +151,8 @@ protected ExplainResponse newResponse() {
@Override
protected ShardIterator shards(ClusterState state, InternalRequest request) {
return clusterService.operationRouting().getShards(
clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(), request.request().preference()
clusterService.state(), request.concreteIndex(), request.request().id(), request.request().routing(),
request.request().preference()
);
}

Expand Down
Loading

0 comments on commit aebec5b

Please sign in to comment.