Rstream 77 sdk kinesis shards phase 1 simple #145

Merged · 3 commits · Aug 18, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -26,6 +26,7 @@ package-lock.json
.scannerwork
.idea
.idea/
*.js
*.js.map
*.utest.js
docs/build/
4 changes: 2 additions & 2 deletions lib/dynamodb.d.ts
@@ -19,8 +19,8 @@ interface PutOptions { }
* @todo document functions below
*/
export interface LeoDynamodb {
getSettingPromise: any;
setSettingPromise: any;
getSettingPromise: <T>(setting_id: string) => Promise<T>;
setSettingPromise: <T>(setting_id: string, value: T) => Promise<void>;
docClient: AWS.DynamoDB.DocumentClient,
get: <T>(table: string, id: string, opts: GetOptions, callback: DataCallback<AWSError, T>) => void,
put: <T>(table: string, id: string, item: T, opts: PutOptions, callback: Callback<AWSError>) => void,
14 changes: 10 additions & 4 deletions lib/dynamodb.js
@@ -1,4 +1,5 @@
"use strict";
const { promisify } = require("util");
var AWS = require('./leo-aws');
var https = require("https");
let extend = require("extend");
@@ -35,7 +36,7 @@ module.exports = function(configure) {
},
credentials: configure.credentials
});
return {
let ddbLib = {
docClient: docClient,

get: function(table, id, opts, callback) {
@@ -381,6 +382,11 @@ module.exports = function(configure) {
});
},
};


ddbLib.getSettingPromise = promisify(ddbLib.getSetting).bind(ddbLib);
ddbLib.setSettingPromise = promisify(ddbLib.saveSetting).bind(ddbLib);
return ddbLib;
};


@@ -465,7 +471,7 @@ let chunk = function(func, opts) {
for (var i = 0; i < items.length; i++) {
var item = items[i];
if (item.size + size >= opts.record_size) {
logger.log(`grouping items from ${groupStart+1} to ${i} of ${items.length} of size: ${size}`);
logger.log(`grouping items from ${groupStart + 1} to ${i} of ${items.length} of size: ${size}`);
toProcess.push(items.slice(groupStart, i).map((e) => {
return e.record;
}).join(''));
@@ -476,7 +482,7 @@
}
}
if (groupStart != items.length) {
logger.log(`grouping items from ${groupStart+1} to ${items.length} of ${items.length} of size: ${size}`);
logger.log(`grouping items from ${groupStart + 1} to ${items.length} of ${items.length} of size: ${size}`);
toProcess.push(items.slice(groupStart, items.length).map((e) => {
return e.record;
}).join(''));
@@ -490,7 +496,7 @@

if (toProcess.length > 0) {
if (opts.chunk_size >= 10 && opts.concurrency <= 25) {
logger.log(`chunking ${toProcess.length} records (${dataSizeBased?'Data Size':'Count Size'})`);
logger.log(`chunking ${toProcess.length} records (${dataSizeBased ? 'Data Size' : 'Count Size'})`);
}
func(toProcess, function(err, unprocessedItems) {
if (err) {
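
For context, a minimal sketch of how a consumer might use the new promise flavors added above (getSettingPromise wraps getSetting, setSettingPromise wraps saveSetting). The import path, the LeoDynamodb instance construction, and the shape of the stored setting are assumptions for illustration, not part of this diff.

```typescript
import { LeoDynamodb } from "./lib/dynamodb"; // path assumed for illustration

// Illustrative placeholder for whatever a bot stores as a setting.
interface BotSettings {
	lastProcessedEid: string;
	batchSize: number;
}

async function bumpBatchSize(ddb: LeoDynamodb, settingId: string): Promise<void> {
	// Resolves with the stored setting, typed via the generic parameter,
	// instead of requiring a callback like getSetting.
	const current = await ddb.getSettingPromise<BotSettings>(settingId);

	// Persists the updated value; resolves once the underlying saveSetting completes.
	await ddb.setSettingPromise<BotSettings>(settingId, {
		...current,
		batchSize: current.batchSize + 25,
	});
}
```
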
42 changes: 28 additions & 14 deletions lib/lib.d.ts
@@ -122,6 +122,20 @@ export interface BaseWriteOptions {
* @todo question Need examples of what this can take? Cool moment things used for example. Is this ms?
*/
time?: moment.DurationInputArg1;


/**
* The hash value used to explicitly determine the shard to send events
*
* @default 0
*/
partitionHashKey?: string;

/**
* Flag to use the queue name to determine the shard to send events
* @default false
*/
useQueuePartition?: boolean
}

/**
@@ -132,10 +146,10 @@ export interface BaseWriteOptions {
* to configure how to write.
*/
export interface WriteOptions extends BaseWriteOptions {
/**
* If true, the checkpoint will be applied even if someone else already checkpointed on the same bot/queue
* since the last time this code checkpointed. This is only used in advanced fanout cases.
*/
/**
* If true, the checkpoint will be applied even if someone else already checkpointed on the same bot/queue
* since the last time this code checkpointed. This is only used in advanced fanout cases.
*/
force?: boolean
}

@@ -740,8 +754,8 @@ export declare namespace StreamUtil {

/**
* This is a sink, a step designed to be the last step in the pipe.
*
* @internal
*
* @internal
* @todo unclear
* @todo incomplete
* @todo example
@@ -793,7 +807,7 @@ export declare namespace StreamUtil {
* @param config When to checkpoint.
* @returns The pipeline step that is ready to be used in a pipeline
*
* @internal
* @internal
* @todo review
* @todo example
*/
@@ -835,7 +849,7 @@ export declare namespace StreamUtil {
* @param config Options for when to checkpoint.
* @returns The pipeline step that is ready to be used in a pipeline
*
* @internal
* @internal
* @todo question what's the usage difference in this versus toCheckpoint where this is a Writable and the other is a TransformStream
* @todo unclear Probably have this description wrong.
*/
@@ -883,7 +897,7 @@ export declare namespace StreamUtil {
function enrich<T, U>(opts: EnrichBatchOptions<T, U>, callback: Callback): void;

/**
* This is a callback-based version of the [[`RStreamsSdk.offloadEvents`]] function and should no longer be used.
* This is a callback-based version of the [[`RStreamsSdk.offloadEvents`]] function and should no longer be used.
* Callback-based API flavors will be phased out over time.
*
* It reads events from a queue to do general processing (such as write to an external DB). It's called
@@ -908,12 +922,12 @@
*
* @typeParam T The type of the data received by the pipeline step
* @param botId For events that don't specify the bot to act as, this default is used.
* It is the bot to act as when writing, events will be marked as written by this bot.
* If not provided, each event must include the id of the bot to write the event as.
* It is the bot to act as when writing, events will be marked as written by this bot.
* If not provided, each event must include the id of the bot to write the event as.
* @param outQueue For events that don't specify the queue to write to, this default is used.
* It is the queue into which events will be written. If not provided, each event must
* include the queue to write the event to.
*
* It is the queue into which events will be written. If not provided, each event must
* include the queue to write the event to.
*
* @param config An object that contains config values that control the flow of events to outQueue
* @todo example
*/
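
A hedged sketch of the two new BaseWriteOptions fields in use. The import path and the surrounding pipeline setup are assumed for illustration; only the option shapes come from the type definitions above.

```typescript
import { BaseWriteOptions } from "./lib/lib"; // path assumed for illustration

// Pin every record from this writer to one shard via an explicit hash key
// (the default hash key is "0").
const explicitHashOpts: BaseWriteOptions = {
	partitionHashKey: "12345678901234567890",
};

// Or let the destination queue name drive the Kinesis partition key instead,
// so events for different queues can land on different shards.
const queuePartitionOpts: BaseWriteOptions = {
	useQueuePartition: true,
};
```
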
10 changes: 5 additions & 5 deletions lib/stream/helper/chunkEventStream.js
@@ -86,14 +86,16 @@ module.exports = function(ls, event, opts) {
start: 0,
end: 0,
units: 0,
checkpoint: null
checkpoint: null,
queues: {}
};
}
let eventData = item.stats[id];
eventData.units += (stats.units || 1);
eventData.start = Math.max(start, eventData.start);
eventData.end = Math.max(timestamp, eventData.end);
eventData.checkpoint = stats.checkpoint || stats.eid;
eventData.queues[stats.event] = (eventData.queues[stats.event] || 0) + ((stats.units != null) ? stats.units : 1);
}

function updateCorrelation(c) {
@@ -197,16 +199,14 @@ module.exports = function(ls, event, opts) {
});
}
}
},
function emit(done, data) {
}, function emit(done, data) {
emitChunk(data.isLast, (value) => {
if (value && (value.size || (value.correlations && Object.keys(value.correlations).length))) {
this.push(value);
}
done();
});
},
function end(done) {
}, function end(done) {
logger.debug("done chunking");
logger.debug("total", totalWrites, totalRecords, totalWrites / totalRecords);
done();
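
To make the new field concrete, here is an illustrative shape of one stats entry after this change. Every value below is a made-up placeholder; only the field names come from the diff.

```typescript
// Illustrative only: one entry of item.stats[id] after chunking, with the new
// per-queue unit counts alongside the existing fields.
const exampleStatsEntry = {
	start: 1660777200000,   // ms timestamps tracked by the existing fields
	end: 1660777205000,
	units: 12,
	checkpoint: "z/2022/08/18/00/00/0000000000-0000000", // placeholder eid
	queues: {               // new: units accumulated per destination queue
		"example-queue-a": 9,
		"example-queue-b": 3,
	},
};
```
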
106 changes: 66 additions & 40 deletions lib/stream/leo-stream.js
@@ -590,7 +590,7 @@ module.exports = function(configure) {
done();
return;
} else if (r != null && typeof r == "object") {
extra.event = rOpts.queue || outQueue
extra.event = rOpts.queue || outQueue;
if (Array.isArray(r)) {
r.slice(0, -1).forEach(data => {
context.push(data, { partial: true });
@@ -701,10 +701,12 @@ module.exports = function(configure) {
};
retry.run = function(callback) {
let fail = (err) => {
currentQueues.clear();
retry.removeListener('success', success);
callback(err || 'failed');
};
let success = () => {
currentQueues.clear();
retry.removeListener('fail', fail);
var c = correlations;
reset();
@@ -726,6 +728,9 @@
if (records.length === 0) {
retry.success();
} else if (opts.firehose) {
if (opts.useQueuePartition) {
logger.error(`Stream using Firehose with 'useQueuePartition' enabled. May cause processing issues.`);
}
logger.debug("sending", records.length, number, delay);
logger.time("firehose request");
firehose.putRecordBatch({
@@ -758,12 +763,26 @@ module.exports = function(configure) {
} else {
logger.debug("sending", records.length, number, delay);
logger.time("kinesis request");
let partitionKey = "0";
let explicitHashKey = (opts.partitionHashKey || 0).toString();
if (opts.useQueuePartition && currentQueues.size > 0) {

let currentQueuesAsArray = Array.from(currentQueues);
let queue = currentQueuesAsArray[0];

if (currentQueuesAsArray.length > 1) {
logger.error(`Stream with multiple queues with 'useQueuePartition' enabled. Using '${queue}' for partition. May cause processing issues. All queues [${currentQueuesAsArray.join(", ")}]`);
}

partitionKey = queue != null ? queue : "0";
explicitHashKey = undefined;
}
kinesis.putRecords({
Records: records.map((r) => {
return {
Data: r,
PartitionKey: "0",
ExplicitHashKey: (opts.partitionHashKey || 0).toString()
PartitionKey: partitionKey,
ExplicitHashKey: explicitHashKey
};
}),
StreamName: configure.stream
@@ -802,7 +821,17 @@ module.exports = function(configure) {
}, opts.chunk || {});
chunkOpts.gzip = !opts.firehose;

let currentQueues = new Set();
var p = ls.buffer(opts, function(obj, callback) {
if (obj.stats) {
Object.values(obj.stats).map(v => {
let queues = v.queues;
// We don't want queues to continue past here, so remove it
delete v.queues;
return Object.keys(queues);
}).reduce((a, b) => a.concat(b), []).forEach(q => currentQueues.add(q));

}
if (obj.records > 0) {
if (obj.gzip) {
records.push(obj.gzip);
@@ -1715,23 +1744,21 @@ module.exports = function(configure) {
});
gzip.end(item.gzip);
}
},
function(err) {
start = items[items.length - 1].end + " ";
logger.debug("done with this loop", err || "");
if (err) {
pass.emit("error", err);
} else {
done();
}
});
});
},
(err) => {
logger.debug("Calling Pass.end");
if (err) logger.error(err);
pass.end();
}, function(err) {
start = items[items.length - 1].end + " ";
logger.debug("done with this loop", err || "");
if (err) {
pass.emit("error", err);
} else {
done();
}
});
});
}, (err) => {
logger.debug("Calling Pass.end");
if (err) logger.error(err);
pass.end();
});
} else {
logger.debug("no events");
pass.end();
@@ -1883,26 +1910,25 @@ module.exports = function(configure) {
[table]: myRecords
},
"ReturnConsumedCapacity": 'TOTAL'
},
function(err, data) {
if (err) {
logger.info(`All ${myRecords.length} records failed! Retryable: ${err.retryable}`, err);
logger.error(myRecords);
if (err.retryable) {
retry.backoff(err);
} else {
retry.fail(err);
}
} else if (table in data.UnprocessedItems && Object.keys(data.UnprocessedItems[table]).length !== 0) {
//reset();
//data.UnprocessedItems[table].map(m => records.push(m.PutRequest.Item));
myRecords = data.UnprocessedItems[table];
retry.backoff();
}, function(err, data) {
if (err) {
logger.info(`All ${myRecords.length} records failed! Retryable: ${err.retryable}`, err);
logger.error(myRecords);
if (err.retryable) {
retry.backoff(err);
} else {
logger.info(table, "saved");
retry.success();
retry.fail(err);
}
});
} else if (table in data.UnprocessedItems && Object.keys(data.UnprocessedItems[table]).length !== 0) {
//reset();
//data.UnprocessedItems[table].map(m => records.push(m.PutRequest.Item));
myRecords = data.UnprocessedItems[table];
retry.backoff();
} else {
logger.info(table, "saved");
retry.success();
}
});
});
}
getExisting((err, existing) => {
@@ -2220,13 +2246,13 @@ module.exports = function(configure) {
} else if (opts == null && units == null) {
// Single event to use as correlation
opts = endEvent;
units = (startEvent != null && typeof startEvent.units === "number") ? startEvent.units : 1
units = (startEvent != null && typeof startEvent.units === "number") ? startEvent.units : 1;
}

opts = {
partial: false,
...opts
}
};

if (startEvent == null) {
throw new Error(isArray ? "startEvent must not be empty" : "startEvent is required");
Expand All @@ -2236,7 +2262,7 @@ module.exports = function(configure) {
[opts.partial ? 'partial_start' : 'start']: startEvent.eid,
[opts.partial ? 'partial_end' : 'end']: endEvent && endEvent.eid,
units: units
}
};
}
};

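
The core of the Kinesis change above, pulled out of the stream plumbing as a condensed sketch. The helper name and types are mine, but the branching mirrors the diff: with useQueuePartition enabled and at least one queue collected from the batch stats, the queue name becomes the PartitionKey and ExplicitHashKey is dropped; otherwise the writer keeps PartitionKey "0" with the optional explicit hash key.

```typescript
// Condensed sketch of the partition selection added to the kinesis.putRecords path.
// `currentQueues` is the set of destination queue names gathered from the buffered
// batch's stats (and cleared on success or failure in the retry handlers).
interface PartitionOpts {
	partitionHashKey?: string | number;
	useQueuePartition?: boolean;
}

function selectPartition(opts: PartitionOpts, currentQueues: Set<string>): {
	PartitionKey: string;
	ExplicitHashKey?: string;
} {
	let partitionKey = "0";
	let explicitHashKey: string | undefined = (opts.partitionHashKey ?? 0).toString();

	if (opts.useQueuePartition && currentQueues.size > 0) {
		const queues = Array.from(currentQueues);
		// If one batch spans several queues the first one wins; leo-stream logs an error
		// in that case because records for the other queues ride along on its shard.
		partitionKey = queues[0] ?? "0";
		// With no explicit hash key, Kinesis hashes the queue name to choose the shard.
		explicitHashKey = undefined;
	}

	return { PartitionKey: partitionKey, ExplicitHashKey: explicitHashKey };
}
```
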
4 changes: 2 additions & 2 deletions lib/streams.js
@@ -490,8 +490,8 @@ let ls = module.exports = {
done(null, obj);
});

parse.on("error", () => {
logger.error(arguments);
parse.on("error", function(...args) {
logger.error(...args);
});

return pumpify(parse, transform);
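
Why the streams.js change matters: an arrow function has no `arguments` binding of its own, so the old handler logged the enclosing function's arguments rather than the error passed to the event. A small standalone sketch of the corrected pattern, using a plain EventEmitter for illustration rather than the actual parse stream:

```typescript
import { EventEmitter } from "events";

const parse = new EventEmitter();

// Old (broken): () => { logger.error(arguments); } never sees the emitted error,
// because arrow functions do not bind `arguments`.
// Fixed pattern, as in this PR: capture whatever the emitter passes via a rest parameter.
parse.on("error", (...args: unknown[]) => {
	console.error(...args);
});

parse.emit("error", new Error("unparseable record"));
```
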