Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement catalog cache in postgres scanner #3071

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion extension/duckdb_scanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ add_library(duckdb_scanner
src/duckdb_scanner_extension.cpp
src/duckdb_storage.cpp
src/duckdb_scan.cpp
src/duckdb_type_converter.cpp)
src/duckdb_type_converter.cpp
src/duckdb_catalog.cpp
src/duckdb_table_catalog_entry.cpp)

set_target_properties(duckdb_scanner PROPERTIES
OUTPUT_NAME duckdb_scanner
Expand Down
29 changes: 29 additions & 0 deletions extension/duckdb_scanner/src/duckdb_catalog.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "duckdb_catalog.h"

namespace kuzu {
namespace duckdb_scanner {

common::table_id_t DuckDBCatalogContent::createForeignTable(
const binder::BoundCreateTableInfo& info) {
auto tableID = assignNextTableID();
auto extraInfo = common::ku_dynamic_cast<binder::BoundExtraCreateCatalogEntryInfo*,
BoundExtraCreateDuckDBTableInfo*>(info.extraInfo.get());
std::vector<common::LogicalType> columnTypes;
std::vector<std::string> columnNames;
for (auto& propertyInfo : extraInfo->propertyInfos) {
columnNames.push_back(propertyInfo.name);
columnTypes.push_back(propertyInfo.type);
}
DuckDBScanBindData bindData(common::stringFormat("SELECT * FROM {}", info.tableName),
extraInfo->dbPath, std::move(columnTypes), std::move(columnNames));
auto tableEntry = std::make_unique<catalog::DuckDBTableCatalogEntry>(
info.tableName, tableID, getScanFunction(std::move(bindData)));
for (auto& propertyInfo : extraInfo->propertyInfos) {
tableEntry->addProperty(propertyInfo.name, propertyInfo.type.copy());
}
tables->createEntry(std::move(tableEntry));
return tableID;
}

} // namespace duckdb_scanner
} // namespace kuzu
43 changes: 10 additions & 33 deletions extension/duckdb_scanner/src/duckdb_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "common/exception/binder.h"
#include "common/types/types.h"
#include "duckdb_type_converter.h"
#include "function/table/bind_input.h"

using namespace kuzu::function;
Expand Down Expand Up @@ -38,8 +37,8 @@ struct DuckDBScanFunction {
static common::offset_t tableFunc(
function::TableFuncInput& input, function::TableFuncOutput& output);

static std::unique_ptr<function::TableFuncBindData> bindFunc(
std::string dbPath, main::ClientContext* /*context*/, function::TableFuncBindInput* input);
static std::unique_ptr<function::TableFuncBindData> bindFunc(DuckDBScanBindData bindData,
main::ClientContext* /*context*/, function::TableFuncBindInput* input);

static std::unique_ptr<function::TableFuncSharedState> initSharedState(
function::TableFunctionInitInput& input);
Expand Down Expand Up @@ -211,40 +210,18 @@ common::offset_t DuckDBScanFunction::tableFunc(
return output.dataChunk.state->selVector->selectedSize;
}

std::unique_ptr<function::TableFuncBindData> DuckDBScanFunction::bindFunc(std::string dbPath,
main::ClientContext* /*clientContext*/, function::TableFuncBindInput* input) {
auto tableName = input->inputs[0].getValue<std::string>();
duckdb::DBConfig dbConfig{true /* read_only */};
duckdb::DuckDB db(dbPath, &dbConfig);
duckdb::Connection conn(db);
auto result = conn.Query(
common::stringFormat("select data_type,column_name from information_schema.columns where "
"table_name = '{}' and table_schema='main';",
tableName));
if (result->RowCount() == 0) {
throw common::BinderException(
common::stringFormat("No table named: {} in database.", tableName));
}
std::vector<common::LogicalType> columnTypes;
std::vector<std::string> columnNames;
columnTypes.reserve(result->RowCount());
columnNames.reserve(result->RowCount());
for (auto i = 0u; i < result->RowCount(); i++) {
columnTypes.push_back(
DuckDBTypeConverter::convertDuckDBType(result->GetValue(0, i).GetValue<std::string>()));
columnNames.push_back(result->GetValue(1, i).GetValue<std::string>());
}
auto query = common::stringFormat("SELECT * FROM {}", tableName);
return std::make_unique<DuckDBScanBindData>(
query, dbPath, std::move(columnTypes), std::move(columnNames));
std::unique_ptr<function::TableFuncBindData> DuckDBScanFunction::bindFunc(
DuckDBScanBindData bindData, main::ClientContext* /*clientContext*/,
function::TableFuncBindInput* /*input*/) {
return bindData.copy();
}

TableFunction getScanFunction(std::string dbPath) {
TableFunction getScanFunction(DuckDBScanBindData bindData) {
return TableFunction(DuckDBScanFunction::DUCKDB_SCAN_FUNC_NAME, DuckDBScanFunction::tableFunc,
std::bind(
DuckDBScanFunction::bindFunc, dbPath, std::placeholders::_1, std::placeholders::_2),
std::bind(DuckDBScanFunction::bindFunc, std::move(bindData), std::placeholders::_1,
std::placeholders::_2),
DuckDBScanFunction::initSharedState, DuckDBScanFunction::initLocalState,
std::vector<LogicalTypeID>{LogicalTypeID::STRING});
std::vector<LogicalTypeID>{});
}

} // namespace duckdb_scanner
Expand Down
77 changes: 75 additions & 2 deletions extension/duckdb_scanner/src/duckdb_storage.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,84 @@
#include "duckdb_storage.h"

#include "binder/ddl/bound_create_table_info.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "common/exception/binder.h"
#include "duckdb_catalog.h"
#include "duckdb_scan.h"
#include "duckdb_type_converter.h"

namespace kuzu {
namespace duckdb_scanner {

std::unique_ptr<main::AttachedDatabase> attachDuckDB(std::string dbName, std::string dbPath) {
static bool getTableInfo(duckdb::Connection& con, std::string tableName,
std::vector<common::LogicalType>& columnTypes, std::vector<std::string>& columnNames) {
auto result = con.Query(
common::stringFormat("select data_type,column_name from information_schema.columns where "
"table_name = '{}' and table_schema='main';",
tableName));
if (result->RowCount() == 0) {
return false;
}
columnTypes.reserve(result->RowCount());
columnNames.reserve(result->RowCount());
for (auto i = 0u; i < result->RowCount(); i++) {
try {
columnTypes.push_back(DuckDBTypeConverter::convertDuckDBType(
result->GetValue(0, i).GetValue<std::string>()));
} catch (common::BinderException& e) { return false; }
columnNames.push_back(result->GetValue(1, i).GetValue<std::string>());
}
return true;
}

std::unique_ptr<binder::BoundCreateTableInfo> getCreateTableInfo(
duckdb::Connection& con, std::string tableName, std::string dbPath) {
std::vector<common::LogicalType> columnTypes;
std::vector<std::string> columnNames;
if (!getTableInfo(con, tableName, columnTypes, columnNames)) {
return nullptr;
}
std::vector<binder::PropertyInfo> propertyInfos;
for (auto i = 0u; i < columnNames.size(); i++) {
auto propertyInfo = binder::PropertyInfo(columnNames[i], columnTypes[i]);
propertyInfos.push_back(std::move(propertyInfo));
}
return std::make_unique<binder::BoundCreateTableInfo>(common::TableType::FOREIGN, tableName,
std::make_unique<BoundExtraCreateDuckDBTableInfo>(
std::move(dbPath), std::move(propertyInfos)));
}

std::unique_ptr<DuckDBCatalogContent> createCatalog(
std::string dbPath, main::ClientContext* context) {
duckdb::DuckDB db(dbPath);
duckdb::Connection con(db);
auto query = "select table_name from information_schema.tables where table_schema = 'main';";
auto result = con.SendQuery(query);
std::unique_ptr<duckdb::DataChunk> resultChunk;
try {
resultChunk = result->Fetch();
} catch (std::exception& e) { return 0; }
if (resultChunk == nullptr) {
return 0;
}
auto tableNamesVector = std::make_unique<common::ValueVector>(
common::LogicalTypeID::STRING, context->getMemoryManager());
duckdb_conversion_func_t conversionFunc;
getDuckDBVectorConversionFunc(common::PhysicalTypeID::STRING, conversionFunc);
conversionFunc(resultChunk->data[0], *tableNamesVector, resultChunk->size());
std::unique_ptr<DuckDBCatalogContent> catalogContent = std::make_unique<DuckDBCatalogContent>();
for (auto i = 0u; i < resultChunk->size(); i++) {
auto tableName = tableNamesVector->getValue<common::ku_string_t>(i).getAsString();
auto createTableInfo = getCreateTableInfo(con, tableName, dbPath);
if (createTableInfo != nullptr) {
catalogContent->createForeignTable(*createTableInfo);
}
}
return catalogContent;
}

std::unique_ptr<main::AttachedDatabase> attachDuckDB(
std::string dbName, std::string dbPath, main::ClientContext* clientContext) {
if (dbName == "") {
if (dbPath.find('.') != std::string::npos) {
auto fileNamePos = dbPath.find_last_of('/') + 1;
Expand All @@ -14,7 +87,7 @@ std::unique_ptr<main::AttachedDatabase> attachDuckDB(std::string dbName, std::st
dbName = dbPath;
}
}
return std::make_unique<main::AttachedDatabase>(dbName, getScanFunction(dbPath));
return std::make_unique<main::AttachedDatabase>(dbName, createCatalog(dbPath, clientContext));
}

DuckDBStorageExtension::DuckDBStorageExtension() : StorageExtension{attachDuckDB} {}
Expand Down
20 changes: 20 additions & 0 deletions extension/duckdb_scanner/src/duckdb_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "duckdb_table_catalog_entry.h"

namespace kuzu {
namespace catalog {

DuckDBTableCatalogEntry::DuckDBTableCatalogEntry(
std::string name, common::table_id_t tableID, function::TableFunction scanFunction)
: TableCatalogEntry{CatalogEntryType::FOREIGN_TABLE_ENTRY, std::move(name), tableID},
scanFunction{std::move(scanFunction)} {}

common::TableType DuckDBTableCatalogEntry::getTableType() const {
return common::TableType::FOREIGN;
}

std::unique_ptr<CatalogEntry> DuckDBTableCatalogEntry::copy() const {
return std::make_unique<DuckDBTableCatalogEntry>(*this);
}

} // namespace catalog
} // namespace kuzu
33 changes: 33 additions & 0 deletions extension/duckdb_scanner/src/include/duckdb_catalog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "catalog/catalog_content.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "duckdb_scan.h"
#include "duckdb_table_catalog_entry.h"

namespace kuzu {
namespace duckdb_scanner {

struct BoundExtraCreateDuckDBTableInfo final : public binder::BoundExtraCreateTableInfo {
std::string dbPath;

BoundExtraCreateDuckDBTableInfo(
std::string dbPath, std::vector<binder::PropertyInfo> propertyInfos)
: BoundExtraCreateTableInfo{std::move(propertyInfos)}, dbPath{std::move(dbPath)} {}
BoundExtraCreateDuckDBTableInfo(const BoundExtraCreateDuckDBTableInfo& other)
: BoundExtraCreateTableInfo{copyVector(other.propertyInfos)}, dbPath{other.dbPath} {}

std::unique_ptr<BoundExtraCreateCatalogEntryInfo> copy() const override {
return std::make_unique<BoundExtraCreateDuckDBTableInfo>(*this);
}
};

class DuckDBCatalogContent : public catalog::CatalogContent {
public:
DuckDBCatalogContent() : catalog::CatalogContent{nullptr /* vfs */} {}

common::table_id_t createForeignTable(const binder::BoundCreateTableInfo& info);
};

} // namespace duckdb_scanner
} // namespace kuzu
6 changes: 5 additions & 1 deletion extension/duckdb_scanner/src/include/duckdb_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma GCC diagnostic ignored "-Wunused-parameter"
// Supress warnings from duckdb.hpp
#undef ARROW_FLAG_DICTIONARY_ORDERED
#include "common/types/types.h"
#include "duckdb.hpp"
#pragma GCC diagnostic pop

Expand All @@ -35,7 +36,10 @@ struct DuckDBScanSharedState : public function::TableFuncSharedState {
std::unique_ptr<duckdb::QueryResult> queryResult;
};

function::TableFunction getScanFunction(std::string dbPath);
void getDuckDBVectorConversionFunc(
common::PhysicalTypeID physicalTypeID, duckdb_conversion_func_t& conversion_func);

function::TableFunction getScanFunction(DuckDBScanBindData bindData);

} // namespace duckdb_scanner
} // namespace kuzu
33 changes: 33 additions & 0 deletions extension/duckdb_scanner/src/include/duckdb_table_catalog_entry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "catalog/catalog_entry/table_catalog_entry.h"
#include "function/table_functions.h"

namespace kuzu {
namespace catalog {

class DuckDBTableCatalogEntry final : public TableCatalogEntry {
public:
//===--------------------------------------------------------------------===//
// constructors
//===--------------------------------------------------------------------===//
DuckDBTableCatalogEntry(
std::string name, common::table_id_t tableID, function::TableFunction scanFunction);

//===--------------------------------------------------------------------===//
// getter & setter
//===--------------------------------------------------------------------===//
common::TableType getTableType() const override;
function::TableFunction getScanFunction() override { return scanFunction; }

//===--------------------------------------------------------------------===//
// serialization & deserialization
//===--------------------------------------------------------------------===//
std::unique_ptr<CatalogEntry> copy() const override;

private:
function::TableFunction scanFunction;
};

} // namespace catalog
} // namespace kuzu
6 changes: 1 addition & 5 deletions extension/duckdb_scanner/test/test_files/duckdb_scanner.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie|2544| the movie is very v
49992|50|31.582059|False|2056-05-02||[62,24,94]|[LpQO8OT3x45a]|[[268,281,166],[144,16,126,208,298],[22,287]]|{ID: 936, "name": sGPSafxMAhKiP}
-STATEMENT LOAD FROM tinysnb_person1 RETURN *;
---- error
Binder exception: No table named: person1 in database.
Catalog exception: Table: person1 does not exist.
-STATEMENT DETACH tinysnb;
---- ok
-STATEMENT LOAD FROM tinysnb_person RETURN *;
Expand All @@ -57,10 +57,6 @@ Binder exception: No database named tinysnb has been attached.
-STATEMENT LOAD FROM tinysnb_person RETURN count(*);
---- 1
8
-LOG UnsupportedDuckDBType
-STATEMENT LOAD FROM tinysnb_unsupportedType RETURN *;
---- error
Binder exception: Unsupported duckdb type: ENUM('sad', 'ok', 'happy').

-CASE InvalidDuckDBDatabase
-STATEMENT LOAD FROM tinysnb1_person RETURN *;
Expand Down
7 changes: 5 additions & 2 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@
throw BinderException{
common::stringFormat("No database named {} has been attached.", dbName)};
}
scanFunction = attachedDB->getScanFunction();
auto tableName = common::StringUtils::split(objectName, "_")[1];
auto tableID = attachedDB->getCatalogContent()->getTableID(tableName);
auto tableCatalogEntry = ku_dynamic_cast<CatalogEntry*, TableCatalogEntry*>(

Check warning on line 184 in src/binder/bind/bind_reading_clause.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_reading_clause.cpp#L182-L184

Added lines #L182 - L184 were not covered by tests
attachedDB->getCatalogContent()->getTableCatalogEntry(tableID));
scanFunction = tableCatalogEntry->getScanFunction();

Check warning on line 186 in src/binder/bind/bind_reading_clause.cpp

View check run for this annotation

Codecov / codecov/patch

src/binder/bind/bind_reading_clause.cpp#L186

Added line #L186 was not covered by tests
bindInput = std::make_unique<function::TableFuncBindInput>();
bindInput->inputs.push_back(Value(common::StringUtils::split(objectName, "_")[1]));
}
} break;
case ScanSourceType::FILE: {
Expand Down
19 changes: 11 additions & 8 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem*

table_id_t CatalogContent::createNodeTable(const binder::BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateNodeTableInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateNodeTableInfo*>(
info.extraInfo.get());
auto nodeTableEntry =
std::make_unique<NodeTableCatalogEntry>(info.tableName, tableID, extraInfo->primaryKeyIdx);
for (auto& propertyInfo : extraInfo->propertyInfos) {
Expand All @@ -54,8 +55,9 @@ table_id_t CatalogContent::createNodeTable(const binder::BoundCreateTableInfo& i

table_id_t CatalogContent::createRelTable(const binder::BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
info.extraInfo.get());
auto srcTableEntry = ku_dynamic_cast<CatalogEntry*, NodeTableCatalogEntry*>(
getTableCatalogEntry(extraInfo->srcTableID));
auto dstTableEntry = ku_dynamic_cast<CatalogEntry*, NodeTableCatalogEntry*>(
Expand Down Expand Up @@ -91,17 +93,18 @@ table_id_t CatalogContent::createRelGroup(const binder::BoundCreateTableInfo& in

table_id_t CatalogContent::createRDFGraph(const binder::BoundCreateTableInfo& info) {
table_id_t rdfGraphID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRdfGraphInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRdfGraphInfo*>(
info.extraInfo.get());
auto& resourceInfo = extraInfo->resourceInfo;
auto& literalInfo = extraInfo->literalInfo;
auto& resourceTripleInfo = extraInfo->resourceTripleInfo;
auto& literalTripleInfo = extraInfo->literalTripleInfo;
auto resourceTripleExtraInfo =
ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
resourceTripleInfo.extraInfo.get());
auto literalTripleExtraInfo =
ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
literalTripleInfo.extraInfo.get());
// Resource table
auto resourceTableID = createNodeTable(resourceInfo);
Expand Down
Loading
Loading