Implement simple log file storage for raft

Summary:
Added wrappers for some Unix system calls in utils/filesystem.hpp and implemented
a simple log storage interface for Raft. It is not very efficient, we will need
something more sophisticated later, but this is good enough for testing.

Reviewers: mferencevic, mislav.bradac, buda, mculinovic

Reviewed By: mferencevic

Subscribers: teon.banek, dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D1091
This commit is contained in:
Marin Tomic 2018-01-15 11:16:19 +01:00
parent a166c613ec
commit 9e42ebbb67
11 changed files with 713 additions and 58 deletions

View File

@ -129,7 +129,7 @@ if (USE_READLINE)
endif() endif()
set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_LIBS ON)
find_package(Boost 1.62 REQUIRED COMPONENTS serialization) find_package(Boost 1.62 REQUIRED COMPONENTS iostreams serialization)
set(libs_dir ${CMAKE_SOURCE_DIR}/libs) set(libs_dir ${CMAKE_SOURCE_DIR}/libs)
add_subdirectory(libs EXCLUDE_FROM_ALL) add_subdirectory(libs EXCLUDE_FROM_ALL)

1
init
View File

@ -8,6 +8,7 @@ required_pkgs=(git arcanist # source code control
curl wget # for downloading libs curl wget # for downloading libs
uuid-dev default-jre-headless # required by antlr uuid-dev default-jre-headless # required by antlr
libreadline-dev # for memgraph console libreadline-dev # for memgraph console
libboost-iostreams-dev
libboost-serialization-dev libboost-serialization-dev
python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests python3 python-virtualenv python3-pip # for qa, macro_benchmark and stress tests
) )

View File

@ -47,6 +47,7 @@ set(memgraph_src_files
transactions/engine_master.cpp transactions/engine_master.cpp
transactions/engine_single_node.cpp transactions/engine_single_node.cpp
transactions/engine_worker.cpp transactions/engine_worker.cpp
utils/filesystem.cpp
utils/network.cpp utils/network.cpp
utils/watchdog.cpp utils/watchdog.cpp
) )
@ -56,7 +57,9 @@ string(TOLOWER ${CMAKE_BUILD_TYPE} lower_build_type)
# memgraph_lib depend on these libraries # memgraph_lib depend on these libraries
set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools set(MEMGRAPH_ALL_LIBS stdc++fs Threads::Threads fmt cppitertools
antlr_opencypher_parser_lib dl glog gflags ${Boost_SERIALIZATION_LIBRARY_RELEASE}) antlr_opencypher_parser_lib dl glog gflags
${Boost_IOSTREAMS_LIBRARY_RELEASE}
${Boost_SERIALIZATION_LIBRARY_RELEASE})
if (USE_LTALLOC) if (USE_LTALLOC)
list(APPEND MEMGRAPH_ALL_LIBS ltalloc) list(APPEND MEMGRAPH_ALL_LIBS ltalloc)

View File

