[Design Proposal] Enhancement of Repository Plugin #6632

Closed
vikasvb90 opened this issue Mar 11, 2023 · 11 comments
Labels: discuss, enhancement


vikasvb90 commented Mar 11, 2023

Objective

This document captures the low-level details of enhancements to the OpenSearch repository plugin to support parallel uploads of different parts of a file.

RFC: #6354

Design Factors

This design is based on the following factors:

  1. The repository plugin will only be responsible for reliably carrying out remote transfers of the provided content. In this case, the provided content will be a list of streams. It will be the responsibility of the caller to emit different streams for different parts of a file and provide them as input to the repository plugin.
  2. Offloading the responsibility of emitting different streams from a file also gives callers control to do additional processing of content after it is read and before it is transferred, such as encryption/decryption, calculating checksums for data integrity, etc.
  3. The repository plugin will not have any reference to the type of stream used and will consume the InputStream interface directly. This allows the caller to provide any stream implementation.

Design details

  1. Repository plugin interfaces will be enhanced to support parallel uploads of multiple parts of a file, as opposed to the serial upload that is done today.
  2. An abstraction providing a list of stream suppliers will be added to the plugin. Suppliers are better suited than concrete streams because the plugin can decide exactly when and where the streams need to be created.
  3. Vendor plugins will decide the individual part size for a given file size. An abstraction will take the part size that the plugin considers most suitable for a given file size as input and provide a list with the corresponding number of suppliers.
  4. Multi-part parallel upload will depend on the part size returned by the underlying plugin. This means that if the plugin invokes the abstraction with a part size equal to the file size, the upload will happen for the entire file as one part. A sketch of this part-count calculation follows the list.
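
For illustration only, here is a minimal sketch of the part-count arithmetic implied by points 3 and 4. The 4.5 GB file size, the 16 MB part size, and the method name are assumptions, not part of the proposal.

    public final class PartCountSketch {

        // Number of parts a plugin would request for a given file; files smaller than
        // the preferred part size are uploaded as a single part.
        static int numberOfParts(long fileSize, long preferredPartSize) {
            long partSize = Math.min(preferredPartSize, fileSize); // small file -> one part
            return (int) ((fileSize + partSize - 1) / partSize);   // ceiling division
        }

        public static void main(String[] args) {
            long fileSize = 4_500L * 1024 * 1024;        // ~4.5 GB file (example)
            long preferredPartSize = 16L * 1024 * 1024;  // part size chosen by the vendor plugin
            // The plugin would then call supplyStreamContext(partSize) and receive this many suppliers.
            System.out.println(numberOfParts(fileSize, preferredPartSize)); // prints 282
        }
    }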

Low Level Details

Settings

The following settings will be used to control transfers:

  1. Index setting index.remote_store.multipart_parallel_upload.enabled to enable or disable parallel uploads. The default value will be true.
  2. Repository setting min_part_size to define the minimum size a part should have. A file of size less than or equal to this setting value will be uploaded as a single part.
  3. Scaling threadpools remote_upload and priority_remote_upload will be used for uploads of segment and translog/checkpoint (ckp) files, respectively. priority_remote_upload can be given more bandwidth to prioritize ingestion over background tasks like refresh/flush or merge. An example of applying these settings follows the list.
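
For illustration only, the settings above might be applied as follows. The index name, repository name, repository type, and the 16mb value are placeholders; only the setting names come from this proposal.

    PUT /my-index/_settings
    {
      "index.remote_store.multipart_parallel_upload.enabled": false
    }

    PUT /_snapshot/my-remote-repo
    {
      "type": "s3",
      "settings": {
        "min_part_size": "16mb"
      }
    }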

Constructs

  • WriteContext
    • A container holding the fields required for a multi-part parallel upload write operation.

       public class WriteContext {
              private final String fileName;
              private final StreamContextSupplier streamContextSupplier;
              private final long fileSize;
              private final WritePriority writePriority;
       }
      
    • StreamContextSupplier supplies a StreamContext consisting of stream suppliers, which are responsible for providing streams depending on the part size. This can look like the following:

       StreamContext supplyStreamContext(int partSize);
      
       public class StreamContext {
           private final List<Supplier<Stream>> streamSuppliers;
           private final long totalContentLength;
       }
      
      public class Stream {
      
          private final InputStream inputStream;
          private final long contentLength;
          private final long offset;
      }
      
    • Different uploads can then be triggered in parallel, each consuming one stream.

  • BlobContainer
    • New methods will be added to BlobContainer to accept a WriteContext. Depending on the support advertised by the container, these will be invoked in OpenSearch remote store flows (a consumption sketch follows the snippet below).

      default boolean isMultiStreamUploadSupported() {
            return false;
      }
      
      default CompletableFuture<UploadResponse> writeBlobByStreams(WriteContext writeContext) throws IOException {
            throw new UnsupportedOperationException();
      }
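
For illustration only, here is a minimal sketch of how a vendor plugin might consume a StreamContext inside writeBlobByStreams when isMultiStreamUploadSupported() returns true. It uses the StreamContext and Stream classes from this proposal; the getter names, the uploadPart placeholder, and the executor are assumptions, and the actual part-upload call depends on the vendor SDK.

    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.function.Supplier;

    public class MultiStreamUploadSketch {

        private final ExecutorService uploadExecutor; // e.g. backed by the remote_upload threadpool

        public MultiStreamUploadSketch(ExecutorService uploadExecutor) {
            this.uploadExecutor = uploadExecutor;
        }

        // Schedules every part on the upload executor so all parts of the file
        // are transferred in parallel; completes when the last part finishes.
        public CompletableFuture<Void> uploadAllParts(StreamContext streamContext) {
            List<CompletableFuture<Void>> partUploads = new ArrayList<>();
            int partNumber = 0;
            for (Supplier<Stream> supplier : streamContext.getStreamSuppliers()) { // assumed getter
                final int part = partNumber++;
                partUploads.add(CompletableFuture.runAsync(() -> {
                    Stream stream = supplier.get(); // stream covering one offset/length of the file
                    uploadPart(part, stream.getInputStream(), stream.getOffset(), stream.getContentLength()); // assumed getters
                }, uploadExecutor));
            }
            return CompletableFuture.allOf(partUploads.toArray(new CompletableFuture[0]));
        }

        // Placeholder for the vendor-specific part upload (e.g. an S3 UploadPart call).
        private void uploadPart(int partNumber, InputStream inputStream, long offset, long contentLength) {
            // vendor SDK call goes here
        }
    }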
      

Performance Results

Performance runs on POC code providing the capability of parallel upload of multiple parts of a file produced the following results:

Note: Observations below are taken from prolonged runs (>15min) of repeated transfers of a file.
Instance Type : m5.2xlarge

Threads | CPU (%) | File Size (GB) | Latency (sec)
------- | ------- | -------------- | -------------
1       | 7       | 4.5            | 110
3       | 8       | 4.5            | 100
5       | 12      | 4.5            | 60
10      | 20      | 4.5            | 30
20      | 35      | 4.5            | 17
1       | 3       | 1.1            | 71
3       | 8       | 1.1            | 25
5       | 12      | 1.1            | 15
10      | 21      | 1.1            | 8
20      | 32      | 1.1            | 5

Observations

The following can be deduced from the table:

  1. There are clear gains in the performance of remote transfer when multiple parts of a file are uploaded in parallel.
  2. Since increasing concurrency impacts CPU, the threadpools remote_upload and priority_remote_upload should be properly configured.
  3. CPU is affected only by the concurrency of the transfer and remains almost unaffected by the size of the content being transferred.
  4. The latency of transferring an entire file decreases as the number of threads increases and grows with the size of the file being transferred.

vikasvb90 commented Mar 11, 2023

Tagging @elfisher @muralikpbhat @reta @mch2 @dreamer-89 @andrross @Bukhtawar @sachinpkale @itiyamas @dblock @shwetathareja @saratvemulapalli @ashking94 for feedback. Please tag others who can review this.


reta commented Mar 13, 2023

@vikasvb90 I think the suggested API is not complete and even in this shape raises a number of questions:

  • the API is not purely stream-oriented and explicitly requires that the producer knows the size ahead of time (fileSize, totalContentLength)
  • this is not going to work well with stream composition, for example, when client-side compression or encryption is going to be introduced (the size of the produced content is not known ahead of time)

I believe the API gets its inspiration from S3 multipart upload, which (afaik) requires prior knowledge of the size of the data being uploaded.


vikasvb90 commented Mar 14, 2023

@reta The idea is not to make it purely stream-oriented but to provide support for parallel uploads using streams, where each stream is responsible for reading a specific part of the file. Sample code below:

        // One CheckedInputStream is opened per part; each stream reads a specific
        // (offset, size) range of the file so the parts can be uploaded in parallel.
        CheckedInputStream[] checkedInputStreams = new CheckedInputStream[numberOfParts];
        StreamContainer[] streamContainers = new StreamContainer[numberOfParts];
        for (int partNo = 0; partNo < numberOfParts; partNo++) {
            long position = rawPartSize * partNo;
            long size = partNo == numberOfParts - 1 ? rawLastPartSize : rawPartSize;
            checkedInputStreams[partNo] = getMultiPartStreamSupplier().apply(size, position);
            if (checkedInputStreams[partNo] == null) {
                throw new IOException("Error creating multipart stream during opening streams for read");
            }
        }
  1. When it comes to the remote store, there is always a bounded transfer. If you skip the size of the content, then what you are suggesting is to allow an unbounded transfer to the remote store, since streaming can be endless. I don't think any remote store supports such a capability, and I can't think of a use case in which anyone would require endless streaming to a remote store.
  2. It is entirely possible to determine the size of encrypted or decrypted content without actually encrypting or decrypting the entire content. A design proposal for this, along with perf numbers, is mentioned here.
  3. Since you mentioned S3, it is not feasible to upload large files without providing the content length. There is a clear warning in the S3 SDK:
     WARNING: No content length specified for stream data. Stream contents will be buffered in memory and could result in out of memory errors.
     We would not want to omit any detail from the abstraction which could compromise the functioning or performance of the underlying plugin. (A small SDK sketch follows this list.)
  4. You mentioned that the suggested API is not complete. Can you provide specific API example(s) along with the reasons supporting the need for them?
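
For context on point 3, here is a minimal AWS SDK v1 sketch of where the content length is supplied so the SDK does not buffer the whole stream in memory; the bucket and key names are placeholders.

    import java.io.InputStream;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.ObjectMetadata;
    import com.amazonaws.services.s3.model.PutObjectRequest;

    public class S3ContentLengthSketch {

        // Uploads a single stream with an explicit content length; without
        // setContentLength the SDK emits the warning quoted above and buffers
        // the stream contents in memory.
        public static void upload(InputStream inputStream, long contentLength) {
            AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentLength(contentLength); // known size of this part/file
            s3.putObject(new PutObjectRequest("my-bucket", "my-key", inputStream, metadata));
        }
    }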


reta commented Mar 14, 2023

Thanks @vikasvb90

When it comes to the remote store, there is always a bounded transfer.

Fair enough (if that is by design)

It is entirely possible to determine the size of encrypted or decrypted content without actually encrypting or decrypting the entire content.

I believe it is not, and it highly depends on the algorithm (the proposal you mentioned talks about rough estimations). If you look into the original comment, it mentions client-side compression as well - the size of the compressed content is unknown ahead of time. In any case, it would be useful to see how the content length is being used, to determine possible alternatives or limitations.

You mentioned that the suggested API is not complete.

You just showed that in your answer. The description mentions some isolated pieces without the larger picture. For example, your own code snippet uses StreamContainer, but this is not part of the API. It would be helpful to see:

  • the complete API
  • a snippet of how the API plays together (even with pseudocode for blob upload when isMultiStreamUploadSupported is true)

vikasvb90 commented

@reta Can you please re-post the encryption-related question on the encryption design proposal? I can answer all your encryption-related questions there. Also, by compression I am assuming you mean encryption, as the two are different and we haven't referred to compression anywhere so far.

the complete API
a snippet of how the API plays together (even with pseudocode for blob upload when isMultiStreamUploadSupported is true)

I have updated the description with the structure of the Stream model.
I intentionally kept the usage of the new abstractions in OpenSearch out of the scope of this design, as I wanted the focus to be on getting feedback for accommodating any missing requirements of vendor plugins. But since you asked for it, the following is how we plan to use these abstractions in the OpenSearch remote store.

The streamContextSupplier mentioned in WriteContext in the description can hold a reference to the method below. That method returns a StreamContext which has different stream suppliers, each responsible for supplying a stream that refers to a specific portion of the file. These suppliers will be invoked by the underlying repository plugin to create streams, and the plugin can then choose to consume those streams as per its implementation. Also, as you can see in the API above, the plugin provides partSize in order to get the suppliers (present in StreamContext); this gives the plugin control over the number of parts it deems efficient for a given file size. For example, it can provide the entire file size as partSize to get a single stream supplier and upload the entire file serially.

// Splits the content into numberOfParts ranges, opens one stream per range, and
// exposes the streams through lazily-invoked suppliers for the repository plugin to consume.
public StreamContext openMultipartStreams(long partSize) throws IOException {
        if (inputStreams.get() != null) {
            throw new IOException("Multi-part streams are already created.");
        }

        this.partSize = partSize;
        this.lastPartSize = (contentLength % partSize) != 0 ? contentLength % partSize : partSize;
        this.numberOfParts = (int) ((contentLength % partSize) == 0 ? contentLength / partSize
            : (contentLength / partSize) + 1);

        InputStream[] streams = new InputStream[numberOfParts];
        List<Supplier<Stream>> streamSuppliers = new ArrayList<>();
        for (int partNo = 0; partNo < numberOfParts; partNo++) {
            long position = partSize * partNo;
            long size = partNo == numberOfParts - 1 ? lastPartSize : partSize;
            streams[partNo] = localFile != null
                ? getMultiPartStreamSupplierForFile().apply(size, position)
                : getMultiPartStreamSupplierForIndexInput().apply(size, position);
            if (streams[partNo] == null) {
                throw new IOException("Error creating multipart stream during opening streams for read");
            }

            final int finalPartNo = partNo;
            streamSuppliers.add(() -> new Stream(streams[finalPartNo], size, position));
        }
        inputStreams.set(streams);

        return new StreamContext(
            streamSuppliers,
            contentLength
        );
    }

    private BiFunction<Long, Long, OffsetRangeFileInputStream> getMultiPartStreamSupplierForFile() {
        return (size, position) -> {
            OffsetRangeFileInputStream offsetRangeInputStream;
            try {
                offsetRangeInputStream = new OffsetRangeFileInputStream(localFile.toFile(), size, position);
            } catch (IOException e) {
                log.error("Failed to create input stream", e);
                return null;
            }
            return offsetRangeInputStream;
        };
    }

    private BiFunction<Long, Long, OffsetRangeIndexInputStream> getMultiPartStreamSupplierForIndexInput() {
        return (size, position) -> {
            OffsetRangeIndexInputStream offsetRangeInputStream;
            try {
                offsetRangeInputStream = new OffsetRangeIndexInputStream(indexInput, fileName, size, position);
            } catch (IOException e) {
                log.error("Failed to create input stream", e);
                return null;
            }
            return offsetRangeInputStream;
        };
    }


reta commented Mar 15, 2023

Thanks @vikasvb90

Also, by compression I am assuming you mean encryption, as the two are different and we haven't referred to compression anywhere so far.

No, by compression I mean compression; please see #2192

Can you please re-post the encryption-related question on the encryption design proposal? I can answer all your encryption-related questions there.

My question was not really related to encryption or compression but to stream composition. We can leave this part aside since the design you are suggesting is not purely stream-oriented (as you mentioned).


dblock commented Mar 24, 2023

For stream context, do we always need to divide all data into a number of streams ahead of time, or would it make sense to make stream suppliers a stream-like interface, so that we can keep supplying another stream until the caller runs out of data to write? In the compression example the stream compressor could be processing incoming stream data, and every time it has N bytes it would queue those for multi-part writing. In the non-compressed example I could start writing while still constructing further streams.


reta commented Mar 24, 2023

For stream context, do we always need to divide all data into a number of streams ahead of time, or would it make sense to make stream suppliers a stream-like interface, so that we can keep supplying another stream until the caller runs out of data to write?

Personally, I think streams on demand are a better option (right now the whole source has to be divided into streams beforehand; for example, 60 GB divided into 5 MB streams would take a noticeable amount of heap). But I do not see a clear way to accommodate that in the current design, which needs content-length knowledge.


dblock commented Mar 27, 2023

But I do not see a clear way to accommodate that in the current design, which needs content-length knowledge.

Aren't those orthogonal? Just because I know the total size of the data doesn't mean I need to create all the streams ahead of time.

Of course it would be better if I didn't need to know / have the entire data already loaded.


reta commented Mar 27, 2023

Of course it would be better if I didn't need to know / have the entire data already loaded.

That would be the best option, I think.

vikasvb90 commented

@dblock

  1. On-demand stream creation is definitely a better idea. I have replaced the suppliers with a StreamIterable, where the consuming plugin can use a standard iterator-based implementation to make a stream-creation call whenever needed (see the sketch after this list).
  2. Data is never fully loaded; instead, the respective streams buffer a small amount of data (~10 KB) and submit it for upload. On-demand creation of streams will help in cases where the file size is very large and not enough JVM heap is available to accommodate all Stream objects at the same time.
  3. A single stream-like interface won't work for both reading and writing. I have added the reason here.
  4. The plugin can also choose to provide the response asynchronously; therefore, I am also adding a CompletableFuture return type to the writeBlobByStreams method.
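
To make the iterator-based approach in point 1 concrete, here is a minimal sketch of what such a StreamIterable could look like; the constructor arguments and the (size, position) provider are assumptions for illustration, and Stream is the class from the description above.

    import java.util.Iterator;
    import java.util.NoSuchElementException;
    import java.util.function.BiFunction;

    // Hypothetical sketch: creates one Stream per part lazily as the consumer iterates,
    // so streams for a very large file are never all materialized at once.
    public class StreamIterable implements Iterable<Stream> {

        private final long contentLength;
        private final long partSize;
        private final int numberOfParts;
        private final BiFunction<Long, Long, Stream> streamProvider; // (size, position) -> Stream

        public StreamIterable(long contentLength, long partSize, BiFunction<Long, Long, Stream> streamProvider) {
            this.contentLength = contentLength;
            this.partSize = partSize;
            this.numberOfParts = (int) ((contentLength + partSize - 1) / partSize);
            this.streamProvider = streamProvider;
        }

        @Override
        public Iterator<Stream> iterator() {
            return new Iterator<Stream>() {
                private int partNo = 0;

                @Override
                public boolean hasNext() {
                    return partNo < numberOfParts;
                }

                @Override
                public Stream next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    long position = partSize * partNo;
                    long size = Math.min(partSize, contentLength - position);
                    partNo++;
                    return streamProvider.apply(size, position); // stream is created only when requested
                }
            };
        }
    }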
