From 49fc8236871cf51fda96230ab4b345cb1c1400f0 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 7 Sep 2023 10:05:15 +0530 Subject: [PATCH] [Remote Cluster State] Parallel and Multipart IndexMetadata uploads (#9664) (#9838) * [Remote Cluster State] Parallel and Multipart IndexMetadata uploads (cherry picked from commit 92b2095f46c460114abdd264052716ec606e2f28) Signed-off-by: bansvaru Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- .../transfer/RemoteTransferContainer.java | 17 ++ .../remote/RemoteClusterStateService.java | 191 ++++++++++++++---- .../index/store/RemoteDirectory.java | 10 +- .../blobstore/ChecksumBlobStoreFormat.java | 63 ++++++ .../RemoteClusterStateServiceTests.java | 91 ++++++++- .../snapshots/BlobStoreFormatTests.java | 89 ++++++++ 6 files changed, 408 insertions(+), 53 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index c7cfef5c5ce3d..5808f51f01efc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; @@ -19,11 +21,13 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.util.ByteUtils; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import java.util.zip.CRC32; import com.jcraft.jzlib.JZlib; @@ -244,4 +248,17 @@ public void close() throws IOException { throw new IOException("Closure of some of the multi-part streams failed."); } } + + /** + * Compute final checksum for IndexInput container checksum footer added by {@link CodecUtil} + * @param indexInput IndexInput with checksum in footer + * @param checksumBytesLength length of checksum bytes + * @return final computed checksum of entire indexInput + */ + public static long checksumOfChecksum(IndexInput indexInput, int checksumBytesLength) throws IOException { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), checksumBytesLength); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 980c2f9cf3ce4..4697f8f83fe20 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Nullable; @@ -22,6 +24,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.RepositoriesService; @@ -34,11 +38,14 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -57,6 +64,8 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, @@ -130,24 +139,11 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro } ensureRepositorySet(); - final List allUploadedIndexMetadata = new ArrayList<>(); - // todo parallel upload // any validations before/after upload ? - for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.add(uploadedIndexMetadata); - } + final List allUploadedIndexMetadata = writeIndexMetadataParallel( + clusterState, + new ArrayList<>(clusterState.metadata().indices().values()) + ); final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { @@ -197,6 +193,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( final Map allUploadedIndexMetadata = previousManifest.getIndices() .stream() .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); + + List toUpload = new ArrayList<>(); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { @@ -207,32 +206,22 @@ public ClusterMetadataManifest writeIncrementalMetadata( indexMetadata.getVersion() ); numIndicesUpdated++; - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + toUpload.add(indexMetadata); } else { numIndicesUnchanged++; } previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); } + List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload); + uploadedIndexMetadataList.forEach( + uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) + ); + for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } - final ClusterMetadataManifest manifest = uploadManifest( - clusterState, - allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), - false - ); + final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -254,6 +243,118 @@ public ClusterMetadataManifest writeIncrementalMetadata( return manifest; } + /** + * Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return. + * @param clusterState current ClusterState + * @param toUpload list of IndexMetadata to upload + * @return {@code List} list of IndexMetadata uploaded to remote + * @throws IOException + */ + private List writeIndexMetadataParallel(ClusterState clusterState, List toUpload) + throws IOException { + List exceptionList = Collections.synchronizedList(new ArrayList<>(toUpload.size())); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + List result = new ArrayList<>(toUpload.size()); + + LatchedActionListener latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> { + logger.trace( + String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName()) + ); + result.add(uploadedIndexMetadata); + }, ex -> { + assert ex instanceof IndexMetadataTransferException; + logger.error( + () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), + ex + ); + exceptionList.add(ex); + }), + latch + ); + + for (IndexMetadata indexMetadata : toUpload) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); + } + + try { + if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + IndexMetadataTransferException ex = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of index metadata to complete - %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ) + ); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } + } catch (InterruptedException ex) { + exceptionList.forEach(ex::addSuppressed); + IndexMetadataTransferException exception = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of index metadata to complete - %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ), + ex + ); + Thread.currentThread().interrupt(); + throw exception; + } + if (exceptionList.size() > 0) { + IndexMetadataTransferException exception = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Exception during transfer of IndexMetadata to Remote %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ) + ); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + return result; + } + + /** + * Allows async Upload of IndexMetadata to remote + * @param clusterState current ClusterState + * @param indexMetadata {@link IndexMetadata} to upload + * @param latchedActionListener listener to respond back on after upload finishes + * @throws IOException + */ + private void writeIndexMetadataAsync( + ClusterState clusterState, + IndexMetadata indexMetadata, + LatchedActionListener latchedActionListener + ) throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + indexMetadata.getIndexUUID() + ); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse( + new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata) + ) + ), + ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex)) + ); + + INDEX_METADATA_FORMAT.writeAsync( + indexMetadata, + indexMetadataContainer, + indexMetadataFileName(indexMetadata), + blobStoreRepository.getCompressor(), + completionListener + ); + } + @Nullable public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest) throws IOException { @@ -313,14 +414,6 @@ private ClusterMetadataManifest uploadManifest( } } - private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName) - throws IOException { - final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID()); - INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor()); - // returning full path - return indexMetadataContainer.path().buildAsString() + fileName; - } - private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID); @@ -468,4 +561,18 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); } } + + /** + * Exception for IndexMetadata transfer failures to remote + */ + static class IndexMetadataTransferException extends RuntimeException { + + public IndexMetadataTransferException(String errorDesc) { + super(errorDesc); + } + + public IndexMetadataTransferException(String errorDesc, Throwable cause) { + super(errorDesc, cause); + } + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 5250ce6230ffa..594b7f99cd85a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -29,7 +28,6 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; -import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.store.exception.ChecksumCombinationException; @@ -47,9 +45,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import com.jcraft.jzlib.JZlib; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -401,11 +398,8 @@ private void uploadBlob( private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { - long storedChecksum = CodecUtil.retrieveChecksum(indexInput); - CRC32 checksumOfChecksum = new CRC32(); - checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); try { - return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + return checksumOfChecksum(indexInput, SEGMENT_CHECKSUM_BYTES); } catch (Exception e) { throw new ChecksumCombinationException( "Potentially corrupted file: Checksum combination failed while combining stored checksum " diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 9048757405108..744e2fbd1bfc7 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -42,7 +42,11 @@ import org.apache.lucene.util.BytesRef; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.common.CheckedFunction; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; +import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lucene.store.ByteArrayIndexInput; @@ -50,6 +54,7 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.compress.Compressor; import org.opensearch.core.xcontent.MediaTypeRegistry; @@ -58,6 +63,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.gateway.CorruptStateException; +import org.opensearch.index.store.exception.ChecksumCombinationException; import org.opensearch.snapshots.SnapshotInfo; import java.io.IOException; @@ -67,6 +73,8 @@ import java.util.Locale; import java.util.Map; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; + /** * Snapshot metadata file format used in v2.0 and above * @@ -167,6 +175,61 @@ public void write(final T obj, final BlobContainer blobContainer, final String n blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false); } + /** + * Writes blob with resolving the blob name using {@link #blobName} method. + * Leverages the multipart upload if supported by the blobContainer. + * + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param compressor whether to use compression + * @param listener listener to listen to write result + */ + public void writeAsync( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + ActionListener listener + ) throws IOException { + if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) { + write(obj, blobContainer, name, compressor); + listener.onResponse(null); + return; + } + final String blobName = blobName(name); + final BytesReference bytes = serialize(obj, blobName, compressor); + final String resourceDescription = "ChecksumBlobStoreFormat.writeAsync(blob=\"" + blobName + "\")"; + try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { + long expectedChecksum; + try { + expectedChecksum = checksumOfChecksum(input.clone(), 8); + } catch (Exception e) { + throw new ChecksumCombinationException( + "Potentially corrupted file: Checksum combination failed while combining stored checksum " + + "and calculated checksum of stored checksum", + resourceDescription, + e + ); + } + + try ( + RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( + blobName, + blobName, + bytes.length(), + true, + WritePriority.HIGH, + (size, position) -> new OffsetRangeIndexInputStream(input, size, position), + expectedChecksum, + true + ) + ) { + ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener); + } + } + } + public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException { try (BytesStreamOutput outputStream = new BytesStreamOutput()) { try ( diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index d4e090b046760..1f4c32b59f183 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -15,14 +15,21 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.blobstore.support.PlainBlobMetadata; +import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.NamedXContentRegistry; @@ -47,12 +54,14 @@ import java.util.Map; import java.util.function.Supplier; -import org.mockito.ArgumentMatchers; +import org.mockito.ArgumentCaptor; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -146,6 +155,78 @@ public void testWriteFullMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + public void testWriteFullMetadataInParallelSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + + assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 1); + assertEquals(writeContextArgumentCaptor.getAllValues().size(), 1); + + WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue(); + byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes(); + IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize( + capturedWriteContext.getFileName(), + blobStoreRepository.getNamedXContentRegistry(), + new BytesArray(writtenBytes) + ); + + assertEquals(capturedWriteContext.getWritePriority(), WritePriority.HIGH); + assertEquals(writtenIndexMetadata.getNumberOfShards(), 1); + assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0); + assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index"); + assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid"); + long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new ByteArrayIndexInput("metadata-filename", writtenBytes), 8); + assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum); + + } + + public void testWriteFullMetadataInParallelFailure() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onFailure(new RuntimeException("Cannot upload to remote")); + return null; + }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + + assertThrows( + RemoteClusterStateService.IndexMetadataTransferException.class, + () -> remoteClusterStateService.writeFullMetadata(clusterState) + ); + } + public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); @@ -426,13 +507,17 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { } private BlobContainer mockBlobStoreObjects() { + return mockBlobStoreObjects(BlobContainer.class); + } + + private BlobContainer mockBlobStoreObjects(Class blobContainerClazz) { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); when(blobPath.add(anyString())).thenReturn(blobPath); when(blobPath.buildAsString()).thenReturn("/blob/path/"); - final BlobContainer blobContainer = mock(BlobContainer.class); + final BlobContainer blobContainer = mock(blobContainerClazz); when(blobContainer.path()).thenReturn(blobPath); - when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); return blobContainer; } diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 93be194b2d112..03f0d27188027 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -34,14 +34,19 @@ import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.read.ReadContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.CompressorRegistry; @@ -56,7 +61,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; import java.util.Map; +import java.util.concurrent.CountDownLatch; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; @@ -114,6 +121,57 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } } + public void testBlobStoreAsyncOperations() throws IOException, InterruptedException { + BlobStore blobStore = createTestBlobStore(); + MockFsVerifyingBlobContainer mockBlobContainer = new MockFsVerifyingBlobContainer( + (FsBlobStore) blobStore, + BlobPath.cleanPath(), + null + ); + ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); + + CountDownLatch latch = new CountDownLatch(2); + + ActionListener actionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("---> Async write succeeded"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("---> Failure in async write"); + throw new RuntimeException("async write should not fail"); + } + }; + + // Write blobs in different formats + checksumSMILE.writeAsync( + new BlobObj("checksum smile"), + mockBlobContainer, + "check-smile", + CompressorRegistry.none(), + actionListener + ); + checksumSMILE.writeAsync( + new BlobObj("checksum smile compressed"), + mockBlobContainer, + "check-smile-comp", + CompressorRegistry.getCompressor(DeflateCompressor.NAME), + actionListener + ); + + latch.await(); + + // Assert that all checksum blobs can be read + assertEquals(checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile", xContentRegistry()).getText(), "checksum smile"); + assertEquals( + checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile-comp", xContentRegistry()).getText(), + "checksum smile compressed" + ); + } + public void testBlobStoreOperations() throws IOException { BlobStore blobStore = createTestBlobStore(); BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath()); @@ -196,4 +254,35 @@ private long checksum(byte[] buffer) throws IOException { } } } + + public static class MockFsVerifyingBlobContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer { + + private BlobContainer delegate; + + public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { + super(blobStore, blobPath, path); + delegate = blobStore.blobContainer(BlobPath.cleanPath()); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + InputStream inputStream = writeContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream(); + delegate.writeBlob(writeContext.getFileName(), inputStream, writeContext.getFileSize(), true); + completionListener.onResponse(null); + } + + @Override + public void readBlobAsync(String blobName, ActionListener listener) { + throw new RuntimeException("read not supported"); + } + + @Override + public boolean remoteIntegrityCheckSupported() { + return false; + } + + public BlobContainer getDelegate() { + return delegate; + } + } }