Skip to content

Commit

Permalink
Add examples in post fix bug
Browse files Browse the repository at this point in the history
  • Loading branch information
andyfengHKU committed May 1, 2024
1 parent f448598 commit cd911e3
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 113 deletions.
12 changes: 9 additions & 3 deletions src/binder/bind/bind_export_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ static bool bindExportQuery(ExportedTableData& tableData, std::string& exportQue

bool Binder::bindExportTableData(ExportedTableData& tableData, TableCatalogEntry* entry,
const Catalog& catalog, transaction::Transaction* tx) {
if (catalog.tableInRDFGraph(tx, entry->getTableID())) {
return false;
}
std::string exportQuery;
tableData.canParallel = true;
tableData.tableName = entry->getName();
Expand All @@ -123,10 +126,13 @@ bool Binder::bindExportTableData(ExportedTableData& tableData, TableCatalogEntry
}

std::unique_ptr<BoundStatement> Binder::bindExportDatabaseClause(const Statement& statement) {
auto& exportDatabaseStatement = ku_dynamic_cast<const Statement&, const ExportDB&>(statement);
auto boundFilePath = exportDatabaseStatement.getFilePath();
auto& exportDB = statement.constCast<ExportDB>();
auto boundFilePath = exportDB.getFilePath();
auto exportData = getExportInfo(*clientContext->getCatalog(), clientContext->getTx(), this);
auto parsedOptions = bindParsingOptions(exportDatabaseStatement.getParsingOptionsRef());
// if (exportData.empty()) {
// throw BinderException("Cannot export an empty database.");
// }
auto parsedOptions = bindParsingOptions(exportDB.getParsingOptionsRef());
auto fileType = getFileType(parsedOptions);
if (fileType != FileType::CSV && fileType != FileType::PARQUET) {
throw BinderException("Export database currently only supports csv and parquet files.");
Expand Down
46 changes: 24 additions & 22 deletions src/binder/bind/bind_import_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,29 +45,31 @@ std::unique_ptr<BoundStatement> Binder::bindImportDatabaseClause(const Statement
finalQueryStatements += getQueryFromFile(fs, boundFilePath, ImportDBConstants::SCHEMA_NAME);
// replace the path in copy from statement with the bound path
auto copyQuery = getQueryFromFile(fs, boundFilePath, ImportDBConstants::COPY_NAME);
auto parsedStatements = Parser::parseQuery(copyQuery);
for (auto& parsedStatement : parsedStatements) {
KU_ASSERT(parsedStatement->getStatementType() == StatementType::COPY_FROM);
auto copyFromStatement =
ku_dynamic_cast<const Statement*, const CopyFrom*>(parsedStatement.get());
KU_ASSERT(copyFromStatement->getSource()->type == common::ScanSourceType::FILE);
auto filePaths = ku_dynamic_cast<parser::BaseScanSource*, parser::FileScanSource*>(
copyFromStatement->getSource())
->filePaths;
KU_ASSERT(filePaths.size() == 1);
auto fileType = bindFileType(filePaths);
auto copyFilePath = boundFilePath + "/" + filePaths[0];
std::string query;
if (fileType == FileType::CSV) {
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(copyFromStatement->getParsingOptionsRef()));
query = stringFormat("COPY {} FROM \"{}\" {};", copyFromStatement->getTableName(),
copyFilePath, csvConfig.option.toCypher());
} else {
query = stringFormat("COPY {} FROM \"{}\";", copyFromStatement->getTableName(),
copyFilePath);
if (!copyQuery.empty()) {
auto parsedStatements = Parser::parseQuery(copyQuery);
for (auto& parsedStatement : parsedStatements) {
KU_ASSERT(parsedStatement->getStatementType() == StatementType::COPY_FROM);
auto copyFromStatement =
ku_dynamic_cast<const Statement*, const CopyFrom*>(parsedStatement.get());
KU_ASSERT(copyFromStatement->getSource()->type == common::ScanSourceType::FILE);
auto filePaths = ku_dynamic_cast<parser::BaseScanSource*, parser::FileScanSource*>(
copyFromStatement->getSource())
->filePaths;
KU_ASSERT(filePaths.size() == 1);
auto fileType = bindFileType(filePaths);
auto copyFilePath = boundFilePath + "/" + filePaths[0];
std::string query;
if (fileType == FileType::CSV) {
auto csvConfig = CSVReaderConfig::construct(
bindParsingOptions(copyFromStatement->getParsingOptionsRef()));
query = stringFormat("COPY {} FROM \"{}\" {};", copyFromStatement->getTableName(),
copyFilePath, csvConfig.option.toCypher());
} else {
query = stringFormat("COPY {} FROM \"{}\";", copyFromStatement->getTableName(),
copyFilePath);
}
finalQueryStatements += query;
}
finalQueryStatements += query;
}
finalQueryStatements += getQueryFromFile(fs, boundFilePath, ImportDBConstants::MACRO_NAME);
return std::make_unique<BoundImportDatabase>(boundFilePath, finalQueryStatements);
Expand Down
30 changes: 19 additions & 11 deletions src/catalog/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,6 @@ std::vector<RelGroupCatalogEntry*> Catalog::getRelTableGroupEntries(Transaction*
CatalogEntryType::REL_GROUP_ENTRY);
}

bool Catalog::relTableExistInRelTableGroup(Transaction* tx, table_id_t tableID) const {
auto relGroupEntries = getRelTableGroupEntries(tx);
for (auto relGroupEntry : relGroupEntries) {
auto tableIDs = relGroupEntry->getRelTableIDs();
if (std::find(tableIDs.begin(), tableIDs.end(), tableID) != tableIDs.end()) {
return true;
}
}
return false;
}

std::vector<RDFGraphCatalogEntry*> Catalog::getRdfGraphEntries(Transaction* tx) const {
return getVersion(tx)->getTableCatalogEntries<RDFGraphCatalogEntry*>(
CatalogEntryType::RDF_GRAPH_ENTRY);
Expand All @@ -114,6 +103,25 @@ std::vector<TableCatalogEntry*> Catalog::getTableSchemas(Transaction* tx,
return result;
}

bool Catalog::tableInRDFGraph(Transaction* tx, table_id_t tableID) const {
for (auto& entry : getRdfGraphEntries(tx)) {
auto set = entry->getTableIDSet();
if (set.contains(tableID)) {
return true;
}
}
return false;
}

bool Catalog::tableInRelGroup(Transaction* tx, table_id_t tableID) const {
for (auto& entry : getRelTableGroupEntries(tx)) {
if (entry->isParent(tableID)) {
return true;
}
}
return false;
}

void Catalog::prepareCommitOrRollback(TransactionAction action, VirtualFileSystem* fs) {
if (hasUpdates()) {
wal->logCatalogRecord();
Expand Down
9 changes: 9 additions & 0 deletions src/catalog/catalog_entry/rdf_graph_catalog_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ bool RDFGraphCatalogEntry::isParent(common::table_id_t tableID) {
tableID == resourceTripleTableID || tableID == literalTripleTableID;
}

common::table_id_set_t RDFGraphCatalogEntry::getTableIDSet() const {
common::table_id_set_t result;
result.insert(resourceTableID);
result.insert(literalTableID);
result.insert(resourceTripleTableID);
result.insert(literalTripleTableID);
return result;
}

std::string RDFGraphCatalogEntry::getResourceTableName(const std::string& graphName) {
return graphName + std::string(common::rdf::RESOURCE_TABLE_SUFFIX);
}
Expand Down
4 changes: 2 additions & 2 deletions src/include/catalog/catalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ class Catalog {
std::vector<TableCatalogEntry*> getTableEntries(transaction::Transaction* tx) const;
std::vector<TableCatalogEntry*> getTableSchemas(transaction::Transaction* tx,
const common::table_id_vector_t& tableIDs) const;
bool relTableExistInRelTableGroup(transaction::Transaction* tx,
common::table_id_t tableID) const;
bool tableInRDFGraph(transaction::Transaction* tx, common::table_id_t tableID) const;
bool tableInRelGroup(transaction::Transaction* tx, common::table_id_t tableID) const;

common::table_id_t createTableSchema(const binder::BoundCreateTableInfo& info);
void dropTableSchema(common::table_id_t tableID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class RDFGraphCatalogEntry final : public TableCatalogEntry {
common::table_id_t getLiteralTableID() const { return literalTableID; }
common::table_id_t getResourceTripleTableID() const { return resourceTripleTableID; }
common::table_id_t getLiteralTripleTableID() const { return literalTripleTableID; }
common::table_id_set_t getTableIDSet() const;
static std::string getResourceTableName(const std::string& graphName);
static std::string getLiteralTableName(const std::string& graphName);
static std::string getResourceTripleTableName(const std::string& graphName);
Expand Down
9 changes: 5 additions & 4 deletions src/include/planner/operator/logical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
namespace kuzu {
namespace planner {

// This ENUM is sorted by alphabetical order.
enum class LogicalOperatorType : uint8_t {
ACCUMULATE,
AGGREGATE,
ALTER,
ATTACH_DATABASE,
DETACH_DATABASE,
COMMENT_ON,
COPY_FROM,
COPY_TO,
Expand All @@ -20,18 +20,22 @@ enum class LogicalOperatorType : uint8_t {
CROSS_PRODUCT,
DELETE_NODE,
DELETE_REL,
DETACH_DATABASE,
DISTINCT,
DROP_TABLE,
DUMMY_SCAN,
EMPTY_RESULT,
EXPLAIN,
EXPRESSIONS_SCAN,
EXTEND,
EXTENSION,
EXPORT_DATABASE,
FILTER,
FLATTEN,
HASH_JOIN,
IN_QUERY_CALL,
INDEX_SCAN_NODE,
IMPORT_DATABASE,
INTERSECT,
INSERT,
LIMIT,
Expand All @@ -56,9 +60,6 @@ enum class LogicalOperatorType : uint8_t {
UNION_ALL,
UNWIND,
USE_DATABASE,
EXTENSION,
EXPORT_DATABASE,
IMPORT_DATABASE,
};

struct LogicalOperatorUtils {
Expand Down
9 changes: 4 additions & 5 deletions src/include/processor/operator/physical_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ enum class PhysicalOperatorType : uint8_t {
ATTACH_DATABASE,
BATCH_INSERT,
COMMENT_ON,
CREATE_MACRO,
DETACH_DATABASE,
STANDALONE_CALL,
IN_QUERY_CALL,
COPY_RDF,
COPY_TO,
CREATE_MACRO,
CREATE_TABLE,
CROSS_PRODUCT,
DETACH_DATABASE,
DELETE_NODE,
DELETE_REL,
DROP_TABLE,
Expand All @@ -31,6 +29,7 @@ enum class PhysicalOperatorType : uint8_t {
HASH_JOIN_BUILD,
HASH_JOIN_PROBE,
IMPORT_DATABASE,
IN_QUERY_CALL,
INDEX_LOOKUP,
INDEX_SCAN,
INSERT,
Expand All @@ -45,7 +44,6 @@ enum class PhysicalOperatorType : uint8_t {
PATH_PROPERTY_PROBE,
PROJECTION,
PROFILE,
READER,
RECURSIVE_JOIN,
RENAME_PROPERTY,
RENAME_TABLE,
Expand All @@ -60,6 +58,7 @@ enum class PhysicalOperatorType : uint8_t {
SET_NODE_PROPERTY,
SET_REL_PROPERTY,
SKIP,
STANDALONE_CALL,
TOP_K,
TOP_K_SCAN,
TRANSACTION,
Expand Down
9 changes: 9 additions & 0 deletions src/parser/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "cypher_lexer.h"
#pragma GCC diagnostic pop

#include "common/exception/parser.h"
#include "parser/antlr_parser/kuzu_cypher_parser.h"
#include "parser/antlr_parser/parser_error_listener.h"
#include "parser/antlr_parser/parser_error_strategy.h"
Expand All @@ -17,6 +18,14 @@ namespace kuzu {
namespace parser {

std::vector<std::shared_ptr<Statement>> Parser::parseQuery(std::string_view query) {
// LCOV_EXCL_START
// We should have enforced this in connection, but I also realize empty query will cause
// antlr to hang. So enforce a duplicate check here.
if (query.empty()) {
throw common::ParserException(
"Cannot parse empty query. This should be handled in connection.");
}
// LCOV_EXCL_STOP
auto inputStream = ANTLRInputStream(query);
auto parserErrorListener = ParserErrorListener();

Expand Down
34 changes: 19 additions & 15 deletions src/planner/operator/logical_operator.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
#include "planner/operator/logical_operator.h"

#include "common/exception/runtime.h"

using namespace kuzu::common;

namespace kuzu {
namespace planner {

// LCOV_EXCL_START
std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorType type) {
switch (type) {
case LogicalOperatorType::ATTACH_DATABASE:
return "ATTACH_DATABASE";
case LogicalOperatorType::ACCUMULATE:
return "ACCUMULATE";
case LogicalOperatorType::AGGREGATE:
return "AGGREGATE";
case LogicalOperatorType::ALTER:
return "ALTER";
case LogicalOperatorType::ATTACH_DATABASE:
return "ATTACH_DATABASE";
case LogicalOperatorType::COMMENT_ON:
return "COMMENT_ON";
case LogicalOperatorType::COPY_FROM:
Expand All @@ -33,8 +36,6 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "DELETE_REL";
case LogicalOperatorType::DETACH_DATABASE:
return "DETACH_DATABASE";
case LogicalOperatorType::USE_DATABASE:
return "USE_DATABASE";
case LogicalOperatorType::DISTINCT:
return "DISTINCT";
case LogicalOperatorType::DROP_TABLE:
Expand All @@ -43,12 +44,16 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "DUMMY_SCAN";
case LogicalOperatorType::EMPTY_RESULT:
return "EMPTY_RESULT";
case LogicalOperatorType::EXTEND:
return "EXTEND";
case LogicalOperatorType::EXPRESSIONS_SCAN:
return "EXPRESSIONS_SCAN";
case LogicalOperatorType::EXPLAIN:
return "EXPLAIN";
case LogicalOperatorType::EXPRESSIONS_SCAN:
return "EXPRESSIONS_SCAN";
case LogicalOperatorType::EXTENSION:
return "LOAD";
case LogicalOperatorType::EXPORT_DATABASE:
return "EXPORT_DATABASE";
case LogicalOperatorType::EXTEND:
return "EXTEND";
case LogicalOperatorType::FILTER:
return "FILTER";
case LogicalOperatorType::FLATTEN:
Expand All @@ -59,6 +64,8 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "IN_QUERY_CALL";
case LogicalOperatorType::INDEX_SCAN_NODE:
return "INDEX_SCAN_NODE";
case LogicalOperatorType::IMPORT_DATABASE:
return "IMPORT_DATABASE";
case LogicalOperatorType::INTERSECT:
return "INTERSECT";
case LogicalOperatorType::INSERT:
Expand Down Expand Up @@ -105,16 +112,13 @@ std::string LogicalOperatorUtils::logicalOperatorTypeToString(LogicalOperatorTyp
return "UNION_ALL";
case LogicalOperatorType::UNWIND:
return "UNWIND";
case LogicalOperatorType::EXTENSION:
return "LOAD";
case LogicalOperatorType::EXPORT_DATABASE:
return "EXPORT_DATABASE";
case LogicalOperatorType::IMPORT_DATABASE:
return "IMPORT_DATABASE";
case LogicalOperatorType::USE_DATABASE:
return "USE_DATABASE";
default:
KU_UNREACHABLE;
throw RuntimeException("Unknown logical operator type.");
}
}
// LCOV_EXCL_STOP

bool LogicalOperatorUtils::isUpdate(LogicalOperatorType type) {
switch (type) {
Expand Down
Loading

0 comments on commit cd911e3

Please sign in to comment.