Skip to content

Commit

Permalink
Merge pull request #3020 from kuzudb/copy-from-subquery
Browse files Browse the repository at this point in the history
Add copy from subquery
  • Loading branch information
andyfengHKU committed Mar 12, 2024
2 parents 0c26056 + bdd650e commit 7a3ff60
Show file tree
Hide file tree
Showing 54 changed files with 3,657 additions and 3,180 deletions.
18 changes: 11 additions & 7 deletions scripts/antlr4/Cypher.g4.copy
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,28 @@ oC_Statement
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: COPY SP oC_SchemaName ( ( SP? kU_ColumnNames SP? ) | SP ) FROM SP kU_ScanSource ( SP? kU_ParsingOptions )? ;

kU_ColumnNames
: oC_SchemaName ( SP? ',' SP? oC_SchemaName )* ;
: '(' SP? oC_SchemaName ( SP? ',' SP? oC_SchemaName )* SP? ')';

// Data source for a scan. Referenced by kU_CopyFrom and kU_LoadFrom.
// Exactly one of:
//   - kU_FilePaths            (rule defined elsewhere in this grammar)
//   - '(' oC_Query ')'        a parenthesized query, i.e. a subquery source
//   - oC_Variable             a variable reference
kU_ScanSource
: kU_FilePaths
| '(' SP? oC_Query SP? ')'
| oC_Variable ;

// COPY <schema name> FROM ( <string literal> [, <string literal>]* ) BY COLUMN
// The source is a parenthesized, comma-separated list of one or more string literals.
// NOTE(review): optional whitespace is accepted after '(' but there is no SP? before the
// closing ')', unlike the other parenthesized lists in this grammar — confirm this is intended.
kU_CopyFromByColumn
: COPY SP oC_SchemaName SP FROM SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' SP BY SP COLUMN ;

kU_CopyTO
: COPY SP '(' SP? oC_Query SP? ')' SP TO SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: COPY SP '(' SP? oC_Query SP? ')' SP TO SP StringLiteral ( SP? kU_ParsingOptions )? ;

kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: EXPORT SP DATABASE SP StringLiteral ( SP? kU_ParsingOptions )? ;

// IMPORT DATABASE <string literal>
// Takes a single string literal and no options (presumably the location to import from —
// confirm against the transformer/binder).
kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


// CALL <symbolic name> = <literal>
// Whitespace around '=' is optional; whitespace after CALL is mandatory.
kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -81,7 +85,7 @@ kU_FilePaths
// Lexer token for the keyword GLOB; every letter is matched in either upper or lower case.
GLOB : ( 'G' | 'g' ) ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'B' | 'b' ) ;

kU_ParsingOptions
: kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* ;
: '(' SP? kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* SP? ')' ;

// A single option of the form <symbolic name> = <literal>, with optional whitespace around '='.
// Referenced by kU_ParsingOptions, which supplies the list separators.
kU_ParsingOption
: oC_SymbolicName SP? '=' SP? oC_Literal;
Expand Down Expand Up @@ -291,7 +295,7 @@ oC_ReadingClause
;

kU_LoadFrom
: LOAD ( SP WITH SP HEADERS SP? '(' SP? kU_PropertyDefinitions SP? ')' )? SP FROM SP (kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? | oC_Variable) (SP? oC_Where)? ;
: LOAD ( SP WITH SP HEADERS SP? '(' SP? kU_PropertyDefinitions SP? ')' )? SP FROM SP kU_ScanSource (SP? kU_ParsingOptions)? (SP? oC_Where)? ;

// Lexer token for the keyword LOAD; every letter is matched in either upper or lower case.
LOAD : ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'A' | 'a' ) ( 'D' | 'd' ) ;

Expand Down
18 changes: 11 additions & 7 deletions src/antlr4/Cypher.g4
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,28 @@ oC_Statement
| kU_ImportDatabase;

kU_CopyFrom
: COPY SP oC_SchemaName ( ( SP? '(' SP? kU_ColumnNames SP? ')' SP? ) | SP ) FROM SP (kU_FilePaths | oC_Variable) ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: COPY SP oC_SchemaName ( ( SP? kU_ColumnNames SP? ) | SP ) FROM SP kU_ScanSource ( SP? kU_ParsingOptions )? ;

