Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Node.js close bindings for query results and connections #3436

Merged
merged 2 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tools/nodejs_api/src_cpp/include/node_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class NodeConnection : public Napi::ObjectWrap<NodeConnection> {
void SetQueryTimeout(const Napi::CallbackInfo& info);
Napi::Value ExecuteAsync(const Napi::CallbackInfo& info);
Napi::Value QueryAsync(const Napi::CallbackInfo& info);
void Close(const Napi::CallbackInfo& info);

private:
std::shared_ptr<Database> database;
Expand Down
2 changes: 2 additions & 0 deletions tools/nodejs_api/src_cpp/include/node_query_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class NodeQueryResult : public Napi::ObjectWrap<NodeQueryResult> {
Napi::Value GetNextAsync(const Napi::CallbackInfo& info);
Napi::Value GetColumnDataTypesAsync(const Napi::CallbackInfo& info);
Napi::Value GetColumnNamesAsync(const Napi::CallbackInfo& info);
void Close(const Napi::CallbackInfo& info);
void Close();

private:
QueryResult* queryResult = nullptr;
Expand Down
7 changes: 7 additions & 0 deletions tools/nodejs_api/src_cpp/node_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) {
InstanceMethod("queryAsync", &NodeConnection::QueryAsync),
InstanceMethod("setMaxNumThreadForExec", &NodeConnection::SetMaxNumThreadForExec),
InstanceMethod("setQueryTimeout", &NodeConnection::SetQueryTimeout),
InstanceMethod("close", &NodeConnection::Close)
});

exports.Set("NodeConnection", t);
Expand Down Expand Up @@ -68,6 +69,12 @@ void NodeConnection::SetQueryTimeout(const Napi::CallbackInfo& info) {
}
}

void NodeConnection::Close(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
this->connection.reset();
}

Napi::Value NodeConnection::ExecuteAsync(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
Expand Down
23 changes: 16 additions & 7 deletions tools/nodejs_api/src_cpp/node_query_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) {
Napi::HandleScope scope(env);

Napi::Function t = DefineClass(env, "NodeQueryResult",
{
InstanceMethod("resetIterator", &NodeQueryResult::ResetIterator),
{InstanceMethod("resetIterator", &NodeQueryResult::ResetIterator),
InstanceMethod("hasNext", &NodeQueryResult::HasNext),
InstanceMethod("hasNextQueryResult", &NodeQueryResult::HasNextQueryResult),
InstanceMethod("getNextQueryResultAsync", &NodeQueryResult::GetNextQueryResultAsync),
InstanceMethod("getNumTuples", &NodeQueryResult::GetNumTuples),
InstanceMethod("getNextAsync", &NodeQueryResult::GetNextAsync),
InstanceMethod("getColumnDataTypesAsync", &NodeQueryResult::GetColumnDataTypesAsync),
InstanceMethod("getColumnNamesAsync", &NodeQueryResult::GetColumnNamesAsync),
});
InstanceMethod("close", &NodeQueryResult::Close)});

exports.Set("NodeQueryResult", t);
return exports;
Expand All @@ -30,10 +29,7 @@ NodeQueryResult::NodeQueryResult(const Napi::CallbackInfo& info)
: Napi::ObjectWrap<NodeQueryResult>(info) {}

NodeQueryResult::~NodeQueryResult() {
if (this->isOwned) {
delete this->queryResult;
this->queryResult = nullptr;
}
this->Close();
}

void NodeQueryResult::SetQueryResult(QueryResult* queryResult, bool isOwned) {
Expand Down Expand Up @@ -123,3 +119,16 @@ Napi::Value NodeQueryResult::GetColumnNamesAsync(const Napi::CallbackInfo& info)
asyncWorker->Queue();
return info.Env().Undefined();
}

void NodeQueryResult::Close(const Napi::CallbackInfo& info) {
Napi::Env env = info.Env();
Napi::HandleScope scope(env);
this->Close();
}

