diff --git a/src/common/file_utils.cpp b/src/common/file_utils.cpp index 497631984de..eb6d8b3c804 100644 --- a/src/common/file_utils.cpp +++ b/src/common/file_utils.cpp @@ -5,6 +5,12 @@ #include "glob/glob.hpp" #ifdef _WIN32 +#include +#include +#include +#include +#include + #include #include #endif @@ -43,7 +49,8 @@ int64_t FileInfo::getFileSize() { #endif } -std::unique_ptr FileUtils::openFile(const std::string& path, int flags) { +std::unique_ptr FileUtils::openFile( + const std::string& path, int flags, FileLockType lock_type) { #if defined(_WIN32) auto dwDesiredAccess = 0ul; auto dwCreationDisposition = (flags & O_CREAT) ? OPEN_ALWAYS : OPEN_EXISTING; @@ -62,12 +69,37 @@ std::unique_ptr FileUtils::openFile(const std::string& path, int flags throw Exception(StringUtils::string_format("Cannot open file. path: {} - Error {}: {}", path, GetLastError(), std::system_category().message(GetLastError()))); } + // add lock + if (lock_type != FileLockType::NO_LOCK) { + DWORD dwFlags = + lock_type == FileLockType::READ_LOCK ? + LOCKFILE_FAIL_IMMEDIATELY : + LOCKFILE_FAIL_IMMEDIATELY | LOCKFILE_EXCLUSIVE_LOCK OVERLAPPED overlapped = {0}; + overlapped.Offset = 0; + bool rc = LockFileEx(hFile, dwFlags, 0, 1, 0, &overlapped); + if (!rc) { + throw Exception("Could not set lock on file : " + path); + } + } return std::make_unique(path, handle); #else int fd = open(path.c_str(), flags, 0644); if (fd == -1) { throw Exception("Cannot open file: " + path); } + // add lock + if (lock_type != FileLockType::NO_LOCK) { + struct flock fl; + memset(&fl, 0, sizeof fl); + fl.l_type = lock_type == FileLockType::READ_LOCK ? F_RDLCK : F_WRLCK; + fl.l_whence = SEEK_SET; + fl.l_start = 0; + fl.l_len = 0; + int rc = fcntl(fd, F_SETLK, &fl); + if (rc == -1) { + throw Exception("Could not set lock on file : " + path); + } + } return std::make_unique(path, fd); #endif } diff --git a/src/include/common/constants.h b/src/include/common/constants.h index fcd0d6a46c1..74d81b11680 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -94,6 +94,7 @@ struct StorageConstants { static constexpr char CATALOG_FILE_NAME_FOR_WAL[] = "catalog.kz.wal"; static constexpr char DATA_FILE_NAME[] = "data.kz"; static constexpr char METADATA_FILE_NAME[] = "metadata.kz"; + static constexpr char LOCK_FILE_NAME[] = ".lock"; // The number of pages that we add at one time when we need to grow a file. static constexpr uint64_t PAGE_GROUP_SIZE_LOG2 = 10; diff --git a/src/include/common/file_utils.h b/src/include/common/file_utils.h index 0b621c041e2..3fe1c047967 100644 --- a/src/include/common/file_utils.h +++ b/src/include/common/file_utils.h @@ -15,6 +15,8 @@ namespace kuzu { namespace common { +enum class FileLockType : uint8_t { NO_LOCK = 0, READ_LOCK = 1, WRITE_LOCK = 2 }; + struct FileInfo { #ifdef _WIN32 FileInfo(std::string path, const void* handle) : path{std::move(path)}, handle{handle} {} @@ -36,7 +38,8 @@ struct FileInfo { class FileUtils { public: - static std::unique_ptr openFile(const std::string& path, int flags); + static std::unique_ptr openFile( + const std::string& path, int flags, FileLockType lock_type = FileLockType::NO_LOCK); static void readFromFile( FileInfo* fileInfo, void* buffer, uint64_t numBytes, uint64_t position); diff --git a/src/include/main/database.h b/src/include/main/database.h index bb454509266..4599cd57d7c 100644 --- a/src/include/main/database.h +++ b/src/include/main/database.h @@ -5,11 +5,13 @@ #include "common/api.h" #include "common/constants.h" +#include "common/file_utils.h" #include "kuzu_fwd.h" - namespace kuzu { namespace main { +enum class AccessMode : uint8_t { READ_ONLY = 0, READ_WRITE = 1 }; + /** * @brief Stores buffer pool size and max number of threads configurations. */ @@ -26,6 +28,9 @@ KUZU_API struct SystemConfig { uint64_t bufferPoolSize; uint64_t maxNumThreads; + + //! Access mode of the database (AUTOMATIC, READ_ONLY or READ_WRITE) + AccessMode access_mode = AccessMode::READ_WRITE; }; /** @@ -63,6 +68,7 @@ class Database { static void setLoggingLevel(std::string loggingLevel); private: + void checkAccessMode(); void initDBDirAndCoreFilesIfNecessary() const; static void initLoggers(); static void dropLoggers(); @@ -88,6 +94,7 @@ class Database { std::unique_ptr transactionManager; std::unique_ptr wal; std::shared_ptr logger; + std::unique_ptr lockStatus; }; } // namespace main diff --git a/src/include/storage/storage_utils.h b/src/include/storage/storage_utils.h index c6d7add047c..cb152cc001b 100644 --- a/src/include/storage/storage_utils.h +++ b/src/include/storage/storage_utils.h @@ -277,6 +277,10 @@ class StorageUtils { common::StorageConstants::CATALOG_FILE_NAME_FOR_WAL); } + static inline std::string getLockFilePath(const std::string& directory) { + return common::FileUtils::joinPath(directory, common::StorageConstants::LOCK_FILE_NAME); + } + // Note: This is a relatively slow function because of division and mod and making std::pair. // It is not meant to be used in performance critical code path. static inline std::pair getQuotientRemainder(uint64_t i, uint64_t divisor) { diff --git a/src/main/database.cpp b/src/main/database.cpp index ade2328c25c..fcaf40ea2f5 100644 --- a/src/main/database.cpp +++ b/src/main/database.cpp @@ -6,6 +6,7 @@ #include #endif +#include "common/file_utils.h" #include "common/logging_level_utils.h" #include "processor/processor.h" #include "spdlog/spdlog.h" @@ -21,6 +22,20 @@ using namespace kuzu::transaction; namespace kuzu { namespace main { +void getFileFlags(SystemConfig systemConfig, int& flags, FileLockType& lock, bool create_new) { + if (systemConfig.access_mode == AccessMode::READ_ONLY) { + assert(!create_new); + flags = O_RDONLY; + lock = FileLockType::READ_LOCK; + } else { + flags = O_RDWR; + lock = FileLockType::WRITE_LOCK; + if (create_new) { + flags = O_WRONLY | O_CREAT; + } + } +} + SystemConfig::SystemConfig() : SystemConfig(-1u) {} SystemConfig::SystemConfig(uint64_t bufferPoolSize_) { @@ -50,6 +65,7 @@ Database::Database(std::string databasePath, SystemConfig systemConfig) bufferManager = std::make_unique(this->systemConfig.bufferPoolSize); memoryManager = std::make_unique(bufferManager.get()); queryProcessor = std::make_unique(this->systemConfig.maxNumThreads); + checkAccessMode(); initDBDirAndCoreFilesIfNecessary(); wal = std::make_unique(this->databasePath, *bufferManager); recoverIfNecessary(); @@ -68,6 +84,21 @@ void Database::setLoggingLevel(std::string loggingLevel) { spdlog::set_level(LoggingLevelUtils::convertStrToLevelEnum(std::move(loggingLevel))); } +void Database::checkAccessMode() { + if (!FileUtils::fileOrPathExists(databasePath)) { + FileUtils::createDir(databasePath); + } + int flags; + FileLockType lock; + auto lockFilePath = StorageUtils::getLockFilePath(databasePath); + if (!FileUtils::fileOrPathExists(lockFilePath)) { + getFileFlags(systemConfig, flags, lock, true); + } else { + getFileFlags(systemConfig, flags, lock, false); + } + lockStatus = FileUtils::openFile(lockFilePath, flags, lock); +} + void Database::initDBDirAndCoreFilesIfNecessary() const { if (!FileUtils::fileOrPathExists(databasePath)) { FileUtils::createDir(databasePath); diff --git a/test/runner/CMakeLists.txt b/test/runner/CMakeLists.txt index 4303e0032a0..e8e9a5c7463 100644 --- a/test/runner/CMakeLists.txt +++ b/test/runner/CMakeLists.txt @@ -2,3 +2,4 @@ add_kuzu_test(e2e_copy_csv_transaction_test e2e_copy_transaction_test.cpp) add_kuzu_test(e2e_ddl_test e2e_ddl_test.cpp) add_kuzu_test(e2e_test e2e_test.cpp) add_kuzu_test(cleanup_test cleanup_test.cpp) +add_kuzu_test(test_locking test_locking.cpp) diff --git a/test/runner/test_locking.cpp b/test/runner/test_locking.cpp new file mode 100644 index 00000000000..9364bb7057d --- /dev/null +++ b/test/runner/test_locking.cpp @@ -0,0 +1,111 @@ +#include +#include +#include + +#include "graph_test/graph_test.h" +#include "main/kuzu.h" + +#ifdef __MVS__ +#define MAP_ANONYMOUS 0x0 +#endif + +using namespace kuzu::testing; +using namespace kuzu::common; +using namespace kuzu::main; + +namespace kuzu { +namespace testing { + +class LockingTest : public EmptyDBTest { +public: + void SetUp() override { EmptyDBTest::SetUp(); } +}; + +TEST_F(LockingTest, testReadLock) { + uint64_t* count = (uint64_t*)mmap( + NULL, sizeof(uint64_t), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0); + *count = 0; + // create db first + pid_t create_pid = fork(); + if (create_pid == 0) { + createDBAndConn(); + ASSERT_TRUE( + conn->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));") + ->isSuccess()); + ASSERT_TRUE(conn->query("CREATE (:Person {name: 'Alice', age: 25});")->isSuccess()); + exit(0); + } + waitpid(create_pid, NULL, 0); + + // test read write db + pid_t pid = fork(); + if (pid == 0) { + std::unique_ptr db; + systemConfig->access_mode = AccessMode::READ_ONLY; + EXPECT_NO_THROW(createDBAndConn()); + (*count)++; + ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess()); + while (true) { + usleep(100); + ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess()); + } + } else if (pid > 0) { + std::unique_ptr db; + while (*count == 0) { + usleep(100); + } + systemConfig->access_mode = AccessMode::READ_WRITE; + // try to open db for writing, this should fail + EXPECT_ANY_THROW(createDBAndConn()); + // but opening db for reading should work + systemConfig->access_mode = AccessMode::READ_ONLY; + EXPECT_NO_THROW(createDBAndConn()); + ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess()); + // kill the child + if (kill(pid, SIGKILL) != 0) { + FAIL(); + } + } +} + +TEST_F(LockingTest, testWriteLock) { + uint64_t* count = (uint64_t*)mmap( + NULL, sizeof(uint64_t), PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_SHARED, 0, 0); + *count = 0; + // test write lock + // fork away a child + pid_t pid = fork(); + if (pid == 0) { + // child process + // open db for writing + systemConfig->access_mode = AccessMode::READ_WRITE; + EXPECT_NO_THROW(createDBAndConn()); + // opened db for writing + // insert some values + (*count)++; + ASSERT_TRUE( + conn->query("CREATE NODE TABLE Person(name STRING, age INT64, PRIMARY KEY(name));") + ->isSuccess()); + ASSERT_TRUE(conn->query("CREATE (:Person {name: 'Alice', age: 25});")->isSuccess()); + while (true) { + ASSERT_TRUE(conn->query("MATCH (:Person) RETURN COUNT(*)")->isSuccess()); + usleep(100); + } + } else if (pid > 0) { + // parent process + // sleep a bit to wait for child process + while (*count == 0) { + usleep(100); + } + // try to open db for writing, this should fail + systemConfig->access_mode = AccessMode::READ_WRITE; + EXPECT_ANY_THROW(createDBAndConn()); + // kill the child + if (kill(pid, SIGKILL) != 0) { + FAIL(); + } + } +} + +} // namespace testing +} // namespace kuzu \ No newline at end of file