From 291f0425e2fb18fe68bdde2f0aebee81dd9f4e44 Mon Sep 17 00:00:00 2001 From: Matej Ferencevic Date: Thu, 2 May 2019 14:40:43 +0200 Subject: [PATCH] Use SLK for Raft log Reviewers: msantl, ipaljak Reviewed By: ipaljak Subscribers: pullbot Differential Revision: https://phabricator.memgraph.io/D2007 --- src/raft/raft_server.cpp | 78 ++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/src/raft/raft_server.cpp b/src/raft/raft_server.cpp index 942508798..47ae27b28 100644 --- a/src/raft/raft_server.cpp +++ b/src/raft/raft_server.cpp @@ -1,6 +1,5 @@ #include "raft/raft_server.hpp" -#include #include #include #include @@ -17,6 +16,8 @@ #include "durability/single_node_ha/snapshooter.hpp" #include "raft/exceptions.hpp" #include "rpc/serialization.hpp" +#include "slk/streams.hpp" +#include "utils/cast.hpp" #include "utils/exceptions.hpp" #include "utils/on_scope_exit.hpp" #include "utils/thread.hpp" @@ -384,17 +385,16 @@ std::optional RaftServer::GetSnapshotMetadata() { return std::nullopt; } - ::capnp::MallocMessageBuilder message; - std::stringstream stream(std::ios_base::in | std::ios_base::out | - std::ios_base::binary); - kj::std::StdInputStream std_stream(stream); - kj::BufferedInputStreamWrapper buffered_stream(std_stream); - stream << *opt_value; - readMessageCopy(buffered_stream, message); - capnp::SnapshotMetadata::Reader reader = - message.getRoot().asReader(); + auto &value = *opt_value; + slk::Reader reader(reinterpret_cast(value.data()), + value.size()); SnapshotMetadata deserialized; - Load(&deserialized, reader); + try { + slk::Load(&deserialized, &reader); + reader.Finalize(); + } catch (const slk::SlkReaderException &) { + return std::nullopt; + } return std::make_optional(deserialized); } @@ -402,15 +402,14 @@ void RaftServer::PersistSnapshotMetadata( const SnapshotMetadata &snapshot_metadata) { std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary); - { - ::capnp::MallocMessageBuilder message; - capnp::SnapshotMetadata::Builder builder = - message.initRoot(); - Save(snapshot_metadata, &builder); - kj::std::StdOutputStream std_stream(stream); - kj::BufferedOutputStreamWrapper buffered_stream(std_stream); - writeMessage(buffered_stream, message); - } + slk::Builder builder( + [&stream](const uint8_t *data, size_t size, bool have_more) { + for (size_t i = 0; i < size; ++i) { + stream << utils::MemcpyCast(data[i]); + } + }); + slk::Save(snapshot_metadata, &builder); + builder.Finalize(); disk_storage_.Put(kSnapshotMetadataKey, stream.str()); } @@ -1226,31 +1225,30 @@ std::string RaftServer::LogEntryKey(uint64_t index) { std::string RaftServer::SerializeLogEntry(const LogEntry &log_entry) { std::stringstream stream(std::ios_base::in | std::ios_base::out | std::ios_base::binary); - { - ::capnp::MallocMessageBuilder message; - capnp::LogEntry::Builder log_builder = message.initRoot(); - Save(log_entry, &log_builder); - kj::std::StdOutputStream std_stream(stream); - kj::BufferedOutputStreamWrapper buffered_stream(std_stream); - writeMessage(buffered_stream, message); - } + slk::Builder builder( + [&stream](const uint8_t *data, size_t size, bool have_more) { + for (size_t i = 0; i < size; ++i) { + stream << utils::MemcpyCast(data[i]); + } + }); + slk::Save(log_entry, &builder); + builder.Finalize(); return stream.str(); } LogEntry RaftServer::DeserializeLogEntry( const std::string &serialized_log_entry) { - ::capnp::MallocMessageBuilder message; - std::stringstream stream(std::ios_base::in | std::ios_base::out | - std::ios_base::binary); - kj::std::StdInputStream std_stream(stream); - kj::BufferedInputStreamWrapper buffered_stream(std_stream); - stream << serialized_log_entry; - readMessageCopy(buffered_stream, message); - capnp::LogEntry::Reader log_reader = - message.getRoot().asReader(); - LogEntry deserialized_log; - Load(&deserialized_log, log_reader); - return deserialized_log; + slk::Reader reader( + reinterpret_cast(serialized_log_entry.data()), + serialized_log_entry.size()); + LogEntry deserialized; + try { + slk::Load(&deserialized, &reader); + reader.Finalize(); + } catch (const slk::SlkReaderException &) { + LOG(FATAL) << "Couldn't load log from disk storage!"; + } + return deserialized; } void RaftServer::ResetReplicationLog() {