Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move streams utils to the core #76397

Merged
merged 13 commits into from
Sep 3, 2020
2 changes: 2 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ module.exports = {
'!src/core/server/mocks{,.ts}',
'!src/core/server/types{,.ts}',
'!src/core/server/test_utils{,.ts}',
'!src/core/server/utils', // ts alias
'!src/core/server/utils/**/*',
// for absolute imports until fixed in
// https://github.com/elastic/kibana/issues/36096
'!src/core/server/*.test.mocks{,.ts}',
Expand Down
2 changes: 1 addition & 1 deletion src/cli_keystore/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Logger } from '../cli_plugin/lib/logger';
import { confirm, question } from '../legacy/server/utils';
import { createPromiseFromStreams, createConcatStream } from '../legacy/utils';
import { createPromiseFromStreams, createConcatStream } from '../core/server/utils';

/**
* @param {Keystore} keystore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import { exportSavedObjectsToStream } from './get_sorted_objects_for_export';
import { savedObjectsClientMock } from '../service/saved_objects_client.mock';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';

async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import Boom from 'boom';
import { createListStream } from '../../../../legacy/utils/streams';
import { createListStream } from '../../utils/streams';
import { SavedObjectsClientContract, SavedObject } from '../types';
import { fetchNestedDependencies } from './inject_nested_depdendencies';
import { sortObjects } from './sort_objects';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import {
createFilterStream,
createMapStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { SavedObject } from '../types';
import { createLimitStream } from './create_limit_stream';
import { SavedObjectsImportError } from './types';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import {
createConcatStream,
createListStream,
createPromiseFromStreams,
} from '../../../../legacy/utils/streams';
} from '../../utils/streams';
import { createLimitStream } from './create_limit_stream';

describe('createLimitStream()', () => {
Expand Down
6 changes: 1 addition & 5 deletions src/core/server/saved_objects/routes/export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

import { schema } from '@kbn/config-schema';
import stringify from 'json-stable-stringify';
import {
createPromiseFromStreams,
createMapStream,
createConcatStream,
} from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createMapStream, createConcatStream } from '../../utils/streams';
import { IRouter } from '../../http';
import { SavedObjectConfig } from '../saved_objects_config';
import { exportSavedObjectsToStream } from '../export';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jest.mock('../../export', () => ({
}));

import * as exportMock from '../../export';
import { createListStream } from '../../../../../legacy/utils/streams';
import { createListStream } from '../../../utils/streams';
import supertest from 'supertest';
import { UnwrapPromise } from '@kbn/utility-types';
import { SavedObjectConfig } from '../../saved_objects_config';
Expand Down
2 changes: 1 addition & 1 deletion src/core/server/saved_objects/routes/utils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createSavedObjectsStreamFromNdJson, validateTypes, validateObjects } from './utils';
import { Readable } from 'stream';
import { createPromiseFromStreams, createConcatStream } from '../../../../legacy/utils/streams';
import { createPromiseFromStreams, createConcatStream } from '../../utils/streams';

async function readStreamToCompletion(stream: Readable) {
return createPromiseFromStreams([stream, createConcatStream([])]);
Expand Down
6 changes: 1 addition & 5 deletions src/core/server/saved_objects/routes/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@

import { Readable } from 'stream';
import { SavedObject, SavedObjectsExportResultDetails } from 'src/core/server';
import {
createSplitStream,
createMapStream,
createFilterStream,
} from '../../../../legacy/utils/streams';
import { createSplitStream, createMapStream, createFilterStream } from '../../utils/streams';

export function createSavedObjectsStreamFromNdJson(ndJsonStream: Readable) {
return ndJsonStream
Expand Down
1 change: 1 addition & 0 deletions src/core/server/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
export * from './crypto';
export * from './from_root';
export * from './package_json';
export * from './streams';
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { createListStream, createPromiseFromStreams, createConcatStream } from './';
import { createListStream, createPromiseFromStreams, createConcatStream } from './index';

describe('concatStream', () => {
test('accepts an initial value', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,6 @@ import { createReduceStream } from './reduce_stream';
* items will concat with
* @return {Transform}
*/
export function createConcatStream(initial) {
export function createConcatStream<T>(initial?: T) {
return createReduceStream((acc, chunk) => acc.concat(chunk), initial);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { PassThrough } from 'stream';
import { Readable, PassThrough, TransformOptions } from 'stream';

/**
* Write the data and errors from a list of stream providers
Expand All @@ -29,7 +29,10 @@ import { PassThrough } from 'stream';
* @param {PassThroughOptions} options options passed to the PassThrough constructor
* @return {WritableStream} combined stream
*/
export function concatStreamProviders(sourceProviders, options = {}) {
export function concatStreamProviders(
sourceProviders: Array<() => Readable>,
options?: TransformOptions
) {
const destination = new PassThrough(options);
const queue = sourceProviders.slice();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
createFilterStream,
createListStream,
createPromiseFromStreams,
} from './';
} from './index';

describe('createFilterStream()', () => {
test('calls the function with each item in the source stream', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
createListStream,
createIntersperseStream,
createConcatStream,
} from './';
} from './index';

describe('intersperseStream', () => {
test('places the intersperse value between each provided value', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import { Transform } from 'stream';
* @param {String|Buffer} intersperseChunk
* @return {Transform}
*/
export function createIntersperseStream(intersperseChunk) {
export function createIntersperseStream(intersperseChunk: string | Buffer) {
let first = true;

return new Transform({
Expand All @@ -55,7 +55,7 @@ export function createIntersperseStream(intersperseChunk) {
}

this.push(chunk);
callback(null);
callback();
} catch (err) {
callback(err);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { createListStream } from './';
import { createListStream } from './index';

describe('listStream', () => {
test('provides the values in the initial list', async () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import { Readable } from 'stream';
* @param {Array<any>} items - the list of items to provide
* @return {Readable}
*/
export function createListStream(items = []) {
const queue = [].concat(items);
export function createListStream<T = any>(items: T | T[] = []) {
const queue = Array.isArray(items) ? [...items] : [items];

return new Readable({
objectMode: true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('createMapStream()', () => {
test('send the return value from the mapper on the output stream', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream((n) => n * 100),
createMapStream((n: number) => n * 100),
createConcatStream([]),
]);

Expand All @@ -49,7 +49,7 @@ describe('createMapStream()', () => {
test('supports async mappers', async () => {
const result = await createPromiseFromStreams([
createListStream([1, 2, 3]),
createMapStream(async (n, i) => {
createMapStream(async (n: number, i: number) => {
await delay(n);
return n * i;
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Transform } from 'stream';

export function createMapStream(fn) {
export function createMapStream<T>(fn: (value: T, i: number) => void) {
let i = 0;

return new Transform({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Readable, Writable, Duplex, Transform } from 'stream';

import { createListStream, createPromiseFromStreams, createReduceStream } from './';
import { createListStream, createPromiseFromStreams, createReduceStream } from './index';

describe('promiseFromStreams', () => {
test('pipes together an array of streams', async () => {
Expand Down Expand Up @@ -76,14 +76,13 @@ describe('promiseFromStreams', () => {
test('waits for writing and resolves to final value', async () => {
let written = '';

const duplexReadQueue = [];
const duplexReadQueue: Array<Promise<unknown>> = [];
const duplexItemsToPush = ['foo', 'bar', null];
const result = await createPromiseFromStreams([
createListStream(['a', 'b', 'c']),
new Duplex({
async read() {
const result = await duplexReadQueue.shift();
this.push(result);
this.push(await duplexReadQueue.shift());
},

write(chunk, enc, cb) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,20 @@
* @return {Promise<any>}
*/

import { pipeline, Writable } from 'stream';
import { pipeline, Writable, Readable } from 'stream';

export async function createPromiseFromStreams(streams) {
let finalChunk;
function isReadable(stream: Readable | Writable): stream is Readable {
return 'read' in stream && typeof stream.read === 'function';
}

export async function createPromiseFromStreams<T>(streams: [Readable, ...Writable[]]): Promise<T> {
let finalChunk: any;
const last = streams[streams.length - 1];
if (typeof last.read !== 'function' && streams.length === 1) {
if (!isReadable(last) && streams.length === 1) {
// For a nicer error than what stream.pipeline throws
throw new Error('A minimum of 2 streams is required when a non-readable stream is given');
}
if (typeof last.read === 'function') {
if (isReadable(last)) {
// We are pushing a writable stream to capture the last chunk
streams.push(
new Writable({
Expand All @@ -57,7 +61,9 @@ export async function createPromiseFromStreams(streams) {
})
);
}

return new Promise((resolve, reject) => {
// @ts-expect-error 'pipeline' doesn't support variable length of arguments
pipeline(...streams, (err) => {
if (err) return reject(err);
resolve(finalChunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Transform } from 'stream';
import { createReduceStream, createPromiseFromStreams, createListStream } from './index';

import { createReduceStream, createPromiseFromStreams, createListStream } from './';

const promiseFromEvent = (name, emitter) =>
const promiseFromEvent = (name: string, emitter: Transform) =>
new Promise((resolve) => emitter.on(name, () => resolve(name)));

describe('reduceStream', () => {
Expand Down Expand Up @@ -47,7 +47,10 @@ describe('reduceStream', () => {
});

test('emits an error if an iteration fails', async () => {
const reduce = createReduceStream((acc, i) => expect(i).toBe(1), 0);
const reduce = createReduceStream((acc, i) => {
expect(i).toBe(1);
return acc;
}, 0);
const errorEvent = promiseFromEvent('error', reduce);

reduce.write(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ import { Transform } from 'stream';
* initial value.
* @return {Transform}
*/
export function createReduceStream(reducer, initial) {
export function createReduceStream<T>(
reducer: (value: any, chunk: T, enc: string) => T,
initial?: T
) {
let i = -1;
let value = initial;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
import { Writable, Readable } from 'stream';

import {
createReplaceStream,
createConcatStream,
createPromiseFromStreams,
createListStream,
createMapStream,
} from './';
} from './index';

async function concatToString(streams) {
async function concatToString(streams: [Readable, ...Writable[]]) {
return await createPromiseFromStreams([
...streams,
createMapStream((buff) => buff.toString('utf8')),
createMapStream((buff: Buffer) => buff.toString('utf8')),
createConcatStream(''),
]);
}

describe('replaceStream', () => {
test('produces buffers when it receives buffers', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<Buffer[]>([
createListStream([Buffer.from('foo'), Buffer.from('bar')]),
createReplaceStream('o', '0'),
createConcatStream([]),
Expand All @@ -47,7 +48,7 @@ describe('replaceStream', () => {
});

test('produces buffers when it receives strings', async () => {
const chunks = await createPromiseFromStreams([
const chunks = await createPromiseFromStreams<string[]>([
createListStream(['foo', 'bar']),
createReplaceStream('o', '0'),
createConcatStream([]),
Expand All @@ -59,6 +60,7 @@ describe('replaceStream', () => {
});

test('expects toReplace to be a string', () => {
// @ts-expect-error
expect(() => createReplaceStream(Buffer.from('foo'))).toThrowError(/be a string/);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { Transform } from 'stream';

export function createReplaceStream(toReplace, replacement) {
export function createReplaceStream(toReplace: string, replacement: string | Buffer) {
if (typeof toReplace !== 'string') {
throw new TypeError('toReplace must be a string');
}
Expand Down Expand Up @@ -78,6 +78,7 @@ export function createReplaceStream(toReplace, replacement) {
this.push(buffer);
}

// @ts-expect-error
buffer = null;
callback();
},
Expand Down
Loading