@ -0,0 +1,252 @@
/**
* @file
*
* Raft log is stored inside a folder. Each log entry is stored in a file named
* by its index. There is a special file named "metadata" which stores Raft
* metadata and also the last log index, which is used on startup to identify
* which log entry files are valid.
*/
#pragma once
#include <fcntl.h>
#include "boost/archive/binary_iarchive.hpp"
#include "boost/archive/binary_oarchive.hpp"
#include "boost/iostreams/device/file_descriptor.hpp"
#include "boost/iostreams/stream.hpp"
#include "communication/raft/raft.hpp"
#include "communication/raft/storage/memory.hpp"
#include "utils/filesystem.hpp"
namespace communication::raft {
struct SimpleFileStorageMetadata {
TermId term;
std::experimental::optional<MemberId> voted_for;
LogIndex last_log_index;
template <class TArchive>
void serialize(TArchive &ar, unsigned int) {
ar &term &voted_for &last_log_index;
}
};
template <class State>
class SimpleFileStorage : public RaftStorageInterface<State> {
public:
explicit SimpleFileStorage(const fs::path &parent_dir) : memory_storage_() {
try {
dir_ = utils::OpenDir(parent_dir);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Error opening log directory: {}", e.what());
}
auto md = utils::TryOpenFile(dir_, "metadata", O_RDONLY);
if (md.Empty()) {
LOG(WARNING) << fmt::format("No metadata file found in directory '{}'",
parent_dir);
return;
}
boost::iostreams::file_descriptor_source src(
md.Handle(),
boost::iostreams::file_descriptor_flags::never_close_handle);
boost::iostreams::stream<boost::iostreams::file_descriptor_source> is(src);
boost::archive::binary_iarchive iar(is);
SimpleFileStorageMetadata metadata;
try {
iar >> metadata;
} catch (boost::archive::archive_exception &e) {
LOG(FATAL) << "Failed to deserialize Raft metadata: " << e.what();
}
LOG(INFO) << fmt::format(
"Read term = {} and voted_for = {} from storage", metadata.term,
metadata.voted_for ? *metadata.voted_for : "(none)");
memory_storage_.term_ = metadata.term;
memory_storage_.voted_for_ = metadata.voted_for;
memory_storage_.log_.reserve(metadata.last_log_index);
for (LogIndex idx = 1; idx <= metadata.last_log_index; ++idx) {
utils::File entry_file;
try {
entry_file = utils::OpenFile(dir_, fmt::format("{}", idx), O_RDONLY);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to open entry file {}: {}", idx,
e.what());
}
boost::iostreams::file_descriptor_source src(
entry_file.Handle(),
boost::iostreams::file_descriptor_flags::never_close_handle);
boost::iostreams::stream<boost::iostreams::file_descriptor_source> is(
src);
boost::archive::binary_iarchive iar(is);
LogEntry<State> entry;
try {
iar >> entry;
memory_storage_.log_.emplace_back(std::move(entry));
} catch (boost::archive::archive_exception &e) {
LOG(FATAL) << fmt::format("Failed to deserialize log entry {}: {}", idx,
e.what());
}
try {
utils::Close(entry_file);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to close entry file {}: {}", idx,
e.what());
}
}
LOG(INFO) << fmt::format("Read {} log entries", metadata.last_log_index);
utils::Close(md);
}
~SimpleFileStorage() { utils::Close(dir_); }
void WriteTermAndVotedFor(
TermId term,
const std::experimental::optional<MemberId> &voted_for) override {
memory_storage_.WriteTermAndVotedFor(term, voted_for);
WriteMetadata();
// Metadata file might be newly created so we have to fsync the directory.
try {
utils::Fsync(dir_);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}",
e.what());
}
}
std::pair<TermId, std::experimental::optional<MemberId>> GetTermAndVotedFor()
override {
return memory_storage_.GetTermAndVotedFor();
}
void AppendLogEntry(const LogEntry<State> &entry) override {
memory_storage_.AppendLogEntry(entry);
utils::File entry_file;
try {
entry_file = utils::OpenFile(
dir_, fmt::format("{}", memory_storage_.GetLastLogIndex()),
O_WRONLY | O_CREAT | O_TRUNC, 0644);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to open log entry file: {}", e.what());
}
boost::iostreams::file_descriptor_sink sink(
entry_file.Handle(),
boost::iostreams::file_descriptor_flags::never_close_handle);
boost::iostreams::stream<boost::iostreams::file_descriptor_sink> os(sink);
boost::archive::binary_oarchive oar(os);
try {
oar << entry;
os.flush();
} catch (boost::archive::archive_exception &e) {
LOG(FATAL) << fmt::format("Failed to serialize log entry: {}", e.what());
}
try {
utils::Fsync(entry_file);
utils::Close(entry_file);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to write log entry file to disk: {}",
e.what());
}
// We update the metadata only after the log entry file is written to
// disk. This ensures that no file in range [1, last_log_index] is
// corrupted.
WriteMetadata();
try {
utils::Fsync(dir_);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to fsync Raft log directory: {}",
e.what());
}
}
TermId GetLogTerm(const LogIndex index) override {
return memory_storage_.GetLogTerm(index);
}
LogEntry<State> GetLogEntry(const LogIndex index) override {
return memory_storage_.GetLogEntry(index);
}
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) override {
return memory_storage_.GetLogSuffix(index);
}
LogIndex GetLastLogIndex() override {
return memory_storage_.GetLastLogIndex();
}
void TruncateLogSuffix(const LogIndex index) override {
return memory_storage_.TruncateLogSuffix(index);
}
private:
InMemoryStorage<State> memory_storage_;
utils::File dir_;
void WriteMetadata() {
// We first write data to a temporary file, ensure data is safely written
// to disk, and then rename the file. Since rename is an atomic operation,
// "metadata" file won't get corrupted in case of program crash.
utils::File md_tmp;
try {
md_tmp =
OpenFile(dir_, "metadata.new", O_WRONLY | O_CREAT | O_TRUNC, 0644);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to open temporary metadata file: {}",
e.what());
}
boost::iostreams::file_descriptor_sink sink(
md_tmp.Handle(),
boost::iostreams::file_descriptor_flags::never_close_handle);
boost::iostreams::stream<boost::iostreams::file_descriptor_sink> os(sink);
boost::archive::binary_oarchive oar(os);
try {
oar << SimpleFileStorageMetadata{
memory_storage_.GetTermAndVotedFor().first,
memory_storage_.GetTermAndVotedFor().second,
memory_storage_.GetLastLogIndex()};
} catch (boost::archive::archive_exception &e) {
LOG(FATAL) << "Error serializing Raft metadata";
}
os.flush();
try {
utils::Fsync(md_tmp);
utils::Close(md_tmp);
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format(
"Failed to write temporary metadata file to disk: {}", e.what());
}
try {
utils::Rename(dir_, "metadata.new", dir_, "metadata");
} catch (std::system_error &e) {
LOG(FATAL) << fmt::format("Failed to move temporary metadata file: {}",
e.what());
}
}
};
} // namespace communication::raft

