Skip to content

Commit

Permalink
access_mode first version
Browse files Browse the repository at this point in the history
  • Loading branch information
hououou committed Sep 27, 2023
1 parent a21e52d commit 6ba2181
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 4 deletions.
32 changes: 30 additions & 2 deletions src/common/file_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
#ifdef _WIN32
#include <fileapi.h>
#include <windows.h>
#include <dirent.h>
#include <fcntl.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#endif

namespace kuzu {
Expand Down Expand Up @@ -43,8 +48,8 @@ int64_t FileInfo::getFileSize() {
#endif
}

std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags) {
#if defined(_WIN32)
std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags, FileLockType lock_type) {
#if defined(_WIN32)
auto dwDesiredAccess = 0ul;
auto dwCreationDisposition = (flags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING;
auto dwShareMode = FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE;
Expand All @@ -62,12 +67,35 @@ std::unique_ptr<FileInfo> FileUtils::openFile(const std::string& path, int flags
throw Exception(StringUtils::string_format("Cannot open file. path: {} - Error {}: {}",
path, GetLastError(), std::system_category().message(GetLastError())));
}
// add lock
if (lock_type != FileLockType::NO_LOCK){
DWORD dwFlags = lock_type == FileLockType::READ_LOCK ? LOCKFILE_FAIL_IMMEDIATELY : LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK
OVERLAPPED overlapped = {0};
overlapped.Offset = 0;
bool rc = LockFileEx(hFile, dwFlags, 0, 1, 0, &overlapped);
if (!rc) {
throw Exception("Could not set lock on file : "+path);
}
}
return std::make_unique<FileInfo>(path, handle);
#else
int fd = open(path.c_str(), flags, 0644);
if (fd == -1) {
throw Exception("Cannot open file: " + path);
}
//add lock
if (lock_type != FileLockType::NO_LOCK) {
struct flock fl;
memset(&fl, 0, sizeof fl);
fl.l_type = lock_type == FileLockType::READ_LOCK ? F_RDLCK : F_WRLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
int rc = fcntl(fd, F_SETLK, &fl);
if (rc == -1) {
throw Exception("Could not set lock on file : "+path);
}
}
return std::make_unique<FileInfo>(path, fd);
#endif
}
Expand Down
1 change: 1 addition & 0 deletions src/include/common/constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ struct StorageConstants {
static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal";
static constexpr char DATA_FILE_NAME[] = "data.kz";
static constexpr char METADATA_FILE_NAME[] = "metadata.kz";
static constexpr char LOCK_FILE_NAME[] = ".lock";

// The number of pages that we add at one time when we need to grow a file.
static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10;
Expand Down
4 changes: 3 additions & 1 deletion src/include/common/file_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
namespace kuzu {
namespace common {

enum class FileLockType : uint8_t { NO_LOCK = 0, READ_LOCK = 1, WRITE_LOCK = 2 };

struct FileInfo {
#ifdef _WIN32
FileInfo(std::string path, const void* handle) : path{std::move(path)}, handle{handle} {}
Expand All @@ -36,7 +38,7 @@ struct FileInfo {

class FileUtils {
public:
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags);
static std::unique_ptr<FileInfo> openFile(const std::string& path, int flags, FileLockType lock_type=FileLockType::NO_LOCK);

static void readFromFile(
FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position);
Expand Down
10 changes: 9 additions & 1 deletion src/include/main/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
#include "common/api.h"
#include "common/constants.h"
#include "kuzu_fwd.h"

#include "common/file_utils.h"
namespace kuzu {
namespace main {

enum class AccessMode : uint8_t { READ_ONLY = 0, READ_WRITE = 1};

/**
* @brief Stores buffer pool size and max number of threads configurations.
*/
Expand All @@ -26,6 +28,10 @@ KUZU_API struct SystemConfig {

uint64_t bufferPoolSize;
uint64_t maxNumThreads;

//! Access mode of the database (AUTOMATIC, READ_ONLY or READ_WRITE)
AccessMode access_mode = AccessMode::READ_WRITE;

};

/**
Expand Down Expand Up @@ -63,6 +69,7 @@ class Database {
static void setLoggingLevel(std::string loggingLevel);

private:
void checkAccessMode();
void initDBDirAndCoreFilesIfNecessary() const;
static void initLoggers();
static void dropLoggers();
Expand All @@ -88,6 +95,7 @@ class Database {
std::unique_ptr<transaction::TransactionManager> transactionManager;
std::unique_ptr<storage::WAL> wal;
std::shared_ptr<spdlog::logger> logger;
std::unique_ptr<common::FileInfo> lockStatus;
};

} // namespace main
Expand Down
6 changes: 6 additions & 0 deletions src/include/storage/storage_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ class StorageUtils {
common::StorageConstants::CATALOG_FILE_NAME_FOR_WAL);
}

static inline std::string getLockFilePath(
const std::string& directory) {
return common::FileUtils::joinPath(
directory, common::StorageConstants::LOCK_FILE_NAME);
}

// Note: This is a relatively slow function because of division and mod and making std::pair.
// It is not meant to be used in performance critical code path.
static inline std::pair<uint64_t, uint64_t> getQuotientRemainder(uint64_t i, uint64_t divisor) {
Expand Down
31 changes: 31 additions & 0 deletions src/main/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "storage/storage_manager.h"
#include "storage/wal_replayer.h"
#include "transaction/transaction_manager.h"
#include "common/file_utils.h"

using namespace kuzu::catalog;
using namespace kuzu::common;
Expand All @@ -21,6 +22,20 @@ using namespace kuzu::transaction;
namespace kuzu {
namespace main {

void getFileFlags(SystemConfig systemConfig,int &flags, FileLockType &lock, bool create_new) {
if (systemConfig.access_mode==AccessMode::READ_ONLY) {
assert(!create_new);
flags = O_RDONLY;
lock = FileLockType::READ_LOCK;
} else {
flags = O_RDWR;
lock = FileLockType::WRITE_LOCK;
if (create_new) {
flags = O_WRONLY | O_CREAT;
}
}
}

SystemConfig::SystemConfig() : SystemConfig(-1u) {}

SystemConfig::SystemConfig(uint64_t bufferPoolSize_) {
Expand Down Expand Up @@ -50,6 +65,7 @@ Database::Database(std::string databasePath, SystemConfig systemConfig)
bufferManager = std::make_unique<BufferManager>(this->systemConfig.bufferPoolSize);
memoryManager = std::make_unique<MemoryManager>(bufferManager.get());
queryProcessor = std::make_unique<processor::QueryProcessor>(this->systemConfig.maxNumThreads);
checkAccessMode();
initDBDirAndCoreFilesIfNecessary();
wal = std::make_unique<WAL>(this->databasePath, *bufferManager);
recoverIfNecessary();
Expand All @@ -68,6 +84,21 @@ void Database::setLoggingLevel(std::string loggingLevel) {
spdlog::set_level(LoggingLevelUtils::convertStrToLevelEnum(std::move(loggingLevel)));
}

void Database::checkAccessMode(){
if (!FileUtils::fileOrPathExists(databasePath)){
FileUtils::createDir(databasePath);
}
int flags;
FileLockType lock;
auto lockFilePath=StorageUtils::getLockFilePath(databasePath);
if (!FileUtils::fileOrPathExists(lockFilePath)) {
getFileFlags(systemConfig, flags, lock, true);
}else{
getFileFlags(systemConfig, flags, lock, false);
}
lockStatus=FileUtils::openFile(lockFilePath, flags, lock);
}

void Database::initDBDirAndCoreFilesIfNecessary() const {
if (!FileUtils::fileOrPathExists(databasePath)) {
FileUtils::createDir(databasePath);
Expand Down
1 change: 1 addition & 0 deletions test/runner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ add_kuzu_test(e2e_copy_csv_transaction_test e2e_copy_transaction_test.cpp)
add_kuzu_test(e2e_ddl_test e2e_ddl_test.cpp)
add_kuzu_test(e2e_test e2e_test.cpp)
add_kuzu_test(cleanup_test cleanup_test.cpp)
add_kuzu_test(test_locking test_locking.cpp)
111 changes: 111 additions & 0 deletions test/runner/test_locking.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#include <signal.h>
#include <sys/mman.h>
#include <unistd.h>

#include "graph_test/graph_test.h"
#include "main/kuzu.h"

#ifdef __MVS__
#define MAP_ANONYMOUS 0x0
#endif

using namespace kuzu::testing;
using namespace kuzu::common;
using namespace kuzu::main;

namespace kuzu {
namespace testing {

class LockingTest : public EmptyDBTest {
public:
void SetUp() override { EmptyDBTest::SetUp(); }
};

TEST_F(LockingTest, testReadLock) {
uint64_t* count = (uint64_t*)mmap(
NULL, sizeof(uint64_t), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0);
*count = 0;
// create db first
pid_t create_pid = fork();
if (create_pid == 0) {
createDBAndConn();
ASSERT_TRUE(
conn->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));")
->isSuccess());
ASSERT_TRUE(conn->query("CREATE (:Person {name: 'Alice', age: 25});")->isSuccess());
exit(0);
}
waitpid(create_pid, NULL, 0);

// test read write db
pid_t pid = fork();
if (pid == 0) {
std::unique_ptr<Database> db;
systemConfig->access_mode = AccessMode::READ_ONLY;
EXPECT_NO_THROW(createDBAndConn());
(*count)++;
ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess());
while (true) {
usleep(100);
ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess());
}
} else if (pid > 0) {
std::unique_ptr<Database> db;
while (*count == 0) {
usleep(100);
}
systemConfig->access_mode = AccessMode::READ_WRITE;
// try to open db for writing, this should fail
EXPECT_ANY_THROW(createDBAndConn());
// but opening db for reading should work
systemConfig->access_mode = AccessMode::READ_ONLY;
EXPECT_NO_THROW(createDBAndConn());
ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess());
// kill the child
if (kill(pid, SIGKILL) != 0) {
FAIL();
}
}
}

TEST_F(LockingTest, testWriteLock) {
uint64_t* count = (uint64_t*)mmap(
NULL, sizeof(uint64_t), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0);
*count = 0;
// test write lock
// fork away a child
pid_t pid = fork();
if (pid == 0) {
// child process
// open db for writing
systemConfig->access_mode = AccessMode::READ_WRITE;
EXPECT_NO_THROW(createDBAndConn());
// opened db for writing
// insert some values
(*count)++;
ASSERT_TRUE(
conn->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));")
->isSuccess());
ASSERT_TRUE(conn->query("CREATE (:Person {name: 'Alice', age: 25});")->isSuccess());
while (true) {
ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess());
usleep(100);
}
} else if (pid > 0) {
// parent process
// sleep a bit to wait for child process
while (*count == 0) {
usleep(100);
}
// try to open db for writing, this should fail
systemConfig->access_mode = AccessMode::READ_WRITE;
EXPECT_ANY_THROW(createDBAndConn());
// kill the child
if (kill(pid, SIGKILL) != 0) {
FAIL();
}
}
}

} // namespace testing
} // namespace kuzu

0 comments on commit 6ba2181

Please sign in to comment.