kU_ColumnNames
: oC_SchemaName ( SP? ',' SP? oC_SchemaName )* ;
: '(' SP? oC_SchemaName ( SP? ',' SP? oC_SchemaName )* SP? ')';

// Data source for a scan. Referenced by kU_CopyFrom and kU_LoadFrom.
// Exactly one of:
//   - kU_FilePaths            (rule defined elsewhere in this grammar)
//   - '(' oC_Query ')'        a parenthesized query, i.e. a subquery source
//   - oC_Variable             a variable reference
kU_ScanSource
: kU_FilePaths
| '(' SP? oC_Query SP? ')'
| oC_Variable ;

// COPY <schema name> FROM ( <string literal> [, <string literal>]* ) BY COLUMN
// The source is a parenthesized, comma-separated list of one or more string literals.
// NOTE(review): optional whitespace is accepted after '(' but there is no SP? before the
// closing ')', unlike the other parenthesized lists in this grammar — confirm this is intended.
kU_CopyFromByColumn
: COPY SP oC_SchemaName SP FROM SP '(' SP? StringLiteral ( SP? ',' SP? StringLiteral )* ')' SP BY SP COLUMN ;

kU_CopyTO
: COPY SP '(' SP? oC_Query SP? ')' SP TO SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: COPY SP '(' SP? oC_Query SP? ')' SP TO SP StringLiteral ( SP? kU_ParsingOptions )? ;

kU_ExportDatabase
: EXPORT SP DATABASE SP StringLiteral ( SP? '(' SP? kU_ParsingOptions SP? ')' )? ;
: EXPORT SP DATABASE SP StringLiteral ( SP? kU_ParsingOptions )? ;

// IMPORT DATABASE <string literal>
// Takes a single string literal and no options (presumably the location to import from —
// confirm against the transformer/binder).
kU_ImportDatabase
: IMPORT SP DATABASE SP StringLiteral;


// CALL <symbolic name> = <literal>
// Whitespace around '=' is optional; whitespace after CALL is mandatory.
kU_StandaloneCall
: CALL SP oC_SymbolicName SP? '=' SP? oC_Literal ;

Expand Down Expand Up @@ -81,7 +85,7 @@ kU_FilePaths
// Lexer token for the keyword GLOB; every letter is matched in either upper or lower case.
GLOB : ( 'G' | 'g' ) ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'B' | 'b' ) ;

kU_ParsingOptions
: kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* ;
: '(' SP? kU_ParsingOption ( SP? ',' SP? kU_ParsingOption )* SP? ')' ;

// A single option of the form <symbolic name> = <literal>, with optional whitespace around '='.
// Referenced by kU_ParsingOptions, which supplies the list separators.
kU_ParsingOption
: oC_SymbolicName SP? '=' SP? oC_Literal;
Expand Down Expand Up @@ -291,7 +295,7 @@ oC_ReadingClause
;

kU_LoadFrom
: LOAD ( SP WITH SP HEADERS SP? '(' SP? kU_PropertyDefinitions SP? ')' )? SP FROM SP (kU_FilePaths ( SP? '(' SP? kU_ParsingOptions SP? ')' )? | oC_Variable) (SP? oC_Where)? ;
: LOAD ( SP WITH SP HEADERS SP? '(' SP? kU_PropertyDefinitions SP? ')' )? SP FROM SP kU_ScanSource (SP? kU_ParsingOptions)? (SP? oC_Where)? ;

// Lexer token for the keyword LOAD; every letter is matched in either upper or lower case.
LOAD : ( 'L' | 'l' ) ( 'O' | 'o' ) ( 'A' | 'a' ) ( 'D' | 'd' ) ;

Expand Down
156 changes: 69 additions & 87 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ using namespace kuzu::binder;
using namespace kuzu::catalog;
using namespace kuzu::common;
using namespace kuzu::parser;
using namespace kuzu::function;

