Use SLK for Raft log

Reviewers: msantl, ipaljak

Reviewed By: ipaljak

Subscribers: pullbot

Differential Revision: https://phabricator.memgraph.io/D2007
This commit is contained in:
Matej Ferencevic 2019-05-02 14:40:43 +02:00
parent 18796c788d
commit 291f0425e2

View File

@ -1,6 +1,5 @@
#include "raft/raft_server.hpp" #include "raft/raft_server.hpp"
#include <kj/std/iostream.h>
#include <algorithm> #include <algorithm>
#include <chrono> #include <chrono>
#include <iostream> #include <iostream>
@ -17,6 +16,8 @@
#include "durability/single_node_ha/snapshooter.hpp" #include "durability/single_node_ha/snapshooter.hpp"
#include "raft/exceptions.hpp" #include "raft/exceptions.hpp"
#include "rpc/serialization.hpp" #include "rpc/serialization.hpp"
#include "slk/streams.hpp"
#include "utils/cast.hpp"
#include "utils/exceptions.hpp" #include "utils/exceptions.hpp"
#include "utils/on_scope_exit.hpp" #include "utils/on_scope_exit.hpp"
#include "utils/thread.hpp" #include "utils/thread.hpp"
@ -384,17 +385,16 @@ std::optional<SnapshotMetadata> RaftServer::GetSnapshotMetadata() {
return std::nullopt; return std::nullopt;
} }
::capnp::MallocMessageBuilder message; auto &value = *opt_value;
std::stringstream stream(std::ios_base::in | std::ios_base::out | slk::Reader reader(reinterpret_cast<const uint8_t *>(value.data()),
std::ios_base::binary); value.size());
kj::std::StdInputStream std_stream(stream);
kj::BufferedInputStreamWrapper buffered_stream(std_stream);
stream << *opt_value;
readMessageCopy(buffered_stream, message);
capnp::SnapshotMetadata::Reader reader =
message.getRoot<capnp::SnapshotMetadata>().asReader();
SnapshotMetadata deserialized; SnapshotMetadata deserialized;
Load(&deserialized, reader); try {
slk::Load(&deserialized, &reader);
reader.Finalize();
} catch (const slk::SlkReaderException &) {
return std::nullopt;
}
return std::make_optional(deserialized); return std::make_optional(deserialized);
} }
@ -402,15 +402,14 @@ void RaftServer::PersistSnapshotMetadata(
const SnapshotMetadata &snapshot_metadata) { const SnapshotMetadata &snapshot_metadata) {
std::stringstream stream(std::ios_base::in | std::ios_base::out | std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary); std::ios_base::binary);
{ slk::Builder builder(
::capnp::MallocMessageBuilder message; [&stream](const uint8_t *data, size_t size, bool have_more) {
capnp::SnapshotMetadata::Builder builder = for (size_t i = 0; i < size; ++i) {
message.initRoot<capnp::SnapshotMetadata>(); stream << utils::MemcpyCast<char>(data[i]);
Save(snapshot_metadata, &builder);
kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
writeMessage(buffered_stream, message);
} }
});
slk::Save(snapshot_metadata, &builder);
builder.Finalize();
disk_storage_.Put(kSnapshotMetadataKey, stream.str()); disk_storage_.Put(kSnapshotMetadataKey, stream.str());
} }
@ -1226,31 +1225,30 @@ std::string RaftServer::LogEntryKey(uint64_t index) {
std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) { std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) {
std::stringstream stream(std::ios_base::in | std::ios_base::out | std::stringstream stream(std::ios_base::in | std::ios_base::out |
std::ios_base::binary); std::ios_base::binary);
{ slk::Builder builder(
::capnp::MallocMessageBuilder message; [&stream](const uint8_t *data, size_t size, bool have_more) {
capnp::LogEntry::Builder log_builder = message.initRoot<capnp::LogEntry>(); for (size_t i = 0; i < size; ++i) {
Save(log_entry, &log_builder); stream << utils::MemcpyCast<char>(data[i]);
kj::std::StdOutputStream std_stream(stream);
kj::BufferedOutputStreamWrapper buffered_stream(std_stream);
writeMessage(buffered_stream, message);
} }
});
slk::Save(log_entry, &builder);
builder.Finalize();
return stream.str(); return stream.str();
} }
LogEntry RaftServer::DeserializeLogEntry( LogEntry RaftServer::DeserializeLogEntry(
const std::string &serialized_log_entry) { const std::string &serialized_log_entry) {
::capnp::MallocMessageBuilder message; slk::Reader reader(
std::stringstream stream(std::ios_base::in | std::ios_base::out | reinterpret_cast<const uint8_t *>(serialized_log_entry.data()),
std::ios_base::binary); serialized_log_entry.size());
kj::std::StdInputStream std_stream(stream); LogEntry deserialized;
kj::BufferedInputStreamWrapper buffered_stream(std_stream); try {
stream << serialized_log_entry; slk::Load(&deserialized, &reader);
readMessageCopy(buffered_stream, message); reader.Finalize();
capnp::LogEntry::Reader log_reader = } catch (const slk::SlkReaderException &) {
message.getRoot<capnp::LogEntry>().asReader(); LOG(FATAL) << "Couldn't load log from disk storage!";
LogEntry deserialized_log; }
Load(&deserialized_log, log_reader); return deserialized;
return deserialized_log;
} }
void RaftServer::ResetReplicationLog() { void RaftServer::ResetReplicationLog() {