View File

@ -0,0 +1,63 @@
#pragma once
#include "communication/raft/raft.hpp"
namespace communication::raft {
template <class State>
class InMemoryStorage : public RaftStorageInterface<State> {
public:
InMemoryStorage()
: term_(0), voted_for_(std::experimental::nullopt), log_() {}
InMemoryStorage(const TermId term,
const std::experimental::optional<std::string> &voted_for,
const std::vector<LogEntry<State>> log)
: term_(term), voted_for_(voted_for), log_(log) {}
void WriteTermAndVotedFor(
const TermId term,
const std::experimental::optional<std::string> &voted_for) {
term_ = term;
voted_for_ = voted_for;
}
std::pair<TermId, std::experimental::optional<MemberId>>
GetTermAndVotedFor() {
return {term_, voted_for_};
}
void AppendLogEntry(const LogEntry<State> &entry) { log_.push_back(entry); }
TermId GetLogTerm(const LogIndex index) {
CHECK(0 <= index && index <= log_.size())
<< "Trying to read nonexistent log entry";
return index > 0 ? log_[index - 1].term : 0;
}
LogEntry<State> GetLogEntry(const LogIndex index) {
CHECK(1 <= index && index <= log_.size())
<< "Trying to get nonexistent log entry";
return log_[index - 1];
}
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) {
CHECK(1 <= index && index <= log_.size())
<< "Trying to get nonexistent log entries";
return std::vector<LogEntry<State>>(log_.begin() + index - 1, log_.end());
}
LogIndex GetLastLogIndex(void) { return log_.size(); }
void TruncateLogSuffix(const LogIndex index) {
CHECK(1 <= index <= log_.size())
<< "Trying to remove nonexistent log entries";
log_.erase(log_.begin() + index - 1, log_.end());
}
TermId term_;
std::experimental::optional<MemberId> voted_for_;
std::vector<LogEntry<State>> log_;
};
} // namespace communication::raft

View File

@ -142,42 +142,4 @@ class NoOpStorageInterface : public RaftStorageInterface<State> {
std::vector<LogEntry<State>> log_; std::vector<LogEntry<State>> log_;
}; };
template <class State>
class InMemoryStorageInterface : public RaftStorageInterface<State> {
public:
InMemoryStorageInterface(
const TermId term,
const std::experimental::optional<std::string> &voted_for,
const std::vector<LogEntry<State>> log)
: term_(term), voted_for_(voted_for), log_(log) {}
void WriteTermAndVotedFor(
const TermId term,
const std::experimental::optional<std::string> &voted_for) {
term_ = term;
voted_for_ = voted_for;
}
std::pair<TermId, std::experimental::optional<MemberId>>
GetTermAndVotedFor() {
return {term_, voted_for_};
}
void AppendLogEntry(const LogEntry<State> &entry) { log_.push_back(entry); }
TermId GetLogTerm(const LogIndex index) {
return index > 0 ? log_[index - 1].term : 0;
}
LogEntry<State> GetLogEntry(const LogIndex index) { return log_[index - 1]; }
std::vector<LogEntry<State>> GetLogSuffix(const LogIndex index) {
return std::vector<LogEntry<State>>(log_.begin() + index - 1, log_.end());
}
LogIndex GetLastLogIndex(void) { return log_.size(); }
void TruncateLogSuffix(const LogIndex index) {
log_.erase(log_.begin() + index - 1, log_.end());
}
TermId term_;
std::experimental::optional<MemberId> voted_for_;
std::vector<LogEntry<State>> log_;
};
} // namespace communication::raft::test_utils } // namespace communication::raft::test_utils