namespace kuzu {
namespace binder {
Expand Down Expand Up @@ -47,24 +48,6 @@ std::unique_ptr<BoundStatement> Binder::bindCopyToClause(const Statement& statem
std::move(columnTypes), std::move(query), csvConfig.option.copy());
}

// Temporary restriction on how file types pair with the COPY keywords:
//   - NPY files must be loaded through COPY FROM ... BY COLUMN;
//   - every other file type (csv, parquet, ...) must be loaded through plain COPY FROM.
// Throws a BinderException when the file type and the BY COLUMN flag disagree.
static void validateByColumnKeyword(FileType fileType, bool byColumn) {
    const bool isNpy = fileType == FileType::NPY;
    if (isNpy == byColumn) {
        // NPY with BY COLUMN, or non-NPY without it: both combinations are accepted.
        return;
    }
    if (isNpy) {
        // NPY file without the BY COLUMN keyword.
        throw BinderException(ExceptionMessage::validateCopyNPYByColumnException());
    }
    // Non-NPY file used together with the BY COLUMN keyword.
    throw BinderException(ExceptionMessage::validateCopyCSVParquetByColumnException());
}

// Rejects an NPY copy whose target is a relationship table.
// Throws a BinderException carrying the table's name when tableEntry is of type REL;
// any other table type passes through untouched.
static void validateCopyNpyNotForRelTables(TableCatalogEntry* tableEntry) {
    if (tableEntry->getTableType() != TableType::REL) {
        return;
    }
    throw BinderException(
        ExceptionMessage::validateCopyNpyNotForRelTablesException(tableEntry->getName()));
}

std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& statement) {
auto& copyStatement = ku_dynamic_cast<const Statement&, const CopyFrom&>(statement);
auto tableName = copyStatement.getTableName();
Expand All @@ -81,85 +64,84 @@ std::unique_ptr<BoundStatement> Binder::bindCopyFromClause(const Statement& stat
default:
break;
}
auto filePaths = bindFilePaths(copyStatement.getFilePaths());
auto fileType = bindFileType(filePaths);
auto readerConfig = std::make_unique<ReaderConfig>(fileType, std::move(filePaths));
readerConfig->options = bindParsingOptions(copyStatement.getParsingOptionsRef());
validateByColumnKeyword(readerConfig->fileType, copyStatement.byColumn());
if (readerConfig->fileType == FileType::NPY) {
validateCopyNpyNotForRelTables(tableEntry);
}
switch (tableEntry->getTableType()) {
case TableType::NODE:
return bindCopyNodeFrom(statement, std::move(readerConfig),
ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(tableEntry));
case TableType::REL:
return bindCopyRelFrom(statement, std::move(readerConfig),
ku_dynamic_cast<TableCatalogEntry*, RelTableCatalogEntry*>(tableEntry));
case TableType::RDF:
return bindCopyRdfFrom(statement, std::move(readerConfig),
ku_dynamic_cast<TableCatalogEntry*, RDFGraphCatalogEntry*>(tableEntry));
case TableType::NODE: {
auto nodeTableEntry =
ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(tableEntry);
return bindCopyNodeFrom(statement, nodeTableEntry);
}
case TableType::REL: {
auto relTableEntry = ku_dynamic_cast<TableCatalogEntry*, RelTableCatalogEntry*>(tableEntry);
return bindCopyRelFrom(statement, relTableEntry);
}
case TableType::RDF: {
auto rdfGraphEntry = ku_dynamic_cast<TableCatalogEntry*, RDFGraphCatalogEntry*>(tableEntry);
return bindCopyRdfFrom(statement, rdfGraphEntry);
}
default: {
KU_UNREACHABLE;
}
}
}