void NodeQueryResult::Close() {
if (this->isOwned) {
delete this->queryResult;
this->queryResult = nullptr;
}
}
154 changes: 100 additions & 54 deletions tools/nodejs_api/src_js/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Connection {
this._connection = null;
this._isInitialized = false;
this._initPromise = null;
this._isClosed = false;
numThreads = parseInt(numThreads);
if (numThreads && numThreads > 0) {
this._numThreads = numThreads;
Expand All @@ -36,6 +37,9 @@ class Connection {
* connection is initialized automatically when the first query is executed.
*/
async init() {
if (this._isClosed) {
throw new Error("Connection is closed.");
}
if (!this._connection) {
const database = await this._database._getDatabase();
this._connection = new KuzuNative.NodeConnection(database);
Expand Down Expand Up @@ -69,6 +73,9 @@ class Connection {
* @returns {KuzuNative.NodeConnection} the underlying native connection.
*/
async _getConnection() {
if (this._isClosed) {
throw new Error("Connection is closed.");
}
await this.init();
return this._connection;
}
Expand Down Expand Up @@ -123,30 +130,34 @@ class Connection {
);
}
}
this._getConnection().then((connection) => {
const nodeQueryResult = new KuzuNative.NodeQueryResult();
try {
connection.executeAsync(
preparedStatement._preparedStatement,
nodeQueryResult,
paramArray,
(err) => {
if (err) {
return reject(err);
}
this._unwrapMultipleQueryResults(nodeQueryResult)
.then((queryResults) => {
return resolve(queryResults);
})
.catch((err) => {
this._getConnection()
.then((connection) => {
const nodeQueryResult = new KuzuNative.NodeQueryResult();
try {
connection.executeAsync(
preparedStatement._preparedStatement,
nodeQueryResult,
paramArray,
(err) => {
if (err) {
return reject(err);
});
}
);
} catch (e) {
return reject(e);
}
});
}
this._unwrapMultipleQueryResults(nodeQueryResult)
.then((queryResults) => {
return resolve(queryResults);
})
.catch((err) => {
return reject(err);
});
}
);
} catch (e) {
return reject(e);
}
})
.catch((err) => {
return reject(err);
});
});
}

Expand All @@ -160,18 +171,22 @@ class Connection {
if (typeof statement !== "string") {
return reject(new Error("statement must be a string."));
}
this._getConnection().then((connection) => {
const preparedStatement = new KuzuNative.NodePreparedStatement(
connection,
statement
);
preparedStatement.initAsync((err) => {
if (err) {
return reject(err);
}
return resolve(new PreparedStatement(this, preparedStatement));
this._getConnection()
.then((connection) => {
const preparedStatement = new KuzuNative.NodePreparedStatement(
connection,
statement
);
preparedStatement.initAsync((err) => {
if (err) {
return reject(err);
}
return resolve(new PreparedStatement(this, preparedStatement));
});
})
.catch((err) => {
return reject(err);
});
});
});
}

Expand All @@ -185,25 +200,29 @@ class Connection {
if (typeof statement !== "string") {
return reject(new Error("statement must be a string."));
}
this._getConnection().then((connection) => {
const nodeQueryResult = new KuzuNative.NodeQueryResult();
try {
connection.queryAsync(statement, nodeQueryResult, (err) => {
if (err) {
return reject(err);
}
this._unwrapMultipleQueryResults(nodeQueryResult)
.then((queryResults) => {
return resolve(queryResults);
})
.catch((err) => {
this._getConnection()
.then((connection) => {
const nodeQueryResult = new KuzuNative.NodeQueryResult();
try {
connection.queryAsync(statement, nodeQueryResult, (err) => {
if (err) {
return reject(err);
});
});
} catch (e) {
return reject(e);
}
});
}
this._unwrapMultipleQueryResults(nodeQueryResult)
.then((queryResults) => {
return resolve(queryResults);
})
.catch((err) => {
return reject(err);
});
});
} catch (e) {
return reject(e);
}
})
.catch((err) => {
return reject(err);
});
});
}

Expand Down Expand Up @@ -238,8 +257,7 @@ class Connection {
let currentQueryResult = nodeQueryResult;
while (currentQueryResult.hasNextQueryResult()) {
queryResults.push(await this._getNextQueryResult(currentQueryResult));
currentQueryResult =
queryResults[queryResults.length - 1]._queryResult;
currentQueryResult = queryResults[queryResults.length - 1]._queryResult;
}
return queryResults;
}
Expand Down Expand Up @@ -280,6 +298,34 @@ class Connection {
this._queryTimeout = timeoutInMs;
}
}

/**
* Close the connection.
*
* Note: Call to this method is optional. The connection will be closed
* automatically when the object goes out of scope.
*/
async close() {
if (this._isClosed) {
return;
}
if (!this._isInitialized) {
if (this._initPromise) {
// Connection is initializing, wait for it to finish first.
await this._initPromise;
} else {
// Connection is not initialized, simply mark it as closed and initialized.
this._isInitialized = true;
this._isClosed = true;
delete this._connection;
return;
}
}
// Connection is initialized, close it.
this._connection.close();
delete this._connection;
this._isClosed = true;
}
}

module.exports = Connection;
10 changes: 9 additions & 1 deletion tools/nodejs_api/src_js/database.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,15 @@ class Database {
}

/**
* Close the database.
* Close the database. Once the database is closed, the lock on the database
* files is released and the database can be opened in another process.
*
* Note: Call to this method is not required.
* The Node.js garbage collector will automatically close the database when no
* references to the database object exist. It is recommended not to call this
* method explicitly. If you decide to manually close the database, make sure
* that all the QueryResult and Connection objects are closed before calling
* this method.
*/
async close() {
if (this._isClosed) {
Expand Down
Loading
Loading