From b7e9ae86442e121da53e94ebd91c78e29484eb21 Mon Sep 17 00:00:00 2001 From: RS146BIJAY Date: Thu, 22 Aug 2024 01:12:12 +0530 Subject: [PATCH] Serializing node attribute in discoveryNode only in scenarioes where it is required Signed-off-by: RS146BIJAY --- .../core/common/io/stream/StreamOutput.java | 6 ++- .../ClusterAllocationExplanation.java | 2 +- .../org/opensearch/cluster/ClusterState.java | 14 ++++- .../opensearch/cluster/coordination/Join.java | 4 +- .../cluster/coordination/JoinRequest.java | 2 +- .../coordination/StartJoinRequest.java | 2 +- .../cluster/node/DiscoveryNode.java | 52 +++++++++++++++++-- .../cluster/node/DiscoveryNodes.java | 10 +++- .../AbstractAllocationDecision.java | 2 +- .../allocation/NodeAllocationResult.java | 2 +- .../remote/model/RemoteClusterBlocks.java | 3 +- .../model/RemoteClusterStateCustoms.java | 7 ++- .../remote/model/RemoteCustomMetadata.java | 7 ++- .../remote/model/RemoteDiscoveryNodes.java | 7 ++- .../RemoteHashesOfConsistentSettings.java | 8 ++- .../remote/model/RemotePinnedTimestamps.java | 7 ++- .../routingtable/RemoteIndexRoutingTable.java | 7 ++- .../routingtable/RemoteRoutingTableDiff.java | 8 ++- .../recovery/PeerRecoveryTargetService.java | 1 + .../recovery/StartRecoveryRequest.java | 4 +- .../ChecksumWritableBlobStoreFormat.java | 7 ++- .../transport/TransportService.java | 2 +- .../RemoteRoutingTableServiceTests.java | 8 ++- ...oteClusterStateAttributesManagerTests.java | 16 ++++-- .../RemoteClusterStateServiceTests.java | 23 ++++++-- .../RemoteGlobalMetadataManagerTests.java | 14 ++++- .../model/RemoteDiscoveryNodesTests.java | 2 +- .../ChecksumWritableBlobStoreFormatTests.java | 8 ++- 28 files changed, 195 insertions(+), 40 deletions(-) diff --git a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java index b7599265aece3..1d0876d79a549 100644 --- a/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java +++ b/libs/core/src/main/java/org/opensearch/core/common/io/stream/StreamOutput.java @@ -969,9 +969,13 @@ public void writeOptionalArray(@Nullable T[] array) throws } public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException { + writeOptionalWriteable((out, writable) -> writable.writeTo(out), writeable); + } + + public void writeOptionalWriteable(final Writer writer, @Nullable T writeable) throws IOException { if (writeable != null) { writeBoolean(true); - writeable.writeTo(this); + writer.write(this, writeable); } else { writeBoolean(false); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java index 3c8f07613561d..70a223d60069a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java @@ -95,7 +95,7 @@ public ClusterAllocationExplanation(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { shardRouting.writeTo(out); - out.writeOptionalWriteable(currentNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), currentNode); out.writeOptionalWriteable(relocationTargetNode); out.writeOptionalWriteable(clusterInfo); shardAllocationDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterState.java b/server/src/main/java/org/opensearch/cluster/ClusterState.java index 9e63f961d241d..fa3979882f5cb 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterState.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterState.java @@ -781,7 +781,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(stateUUID); metadata.writeTo(out); routingTable.writeTo(out); - nodes.writeTo(out); + nodes.writeToWithAttribute(out); blocks.writeTo(out); // filter out custom states not supported by the other node int numberOfCustoms = 0; @@ -859,13 +859,23 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(toUuid); out.writeLong(toVersion); routingTable.writeTo(out); - nodes.writeTo(out); + nodesWriteToWithAttributes(nodes, out); metadata.writeTo(out); blocks.writeTo(out); customs.writeTo(out); out.writeVInt(minimumClusterManagerNodesOnPublishingClusterManager); } + private void nodesWriteToWithAttributes(Diff nodes, StreamOutput out) throws IOException { + DiscoveryNodes part = nodes.apply(null); + if (part != null) { + out.writeBoolean(true); + part.writeToWithAttribute(out); + } else { + out.writeBoolean(false); + } + } + @Override public ClusterState apply(ClusterState state) { Builder builder = new Builder(clusterName); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Join.java b/server/src/main/java/org/opensearch/cluster/coordination/Join.java index 58fa85992ebc8..ce1a234998690 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Join.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Join.java @@ -78,8 +78,8 @@ public Join(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); out.writeLong(term); out.writeLong(lastAcceptedTerm); out.writeLong(lastAcceptedVersion); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java index 04f87d16ee400..1447838a41502 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinRequest.java @@ -84,7 +84,7 @@ public JoinRequest(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(minimumTerm); out.writeOptionalWriteable(optionalJoin.orElse(null)); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java index de58eb721b28f..287418aaf378e 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/StartJoinRequest.java @@ -64,7 +64,7 @@ public StartJoinRequest(StreamInput input) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - sourceNode.writeTo(out); + sourceNode.writeToWithAttribute(out); out.writeLong(term); } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 653f81830ed17..8fa4429931e58 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -141,8 +141,10 @@ public static boolean isDedicatedSearchNode(Settings settings) { private final String hostAddress; private final TransportAddress address; private final Map attributes; + private final Map filteredAttributes; private final Version version; private final SortedSet roles; + private static final String ZONE = "zone"; /** * Creates a new {@link DiscoveryNode} @@ -268,6 +270,7 @@ public DiscoveryNode( this.version = version; } this.attributes = Collections.unmodifiableMap(attributes); + this.filteredAttributes = Collections.unmodifiableMap(getFilteredAttributes(attributes)); // verify that no node roles are being provided as attributes Predicate> predicate = (attrs) -> { boolean success = true; @@ -281,6 +284,21 @@ public DiscoveryNode( this.roles = Collections.unmodifiableSortedSet(new TreeSet<>(roles)); } + private Map getFilteredAttributes(Map attributes) { + final Map filteredAttributes = new HashMap<>(); + final Set filteredKeySet = attributes.keySet().stream().filter(key -> key + .startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)).collect(Collectors.toSet()); + for (String key: filteredKeySet) { + filteredAttributes.put(key, attributes.get(key)); + } + + if (attributes.containsKey(ZONE)) { + filteredAttributes.put(ZONE, attributes.get(ZONE)); + } + + return filteredAttributes; + } + /** Creates a DiscoveryNode representing the local node. */ public static DiscoveryNode createLocal(Settings settings, TransportAddress publishAddress, String nodeId) { Map attributes = Node.NODE_ATTRIBUTES.getAsMap(settings); @@ -329,6 +347,8 @@ public DiscoveryNode(StreamInput in) throws IOException { for (int i = 0; i < size; i++) { this.attributes.put(in.readString(), in.readString()); } + + this.filteredAttributes = Collections.unmodifiableMap(getFilteredAttributes(attributes)); int rolesSize = in.readVInt(); final Set roles = new HashSet<>(rolesSize); for (int i = 0; i < rolesSize; i++) { @@ -358,17 +378,33 @@ public DiscoveryNode(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + writeToUtil(out, false); + } else { + writeToUtil(out, true); + } + + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil(out, true); + } + + private void writeToUtil(StreamOutput out, boolean includeAllAttributes) throws IOException { out.writeString(nodeName); out.writeString(nodeId); out.writeString(ephemeralId); out.writeString(hostName); out.writeString(hostAddress); address.writeTo(out); - out.writeVInt(attributes.size()); - for (Map.Entry entry : attributes.entrySet()) { - out.writeString(entry.getKey()); - out.writeString(entry.getValue()); + if (includeAllAttributes) { + serializeAttributes(attributes, out); + } else { + // Serialize only remote store and zone (needed for SearchWeightedRouting) attributes if present. + + serializeAttributes(filteredAttributes, out); } + out.writeVInt(roles.size()); for (final DiscoveryNodeRole role : roles) { final DiscoveryNodeRole compatibleRole = role.getCompatibilityRole(out.getVersion()); @@ -379,6 +415,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVersion(version); } + private void serializeAttributes(final Map attributes, final StreamOutput out) throws IOException { + out.writeVInt(attributes.size()); + for (Map.Entry entry : attributes.entrySet()) { + out.writeString(entry.getKey()); + out.writeString(entry.getValue()); + } + } + /** * The address that the node can be communicated with. */ diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java index 2ebcd8096893d..18f92d989e959 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNodes.java @@ -688,6 +688,14 @@ public String shortSummary() { @Override public void writeTo(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeTo(output), out); + } + + public void writeToWithAttribute(StreamOutput out) throws IOException { + writeToUtil((output, value) -> value.writeToWithAttribute(output), out); + } + + private void writeToUtil(final Writer writer, StreamOutput out) throws IOException { if (clusterManagerNodeId == null) { out.writeBoolean(false); } else { @@ -696,7 +704,7 @@ public void writeTo(StreamOutput out) throws IOException { } out.writeVInt(nodes.size()); for (DiscoveryNode node : this) { - node.writeTo(out); + writer.write(out, node); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java index 59a39b358cb70..614e9f49c8726 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AbstractAllocationDecision.java @@ -107,7 +107,7 @@ public List getNodeDecisions() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(targetNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), targetNode); if (nodeDecisions != null) { out.writeBoolean(true); out.writeList(nodeDecisions); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java index 4163a5fd4c16f..6b805ca91fa58 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/NodeAllocationResult.java @@ -104,7 +104,7 @@ public NodeAllocationResult(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - node.writeTo(out); + node.writeToWithAttribute(out); out.writeOptionalWriteable(shardStoreInfo); out.writeOptionalWriteable(canAllocateDecision); nodeDecision.writeTo(out); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 101daaa143a66..07c272cd94d77 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -83,7 +83,8 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, generateBlobFileName(), getCompressor()).streamInput(); + return CLUSTER_BLOCKS_FORMAT.serialize((out, blocks) -> blocks.writeTo(out), clusterBlocks, generateBlobFileName(), getCompressor()) + .streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java index e5e44525520f4..37e416f93fe97 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterStateCustoms.java @@ -113,7 +113,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return clusterStateCustomsFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); + return clusterStateCustomsFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + custom, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java index 8e850e903954a..1e08d5f5a3e05 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -107,7 +107,12 @@ public String generateBlobFileName() { @Override public InputStream serialize() throws IOException { - return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor()).streamInput(); + return customBlobStoreFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + custom, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index 446207a767009..829036c6d122b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -88,7 +88,12 @@ public UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor()).streamInput(); + return DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java index dee48237e5c4c..b47b1f905e58b 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettings.java @@ -83,8 +83,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, generateBlobFileName(), getCompressor()) - .streamInput(); + return HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, settingsHash) -> settingsHash.writeTo(out), + hashesOfConsistentSettings, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java index 030491cf8b7b9..5b43f9936c0cb 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePinnedTimestamps.java @@ -122,7 +122,12 @@ public String generateBlobFileName() { @Override public InputStream serialize() throws IOException { - return PINNED_TIMESTAMPS_FORMAT.serialize(pinnedTimestamps, generateBlobFileName(), getCompressor()).streamInput(); + return PINNED_TIMESTAMPS_FORMAT.serialize( + (out, timestamps) -> timestamps.writeTo(out), + pinnedTimestamps, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java index 46c5074c48eb8..8303be4a526d7 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteIndexRoutingTable.java @@ -104,7 +104,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { - return INDEX_ROUTING_TABLE_FORMAT.serialize(indexRoutingTable, generateBlobFileName(), getCompressor()).streamInput(); + return INDEX_ROUTING_TABLE_FORMAT.serialize( + (out, routingTable) -> routingTable.writeTo(out), + indexRoutingTable, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java index 2370417dc14df..877d0bf0dbdde 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java +++ b/server/src/main/java/org/opensearch/gateway/remote/routingtable/RemoteRoutingTableDiff.java @@ -139,8 +139,12 @@ public ClusterMetadataManifest.UploadedMetadata getUploadedMetadata() { @Override public InputStream serialize() throws IOException { assert routingTableIncrementalDiff != null; - return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(routingTableIncrementalDiff, generateBlobFileName(), getCompressor()) - .streamInput(); + return REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize( + (out, routingTableDiff) -> routingTableDiff.writeTo(out), + routingTableIncrementalDiff, + generateBlobFileName(), + getCompressor() + ).streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index 6279a8ec3646c..d746aaa2a0783 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -294,6 +294,7 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi logger.debug("{} reestablishing recovery from {}", startRequest.shardId(), startRequest.sourceNode()); } } + transportService.sendRequest( startRequest.sourceNode(), actionName, diff --git a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java index 1df0d3861f686..7309712314acd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java +++ b/server/src/main/java/org/opensearch/indices/recovery/StartRecoveryRequest.java @@ -144,8 +144,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(recoveryId); shardId.writeTo(out); out.writeString(targetAllocationId); - sourceNode.writeTo(out); - targetNode.writeTo(out); + sourceNode.writeToWithAttribute(out); + targetNode.writeToWithAttribute(out); metadataSnapshot.writeTo(out); out.writeBoolean(primaryRelocation); out.writeLong(startingSeqNo); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java index 0add86ab88a16..88672995f4fd6 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormat.java @@ -28,6 +28,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.io.stream.Writeable.Writer; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.CompressorRegistry; import org.opensearch.gateway.CorruptStateException; @@ -56,6 +57,10 @@ public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction unSerializedObj.writeTo(out), obj, blobName, compressor); + } + + public BytesReference serialize(final Writer writer, T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput( @@ -76,7 +81,7 @@ public void close() throws IOException { }; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) { // TODO The stream version should be configurable stream.setVersion(Version.CURRENT); - obj.writeTo(stream); + writer.write(stream, obj); } CodecUtil.writeFooter(indexOutput); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index d08b28730d417..fff6d82b23c7e 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -752,7 +752,7 @@ public HandshakeResponse(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeOptionalWriteable(discoveryNode); + out.writeOptionalWriteable((stream, node) -> node.writeToWithAttribute(stream), discoveryNode); clusterName.writeTo(out); out.writeVersion(version); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 74254f1a1987f..02f22d713def5 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -548,6 +548,7 @@ public void testGetAsyncIndexRoutingReadAction() throws Exception { String uploadedFileName = String.format(Locale.ROOT, "index-routing/" + indexName); when(blobContainer.readBlob(indexName)).thenReturn( INDEX_ROUTING_TABLE_FORMAT.serialize( + (out, routingTable) -> routingTable.writeTo(out), clusterState.getRoutingTable().getIndicesRouting().get(indexName), uploadedFileName, compressor @@ -588,7 +589,12 @@ public void testGetAsyncIndexRoutingTableDiffReadAction() throws Exception { String uploadedFileName = String.format(Locale.ROOT, "routing-table-diff/" + indexName); when(blobContainer.readBlob(indexName)).thenReturn( - REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize(diff, uploadedFileName, compressor).streamInput() + REMOTE_ROUTING_TABLE_DIFF_FORMAT.serialize( + (out, routingTableDiff) -> routingTableDiff.writeTo(out), + diff, + uploadedFileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java index 4ef459e6657a1..dc45293549e1e 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManagerTests.java @@ -140,7 +140,12 @@ public void testGetAsyncReadRunnable_DiscoveryNodes() throws IOException, Interr DiscoveryNodes discoveryNodes = getDiscoveryNodes(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, fileName, compressor).streamInput() + DISCOVERY_NODES_FORMAT.serialize( + (out, discoveryNode) -> discoveryNode.writeToWithAttribute(out), + discoveryNodes, + fileName, + compressor + ).streamInput() ); RemoteDiscoveryNodes remoteObjForDownload = new RemoteDiscoveryNodes(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); @@ -190,7 +195,7 @@ public void testGetAsyncReadRunnable_ClusterBlocks() throws IOException, Interru ClusterBlocks clusterBlocks = randomClusterBlocks(); String fileName = randomAlphaOfLength(10); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - CLUSTER_BLOCKS_FORMAT.serialize(clusterBlocks, fileName, compressor).streamInput() + CLUSTER_BLOCKS_FORMAT.serialize((out, blocks) -> blocks.writeTo(out), clusterBlocks, fileName, compressor).streamInput() ); RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(fileName, "cluster-uuid", compressor); CountDownLatch latch = new CountDownLatch(1); @@ -261,7 +266,12 @@ public void testGetAsyncReadRunnable_Custom() throws IOException, InterruptedExc namedWriteableRegistry ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - remoteClusterStateCustoms.clusterStateCustomsFormat.serialize(custom, fileName, compressor).streamInput() + remoteClusterStateCustoms.clusterStateCustomsFormat.serialize( + (out, customState) -> customState.writeTo(out), + custom, + fileName, + compressor + ).streamInput() ); TestCapturingListener capturingListener = new TestCapturingListener<>(); final CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 1871f4d08ba43..a31a743200b34 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -1112,7 +1112,12 @@ public void testGetClusterStateUsingDiff() throws IOException { "custom", is -> readFrom(is, namedWriteableRegistry, addedCustom.getWriteableName()) ); - BytesReference bytes = customMetadataFormat.serialize(addedCustom, "custom-md2-file__1", compressor); + BytesReference bytes = customMetadataFormat.serialize( + (out, customClusterState) -> customClusterState.writeTo(out), + addedCustom, + "custom-md2-file__1", + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); } @@ -1131,6 +1136,7 @@ public void testGetClusterStateUsingDiff() throws IOException { ); when(blobContainer.readBlob(HASHES_OF_CONSISTENT_SETTINGS_FILENAME)).thenAnswer(i -> { BytesReference bytes = HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, settingsHash) -> settingsHash.writeTo(out), hashesOfConsistentSettings, HASHES_OF_CONSISTENT_SETTINGS_FILENAME, compressor @@ -1172,7 +1178,12 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.discoveryNodesUpdated(true); manifestBuilder.discoveryNodesMetadata(new UploadedMetadataAttribute(DISCOVERY_NODES, DISCOVERY_NODES_FILENAME)); when(blobContainer.readBlob(DISCOVERY_NODES_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize(nodesBuilder.build(), DISCOVERY_NODES_FILENAME, compressor); + BytesReference bytes = DISCOVERY_NODES_FORMAT.serialize( + (out, nodes) -> nodes.writeToWithAttribute(out), + nodesBuilder.build(), + DISCOVERY_NODES_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); } @@ -1183,7 +1194,12 @@ public void testGetClusterStateUsingDiff() throws IOException { diffManifestBuilder.clusterBlocksUpdated(true); manifestBuilder.clusterBlocksMetadata(new UploadedMetadataAttribute(CLUSTER_BLOCKS, CLUSTER_BLOCKS_FILENAME)); when(blobContainer.readBlob(CLUSTER_BLOCKS_FILENAME)).thenAnswer(invocationOnMock -> { - BytesReference bytes = CLUSTER_BLOCKS_FORMAT.serialize(newClusterBlock, CLUSTER_BLOCKS_FILENAME, compressor); + BytesReference bytes = CLUSTER_BLOCKS_FORMAT.serialize( + (out, clusterBlock) -> clusterBlock.writeTo(out), + newClusterBlock, + CLUSTER_BLOCKS_FILENAME, + compressor + ); return new ByteArrayInputStream(bytes.streamInput().readAllBytes()); }); @@ -3254,6 +3270,7 @@ private void mockBlobContainerForGlobalMetadata( String fileName = entry.getValue(); when(blobContainer.readBlob(fileName)).thenAnswer((invocation) -> { BytesReference bytesReference = customMetadataFormat.serialize( + (out, customState) -> customState.writeTo(out), metadata.custom(custom), fileName, blobStoreRepository.getCompressor() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index a2da1e8b0fdb2..8c337df48d9eb 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -359,7 +359,12 @@ public void testGetAsyncReadRunnable_HashesOfConsistentSettings() throws Excepti compressor ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() + HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize( + (out, hashesOfConsistentSetting) -> hashesOfConsistentSetting.writeTo(out), + hashesOfConsistentSettings, + fileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); @@ -490,7 +495,12 @@ public void testGetAsyncReadRunnable_CustomMetadata() throws Exception { namedWriteableRegistry ); when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( - customMetadataForDownload.customBlobStoreFormat.serialize(customMetadata, fileName, compressor).streamInput() + customMetadataForDownload.customBlobStoreFormat.serialize( + (out, metadata) -> metadata.writeTo(out), + customMetadata, + fileName, + compressor + ).streamInput() ); TestCapturingListener listener = new TestCapturingListener<>(); CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java index f1bced2bdf855..1b988ee1f37ec 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesTests.java @@ -143,7 +143,7 @@ public void testSerDe() throws IOException { public void testExceptionDuringSerialization() throws IOException { DiscoveryNodes nodes = mock(DiscoveryNodes.class); RemoteDiscoveryNodes remoteObjectForUpload = new RemoteDiscoveryNodes(nodes, METADATA_VERSION, clusterUUID, compressor); - doThrow(new IOException("mock-exception")).when(nodes).writeTo(any()); + doThrow(new IOException("mock-exception")).when(nodes).writeToWithAttribute(any()); IOException iea = assertThrows(IOException.class, remoteObjectForUpload::serialize); } diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java index 536df880b2597..c4e53c1eea138 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/ChecksumWritableBlobStoreFormatTests.java @@ -35,7 +35,12 @@ public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase { public void testSerDe() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); - BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none()); + BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), + indexMetadata, + TEST_BLOB_FILE_NAME, + CompressorRegistry.none() + ); IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference); assertThat(readIndexMetadata, is(indexMetadata)); } @@ -43,6 +48,7 @@ public void testSerDe() throws IOException { public void testSerDeForCompressed() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); BytesReference bytesReference = clusterBlocksFormat.serialize( + (out, metadata) -> metadata.writeTo(out), indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.getCompressor(DeflateCompressor.NAME)