132
src/utils/filesystem.cpp Normal file
View File

@ -0,0 +1,132 @@
#include "filesystem.hpp"
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "fmt/format.h"
#include "fmt/ostream.h"
#include "glog/logging.h"
namespace utils {
File::File() : fd_(-1), path_() {}
File::File(int fd, fs::path path) : fd_(fd), path_(std::move(path)) {}
File::~File() {
CHECK(fd_ == -1) << fmt::format(
"Underlying file descriptor should be released or closed "
"before destructing (fd = {}, path = {})",
fd_, path_);
}
File::File(File &&rhs) : fd_(rhs.fd_), path_(rhs.path_) {
LOG(INFO) << "Move constructor";
rhs.Release();
}
File &File::operator=(File &&rhs) {
if (this != &rhs) {
fd_ = rhs.fd_;
path_ = rhs.path_;
rhs.Release();
}
return *this;
}
fs::path File::Path() const { return path_; }
int File::Handle() const { return fd_; }
bool File::Empty() const { return fd_ == -1; }
void File::Release() {
fd_ = -1;
path_ = fs::path();
}
File OpenFile(const fs::path &path, int flags, ::mode_t mode) {
int fd = ::open(path.c_str(), flags, mode);
if (fd == -1) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot open {}", path));
}
return File(fd, path);
}
File TryOpenFile(const fs::path &path, int flags, ::mode_t mode) {
int fd = ::open(path.c_str(), flags, mode);
return fd == -1 ? File() : File(fd, path);
}
File OpenFile(const File &dir, const fs::path &path, int flags, ::mode_t mode) {
int fd = ::openat(dir.Handle(), path.c_str(), flags, mode);
if (fd == -1) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot open {}", dir.Path() / path));
}
return File(fd, dir.Path() / path);
}
File TryOpenFile(const File &dir, const fs::path &path, int flags,
::mode_t mode) {
int fd = ::openat(dir.Handle(), path.c_str(), flags, mode);
return fd == -1 ? File() : File(fd, dir.Path() / path);
}
void Close(File &file) {
if (::close(file.Handle()) == -1) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot close {}", file.Path()));
}
file.Release();
}
void Fsync(const File &file) {
if (::fsync(file.Handle()) == -1) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot fsync {}", file.Path()));
}
}
void Rename(const File &dir1, const fs::path &path1, const File &dir2,
const fs::path &path2) {
if (::renameat(dir1.Handle(), path1.c_str(), dir2.Handle(), path2.c_str()) !=
0) {
throw std::system_error(
errno, std::generic_category(),
fmt::format("cannot move {} to {}", dir1.Path() / path1,
dir2.Path() / path2));
}
}
void SyncDir(const fs::path &path) {
File dir = OpenFile(path, O_DIRECTORY | O_RDONLY);
Fsync(dir);
Close(dir);
}
File OpenDir(const fs::path &path) {
int res = ::mkdir(path.c_str(), 0777);
if (res == 0) {
if (path.has_parent_path()) {
SyncDir(path.parent_path());
}
} else {
if (errno != EEXIST) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot create directory {}", path));
}
}
int fd = ::open(path.c_str(), O_RDONLY | O_DIRECTORY);
if (fd == -1) {
throw std::system_error(errno, std::generic_category(),
fmt::format("cannot open directory {}", path));
}
return File(fd, path);
}
} // namespace utils

169
src/utils/filesystem.hpp Normal file
View File

