Skip to content

Commit

Permalink
[Backport] Add cluster state checksum in manifest #15218 (#15612)
Browse files Browse the repository at this point in the history
* Add cluster state checksum in manifest for remote state and routing table publication

Signed-off-by: Himshikha Gupta <himshikh@amazon.com>
Co-authored-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
himshikha and Bukhtawar committed Sep 4, 2024
1 parent c801270 commit b408ef8
Show file tree
Hide file tree
Showing 37 changed files with 2,032 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add canRemain method to TargetPoolAllocationDecider to move shards from local to remote pool for hot to warm tiering ([#15010](https://github.com/opensearch-project/OpenSearch/pull/15010))
- Add support for pluggable deciders for concurrent search ([#15363](https://github.com/opensearch-project/OpenSearch/pull/15363))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))[SnapshotV2] Snapshot Status API changes (#15409))
- [Remote Publication] Added checksum validation for cluster state behind a cluster setting ([#15218](https://github.com/opensearch-project/OpenSearch/pull/15218))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ public final <K, V> void writeMapOfLists(final Map<K, List<V>> map, final Writer
* @param keyWriter The key writer
* @param valueWriter The value writer
*/
public final <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
public <K, V> void writeMap(final Map<K, V> map, final Writer<K> keyWriter, final Writer<V> valueWriter) throws IOException {
writeVInt(map.size());
for (final Map.Entry<K, V> entry : map.entrySet()) {
keyWriter.write(this, entry.getKey());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
)
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, REMOTE_ROUTING_TABLE_REPO)
.put(REMOTE_PUBLICATION_EXPERIMENTAL, true)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, routingTableRepoName)
.put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE)
.put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath)
.put(RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_ENABLED_SETTING.getKey(), true)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ private static class CompleteDiff<T extends Diffable<T>> implements Diff<T> {
this.part = part;
}

@Override
public String toString() {
return "CompleteDiff{" + "part=" + part + '}';
}

/**
* Creates simple diff without changes
*/
Expand Down
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ default boolean isPrivate() {

}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
public static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

public static final String UNKNOWN_UUID = "_na_";

Expand Down Expand Up @@ -839,6 +839,34 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
minimumClusterManagerNodesOnPublishingClusterManager = after.minimumClusterManagerNodesOnPublishingClusterManager;
}

@Override
public String toString() {
return new StringBuilder().append("ClusterStateDiff{toVersion=")
.append(toVersion)
.append(", fromUuid='")
.append(fromUuid)
.append('\'')
.append(", toUuid='")
.append(toUuid)
.append('\'')
.append(", clusterName=")
.append(clusterName)
.append(", routingTable=")
.append(routingTable)
.append(", nodes=")
.append(nodes)
.append(", metadata=")
.append(metadata)
.append(", blocks=")
.append(blocks)
.append(", customs=")
.append(customs)
.append(", minimumClusterManagerNodesOnPublishingClusterManager=")
.append(minimumClusterManagerNodesOnPublishingClusterManager)
.append("}")
.toString();
}

ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
clusterName = new ClusterName(in);
fromUuid = in.readString();
Expand Down
12 changes: 12 additions & 0 deletions server/src/main/java/org/opensearch/cluster/DiffableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,18 @@ public Map<K, T> getUpserts() {
return upserts;
}

@Override
public String toString() {
return new StringBuilder().append("MapDiff{deletes=")
.append(deletes)
.append(", diffs=")
.append(diffs)
.append(", upserts=")
.append(upserts)
.append("}")
.toString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(deletes, (o, v) -> keySerializer.writeKey(v, o));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class ClusterBlock implements Writeable, ToXContentFragment {
public class ClusterBlock implements Writeable, ToXContentFragment, Comparable<ClusterBlock> {

private final int id;
@Nullable
Expand Down Expand Up @@ -217,7 +217,13 @@ public int hashCode() {
return Objects.hash(id, uuid);
}

@Override
public int compareTo(ClusterBlock block) {
return Integer.compare(block.id(), this.id());
}

public boolean isAllowReleaseResources() {
return allowReleaseResources;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -303,6 +304,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeMap(indicesBlocks, StreamOutput::writeString, (o, s) -> writeBlockSet(s, o));
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
writeTo(out);
}

private static void writeBlockSet(Set<ClusterBlock> blocks, StreamOutput out) throws IOException {
out.writeCollection(blocks);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -149,6 +150,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeCollection(votingConfigExclusions);
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
writeTo(out);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field(TERM_PARSE_FIELD.getPreferredName(), term)
Expand Down Expand Up @@ -272,7 +277,7 @@ public CoordinationMetadata build() {
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class VotingConfigExclusion implements Writeable, ToXContentFragment {
public static class VotingConfigExclusion implements Writeable, ToXContentFragment, Comparable<VotingConfigExclusion> {
public static final String MISSING_VALUE_MARKER = "_absent_";
private final String nodeId;
private final String nodeName;
Expand Down Expand Up @@ -361,6 +366,10 @@ public String toString() {
return sb.toString();
}

@Override
public int compareTo(VotingConfigExclusion votingConfigExclusion) {
return votingConfigExclusion.getNodeId().compareTo(this.getNodeId());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;

Expand All @@ -88,6 +89,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
Expand Down Expand Up @@ -1287,6 +1289,31 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
out.writeString(index.getName()); // uuid will come as part of settings
out.writeLong(version);
out.writeVLong(mappingVersion);
out.writeVLong(settingsVersion);
out.writeVLong(aliasesVersion);
out.writeInt(routingNumShards);
out.writeByte(state.id());
writeSettingsToStream(settings, out);
out.writeVLongArray(primaryTerms);
out.writeMapValues(mappings, (stream, val) -> val.writeTo(stream));
out.writeMapValues(aliases, (stream, val) -> val.writeTo(stream));
out.writeMap(customData, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
out.writeMap(
inSyncAllocationIds,
StreamOutput::writeVInt,
(stream, val) -> DiffableUtils.StringSetValueSerializer.getInstance().write(new TreeSet<>(val), stream)
);
out.writeMapValues(rolloverInfos, (stream, val) -> val.writeTo(stream));
out.writeBoolean(isSystem);
if (out.getVersion().onOrAfter(Version.V_2_17_0)) {
out.writeOptionalWriteable(context);
}
}

public boolean isSystem() {
return isSystem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -257,6 +258,16 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalVInt(version);
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
out.writeString(name);
out.writeInt(order);
out.writeStringCollection(patterns);
Settings.writeSettingsToStream(settings, out);
out.writeMap(mappings, StreamOutput::writeString, (stream, val) -> val.writeTo(stream));
out.writeMapValues(aliases, (stream, val) -> val.writeTo(stream));
out.writeOptionalVInt(version);
}

@Override
public String toString() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ static Custom fromXContent(XContentParser parser, String name) throws IOExceptio

public static final String GLOBAL_STATE_FILE_PREFIX = "global-";

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
public static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);

private final String clusterUUID;
private final boolean clusterUUIDCommitted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -65,6 +66,10 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
out.writeMapValues(templates, (stream, value) -> value.writeVerifiableTo((BufferedChecksumStreamOutput) stream));
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -80,6 +85,11 @@ public int hashCode() {
return templates != null ? templates.hashCode() : 0;
}

@Override
public String toString() {
return "TemplatesMetadata{" + "templates=" + templates + '}';
}

/**
* Builder for the templates metadata
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.translog.BufferedChecksumStreamOutput;
import org.opensearch.node.Node;

import java.io.IOException;
Expand Down Expand Up @@ -397,12 +398,7 @@ public void writeToWithAttribute(StreamOutput out) throws IOException {
}

public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
address.writeTo(out);
writeNodeDetails(out);
if (includeAllAttributes) {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
Expand All @@ -412,7 +408,25 @@ public void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws I
} else {
out.writeVInt(0);
}
writeRolesAndVersion(out);
}

public void writeVerifiableTo(BufferedChecksumStreamOutput out) throws IOException {
writeNodeDetails(out);
out.writeMap(attributes, StreamOutput::writeString, StreamOutput::writeString);
writeRolesAndVersion(out);
}

private void writeNodeDetails(StreamOutput out) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
address.writeTo(out);
}

private void writeRolesAndVersion(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(LegacyESVersion.V_7_3_0)) {
out.writeVInt(roles.size());
for (final DiscoveryNodeRole role : roles) {
Expand Down
Loading

0 comments on commit b408ef8

Please sign in to comment.