From 9e42ebbb67f2b9ade7d118ef480b4fef8cfecdea Mon Sep 17 00:00:00 2001 From: Marin Tomic Date: Mon, 15 Jan 2018 11:16:19 +0100 Subject: [PATCH] Implement simple log file storage for raft Summary: Added wrappers for some Unix system calls in utils/filesystem.hpp and implemented a simple log storage interface for Raft. It is not very efficient, we will need something more sophisticated later, but this is good enough for testing. Reviewers: mferencevic, mislav.bradac, buda, mculinovic Reviewed By: mferencevic Subscribers: teon.banek, dgleich, pullbot Differential Revision: https://phabricator.memgraph.io/D1091 --- CMakeLists.txt | 2 +- init | 1 + src/CMakeLists.txt | 5 +- src/communication/raft/storage/file.hpp | 252 ++++++++++++++++++++++ src/communication/raft/storage/memory.hpp | 63 ++++++ src/communication/raft/test_utils.hpp | 38 ---- src/utils/filesystem.cpp | 132 ++++++++++++ src/utils/filesystem.hpp | 169 +++++++++++++++ tests/manual/raft_rpc.cpp | 5 +- tests/unit/raft.cpp | 32 +-- tests/unit/raft_storage.cpp | 72 +++++++ 11 files changed, 713 insertions(+), 58 deletions(-) create mode 100644 src/communication/raft/storage/file.hpp create mode 100644 src/communication/raft/storage/memory.hpp create mode 100644 src/utils/filesystem.cpp create mode 100644 src/utils/filesystem.hpp create mode 100644 tests/unit/raft_storage.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 42a4fce86..a8e2c91c5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -129,7 +129,7 @@ if (USE_READLINE) endif() set(Boost_USE_STATIC_LIBS ON) -find_package(Boost 1.62 REQUIRED COMPONENTS serialization) +find_package(Boost 1.62 REQUIRED COMPONENTS iostreams serialization) set(libs_dir ${CMAKE_SOURCE_DIR}/libs) add_subdirectory(libs EXCLUDE_FROM_ALL) diff --git a/init b/init index 23a31a1f0..4b0961580 100755 --- a/init +++ b/init @@ -8,6 +8,7 @@ required_pkgs=(git arcanist # source code control curl wget # for downloading libs uuid-dev default-jre-headless # required by antlr libreadline-dev # for memgraph console + libboost-iostreams-dev libboost-serialization-dev python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests ) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0e100dcc9..3da8331f5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -47,6 +47,7 @@ set(memgraph_src_files transactions/engine_master.cpp transactions/engine_single_node.cpp transactions/engine_worker.cpp + utils/filesystem.cpp utils/network.cpp utils/watchdog.cpp ) @@ -56,7 +57,9 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type) # memgraph_lib depend on these libraries set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools - antlr_opencypher_parser_lib dl glog gflags ${Boost_SERIALIZATION_LIBRARY_RELEASE}) + antlr_opencypher_parser_lib dl glog gflags + ${Boost_IOSTREAMS_LIBRARY_RELEASE} + ${Boost_SERIALIZATION_LIBRARY_RELEASE}) if (USE_LTALLOC) list(APPEND MEMGRAPH_ALL_LIBS ltalloc) diff --git a/src/communication/raft/storage/file.hpp b/src/communication/raft/storage/file.hpp new file mode 100644 index 000000000..774621bd8 --- /dev/null +++ b/src/communication/raft/storage/file.hpp @@ -0,0 +1,252 @@ +/** + * @file + * + * Raft log is stored inside a folder. Each log entry is stored in a file named + * by its index. There is a special file named "metadata" which stores Raft + * metadata and also the last log index, which is used on startup to identify + * which log entry files are valid. + */ +#pragma once + +#include + +#include "boost/archive/binary_iarchive.hpp" +#include "boost/archive/binary_oarchive.hpp" +#include "boost/iostreams/device/file_descriptor.hpp" +#include "boost/iostreams/stream.hpp" + +#include "communication/raft/raft.hpp" +#include "communication/raft/storage/memory.hpp" +#include "utils/filesystem.hpp" + +namespace communication::raft { + +struct SimpleFileStorageMetadata { + TermId term; + std::experimental::optional voted_for; + LogIndex last_log_index; + + template + void serialize(TArchive &ar, unsigned int) { + ar &term &voted_for &last_log_index; + } +}; + +template +class SimpleFileStorage : public RaftStorageInterface { + public: + explicit SimpleFileStorage(const fs::path &parent_dir) : memory_storage_() { + try { + dir_ = utils::OpenDir(parent_dir); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Error opening log directory: {}", e.what()); + } + + auto md = utils::TryOpenFile(dir_, "metadata", O_RDONLY); + if (md.Empty()) { + LOG(WARNING) << fmt::format("No metadata file found in directory '{}'", + parent_dir); + return; + } + + boost::iostreams::file_descriptor_source src( + md.Handle(), + boost::iostreams::file_descriptor_flags::never_close_handle); + boost::iostreams::stream is(src); + boost::archive::binary_iarchive iar(is); + + SimpleFileStorageMetadata metadata; + + try { + iar >> metadata; + } catch (boost::archive::archive_exception &e) { + LOG(FATAL) << "Failed to deserialize Raft metadata: " << e.what(); + } + + LOG(INFO) << fmt::format( + "Read term = {} and voted_for = {} from storage", metadata.term, + metadata.voted_for ? *metadata.voted_for : "(none)"); + + memory_storage_.term_ = metadata.term; + memory_storage_.voted_for_ = metadata.voted_for; + memory_storage_.log_.reserve(metadata.last_log_index); + + for (LogIndex idx = 1; idx <= metadata.last_log_index; ++idx) { + utils::File entry_file; + + try { + entry_file = utils::OpenFile(dir_, fmt::format("{}", idx), O_RDONLY); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to open entry file {}: {}", idx, + e.what()); + } + + boost::iostreams::file_descriptor_source src( + entry_file.Handle(), + boost::iostreams::file_descriptor_flags::never_close_handle); + boost::iostreams::stream is( + src); + boost::archive::binary_iarchive iar(is); + LogEntry entry; + + try { + iar >> entry; + memory_storage_.log_.emplace_back(std::move(entry)); + } catch (boost::archive::archive_exception &e) { + LOG(FATAL) << fmt::format("Failed to deserialize log entry {}: {}", idx, + e.what()); + } + + try { + utils::Close(entry_file); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to close entry file {}: {}", idx, + e.what()); + } + } + + LOG(INFO) << fmt::format("Read {} log entries", metadata.last_log_index); + + utils::Close(md); + } + + ~SimpleFileStorage() { utils::Close(dir_); } + + void WriteTermAndVotedFor( + TermId term, + const std::experimental::optional &voted_for) override { + memory_storage_.WriteTermAndVotedFor(term, voted_for); + WriteMetadata(); + + // Metadata file might be newly created so we have to fsync the directory. + try { + utils::Fsync(dir_); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}", + e.what()); + } + } + + std::pair> GetTermAndVotedFor() + override { + return memory_storage_.GetTermAndVotedFor(); + } + + void AppendLogEntry(const LogEntry &entry) override { + memory_storage_.AppendLogEntry(entry); + + utils::File entry_file; + + try { + entry_file = utils::OpenFile( + dir_, fmt::format("{}", memory_storage_.GetLastLogIndex()), + O_WRONLY | O_CREAT | O_TRUNC, 0644); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to open log entry file: {}", e.what()); + } + + boost::iostreams::file_descriptor_sink sink( + entry_file.Handle(), + boost::iostreams::file_descriptor_flags::never_close_handle); + boost::iostreams::stream os(sink); + boost::archive::binary_oarchive oar(os); + + try { + oar << entry; + os.flush(); + } catch (boost::archive::archive_exception &e) { + LOG(FATAL) << fmt::format("Failed to serialize log entry: {}", e.what()); + } + + try { + utils::Fsync(entry_file); + utils::Close(entry_file); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to write log entry file to disk: {}", + e.what()); + } + + // We update the metadata only after the log entry file is written to + // disk. This ensures that no file in range [1, last_log_index] is + // corrupted. + WriteMetadata(); + + try { + utils::Fsync(dir_); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}", + e.what()); + } + } + + TermId GetLogTerm(const LogIndex index) override { + return memory_storage_.GetLogTerm(index); + } + + LogEntry GetLogEntry(const LogIndex index) override { + return memory_storage_.GetLogEntry(index); + } + + std::vector> GetLogSuffix(const LogIndex index) override { + return memory_storage_.GetLogSuffix(index); + } + + LogIndex GetLastLogIndex() override { + return memory_storage_.GetLastLogIndex(); + } + + void TruncateLogSuffix(const LogIndex index) override { + return memory_storage_.TruncateLogSuffix(index); + } + + private: + InMemoryStorage memory_storage_; + utils::File dir_; + + void WriteMetadata() { + // We first write data to a temporary file, ensure data is safely written + // to disk, and then rename the file. Since rename is an atomic operation, + // "metadata" file won't get corrupted in case of program crash. + utils::File md_tmp; + try { + md_tmp = + OpenFile(dir_, "metadata.new", O_WRONLY | O_CREAT | O_TRUNC, 0644); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to open temporary metadata file: {}", + e.what()); + } + + boost::iostreams::file_descriptor_sink sink( + md_tmp.Handle(), + boost::iostreams::file_descriptor_flags::never_close_handle); + boost::iostreams::stream os(sink); + boost::archive::binary_oarchive oar(os); + + try { + oar << SimpleFileStorageMetadata{ + memory_storage_.GetTermAndVotedFor().first, + memory_storage_.GetTermAndVotedFor().second, + memory_storage_.GetLastLogIndex()}; + } catch (boost::archive::archive_exception &e) { + LOG(FATAL) << "Error serializing Raft metadata"; + } + os.flush(); + + try { + utils::Fsync(md_tmp); + utils::Close(md_tmp); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format( + "Failed to write temporary metadata file to disk: {}", e.what()); + } + + try { + utils::Rename(dir_, "metadata.new", dir_, "metadata"); + } catch (std::system_error &e) { + LOG(FATAL) << fmt::format("Failed to move temporary metadata file: {}", + e.what()); + } + } +}; + +} // namespace communication::raft diff --git a/src/communication/raft/storage/memory.hpp b/src/communication/raft/storage/memory.hpp new file mode 100644 index 000000000..e280a29e9 --- /dev/null +++ b/src/communication/raft/storage/memory.hpp @@ -0,0 +1,63 @@ +#pragma once + +#include "communication/raft/raft.hpp" + +namespace communication::raft { + +template +class InMemoryStorage : public RaftStorageInterface { + public: + InMemoryStorage() + : term_(0), voted_for_(std::experimental::nullopt), log_() {} + + InMemoryStorage(const TermId term, + const std::experimental::optional &voted_for, + const std::vector> log) + : term_(term), voted_for_(voted_for), log_(log) {} + + void WriteTermAndVotedFor( + const TermId term, + const std::experimental::optional &voted_for) { + term_ = term; + voted_for_ = voted_for; + } + + std::pair> + GetTermAndVotedFor() { + return {term_, voted_for_}; + } + + void AppendLogEntry(const LogEntry &entry) { log_.push_back(entry); } + + TermId GetLogTerm(const LogIndex index) { + CHECK(0 <= index && index <= log_.size()) + << "Trying to read nonexistent log entry"; + return index > 0 ? log_[index - 1].term : 0; + } + + LogEntry GetLogEntry(const LogIndex index) { + CHECK(1 <= index && index <= log_.size()) + << "Trying to get nonexistent log entry"; + return log_[index - 1]; + } + + std::vector> GetLogSuffix(const LogIndex index) { + CHECK(1 <= index && index <= log_.size()) + << "Trying to get nonexistent log entries"; + return std::vector>(log_.begin() + index - 1, log_.end()); + } + + LogIndex GetLastLogIndex(void) { return log_.size(); } + + void TruncateLogSuffix(const LogIndex index) { + CHECK(1 <= index <= log_.size()) + << "Trying to remove nonexistent log entries"; + log_.erase(log_.begin() + index - 1, log_.end()); + } + + TermId term_; + std::experimental::optional voted_for_; + std::vector> log_; +}; + +} // namespace communication::raft diff --git a/src/communication/raft/test_utils.hpp b/src/communication/raft/test_utils.hpp index dea7676b0..87a9b957c 100644 --- a/src/communication/raft/test_utils.hpp +++ b/src/communication/raft/test_utils.hpp @@ -142,42 +142,4 @@ class NoOpStorageInterface : public RaftStorageInterface { std::vector> log_; }; -template -class InMemoryStorageInterface : public RaftStorageInterface { - public: - InMemoryStorageInterface( - const TermId term, - const std::experimental::optional &voted_for, - const std::vector> log) - : term_(term), voted_for_(voted_for), log_(log) {} - - void WriteTermAndVotedFor( - const TermId term, - const std::experimental::optional &voted_for) { - term_ = term; - voted_for_ = voted_for; - } - - std::pair> - GetTermAndVotedFor() { - return {term_, voted_for_}; - } - void AppendLogEntry(const LogEntry &entry) { log_.push_back(entry); } - TermId GetLogTerm(const LogIndex index) { - return index > 0 ? log_[index - 1].term : 0; - } - LogEntry GetLogEntry(const LogIndex index) { return log_[index - 1]; } - std::vector> GetLogSuffix(const LogIndex index) { - return std::vector>(log_.begin() + index - 1, log_.end()); - } - LogIndex GetLastLogIndex(void) { return log_.size(); } - void TruncateLogSuffix(const LogIndex index) { - log_.erase(log_.begin() + index - 1, log_.end()); - } - - TermId term_; - std::experimental::optional voted_for_; - std::vector> log_; -}; - } // namespace communication::raft::test_utils diff --git a/src/utils/filesystem.cpp b/src/utils/filesystem.cpp new file mode 100644 index 000000000..92bd9405b --- /dev/null +++ b/src/utils/filesystem.cpp @@ -0,0 +1,132 @@ +#include "filesystem.hpp" + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "fmt/ostream.h" +#include "glog/logging.h" + +namespace utils { + +File::File() : fd_(-1), path_() {} + +File::File(int fd, fs::path path) : fd_(fd), path_(std::move(path)) {} + +File::~File() { + CHECK(fd_ == -1) << fmt::format( + "Underlying file descriptor should be released or closed " + "before destructing (fd = {}, path = {})", + fd_, path_); +} + +File::File(File &&rhs) : fd_(rhs.fd_), path_(rhs.path_) { + LOG(INFO) << "Move constructor"; + rhs.Release(); +} + +File &File::operator=(File &&rhs) { + if (this != &rhs) { + fd_ = rhs.fd_; + path_ = rhs.path_; + rhs.Release(); + } + return *this; +} + +fs::path File::Path() const { return path_; } +int File::Handle() const { return fd_; } +bool File::Empty() const { return fd_ == -1; } + +void File::Release() { + fd_ = -1; + path_ = fs::path(); +} + +File OpenFile(const fs::path &path, int flags, ::mode_t mode) { + int fd = ::open(path.c_str(), flags, mode); + if (fd == -1) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot open {}", path)); + } + + return File(fd, path); +} + +File TryOpenFile(const fs::path &path, int flags, ::mode_t mode) { + int fd = ::open(path.c_str(), flags, mode); + return fd == -1 ? File() : File(fd, path); +} + +File OpenFile(const File &dir, const fs::path &path, int flags, ::mode_t mode) { + int fd = ::openat(dir.Handle(), path.c_str(), flags, mode); + if (fd == -1) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot open {}", dir.Path() / path)); + } + return File(fd, dir.Path() / path); +} + +File TryOpenFile(const File &dir, const fs::path &path, int flags, + ::mode_t mode) { + int fd = ::openat(dir.Handle(), path.c_str(), flags, mode); + return fd == -1 ? File() : File(fd, dir.Path() / path); +} + +void Close(File &file) { + if (::close(file.Handle()) == -1) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot close {}", file.Path())); + } + file.Release(); +} + +void Fsync(const File &file) { + if (::fsync(file.Handle()) == -1) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot fsync {}", file.Path())); + } +} + +void Rename(const File &dir1, const fs::path &path1, const File &dir2, + const fs::path &path2) { + if (::renameat(dir1.Handle(), path1.c_str(), dir2.Handle(), path2.c_str()) != + 0) { + throw std::system_error( + errno, std::generic_category(), + fmt::format("cannot move {} to {}", dir1.Path() / path1, + dir2.Path() / path2)); + } +} + +void SyncDir(const fs::path &path) { + File dir = OpenFile(path, O_DIRECTORY | O_RDONLY); + Fsync(dir); + Close(dir); +} + +File OpenDir(const fs::path &path) { + int res = ::mkdir(path.c_str(), 0777); + if (res == 0) { + if (path.has_parent_path()) { + SyncDir(path.parent_path()); + } + } else { + if (errno != EEXIST) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot create directory {}", path)); + } + } + + int fd = ::open(path.c_str(), O_RDONLY | O_DIRECTORY); + if (fd == -1) { + throw std::system_error(errno, std::generic_category(), + fmt::format("cannot open directory {}", path)); + } + + return File(fd, path); +} + +} // namespace utils diff --git a/src/utils/filesystem.hpp b/src/utils/filesystem.hpp new file mode 100644 index 000000000..19bf12d2c --- /dev/null +++ b/src/utils/filesystem.hpp @@ -0,0 +1,169 @@ +/** + * @file + * @brief This file contains C++ wrappers around some C system calls. + */ +#pragma once + +#include + +namespace fs = std::experimental::filesystem; + +namespace utils { + +/** + * @brief Thin wrapper around system file handle. + */ +class File { + public: + /** + * Constructs an empty file handle. + */ + File(); + + /** + * Wraps the given system file handle. + * + * This class doesn't take ownership of the file handle, it won't close it + * on destruction. + * + * @param fd System file handle. + * @param path Pathname naming the file, used only for error handling. + */ + File(int fd, fs::path path); + + File(File &&); + File &operator=(File &&); + + File(const File &) = delete; + File &operator=(const File &) = delete; + + /*** + * Destructor -- crashes the program if the underlying file descriptor is not + * released or closed. + */ + ~File(); + + /** + * Gets the path to the underlying file. + */ + fs::path Path() const; + + /** + * Gets the underlying file handle. + */ + int Handle() const; + + /** + * Checks if there's an underlying file handle. + */ + bool Empty() const; + + /** + * Releases the underlying file handle. + * + * File descriptor will be empty after the call returns. + */ + void Release(); + + private: + int fd_; + fs::path path_; +}; + +/** + * Opens a file descriptor. + * Wrapper around `open` system call (see `man 2 open`). + * + * @param path Pathname naming the file. + * @param flags Opening flags. + * @param mode File mode bits to apply for creation of new file. + * @return File descriptor referring to the opened file. + * + * @throws std::system_error + */ +File OpenFile(const fs::path &path, int flags, ::mode_t mode = 0666); + +/** + * Same as OpenFile, but returns an empty File object instead of + * throwing an exception. + */ +File TryOpenFile(const fs::path &path, int flags, ::mode_t mode = 0666); + +/** + * Opens a file descriptor for a file inside a directory. + * Wrapper around `openat` system call (see `man 2 openat`). + * + * @param dir Directory file descriptor. + * @param path Pathname naming the file. + * @param flags Opening flags. + * @param mode File mode bits to apply for creation of new file. + * @return File descriptor referring to the opened file. + * + * @throws std::system_error + */ +File OpenFile(const File &dir, const fs::path &path, int flags, + ::mode_t mode = 0666); + +/** + * Same as OpenFile, but returns an empty File object instead of + * throwing an exception. + */ +File TryOpenFile(const File &dir, const fs::path &path, int flags, + ::mode_t mode = 0666); + +/** + * Closes a file descriptor. + * Wrapper around `close` system call (see `man 2 close`). File descriptor will + * be empty after the call returns. + * + * @param file File descriptor to be closed. + * + * @throws std::system_error + */ +void Close(File &file); + +/** + * Synchronizes file with the underlying storage device. + * Wrapper around `fsync` system call (see `man 2 fsync`). + * + * @param file File descriptor referring to the file to be synchronized. + * + * @throws std::system_error + */ +void Fsync(const File &file); + +/** + * Moves a file from one directory to another. + * + * Wrapper around `renameat` system call (see `man 2 renameat`). + * + * @param dir1 Source directory. + * @param path1 Pathname naming the source file. + * @param dir2 Destination directory. + * @param path2 Pathname naming the destination file. + * + * @throws std::system_error + */ +void Rename(const File &dir1, const fs::path &path1, const File &dir2, + const fs::path &path2); + +/** + * Synchronizes directory with the underlying storage device. + * + * @param path Pathname naming the directory. + * + * @throws std::system_error + */ +void SyncDir(const fs::path &path); + +/** + * Opens a directory, creating it if it doesn't exist. + * + * @param path Pathname naming the directory. + * @return File descriptor referring to the opened directory. + * + * @throws std::system_error + */ +File OpenDir(const fs::path &path); + +} // namespace utils diff --git a/tests/manual/raft_rpc.cpp b/tests/manual/raft_rpc.cpp index 730684d64..22667a4af 100644 --- a/tests/manual/raft_rpc.cpp +++ b/tests/manual/raft_rpc.cpp @@ -8,15 +8,16 @@ #include "communication/messaging/distributed.hpp" #include "communication/raft/rpc.hpp" +#include "communication/raft/storage/memory.hpp" #include "communication/raft/test_utils.hpp" namespace raft = communication::raft; using io::network::Endpoint; +using raft::InMemoryStorage; using raft::RaftConfig; using raft::RpcNetwork; using raft::test_utils::DummyState; -using raft::test_utils::InMemoryStorageInterface; DEFINE_string(member_id, "", "id of RaftMember"); @@ -42,7 +43,7 @@ int main(int argc, char *argv[]) { communication::messaging::System my_system(directory[FLAGS_member_id]); RpcNetwork network(my_system, directory); - raft::test_utils::InMemoryStorageInterface storage(0, {}, {}); + raft::InMemoryStorage storage(0, {}, {}); raft::RaftConfig config{{"a", "b", "c"}, 150ms, 300ms, 70ms, 60ms, 30ms}; diff --git a/tests/unit/raft.cpp b/tests/unit/raft.cpp index 30b206e5b..c43104594 100644 --- a/tests/unit/raft.cpp +++ b/tests/unit/raft.cpp @@ -5,6 +5,7 @@ #include #include "communication/raft/raft.hpp" +#include "communication/raft/storage/memory.hpp" #include "communication/raft/test_utils.hpp" using namespace std::chrono_literals; @@ -33,7 +34,7 @@ class RaftMemberImplTest : public ::testing::Test { } NoOpNetworkInterface network_; - InMemoryStorageInterface storage_; + InMemoryStorage storage_; RaftMemberImpl member; }; @@ -137,7 +138,7 @@ TEST_F(RaftMemberImplTest, AdvanceCommitIndex) { TEST(RequestVote, SimpleElection) { NextReplyNetworkInterface network; - InMemoryStorageInterface storage(1, {}, {{1}, {1}}); + InMemoryStorage storage(1, {}, {{1}, {1}}); RaftMemberImpl member(network, storage, "a", test_config5); member.StartNewElection(); @@ -187,7 +188,7 @@ TEST(RequestVote, SimpleElection) { TEST(AppendEntries, SimpleLogSync) { NextReplyNetworkInterface network; - InMemoryStorageInterface storage(3, {}, {{1}, {1}, {2}, {3}}); + InMemoryStorage storage(3, {}, {{1}, {1}, {2}, {3}}); RaftMemberImpl member(network, storage, "a", test_config2); member.mode_ = RaftMode::LEADER; @@ -319,18 +320,18 @@ class RaftMemberParamTest : public ::testing::TestWithParam { } } - RaftMemberParamTest(InMemoryStorageInterface storage, - InMemoryStorageInterface peer_storage) + RaftMemberParamTest(InMemoryStorage storage, + InMemoryStorage peer_storage) : network_(NoOpNetworkInterface()), storage_(storage), member_(network_, storage_, "a", test_config3), peer_storage_(peer_storage) {} NoOpNetworkInterface network_; - InMemoryStorageInterface storage_; + InMemoryStorage storage_; RaftMemberImpl member_; - InMemoryStorageInterface peer_storage_; + InMemoryStorage peer_storage_; }; struct OnRequestVoteTestParam { @@ -348,10 +349,10 @@ class OnRequestVoteTest : public RaftMemberParamTest { public: OnRequestVoteTest() : RaftMemberParamTest( - InMemoryStorageInterface( - GetParam().term, GetParam().voted_for, GetParam().log), - InMemoryStorageInterface(GetParam().peer_term, {}, - GetParam().peer_log)) {} + InMemoryStorage(GetParam().term, GetParam().voted_for, + GetParam().log), + InMemoryStorage(GetParam().peer_term, {}, + GetParam().peer_log)) {} virtual ~OnRequestVoteTest() {} }; @@ -469,10 +470,9 @@ class OnAppendEntriesTest public: OnAppendEntriesTest() : RaftMemberParamTest( - InMemoryStorageInterface(GetParam().term, {}, - GetParam().log), - InMemoryStorageInterface(GetParam().peer_term, {}, - GetParam().peer_log)) {} + InMemoryStorage(GetParam().term, {}, GetParam().log), + InMemoryStorage(GetParam().peer_term, {}, + GetParam().peer_log)) {} virtual ~OnAppendEntriesTest() {} }; @@ -643,7 +643,7 @@ TEST(RaftMemberTest, AddCommand) { network.next_reply_ = reply; }; - InMemoryStorageInterface storage(0, {}, {}); + InMemoryStorage storage(0, {}, {}); RaftMember member(network, storage, "a", test_config2); std::this_thread::sleep_for(500ms); diff --git a/tests/unit/raft_storage.cpp b/tests/unit/raft_storage.cpp new file mode 100644 index 000000000..b24099a8b --- /dev/null +++ b/tests/unit/raft_storage.cpp @@ -0,0 +1,72 @@ +#include + +#include "gtest/gtest.h" + +#include "communication/raft/storage/file.hpp" +#include "communication/raft/test_utils.hpp" +#include "utils/filesystem.hpp" + +using communication::raft::LogEntry; +using communication::raft::SimpleFileStorage; +using communication::raft::test_utils::IntState; + +TEST(SimpleFileStorageTest, All) { + typedef LogEntry Log; + auto GetLog = [](int term, int d) { + return Log{term, IntState::Change{IntState::Change::Type::SET, d}}; + }; + + { + SimpleFileStorage storage(fs::path("raft_storage_test_dir")); + EXPECT_EQ(storage.GetTermAndVotedFor().first, 0); + EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt); + EXPECT_EQ(storage.GetLastLogIndex(), 0); + + storage.WriteTermAndVotedFor(1, "a"); + EXPECT_EQ(storage.GetTermAndVotedFor().first, 1); + EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a"); + + storage.AppendLogEntry(GetLog(1, 1)); + storage.AppendLogEntry(GetLog(1, 2)); + + EXPECT_EQ(storage.GetLastLogIndex(), 2); + + EXPECT_EQ(storage.GetLogSuffix(1), + std::vector({GetLog(1, 1), GetLog(1, 2)})); + } + + { + SimpleFileStorage storage(fs::path("raft_storage_test_dir")); + + EXPECT_EQ(storage.GetTermAndVotedFor().first, 1); + EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a"); + EXPECT_EQ(storage.GetLastLogIndex(), 2); + EXPECT_EQ(storage.GetLogSuffix(1), + std::vector({GetLog(1, 1), GetLog(1, 2)})); + + storage.TruncateLogSuffix(2); + EXPECT_EQ(storage.GetLogSuffix(1), std::vector({GetLog(1, 1)})); + + storage.WriteTermAndVotedFor(2, std::experimental::nullopt); + storage.AppendLogEntry(GetLog(2, 3)); + + EXPECT_EQ(storage.GetTermAndVotedFor().first, 2); + EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt); + EXPECT_EQ(storage.GetLogSuffix(1), + std::vector({GetLog(1, 1), GetLog(2, 3)})); + } + + { + SimpleFileStorage storage(fs::path("raft_storage_test_dir")); + + EXPECT_EQ(storage.GetTermAndVotedFor().first, 2); + EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt); + EXPECT_EQ(storage.GetLogSuffix(1), + std::vector({GetLog(1, 1), GetLog(2, 3)})); + } + + fs::remove("raft_storage_test_dir/metadata"); + fs::remove("raft_storage_test_dir/1"); + fs::remove("raft_storage_test_dir/2"); + fs::remove("raft_storage_test_dir"); +}