Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIX] Prevent FS write from draining Resources content #22

Merged
merged 6 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions lib/AbstractReaderWriter.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,34 @@ class AbstractReaderWriter extends AbstractReader {
* Writes the content of a resource to a path.
*
* @public
* @param {module:@ui5/fs.Resource} resource The Resource to write
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options]
* @param {boolean} [options.readOnly=false] Whether the resource content shall be written read-only
RandomByte marked this conversation as resolved.
Show resolved Hide resolved
* Do not use in conjunction with the <code>drain</code> option.
* The written file will be used as the new source of this resources content.
* Therefore the written file should not be altered by any means.
* Activating this option might improve overall memory consumption.
* @param {boolean} [options.drain=false] Whether the resource content shall be emptied during the write process.
* Do not use in conjunction with the <code>readOnly</code> option.
* Activating this option might improve overall memory consumption.
* This should be used in cases where this is the last access to the resource.
* E.g. the final write of a resource after all processing is finished.
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
write(resource) {
return this._write(resource);
write(resource, options = {drain: false, readOnly: false}) {
return this._write(resource, options);
}

/**
* Writes the content of a resource to a path.
*
* @abstract
* @protected
* @param {module:@ui5/fs.Resource} resource The Resource to write
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options] Write options, see above
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
_write(resource) {
_write(resource, options) {
throw new Error("Not implemented");
}
}
Expand Down
110 changes: 86 additions & 24 deletions lib/Resource.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,32 @@ const fnFalse = () => false;
* @memberof module:@ui5/fs
*/
class Resource {
/**
* Function for dynamic creation of content streams
*
* @public
* @callback module:@ui5/fs.Resource~createStream
* @returns {stream.Readable} A readable stream of a resources content
*/

/**
* The constructor.
*
* @public
* @param {Object} parameters Parameters
* @param {string} parameters.path Virtual path
* @param {Object} [parameters.statInfo] File stat information
* @param {fs.Stats|Object} [parameters.statInfo] File information. Instance of
* [fs.Stats]{@link https://nodejs.org/api/fs.html#fs_class_fs_stats} or similar object
* @param {Buffer} [parameters.buffer] Content of this resources as a Buffer instance
* (cannot be used in conjunction with parameters string, stream or createStream)
* @param {string} [parameters.string] Content of this resources as a string
* (cannot be used in conjunction with parameters buffer, stream or createStream)
* @param {Stream} [parameters.stream] Readable stream of the content of this resource
* (cannot be used in conjunction with parameters buffer, string or createStream)
* @param {Function} [parameters.createStream] Function callback that returns a readable stream of the content
* of this resource (cannot be used in conjunction with parameters buffer, string or stream)
* @param {module:@ui5/fs.Resource~createStream} [parameters.createStream] Function callback that returns a readable
* stream of the content of this resource (cannot be used in conjunction with parameters buffer,
* string or stream).
* In some cases this is the most memory-efficient way to supply resource content
*/
constructor({path, statInfo, buffer, string, createStream, stream, project}) {
if (!path) {
Expand Down Expand Up @@ -74,16 +85,21 @@ class Resource {
* Gets a buffer with the resource content.
*
* @public
* @returns {Promise<Buffer>} A Promise resolving with a buffer of the resource content.
* @returns {Promise<Buffer>} Promise resolving with a buffer of the resource content.
*/
getBuffer() {
return new Promise((resolve, reject) => {
return Promise.resolve().then(() => {
matz3 marked this conversation as resolved.
Show resolved Hide resolved
if (this._contentDrained) {
throw new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set.");
}
if (this._buffer) {
resolve(this._buffer);
return this._buffer;
} else if (this._createStream || this._stream) {
resolve(this._getBufferFromStream());
return this._getBufferFromStream();
} else {
reject(new Error(`Resource ${this._path} has no content`));
throw new Error(`Resource ${this._path} has no content`);
}
});
}
Expand All @@ -92,7 +108,7 @@ class Resource {
* Sets a Buffer as content.
*
* @public
* @param {Buffer} buffer A buffer instance
* @param {Buffer} buffer Buffer instance
*/
setBuffer(buffer) {
this._createStream = null;
Expand All @@ -101,23 +117,30 @@ class Resource {
// }
this._stream = null;
this._buffer = buffer;
this._contentDrained = false;
this._streamDrained = false;
}

/**
* Gets a string with the resource content.
*
* @public
* @returns {Promise<string>} A Promise resolving with a string of the resource content.
* @returns {Promise<string>} Promise resolving with the resource content.
*/
getString() {
if (this._contentDrained) {
return Promise.reject(new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set."));
}
return this.getBuffer().then((buffer) => buffer.toString());
}

/**
* Sets a String as content
*
* @public
* @param {string} string A string
* @param {string} string Resource content
*/
setString(string) {
this.setBuffer(Buffer.from(string, "utf8"));
Expand All @@ -126,34 +149,64 @@ class Resource {
/**
* Gets a readable stream for the resource content.
*
* Repetitive calls of this function are only possible if new content has been set in the meantime (through
* [setStream]{@link module:@ui5/fs.Resource#setStream}, [setBuffer]{@link module:@ui5/fs.Resource#setBuffer}
* or [setString]{@link module:@ui5/fs.Resource#setString}). This
* is to prevent consumers from accessing drained streams.
*
* @public
* @returns {stream.Readable} A readable stream for the resource content.
* @returns {stream.Readable} Readable stream for the resource content.
*/
getStream() {
if (this._contentDrained) {
throw new Error(`Content of Resource ${this._path} has been drained. ` +
"This might be caused by requesting resource content after a content stream has been " +
"requested and no new content (e.g. a new stream) has been set.");
}
let contentStream;
if (this._buffer) {
const bufferStream = new stream.PassThrough();
bufferStream.end(this._buffer);
return bufferStream;
contentStream = bufferStream;
} else if (this._createStream || this._stream) {
return this._getStream();
} else {
contentStream = this._getStream();
}
if (!contentStream) {
throw new Error(`Resource ${this._path} has no content`);
}
// If a stream instance is being returned, it will typically get drained be the consumer.
// In that case, further content access will result in a "Content stream has been drained" error.
// However, depending on the execution environment, a resources content stream might have been
// transformed into a buffer. In that case further content access is possible as a buffer can't be
// drained.
// To prevent unexpected "Content stream has been drained" errors caused by changing environments, we flag
// the resource content as "drained" every time a stream is requested. Even if actually a buffer or
// createStream callback is being used.
RandomByte marked this conversation as resolved.
Show resolved Hide resolved
this._contentDrained = true;
return contentStream;
}

/**
* Sets a readable stream as content.
*
* @public
* @param {stream.Readable} stream readable stream
* @param {stream.Readable|module:@ui5/fs.Resource~createStream} stream Readable stream of the resource content or
callback for dynamic creation of a readable stream
*/
setStream(stream) {
this._buffer = null;
this._createStream = null;
// if (this._stream) { // TODO this may cause strange issues
// this._stream.destroy();
// }
this._stream = stream;
if (typeof stream === "function") {
this._createStream = stream;
this._stream = null;
} else {
this._stream = stream;
this._createStream = null;
}
this._contentDrained = false;
this._streamDrained = false;
}

/**
Expand Down Expand Up @@ -181,7 +234,8 @@ class Resource {
* Gets the resources stat info.
*
* @public
* @returns {fs.Stats} An object representing an fs.Stats instance
* @returns {fs.Stats|Object} Instance of [fs.Stats]{@link https://nodejs.org/api/fs.html#fs_class_fs_stats}
* or similar object
*/
getStatInfo() {
return this._statInfo;
Expand All @@ -201,10 +255,10 @@ class Resource {
}

/**
* Returns a clone of the resource.
* Returns a clone of the resource. The clones content is independent from that of the original resource
*
* @public
* @returns {Promise<module:@ui5/fs.Resource>} A promise resolving the resource.
* @returns {Promise<module:@ui5/fs.Resource>} Promise resolving with the clone
*/
clone() {
const options = {
Expand Down Expand Up @@ -235,7 +289,7 @@ class Resource {
/**
* Tracing: Get tree for printing out trace
*
* @returns {Object}
* @returns {Object} Trace tree
*/
getPathTree() {
const tree = {};
Expand All @@ -253,12 +307,16 @@ class Resource {
* Returns the content as stream.
*
* @private
* @returns {Function} The stream
* @returns {stream.Readable} Readable stream
*/
_getStream() {
if (this._streamDrained) {
throw new Error(`Content stream of Resource ${this._path} is flagged as drained.`);
}
if (this._createStream) {
return this._createStream();
}
this._streamDrained = true;
RandomByte marked this conversation as resolved.
Show resolved Hide resolved
return this._stream;
}

Expand All @@ -269,7 +327,10 @@ class Resource {
* @returns {Promise<Buffer>} Promise resolving with buffer.
*/
_getBufferFromStream() {
return new Promise((resolve, reject) => {
if (this._buffering) { // Prevent simultaneous buffering, causing unexpected access to drained stream
return this._buffering;
}
return this._buffering = new Promise((resolve, reject) => {
const contentStream = this._getStream();
const buffers = [];
contentStream.on("data", (data) => {
Expand All @@ -281,6 +342,7 @@ class Resource {
contentStream.on("end", () => {
const buffer = Buffer.concat(buffers);
this.setBuffer(buffer);
this._buffering = null;
resolve(buffer);
});
});
Expand Down
70 changes: 60 additions & 10 deletions lib/adapters/FileSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const path = require("path");
const fs = require("graceful-fs");
const glob = require("globby");
const makeDir = require("make-dir");
const {PassThrough} = require("stream");
const Resource = require("../Resource");
const AbstractAdapter = require("./AbstractAdapter");

Expand Down Expand Up @@ -153,7 +154,7 @@ class FileSystem extends AbstractAdapter {

if (!stat.isDirectory()) {
// Add content
options.createStream = () => {
options.createStream = function() {
return fs.createReadStream(fsPath);
};
}
Expand All @@ -168,10 +169,26 @@ class FileSystem extends AbstractAdapter {
* Writes the content of a resource to a path.
*
* @private
* @param {module:@ui5/fs.Resource} resource The Resource
* @param {module:@ui5/fs.Resource} resource Resource to write
* @param {Object} [options]
* @param {boolean} [options.readOnly] Whether the resource content shall be written read-only
* Do not use in conjunction with the <code>drain</code> option.
* The written file will be used as the new source of this resources content.
* Therefore the written file should not be altered by any means.
* Activating this option might improve overall memory consumption.
* @param {boolean} [options.drain] Whether the resource content shall be emptied during the write process.
* Do not use in conjunction with the <code>readOnly</code> option.
* Activating this option might improve overall memory consumption.
* This should be used in cases where this is the last access to the resource.
* E.g. the final write of a resource after all processing is finished.
* @returns {Promise<undefined>} Promise resolving once data has been written
*/
_write(resource) {
async _write(resource, {drain, readOnly}) {
if (drain && readOnly) {
throw new Error(`Error while writing resource ${resource.getPath()}: ` +
"Do not use options 'drain' and 'readOnly' at the same time.");
}

const relPath = resource.getPath().substr(this._virBasePath.length);
const fsPath = path.join(this._fsBasePath, relPath);
const dirPath = path.dirname(fsPath);
Expand All @@ -182,15 +199,48 @@ class FileSystem extends AbstractAdapter {
fs
matz3 marked this conversation as resolved.
Show resolved Hide resolved
}).then(() => {
return new Promise((resolve, reject) => {
const contentStream = resource.getStream();
contentStream.on("error", function(err) {
reject(err);
});
const write = fs.createWriteStream(fsPath);
write.on("error", function(err) {
let contentStream;

if (drain || readOnly) {
// Stream will be drained
contentStream = resource.getStream();

contentStream.on("error", (err) => {
reject(err);
});
} else {
// Transform stream into buffer before writing
contentStream = new PassThrough();
const buffers = [];
contentStream.on("error", (err) => {
reject(err);
});
contentStream.on("data", (data) => {
buffers.push(data);
});
contentStream.on("end", () => {
const buffer = Buffer.concat(buffers);
resource.setBuffer(buffer);
});
resource.getStream().pipe(contentStream);
}

const writeOptions = {};
if (readOnly) {
writeOptions.mode = 0o444; // read only
}

const write = fs.createWriteStream(fsPath, writeOptions);
write.on("error", (err) => {
reject(err);
});
write.on("close", function(ex) {
write.on("close", (ex) => {
if (readOnly) {
// Create new stream from written file
resource.setStream(function() {
return fs.createReadStream(fsPath);
});
}
resolve();
});
contentStream.pipe(write);
Expand Down
Loading