Skip to content

Commit

Permalink
Merge pull request elastic#4 from henningandersen/spacetime_transactions_prepare
Browse files Browse the repository at this point in the history

Prepare commit phase transport layer
  • Loading branch information
henningandersen committed Nov 18, 2021
2 parents 007c3b9 + c57595a commit 331873e
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@
import org.elasticsearch.action.admin.indices.validate.query.TransportValidateQueryAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.ShardPrepareCommitAction;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.bulk.TransportShardPrepareCommitAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsAction;
import org.elasticsearch.action.datastreams.ModifyDataStreamsTransportAction;
import org.elasticsearch.action.delete.DeleteAction;
Expand Down Expand Up @@ -601,6 +603,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(TransportShardMultiGetAction.TYPE, TransportShardMultiGetAction.class);
actions.register(BulkAction.INSTANCE, TransportBulkAction.class);
actions.register(TransportShardBulkAction.TYPE, TransportShardBulkAction.class);
actions.register(ShardPrepareCommitAction.INSTANCE, TransportShardPrepareCommitAction.class);
actions.register(SearchAction.INSTANCE, TransportSearchAction.class);
actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
actions.register(OpenPointInTimeAction.INSTANCE, TransportOpenPointInTimeAction.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,23 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> i
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(BulkShardRequest.class);

private final BulkItemRequest[] items;
private final TxID txID;

/**
 * Deserialization constructor: reads the item array and then the transaction id from the wire.
 * The {@link TxID} is read unconditionally, so the serialized form must always contain one
 * (see the matching unconditional write in {@code writeTo}).
 */
public BulkShardRequest(StreamInput in) throws IOException {
super(in);
// items may contain nulls on the wire (written as optional), hence readOptionalWriteable per slot.
items = in.readArray(i -> i.readOptionalWriteable(inpt -> new BulkItemRequest(shardId, inpt)), BulkItemRequest[]::new);
this.txID = new TxID(in);
}

/**
 * Legacy constructor kept for existing callers; delegates with a {@code null} transaction id.
 * NOTE(review): a request built through this constructor must never be serialized —
 * {@code writeTo} calls {@code txID.writeTo(out)} unconditionally and would NPE on the null txID.
 */
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items) {
// todo: remove this constructor.
this(shardId, refreshPolicy, items, null);
}

/**
 * Primary constructor.
 *
 * @param shardId       the shard all {@code items} target
 * @param refreshPolicy refresh behavior applied after the write
 * @param items         the per-item sub-requests for this shard (slots may be null)
 * @param txID          the transaction this bulk shard write belongs to; must be non-null
 *                      if the request will be serialized (see {@code writeTo})
 */
public BulkShardRequest(ShardId shardId, RefreshPolicy refreshPolicy, BulkItemRequest[] items, TxID txID) {
super(shardId);
this.items = items;
this.txID = txID;
setRefreshPolicy(refreshPolicy);
}

Expand All @@ -64,6 +72,10 @@ public BulkItemRequest[] items() {
return items;
}

/** Returns the transaction id this shard-level bulk request participates in (may be null for the legacy constructor path). */
public TxID txID() {
return txID;
}

@Override
public String[] indices() {
// A bulk shard request encapsulates items targeted at a specific shard of an index.
Expand Down Expand Up @@ -92,6 +104,7 @@ public void writeTo(StreamOutput out) throws IOException {
o.writeBoolean(false);
}
}, items);
txID.writeTo(out);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.ActionType;

/**
 * Action type for the shard-level "prepare commit" step of a transactional bulk:
 * asks each shard touched by a transaction to prepare before the coordinator commits.
 * Singleton — use {@link #INSTANCE}; the response reports per-transaction conflicts.
 */
public class ShardPrepareCommitAction extends ActionType<ShardPrepareCommitResponse> {

public static final ShardPrepareCommitAction INSTANCE = new ShardPrepareCommitAction();
// todo: should the name be bulk[prepare] style?
public static final String NAME = "indices:data/write/prepare";

// Private: this is a singleton ActionType, registered once in ActionModule.
private ShardPrepareCommitAction() {
super(NAME, ShardPrepareCommitResponse::new);
}

// todo: transport options?
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;

// todo: make this a per node request.
// todo: make this a per node request.
/**
 * Shard-level request asking a shard to prepare the given transaction for commit.
 * Carries only the {@link TxID}; the target shard comes from the replicated-request base class.
 */
public class ShardPrepareCommitRequest extends ReplicatedWriteRequest<ShardPrepareCommitRequest> {

