Merge pull request #179 from LeoPlatform/feature/fast_s3_read_1_5
Feature/fast s3 read 1 5
czirker committed Jul 12, 2023
2 parents ace070a + 534c004 commit e94a8c1
Showing 2 changed files with 96 additions and 8 deletions.
11 changes: 11 additions & 0 deletions lib/lib.d.ts
@@ -279,6 +279,17 @@ export interface ReadOptions {
*/
fast_s3_read_parallel_fetch_max_bytes?: number;


/**
* When using the [[`ReadOptions.fast_s3_read`]] feature, this specifies how many times to retry after an
* S3 connection timeout before giving up on preconnecting to the file.
* The default is usually correct.
*
* @default 1 retry
* @todo inconsistent fast_s3_read_timeout_retry_count
*/
fast_s3_read_timeout_retry_count?: number;

/**
* The max number of records, events, the SDK should retrieve each time it retrieves events from the
* RStreams Bus' Dynamo DB events table.
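For context on the new option above (not part of the diff): a minimal usage sketch. It assumes ls.fromLeo(botId, queue, opts) is the read entry point that accepts these ReadOptions, and that ls.pipe and ls.devnull are available as elsewhere in the SDK; the bot and queue names are made up.

let read = ls.fromLeo("my-bot", "my-source-queue", {
	fast_s3_read: true,
	fast_s3_read_parallel_fetch_max_bytes: 5 * 1024 * 1024,
	fast_s3_read_timeout_retry_count: 1 // retry a timed-out preconnect once before deferring the connection
});
ls.pipe(read, ls.devnull(), (err) => {
	if (err) console.error("read failed:", err);
});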
93 changes: 85 additions & 8 deletions lib/stream/leo-stream.js
Expand Up @@ -1236,7 +1236,8 @@ module.exports = function(configure) {
stopTime: moment().add(240, "seconds"),
stream_query_limit: opts.fast_s3_read ? 1000 : 50,
fast_s3_read: false,
fast_s3_read_parallel_fetch_max_bytes: FIVE_MB_IN_BYTES
fast_s3_read_parallel_fetch_max_bytes: FIVE_MB_IN_BYTES,
fast_s3_read_timeout_retry_count: 1
}, opts || {});
logger.info(opts);
if (opts.fast_s3_read) {
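For illustration only: the defaults above are merged with the caller's options, and the trailing "}, opts || {})" indicates that caller-supplied values take precedence. A tiny sketch of the effective behavior, assuming an Object.assign-style merge:

let FIVE_MB_IN_BYTES = 5 * 1024 * 1024; // assumed value, matching the constant's name
let defaults = {
	fast_s3_read: false,
	fast_s3_read_parallel_fetch_max_bytes: FIVE_MB_IN_BYTES,
	fast_s3_read_timeout_retry_count: 1
};
let opts = Object.assign({}, defaults, { fast_s3_read: true, fast_s3_read_timeout_retry_count: 3 });
// opts.fast_s3_read_timeout_retry_count === 3; keys the caller omits keep the defaults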
@@ -1555,7 +1556,7 @@ module.exports = function(configure) {
let close_remaining_s3_streams = () => {
logger.debug("closing all remaining s3 stream");
Object.entries(s3_streams).forEach(([key, data]) => {
if (data.stream){
if (data.stream) {
data.stream.on('error', (err) => {
// Ignore errors. This stream was never used
logger.debug("Error closing s3 stream:", key, err);
@@ -1580,12 +1581,13 @@
bytes += agg_bytes;
s3_streams[last_s3_index] = {
bytes: agg_bytes,
stream: create_s3_stream(items[last_s3_index])
stream: create_s3_stream(items[last_s3_index], last_s3_index)
};
agg_bytes = 0;
}
}
return s3_streams[index].stream;

return s3_streams[index].stream.get();
};
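The loop above (partly collapsed here) is what fast_s3_read builds on: it keeps opening S3 readers for upcoming files until roughly fast_s3_read_parallel_fetch_max_bytes of compressed data is in flight, so downloads overlap with event processing. A simplified, hypothetical sketch of that budgeting idea; the gzipSize field and openStream callback are illustrative stand-ins, not the SDK's API:

function preconnect(items, fromIndex, maxBytes, openStream) {
	let inFlight = 0;
	let streams = {};
	let i = fromIndex;
	while (inFlight < maxBytes && i < items.length - 1) {
		i++;
		if (items[i].s3) {
			let size = items[i].gzipSize || 0; // assumed per-file compressed size on each record
			streams[i] = { bytes: size, stream: openStream(items[i], i) };
			inFlight += size;
		}
	}
	return { streams, lastPreconnectedIndex: i };
}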

// Remove this stream's byte count from the bytes waiting to be processed
@@ -1598,8 +1600,10 @@
};


let create_s3_stream = (item) => {
let create_s3_stream = (item, index) => {
if (item.s3 && item.end.localeCompare(start) > 0) {

logger.debug(`creating s3, Index: ${index}, key: ${item.s3.key}`);
if (item.start) {
var _parts = item.start.split(/-/);
var prefix = _parts[0];
@@ -1639,9 +1643,82 @@

var file = item.s3;
file.range = `bytes=${fileOffset}-${fileEnd}`;
let s = ls.fromS3(file);
s.idOffset = idOffset;
return s;

let s3ReturnStream;
let timeoutCount = 0;
//let startRequestId = (configure.registry && configure.registry.context && configure.registry.context.awsRequestId) || "";
let setS3ReturnStream = function(catchTimeoutErrors = true) {

// Create the file stream and attach the id offset to know where the eids start for this file
s3ReturnStream = ls.fromS3(file);
s3ReturnStream.idOffset = idOffset;

if (catchTimeoutErrors) {

// Attach an error listener to catch timeouts and preconnect again if allowed
s3ReturnStream.on("error", err => {

// Potential future enhancement - ignore error if the file was started on a different aws request
// let currentRequestId = (configure.registry && configure.registry.context && configure.registry.context.awsRequestId) || "";
// if (startRequestId != currentRequestId) {
// // Cross invocation request. Don't throw the error
// return;
// }


// If the file has been consumed we don't want to catch errors
if (err.code === "TimeoutError" && !s3ReturnStream.consumed) {
timeoutCount++;
logger.log(`Caught S3 timeout (${timeoutCount}/${opts.fast_s3_read_timeout_retry_count}) Index: ${index} Range: ${item.start} - ${item.end} Key: ${file.key}`);

if (timeoutCount > opts.fast_s3_read_timeout_retry_count) {
// Max preconnect timeout retries reached, so wait until the events are actually needed.
s3ReturnStream.timedout = true;
} else {
// preconnect again
setS3ReturnStream();
}
} else {
// Save the error for when the stream is consumed
s3ReturnStream.errorToEmitOnPipe = err;
}
});
}
};

// Start the connection to S3
setS3ReturnStream();

// Wrapper around retrieving the stream to allow for fixing errored s3 files
return {
get: function() {
if (s3ReturnStream.timedout) {
// The file consumed all the retries so connect now
setS3ReturnStream(false);
}

// Mark as consumed so we pass along any errors correctly
s3ReturnStream.consumed = true;

// Emit any errors if needed when a downstream connects
if (s3ReturnStream.errorToEmitOnPipe) {
let pipe = s3ReturnStream.pipe.bind(s3ReturnStream);
s3ReturnStream.pipe = function(dest, opts) {
let ret = pipe(dest, opts);
dest.emit("error", s3ReturnStream.errorToEmitOnPipe);
return ret;
};
}

return s3ReturnStream;
},
on: function(...args) {
return s3ReturnStream.on(...args);
},
destroy: function(...args) {
return s3ReturnStream.destroy(...args);
}
};
}
return null;
};
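Taken together, the block above preconnects each S3 file, retries a timed-out preconnect up to fast_s3_read_timeout_retry_count times, falls back to connecting lazily inside get() once the retries are exhausted, and replays any other error onto the destination when the stream is finally piped. A standalone, simplified sketch of that defer-and-re-emit trick; the names and flow are illustrative, not the SDK's code:

const { PassThrough } = require("stream");

function deferErrorsUntilPiped(stream) {
	stream.on("error", (err) => {
		if (!stream.consumed) {
			// Nobody is listening downstream yet; stash the error instead of losing it
			stream.errorToEmitOnPipe = err;
		}
	});
	let pipe = stream.pipe.bind(stream);
	stream.pipe = function(dest, pipeOpts) {
		stream.consumed = true;
		let ret = pipe(dest, pipeOpts);
		if (stream.errorToEmitOnPipe) {
			// Replay the stashed error now that a consumer is attached
			dest.emit("error", stream.errorToEmitOnPipe);
		}
		return ret;
	};
	return stream;
}

// Usage: an error raised while the stream sat idle is not lost
let src = deferErrorsUntilPiped(new PassThrough());
src.emit("error", new Error("failure while preconnected and idle"));
let dest = new PassThrough();
dest.on("error", (err) => console.log("replayed:", err.message));
src.pipe(dest);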
