Skip to content

Commit

Permalink
access_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
hououou authored and acquamarin committed Oct 16, 2023
1 parent 1a3b814 commit 392d4a9
Show file tree
Hide file tree
Showing 13 changed files with 276 additions and 19 deletions.
26 changes: 25 additions & 1 deletion src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ int64_t FileInfo::getFileSize() {
#endif
}

std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags) {
std::unique_ptr<FileInfo> FileUtils::openFile(
const std::string& path, int flags, FileLockType lock_type) {
#if defined(_WIN32)
auto dwDesiredAccess = 0ul;
auto dwCreationDisposition = (flags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING;
Expand All @@ -62,12 +63,35 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
throw Exception(StringUtils::string_format("Cannot open file. path: {} - Error {}: {}",
path, GetLastError(), std::system_category().message(GetLastError())));
}
if (lock_type != FileLockType::NO_LOCK) {
DWORD dwFlags = lock_type == FileLockType::READ_LOCK ?
LOCKFILE_FAIL_IMMEDIATELY :
LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK;
OVERLAPPED overlapped = {0};
overlapped.Offset = 0;
BOOL rc = LockFileEx(handle, dwFlags, 0, 0, 0, &overlapped);
if (!rc) {
throw Exception("Could not set lock on file : " + path);
}
}
return std::make_unique<FileInfo>(path, handle);
#else
int fd = open(path.c_str(), flags, 0644);
if (fd == -1) {
throw Exception("Cannot open file: " + path);
}
if (lock_type != FileLockType::NO_LOCK) {
struct flock fl;
memset(&fl, 0, sizeof fl);
fl.l_type = lock_type == FileLockType::READ_LOCK ? F_RDLCK : F_WRLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
int rc = fcntl(fd, F_SETLK, &fl);
if (rc == -1) {
throw Exception("Could not set lock on file : " + path);
}
}
return std::make_unique<FileInfo>(path, fd);
#endif
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ struct StorageConstants {
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal";
static constexpr char DATA_FILE_NAME[] = "data.kz";
static constexpr char METADATA_FILE_NAME[] = "metadata.kz";
static constexpr char LOCK_FILE_NAME[] = ".lock";

// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
Expand Down
5 changes: 4 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
namespace kuzu {
namespace common {

enum class FileLockType : uint8_t { NO_LOCK = 0, READ_LOCK = 1, WRITE_LOCK = 2 };

struct FileInfo {
#ifdef _WIN32
FileInfo(std::string path, const void* handle) : path{std::move(path)}, handle{handle} {}
Expand All @@ -36,7 +38,8 @@ struct FileInfo {

class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);
static std::unique_ptr<FileInfo> openFile(
const std::string& path, int flags, FileLockType lock_type = FileLockType::NO_LOCK);

static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
Expand Down
6 changes: 4 additions & 2 deletions src/include/main/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ class Connection {
addScalarFunction(name, std::move(definitions));
}

protected:
private:
std::unique_ptr<QueryResult> query(const std::string& query, const std::string& encodedJoin);

std::unique_ptr<QueryResult> queryResultWithError(const std::string& errMsg);
Expand All @@ -168,7 +168,9 @@ class Connection {
KUZU_API void addScalarFunction(
std::string name, function::vector_function_definitions definitions);

protected:
void checkPreparedStatementAccessMode(PreparedStatement* preparedStatement);

private:
Database* database;
std::unique_ptr<ClientContext> clientContext;
std::mutex mtx;
Expand Down
12 changes: 9 additions & 3 deletions src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
namespace kuzu {
namespace main {

enum class AccessMode : uint8_t { READ_ONLY = 0, READ_WRITE = 1 };

/**
* @brief Stores runtime configuration for creating or opening a Database
*/
Expand All @@ -21,13 +23,15 @@ struct KUZU_API SystemConfig {
* reducing the amount of File I/O
* @param maxNumThreads The maximum number of threads to use during query execution
* @param enableCompression Whether or not to compress data on-disk for supported types
* @param accessMode Access mode to the database (READ_ONLY or READ_WRITE)
*/
explicit SystemConfig(
uint64_t bufferPoolSize = -1u, uint64_t maxNumThreads = 0, bool enableCompression = true);
explicit SystemConfig(uint64_t bufferPoolSize = -1u, uint64_t maxNumThreads = 0,
bool enableCompression = true, AccessMode accessMode = AccessMode::READ_WRITE);

uint64_t bufferPoolSize;
uint64_t maxNumThreads;
bool enableCompression;
AccessMode accessMode;
};

/**
Expand Down Expand Up @@ -66,7 +70,8 @@ class Database {
KUZU_API static void setLoggingLevel(std::string loggingLevel);

private:
void initDBDirAndCoreFilesIfNecessary() const;
void openLockFile();
void initDBDirAndCoreFilesIfNecessary();
static void initLoggers();
static void dropLoggers();

Expand All @@ -91,6 +96,7 @@ class Database {
std::unique_ptr<transaction::TransactionManager> transactionManager;
std::unique_ptr<storage::WAL> wal;
std::shared_ptr<spdlog::logger> logger;
std::unique_ptr<common::FileInfo> lockFile;
};

} // namespace main
Expand Down
1 change: 1 addition & 0 deletions src/include/main/kuzu_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class Catalog;
namespace common {
enum class StatementType : uint8_t;
class Value;
struct FileInfo;
} // namespace common

namespace storage {
Expand Down
4 changes: 4 additions & 0 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ class StorageUtils {
common::StorageConstants::CATALOG_FILE_NAME_FOR_WAL);
}

static inline std::string getLockFilePath(const std::string& directory) {
return common::FileUtils::joinPath(directory, common::StorageConstants::LOCK_FILE_NAME);
}

// Note: This is a relatively slow function because of division and mod and making std::pair.
// It is not meant to be used in performance critical code path.
static inline std::pair<uint64_t, uint64_t> getQuotientRemainder(uint64_t i, uint64_t divisor) {
Expand Down
9 changes: 9 additions & 0 deletions src/main/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ void Connection::bindParametersNoLock(PreparedStatement* preparedStatement,

std::unique_ptr<QueryResult> Connection::executeAndAutoCommitIfNecessaryNoLock(
PreparedStatement* preparedStatement, uint32_t planIdx) {
checkPreparedStatementAccessMode(preparedStatement);
clientContext->resetActiveQuery();
clientContext->startTimingIfEnabled();
auto mapper = PlanMapper(
Expand Down Expand Up @@ -258,5 +259,13 @@ void Connection::addScalarFunction(
database->catalog->addVectorFunction(name, std::move(definitions));
}

void Connection::checkPreparedStatementAccessMode(PreparedStatement* preparedStatement) {
bool isInReadOnlyMode = database->systemConfig.accessMode == AccessMode::READ_ONLY;
if (isInReadOnlyMode && !preparedStatement->isReadOnly()) {
throw ConnectionException(
"Cannot execute write operations in a read-only access mode database!");
}
}

} // namespace main
} // namespace kuzu
35 changes: 32 additions & 3 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <unistd.h>
#endif

#include "common/exception/exception.h"
#include "common/file_utils.h"
#include "common/logging_level_utils.h"
#include "processor/processor.h"
#include "spdlog/spdlog.h"
Expand All @@ -21,8 +23,9 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace main {

SystemConfig::SystemConfig(uint64_t bufferPoolSize_, uint64_t maxNumThreads, bool enableCompression)
: maxNumThreads{maxNumThreads}, enableCompression{enableCompression} {
SystemConfig::SystemConfig(
uint64_t bufferPoolSize_, uint64_t maxNumThreads, bool enableCompression, AccessMode accessMode)
: maxNumThreads{maxNumThreads}, enableCompression{enableCompression}, accessMode{accessMode} {
if (bufferPoolSize_ == -1u || bufferPoolSize_ == 0) {
#if defined(_WIN32)
MEMORYSTATUSEX status;
Expand All @@ -42,6 +45,16 @@ SystemConfig::SystemConfig(uint64_t bufferPoolSize_, uint64_t maxNumThreads, boo
}
}

static void getLockFileFlagsAndType(
AccessMode accessMode, bool createNew, int& flags, FileLockType& lock) {
flags = accessMode == AccessMode::READ_ONLY ? O_RDONLY : O_RDWR;
if (createNew) {
assert(flags == O_RDWR);
flags |= O_CREAT;
}
lock = accessMode == AccessMode::READ_ONLY ? FileLockType::READ_LOCK : FileLockType::WRITE_LOCK;
}

Database::Database(std::string databasePath) : Database{std::move(databasePath), SystemConfig()} {}

Database::Database(std::string databasePath, SystemConfig systemConfig)
Expand Down Expand Up @@ -70,10 +83,26 @@ void Database::setLoggingLevel(std::string loggingLevel) {
spdlog::set_level(LoggingLevelUtils::convertStrToLevelEnum(std::move(loggingLevel)));
}

void Database::initDBDirAndCoreFilesIfNecessary() const {
void Database::openLockFile() {
int flags;
FileLockType lock;
auto lockFilePath = StorageUtils::getLockFilePath(databasePath);
if (!FileUtils::fileOrPathExists(lockFilePath)) {
getLockFileFlagsAndType(systemConfig.accessMode, true, flags, lock);
} else {
getLockFileFlagsAndType(systemConfig.accessMode, false, flags, lock);
}
lockFile = FileUtils::openFile(lockFilePath, flags, lock);
}

void Database::initDBDirAndCoreFilesIfNecessary() {
if (!FileUtils::fileOrPathExists(databasePath)) {
if (systemConfig.accessMode == AccessMode::READ_ONLY) {
throw Exception("Cannot create an empty database under READ ONLY mode.");
}
FileUtils::createDir(databasePath);
}
openLockFile();
if (!FileUtils::fileOrPathExists(StorageUtils::getNodesStatisticsAndDeletedIDsFilePath(
databasePath, DBFileType::ORIGINAL))) {
NodesStoreStatsAndDeletedIDs::saveInitialNodesStatisticsAndDeletedIDsToFile(databasePath);
Expand Down
32 changes: 23 additions & 9 deletions test/main/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,26 @@
add_kuzu_api_test(main_test
arrow_test.cpp
config_test.cpp
connection_test.cpp
csv_output_test.cpp
prepare_test.cpp
result_value_test.cpp
storage_driver_test.cpp
udf_test.cpp)
if(MSVC)
add_kuzu_api_test(main_test
arrow_test.cpp
config_test.cpp
connection_test.cpp
csv_output_test.cpp
prepare_test.cpp
result_value_test.cpp
storage_driver_test.cpp
udf_test.cpp)
else()
add_kuzu_api_test(main_test
access_mode_test.cpp
arrow_test.cpp
config_test.cpp
connection_test.cpp
csv_output_test.cpp
db_locking_test.cpp
prepare_test.cpp
result_value_test.cpp
storage_driver_test.cpp
udf_test.cpp)
endif()

# Also tested for coverage in connection_test.cpp
# but full testing requires some private APIs
Expand Down
25 changes: 25 additions & 0 deletions test/main/access_mode_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#include "main_test_helper/main_test_helper.h"

using namespace kuzu::common;
using namespace kuzu::testing;
using namespace kuzu::main;

class AccessModeTest : public ApiTest {};

TEST_F(AccessModeTest, testReadWrite) {
systemConfig->accessMode = AccessMode::READ_WRITE;
auto db = std::make_unique<Database>(databasePath, *systemConfig);
auto con = std::make_unique<Connection>(db.get());
ASSERT_TRUE(con->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name))")
->isSuccess());
ASSERT_TRUE(con->query("CREATE (:Person {name: 'Alice', age: 25})")->isSuccess());
ASSERT_TRUE(con->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess());
db.reset();
systemConfig->accessMode = AccessMode::READ_ONLY;
std::unique_ptr<Database> db2;
std::unique_ptr<Connection> con2;
EXPECT_NO_THROW(db2 = std::make_unique<Database>(databasePath, *systemConfig));
EXPECT_NO_THROW(con2 = std::make_unique<Connection>(db2.get()));
EXPECT_ANY_THROW(con2->query("DROP TABLE Person"));
EXPECT_NO_THROW(con2->query("MATCH (:Person) RETURN COUNT(*)"));
}
Loading

0 comments on commit 392d4a9

Please sign in to comment.