@ -0,0 +1,169 @@
/**
* @file
* @brief This file contains C++ wrappers around some C system calls.
*/
#pragma once
#include <experimental/filesystem>
namespace fs = std::experimental::filesystem;
namespace utils {
/**
* @brief Thin wrapper around system file handle.
*/
class File {
public:
/**
* Constructs an empty file handle.
*/
File();
/**
* Wraps the given system file handle.
*
* This class doesn't take ownership of the file handle, it won't close it
* on destruction.
*
* @param fd System file handle.
* @param path Pathname naming the file, used only for error handling.
*/
File(int fd, fs::path path);
File(File &&);
File &operator=(File &&);
File(const File &) = delete;
File &operator=(const File &) = delete;
/***
* Destructor -- crashes the program if the underlying file descriptor is not
* released or closed.
*/
~File();
/**
* Gets the path to the underlying file.
*/
fs::path Path() const;
/**
* Gets the underlying file handle.
*/
int Handle() const;
/**
* Checks if there's an underlying file handle.
*/
bool Empty() const;
/**
* Releases the underlying file handle.
*
* File descriptor will be empty after the call returns.
*/
void Release();
private:
int fd_;
fs::path path_;
};
/**
* Opens a file descriptor.
* Wrapper around `open` system call (see `man 2 open`).
*
* @param path Pathname naming the file.
* @param flags Opening flags.
* @param mode File mode bits to apply for creation of new file.
* @return File descriptor referring to the opened file.
*
* @throws std::system_error
*/
File OpenFile(const fs::path &path, int flags, ::mode_t mode = 0666);
/**
* Same as OpenFile, but returns an empty File object instead of
* throwing an exception.
*/
File TryOpenFile(const fs::path &path, int flags, ::mode_t mode = 0666);
/**
* Opens a file descriptor for a file inside a directory.
* Wrapper around `openat` system call (see `man 2 openat`).
*
* @param dir Directory file descriptor.
* @param path Pathname naming the file.
* @param flags Opening flags.
* @param mode File mode bits to apply for creation of new file.
* @return File descriptor referring to the opened file.
*
* @throws std::system_error
*/
File OpenFile(const File &dir, const fs::path &path, int flags,
::mode_t mode = 0666);
/**
* Same as OpenFile, but returns an empty File object instead of
* throwing an exception.
*/
File TryOpenFile(const File &dir, const fs::path &path, int flags,
::mode_t mode = 0666);
/**
* Closes a file descriptor.
* Wrapper around `close` system call (see `man 2 close`). File descriptor will
* be empty after the call returns.
*
* @param file File descriptor to be closed.
*
* @throws std::system_error
*/
void Close(File &file);
/**
* Synchronizes file with the underlying storage device.
* Wrapper around `fsync` system call (see `man 2 fsync`).
*
* @param file File descriptor referring to the file to be synchronized.
*
* @throws std::system_error
*/
void Fsync(const File &file);
/**
* Moves a file from one directory to another.
*
* Wrapper around `renameat` system call (see `man 2 renameat`).
*
* @param dir1 Source directory.
* @param path1 Pathname naming the source file.
* @param dir2 Destination directory.
* @param path2 Pathname naming the destination file.
*
* @throws std::system_error
*/
void Rename(const File &dir1, const fs::path &path1, const File &dir2,
const fs::path &path2);
/**
* Synchronizes directory with the underlying storage device.
*
* @param path Pathname naming the directory.
*
* @throws std::system_error
*/
void SyncDir(const fs::path &path);
/**
* Opens a directory, creating it if it doesn't exist.
*
* @param path Pathname naming the directory.
* @return File descriptor referring to the opened directory.
*
* @throws std::system_error
*/
File OpenDir(const fs::path &path);
} // namespace utils

View File

