
Commit

Serializing node attributes in DiscoveryNode only in scenarios where they are required

Signed-off-by: RS146BIJAY <rishavsagar4b1@gmail.com>
RS146BIJAY committed Aug 28, 2024
1 parent b830d68 commit 13054e6
Showing 28 changed files with 192 additions and 41 deletions.
@@ -969,9 +969,13 @@ public <T extends Writeable> void writeOptionalArray(@Nullable T[] array) throws
}

public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable);
}

public <T extends Writeable> void writeOptionalWriteable(final Writer<T> writer, @Nullable T writeable) throws IOException {
if (writeable != null) {
writeBoolean(true);
writeable.writeTo(this);
writer.write(this, writeable);
} else {
writeBoolean(false);
}
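The new overload keeps the existing boolean-then-payload framing but lets the caller decide how the payload is written. A minimal sketch of how a caller might use it; the Ping class and its fields are illustrative, not part of this commit:

```java
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

// Hypothetical Writeable showing both writeOptionalWriteable overloads.
public class Ping implements Writeable {
    private final DiscoveryNode localNode;    // may be null
    private final DiscoveryNode remoteNode;   // may be null

    public Ping(DiscoveryNode localNode, DiscoveryNode remoteNode) {
        this.localNode = localNode;
        this.remoteNode = remoteNode;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // New overload: a custom Writer<T> keeps the full attribute map on the wire.
        out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), localNode);
        // Existing overload: still delegates to Writeable#writeTo, which for DiscoveryNode
        // is now version-gated (see the DiscoveryNode changes below).
        out.writeOptionalWriteable(remoteNode);
    }
}
```

Only the payload serialization differs between the two calls; the presence flag written in front of it is unchanged, so a receiver using readOptionalWriteable(DiscoveryNode::new) does not need to change.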
@@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
shardRouting.writeTo(out);
out.writeOptionalWriteable(currentNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode);
out.writeOptionalWriteable(relocationTargetNode);
out.writeOptionalWriteable(clusterInfo);
shardAllocationDecision.writeTo(out);
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/cluster/ClusterState.java
@@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(stateUUID);
metadata.writeTo(out);
routingTable.writeTo(out);
nodes.writeTo(out);
nodes.writeToWithAttribute(out);
blocks.writeTo(out);
// filter out custom states not supported by the other node
int numberOfCustoms = 0;
@@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(toUuid);
out.writeLong(toVersion);
routingTable.writeTo(out);
nodes.writeTo(out);
nodesWriteToWithAttributes(nodes, out);
metadata.writeTo(out);
blocks.writeTo(out);
customs.writeTo(out);
out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager);
}

private void nodesWriteToWithAttributes(Diff<DiscoveryNodes> nodes, StreamOutput out) throws IOException {
DiscoveryNodes part = nodes.apply(null);
if (part != null) {
out.writeBoolean(true);
part.writeToWithAttribute(out);
} else {
out.writeBoolean(false);
}
}

@Override
public ClusterState apply(ClusterState state) {
Builder builder = new Builder(clusterName);
@@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
out.writeLong(term);
out.writeLong(lastAcceptedTerm);
out.writeLong(lastAcceptedVersion);
@@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(minimumTerm);
out.writeOptionalWriteable(optionalJoin.orElse(null));
}
@@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
sourceNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
out.writeLong(term);
}

@@ -141,8 +141,10 @@ public static boolean isDedicatedSearchNode(Settings settings) {
private final String hostAddress;
private final TransportAddress address;
private final Map<String, String> attributes;
private final Map<String, String> filteredAttributes;
private final Version version;
private final SortedSet<DiscoveryNodeRole> roles;
private static final String ZONE = "zone";

/**
* Creates a new {@link DiscoveryNode}
@@ -268,6 +270,7 @@ public DiscoveryNode(
this.version = version;
}
this.attributes = Collections.unmodifiableMap(attributes);
this.filteredAttributes = Collections.unmodifiableMap(getFilteredAttributes(attributes));
// verify that no node roles are being provided as attributes
Predicate<Map<String, String>> predicate = (attrs) -> {
boolean success = true;
@@ -281,6 +284,21 @@ public DiscoveryNode(
this.roles = Collections.unmodifiableSortedSet(new TreeSet<>(roles));
}

private Map<String, String> getFilteredAttributes(Map<String, String> attributes) {
final Map<String, String> filteredAttributes = new HashMap<>();
final Set<String> filteredKeySet = attributes.keySet().stream().filter(key -> key
.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)).collect(Collectors.toSet());
for (String key: filteredKeySet) {
filteredAttributes.put(key, attributes.get(key));
}

if (attributes.containsKey(ZONE)) {
filteredAttributes.put(ZONE, attributes.get(ZONE));
}

return filteredAttributes;
}

/** Creates a DiscoveryNode representing the local node. */
public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) {
Map<String, String> attributes = Node.NODE_ATTRIBUTES.getAsMap(settings);
@@ -329,6 +347,8 @@ public DiscoveryNode(StreamInput in) throws IOException {
for (int i = 0; i < size; i++) {
this.attributes.put(in.readString(), in.readString());
}

this.filteredAttributes = Collections.unmodifiableMap(getFilteredAttributes(attributes));
int rolesSize = in.readVInt();
final Set<DiscoveryNodeRole> roles = new HashSet<>(rolesSize);
for (int i = 0; i < rolesSize; i++) {
@@ -358,17 +378,33 @@ public DiscoveryNode(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
writeToUtil(out, false);
} else {
writeToUtil(out, true);
}

}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil(out, true);
}

private void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException {
out.writeString(nodeName);
out.writeString(nodeId);
out.writeString(ephemeralId);
out.writeString(hostName);
out.writeString(hostAddress);
address.writeTo(out);
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
if (includeAllAttributes) {
serializeAttributes(attributes, out);
} else {
// Serialize only remote store and zone (needed for SearchWeightedRouting) attributes, if present.
serializeAttributes(filteredAttributes, out);
}

out.writeVInt(roles.size());
for (final DiscoveryNodeRole role : roles) {
final DiscoveryNodeRole compatibleRole = role.getCompatibilityRole(out.getVersion());
@@ -379,6 +415,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVersion(version);
}

private void serializeAttributes(final Map<String, String> attributes, final StreamOutput out) throws IOException {
out.writeVInt(attributes.size());
for (Map.Entry<String, String> entry : attributes.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
}

/**
* The address that the node can be communicated with.
*/
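Taken together, writeTo now version-gates what goes on the wire (every attribute before 3.0.0, only the remote-store-prefixed keys plus zone on or after), while writeToWithAttribute always sends the full map. A minimal, test-style sketch of the two paths, assuming the existing DiscoveryNode(StreamInput) constructor on the read side; the node, the attribute keys, the exact remote-store prefix, and the import paths are illustrative assumptions:

```java
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.transport.TransportAddress;

import java.net.InetAddress;
import java.util.Map;
import java.util.Set;

public final class NodeAttributeSerializationSketch {
    public static void main(String[] args) throws Exception {
        DiscoveryNode node = new DiscoveryNode(
            "node-1",
            "node-id-1",
            new TransportAddress(InetAddress.getLoopbackAddress(), 9300),
            Map.of(
                "zone", "us-east-1a",                       // kept by the filtered path
                "remote_store.segment.repository", "repo",  // assumed remote-store-prefixed key
                "rack", "r1"                                // dropped by the filtered path
            ),
            Set.of(),
            Version.CURRENT
        );

        // Filtered path: writeTo on a >= 3.0.0 stream serializes only filteredAttributes.
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setVersion(Version.V_3_0_0);
            node.writeTo(out);
            StreamInput in = out.bytes().streamInput();
            in.setVersion(Version.V_3_0_0);
            DiscoveryNode slim = new DiscoveryNode(in);
            // Expectation under this change: "rack" is absent from slim.getAttributes().
        }

        // Full path: writeToWithAttribute always serializes the complete attribute map,
        // which is what the join, recovery, and cluster-state-publication callers in this
        // commit switch to.
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setVersion(Version.V_3_0_0);
            node.writeToWithAttribute(out);
            StreamInput in = out.bytes().streamInput();
            in.setVersion(Version.V_3_0_0);
            DiscoveryNode full = new DiscoveryNode(in);
            // Expectation: full.getAttributes() still contains "rack".
        }
    }
}
```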
@@ -688,6 +688,14 @@ public String shortSummary() {

@Override
public void writeTo(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeTo(output), out);
}

public void writeToWithAttribute(StreamOutput out) throws IOException {
writeToUtil((output, value) -> value.writeToWithAttribute(output), out);
}

private void writeToUtil(final Writer<DiscoveryNode> writer, StreamOutput out) throws IOException {
if (clusterManagerNodeId == null) {
out.writeBoolean(false);
} else {
@@ -696,7 +704,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
out.writeVInt(nodes.size());
for (DiscoveryNode node : this) {
node.writeTo(out);
writer.write(out, node);
}
}

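DiscoveryNodes gets the same split at the collection level: writeTo preserves today's per-node behavior, while writeToWithAttribute forwards the full attribute maps. The reading side needs no change, because each node's attribute count is already on the wire. A hedged round-trip sketch, assuming the existing DiscoveryNodes.readFrom(StreamInput, DiscoveryNode) reader:

```java
import org.opensearch.Version;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;

import java.io.IOException;

public final class DiscoveryNodesRoundTripSketch {
    // Serialize with full attributes and read back with the unchanged reader.
    static DiscoveryNodes roundTripWithAttributes(DiscoveryNodes nodes) throws IOException {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.setVersion(Version.V_3_0_0);
            nodes.writeToWithAttribute(out);             // full attribute map per node
            StreamInput in = out.bytes().streamInput();
            in.setVersion(Version.V_3_0_0);
            return DiscoveryNodes.readFrom(in, null);    // existing reader, no change needed
        }
    }
}
```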
@@ -107,7 +107,7 @@ public List<NodeAllocationResult> getNodeDecisions() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(targetNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode);
if (nodeDecisions != null) {
out.writeBoolean(true);
out.writeList(nodeDecisions);
@@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
node.writeTo(out);
node.writeToWithAttribute(out);
out.writeOptionalWriteable(shardStoreInfo);
out.writeOptionalWriteable(canAllocateDecision);
nodeDecision.writeTo(out);
@@ -83,7 +83,8 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput();
return CLUSTER_BLOCKS_FORMAT.serialize((out, blocks) -> blocks.writeTo(out), clusterBlocks, generateBlobFileName(), getCompressor())
.streamInput();
}

@Override
@@ -113,7 +113,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return clusterStateCustomsFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput();
return clusterStateCustomsFormat.serialize(
(out, customClusterState) -> customClusterState.writeTo(out),
custom,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -107,7 +107,12 @@ public String generateBlobFileName() {

@Override
public InputStream serialize() throws IOException {
return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput();
return customBlobStoreFormat.serialize(
(out, customClusterState) -> customClusterState.writeTo(out),
custom,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput();
return DISCOVERY_NODES_FORMAT.serialize(
(out, discoveryNode) -> discoveryNode.writeToWithAttribute(out),
discoveryNodes,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -83,8 +83,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor())
.streamInput();
return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(
(out, settingsHash) -> settingsHash.writeTo(out),
hashesOfConsistentSettings,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -122,7 +122,12 @@ public String generateBlobFileName() {

@Override
public InputStream serialize() throws IOException {
return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput();
return PINNED_TIMESTAMPS_FORMAT.serialize(
(out, timestamps) -> timestamps.writeTo(out),
pinnedTimestamps,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -104,7 +104,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {

@Override
public InputStream serialize() throws IOException {
return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput();
return INDEX_ROUTING_TABLE_FORMAT.serialize(
(out, routingTable) -> routingTable.writeTo(out),
indexRoutingTable,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -139,8 +139,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() {
@Override
public InputStream serialize() throws IOException {
assert routingTableIncrementalDiff != null;
return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor())
.streamInput();
return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(
(out, routingTableDiff) -> routingTableDiff.writeTo(out),
routingTableIncrementalDiff,
generateBlobFileName(),
getCompressor()
).streamInput();
}

@Override
@@ -294,6 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode());
}
}

transportService.sendRequest(
startRequest.sourceNode(),
actionName,
@@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(recoveryId);
shardId.writeTo(out);
out.writeString(targetAllocationId);
sourceNode.writeTo(out);
targetNode.writeTo(out);
sourceNode.writeToWithAttribute(out);
targetNode.writeToWithAttribute(out);
metadataSnapshot.writeTo(out);
out.writeBoolean(primaryRelocation);
out.writeLong(startingSeqNo);
@@ -28,6 +28,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.io.stream.Writeable.Writer;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.gateway.CorruptStateException;
@@ -55,7 +56,7 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput
this.reader = reader;
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
public BytesReference serialize(final Writer<T> writer, T obj, final String blobName, final Compressor compressor) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
@@ -76,7 +77,7 @@ public void close() throws IOException {
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
// TODO The stream version should be configurable
stream.setVersion(Version.CURRENT);
obj.writeTo(stream);
writer.write(stream, obj);
}
CodecUtil.writeFooter(indexOutput);
}
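Every Remote* entity above now passes an explicit Writer into this method, so the caller, not the object, decides how it hits the stream. A condensed sketch of the new call shape; the codec string, blob name, and compressor are placeholders, and the class's package is assumed (it is not shown in this diff):

```java
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.repositories.blobstore.ChecksumWritableBlobStoreFormat; // assumed package

import java.io.IOException;

public final class BlobFormatWriterSketch {
    static BytesReference serializeNodes(DiscoveryNodes nodes) throws IOException {
        ChecksumWritableBlobStoreFormat<DiscoveryNodes> format =
            new ChecksumWritableBlobStoreFormat<>("discovery-nodes", in -> DiscoveryNodes.readFrom(in, null));
        // The Writer argument lets remote cluster state uploads keep the full attribute
        // map (writeToWithAttribute) even though DiscoveryNodes#writeTo now filters.
        return format.serialize(
            (out, value) -> value.writeToWithAttribute(out),
            nodes,
            "discovery-nodes__1",        // placeholder blob name
            CompressorRegistry.none()    // placeholder compressor
        );
    }
}
```

For most of the remote entities in this commit the lambda simply delegates to writeTo; DiscoveryNodes is the case that needs the distinction, since its default writeTo now filters attributes.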
@@ -752,7 +752,7 @@ public HandshakeResponse(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(discoveryNode);
out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode);
clusterName.writeTo(out);
out.writeVersion(version);
}