Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V0.4.0 example bug fixes #3419

Merged
merged 1 commit into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ static bool bindExportQuery(ExportedTableData& tableData, std::string& exportQue

bool Binder::bindExportTableData(ExportedTableData& tableData, TableCatalogEntry* entry,
const Catalog& catalog, transaction::Transaction* tx) {
if (catalog.tableInRDFGraph(tx, entry->getTableID())) {
return false;
}
std::string exportQuery;
tableData.canParallel = true;
tableData.tableName = entry->getName();
Expand All @@ -123,10 +126,13 @@ bool Binder::bindExportTableData(ExportedTableData& tableData, TableCatalogEntry
}

std::unique_ptr<BoundStatement> Binder::bindExportDatabaseClause(const Statement& statement) {
auto& exportDatabaseStatement = ku_dynamic_cast<const Statement&, const ExportDB&>(statement);
auto boundFilePath = exportDatabaseStatement.getFilePath();
auto& exportDB = statement.constCast<ExportDB>();
auto boundFilePath = exportDB.getFilePath();
auto exportData = getExportInfo(*clientContext->getCatalog(), clientContext->getTx(), this);
auto parsedOptions = bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef());
// if (exportData.empty()) {
// throw BinderException("Cannot export an empty database.");
// }
auto parsedOptions = bindParsingOptions(exportDB.getParsingOptionsRef());
auto fileType = getFileType(parsedOptions);
if (fileType != FileType::CSV && fileType != FileType::PARQUET) {
throw BinderException("Export database currently only supports csv and parquet files.");
Expand Down
46 changes: 24 additions & 22 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,31 @@ std::unique_ptr<BoundStatement> Binder::bindImportDatabaseClause(const Statement
finalQueryStatements += getQueryFromFile(fs, boundFilePath, ImportDBConstants::SCHEMA_NAME);
// replace the path in copy from statement with the bound path
auto copyQuery = getQueryFromFile(fs, boundFilePath, ImportDBConstants::COPY_NAME);
auto parsedStatements = Parser::parseQuery(copyQuery);
for (auto& parsedStatement : parsedStatements) {
KU_ASSERT(parsedStatement->getStatementType() == StatementType::COPY_FROM);
auto copyFromStatement =
ku_dynamic_cast<const Statement*, const CopyFrom*>(parsedStatement.get());
KU_ASSERT(copyFromStatement->getSource()->type == common::ScanSourceType::FILE);
auto filePaths = ku_dynamic_cast<parser::BaseScanSource*, parser::FileScanSource*>(
copyFromStatement->getSource())
->filePaths;
KU_ASSERT(filePaths.size() == 1);
auto fileType = bindFileType(filePaths);
auto copyFilePath = boundFilePath + "/" + filePaths[0];
std::string query;
if (fileType == FileType::CSV) {
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(copyFromStatement->getParsingOptionsRef()));
query = stringFormat("COPY {} FROM \"{}\" {};", copyFromStatement->getTableName(),
copyFilePath, csvConfig.option.toCypher());
} else {
query = stringFormat("COPY {} FROM \"{}\";", copyFromStatement->getTableName(),
copyFilePath);
if (!copyQuery.empty()) {
auto parsedStatements = Parser::parseQuery(copyQuery);
for (auto& parsedStatement : parsedStatements) {
KU_ASSERT(parsedStatement->getStatementType() == StatementType::COPY_FROM);
auto copyFromStatement =
ku_dynamic_cast<const Statement*, const CopyFrom*>(parsedStatement.get());
KU_ASSERT(copyFromStatement->getSource()->type == common::ScanSourceType::FILE);
auto filePaths = ku_dynamic_cast<parser::BaseScanSource*, parser::FileScanSource*>(
copyFromStatement->getSource())
->filePaths;
KU_ASSERT(filePaths.size() == 1);
auto fileType = bindFileType(filePaths);
auto copyFilePath = boundFilePath + "/" + filePaths[0];
std::string query;
if (fileType == FileType::CSV) {
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(copyFromStatement->getParsingOptionsRef()));
query = stringFormat("COPY {} FROM \"{}\" {};", copyFromStatement->getTableName(),
copyFilePath, csvConfig.option.toCypher());
} else {
query = stringFormat("COPY {} FROM \"{}\";", copyFromStatement->getTableName(),
copyFilePath);
}
finalQueryStatements += query;
}
finalQueryStatements += query;
}
finalQueryStatements += getQueryFromFile(fs, boundFilePath, ImportDBConstants::MACRO_NAME);
return std::make_unique<BoundImportDatabase>(boundFilePath, finalQueryStatements);
Expand Down
30 changes: 19 additions & 11 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,6 @@ std::vector<RelGroupCatalogEntry*> Catalog::getRelTableGroupEntries(Transaction*
CatalogEntryType::REL_GROUP_ENTRY);
}

// Reports whether `tableID` appears among the rel table IDs of any rel table group entry
// visible to transaction `tx`. Returns false when no group lists it.
bool Catalog::relTableExistInRelTableGroup(Transaction* tx, table_id_t tableID) const {
    for (auto groupEntry : getRelTableGroupEntries(tx)) {
        auto memberIDs = groupEntry->getRelTableIDs();
        // Linear scan over this group's member IDs; stop at the first match.
        for (auto memberIt = memberIDs.begin(); memberIt != memberIDs.end(); ++memberIt) {
            if (*memberIt == tableID) {
                return true;
            }
        }
    }
    return false;
}

std::vector<RDFGraphCatalogEntry*> Catalog::getRdfGraphEntries(Transaction* tx) const {
return getVersion(tx)->getTableCatalogEntries<RDFGraphCatalogEntry*>(
CatalogEntryType::RDF_GRAPH_ENTRY);
Expand All @@ -114,6 +103,25 @@ std::vector<TableCatalogEntry*> Catalog::getTableSchemas(Transaction* tx,
return result;
}

// Reports whether `tableID` is contained in the table ID set (getTableIDSet()) of any RDF graph
// entry visible to transaction `tx`. Returns false when no RDF graph entry contains it.
bool Catalog::tableInRDFGraph(Transaction* tx, table_id_t tableID) const {
    auto rdfGraphEntries = getRdfGraphEntries(tx);
    for (auto* rdfGraphEntry : rdfGraphEntries) {
        if (rdfGraphEntry->getTableIDSet().contains(tableID)) {
            return true;
        }
    }
    return false;
}

// Reports whether any rel table group entry visible to transaction `tx` answers true to
// isParent(tableID). Returns false when no group entry does.
bool Catalog::tableInRelGroup(Transaction* tx, table_id_t tableID) const {
    bool found = false;
    auto relGroupEntries = getRelTableGroupEntries(tx);
    for (auto* relGroupEntry : relGroupEntries) {
        if (relGroupEntry->isParent(tableID)) {
            // First match is enough; no need to inspect the remaining groups.
            found = true;
            break;
        }
    }
    return found;
}

void Catalog::prepareCommitOrRollback(TransactionAction action, VirtualFileSystem* fs) {
if (hasUpdates()) {
wal->logCatalogRecord();
Expand Down
9 changes: 9 additions & 0 deletions src/catalog/catalog_entry/rdf_graph_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ bool RDFGraphCatalogEntry::isParent(common::table_id_t tableID) {
tableID == resourceTripleTableID || tableID == literalTripleTableID;
}

// Builds and returns a fresh set holding the four table IDs stored on this entry:
// resourceTableID, literalTableID, resourceTripleTableID and literalTripleTableID.
common::table_id_set_t RDFGraphCatalogEntry::getTableIDSet() const {
    return common::table_id_set_t{resourceTableID, literalTableID, resourceTripleTableID,
        literalTripleTableID};
}

// Returns `graphName` with common::rdf::RESOURCE_TABLE_SUFFIX appended.
std::string RDFGraphCatalogEntry::getResourceTableName(const std::string& graphName) {
    std::string tableName = graphName;
    tableName += std::string(common::rdf::RESOURCE_TABLE_SUFFIX);
    return tableName;
}
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class Catalog {
std::vector<TableCatalogEntry*> getTableEntries(transaction::Transaction* tx) const;
std::vector<TableCatalogEntry*> getTableSchemas(transaction::Transaction* tx,
const common::table_id_vector_t& tableIDs) const;
bool relTableExistInRelTableGroup(transaction::Transaction* tx,
common::table_id_t tableID) const;
bool tableInRDFGraph(transaction::Transaction* tx, common::table_id_t tableID) const;
bool tableInRelGroup(transaction::Transaction* tx, common::table_id_t tableID) const;

common::table_id_t createTableSchema(const binder::BoundCreateTableInfo& info);
void dropTableSchema(common::table_id_t tableID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RDFGraphCatalogEntry final : public TableCatalogEntry {
// Returns this entry's stored literalTableID.
common::table_id_t getLiteralTableID() const { return literalTableID; }
// Returns this entry's stored resourceTripleTableID.
common::table_id_t getResourceTripleTableID() const { return resourceTripleTableID; }
// Returns this entry's stored literalTripleTableID.
common::table_id_t getLiteralTripleTableID() const { return literalTripleTableID; }
common::table_id_set_t getTableIDSet() const;
static std::string getResourceTableName(const std::string& graphName);
static std::string getLiteralTableName(const std::string& graphName);
static std::string getResourceTripleTableName(const std::string& graphName);
Expand Down
9 changes: 5 additions & 4 deletions src/include/planner/operator/logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
namespace kuzu {
namespace planner {

// This enum is sorted in alphabetical order.
enum class LogicalOperatorType : uint8_t {
ACCUMULATE,
AGGREGATE,
ALTER,
ATTACH_DATABASE,
DETACH_DATABASE,
COMMENT_ON,
COPY_FROM,
COPY_TO,
Expand All @@ -20,18 +20,22 @@ enum class LogicalOperatorType : uint8_t {
CROSS_PRODUCT,
DELETE_NODE,
DELETE_REL,
DETACH_DATABASE,
DISTINCT,
DROP_TABLE,
DUMMY_SCAN,
EMPTY_RESULT,
EXPLAIN,
EXPRESSIONS_SCAN,
EXTEND,
EXTENSION,
EXPORT_DATABASE,
FILTER,
FLATTEN,
HASH_JOIN,
IN_QUERY_CALL,
INDEX_SCAN_NODE,
IMPORT_DATABASE,
INTERSECT,
INSERT,
LIMIT,
Expand All @@ -56,9 +60,6 @@ enum class LogicalOperatorType : uint8_t {
UNION_ALL,
UNWIND,
USE_DATABASE,
EXTENSION,
EXPORT_DATABASE,
IMPORT_DATABASE,
};

struct LogicalOperatorUtils {
Expand Down
9 changes: 4 additions & 5 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ enum class PhysicalOperatorType : uint8_t {
ATTACH_DATABASE,
BATCH_INSERT,
COMMENT_ON,
CREATE_MACRO,
DETACH_DATABASE,
STANDALONE_CALL,
IN_QUERY_CALL,
COPY_RDF,
COPY_TO,
CREATE_MACRO,
CREATE_TABLE,
CROSS_PRODUCT,
DETACH_DATABASE,
DELETE_NODE,
DELETE_REL,
DROP_TABLE,
Expand All @@ -31,6 +29,7 @@ enum class PhysicalOperatorType : uint8_t {
HASH_JOIN_BUILD,
HASH_JOIN_PROBE,
IMPORT_DATABASE,
IN_QUERY_CALL,
INDEX_LOOKUP,
INDEX_SCAN,
INSERT,
Expand All @@ -45,7 +44,6 @@ enum class PhysicalOperatorType : uint8_t {
PATH_PROPERTY_PROBE,
PROJECTION,
PROFILE,
READER,
RECURSIVE_JOIN,
RENAME_PROPERTY,
RENAME_TABLE,
Expand All @@ -60,6 +58,7 @@ enum class PhysicalOperatorType : uint8_t {
SET_NODE_PROPERTY,
SET_REL_PROPERTY,
SKIP,
STANDALONE_CALL,
TOP_K,
TOP_K_SCAN,
TRANSACTION,
Expand Down
9 changes: 9 additions & 0 deletions src/parser/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "cypher_lexer.h"
#pragma GCC diagnostic pop

#include "common/exception/parser.h"
#include "parser/antlr_parser/kuzu_cypher_parser.h"
#include "parser/antlr_parser/parser_error_listener.h"
#include "parser/antlr_parser/parser_error_strategy.h"
Expand All @@ -17,6 +18,14 @@ namespace kuzu {
namespace parser {

std::vector<std::shared_ptr<Statement>> Parser::parseQuery(std::string_view query) {
// LCOV_EXCL_START
// The connection layer should already reject empty queries, but an empty query also causes
// ANTLR to hang, so the check is deliberately repeated here as a safeguard.
if (query.empty()) {
throw common::ParserException(
"Cannot parse empty query. This should be handled in connection.");
}
// LCOV_EXCL_STOP
auto inputStream = ANTLRInputStream(query);
auto parserErrorListener = ParserErrorListener();

Expand Down
34 changes: 19 additions & 15 deletions src/planner/operator/logical_operator.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
#include "planner/operator/logical_operator.h"

#include "common/exception/runtime.h"

using namespace kuzu::common;

namespace kuzu {
namespace planner {

// LCOV_EXCL_START
std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorType type) {
switch (type) {
case LogicalOperatorType::ATTACH_DATABASE:
return "ATTACH_DATABASE";
case LogicalOperatorType::ACCUMULATE:
return "ACCUMULATE";
case LogicalOperatorType::AGGREGATE:
return "AGGREGATE";
case LogicalOperatorType::ALTER:
return "ALTER";
case LogicalOperatorType::ATTACH_DATABASE:
return "ATTACH_DATABASE";
case LogicalOperatorType::COMMENT_ON:
return "COMMENT_ON";
case LogicalOperatorType::COPY_FROM:
Expand All @@ -33,8 +36,6 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "DELETE_REL";
case LogicalOperatorType::DETACH_DATABASE:
return "DETACH_DATABASE";
case LogicalOperatorType::USE_DATABASE:
return "USE_DATABASE";
case LogicalOperatorType::DISTINCT:
return "DISTINCT";
case LogicalOperatorType::DROP_TABLE:
Expand All @@ -43,12 +44,16 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "DUMMY_SCAN";
case LogicalOperatorType::EMPTY_RESULT:
return "EMPTY_RESULT";
case LogicalOperatorType::EXTEND:
return "EXTEND";
case LogicalOperatorType::EXPRESSIONS_SCAN:
return "EXPRESSIONS_SCAN";
case LogicalOperatorType::EXPLAIN:
return "EXPLAIN";
case LogicalOperatorType::EXPRESSIONS_SCAN:
return "EXPRESSIONS_SCAN";
case LogicalOperatorType::EXTENSION:
return "LOAD";
case LogicalOperatorType::EXPORT_DATABASE:
return "EXPORT_DATABASE";
case LogicalOperatorType::EXTEND:
return "EXTEND";
case LogicalOperatorType::FILTER:
return "FILTER";
case LogicalOperatorType::FLATTEN:
Expand All @@ -59,6 +64,8 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "IN_QUERY_CALL";
case LogicalOperatorType::INDEX_SCAN_NODE:
return "INDEX_SCAN_NODE";
case LogicalOperatorType::IMPORT_DATABASE:
return "IMPORT_DATABASE";
case LogicalOperatorType::INTERSECT:
return "INTERSECT";
case LogicalOperatorType::INSERT:
Expand Down Expand Up @@ -105,16 +112,13 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "UNION_ALL";
case LogicalOperatorType::UNWIND:
return "UNWIND";
case LogicalOperatorType::EXTENSION:
return "LOAD";
case LogicalOperatorType::EXPORT_DATABASE:
return "EXPORT_DATABASE";
case LogicalOperatorType::IMPORT_DATABASE:
return "IMPORT_DATABASE";
case LogicalOperatorType::USE_DATABASE:
return "USE_DATABASE";
default:
KU_UNREACHABLE;
throw RuntimeException("Unknown logical operator type.");
}
}
// LCOV_EXCL_STOP

bool LogicalOperatorUtils::isUpdate(LogicalOperatorType type) {
switch (type) {
Expand Down
Loading
Loading