@ -8,15 +8,16 @@
#include "communication/messaging/distributed.hpp" #include "communication/messaging/distributed.hpp"
#include "communication/raft/rpc.hpp" #include "communication/raft/rpc.hpp"
#include "communication/raft/storage/memory.hpp"
#include "communication/raft/test_utils.hpp" #include "communication/raft/test_utils.hpp"
namespace raft = communication::raft; namespace raft = communication::raft;
using io::network::Endpoint; using io::network::Endpoint;
using raft::InMemoryStorage;
using raft::RaftConfig; using raft::RaftConfig;
using raft::RpcNetwork; using raft::RpcNetwork;
using raft::test_utils::DummyState; using raft::test_utils::DummyState;
using raft::test_utils::InMemoryStorageInterface;
DEFINE_string(member_id, "", "id of RaftMember"); DEFINE_string(member_id, "", "id of RaftMember");
@ -42,7 +43,7 @@ int main(int argc, char *argv[]) {
communication::messaging::System my_system(directory[FLAGS_member_id]); communication::messaging::System my_system(directory[FLAGS_member_id]);
RpcNetwork<DummyState> network(my_system, directory); RpcNetwork<DummyState> network(my_system, directory);
raft::test_utils::InMemoryStorageInterface<DummyState> storage(0, {}, {}); raft::InMemoryStorage<DummyState> storage(0, {}, {});
raft::RaftConfig config{{"a", "b", "c"}, 150ms, 300ms, 70ms, 60ms, 30ms}; raft::RaftConfig config{{"a", "b", "c"}, 150ms, 300ms, 70ms, 60ms, 30ms};

View File

@ -5,6 +5,7 @@
#include <thread> #include <thread>
#include "communication/raft/raft.hpp" #include "communication/raft/raft.hpp"
#include "communication/raft/storage/memory.hpp"
#include "communication/raft/test_utils.hpp" #include "communication/raft/test_utils.hpp"
using namespace std::chrono_literals; using namespace std::chrono_literals;
@ -33,7 +34,7 @@ class RaftMemberImplTest : public ::testing::Test {
} }
NoOpNetworkInterface<DummyState> network_; NoOpNetworkInterface<DummyState> network_;
InMemoryStorageInterface<DummyState> storage_; InMemoryStorage<DummyState> storage_;
RaftMemberImpl<DummyState> member; RaftMemberImpl<DummyState> member;
}; };
@ -137,7 +138,7 @@ TEST_F(RaftMemberImplTest, AdvanceCommitIndex) {
TEST(RequestVote, SimpleElection) { TEST(RequestVote, SimpleElection) {
NextReplyNetworkInterface<DummyState> network; NextReplyNetworkInterface<DummyState> network;
InMemoryStorageInterface<DummyState> storage(1, {}, {{1}, {1}}); InMemoryStorage<DummyState> storage(1, {}, {{1}, {1}});
RaftMemberImpl<DummyState> member(network, storage, "a", test_config5); RaftMemberImpl<DummyState> member(network, storage, "a", test_config5);
member.StartNewElection(); member.StartNewElection();
@ -187,7 +188,7 @@ TEST(RequestVote, SimpleElection) {
TEST(AppendEntries, SimpleLogSync) { TEST(AppendEntries, SimpleLogSync) {
NextReplyNetworkInterface<DummyState> network; NextReplyNetworkInterface<DummyState> network;
InMemoryStorageInterface<DummyState> storage(3, {}, {{1}, {1}, {2}, {3}}); InMemoryStorage<DummyState> storage(3, {}, {{1}, {1}, {2}, {3}});
RaftMemberImpl<DummyState> member(network, storage, "a", test_config2); RaftMemberImpl<DummyState> member(network, storage, "a", test_config2);
member.mode_ = RaftMode::LEADER; member.mode_ = RaftMode::LEADER;
@ -319,18 +320,18 @@ class RaftMemberParamTest : public ::testing::TestWithParam<TestParam> {
} }
} }
RaftMemberParamTest(InMemoryStorageInterface<DummyState> storage, RaftMemberParamTest(InMemoryStorage<DummyState> storage,
InMemoryStorageInterface<DummyState> peer_storage) InMemoryStorage<DummyState> peer_storage)
: network_(NoOpNetworkInterface<DummyState>()), : network_(NoOpNetworkInterface<DummyState>()),
storage_(storage), storage_(storage),
member_(network_, storage_, "a", test_config3), member_(network_, storage_, "a", test_config3),
peer_storage_(peer_storage) {} peer_storage_(peer_storage) {}
NoOpNetworkInterface<DummyState> network_; NoOpNetworkInterface<DummyState> network_;
InMemoryStorageInterface<DummyState> storage_; InMemoryStorage<DummyState> storage_;
RaftMemberImpl<DummyState> member_; RaftMemberImpl<DummyState> member_;
InMemoryStorageInterface<DummyState> peer_storage_; InMemoryStorage<DummyState> peer_storage_;
}; };
struct OnRequestVoteTestParam { struct OnRequestVoteTestParam {
@ -348,9 +349,9 @@ class OnRequestVoteTest : public RaftMemberParamTest<OnRequestVoteTestParam> {
public: public:
OnRequestVoteTest() OnRequestVoteTest()
: RaftMemberParamTest( : RaftMemberParamTest(
InMemoryStorageInterface<DummyState>( InMemoryStorage<DummyState>(GetParam().term, GetParam().voted_for,
GetParam().term, GetParam().voted_for, GetParam().log), GetParam().log),
InMemoryStorageInterface<DummyState>(GetParam().peer_term, {}, InMemoryStorage<DummyState>(GetParam().peer_term, {},
GetParam().peer_log)) {} GetParam().peer_log)) {}
virtual ~OnRequestVoteTest() {} virtual ~OnRequestVoteTest() {}
}; };
@ -469,9 +470,8 @@ class OnAppendEntriesTest
public: public:
OnAppendEntriesTest() OnAppendEntriesTest()
: RaftMemberParamTest( : RaftMemberParamTest(
InMemoryStorageInterface<DummyState>(GetParam().term, {}, InMemoryStorage<DummyState>(GetParam().term, {}, GetParam().log),
GetParam().log), InMemoryStorage<DummyState>(GetParam().peer_term, {},
InMemoryStorageInterface<DummyState>(GetParam().peer_term, {},
GetParam().peer_log)) {} GetParam().peer_log)) {}
virtual ~OnAppendEntriesTest() {} virtual ~OnAppendEntriesTest() {}
}; };
@ -643,7 +643,7 @@ TEST(RaftMemberTest, AddCommand) {
network.next_reply_ = reply; network.next_reply_ = reply;
}; };
InMemoryStorageInterface<IntState> storage(0, {}, {}); InMemoryStorage<IntState> storage(0, {}, {});
RaftMember<IntState> member(network, storage, "a", test_config2); RaftMember<IntState> member(network, storage, "a", test_config2);
std::this_thread::sleep_for(500ms); std::this_thread::sleep_for(500ms);