    // final: assigned exactly once in either constructor, never mutated afterwards.
    private final TxID txID;

    /**
     * @param shardId the shard that should prepare the transaction
     * @param txID    the transaction to prepare
     */
    public ShardPrepareCommitRequest(ShardId shardId, TxID txID) {
        super(shardId);
        this.txID = txID;
    }

    /** Deserialization constructor; reads the base request fields then the transaction id. */
    public ShardPrepareCommitRequest(StreamInput in) throws IOException {
        super(in);
        this.txID = new TxID(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        txID.writeTo(out);
    }

    // NOTE(review): named txid() here but txID() on BulkShardRequest — consider unifying.
    public TxID txid() {
        return txID;
    }

    @Override
    public String toString() {
        return "[" + shardId + "," + txID + "]";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.bulk;

import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.Map;

/**
 * Response to a shard-level prepare-commit: reports, per conflicting transaction,
 * whether this transaction won (true) or lost (false) the conflict.
 */
public class ShardPrepareCommitResponse extends ReplicationResponse implements WriteResponse {

// initially, we could make do with just a boolean here, but in further iterations, some extra info could be useful.
// NOTE(review): the map is stored and returned without a defensive copy — callers must not mutate it.
private final Map<TxID, Boolean> conflicts;

public ShardPrepareCommitResponse(Map<TxID, Boolean> conflicts) {
this.conflicts = conflicts;
}

/** Deserialization constructor; reads base response fields then the conflict map. */
public ShardPrepareCommitResponse(StreamInput in) throws IOException {
super(in);
this.conflicts = in.readMap(TxID::new, StreamInput::readBoolean);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeMap(conflicts, (o, k) -> k.writeTo(o), StreamOutput::writeBoolean);
}

/**
 * the conflict map, the boolean indicates true == this tx won it, false indicates that it lost, i.e., the tx in the map won.
 */
public Map<TxID, Boolean> conflicts() {
return conflicts;
}

@Override
public void setForcedRefresh(boolean forcedRefresh) {
// this does not refresh currently.
// Unsupported by design: prepare-commit performs no refresh, so a forced-refresh flag is meaningless here.
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SparseFixedBitSet;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ResourceAlreadyExistsException;
Expand All @@ -28,6 +29,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.ingest.IngestActionForwarder;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
Expand Down Expand Up @@ -63,6 +65,7 @@
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -465,6 +468,8 @@ private final class BulkOperation extends ActionRunnable<BulkResponse> {
private final ClusterStateObserver observer;
private final Map<String, IndexNotFoundException> indicesThatCannotBeCreated;
private final String executorName;
private final TxID txID = TxID.create();
private Map<ShardId, List<BulkItemRequest>> requestsByShard;

BulkOperation(
Task task,
Expand All @@ -478,7 +483,7 @@ private final class BulkOperation extends ActionRunnable<BulkResponse> {
super(listener);
this.task = task;
this.bulkRequest = bulkRequest;
this.listener = listener;
this.listener = commitOrRollbackListener(listener);
this.responses = responses;
this.startTimeNanos = startTimeNanos;
this.indicesThatCannotBeCreated = indicesThatCannotBeCreated;
Expand All @@ -496,7 +501,8 @@ protected void doRun() {
final ConcreteIndices concreteIndices = new ConcreteIndices(clusterState, indexNameExpressionResolver);
Metadata metadata = clusterState.metadata();
// Group the requests by ShardId -> Operations mapping
Map<ShardId, List<BulkItemRequest>> requestsByShard = new HashMap<>();
assert requestsByShard == null;
requestsByShard = new HashMap<>();

for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> docWriteRequest = bulkRequest.requests.get(i);
Expand Down Expand Up @@ -562,7 +568,8 @@ protected void doRun() {
BulkShardRequest bulkShardRequest = new BulkShardRequest(
shardId,
bulkRequest.getRefreshPolicy(),
requests.toArray(new BulkItemRequest[requests.size()])
requests.toArray(new BulkItemRequest[requests.size()]),
txID
);
bulkShardRequest.waitForActiveShards(bulkRequest.waitForActiveShards());
bulkShardRequest.timeout(bulkRequest.timeout());
Expand Down Expand Up @@ -711,6 +718,74 @@ private void addFailure(DocWriteRequest<?> request, int idx, Exception unavailab
// make sure the request gets never processed again
bulkRequest.requests.set(idx, null);
}

/**
 * Wraps the caller's listener so that a successful bulk response triggers the
 * commit-or-rollback decision and a failure triggers a rollback of the transaction.
 */
private ActionListener<BulkResponse> commitOrRollbackListener(ActionListener<BulkResponse> listener) {
return new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
// Inspect per-item results before deciding to commit (see commitOrRollback).
commitOrRollback(bulkResponse, listener);
}

@Override
public void onFailure(Exception e) {
// Bulk failed outright — roll the transaction back and propagate the failure.
rollback(e, listener);
}
};
}

/**
 * Rolls back the transaction and fails the caller's listener with {@code e}.
 * Currently only propagates the failure — the actual rollback is not implemented yet.
 */
private void rollback(Exception e, ActionListener<BulkResponse> listener) {
// todo: actual rollback.
listener.onFailure(e);
}

/**
 * Phase 1 of the commit: fans out a {@link ShardPrepareCommitRequest} to every shard
 * this bulk touched. When all shards respond, {@code commitOrFail} checks the conflict
 * maps; a conflict loss throws and routes through {@code rollback}, otherwise the
 * original bulk response is delivered to the caller.
 */
private void commit(BulkResponse bulkResponse, ActionListener<BulkResponse> listener) {
ActionListener<Void> markAndCommitListener = new ActionListener<Void>() {
@Override
public void onResponse(Void ignored) {
// todo: mark and commit
listener.onResponse(bulkResponse);
}

@Override
public void onFailure(Exception e) {
// Prepare failed (transport failure or a lost conflict from commitOrFail) — roll back.
rollback(e, listener);
}
};
// Completes once every shard has answered; map(this::commitOrFail) runs the conflict check
// and converts the grouped responses into the Void the mark-and-commit listener expects.
// NOTE(review): group size is requestsByShard.size(), which can be 0 if the bulk resolved
// to no shards — confirm GroupedActionListener tolerates an empty group.
GroupedActionListener<ShardPrepareCommitResponse> prepareCommitResponseListener = new GroupedActionListener<>(
markAndCommitListener.map(this::commitOrFail),
requestsByShard.size()
);
requestsByShard.keySet()
.forEach(
shardId -> client.executeLocally(
ShardPrepareCommitAction.INSTANCE,
new ShardPrepareCommitRequest(shardId, txID),
prepareCommitResponseListener
)
);
}

/**
 * Decides the fate of the transaction from the per-item bulk results:
 * any failed item rolls the whole transaction back, otherwise phase 1 of the commit starts.
 */
private void commitOrRollback(BulkResponse bulkResponse, ActionListener<BulkResponse> listener) {
    for (BulkItemResponse itemResponse : bulkResponse) {
        if (itemResponse.isFailed() == false) {
            continue;
        }
        // First failed item aborts the transaction.
        rollback(new ElasticsearchException("failed op"), listener);
        return;
    }
    commit(bulkResponse, listener);
}

/**
 * Scans every shard's conflict map; a {@code false} entry means this transaction lost
 * to the mapped transaction, which aborts the commit by throwing.
 *
 * @return null, matching the Void listener it feeds via map()
 */
private Void commitOrFail(Collection<ShardPrepareCommitResponse> responses) {
    responses.forEach(response -> response.conflicts().forEach((tx, won) -> {
        if (won == false) {
            throw new ElasticsearchException("conflicting transaction [{}] on shard", tx);
        }
    }));
    return null;
}
}

void executeBulk(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,12 @@
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

/** Performs shard-level bulk (index, delete or update) operations */
public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
Expand Down Expand Up @@ -172,6 +174,8 @@ public static void performOnPrimary(
ThreadPool threadPool,
String executorName
) {
primary.registerTransaction(request.txID(),
Arrays.stream(request.items()).map(BulkItemRequest::request).map(DocWriteRequest::id).collect(Collectors.toSet()));
new ActionRunnable<>(listener) {

private final Executor executor = threadPool.executor(executorName);
Expand Down
Loading

0 comments on commit 331873e

Please sign in to comment.