Skip to content

Commit

Permalink
Merge pull request #3071 from kuzudb/catalog-cache
Browse files Browse the repository at this point in the history
Implement catalog cache in postgres scanner
  • Loading branch information
acquamarin committed Mar 18, 2024
2 parents 826927e + cc93226 commit bd963c1
Show file tree
Hide file tree
Showing 26 changed files with 323 additions and 119 deletions.
4 changes: 3 additions & 1 deletion extension/duckdb_scanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ add_library(duckdb_scanner
src/duckdb_scanner_extension.cpp
src/duckdb_storage.cpp
src/duckdb_scan.cpp
src/duckdb_type_converter.cpp)
src/duckdb_type_converter.cpp
src/duckdb_catalog.cpp
src/duckdb_table_catalog_entry.cpp)

set_target_properties(duckdb_scanner PROPERTIES
OUTPUT_NAME duckdb_scanner
Expand Down
29 changes: 29 additions & 0 deletions extension/duckdb_scanner/src/duckdb_catalog.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "duckdb_catalog.h"

namespace kuzu {
namespace duckdb_scanner {

// Registers the foreign table described by `info` in this catalog and returns the table id that
// was assigned to it. The created entry carries a scan function whose bind data is built here
// from the table's column names/types and the DuckDB path stored in the extra info.
common::table_id_t DuckDBCatalogContent::createForeignTable(
    const binder::BoundCreateTableInfo& info) {
    auto newTableID = assignNextTableID();
    auto duckDBInfo = common::ku_dynamic_cast<binder::BoundExtraCreateCatalogEntryInfo*,
        BoundExtraCreateDuckDBTableInfo*>(info.extraInfo.get());
    auto& properties = duckDBInfo->propertyInfos;

    // Column metadata that is handed over to the scan function's bind data.
    std::vector<common::LogicalType> types;
    std::vector<std::string> names;
    for (auto idx = 0u; idx < properties.size(); ++idx) {
        names.push_back(properties[idx].name);
        types.push_back(properties[idx].type);
    }

    // NOTE(review): the table name is interpolated into the query unquoted -- confirm that table
    // names needing identifier quoting cannot reach this point.
    auto scanQuery = common::stringFormat("SELECT * FROM {}", info.tableName);
    DuckDBScanBindData scanBindData(
        std::move(scanQuery), duckDBInfo->dbPath, std::move(types), std::move(names));

    auto entry = std::make_unique<catalog::DuckDBTableCatalogEntry>(
        info.tableName, newTableID, getScanFunction(std::move(scanBindData)));
    for (auto& property : properties) {
        entry->addProperty(property.name, property.type.copy());
    }
    tables->createEntry(std::move(entry));
    return newTableID;
}

} // namespace duckdb_scanner
} // namespace kuzu
43 changes: 10 additions & 33 deletions extension/duckdb_scanner/src/duckdb_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "common/exception/binder.h"
#include "common/types/types.h"
#include "duckdb_type_converter.h"
#include "function/table/bind_input.h"

using namespace kuzu::function;
Expand Down Expand Up @@ -38,8 +37,8 @@ struct DuckDBScanFunction {
static common::offset_t tableFunc(
function::TableFuncInput& input, function::TableFuncOutput& output);

static std::unique_ptr<function::TableFuncBindData> bindFunc(
std::string dbPath, main::ClientContext* /*context*/, function::TableFuncBindInput* input);
static std::unique_ptr<function::TableFuncBindData> bindFunc(DuckDBScanBindData bindData,
main::ClientContext* /*context*/, function::TableFuncBindInput* input);

static std::unique_ptr<function::TableFuncSharedState> initSharedState(
function::TableFunctionInitInput& input);
Expand Down Expand Up @@ -211,40 +210,18 @@ common::offset_t DuckDBScanFunction::tableFunc(
return output.dataChunk.state->selVector->selectedSize;
}

std::unique_ptr<function::TableFuncBindData> DuckDBScanFunction::bindFunc(std::string dbPath,
main::ClientContext* /*clientContext*/, function::TableFuncBindInput* input) {
auto tableName = input->inputs[0].getValue<std::string>();
duckdb::DBConfig dbConfig{true /* read_only */};
duckdb::DuckDB db(dbPath, &dbConfig);
duckdb::Connection conn(db);
auto result = conn.Query(
common::stringFormat("select data_type,column_name from information_schema.columns where "
"table_name = '{}' and table_schema='main';",
tableName));
if (result->RowCount() == 0) {
throw common::BinderException(
common::stringFormat("No table named: {} in database.", tableName));
}
std::vector<common::LogicalType> columnTypes;
std::vector<std::string> columnNames;
columnTypes.reserve(result->RowCount());
columnNames.reserve(result->RowCount());
for (auto i = 0u; i < result->RowCount(); i++) {
columnTypes.push_back(
DuckDBTypeConverter::convertDuckDBType(result->GetValue(0, i).GetValue<std::string>()));
columnNames.push_back(result->GetValue(1, i).GetValue<std::string>());
}
auto query = common::stringFormat("SELECT * FROM {}", tableName);
return std::make_unique<DuckDBScanBindData>(
query, dbPath, std::move(columnTypes), std::move(columnNames));
std::unique_ptr<function::TableFuncBindData> DuckDBScanFunction::bindFunc(
DuckDBScanBindData bindData, main::ClientContext* /*clientContext*/,
function::TableFuncBindInput* /*input*/) {
return bindData.copy();
}

TableFunction getScanFunction(std::string dbPath) {
TableFunction getScanFunction(DuckDBScanBindData bindData) {
return TableFunction(DuckDBScanFunction::DUCKDB_SCAN_FUNC_NAME, DuckDBScanFunction::tableFunc,
std::bind(
DuckDBScanFunction::bindFunc, dbPath, std::placeholders::_1, std::placeholders::_2),
std::bind(DuckDBScanFunction::bindFunc, std::move(bindData), std::placeholders::_1,
std::placeholders::_2),
DuckDBScanFunction::initSharedState, DuckDBScanFunction::initLocalState,
std::vector<LogicalTypeID>{LogicalTypeID::STRING});
std::vector<LogicalTypeID>{});
}

} // namespace duckdb_scanner
Expand Down
77 changes: 75 additions & 2 deletions extension/duckdb_scanner/src/duckdb_storage.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,84 @@
#include "duckdb_storage.h"

#include "binder/ddl/bound_create_table_info.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "common/exception/binder.h"
#include "duckdb_catalog.h"
#include "duckdb_scan.h"
#include "duckdb_type_converter.h"

namespace kuzu {
namespace duckdb_scanner {

std::unique_ptr<main::AttachedDatabase> attachDuckDB(std::string dbName, std::string dbPath) {
// Queries information_schema.columns through `con` for the columns of `tableName` in schema
// 'main' and appends the converted column types and the column names to the output vectors.
// Returns false when the query yields no rows or when converting a column type throws a
// common::BinderException; in the latter case the output vectors may already contain the
// columns processed before the failing one.
static bool getTableInfo(duckdb::Connection& con, std::string tableName,
    std::vector<common::LogicalType>& columnTypes, std::vector<std::string>& columnNames) {
    auto columnsQuery = common::stringFormat(
        "select data_type,column_name from information_schema.columns where "
        "table_name = '{}' and table_schema='main';",
        tableName);
    auto queryResult = con.Query(columnsQuery);
    const auto numColumns = queryResult->RowCount();
    if (numColumns == 0) {
        return false;
    }
    columnTypes.reserve(numColumns);
    columnNames.reserve(numColumns);
    for (auto row = 0u; row < numColumns; row++) {
        try {
            // Result column 0 holds data_type, column 1 holds column_name (see the query above).
            columnTypes.push_back(DuckDBTypeConverter::convertDuckDBType(
                queryResult->GetValue(0, row).GetValue<std::string>()));
        } catch (const common::BinderException&) {
            return false;
        }
        columnNames.push_back(queryResult->GetValue(1, row).GetValue<std::string>());
    }
    return true;
}

// Builds the create-table info (table type FOREIGN) for `tableName`, pairing every column name
// reported by getTableInfo with its type as a property. Returns nullptr when getTableInfo
// reports failure for the table.
std::unique_ptr<binder::BoundCreateTableInfo> getCreateTableInfo(
    duckdb::Connection& con, std::string tableName, std::string dbPath) {
    std::vector<std::string> names;
    std::vector<common::LogicalType> types;
    const bool hasTableInfo = getTableInfo(con, tableName, types, names);
    if (!hasTableInfo) {
        return nullptr;
    }

    std::vector<binder::PropertyInfo> properties;
    for (auto idx = 0u; idx < names.size(); ++idx) {
        properties.push_back(binder::PropertyInfo(names[idx], types[idx]));
    }

    auto extraInfo = std::make_unique<BoundExtraCreateDuckDBTableInfo>(
        std::move(dbPath), std::move(properties));
    return std::make_unique<binder::BoundCreateTableInfo>(
        common::TableType::FOREIGN, tableName, std::move(extraInfo));
}

std::unique_ptr<DuckDBCatalogContent> createCatalog(
std::string dbPath, main::ClientContext* context) {
duckdb::DuckDB db(dbPath);
duckdb::Connection con(db);
auto query = "select table_name from information_schema.tables where table_schema = 'main';";
auto result = con.SendQuery(query);
std::unique_ptr<duckdb::DataChunk> resultChunk;
try {
resultChunk = result->Fetch();
} catch (std::exception& e) { return 0; }
if (resultChunk == nullptr) {
return 0;
}
auto tableNamesVector = std::make_unique<common::ValueVector>(
common::LogicalTypeID::STRING, context->getMemoryManager());
duckdb_conversion_func_t conversionFunc;
getDuckDBVectorConversionFunc(common::PhysicalTypeID::STRING, conversionFunc);
conversionFunc(resultChunk->data[0], *tableNamesVector, resultChunk->size());
std::unique_ptr<DuckDBCatalogContent> catalogContent = std::make_unique<DuckDBCatalogContent>();
for (auto i = 0u; i < resultChunk->size(); i++) {
auto tableName = tableNamesVector->getValue<common::ku_string_t>(i).getAsString();
auto createTableInfo = getCreateTableInfo(con, tableName, dbPath);
if (createTableInfo != nullptr) {
catalogContent->createForeignTable(*createTableInfo);
}
}
return catalogContent;
}

std::unique_ptr<main::AttachedDatabase> attachDuckDB(
std::string dbName, std::string dbPath, main::ClientContext* clientContext) {
if (dbName == "") {
if (dbPath.find('.') != std::string::npos) {
auto fileNamePos = dbPath.find_last_of('/') + 1;
Expand All @@ -14,7 +87,7 @@ std::unique_ptr<main::AttachedDatabase> attachDuckDB(std::string dbName, std::st
dbName = dbPath;
}
}
return std::make_unique<main::AttachedDatabase>(dbName, getScanFunction(dbPath));
return std::make_unique<main::AttachedDatabase>(dbName, createCatalog(dbPath, clientContext));
}

DuckDBStorageExtension::DuckDBStorageExtension() : StorageExtension{attachDuckDB} {}
Expand Down
20 changes: 20 additions & 0 deletions extension/duckdb_scanner/src/duckdb_table_catalog_entry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "duckdb_table_catalog_entry.h"

namespace kuzu {
namespace catalog {

// Creates a FOREIGN_TABLE_ENTRY catalog entry named `name` with id `tableID`; the given scan
// function is stored in the entry.
DuckDBTableCatalogEntry::DuckDBTableCatalogEntry(
    std::string name, common::table_id_t tableID, function::TableFunction scanFunction)
    : TableCatalogEntry{CatalogEntryType::FOREIGN_TABLE_ENTRY, std::move(name), tableID},
      scanFunction{std::move(scanFunction)} {
}

// DuckDB-backed entries always report the FOREIGN table type.
common::TableType DuckDBTableCatalogEntry::getTableType() const {
    constexpr auto tableType = common::TableType::FOREIGN;
    return tableType;
}

// Returns a new entry copy-constructed from this one.
std::unique_ptr<CatalogEntry> DuckDBTableCatalogEntry::copy() const {
    auto duplicate = std::make_unique<DuckDBTableCatalogEntry>(*this);
    return duplicate;
}

} // namespace catalog
} // namespace kuzu
33 changes: 33 additions & 0 deletions extension/duckdb_scanner/src/include/duckdb_catalog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "catalog/catalog_content.h"
#include "catalog/catalog_entry/table_catalog_entry.h"
#include "duckdb_scan.h"
#include "duckdb_table_catalog_entry.h"

namespace kuzu {
namespace duckdb_scanner {

struct BoundExtraCreateDuckDBTableInfo final : public binder::BoundExtraCreateTableInfo {
std::string dbPath;

BoundExtraCreateDuckDBTableInfo(
std::string dbPath, std::vector<binder::PropertyInfo> propertyInfos)
: BoundExtraCreateTableInfo{std::move(propertyInfos)}, dbPath{std::move(dbPath)} {}
BoundExtraCreateDuckDBTableInfo(const BoundExtraCreateDuckDBTableInfo& other)
: BoundExtraCreateTableInfo{copyVector(other.propertyInfos)}, dbPath{other.dbPath} {}

std::unique_ptr<BoundExtraCreateCatalogEntryInfo> copy() const override {
return std::make_unique<BoundExtraCreateDuckDBTableInfo>(*this);
}
};

// Catalog content used for an attached DuckDB database; its entries are added through
// createForeignTable.
class DuckDBCatalogContent : public catalog::CatalogContent {
public:
    // The base catalog content is constructed with a null virtual file system.
    DuckDBCatalogContent() : catalog::CatalogContent{nullptr /* vfs */} {}

    // Adds a foreign table entry described by `info` and returns its table id.
    common::table_id_t createForeignTable(const binder::BoundCreateTableInfo& info);
};

} // namespace duckdb_scanner
} // namespace kuzu
6 changes: 5 additions & 1 deletion extension/duckdb_scanner/src/include/duckdb_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#pragma GCC diagnostic ignored "-Wunused-parameter"
// Suppress warnings from duckdb.hpp
#undef ARROW_FLAG_DICTIONARY_ORDERED
#include "common/types/types.h"
#include "duckdb.hpp"
#pragma GCC diagnostic pop

Expand All @@ -35,7 +36,10 @@ struct DuckDBScanSharedState : public function::TableFuncSharedState {
std::unique_ptr<duckdb::QueryResult> queryResult;
};

function::TableFunction getScanFunction(std::string dbPath);
void getDuckDBVectorConversionFunc(
common::PhysicalTypeID physicalTypeID, duckdb_conversion_func_t& conversion_func);

function::TableFunction getScanFunction(DuckDBScanBindData bindData);

} // namespace duckdb_scanner
} // namespace kuzu
33 changes: 33 additions & 0 deletions extension/duckdb_scanner/src/include/duckdb_table_catalog_entry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include "catalog/catalog_entry/table_catalog_entry.h"
#include "function/table_functions.h"

namespace kuzu {
namespace catalog {

// Catalog entry for a table that is backed by a DuckDB database. Besides what the base table
// entry provides, it stores the table function that is handed out by getScanFunction().
class DuckDBTableCatalogEntry final : public TableCatalogEntry {
public:
    // ---- construction ----
    DuckDBTableCatalogEntry(
        std::string name, common::table_id_t tableID, function::TableFunction scanFunction);

    // ---- accessors ----
    common::TableType getTableType() const override;
    // Returns a copy of the stored scan function.
    function::TableFunction getScanFunction() override { return scanFunction; }

    // ---- copying ----
    std::unique_ptr<CatalogEntry> copy() const override;

private:
    function::TableFunction scanFunction;
};

} // namespace catalog
} // namespace kuzu
6 changes: 1 addition & 5 deletions extension/duckdb_scanner/test/test_files/duckdb_scanner.test
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ The 😂😃🧘🏻‍♂️🌍🌦️🍞🚗 movie|2544| the movie is very v
49992|50|31.582059|False|2056-05-02||[62,24,94]|[LpQO8OT3x45a]|[[268,281,166],[144,16,126,208,298],[22,287]]|{ID: 936, "name": sGPSafxMAhKiP}
-STATEMENT LOAD FROM tinysnb_person1 RETURN *;
---- error
Binder exception: No table named: person1 in database.
Catalog exception: Table: person1 does not exist.
-STATEMENT DETACH tinysnb;
---- ok
-STATEMENT LOAD FROM tinysnb_person RETURN *;
Expand All @@ -57,10 +57,6 @@ Binder exception: No database named tinysnb has been attached.
-STATEMENT LOAD FROM tinysnb_person RETURN count(*);
---- 1
8
-LOG UnsupportedDuckDBType
-STATEMENT LOAD FROM tinysnb_unsupportedType RETURN *;
---- error
Binder exception: Unsupported duckdb type: ENUM('sad', 'ok', 'happy').

-CASE InvalidDuckDBDatabase
-STATEMENT LOAD FROM tinysnb1_person RETURN *;
Expand Down
7 changes: 5 additions & 2 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,12 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(const ReadingClause& re
throw BinderException{
common::stringFormat("No database named {} has been attached.", dbName)};
}
scanFunction = attachedDB->getScanFunction();
auto tableName = common::StringUtils::split(objectName, "_")[1];
auto tableID = attachedDB->getCatalogContent()->getTableID(tableName);
auto tableCatalogEntry = ku_dynamic_cast<CatalogEntry*, TableCatalogEntry*>(
attachedDB->getCatalogContent()->getTableCatalogEntry(tableID));
scanFunction = tableCatalogEntry->getScanFunction();
bindInput = std::make_unique<function::TableFuncBindInput>();
bindInput->inputs.push_back(Value(common::StringUtils::split(objectName, "_")[1]));
}
} break;
case ScanSourceType::FILE: {
Expand Down
19 changes: 11 additions & 8 deletions src/catalog/catalog_content.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ CatalogContent::CatalogContent(const std::string& directory, VirtualFileSystem*

table_id_t CatalogContent::createNodeTable(const binder::BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateNodeTableInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateNodeTableInfo*>(
info.extraInfo.get());
auto nodeTableEntry =
std::make_unique<NodeTableCatalogEntry>(info.tableName, tableID, extraInfo->primaryKeyIdx);
for (auto& propertyInfo : extraInfo->propertyInfos) {
Expand All @@ -54,8 +55,9 @@ table_id_t CatalogContent::createNodeTable(const binder::BoundCreateTableInfo& i

table_id_t CatalogContent::createRelTable(const binder::BoundCreateTableInfo& info) {
table_id_t tableID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
info.extraInfo.get());
auto srcTableEntry = ku_dynamic_cast<CatalogEntry*, NodeTableCatalogEntry*>(
getTableCatalogEntry(extraInfo->srcTableID));
auto dstTableEntry = ku_dynamic_cast<CatalogEntry*, NodeTableCatalogEntry*>(
Expand Down Expand Up @@ -91,17 +93,18 @@ table_id_t CatalogContent::createRelGroup(const binder::BoundCreateTableInfo& in

table_id_t CatalogContent::createRDFGraph(const binder::BoundCreateTableInfo& info) {
table_id_t rdfGraphID = assignNextTableID();
auto extraInfo = ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRdfGraphInfo*>(
info.extraInfo.get());
auto extraInfo =
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRdfGraphInfo*>(
info.extraInfo.get());
auto& resourceInfo = extraInfo->resourceInfo;
auto& literalInfo = extraInfo->literalInfo;
auto& resourceTripleInfo = extraInfo->resourceTripleInfo;
auto& literalTripleInfo = extraInfo->literalTripleInfo;
auto resourceTripleExtraInfo =
ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
resourceTripleInfo.extraInfo.get());
auto literalTripleExtraInfo =
ku_dynamic_cast<BoundExtraCreateTableInfo*, BoundExtraCreateRelTableInfo*>(
ku_dynamic_cast<BoundExtraCreateCatalogEntryInfo*, BoundExtraCreateRelTableInfo*>(
literalTripleInfo.extraInfo.get());
// Resource table
auto resourceTableID = createNodeTable(resourceInfo);
Expand Down
Loading

0 comments on commit bd963c1

Please sign in to comment.