Durability - snapshot magic num, version and tx snapshot added

Summary:
New snapshot structure:
- magic number
- snapshot version (old-version recovery not yet implemented)
- transaction snapshot (will be used in the WAL)
- the rest is as before (indices, vertices, edges)

Not backward compatible with the old snapshotting.

Does not improve error handling (user feedback). A task for that has been added.

Reviewers: buda, mislav.bradac, mferencevic, teon.banek

Reviewed By: teon.banek

Subscribers: teon.banek, dgleich, pullbot

Differential Revision: https://phabricator.memgraph.io/D912
This commit is contained in:
florijan 2017-10-19 12:56:16 +02:00
parent 0df240f9d5
commit 4460dd79f9
10 changed files with 184 additions and 86 deletions

View File

@ -1,5 +1,17 @@
# Change Log
## Next release
### Breaking Changes
* Snapshot format changed (not backward compatible).
### Major Features and Improvements
### Bug Fixes and Other Changes
## v0.8.0
### Major Features and Improvements

View File

@ -145,6 +145,23 @@ class DecodedValue {
#undef DECL_GETTER_BY_REFERNCE
#define TYPE_CHECKER(type) \
bool Is##type() const { return type_ == Type::type ; }
TYPE_CHECKER(Bool)
TYPE_CHECKER(Int)
TYPE_CHECKER(Double)
TYPE_CHECKER(String)
TYPE_CHECKER(List)
TYPE_CHECKER(Map)
TYPE_CHECKER(Vertex)
TYPE_CHECKER(Edge)
TYPE_CHECKER(UnboundedEdge)
TYPE_CHECKER(Path)
#undef TYPE_CHECKER
// conversion function to TypedValue
operator query::TypedValue() const;

View File

@ -34,7 +34,7 @@ class Decoder {
bool ReadValue(DecodedValue *data) {
uint8_t value;
DLOG(INFO) << "[ReadValue] Start";
VLOG(1) << "[ReadValue] Start";
if (!buffer_.Read(&value, 1)) {
DLOG(WARNING) << "[ReadValue] Marker data missing!";
@ -149,7 +149,7 @@ class Decoder {
bool ReadMessageHeader(Signature *signature, Marker *marker) {
uint8_t values[2];
DLOG(INFO) << "[ReadMessageHeader] Start";
VLOG(1) << "[ReadMessageHeader] Start";
if (!buffer_.Read(values, 2)) {
DLOG(WARNING) << "[ReadMessageHeader] Marker data missing!";
@ -167,15 +167,15 @@ class Decoder {
private:
bool ReadNull(const Marker &marker, DecodedValue *data) {
DLOG(INFO) << "[ReadNull] Start";
VLOG(1) << "[ReadNull] Start";
DCHECK(marker == Marker::Null) << "Received invalid marker!";
*data = DecodedValue();
DLOG(INFO) << "[ReadNull] Success";
VLOG(1) << "[ReadNull] Success";
return true;
}
bool ReadBool(const Marker &marker, DecodedValue *data) {
DLOG(INFO) << "[ReadBool] Start";
VLOG(1) << "[ReadBool] Start";
DCHECK(marker == Marker::False || marker == Marker::True)
<< "Received invalid marker!";
if (marker == Marker::False) {
@ -183,7 +183,7 @@ class Decoder {
} else {
*data = DecodedValue(true);
}
DLOG(INFO) << "[ReadBool] Success";
VLOG(1) << "[ReadBool] Success";
return true;
}
@ -191,13 +191,13 @@ class Decoder {
uint8_t value = underlying_cast(marker);
bool success = true;
int64_t ret;
DLOG(INFO) << "[ReadInt] Start";
VLOG(1) << "[ReadInt] Start";
if (value >= 240 || value <= 127) {
DLOG(INFO) << "[ReadInt] Found a TinyInt";
VLOG(1) << "[ReadInt] Found a TinyInt";
ret = value;
if (value >= 240) ret -= 256;
} else if (marker == Marker::Int8) {
DLOG(INFO) << "[ReadInt] Found an Int8";
VLOG(1) << "[ReadInt] Found an Int8";
int8_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadInt] Int8 missing data!";
@ -205,7 +205,7 @@ class Decoder {
}
ret = tmp;
} else if (marker == Marker::Int16) {
DLOG(INFO) << "[ReadInt] Found an Int16";
VLOG(1) << "[ReadInt] Found an Int16";
int16_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadInt] Int16 missing data!";
@ -213,7 +213,7 @@ class Decoder {
}
ret = bswap(tmp);
} else if (marker == Marker::Int32) {
DLOG(INFO) << "[ReadInt] Found an Int32";
VLOG(1) << "[ReadInt] Found an Int32";
int32_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadInt] Int32 missing data!";
@ -221,7 +221,7 @@ class Decoder {
}
ret = bswap(tmp);
} else if (marker == Marker::Int64) {
DLOG(INFO) << "[ReadInt] Found an Int64";
VLOG(1) << "[ReadInt] Found an Int64";
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&ret), sizeof(ret))) {
DLOG(WARNING) << "[ReadInt] Int64 missing data!";
return false;
@ -234,7 +234,7 @@ class Decoder {
}
if (success) {
*data = DecodedValue(ret);
DLOG(INFO) << "[ReadInt] Success";
VLOG(1) << "[ReadInt] Success";
}
return success;
}
@ -242,7 +242,7 @@ class Decoder {
bool ReadDouble(const Marker marker, DecodedValue *data) {
uint64_t value;
double ret;
DLOG(INFO) << "[ReadDouble] Start";
VLOG(1) << "[ReadDouble] Start";
DCHECK(marker == Marker::Float64) << "Received invalid marker!";
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&value), sizeof(value))) {
DLOG(WARNING) << "[ReadDouble] Missing data!";
@ -251,17 +251,17 @@ class Decoder {
value = bswap(value);
ret = *reinterpret_cast<double *>(&value);
*data = DecodedValue(ret);
DLOG(INFO) << "[ReadDouble] Success";
VLOG(1) << "[ReadDouble] Success";
return true;
}
int64_t ReadTypeSize(const Marker &marker, const uint8_t type) {
uint8_t value = underlying_cast(marker);
if ((value & 0xF0) == underlying_cast(MarkerTiny[type])) {
DLOG(INFO) << "[ReadTypeSize] Found a TinyType";
VLOG(1) << "[ReadTypeSize] Found a TinyType";
return value & 0x0F;
} else if (marker == Marker8[type]) {
DLOG(INFO) << "[ReadTypeSize] Found a Type8";
VLOG(1) << "[ReadTypeSize] Found a Type8";
uint8_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadTypeSize] Type8 missing data!";
@ -269,7 +269,7 @@ class Decoder {
}
return tmp;
} else if (marker == Marker16[type]) {
DLOG(INFO) << "[ReadTypeSize] Found a Type16";
VLOG(1) << "[ReadTypeSize] Found a Type16";
uint16_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadTypeSize] Type16 missing data!";
@ -278,7 +278,7 @@ class Decoder {
tmp = bswap(tmp);
return tmp;
} else if (marker == Marker32[type]) {
DLOG(INFO) << "[ReadTypeSize] Found a Type32";
VLOG(1) << "[ReadTypeSize] Found a Type32";
uint32_t tmp;
if (!buffer_.Read(reinterpret_cast<uint8_t *>(&tmp), sizeof(tmp))) {
DLOG(WARNING) << "[ReadTypeSize] Type32 missing data!";
@ -294,7 +294,7 @@ class Decoder {
}
bool ReadString(const Marker &marker, DecodedValue *data) {
DLOG(INFO) << "[ReadString] Start";
VLOG(1) << "[ReadString] Start";
auto size = ReadTypeSize(marker, MarkerString);
if (size == -1) {
DLOG(WARNING) << "[ReadString] Couldn't get size!";
@ -307,12 +307,12 @@ class Decoder {
}
*data =
DecodedValue(std::string(reinterpret_cast<char *>(ret.get()), size));
DLOG(INFO) << "[ReadString] Success";
VLOG(1) << "[ReadString] Success";
return true;
}
bool ReadList(const Marker &marker, DecodedValue *data) {
DLOG(INFO) << "[ReadList] Start";
VLOG(1) << "[ReadList] Start";
auto size = ReadTypeSize(marker, MarkerList);
if (size == -1) {
DLOG(WARNING) << "[ReadList] Couldn't get size!";
@ -326,12 +326,12 @@ class Decoder {
}
}
*data = DecodedValue(ret);
DLOG(INFO) << "[ReadList] Success";
VLOG(1) << "[ReadList] Success";
return true;
}
bool ReadMap(const Marker &marker, DecodedValue *data) {
DLOG(INFO) << "[ReadMap] Start";
VLOG(1) << "[ReadMap] Start";
auto size = ReadTypeSize(marker, MarkerMap);
if (size == -1) {
DLOG(WARNING) << "[ReadMap] Couldn't get size!";
@ -365,7 +365,7 @@ class Decoder {
}
*data = DecodedValue(ret);
DLOG(INFO) << "[ReadMap] Success";
VLOG(1) << "[ReadMap] Success";
return true;
}
@ -373,7 +373,7 @@ class Decoder {
DecodedValue dv;
DecodedVertex vertex;
DLOG(INFO) << "[ReadVertex] Start";
VLOG(1) << "[ReadVertex] Start";
// read ID
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
@ -406,7 +406,7 @@ class Decoder {
*data = DecodedValue(vertex);
DLOG(INFO) << "[ReadVertex] Success";
VLOG(1) << "[ReadVertex] Success";
return true;
}
@ -416,7 +416,7 @@ class Decoder {
DecodedValue dv;
DecodedEdge edge;
DLOG(INFO) << "[ReadEdge] Start";
VLOG(1) << "[ReadEdge] Start";
if (!buffer_.Read(&value, 1)) {
DLOG(WARNING) << "[ReadEdge] Missing marker and/or signature data!";
@ -471,7 +471,7 @@ class Decoder {
*data = DecodedValue(edge);
DLOG(INFO) << "[ReadEdge] Success";
VLOG(1) << "[ReadEdge] Success";
return true;
}
@ -480,7 +480,7 @@ class Decoder {
DecodedValue dv;
DecodedUnboundedEdge edge;
DLOG(INFO) << "[ReadUnboundedEdge] Start";
VLOG(1) << "[ReadUnboundedEdge] Start";
// read ID
if (!ReadValue(&dv, DecodedValue::Type::Int)) {
@ -505,7 +505,7 @@ class Decoder {
*data = DecodedValue(edge);
DLOG(INFO) << "[ReadUnboundedEdge] Success";
VLOG(1) << "[ReadUnboundedEdge] Success";
return true;
}
@ -514,7 +514,7 @@ class Decoder {
DecodedValue dv;
DecodedPath path;
DLOG(INFO) << "[ReadPath] Start";
VLOG(1) << "[ReadPath] Start";
// vertices
if (!ReadValue(&dv, DecodedValue::Type::List)) {
@ -560,7 +560,7 @@ class Decoder {
*data = DecodedValue(path);
DLOG(INFO) << "[ReadPath] Success";
VLOG(1) << "[ReadPath] Success";
return true;
}

View File

@ -482,6 +482,9 @@ class GraphDbAccessor {
*/
bool should_abort() const;
/** Returns the transaction of this accessor */
const tx::Transaction &transaction() const { return *transaction_; }
/**
* Initializes the record pointers in the given accessor.
* The old_ and new_ pointers need to be initialized

View File

@ -1,7 +1,10 @@
#include "durability/recovery.hpp"
#include "communication/bolt/v1/decoder/decoder.hpp"
#include "durability/file_reader_buffer.hpp"
#include "durability/recovery.hpp"
#include "query/typed_value.hpp"
#include "utils/string.hpp"
#include "durability/file_reader_buffer.hpp"
#include "durability/version.hpp"
using communication::bolt::DecodedValue;
@ -16,37 +19,49 @@ bool Recovery::Recover(const fs::path &snapshot_file,
return true;
}
#define RETURN_IF_NOT(condition) \
if (!(condition)) { \
buffer.Close(); \
return false; \
}
bool Recovery::Decode(const fs::path &snapshot_file,
GraphDbAccessor &db_accessor) {
FileReaderBuffer buffer;
communication::bolt::Decoder<FileReaderBuffer> decoder(buffer);
snapshot::Summary summary;
if (!buffer.Open(snapshot_file, summary)) {
buffer.Close();
return false;
}
RETURN_IF_NOT(buffer.Open(snapshot_file, summary));
std::unordered_map<uint64_t, VertexAccessor> vertices;
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
RETURN_IF_NOT(magic_number == durability::kMagicNumber);
DecodedValue dv;
if (!decoder.ReadValue(&dv, DecodedValue::Type::List)) {
buffer.Close();
return false;
}
auto &label_property_vector = dv.ValueList();
for (int i = 0; i < label_property_vector.size(); i += 2) {
auto label = label_property_vector[i].ValueString();
auto property = label_property_vector[i + 1].ValueString();
db_accessor.BuildIndex(db_accessor.Label(label),
db_accessor.Property(property));
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::Int) &&
dv.ValueInt() == durability::kVersion);
// Transaction snapshot of the transaction that created the snapshot :D In the
// current recovery implementation it's ignored.
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List));
// A list of label+property indexes.
RETURN_IF_NOT(decoder.ReadValue(&dv, DecodedValue::Type::List));
auto indexes = dv.ValueList();
for (auto it = indexes.begin(); it != indexes.end();) {
auto label = *it++;
RETURN_IF_NOT(it != indexes.end());
auto property = *it++;
RETURN_IF_NOT(label.IsString() && property.IsString());
db_accessor.BuildIndex(db_accessor.Label(label.ValueString()),
db_accessor.Property(property.ValueString()));
}
for (int64_t i = 0; i < summary.vertex_num_; ++i) {
DecodedValue vertex_dv;
if (!decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex)) {
buffer.Close();
return false;
}
RETURN_IF_NOT(decoder.ReadValue(&vertex_dv, DecodedValue::Type::Vertex));
auto &vertex = vertex_dv.ValueVertex();
auto vertex_accessor = db_accessor.InsertVertex();
for (const auto &label : vertex.labels) {
@ -60,17 +75,11 @@ bool Recovery::Decode(const fs::path &snapshot_file,
}
for (int64_t i = 0; i < summary.edge_num_; ++i) {
DecodedValue edge_dv;
if (!decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge)) {
buffer.Close();
return false;
}
RETURN_IF_NOT(decoder.ReadValue(&edge_dv, DecodedValue::Type::Edge));
auto &edge = edge_dv.ValueEdge();
auto it_from = vertices.find(edge.from);
auto it_to = vertices.find(edge.to);
if (it_from == vertices.end() || it_to == vertices.end()) {
buffer.Close();
return false;
}
RETURN_IF_NOT(it_from != vertices.end() && it_to != vertices.end());
auto edge_accessor = db_accessor.InsertEdge(
it_from->second, it_to->second, db_accessor.EdgeType(edge.type));
@ -83,3 +92,5 @@ bool Recovery::Decode(const fs::path &snapshot_file,
if (!buffer.Close()) return false;
return hash == summary.hash_;
}
#undef RETURN_IF_NOT

View File

@ -8,6 +8,7 @@
#include "utils/datetime/timestamp.hpp"
#include "durability/snapshooter.hpp"
#include "durability/version.hpp"
bool Snapshooter::MakeSnapshot(GraphDbAccessor &db_accessor_,
const fs::path &snapshot_folder,
@ -35,14 +36,28 @@ bool Snapshooter::Encode(const fs::path &snapshot_file,
int64_t vertex_num = 0, edge_num = 0;
buffer.Open(snapshot_file);
std::vector<query::TypedValue> label_property_vector;
for (const auto &key : db_accessor_.GetIndicesKeys()) {
query::TypedValue label(*key.label_);
query::TypedValue property(*key.property_);
label_property_vector.push_back(label);
label_property_vector.push_back(property);
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder.WriteInt(durability::kVersion);
// Write the transaction snapshot into the snapshot. It's used when
// recovering from the combination of snapshot and write-ahead-log.
{
std::vector<query::TypedValue> tx_snapshot;
for (int64_t tx : db_accessor_.transaction().snapshot())
tx_snapshot.emplace_back(tx);
encoder.WriteList(tx_snapshot);
}
// Write label+property indexes as list ["label", "property", ...]
{
std::vector<query::TypedValue> index_vec;
for (const auto &key : db_accessor_.GetIndicesKeys()) {
index_vec.emplace_back(db_accessor_.LabelName(key.label_));
index_vec.emplace_back(db_accessor_.PropertyName(key.property_));
}
encoder.WriteList(index_vec);
}
encoder.WriteList(label_property_vector);
for (const auto &vertex : db_accessor_.Vertices(false)) {
encoder.WriteVertex(vertex);

View File

@ -0,0 +1,12 @@
#pragma once
#include <array>
#include <cstdint>
namespace durability {
constexpr std::array<uint8_t, 4> kMagicNumber{{'M', 'G', 's', 'n'}};
// The current default version of snapshot and WAL enconding / decoding.
constexpr int64_t kVersion{1};
}

View File

@ -13,6 +13,8 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "durability/file_writer_buffer.hpp"
#include "durability/version.hpp"
#include "transactions/type.hpp"
#include "utils/string.hpp"
#include "utils/timer.hpp"
@ -75,15 +77,19 @@ class Writer {
* @Param indexes - A list of (label, property) indexes to create, each in the
* "Label.property" form.
*/
Writer(const std::string &path, const std::vector<std::string> &indexes)
: buffer_(path), encoder_(buffer_) {
std::vector<std::string> indexes_flat;
for (const auto &index : indexes)
for (const auto &index_part : utils::Split(index, "."))
indexes_flat.emplace_back(index_part);
Writer(const std::string &path) : buffer_(path), encoder_(buffer_) {
encoder_.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder_.WriteTypedValue(durability::kVersion);
encoder_.WriteList(std::vector<query::TypedValue>(indexes_flat.begin(),
indexes_flat.end()));
// Transactional Snapshot is an empty list of transaction IDs.
encoder_.WriteList(std::vector<query::TypedValue>{});
}
template <typename TValue>
void WriteList(const std::vector<TValue> &list) {
encoder_.WriteList(
std::vector<query::TypedValue>(list.begin(), list.end()));
}
int64_t WriteNode(
@ -273,13 +279,16 @@ int main(int argc, char **argv) {
config_file >> config;
}
std::vector<std::string> indexes;
for (const auto &index : GetWithDefault(config, "indexes", {}))
indexes.push_back(index);
Writer writer(FLAGS_out, indexes);
Writer writer(FLAGS_out);
GraphState state;
ValueGenerator value_generator;
std::vector<std::string> indexes;
for (const auto &index : GetWithDefault(config, "indexes", {}))
for (const auto &index_part : utils::Split(index, "."))
indexes.push_back(index_part);
writer.WriteList(indexes);
// Create nodes
const auto &nodes_config = config["nodes"];
CHECK(nodes_config.is_array() && nodes_config.size() > 0)

View File

@ -9,6 +9,7 @@
#include "database/dbms.hpp"
#include "durability/file_reader_buffer.hpp"
#include "durability/recovery.hpp"
#include "durability/version.hpp"
DECLARE_int32(snapshot_cycle_sec);
@ -108,8 +109,18 @@ TEST_F(RecoveryTest, TestEncoding) {
snapshot::Summary summary;
buffer.Open(snapshot, summary);
auto magic_number = durability::kMagicNumber;
buffer.Read(magic_number.data(), magic_number.size());
ASSERT_EQ(magic_number, durability::kMagicNumber);
communication::bolt::DecodedValue dv;
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueInt(), durability::kVersion);
// Transactional Snapshot, igore value, just check type.
decoder.ReadValue(&dv);
ASSERT_TRUE(dv.IsList());
// Label property indices.
decoder.ReadValue(&dv);
ASSERT_EQ(dv.ValueList().size(), 0);
std::vector<int64_t> ids;
std::vector<std::string> edge_types;

View File

@ -10,6 +10,7 @@
#include "communication/bolt/v1/encoder/base_encoder.hpp"
#include "durability/file_writer_buffer.hpp"
#include "durability/version.hpp"
#include "utils/string.hpp"
bool ValidateNotEmpty(const char *flagname, const std::string &value) {
@ -322,10 +323,17 @@ void Convert(const std::vector<std::string> &nodes,
int64_t relationship_count = 0;
MemgraphNodeIdMap node_id_map;
// Snapshot file has the following contents in order:
// 1) list of label+property index
// 2) all nodes, sequentially, but not encoded as a list
// 3) all relationships, sequentially, but not encoded as a list
// 3) summary with node count, relationship count and hash digest
// 1) magic number
// 2) transactional snapshot of the snapshoter. When the snapshot is
// generated it's an empty list.
// 3) list of label+property index
// 4) all nodes, sequentially, but not encoded as a list
// 5) all relationships, sequentially, but not encoded as a list
// 5) summary with node count, relationship count and hash digest
encoder.WriteRAW(durability::kMagicNumber.data(),
durability::kMagicNumber.size());
encoder.WriteTypedValue(durability::kVersion);
encoder.WriteList({}); // Transactional snapshot.
encoder.WriteList({}); // Label + property indexes.
for (const auto &nodes_file : nodes) {
node_count += ConvertNodes(nodes_file, node_id_map, encoder);
@ -338,8 +346,8 @@ void Convert(const std::vector<std::string> &nodes,
}
static const char *usage =
"[OPTION]... --out SNAPSHOT_FILE [--nodes=CSV_FILE]... [--edges=CSV_FILE]...\n"
"Create a Memgraph recovery snapshot file from CSV.\n";
"[OPTION]... --out SNAPSHOT_FILE [--nodes=CSV_FILE]... [--edges=CSV_FILE]...\n"
"Create a Memgraph recovery snapshot file from CSV.\n";
int main(int argc, char *argv[]) {
gflags::SetUsageMessage(usage);