std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statement,
std::unique_ptr<common::ReaderConfig> config, NodeTableCatalogEntry* nodeTableEntry) {
// Forward declarations of file-local helpers that are defined further down in this file.
// They are declared here because bindCopyNodeFrom / bindCopyRelFrom call them before the
// definitions appear.
//
// Both helpers fill the output vectors `columnNames` and `columnTypes` (which must be empty
// on entry — the definitions assert this) from the table entry and the column names given
// in the COPY statement (`inputColumnNames`).
static void bindExpectedNodeColumns(NodeTableCatalogEntry* nodeTableEntry,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
std::vector<LogicalType>& columnTypes);
// `context` supplies the catalog and transaction used to look up the source and destination
// node table entries of the relationship table.
static void bindExpectedRelColumns(RelTableCatalogEntry* relTableEntry,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
std::vector<LogicalType>& columnTypes, main::ClientContext* context);

std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(
const Statement& statement, NodeTableCatalogEntry* nodeTableEntry) {
auto& copyStatement = ku_dynamic_cast<const Statement&, const CopyFrom&>(statement);
auto func = getScanFunction(config->fileType, *config);
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = nodeTableEntry->containPropertyType(*LogicalType::SERIAL());
// Bind expected columns based on catalog information.
std::vector<std::string> expectedColumnNames;
std::vector<common::LogicalType> expectedColumnTypes;
std::vector<LogicalType> expectedColumnTypes;
bindExpectedNodeColumns(
nodeTableEntry, copyStatement.getColumnNames(), expectedColumnNames, expectedColumnTypes);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(config->copy(),
std::move(expectedColumnNames), std::move(expectedColumnTypes), clientContext);
auto bindData = func->bindFunc(clientContext, bindInput.get());
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], bindData->columnTypes[i]));
auto boundSource = bindScanSource(copyStatement.getSource(),
copyStatement.getParsingOptionsRef(), expectedColumnNames, expectedColumnTypes);
if (boundSource->type == ScanSourceType::FILE) {
auto fileSource =
ku_dynamic_cast<BoundBaseScanSource*, BoundFileScanSource*>(boundSource.get());
auto bindData = ku_dynamic_cast<TableFuncBindData*, ScanBindData*>(
fileSource->fileScanInfo.bindData.get());
if (copyStatement.byColumn() && bindData->config.fileType != FileType::NPY) {
throw BinderException(stringFormat("Copy by column with {} file type is not supported.",
FileTypeUtils::toString(bindData->config.fileType)));
}
}
auto offset = expressionBinder.createVariableExpression(
LogicalType(LogicalTypeID::INT64), std::string(InternalKeyword::ANONYMOUS));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, std::move(offset));
auto boundCopyFromInfo = BoundCopyFromInfo(
nodeTableEntry, std::move(boundFileScanInfo), containsSerial, nullptr /* extraInfo */);
*LogicalType::INT64(), std::string(InternalKeyword::ANONYMOUS));
auto boundCopyFromInfo =
BoundCopyFromInfo(nodeTableEntry, std::move(boundSource), offset, nullptr /* extraInfo */);
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement& statement,
std::unique_ptr<common::ReaderConfig> config, RelTableCatalogEntry* relTableEntry) {
std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(
const parser::Statement& statement, RelTableCatalogEntry* relTableEntry) {
auto& copyStatement = ku_dynamic_cast<const Statement&, const CopyFrom&>(statement);
auto func = getScanFunction(config->fileType, *config);
// For table with SERIAL columns, we need to read in serial from files.
auto containsSerial = relTableEntry->containPropertyType(*LogicalType::SERIAL());
KU_ASSERT(containsSerial == false);
std::vector<std::string> expectedColumnNames;
std::vector<common::LogicalType> expectedColumnTypes;
bindExpectedRelColumns(
relTableEntry, copyStatement.getColumnNames(), expectedColumnNames, expectedColumnTypes);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(std::move(*config),
std::move(expectedColumnNames), std::move(expectedColumnTypes), clientContext);
auto bindData = func->bindFunc(clientContext, bindInput.get());
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], bindData->columnTypes[i]));
if (copyStatement.byColumn()) {
throw BinderException(
stringFormat("Copy by column is not supported for relationship table."));
}
// Bind expected columns based on catalog information.
std::vector<std::string> expectedColumnNames;
std::vector<LogicalType> expectedColumnTypes;
bindExpectedRelColumns(relTableEntry, copyStatement.getColumnNames(), expectedColumnNames,
expectedColumnTypes, clientContext);
auto boundSource = bindScanSource(copyStatement.getSource(),
copyStatement.getParsingOptionsRef(), expectedColumnNames, expectedColumnTypes);
auto columns = boundSource->getColumns();
auto offset = expressionBinder.createVariableExpression(
*LogicalType::INT64(), std::string(InternalKeyword::ROW_OFFSET));
auto boundFileScanInfo =
std::make_unique<BoundFileScanInfo>(func, std::move(bindData), columns, offset);
auto srcTableID = relTableEntry->getSrcTableID();
auto dstTableID = relTableEntry->getDstTableID();
auto catalog = clientContext->getCatalog();
auto srcSchema = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(
catalog->getTableCatalogEntry(clientContext->getTx(), srcTableID));
auto dstSchema = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(
catalog->getTableCatalogEntry(clientContext->getTx(), dstTableID));
auto srcEntry = catalog->getTableCatalogEntry(clientContext->getTx(), srcTableID);
auto dstEntry = catalog->getTableCatalogEntry(clientContext->getTx(), dstTableID);
auto srcNodeEntry = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(srcEntry);
auto dstNodeEntry = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(dstEntry);
auto srcKey = columns[0];
auto dstKey = columns[1];
expression_vector propertyColumns;
Expand All @@ -168,8 +150,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
}
auto srcOffset = createVariable(InternalKeyword::SRC_OFFSET, LogicalTypeID::INT64);
auto dstOffset = createVariable(InternalKeyword::DST_OFFSET, LogicalTypeID::INT64);
auto srcPkType = srcSchema->getPrimaryKey()->getDataType();
auto dstPkType = dstSchema->getPrimaryKey()->getDataType();
auto srcPkType = srcNodeEntry->getPrimaryKey()->getDataType();
auto dstPkType = dstNodeEntry->getPrimaryKey()->getDataType();
auto srcLookUpInfo = IndexLookupInfo(srcTableID, srcOffset, srcKey, *srcPkType);
auto dstLookUpInfo = IndexLookupInfo(dstTableID, dstOffset, dstKey, *dstPkType);
auto extraCopyRelInfo = std::make_unique<ExtraBoundCopyRelInfo>();
Expand All @@ -178,8 +160,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
extraCopyRelInfo->propertyColumns = std::move(propertyColumns);
extraCopyRelInfo->infos.push_back(std::move(srcLookUpInfo));
extraCopyRelInfo->infos.push_back(std::move(dstLookUpInfo));
auto boundCopyFromInfo = BoundCopyFromInfo(
relTableEntry, std::move(boundFileScanInfo), containsSerial, std::move(extraCopyRelInfo));
auto boundCopyFromInfo =
BoundCopyFromInfo(relTableEntry, boundSource->copy(), offset, std::move(extraCopyRelInfo));
return std::make_unique<BoundCopyFrom>(std::move(boundCopyFromInfo));
}

