Skip to content

Commit

Permalink
Merge pull request #2547 from kuzudb/storage-info
Browse files Browse the repository at this point in the history
CALL storage_info
  • Loading branch information
ray6080 committed Dec 9, 2023
2 parents 7fe1b43 + 4f67f40 commit 8b7cffe
Show file tree
Hide file tree
Showing 49 changed files with 606 additions and 182 deletions.
6 changes: 4 additions & 2 deletions src/binder/bind/bind_copy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyNodeFrom(const Statement& statem
tableSchema, copyStatement.getColumnNames(), expectedColumnNames, expectedColumnTypes);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, std::move(expectedColumnNames), std::move(expectedColumnTypes));
auto bindData = func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand All @@ -153,7 +154,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRelFrom(const parser::Statement&
tableSchema, copyStatement.getColumnNames(), expectedColumnNames, expectedColumnTypes);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(memoryManager,
std::move(*config), std::move(expectedColumnNames), std::move(expectedColumnTypes));
auto bindData = func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand Down
6 changes: 4 additions & 2 deletions src/binder/bind/bind_reading_clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ std::unique_ptr<BoundReadingClause> Binder::bindInQueryCall(const ReadingClause&
catalog.getBuiltInFunctions()->matchScalarFunction(funcName, inputTypes));
auto bindInput = std::make_unique<function::TableFuncBindInput>();
bindInput->inputs = std::move(inputValues);
auto bindData = tableFunction->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
tableFunction->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand Down Expand Up @@ -173,7 +174,8 @@ std::unique_ptr<BoundReadingClause> Binder::bindLoadFrom(
auto scanFunction = getScanFunction(readerConfig->fileType, *readerConfig);
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(memoryManager,
*readerConfig, std::move(expectedColumnNames), std::move(expectedColumnTypes));
auto bindData = scanFunction->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
scanFunction->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand Down
6 changes: 4 additions & 2 deletions src/binder/bind/copy/bind_copy_rdf_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfNodeFrom(const Statement& /*s
}
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes));
auto bindData = func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand Down Expand Up @@ -76,7 +77,8 @@ std::unique_ptr<BoundStatement> Binder::bindCopyRdfRelFrom(const Statement& /*st
}
auto bindInput = std::make_unique<function::ScanTableFuncBindInput>(
memoryManager, *config, columnNames, std::move(columnTypes));
auto bindData = func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog);
auto bindData =
func->bindFunc(clientContext, bindInput.get(), (Catalog*)&catalog, storageManager);
expression_vector columns;
for (auto i = 0u; i < bindData->columnTypes.size(); i++) {
columns.push_back(createVariable(bindData->columnNames[i], *bindData->columnTypes[i]));
Expand Down
4 changes: 2 additions & 2 deletions src/common/enums/rel_direction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ namespace common {
std::string RelDataDirectionUtils::relDirectionToString(RelDataDirection direction) {
switch (direction) {
case RelDataDirection::FWD:
return "forward";
return "fwd";
case RelDataDirection::BWD:
return "backward";
return "bwd";
default:
KU_UNREACHABLE;
}
Expand Down
1 change: 1 addition & 0 deletions src/function/built_in_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,7 @@ void BuiltInFunctions::registerTableFunctions() {
functions.insert({READ_CSV_SERIAL_FUNC_NAME, processor::SerialCSVScan::getFunctionSet()});
functions.insert({READ_CSV_PARALLEL_FUNC_NAME, processor::ParallelCSVScan::getFunctionSet()});
functions.insert({READ_RDF_FUNC_NAME, processor::RdfScan::getFunctionSet()});
functions.insert({STORAGE_INFO_FUNC_NAME, StorageInfoFunction::getFunctionSet()});
}

void BuiltInFunctions::addFunction(std::string name, function::function_set definitions) {
Expand Down
204 changes: 193 additions & 11 deletions src/function/table_functions/call_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@
#include "catalog/rel_table_schema.h"
#include "common/exception/binder.h"
#include "main/client_context.h"
#include "storage/storage_manager.h"
#include "storage/store/string_column.h"
#include "storage/store/struct_column.h"
#include "storage/store/var_list_column.h"
#include "transaction/transaction.h"

using namespace kuzu::transaction;
using namespace kuzu::common;
using namespace kuzu::catalog;
using namespace kuzu::main;
using namespace kuzu::storage;

namespace kuzu {
namespace function {

std::unique_ptr<TableFuncLocalState> initLocalState(
TableFunctionInitInput& /*input*/, TableFuncSharedState* /*state*/) {
TableFunctionInitInput& /*input*/, TableFuncSharedState* /*state*/, MemoryManager* /*mm*/) {
return std::make_unique<TableFuncLocalState>();
}

Expand Down Expand Up @@ -59,8 +64,8 @@ void CurrentSettingFunction::tableFunc(TableFunctionInput& data, DataChunk& outp
output.state->selVector->selectedSize = 1;
}

std::unique_ptr<TableFuncBindData> CurrentSettingFunction::bindFunc(
ClientContext* context, TableFuncBindInput* input, Catalog* /*catalog*/) {
std::unique_ptr<TableFuncBindData> CurrentSettingFunction::bindFunc(ClientContext* context,
TableFuncBindInput* input, Catalog* /*catalog*/, StorageManager* /*storageManager*/) {
auto optionName = input->inputs[0]->getValue<std::string>();
std::vector<std::string> returnColumnNames;
std::vector<std::unique_ptr<LogicalType>> returnTypes;
Expand Down Expand Up @@ -90,8 +95,8 @@ void DBVersionFunction::tableFunc(TableFunctionInput& input, DataChunk& outputCh
outputChunk.state->selVector->selectedSize = 1;
}

std::unique_ptr<TableFuncBindData> DBVersionFunction::bindFunc(
ClientContext* /*context*/, TableFuncBindInput* /*input*/, Catalog* /*catalog*/) {
std::unique_ptr<TableFuncBindData> DBVersionFunction::bindFunc(ClientContext* /*context*/,
TableFuncBindInput* /*input*/, Catalog* /*catalog*/, StorageManager* /*storageManager*/) {
std::vector<std::string> returnColumnNames;
std::vector<std::unique_ptr<LogicalType>> returnTypes;
returnColumnNames.emplace_back("version");
Expand Down Expand Up @@ -126,8 +131,8 @@ void ShowTablesFunction::tableFunc(TableFunctionInput& input, DataChunk& outputC
outputChunk.state->selVector->selectedSize = numTablesToOutput;
}

std::unique_ptr<TableFuncBindData> ShowTablesFunction::bindFunc(
ClientContext* context, TableFuncBindInput* /*input*/, Catalog* catalog) {
std::unique_ptr<TableFuncBindData> ShowTablesFunction::bindFunc(ClientContext* context,
TableFuncBindInput* /*input*/, Catalog* catalog, StorageManager* /*storageManager*/) {
std::vector<std::string> returnColumnNames;
std::vector<std::unique_ptr<LogicalType>> returnTypes;
returnColumnNames.emplace_back("name");
Expand Down Expand Up @@ -178,8 +183,8 @@ void TableInfoFunction::tableFunc(TableFunctionInput& input, DataChunk& outputCh
outputChunk.state->selVector->selectedSize = outVectorPos;
}

std::unique_ptr<TableFuncBindData> TableInfoFunction::bindFunc(
ClientContext* context, TableFuncBindInput* input, Catalog* catalog) {
std::unique_ptr<TableFuncBindData> TableInfoFunction::bindFunc(ClientContext* context,
TableFuncBindInput* input, Catalog* catalog, StorageManager* /*storageManager*/) {
std::vector<std::string> returnColumnNames;
std::vector<std::unique_ptr<LogicalType>> returnTypes;
auto tableName = input->inputs[0]->getValue<std::string>();
Expand Down Expand Up @@ -251,8 +256,8 @@ void ShowConnectionFunction::tableFunc(TableFunctionInput& input, DataChunk& out
outputChunk.state->selVector->selectedSize = vectorPos;
}

std::unique_ptr<TableFuncBindData> ShowConnectionFunction::bindFunc(
ClientContext* context, TableFuncBindInput* input, Catalog* catalog) {
std::unique_ptr<TableFuncBindData> ShowConnectionFunction::bindFunc(ClientContext* context,
TableFuncBindInput* input, Catalog* catalog, StorageManager* /*storageManager*/) {
std::vector<std::string> returnColumnNames;
std::vector<std::unique_ptr<LogicalType>> returnTypes;
auto tableName = input->inputs[0]->getValue<std::string>();
Expand All @@ -273,5 +278,182 @@ std::unique_ptr<TableFuncBindData> ShowConnectionFunction::bindFunc(
reinterpret_cast<RelTableGroupSchema*>(schema)->getRelTableIDs().size());
}

std::unique_ptr<TableFuncBindData> StorageInfoFunction::bindFunc(ClientContext* context,
TableFuncBindInput* input, Catalog* catalog, StorageManager* storageManager) {
std::vector<std::string> columnNames = {"node_group_id", "column_name", "data_type",
"table_type", "start_page_idx", "num_pages", "num_values", "compression"};
std::vector<std::unique_ptr<LogicalType>> columnTypes;
columnTypes.emplace_back(LogicalType::INT64());
columnTypes.emplace_back(LogicalType::STRING());
columnTypes.emplace_back(LogicalType::STRING());
columnTypes.emplace_back(LogicalType::STRING());
columnTypes.emplace_back(LogicalType::INT64());
columnTypes.emplace_back(LogicalType::INT64());
columnTypes.emplace_back(LogicalType::INT64());
columnTypes.emplace_back(LogicalType::STRING());
auto tableName = input->inputs[0]->getValue<std::string>();
if (!catalog->containsTable(context->getTx(), tableName)) {
throw BinderException{"Table " + tableName + " does not exist!"};
}
auto tableID = catalog->getTableID(context->getTx(), tableName);
auto schema = catalog->getTableSchema(context->getTx(), tableID);
auto table = schema->tableType == TableType::NODE ?
reinterpret_cast<Table*>(storageManager->getNodeTable(tableID)) :
reinterpret_cast<Table*>(storageManager->getRelTable(tableID));
return std::make_unique<StorageInfoBindData>(
schema, std::move(columnTypes), std::move(columnNames), table);
}

StorageInfoSharedState::StorageInfoSharedState(Table* table, offset_t maxOffset)
: CallFuncSharedState{maxOffset} {
collectColumns(table);
}

void StorageInfoSharedState::collectColumns(Table* table) {
switch (table->getTableType()) {
case TableType::NODE: {
auto nodeTable = ku_dynamic_cast<Table*, NodeTable*>(table);
for (auto columnID = 0u; columnID < nodeTable->getNumColumns(); columnID++) {
auto collectedColumns = collectColumns(nodeTable->getColumn(columnID));
columns.insert(columns.end(), collectedColumns.begin(), collectedColumns.end());
}
} break;
case TableType::REL: {
auto relTable = ku_dynamic_cast<Table*, RelTable*>(table);
columns.push_back(relTable->getAdjColumn(RelDataDirection::FWD));
columns.push_back(relTable->getAdjColumn(RelDataDirection::BWD));
for (auto columnID = 0u; columnID < relTable->getNumColumns(); columnID++) {
auto column = relTable->getColumn(columnID, RelDataDirection::FWD);
auto collectedColumns = collectColumns(column);
columns.insert(columns.end(), collectedColumns.begin(), collectedColumns.end());
column = relTable->getColumn(columnID, RelDataDirection::BWD);
collectedColumns = collectColumns(column);
columns.insert(columns.end(), collectedColumns.begin(), collectedColumns.end());
}
} break;
default: {
KU_UNREACHABLE;
}
}
}

std::vector<Column*> StorageInfoSharedState::collectColumns(Column* column) {
std::vector<Column*> result;
result.push_back(column);
result.push_back(column->getNullColumn());
switch (column->getDataType()->getPhysicalType()) {
case PhysicalTypeID::STRUCT: {
auto structColumn = ku_dynamic_cast<Column*, StructColumn*>(column);
auto numChildren = StructType::getNumFields(structColumn->getDataType());
for (auto i = 0u; i < numChildren; i++) {
auto childColumn = structColumn->getChild(i);
auto subColumns = collectColumns(childColumn);
result.insert(result.end(), subColumns.begin(), subColumns.end());
}
} break;
case PhysicalTypeID::STRING: {
auto stringColumn = ku_dynamic_cast<Column*, StringColumn*>(column);
result.push_back(stringColumn->getDataColumn());
result.push_back(stringColumn->getOffsetColumn());
} break;
case PhysicalTypeID::VAR_LIST: {
auto varListColumn = ku_dynamic_cast<Column*, VarListColumn*>(column);
result.push_back(varListColumn->getDataColumn());
} break;
default: {
// DO NOTHING.
}
}
return result;
}

std::unique_ptr<TableFuncSharedState> StorageInfoFunction::initSharedState(
TableFunctionInitInput& input) {
auto storageInfoBindData = reinterpret_cast<StorageInfoBindData*>(input.bindData);
return std::make_unique<StorageInfoSharedState>(
storageInfoBindData->table, storageInfoBindData->maxOffset);
}

std::unique_ptr<TableFuncLocalState> StorageInfoFunction::initLocalState(
TableFunctionInitInput& /*input*/, TableFuncSharedState* /*sharedState*/, MemoryManager* mm) {
return std::make_unique<StorageInfoLocalState>(mm);
}

void StorageInfoFunction::tableFunc(TableFunctionInput& input, DataChunk& outputChunk) {
auto localState =
ku_dynamic_cast<TableFuncLocalState*, StorageInfoLocalState*>(input.localState);
auto sharedState =
ku_dynamic_cast<TableFuncSharedState*, StorageInfoSharedState*>(input.sharedState);
KU_ASSERT(outputChunk.state->selVector->isUnfiltered());
while (true) {
if (localState->currChunkIdx < localState->dataChunkCollection->getNumChunks()) {
// Copy from local state chunk.
auto chunk = localState->dataChunkCollection->getChunk(localState->currChunkIdx);
auto numValuesToOutput = chunk->state->selVector->selectedSize;
for (auto columnIdx = 0u; columnIdx < outputChunk.getNumValueVectors(); columnIdx++) {
auto localVector = chunk->getValueVector(columnIdx);
auto outputVector = outputChunk.getValueVector(columnIdx);
for (auto i = 0u; i < numValuesToOutput; i++) {
outputVector->copyFromVectorData(i, localVector.get(), i);
}
}
outputChunk.state->selVector->resetSelectorToUnselectedWithSize(numValuesToOutput);
localState->currChunkIdx++;
return;
}
auto morsel = reinterpret_cast<CallFuncSharedState*>(input.sharedState)->getMorsel();
if (!morsel.hasMoreToOutput()) {
outputChunk.state->selVector->selectedSize = 0;
return;
}
auto storageInfoBindData = reinterpret_cast<StorageInfoBindData*>(input.bindData);
auto tableSchema = storageInfoBindData->schema;
std::string tableType = tableSchema->getTableType() == TableType::NODE ? "NODE" : "REL";
for (auto columnID = 0u; columnID < sharedState->columns.size(); columnID++) {
appendStorageInfoForColumn(
tableType, outputChunk, localState, sharedState->columns[columnID]);
}
localState->dataChunkCollection->append(outputChunk);
outputChunk.resetAuxiliaryBuffer();
outputChunk.state->selVector->selectedSize = 0;
}
}

function_set StorageInfoFunction::getFunctionSet() {
function_set functionSet;
functionSet.push_back(std::make_unique<TableFunction>("storage_info", tableFunc, bindFunc,
initSharedState, initLocalState, std::vector<LogicalTypeID>{LogicalTypeID::STRING}));
return functionSet;
}

void StorageInfoFunction::appendStorageInfoForColumn(std::string tableType, DataChunk& outputChunk,
StorageInfoLocalState* localState, const Column* column) {
auto numNodeGroups = column->getNumNodeGroups(&transaction::DUMMY_READ_TRANSACTION);
for (auto nodeGroupIdx = 0u; nodeGroupIdx < numNodeGroups; nodeGroupIdx++) {
if (outputChunk.state->selVector->selectedSize == DEFAULT_VECTOR_CAPACITY) {
localState->dataChunkCollection->append(outputChunk);
outputChunk.resetAuxiliaryBuffer();
outputChunk.state->selVector->selectedSize = 0;
}
appendColumnChunkStorageInfo(nodeGroupIdx, tableType, column, outputChunk);
}
}

void StorageInfoFunction::appendColumnChunkStorageInfo(node_group_idx_t nodeGroupIdx,
const std::string& tableType, const Column* column, DataChunk& outputChunk) {
auto vectorPos = outputChunk.state->selVector->selectedSize;
auto metadata = column->getMetadata(nodeGroupIdx, transaction::TransactionType::READ_ONLY);
auto columnType = column->getDataType()->toString();
outputChunk.getValueVector(0)->setValue<uint64_t>(vectorPos, nodeGroupIdx);
outputChunk.getValueVector(1)->setValue(vectorPos, column->getName());
outputChunk.getValueVector(2)->setValue(vectorPos, columnType);
outputChunk.getValueVector(3)->setValue(vectorPos, tableType);
outputChunk.getValueVector(4)->setValue<uint64_t>(vectorPos, metadata.pageIdx);
outputChunk.getValueVector(5)->setValue<uint64_t>(vectorPos, metadata.numPages);
outputChunk.getValueVector(6)->setValue<uint64_t>(vectorPos, metadata.numValues);
outputChunk.getValueVector(7)->setValue(vectorPos, metadata.compMeta.toString());
outputChunk.state->selVector->selectedSize++;
}

} // namespace function
} // namespace kuzu
5 changes: 5 additions & 0 deletions src/include/common/data_chunk/data_chunk_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class DataChunkCollection {
void append(std::unique_ptr<DataChunk> chunk);
std::vector<common::DataChunk*> getChunks() const;

inline uint64_t getNumChunks() const { return chunks.size(); }
inline DataChunk* getChunk(uint64_t idx) const {
KU_ASSERT(idx < chunks.size());
return chunks[idx].get();
}
inline void merge(DataChunkCollection* other) {
for (auto& chunk : other->chunks) {
append(std::move(chunk));
Expand Down
1 change: 1 addition & 0 deletions src/include/common/enums/expression_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ const char* const READ_CSV_SERIAL_FUNC_NAME = "READ_CSV_SERIAL";
const char* const READ_CSV_PARALLEL_FUNC_NAME = "READ_CSV_PARALLEL";
const char* const READ_RDF_FUNC_NAME = "READ_RDF";
const char* const READ_PANDAS_FUNC_NAME = "READ_PANDAS";
const char* const STORAGE_INFO_FUNC_NAME = "STORAGE_INFO";

enum class ExpressionType : uint8_t {

Expand Down
Loading

0 comments on commit 8b7cffe

Please sign in to comment.