View File

@ -0,0 +1,72 @@
#include <experimental/optional>
#include "gtest/gtest.h"
#include "communication/raft/storage/file.hpp"
#include "communication/raft/test_utils.hpp"
#include "utils/filesystem.hpp"
using communication::raft::LogEntry;
using communication::raft::SimpleFileStorage;
using communication::raft::test_utils::IntState;
TEST(SimpleFileStorageTest, All) {
typedef LogEntry<IntState> Log;
auto GetLog = [](int term, int d) {
return Log{term, IntState::Change{IntState::Change::Type::SET, d}};
};
{
SimpleFileStorage<IntState> storage(fs::path("raft_storage_test_dir"));
EXPECT_EQ(storage.GetTermAndVotedFor().first, 0);
EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
EXPECT_EQ(storage.GetLastLogIndex(), 0);
storage.WriteTermAndVotedFor(1, "a");
EXPECT_EQ(storage.GetTermAndVotedFor().first, 1);
EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a");
storage.AppendLogEntry(GetLog(1, 1));
storage.AppendLogEntry(GetLog(1, 2));
EXPECT_EQ(storage.GetLastLogIndex(), 2);
EXPECT_EQ(storage.GetLogSuffix(1),
std::vector<Log>({GetLog(1, 1), GetLog(1, 2)}));
}
{
SimpleFileStorage<IntState> storage(fs::path("raft_storage_test_dir"));
EXPECT_EQ(storage.GetTermAndVotedFor().first, 1);
EXPECT_EQ(*storage.GetTermAndVotedFor().second, "a");
EXPECT_EQ(storage.GetLastLogIndex(), 2);
EXPECT_EQ(storage.GetLogSuffix(1),
std::vector<Log>({GetLog(1, 1), GetLog(1, 2)}));
storage.TruncateLogSuffix(2);
EXPECT_EQ(storage.GetLogSuffix(1), std::vector<Log>({GetLog(1, 1)}));
storage.WriteTermAndVotedFor(2, std::experimental::nullopt);
storage.AppendLogEntry(GetLog(2, 3));
EXPECT_EQ(storage.GetTermAndVotedFor().first, 2);
EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
EXPECT_EQ(storage.GetLogSuffix(1),
std::vector<Log>({GetLog(1, 1), GetLog(2, 3)}));
}
{
SimpleFileStorage<IntState> storage(fs::path("raft_storage_test_dir"));
EXPECT_EQ(storage.GetTermAndVotedFor().first, 2);
EXPECT_EQ(storage.GetTermAndVotedFor().second, std::experimental::nullopt);
EXPECT_EQ(storage.GetLogSuffix(1),
std::vector<Log>({GetLog(1, 1), GetLog(2, 3)}));
}
fs::remove("raft_storage_test_dir/metadata");
fs::remove("raft_storage_test_dir/1");
fs::remove("raft_storage_test_dir/2");
fs::remove("raft_storage_test_dir");
}