Expand Down Expand Up @@ -231,22 +213,22 @@ static void bindExpectedColumns(TableCatalogEntry* tableEntry,
}
}

void Binder::bindExpectedNodeColumns(NodeTableCatalogEntry* nodeTableEntry,
void bindExpectedNodeColumns(NodeTableCatalogEntry* nodeTableEntry,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
std::vector<common::LogicalType>& columnTypes) {
std::vector<LogicalType>& columnTypes) {
KU_ASSERT(columnNames.empty() && columnTypes.empty());
bindExpectedColumns(nodeTableEntry, inputColumnNames, columnNames, columnTypes);
}

void Binder::bindExpectedRelColumns(RelTableCatalogEntry* relTableEntry,
void bindExpectedRelColumns(RelTableCatalogEntry* relTableEntry,
const std::vector<std::string>& inputColumnNames, std::vector<std::string>& columnNames,
std::vector<common::LogicalType>& columnTypes) {
std::vector<LogicalType>& columnTypes, main::ClientContext* context) {
KU_ASSERT(columnNames.empty() && columnTypes.empty());
auto catalog = clientContext->getCatalog();
auto srcTable = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(
catalog->getTableCatalogEntry(clientContext->getTx(), relTableEntry->getSrcTableID()));
auto dstTable = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(
catalog->getTableCatalogEntry(clientContext->getTx(), relTableEntry->getDstTableID()));
auto catalog = context->getCatalog();
auto srcEntry = catalog->getTableCatalogEntry(context->getTx(), relTableEntry->getSrcTableID());
auto srcTable = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(srcEntry);
auto dstEntry = catalog->getTableCatalogEntry(context->getTx(), relTableEntry->getDstTableID());
auto dstTable = ku_dynamic_cast<TableCatalogEntry*, NodeTableCatalogEntry*>(dstEntry);
columnNames.push_back("from");
columnNames.push_back("to");
auto srcPKColumnType = *srcTable->getPrimaryKey()->getDataType();
Expand Down
Loading

0 comments on commit 7a3ff60

Please sign in to comment.